Skip to content

Commit

Permalink
Add tool for filtering FE data in realtime.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonathan Diamond committed May 22, 2024
1 parent 6d21fbf commit f24a208
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 11 deletions.
103 changes: 103 additions & 0 deletions python/fusion_engine_client/applications/p1_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#!/usr/bin/env -S python3 -u

from datetime import datetime
import os
import sys
import time

# Since stdout is used for data stream, don't write any print statements to stdout.
# Done here to avoid any log/print statements triggered by imports.
original_stdout = sys.stdout
sys.stdout = sys.stderr

# Add the Python root directory (fusion-engine-client/python/) to the import search path to enable FusionEngine imports
# if this application is being run directly out of the repository and is not installed as a pip package.
root_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, root_dir)

from fusion_engine_client.messages import MessagePayload, message_type_by_name
from fusion_engine_client.parsers import FusionEngineDecoder
from fusion_engine_client.utils.argument_parser import ArgumentParser, ExtendedBooleanAction


if __name__ == "__main__":
parser = ArgumentParser(description="""\
Filter FusionEngine data coming through stdin. Examples:
netcat 192.168.1.138 30210 | ./p1_filter.py --blacklist -m SatelliteInfo --display > /tmp/out.p1log
cat /tmp/out.p1log | ./p1_filter.py -m Pose > /tmp/pose_out.p1log
stty -F /dev/ttyUSB0 speed 460800 cs8 -cstopb -parenb -icrnl -ixon -ixoff -opost -isig -icanon -echo && cat /dev/ttyUSB0 | ./p1_filter.py -m Pose > /tmp/pose_out.p1log
""")

parser.add_argument(
'-m', '--message-type', type=str, action='append',
help="An list of class names corresponding with the message types to whitelist (default) or blacklist. May be specified "
"multiple times (-m Pose -m PoseAux), or as a comma-separated list (-m Pose,PoseAux). All matches are"
"case-insensitive.\n"
"\n"
"If a partial name is specified, the best match will be returned. Use the wildcard '*' to match multiple "
"message types.\n"
"\n"
"Supported types:\n%s" % '\n'.join(['- %s' % c for c in message_type_by_name.keys()]))
parser.add_argument(
'--blacklist', action=ExtendedBooleanAction,
help="By default '--message-type' is a whitelist. If 'blacklist' is set, use them as a blacklist instead.")
parser.add_argument(
'--display', action=ExtendedBooleanAction,
help="Periodically display messages received and forwarded on sterr.")
options = parser.parse_args()

# If the user specified a set of message names, lookup their type values. Below, we will limit the printout to only
# those message types.
message_types = set()
if options.message_type is not None:
# Pattern match to any of:
# -m Type1
# -m Type1 -m Type2
# -m Type1,Type2
# -m Type1,Type2 -m Type3
# -m Type*
try:
message_types = MessagePayload.find_matching_message_types(options.message_type)
if len(message_types) == 0:
# find_matching_message_types() will print an error.
sys.exit(1)
except ValueError as e:
print(str(e))
sys.exit(1)

start_time = datetime.now()
last_print_time = datetime.now()
bytes_received = 0
bytes_forwarded = 0
messages_received = 0
messages_forwarded = 0

# Listen for incoming data.
decoder = FusionEngineDecoder(return_bytes=True)
try:
while True:
# Need to specify read size or read waits for end of file character.
# This returns immediately even if 0 bytes are available.
received_data = sys.stdin.buffer.read(64)
if len(received_data) == 0:
time.sleep(0.1)
else:
bytes_received += len(received_data)
messages = decoder.on_data(received_data)
for (header, message, raw_data) in messages:
messages_received += 1
pass_through_message = (options.blacklist and header.message_type not in message_types) or (not options.blacklist and header.message_type in message_types)
if pass_through_message:
messages_forwarded += 1
bytes_forwarded += len(raw_data)
original_stdout.buffer.write(raw_data)

if options.display:
now = datetime.now()
if (now - last_print_time).total_seconds() > 5.0:
print('Status: [bytes_received=%d, messages_received=%d, bytes_forwarded=%d, messages_forwarded=%d, elapsed_time=%d sec]' %
(bytes_received, messages_received, bytes_forwarded, messages_forwarded, (now - start_time).total_seconds()))
last_print_time = now

except KeyboardInterrupt:
pass
12 changes: 9 additions & 3 deletions python/fusion_engine_client/messages/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,8 @@ def find_matching_message_types(cls, pattern: Union[str, List[str]], return_clas
part or all of a class name. Patterns may include wildcards (`*`) to match multiple classes. If no
wildcards are specified and multiple classes match, a single result will be returned if there is an exact
match (e.g., `pose` will match to @ref MessageType.POSE, not @ref MessageType.POSE_AUX). All matches are
case-insensitive.
case-insensitive. Patterns can also be the exact integer value of a MessageType. To specify an
unrecognized integer MessageType, precede the value with 'u'.
@param return_class If `True`, return classes for each matching message type (derived from @ref MessagePayload).
Otherwise, return @ref MessageType enum values.
Expand All @@ -453,8 +454,13 @@ def find_matching_message_types(cls, pattern: Union[str, List[str]], return_clas
for pattern in requested_types:
# Check if pattern is the message integer value.
try:
int_val = int(pattern)
result.add(MessageType(int_val))
# Allow specifying unknown MessageType values if they are preceded by 'u'.
if len(pattern) > 1 and pattern[0] == 'u':
int_val = int(pattern[1:])
result.add(MessageType(int_val, raise_on_unrecognized=False))
else:
int_val = int(pattern)
result.add(MessageType(int_val))
except:
allow_multiple = '*' in pattern
re_pattern = pattern.replace('*', '.*')
Expand Down
16 changes: 8 additions & 8 deletions python/fusion_engine_client/parsers/fast_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,15 @@ def fast_generate_index(
# messages, and filter out messages that fall within previous messages.
#
# Find the end offsets of the messages.
expected_msg_ends = index_raw[:]['offset'] + index_raw[:]['size']
# Propagate forward the largest endpoint found to handle multiple encapsulated messages.
expected_msg_ends = np.maximum.accumulate(expected_msg_ends)
# Find the messages that start after the previous message.
non_overlapped_idx = np.concatenate([[True], index_raw[1:]['offset'] >= expected_msg_ends[:-1]])
_logger.debug(f'Dropped {np.sum(~non_overlapped_idx)} wrapped messages.')
index_raw = index_raw[non_overlapped_idx]

total_entries = len(index_raw)
if total_entries > 0:
expected_msg_ends = index_raw[:]['offset'] + index_raw[:]['size']
# Propagate forward the largest endpoint found to handle multiple encapsulated messages.
expected_msg_ends = np.maximum.accumulate(expected_msg_ends)
# Find the messages that start after the previous message.
non_overlapped_idx = np.concatenate([[True], index_raw[1:]['offset'] >= expected_msg_ends[:-1]])
_logger.debug(f'Dropped {np.sum(~non_overlapped_idx)} wrapped messages.')
index_raw = index_raw[non_overlapped_idx]

_logger.debug(f'FE messages found: {total_entries}')

Expand Down

0 comments on commit f24a208

Please sign in to comment.