Skip to content

Commit

Permalink
Add more fields to Python IMU example, and add wheel speed one.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonathan Diamond committed Sep 29, 2023
1 parent ebdd627 commit 2ee2db5
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 39 deletions.
38 changes: 28 additions & 10 deletions python/examples/extract_imu_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,37 @@

# Read satellite data from the file.
reader = DataLoader(input_path)
result = reader.read(message_types=[IMUOutput], show_progress=True)
result = reader.read(message_types=[IMUOutput, RawIMUOutput], show_progress=True)
imu_data = result[IMUOutput.MESSAGE_TYPE]
if len(imu_data.messages) == 0:
raw_imu_data = result[RawIMUOutput.MESSAGE_TYPE]
if len(imu_data.messages) == 0 and len(raw_imu_data.messages) == 0:
logger.warning('No IMU data found in log file.')
sys.exit(2)

# Generate a CSV file.
path = os.path.join(output_dir, 'imu_data.csv')
logger.info("Generating '%s'." % path)
with open(path, 'w') as f:
f.write('P1 Time (sec), Accel X (m/s^2), Y, Z, Gyro X (rad/s), Y, Z\n')
for message in imu_data.messages:
f.write('%.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %.6f\n' %
(float(message.p1_time), *message.accel_mps2, *message.gyro_rps))
# Generate a CSV file for corrected IMU data.
if len(imu_data.messages) != 0:
path = os.path.join(output_dir, 'imu_data.csv')
logger.info("Generating '%s'." % path)
with open(path, 'w') as f:
f.write('P1 Time (sec), GPS Time (sec), Accel X (m/s^2), Y, Z, Gyro X (rad/s), Y, Z\n')
for message in imu_data.messages:
gps_time = reader.convert_to_gps_time(message.p1_time)
f.write('%.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %.6f\n' %
(float(message.p1_time), float(gps_time), *message.accel_mps2, *message.gyro_rps))
else:
logger.info("No corrected IMU data.")

# Generate a CSV file for raw IMU data.
if len(raw_imu_data.messages) != 0:
path = os.path.join(output_dir, 'raw_imu_data.csv')
logger.info("Generating '%s'." % path)
with open(path, 'w') as f:
f.write('P1 Time (sec), GPS Time (sec), Accel X (m/s^2), Y, Z, Gyro X (rad/s), Y, Z, Temp(C)\n')
for message in raw_imu_data.messages:
gps_time = reader.convert_to_gps_time(message.p1_time)
f.write('%.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %.6f\n' %
(float(message.p1_time), float(gps_time), *message.accel_mps2, *message.gyro_rps, message.temperature_degc))
else:
logger.info("No raw IMU data.")

logger.info("Output stored in '%s'." % os.path.abspath(output_dir))
99 changes: 99 additions & 0 deletions python/examples/extract_wheel_speed_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#!/usr/bin/env python3

import os
import sys

# Add the Python root directory (fusion-engine-client/python/) to the import search path to enable FusionEngine imports
# if this application is being run directly out of the repository and is not installed as a pip package.
root_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, root_dir)

from fusion_engine_client.analysis.data_loader import DataLoader
from fusion_engine_client.messages.core import *
from fusion_engine_client.utils import trace as logging
from fusion_engine_client.utils.log import locate_log, DEFAULT_LOG_BASE_DIR
from fusion_engine_client.utils.argument_parser import ArgumentParser

if __name__ == "__main__":
# Parse arguments.
parser = ArgumentParser(description="""\
Extract wheel speed data.
""")
parser.add_argument('--log-base-dir', metavar='DIR', default=DEFAULT_LOG_BASE_DIR,
help="The base directory containing FusionEngine logs to be searched if a log pattern is "
"specified.")
parser.add_argument('log',
help="The log to be read. May be one of:\n"
"- The path to a .p1log file or a file containing FusionEngine messages and other "
"content\n"
"- The path to a FusionEngine log directory\n"
"- A pattern matching a FusionEngine log directory under the specified base directory "
"(see find_fusion_engine_log() and --log-base-dir)")
options = parser.parse_args()

