Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more fields to Python IMU example, and add wheel speed one. #266

Merged
merged 2 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions python/examples/extract_imu_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,37 @@

# Read satellite data from the file.
reader = DataLoader(input_path)
result = reader.read(message_types=[IMUOutput], show_progress=True)
result = reader.read(message_types=[IMUOutput, RawIMUOutput], show_progress=True)
imu_data = result[IMUOutput.MESSAGE_TYPE]
if len(imu_data.messages) == 0:
raw_imu_data = result[RawIMUOutput.MESSAGE_TYPE]
if len(imu_data.messages) == 0 and len(raw_imu_data.messages) == 0:
logger.warning('No IMU data found in log file.')
sys.exit(2)

# Generate a CSV file.
path = os.path.join(output_dir, 'imu_data.csv')
logger.info("Generating '%s'." % path)
with open(path, 'w') as f:
f.write('P1 Time (sec), Accel X (m/s^2), Y, Z, Gyro X (rad/s), Y, Z\n')
for message in imu_data.messages:
f.write('%.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %.6f\n' %
(float(message.p1_time), *message.accel_mps2, *message.gyro_rps))
# Generate a CSV file for corrected IMU data.
if len(imu_data.messages) != 0:
path = os.path.join(output_dir, 'imu_data.csv')
logger.info("Generating '%s'." % path)
with open(path, 'w') as f:
f.write('P1 Time (sec), GPS Time (sec), Accel X (m/s^2), Y, Z, Gyro X (rad/s), Y, Z\n')
for message in imu_data.messages:
gps_time = reader.convert_to_gps_time(message.p1_time)
f.write('%.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %.6f\n' %
(float(message.p1_time), float(gps_time), *message.accel_mps2, *message.gyro_rps))
else:
logger.info("No corrected IMU data.")

# Generate a CSV file for raw IMU data.
if len(raw_imu_data.messages) != 0:
path = os.path.join(output_dir, 'raw_imu_data.csv')
logger.info("Generating '%s'." % path)
with open(path, 'w') as f:
f.write('P1 Time (sec), GPS Time (sec), Accel X (m/s^2), Y, Z, Gyro X (rad/s), Y, Z, Temp(C)\n')
for message in raw_imu_data.messages:
gps_time = reader.convert_to_gps_time(message.p1_time)
f.write('%.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %.6f\n' %
(float(message.p1_time), float(gps_time), *message.accel_mps2, *message.gyro_rps, message.temperature_degc))
else:
logger.info("No raw IMU data.")

logger.info("Output stored in '%s'." % os.path.abspath(output_dir))
99 changes: 99 additions & 0 deletions python/examples/extract_wheel_speed_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#!/usr/bin/env python3

import os
import sys

# Add the Python root directory (fusion-engine-client/python/) to the import search path to enable FusionEngine imports
# if this application is being run directly out of the repository and is not installed as a pip package.
root_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, root_dir)

from fusion_engine_client.analysis.data_loader import DataLoader
from fusion_engine_client.messages.core import *
from fusion_engine_client.utils import trace as logging
from fusion_engine_client.utils.log import locate_log, DEFAULT_LOG_BASE_DIR
from fusion_engine_client.utils.argument_parser import ArgumentParser

if __name__ == "__main__":
# Parse arguments.
parser = ArgumentParser(description="""\
Extract wheel speed data.
""")
parser.add_argument('--log-base-dir', metavar='DIR', default=DEFAULT_LOG_BASE_DIR,
help="The base directory containing FusionEngine logs to be searched if a log pattern is "
"specified.")
parser.add_argument('log',
help="The log to be read. May be one of:\n"
"- The path to a .p1log file or a file containing FusionEngine messages and other "
"content\n"
"- The path to a FusionEngine log directory\n"
"- A pattern matching a FusionEngine log directory under the specified base directory "
"(see find_fusion_engine_log() and --log-base-dir)")
options = parser.parse_args()

