Source code for robots.events.events

# coding=utf-8
"""
pyRobots' events implementation
"""

import logging; logger = logging.getLogger("robots.events")
import time
import weakref

import threading # for current_thread()
from robots.concurrency import SignalingThread, ACTIVE_SLEEP_RESOLUTION

from robots.introspection import introspection

[docs]class Events: """ Exposes high-level primitives to create and cancel event monitors. :class:`robots.robot.GenericRobot` creates and holds an instance of :meth:`Events` that you can use: you should not need to instanciate yourself this class. """ def __init__(self, robot): self.robot = robot self.eventmonitors = []
[docs] def on(self, var, **kwargs): """ Creates a new :class:`EventMonitor` to watch a given event model (one shot). On the first time the event is fired, the monitor is removed. :returns: a new instance of :class:`EventMonitor` for this event. """ monitor = EventMonitor(self.robot, var, oneshot=True, **kwargs) self.eventmonitors.append(weakref.ref(monitor)) return monitor
[docs] def every(self, var, max_firing_freq = 10, blocking = True, **kwargs): """ Alias for :meth:`whenever`. """ return self.whenever(var, max_firing_freq, blocking, **kwargs)
[docs] def whenever(self, var, max_firing_freq = 10, blocking = True, **kwargs): """ Creates a new :class:`EventMonitor` to continuously watch a given event. ``var`` can either be a predicate or the name of an entry in the robot's state container (``robot.state``). In the later case, a supplementary keyword argument amongst ``value=``, ``become=``, ``above=``, ``below=``, ``increase=``, ``decrease=`` must be provided to define the behaviour of the monitor. Example: .. code-block:: python # using the robot state: robot.whenever("touch_sensor", value = True).do(on_touched) robot.whenever("sonar", below = 0.4).do(on_obstacle_near) robot.whenever("bumper", becomes = True).do(on_hit_obstacle) # using a predicate: def is_tired(robot): # do any computation you want... now = datetime.datetime.now() evening = now.replace(hour=20, minute=0, second=0, microsecond=0) return robot.state["speed"] > 1.0 and now > evening robot.whenever(is_tired).do(go_to_sleep) :param var: either a predicate (callable) or one of the key of ``robot.state``. :param max_firing_freq: set how many times pe second this event may be triggered (default to 10Hz. 0 means as many as possible). :param blocking: if ``True``, the event callback is blocking, preventing new event to be triggered until the callback has completed (defaults to ``True``). :param kwargs: the monitor behaviour (cf above) :returns: a new instance of :class:`EventMonitor` for this event. """ monitor = EventMonitor(self.robot, var, oneshot=False, max_firing_freq = max_firing_freq, blocking = blocking, **kwargs) self.eventmonitors.append(weakref.ref(monitor)) return monitor
[docs] def stop_all_monitoring(self): """ Stops all event monitoring, but do not interrupt event callbacks, if any is running. You may want to use :meth:`stop_all_monitoring` instead of :meth:`cancel_all` when you need to prevent new events of being raised *from* an event callback (:meth:`cancel_all` would interrupt this callback as well). """ for m in self.eventmonitors: monitor = m() # weakref! if monitor: monitor.stop_monitoring()
[docs] def cancel_all(self): """ Cancels all event monitors and interrupt running event callbacks (if any). """ # first, we tell *all* monitors not to trigger any events anymore # then we actually stop the monitors by interupting the callbacks # they may be processing. self.stop_all_monitoring() for m in self.eventmonitors: monitor = m() # weakref! if monitor: monitor.close()
[docs] def close(self): self.cancel_all()
[docs]class EventMonitor: VALUE = "=" BECOMES = "becomes" ABOVE = ">" BELOW = "<" INCREASE = "+=" DECREASE = "-=" def __init__(self, robot, var, value = None, becomes = None, above = None, below = None, increase = None, decrease = None, oneshot = False, max_firing_freq = 10, blocking = True): self.cbs= [] # callbacks self.robot = robot self.valid = False if not callable(var): if var not in robot.state: raise Exception("%s is neither a member of the robot's state or a predicate" % var) if robot.state[var] is None: logger.error("'%s' does not seem to be published yet! Can not create the event monitor." % var) return self.valid = True self.var = var self.oneshot = oneshot self.max_firing_freq= max_firing_freq self.blocking = blocking self.monitoring = False self.thread = None # store initial value, used by INCREASE/DECREASE modes # and last value, used by BECOMES modes if not self.robot.dummy and not callable(self.var): self.start_inc_value = self.robot.state[self.var] self.start_dec_value = self.robot.state[self.var] self.last_value = self.robot.state[self.var] if not callable(self.var): if value is not None: self.mode = EventMonitor.VALUE self.target = value elif becomes is not None: self.mode = EventMonitor.BECOMES self.target = becomes elif above is not None: self.mode = EventMonitor.ABOVE self.target = above elif below is not None: self.mode = EventMonitor.BELOW self.target = below elif increase is not None: self.mode = EventMonitor.INCREASE self.target = increase elif decrease is not None: self.mode = EventMonitor.DECREASE self.target = decrease else: raise Exception("Event created without condition!") else: self.mode = None self.target= None logger.info("Added new event monitor: %s" % self)
[docs] def do(self, cb): if not self.valid: return self if introspection: introspection.action_subscribe_event("BROKEN TDB", str(self)) # first add callback? start a thread to monitor the event! if not self.thread: self.monitoring = True self.thread = SignalingThread(target=self._monitor) self.thread.start() self.cbs.append(cb) return self # to allow for chaining
def _monitor(self): threading.current_thread().name = "Event monitor on %s" % self while self.monitoring: ok = self._wait_for_condition() if not ok: # monitoring has been interrupted! return if introspection: introspection.action_event_fired("BROKEN TDB", str(self)) for cb in self.cbs: if not self.blocking: cb(self.robot) else: cb(self.robot).wait() # after a blocking event, reset the reference values for # events INCREASE and DECREASE if not callable(self.var): self.start_inc_value = self.robot.state[self.var] self.start_dec_value = self.robot.state[self.var] if self.oneshot: logger.info("Removing event on %s" % self) return else: if self.max_firing_freq > 0: self.robot.sleep(1./self.max_firing_freq)
[docs] def stop_monitoring(self): self.monitoring = False
[docs] def close(self): if self.valid and self.thread and self.thread.is_alive: self.thread.cancel() self.thread.join()
def _check_condition(self, val): ok = False if self.mode == EventMonitor.VALUE and val == self.target: ok = True elif self.mode == EventMonitor.BECOMES and self.last_value != val and val == self.target: ok = True elif self.mode == EventMonitor.ABOVE and val > self.target: ok = True elif self.mode == EventMonitor.BELOW and val < self.target: ok = True elif self.mode == EventMonitor.INCREASE and val > (self.start_inc_value + self.target): self.start_inc_value = val ok = True elif self.mode == EventMonitor.DECREASE and val < (self.start_dec_value - self.target): self.start_dec_value = val ok = True # TODO: could be improved with a bit of hysteris filtering if val > self.start_dec_value: self.start_dec_value = val if val < self.start_inc_value: self.start_inc_value = val self.last_value = val return ok def _wait_for_condition(self): import time if not self.robot.dummy: # predicate-based event if callable(self.var): if not self.monitoring: logger.info("<%s> not monitored anymore" % str(self)) return False while not self.var(self.robot): time.sleep(ACTIVE_SLEEP_RESOLUTION) # state-based event else: if self.var not in self.robot.state: # value not yet read from the robot. logger.warning("Waiting for %s to be published by the robot..." % self.var) while not self.var in self.robot.state: self.robot.wait_for_state_update(2) while not self._check_condition(self.robot.state[self.var]): if not self.monitoring: logger.info("<%s> not monitored anymore" % str(self)) return False self.robot.wait_for_state_update(ACTIVE_SLEEP_RESOLUTION) else: #dummy mode. Wait a little bit, and assume the condition is true time.sleep(0.2) logger.info("%s is true" % str(self) + (" (dummy mode)" if self.robot.dummy else "")) return True
[docs] def wait(self): """ Blocks until an event occurs. """ if introspection: introspection.action_waiting("BROKEN TDB", str(self)) self._wait_for_condition() if introspection: introspection.action_waiting_over("BROKEN TDB")
def __str__(self): return "condition <%s %s %s>"% (self.var, self.mode, self.target)