From 1b2112d816a815c70cd3c01a1ccb00618085d5b1 Mon Sep 17 00:00:00 2001
From: Adam Shapiro
Date: Tue, 23 Apr 2024 12:04:58 -0400
Subject: [PATCH] Added additional trace prints to the fast indexer.

---
 .../parsers/fast_indexer.py | 21 +++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/python/fusion_engine_client/parsers/fast_indexer.py b/python/fusion_engine_client/parsers/fast_indexer.py
index de80343f..cf4f0dd7 100644
--- a/python/fusion_engine_client/parsers/fast_indexer.py
+++ b/python/fusion_engine_client/parsers/fast_indexer.py
@@ -24,7 +24,7 @@
 _logger = logging.getLogger('point_one.fusion_engine.parsers.fast_indexer')
 
 
-def _search_blocks_for_fe(input_path: str, block_starts: List[int]):
+def _search_blocks_for_fe(input_path: str, thread_idx: int, block_starts: List[int]):
     """!
     @brief Search the specified portions of the file for the start offsets of valid FE messages.
 
@@ -33,6 +33,13 @@ def _search_blocks_for_fe(input_path: str, block_starts: List[int]):
 
     @return The raw index data corresponding to this thread's data blocks.
     """
+    if len(block_starts) == 0:
+        _logger.trace(f'Skipping search thread {thread_idx}. [num_blocks={len(block_starts)}]', depth=2)
+        return np.array([], dtype=FileIndex._RAW_DTYPE)
+    else:
+        _logger.trace(f'Starting search thread {thread_idx}. '
+                      f'[num_blocks={len(block_starts)}, first={block_starts[0]} B, last={block_starts[-1]} B]',
+                      depth=2)
     header = MessageHeader()
     message_end = 0
     num_syncs = 0
@@ -71,6 +78,7 @@ def _search_blocks_for_fe(input_path: str, block_starts: List[int]):
 
             # This is lot faster then doing this check in raw Python due to numpy optimizations.
             sync_matches = np.where(np_data == _PREAMBLE)[0]
+            _logger.trace(f'Thread {thread_idx}, block {i}: {len(sync_matches)} matches', depth=2)
             num_syncs += len(sync_matches)
 
             # To do the CRC check and find a p1_time the full message needs to be parsed. This
@@ -105,10 +113,14 @@ def _search_blocks_for_fe(input_path: str, block_starts: List[int]):
                     # Convert the Timestamp to an integer.
                     p1_time_raw = Timestamp._INVALID if math.isnan(p1_time.seconds) else int(p1_time.seconds)
                     message_end = absolute_offset + header.get_message_size()
+                    if _logger.isEnabledFor(logging.getTraceLevel(depth=3)):
+                        _logger.trace(f'Thread {thread_idx}, block {i}: message={header.message_type.to_string()}, '
+                                      f'file_offset={absolute_offset} B, p1_time={p1_time}',
+                                      depth=3)
                     raw_list.append((p1_time_raw, int(header.message_type), absolute_offset))
                 except BaseException:
                     pass
-    _logger.trace(f'{num_syncs} sync with {len(raw_list)} valid FE.')
+    _logger.trace(f'Thread {thread_idx}: {num_syncs} sync with {len(raw_list)} valid FE.')
 
     # Return the index data for this section of the file.
     return np.array(raw_list, dtype=FileIndex._RAW_DTYPE)
@@ -156,7 +168,7 @@ def fast_generate_index(
     _logger.debug(f'Using Threads: {num_threads}')
 
     # These are the args passed to the _search_blocks_for_fe instances.
-    args: list[Tuple[str, List[int]]] = []
+    args: list[Tuple[str, int, List[int]]] = []
     # Allocate which blocks of bytes will be processed by each thread.
     num_blocks = math.ceil(file_size / _READ_SIZE_BYTES)
     # Each thread will process at least blocks_per_thread blocks. If the number
@@ -167,7 +179,8 @@ def fast_generate_index(
         blocks = blocks_per_thread
         if i < blocks_remainder:
             blocks += 1
-        args.append((input_path, list(range(byte_offset, byte_offset + blocks * _READ_SIZE_BYTES, _READ_SIZE_BYTES))))
+        args.append((input_path, i,
+                     list(range(byte_offset, byte_offset + blocks * _READ_SIZE_BYTES, _READ_SIZE_BYTES))))
        byte_offset += blocks * _READ_SIZE_BYTES
 
     _logger.debug(f'Reads/thread: {blocks_per_thread}')
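
A note on the allocation logic the last two hunks touch: the patch does not show how blocks_per_thread and blocks_remainder are computed, but the loop only behaves sensibly if they are the floor quotient and remainder of num_blocks divided by num_threads. Under that assumption, the standalone sketch below (a hypothetical allocate_blocks helper with an illustrative _READ_SIZE_BYTES; the real constant lives in fast_indexer.py) reproduces the per-thread block assignment, including the empty block_starts case that the new early return in _search_blocks_for_fe now guards against.

import math
from typing import List, Tuple

_READ_SIZE_BYTES = 64 * 1024 * 1024  # illustrative only; not necessarily the value in fast_indexer.py


def allocate_blocks(file_size: int, num_threads: int) -> List[Tuple[int, List[int]]]:
    # Assumed derivation of the two variables referenced by the patch.
    num_blocks = math.ceil(file_size / _READ_SIZE_BYTES)
    blocks_per_thread = num_blocks // num_threads
    blocks_remainder = num_blocks % num_threads

    args = []
    byte_offset = 0
    for i in range(num_threads):
        blocks = blocks_per_thread
        # Spread the remainder over the first blocks_remainder threads so no
        # thread is more than one block heavier than any other.
        if i < blocks_remainder:
            blocks += 1
        # Mirrors the new (input_path, i, block_starts) tuple, minus input_path.
        # When num_blocks < num_threads, trailing threads get an empty list,
        # which is the case the new early return in _search_blocks_for_fe skips.
        args.append((i, list(range(byte_offset, byte_offset + blocks * _READ_SIZE_BYTES, _READ_SIZE_BYTES))))
        byte_offset += blocks * _READ_SIZE_BYTES
    return args


# Example: 10 blocks split across 4 threads -> 3, 3, 2, 2 blocks per thread.
for idx, starts in allocate_blocks(10 * _READ_SIZE_BYTES, 4):
    print(f'thread {idx}: {len(starts)} blocks')

With 7 threads and only 4 blocks, threads 4 through 6 would receive empty lists and hit the new num_blocks=0 skip path, logging "Skipping search thread N" rather than opening the file at all.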