diff --git a/Communication/communicator.py b/Communication/communicator.py index 51a2b9a..508300e 100644 --- a/Communication/communicator.py +++ b/Communication/communicator.py @@ -12,6 +12,10 @@ import struct from copy import deepcopy from typing import Optional, Union +from subprocess import run, PIPE + +from thirdparty import dynamixel_sdk +from thirdparty.dynamixel.driver import DynamixelDriver logger = logging.getLogger(__name__) @@ -19,7 +23,6 @@ STJ_MAX_PACKET_SIZE = 33 STJ_MIN_PACKET_SIZE = 10 - class MiniPCCommunicationError(Exception): pass @@ -30,6 +33,18 @@ def __init__(self, **kwargs): self.dict = dict(kwargs) self.lock = threading.Lock() + def keys(self): + with self.lock: + return self.dict.keys() + + def values(self): + with self.lock: + return self.dict.values() + + def items(self): + with self.lock: + return self.dict.items + def deepcopy(self): with self.lock: return deepcopy(self.dict) @@ -58,6 +73,9 @@ def __init__(self): def start_listening(self) -> None: pass + def _listen(self, hz=200): + pass + def is_valid(self) -> bool: pass @@ -161,11 +179,11 @@ def __init__( def start_listening(self): """Start a thread to listen to the serial port.""" - self.listen_thread = threading.Thread(target=self.listen_) + self.listen_thread = threading.Thread(target=self._listen) self.listen_thread.daemon = True self.listen_thread.start() - def listen_(self, interval=0.001): + def _listen(self, hz=200): """ Listen to the serial port. @@ -179,7 +197,7 @@ def listen_(self, interval=0.001): while True: self.try_read_one() self.packet_search() - time.sleep(interval) + time.sleep(1/hz) def is_valid(self): """Return if communicator is valid.""" @@ -189,7 +207,11 @@ def is_vacuum(self): return not (self.is_valid() and bool(self.serial_port.port)) def is_alive(self): - self.serial_port.inWaiting() + port = self.get_port() + try: + self.serial_port.inWaiting() + except Exception as e: + raise MiniPCCommunicationError(f'{port} no longer exists.') from e return True def get_port(self): @@ -204,7 +226,7 @@ def try_read_one(self): """ try: self.is_alive() - except BaseException: + except Exception: return False # Read from serial port, if any packet is waiting if self.serial_port is not None: @@ -253,7 +275,7 @@ def use_uart_device_path(self, dev_path, in_use): dev = UARTCommunicator.try_uart_device(dev_path, in_use) if dev is None: if self.warn: - logger.warning("NO SERIAL DEVICE FOUND! WRITING TO VACUUM!") + logger.warning("NO SERIAL DEVICE FOUND! I/O TO VACUUM!") else: in_use += [dev.port] logger.debug(f'I ({self.__class__=}) am using {dev}.') @@ -266,7 +288,7 @@ def use_uart_device(self, dev, in_use): else: in_use += [dev.port] except AttributeError: # dev is None - logger.warning("NO SERIAL DEVICE FOUND! WRITING TO VACUUM!") + logger.warning("NO SERIAL DEVICE FOUND! I/O TO VACUUM!") finally: logger.debug(f'I ({self.__class__=}) am using {dev}.') self.serial_port = dev @@ -279,23 +301,22 @@ def list_uart_device_paths(): OSX prefix: "tty.usbmodem" Jetson / Linux prefix: "ttyUSB", "ttyACM" - Linux: look under "/dev/serial/by-id": "usb-STMicroelectronics_STM32_STLink_" + Linux: look under "/dev/serial/by-id" for "usb-STMicroelectronics_STM32_STLink_" Returns: - [Maybe dev_path] : a list of possible device paths + [Maybe dev_paths] : a list of possible device paths """ # list of possible prefixes - UART_PREFIX_LIST = ('tty.usbmodem', 'usb-STMicroelectronics_STM32_STLink_') + UART_PREFIX_LIST = ('usb-STMicroelectronics_STM32_STLink_',) dev_basename = '/dev/serial/by-id' try: - dev_list = os.listdir(dev_basename) + path_list = os.listdir(dev_basename) except FileNotFoundError: - dev_list = [] + path_list = [] - dev_paths = [] # ret val - for dev_name in dev_list: - if dev_name.startswith(UART_PREFIX_LIST): - dev_paths += [os.path.join(dev_basename, dev_name)] + dev_paths = [os.path.join(dev_basename, path) + for path in path_list + if path.startswith(UART_PREFIX_LIST)] return dev_paths or [None] # path -> [path] -> Maybe serial.Serial @@ -411,7 +432,7 @@ def try_parse_one(self, possible_packet): cmd_id = int(possible_packet[self.cfg.CMD_ID_OFFSET]) try: packet_len = self.cfg.CMD_TO_LEN[cmd_id] + self.cfg.HT_LEN - except BaseException: + except Exception: print("Incorrect CMD_ID " + str(cmd_id)) return None @@ -610,41 +631,112 @@ def get_current_stm32_state(self): return self.stm32_state.deepcopy() def read_out(self): - self.is_alive() - return self.get_current_stm32_state() + if self.is_alive(): + return self.get_current_stm32_state() class ARMCommunicator(Communicator): - def __init__(self): - pass + def __init__(self, cfg, serial_dev_path, warn=True, in_use=None,): + self.cfg = cfg + self.serial_dev_path = serial_dev_path + self.in_use = in_use + + self.warn = warn + + ids = [1, 2, 3, 4, 5, 6] + + if self.serial_dev_path is not None: + in_use += [self.serial_dev_path] + self.servo = DynamixelDriver(ids, port=serial_dev_path) + elif self.warn: + logger.warning("NO SERIAL DEVICE FOUND! I/O TO VACUUM!") + self.servo = None + logger.debug(f'I ({self.__class__=}) am using {self.servo}.') + + self.arm_state = StateDict(**{ + 'float0': 0.0, + 'float1': 0.0, + 'float2': 0.0, + 'float3': 0.0, + 'float4': 0.0, + 'float5': 0.0, + }) + + @staticmethod + def list_arm_device_paths(id_serial_short): + """Guess a port for the arm with `udevadm`. + + Note: this function is for UNIX-like systems only! + + Linux: look under "/dev/serial/by-id" + + Returns: + [Maybe dev_paths] : a list of possible device paths + """ + # Exclude UARTs + id_cmd = "udevadm info -q property '%s' | awk -F= '/^ID_SERIAL_SHORT/ { print $2 }'" + UART_PREFIX_LIST = ('usb-STMicroelectronics_STM32_STLink_',) + # list of possible prefixes + dev_basename = '/dev/serial/by-id' + try: + path_list = os.listdir(dev_basename) + except FileNotFoundError: + path_list = [] + + dev_paths = [os.path.join(dev_basename, path) + for path in path_list + if not path.startswith(UART_PREFIX_LIST) + if run(['/bin/bash', '-c', cmd % dev_path], + stdout=pipe).stdout.decode() == id_serial_short] + + return dev_paths or [None] def start_listening(self) -> None: - pass + """Start a thread to listen to the serial port.""" + self.listen_thread = threading.Thread(target=self._listen) + self.listen_thread.daemon = True + self.listen_thread.start() + + def _listen(self, hz=200) -> None: + while True: + joint_angles = servo.get_joints() + self.arm_state.update(zip(self.arm_state.keys(), joint_angles)) + time.sleep(1/hz) def is_valid(self) -> bool: - pass + return self.is_alive() def is_vacuum(self) -> bool: - pass + return self.is_alive() def is_alive(self) -> bool: - pass + port = self.get_port() + try: + if os.path.exists(port): + return True + else: + raise FileNotFoundError + except Exception as e: + if self.servo is not None: + self.servo.close() + raise MiniPCCommunicationError(f'{port} no longer exists.') from e def get_port(self) -> Optional[str]: - pass + return self.serial_dev_path def create_and_send_packet(self, cmd_id, data) -> None: - pass + return None def create_packet(self, cmd_id, data) -> Union[bytes, dict]: - pass + return None def send_packet(self, packet) -> None: - pass + return None def read_out(self) -> dict: - pass + if self.is_alive(): + return self.arm_state.deepcopy() class SPMCommunicator(Communicator): @@ -655,6 +747,9 @@ def __init__(self): def start_listening(self) -> None: pass + def _listen(self, hz=200): + pass + def is_valid(self) -> bool: pass @@ -680,22 +775,22 @@ def read_out(self) -> dict: pass -# XXX: Move tests out, leave simpler unit tests? i.e. only pingpong -# Latency test by Richard, flash example/minipc/LatencyTest.cc -# Modified by Austin. -# Tests Minipc <-> Type C board circuit time +# xxx: move tests out, leave simpler unit tests? i.e. only pingpong +# latency test by richard, flash example/minipc/latencytest.cc +# modified by austin. +# tests minipc <-> type c board circuit time def test_board_latency(uart, rounds=15, timeout=1, hz=200, listening=True, verbose=True): - print('\nCommunicator beginning minipc <-> board latency test: ' - f'{rounds} rounds at {hz} Hertz') - cmd_id = uart.cfg.SELFCHECK_CMD_ID + print('\ncommunicator beginning minipc <-> board latency test: ' + f'{rounds} rounds at {hz} hertz') + cmd_id = uart.cfg.selfcheck_cmd_id def send_packets(rounds, hz): send_time = [0] * rounds - packet_status = [False] * rounds + packet_status = [false] * rounds for i in range(rounds): - logger.debug(f'Sending packet #{i} to stm32...') - data = {'mode': 'ECHO', 'debug_int': i} + logger.debug(f'sending packet #{i} to stm32...') + data = {'mode': 'echo', 'debug_int': i} send_time[i] = time.time() uart.create_and_send_packet(cmd_id, data) @@ -705,11 +800,11 @@ def send_packets(rounds, hz): return (send_time, packet_status) - def receive_packets(rounds, timeout, listening, ret): # Async + def receive_packets(rounds, timeout, listening, ret): # async received = 0 receive_time = [0] * rounds - packet_status = [False] * rounds - # Receive loop + packet_status = [false] * rounds + # receive loop current_time = time.time() while time.time() - current_time < timeout and received != rounds: if not listening: @@ -724,28 +819,28 @@ def receive_packets(rounds, timeout, listening, ret): # Async # debug_int acts as the index if not receive_time[i]: receive_time[i] = time.time() - logger.debug(f'Received packet #{i} from stm32...') - except IndexError: + logger.debug(f'received packet #{i} from stm32...') + except indexerror: pass - time.sleep(0.001) # Use same frequency as listen_. + time.sleep(0.001) # use same frequency as _listen. for i, t in enumerate(receive_time): if t != 0: packet_status[i] = True ret[0:1] = [receive_time, packet_status] - return ret[0:1] # If not run as Thread. + return ret[0:1] # if not run as thread. - # Start the receive thread first + # start the receive thread first rt_return = [] - receive_thread = threading.Thread(target=receive_packets, + receive_thread = threading.thread(target=receive_packets, args=(rounds, timeout, listening, rt_return)) receive_thread.start() - # Send packets second + # send packets second send_time, send_packet_status = send_packets(rounds, hz) receive_thread.join() receive_time, receive_packet_status = rt_return - # Flatten data + # flatten data not_all_received = not all(receive_packet_status) # 0 if packet not received latencies = [(tf or ti) - ti @@ -753,52 +848,52 @@ def receive_packets(rounds, timeout, listening, ret): # Async statuses = [*zip(send_packet_status, receive_packet_status)] loss = latencies.count(0.0) - average_latency = sum(latencies) / (len(latencies) - loss or 1) # Prevent 0/0 + average_latency = sum(latencies) / (len(latencies) - loss or 1) # prevent 0/0 for i in range(rounds): is_sent = statuses[i][0] is_received = statuses[i][1] - logger.debug('Status of packet %d: send: %s, receive: %s' % - (i, ('UNSENT!', 'sent')[is_sent], - ('NOT RECEIVED!', 'received')[is_received])) - logger.debug(f'Latency of packet #{i}: {latencies[i]}') + logger.debug('status of packet %d: send: %s, receive: %s' % + (i, ('unsent!', 'sent')[is_sent], + ('not received!', 'received')[is_received])) + logger.debug(f'latency of packet #{i}: {latencies[i]}') - print('Attempted to send', rounds, 'packets.', - send_packet_status.count(True), 'Packets transmitted,', + print('attempted to send', rounds, 'packets.', + send_packet_status.count(True), 'packets transmitted,', rounds - loss, 'packets received.') - print(f'Packets lost: {loss}/{loss/rounds*100}%. ' - f'Average latency: {average_latency}') + print(f'packets lost: {loss}/{loss/rounds*100}%. ' + f'average latency: {average_latency}') if not_all_received: - logger.warning('Latency test: not all packets were received.') + logger.warning('latency test: not all packets were received.') return {'average': average_latency, 'loss': (loss, loss / rounds), 'detailed': [*zip(statuses, latencies)]} -# RX/TX test by YHY modified by Richard, flash example/minipc/PingPongTest.cc +# rx/tx test by yhy modified by richard, flash example/minipc/pingpongtest.cc # this ping pong test first trys to send a packet # and then attmepts to read the response from stm32 for 10 seconds # then send a second packet # after that entering ping pong mode: # receive packet from stm32, rel_pitch += 1 then immediately send back -# each "ping pong" has a ID for differentiating during pingping-ing -# TODO: This test shows the issue that a response can only be received after the data -# in circular_buffer is at least the maximum size of a packet (STJ_MAX_PACKET_SIZE). -# So if sending some small packets, +# each "ping pong" has a id for differentiating during pingping-ing +# todo: this test shows the issue that a response can only be received after the data +# in circular_buffer is at least the maximum size of a packet (stj_max_packet_size). +# so if sending some small packets, # they will stay in the circular_buffer waiting to be parsed, # until new packets are received. -# For example, if STJ_MAX_PACKET_SIZE is 21 and GIMBAL data size is 19, +# for example, if stj_max_packet_size is 21 and gimbal data size is 19, # then only after receiving 2 packets (2 * 19 > 21) # then the first packet will be parsed. -# If a data type is 10 bytes long then sending a third packet is necessary +# if a data type is 10 bytes long then sending a third packet is necessary # before pingpong -# Modified by Austin +# modified by austin def test_board_pingpong(uart, rounds=5, timeout=1, hz=2, listening=True, verbose=True): - print('\nCommunicator beginning minipc <-> board pingpong test: ' - f'{rounds} rounds at {hz} Hertz') + print('\ncommunicator beginning minipc <-> board pingpong test: ' + f'{rounds} rounds at {hz} hertz') def receive_packet(j, timeout): current_time = time.time() @@ -812,17 +907,17 @@ def receive_packet(j, timeout): i = int(received_data['debug_int']) if i == j: return True - time.sleep(0.001) # Use same frequency as listen_. + time.sleep(0.001) # use same frequency as _listen. - return False + return false def send_recv_packets(rounds, timeout, hz): sent, received = 0, 0 - cmd_id = uart.cfg.SELFCHECK_CMD_ID - flusher = uart.create_packet(cmd_id, {'mode': 'FLUSH', 'debug_int': 0}) + cmd_id = uart.cfg.selfcheck_cmd_id + flusher = uart.create_packet(cmd_id, {'mode': 'flush', 'debug_int': 0}) for i in range(rounds): - print(f'Sending packet #{i} to stm32...') - data = {'mode': 'ECHO', 'debug_int': i + 1} + print(f'sending packet #{i} to stm32...') + data = {'mode': 'echo', 'debug_int': i + 1} uart.create_and_send_packet(cmd_id, data) for _ in range(5): time.sleep(1 / 200) @@ -832,36 +927,36 @@ def send_recv_packets(rounds, timeout, hz): received_data = receive_packet(i + 1, timeout) if received_data: received += 1 - print(f'Received packet #{i}') + print(f'received packet #{i}') else: - print(f'Lost packet #{i}.') + print(f'lost packet #{i}.') time.sleep(1 / hz) return (sent, received) sent, received = send_recv_packets(rounds, timeout, hz) -# rate test by Roger modified by Richard, flash example/minipc/StressTestTypeC.cc -# Modified by Austin. -# TODO: currently this test will never receive full 1000 packets +# rate test by roger modified by richard, flash example/minipc/stresstesttypec.cc +# modified by austin. +# todo: currently this test will never receive full 1000 packets # but only 998 packets because the last two packets # remain in circular buffer and not parsed because -# its size is not reaching STJ_MAX_PACKET_SIZE -# NOTE: please reflash or restart program on stm32 every time you want to run this test -# TODO: Notify the board and use COLOR packets instead? +# its size is not reaching stj_max_packet_size +# note: please reflash or restart program on stm32 every time you want to run this test +# todo: notify the board and use color packets instead? def test_board_crc(uart, rounds=15, timeout=1, hz=200, listening=True, verbose=True): - print('\nCommunicator beginning minipc <-> board crc stress test: ' - f'{rounds} rounds at {hz} Hertz') - cmd_id = uart.cfg.SELFCHECK_CMD_ID + print('\ncommunicator beginning minipc <-> board crc stress test: ' + f'{rounds} rounds at {hz} hertz') + cmd_id = uart.cfg.selfcheck_cmd_id def send_packets(rounds, hz): - packet_status = [False] * rounds + packet_status = [false] * rounds for i in range(rounds): - logger.debug(f'Sending packet #{i} to stm32...') - data = {'mode': 'ECHO', 'debug_int': 0} + logger.debug(f'sending packet #{i} to stm32...') + data = {'mode': 'echo', 'debug_int': 0} uart.create_and_send_packet(cmd_id, data) packet_status[i] = True @@ -870,10 +965,10 @@ def send_packets(rounds, hz): return packet_status - def receive_packets(rounds, timeout, ret): # Async + def receive_packets(rounds, timeout, ret): # async received = 0 - packet_status = [False] * rounds - # Receive loop + packet_status = [false] * rounds + # receive loop current_time = time.time() while time.time() - current_time < timeout and received != rounds: if not listening: @@ -888,44 +983,44 @@ def receive_packets(rounds, timeout, ret): # Async i = int(received_data['debug_int']) if not packet_status[i]: packet_status[i] = True - logger.debug(f'Received packet #{i} from stm32...') + logger.debug(f'received packet #{i} from stm32...') received += 1 - except IndexError: + except indexerror: pass - time.sleep(0.001) # Use same frequency as listen_. + time.sleep(0.001) # use same frequency as _listen. ret[0] = packet_status - return ret[0] # If not run as Thread. + return ret[0] # if not run as thread. - # Send packets first - print('This test should be run without a listening thread. ' - 'Otherwise, expect only one packet.') + # send packets first + print('this test should be run without a listening thread. ' + 'otherwise, expect only one packet.') send_packet_status = send_packets(rounds, hz) - print(f'Packet sending test complete: sent {rounds} packets.') - print('You should see the light change from blue to green on type C board.') - print('When the led turns red the stm32 is sending data.') - print('Starting packet receiving test.') - # Start the receive thread second + print(f'packet sending test complete: sent {rounds} packets.') + print('you should see the light change from blue to green on type c board.') + print('when the led turns red the stm32 is sending data.') + print('starting packet receiving test.') + # start the receive thread second rt_return = [None] - receive_thread = threading.Thread(target=receive_packets, + receive_thread = threading.thread(target=receive_packets, args=(rounds, timeout, rt_return)) receive_thread.daemon = True receive_thread.start() receive_thread.join() receive_packet_status = rt_return[0] - # Flatten data + # flatten data not_all_received = not all(receive_packet_status) statuses = [*zip(send_packet_status, receive_packet_status)] - loss = receive_packet_status.count(False) + loss = receive_packet_status.count(false) - print(f'\nAttempted to send {rounds} packets: ' + print(f'\nattempted to send {rounds} packets: ' f'{send_packet_status.count(True)} packets transmitted, ' f'{rounds-loss} packets received.') - print(f'Packets lost: {loss}/{loss/rounds*100}%.') + print(f'packets lost: {loss}/{loss/rounds*100}%.') if not_all_received: - logger.warning('Crc test: not all packets were received.') + logger.warning('crc test: not all packets were received.') return {'loss': (loss, loss / rounds), 'detailed': statuses} @@ -933,9 +1028,9 @@ def receive_packets(rounds, timeout, ret): # Async def test_board_typea(uart, rounds=5, interval=1, verbose=True): - print('Communicator beginning minipc <-> board Type A test: ' + print('communicator beginning minipc <-> board type a test: ' f'{rounds} rounds at {interval} intervals') - # vanilla send test, flash typeA.cc + # vanilla send test, flash typea.cc cur_packet_cnt = uart.parsed_packet_cnt prv_parsed_packet_cnt = 0 for i in range(rounds): @@ -946,50 +1041,50 @@ def test_board_typea(uart, rounds=5, interval=1, if uart.parsed_packet_cnt > cur_packet_cnt: cur_packet_cnt = uart.parsed_packet_cnt time.sleep(0.001) - print("Parsed {} packets in 1 second.".format( + print("parsed {} packets in 1 second.".format( cur_packet_cnt - prv_parsed_packet_cnt)) prv_parsed_packet_cnt = cur_packet_cnt cur_time = time.time() if __name__ == '__main__': - # Unit testing + # unit testing sys.path.append(os.path.join(os.path.dirname(__file__), '..')) - from enum import Enum + from enum import enum import config - uart = UARTCommunicator(config) + uart = uartcommunicator(config) - class Test(Enum): - """Class used to choose test for communicator.""" - LATENCY = 1 - PINGPONG = 2 - CRC = 3 - TYPE_A = 4 - testing = Test.PINGPONG + class test(enum): + """class used to choose test for communicator.""" + latency = 1 + pingpong = 2 + crc = 3 + type_a = 4 + testing = test.pingpong - # Remove first arg if called with python. + # remove first arg if called with python. if 'python' in sys.argv[0]: sys.argv.pop(0) - testing = Test.PINGPONG + testing = test.pingpong if len(sys.argv) > 1: - testing = (Test.LATENCY, Test.PINGPONG, - Test.CRC, Test.TYPE_A)[int(sys.argv[1]) - 1] - print(f'\nUsing test type: {testing}') + testing = (test.latency, test.pingpong, + test.crc, test.type_a)[int(sys.argv[1]) - 1] + print(f'\nusing test type: {testing}') else: - print(f'\nUsing default test type: {testing}') - print("Change test type: ./communicator.py [1|2|3|4]") - print("1: LATENCY, 2: PINGPONG, 3: CRC, 4: TYPE_A\n") + print(f'\nusing default test type: {testing}') + print("change test type: ./communicator.py [1|2|3|4]") + print("1: latency, 2: pingpong, 3: crc, 4: type_a\n") match testing: - case Test.LATENCY: + case test.latency: test_board_latency() - case Test.PINGPONG: + case test.pingpong: test_board_pingpong() - case Test.CRC: + case test.crc: test_board_crc() - case Test.TYPE_A: + case test.type_a: test_board_typea() case _: print("Invalid selection") diff --git a/config.py b/config.py index dd877f7..8d2ca6e 100644 --- a/config.py +++ b/config.py @@ -40,6 +40,9 @@ CMD_ID_OFFSET = DATA_LENGTH_OFFSET + 1 DATA_OFFSET = CMD_ID_OFFSET + 1 +# `udevadm` short serial id of the small arm motors +ARM_ID_SERIAL_SHORT = 'FF' + # ========== DEBUGGING ========== DEBUG_DISPLAY = True diff --git a/minipc.py b/minipc.py index 8f2fbcd..0048e22 100644 --- a/minipc.py +++ b/minipc.py @@ -6,6 +6,7 @@ import serial import logging import argparse +import itertools import threading from enum import IntEnum from typing import Optional, Union, \ @@ -22,7 +23,7 @@ from Communication import communicator from Communication.communicator import Communicator, \ UARTCommunicator, ARMCommunicator, SPMCommunicator, \ - StateDict + StateDict, MiniPCCommunicationError _m = MatchAll() @@ -58,24 +59,29 @@ c_logger := logging.getLogger('Communication.communicator')] -# UART >= 0, others < 0 +# UART > 0, USB <= 0 class DeviceType(IntEnum): - UART = 0 # portless uart + UART = 1 # portless uart BRD = 2 # board + USB = 0 # portless uart SPM = -1 # spacemouse ARM = -2 # small arm UART = DeviceType.UART +USB = DeviceType.USB BRD = DeviceType.BRD SPM = DeviceType.SPM ARM = DeviceType.ARM +EMPTY = UARTCommunicator(config, serial_dev_path=False, warn=False) + # This is expected to be thread-safe. # Muting data in an (Int)Enum is not expected behavior, so: serial_devices: Dict[DeviceType, Communicator] = { - UART: UARTCommunicator(config, serial_dev_path=False, warn=False), + UART: EMPTY, + USB: EMPTY, BRD: None, SPM: None, ARM: None, @@ -83,6 +89,7 @@ class DeviceType(IntEnum): intenum_to_name = { UART: 'UART', + USB: 'USB', BRD: 'BOARD', SPM: 'SPACEMOUSE', ARM: 'ARM', @@ -96,9 +103,22 @@ class DeviceType(IntEnum): unified_state = StateDict() -def dname(intenum: DeviceType) -> str: +# [i]nt[e]num +def iename(intenum: DeviceType) -> str: return intenum_to_name[intenum] +# [c]lass +def ctype(device: Communicator) -> DeviceType: + match device: + case UARTCommunicator(): + return UART + case ARMCommunicator(): + return ARM + case SPMCommunicator(): + return SPM + case _: + return None + def get_device(dev_type: DeviceType) -> Communicator: return serial_devices[dev_type] @@ -108,7 +128,7 @@ def is_uart(dev_type: DeviceType) -> bool: return dev_type >= UART -def is_other(dev_type: DeviceType) -> bool: +def is_usb(dev_type: DeviceType) -> bool: return dev_type < UART @@ -132,7 +152,7 @@ def get_communicator( if is_uart(dev_type): return UARTCommunicator(in_use=in_use, *args, **kwargs) elif is_arm(dev_type): - return UARTCommunicator(in_use=in_use, *args, **kwargs) + return ARMCommunicator(in_use=in_use, *args, **kwargs) elif is_spm(dev_type): return UARTCommunicator(in_use=in_use, *args, **kwargs) @@ -250,19 +270,18 @@ def _id_uart(hz): # Look for devices. # `list_uart_device_paths` returns a list or [None] paths = set(UARTCommunicator.list_uart_device_paths()) - known_paths - if None not in paths: - for pth in paths - prev_paths: - try: - serial_experiments += [get_communicator( - UART, config, serial_dev_path=pth)] - logger.info( - f'Added {serial_experiments[-1]} ' - 'as identification candidate') - # Do not skip on error. - # Prevent `pth' from going into `prev_paths'. - except BaseException: - paths -= {pth} - prev_paths = paths + for pth in paths - prev_paths - {None}: + try: + serial_experiments += [get_communicator( + UART, config, serial_dev_path=pth)] + logger.info( + f'Added {serial_experiments[-1]} ' + 'as identification candidate') + # Do not skip on error. + # Prevent `pth' from going into `prev_paths'. + except Exception: + paths -= {pth} + prev_paths = paths # Poll found devices. for dev in serial_experiments.copy(): @@ -271,25 +290,79 @@ def _id_uart(hz): _receive_uart_packet_dev(dev) received_packet = _read_packet_dev(dev) dev_type = received_packet['debug_int'] - 127 + except Exception: + pass + else: + # Success. if dev_type in id_queue: - # Success. serial_experiments.remove(dev) - logger.info(f'Identified {dname(dev_type)}, ' + logger.info(f'Identified {iename(dev_type)}, ' f'pushing {dev} to listen queue.') push_to_listen_queue(dev_type, dev) - logger.info(f'Freeing {dname(dev_type)} ' + logger.info(f'Freeing {iename(dev_type)} ' 'from identification queue.') free_from_id_queue(dev_type) - except BaseException: - serial_experiments.remove(dev) - logger.info( - f'Removed {dev} as identification candidate') - delist_device(dev) time.sleep(1 / hz) - # XXX: Add when ready def _id_usb(hz): + arm_cycle = itertools.repeat(ARM) + spm_cycle = itertools.repeat(SPM) + serial_experiments = [] + paths, prev_paths = [set()] * 2 while True: + known_paths = {dev.get_port() + for dev in serial_devices.values() + if dev is not None + if not dev.is_vacuum()} + id_queue = get_id_queue() + if not any(map(is_usb, id_queue)): + for dev in serial_experiments: + delist_device(dev) + serial_experiments = [] + paths, prev_paths = [set()] * 2 + time.sleep(1 / hz) + continue + + # Look for devices. + # `list_*_device_paths` returns a list or [None] + if ARM in id_queue: + arm_paths = set( + ARMCommunicator.list_arm_device_paths( + config.ARM_ID_SERIAL_SHORT)) - known_paths + if SPM in id_queue: + spm_paths = set() - known_paths + paths = set.union(arm_paths, spm_paths) + for dt, pth in itertools.chain( + zip(arm_cycle, arm_paths-prev_paths-{None}), + zip(spm_cycle, spm_paths-prev_paths-{None})): + try: + serial_experiments += [get_communicator( + dt, config, serial_dev_path=pth)] + logger.info( + f'Added {serial_experiments[-1]} ' + 'as identification candidate') + # Do not skip on error. + # Prevent `pth' from going into `prev_paths'. + except Exception: + paths -= {pth} + prev_paths = paths + + # Check health of found devices + for dev in serial_experiments.copy(): + try: + dev_type = ctype(dev) + dev.is_alive() + except MiniPCCommunicationError: + pass + else: + # Success. + serial_experiments.remove(dev) + logger.info(f'Identified {iename(dev_type)}, ' + f'pushing {dev} to listen queue.') + push_to_listen_queue(dev_type, dev) + logger.info(f'Freeing {iename(dev_type)} ' + 'from identification queue.') + free_from_id_queue(dev_type) time.sleep(1 / hz) id_uart_thread = threading.Thread(target=_id_uart, @@ -335,18 +408,18 @@ def _queue_puller(hz): try: dev.start_listening() assign_device(dev_type, dev) - logger.info(f'{dname(dev_type)} listening as ' + logger.info(f'{iename(dev_type)} listening as ' f'{dev}.') - print(f'=> {dname(dev_type)}: ' + print(f'=> {iename(dev_type)}: ' f'{GREEN if dev else RED}{dev}{RESET}') - except BaseException: - logger.error(f'{dname(dev_type)} failed to listen as ' + except Exception: + logger.error(f'{iename(dev_type)} failed to listen as ' f'{dev}.') - logger.info(f'Returning {dname(dev_type)} ' + logger.info(f'Returning {iename(dev_type)} ' 'to the identification queue.') push_to_id_queue(dev_type) finally: - logger.info(f'Freeing {dname(dev_type)} from listen queue.') + logger.info(f'Freeing {iename(dev_type)} from listen queue.') free_from_listen_queue(dev_type) time.sleep(1 / hz) @@ -359,20 +432,20 @@ def _state_pusher(hz): try: packet = _read_packet(dev_type) unified_state.update(_read_packet(dev_type)) - except BaseException: + except MiniPCCommunicationError: logger.error('Lost connection to ' - f'{dname(dev_type)}, deassigning ' + f'{iename(dev_type)}, deassigning ' f'{get_device(dev_type)}.') deassign_device(dev_type) - logger.info(f'Pushing {dname(dev_type)} ' + logger.info(f'Pushing {iename(dev_type)} ' 'to identification queue.') push_to_id_queue(dev_type) - print(f'=> {dname(dev_type)}: ' + print(f'=> {iename(dev_type)}: ' f'{RED}{get_device(dev_type)}{RESET}') else: if dev_type not in id_queue and \ (dev_type, _m) not in listen_queue: - logger.info(f'Pushing {dname(dev_type)} ' + logger.info(f'Pushing {iename(dev_type)} ' 'to identification queue.') push_to_id_queue(dev_type) time.sleep(1 / hz) @@ -400,7 +473,7 @@ def taken_from_send_queue( dev_type, packet = head try: _send_packet(dev_type, packet) - except BaseException: + except Exception: pass time.sleep(1 / hz) @@ -410,10 +483,8 @@ def taken_from_send_queue( def startup_tests( testable: List[Tuple[Callable, DeviceType]], verb=0o10) -> None: # i.e. 0o16 => reversed('1110') will run test_board_{latency,pingpong,crc} - bit_lst = reversed(str(bin(verb))[2:]) # Remove '0b' - selected_tests = [test - for bit, test in zip(bit_lst, testable) - if bit == '1'] + bits = map(int, reversed(bin(verb)[2:])) + selected_tests = itertools.compress(testable, bits) return [action(get_device(dev_type)) for action, dev_type in selected_tests] @@ -452,7 +523,7 @@ def main(args) -> None: for dev in dev_reqlst): print('==> Not all devices required for testing attached, waiting.') for dt, dev in dt_dev_reqset: - print(f'=> {dname(dt)}: {GREEN if dev else RED}{dev}{RESET}') + print(f'=> {iename(dt)}: {GREEN if dev else RED}{dev}{RESET}') # Wait for devices to be ready. while any(get_device(dt) is None for dt in dt_reqlst): @@ -487,7 +558,7 @@ def main(args) -> None: packet = create_packet(BRD, cmd_id, data) push_to_send_queue(BRD, packet) print(unified_state.deepcopy()['floats']) - except BaseException: + except Exception: pass time.sleep(1) diff --git a/requirements.txt b/requirements.txt index 1d5beea..0b96de4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ pylint pyserial crc +numpy diff --git a/thirdparty/__init__.py b/thirdparty/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/thirdparty/dynamixel/driver.py b/thirdparty/dynamixel/driver.py index 29a707e..09cbb81 100644 --- a/thirdparty/dynamixel/driver.py +++ b/thirdparty/dynamixel/driver.py @@ -3,11 +3,11 @@ from typing import Protocol, Sequence import numpy as np -from dynamixel_sdk.group_sync_read import GroupSyncRead -from dynamixel_sdk.group_sync_write import GroupSyncWrite -from dynamixel_sdk.packet_handler import PacketHandler -from dynamixel_sdk.port_handler import PortHandler -from dynamixel_sdk.robotis_def import ( +from thirdparty.dynamixel_sdk.group_sync_read import GroupSyncRead +from thirdparty.dynamixel_sdk.group_sync_write import GroupSyncWrite +from thirdparty.dynamixel_sdk.packet_handler import PacketHandler +from thirdparty.dynamixel_sdk.port_handler import PortHandler +from thirdparty.dynamixel_sdk.robotis_def import ( COMM_SUCCESS, DXL_HIBYTE, DXL_HIWORD, diff --git a/util/matchall.py b/util/matchall.py index e172816..fc6034c 100644 --- a/util/matchall.py +++ b/util/matchall.py @@ -1,24 +1,24 @@ class MatchAll: - def __init__(self): - pass + def __init__(self, ret=True): + self.ret = ret def __eq__(self, other): - return True + return self.ret def __lt__(self, other): - return True + return self.ret def __le__(self, other): - return True + return self.ret def __eq__(self, other): - return True + return self.ret def __ne__(self, other): - return True + return self.ret def __gt__(self, other): - return True + return self.ret def __ge__(self, other): - return True + return self.ret