# Configure logging.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(name)s:%(lineno)d - %(message)s', stream=sys.stdout)
logger = logging.getLogger('point_one.fusion_engine')
logger.setLevel(logging.INFO)

# Locate the input file and set the output directory.
input_path, output_dir, log_id = locate_log(options.log, return_output_dir=True, return_log_id=True,
log_base_dir=options.log_base_dir)
if input_path is None:
sys.exit(1)

if log_id is None:
logger.info('Loading %s.' % os.path.basename(input_path))
else:
logger.info('Loading %s from log %s.' % (os.path.basename(input_path), log_id))

# Read satellite data from the file.
reader = DataLoader(input_path)
result = reader.read(message_types=[WheelSpeedOutput, RawWheelSpeedOutput], show_progress=True)
axlan marked this conversation as resolved.
Show resolved Hide resolved
wheel_speed_data = result[WheelSpeedOutput.MESSAGE_TYPE]
raw_wheel_speed_data = result[RawWheelSpeedOutput.MESSAGE_TYPE]
if len(wheel_speed_data.messages) == 0 and len(raw_wheel_speed_data.messages) == 0:
logger.warning('No wheel speed data found in log file.')
sys.exit(2)

# Generate a CSV file for corrected wheel speed data.
if len(wheel_speed_data.messages) != 0:
path = os.path.join(output_dir, 'wheel_speed_data.csv')
logger.info("Generating '%s'." % path)
with open(path, 'w') as f:
f.write('P1 Time (sec), GPS Time (sec), Front Left Speed (m/s), Front Right Speed (m/s), Back Left Speed (m/s), Back Right Speed (m/s), Gear\n')
for message in wheel_speed_data.messages:
gps_time = reader.convert_to_gps_time(message.p1_time)
f.write(
'%.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %d\n' %
(float(message.p1_time),
float(gps_time),
message.front_left_speed_mps,
message.front_right_speed_mps,
message.back_left_speed_mps,
message.back_right_speed_mps,
message.gear))
else:
logger.info("No corrected wheel speed data.")

# Generate a CSV file for raw wheel speed data.
if len(raw_wheel_speed_data.messages) != 0:
path = os.path.join(output_dir, 'raw_wheel_speed_data.csv')
logger.info("Generating '%s'." % path)
with open(path, 'w') as f:
f.write('P1 Time (sec), GPS Time (sec), Front Left Speed (m/s), Front Right Speed (m/s), Back Left Speed (m/s), Back Right Speed (m/s), Gear\n')
for message in wheel_speed_data.messages:
gps_time = reader.convert_to_gps_time(message.p1_time)
f.write(
'%.6f, %.6f, %.6f, %.6f, %.6f, %.6f, %d\n' %
(float(message.p1_time),
float(gps_time),
message.front_left_speed_mps,
message.front_right_speed_mps,
message.back_left_speed_mps,
message.back_right_speed_mps,
message.gear))
else:
logger.info("No raw wheel speed data.")

logger.info("Output stored in '%s'." % os.path.abspath(output_dir))
120 changes: 91 additions & 29 deletions python/fusion_engine_client/analysis/data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,17 +295,17 @@ def read(self, *args, **kwargs) \
return self._read(*args, **kwargs)

