Skip to content

Commit

Permalink
Avoid indexing wrapped messages. (#316)
Browse files Browse the repository at this point in the history
# Fixes
- Fixed fast indexing error if a FusionEngine message encapsulates
another complete FusionEngine message
  • Loading branch information
axlan authored May 18, 2024
2 parents 3d11b6b + 2fa8ab3 commit 6d21fbf
Showing 1 changed file with 24 additions and 5 deletions.
29 changes: 24 additions & 5 deletions python/fusion_engine_client/parsers/fast_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,19 @@
# NOTE: This needs to be larger than the biggest possible FE message. The
# smaller it is, the faster the indexer can run, but if it's not large enough,
# messages may be missed.
_MAX_FE_MSG_SIZE_BYTES = 1024 * 12
_MAX_FE_MSG_SIZE_BYTES = 1024 * 16

# This has been tuned on my laptop to where increasing the read size was giving
# diminishing returns on speed.
_READ_SIZE_BYTES = 80 * 1024

_PREAMBLE = struct.unpack('<H', MessageHeader.SYNC)

# This is FileIndex._RAW_DTYPE with an additional size field. The size field is
# used to check for overlapped messages created by any wrapper messages that may
# contain valid FusionEngine content in their payload split across blocks.
_RAW_DTYPE_WITH_SIZE = np.dtype([('int', '<u4'), ('type', '<u2'), ('offset', '<u8'), ('size', '<u2')])

_logger = logging.getLogger('point_one.fusion_engine.parsers.fast_indexer')


Expand All @@ -35,7 +40,7 @@ def _search_blocks_for_fe(input_path: str, thread_idx: int, block_starts: List[i
"""
if len(block_starts) == 0:
_logger.trace(f'Skipping search thread {thread_idx}. [num_blocks={len(block_starts)}]', depth=2)
return np.array([], dtype=FileIndex._RAW_DTYPE)
return np.array([], dtype=_RAW_DTYPE_WITH_SIZE)
else:
_logger.trace(f'Starting search thread {thread_idx}. '
f'[num_blocks={len(block_starts)}, first={block_starts[0]} B, last={block_starts[-1]} B]',
Expand All @@ -44,7 +49,7 @@ def _search_blocks_for_fe(input_path: str, thread_idx: int, block_starts: List[i
message_end = 0
num_syncs = 0
# Data corresponding to raw values in _RAW_DTYPE_WITH_SIZE (the FileIndex._RAW_DTYPE fields plus the message size).
raw_list: List[Tuple[int, int, int]] = []
raw_list: List[Tuple[int, int, int, int]] = []
with open(input_path, 'rb') as fd:
for i in range(len(block_starts)):
block_offset = block_starts[i]
Expand Down Expand Up @@ -117,12 +122,12 @@ def _search_blocks_for_fe(input_path: str, thread_idx: int, block_starts: List[i
_logger.trace(f'Thread {thread_idx}, block {i}: message={header.message_type.to_string()}, '
f'file_offset={absolute_offset} B, p1_time={p1_time}',
depth=3)
raw_list.append((p1_time_raw, int(header.message_type), absolute_offset))
raw_list.append((p1_time_raw, int(header.message_type), absolute_offset, header.get_message_size()))
except BaseException:
pass
_logger.trace(f'Thread {thread_idx}: {num_syncs} sync with {len(raw_list)} valid FE.')
# Return the index data for this section of the file.
return np.array(raw_list, dtype=FileIndex._RAW_DTYPE)
return np.array(raw_list, dtype=_RAW_DTYPE_WITH_SIZE)


def fast_generate_index(
Expand Down Expand Up @@ -190,6 +195,20 @@ def fast_generate_index(
# Kick off the threads to process with their args. Then concatenate their returned data.
index_raw = np.concatenate([o for o in p.starmap(_search_blocks_for_fe, args)])

# Some messages may encapsulate other complete FE messages. Normally, these
# encapsulated messages are ignored. However, if the wrapper message straddles
# a processing block boundary, an encapsulated message can end up indexed. Look
# at the offsets and sizes of the detected messages, and filter out messages
# that fall within previous messages.
#
# Find the end offsets of the messages.
expected_msg_ends = index_raw[:]['offset'] + index_raw[:]['size']
# Propagate forward the largest endpoint found to handle multiple encapsulated messages.
expected_msg_ends = np.maximum.accumulate(expected_msg_ends)
# Keep the messages that start at or after the furthest end offset of all preceding messages.
non_overlapped_idx = np.concatenate([[True], index_raw[1:]['offset'] >= expected_msg_ends[:-1]])
_logger.debug(f'Dropped {np.sum(~non_overlapped_idx)} wrapped messages.')
index_raw = index_raw[non_overlapped_idx]

total_entries = len(index_raw)

_logger.debug(f'FE messages found: {total_entries}')
Expand Down

0 comments on commit 6d21fbf

Please sign in to comment.