Skip to content

Commit

Permalink
PR comments.
Browse files (browse the repository at this point in the history)
  • Loading branch information
Jonathan Diamond committed Oct 9, 2023
1 parent e5a792c commit 25faaf7
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 18 deletions.
13 changes: 6 additions & 7 deletions python/fusion_engine_client/parsers/fast_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
# This has been tuned on my laptop to where increasing the read size was giving
# diminishing returns on speed.
_READ_SIZE_BYTES = 80 * 1024
_READ_WORDS = int(_READ_SIZE_BYTES / 2)

_PREAMBLE = struct.unpack('<H', MessageHeader.SYNC)

Expand All @@ -27,7 +26,7 @@

def _search_blocks_for_fe(input_path: str, block_starts: List[int]):
"""!
@brief Processing function called in parallel.
@brief Search the specified portions of the file for the start offsets of valid FE messages.
@param input_path The path to the file to be read.
@param block_starts The blocks of data to search for FE messages in.
Expand All @@ -48,7 +47,7 @@ def _search_blocks_for_fe(input_path: str, block_starts: List[int]):
# last message.
data = fd.read(_READ_SIZE_BYTES + _MAX_FE_MSG_SIZE_BYTES)
if len(data) == _READ_SIZE_BYTES + _MAX_FE_MSG_SIZE_BYTES:
word_count = _READ_WORDS
word_count = int(_READ_SIZE_BYTES / 2)
# The last read on the last thread will run out of data, so read
# whatever is left.
else:
Expand Down Expand Up @@ -121,9 +120,9 @@ def fast_generate_index(
@param force_reindex If `True` regenerate the index even if there's an existing file.
@param save_index If `True` save the index to disk after generation.
@param max_bytes If specified, read up to the maximum number of bytes.
@param block_starts The blocks of data to search for FE messages in.
@param num_threads The number of parallel processes to spawn for searching the file.
@return The loaded or generated @ref FileIndex
@return The loaded or generated @ref FileIndex.
"""
file_size = os.stat(input_path).st_size
_logger.debug(f'File size: {int(file_size/1024/1024)}MB')
Expand Down Expand Up @@ -153,7 +152,7 @@ def fast_generate_index(
# Allocate which blocks of bytes will be processed by each thread.
num_blocks = math.ceil(file_size / _READ_SIZE_BYTES)
# Each thread will process at least blocks_per_thread blocks. If the number
# doesn't device evenly, distribute the remainder.
# doesn't divide evenly, distribute the remainder.
blocks_per_thread, blocks_remainder = divmod(num_blocks, num_threads)
byte_offset = 0
for i in range(num_threads):
Expand All @@ -176,7 +175,7 @@ def fast_generate_index(

# This does an unnecessary conversion back and forth from the _RAW_DTYPE to
# the _DTYPE and back again when saving. This is to avoid needing to deal
# with the EOF entry directly. This adds less then a second, and could be
# with the EOF entry directly. This adds less than a second, and could be
# avoided by reproducing the FileIndex EOF entry logic.
index = FileIndex(data=FileIndex._from_raw(index_raw))
if save_index:
Expand Down
5 changes: 2 additions & 3 deletions python/tests/test_data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,15 @@ def test_read_all(self, data_path):
expected_messages = generate_data(data_path=str(data_path), include_binary=False, return_dict=False)
expected_result = message_list_to_dict(expected_messages)

# Construct a reader. This will attempt to set t0 immediately by scanning the data file. If an index file
# exists, the reader will use the index file to find t0 quickly.
# Construct a reader. This will attempt to generate an index and set t0 immediately by scanning the data file.
reader = DataLoader(path=str(data_path))
assert reader.reader.have_index()
assert reader.t0 is not None
assert reader.system_t0 is not None

# Now read the data itself. This _will_ generate an index file.
result = reader.read()
self._check_results(result, expected_result)
assert reader.reader.have_index()
assert len(reader.reader._original_index) == len(expected_messages)
assert len(reader.reader.index) == len(expected_messages)

Expand Down
8 changes: 0 additions & 8 deletions python/tests/test_mixed_log_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,6 @@ def test_read_events(self, data_path):
reader.filter_in_place((EventNotificationMessage,))
self._check_results(reader, expected_messages)

def test_read_no_generate_index(self, data_path):
messages = self._generate_mixed_data(data_path)
expected_messages = [m for m in messages if isinstance(m, PoseMessage)]

reader = MixedLogReader(str(data_path), save_index=False)
reader.filter_in_place((PoseMessage,))
self._check_results(reader, expected_messages)

def test_read_with_index(self, data_path):
messages = self._generate_mixed_data(data_path)
expected_messages = [m for m in messages if isinstance(m, PoseMessage)]
Expand Down

0 comments on commit 25faaf7

Please sign in to comment.