Avoid indexing wrapped messages. #316

Merged 2 commits on May 18, 2024
29 changes: 24 additions & 5 deletions python/fusion_engine_client/parsers/fast_indexer.py
@@ -13,14 +13,19 @@
# NOTE: This needs to be larger than the biggest possible FE message. The
# smaller it is, the faster the indexer can run, but if it's not large enough,
# messages may be missed.
-_MAX_FE_MSG_SIZE_BYTES = 1024 * 12
+_MAX_FE_MSG_SIZE_BYTES = 1024 * 16

# This was tuned on my laptop to the point where increasing the read size gave
# diminishing returns on speed.
_READ_SIZE_BYTES = 80 * 1024

_PREAMBLE = struct.unpack('<H', MessageHeader.SYNC)

+# This is FileIndex._RAW_DTYPE with an additional size field. The size field is
+# used to detect overlapping entries, which occur when a wrapper message whose
+# payload contains valid FusionEngine content is split across processing
+# blocks.
+_RAW_DTYPE_WITH_SIZE = np.dtype([('int', '<u4'), ('type', '<u2'), ('offset', '<u8'), ('size', '<u2')])

_logger = logging.getLogger('point_one.fusion_engine.parsers.fast_indexer')
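
A minimal sketch (values hypothetical) of how structured arrays with this dtype behave; the tuples appended by each search thread below map to the fields positionally:

```python
import numpy as np

# Same layout as _RAW_DTYPE_WITH_SIZE above; the entries are hypothetical.
dtype = np.dtype([('int', '<u4'), ('type', '<u2'), ('offset', '<u8'), ('size', '<u2')])
hits = np.array([(100, 10001, 0, 48), (101, 10001, 48, 48)], dtype=dtype)

print(hits['offset'])                 # [ 0 48]
print(hits['offset'] + hits['size'])  # per-message end offsets: [48 96]
```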


@@ -35,7 +40,7 @@ def _search_blocks_for_fe(input_path: str, thread_idx: int, block_starts: List[int]
"""
if len(block_starts) == 0:
_logger.trace(f'Skipping search thread {thread_idx}. [num_blocks={len(block_starts)}]', depth=2)
return np.array([], dtype=FileIndex._RAW_DTYPE)
return np.array([], dtype=_RAW_DTYPE_WITH_SIZE)
else:
_logger.trace(f'Starting search thread {thread_idx}. '
f'[num_blocks={len(block_starts)}, first={block_starts[0]} B, last={block_starts[-1]} B]',
@@ -44,7 +49,7 @@ def _search_blocks_for_fe(input_path: str, thread_idx: int, block_starts: List[int]
    message_end = 0
    num_syncs = 0
    # Data corresponding to raw values in _RAW_DTYPE_WITH_SIZE.
-    raw_list: List[Tuple[int, int, int]] = []
+    raw_list: List[Tuple[int, int, int, int]] = []
    with open(input_path, 'rb') as fd:
        for i in range(len(block_starts)):
            block_offset = block_starts[i]
@@ -117,12 +122,12 @@ def _search_blocks_for_fe(input_path: str, thread_idx: int, block_starts: List[int]
                    _logger.trace(f'Thread {thread_idx}, block {i}: message={header.message_type.to_string()}, '
                                  f'file_offset={absolute_offset} B, p1_time={p1_time}',
                                  depth=3)
-                    raw_list.append((p1_time_raw, int(header.message_type), absolute_offset))
+                    raw_list.append((p1_time_raw, int(header.message_type), absolute_offset, header.get_message_size()))
                except BaseException:
                    pass
    _logger.trace(f'Thread {thread_idx}: {num_syncs} syncs with {len(raw_list)} valid FE messages.')
    # Return the index data for this section of the file.
-    return np.array(raw_list, dtype=FileIndex._RAW_DTYPE)
+    return np.array(raw_list, dtype=_RAW_DTYPE_WITH_SIZE)


def fast_generate_index(
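
As an aside, a toy illustration of the first step _search_blocks_for_fe performs per block: locating candidate offsets of the 2-byte sync preamble with numpy. The sync value and helper name here are assumptions for illustration; the real code derives the preamble from MessageHeader.SYNC.

```python
import numpy as np

SYNC = b'.1'  # Hypothetical sync word; the real value comes from MessageHeader.SYNC.

def find_sync_offsets(block: bytes) -> np.ndarray:
    """Return offsets in `block` where both sync bytes appear consecutively."""
    data = np.frombuffer(block, dtype=np.uint8)
    return np.where((data[:-1] == SYNC[0]) & (data[1:] == SYNC[1]))[0]

print(find_sync_offsets(b'junk' + SYNC + b'\x00' * 10 + SYNC))  # [ 4 16]
```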
@@ -190,6 +195,20 @@ def fast_generate_index(
    # Kick off the threads with their args, then concatenate their returned data.
    index_raw = np.concatenate([o for o in p.starmap(_search_blocks_for_fe, args)])

+    # Some messages may encapsulate other complete FE messages. Normally, these
+    # are ignored. However, if a message straddles one of the processing block
+    # boundaries, its payload contents can end up indexed as well. Look at the
+    # offsets and sizes of the detected messages, and filter out messages that
+    # fall within a previous message.
+    #
+    # Find the end offsets of the messages.
+    expected_msg_ends = index_raw[:]['offset'] + index_raw[:]['size']
+    # Propagate forward the largest end offset found so far to handle multiple
+    # encapsulated messages.
+    expected_msg_ends = np.maximum.accumulate(expected_msg_ends)
+    # Keep only the messages that start at or after the end of every earlier message.
+    non_overlapped_idx = np.concatenate([[True], index_raw[1:]['offset'] >= expected_msg_ends[:-1]])
+    _logger.debug(f'Dropped {np.sum(~non_overlapped_idx)} wrapped messages.')
+    index_raw = index_raw[non_overlapped_idx]

    total_entries = len(index_raw)

    _logger.debug(f'FE messages found: {total_entries}')
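
To see why the running maximum matters, a worked toy example of the overlap filter (offsets and sizes hypothetical): a wrapper at offset 100 spans 300 bytes and contains a complete FE message at offset 150, which gets dropped, while the independent message at offset 400 survives.

```python
import numpy as np

offsets = np.array([100, 150, 400], dtype=np.uint64)
sizes = np.array([300, 50, 80], dtype=np.uint64)

# End offset of each message, then the running maximum so that several
# messages encapsulated in one wrapper are all covered.
ends = np.maximum.accumulate(offsets + sizes)   # [400 400 480]

# A message survives only if it starts at or after everything seen before it.
keep = np.concatenate([[True], offsets[1:] >= ends[:-1]])
print(keep)  # [ True False  True]
```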