# coding=utf-8
import logging; logger = logging.getLogger("robots.actions")
import time
import threading
import robots
from robots.introspection import introspection
from .signals import ActionCancelled
from .concurrency import FakeFuture
[docs]def action(fn):
""" When applied to a function, this decorator turns it into
a asynchronous task, starts it in a different thread, and returns
a 'future' object that can be used to query the result/cancel it/etc.
The main methods available on these 'future' object include
:meth:`.RobotAction.wait` to wait until the action completes, and
:meth:`.RobotAction.cancel` to request the action to stop (ie, it raises an
:class:`.ActionCancelled` signal within the action thread). See
:class:`.RobotAction` for the full list of available methods.
Action implementation may want to handle the
:class:`.ActionCancelled` signal to properly process
cancellation requests.
Usage example:
.. code-block:: python
@action
def safe_walk(robot):
try:
robot.walk()
except ActionCancelled:
robot.go_back_to_rest_pose()
action = robot.safe_walk()
time.sleep(1)
action.cancel()
In this example, after one second, the ``safe_walk`` action is cancelled.
This sends the signal :class:`.ActionCancelled` to the
action, that can appropriately terminate.
"""
# wrapper for the original function that locks/unlocks shared
# resources
def lockawarefn(future,actionname,*args, **kwargs):
try:
# we acquire resources *within the future thread* that
# we want to *wait* for.
if hasattr(fn, "_locked_res"):
for res, wait in fn._locked_res:
if wait:
threading.current_thread().name = "Robot Action %s, waiting on resource %s" % (actionname, res) #fn.__name__
need_to_wait = False
if res.owner is not None:
need_to_wait = True
logger.info("Robot action <%s> is waiting on resource %s" % (actionname, res)) #fn.__name__
res.acquire(wait, acquirer = fn.__name__)
if need_to_wait:
logger.info("Robot action <%s> has acquired resource %s" % (actionname, res)) #fn.__name__
else:
logger.info("Robot action <%s> acquired free resource %s " %(actionname, res))
except ActionCancelled:
# action cancelled while it was waiting for a resource to become
# available
threading.current_thread().name = "Idle Robot action thread"
logger.debug("Action <%s> cancelled while it was waiting for a lock on a resource." % actionname) #fn.__name__
return None
try:
future.has_acquired_resource = True
threading.current_thread().name = "Robot Action %s (running)" % actionname #fn.__name__
logger.debug("Starting action <%s> now." % actionname) #fn.__name__
try:
result = fn(*args, **kwargs)
except TypeError:
logger.error("Exception when invoking action <%s>. Did you forget to add the parameter 'robot'?" % actionname) #fn.__name__
raise
logger.debug("Action <%s> returned." % actionname) #fn.__name__
return result
except ActionCancelled:
logger.warning("Action cancellation ignored by %s. Forced stop!" % actionname) #fn.__name__
finally:
if hasattr(fn, "_locked_res"):
for res, wait in fn._locked_res:
res.release()
threading.current_thread().name = "Idle Robot action thread"
lockawarefn.__name__ = fn.__name__
lockawarefn.__doc__ = fn.__doc__
# wrapper that submits the function to the executor and returns
# a future.
def innerfunc(*args, **kwargs):
if len(args) == 0 or not isinstance(args[0], robots.GenericRobot):
raise Exception("No robot instance passed to the action!")
robot = args[0]
# we acquire resources *outside the future* (to fail fast)
# for resources we do not want to wait for.
if hasattr(fn, "_locked_res"):
for res, wait in fn._locked_res:
if not wait:
got_the_lock = res.acquire(False, acquirer = fn.__name__)
if not got_the_lock:
logger.info("Required resource <%s> locked while attempting to start %s. Cancelling it as required." % (res.name, fn.__name__))
return FakeFuture(None)
if robot.immediate:
res = FakeFuture(lockawarefn(*args, **kwargs))
return res
else:
executor = robot.executor
if args and kwargs:
future = executor.submit(lockawarefn, *args, **kwargs)
elif args:
future = executor.submit(lockawarefn, *args)
else:
future = executor.submit(lockawarefn)
if introspection:
introspection.action_started(fn.__name__,
str("FUTURE ID BROKEN"),
str("ACTION ID BROKEN TDB"), #id of the current action
args[1:],
kwargs)
future.add_done_callback(lambda x : introspection.action_completed(fn.__name__, str("future.id BROKEN")))
return future
innerfunc.__name__ = fn.__name__
innerfunc.__doc__ = fn.__doc__
innerfunc._action = True
return innerfunc