Source code for commanduino.commanddevices.commanddevice

"""

.. module:: commanddevice
   :platform: Unix
   :synopsis: Forms the template of the different Arduino devices.

.. moduleauthor:: Jonathan Grizou <Jonathan.Grizou@gla.ac.uk>

"""
from ..commandhandler import CommandHandler
from ..lock import Lock

from .._logger import create_logger

#Defualt timeout value
DEFAULT_TIMEOUT = 1

#Bonjour Information
BONJOUR_ID = 'TEMPLATE'
CLASS_NAME = 'CommandDevice'


[docs]class DeviceTimeOutError(Exception): """ Exception for when the device does not response within a set timeframe. Args: device_name (str): Name of the device. elapsed (float): Time elapsed until the exception was thrown. """ def __init__(self, device_name, elapsed): self.device_name = device_name self.elapsed = elapsed def __str__(self): return '{device_name} did not respond within {elapsed}s'.format(device_name=self.device_name, elapsed=round(self.elapsed, 3))
[docs]class CommandTimeOutError(Exception): """ Exception for when the device does not respond to a command after a set timeframe. Args: device_name (str): The name of the device. command_name (str): The name of the command. elapsed (float): Time elapsed until the exception was thrown. """ def __init__(self, device_name, command_name, elapsed): self.device_name = device_name self.elapsed = elapsed self.command_name = command_name def __str__(self): return '{device_name} did not respond to {command_name} within {elapsed}s'.format(device_name=self.device_name, command_name=self.command_name, elapsed=round(self.elapsed, 3))
[docs]class CommandDevice(object): """ Base class to represent the different Arduino devices. """ def __init__(self): self.logger = create_logger(self.__class__.__name__) self.cmdHdl = CommandHandler() self.cmdHdl.add_default_handler(self.unrecognized)
[docs] def init(self): """ .. note:: This function is called once the write function is set. Do your setup here by sending command to the devices """ pass
@classmethod
[docs] def from_config(cls, config): """ Obtains the device information from a configuration setup. Returns: CommandDevice: A new instance of CommandDevice with details set from the configuration. """ return cls(**config)
[docs] def handle_command(self, cmd): """ Handles a command to the device. Args: cmd (str): The command to handle. """ self.cmdHdl.handle(cmd)
[docs] def set_command_header(self, cmdHeader): """ Sets the command header. Args: cmdHeader (str): The command header to be set. """ self.cmdHdl.set_command_header(cmdHeader)
[docs] def set_write_function(self, write_func): """ Sets the write function for the device. Args: write_func (str): The write function to be set. """ self.write = write_func
[docs] def send(self, command_id, *arg): """ Sends a command to/from the device. Args: command_id (str): The ID of the command. *arg: Variable argument. """ self.write(self.cmdHdl.forge_command(command_id, *arg))
[docs] def unrecognized(self, cmd): """ The supplied command is unrecognised. Args: cmd (str): The supplied command. """ self.logger.warning('Received unknown command "{}"'.format(cmd))
[docs] def register_request(self, request_command, answer_command, variable_name, callback_function_for_variable_update, variable_init_value=None, timeout=DEFAULT_TIMEOUT): """ Registers a new request to/from the device. Args: request_command (str): The requesting command. answer_command (str): The answering command. variable_name (str): The name of the variable. callback_function_for_variable_update (str): The callback function for updating the variable. variable_init_value: Initialisation value for the variable, default set to None. timeout (float): Time to wait until timeout, default set to DEFAULT_TIMEOUT (1) """ setattr(self, variable_name, variable_init_value) lock_variable_name = variable_name + '_lock' setattr(self, lock_variable_name, Lock(timeout)) self.cmdHdl.add_command(answer_command, callback_function_for_variable_update) request_function_name = 'request_' + variable_name def request(): """ Sends the request command to/from device. """ self.send(request_command) setattr(self, request_function_name, request) get_function_name = 'get_' + variable_name def get(): """ Gets the variable name. Returns: variable_name (str): Name of the variable. Raises: CommandTimeOutError: Device did not response to command after X time. """ variable_lock = getattr(self, lock_variable_name) variable_lock.acquire() getattr(self, request_function_name)() is_valid, elapsed = variable_lock.wait_until_released() variable_lock.ensure_released() if is_valid: return getattr(self, variable_name) else: raise CommandTimeOutError(self.cmdHdl.cmd_header, request_command, elapsed) setattr(self, get_function_name, get)