# Configure logging.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(name)s:%(lineno)d - %(message)s', stream=sys.stdout)
logger = logging.getLogger('point_one.fusion_engine')
logger.setLevel(logging.INFO)

# Locate the input file and set the output directory.
input_path, output_dir, log_id = locate_log(options.log, return_output_dir=True, return_log_id=True,
log_base_dir=options.log_base_dir)
if input_path is None:
sys.exit(1)

if log_id is None:
logger.info('Loading %s.' % os.path.basename(input_path))
else:
logger.info('Loading %s from log %s.' % (os.path.basename(input_path), log_id))

# Read satellite data from the file.
reader = DataLoader(input_path)
result = reader.read(message_types=[WheelSpeedOutput, RawWheelSpeedOutput], show_progress=True)
wheel_speed_data = result[WheelSpeedOutput.MESSAGE_TYPE]
raw_wheel_speed_data = result[RawWheelSpeedOutput.MESSAGE_TYPE]
if len(wheel_speed_data.messages) == 0 and len(raw_wheel_speed_data.messages) == 0:
logger.warning('No wheel speed data found in log file.')
sys.exit(2)

# Generate a CSV file for corrected wheel speed data.
if len(wheel_speed_data.messages) != 0:
path = os.path.join(output_dir, 'wheel_speed_data.csv')
logger.info("Generating '%s'." % path)
with open(path, 'w') as f:
f.write('P1 Time (sec), GPS Time (sec), Front Left Speed (m/s), Front Right Speed (m/s), Back Left Speed (m/s), Back Right Speed (m/s), Gear\n')
for message in wheel_speed_data.messages:
gps_time = reader.convert_to_gps_time(message.p1_time)
f.write(
'%.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %d\n' %
(float(message.p1_time),
float(gps_time),
message.front_left_speed_mps,
message.front_right_speed_mps,
message.back_left_speed_mps,
message.back_right_speed_mps,
message.gear))
else:
logger.info("No corrected wheel speed data.")

# Generate a CSV file for raw wheel speed data.
if len(raw_wheel_speed_data.messages) != 0:
path = os.path.join(output_dir, 'raw_wheel_speed_data.csv')
logger.info("Generating '%s'." % path)
with open(path, 'w') as f:
f.write('P1 Time (sec), GPS Time (sec), Front Left Speed (m/s), Front Right Speed (m/s), Back Left Speed (m/s), Back Right Speed (m/s), Gear\n')
for message in wheel_speed_data.messages:
gps_time = reader.convert_to_gps_time(message.p1_time)
f.write(
'%.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %d\n' %
(float(message.p1_time),
float(gps_time),
message.front_left_speed_mps,
message.front_right_speed_mps,
message.back_left_speed_mps,
message.back_right_speed_mps,
message.gear))
else:
logger.info("No raw wheel speed data.")

logger.info("Output stored in '%s'." % os.path.abspath(output_dir))
120 changes: 91 additions & 29 deletions python/fusion_engine_client/analysis/data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,17 +295,17 @@ def read(self, *args, **kwargs) \
return self._read(*args, **kwargs)

