diff --git a/pyluos/__init__.py b/pyluos/__init__.py index 420ab71..7bb78cc 100644 --- a/pyluos/__init__.py +++ b/pyluos/__init__.py @@ -1,6 +1,6 @@ import logging -from .device import Device +from .device import Device, map_custom_service from .services import * nh = logging.NullHandler() diff --git a/pyluos/device.py b/pyluos/device.py index c40cc79..1311613 100644 --- a/pyluos/device.py +++ b/pyluos/device.py @@ -22,6 +22,8 @@ def run_from_unittest(): return 'unittest' in sys.services +def map_custom_service(service_type, service_class): + name2mod[service_type] = service_class class contList(list): def __repr__(self): @@ -85,7 +87,6 @@ class Device(object): def __init__(self, host, IO=None, log_conf=_base_log_conf, - test_mode=False, background_task=True, *args, **kwargs): if IO is not None: @@ -109,7 +110,7 @@ def __init__(self, host, self._setup() self.logger.info('Device setup.') - self._last_update = time.time() + self._last_update = 0.0 self._running = True self._pause = False @@ -142,14 +143,12 @@ def play(self): self._pause = False def _setup(self): - self.logger.info('Sending detection signal.') - self._send({}) - time.sleep(0.01) + startTime = time.time() + retry = 0 + self.logger.info(f'Sending detection signal ({retry=})') self._send({'detection': {}}) self.logger.info('Waiting for routing table...') - startTime = time.time() state = self._poll_once() - retry = 0 while ('routing_table' not in state): if ('route_table' in state): self.logger.info("Watch out the Luos revision you are using on your board is too old to work with this revision of pyluos.\n Please consider updating Luos on your boards") @@ -160,6 +159,7 @@ def _setup(self): if retry > 5: # detection is not working sys.exit("Detection failed.") + self.logger.info(f'Sending detection signal ({retry=})') self._send({'detection': {}}) startTime = time.time() # Save routing table data @@ -180,9 +180,15 @@ def _setup(self): break # create the node self._nodes.append(AnyNode(id=node["node_id"], parent=parent_elem, connection=node["con"])) - filtered_services = contList([mod for mod in node["services"] if 'type' in mod and mod['type'] in name2mod.keys()]) + # list unrecognized services and print a warning + unrecognized_services = [mod for mod in node["services"] + if 'type' in mod and mod['type'] not in name2mod.keys()] + if (len(unrecognized_services) > 0): + self.logger.warning("Unrecognized services have been detected on node %d" % node["node_id"]) + for mod in unrecognized_services: + self.logger.warning(" - service %s of type %s" % (mod['alias'], mod['type'])) # Create a list of services in the node self._nodes[i].services = [ name2mod[mod['type']](id=mod['id'], @@ -212,21 +218,25 @@ def services(self): def nodes(self): return nodeList(self._nodes) + @property + def last_update(self) -> float: + """The last_update property.""" + return self._last_update + # Poll state from hardware. def _poll_once(self): - self._state = self._io.read() - if self._state != []: - self._state['timestamp'] = time.time() - return self._state - return [] + state = self._io.read() + if state: + state['timestamp'] = time.time() + return state def _poll_and_up(self): while self._running: if not self._pause: state = self._poll_once() - if self._state != []: + if state: self._update(state) - self._push_once() + self._push_once() else: time.sleep(0.1) @@ -260,7 +270,7 @@ def _update(self, new_state): self._freedomLink._kill(service.alias) service._kill() s += "\n* Service " + str(service.alias) + " have been excluded from the network due to no responses." - + s += "\n*************************************************************" print(s) break @@ -281,16 +291,16 @@ def _update(self, new_state): break if (self._freedomLink != None): self._freedomLink._assert(alias) - if 'services' not in new_state.keys(): + if 's' not in new_state.keys(): return - for alias, mod in new_state['services'].items(): + for alias, mod in new_state['s'].items(): if hasattr(self, alias): getattr(self, alias)._update(mod) if (self._freedomLink != None): self._freedomLink._update(alias, mod) - self._last_update = time.time() + self._last_update = float(new_state["timestamp"]) def update_cmd(self, alias, key, val): with self._cmd_lock: @@ -304,11 +314,11 @@ def update_data(self, alias, key, val, data): def _push_once(self): with self._cmd_lock: if self._cmd: - self._write(json.dumps({'services': self._cmd}).encode()) + self._write(json.dumps({'s': self._cmd}).encode()) self._cmd = defaultdict(lambda: defaultdict(lambda: None)) for cmd, binary in zip(self._cmd_data, self._binary): time.sleep(0.01) - self._write(json.dumps({'services': cmd}).encode() + '\n'.encode() + binary) + self._write(json.dumps({'s': cmd}).encode() + '\n'.encode() + binary) self._cmd_data = [] self._binary = [] diff --git a/pyluos/io/__init__.py b/pyluos/io/__init__.py index 27e3525..aebbcbf 100644 --- a/pyluos/io/__init__.py +++ b/pyluos/io/__init__.py @@ -1,5 +1,6 @@ import json import logging +from mergedeep import merge, Strategy class IOHandler(object): @@ -16,7 +17,17 @@ def is_ready(self): def read(self, trials=5): try: data = self.recv() - return self.loads(data) + if data is None: + return {} + table = data.splitlines() + if len(table) > 1: + # load the Json of each substring + jsn = [self.loads(sub_data) for sub_data in table] + # merge all the Json data + result = merge({}, *jsn, strategy=Strategy.ADDITIVE) + return result + else: + return self.loads(data) except Exception as e: logging.getLogger(__name__).debug('Msg read failed: {}'.format(str(e))) if trials == 0: diff --git a/pyluos/io/serial_io.py b/pyluos/io/serial_io.py index 19d361b..7a6fcfe 100644 --- a/pyluos/io/serial_io.py +++ b/pyluos/io/serial_io.py @@ -41,7 +41,7 @@ def is_host_compatible(cls, host): def __init__(self, host, baudrate=None): if baudrate is None: - baudrate = os.getenv('LUOS_BAUDRATE', 1000000) + baudrate = os.getenv('LUOS_BAUDRATE', 3000000) self._serial = _serial.Serial(host, baudrate) self._serial.flush() @@ -75,7 +75,7 @@ def write(self, data): def close(self): self._running = False - self._poll_loop.join() + self._poll_loop.join(timeout = 1) self._serial.close() diff --git a/pyluos/io/ws.py b/pyluos/io/ws.py index b1cd29f..b2d93b8 100644 --- a/pyluos/io/ws.py +++ b/pyluos/io/ws.py @@ -63,7 +63,7 @@ def is_ready(self): def recv(self): try: - data = self._msg.get(True, 1) + data = self._msg.get(True, 0.01) except queue.Empty: data = None return data @@ -73,7 +73,7 @@ def write(self, data): def close(self): self._running = False - self._poll_loop.join() + self._poll_loop.join(timeout = 2) self._ws.close() def _poll(self): @@ -94,6 +94,8 @@ def extract_line(s): while self._running: s = self._ws.recv() + if isinstance(s, str): + return buff = buff + s while self._running: line, buff = extract_line(buff) diff --git a/pyluos/services/load.py b/pyluos/services/load.py index 81fb655..9383381 100644 --- a/pyluos/services/load.py +++ b/pyluos/services/load.py @@ -1,4 +1,5 @@ from .service import Service +import time class Load(Service): diff --git a/pyluos/services/service.py b/pyluos/services/service.py index 2aaa225..8e9a433 100644 --- a/pyluos/services/service.py +++ b/pyluos/services/service.py @@ -44,7 +44,8 @@ def __init__(self, self._luos_revision = "Unknown" self._robus_revision = "Unknown" self._killed = False - self._last_update = [] + self._last_update = [time.time()] + self._tracked_property = "" self._luos_statistics = {} def __repr__(self): @@ -56,13 +57,21 @@ def _update(self, new_state): if not isinstance(new_state, dict): new_state = {new_state: ""} - self._last_update.append(time.time()) - if (len(self._last_update) > 1): - self.max_refresh_time = max(self.max_refresh_time, self._last_update[-1] - self._last_update[-2]) - if (self._last_update[0] < time.time() - 1.0): - while (self._last_update[0] < time.time() - 10.0): - self._last_update.pop(0) - self.refresh_freq = (len(self._last_update) / 10.0) * 0.05 + 0.95 * self.refresh_freq + # Check if we alredy have a property to track or if we didn't receive any property since 2 seconds + if (self._tracked_property == "") or (self._last_update[-1] < time.time() - 2.0): + # the property we track is void or not available anymore, we have to get one of the property received. + for key in new_state.keys(): + self._tracked_property = key + self._last_update.append(time.time()) + break + elif (self._tracked_property in new_state.keys()): + self._last_update.append(time.time()) + if (len(self._last_update) > 1): + self.max_refresh_time = max(self.max_refresh_time, self._last_update[-1] - self._last_update[-2]) + if (self._last_update[0] < time.time() - 1.0): + while (self._last_update[0] < time.time() - 10.0): + self._last_update.pop(0) + self.refresh_freq = (len(self._last_update) / 10.0) * 0.05 + 0.95 * self.refresh_freq if 'revision' in new_state.keys(): self._firmware_revision = new_state['revision'] diff --git a/pyluos/services/servoMotor.py b/pyluos/services/servoMotor.py index 29a2efa..292c1fe 100644 --- a/pyluos/services/servoMotor.py +++ b/pyluos/services/servoMotor.py @@ -62,6 +62,7 @@ def __init__(self, id, alias, device): self._target_rot_position = 0.0 self._target_trans_speed = 0.0 self._target_trans_position = 0.0 + self._target_torque = 0.0 # report modes self._rot_position = 0.0 @@ -390,6 +391,35 @@ def target_trans_position(self, s): else : self._push_value("target_trans_position", s) + # torque + @property + def target_torque(self): + if (self._config[ServoMotor._MODE_TORQUE] != True): + print("torque mode could be not enabled in the service please use 'device.service.torque_mode = True' to enable it") + return self._target_torque + + @target_torque.setter + def target_torque(self, s): + self._target_torque = s + self._push_value("target_torque", s) + + @property + def torque_mode(self): + return self._config[ServoMotor._MODE_TORQUE] + + @torque_mode.setter + def torque_mode(self, enable): + self._config[ServoMotor._MODE_TORQUE] = True if enable != 0 else False + if (enable == True) : + self._config[ServoMotor._MODE_LINEAR_POSITION] = False + self._config[ServoMotor._MODE_POWER] = False + self._config[ServoMotor._MODE_ANGULAR_POSITION] = False + self._config[ServoMotor._MODE_ANGULAR_SPEED ] = False + self._config[ServoMotor._MODE_LINEAR_SPEED] = False + self._push_value('parameters', self._convert_config()) + time.sleep(0.01) + + @property def trans_position_mode(self): return self._config[ServoMotor._MODE_LINEAR_POSITION] diff --git a/pyluos/tools/robus_broker.py b/pyluos/tools/robus_broker.py new file mode 100644 index 0000000..89cc637 --- /dev/null +++ b/pyluos/tools/robus_broker.py @@ -0,0 +1,138 @@ +from simple_websocket_server import WebSocketServer, WebSocket +import json +import argparse + + +class RobusEmulator(WebSocket): + prev_node = None + ptpa = False # Linked to prev + ptpb = False # Linked to next + ptpa_poke = False + ptpb_poke = False + next_node = None + msg_index = 0 + + def handle(self): + if isinstance(self.data, str): + # This is a PTP command + print("\nI receive : " + str(self.data) + + " from " + str(self.address)) + + # PTP emulation: + # Because PTP have been designed for real time response, the Robus algorythm is not really friendly to PTP management over WebSocket. + # This broker have to drive data in a specific way allowing to make it work. + # Robus HAL will send messages only during PTP reset state and read line. + # Read_line mean Poke. When we have this we can set the line depending on the presence of another node and save this poke state on the line + # The next reset received will need to be send to the other node. + # + # if (ptp line read (PTP up) : + # if (a node is connected) : + # send state 1 to the other node + # send state 1 back + # pass this ptp to poking + # else : + # send state 0 back + # if (PTP down and ptp is poking) : + # send state to the other node + + # PTPA + if self.data[3] == 'A': + # We get a PTPA data + + if (self.data[4] == '1'): + if (self.prev_node != None): + print("\t\tPTPB1 val sent to the node", + str(self.prev_node.address)) + self.prev_node.send_message("PTPB1") + print("\t\tPTPA1 val sent back to the node", + str(self.address)) + self.send_message("PTPA1") + self.prev_node.ptpb_poke = True + self.ptpa_poke = True + else: + print("\t\tPTPA0 val sent back to the node", + str(self.address)) + self.send_message("PTPA0") + + if (self.data[4] == '0' and self.ptpa_poke == True and self.prev_node != None): + print("\t\tPTPB0 val sent to the node", + str(self.prev_node.address)) + self.prev_node.send_message("PTPB0") + self.prev_node.ptpb_poke = False + self.ptpa_poke = False + + # PTPB + if self.data[3] == 'B': + # We get a PTPB data + + if (self.data[4] == '1'): + if (self.next_node != None): + print("\t\tPTPA1 val sent to the node", + str(self.next_node.address)) + self.next_node.send_message("PTPA1") + print("\t\tPTPB1 val sent back to the node", + str(self.address)) + self.send_message("PTPB1") + self.next_node.ptpa_poke = True + self.ptpb_poke = True + else: + print("\t\tPTPB0 val sent back to the node", + str(self.address)) + self.send_message("PTPB0") + + if (self.data[4] == '0' and self.ptpb_poke == True and self.next_node != None): + print("\t\tPTPA0 val sent to the node", + str(self.next_node.address)) + self.next_node.send_message("PTPA0") + self.next_node.ptpa_poke = False + self.ptpb_poke = False + + else: + # This is a broadcast message + print(str(self.msg_index)+" Data received from " + str(self.address)) + self.msg_index += 1 + for client in clients: + if client != self: + client.send_message(self.data) + + def connected(self): + print(self.address, 'connected') + clients.append(self) + # Save links to other nodes + if len(clients) >= 2: + self.prev_node = clients[-2] + self.prev_node.next_node = clients[-1] + print("connect PTPB of " + str(self.prev_node.address) + + " with PTPA of " + str(self.address)) + + def handle_close(self): + print(self.address, 'closed') + # Save links to other nodes + if self.next_node != None: + self.next_node.prev_node = self.prev_node + if self.prev_node != None: + self.prev_node.next_node = self.next_node + clients.remove(self) + + +## Parse arguments ## +parser = argparse.ArgumentParser(description='Robus WebSocket emulator broker\n', + formatter_class=argparse.RawTextHelpFormatter) +# General arguments +parser.add_argument("-p", "--port", metavar="PORT", action="store", + help="The port used by the websocket.\n" + "By default port = 8000.\n", + default=8000) +parser.add_argument("--ip", metavar="IP", action="store", + help="The ip used by the websocket.\n" + "By default ip = '127.0.0.1'.\n", + default='127.0.0.1') + +args = parser.parse_args() +clients = [] + +server = WebSocketServer(args.ip, args.port, RobusEmulator) +print("WebSocket Robus emulation is deprecaated since Luos_engine 3.1.0, please consider using a WebSocket network instead.\n") +print("WebSocket Robus emulation opened on " + + str(args.ip) + ":" + str(args.port)) +server.serve_forever() diff --git a/pyluos/tools/ws_broker.py b/pyluos/tools/ws_broker.py new file mode 100644 index 0000000..730e402 --- /dev/null +++ b/pyluos/tools/ws_broker.py @@ -0,0 +1,85 @@ +from simple_websocket_server import WebSocketServer, WebSocket +import json +import argparse + +PING = 0 +END = 1 +OK = 2 +NOK = 3 + +class WsBroker(WebSocket): + pinged = False + next_node = None + global pinger + + def handle(self): + if len(self.data) == 1: + if len(pinger) == 0: + # Data should be a ping + if self.data[0] != PING: + print("Error: received data is not a ping, received data is " + str(self.data[0])) + else: + pinger.append(self) + find_someone = False + # This is a ping command, find the next unpinged node and ping it + for client in clients: + if client != self and client.pinged == False: + # We have someone to ping + find_someone = True + client.pinged = True + client.send_message([PING]) + # ack the ping to the sender + self.send_message([OK]) + break + if find_someone == False: + # We have no one to ping, this branch is finished, we can send a NOK to this ping and reset the pinged state of all nodes + self.send_message([NOK]) + pinger.remove(pinger[0]) + for client in clients: + client.pinged = False + else: + # Data should be an end + if self.data[0] != END: + print("Error: received data is not an end, received data is " + str(self.data[0]) + " from " + str(self.address)) + else: + # send the end to the pinger + pinger[0].send_message([END]) + #remove the pinger + pinger.remove(pinger[0]) + else: + # This is a broadcast message + #print(str(len(self.data)) + str(" Data received from " + str(self.address))) + for client in clients: + if client != self: + client.send_message(self.data) + + def connected(self): + print(self.address, 'connected\n') + clients.append(self) + + def handle_close(self): + print(self.address, 'closed') + clients.remove(self) + + +## Parse arguments ## +parser = argparse.ArgumentParser(description='Luos_engine WebSocket network broker\n', + formatter_class=argparse.RawTextHelpFormatter) +# General arguments +parser.add_argument("-p", "--port", metavar="PORT", action="store", + help="The port used by the websocket.\n" + "By default port = 8000.\n", + default=8000) +parser.add_argument("--ip", metavar="IP", action="store", + help="The ip used by the websocket.\n" + "By default ip = '127.0.0.1'.\n", + default='127.0.0.1') + +args = parser.parse_args() +clients = [] +pinger = [] + +server = WebSocketServer(args.ip, args.port, WsBroker) +print("Luos_engine WebSocket network broker opened on " + + str(args.ip) + ":" + str(args.port)) +server.serve_forever() diff --git a/pyluos/version.py b/pyluos/version.py index 5918abd..64236b0 100644 --- a/pyluos/version.py +++ b/pyluos/version.py @@ -1 +1 @@ -version = '3.0.0' +version = '3.1.0' diff --git a/setup.py b/setup.py index fc2bfae..e7bc68b 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,9 @@ 'anytree', 'crc8', 'ipython', - 'requests' + 'requests', + 'simple_websocket_server==0.4.2', + 'mergedeep' ], extras_require={ 'tests': ['pytest', 'flake8'], @@ -44,7 +46,9 @@ 'pyluos-usb2ws = pyluos.tools.usb2ws:main', 'pyluos-bootloader = pyluos.tools.bootloader:main', 'pyluos-shell = pyluos.tools.shell:main', - 'pyluos-discover = pyluos.tools.discover:main' + 'pyluos-discover = pyluos.tools.discover:main', + 'pyluos-ws-broker = pyluos.tools.ws_broker:main', + 'pyluos-robus-broker = pyluos.tools.robus_broker:main' ], }, )