def _read(self,
message_types: Union[Iterable[MessageType], MessageType] = None,
time_range: TimeRange = None,
show_progress: bool = False,
ignore_cache: bool = False, disable_index_generation: bool = False,
max_messages: int = None, max_bytes: int = None,
message_types: Union[Iterable[MessageType], MessageType] = None,
time_range: TimeRange = None,
show_progress: bool = False,
ignore_cache: bool = False, disable_index_generation: bool = False,
max_messages: int = None, max_bytes: int = None,
require_p1_time: bool = False, require_system_time: bool = False,
return_in_order: bool = False, return_bytes: bool = False, return_message_index: bool = False,
return_numpy: bool = False, keep_messages: bool = False, remove_nan_times: bool = True,
time_align: TimeAlignmentMode = TimeAlignmentMode.NONE,
aligned_message_types: Union[list, tuple, set] = None,
quiet: bool = False) \
return_in_order: bool = False, return_bytes: bool = False, return_message_index: bool = False,
return_numpy: bool = False, keep_messages: bool = False, remove_nan_times: bool = True,
time_align: TimeAlignmentMode = TimeAlignmentMode.NONE,
aligned_message_types: Union[list, tuple, set] = None,
quiet: bool = False) \
-> Union[Dict[MessageType, MessageData], MessageData]:
if quiet:
logger = SilentLogger(self.logger.name)
Expand Down Expand Up @@ -462,7 +462,7 @@ def _read(self,
# not system time. The read_next() call below will apply this condition and only return messages with valid
# system time.
if (max_messages is not None and self.reader.have_index() and
not (require_system_time and system_time_messages_requested)):
not (require_system_time and system_time_messages_requested)):
reader_max_messages_applied = True
if max_messages >= 0:
self.reader.filter_in_place(slice(None, max_messages))
Expand Down Expand Up @@ -649,14 +649,15 @@ def get_log_reader(self) -> MixedLogReader:
def get_input_path(self):
return self.reader.input_file.name

