# coding=utf-8
import logging; logger = logging.getLogger("robots.robot")
if (hasattr(logging, "NullHandler")): # python >= 2.7
logger.addHandler(logging.NullHandler())
import time
import pkgutil, sys
from functools import partial
from robots.helpers.misc import valuefilter
from robots.poses import PoseManager
from robots.introspection import introspection
from robots.events import Events
from robots.mw import * # ROS, NAOQI...
from robots.concurrency import RobotActionExecutor, ACTIVE_SLEEP_RESOLUTION
[docs]class State(dict):
__getattr__= dict.__getitem__
__setattr__= dict.__setitem__
__delattr__= dict.__delitem__
[docs]class GenericRobot(object):
""" This class manages functionalities that are shared across every robot 'backends' (ROS, Aseba,...)
You are expected to derive your own robot implementation from this class,
and it is advised to use instances of :class:`GenericRobot` within a context
manager (ie ``with MyRobot as robot: ...`` construct).
Its role comprises of:
- automatic addition of proxy methods for the robot actions
- actions execution (spawning threads for actions via ``self.executor``
- pose management through the ``robot.poses`` instance variable
- event monitoring through the ``robot.on(...).do(...)`` interface
:class:`GenericRobot` defines several important instance variables,
documented below.
:ivar state: the state vector of the robot. By default, a simple dictionary.
You can overwrite it with a custom object, but it is expected to provide a
dictionary-like interface.
:ivar poses: an instance of :class:`.PoseManager`.
:ivar executor: instance of :class:`.RobotActionExecutor`
responsible for spawning and starting threads for the robot actions. You
should not need to access it directly.
Example of a custom robot:
.. code:: python
from robots import GenericRobot
class MyRobot(GenericRobot):
def __init__(self):
super(MyRobot, self).__init__()
# create (and set) one element in the robot's state. Here a bumper.
# (by default, self.state is a dictionary. You can safely
# overwrite it with any dict-like object.
self.state["my_bumper"] = False
# do whatever other initialization you need for your robot
# Implement here all the accessors you need to talk to the robot
# low-level, like:
def send_goal(self, pose):
# move your robot using your favorite middleware
print("Starting to move towards %s" % pose)
def stop(self):
# stop your robot using your favorite middleware
print("Motion stopped")
def whatever_other_lowlevel_method_you_need(self):
#...
pass
# create actions
@action
def move_forward(robot):
#...
pass
with MyRobot() as robot:
# Turn on DEBUG logging.
# Shortcut for logging.getLogger("robots").setLevel(logging.DEBUG)
robot.debug()
# subscribe to events...
robot.whenever("my_bumper", value = True).do(move_forward)
try:
while True:
time.sleep(0.5)
except KeyboardInterrupt:
pass
.. note:: A note on debugging
Several methods are there to help with debugging:
- :meth:`loglevel`: default to ``INFO``. ``logging.DEBUG`` can be useful.
- :meth:`silent`: alias for ``loglevel(logging.WARNING)``
- :meth:`info`: alias for ``loglevel(logging.INFO)``
- :meth:`debug`: alias for ``loglevel(logging.DEBUG)``
- :meth:`running`: prints the list of running tasks (with their IDs)
- :meth:`actioninfo`: give details on a given action, including the exact line being currently executed
"""
def __init__(self,
actions = None,
supports = 0,
dummy = False,
immediate = False,
configure_logging = True):
"""
:param list actions: a list of packages that contains modules with
actions (ie, modules with functions decorated with ``@action``). Proxies to
these actions are appended to the instance of GenericRobot upon
construction.
:param supports: (default: 0) a mask of middlewares the robot
supports. Supported middlewares are listed in
``robots.mw.__init__.py``. For example ``supports = ROS|POCOLIBS``
means that both ROS and Pocolibs are supported. This requires the
corresponding Python bindings to be available.
:param boolean dummy: if ``True`` (defults to ``False``), the robot is in
'dummy' mode: no actual actions are performed. The exact meaning of
'dummy' is left to the subclasses of GenericRobot.
:param boolean immediate: if ``True`` (defaults to ``False``), actions are
executed in the main thread instead of their own separate threads.
Useful for some specific debugging scenarios.
:param boolean configure_logging: if ``True`` (default), configures
a default colorized console logging handler.
"""
self.dummy = dummy
if configure_logging:
self.configure_console_logging()
# normally, start logging level for all robots at INFO (instead of
# Python's default of WARNING)
if not dummy:
self.loglevel(logging.INFO)
else:
self.loglevel(logging.DEBUG)
# initially, empty state (a state is actually a simple dictionary, with
# direct member accessors). Users are expected to override this member
self.state = State()
self.executor = RobotActionExecutor()
self.immediate = immediate
self._filteredvalues = {} # holds the filters for sensors that need filtering (like scale, IR sensors...)
self.poses = PoseManager(self)
self.events = Events(self)
# make the 'Events.on(...)' method available at robot level
self.on = self.events.on
self.whenever = self.events.whenever
## Initialization of low-level middlewares
self.mw = supports
if self.supports(ROS):
import sys, os.path
try:
import rospy
except ImportError:
logger.fatal("ROS support required, but rospy not installed! Quitting.")
sys.exit(1)
nodename = os.path.basename(sys.argv[0])
if not nodename: # robot created from Python REPL
nodename = "pyrobots_repl"
else:
# replace dot (like in 'toto.py') by underscore to be valid ROS id
nodename = nodename.replace(".","_")
logger.info("Initializing ROS node <%s>" % nodename)
rospy.init_node(nodename, disable_signals=True)
# Using ROS: automatically configure the logging to use
# ROS RX Console, but first make other handlers quieter
for i,handler in enumerate(logger.handlers):
logger.handlers[i].level=logging.WARNING
import robots.roslogger
rxconsole = robots.roslogger.RXConsoleHandler()
logging.getLogger("robot").addHandler(rxconsole)
from .poses import ROSFrames
self.rosframes = ROSFrames()
self.poses.add_frame_provider(self.rosframes)
# Dynamically add available actions (ie, actions defined with @action in
# actions/* submodules.
self.load_actions(actions)
if introspection:
introspection.ping()
[docs] def loglevel(self, level = logging.INFO):
logging.getLogger("robots").setLevel(level)
[docs] def silent(self):
self.loglevel(logging.WARNING)
[docs] def info(self):
self.loglevel(logging.INFO)
[docs] def debug(self):
self.loglevel(logging.DEBUG)
[docs] def running(self):
""" Print the list of running actions.
"""
logger.info(str(self.executor))
[docs] def actioninfo(self, id):
""" Print details on a running action (including the current line
number).
"""
logger.info(self.executor.actioninfo(id))
@staticmethod
[docs] def supports(self, middleware):
return bool(self.mw & middleware) and not self.dummy
[docs] def load_actions(self, actions):
if not actions:
logger.warning("No action packages specified when creating an instance of GenericRobot. Likely an error!")
else:
for action in self._available_actions(actions):
setattr(self, action.__name__, partial(action, self))
logger.info("Added " + action.__name__ + " as available action.")
[docs] def wait_for_state_update(self, timeout = None):
""" Blocks until the robot state has been updated.
This is highly dependent on the low-level mechanisms of your robot, and
should almost certainly be overriden in your implementation of a
GenericRobot subclass.
The default implementation simply waits ``ACTIVE_SLEEP_RESOLUTION``
seconds.
"""
time.sleep(ACTIVE_SLEEP_RESOLUTION)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
[docs] def close(self):
self.cancel_all()
self.events.close()
if self.supports(ROS):
import rospy
rospy.signal_shutdown("executive controller closing")
@staticmethod
[docs] def sleep(duration):
""" Active sleep. Must used by actions to make sure they can be quickly
cancelled.
"""
tot_time = 0
while tot_time // duration == 0:
time.sleep(ACTIVE_SLEEP_RESOLUTION)
tot_time += ACTIVE_SLEEP_RESOLUTION
time.sleep(duration % ACTIVE_SLEEP_RESOLUTION)
[docs] def wait(self, var, **kwargs):
""" Alias to wait on a given condition. Cf :class:`robots.events.Events`
for details on the acceptable conditions.
"""
self.on(var, **kwargs).wait()
[docs] def cancel_all(self):
""" Sends a 'cancel' signal (ie, the
:class:`.ActionCancelled` exception is raised) to all
running actions.
Note that, if called within a running action, this action *is cancelled
as well*. If this is not what you want, use
:meth:`cancel_all_others` instead.
Actions that are not yet started (eg, actions waiting on a resource
availability) are simply removed for the run queue.
"""
self.executor.cancel_all()
[docs] def cancel_all_others(self):
""" Sends a 'cancel' signal (ie, the
:class:`.ActionCancelled` exception is raised) to all
running actions, *except for the action that call
:meth:`cancel_all_others`* (note that its currently running subactions
*will be cancelled*).
Actions that are not yet started (eg, actions waiting on a resource
availability) are simply removed for the run queue.
"""
self.executor.cancel_all_others()
[docs] def filtered(self, name, val):
""" Helper to easily filter values (uses an accumulator to average a
given 'name' quantity)
"""
filter = self._filteredvalues.setdefault(name, valuefilter())
filter.append(val)
return filter.get()
@staticmethod
def _available_actions(pkgs):
""" Iterate over all loaded modules, and retrieve actions (ie functions
with the @action decorator).
"""
actions = []
for pkg in pkgs:
if isinstance(pkg, str):
if pkg not in sys.modules:
try:
__import__(pkg)
except ImportError:
raise RuntimeError("While collecting robot actions, I encountered an unknown module <%s>!" % pkg)
path = sys.modules[pkg].__path__
for loader, module_name, is_pkg in pkgutil.walk_packages(path):
__import__(pkg + "." + module_name)
m = sys.modules[pkg + "." + module_name]
for member in [getattr(m, fn) for fn in dir(m)]:
if hasattr(member, "_action"):
actions.append(member)
else:
# we assume a list of methods has been passed
if hasattr(pkg, "_action"):
actions.append(pkg)
return actions