Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid indexing wrapped messages. #316

Merged
merged 2 commits into from
May 18, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions python/fusion_engine_client/parsers/fast_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,19 @@
# NOTE: This needs to be larger then the biggest possible FE message. The
# smaller it is, the faster the indexer can run, but if it's not large enough,
# messages may be missed.
_MAX_FE_MSG_SIZE_BYTES = 1024 * 12
_MAX_FE_MSG_SIZE_BYTES = 1024 * 16

# This has been tuned on my laptop to where increasing the read size was giving
# diminishing returns on speed.
_READ_SIZE_BYTES = 80 * 1024

_PREAMBLE = struct.unpack('<H', MessageHeader.SYNC)

# This is FileIndex._RAW_DTYPE with an additional size field. The size field is
# used to check for overlapped messages created by `INPUT_DATA_WRAPPER` messages
axlan marked this conversation as resolved.
Show resolved Hide resolved
# split across blocks.
_RAW_DTYPE_WITH_SIZE = np.dtype([('int', '<u4'), ('type', '<u2'), ('offset', '<u8'), ('size', '<u2')])

_logger = logging.getLogger('point_one.fusion_engine.parsers.fast_indexer')


Expand All @@ -44,7 +49,7 @@ def _search_blocks_for_fe(input_path: str, thread_idx: int, block_starts: List[i
message_end = 0
num_syncs = 0
# Data corresponding to raw values in FileIndex._RAW_DTYPE.
raw_list: List[Tuple[int, int, int]] = []
raw_list: List[Tuple[int, int, int, int]] = []
with open(input_path, 'rb') as fd:
for i in range(len(block_starts)):
block_offset = block_starts[i]
Expand Down Expand Up @@ -117,12 +122,12 @@ def _search_blocks_for_fe(input_path: str, thread_idx: int, block_starts: List[i
_logger.trace(f'Thread {thread_idx}, block {i}: message={header.message_type.to_string()}, '
f'file_offset={absolute_offset} B, p1_time={p1_time}',
depth=3)
raw_list.append((p1_time_raw, int(header.message_type), absolute_offset))
raw_list.append((p1_time_raw, int(header.message_type), absolute_offset, header.get_message_size()))
except BaseException:
pass
_logger.trace(f'Thread {thread_idx}: {num_syncs} sync with {len(raw_list)} valid FE.')
# Return the index data for this section of the file.
return np.array(raw_list, dtype=FileIndex._RAW_DTYPE)
return np.array(raw_list, dtype=_RAW_DTYPE_WITH_SIZE)


def fast_generate_index(
Expand Down Expand Up @@ -190,6 +195,20 @@ def fast_generate_index(
# Kick off the threads to process with their args. Then concatenate their returned data.
index_raw = np.concatenate([o for o in p.starmap(_search_blocks_for_fe, args)])

# INPUT_DATA_WRAPPER messages may encapsulate other complete FE messages.
axlan marked this conversation as resolved.
Show resolved Hide resolved
# Normally, these are ignored. However, if a message straddles one of the
# processing blocks, it can end up indexed. Look at the offsets and sizes of
# the detected messages, and filter out messages that fall within previous
# messages.
# Find the end offsets of the messages.
expected_msg_ends = index_raw[:]['offset'] +index_raw[:]['size']
axlan marked this conversation as resolved.
Show resolved Hide resolved
# Propagate forward the largest endpoint found to handle multiple encapsulated messages.
expected_msg_ends = np.maximum.accumulate(expected_msg_ends)
# Find the messages that start after the previous message.
non_overlapped_idx = np.concatenate([[True], index_raw[1:]['offset'] >= expected_msg_ends[:-1]])
_logger.debug(f'Dropped {np.sum(~non_overlapped_idx)} wrapped messages.')
index_raw = index_raw[non_overlapped_idx]

total_entries = len(index_raw)

_logger.debug(f'FE messages found: {total_entries}')
Expand Down
Loading