From 05a4ff6229c062dad26de26711666172a1b98e3b Mon Sep 17 00:00:00 2001 From: Adam Shapiro Date: Sat, 7 Dec 2024 07:53:28 -0500 Subject: [PATCH 01/17] Pass logger to print_message() helper function. --- python/fusion_engine_client/applications/p1_print.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/python/fusion_engine_client/applications/p1_print.py b/python/fusion_engine_client/applications/p1_print.py index 5f0fc1f1..c8ef77a2 100755 --- a/python/fusion_engine_client/applications/p1_print.py +++ b/python/fusion_engine_client/applications/p1_print.py @@ -36,7 +36,10 @@ def add_print_format_argument(parser, *arg_names): "- oneline-binary-payload - Like `oneline-binary`, but exclude the message header from the binary") -def print_message(header, contents, offset_bytes=None, format='pretty', bytes=None): +def print_message(header, contents, offset_bytes=None, format='pretty', bytes=None, logger=None): + if logger is None: + logger = _logger + if format == 'binary': if bytes is None: raise ValueError('No data provided for binary format.') @@ -89,7 +92,7 @@ def print_message(header, contents, offset_bytes=None, format='pretty', bytes=No byte_string = ' ' + bytes_to_hex(bytes, bytes_per_row=16, bytes_per_col=2).replace('\n', '\n ') parts.insert(1, byte_string) - _logger.info('\n'.join(parts)) + logger.info('\n'.join(parts)) def main(): From c4141b34f93049561cf1026ff3d2f7356b96a94e Mon Sep 17 00:00:00 2001 From: Adam Shapiro Date: Sat, 7 Dec 2024 07:53:45 -0500 Subject: [PATCH 02/17] Added print_summary_table() helper function. --- .../applications/p1_print.py | 53 +++++++++++++------ 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/python/fusion_engine_client/applications/p1_print.py b/python/fusion_engine_client/applications/p1_print.py index c8ef77a2..2b5d0acf 100755 --- a/python/fusion_engine_client/applications/p1_print.py +++ b/python/fusion_engine_client/applications/p1_print.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 +from typing import Dict + from collections import defaultdict import sys @@ -95,6 +97,37 @@ def print_message(header, contents, offset_bytes=None, format='pretty', bytes=No logger.info('\n'.join(parts)) +class MessageStatsEntry: + def __init__(self): + self.count = 0 + self.total_bytes = 0 + + def update(self, header: MessageHeader, message: MessagePayload): + self.count += 1 + self.total_bytes = header.get_message_size() + + +def print_summary_table(message_stats: Dict[MessageType, MessageStatsEntry], logger=None): + if logger is None: + logger = _logger + + format_string = '| {:<50} | {:>5} | {:>8} |' + logger.info(format_string.format('Message Name', 'Type', 'Count')) + logger.info(format_string.format('-' * 50, '-' * 5, '-' * 8)) + total_messages = 0 + for type, entry in sorted(message_stats.items(), key=lambda x: int(x[0])): + if type in message_type_to_class: + name = message_type_to_class[type].__name__ + elif type.is_unrecognized(): + name = str(type) + else: + name = f'Unsupported ({str(type)})' + logger.info(format_string.format(name, int(type), entry.count)) + total_messages += entry.count + logger.info(format_string.format('-' * 50, '-' * 5, '-' * 8)) + logger.info(format_string.format('Total', '', total_messages)) + + def main(): parser = ArgumentParser(description="""\ Decode and print the contents of messages contained in a *.p1log file or other @@ -240,8 +273,7 @@ def main(): total_messages = 0 bytes_decoded = 0 - def create_stats_entry(): return {'count': 0} - message_stats = defaultdict(create_stats_entry) + message_stats = defaultdict(MessageStatsEntry) try: for header, message, data, offset_bytes in reader: total_decoded_messages += 1 @@ -255,7 +287,7 @@ def create_stats_entry(): return {'count': 0} bytes_decoded += len(data) if options.summary: entry = message_stats[header.message_type] - entry['count'] += 1 + entry.update(header, message) if message is not None: p1_time = message.get_p1_time() @@ -321,20 +353,7 @@ def create_stats_entry(): return {'count': 0} _logger.info('Total data read: %d B' % reader.get_bytes_read()) _logger.info('Selected data size: %d B' % bytes_decoded) _logger.info('') - - format_string = '| {:<50} | {:>5} | {:>8} |' - _logger.info(format_string.format('Message Name', 'Type', 'Count')) - _logger.info(format_string.format('-' * 50, '-' * 5, '-' * 8)) - for type, info in sorted(message_stats.items(), key=lambda x: int(x[0])): - if type in message_type_to_class: - name = message_type_to_class[type].__name__ - elif type.is_unrecognized(): - name = str(type) - else: - name = f'Unsupported ({str(type)})' - _logger.info(format_string.format(name, int(type), info['count'])) - _logger.info(format_string.format('-' * 50, '-' * 5, '-' * 8)) - _logger.info(format_string.format('Total', '', total_messages)) + print_summary_table(message_stats) elif total_messages == 0: _logger.warning('No valid FusionEngine messages found.') From 4edae40cb33a9377e3ffaea0b2a36e70f80512f3 Mon Sep 17 00:00:00 2001 From: Adam Shapiro Date: Sat, 7 Dec 2024 07:56:01 -0500 Subject: [PATCH 03/17] Configure/use logging in serial_client example app. --- python/examples/serial_client.py | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/python/examples/serial_client.py b/python/examples/serial_client.py index 3b0d813b..57f7448f 100755 --- a/python/examples/serial_client.py +++ b/python/examples/serial_client.py @@ -40,6 +40,8 @@ help="The path to a file where incoming data will be stored.") parser.add_argument('-q', '--quiet', dest='quiet', action='store_true', help="Do not print anything to the console.") + parser.add_argument('-v', '--verbose', action='count', default=0, + help="Print verbose/trace debugging messages.") parser.add_argument('port', help="The serial device to use (e.g., /dev/ttyUSB0, COM1)") @@ -48,9 +50,19 @@ if options.quiet: options.display = False - logging.basicConfig(format='%(asctime)s - %(levelname)s - %(name)s:%(lineno)d - %(message)s', stream=sys.stdout) + # Configure logging. + if options.verbose >= 1: + logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(name)s:%(lineno)d - %(message)s', + stream=sys.stdout) + if options.verbose == 1: + logging.getLogger('point_one.fusion_engine').setLevel(logging.DEBUG) + else: + logging.getLogger('point_one.fusion_engine').setLevel( + logging.getTraceLevel(depth=options.verbose - 1)) + else: + logging.basicConfig(level=logging.INFO, format='%(message)s', stream=sys.stdout) + logger = logging.getLogger('point_one.fusion_engine') - logger.setLevel(logging.INFO) # Open the output file if logging was requested. if options.output is not None: @@ -84,11 +96,11 @@ if not options.quiet: now = datetime.now() if (now - last_print_time).total_seconds() > 5.0: - print('Status: [bytes_received=%d, messages_received=%d elapsed_time=%d sec]' % - (bytes_received, messages_received, (now - start_time).total_seconds())) + logger.info('Status: [bytes_received=%d, messages_received=%d, elapsed_time=%d sec]' % + (bytes_received, messages_received, (now - start_time).total_seconds())) last_print_time = now except serial.SerialException as e: - print('Unexpected error reading from device:\r%s' % str(e)) + logger.error('Unexpected error reading from device:\r%s' % str(e)) break except KeyboardInterrupt: break @@ -113,7 +125,7 @@ output_file.write(raw_data) if options.display: - print_message(header, message, format=options.display_format) + print_message(header, message, format=options.display_format, logger=logger) # Close the serial port. port.close() @@ -125,5 +137,5 @@ if not options.quiet: now = datetime.now() elapsed_sec = (now - last_print_time).total_seconds() if last_print_time else 0.0 - print('Status: [bytes_received=%d, messages_received=%d elapsed_time=%d sec]' % - (bytes_received, messages_received, elapsed_sec)) + logger.info('Status: [bytes_received=%d, messages_received=%d, elapsed_time=%d sec]' % + (bytes_received, messages_received, (now - start_time).total_seconds())) From 9c78f6ffbc8db517c7615c9234b559271ab4a830 Mon Sep 17 00:00:00 2001 From: Adam Shapiro Date: Sat, 7 Dec 2024 07:57:10 -0500 Subject: [PATCH 04/17] Added single _print_status() call. --- python/examples/serial_client.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python/examples/serial_client.py b/python/examples/serial_client.py index 57f7448f..32030a61 100755 --- a/python/examples/serial_client.py +++ b/python/examples/serial_client.py @@ -87,6 +87,9 @@ messages_received = 0 start_time = datetime.now() last_print_time = start_time + def _print_status(now): + logger.info('Status: [bytes_received=%d, messages_received=%d, elapsed_time=%d sec]' % + (bytes_received, messages_received, (now - start_time).total_seconds())) while True: # Read some data. try: @@ -96,8 +99,7 @@ if not options.quiet: now = datetime.now() if (now - last_print_time).total_seconds() > 5.0: - logger.info('Status: [bytes_received=%d, messages_received=%d, elapsed_time=%d sec]' % - (bytes_received, messages_received, (now - start_time).total_seconds())) + _print_status(now) last_print_time = now except serial.SerialException as e: logger.error('Unexpected error reading from device:\r%s' % str(e)) @@ -137,5 +139,4 @@ if not options.quiet: now = datetime.now() elapsed_sec = (now - last_print_time).total_seconds() if last_print_time else 0.0 - logger.info('Status: [bytes_received=%d, messages_received=%d, elapsed_time=%d sec]' % - (bytes_received, messages_received, (now - start_time).total_seconds())) + _print_status(now) From c7077579709a73c9ec5c80eb322b59ac3ae33a3f Mon Sep 17 00:00:00 2001 From: Adam Shapiro Date: Sat, 7 Dec 2024 07:57:41 -0500 Subject: [PATCH 05/17] Set serial read timeout so status prints if there's no incoming data. --- python/examples/serial_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/examples/serial_client.py b/python/examples/serial_client.py index 32030a61..672ee760 100755 --- a/python/examples/serial_client.py +++ b/python/examples/serial_client.py @@ -79,7 +79,7 @@ generating_p1log = (output_file is not None and options.format == 'p1log') # Connect to the device. - port = serial.Serial(port=options.port, baudrate=options.baud) + port = serial.Serial(port=options.port, baudrate=options.baud, timeout=1.0) # Listen for incoming data. decoder = FusionEngineDecoder(warn_on_unrecognized=not options.quiet, return_bytes=True) From 5637761b236be5d683d1d861d537b4e178da7b16 Mon Sep 17 00:00:00 2001 From: Adam Shapiro Date: Sat, 7 Dec 2024 08:00:57 -0500 Subject: [PATCH 06/17] Handle keyboard interrupts during log prints gracefully. --- python/examples/serial_client.py | 76 +++++++++++++++++--------------- 1 file changed, 40 insertions(+), 36 deletions(-) diff --git a/python/examples/serial_client.py b/python/examples/serial_client.py index 672ee760..e80f85ae 100755 --- a/python/examples/serial_client.py +++ b/python/examples/serial_client.py @@ -90,44 +90,48 @@ def _print_status(now): logger.info('Status: [bytes_received=%d, messages_received=%d, elapsed_time=%d sec]' % (bytes_received, messages_received, (now - start_time).total_seconds())) - while True: - # Read some data. - try: - received_data = port.read(1024) - bytes_received += len(received_data) + try: + while True: + # Read some data. + try: + received_data = port.read(1024) + bytes_received += len(received_data) - if not options.quiet: now = datetime.now() - if (now - last_print_time).total_seconds() > 5.0: - _print_status(now) - last_print_time = now - except serial.SerialException as e: - logger.error('Unexpected error reading from device:\r%s' % str(e)) - break - except KeyboardInterrupt: - break - - # If logging in raw format, write the data to disk as is. - if generating_raw_log: - output_file.write(received_data) - - # Decode the incoming data and print the contents of any complete messages. - # - # Note that we pass the data to the decoder, even if --no-display was requested, for three reasons: - # - So that we get a count of the number of incoming messages - # - So we print warnings if the CRC fails on any of the incoming data - # - If we are logging in *.p1log format, so the decoder can extract the FusionEngine data from any - # non-FusionEngine data in the stream - messages = decoder.on_data(received_data) - messages_received += len(messages) - - if options.display or generating_p1log: - for (header, message, raw_data) in messages: - if generating_p1log: - output_file.write(raw_data) - - if options.display: - print_message(header, message, format=options.display_format, logger=logger) + if not options.quiet: + if (now - last_print_time).total_seconds() > print_timeout_sec: + _print_status(now) + last_print_time = now + except serial.SerialException as e: + logger.error('Unexpected error reading from device:\r%s' % str(e)) + break + + # If logging in raw format, write the data to disk as is. + if generating_raw_log: + output_file.write(received_data) + + # Decode the incoming data and print the contents of any complete messages. + # + # Note that we pass the data to the decoder, even if --no-display was requested, for three reasons: + # - So that we get a count of the number of incoming messages + # - So we print warnings if the CRC fails on any of the incoming data + # - If we are logging in *.p1log format, so the decoder can extract the FusionEngine data from any + # non-FusionEngine data in the stream + messages = decoder.on_data(received_data) + messages_received += len(messages) + + if options.display or generating_p1log: + for (header, message, raw_data) in messages: + entry = message_stats[header.message_type] + entry.update(header, message) + + if generating_p1log: + output_file.write(raw_data) + + if options.display: + print_message(header, message, format=options.display_format, logger=logger) + except KeyboardInterrupt: + pass # Close the serial port. port.close() From 78f796a724675feeb72f0e89fbeed0dc21252358 Mon Sep 17 00:00:00 2001 From: Adam Shapiro Date: Sat, 7 Dec 2024 08:01:16 -0500 Subject: [PATCH 07/17] Added --summary mode to serial_client example app. --- python/examples/serial_client.py | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/python/examples/serial_client.py b/python/examples/serial_client.py index e80f85ae..999e5bca 100755 --- a/python/examples/serial_client.py +++ b/python/examples/serial_client.py @@ -1,9 +1,12 @@ #!/usr/bin/env python3 +from collections import defaultdict from datetime import datetime import os import sys +import colorama + try: import serial except ImportError: @@ -15,7 +18,8 @@ root_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..')) sys.path.insert(0, root_dir) -from fusion_engine_client.applications.p1_print import add_print_format_argument, print_message +from fusion_engine_client.applications.p1_print import \ + MessageStatsEntry, add_print_format_argument, print_message, print_summary_table from fusion_engine_client.parsers import FusionEngineDecoder from fusion_engine_client.utils import trace as logging from fusion_engine_client.utils.argument_parser import ArgumentParser @@ -40,6 +44,8 @@ help="The path to a file where incoming data will be stored.") parser.add_argument('-q', '--quiet', dest='quiet', action='store_true', help="Do not print anything to the console.") + parser.add_argument('-s', '--summary', action='store_true', + help="Print a summary of the incoming messages instead of the message content.") parser.add_argument('-v', '--verbose', action='count', default=0, help="Print verbose/trace debugging messages.") @@ -82,14 +88,23 @@ port = serial.Serial(port=options.port, baudrate=options.baud, timeout=1.0) # Listen for incoming data. - decoder = FusionEngineDecoder(warn_on_unrecognized=not options.quiet, return_bytes=True) + decoder = FusionEngineDecoder(warn_on_unrecognized=not options.quiet and not options.summary, return_bytes=True) bytes_received = 0 messages_received = 0 + message_stats = defaultdict(MessageStatsEntry) start_time = datetime.now() last_print_time = start_time + print_timeout_sec = 1.0 if options.summary else 5.0 + def _print_status(now): + if options.summary: + # Clear the terminal. + print(colorama.ansi.CSI + 'H' + colorama.ansi.CSI + 'J', end='') logger.info('Status: [bytes_received=%d, messages_received=%d, elapsed_time=%d sec]' % (bytes_received, messages_received, (now - start_time).total_seconds())) + if options.summary: + print_summary_table(message_stats, logger=logger) + try: while True: # Read some data. @@ -129,7 +144,11 @@ def _print_status(now): output_file.write(raw_data) if options.display: - print_message(header, message, format=options.display_format, logger=logger) + if options.summary: + if (now - last_print_time).total_seconds() > 0.1: + _print_status(now) + else: + print_message(header, message, format=options.display_format, logger=logger) except KeyboardInterrupt: pass @@ -140,7 +159,7 @@ def _print_status(now): if output_file is not None: output_file.close() - if not options.quiet: + if not options.quiet and not options.summary: now = datetime.now() elapsed_sec = (now - last_print_time).total_seconds() if last_print_time else 0.0 _print_status(now) From b0df1870886a8e9498dc539ae926b1bc696f9f58 Mon Sep 17 00:00:00 2001 From: Adam Shapiro Date: Sat, 7 Dec 2024 08:23:58 -0500 Subject: [PATCH 08/17] Refactored TCP/serial clients to use a common implementation. These apps are identical except for the type of transport they use. --- python/examples/client_implementation.py | 160 +++++++++++++++++++++++ python/examples/serial_client.py | 142 ++------------------ python/examples/tcp_client.py | 99 ++------------ 3 files changed, 176 insertions(+), 225 deletions(-) create mode 100644 python/examples/client_implementation.py diff --git a/python/examples/client_implementation.py b/python/examples/client_implementation.py new file mode 100644 index 00000000..fb4b6494 --- /dev/null +++ b/python/examples/client_implementation.py @@ -0,0 +1,160 @@ +from collections import defaultdict +from datetime import datetime +import os +import socket +import sys + +import colorama + +try: + # pySerial is optional. + import serial +except ImportError: + # Dummy stand-in if pySerial is not installed. + class serial: + class SerialException: pass + +# Add the Python root directory (fusion-engine-client/python/) to the import search path to enable FusionEngine imports +# if this application is being run directly out of the repository and is not installed as a pip package. +root_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..')) +sys.path.insert(0, root_dir) + +from fusion_engine_client.applications.p1_print import \ + MessageStatsEntry, add_print_format_argument, print_message, print_summary_table +from fusion_engine_client.parsers import FusionEngineDecoder +from fusion_engine_client.utils import trace as logging +from fusion_engine_client.utils.argument_parser import ArgumentParser + + +def define_arguments(parser): + add_print_format_argument(parser, '--display-format') + parser.add_argument('-f', '--format', default='p1log', choices=('p1log', 'raw'), + help="The format of the file to be generated when --output is enabled." + "If 'p1log' (default), create a *.p1log file containing only FusionEngine messages." + "If 'raw', create a generic binary file containing all incoming data.") + parser.add_argument('-n', '--no-display', dest='display', action='store_false', + help="Do not display the incoming message contents.") + parser.add_argument('-o', '--output', type=str, + help="The path to a file where incoming data will be stored.") + parser.add_argument('-q', '--quiet', dest='quiet', action='store_true', + help="Do not print anything to the console.") + parser.add_argument('-s', '--summary', action='store_true', + help="Print a summary of the incoming messages instead of the message content.") + parser.add_argument('-v', '--verbose', action='count', default=0, + help="Print verbose/trace debugging messages.") + + +def configure_logging(options): + if options.verbose >= 1: + logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(name)s:%(lineno)d - %(message)s', + stream=sys.stdout) + if options.verbose == 1: + logging.getLogger('point_one.fusion_engine').setLevel(logging.DEBUG) + else: + logging.getLogger('point_one.fusion_engine').setLevel( + logging.getTraceLevel(depth=options.verbose - 1)) + else: + logging.basicConfig(level=logging.INFO, format='%(message)s', stream=sys.stdout) + + +def run_client(options, transport): + configure_logging(options) + logger = logging.getLogger('point_one.fusion_engine') + + if options.quiet: + options.display = False + + # Open the output file if logging was requested. + if options.output is not None: + if options.format == 'p1log': + p1i_path = os.path.splitext(options.output)[0] + '.p1i' + if os.path.exists(p1i_path): + os.remove(p1i_path) + + output_file = open(options.output, 'wb') + else: + output_file = None + + generating_raw_log = (output_file is not None and options.format == 'raw') + generating_p1log = (output_file is not None and options.format == 'p1log') + + # Listen for incoming data. + decoder = FusionEngineDecoder(warn_on_unrecognized=not options.quiet and not options.summary, return_bytes=True) + bytes_received = 0 + messages_received = 0 + message_stats = defaultdict(MessageStatsEntry) + start_time = datetime.now() + last_print_time = start_time + print_timeout_sec = 1.0 if options.summary else 5.0 + + def _print_status(now): + if options.summary: + # Clear the terminal. + print(colorama.ansi.CSI + 'H' + colorama.ansi.CSI + 'J', end='') + logger.info('Status: [bytes_received=%d, messages_received=%d, elapsed_time=%d sec]' % + (bytes_received, messages_received, (now - start_time).total_seconds())) + if options.summary: + print_summary_table(message_stats, logger=logger) + + try: + while True: + # Read some data. + try: + if isinstance(transport, socket.socket): + received_data = transport.recv(1024) + else: + received_data = transport.read(1024) + + bytes_received += len(received_data) + + now = datetime.now() + if not options.quiet: + if (now - last_print_time).total_seconds() > print_timeout_sec: + _print_status(now) + last_print_time = now + except serial.SerialException as e: + logger.error('Unexpected error reading from device:\r%s' % str(e)) + break + + # If logging in raw format, write the data to disk as is. + if generating_raw_log: + output_file.write(received_data) + + # Decode the incoming data and print the contents of any complete messages. + # + # Note that we pass the data to the decoder, even if --no-display was requested, for three reasons: + # - So that we get a count of the number of incoming messages + # - So we print warnings if the CRC fails on any of the incoming data + # - If we are logging in *.p1log format, so the decoder can extract the FusionEngine data from any + # non-FusionEngine data in the stream + messages = decoder.on_data(received_data) + messages_received += len(messages) + + if options.display or generating_p1log: + for (header, message, raw_data) in messages: + entry = message_stats[header.message_type] + entry.update(header, message) + + if generating_p1log: + output_file.write(raw_data) + + if options.display: + if options.summary: + if (now - last_print_time).total_seconds() > 0.1: + _print_status(now) + else: + print_message(header, message, format=options.display_format, logger=logger) + except KeyboardInterrupt: + pass + + # Close the transport. + transport.close() + + # Close the output file. + if output_file is not None: + output_file.close() + + if not options.quiet and not options.summary: + now = datetime.now() + elapsed_sec = (now - last_print_time).total_seconds() if last_print_time else 0.0 + _print_status(now) diff --git a/python/examples/serial_client.py b/python/examples/serial_client.py index 999e5bca..c7a0510b 100755 --- a/python/examples/serial_client.py +++ b/python/examples/serial_client.py @@ -1,12 +1,8 @@ #!/usr/bin/env python3 -from collections import defaultdict -from datetime import datetime import os import sys -import colorama - try: import serial except ImportError: @@ -18,148 +14,26 @@ root_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..')) sys.path.insert(0, root_dir) -from fusion_engine_client.applications.p1_print import \ - MessageStatsEntry, add_print_format_argument, print_message, print_summary_table -from fusion_engine_client.parsers import FusionEngineDecoder -from fusion_engine_client.utils import trace as logging from fusion_engine_client.utils.argument_parser import ArgumentParser +from examples.client_implementation import define_arguments, run_client + if __name__ == "__main__": + # Parse command-line arguments. parser = ArgumentParser(description="""\ -Connect to an Point One device over serial and print out the incoming message +Connect to a Point One device over serial and print out the incoming message contents and/or log the messages to disk. """) - + define_arguments(parser) parser.add_argument('-b', '--baud', type=int, default=460800, help="The serial baud rate to be used.") - add_print_format_argument(parser, '--display-format') - parser.add_argument('-f', '--format', default='p1log', choices=('p1log', 'raw'), - help="The format of the file to be generated when --output is enabled." - "If 'p1log' (default), create a *.p1log file containing only FusionEngine messages." - "If 'raw', create a generic binary file containing all incoming data.") - parser.add_argument('-n', '--no-display', dest='display', action='store_false', - help="Do not display the incoming message contents.") - parser.add_argument('-o', '--output', type=str, - help="The path to a file where incoming data will be stored.") - parser.add_argument('-q', '--quiet', dest='quiet', action='store_true', - help="Do not print anything to the console.") - parser.add_argument('-s', '--summary', action='store_true', - help="Print a summary of the incoming messages instead of the message content.") - parser.add_argument('-v', '--verbose', action='count', default=0, - help="Print verbose/trace debugging messages.") - parser.add_argument('port', help="The serial device to use (e.g., /dev/ttyUSB0, COM1)") options = parser.parse_args() - if options.quiet: - options.display = False - - # Configure logging. - if options.verbose >= 1: - logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(name)s:%(lineno)d - %(message)s', - stream=sys.stdout) - if options.verbose == 1: - logging.getLogger('point_one.fusion_engine').setLevel(logging.DEBUG) - else: - logging.getLogger('point_one.fusion_engine').setLevel( - logging.getTraceLevel(depth=options.verbose - 1)) - else: - logging.basicConfig(level=logging.INFO, format='%(message)s', stream=sys.stdout) - - logger = logging.getLogger('point_one.fusion_engine') - - # Open the output file if logging was requested. - if options.output is not None: - if options.format == 'p1log': - p1i_path = os.path.splitext(options.output)[0] + '.p1i' - if os.path.exists(p1i_path): - os.remove(p1i_path) - - output_file = open(options.output, 'wb') - else: - output_file = None - - generating_raw_log = (output_file is not None and options.format == 'raw') - generating_p1log = (output_file is not None and options.format == 'p1log') - # Connect to the device. - port = serial.Serial(port=options.port, baudrate=options.baud, timeout=1.0) - - # Listen for incoming data. - decoder = FusionEngineDecoder(warn_on_unrecognized=not options.quiet and not options.summary, return_bytes=True) - bytes_received = 0 - messages_received = 0 - message_stats = defaultdict(MessageStatsEntry) - start_time = datetime.now() - last_print_time = start_time - print_timeout_sec = 1.0 if options.summary else 5.0 - - def _print_status(now): - if options.summary: - # Clear the terminal. - print(colorama.ansi.CSI + 'H' + colorama.ansi.CSI + 'J', end='') - logger.info('Status: [bytes_received=%d, messages_received=%d, elapsed_time=%d sec]' % - (bytes_received, messages_received, (now - start_time).total_seconds())) - if options.summary: - print_summary_table(message_stats, logger=logger) - - try: - while True: - # Read some data. - try: - received_data = port.read(1024) - bytes_received += len(received_data) - - now = datetime.now() - if not options.quiet: - if (now - last_print_time).total_seconds() > print_timeout_sec: - _print_status(now) - last_print_time = now - except serial.SerialException as e: - logger.error('Unexpected error reading from device:\r%s' % str(e)) - break - - # If logging in raw format, write the data to disk as is. - if generating_raw_log: - output_file.write(received_data) - - # Decode the incoming data and print the contents of any complete messages. - # - # Note that we pass the data to the decoder, even if --no-display was requested, for three reasons: - # - So that we get a count of the number of incoming messages - # - So we print warnings if the CRC fails on any of the incoming data - # - If we are logging in *.p1log format, so the decoder can extract the FusionEngine data from any - # non-FusionEngine data in the stream - messages = decoder.on_data(received_data) - messages_received += len(messages) - - if options.display or generating_p1log: - for (header, message, raw_data) in messages: - entry = message_stats[header.message_type] - entry.update(header, message) - - if generating_p1log: - output_file.write(raw_data) - - if options.display: - if options.summary: - if (now - last_print_time).total_seconds() > 0.1: - _print_status(now) - else: - print_message(header, message, format=options.display_format, logger=logger) - except KeyboardInterrupt: - pass - - # Close the serial port. - port.close() - - # Close the output file. - if output_file is not None: - output_file.close() + transport = serial.Serial(port=options.port, baudrate=options.baud, timeout=1.0) - if not options.quiet and not options.summary: - now = datetime.now() - elapsed_sec = (now - last_print_time).total_seconds() if last_print_time else 0.0 - _print_status(now) + # Now run the client to listen for incoming data and decode/print the received message contents. + run_client(options, transport) diff --git a/python/examples/tcp_client.py b/python/examples/tcp_client.py index 8e3ce8cb..d99323b9 100755 --- a/python/examples/tcp_client.py +++ b/python/examples/tcp_client.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 -from datetime import datetime import os import socket import sys @@ -10,108 +9,26 @@ root_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..')) sys.path.insert(0, root_dir) -from fusion_engine_client.applications.p1_print import add_print_format_argument, print_message -from fusion_engine_client.parsers import FusionEngineDecoder from fusion_engine_client.utils.argument_parser import ArgumentParser +from examples.client_implementation import define_arguments, run_client + if __name__ == "__main__": parser = ArgumentParser(description="""\ -Connect to an Point One device over TCP and print out the incoming message +Connect to a Point One device over TCP and print out the incoming message contents and/or log the messages to disk. """) - - add_print_format_argument(parser, '--display-format') - parser.add_argument('-f', '--format', default='p1log', choices=('p1log', 'raw'), - help="The format of the file to be generated when --output is enabled." - "If 'p1log' (default), create a *.p1log file containing only FusionEngine messages." - "If 'raw', create a generic binary file containing all incoming data.") - parser.add_argument('-n', '--no-display', dest='display', action='store_false', - help="Do not display the incoming message contents.") - parser.add_argument('-o', '--output', type=str, - help="The path to a file where incoming data will be stored.") + define_arguments(parser) parser.add_argument('-p', '--port', type=int, default=30201, help="The FusionEngine TCP port on the data source.") - parser.add_argument('-q', '--quiet', dest='quiet', action='store_true', - help="Do not print anything to the console.") - parser.add_argument('hostname', help="The IP address or hostname of the data source.") options = parser.parse_args() - if options.quiet: - options.display = False - - # Open the output file if logging was requested. - if options.output is not None: - if options.format == 'p1log': - p1i_path = os.path.splitext(options.output)[0] + '.p1i' - if os.path.exists(p1i_path): - os.remove(p1i_path) - - output_file = open(options.output, 'wb') - else: - output_file = None - - generating_raw_log = (output_file is not None and options.format == 'raw') - generating_p1log = (output_file is not None and options.format == 'p1log') - # Connect to the device. - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect((socket.gethostbyname(options.hostname), options.port)) - - # Listen for incoming data. - decoder = FusionEngineDecoder(warn_on_unrecognized=not options.quiet, return_bytes=True) - bytes_received = 0 - messages_received = 0 - start_time = datetime.now() - last_print_time = start_time - while True: - # Read some data. - try: - received_data = sock.recv(1024) - bytes_received += len(received_data) - - if not options.quiet: - now = datetime.now() - if (now - last_print_time).total_seconds() > 5.0: - print('Status: [bytes_received=%d, messages_received=%d elapsed_time=%d sec]' % - (bytes_received, messages_received, (now - start_time).total_seconds())) - last_print_time = now - except KeyboardInterrupt: - break - - # If logging in raw format, write the data to disk as is. - if generating_raw_log: - output_file.write(received_data) - - # Decode the incoming data and print the contents of any complete messages. - # - # Note that we pass the data to the decoder, even if --no-display was requested, for three reasons: - # - So that we get a count of the number of incoming messages - # - So we print warnings if the CRC fails on any of the incoming data - # - If we are logging in *.p1log format, so the decoder can extract the FusionEngine data from any - # non-FusionEngine data in the stream - messages = decoder.on_data(received_data) - messages_received += len(messages) - - if options.display or generating_p1log: - for (header, message, raw_data) in messages: - if generating_p1log: - output_file.write(raw_data) - - if options.display: - print_message(header, message, format=options.display_format) - - # Close the socket. - sock.close() - - # Close the output file. - if output_file is not None: - output_file.close() + transport = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + transport.connect((socket.gethostbyname(options.hostname), options.port)) - if not options.quiet: - now = datetime.now() - elapsed_sec = (now - last_print_time).total_seconds() if last_print_time else 0.0 - print('Status: [bytes_received=%d, messages_received=%d elapsed_time=%d sec]' % - (bytes_received, messages_received, elapsed_sec)) + # Now run the client to listen for incoming data and decode/print the received message contents. + run_client(options, transport) From b3cf001a454f448d0245692bd66bfdd9d7faae75 Mon Sep 17 00:00:00 2001 From: Adam Shapiro Date: Sat, 7 Dec 2024 08:28:33 -0500 Subject: [PATCH 09/17] Added CSV output format to TCP/serial clients. --- python/examples/client_implementation.py | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/python/examples/client_implementation.py b/python/examples/client_implementation.py index fb4b6494..dd277a2e 100644 --- a/python/examples/client_implementation.py +++ b/python/examples/client_implementation.py @@ -1,8 +1,10 @@ from collections import defaultdict from datetime import datetime +import math import os import socket import sys +import time import colorama @@ -28,10 +30,12 @@ class SerialException: pass def define_arguments(parser): add_print_format_argument(parser, '--display-format') - parser.add_argument('-f', '--format', default='p1log', choices=('p1log', 'raw'), - help="The format of the file to be generated when --output is enabled." - "If 'p1log' (default), create a *.p1log file containing only FusionEngine messages." - "If 'raw', create a generic binary file containing all incoming data.") + parser.add_argument('-f', '--format', default='p1log', choices=('p1log', 'raw', 'csv'), + help="""\ +The format of the file to be generated when --output is enabled: +- p1log - Create a *.p1log file containing only FusionEngine messages (default) +- raw - Create a generic binary file containing all incoming data +- csv - Create a CSV file with the received message types and timestamps""") parser.add_argument('-n', '--no-display', dest='display', action='store_false', help="Do not display the incoming message contents.") parser.add_argument('-o', '--output', type=str, @@ -77,6 +81,10 @@ def run_client(options, transport): generating_raw_log = (output_file is not None and options.format == 'raw') generating_p1log = (output_file is not None and options.format == 'p1log') + generating_csv = (output_file is not None and options.format == 'csv') + + if generating_csv: + output_file.write(b'host_time,type,p1_time,sys_time\n') # Listen for incoming data. decoder = FusionEngineDecoder(warn_on_unrecognized=not options.quiet and not options.summary, return_bytes=True) @@ -138,6 +146,14 @@ def _print_status(now): if generating_p1log: output_file.write(raw_data) + if generating_csv: + p1_time = message.get_p1_time() + sys_time = message.get_system_time_sec() + p1_str = str(p1_time.seconds) if p1_time is not None and not math.isnan(p1_time) else '' + sys_str = str(sys_time) if sys_time is not None and not math.isnan(sys_time) else '' + output_file.write( + f'{time.monotonic()},{header.message_type},{p1_str},{sys_str}\n'.encode('utf-8')) + if options.display: if options.summary: if (now - last_print_time).total_seconds() > 0.1: From 82f6a8cad1bfb8ab6ca46c1ccad4627fb5e16d91 Mon Sep 17 00:00:00 2001 From: Adam Shapiro Date: Sat, 7 Dec 2024 08:29:57 -0500 Subject: [PATCH 10/17] Use common implementation for UDP client too. --- python/examples/udp_client.py | 116 +++------------------------------- 1 file changed, 10 insertions(+), 106 deletions(-) diff --git a/python/examples/udp_client.py b/python/examples/udp_client.py index 6983ef9f..1f9381ff 100755 --- a/python/examples/udp_client.py +++ b/python/examples/udp_client.py @@ -1,21 +1,18 @@ #!/usr/bin/env python3 -from datetime import datetime import os -import math import socket import sys -import time # Add the Python root directory (fusion-engine-client/python/) to the import search path to enable FusionEngine imports # if this application is being run directly out of the repository and is not installed as a pip package. root_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..')) sys.path.insert(0, root_dir) -from fusion_engine_client.applications.p1_print import add_print_format_argument, print_message -from fusion_engine_client.parsers import FusionEngineDecoder from fusion_engine_client.utils.argument_parser import ArgumentParser +from examples.client_implementation import define_arguments, run_client + if __name__ == "__main__": parser = ArgumentParser(description="""\ @@ -24,110 +21,17 @@ When using UDP, you must configure the device to send data to your machine. """) - - add_print_format_argument(parser, '--display-format') - parser.add_argument('-f', '--format', default='p1log', choices=('p1log', 'raw', 'csv'), - help="""\ -The format of the file to be generated when --output is enabled: -- p1log - (default), create a *.p1log file containing only FusionEngine messages." -- raw - create a generic binary file containing all incoming data." -- csv - create a csv of time vs message type.""") - parser.add_argument('-n', '--no-display', dest='display', action='store_false', - help="Do not display the incoming message contents.") - parser.add_argument('-o', '--output', type=str, - help="The path to a file where incoming data will be stored.") + define_arguments(parser) parser.add_argument('-p', '--port', type=int, default=30400, help="The FusionEngine UDP port on the data source.") - parser.add_argument('-q', '--quiet', dest='quiet', action='store_true', - help="Do not print anything to the console.") + parser.add_argument('hostname', + help="The IP address or hostname of the data source.") options = parser.parse_args() - if options.quiet: - options.display = False - - # Open the output file if logging was requested. - if options.output is not None: - if options.format == 'p1log': - p1i_path = os.path.splitext(options.output)[0] + '.p1i' - if os.path.exists(p1i_path): - os.remove(p1i_path) - - output_file = open(options.output, 'wb') - else: - output_file = None - - generating_raw_log = (output_file is not None and options.format == 'raw') - generating_p1log = (output_file is not None and options.format == 'p1log') - generating_csv = (output_file is not None and options.format == 'csv') - - if generating_csv: - output_file.write(b'host_time,type,p1_time,sys_time\n') - # Connect to the device. - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind(('', options.port)) - - # Listen for incoming data. - decoder = FusionEngineDecoder(warn_on_unrecognized=not options.quiet, return_bytes=True) - bytes_received = 0 - messages_received = 0 - start_time = datetime.now() - last_print_time = start_time - while True: - # Read some data. - try: - received_data = sock.recv(1024) - bytes_received += len(received_data) - - if not options.quiet: - now = datetime.now() - if (now - last_print_time).total_seconds() > 5.0: - print('Status: [bytes_received=%d, messages_received=%d, elapsed_time=%d sec]' % - (bytes_received, messages_received, (now - start_time).total_seconds())) - last_print_time = now - except KeyboardInterrupt: - break - - # If logging in raw format, write the data to disk as is. - if generating_raw_log: - output_file.write(received_data) - - # Decode the incoming data and print the contents of any complete messages. - # - # Note that we pass the data to the decoder, even if --no-display was requested, for three reasons: - # - So that we get a count of the number of incoming messages - # - So we print warnings if the CRC fails on any of the incoming data - # - If we are logging in *.p1log format, so the decoder can extract the FusionEngine data from any - # non-FusionEngine data in the stream - messages = decoder.on_data(received_data) - messages_received += len(messages) - - if options.display or generating_p1log or generating_csv: - for (header, message, raw_data) in messages: - if generating_p1log: - output_file.write(raw_data) - - if options.display: - print_message(header, message, format=options.display_format) - - if generating_csv: - p1_time = message.get_p1_time() - sys_time = message.get_system_time_sec() - p1_str = str(p1_time.seconds) if p1_time is not None and not math.isnan(p1_time) else '' - sys_str = str(sys_time) if sys_time is not None and not math.isnan(sys_time) else '' - output_file.write(f'{time.monotonic()},{header.message_type},{p1_str},{sys_str}\n'.encode('utf-8')) - - - # Close the socket. - sock.close() - - # Close the output file. - if output_file is not None: - output_file.close() + transport = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + transport.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + transport.bind(('', options.port)) - if not options.quiet: - now = datetime.now() - elapsed_sec = (now - last_print_time).total_seconds() if last_print_time else 0.0 - print('Status: [bytes_received=%d, messages_received=%d, elapsed_time=%d sec]' % - (bytes_received, messages_received, elapsed_sec)) + # Now run the client to listen for incoming data and decode/print the received message contents. + run_client(options, transport) From be1ab0a3d99e9d8476851e392244ecb939de35c1 Mon Sep 17 00:00:00 2001 From: Adam Shapiro Date: Sat, 7 Dec 2024 08:33:46 -0500 Subject: [PATCH 11/17] Use select to apply a read timeout for TCP/UDP sockets. --- python/examples/client_implementation.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/python/examples/client_implementation.py b/python/examples/client_implementation.py index dd277a2e..6e5aed30 100644 --- a/python/examples/client_implementation.py +++ b/python/examples/client_implementation.py @@ -2,6 +2,7 @@ from datetime import datetime import math import os +import select import socket import sys import time @@ -86,6 +87,10 @@ def run_client(options, transport): if generating_csv: output_file.write(b'host_time,type,p1_time,sys_time\n') + # Configure the socket for non-blocking reads. + if isinstance(transport, socket.socket): + transport.setblocking(0) + # Listen for incoming data. decoder = FusionEngineDecoder(warn_on_unrecognized=not options.quiet and not options.summary, return_bytes=True) bytes_received = 0 @@ -109,7 +114,11 @@ def _print_status(now): # Read some data. try: if isinstance(transport, socket.socket): - received_data = transport.recv(1024) + ready = select.select([transport], [], [], 1.0) + if ready[0]: + received_data = transport.recv(1024) + else: + received_data = [] else: received_data = transport.read(1024) From be77cb86ebc87c9a6778394e9492b9fa741a7715 Mon Sep 17 00:00:00 2001 From: Adam Shapiro Date: Sat, 7 Dec 2024 08:37:01 -0500 Subject: [PATCH 12/17] Use consistent read timeouts on all transports. --- python/examples/client_implementation.py | 11 +++++++++-- python/examples/serial_client.py | 2 +- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/python/examples/client_implementation.py b/python/examples/client_implementation.py index 6e5aed30..27c3a13f 100644 --- a/python/examples/client_implementation.py +++ b/python/examples/client_implementation.py @@ -87,9 +87,13 @@ def run_client(options, transport): if generating_csv: output_file.write(b'host_time,type,p1_time,sys_time\n') - # Configure the socket for non-blocking reads. + # If this is a TCP/UDP socket, configure it for non-blocking reads. We'll apply a read timeout with select() below. + read_timeout_sec = 1.0 if isinstance(transport, socket.socket): transport.setblocking(0) + # If this is a serial port, configure its read timeout. + else: + transport.timeout = read_timeout_sec # Listen for incoming data. decoder = FusionEngineDecoder(warn_on_unrecognized=not options.quiet and not options.summary, return_bytes=True) @@ -113,12 +117,15 @@ def _print_status(now): while True: # Read some data. try: + # If this is a TCP/UDP socket, use select() to implement a read timeout so we can wakeup periodically + # and print status if there's no incoming data. if isinstance(transport, socket.socket): - ready = select.select([transport], [], [], 1.0) + ready = select.select([transport], [], [], read_timeout_sec) if ready[0]: received_data = transport.recv(1024) else: received_data = [] + # If this is a serial port, we set the read timeout above. else: received_data = transport.read(1024) diff --git a/python/examples/serial_client.py b/python/examples/serial_client.py index c7a0510b..a651c9fb 100755 --- a/python/examples/serial_client.py +++ b/python/examples/serial_client.py @@ -33,7 +33,7 @@ options = parser.parse_args() # Connect to the device. - transport = serial.Serial(port=options.port, baudrate=options.baud, timeout=1.0) + transport = serial.Serial(port=options.port, baudrate=options.baud) # Now run the client to listen for incoming data and decode/print the received message contents. run_client(options, transport) From 770e6e04a4bc9c91f602f34b28dbbb8d4557997f Mon Sep 17 00:00:00 2001 From: Adam Shapiro Date: Sun, 8 Dec 2024 08:47:29 -0500 Subject: [PATCH 13/17] Include device ID, type, and SW version in summary table. --- python/examples/client_implementation.py | 11 ++--- .../applications/p1_print.py | 40 ++++++++++++++++--- 2 files changed, 40 insertions(+), 11 deletions(-) diff --git a/python/examples/client_implementation.py b/python/examples/client_implementation.py index 27c3a13f..74364259 100644 --- a/python/examples/client_implementation.py +++ b/python/examples/client_implementation.py @@ -23,7 +23,7 @@ class SerialException: pass sys.path.insert(0, root_dir) from fusion_engine_client.applications.p1_print import \ - MessageStatsEntry, add_print_format_argument, print_message, print_summary_table + DeviceSummary, add_print_format_argument, print_message, print_summary_table from fusion_engine_client.parsers import FusionEngineDecoder from fusion_engine_client.utils import trace as logging from fusion_engine_client.utils.argument_parser import ArgumentParser @@ -97,9 +97,11 @@ def run_client(options, transport): # Listen for incoming data. decoder = FusionEngineDecoder(warn_on_unrecognized=not options.quiet and not options.summary, return_bytes=True) + bytes_received = 0 messages_received = 0 - message_stats = defaultdict(MessageStatsEntry) + device_summary = DeviceSummary() + start_time = datetime.now() last_print_time = start_time print_timeout_sec = 1.0 if options.summary else 5.0 @@ -111,7 +113,7 @@ def _print_status(now): logger.info('Status: [bytes_received=%d, messages_received=%d, elapsed_time=%d sec]' % (bytes_received, messages_received, (now - start_time).total_seconds())) if options.summary: - print_summary_table(message_stats, logger=logger) + print_summary_table(device_summary, logger=logger) try: while True: @@ -156,8 +158,7 @@ def _print_status(now): if options.display or generating_p1log: for (header, message, raw_data) in messages: - entry = message_stats[header.message_type] - entry.update(header, message) + device_summary.update(header, message) if generating_p1log: output_file.write(raw_data) diff --git a/python/fusion_engine_client/applications/p1_print.py b/python/fusion_engine_client/applications/p1_print.py index 2b5d0acf..9e9fbc54 100755 --- a/python/fusion_engine_client/applications/p1_print.py +++ b/python/fusion_engine_client/applications/p1_print.py @@ -107,15 +107,44 @@ def update(self, header: MessageHeader, message: MessagePayload): self.total_bytes = header.get_message_size() -def print_summary_table(message_stats: Dict[MessageType, MessageStatsEntry], logger=None): +class DeviceSummary: + def __init__(self): + self.device_id = None + self.version_info = None + self.stats = defaultdict(MessageStatsEntry) + + def update(self, header: MessageHeader, message: MessagePayload): + self.stats[header.message_type].update(header, message) + + if header.message_type == MessageType.DEVICE_ID: + self.device_id = message + elif header.message_type == MessageType.VERSION_INFO: + self.version_info = message + + +def print_summary_table(device_summary: DeviceSummary, logger=None): if logger is None: logger = _logger + device_type = DeviceType.UNKNOWN + device_id = '' + if device_summary.device_id is not None: + device_type = device_summary.device_id.device_type + if len(device_summary.device_id.user_id_data) != 0: + device_id = DeviceIDMessage._get_str(device_summary.device_id.user_id_data) + logger.info(f'Device ID: {device_id} | ' + f'Device type: {"" if device_type == DeviceType.UNKNOWN else str(device_type)}') + + if device_summary.version_info is not None and device_summary.version_info.engine_version_str != "": + logger.info(f'Software version: {device_summary.version_info.engine_version_str}') + else: + logger.info(f'Software version: ') + format_string = '| {:<50} | {:>5} | {:>8} |' logger.info(format_string.format('Message Name', 'Type', 'Count')) logger.info(format_string.format('-' * 50, '-' * 5, '-' * 8)) total_messages = 0 - for type, entry in sorted(message_stats.items(), key=lambda x: int(x[0])): + for type, entry in sorted(device_summary.stats.items(), key=lambda x: int(x[0])): if type in message_type_to_class: name = message_type_to_class[type].__name__ elif type.is_unrecognized(): @@ -272,8 +301,8 @@ def main(): total_decoded_messages = 0 total_messages = 0 bytes_decoded = 0 + device_summary = DeviceSummary() - message_stats = defaultdict(MessageStatsEntry) try: for header, message, data, offset_bytes in reader: total_decoded_messages += 1 @@ -286,8 +315,7 @@ def main(): total_messages += 1 bytes_decoded += len(data) if options.summary: - entry = message_stats[header.message_type] - entry.update(header, message) + device_summary.update(header, message) if message is not None: p1_time = message.get_p1_time() @@ -353,7 +381,7 @@ def main(): _logger.info('Total data read: %d B' % reader.get_bytes_read()) _logger.info('Selected data size: %d B' % bytes_decoded) _logger.info('') - print_summary_table(message_stats) + print_summary_table(device_summary) elif total_messages == 0: _logger.warning('No valid FusionEngine messages found.') From 85ad5713f8c8fc976eaa98f5b7cc97746c190de4 Mon Sep 17 00:00:00 2001 From: Adam Shapiro Date: Sun, 8 Dec 2024 09:32:19 -0500 Subject: [PATCH 14/17] Added p1_capture application, and simplified TCP/UDP/serial example apps. The examples are not intended to be fully featured applications, just very simple examples. --- python/README.md | 14 +- python/examples/serial_client.py | 27 +++- python/examples/tcp_client.py | 26 ++- python/examples/udp_client.py | 28 +++- .../applications/p1_capture.py} | 150 +++++++++++++----- python/setup.py | 1 + 6 files changed, 181 insertions(+), 65 deletions(-) rename python/{examples/client_implementation.py => fusion_engine_client/applications/p1_capture.py} (58%) mode change 100644 => 100755 diff --git a/python/README.md b/python/README.md index ffa7c568..c430c9c1 100644 --- a/python/README.md +++ b/python/README.md @@ -22,6 +22,9 @@ FusionEngine message specification. ### Applications +- [p1_capture](fusion_engine_client/applications/p1_capture.py) - Connect to a FusionEngine device in real time over + serial, TCP, UDP, or UNIX domain socket, and display incoming FusionEngine contents and/or log the incoming data to + disk - [p1_display](fusion_engine_client/applications/p1_display.py) - Generate plots of vehicle trajectory, GNSS signal status, wheel speed measurements, etc. from a file of logged FusionEngine messages - [p1_extract](fusion_engine_client/applications/p1_extract.py) - Extract FusionEngine messages from a binary file @@ -59,13 +62,12 @@ FusionEngine message specification. optionally mixed with other binary data, and decode the contents using the `FusionEngineDecoder` helper class - [send_command.py](examples/send_command.py) - Send a command to a device over serial or TCP, and wait for a response - - [serial_client.py](examples/serial_client.py) - Connect to a device over a local serial port and decode messages - in real time to be displayed and/or logged to disk using the `FusionEngineDecoder` helper class - - [tcp_client.py](examples/tcp_client.py) - Connect to a device over TCP and decode messages in real time to be - displayed and/or logged to disk using the `FusionEngineDecoder` helper class + - [serial_client.py](examples/serial_client.py) - Connect to a device over a local serial port and decode/print + incoming FusionEngine messages + - [tcp_client.py](examples/tcp_client.py) - Connect to a device over TCP and decode messages in real time and + decode/print incoming FusionEngine messages - [udp_client.py](examples/udp_client.py) - Connect to a device over UDP and decode/display messages in real time - - Unlike [tcp_client.py](examples/tcp_client.py), currently assumes all incoming UDP packets contain - FusionEngine messages and does not use the `FusionEngineDecoder` helper class + and decode/print incoming FusionEngine messages - `fusion_engine_client` - Top-level Python package directory - `analysis` - [analyzer.py](fusion_engine_client/analysis/analyzer.py) - `Analyzer` class, used by diff --git a/python/examples/serial_client.py b/python/examples/serial_client.py index a651c9fb..ebe688cb 100755 --- a/python/examples/serial_client.py +++ b/python/examples/serial_client.py @@ -15,17 +15,16 @@ sys.path.insert(0, root_dir) from fusion_engine_client.utils.argument_parser import ArgumentParser - -from examples.client_implementation import define_arguments, run_client +from fusion_engine_client.messages import MessagePayload +from fusion_engine_client.parsers import FusionEngineDecoder if __name__ == "__main__": # Parse command-line arguments. parser = ArgumentParser(description="""\ Connect to a Point One device over serial and print out the incoming message -contents and/or log the messages to disk. +contents. """) - define_arguments(parser) parser.add_argument('-b', '--baud', type=int, default=460800, help="The serial baud rate to be used.") parser.add_argument('port', @@ -35,5 +34,21 @@ # Connect to the device. transport = serial.Serial(port=options.port, baudrate=options.baud) - # Now run the client to listen for incoming data and decode/print the received message contents. - run_client(options, transport) + # Listen for incoming data and parse FusionEngine messages. + try: + decoder = FusionEngineDecoder() + while True: + received_data = transport.read(1024) + messages = decoder.on_data(received_data) + for header, message in messages: + if isinstance(message, MessagePayload): + print(str(message)) + else: + print(f'{header.message_type} message (not supported)') + except KeyboardInterrupt: + pass + except serial.SerialException as e: + print('Unexpected error reading from device:\r%s' % str(e)) + + # Close the transport when finished. + transport.close() diff --git a/python/examples/tcp_client.py b/python/examples/tcp_client.py index d99323b9..278e46d4 100755 --- a/python/examples/tcp_client.py +++ b/python/examples/tcp_client.py @@ -10,16 +10,16 @@ sys.path.insert(0, root_dir) from fusion_engine_client.utils.argument_parser import ArgumentParser - -from examples.client_implementation import define_arguments, run_client +from fusion_engine_client.messages import MessagePayload +from fusion_engine_client.parsers import FusionEngineDecoder if __name__ == "__main__": + # Parse command-line arguments. parser = ArgumentParser(description="""\ Connect to a Point One device over TCP and print out the incoming message -contents and/or log the messages to disk. +contents. """) - define_arguments(parser) parser.add_argument('-p', '--port', type=int, default=30201, help="The FusionEngine TCP port on the data source.") parser.add_argument('hostname', @@ -30,5 +30,19 @@ transport = socket.socket(socket.AF_INET, socket.SOCK_STREAM) transport.connect((socket.gethostbyname(options.hostname), options.port)) - # Now run the client to listen for incoming data and decode/print the received message contents. - run_client(options, transport) + # Listen for incoming data and parse FusionEngine messages. + try: + decoder = FusionEngineDecoder() + while True: + received_data = transport.recv(1024) + messages = decoder.on_data(received_data) + for header, message in messages: + if isinstance(message, MessagePayload): + print(str(message)) + else: + print(f'{header.message_type} message (not supported)') + except KeyboardInterrupt: + pass + + # Close the transport when finished. + transport.close() diff --git a/python/examples/udp_client.py b/python/examples/udp_client.py index 1f9381ff..b482c5fb 100755 --- a/python/examples/udp_client.py +++ b/python/examples/udp_client.py @@ -10,22 +10,20 @@ sys.path.insert(0, root_dir) from fusion_engine_client.utils.argument_parser import ArgumentParser - -from examples.client_implementation import define_arguments, run_client +from fusion_engine_client.messages import MessagePayload +from fusion_engine_client.parsers import FusionEngineDecoder if __name__ == "__main__": + # Parse command-line arguments. parser = ArgumentParser(description="""\ Connect to a Point One device over UDP and print out the incoming message -contents and/or log the messages to disk. +contents. When using UDP, you must configure the device to send data to your machine. """) - define_arguments(parser) parser.add_argument('-p', '--port', type=int, default=30400, help="The FusionEngine UDP port on the data source.") - parser.add_argument('hostname', - help="The IP address or hostname of the data source.") options = parser.parse_args() # Connect to the device. @@ -33,5 +31,19 @@ transport.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) transport.bind(('', options.port)) - # Now run the client to listen for incoming data and decode/print the received message contents. - run_client(options, transport) + # Listen for incoming data and parse FusionEngine messages. + try: + decoder = FusionEngineDecoder() + while True: + received_data = transport.recv(1024) + messages = decoder.on_data(received_data) + for header, message in messages: + if isinstance(message, MessagePayload): + print(str(message)) + else: + print(f'{header.message_type} message (not supported)') + except KeyboardInterrupt: + pass + + # Close the transport when finished. + transport.close() diff --git a/python/examples/client_implementation.py b/python/fusion_engine_client/applications/p1_capture.py old mode 100644 new mode 100755 similarity index 58% rename from python/examples/client_implementation.py rename to python/fusion_engine_client/applications/p1_capture.py index 74364259..5610d213 --- a/python/examples/client_implementation.py +++ b/python/fusion_engine_client/applications/p1_capture.py @@ -1,7 +1,10 @@ +#!/usr/bin/env python3 + from collections import defaultdict from datetime import datetime import math import os +import re import select import socket import sys @@ -12,62 +15,128 @@ try: # pySerial is optional. import serial + serial_supported = True except ImportError: + serial_supported = False # Dummy stand-in if pySerial is not installed. class serial: class SerialException: pass -# Add the Python root directory (fusion-engine-client/python/) to the import search path to enable FusionEngine imports -# if this application is being run directly out of the repository and is not installed as a pip package. -root_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..')) -sys.path.insert(0, root_dir) +if __package__ is None or __package__ == "": + from import_utils import enable_relative_imports + __package__ = enable_relative_imports(__name__, __file__) -from fusion_engine_client.applications.p1_print import \ +from .p1_print import \ DeviceSummary, add_print_format_argument, print_message, print_summary_table -from fusion_engine_client.parsers import FusionEngineDecoder -from fusion_engine_client.utils import trace as logging -from fusion_engine_client.utils.argument_parser import ArgumentParser +from ..parsers import FusionEngineDecoder +from ..utils import trace as logging +from ..utils.argument_parser import ArgumentParser, ExtendedBooleanAction + +_logger = logging.getLogger('point_one.fusion_engine.applications.p1_capture') + + +def create_transport(descriptor: str): + m = re.match(r'^tcp://([a-zA-Z0-9-_.]+)?:([0-9]+)$', descriptor) + if m: + transport = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + transport.connect((socket.gethostbyname(m.group(1)), int(m.group(2)))) + return transport + + m = re.match(r'^udp://:([0-9]+)$', descriptor) + if m: + transport = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + transport.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + transport.bind(('', int(m.group(1)))) + return transport + + m = re.match(r'^unix://([a-zA-Z0-9-_./]+)$', descriptor) + if m: + transport = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + transport.connect(m.group(1)) + return transport + + m = re.match(r'^(?:(?:serial|tty)://)?([^:]+):([0-9]+)$', descriptor) + if m: + if serial_supported: + transport = serial.Serial(port=m.group(1), baudrate=int(m.group(2))) + return transport + else: + raise RuntimeError( + "This application requires pyserial. Please install (pip install pyserial) and run again.") + raise ValueError('Unsupported transport descriptor.') -def define_arguments(parser): + +def main(): + # Parse command-line arguments. + parser = ArgumentParser(description="""\ +Connect to a Point One device and print out the incoming FusionEngine message +contents and/or log the messages to disk. +""") add_print_format_argument(parser, '--display-format') - parser.add_argument('-f', '--format', default='p1log', choices=('p1log', 'raw', 'csv'), - help="""\ + parser.add_argument( + '--display', action=ExtendedBooleanAction, default=True, + help="Print the incoming message contents to the console.") + parser.add_argument( + '-q', '--quiet', dest='quiet', action=ExtendedBooleanAction, default=False, + help="Do not print anything to the console.") + parser.add_argument( + '-s', '--summary', action=ExtendedBooleanAction, default=False, + help="Print a summary of the incoming messages instead of the message content.") + parser.add_argument( + '-v', '--verbose', action='count', default=0, + help="Print verbose/trace debugging messages.") + + file_group = parser.add_argument_group('File Capture') + file_group.add_argument( + '-f', '--output-format', default='p1log', choices=('p1log', 'raw', 'csv'), + help="""\ The format of the file to be generated when --output is enabled: - p1log - Create a *.p1log file containing only FusionEngine messages (default) - raw - Create a generic binary file containing all incoming data - csv - Create a CSV file with the received message types and timestamps""") - parser.add_argument('-n', '--no-display', dest='display', action='store_false', - help="Do not display the incoming message contents.") - parser.add_argument('-o', '--output', type=str, - help="The path to a file where incoming data will be stored.") - parser.add_argument('-q', '--quiet', dest='quiet', action='store_true', - help="Do not print anything to the console.") - parser.add_argument('-s', '--summary', action='store_true', - help="Print a summary of the incoming messages instead of the message content.") - parser.add_argument('-v', '--verbose', action='count', default=0, - help="Print verbose/trace debugging messages.") - - -def configure_logging(options): + file_group.add_argument( + '-o', '--output', type=str, + help="The path to a file where incoming data will be stored.") + + parser.add_argument( + 'transport', + help="""\ +The method used to communicate with the target device: +- tcp://HOSTNAME:PORT - Connect to the specified hostname (or IP address) and + port over TCP (e.g., tty://192.168.0.3:30201) +- udp://:PORT - Listen for incoming data on the specified UDP port (e.g., + udp://:12345) + Note: When using UDP, you must configure the device to send data to your + machine. +- unix://FILENAME - Connect to the specified UNIX domain socket file +- [tty://]DEVICE:BAUD - Connect to a serial device with the specified baud rate + (e.g., tty:///dev/ttyUSB0:460800 or /dev/ttyUSB0:460800) +""") + + options = parser.parse_args() + + if options.quiet: + options.display = False + + # Configure logging. if options.verbose >= 1: logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(name)s:%(lineno)d - %(message)s', stream=sys.stdout) if options.verbose == 1: - logging.getLogger('point_one.fusion_engine').setLevel(logging.DEBUG) + logging.getLogger('point_one.fusion_engine.parsers').setLevel(logging.DEBUG) else: - logging.getLogger('point_one.fusion_engine').setLevel( + logging.getLogger('point_one.fusion_engine.parsers').setLevel( logging.getTraceLevel(depth=options.verbose - 1)) else: logging.basicConfig(level=logging.INFO, format='%(message)s', stream=sys.stdout) - -def run_client(options, transport): - configure_logging(options) - logger = logging.getLogger('point_one.fusion_engine') - - if options.quiet: - options.display = False + # Connect to the device using the specified transport. + try: + transport = create_transport(options.transport) + except Exception as e: + _logger.error(str(e)) + sys.exit(1) # Open the output file if logging was requested. if options.output is not None: @@ -110,10 +179,10 @@ def _print_status(now): if options.summary: # Clear the terminal. print(colorama.ansi.CSI + 'H' + colorama.ansi.CSI + 'J', end='') - logger.info('Status: [bytes_received=%d, messages_received=%d, elapsed_time=%d sec]' % - (bytes_received, messages_received, (now - start_time).total_seconds())) + _logger.info('Status: [bytes_received=%d, messages_received=%d, elapsed_time=%d sec]' % + (bytes_received, messages_received, (now - start_time).total_seconds())) if options.summary: - print_summary_table(device_summary, logger=logger) + print_summary_table(device_summary, logger=_logger) try: while True: @@ -139,7 +208,7 @@ def _print_status(now): _print_status(now) last_print_time = now except serial.SerialException as e: - logger.error('Unexpected error reading from device:\r%s' % str(e)) + _logger.error('Unexpected error reading from device:\r%s' % str(e)) break # If logging in raw format, write the data to disk as is. @@ -176,7 +245,7 @@ def _print_status(now): if (now - last_print_time).total_seconds() > 0.1: _print_status(now) else: - print_message(header, message, format=options.display_format, logger=logger) + print_message(header, message, format=options.display_format, logger=_logger) except KeyboardInterrupt: pass @@ -189,5 +258,8 @@ def _print_status(now): if not options.quiet and not options.summary: now = datetime.now() - elapsed_sec = (now - last_print_time).total_seconds() if last_print_time else 0.0 _print_status(now) + + +if __name__ == "__main__": + main() diff --git a/python/setup.py b/python/setup.py index 52f8208a..e0355d90 100644 --- a/python/setup.py +++ b/python/setup.py @@ -82,6 +82,7 @@ def find_version(*file_paths): packages=find_packages(where='.'), entry_points={ 'console_scripts': [ + 'p1_capture = fusion_engine_client.applications.p1_capture:main', 'p1_display = fusion_engine_client.applications.p1_display:main', 'p1_extract = fusion_engine_client.applications.p1_extract:main', 'p1_lband_extract = fusion_engine_client.applications.p1_lband_extract:main', From 763ea6b4dc6080ec56360576f45f3d81ac33f6ca Mon Sep 17 00:00:00 2001 From: Adam Shapiro Date: Mon, 9 Dec 2024 12:34:03 -0500 Subject: [PATCH 15/17] Added p1_filter description to README. --- python/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/README.md b/python/README.md index c430c9c1..8350e75e 100644 --- a/python/README.md +++ b/python/README.md @@ -29,6 +29,8 @@ FusionEngine message specification. status, wheel speed measurements, etc. from a file of logged FusionEngine messages - [p1_extract](fusion_engine_client/applications/p1_extract.py) - Extract FusionEngine messages from a binary file containing multiple data streams (e.g., interleaved RTCM and FusionEngine messages) +- [p1_filter](fusion_engine_client/applications/p1_filter.py) - Filter an incoming FusionEngine data stream, outputting + a new FusionEngine stream containing only the requested messages - [p1_lband_extract](fusion_engine_client/applications/p1_lband_extract.py) - Extract L-band data bits contained from a log of FusionEngine `LBandFrameMessage` messages - [p1_print](fusion_engine_client/applications/p1_print.py) - Print the contents of FusionEngine messages found in a From 70f99078fce12adfbd60defb24820d46ed3a693a Mon Sep 17 00:00:00 2001 From: Adam Shapiro Date: Mon, 9 Dec 2024 14:30:54 -0500 Subject: [PATCH 16/17] Added a type hint to create_transport() return. --- python/fusion_engine_client/applications/p1_capture.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/fusion_engine_client/applications/p1_capture.py b/python/fusion_engine_client/applications/p1_capture.py index 5610d213..d3d51896 100755 --- a/python/fusion_engine_client/applications/p1_capture.py +++ b/python/fusion_engine_client/applications/p1_capture.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 +from typing import Union + from collections import defaultdict from datetime import datetime import math @@ -35,7 +37,7 @@ class SerialException: pass _logger = logging.getLogger('point_one.fusion_engine.applications.p1_capture') -def create_transport(descriptor: str): +def create_transport(descriptor: str) -> Union[socket.socket, serial.Serial]: m = re.match(r'^tcp://([a-zA-Z0-9-_.]+)?:([0-9]+)$', descriptor) if m: transport = socket.socket(socket.AF_INET, socket.SOCK_STREAM) From 567a4c98e03b6fd4975eecf0e100e7a93b3e2442 Mon Sep 17 00:00:00 2001 From: Adam Shapiro Date: Mon, 9 Dec 2024 15:37:16 -0500 Subject: [PATCH 17/17] Moved p1_print/p1_capture print functions to new print_utils file. --- .../applications/p1_capture.py | 9 +- .../applications/p1_print.py | 141 +---------------- .../fusion_engine_client/utils/print_utils.py | 148 ++++++++++++++++++ 3 files changed, 153 insertions(+), 145 deletions(-) create mode 100644 python/fusion_engine_client/utils/print_utils.py diff --git a/python/fusion_engine_client/applications/p1_capture.py b/python/fusion_engine_client/applications/p1_capture.py index d3d51896..6cc09abe 100755 --- a/python/fusion_engine_client/applications/p1_capture.py +++ b/python/fusion_engine_client/applications/p1_capture.py @@ -2,7 +2,6 @@ from typing import Union -from collections import defaultdict from datetime import datetime import math import os @@ -28,11 +27,11 @@ class SerialException: pass from import_utils import enable_relative_imports __package__ = enable_relative_imports(__name__, __file__) -from .p1_print import \ - DeviceSummary, add_print_format_argument, print_message, print_summary_table from ..parsers import FusionEngineDecoder from ..utils import trace as logging from ..utils.argument_parser import ArgumentParser, ExtendedBooleanAction +from ..utils.print_utils import \ + DeviceSummary, add_print_format_argument, print_message, print_summary_table _logger = logging.getLogger('point_one.fusion_engine.applications.p1_capture') @@ -184,7 +183,7 @@ def _print_status(now): _logger.info('Status: [bytes_received=%d, messages_received=%d, elapsed_time=%d sec]' % (bytes_received, messages_received, (now - start_time).total_seconds())) if options.summary: - print_summary_table(device_summary, logger=_logger) + print_summary_table(device_summary) try: while True: @@ -247,7 +246,7 @@ def _print_status(now): if (now - last_print_time).total_seconds() > 0.1: _print_status(now) else: - print_message(header, message, format=options.display_format, logger=_logger) + print_message(header, message, format=options.display_format) except KeyboardInterrupt: pass diff --git a/python/fusion_engine_client/applications/p1_print.py b/python/fusion_engine_client/applications/p1_print.py index 9e9fbc54..efab1fd3 100755 --- a/python/fusion_engine_client/applications/p1_print.py +++ b/python/fusion_engine_client/applications/p1_print.py @@ -1,8 +1,5 @@ #!/usr/bin/env python3 -from typing import Dict - -from collections import defaultdict import sys if __package__ is None or __package__ == "": @@ -13,150 +10,14 @@ from ..parsers import MixedLogReader from ..utils import trace as logging from ..utils.argument_parser import ArgumentParser, ExtendedBooleanAction, CSVAction -from ..utils.bin_utils import bytes_to_hex from ..utils.log import locate_log, DEFAULT_LOG_BASE_DIR +from ..utils.print_utils import DeviceSummary, add_print_format_argument, print_message, print_summary_table from ..utils.time_range import TimeRange from ..utils.trace import HighlightFormatter, BrokenPipeStreamHandler _logger = logging.getLogger('point_one.fusion_engine.applications.print_contents') -def add_print_format_argument(parser, *arg_names): - parser.add_argument( - *arg_names, - choices=['binary', 'pretty', 'pretty-binary', 'pretty-binary-payload', - 'oneline', 'oneline-detailed', 'oneline-binary', 'oneline-binary-payload'], - default='pretty', - help="Specify the format used to print the message contents:\n" - "- Print the binary representation of each message on a single line, but no other details\n" - "- pretty - Print the message contents in a human-readable format (default)\n" - "- pretty-binary - Use `pretty` format, but include the binary representation of each message\n" - "- pretty-binary-payload - Like `pretty-binary`, but exclude the message header from the binary\n" - "- oneline - Print a summary of each message on a single line\n" - "- oneline-detailed - Print a one-line summary, including message offset details\n" - "- oneline-binary - Use `oneline-detailed` format, but include the binary representation of each message\n" - "- oneline-binary-payload - Like `oneline-binary`, but exclude the message header from the binary") - - -def print_message(header, contents, offset_bytes=None, format='pretty', bytes=None, logger=None): - if logger is None: - logger = _logger - - if format == 'binary': - if bytes is None: - raise ValueError('No data provided for binary format.') - parts = [] - elif isinstance(contents, MessagePayload): - if format.startswith('oneline'): - # The repr string should always start with the message type, then other contents: - # [POSE (10000), p1_time=12.029 sec, gps_time=2249:528920.500 (1360724120.500 sec), ...] - # We want to reformat and insert the additional details as follows for consistency: - # POSE (10000) [sequence=10, ... p1_time=12.029 sec, gps_time=2249:528920.500 (1360724120.500 sec), ...] - message_str = repr(contents).split('\n')[0] - message_str = message_str.replace('[', '', 1) - break_idx = message_str.find(',') - if break_idx >= 0: - message_str = f'{message_str[:break_idx]} [{message_str[(break_idx + 2):]}' - else: - message_str = message_str.rstrip(']') - parts = [message_str] - else: - parts = str(contents).split('\n') - else: - parts = [f'{header.get_type_string()} (unsupported)'] - - if format != 'oneline': - details = 'source_id=%d, sequence=%d, size=%d B' % (header.source_identifier, - header.sequence_number, - header.get_message_size()) - if offset_bytes is not None: - details += ', offset=%d B (0x%x)' % (offset_bytes, offset_bytes) - - idx = parts[0].find('[') - if idx < 0: - parts[0] += f' [{details}]' - else: - parts[0] = f'{parts[0][:(idx + 1)]}{details}, {parts[0][(idx + 1):]}' - - if bytes is None: - pass - elif format == 'binary': - byte_string = bytes_to_hex(bytes, bytes_per_row=-1, bytes_per_col=2).replace('\n', '\n ') - parts.insert(1, byte_string) - elif format == 'pretty-binary' or format == 'pretty-binary-payload': - if format.endswith('-payload'): - bytes = bytes[MessageHeader.calcsize():] - byte_string = ' ' + bytes_to_hex(bytes, bytes_per_row=16, bytes_per_col=2).replace('\n', '\n ') - parts.insert(1, " Binary:\n%s" % byte_string) - elif format == 'oneline-binary' or format == 'oneline-binary-payload': - if format.endswith('-payload'): - bytes = bytes[MessageHeader.calcsize():] - byte_string = ' ' + bytes_to_hex(bytes, bytes_per_row=16, bytes_per_col=2).replace('\n', '\n ') - parts.insert(1, byte_string) - - logger.info('\n'.join(parts)) - - -class MessageStatsEntry: - def __init__(self): - self.count = 0 - self.total_bytes = 0 - - def update(self, header: MessageHeader, message: MessagePayload): - self.count += 1 - self.total_bytes = header.get_message_size() - - -class DeviceSummary: - def __init__(self): - self.device_id = None - self.version_info = None - self.stats = defaultdict(MessageStatsEntry) - - def update(self, header: MessageHeader, message: MessagePayload): - self.stats[header.message_type].update(header, message) - - if header.message_type == MessageType.DEVICE_ID: - self.device_id = message - elif header.message_type == MessageType.VERSION_INFO: - self.version_info = message - - -def print_summary_table(device_summary: DeviceSummary, logger=None): - if logger is None: - logger = _logger - - device_type = DeviceType.UNKNOWN - device_id = '' - if device_summary.device_id is not None: - device_type = device_summary.device_id.device_type - if len(device_summary.device_id.user_id_data) != 0: - device_id = DeviceIDMessage._get_str(device_summary.device_id.user_id_data) - logger.info(f'Device ID: {device_id} | ' - f'Device type: {"" if device_type == DeviceType.UNKNOWN else str(device_type)}') - - if device_summary.version_info is not None and device_summary.version_info.engine_version_str != "": - logger.info(f'Software version: {device_summary.version_info.engine_version_str}') - else: - logger.info(f'Software version: ') - - format_string = '| {:<50} | {:>5} | {:>8} |' - logger.info(format_string.format('Message Name', 'Type', 'Count')) - logger.info(format_string.format('-' * 50, '-' * 5, '-' * 8)) - total_messages = 0 - for type, entry in sorted(device_summary.stats.items(), key=lambda x: int(x[0])): - if type in message_type_to_class: - name = message_type_to_class[type].__name__ - elif type.is_unrecognized(): - name = str(type) - else: - name = f'Unsupported ({str(type)})' - logger.info(format_string.format(name, int(type), entry.count)) - total_messages += entry.count - logger.info(format_string.format('-' * 50, '-' * 5, '-' * 8)) - logger.info(format_string.format('Total', '', total_messages)) - - def main(): parser = ArgumentParser(description="""\ Decode and print the contents of messages contained in a *.p1log file or other diff --git a/python/fusion_engine_client/utils/print_utils.py b/python/fusion_engine_client/utils/print_utils.py new file mode 100644 index 00000000..65962683 --- /dev/null +++ b/python/fusion_engine_client/utils/print_utils.py @@ -0,0 +1,148 @@ +from typing import Optional, Union + +import argparse +from collections import defaultdict + +from ..messages import * +from ..utils import trace as logging +from ..utils.bin_utils import bytes_to_hex + +_logger = logging.getLogger('point_one.fusion_engine.utils.print_utils') + + +def add_print_format_argument(parser: argparse._ActionsContainer, *arg_names): + parser.add_argument( + *arg_names, + choices=['binary', 'pretty', 'pretty-binary', 'pretty-binary-payload', + 'oneline', 'oneline-detailed', 'oneline-binary', 'oneline-binary-payload'], + default='pretty', + help="Specify the format used to print the message contents:\n" + "- Print the binary representation of each message on a single line, but no other details\n" + "- pretty - Print the message contents in a human-readable format (default)\n" + "- pretty-binary - Use `pretty` format, but include the binary representation of each message\n" + "- pretty-binary-payload - Like `pretty-binary`, but exclude the message header from the binary\n" + "- oneline - Print a summary of each message on a single line\n" + "- oneline-detailed - Print a one-line summary, including message offset details\n" + "- oneline-binary - Use `oneline-detailed` format, but include the binary representation of each message\n" + "- oneline-binary-payload - Like `oneline-binary`, but exclude the message header from the binary") + + +def print_message(header: MessageHeader, contents: Union[MessagePayload, bytes], + offset_bytes: Optional[int] = None, format: str = 'pretty', bytes: Optional[int] = None, + logger: Optional[logging.Logger] = None): + if logger is None: + logger = _logger + + if format == 'binary': + if bytes is None: + raise ValueError('No data provided for binary format.') + parts = [] + elif isinstance(contents, MessagePayload): + if format.startswith('oneline'): + # The repr string should always start with the message type, then other contents: + # [POSE (10000), p1_time=12.029 sec, gps_time=2249:528920.500 (1360724120.500 sec), ...] + # We want to reformat and insert the additional details as follows for consistency: + # POSE (10000) [sequence=10, ... p1_time=12.029 sec, gps_time=2249:528920.500 (1360724120.500 sec), ...] + message_str = repr(contents).split('\n')[0] + message_str = message_str.replace('[', '', 1) + break_idx = message_str.find(',') + if break_idx >= 0: + message_str = f'{message_str[:break_idx]} [{message_str[(break_idx + 2):]}' + else: + message_str = message_str.rstrip(']') + parts = [message_str] + else: + parts = str(contents).split('\n') + else: + parts = [f'{header.get_type_string()} (unsupported)'] + + if format != 'oneline': + details = 'source_id=%d, sequence=%d, size=%d B' % (header.source_identifier, + header.sequence_number, + header.get_message_size()) + if offset_bytes is not None: + details += ', offset=%d B (0x%x)' % (offset_bytes, offset_bytes) + + idx = parts[0].find('[') + if idx < 0: + parts[0] += f' [{details}]' + else: + parts[0] = f'{parts[0][:(idx + 1)]}{details}, {parts[0][(idx + 1):]}' + + if bytes is None: + pass + elif format == 'binary': + byte_string = bytes_to_hex(bytes, bytes_per_row=-1, bytes_per_col=2).replace('\n', '\n ') + parts.insert(1, byte_string) + elif format == 'pretty-binary' or format == 'pretty-binary-payload': + if format.endswith('-payload'): + bytes = bytes[MessageHeader.calcsize():] + byte_string = ' ' + bytes_to_hex(bytes, bytes_per_row=16, bytes_per_col=2).replace('\n', '\n ') + parts.insert(1, " Binary:\n%s" % byte_string) + elif format == 'oneline-binary' or format == 'oneline-binary-payload': + if format.endswith('-payload'): + bytes = bytes[MessageHeader.calcsize():] + byte_string = ' ' + bytes_to_hex(bytes, bytes_per_row=16, bytes_per_col=2).replace('\n', '\n ') + parts.insert(1, byte_string) + + logger.info('\n'.join(parts)) + + +class MessageStatsEntry: + def __init__(self): + self.count = 0 + self.total_bytes = 0 + + def update(self, header: MessageHeader, message: MessagePayload): + self.count += 1 + self.total_bytes = header.get_message_size() + + +class DeviceSummary: + def __init__(self): + self.device_id = None + self.version_info = None + self.stats = defaultdict(MessageStatsEntry) + + def update(self, header: MessageHeader, message: MessagePayload): + self.stats[header.message_type].update(header, message) + + if header.message_type == MessageType.DEVICE_ID: + self.device_id = message + elif header.message_type == MessageType.VERSION_INFO: + self.version_info = message + + +def print_summary_table(device_summary: DeviceSummary, logger: Optional[logging.Logger] = None): + if logger is None: + logger = _logger + + device_type = DeviceType.UNKNOWN + device_id = '' + if device_summary.device_id is not None: + device_type = device_summary.device_id.device_type + if len(device_summary.device_id.user_id_data) != 0: + device_id = DeviceIDMessage._get_str(device_summary.device_id.user_id_data) + logger.info(f'Device ID: {device_id} | ' + f'Device type: {"" if device_type == DeviceType.UNKNOWN else str(device_type)}') + + if device_summary.version_info is not None and device_summary.version_info.engine_version_str != "": + logger.info(f'Software version: {device_summary.version_info.engine_version_str}') + else: + logger.info(f'Software version: ') + + format_string = '| {:<50} | {:>5} | {:>8} |' + logger.info(format_string.format('Message Name', 'Type', 'Count')) + logger.info(format_string.format('-' * 50, '-' * 5, '-' * 8)) + total_messages = 0 + for type, entry in sorted(device_summary.stats.items(), key=lambda x: int(x[0])): + if type in message_type_to_class: + name = message_type_to_class[type].__name__ + elif type.is_unrecognized(): + name = str(type) + else: + name = f'Unsupported ({str(type)})' + logger.info(format_string.format(name, int(type), entry.count)) + total_messages += entry.count + logger.info(format_string.format('-' * 50, '-' * 5, '-' * 8)) + logger.info(format_string.format('Total', '', total_messages))