From a5d26d1b6896f6c6ddb2ab8b56c66a8483df33f0 Mon Sep 17 00:00:00 2001
From: Jonathan Diamond
Date: Tue, 3 Oct 2023 20:47:16 -0700
Subject: [PATCH] Integrate `fast_generate_index` into MixedLogReader.

---
 python/bin/p1_lband_extract                       |   3 -
 python/bin/p1_print                               |  10 +-
 .../analysis/data_loader.py                       |  12 +-
 .../parsers/fast_indexer.py                       |  44 +++++-
 .../parsers/mixed_log_reader.py                   | 146 +++---------------
 5 files changed, 63 insertions(+), 152 deletions(-)

diff --git a/python/bin/p1_lband_extract b/python/bin/p1_lband_extract
index fe1d5732..5083ea60 100755
--- a/python/bin/p1_lband_extract
+++ b/python/bin/p1_lband_extract
@@ -115,9 +115,6 @@ Extract L-band corrections data from a log containing FusionEngine L-band frame
                              ignore_index=not read_index, generate_index=generate_index,
                              message_types=message_types, time_range=time_range)
 
-    if reader.generating_index():
-        _logger.info('Generating index file - processing complete file. This may take some time.')
-
     total_messages = 0
     bytes_decoded = 0
 
diff --git a/python/bin/p1_print b/python/bin/p1_print
index 37ceea5d..65c2a447 100755
--- a/python/bin/p1_print
+++ b/python/bin/p1_print
@@ -112,7 +112,6 @@ other types of data.
     options = parser.parse_args()
 
     read_index = not options.ignore_index
-    generate_index = not options.ignore_index
 
     # Configure logging.
     if options.verbose >= 1:
@@ -162,11 +161,7 @@ other types of data.
     # Process all data in the file.
     reader = MixedLogReader(input_path, return_bytes=True, return_offset=True, show_progress=options.progress,
-                            ignore_index=not read_index, generate_index=generate_index and read_index,
-                            message_types=message_types, time_range=time_range)
-
-    if reader.generating_index() and (len(message_types) > 0 or options.time is not None):
-        _logger.info('Generating index file - processing complete file. This may take some time.')
+                            ignore_index=not read_index, message_types=message_types, time_range=time_range)
 
     first_p1_time_sec = None
     last_p1_time_sec = None
@@ -259,7 +254,8 @@ other types of data.
         _logger.info(format_string.format('Message Name', 'Type', 'Count'))
         _logger.info(format_string.format('-' * 50, '-' * 5, '-' * 8))
         for type, info in sorted(message_stats.items(), key=lambda x: int(x[0])):
-            _logger.info(format_string.format(message_type_to_class[type].__name__, int(type), info['count']))
+            name = message_type_to_class[type].__name__ if type in message_type_to_class else "Unknown"
+            _logger.info(format_string.format(name, int(type), info['count']))
         _logger.info(format_string.format('-' * 50, '-' * 5, '-' * 8))
         _logger.info(format_string.format('Total', '', total_messages))
     elif total_messages == 0:
diff --git a/python/fusion_engine_client/analysis/data_loader.py b/python/fusion_engine_client/analysis/data_loader.py
index 23d3c25d..e53784c6 100644
--- a/python/fusion_engine_client/analysis/data_loader.py
+++ b/python/fusion_engine_client/analysis/data_loader.py
@@ -173,8 +173,6 @@ def open(self, path, generate_index=True, ignore_index=False):
         self.reader = MixedLogReader(input_file=path, generate_index=generate_index, ignore_index=ignore_index,
                                      return_bytes=True, return_message_index=True)
-        if self.reader.have_index():
-            self.logger.debug("Using index file '%s'." % self.reader.index_path)
 
         # Read the first message (with P1 time) in the file to set self.t0.
         #
@@ -434,7 +432,6 @@ def _read(self,
         else:
             self.reader.rewind()
             self.reader.clear_filters()
-            self.reader.set_generate_index(self._generate_index and not disable_index_generation)
             self.reader.set_show_progress(show_progress)
             self.reader.set_max_bytes(max_bytes)
 
@@ -576,13 +573,8 @@ def _read(self,
                     data_cache[header.message_type].message_index.append(message_index)
 
                 if max_messages is not None:
-                    # If we reached the max message count but we're generating an index file, we need to read through the
-                    # entire data file. Keep reading but discard any further messages.
-                    if self.reader.generating_index() and message_count > abs(max_messages):
-                        logger.debug(' Max messages reached. Discarding. [# messages=%d]' % message_count)
-                        continue
-                    # If we're not generating an index and we hit the max message count, we're done reading.
-                    elif not self.reader.generating_index() and message_count == abs(max_messages):
+                    # If we hit the max message count, we're done reading.
+                    if message_count == abs(max_messages):
                         logger.debug(' Max messages reached. Done reading. [# messages=%d]' % message_count)
                         break
 
diff --git a/python/fusion_engine_client/parsers/fast_indexer.py b/python/fusion_engine_client/parsers/fast_indexer.py
index 9c51b0be..ff9192e4 100644
--- a/python/fusion_engine_client/parsers/fast_indexer.py
+++ b/python/fusion_engine_client/parsers/fast_indexer.py
@@ -60,7 +60,7 @@ def _search_blocks_for_fe(input_path: str, block_starts: List[int]):
         # This is a lot faster than doing this check in raw Python due to numpy optimizations.
         sync_matches = np.where(np_data == _PREAMBLE)[0]
 
-        # To do the CRC check and find a p1_time the full message needs to be parsed. Depending on the application it might be better to
+        # To do the CRC check and find a p1_time, the full message needs to be parsed.
         for i in sync_matches:
             try:
                 # Check if the message has a valid length and CRC. This could probably be optimized.
@@ -91,15 +91,44 @@ def _search_blocks_for_fe(input_path: str, block_starts: List[int]):
     return np.array(raw_list, dtype=FileIndex._RAW_DTYPE)
 
 
-def fast_generate_index(input_path: str, num_threads=cpu_count()) -> FileIndex:
+def fast_generate_index(
+        input_path: str,
+        force_reindex=False,
+        save_index=True,
+        max_bytes=None,
+        num_threads=cpu_count()) -> FileIndex:
     """!
-    @brief Quickly build a FileIndex
+    @brief Quickly build a FileIndex.
+
+    This logic would be relatively easy to extend to do general parallel processing on messages.
 
     @param input_path The path to the file to be read.
-    @param block_starts The blocks of data to search for FE messages in.
+    @param force_reindex If `True`, regenerate the index even if an existing index file is present.
+    @param save_index If `True`, save the generated index to disk.
+    @param max_bytes If specified, only index up to this number of bytes.
+    @param num_threads The number of threads to use to search the file in parallel.
     """
     file_size = os.stat(input_path).st_size
     _logger.debug(f'File size: {int(file_size/1024/1024)}MB')
+
+    if max_bytes and max_bytes < file_size:
+        _logger.debug(f'Only indexing: {int(max_bytes/1024/1024)}MB')
+        file_size = max_bytes
+        if save_index:
+            save_index = False
+            _logger.info('Max bytes specified. Disabling index save.')
+
+    index_path = FileIndex.get_path(input_path)
+    # Check whether an existing index file can be loaded.
+    if not force_reindex and os.path.exists(index_path):
+        try:
+            _logger.info(f'Loading existing index: "{index_path}".')
+            index = FileIndex(index_path, input_path)
+            return index
+        except ValueError as e:
+            _logger.warning(f'Couldn\'t load index "{index_path}": {str(e)}.')
+
+    _logger.info(f'Indexing file "{input_path}". This may take a few seconds.')
     _logger.debug(f'Using Threads: {num_threads}')
 
     # These are the args passed to the _search_blocks_for_fe instances.
@@ -128,11 +157,12 @@ def fast_generate_index(input_path: str, num_threads=cpu_count()) -> FileIndex:
 
     _logger.debug(f'FE messages found: {total_entries}')
 
-    index_path = FileIndex.get_path(input_path)
-    _logger.debug("Saving index file as '%s'." % index_path)
-
     # This does an unnecessary conversion back and forth from the _RAW_DTYPE to
     # the _DTYPE and back again when saving. This is to avoid needing to deal
     # with the EOF entry directly. This adds less than a second, and could be
     # avoided by reproducing the eof stamping logic.
-    return FileIndex(data=FileIndex._from_raw(index_raw))
+    index = FileIndex(data=FileIndex._from_raw(index_raw))
+    if save_index:
+        _logger.info(f'Saving index to "{index_path}".')
+        index.save(index_path, input_path)
+    return index
diff --git a/python/fusion_engine_client/parsers/mixed_log_reader.py b/python/fusion_engine_client/parsers/mixed_log_reader.py
index 4389acec..a99a0650 100644
--- a/python/fusion_engine_client/parsers/mixed_log_reader.py
+++ b/python/fusion_engine_client/parsers/mixed_log_reader.py
@@ -7,7 +7,7 @@
 import numpy as np
 
-from . import file_index
+from . import fast_indexer, file_index
 from ..messages import MessageType, MessageHeader, MessagePayload, Timestamp, message_type_to_class
 from ..utils import trace as logging
 from ..utils.time_range import TimeRange
@@ -102,38 +102,13 @@ def __init__(self, input_file, warn_on_gaps: bool = False, show_progress: bool =
             self.max_bytes = sys.maxsize
         else:
             self.max_bytes = max_bytes
-            if generate_index:
-                self.logger.debug('Max bytes specified. Disabling index generation.')
-                generate_index = False
-
-        # Open the companion index file if one exists.
-        self.index_path = file_index.FileIndex.get_path(input_path)
-        self._original_index = None
-        self.index = None
-        self.next_index_elem = 0
-        if ignore_index:
-            if os.path.exists(self.index_path):
-                if generate_index:
-                    self.logger.debug("Deleting/regenerating index file @ '%s'." % self.index_path)
-                    os.remove(self.index_path)
-                else:
-                    self.logger.debug("Ignoring index file @ '%s'." % self.index_path)
-        else:
-            if os.path.exists(self.index_path):
-                try:
-                    self.logger.debug("Loading index file '%s'." % self.index_path)
-                    self._original_index = file_index.FileIndex(index_path=self.index_path, data_path=input_path,
-                                                                delete_on_error=generate_index)
-                    self.index = self._original_index[self.message_types][self.time_range]
-                    self.filtered_message_types = len(np.unique(self._original_index.type)) != \
-                                                  len(np.unique(self.index.type))
-                except ValueError as e:
-                    self.logger.error("Error loading index file: %s" % str(e))
-            else:
-                self.logger.debug("No index file found @ '%s'." % self.index_path)
-
-        self.index_builder = None
-        self.set_generate_index(generate_index)
+        # Open the companion index file if one exists, otherwise index the file.
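+        # fast_generate_index() reuses an existing index file unless force_reindex is set,
+        # so passing ignore_index here forces a rebuild, and generate_index controls whether
+        # the rebuilt index is written back to disk.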
+        self._original_index = fast_indexer.fast_generate_index(input_path, force_reindex=ignore_index,
+                                                                save_index=generate_index, max_bytes=max_bytes)
+        self.next_index_elem = 0
+        self.index = self._original_index[self.message_types][self.time_range]
+        self.filtered_message_types = len(np.unique(self._original_index.type)) != \
+                                      len(np.unique(self.index.type))
 
     def rewind(self):
         self.logger.debug('Rewinding to the start of the file.')
@@ -155,9 +130,6 @@ def rewind(self):
         self.next_index_elem = 0
 
         self.input_file.seek(0, os.SEEK_SET)
 
-        if self.index_builder is not None:
-            self.index_builder = file_index.FileIndexBuilder()
-
     def seek_to_message(self, message_index: int, is_filtered_index: bool = False):
         if self.index is None:
             raise NotImplemented('A file index is required to seek by message index.')
@@ -185,18 +157,6 @@ def have_index(self):
     def get_index(self):
         return self._original_index
 
-    def generating_index(self):
-        return self.index_builder is not None
-
-    def set_generate_index(self, generate_index):
-        if self._original_index is None:
-            if generate_index:
-                self.logger.debug("Generating index file '%s'." % self.index_path)
-                self.index_builder = file_index.FileIndexBuilder()
-            else:
-                self.logger.debug("Index generation disabled.")
-                self.index_builder = None
-
     def set_show_progress(self, show_progress):
         self.show_progress = show_progress
 
@@ -205,9 +165,6 @@ def set_max_bytes(self, max_bytes):
             self.max_bytes = sys.maxsize
         else:
             self.max_bytes = max_bytes
-            if self.index_builder is not None:
-                self.logger.debug('Max bytes specified. Disabling index generation.')
-                self.set_generate_index(False)
 
     def get_bytes_read(self):
         return self.total_bytes_read
@@ -215,16 +172,12 @@ def next(self):
         return self.read_next()
 
-    def read_next(self, require_p1_time=False, require_system_time=False, generate_index=True):
-        return self._read_next(require_p1_time=require_p1_time, require_system_time=require_system_time,
-                               generate_index=generate_index)
+    def read_next(self, require_p1_time=False, require_system_time=False):
+        return self._read_next(require_p1_time=require_p1_time, require_system_time=require_system_time)
 
-    def _read_next(self, require_p1_time=False, require_system_time=False, generate_index=True, force_eof=False):
+    def _read_next(self, require_p1_time=False, require_system_time=False, force_eof=False):
         if force_eof:
             if not self.reached_eof():
-                if self.generating_index():
-                    raise ValueError('Cannot jump to EOF while building an index file.')
-
                 self.logger.debug('Forcibly seeking to EOF.')
                 if self.index is None:
                     self.input_file.seek(self.file_size_bytes, os.SEEK_SET)
@@ -337,8 +290,7 @@ def _read_next(self, require_p1_time=False, require_system_time=False, generate_
             # Deserialize the payload if we need it.
             need_payload = self.return_payload or \
                            self.time_range is not None or \
-                           require_p1_time or require_system_time or \
-                           (self.index_builder is not None and generate_index)
+                           require_p1_time or require_system_time
 
             if need_payload:
                 cls = message_type_to_class.get(header.message_type, None)
@@ -362,11 +314,6 @@ def _read_next(self, require_p1_time=False, require_system_time=False, generate_
             # Extract P1 time if available.
             p1_time = payload.get_p1_time() if payload is not None else Timestamp()
 
-            # Add this message to the index file.
-            if self.index_builder is not None and generate_index:
-                self.index_builder.append(message_type=header.message_type, offset_bytes=start_offset_bytes,
-                                          p1_time=p1_time)
-
             # Now, if this message is not in the user-specified filter criteria, skip it.
             #
             # If we have an index available, this is implied by the index (we won't seek to messages that don't meet
@@ -381,7 +328,7 @@ def _read_next(self, require_p1_time=False, require_system_time=False, generate_
                         self.logger.trace("Message does not have valid P1 time. Skipping.", depth=1)
                     continue
             elif self.time_range is not None and not self.time_range.is_in_range(payload):
-                if self.time_range.in_range_started() and (self.index_builder is None or not generate_index):
+                if self.time_range.in_range_started():
                     self.logger.debug("End of time range reached. Finished processing.")
                     break
                 else:
@@ -414,19 +361,6 @@ def _read_next(self, require_p1_time=False, require_system_time=False, generate_
         self._print_progress(self.total_bytes_read)
         self.logger.debug("Read %d bytes total." % self.total_bytes_read)
 
-        # If we are creating an index file, save it now.
-        if self.index_builder is not None and generate_index:
-            self.logger.debug("Saving index file as '%s'." % self.index_path)
-            self._original_index = self.index_builder.save(self.index_path, self.input_file.name)
-            self.index_builder = None
-
-            self.index = self._original_index[self.message_types][self.time_range]
-            if self.remove_invalid_p1_time:
-                self.index = self.index.get_time_range(hint='remove_nans')
-            self.message_types = None
-            self.time_range = None
-            self.next_index_elem = len(self.index)
-
         # Finished iterating.
         if force_eof:
             return
@@ -434,50 +368,15 @@ def _read_next(self, require_p1_time=False, require_system_time=False, generate_
             raise StopIteration()
 
     def _advance_to_next_sync(self):
-        if self.index is None:
-            try:
-                if self.logger.isEnabledFor(logging.getTraceLevel(depth=2)):
-                    self.logger.trace('Starting next sync search @ %d (0x%x).' %
-                                      (self.total_bytes_read, self.total_bytes_read),
-                                      depth=2)
-                while True:
-                    if self.total_bytes_read + 1 >= self.max_bytes:
-                        self.logger.debug('Max read length exceeded (%d B).' % self.max_bytes)
-                        return False
-
-                    byte0 = self.input_file.read(1)[0]
-                    self.total_bytes_read += 1
-                    while True:
-                        if byte0 == MessageHeader.SYNC0:
-                            if self.total_bytes_read + 1 >= self.max_bytes:
-                                self.logger.debug('Max read length exceeded (%d B).' % self.max_bytes)
-                                return False
-
-                            byte1 = self.input_file.read(1)[0]
-                            self.total_bytes_read += 1
-                            if byte1 == MessageHeader.SYNC1:
-                                self.input_file.seek(-2, os.SEEK_CUR)
-                                self.total_bytes_read -= 2
-                                if self.logger.isEnabledFor(logging.getTraceLevel(depth=3)):
-                                    self.logger.trace('Sync bytes found @ %d (0x%x).' %
-                                                      (self.total_bytes_read, self.total_bytes_read),
-                                                      depth=3)
-                                return True
-                            byte0 = byte1
-                        else:
-                            break
-            except IndexError:
-                return False
+        if self.next_index_elem == len(self.index):
+            return False
         else:
-            if self.next_index_elem == len(self.index):
-                return False
-            else:
-                offset_bytes = self.index.offset[self.next_index_elem]
-                self.current_message_index = self.index.message_index[self.next_index_elem]
-                self.next_index_elem += 1
-                self.input_file.seek(offset_bytes, os.SEEK_SET)
-                self.total_bytes_read = offset_bytes
-                return True
+            offset_bytes = self.index.offset[self.next_index_elem]
+            self.current_message_index = self.index.message_index[self.next_index_elem]
+            self.next_index_elem += 1
+            self.input_file.seek(offset_bytes, os.SEEK_SET)
+            self.total_bytes_read = offset_bytes
+            return True
 
     def _print_progress(self, file_size=None):
         show_progress = self.show_progress
@@ -664,7 +563,4 @@ def __next__(self):
 
     @classmethod
     def generate_index_file(cls, input_file):
-        reader = MixedLogReader(input_file=input_file, ignore_index=False, generate_index=True, return_payload=False)
-        if reader.index is None:
-            for _ in reader:
-                pass
+        return fast_indexer.fast_generate_index(input_file)
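
Usage note: the sketch below shows the reworked indexing API for review context. It is a minimal example, not part of the patch itself; the log path is hypothetical and the `(header, payload)` iteration shape assumes `MixedLogReader`'s default return flags. The signatures otherwise match those added above.

    from fusion_engine_client.parsers import fast_indexer
    from fusion_engine_client.parsers.mixed_log_reader import MixedLogReader

    log_path = '/path/to/log.p1log'  # Hypothetical input file.

    # Build an index up front. An existing index file is reused unless
    # force_reindex=True; save_index=True writes a fresh index to disk.
    index = fast_indexer.fast_generate_index(log_path, force_reindex=False, save_index=True)
    print(f'{len(index)} messages indexed.')

    # MixedLogReader now always indexes in its constructor and seeks from one
    # index entry to the next instead of scanning for sync bytes.
    reader = MixedLogReader(log_path)
    for header, payload in reader:  # Assumes default return flags.
        pass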