def _read(self,
message_types: Union[Iterable[MessageType], MessageType] = None,
time_range: TimeRange = None,
show_progress: bool = False,
ignore_cache: bool = False, disable_index_generation: bool = False,
max_messages: int = None, max_bytes: int = None,
message_types: Union[Iterable[MessageType], MessageType] = None,
time_range: TimeRange = None,
show_progress: bool = False,
ignore_cache: bool = False, disable_index_generation: bool = False,
max_messages: int = None, max_bytes: int = None,
require_p1_time: bool = False, require_system_time: bool = False,
return_in_order: bool = False, return_bytes: bool = False, return_message_index: bool = False,
return_numpy: bool = False, keep_messages: bool = False, remove_nan_times: bool = True,
time_align: TimeAlignmentMode = TimeAlignmentMode.NONE,
aligned_message_types: Union[list, tuple, set] = None,
quiet: bool = False) \
return_in_order: bool = False, return_bytes: bool = False, return_message_index: bool = False,
return_numpy: bool = False, keep_messages: bool = False, remove_nan_times: bool = True,
time_align: TimeAlignmentMode = TimeAlignmentMode.NONE,
aligned_message_types: Union[list, tuple, set] = None,
quiet: bool = False) \
-> Union[Dict[MessageType, MessageData], MessageData]:
if quiet:
logger = SilentLogger(self.logger.name)
Expand Down Expand Up @@ -462,7 +462,7 @@ def _read(self,
# not system time. The read_next() call below will apply this condition and only return messages with valid
# system time.
if (max_messages is not None and self.reader.have_index() and
not (require_system_time and system_time_messages_requested)):
not (require_system_time and system_time_messages_requested)):
reader_max_messages_applied = True
if max_messages >= 0:
self.reader.filter_in_place(slice(None, max_messages))
Expand Down Expand Up @@ -649,14 +649,15 @@ def get_log_reader(self) -> MixedLogReader:
def get_input_path(self):
return self.reader.input_file.name

