diff --git a/python/bin/p1_lband_extract b/python/bin/p1_lband_extract
index fe1d5732..974a7849 100755
--- a/python/bin/p1_lband_extract
+++ b/python/bin/p1_lband_extract
@@ -65,7 +65,6 @@ Extract L-band corrections data from a log containing FusionEngine L-band frame
     options = parser.parse_args()
 
     read_index = not options.ignore_index
-    generate_index = not options.ignore_index
 
     # Configure logging.
     if options.verbose >= 1:
@@ -112,11 +111,7 @@ Extract L-band corrections data from a log containing FusionEngine L-band frame
     # Process all LBandFrameMessage data in the file.
     reader = MixedLogReader(input_path, return_bytes=True, return_offset=True, show_progress=options.progress,
-                            ignore_index=not read_index, generate_index=generate_index,
-                            message_types=message_types, time_range=time_range)
-
-    if reader.generating_index():
-        _logger.info('Generating index file - processing complete file. This may take some time.')
+                            ignore_index=not read_index, message_types=message_types, time_range=time_range)
 
     total_messages = 0
     bytes_decoded = 0
diff --git a/python/bin/p1_print b/python/bin/p1_print
index 06173d6f..fb55078e 100755
--- a/python/bin/p1_print
+++ b/python/bin/p1_print
@@ -112,7 +112,6 @@ other types of data.
     options = parser.parse_args()
 
     read_index = not options.ignore_index
-    generate_index = not options.ignore_index
 
     # Configure logging.
     if options.verbose >= 1:
@@ -162,11 +161,7 @@ other types of data.
     # Process all data in the file.
     reader = MixedLogReader(input_path, return_bytes=True, return_offset=True, show_progress=options.progress,
-                            ignore_index=not read_index, generate_index=generate_index and read_index,
-                            message_types=message_types, time_range=time_range)
-
-    if reader.generating_index() and (len(message_types) > 0 or options.time is not None):
-        _logger.info('Generating index file - processing complete file. This may take some time.')
+                            ignore_index=not read_index, message_types=message_types, time_range=time_range)
 
     first_p1_time_sec = None
     last_p1_time_sec = None
@@ -262,7 +257,8 @@ other types of data.
         _logger.info(format_string.format('Message Name', 'Type', 'Count'))
         _logger.info(format_string.format('-' * 50, '-' * 5, '-' * 8))
         for type, info in sorted(message_stats.items(), key=lambda x: int(x[0])):
-            _logger.info(format_string.format(message_type_to_class[type].__name__, int(type), info['count']))
+            name = message_type_to_class[type].__name__ if type in message_type_to_class else "Unknown"
+            _logger.info(format_string.format(name, int(type), info['count']))
         _logger.info(format_string.format('-' * 50, '-' * 5, '-' * 8))
         _logger.info(format_string.format('Total', '', total_messages))
     elif total_messages == 0:
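
The fallback added to p1_print above guards against message types that have no entry in
`message_type_to_class`. A minimal standalone sketch of the same lookup pattern (the dict
contents and format string below are hypothetical stand-ins for real log statistics):

    class PoseMessage:
        pass

    message_type_to_class = {10000: PoseMessage}
    message_stats = {10000: {'count': 42}, 12345: {'count': 3}}

    format_string = '{:<50} | {:>5} | {:>8}'
    for msg_type, info in sorted(message_stats.items()):
        # Fall back to "Unknown" for message types with no registered class.
        name = message_type_to_class[msg_type].__name__ if msg_type in message_type_to_class else "Unknown"
        print(format_string.format(name, int(msg_type), info['count']))
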
diff --git a/python/fusion_engine_client/analysis/analyzer.py b/python/fusion_engine_client/analysis/analyzer.py
index 72f33b12..8a0ff7e7 100755
--- a/python/fusion_engine_client/analysis/analyzer.py
+++ b/python/fusion_engine_client/analysis/analyzer.py
@@ -2206,10 +2206,6 @@ def generate_index(self, auto_open=True):
             self._open_browser(index_path)
 
     def _calculate_duration(self, return_index=False):
-        # Generate an index file, which we need to calculate the log duration, in case it wasn't created earlier (i.e.,
-        # we didn't read anything to plot).
-        self.reader.generate_index(show_progress=True)
-
        # Restrict the index to the user-requested time range.
         full_index = self.reader.get_index()
         reduced_index = full_index[self.params['time_range']]
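
With the on-the-fly index generation removed, the analyzer assumes `reader.get_index()` is
always populated (the fast indexer builds it when the file is opened), so the log duration
falls directly out of the index's P1 timestamps. A standalone sketch of that calculation;
the NaN handling mirrors how index entries without P1 time are stored, but treat the details
as illustrative rather than the analyzer's exact code:

    import numpy as np

    def calculate_duration_sec(p1_time: np.ndarray) -> float:
        # Index entries with no P1 time are stored as NaN; ignore them.
        valid = p1_time[~np.isnan(p1_time)]
        if len(valid) < 2:
            return 0.0
        return float(valid[-1] - valid[0])

    print(calculate_duration_sec(np.array([np.nan, 1.0, 2.5, 7.0])))  # 6.0
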
diff --git a/python/fusion_engine_client/analysis/data_loader.py b/python/fusion_engine_client/analysis/data_loader.py
index 23d3c25d..38a5d80b 100644
--- a/python/fusion_engine_client/analysis/data_loader.py
+++ b/python/fusion_engine_client/analysis/data_loader.py
@@ -135,15 +135,15 @@ class DataLoader(object):
     logger = logging.getLogger('point_one.fusion_engine.analysis.data_loader')
 
-    def __init__(self, path=None, generate_index=True, ignore_index=False):
+    def __init__(self, path=None, save_index=True, ignore_index=False):
         """!
         @brief Create a new reader instance.
 
         @param path The path to a binary file containing FusionEngine messages, or an existing Python file object.
-        @param generate_index If `True`, generate a `.p1i` index file if one does not exist for faster reading in the
+        @param save_index If `True`, save a `.p1i` index file if one does not exist for faster reading in the
               future. See @ref FileIndex for details.
         @param ignore_index If `True`, ignore the existing index file and read from the `.p1log` binary file directly.
-               If `generate_index == True`, this will delete the existing file and create a new one.
+               If `save_index == True`, this will delete the existing file and create a new one.
         """
         self.reader: MixedLogReader = None
 
@@ -155,26 +155,24 @@ def __init__(self, path=None, generate_index=True, ignore_index=False):
         self._need_t0 = True
         self._need_system_t0 = True
 
-        self._generate_index = generate_index
+        self._generate_index = save_index
         if path is not None:
-            self.open(path, generate_index=generate_index, ignore_index=ignore_index)
+            self.open(path, save_index=save_index, ignore_index=ignore_index)
 
-    def open(self, path, generate_index=True, ignore_index=False):
+    def open(self, path, save_index=True, ignore_index=False):
         """!
         @brief Open a FusionEngine binary file.
 
         @param path The path to a binary file containing FusionEngine messages, or an existing Python file object.
-        @param generate_index If `True`, generate a `.p1i` index file if one does not exist for faster reading in the
+        @param save_index If `True`, save a `.p1i` index file if one does not exist for faster reading in the
               future. See @ref FileIndex for details.
         @param ignore_index If `True`, ignore the existing index file and read from the `.p1log` binary file directly.
-               If `generate_index == True`, this will delete the existing file and create a new one.
+               If `save_index == True`, this will delete the existing file and create a new one.
         """
         self.close()
 
-        self.reader = MixedLogReader(input_file=path, generate_index=generate_index, ignore_index=ignore_index,
+        self.reader = MixedLogReader(input_file=path, save_index=save_index, ignore_index=ignore_index,
                                      return_bytes=True, return_message_index=True)
-        if self.reader.have_index():
-            self.logger.debug("Using index file '%s'." % self.reader.index_path)
 
         # Read the first message (with P1 time) in the file to set self.t0.
         #
@@ -218,28 +216,6 @@ def close(self):
         if self.reader is not None:
             self.reader = None
 
-    def generate_index(self, show_progress=False):
-        """!
-        @brief Generate an index file for the current binary file if one does not already exist.
-        """
-        if not self.reader.have_index():
-            # We'll read pose data (doesn't actually matter which). Store the currently cached data and restore it when
-            # we're done. That way if the user already did a read (with generate_index == False), they don't have to
-            # re-read the data if they try to use it again.
-            prev_data = self.data.get(MessageType.POSE, None)
-
-            if show_progress:
-                self.logger.info('Generating data index for faster access. This may take a few minutes...')
-            else:
-                self.logger.debug('Generating data index for faster access. This may take a few minutes...')
-
-            self._generate_index = True
-            self.read(message_types=[MessageType.POSE], max_messages=1, disable_index_generation=False,
-                      ignore_cache=True, show_progress=show_progress, quiet=True)
-
-            if prev_data is not None:
-                self.data[MessageType.POSE] = prev_data
-
     def read(self, *args, **kwargs) \
             -> Union[Dict[MessageType, MessageData], MessageData]:
         """!
@@ -248,7 +224,7 @@ def read(self, *args, **kwargs) \
         The read data will be cached internally. Subsequent reads for the same data type will return the cached data.
 
         @note
-        This function uses a data index file to speed up reads when available. If `generate_index == True` and no index
+        This function uses a data index file to speed up reads when available. If `save_index == True` and no index
         file exists, one will be generated automatically. In order to do this, this function must read the entire data
         file, even if it could normally return early when `max_messages` or the end of `time_range` are reached.
 
@@ -258,7 +234,7 @@ def read(self, *args, **kwargs) \
                be read. See @ref TimeRange for more details.
         @param show_progress If `True`, print the read progress every 10 MB (useful for large files).
-        @param disable_index_generation If `True`, override the `generate_index` argument provided to `open()` and do
+        @param disable_index_generation If `True`, override the `save_index` argument provided to `open()` and do
               not generate an index file during this call (intended for internal use only).
         @param ignore_cache If `True`, ignore any cached data from a previous @ref read() call, and reload the
               requested data from disk.
@@ -434,7 +410,6 @@ def _read(self,
         else:
             self.reader.rewind()
             self.reader.clear_filters()
-            self.reader.set_generate_index(self._generate_index and not disable_index_generation)
             self.reader.set_show_progress(show_progress)
             self.reader.set_max_bytes(max_bytes)
 
@@ -576,13 +551,8 @@ def _read(self,
                 data_cache[header.message_type].message_index.append(message_index)
 
                 if max_messages is not None:
-                    # If we reached the max message count but we're generating an index file, we need to read through the
-                    # entire data file. Keep reading but discard any further messages.
-                    if self.reader.generating_index() and message_count > abs(max_messages):
-                        logger.debug(' Max messages reached. Discarding. [# messages=%d]' % message_count)
-                        continue
-                    # If we're not generating an index and we hit the max message count, we're done reading.
-                    elif not self.reader.generating_index() and message_count == abs(max_messages):
+                    # If we hit the max message count, we're done reading.
+                    if message_count == abs(max_messages):
                         logger.debug(' Max messages reached. Done reading. [# messages=%d]' % message_count)
                         break
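
After the rename, DataLoader callers opt in or out of writing the `.p1i` companion file via
`save_index`. A usage sketch; the path is a placeholder, and the `.messages` access follows
DataLoader's cached MessageData structure (treat it as illustrative):

    from fusion_engine_client.analysis.data_loader import DataLoader
    from fusion_engine_client.messages import MessageType

    # save_index=True (the default) writes a .p1i file next to the log so
    # future opens can skip re-indexing.
    loader = DataLoader('/path/to/input.p1log', save_index=True)
    result = loader.read(message_types=[MessageType.POSE])
    print('%d pose messages' % len(result[MessageType.POSE].messages))
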
diff --git a/python/fusion_engine_client/parsers/fast_indexer.py b/python/fusion_engine_client/parsers/fast_indexer.py
new file mode 100644
index 00000000..c27e2318
--- /dev/null
+++ b/python/fusion_engine_client/parsers/fast_indexer.py
@@ -0,0 +1,184 @@
+import os
+import math
+from multiprocessing import Pool, cpu_count
+from typing import List, Tuple
+import struct
+
+import numpy as np
+
+from ..messages import MessageHeader, Timestamp, message_type_to_class
+from ..utils import trace as logging
+from .file_index import FileIndex
+
+# NOTE: This needs to be larger than the biggest possible FE message. The
+# smaller it is, the faster the indexer can run, but if it's not large enough,
+# messages may be missed.
+_MAX_FE_MSG_SIZE_BYTES = 1024 * 12
+
+# This was tuned on my laptop to the point where increasing the read size gave
+# diminishing returns on speed.
+_READ_SIZE_BYTES = 80 * 1024
+
+_PREAMBLE = struct.unpack('<H', bytes([MessageHeader.SYNC0, MessageHeader.SYNC1]))[0]
+
+_logger = logging.getLogger('point_one.fusion_engine.parsers.fast_indexer')
+
+
+def fast_generate_index(
+        input_path: str,
+        force_reindex=False,
+        save_index=True,
+        max_bytes=None,
+        num_threads=cpu_count()) -> FileIndex:
+    """!
+    @brief Quickly build a FileIndex.
+
+    This basic logic would be relatively easy to extend to do general parallel processing on messages.
+
+    @param input_path The path to the file to be read.
+    @param force_reindex If `True`, regenerate the index even if there's an existing file.
+    @param save_index If `True`, save the index to disk after generation.
+    @param max_bytes If specified, read up to the maximum number of bytes.
+    @param num_threads The number of parallel processes to spawn for searching the file.
+
+    @return The loaded or generated @ref FileIndex.
+    """
+    file_size = os.stat(input_path).st_size
+    _logger.debug(f'File size: {int(file_size/1024/1024)}MB')
+
+    if max_bytes and max_bytes < file_size:
+        _logger.debug(f'Only indexing: {max_bytes/1024/1024}MB')
+        file_size = max_bytes
+        if save_index:
+            save_index = False
+            _logger.info('Max bytes specified. Disabling saving index.')
+
+    index_path = FileIndex.get_path(input_path)
+    # Check if an existing index file can be loaded.
+    if not force_reindex and os.path.exists(index_path):
+        try:
+            index = FileIndex(index_path, input_path)
+            _logger.info(f'Loading existing cache: "{index_path}".')
+            return index
+        except ValueError as e:
+            _logger.warning(f'Couldn\'t load cache "{index_path}": {str(e)}.')
+
+    _logger.info(f'Indexing file "{input_path}". This may take a few seconds.')
+    _logger.debug(f'Using Threads: {num_threads}')
+
+    # These are the args passed to the _search_blocks_for_fe instances.
+    args: List[Tuple[str, List[int]]] = []
+    # Allocate which blocks of bytes will be processed by each thread.
+    num_blocks = math.ceil(file_size / _READ_SIZE_BYTES)
+    # Each thread will process at least blocks_per_thread blocks. If the number
+    # doesn't divide evenly, distribute the remainder.
+    blocks_per_thread, blocks_remainder = divmod(num_blocks, num_threads)
+    byte_offset = 0
+    for i in range(num_threads):
+        blocks = blocks_per_thread
+        if i < blocks_remainder:
+            blocks += 1
+        args.append((input_path, list(range(byte_offset, byte_offset + blocks * _READ_SIZE_BYTES,
+                                            _READ_SIZE_BYTES))))
+        byte_offset += blocks * _READ_SIZE_BYTES
+
+    _logger.debug(f'Reads/thread: {blocks_per_thread}')
+
+    # Create a pool of workers (separate processes, despite the `num_threads` naming).
+    with Pool(num_threads) as p:
+        # Kick off the workers to process their assigned blocks, then concatenate their returned data.
+        index_raw = np.concatenate([o for o in p.starmap(_search_blocks_for_fe, args)])
+
+    total_entries = len(index_raw)
+
+    _logger.debug(f'FE messages found: {total_entries}')
+
+    # This does an unnecessary conversion back and forth from the _RAW_DTYPE to
+    # the _DTYPE and back again when saving. This is to avoid needing to deal
+    # with the EOF entry directly. This adds less than a second, and could be
+    # avoided by reproducing the FileIndex EOF entry logic.
+    index = FileIndex(data=FileIndex._from_raw(index_raw))
+    if save_index:
+        _logger.info(f'Saving index to "{index_path}".')
+        index.save(index_path, input_path)
+    return index
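
The allocation loop above hands each worker a contiguous run of read blocks, giving the
first `blocks_remainder` workers one extra block when the block count does not divide
evenly. A standalone sketch of just that split, with small illustrative numbers:

    import math

    def allocate_blocks(file_size: int, read_size: int, num_workers: int):
        num_blocks = math.ceil(file_size / read_size)
        per_worker, remainder = divmod(num_blocks, num_workers)
        allocations = []
        byte_offset = 0
        for i in range(num_workers):
            blocks = per_worker + (1 if i < remainder else 0)
            # Each worker receives the byte offsets of the blocks it will scan.
            allocations.append(list(range(byte_offset, byte_offset + blocks * read_size, read_size)))
            byte_offset += blocks * read_size
        return allocations

    # 10 blocks of 80 KiB split across 4 workers -> 3, 3, 2, 2 blocks each.
    print([len(a) for a in allocate_blocks(800 * 1024, 80 * 1024, 4)])
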
diff --git a/python/fusion_engine_client/parsers/mixed_log_reader.py b/python/fusion_engine_client/parsers/mixed_log_reader.py
index 4389acec..b1497d82 100644
--- a/python/fusion_engine_client/parsers/mixed_log_reader.py
+++ b/python/fusion_engine_client/parsers/mixed_log_reader.py
@@ -7,7 +7,7 @@
 
 import numpy as np
 
-from . import file_index
+from . import fast_indexer, file_index
 from ..messages import MessageType, MessageHeader, MessagePayload, Timestamp, message_type_to_class
 from ..utils import trace as logging
 from ..utils.time_range import TimeRange
@@ -22,7 +22,7 @@ class MixedLogReader(object):
     logger = logging.getLogger('point_one.fusion_engine.parsers.mixed_log_reader')
 
     def __init__(self, input_file, warn_on_gaps: bool = False, show_progress: bool = False,
-                 generate_index: bool = True, ignore_index: bool = False, max_bytes: int = None,
+                 save_index: bool = True, ignore_index: bool = False, max_bytes: int = None,
                  time_range: TimeRange = None, message_types: Union[Iterable[MessageType], MessageType] = None,
                  return_header: bool = True, return_payload: bool = True, return_bytes: bool = False,
                  return_offset: bool = False, return_message_index: bool = False):
@@ -36,10 +36,10 @@ def __init__(self, input_file, warn_on_gaps: bool = False, show_progress: bool = False,
                object.
         @param warn_on_gaps If `True`, print warnings if gaps are detected in the FusionEngine message sequence numbers.
         @param show_progress If `True`, print file read progress to the console periodically.
-        @param generate_index If `True`, generate an index file if one does not exist for faster reading in the future.
+        @param save_index If `True`, save an index file if one does not exist for faster reading in the future.
               See @ref FileIndex for details. Ignored if `max_bytes` is specified.
         @param ignore_index If `True`, ignore the existing index file and read from the binary file directly. If
-               `generate_index == True`, this will delete the existing file and create a new one.
+               `save_index == True`, this will delete the existing file and create a new one.
         @param max_bytes If specified, read up to the maximum number of bytes.
         @param time_range An optional @ref TimeRange object specifying desired start and end time bounds of the data to
               be read. See @ref TimeRange for more details.
@@ -102,38 +102,13 @@ def __init__(self, input_file, warn_on_gaps: bool = False, show_progress: bool = False,
             self.max_bytes = sys.maxsize
         else:
             self.max_bytes = max_bytes
-            if generate_index:
-                self.logger.debug('Max bytes specified. Disabling index generation.')
-                generate_index = False
-
-        # Open the companion index file if one exists.
-        self.index_path = file_index.FileIndex.get_path(input_path)
-        self._original_index = None
-        self.index = None
-        self.next_index_elem = 0
-        if ignore_index:
-            if os.path.exists(self.index_path):
-                if generate_index:
-                    self.logger.debug("Deleting/regenerating index file @ '%s'." % self.index_path)
-                    os.remove(self.index_path)
-                else:
-                    self.logger.debug("Ignoring index file @ '%s'." % self.index_path)
-        else:
-            if os.path.exists(self.index_path):
-                try:
-                    self.logger.debug("Loading index file '%s'." % self.index_path)
-                    self._original_index = file_index.FileIndex(index_path=self.index_path, data_path=input_path,
-                                                                delete_on_error=generate_index)
-                    self.index = self._original_index[self.message_types][self.time_range]
-                    self.filtered_message_types = len(np.unique(self._original_index.type)) != \
-                        len(np.unique(self.index.type))
-                except ValueError as e:
-                    self.logger.error("Error loading index file: %s" % str(e))
-            else:
-                self.logger.debug("No index file found @ '%s'." % self.index_path)
-
-        self.index_builder = None
-        self.set_generate_index(generate_index)
+        # Open the companion index file if one exists, otherwise index the file.
+        self._original_index = fast_indexer.fast_generate_index(input_path, force_reindex=ignore_index,
+                                                                save_index=save_index, max_bytes=max_bytes)
+        self.next_index_elem = 0
+        self.index = self._original_index[self.message_types][self.time_range]
+        self.filtered_message_types = len(np.unique(self._original_index.type)) != \
+            len(np.unique(self.index.type))
 
     def rewind(self):
         self.logger.debug('Rewinding to the start of the file.')
@@ -155,9 +130,6 @@ def rewind(self):
         self.next_index_elem = 0
 
         self.input_file.seek(0, os.SEEK_SET)
 
-        if self.index_builder is not None:
-            self.index_builder = file_index.FileIndexBuilder()
-
     def seek_to_message(self, message_index: int, is_filtered_index: bool = False):
         if self.index is None:
             raise NotImplemented('A file index is required to seek by message index.')
@@ -185,18 +157,6 @@ def have_index(self):
         return self._original_index
 
     def get_index(self):
         return self._original_index
 
-    def generating_index(self):
-        return self.index_builder is not None
-
-    def set_generate_index(self, generate_index):
-        if self._original_index is None:
-            if generate_index:
-                self.logger.debug("Generating index file '%s'." % self.index_path)
-                self.index_builder = file_index.FileIndexBuilder()
-            else:
-                self.logger.debug("Index generation disabled.")
-                self.index_builder = None
-
     def set_show_progress(self, show_progress):
         self.show_progress = show_progress
 
@@ -205,9 +165,6 @@ def set_max_bytes(self, max_bytes):
             self.max_bytes = sys.maxsize
         else:
             self.max_bytes = max_bytes
-            if self.index_builder is not None:
-                self.logger.debug('Max bytes specified. Disabling index generation.')
-                self.set_generate_index(False)
 
     def get_bytes_read(self):
         return self.total_bytes_read
@@ -215,16 +172,12 @@ def next(self):
         return self.read_next()
 
-    def read_next(self, require_p1_time=False, require_system_time=False, generate_index=True):
-        return self._read_next(require_p1_time=require_p1_time, require_system_time=require_system_time,
-                               generate_index=generate_index)
+    def read_next(self, require_p1_time=False, require_system_time=False):
+        return self._read_next(require_p1_time=require_p1_time, require_system_time=require_system_time)
 
-    def _read_next(self, require_p1_time=False, require_system_time=False, generate_index=True, force_eof=False):
+    def _read_next(self, require_p1_time=False, require_system_time=False, force_eof=False):
         if force_eof:
             if not self.reached_eof():
-                if self.generating_index():
-                    raise ValueError('Cannot jump to EOF while building an index file.')
-
                 self.logger.debug('Forcibly seeking to EOF.')
                 if self.index is None:
                     self.input_file.seek(self.file_size_bytes, os.SEEK_SET)
@@ -337,8 +290,7 @@ def _read_next(self, require_p1_time=False, require_system_time=False, generate_index=True, force_eof=False):
             # Deserialize the payload if we need it.
             need_payload = self.return_payload or \
                            self.time_range is not None or \
-                           require_p1_time or require_system_time or \
-                           (self.index_builder is not None and generate_index)
+                           require_p1_time or require_system_time
 
             if need_payload:
                 cls = message_type_to_class.get(header.message_type, None)
@@ -362,11 +314,6 @@ def _read_next(self, require_p1_time=False, require_system_time=False, generate_index=True, force_eof=False):
             # Extract P1 time if available.
             p1_time = payload.get_p1_time() if payload is not None else Timestamp()
 
-            # Add this message to the index file.
-            if self.index_builder is not None and generate_index:
-                self.index_builder.append(message_type=header.message_type, offset_bytes=start_offset_bytes,
-                                          p1_time=p1_time)
-
             # Now, if this message is not in the user-specified filter criteria, skip it.
             #
             # If we have an index available, this is implied by the index (we won't seek to messages that don't meet
@@ -381,7 +328,7 @@ def _read_next(self, require_p1_time=False, require_system_time=False, generate_index=True, force_eof=False):
                     self.logger.trace("Message does not have valid P1 time. Skipping.", depth=1)
                     continue
                 elif self.time_range is not None and not self.time_range.is_in_range(payload):
-                    if self.time_range.in_range_started() and (self.index_builder is None or not generate_index):
+                    if self.time_range.in_range_started():
                         self.logger.debug("End of time range reached. Finished processing.")
                         break
                     else:
@@ -414,19 +361,6 @@ def _read_next(self, require_p1_time=False, require_system_time=False, generate_index=True, force_eof=False):
             self._print_progress(self.total_bytes_read)
             self.logger.debug("Read %d bytes total." % self.total_bytes_read)
 
-        # If we are creating an index file, save it now.
-        if self.index_builder is not None and generate_index:
-            self.logger.debug("Saving index file as '%s'." % self.index_path)
-            self._original_index = self.index_builder.save(self.index_path, self.input_file.name)
-            self.index_builder = None
-
-            self.index = self._original_index[self.message_types][self.time_range]
-            if self.remove_invalid_p1_time:
-                self.index = self.index.get_time_range(hint='remove_nans')
-            self.message_types = None
-            self.time_range = None
-            self.next_index_elem = len(self.index)
-
         # Finished iterating.
         if force_eof:
             return
 
@@ -434,50 +368,15 @@ def _read_next(self, require_p1_time=False, require_system_time=False, generate_index=True, force_eof=False):
         raise StopIteration()
 
     def _advance_to_next_sync(self):
-        if self.index is None:
-            try:
-                if self.logger.isEnabledFor(logging.getTraceLevel(depth=2)):
-                    self.logger.trace('Starting next sync search @ %d (0x%x).' %
-                                      (self.total_bytes_read, self.total_bytes_read),
-                                      depth=2)
-                while True:
-                    if self.total_bytes_read + 1 >= self.max_bytes:
-                        self.logger.debug('Max read length exceeded (%d B).' % self.max_bytes)
-                        return False
-
-                    byte0 = self.input_file.read(1)[0]
-                    self.total_bytes_read += 1
-                    while True:
-                        if byte0 == MessageHeader.SYNC0:
-                            if self.total_bytes_read + 1 >= self.max_bytes:
-                                self.logger.debug('Max read length exceeded (%d B).' % self.max_bytes)
-                                return False
-
-                            byte1 = self.input_file.read(1)[0]
-                            self.total_bytes_read += 1
-                            if byte1 == MessageHeader.SYNC1:
-                                self.input_file.seek(-2, os.SEEK_CUR)
-                                self.total_bytes_read -= 2
-                                if self.logger.isEnabledFor(logging.getTraceLevel(depth=3)):
-                                    self.logger.trace('Sync bytes found @ %d (0x%x).' %
-                                                      (self.total_bytes_read, self.total_bytes_read),
-                                                      depth=3)
-                                return True
-                            byte0 = byte1
-                        else:
-                            break
-            except IndexError:
-                return False
+        if self.next_index_elem == len(self.index):
+            return False
         else:
-            if self.next_index_elem == len(self.index):
-                return False
-            else:
-                offset_bytes = self.index.offset[self.next_index_elem]
-                self.current_message_index = self.index.message_index[self.next_index_elem]
-                self.next_index_elem += 1
-                self.input_file.seek(offset_bytes, os.SEEK_SET)
-                self.total_bytes_read = offset_bytes
-                return True
+            offset_bytes = self.index.offset[self.next_index_elem]
+            self.current_message_index = self.index.message_index[self.next_index_elem]
+            self.next_index_elem += 1
+            self.input_file.seek(offset_bytes, os.SEEK_SET)
+            self.total_bytes_read = offset_bytes
+            return True
 
     def _print_progress(self, file_size=None):
         show_progress = self.show_progress
@@ -664,7 +563,4 @@ def __next__(self):
 
     @classmethod
     def generate_index_file(cls, input_file):
-        reader = MixedLogReader(input_file=input_file, ignore_index=False, generate_index=True, return_payload=False)
-        if reader.index is None:
-            for _ in reader:
-                pass
+        return fast_indexer.fast_generate_index(input_file)
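
After this change a MixedLogReader always has an index (built by the fast indexer at
construction time), so filtering, seeking, and iteration are all index-driven. A usage
sketch; the path is a placeholder:

    from fusion_engine_client.messages import MessageType
    from fusion_engine_client.parsers.mixed_log_reader import MixedLogReader

    # save_index=False still builds the in-memory index, but skips writing the
    # .p1i companion file to disk.
    reader = MixedLogReader('/path/to/input.p1log', save_index=False,
                            message_types=[MessageType.POSE])
    for header, payload in reader:
        print(header.message_type, payload.get_p1_time())
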
diff --git a/python/fusion_engine_client/utils/log.py b/python/fusion_engine_client/utils/log.py
index ae716c1d..9d3c300c 100644
--- a/python/fusion_engine_client/utils/log.py
+++ b/python/fusion_engine_client/utils/log.py
@@ -356,7 +356,7 @@ def find_p1log_file(input_path, return_output_dir=False, return_log_id=False, lo
             raise FileExistsError('Specified file is not a .p1log file.')
 
 
-def extract_fusion_engine_log(input_path, output_path=None, warn_on_gaps=True, return_counts=False, generate_index=True):
+def extract_fusion_engine_log(input_path, output_path=None, warn_on_gaps=True, return_counts=False, save_index=True):
     """!
     @brief Extract FusionEngine data from a file containing mixed binary data.
 
@@ -365,7 +365,7 @@ def extract_fusion_engine_log(input_path, output_path=None, warn_on_gaps=True, return_counts=False, generate_index=True):
            `input_path` is `.`.
     @param warn_on_gaps If `True`, print a warning if gaps are detected in the data sequence numbers.
     @param return_counts If `True`, return the number of messages extracted for each message type.
-    @param generate_index If `True`, generate an index file to go along with the output file for faster reading in the
+    @param save_index If `True`, generate an index file to go along with the output file for faster reading in the
            future. See @ref FileIndex for details.
 
     @return A tuple containing:
@@ -377,10 +377,10 @@ def extract_fusion_engine_log(input_path, output_path=None, warn_on_gaps=True, return_counts=False, generate_index=True):
     if output_path is None:
         output_path = os.path.splitext(input_path)[0] + '.p1log'
 
-    index_builder = FileIndexBuilder() if generate_index else None
+    index_builder = FileIndexBuilder() if save_index else None
 
     with open(input_path, 'rb') as in_fd, open(output_path, 'wb') as out_path:
-        reader = MixedLogReader(in_fd, warn_on_gaps=warn_on_gaps, generate_index=False,
+        reader = MixedLogReader(in_fd, warn_on_gaps=warn_on_gaps, save_index=False,
                                 return_header=True, return_payload=True, return_bytes=True,
                                 return_offset=False, show_progress=True)
         for header, payload, data in reader:
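
A usage sketch for the renamed extract_fusion_engine_log() parameter (paths are
placeholders; see the docstring above for the return value):

    from fusion_engine_client.utils.log import extract_fusion_engine_log

    # Extract FusionEngine messages from a mixed binary capture and write a
    # .p1i index alongside the output .p1log.
    extract_fusion_engine_log(input_path='/path/to/capture.raw',
                              output_path='/path/to/capture.p1log',
                              save_index=True)
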
diff --git a/python/tests/test_data_loader.py b/python/tests/test_data_loader.py
index 4df56612..2b1c9f85 100644
--- a/python/tests/test_data_loader.py
+++ b/python/tests/test_data_loader.py
@@ -154,18 +154,15 @@ def test_read_all(self, data_path):
         expected_messages = generate_data(data_path=str(data_path), include_binary=False, return_dict=False)
         expected_result = message_list_to_dict(expected_messages)
 
-        # Construct a reader. This will attempt to set t0 immediately by scanning the data file. If an index file
-        # exists, the reader will use the index file to find t0 quickly. If not, it'll read the file directly, but will
-        # _not_ attempt to generate an index (which requires reading the entire data file).
+        # Construct a reader. This will attempt to generate an index and set t0 immediately by scanning the data file.
         reader = DataLoader(path=str(data_path))
+        assert reader.reader.have_index()
         assert reader.t0 is not None
         assert reader.system_t0 is not None
-        assert not reader.reader.have_index()
 
         # Now read the data itself. This _will_ generate an index file.
         result = reader.read()
         self._check_results(result, expected_result)
-        assert reader.reader.have_index()
         assert len(reader.reader._original_index) == len(expected_messages)
         assert len(reader.reader.index) == len(expected_messages)
@@ -224,34 +221,12 @@ def test_read_pose_mixed_binary(self, data_path):
         assert len(reader.reader._original_index) == len(messages)
         assert len(reader.reader.index) == len(expected_messages)
 
-    def test_read_no_generate_index(self, data_path):
-        expected_result = generate_data(data_path=str(data_path), include_binary=False)
-
-        # Construct a reader with index generation disabled. This never generates an index, but the read() call below
-        # would if we did not set this.
-        reader = DataLoader(path=str(data_path), generate_index=False)
-        assert reader.t0 is not None
-        assert reader.system_t0 is not None
-        assert not reader.reader.have_index()
-
-        # Now read the data itself. This will _not_ generate an index file.
-        result = reader.read()
-        self._check_results(result, expected_result)
-        assert not reader.reader.have_index()
-
-        # Do the same but this time using the disable argument to read().
-        reader = DataLoader(path=str(data_path))
-        result = reader.read(disable_index_generation=True)
-        self._check_results(result, expected_result)
-        assert not reader.reader.have_index()
-
     def test_read_in_order(self, data_path):
         messages = generate_data(data_path=str(data_path), include_binary=False, return_dict=False)
         expected_messages = [m for m in messages if m.get_type() in (MessageType.POSE, MessageType.EVENT_NOTIFICATION)]
         expected_result = message_list_to_messagedata(expected_messages)
 
         reader = DataLoader(path=str(data_path))
-        assert not reader.reader.have_index()
 
         result = reader.read(message_types=[PoseMessage, EventNotificationMessage], return_in_order=True)
         self._check_results(result, expected_result)
diff --git a/python/tests/test_mixed_log_reader.py b/python/tests/test_mixed_log_reader.py
index 79c48ad4..3f795877 100644
--- a/python/tests/test_mixed_log_reader.py
+++ b/python/tests/test_mixed_log_reader.py
@@ -130,7 +130,6 @@ def test_read_all(self, data_path):
         messages = self._generate_mixed_data(data_path)
 
         reader = MixedLogReader(str(data_path))
-        assert reader.index is None
         self._check_results(reader, messages)
 
         # Verify that we successfully generated an index file.
@@ -145,7 +144,6 @@ def test_read_pose(self, data_path):
         expected_messages = [m for m in messages if isinstance(m, PoseMessage)]
 
         reader = MixedLogReader(str(data_path))
-        assert reader.index is None
         reader.filter_in_place((PoseMessage,))
         self._check_results(reader, expected_messages)
 
@@ -175,7 +173,6 @@ def test_read_pose_mixed_binary(self, data_path):
         expected_messages = [m for m in messages if isinstance(m, PoseMessage)]
 
         reader = MixedLogReader(str(data_path))
-        assert reader.index is None
         reader.filter_in_place((PoseMessage,))
         self._check_results(reader, expected_messages)
 
@@ -215,16 +212,6 @@ def test_read_events(self, data_path):
         reader.filter_in_place((EventNotificationMessage,))
         self._check_results(reader, expected_messages)
 
-    def test_read_no_generate_index(self, data_path):
-        messages = self._generate_mixed_data(data_path)
-        expected_messages = [m for m in messages if isinstance(m, PoseMessage)]
-
-        reader = MixedLogReader(str(data_path), generate_index=False)
-        assert reader.index is None
-        reader.filter_in_place((PoseMessage,))
-        self._check_results(reader, expected_messages)
-        assert reader.index is None
-
     def test_read_with_index(self, data_path):
         messages = self._generate_mixed_data(data_path)
         expected_messages = [m for m in messages if isinstance(m, PoseMessage)]
@@ -256,16 +243,11 @@ def test_read_overwrite_index(self, data_path):
         MixedLogReader.generate_index_file(str(data_path))
 
         reader = MixedLogReader(str(data_path), ignore_index=True)
-        # By default, we will be generating an index file, overwriting the existing one, so the reader should have
-        # deleted the existing one.
-        assert reader.index is None
-        assert not os.path.exists(reader.index_path)
-
         reader.filter_in_place((PoseMessage,))
         self._check_results(reader, expected_messages)
 
         # Check that we generated a new index.
-        assert os.path.exists(reader.index_path)
+        assert os.path.exists(FileIndex.get_path(data_path))
         assert reader.index is not None and len(reader.index) == len(expected_messages)
         assert len(reader._original_index) == len(messages)
 
@@ -274,18 +256,14 @@ def test_read_ignore_index(self, data_path):
         expected_messages = [m for m in messages if isinstance(m, PoseMessage)]
         MixedLogReader.generate_index_file(str(data_path))
 
-        reader = MixedLogReader(str(data_path), ignore_index=True, generate_index=False)
-        assert reader.index is None
+        reader = MixedLogReader(str(data_path), ignore_index=True, save_index=False)
 
         # This time, we are not generating an index so we do _not_ delete the existing file and leave it intact.
-        assert reader.index is None
-        assert os.path.exists(reader.index_path)
+        assert os.path.exists(FileIndex.get_path(data_path))
 
         reader.filter_in_place((PoseMessage,))
         self._check_results(reader, expected_messages)
 
-        assert reader.index is None
-
     # Note: TimeRange objects keep internal state, so we can't use them here since the state will remain across multiple
     # calls for different use_index values. Instead we store range strings and parse them on each call.
     @pytest.mark.parametrize("time_range", [
@@ -319,10 +297,8 @@ def _test_rewind(self, data_path, use_index):
             _, message = next(reader)
             self._check_message(message, expected_messages[i])
 
-        if use_index:
-            assert reader.index is not None
-        else:
-            assert reader.index is None
+
+        assert reader.index is not None
 
         reader.rewind()
         self._check_results(reader, expected_messages)
@@ -425,12 +401,8 @@ def _test_seek_to_eof(self, data_path, use_index):
             _, message = next(reader)
             self._check_message(message, expected_messages[i])
 
-        # Now jump to EOF. If we're generating an index file, this is illegal.
-        if use_index:
-            reader.seek_to_eof()
-        else:
-            with pytest.raises(ValueError):
-                reader.seek_to_eof()
+        # Now jump to EOF.
+        reader.seek_to_eof()
 
     def test_seek_to_eof_no_index(self, data_path):
         self._test_seek_to_eof(data_path, use_index=False)
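
The updated tests locate the companion index via FileIndex.get_path(), which maps a log
path to its `.p1i` sibling now that the reader no longer exposes an `index_path` attribute.
A quick sketch (placeholder path):

    from fusion_engine_client.parsers.file_index import FileIndex

    # Maps the log to its companion index file, e.g. /path/to/input.p1i.
    print(FileIndex.get_path('/path/to/input.p1log'))
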