def convert_to_p1_time(self,
times: Union[Iterable[Union[datetime, gpstime, Timestamp, float]],
Union[datetime, gpstime, Timestamp, float]],
assume_utc: bool = False) ->\
def _convert_time(self, gps_to_p1,
axlan marked this conversation as resolved.
Show resolved Hide resolved
times: Union[Iterable[Union[datetime, gpstime, Timestamp, float]],
Union[datetime, gpstime, Timestamp, float]],
assume_utc: bool = False) ->\
np.ndarray:
"""!
@brief Convert UTC or GPS timestamps to P1 time.
@brief Convert UTC or GPS timestamps to P1 time or Convert UTC or P1 timestamps to GPS time.

@param gps_to_p1 If `True`, convert to P1 time. If `False`, convert to GPS time.
@param times A list of one or more timestamps to be converted, using any of the following formats:
- `datetime` - A UTC or local timezone date and time
- `gpstime` - A GPS timestamp
Expand All @@ -670,7 +671,7 @@ def convert_to_p1_time(self,
- For `datetime`, if `tzinfo` is not set, assume it is `timezone.utc`. Otherwise, interpret the timestamp
in the local timezone.

@return A numpy array containing P1 time values (in seconds), or `nan` if the value could not be converted.
@return A numpy array containing time values (in seconds), or `nan` if the value could not be converted.
"""
# Load pose messages, which contain the relationship between P1 and GPS time. Omit any NAN values from the
# reference timestamps.
Expand Down Expand Up @@ -700,6 +701,7 @@ def convert_to_p1_time(self,

# First, convert all UTC times and all timestamp objects to GPS time or P1 values in seconds.
timezone_warn_issued = False

def _to_gps_or_p1(value):
nonlocal timezone_warn_issued
if isinstance(value, gpstime):
Expand Down Expand Up @@ -739,24 +741,84 @@ def _to_gps_or_p1(value):
time_sec = np.array((_to_gps_or_p1(times),))

# Now, find all values that are GPS time (i.e., big enough that we assume they're not P1 times already) and
# convert them to P1 time.
gps_idx = is_gps_time(time_sec)
if np.any(gps_idx):
if p1_ref_sec is None:
time_sec[gps_idx] = np.nan
else:
# The relationship between P1 time and GPS time should not change rapidly since P1 time is rate-steered
# to align with GPS time. As a result, it should be safe to extrapolate between gaps in P1 or GPS times,
# as long as the gaps aren't extremely large. NumPy's interp() function does not extrapolate, so we
# instead use SciPy's function.
f = sp.interpolate.interp1d(gps_ref_sec, p1_ref_sec, fill_value='extrapolate')
time_sec[gps_idx] = f(time_sec[gps_idx])
# convert them to P1 time or vice versa.
if gps_to_p1:
gps_idx = is_gps_time(time_sec)
if np.any(gps_idx):
if p1_ref_sec is None:
time_sec[gps_idx] = np.nan
else:
# The relationship between P1 time and GPS time should not change rapidly since P1 time is rate-steered
# to align with GPS time. As a result, it should be safe to extrapolate between gaps in P1 or GPS times,
# as long as the gaps aren't extremely large. NumPy's interp() function does not extrapolate, so we
# instead use SciPy's function.
f = sp.interpolate.interp1d(gps_ref_sec, p1_ref_sec, fill_value='extrapolate')
time_sec[gps_idx] = f(time_sec[gps_idx])
else:
p1_idx = not is_gps_time(time_sec)
axlan marked this conversation as resolved.
Show resolved Hide resolved
if np.any(p1_idx):
if p1_idx is None:
time_sec[p1_idx] = np.nan
else:
# See comment on sp.interpolate.interp1d above.
f = sp.interpolate.interp1d(p1_ref_sec, gps_ref_sec, fill_value='extrapolate')
time_sec[p1_idx] = f(time_sec[p1_idx])

if return_scalar:
return time_sec[0]
else:
return time_sec

def convert_to_p1_time(self,
times: Union[Iterable[Union[datetime, gpstime, Timestamp, float]],
Union[datetime, gpstime, Timestamp, float]],
assume_utc: bool = False) ->\
np.ndarray:
"""!
@brief Convert UTC or GPS timestamps to P1 time.

@param times A list of one or more timestamps to be converted, using any of the following formats:
- `datetime` - A UTC or local timezone date and time
- `gpstime` - A GPS timestamp
- A @ref fusion_engine_client.messages.timestamps.Timestamp containing GPS time or P1 time
- A @ref fusion_engine_client.messages.timestamps.MeasurementDetails containing GPS time or P1 time
- `float` - A GPS or P1 time value (in seconds)
- Note that UTC timestamps cannot be specified `float` unless `assume_utc == True`
@param assume_utc If `True`:
- For `float` values, assume values greater than the POSIX offset for 2000/1/1 are UTC timestamps in
seconds.
- For `datetime`, if `tzinfo` is not set, assume it is `timezone.utc`. Otherwise, interpret the timestamp
in the local timezone.

@return A numpy array containing P1 time values (in seconds), or `nan` if the value could not be converted.
"""
return self._convert_time(True, times, assume_utc)
axlan marked this conversation as resolved.
Show resolved Hide resolved

def convert_to_gps_time(self,
times: Union[Iterable[Union[datetime, gpstime, Timestamp, float]],
Union[datetime, gpstime, Timestamp, float]],
assume_utc: bool = False) ->\
np.ndarray:
"""!
@brief Convert UTC or P1 timestamps to GPS time.

@param times A list of one or more timestamps to be converted, using any of the following formats:
- `datetime` - A UTC or local timezone date and time
- `gpstime` - A GPS timestamp
- A @ref fusion_engine_client.messages.timestamps.Timestamp containing GPS time or P1 time
- A @ref fusion_engine_client.messages.timestamps.MeasurementDetails containing GPS time or P1 time
- `float` - A GPS or P1 time value (in seconds)
- Note that UTC timestamps cannot be specified `float` unless `assume_utc == True`
@param assume_utc If `True`:
- For `float` values, assume values greater than the POSIX offset for 2000/1/1 are UTC timestamps in
seconds.
- For `datetime`, if `tzinfo` is not set, assume it is `timezone.utc`. Otherwise, interpret the timestamp
in the local timezone.

@return A numpy array containing GPS time values (in seconds), or `nan` if the value could not be converted.
"""
return self._convert_time(False, times, assume_utc)

@classmethod
def time_align_data(cls, data: dict, mode: TimeAlignmentMode = TimeAlignmentMode.INSERT,
message_types: Union[list, tuple, set] = None):
Expand Down
Loading