Added additional trace prints to the fast indexer. (#307)
adamshapiro0 authored May 3, 2024
2 parents e5e6bda + 1b2112d commit 53e14f8
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions python/fusion_engine_client/parsers/fast_indexer.py
@@ -24,7 +24,7 @@
 _logger = logging.getLogger('point_one.fusion_engine.parsers.fast_indexer')
 
 
-def _search_blocks_for_fe(input_path: str, block_starts: List[int]):
+def _search_blocks_for_fe(input_path: str, thread_idx: int, block_starts: List[int]):
     """!
     @brief Search the specified portions of the file for the start offsets of valid FE messages.
@@ -33,6 +33,13 @@ def _search_blocks_for_fe(input_path: str, block_starts: List[int]):
     @return The raw index data corresponding to this thread's data blocks.
     """
     if len(block_starts) == 0:
+        _logger.trace(f'Skipping search thread {thread_idx}. [num_blocks={len(block_starts)}]', depth=2)
         return np.array([], dtype=FileIndex._RAW_DTYPE)
+    else:
+        _logger.trace(f'Starting search thread {thread_idx}. '
+                      f'[num_blocks={len(block_starts)}, first={block_starts[0]} B, last={block_starts[-1]} B]',
+                      depth=2)
     header = MessageHeader()
     message_end = 0
     num_syncs = 0
@@ -71,6 +78,7 @@ def _search_blocks_for_fe(input_path: str, block_starts: List[int]):
         # This is a lot faster than doing this check in raw Python due to numpy optimizations.
         sync_matches = np.where(np_data == _PREAMBLE)[0]
 
+        _logger.trace(f'Thread {thread_idx}, block {i}: {len(sync_matches)} matches', depth=2)
         num_syncs += len(sync_matches)
 
         # To do the CRC check and find a p1_time the full message needs to be parsed. This
@@ -105,10 +113,14 @@ def _search_blocks_for_fe(input_path: str, block_starts: List[int]):
                 # Convert the Timestamp to an integer.
                 p1_time_raw = Timestamp._INVALID if math.isnan(p1_time.seconds) else int(p1_time.seconds)
                 message_end = absolute_offset + header.get_message_size()
+                if _logger.isEnabledFor(logging.getTraceLevel(depth=3)):
+                    _logger.trace(f'Thread {thread_idx}, block {i}: message={header.message_type.to_string()}, '
+                                  f'file_offset={absolute_offset} B, p1_time={p1_time}',
+                                  depth=3)
                 raw_list.append((p1_time_raw, int(header.message_type), absolute_offset))
             except BaseException:
                 pass
-    _logger.trace(f'{num_syncs} sync with {len(raw_list)} valid FE.')
+    _logger.trace(f'Thread {thread_idx}: {num_syncs} sync with {len(raw_list)} valid FE.')
     # Return the index data for this section of the file.
     return np.array(raw_list, dtype=FileIndex._RAW_DTYPE)
@@ -156,7 +168,7 @@ def fast_generate_index(
     _logger.debug(f'Using Threads: {num_threads}')
 
     # These are the args passed to the _search_blocks_for_fe instances.
-    args: list[Tuple[str, List[int]]] = []
+    args: list[Tuple[str, int, List[int]]] = []
     # Allocate which blocks of bytes will be processed by each thread.
     num_blocks = math.ceil(file_size / _READ_SIZE_BYTES)
     # Each thread will process at least blocks_per_thread blocks. If the number
@@ -167,7 +179,8 @@
         blocks = blocks_per_thread
         if i < blocks_remainder:
             blocks += 1
-        args.append((input_path, list(range(byte_offset, byte_offset + blocks * _READ_SIZE_BYTES, _READ_SIZE_BYTES))))
+        args.append((input_path, i,
+                     list(range(byte_offset, byte_offset + blocks * _READ_SIZE_BYTES, _READ_SIZE_BYTES))))
         byte_offset += blocks * _READ_SIZE_BYTES
 
     _logger.debug(f'Reads/thread: {blocks_per_thread}')
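
For context, the sync_matches line touched in the third hunk is the heart of the fast indexer: numpy scans each block for the FusionEngine sync preamble in one vectorized comparison instead of a byte-by-byte Python loop. A minimal sketch of the same idea, assuming the two sync bytes 0x2E 0x31 ('.1'); the library's actual _PREAMBLE constant and buffer layout may differ:

    import numpy as np

    # Assumed FusionEngine sync bytes; the real constant lives in fast_indexer.py.
    SYNC0, SYNC1 = 0x2E, 0x31  # '.', '1'

    def find_sync_offsets(block: bytes) -> np.ndarray:
        """Return every offset in the block where a candidate sync pair starts."""
        data = np.frombuffer(block, dtype=np.uint8)
        # Vectorized pairwise test: byte i matches SYNC0 and byte i+1 matches SYNC1.
        return np.where((data[:-1] == SYNC0) & (data[1:] == SYNC1))[0]

Each match is only a candidate; as the comment in that hunk notes, the indexer still parses the full header and runs the CRC check before recording an entry.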
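
The last two hunks thread the new thread_idx argument (here, the loop variable i) through the argument tuples built in fast_generate_index(). The surrounding allocation logic deals _READ_SIZE_BYTES-sized blocks out as evenly as possible, giving the first blocks_remainder threads one extra block each. A self-contained sketch of that arithmetic, with an assumed block size:

    import math

    _READ_SIZE_BYTES = 64 * 1024  # assumed; the library defines its own block size

    def allocate_block_starts(file_size: int, num_threads: int) -> list:
        """Deal the file's blocks out to threads, front-loading the remainder."""
        num_blocks = math.ceil(file_size / _READ_SIZE_BYTES)
        blocks_per_thread = num_blocks // num_threads
        blocks_remainder = num_blocks % num_threads
        byte_offset = 0
        per_thread = []
        for i in range(num_threads):
            blocks = blocks_per_thread + (1 if i < blocks_remainder else 0)
            per_thread.append(list(range(byte_offset,
                                         byte_offset + blocks * _READ_SIZE_BYTES,
                                         _READ_SIZE_BYTES)))
            byte_offset += blocks * _READ_SIZE_BYTES
        return per_thread

For example, a 1 MiB file at this block size yields 16 blocks, dealt 6/5/5 across three threads. When there are more threads than blocks, a thread can receive zero blocks, which is exactly the case the new 'Skipping search thread' trace message covers.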

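A note on seeing the new output: _logger.trace(..., depth=N) and logging.getTraceLevel(...) are trace-level extensions that fusion-engine-client layers beneath DEBUG on the standard logging module, so these prints are silent at default verbosity. One way to surface them using only stdlib calls (the numeric threshold of 1 is an assumption chosen to sit below every trace depth):

    import logging

    logging.basicConfig(format='%(name)s: %(message)s')
    # Trace levels sit below DEBUG (10); a threshold of 1 lets all of them through.
    logging.getLogger('point_one.fusion_engine.parsers.fast_indexer').setLevel(1)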