Skip to content

Commit

Permalink
Moved version, device ID, and event messages, and cleaned up Doxygen. (
Browse files Browse the repository at this point in the history
…#322)

# Changes
- Moved `VersionInfoMessage`, `DeviceIDMessage`, and `EventNotificationMessage` from `control.h` to `device.h`
- Reorganized `configuration.h` Doxygen to group components and improve navigation
- Removed commented-out arrays from variable-length message structs to avoid confusion
- Minor documentation wording clarifications

# Fixes
- Fixed overlapping pose sources in time scale plot
- Fixed speed estimate on wheel speed plot if multiple pose sources are present
  • Loading branch information
adamshapiro0 authored Jun 14, 2024
2 parents e83c6ed + a840f39 commit d655dc8
Show file tree
Hide file tree
Showing 7 changed files with 1,050 additions and 1,009 deletions.
5 changes: 3 additions & 2 deletions python/fusion_engine_client/analysis/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def plot_time_scale(self):
figure['layout']['yaxis2'].update(title="Interval (sec)", rangemode="tozero")

# Read the pose data to get P1 and GPS timestamps.
result = self.reader.read(message_types=[PoseMessage], source_ids=self.source_ids, **self.params)
result = self.reader.read(message_types=[PoseMessage], source_ids=self.default_source_id, **self.params)
pose_data = result[PoseMessage.MESSAGE_TYPE]

if len(pose_data.p1_time) > 0:
Expand Down Expand Up @@ -1577,7 +1577,8 @@ def _get_time_source(meas_type, data):
# direction. This will also be an absolute value, so may not match the wheel data if it is signed and the
# vehicle is going backward.
else:
result = self.reader.read(message_types=[PoseAuxMessage], source_ids=self.source_ids, **self.params)
result = self.reader.read(message_types=[PoseAuxMessage], source_ids=self.default_source_id,
**self.params)
pose_aux_data = result[PoseAuxMessage.MESSAGE_TYPE]
if len(pose_aux_data.p1_time) != 0:
self.logger.warning('Body forward velocity not available. Estimating |speed| from ENU velocity. '
Expand Down
263 changes: 0 additions & 263 deletions python/fusion_engine_client/messages/control.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import string
import struct
from typing import Sequence

from construct import (Struct, Int64ul, Int16ul, Int8ul, Padding, this, Bytes, PaddedString)

from ..utils.construct_utils import AutoEnum, construct_message_to_string
from ..utils.enum_utils import IntEnum
from .defs import *


Expand Down Expand Up @@ -380,266 +377,6 @@ def _get_known_mask_name(cls, mask) -> str:
return None


class VersionInfoMessage(MessagePayload):
"""!
@brief Software and hardware version information.
"""
MESSAGE_TYPE = MessageType.VERSION_INFO
MESSAGE_VERSION = 0

VersionInfoMessageConstruct = Struct(
"system_time_ns" / Int64ul,
"fw_version_length" / Int8ul,
"engine_version_length" / Int8ul,
"os_version_length" / Int8ul,
"rx_version_length" / Int8ul,
Padding(4),
"fw_version_str" / PaddedString(this.fw_version_length, 'utf8'),
"engine_version_str" / PaddedString(this.engine_version_length, 'utf8'),
"os_version_str" / PaddedString(this.os_version_length, 'utf8'),
"rx_version_str" / PaddedString(this.rx_version_length, 'utf8'),
)

def __init__(self):
self.system_time_ns = 0
self.fw_version_str = ""
self.engine_version_str = ""
self.os_version_str = ""
self.rx_version_str = ""

def pack(self, buffer: bytes = None, offset: int = 0, return_buffer: bool = True) -> (bytes, int):
values = dict(self.__dict__)
values['fw_version_length'] = len(self.fw_version_str)
values['engine_version_length'] = len(self.engine_version_str)
values['os_version_length'] = len(self.os_version_str)
values['rx_version_length'] = len(self.rx_version_str)
packed_data = self.VersionInfoMessageConstruct.build(values)
return PackedDataToBuffer(packed_data, buffer, offset, return_buffer)

def unpack(self, buffer: bytes, offset: int = 0, message_version: int = MessagePayload._UNSPECIFIED_VERSION) -> int:
parsed = self.VersionInfoMessageConstruct.parse(buffer[offset:])
self.__dict__.update(parsed)
return parsed._io.tell()

def __repr__(self):
result = super().__repr__()[:-1]
result += f', fw={self.fw_version_str}, engine={self.engine_version_str}, os={self.os_version_str} ' \
f'rx={self.rx_version_str}]'
return result

def __str__(self):
string = f'Version Info @ %s\n' % system_time_to_str(self.system_time_ns)
string += f' Firmware: {self.fw_version_str}\n'
string += f' FusionEngine: {self.engine_version_str}\n'
string += f' OS: {self.os_version_str}\n'
string += f' GNSS receiver: {self.rx_version_str}'
return string

def calcsize(self) -> int:
return len(self.pack())


class DeviceType(IntEnum):
UNKNOWN = 0
ATLAS = 1
LG69T_AM = 2
LG69T_AP = 3
LG69T_AH = 4
NEXAR_BEAM2K = 5,
SSR_LG69T = 6,
SSR_DESKTOP = 7,


class DeviceIDMessage(MessagePayload):
"""!
@brief Device identifiers.
"""
MESSAGE_TYPE = MessageType.DEVICE_ID
MESSAGE_VERSION = 0
_PRINTABLE_CHARS = bytes(string.printable, 'ascii')

DeviceIDMessageConstruct = Struct(
"system_time_ns" / Int64ul,
"device_type" / AutoEnum(Int8ul, DeviceType),
"hw_id_length" / Int8ul,
"user_id_length" / Int8ul,
"receiver_id_length" / Int8ul,
Padding(4),
"hw_id_data" / Bytes(this.hw_id_length),
"user_id_data" / Bytes(this.user_id_length),
"receiver_id_data" / Bytes(this.receiver_id_length),
)

def __init__(self):
self.system_time_ns = 0
self.device_type = DeviceType.UNKNOWN
self.hw_id_data = b""
self.user_id_data = b""
self.receiver_id_data = b""

def pack(self, buffer: bytes = None, offset: int = 0, return_buffer: bool = True) -> (bytes, int):
values = dict(self.__dict__)
values['hw_id_length'] = len(self.hw_id_length)
values['user_id_length'] = len(self.user_id_length)
values['receiver_id_length'] = len(self.receiver_id_length)
packed_data = self.DeviceIDMessageConstruct.build(values)
return PackedDataToBuffer(packed_data, buffer, offset, return_buffer)

def unpack(self, buffer: bytes, offset: int = 0, message_version: int = MessagePayload._UNSPECIFIED_VERSION) -> int:
parsed = self.DeviceIDMessageConstruct.parse(buffer[offset:])
self.__dict__.update(parsed)
return parsed._io.tell()

@staticmethod
def _get_str(msg: bytes) -> str:
is_printable = all(b in DeviceIDMessage._PRINTABLE_CHARS for b in msg)
if is_printable:
return msg.decode('ascii')
else:
return '[' + ' '.join(f'{b:02X}' for b in msg) + ']'

def __repr__(self):
result = super().__repr__()[:-1]
result += f'type={self.device_type}, hw={self._get_str(self.hw_id_data)}, user={self._get_str(self.user_id_data)},\
rx={self._get_str(self.receiver_id_data)}'
return result

def __str__(self):
string = f'Device ID Info @ %s\n' % system_time_to_str(self.system_time_ns)
string += f' Device Type: {self.device_type}\n'
string += f' HW ID: {self._get_str(self.hw_id_data)}\n'
string += f' User ID: {self._get_str(self.user_id_data)}\n'
string += f' Receiver ID: {self._get_str(self.receiver_id_data)}'
return string

def calcsize(self) -> int:
return len(self.pack())


class EventType(IntEnum):
LOG = 0
RESET = 1
CONFIG_CHANGE = 2
COMMAND = 3
COMMAND_RESPONSE = 4


class EventNotificationMessage(MessagePayload):
"""!
@brief Notification of a system event for logging purposes.
"""
MESSAGE_TYPE = MessageType.EVENT_NOTIFICATION
MESSAGE_VERSION = 0

EventNotificationConstruct = Struct(
"event_type" / AutoEnum(Int8ul, EventType),
Padding(3),
"system_time_ns" / Int64ul,
"event_flags" / Int64ul,
"event_description_len_bytes" / Int16ul,
Padding(2),
"event_description" / Bytes(this.event_description_len_bytes),
)

def __init__(self):
self.event_type = EventType.LOG
self.system_time_ns = 0
self.event_flags = 0
self.event_description = bytes()

def pack(self, buffer: bytes = None, offset: int = 0, return_buffer: bool = True) -> (bytes, int):
values = dict(self.__dict__)
values['event_description_len_bytes'] = len(self.event_description)
if isinstance(self.event_description, str):
values['event_description'] = self.event_description.encode('utf-8')
packed_data = self.EventNotificationConstruct.build(values)
return PackedDataToBuffer(packed_data, buffer, offset, return_buffer)

def unpack(self, buffer: bytes, offset: int = 0, message_version: int = MessagePayload._UNSPECIFIED_VERSION) -> int:
parsed = self.EventNotificationConstruct.parse(buffer[offset:])
self.__dict__.update(parsed)

# For logged FusionEngine commands/responses, the device intentionally offsets the preamble by 0x0101 from
# 0x2E31 ('.1') to 0x2F32 ('/2'). That way, the encapsulated messages within the event messages don't get
# parsed, but we can still identify them. We'll undo that offset here so the content in self.event_description
# reflects the original command/response.
if (self.event_type == EventType.COMMAND or self.event_type == EventType.COMMAND_RESPONSE) and \
len(self.event_description) >= 2 and (self.event_description[:2] == b'/2'):
self.event_description = bytearray(self.event_description)
self.event_description[0] -= 1
self.event_description[1] -= 1

return parsed._io.tell()

def __repr__(self):
result = super().__repr__()[:-1]
result += f', type={self.event_type}, flags=0x{self.event_flags:X}'
if self.event_type == EventType.COMMAND or self.event_type == EventType.COMMAND_RESPONSE:
result += f', data={len(self.event_description)} B'
else:
result += f', description={self.event_description}'
result += ']'
return result

def __str__(self):
return construct_message_to_string(
message=self, construct=self.EventNotificationConstruct,
title=f'Event Notification @ %s' % system_time_to_str(self.system_time_ns),
fields=['event_type', 'event_flags', 'event_description'],
value_to_string={
'event_flags': lambda x: '0x%016X' % x,
'event_description': lambda x: self.event_description_to_string(),
})

def calcsize(self) -> int:
return len(self.pack())

def event_description_to_string(self, max_bytes=None):
# For commands and responses, the payload should contain the binary FusionEngine message. Try to decode the
# message type.
if self.event_type == EventType.COMMAND or self.event_type == EventType.COMMAND_RESPONSE:
if len(self.event_description) >= MessageHeader.calcsize():
header = MessageHeader()
header.unpack(self.event_description, validate_crc=False, warn_on_unrecognized=False)
message_repr = f'[{header.message_type.to_string(include_value=True)}]'

message_cls = MessagePayload.get_message_class(header.message_type)
if message_cls is not None:
try:
message = message_cls()
message.unpack(buffer=self.event_description, offset=header.calcsize())
message_repr = repr(message)
except ValueError as e:
pass
else:
message_repr = '<Malformed>'

return "%s\n%s" % (message_repr,
self._populate_data_byte_string(self.event_description, max_bytes=max_bytes))
elif isinstance(self.event_description, str):
return self.event_description
else:
try:
return self.event_description.decode('utf-8')
except UnicodeDecodeError:
return repr(self.event_description)

@classmethod
def to_numpy(cls, messages: Sequence['EventNotificationMessage']):
result = {
'system_time': np.array([m.system_time_ns * 1e-9 for m in messages]),
'event_type': np.array([int(m.event_type) for m in messages], dtype=int),
'event_flags': np.array([int(m.event_flags) for m in messages], dtype=np.uint64),
}
return result

@classmethod
def _populate_data_byte_string(cls, data: bytes, max_bytes: int = None):
data_truncated = data if max_bytes is None else data[:max_bytes]
suffix = '' if len(data_truncated) == len(data) else '...'
return f'Data ({len(data)} B): {" ".join("%02X" % b for b in data_truncated)}{suffix}'


class ShutdownRequest(MessagePayload):
"""!
@brief Perform a device shutdown.
Expand Down
Loading

0 comments on commit d655dc8

Please sign in to comment.