[FUS-1585] Added fast indexing support in Python. (#267)
# Changes
 - Speed up indexing to ~100 MB/s on a 16-core laptop with an SSD
 - Make the MixedLogReader assume it will index the file on construction

# Bug Fixes
 - Fix p1_print for unknown types

This generally makes indexing a log faster than iterating through it with a
MixedLogReader that uses the index file.
axlan authored Oct 9, 2023
2 parents 0d9f392 + fb2cb83 commit e2be15c
Showing 9 changed files with 238 additions and 254 deletions.
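As context for the diffs below, here is a minimal, hypothetical sketch (not part of this commit) of invoking the new fast indexer directly. The module path follows the new file python/fusion_engine_client/parsers/fast_indexer.py; the log path is a placeholder.

from multiprocessing import cpu_count

from fusion_engine_client.parsers.fast_indexer import fast_generate_index

# Build a .p1i index for a FusionEngine log, or load the existing index file if
# one is already present and valid.
index = fast_generate_index('/path/to/example.p1log',   # Placeholder log path.
                            force_reindex=False,         # Reuse a valid existing .p1i file.
                            save_index=True,             # Write the .p1i file next to the log.
                            num_threads=cpu_count())
# The returned FileIndex is what MixedLogReader and DataLoader use to seek
# directly to the messages they need on later reads.
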
7 changes: 1 addition & 6 deletions python/bin/p1_lband_extract
@@ -65,7 +65,6 @@ Extract L-band corrections data from a log containing FusionEngine L-band frame
options = parser.parse_args()

read_index = not options.ignore_index
generate_index = not options.ignore_index

# Configure logging.
if options.verbose >= 1:
@@ -112,11 +111,7 @@ Extract L-band corrections data from a log containing FusionEngine L-band frame

# Process all LBandFrameMessage data in the file.
reader = MixedLogReader(input_path, return_bytes=True, return_offset=True, show_progress=options.progress,
ignore_index=not read_index, generate_index=generate_index,
message_types=message_types, time_range=time_range)

if reader.generating_index():
_logger.info('Generating index file - processing complete file. This may take some time.')
ignore_index=not read_index, message_types=message_types, time_range=time_range)

total_messages = 0
bytes_decoded = 0
10 changes: 3 additions & 7 deletions python/bin/p1_print
@@ -112,7 +112,6 @@ other types of data.
options = parser.parse_args()

read_index = not options.ignore_index
generate_index = not options.ignore_index

# Configure logging.
if options.verbose >= 1:
@@ -162,11 +161,7 @@ other types of data.

# Process all data in the file.
reader = MixedLogReader(input_path, return_bytes=True, return_offset=True, show_progress=options.progress,
ignore_index=not read_index, generate_index=generate_index and read_index,
message_types=message_types, time_range=time_range)

if reader.generating_index() and (len(message_types) > 0 or options.time is not None):
_logger.info('Generating index file - processing complete file. This may take some time.')
ignore_index=not read_index, message_types=message_types, time_range=time_range)

first_p1_time_sec = None
last_p1_time_sec = None
@@ -262,7 +257,8 @@ other types of data.
_logger.info(format_string.format('Message Name', 'Type', 'Count'))
_logger.info(format_string.format('-' * 50, '-' * 5, '-' * 8))
for type, info in sorted(message_stats.items(), key=lambda x: int(x[0])):
_logger.info(format_string.format(message_type_to_class[type].__name__, int(type), info['count']))
name = message_type_to_class[type].__name__ if type in message_type_to_class else "Unknown"
_logger.info(format_string.format(name, int(type), info['count']))
_logger.info(format_string.format('-' * 50, '-' * 5, '-' * 8))
_logger.info(format_string.format('Total', '', total_messages))
elif total_messages == 0:
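For reference, a minimal, hypothetical sketch (not part of this commit) of constructing MixedLogReader the way p1_print now does: the reader indexes the file on construction, so the generate_index argument and the 'Generating index file' notice above are no longer needed. The import path and file path are assumptions for illustration.

from fusion_engine_client.parsers import MixedLogReader  # Assumed import path.

# The reader builds (or loads) the .p1i index when it is constructed, then uses
# it to apply any message_types / time_range filters.
reader = MixedLogReader('/path/to/example.p1log',   # Placeholder log path.
                        ignore_index=False,          # Reuse an existing .p1i file if it is valid.
                        return_bytes=True, return_offset=True)
for entry in reader:
    pass  # Each entry holds a decoded message (plus raw bytes and offset here).
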
4 changes: 0 additions & 4 deletions python/fusion_engine_client/analysis/analyzer.py
@@ -2206,10 +2206,6 @@ def generate_index(self, auto_open=True):
self._open_browser(index_path)

def _calculate_duration(self, return_index=False):
# Generate an index file, which we need to calculate the log duration, in case it wasn't created earlier (i.e.,
# we didn't read anything to plot).
self.reader.generate_index(show_progress=True)

# Restrict the index to the user-requested time range.
full_index = self.reader.get_index()
reduced_index = full_index[self.params['time_range']]
56 changes: 13 additions & 43 deletions python/fusion_engine_client/analysis/data_loader.py
@@ -135,15 +135,15 @@ class DataLoader(object):

logger = logging.getLogger('point_one.fusion_engine.analysis.data_loader')

def __init__(self, path=None, generate_index=True, ignore_index=False):
def __init__(self, path=None, save_index=True, ignore_index=False):
"""!
@brief Create a new reader instance.
@param path The path to a binary file containing FusionEngine messages, or an existing Python file object.
@param generate_index If `True`, generate a `.p1i` index file if one does not exist for faster reading in the
@param save_index If `True`, save a `.p1i` index file if one does not exist for faster reading in the
future. See @ref FileIndex for details.
@param ignore_index If `True`, ignore the existing index file and read from the `.p1log` binary file directly.
If `generate_index == True`, this will delete the existing file and create a new one.
If `save_index == True`, this will delete the existing file and create a new one.
"""
self.reader: MixedLogReader = None

@@ -155,26 +155,24 @@ def __init__(self, path=None, generate_index=True, ignore_index=False):
self._need_t0 = True
self._need_system_t0 = True

self._generate_index = generate_index
self._generate_index = save_index
if path is not None:
self.open(path, generate_index=generate_index, ignore_index=ignore_index)
self.open(path, save_index=save_index, ignore_index=ignore_index)

def open(self, path, generate_index=True, ignore_index=False):
def open(self, path, save_index=True, ignore_index=False):
"""!
@brief Open a FusionEngine binary file.
@param path The path to a binary file containing FusionEngine messages, or an existing Python file object.
@param generate_index If `True`, generate a `.p1i` index file if one does not exist for faster reading in the
@param save_index If `True`, generate a `.p1i` index file if one does not exist for faster reading in the
future. See @ref FileIndex for details.
@param ignore_index If `True`, ignore the existing index file and read from the `.p1log` binary file directly.
If `generate_index == True`, this will delete the existing file and create a new one.
If `save_index == True`, this will delete the existing file and create a new one.
"""
self.close()

self.reader = MixedLogReader(input_file=path, generate_index=generate_index, ignore_index=ignore_index,
self.reader = MixedLogReader(input_file=path, save_index=save_index, ignore_index=ignore_index,
return_bytes=True, return_message_index=True)
if self.reader.have_index():
self.logger.debug("Using index file '%s'." % self.reader.index_path)

# Read the first message (with P1 time) in the file to set self.t0.
#
@@ -218,28 +216,6 @@ def close(self):
if self.reader is not None:
self.reader = None

def generate_index(self, show_progress=False):
"""!
@brief Generate an index file for the current binary file if one does not already exist.
"""
if not self.reader.have_index():
# We'll read pose data (doesn't actually matter which). Store the currently cached data and restore it when
# we're done. That way if the user already did a read (with generate_index == False), they don't have to
# re-read the data if they try to use it again.
prev_data = self.data.get(MessageType.POSE, None)

if show_progress:
self.logger.info('Generating data index for faster access. This may take a few minutes...')
else:
self.logger.debug('Generating data index for faster access. This may take a few minutes...')

self._generate_index = True
self.read(message_types=[MessageType.POSE], max_messages=1, disable_index_generation=False,
ignore_cache=True, show_progress=show_progress, quiet=True)

if prev_data is not None:
self.data[MessageType.POSE] = prev_data

def read(self, *args, **kwargs) \
-> Union[Dict[MessageType, MessageData], MessageData]:
"""!
@@ -248,7 +224,7 @@ def read(self, *args, **kwargs) \
The read data will be cached internally. Subsequent reads for the same data type will return the cached data.
@note
This function uses a data index file to speed up reads when available. If `generate_index == True` and no index
This function uses a data index file to speed up reads when available. If `save_index == True` and no index
file exists, one will be generated automatically. In order to do this, this function must read the entire data
file, even if it could normally return early when `max_messages` or the end of `time_range` are reached.
@@ -258,7 +234,7 @@ def read(self, *args, **kwargs) \
be read. See @ref TimeRange for more details.
@param show_progress If `True`, print the read progress every 10 MB (useful for large files).
@param disable_index_generation If `True`, override the `generate_index` argument provided to `open()` and do
@param disable_index_generation If `True`, override the `save_index` argument provided to `open()` and do
not generate an index file during this call (intended for internal use only).
@param ignore_cache If `True`, ignore any cached data from a previous @ref read() call, and reload the requested
data from disk.
@@ -434,7 +410,6 @@ def _read(self,
else:
self.reader.rewind()
self.reader.clear_filters()
self.reader.set_generate_index(self._generate_index and not disable_index_generation)
self.reader.set_show_progress(show_progress)
self.reader.set_max_bytes(max_bytes)

@@ -576,13 +551,8 @@ def _read(self,
data_cache[header.message_type].message_index.append(message_index)

if max_messages is not None:
# If we reached the max message count but we're generating an index file, we need to read through the
# entire data file. Keep reading but discard any further messages.
if self.reader.generating_index() and message_count > abs(max_messages):
logger.debug(' Max messages reached. Discarding. [# messages=%d]' % message_count)
continue
# If we're not generating an index and we hit the max message count, we're done reading.
elif not self.reader.generating_index() and message_count == abs(max_messages):
# If we hit the max message count, we're done reading.
if message_count == abs(max_messages):
logger.debug(' Max messages reached. Done reading. [# messages=%d]' % message_count)
break

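Likewise, a minimal, hypothetical sketch (not part of this commit) of reading a log through DataLoader with the renamed save_index parameter shown above; the import paths and log path are assumptions for illustration.

from fusion_engine_client.analysis.data_loader import DataLoader
from fusion_engine_client.messages import MessageType  # Assumed import path.

# Opening the loader indexes the file up front, so reads can seek straight to
# the requested message types.
loader = DataLoader('/path/to/example.p1log',   # Placeholder log path.
                    save_index=True,            # Write a .p1i file for faster future runs.
                    ignore_index=False)         # Reuse an existing .p1i file if it is valid.
result = loader.read(message_types=[MessageType.POSE], show_progress=True)
# The decoded pose data is cached internally, so repeated reads return quickly.
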
184 changes: 184 additions & 0 deletions python/fusion_engine_client/parsers/fast_indexer.py
@@ -0,0 +1,184 @@
import os
import math
from multiprocessing import Pool, cpu_count
from typing import List, Tuple
import struct

import numpy as np

from ..messages import MessageHeader, Timestamp, message_type_to_class
from ..utils import trace as logging
from .file_index import FileIndex

# NOTE: This needs to be larger than the biggest possible FE message. The
# smaller it is, the faster the indexer can run, but if it's not large enough,
# messages may be missed.
_MAX_FE_MSG_SIZE_BYTES = 1024 * 12

# This has been tuned on my laptop to where increasing the read size was giving
# diminishing returns on speed.
_READ_SIZE_BYTES = 80 * 1024

_PREAMBLE = struct.unpack('<H', MessageHeader.SYNC)

_logger = logging.getLogger('point_one.fusion_engine.parsers.fast_indexer')


def _search_blocks_for_fe(input_path: str, block_starts: List[int]):
"""!
@brief Search the specified portions of the file for the start offsets of valid FE messages.
@param input_path The path to the file to be read.
@param block_starts The blocks of data to search for FE messages in.
@return The raw index data corresponding to this thread's data blocks.
"""
header = MessageHeader()
message_end = 0
num_syncs = 0
# Data corresponding to raw values in FileIndex._RAW_DTYPE.
raw_list: List[Tuple[int, int, int]] = []
with open(input_path, 'rb') as fd:
for i in range(len(block_starts)):
block_offset = block_starts[i]
fd.seek(block_offset)
# The first `_READ_SIZE_BYTES` bytes are the data searched for the start of
# messages. The additional data is read to give room to complete the
# last message.
data = fd.read(_READ_SIZE_BYTES + _MAX_FE_MSG_SIZE_BYTES)
if len(data) == _READ_SIZE_BYTES + _MAX_FE_MSG_SIZE_BYTES:
word_count = int(_READ_SIZE_BYTES / 2)
# The last read on the last thread will run out of data, so read
# whatever is left.
else:
word_count = int(len(data) / 2) - 1

# This is a fairly optimized search for preamble matches.
# Allocate space for all the message offsets to check.
np_data = np.empty(word_count * 2, dtype=np.uint16)
# Load the potential sync words for the even offsets.
np_data[0::2] = np.frombuffer(data, dtype=np.uint16, count=word_count)
# Load the potential sync words for the odd offsets ([AA 31 2E AA] shifted over one byte).
np_data[1::2] = np.frombuffer(data[1:], dtype=np.uint16, count=word_count)
# This is a lot faster than doing this check in raw Python due to numpy optimizations.
sync_matches = np.where(np_data == _PREAMBLE)[0]

num_syncs += len(sync_matches)

# To do the CRC check and find a p1_time the full message needs to be parsed. This
# section is not particularly optimized. The chance of the preamble appearing in data
# is relatively low, so this code is in a much less hot path than the preamble sync above.
for i in sync_matches:
absolute_offset = i + block_offset
# Don't check preambles found inside other valid messages. Generally, this didn't
# provide much speed up, but could prevent wasting cycles if the message size is large.
if absolute_offset < message_end:
continue

try:
# Check if the message has a valid length and CRC. This could probably be optimized.
header.unpack(buffer=data, offset=i, validate_crc=True, warn_on_unrecognized=False)
# Populate the p1_time from the payload if it exists.
p1_time = Timestamp()
# About half the indexing time is spent doing this p1_time check. This would
# probably take some doing to optimize. One approach to speed up the validation
# and p1 time checks is to read the message as a uint32_t and have a map to
# the p1_time (if present) for each class.
cls = message_type_to_class.get(header.message_type, None)
if cls is not None:
try:
payload = cls()
if hasattr(payload, 'p1_time') or hasattr(payload, 'details'):
payload.unpack(buffer=data, offset=i +
MessageHeader.calcsize(), message_version=header.message_version)
p1_time = payload.get_p1_time()
except BaseException:
pass
# Convert the Timestamp to an integer.
p1_time_raw = Timestamp._INVALID if math.isnan(p1_time.seconds) else int(p1_time.seconds)
message_end = absolute_offset + header.get_message_size()
raw_list.append((p1_time_raw, int(header.message_type), absolute_offset))
except BaseException:
pass
_logger.trace(f'{num_syncs} syncs found with {len(raw_list)} valid FE messages.')
# Return the index data for this section of the file.
return np.array(raw_list, dtype=FileIndex._RAW_DTYPE)


def fast_generate_index(
input_path: str,
force_reindex=False,
save_index=True,
max_bytes=None,
num_threads=cpu_count()) -> FileIndex:
"""!
@brief Quickly build a FileIndex.
This basic logic would be relatively easy to extend to do general parallel processing on messages.
@param input_path The path to the file to be read.
@param force_reindex If `True`, regenerate the index even if there's an existing file.
@param save_index If `True`, save the index to disk after generation.
@param max_bytes If specified, read up to the maximum number of bytes.
@param num_threads The number of parallel processes to spawn for searching the file.
@return The loaded or generated @ref FileIndex.
"""
file_size = os.stat(input_path).st_size
_logger.debug(f'File size: {int(file_size/1024/1024)}MB')

if max_bytes and max_bytes < file_size:
_logger.debug(f'Only indexing: {max_bytes/1024/1024}MB')
file_size = max_bytes
if save_index:
save_index = False
_logger.info('Max bytes specified. Disabling saving index.')

index_path = FileIndex.get_path(input_path)
# Check if index file can be loaded.
if not force_reindex and os.path.exists(index_path):
try:
index = FileIndex(index_path, input_path)
_logger.info(f'Loading existing cache: "{index_path}".')
return index
except ValueError as e:
_logger.warning(f'Couldn\'t load cache "{index_path}": {str(e)}.')

_logger.info(f'Indexing file "{input_path}". This may take a few seconds.')
_logger.debug(f'Using Threads: {num_threads}')

# These are the args passed to the _search_blocks_for_fe instances.
args: List[Tuple[str, List[int]]] = []
# Allocate which blocks of bytes will be processed by each thread.
num_blocks = math.ceil(file_size / _READ_SIZE_BYTES)
# Each thread will process at least blocks_per_thread blocks. If the number
# doesn't divide evenly, distribute the remainder.
blocks_per_thread, blocks_remainder = divmod(num_blocks, num_threads)
byte_offset = 0
for i in range(num_threads):
blocks = blocks_per_thread
if i < blocks_remainder:
blocks += 1
args.append((input_path, list(range(byte_offset, byte_offset + blocks * _READ_SIZE_BYTES, _READ_SIZE_BYTES))))
byte_offset += blocks * _READ_SIZE_BYTES

_logger.debug(f'Reads/thread: {blocks_per_thread}')

# Create a pool of worker processes.
with Pool(num_threads) as p:
# Kick off the worker processes with their args. Then concatenate their returned data.
index_raw = np.concatenate([o for o in p.starmap(_search_blocks_for_fe, args)])

total_entries = len(index_raw)

_logger.debug(f'FE messages found: {total_entries}')

# This does an unnecessary conversion back and forth from the _RAW_DTYPE to
# the _DTYPE and back again when saving. This is to avoid needing to deal
# with the EOF entry directly. This adds less than a second, and could be
# avoided by reproducing the FileIndex EOF entry logic.
index = FileIndex(data=FileIndex._from_raw(index_raw))
if save_index:
_logger.info(f'Saving index to "{index_path}".')
index.save(index_path, input_path)
return index
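
To illustrate the even/odd uint16 preamble search used in _search_blocks_for_fe above, here is a small standalone demonstration (not part of this commit) on a synthetic buffer. The two-byte sync pattern is an example value, and native little-endian byte order is assumed, as in the original code.

import struct

import numpy as np

SYNC = b'\x2e\x31'                    # Example two-byte sync pattern.
PREAMBLE = struct.unpack('<H', SYNC)  # 1-element tuple; broadcasts in the comparison below.

# Synthetic buffer with the sync pattern at byte offsets 1 (odd) and 6 (even).
data = b'\x00' + SYNC + b'\x00\x00\x00' + SYNC + b'\x00\x00'
word_count = len(data) // 2 - 1

# Interleave uint16 views taken at even and odd byte offsets so that both even
# and odd byte offsets are checked.
np_data = np.empty(word_count * 2, dtype=np.uint16)
np_data[0::2] = np.frombuffer(data, dtype=np.uint16, count=word_count)      # Even offsets.
np_data[1::2] = np.frombuffer(data[1:], dtype=np.uint16, count=word_count)  # Odd offsets.

# Indices into np_data correspond directly to byte offsets in the buffer.
print(np.where(np_data == PREAMBLE)[0])  # -> [1 6]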