From 2ee2db52bb43edd34b74a9cb34b4e87e20fca57e Mon Sep 17 00:00:00 2001 From: Jonathan Diamond Date: Fri, 29 Sep 2023 15:10:37 -0700 Subject: [PATCH 1/2] Add more fields to Python IMU example, and add wheel speed one. --- python/examples/extract_imu_data.py | 38 ++++-- python/examples/extract_wheel_speed_data.py | 99 +++++++++++++++ .../analysis/data_loader.py | 120 +++++++++++++----- 3 files changed, 218 insertions(+), 39 deletions(-) create mode 100755 python/examples/extract_wheel_speed_data.py diff --git a/python/examples/extract_imu_data.py b/python/examples/extract_imu_data.py index 8ac9f868..fb4f591e 100755 --- a/python/examples/extract_imu_data.py +++ b/python/examples/extract_imu_data.py @@ -49,19 +49,37 @@ # Read satellite data from the file. reader = DataLoader(input_path) - result = reader.read(message_types=[IMUOutput], show_progress=True) + result = reader.read(message_types=[IMUOutput, RawIMUOutput], show_progress=True) imu_data = result[IMUOutput.MESSAGE_TYPE] - if len(imu_data.messages) == 0: + raw_imu_data = result[RawIMUOutput.MESSAGE_TYPE] + if len(imu_data.messages) == 0 and len(raw_imu_data.messages) == 0: logger.warning('No IMU data found in log file.') sys.exit(2) - # Generate a CSV file. - path = os.path.join(output_dir, 'imu_data.csv') - logger.info("Generating '%s'." % path) - with open(path, 'w') as f: - f.write('P1 Time (sec), Accel X (m/s^2), Y, Z, Gyro X (rad/s), Y, Z\n') - for message in imu_data.messages: - f.write('%.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %.6f\n' % - (float(message.p1_time), *message.accel_mps2, *message.gyro_rps)) + # Generate a CSV file for corrected IMU data. + if len(imu_data.messages) != 0: + path = os.path.join(output_dir, 'imu_data.csv') + logger.info("Generating '%s'." % path) + with open(path, 'w') as f: + f.write('P1 Time (sec), GPS Time (sec), Accel X (m/s^2), Y, Z, Gyro X (rad/s), Y, Z\n') + for message in imu_data.messages: + gps_time = reader.convert_to_gps_time(message.p1_time) + f.write('%.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %.6f\n' % + (float(message.p1_time), float(gps_time), *message.accel_mps2, *message.gyro_rps)) + else: + logger.info("No corrected IMU data.") + + # Generate a CSV file for raw IMU data. + if len(raw_imu_data.messages) != 0: + path = os.path.join(output_dir, 'raw_imu_data.csv') + logger.info("Generating '%s'." % path) + with open(path, 'w') as f: + f.write('P1 Time (sec), GPS Time (sec), Accel X (m/s^2), Y, Z, Gyro X (rad/s), Y, Z, Temp(C)\n') + for message in raw_imu_data.messages: + gps_time = reader.convert_to_gps_time(message.p1_time) + f.write('%.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %.6f\n' % + (float(message.p1_time), float(gps_time), *message.accel_mps2, *message.gyro_rps, message.temperature_degc)) + else: + logger.info("No raw IMU data.") logger.info("Output stored in '%s'." % os.path.abspath(output_dir)) diff --git a/python/examples/extract_wheel_speed_data.py b/python/examples/extract_wheel_speed_data.py new file mode 100755 index 00000000..c13a77e9 --- /dev/null +++ b/python/examples/extract_wheel_speed_data.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 + +import os +import sys + +# Add the Python root directory (fusion-engine-client/python/) to the import search path to enable FusionEngine imports +# if this application is being run directly out of the repository and is not installed as a pip package. +root_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..')) +sys.path.insert(0, root_dir) + +from fusion_engine_client.analysis.data_loader import DataLoader +from fusion_engine_client.messages.core import * +from fusion_engine_client.utils import trace as logging +from fusion_engine_client.utils.log import locate_log, DEFAULT_LOG_BASE_DIR +from fusion_engine_client.utils.argument_parser import ArgumentParser + +if __name__ == "__main__": + # Parse arguments. + parser = ArgumentParser(description="""\ +Extract wheel speed data. +""") + parser.add_argument('--log-base-dir', metavar='DIR', default=DEFAULT_LOG_BASE_DIR, + help="The base directory containing FusionEngine logs to be searched if a log pattern is " + "specified.") + parser.add_argument('log', + help="The log to be read. May be one of:\n" + "- The path to a .p1log file or a file containing FusionEngine messages and other " + "content\n" + "- The path to a FusionEngine log directory\n" + "- A pattern matching a FusionEngine log directory under the specified base directory " + "(see find_fusion_engine_log() and --log-base-dir)") + options = parser.parse_args() + + # Configure logging. + logging.basicConfig(format='%(asctime)s - %(levelname)s - %(name)s:%(lineno)d - %(message)s', stream=sys.stdout) + logger = logging.getLogger('point_one.fusion_engine') + logger.setLevel(logging.INFO) + + # Locate the input file and set the output directory. + input_path, output_dir, log_id = locate_log(options.log, return_output_dir=True, return_log_id=True, + log_base_dir=options.log_base_dir) + if input_path is None: + sys.exit(1) + + if log_id is None: + logger.info('Loading %s.' % os.path.basename(input_path)) + else: + logger.info('Loading %s from log %s.' % (os.path.basename(input_path), log_id)) + + # Read satellite data from the file. + reader = DataLoader(input_path) + result = reader.read(message_types=[WheelSpeedOutput, RawWheelSpeedOutput], show_progress=True) + wheel_speed_data = result[WheelSpeedOutput.MESSAGE_TYPE] + raw_wheel_speed_data = result[RawWheelSpeedOutput.MESSAGE_TYPE] + if len(wheel_speed_data.messages) == 0 and len(raw_wheel_speed_data.messages) == 0: + logger.warning('No wheel speed data found in log file.') + sys.exit(2) + + # Generate a CSV file for corrected wheel speed data. + if len(wheel_speed_data.messages) != 0: + path = os.path.join(output_dir, 'wheel_speed_data.csv') + logger.info("Generating '%s'." % path) + with open(path, 'w') as f: + f.write('P1 Time (sec), GPS Time (sec), Front Left Speed (m/s), Front Right Speed (m/s), Back Left Speed (m/s), Back Right Speed (m/s), Gear\n') + for message in wheel_speed_data.messages: + gps_time = reader.convert_to_gps_time(message.p1_time) + f.write( + '%.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %d\n' % + (float(message.p1_time), + float(gps_time), + message.front_left_speed_mps, + message.front_right_speed_mps, + message.back_left_speed_mps, + message.back_right_speed_mps, + message.gear)) + else: + logger.info("No corrected wheel speed data.") + + # Generate a CSV file for raw wheel speed data. + if len(raw_wheel_speed_data.messages) != 0: + path = os.path.join(output_dir, 'raw_wheel_speed_data.csv') + logger.info("Generating '%s'." % path) + with open(path, 'w') as f: + f.write('P1 Time (sec), GPS Time (sec), Front Left Speed (m/s), Front Right Speed (m/s), Back Left Speed (m/s), Back Right Speed (m/s), Gear\n') + for message in wheel_speed_data.messages: + gps_time = reader.convert_to_gps_time(message.p1_time) + f.write( + '%.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %d\n' % + (float(message.p1_time), + float(gps_time), + message.front_left_speed_mps, + message.front_right_speed_mps, + message.back_left_speed_mps, + message.back_right_speed_mps, + message.gear)) + else: + logger.info("No raw wheel speed data.") + + logger.info("Output stored in '%s'." % os.path.abspath(output_dir)) diff --git a/python/fusion_engine_client/analysis/data_loader.py b/python/fusion_engine_client/analysis/data_loader.py index ceb92a4d..c9b116e0 100644 --- a/python/fusion_engine_client/analysis/data_loader.py +++ b/python/fusion_engine_client/analysis/data_loader.py @@ -295,17 +295,17 @@ def read(self, *args, **kwargs) \ return self._read(*args, **kwargs) def _read(self, - message_types: Union[Iterable[MessageType], MessageType] = None, - time_range: TimeRange = None, - show_progress: bool = False, - ignore_cache: bool = False, disable_index_generation: bool = False, - max_messages: int = None, max_bytes: int = None, + message_types: Union[Iterable[MessageType], MessageType] = None, + time_range: TimeRange = None, + show_progress: bool = False, + ignore_cache: bool = False, disable_index_generation: bool = False, + max_messages: int = None, max_bytes: int = None, require_p1_time: bool = False, require_system_time: bool = False, - return_in_order: bool = False, return_bytes: bool = False, return_message_index: bool = False, - return_numpy: bool = False, keep_messages: bool = False, remove_nan_times: bool = True, - time_align: TimeAlignmentMode = TimeAlignmentMode.NONE, - aligned_message_types: Union[list, tuple, set] = None, - quiet: bool = False) \ + return_in_order: bool = False, return_bytes: bool = False, return_message_index: bool = False, + return_numpy: bool = False, keep_messages: bool = False, remove_nan_times: bool = True, + time_align: TimeAlignmentMode = TimeAlignmentMode.NONE, + aligned_message_types: Union[list, tuple, set] = None, + quiet: bool = False) \ -> Union[Dict[MessageType, MessageData], MessageData]: if quiet: logger = SilentLogger(self.logger.name) @@ -462,7 +462,7 @@ def _read(self, # not system time. The read_next() call below will apply this condition and only return messages with valid # system time. if (max_messages is not None and self.reader.have_index() and - not (require_system_time and system_time_messages_requested)): + not (require_system_time and system_time_messages_requested)): reader_max_messages_applied = True if max_messages >= 0: self.reader.filter_in_place(slice(None, max_messages)) @@ -649,14 +649,15 @@ def get_log_reader(self) -> MixedLogReader: def get_input_path(self): return self.reader.input_file.name - def convert_to_p1_time(self, - times: Union[Iterable[Union[datetime, gpstime, Timestamp, float]], - Union[datetime, gpstime, Timestamp, float]], - assume_utc: bool = False) ->\ + def _convert_time(self, gps_to_p1, + times: Union[Iterable[Union[datetime, gpstime, Timestamp, float]], + Union[datetime, gpstime, Timestamp, float]], + assume_utc: bool = False) ->\ np.ndarray: """! - @brief Convert UTC or GPS timestamps to P1 time. + @brief Convert UTC or GPS timestamps to P1 time or Convert UTC or P1 timestamps to GPS time. + @param gps_to_p1 If `True`, convert to P1 time. If `False`, convert to GPS time. @param times A list of one or more timestamps to be converted, using any of the following formats: - `datetime` - A UTC or local timezone date and time - `gpstime` - A GPS timestamp @@ -670,7 +671,7 @@ def convert_to_p1_time(self, - For `datetime`, if `tzinfo` is not set, assume it is `timezone.utc`. Otherwise, interpret the timestamp in the local timezone. - @return A numpy array containing P1 time values (in seconds), or `nan` if the value could not be converted. + @return A numpy array containing time values (in seconds), or `nan` if the value could not be converted. """ # Load pose messages, which contain the relationship between P1 and GPS time. Omit any NAN values from the # reference timestamps. @@ -700,6 +701,7 @@ def convert_to_p1_time(self, # First, convert all UTC times and all timestamp objects to GPS time or P1 values in seconds. timezone_warn_issued = False + def _to_gps_or_p1(value): nonlocal timezone_warn_issued if isinstance(value, gpstime): @@ -739,24 +741,84 @@ def _to_gps_or_p1(value): time_sec = np.array((_to_gps_or_p1(times),)) # Now, find all values that are GPS time (i.e., big enough that we assume they're not P1 times already) and - # convert them to P1 time. - gps_idx = is_gps_time(time_sec) - if np.any(gps_idx): - if p1_ref_sec is None: - time_sec[gps_idx] = np.nan - else: - # The relationship between P1 time and GPS time should not change rapidly since P1 time is rate-steered - # to align with GPS time. As a result, it should be safe to extrapolate between gaps in P1 or GPS times, - # as long as the gaps aren't extremely large. NumPy's interp() function does not extrapolate, so we - # instead use SciPy's function. - f = sp.interpolate.interp1d(gps_ref_sec, p1_ref_sec, fill_value='extrapolate') - time_sec[gps_idx] = f(time_sec[gps_idx]) + # convert them to P1 time or vice versa. + if gps_to_p1: + gps_idx = is_gps_time(time_sec) + if np.any(gps_idx): + if p1_ref_sec is None: + time_sec[gps_idx] = np.nan + else: + # The relationship between P1 time and GPS time should not change rapidly since P1 time is rate-steered + # to align with GPS time. As a result, it should be safe to extrapolate between gaps in P1 or GPS times, + # as long as the gaps aren't extremely large. NumPy's interp() function does not extrapolate, so we + # instead use SciPy's function. + f = sp.interpolate.interp1d(gps_ref_sec, p1_ref_sec, fill_value='extrapolate') + time_sec[gps_idx] = f(time_sec[gps_idx]) + else: + p1_idx = not is_gps_time(time_sec) + if np.any(p1_idx): + if p1_idx is None: + time_sec[p1_idx] = np.nan + else: + # See comment on sp.interpolate.interp1d above. + f = sp.interpolate.interp1d(p1_ref_sec, gps_ref_sec, fill_value='extrapolate') + time_sec[p1_idx] = f(time_sec[p1_idx]) if return_scalar: return time_sec[0] else: return time_sec + def convert_to_p1_time(self, + times: Union[Iterable[Union[datetime, gpstime, Timestamp, float]], + Union[datetime, gpstime, Timestamp, float]], + assume_utc: bool = False) ->\ + np.ndarray: + """! + @brief Convert UTC or GPS timestamps to P1 time. + + @param times A list of one or more timestamps to be converted, using any of the following formats: + - `datetime` - A UTC or local timezone date and time + - `gpstime` - A GPS timestamp + - A @ref fusion_engine_client.messages.timestamps.Timestamp containing GPS time or P1 time + - A @ref fusion_engine_client.messages.timestamps.MeasurementDetails containing GPS time or P1 time + - `float` - A GPS or P1 time value (in seconds) + - Note that UTC timestamps cannot be specified `float` unless `assume_utc == True` + @param assume_utc If `True`: + - For `float` values, assume values greater than the POSIX offset for 2000/1/1 are UTC timestamps in + seconds. + - For `datetime`, if `tzinfo` is not set, assume it is `timezone.utc`. Otherwise, interpret the timestamp + in the local timezone. + + @return A numpy array containing P1 time values (in seconds), or `nan` if the value could not be converted. + """ + return self._convert_time(True, times, assume_utc) + + def convert_to_gps_time(self, + times: Union[Iterable[Union[datetime, gpstime, Timestamp, float]], + Union[datetime, gpstime, Timestamp, float]], + assume_utc: bool = False) ->\ + np.ndarray: + """! + @brief Convert UTC or P1 timestamps to GPS time. + + @param times A list of one or more timestamps to be converted, using any of the following formats: + - `datetime` - A UTC or local timezone date and time + - `gpstime` - A GPS timestamp + - A @ref fusion_engine_client.messages.timestamps.Timestamp containing GPS time or P1 time + - A @ref fusion_engine_client.messages.timestamps.MeasurementDetails containing GPS time or P1 time + - `float` - A GPS or P1 time value (in seconds) + - Note that UTC timestamps cannot be specified `float` unless `assume_utc == True` + @param assume_utc If `True`: + - For `float` values, assume values greater than the POSIX offset for 2000/1/1 are UTC timestamps in + seconds. + - For `datetime`, if `tzinfo` is not set, assume it is `timezone.utc`. Otherwise, interpret the timestamp + in the local timezone. + + @return A numpy array containing GPS time values (in seconds), or `nan` if the value could not be converted. + """ + return self._convert_time(False, times, assume_utc) + @classmethod def time_align_data(cls, data: dict, mode: TimeAlignmentMode = TimeAlignmentMode.INSERT, message_types: Union[list, tuple, set] = None): From b7d6cefe4051413a9a1e4f509500e44064c7fe2a Mon Sep 17 00:00:00 2001 From: Jonathan Diamond Date: Mon, 2 Oct 2023 11:00:20 -0700 Subject: [PATCH 2/2] Update from PR comments. --- ..._data.py => extract_vehicle_speed_data.py} | 51 +++++++++++++++++-- .../analysis/data_loader.py | 27 +++++----- 2 files changed, 62 insertions(+), 16 deletions(-) rename python/examples/{extract_wheel_speed_data.py => extract_vehicle_speed_data.py} (70%) diff --git a/python/examples/extract_wheel_speed_data.py b/python/examples/extract_vehicle_speed_data.py similarity index 70% rename from python/examples/extract_wheel_speed_data.py rename to python/examples/extract_vehicle_speed_data.py index c13a77e9..0c7af9e5 100755 --- a/python/examples/extract_wheel_speed_data.py +++ b/python/examples/extract_vehicle_speed_data.py @@ -49,12 +49,21 @@ # Read satellite data from the file. reader = DataLoader(input_path) - result = reader.read(message_types=[WheelSpeedOutput, RawWheelSpeedOutput], show_progress=True) + result = reader.read( + message_types=[ + WheelSpeedOutput, + RawWheelSpeedOutput, + VehicleSpeedOutput, + RawVehicleSpeedOutput], + show_progress=True) + if all(len(d.messages) == 0 for d in result.values()): + logger.warning('No speed data found in log file.') + sys.exit(2) + wheel_speed_data = result[WheelSpeedOutput.MESSAGE_TYPE] raw_wheel_speed_data = result[RawWheelSpeedOutput.MESSAGE_TYPE] - if len(wheel_speed_data.messages) == 0 and len(raw_wheel_speed_data.messages) == 0: - logger.warning('No wheel speed data found in log file.') - sys.exit(2) + vehicle_speed_data = result[WheelSpeedOutput.MESSAGE_TYPE] + raw_vehicle_speed_data = result[RawWheelSpeedOutput.MESSAGE_TYPE] # Generate a CSV file for corrected wheel speed data. if len(wheel_speed_data.messages) != 0: @@ -96,4 +105,38 @@ else: logger.info("No raw wheel speed data.") + # Generate a CSV file for corrected vehicle speed data. + if len(vehicle_speed_data.messages) != 0: + path = os.path.join(output_dir, 'vehicle_speed_data.csv') + logger.info("Generating '%s'." % path) + with open(path, 'w') as f: + f.write('P1 Time (sec), GPS Time (sec), Vehicle Speed (m/s), Gear\n') + for message in vehicle_speed_data.messages: + gps_time = reader.convert_to_gps_time(message.p1_time) + f.write( + '%.6f, %.6f, %.6f, %d\n' % + (float(message.p1_time), + float(gps_time), + message.vehicle_speed_mps, + message.gear)) + else: + logger.info("No corrected vehicle speed data.") + + # Generate a CSV file for raw vehicle speed data. + if len(raw_vehicle_speed_data.messages) != 0: + path = os.path.join(output_dir, 'raw_vehicle_speed_data.csv') + logger.info("Generating '%s'." % path) + with open(path, 'w') as f: + f.write('P1 Time (sec), GPS Time (sec), Vehicle Speed (m/s), Gear\n') + for message in vehicle_speed_data.messages: + gps_time = reader.convert_to_gps_time(message.p1_time) + f.write( + '%.6f, %.6f, %.6f, %d\n' % + (float(message.p1_time), + float(gps_time), + message.vehicle_speed_mps, + message.gear)) + else: + logger.info("No raw vehicle speed data.") + logger.info("Output stored in '%s'." % os.path.abspath(output_dir)) diff --git a/python/fusion_engine_client/analysis/data_loader.py b/python/fusion_engine_client/analysis/data_loader.py index c9b116e0..23d3c25d 100644 --- a/python/fusion_engine_client/analysis/data_loader.py +++ b/python/fusion_engine_client/analysis/data_loader.py @@ -1,10 +1,8 @@ -from typing import Dict, Iterable, Tuple, Union +from enum import Enum, auto +from typing import Dict, Iterable, Union from collections import deque -import copy from datetime import datetime, timezone -import io -import os from gpstime import gpstime, unix2gps import numpy as np @@ -12,7 +10,7 @@ from ..messages import * from ..messages.timestamp import is_gps_time -from ..parsers.file_index import FileIndex, FileIndexBuilder +from ..parsers.file_index import FileIndex from ..parsers.mixed_log_reader import MixedLogReader from ..utils import trace as logging from ..utils.trace import SilentLogger @@ -20,6 +18,11 @@ from ..utils.time_range import TimeRange +class TimeConversionType: + P1_TO_GPS = auto() + GPS_TO_P1 = auto() + + class MessageData(object): def __init__(self, message_type, params): self.message_type = message_type @@ -649,7 +652,7 @@ def get_log_reader(self) -> MixedLogReader: def get_input_path(self): return self.reader.input_file.name - def _convert_time(self, gps_to_p1, + def _convert_time(self, conversion_type: TimeConversionType, times: Union[Iterable[Union[datetime, gpstime, Timestamp, float]], Union[datetime, gpstime, Timestamp, float]], assume_utc: bool = False) ->\ @@ -657,7 +660,7 @@ def _convert_time(self, gps_to_p1, """! @brief Convert UTC or GPS timestamps to P1 time or Convert UTC or P1 timestamps to GPS time. - @param gps_to_p1 If `True`, convert to P1 time. If `False`, convert to GPS time. + @param conversion_type If `GPS_TO_P1`, convert to P1 time. If `P1_TO_GPS`, convert to GPS time. @param times A list of one or more timestamps to be converted, using any of the following formats: - `datetime` - A UTC or local timezone date and time - `gpstime` - A GPS timestamp @@ -742,7 +745,7 @@ def _to_gps_or_p1(value): # Now, find all values that are GPS time (i.e., big enough that we assume they're not P1 times already) and # convert them to P1 time or vice versa. - if gps_to_p1: + if conversion_type == TimeConversionType.GPS_TO_P1: gps_idx = is_gps_time(time_sec) if np.any(gps_idx): if p1_ref_sec is None: @@ -754,8 +757,8 @@ def _to_gps_or_p1(value): # instead use SciPy's function. f = sp.interpolate.interp1d(gps_ref_sec, p1_ref_sec, fill_value='extrapolate') time_sec[gps_idx] = f(time_sec[gps_idx]) - else: - p1_idx = not is_gps_time(time_sec) + elif conversion_type == TimeConversionType.P1_TO_GPS: + p1_idx = np.logical_not(is_gps_time(time_sec)) if np.any(p1_idx): if p1_idx is None: time_sec[p1_idx] = np.nan @@ -792,7 +795,7 @@ def convert_to_p1_time(self, @return A numpy array containing P1 time values (in seconds), or `nan` if the value could not be converted. """ - return self._convert_time(True, times, assume_utc) + return self._convert_time(conversion_type=TimeConversionType.GPS_TO_P1, times=times, assume_utc=assume_utc) def convert_to_gps_time(self, times: Union[Iterable[Union[datetime, gpstime, Timestamp, float]], @@ -817,7 +820,7 @@ def convert_to_gps_time(self, @return A numpy array containing GPS time values (in seconds), or `nan` if the value could not be converted. """ - return self._convert_time(False, times, assume_utc) + return self._convert_time(conversion_type=TimeConversionType.P1_TO_GPS, times=times, assume_utc=assume_utc) @classmethod def time_align_data(cls, data: dict, mode: TimeAlignmentMode = TimeAlignmentMode.INSERT,