Integrate fast_generate_index into MixedLogReader.
Jonathan Diamond committed Oct 4, 2023
1 parent 165a327 commit a5d26d1
Showing 5 changed files with 63 additions and 152 deletions.
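For orientation, the sketch below shows how a caller exercises the consolidated indexing path after this commit: MixedLogReader now always obtains its index up front from fast_generate_index() rather than optionally building one while iterating. This is a minimal, hedged example; the log path is hypothetical and the per-message tuple contents depend on the reader's return_* flags.

from fusion_engine_client.parsers.mixed_log_reader import MixedLogReader

# Hypothetical log path. The constructor now loads an existing companion index file
# or generates one immediately via fast_generate_index(); there is no longer a
# separate "generating index" pass while reading.
reader = MixedLogReader('/path/to/example_log.p1log', ignore_index=False)

for header, payload in reader:  # (header, payload) with the default return_* flags.
    print(header.message_type, payload)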
3 changes: 0 additions & 3 deletions python/bin/p1_lband_extract
@@ -115,9 +115,6 @@ Extract L-band corrections data from a log containing FusionEngine L-band frame
ignore_index=not read_index, generate_index=generate_index,
message_types=message_types, time_range=time_range)

if reader.generating_index():
_logger.info('Generating index file - processing complete file. This may take some time.')

total_messages = 0
bytes_decoded = 0

10 changes: 3 additions & 7 deletions python/bin/p1_print
@@ -112,7 +112,6 @@ other types of data.
options = parser.parse_args()

read_index = not options.ignore_index
generate_index = not options.ignore_index

# Configure logging.
if options.verbose >= 1:
@@ -162,11 +161,7 @@ other types of data.

# Process all data in the file.
reader = MixedLogReader(input_path, return_bytes=True, return_offset=True, show_progress=options.progress,
ignore_index=not read_index, generate_index=generate_index and read_index,
message_types=message_types, time_range=time_range)

if reader.generating_index() and (len(message_types) > 0 or options.time is not None):
_logger.info('Generating index file - processing complete file. This may take some time.')
ignore_index=not read_index, message_types=message_types, time_range=time_range)

first_p1_time_sec = None
last_p1_time_sec = None
@@ -259,7 +254,8 @@ other types of data.
_logger.info(format_string.format('Message Name', 'Type', 'Count'))
_logger.info(format_string.format('-' * 50, '-' * 5, '-' * 8))
for type, info in sorted(message_stats.items(), key=lambda x: int(x[0])):
_logger.info(format_string.format(message_type_to_class[type].__name__, int(type), info['count']))
name = message_type_to_class[type].__name__ if type in message_type_to_class else "Unknown"
_logger.info(format_string.format(name, int(type), info['count']))
_logger.info(format_string.format('-' * 50, '-' * 5, '-' * 8))
_logger.info(format_string.format('Total', '', total_messages))
elif total_messages == 0:
12 changes: 2 additions & 10 deletions python/fusion_engine_client/analysis/data_loader.py
@@ -173,8 +173,6 @@ def open(self, path, generate_index=True, ignore_index=False):

self.reader = MixedLogReader(input_file=path, generate_index=generate_index, ignore_index=ignore_index,
return_bytes=True, return_message_index=True)
if self.reader.have_index():
self.logger.debug("Using index file '%s'." % self.reader.index_path)

# Read the first message (with P1 time) in the file to set self.t0.
#
@@ -434,7 +432,6 @@ def _read(self,
else:
self.reader.rewind()
self.reader.clear_filters()
self.reader.set_generate_index(self._generate_index and not disable_index_generation)
self.reader.set_show_progress(show_progress)
self.reader.set_max_bytes(max_bytes)

@@ -576,13 +573,8 @@ def _read(self,
data_cache[header.message_type].message_index.append(message_index)

if max_messages is not None:
# If we reached the max message count but we're generating an index file, we need to read through the
# entire data file. Keep reading but discard any further messages.
if self.reader.generating_index() and message_count > abs(max_messages):
logger.debug(' Max messages reached. Discarding. [# messages=%d]' % message_count)
continue
# If we're not generating an index and we hit the max message count, we're done reading.
elif not self.reader.generating_index() and message_count == abs(max_messages):
# If we hit the max message count, we're done reading.
if message_count == abs(max_messages):
logger.debug(' Max messages reached. Done reading. [# messages=%d]' % message_count)
break

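At the API level, the simplification above means a capped read no longer has to scan the remainder of the file just to finish building an index. A hedged sketch, assuming the existing DataLoader.read() interface; the path and message type choice are illustrative.

from fusion_engine_client.analysis.data_loader import DataLoader
from fusion_engine_client.messages import MessageType

loader = DataLoader('/path/to/example_log.p1log')  # Hypothetical path.

# Since the index is now always built up front by the reader, hitting max_messages
# stops reading immediately instead of continuing just to index the whole file.
result = loader.read(message_types=[MessageType.POSE], max_messages=10)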
44 changes: 37 additions & 7 deletions python/fusion_engine_client/parsers/fast_indexer.py
@@ -60,7 +60,7 @@ def _search_blocks_for_fe(input_path: str, block_starts: List[int]):
# This is a lot faster than doing this check in raw Python due to numpy optimizations.
sync_matches = np.where(np_data == _PREAMBLE)[0]

# To do the CRC check and find a p1_time the full message needs to be parsed. Depending on the application it might be better to
# To do the CRC check and find a p1_time the full message needs to be parsed.
for i in sync_matches:
try:
# Check if the message has a valid length and CRC. This could probably be optimized.
@@ -91,15 +91,44 @@ def _search_blocks_for_fe(input_path: str, block_starts: List[int]):
return np.array(raw_list, dtype=FileIndex._RAW_DTYPE)


def fast_generate_index(input_path: str, num_threads=cpu_count()) -> FileIndex:
def fast_generate_index(
input_path: str,
force_reindex=False,
save_index=True,
max_bytes=None,
num_threads=cpu_count()) -> FileIndex:
"""!
@brief Quickly build a FileIndex
@brief Quickly build a FileIndex.
This basic logic would be relatively easy to extend to do general parallel processing on messages.
@param input_path The path to the file to be read.
@param force_reindex If `True` regenerate the index even if there's an existing file.
@param save_index If `True` save the index to disk after generation.
@param max_bytes If specified, only index up to this many bytes of the file.
@param num_threads The number of threads to use when searching the file in parallel.
@return The loaded or newly generated FileIndex.
"""
file_size = os.stat(input_path).st_size
_logger.debug(f'File size: {int(file_size/1024/1024)}MB')

if max_bytes and max_bytes < file_size:
_logger.debug(f'Only indexing: {max_bytes/1024/1024}MB')
file_size = max_bytes
if save_index:
save_index = False
_logger.info('Max bytes specified. Disabling saving index.')

index_path = FileIndex.get_path(input_path)
# Check if index file can be loaded.
if not force_reindex and os.path.exists(index_path):
try:
index = FileIndex(index_path, input_path)
_logger.info(f'Loading existing cache: "{index_path}".')
return index
except ValueError as e:
_logger.warning(f'Couldn\'t load cache "{index_path}": {str(e)}.')

_logger.info(f'Indexing file "{input_path}". This may take a few seconds.')
_logger.debug(f'Using Threads: {num_threads}')

# These are the args passed to the _search_blocks_for_fe instances.
@@ -128,11 +157,12 @@ def fast_generate_index(input_path: str, num_threads=cpu_count()) -> FileIndex:

_logger.debug(f'FE messages found: {total_entries}')

index_path = FileIndex.get_path(input_path)
_logger.debug("Saving index file as '%s'." % index_path)

# This does an unnecessary conversion from the _RAW_DTYPE to the _DTYPE and back
# again when saving. This is to avoid needing to deal with the EOF entry directly.
# It adds less than a second and could be avoided by reproducing the EOF stamping
# logic.
return FileIndex(data=FileIndex._from_raw(index_raw))
index = FileIndex(data=FileIndex._from_raw(index_raw))
if save_index:
_logger.info(f'Saving index to "{index_path}".')
index.save(index_path, input_path)
return index
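A brief usage sketch of the extended fast_generate_index() signature above. The keyword arguments mirror the parameters added in this commit; the log path is hypothetical, and the example reflects assumed typical use rather than anything in the change itself.

from fusion_engine_client.parsers.fast_indexer import fast_generate_index

# Load an existing companion index if it validates against the log, otherwise scan the
# file in parallel blocks; with save_index=True the result is written back to disk.
index = fast_generate_index('/path/to/example_log.p1log',
                            force_reindex=False,  # Reuse an existing index file when possible.
                            save_index=True,      # Persist the generated index.
                            max_bytes=None)       # None: index the entire file.
print(f'{len(index)} messages indexed.')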
146 changes: 21 additions & 125 deletions python/fusion_engine_client/parsers/mixed_log_reader.py
@@ -7,7 +7,7 @@

import numpy as np

from . import file_index
from . import fast_indexer, file_index
from ..messages import MessageType, MessageHeader, MessagePayload, Timestamp, message_type_to_class
from ..utils import trace as logging
from ..utils.time_range import TimeRange
@@ -102,38 +102,13 @@ def __init__(self, input_file, warn_on_gaps: bool = False, show_progress: bool =
self.max_bytes = sys.maxsize
else:
self.max_bytes = max_bytes
if generate_index:
self.logger.debug('Max bytes specified. Disabling index generation.')
generate_index = False

# Open the companion index file if one exists.
self.index_path = file_index.FileIndex.get_path(input_path)
self._original_index = None
self.index = None
self.next_index_elem = 0
if ignore_index:
if os.path.exists(self.index_path):
if generate_index:
self.logger.debug("Deleting/regenerating index file @ '%s'." % self.index_path)
os.remove(self.index_path)
else:
self.logger.debug("Ignoring index file @ '%s'." % self.index_path)
else:
if os.path.exists(self.index_path):
try:
self.logger.debug("Loading index file '%s'." % self.index_path)
self._original_index = file_index.FileIndex(index_path=self.index_path, data_path=input_path,
delete_on_error=generate_index)
self.index = self._original_index[self.message_types][self.time_range]
self.filtered_message_types = len(np.unique(self._original_index.type)) != \
len(np.unique(self.index.type))
except ValueError as e:
self.logger.error("Error loading index file: %s" % str(e))
else:
self.logger.debug("No index file found @ '%s'." % self.index_path)

self.index_builder = None
self.set_generate_index(generate_index)
# Open the companion index file if one exists, otherwise index the file.
self._original_index = fast_indexer.fast_generate_index(input_path, ignore_index, generate_index, max_bytes)
self.next_index_elem = 0
self.index = self._original_index[self.message_types][self.time_range]
self.filtered_message_types = len(np.unique(self._original_index.type)) != \
len(np.unique(self.index.type))

def rewind(self):
self.logger.debug('Rewinding to the start of the file.')
@@ -155,9 +130,6 @@ def rewind(self):
self.next_index_elem = 0
self.input_file.seek(0, os.SEEK_SET)

if self.index_builder is not None:
self.index_builder = file_index.FileIndexBuilder()

def seek_to_message(self, message_index: int, is_filtered_index: bool = False):
if self.index is None:
raise NotImplementedError('A file index is required to seek by message index.')
@@ -185,18 +157,6 @@ def have_index(self):
def get_index(self):
return self._original_index

def generating_index(self):
return self.index_builder is not None

def set_generate_index(self, generate_index):
if self._original_index is None:
if generate_index:
self.logger.debug("Generating index file '%s'." % self.index_path)
self.index_builder = file_index.FileIndexBuilder()
else:
self.logger.debug("Index generation disabled.")
self.index_builder = None

def set_show_progress(self, show_progress):
self.show_progress = show_progress

@@ -205,26 +165,19 @@ def set_max_bytes(self, max_bytes):
self.max_bytes = sys.maxsize
else:
self.max_bytes = max_bytes
if self.index_builder is not None:
self.logger.debug('Max bytes specified. Disabling index generation.')
self.set_generate_index(False)

def get_bytes_read(self):
return self.total_bytes_read

def next(self):
return self.read_next()

def read_next(self, require_p1_time=False, require_system_time=False, generate_index=True):
return self._read_next(require_p1_time=require_p1_time, require_system_time=require_system_time,
generate_index=generate_index)
def read_next(self, require_p1_time=False, require_system_time=False):
return self._read_next(require_p1_time=require_p1_time, require_system_time=require_system_time)

def _read_next(self, require_p1_time=False, require_system_time=False, generate_index=True, force_eof=False):
def _read_next(self, require_p1_time=False, require_system_time=False, force_eof=False):
if force_eof:
if not self.reached_eof():
if self.generating_index():
raise ValueError('Cannot jump to EOF while building an index file.')

self.logger.debug('Forcibly seeking to EOF.')
if self.index is None:
self.input_file.seek(self.file_size_bytes, os.SEEK_SET)
@@ -337,8 +290,7 @@ def _read_next(self, require_p1_time=False, require_system_time=False, generate_
# Deserialize the payload if we need it.
need_payload = self.return_payload or \
self.time_range is not None or \
require_p1_time or require_system_time or \
(self.index_builder is not None and generate_index)
require_p1_time or require_system_time

if need_payload:
cls = message_type_to_class.get(header.message_type, None)
@@ -362,11 +314,6 @@ def _read_next(self, require_p1_time=False, require_system_time=False, generate_
# Extract P1 time if available.
p1_time = payload.get_p1_time() if payload is not None else Timestamp()

# Add this message to the index file.
if self.index_builder is not None and generate_index:
self.index_builder.append(message_type=header.message_type, offset_bytes=start_offset_bytes,
p1_time=p1_time)

# Now, if this message is not in the user-specified filter criteria, skip it.
#
# If we have an index available, this is implied by the index (we won't seek to messages that don't meet
@@ -381,7 +328,7 @@ def _read_next(self, require_p1_time=False, require_system_time=False, generate_
self.logger.trace("Message does not have valid P1 time. Skipping.", depth=1)
continue
elif self.time_range is not None and not self.time_range.is_in_range(payload):
if self.time_range.in_range_started() and (self.index_builder is None or not generate_index):
if self.time_range.in_range_started():
self.logger.debug("End of time range reached. Finished processing.")
break
else:
@@ -414,70 +361,22 @@ def _read_next(self, require_p1_time=False, require_system_time=False, generate_
self._print_progress(self.total_bytes_read)
self.logger.debug("Read %d bytes total." % self.total_bytes_read)

# If we are creating an index file, save it now.
if self.index_builder is not None and generate_index:
self.logger.debug("Saving index file as '%s'." % self.index_path)
self._original_index = self.index_builder.save(self.index_path, self.input_file.name)
self.index_builder = None

self.index = self._original_index[self.message_types][self.time_range]
if self.remove_invalid_p1_time:
self.index = self.index.get_time_range(hint='remove_nans')
self.message_types = None
self.time_range = None
self.next_index_elem = len(self.index)

# Finished iterating.
if force_eof:
return
else:
raise StopIteration()

def _advance_to_next_sync(self):
if self.index is None:
try:
if self.logger.isEnabledFor(logging.getTraceLevel(depth=2)):
self.logger.trace('Starting next sync search @ %d (0x%x).' %
(self.total_bytes_read, self.total_bytes_read),
depth=2)
while True:
if self.total_bytes_read + 1 >= self.max_bytes:
self.logger.debug('Max read length exceeded (%d B).' % self.max_bytes)
return False

byte0 = self.input_file.read(1)[0]
self.total_bytes_read += 1
while True:
if byte0 == MessageHeader.SYNC0:
if self.total_bytes_read + 1 >= self.max_bytes:
self.logger.debug('Max read length exceeded (%d B).' % self.max_bytes)
return False

byte1 = self.input_file.read(1)[0]
self.total_bytes_read += 1
if byte1 == MessageHeader.SYNC1:
self.input_file.seek(-2, os.SEEK_CUR)
self.total_bytes_read -= 2
if self.logger.isEnabledFor(logging.getTraceLevel(depth=3)):
self.logger.trace('Sync bytes found @ %d (0x%x).' %
(self.total_bytes_read, self.total_bytes_read),
depth=3)
return True
byte0 = byte1
else:
break
except IndexError:
return False
if self.next_index_elem == len(self.index):
return False
else:
if self.next_index_elem == len(self.index):
return False
else:
offset_bytes = self.index.offset[self.next_index_elem]
self.current_message_index = self.index.message_index[self.next_index_elem]
self.next_index_elem += 1
self.input_file.seek(offset_bytes, os.SEEK_SET)
self.total_bytes_read = offset_bytes
return True
offset_bytes = self.index.offset[self.next_index_elem]
self.current_message_index = self.index.message_index[self.next_index_elem]
self.next_index_elem += 1
self.input_file.seek(offset_bytes, os.SEEK_SET)
self.total_bytes_read = offset_bytes
return True

def _print_progress(self, file_size=None):
show_progress = self.show_progress
@@ -664,7 +563,4 @@ def __next__(self):

@classmethod
def generate_index_file(cls, input_file):
reader = MixedLogReader(input_file=input_file, ignore_index=False, generate_index=True, return_payload=False)
if reader.index is None:
for _ in reader:
pass
return fast_indexer.fast_generate_index(input_file)
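Lastly, a small sketch of the simplified entry point above: generate_index_file() now delegates directly to the fast indexer and returns the resulting FileIndex instead of iterating the whole log through a FileIndexBuilder. The path is hypothetical.

from fusion_engine_client.parsers.mixed_log_reader import MixedLogReader

# Equivalent to calling fast_indexer.fast_generate_index() on the path; with the default
# arguments, the generated index is also saved alongside the log.
index = MixedLogReader.generate_index_file('/path/to/example_log.p1log')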