def convert_to_p1_time(self,
times: Union[Iterable[Union[datetime, gpstime, Timestamp, float]],
Union[datetime, gpstime, Timestamp, float]],
assume_utc: bool = False) ->\
def _convert_time(self, gps_to_p1,
times: Union[Iterable[Union[datetime, gpstime, Timestamp, float]],
Union[datetime, gpstime, Timestamp, float]],
assume_utc: bool = False) ->\
np.ndarray:
"""!
@brief Convert UTC or GPS timestamps to P1 time.
@brief Convert UTC or GPS timestamps to P1 time or Convert UTC or P1 timestamps to GPS time.
@param gps_to_p1 If `True`, convert to P1 time. If `False`, convert to GPS time.
@param times A list of one or more timestamps to be converted, using any of the following formats:
- `datetime` - A UTC or local timezone date and time
- `gpstime` - A GPS timestamp
Expand All @@ -670,7 +671,7 @@ def convert_to_p1_time(self,
- For `datetime`, if `tzinfo` is not set, assume it is `timezone.utc`. Otherwise, interpret the timestamp
in the local timezone.
@return A numpy array containing P1 time values (in seconds), or `nan` if the value could not be converted.
@return A numpy array containing time values (in seconds), or `nan` if the value could not be converted.
"""
# Load pose messages, which contain the relationship between P1 and GPS time. Omit any NAN values from the
# reference timestamps.
Expand Down Expand Up @@ -700,6 +701,7 @@ def convert_to_p1_time(self,

# First, convert all UTC times and all timestamp objects to GPS time or P1 values in seconds.
timezone_warn_issued = False

def _to_gps_or_p1(value):
nonlocal timezone_warn_issued
if isinstance(value, gpstime):
Expand Down Expand Up @@ -739,24 +741,84 @@ def _to_gps_or_p1(value):
time_sec = np.array((_to_gps_or_p1(times),))

# Now, find all values that are GPS time (i.e., big enough that we assume they're not P1 times already) and
# convert them to P1 time.
gps_idx = is_gps_time(time_sec)
if np.any(gps_idx):
if p1_ref_sec is None:
time_sec[gps_idx] = np.nan
else:
# The relationship between P1 time and GPS time should not change rapidly since P1 time is rate-steered
# to align with GPS time. As a result, it should be safe to extrapolate between gaps in P1 or GPS times,
# as long as the gaps aren't extremely large. NumPy's interp() function does not extrapolate, so we
# instead use SciPy's function.
f = sp.interpolate.interp1d(gps_ref_sec, p1_ref_sec, fill_value='extrapolate')
time_sec[gps_idx] = f(time_sec[gps_idx])
# convert them to P1 time or vice versa.
if gps_to_p1:
gps_idx = is_gps_time(time_sec)
if np.any(gps_idx):
if p1_ref_sec is None:
time_sec[gps_idx] = np.nan
else:
# The relationship between P1 time and GPS time should not change rapidly since P1 time is rate-steered
# to align with GPS time. As a result, it should be safe to extrapolate between gaps in P1 or GPS times,
# as long as the gaps aren't extremely large. NumPy's interp() function does not extrapolate, so we
# instead use SciPy's function.
f = sp.interpolate.interp1d(gps_ref_sec, p1_ref_sec, fill_value='extrapolate')
time_sec[gps_idx] = f(time_sec[gps_idx])
else:
p1_idx = not is_gps_time(time_sec)
if np.any(p1_idx):
if p1_idx is None:
time_sec[p1_idx] = np.nan
else:
# See comment on sp.interpolate.interp1d above.
f = sp.interpolate.interp1d(p1_ref_sec, gps_ref_sec, fill_value='extrapolate')
time_sec[p1_idx] = f(time_sec[p1_idx])

if return_scalar:
return time_sec[0]
else:
return time_sec

def convert_to_p1_time(self,
times: Union[Iterable[Union[datetime, gpstime, Timestamp, float]],
Union[datetime, gpstime, Timestamp, float]],
assume_utc: bool = False) ->\
np.ndarray:
"""!
@brief Convert UTC or GPS timestamps to P1 time.
@param times A list of one or more timestamps to be converted, using any of the following formats:
- `datetime` - A UTC or local timezone date and time
- `gpstime` - A GPS timestamp
- A @ref fusion_engine_client.messages.timestamps.Timestamp containing GPS time or P1 time
- A @ref fusion_engine_client.messages.timestamps.MeasurementDetails containing GPS time or P1 time
- `float` - A GPS or P1 time value (in seconds)
- Note that UTC timestamps cannot be specified `float` unless `assume_utc == True`
@param assume_utc If `True`:
- For `float` values, assume values greater than the POSIX offset for 2000/1/1 are UTC timestamps in
seconds.
- For `datetime`, if `tzinfo` is not set, assume it is `timezone.utc`. Otherwise, interpret the timestamp
in the local timezone.
@return A numpy array containing P1 time values (in seconds), or `nan` if the value could not be converted.
"""
return self._convert_time(True, times, assume_utc)

def convert_to_gps_time(self,
times: Union[Iterable[Union[datetime, gpstime, Timestamp, float]],
Union[datetime, gpstime, Timestamp, float]],
assume_utc: bool = False) ->\
np.ndarray:
"""!
@brief Convert UTC or P1 timestamps to GPS time.
@param times A list of one or more timestamps to be converted, using any of the following formats:
- `datetime` - A UTC or local timezone date and time
- `gpstime` - A GPS timestamp
- A @ref fusion_engine_client.messages.timestamps.Timestamp containing GPS time or P1 time
- A @ref fusion_engine_client.messages.timestamps.MeasurementDetails containing GPS time or P1 time
- `float` - A GPS or P1 time value (in seconds)
- Note that UTC timestamps cannot be specified `float` unless `assume_utc == True`
@param assume_utc If `True`:
- For `float` values, assume values greater than the POSIX offset for 2000/1/1 are UTC timestamps in
seconds.
- For `datetime`, if `tzinfo` is not set, assume it is `timezone.utc`. Otherwise, interpret the timestamp
in the local timezone.
@return A numpy array containing GPS time values (in seconds), or `nan` if the value could not be converted.
"""
return self._convert_time(False, times, assume_utc)

@classmethod
def time_align_data(cls, data: dict, mode: TimeAlignmentMode = TimeAlignmentMode.INSERT,
message_types: Union[list, tuple, set] = None):
Expand Down

0 comments on commit 2ee2db5

Please sign in to comment.