From f24a20892557bf4dce3c8426a05c7c1739d14eb4 Mon Sep 17 00:00:00 2001 From: Jonathan Diamond Date: Wed, 22 May 2024 15:58:55 -0700 Subject: [PATCH] Add tool for filtering FE data in realtime. --- .../applications/p1_filter.py | 103 ++++++++++++++++++ python/fusion_engine_client/messages/defs.py | 12 +- .../parsers/fast_indexer.py | 16 +-- 3 files changed, 120 insertions(+), 11 deletions(-) create mode 100755 python/fusion_engine_client/applications/p1_filter.py diff --git a/python/fusion_engine_client/applications/p1_filter.py b/python/fusion_engine_client/applications/p1_filter.py new file mode 100755 index 00000000..62cfa004 --- /dev/null +++ b/python/fusion_engine_client/applications/p1_filter.py @@ -0,0 +1,103 @@ +#!/usr/bin/env -S python3 -u + +from datetime import datetime +import os +import sys +import time + +# Since stdout is used for data stream, don't write any print statements to stdout. +# Done here to avoid any log/print statements triggered by imports. +original_stdout = sys.stdout +sys.stdout = sys.stderr + +# Add the Python root directory (fusion-engine-client/python/) to the import search path to enable FusionEngine imports +# if this application is being run directly out of the repository and is not installed as a pip package. +root_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..')) +sys.path.insert(0, root_dir) + +from fusion_engine_client.messages import MessagePayload, message_type_by_name +from fusion_engine_client.parsers import FusionEngineDecoder +from fusion_engine_client.utils.argument_parser import ArgumentParser, ExtendedBooleanAction + + +if __name__ == "__main__": + parser = ArgumentParser(description="""\ +Filter FusionEngine data coming through stdin. Examples: + netcat 192.168.1.138 30210 | ./p1_filter.py --blacklist -m SatelliteInfo --display > /tmp/out.p1log + cat /tmp/out.p1log | ./p1_filter.py -m Pose > /tmp/pose_out.p1log + stty -F /dev/ttyUSB0 speed 460800 cs8 -cstopb -parenb -icrnl -ixon -ixoff -opost -isig -icanon -echo && cat /dev/ttyUSB0 | ./p1_filter.py -m Pose > /tmp/pose_out.p1log +""") + + parser.add_argument( + '-m', '--message-type', type=str, action='append', + help="An list of class names corresponding with the message types to whitelist (default) or blacklist. May be specified " + "multiple times (-m Pose -m PoseAux), or as a comma-separated list (-m Pose,PoseAux). All matches are" + "case-insensitive.\n" + "\n" + "If a partial name is specified, the best match will be returned. Use the wildcard '*' to match multiple " + "message types.\n" + "\n" + "Supported types:\n%s" % '\n'.join(['- %s' % c for c in message_type_by_name.keys()])) + parser.add_argument( + '--blacklist', action=ExtendedBooleanAction, + help="By default '--message-type' is a whitelist. If 'blacklist' is set, use them as a blacklist instead.") + parser.add_argument( + '--display', action=ExtendedBooleanAction, + help="Periodically display messages received and forwarded on sterr.") + options = parser.parse_args() + + # If the user specified a set of message names, lookup their type values. Below, we will limit the printout to only + # those message types. + message_types = set() + if options.message_type is not None: + # Pattern match to any of: + # -m Type1 + # -m Type1 -m Type2 + # -m Type1,Type2 + # -m Type1,Type2 -m Type3 + # -m Type* + try: + message_types = MessagePayload.find_matching_message_types(options.message_type) + if len(message_types) == 0: + # find_matching_message_types() will print an error. + sys.exit(1) + except ValueError as e: + print(str(e)) + sys.exit(1) + + start_time = datetime.now() + last_print_time = datetime.now() + bytes_received = 0 + bytes_forwarded = 0 + messages_received = 0 + messages_forwarded = 0 + + # Listen for incoming data. + decoder = FusionEngineDecoder(return_bytes=True) + try: + while True: + # Need to specify read size or read waits for end of file character. + # This returns immediately even if 0 bytes are available. + received_data = sys.stdin.buffer.read(64) + if len(received_data) == 0: + time.sleep(0.1) + else: + bytes_received += len(received_data) + messages = decoder.on_data(received_data) + for (header, message, raw_data) in messages: + messages_received += 1 + pass_through_message = (options.blacklist and header.message_type not in message_types) or (not options.blacklist and header.message_type in message_types) + if pass_through_message: + messages_forwarded += 1 + bytes_forwarded += len(raw_data) + original_stdout.buffer.write(raw_data) + + if options.display: + now = datetime.now() + if (now - last_print_time).total_seconds() > 5.0: + print('Status: [bytes_received=%d, messages_received=%d, bytes_forwarded=%d, messages_forwarded=%d, elapsed_time=%d sec]' % + (bytes_received, messages_received, bytes_forwarded, messages_forwarded, (now - start_time).total_seconds())) + last_print_time = now + + except KeyboardInterrupt: + pass diff --git a/python/fusion_engine_client/messages/defs.py b/python/fusion_engine_client/messages/defs.py index e3557a88..54b2f437 100644 --- a/python/fusion_engine_client/messages/defs.py +++ b/python/fusion_engine_client/messages/defs.py @@ -429,7 +429,8 @@ def find_matching_message_types(cls, pattern: Union[str, List[str]], return_clas part or all of a class name. Patterns may include wildcards (`*`) to match multiple classes. If no wildcards are specified and multiple classes match, a single result will be returned if there is an exact match (e.g., `pose` will match to @ref MessageType.POSE, not @ref MessageType.POSE_AUX). All matches are - case-insensitive. + case-insensitive. Patterns can also be the exact integer value of a MessageType. To specify an + unrecognized integer MessageType, precede the value with 'u'. @param return_class If `True`, return classes for each matching message type (derived from @ref MessagePayload). Otherwise, return @ref MessageType enum values. @@ -453,8 +454,13 @@ def find_matching_message_types(cls, pattern: Union[str, List[str]], return_clas for pattern in requested_types: # Check if pattern is the message integer value. try: - int_val = int(pattern) - result.add(MessageType(int_val)) + # Allow specifying unknown MessageType values if they are preceded by 'u'. + if len(pattern) > 1 and pattern[0] == 'u': + int_val = int(pattern[1:]) + result.add(MessageType(int_val, raise_on_unrecognized=False)) + else: + int_val = int(pattern) + result.add(MessageType(int_val)) except: allow_multiple = '*' in pattern re_pattern = pattern.replace('*', '.*') diff --git a/python/fusion_engine_client/parsers/fast_indexer.py b/python/fusion_engine_client/parsers/fast_indexer.py index 0c909ab3..55436a75 100644 --- a/python/fusion_engine_client/parsers/fast_indexer.py +++ b/python/fusion_engine_client/parsers/fast_indexer.py @@ -201,15 +201,15 @@ def fast_generate_index( # messages, and filter out messages that fall within previous messages. # # Find the end offsets of the messages. - expected_msg_ends = index_raw[:]['offset'] + index_raw[:]['size'] - # Propagate forward the largest endpoint found to handle multiple encapsulated messages. - expected_msg_ends = np.maximum.accumulate(expected_msg_ends) - # Find the messages that start after the previous message. - non_overlapped_idx = np.concatenate([[True], index_raw[1:]['offset'] >= expected_msg_ends[:-1]]) - _logger.debug(f'Dropped {np.sum(~non_overlapped_idx)} wrapped messages.') - index_raw = index_raw[non_overlapped_idx] - total_entries = len(index_raw) + if total_entries > 0: + expected_msg_ends = index_raw[:]['offset'] + index_raw[:]['size'] + # Propagate forward the largest endpoint found to handle multiple encapsulated messages. + expected_msg_ends = np.maximum.accumulate(expected_msg_ends) + # Find the messages that start after the previous message. + non_overlapped_idx = np.concatenate([[True], index_raw[1:]['offset'] >= expected_msg_ends[:-1]]) + _logger.debug(f'Dropped {np.sum(~non_overlapped_idx)} wrapped messages.') + index_raw = index_raw[non_overlapped_idx] _logger.debug(f'FE messages found: {total_entries}')