Source code for robots.concurrency.concurrency

# coding=utf-8
"""
Concurrency support for pyRobot.

This module provides:

- an implementation of :class:`SignalingThread` (threads that explicitely
  handle signals like cancelation)
- heavily modified Python futures to support robot action management.
- A future executor that simply spawn one thread per future (action) instead of
  a thread pool.

These objects should not be directly used. Users should instead rely on the
:meth:`~robots.concurrency.action.action` decorator.

Helpful debugging commands::

    >>> sys._current_frames()
    >>> inspect.getouterframes(sys._current_frames()[<id>])[0][0].f_locals

"""
import logging; logger = logging.getLogger("robots.actions")

import sys

import uuid

MAX_FUTURES = 20
MAX_TIME_TO_COMPLETE = 1 # sec: time allowed to tasks to complete when cancelled. If they take more than that, force termination.
ACTIVE_SLEEP_RESOLUTION = 0.1 # sec

try:
    from concurrent.futures import Future, TimeoutError
except ImportError:
    import sys
    sys.stderr.write("[error] install python-concurrent.futures\n")
    sys.exit(1)

import os.path #for basename
import weakref
import threading 
import thread # for get_ident
from collections import deque

import traceback

from .signals import ActionCancelled, ActionPaused


[docs]class SignalingThread(threading.Thread): def __init__(self, *args, **kwargs): threading.Thread.__init__(self, *args, **kwargs) self.debugger_trace = None
[docs] def cancel(self): self.__cancel = True
[docs] def pause(self): self.__pause = True
def _Thread__bootstrap(self): """ The name come from Python name mangling for __double_leading_underscore_names Note that in Python3, __bootstrap becomes _bootstrap, thus making it easier to override. """ if threading._trace_hook is not None: self.debugger_trace = threading._trace_hook else: self.debugger_trace = None self.__cancel = False self.__pause = False sys.settrace(self.__signal_emitter) self.name = "Ranger action thread (initialization)" super(SignalingThread, self)._Thread__bootstrap() def __signal_emitter(self, frame, event, arg): if self.__cancel: if frame.f_globals["__name__"] == "threading": # Raising exception at uncontrolled time is a dangerous sport, # especially if the thread is in the middle of locking/unlocking shared resources # like (in our case) setting result in futures and reading them. # After thinking about it for a day, I could not find any good solution except for # postponing raising the signals until out of the threading module. This is not # very nice, but seem to work out well. #logger.debug("Thread <%s> in threading module. Postponing cancelation" % self.name) pass else: self.__cancel = False desc = "Cancelling thread <%s>:\n" % self.name tb = traceback.extract_stack(frame, limit = 6) for f in tb: file, line, fn, instruction = f desc += " - in <%s> (l.%s of %s): %s\n" % (fn, line, os.path.basename(file), instruction) logger.debug(desc) raise ActionCancelled() if self.__pause: self.__pause = False logger.debug("Pausing thread <%s>" % self.name) raise ActionPaused() if self.debugger_trace: return self.debugger_trace else: return self.__signal_emitter
[docs]class RobotActionThread(SignalingThread): def __init__(self, future, initialized, fn, args, kwargs): SignalingThread.__init__(self) initialized.set() self.future = future self.fn = fn self.args = args self.kwargs = kwargs
[docs] def run(self): if not self.future.set_running_or_notify_cancel(): return try: result = self.fn(self.future, str(self.future),*self.args, **self.kwargs) self.future.set_result(result) logger.debug("Action <%s>: completed." % str(self.future)) except BaseException: e = sys.exc_info()[1] logger.error("Exception in action <%s>: %s"%(str(self.future), e)) #self.fn.__name__ logger.error(traceback.format_exc()) self.future.set_exception(e)
[docs]class RobotAction(Future): def __init__(self, actionname): Future.__init__(self) self.actionname = actionname self.thread = None self.id = uuid.uuid4() self.subactions = [] self.parent_action = None self.has_acquired_resource = False
[docs] def add_subaction(self, action): self.subactions = [a for a in self.subactions if a() is not None and a().thread() is not None] self.subactions.append(action) logger.debug("Added sub-action %s to action %s" % (str(action()), str(self)))#.actionname)) 1: action().actionname
[docs] def set_parent(self, action): self.parent_action = action
[docs] def childof(self, action): """ Returns true if this action is a child of the given action, ie, has been spawned from the given action or any of its descendants. """ parent = self.parent_action() # weakref! if parent is None: return False if parent is action: return True return parent.childof(action)
[docs] def set_thread(self, thread): self.thread = thread
[docs] def cancel(self): # we do not call the 'standard' Future.cancel method since we do not have a thread pool (ie, # the future can not be 'pending for execution'), which is the only useful usecase (well, with callback on # cancellation, I imagine... so those are not supported for now) thread = self.thread() # weakref! if thread is None: logger.debug("Action <%s>: already done" % self) return # first, cancel myself (to make sure I won't restart subactions) logger.debug("Action <%s>: signaling cancelation to action's thread" % self) thread.cancel() # then, tell all the subactions that they should stop # (can not do that in the thread's cancel (_signal_emitter), because the # thread may hold locks that are not released until the exception is raised and # the context manager are left) logger.debug("Action <%s>: %s subactions to cancel" % (self, len(self.subactions))) #self.actionname for weak_subaction in self.subactions: subaction = weak_subaction() if subaction: logger.debug("Action <%s>: Cancelling subaction %s..." % (self, subaction)) subaction.cancel() # then, make sure everybody actually terminates logger.debug("Action <%s>: now waiting for completion" % self) try: self.exception(timeout = MAX_TIME_TO_COMPLETE) # waits this amount of time for the task to effectively complete except TimeoutError: raise RuntimeError("Unable to cancel action %s (still running %s after cancellation)!" % (self, MAX_TIME_TO_COMPLETE)) logger.debug("Action <%s>: successfully cancelled" % self)
#t = 0 #while t < MAX_TIME_TO_COMPLETE: # time.sleep(ACTIVE_SLEEP_RESOLUTION) # t+=ACTIVE_SLEEP_RESOLUTION # if self.thread() is None: # logger.debug("Action <%s>: successfully cancelled" % self.actionname) # return #raise RuntimeError("Unable to cancel action %s (still running %s after cancellation)!" % (self.actionname, MAX_TIME_TO_COMPLETE))
[docs] def result(self): if self.parent_action and self.parent_action(): threading.current_thread().name = "Action %s (waiting for sub-action %s)" % (self.parent_action(), self) else: threading.current_thread().name = "Main thread (waiting for sub-action %s)" % self # active wait! Instead of blocking on the condition variable in super.result() # we do an active wait to make sure we can cancel/suspend the action via our # __signal_emitter trace function while True: try: return super(RobotAction, self).result(ACTIVE_SLEEP_RESOLUTION) except TimeoutError: pass
[docs] def wait(self): """ alias for result() """ return self.result()
def __lt__(self, other): """ Overrides the comparision operator (used by ==, !=, <, >) to first wait for the result of the future. """ return self.result().__lt__(other) def __le__(self, other): return self.result().__le__(other) def __eq__(self, other): return self.result().__eq__(other) def __ne__(self, other): return self.result().__ne__(other) def __gt__(self, other): return self.result().__gt__(other) def __ge__(self, other): return self.result().__ge__(other) def __repr__(self): return str(self.id) def __str__(self): return self.actionname + "[" + self.__repr__() + "]"
[docs]class FakeFuture: """ Used in the 'immediate' mode. """ def __init__(self, result): self._result = result
[docs] def result(self): return self._result
[docs] def wait(self): return self._result
[docs]class RobotActionExecutor(): def __init__(self): # Attention, RobotActionExecutor must be thread-safe self.futures = [] self.futures_lock = threading.Lock()
[docs] def submit(self, fn, *args, **kwargs): with self.futures_lock: self.futures = [f for f in self.futures if not f.done()] name = fn.__name__ if args and not kwargs: name += "(%s)" % ", ".join([str(a) for a in args[1:]]) # start at 1 because 0 is the robot instance elif kwargs and not args: name += "(%s)" % ", ".join(["%s=%s" % (str(k), str(v)) for k, v in kwargs.items()]) elif args and kwargs: name += "(%s, " % ", ".join([str(a) for a in args[1:]]) name += "%s)" % ", ".join(["%s=%s" % (str(k), str(v)) for k, v in kwargs.items()]) if len([f for f in self.futures if f.has_acquired_resource]) > MAX_FUTURES: raise RuntimeError("You have more than %s actions running in parallel! Likely a bug in your application logic!" % MAX_FUTURES) f = RobotAction(name) initialized = threading.Event() t = RobotActionThread(f, initialized, fn, args, kwargs) f.set_thread(weakref.ref(t)) current_action = self.get_current_action() if current_action: f.set_parent(weakref.ref(current_action)) current_action.add_subaction(weakref.ref(f)) t.start() while not initialized.is_set(): # waits for the thread to actually start pass with self.futures_lock: self.futures.append(f) return f
[docs] def get_current_action(self): """Returns the RobotAction linked to the current thread. """ thread_id = threading.current_thread().ident with self.futures_lock: for f in self.futures: if not f.done(): thread = f.thread() # weak ref if thread is not None and thread.ident == thread_id: return f logger.debug("The current thread (<%s>) is not a robot action (main thread?)" % threading.current_thread().name) return None
[docs] def cancel_all(self): """ Blocks until all the currently running actions are actually stopped. """ with self.futures_lock: for f in self.futures: if not f.done(): f.cancel() self.futures = []
[docs] def cancel_all_others(self): """ Blocks until all the currently running actions *except the calling one* are actually stopped. """ thread_id = threading.current_thread().ident with self.futures_lock: for f in self.futures: if not f.done(): thread = f.thread() # weak ref if thread is not None and thread.ident == thread_id: myself = f continue f.cancel() self.futures = [myself]
[docs] def actioninfo(self, future_id): with self.futures_lock: future = [f for f in self.futures if id(f) == future_id] if not future: return "No task with ID %s. Maybe the task is already done?" % future_id future = future[0] desc = "Task <%s>\n" % future thread = future.thread() # weak ref if thread: frame = sys._current_frames()[thread.ident] tb = traceback.extract_stack(frame, limit = 6) for f in tb: file, line, fn, instruction = f desc += " - in <%s> (l.%s of %s): %s\n" % (fn, line, os.path.basename(file), instruction) return desc else: return "Task ID %s is already done." % future_id
def __str__(self): with self.futures_lock: return "Running tasks:\n" + \ "\n".join(["Task %s (id: %s, thread: <%s>)" % (f, id(f), str(f.thread())) for f in self.futures if not f.done()])