From 715843dc6dc298c7d249cc9330510d0633617c24 Mon Sep 17 00:00:00 2001 From: Jonathan Diamond Date: Wed, 10 Apr 2024 15:30:52 -0700 Subject: [PATCH] Add DataWrapperMessage to internal Python serializations. --- .../messages/internal/__init__.py | 1 + .../messages/internal/internal_defs.py | 2 + .../messages/internal/playback_logging.py | 58 +++++++++++++++++++ 3 files changed, 61 insertions(+) create mode 100644 python/fusion_engine_client/messages/internal/playback_logging.py diff --git a/python/fusion_engine_client/messages/internal/__init__.py b/python/fusion_engine_client/messages/internal/__init__.py index 9e66fdec..e84d5c44 100644 --- a/python/fusion_engine_client/messages/internal/__init__.py +++ b/python/fusion_engine_client/messages/internal/__init__.py @@ -1,3 +1,4 @@ from .internal_defs import InternalMessageType, InternalSync from .control import * from .profiling import * +from .playback_logging import * diff --git a/python/fusion_engine_client/messages/internal/internal_defs.py b/python/fusion_engine_client/messages/internal/internal_defs.py index f278a02f..477700c1 100644 --- a/python/fusion_engine_client/messages/internal/internal_defs.py +++ b/python/fusion_engine_client/messages/internal/internal_defs.py @@ -38,6 +38,8 @@ class InternalMessageType(IntEnum): # Legacy Internal Alias LEGACY_PLATFORM_STORAGE_DATA = 20105 + DATA_WRAPPER = 20200 + # Command and control messages. DIAG_EVENT_NOTIFICATION = 23004 diff --git a/python/fusion_engine_client/messages/internal/playback_logging.py b/python/fusion_engine_client/messages/internal/playback_logging.py new file mode 100644 index 00000000..1c567a14 --- /dev/null +++ b/python/fusion_engine_client/messages/internal/playback_logging.py @@ -0,0 +1,58 @@ +from construct import (Struct, Int16ul, Int8ul, Padding, Array, GreedyBytes) + +from typing import Union + +from .internal_defs import * + + +class DataWrapperMessage(MessagePayload): + """! + @brief Wrapper for arbitrary data packets. + """ + MESSAGE_TYPE = MessageType.DATA_WRAPPER + MESSAGE_VERSION = 0 + + DataWrapperMessageConstruct = Struct( + "time_stamp_data" / Array(5, Int8ul), + Padding(1), + "data_type" / Int16ul, + "data" / GreedyBytes + ) + + def __init__(self): + self.timestamp_ms = 0 + self.data_type = 0 + self.data = bytes() + + def pack_timestamp(self) -> bytes: + return bytes([(self.timestamp_ms >> (i*8)) & 0xFF for i in range(5)]) + + def unpack_timestamp(self, time_stamp_data: bytes) -> None: + self.timestamp_ms = 0 + for i in range(5): + self.timestamp_ms += time_stamp_data[i] << (i*8) + + def pack(self, buffer: bytes = None, offset: int = 0, return_buffer: bool = True) -> Union[bytes, int]: + packed_data = self.DataWrapperMessageConstruct.build( + {"time_stamp_data": self.pack_timestamp(), "data_type": self.data_type, "data": self.data}) + return PackedDataToBuffer(packed_data, buffer, offset, return_buffer) + + def unpack(self, buffer: bytes, offset: int = 0, message_version: int = MessagePayload._UNSPECIFIED_VERSION) -> int: + parsed = self.DataWrapperMessageConstruct.parse(buffer[offset:]) + self.unpack_timestamp(parsed.time_stamp_data) + self.data_type = parsed.data_type + self.data = parsed.data + return parsed._io.tell() + + def __repr__(self): + result = super().__repr__()[:-1] + result += f', data_type={self.data_type}]' + return result + + def __str__(self): + return construct_message_to_string(message=self, construct=self.DataWrapperMessageConstruct, + value_to_string={'data': lambda x: f'{len(x)} B payload'}, + title=f'Data Wrapper') + + def calcsize(self) -> int: + return len(self.pack())