From e51be34ed17cf7ef3eb835206a78be3661fcf5fe Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Tue, 23 Apr 2024 01:08:18 +0200 Subject: [PATCH] Re-work ECSS TM time handling --- CHANGELOG.md | 16 ++++++ docs/examples.rst | 2 +- spacepackets/ccsds/spacepacket.py | 1 + spacepackets/ccsds/time/cds.py | 55 +++++++++++++----- spacepackets/ccsds/time/common.py | 2 +- spacepackets/ecss/pus_17_test.py | 22 +++----- spacepackets/ecss/pus_1_verification.py | 72 ++++++++++++------------ spacepackets/ecss/tm.py | 75 ++++++++++++------------- tests/ccsds/test_sp_parser.py | 4 +- tests/ccsds/test_time.py | 2 +- tests/ecss/test_pus_tm.py | 42 ++++++-------- tests/ecss/test_srv1.py | 43 ++++++-------- tests/ecss/test_srv17.py | 15 +++-- tests/test_pus_verificator.py | 20 +++---- 14 files changed, 196 insertions(+), 175 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f1ada5..1dca2a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,22 @@ and this project adheres to [Semantic Versioning](http://semver.org/). # [unreleased] +# [v0.24.0] 2024-04-23 + +## Changed + +- ECSS PUS telemetry time handling is now more generic and low level: Constructors expect + a simple `bytes` type while unpackers/readers expect the length of the timestamp. Determining + or knowing the size of the timestamp is now the task of the user. +- `CdsShortTimestamp.from_now` renamed to `now`. + +## Added + + +- `spacepackets.ecss.tm.PUS_TM_TIMESTAMP_OFFSET` constant which can be used as a look-ahead to + determine the timestamp length from a raw PUS TM packet. +- `spacepackets.ccsds.CCSDS_HEADER_LEN` constant. + # [v0.23.1] 2024-04-22 ## Added diff --git a/docs/examples.rst b/docs/examples.rst index 619a56f..ea53994 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -16,7 +16,7 @@ PUS ping telemetry reply without a timestamp. cmd_as_bytes = ping_cmd.pack() print(f"Ping telecommand [17,1] (hex): [{cmd_as_bytes.hex(sep=',')}]") - ping_reply = PusTm(service=17, subservice=2, apid=0x01, time_provider=None) + ping_reply = PusTm(service=17, subservice=2, apid=0x01, timestamp=bytes()) tm_as_bytes = ping_reply.pack() print(f"Ping reply [17,2] (hex): [{tm_as_bytes.hex(sep=',')}]") diff --git a/spacepackets/ccsds/spacepacket.py b/spacepackets/ccsds/spacepacket.py index c508fc9..be299b8 100644 --- a/spacepackets/ccsds/spacepacket.py +++ b/spacepackets/ccsds/spacepacket.py @@ -11,6 +11,7 @@ from spacepackets.exceptions import BytesTooShortError SPACE_PACKET_HEADER_SIZE: Final = 6 +CCSDS_HEADER_LEN: Final = SPACE_PACKET_HEADER_SIZE SEQ_FLAG_MASK = 0xC000 APID_MASK = 0x7FF PACKET_ID_MASK = 0x1FFF diff --git a/spacepackets/ccsds/time/cds.py b/spacepackets/ccsds/time/cds.py index 6a5fd7b..09ab7ca 100644 --- a/spacepackets/ccsds/time/cds.py +++ b/spacepackets/ccsds/time/cds.py @@ -84,11 +84,11 @@ def _calculate_unix_seconds(self): def _calculate_date_time(self): if self._unix_seconds < 0: - self._date_time = datetime.datetime( + self._datetime = datetime.datetime( 1970, 1, 1, tzinfo=datetime.timezone.utc ) + datetime.timedelta(seconds=self._unix_seconds) else: - self._date_time = datetime.datetime.fromtimestamp( + self._datetime = datetime.datetime.fromtimestamp( self._unix_seconds, tz=datetime.timezone.utc ) @@ -108,7 +108,7 @@ def ccsds_days(self) -> int: def ms_of_day(self) -> int: return self._ms_of_day - def pack(self) -> bytearray: + def pack(self) -> bytes: cds_packet = bytearray() cds_packet.extend(self.__p_field) cds_packet.extend(struct.pack("!H", self._ccsds_days)) @@ -169,12 +169,14 @@ def __repr__(self): ) def __str__(self): - return f"Date {self._date_time!r} with representation {self!r}" + return f"Date {self._datetime!r} with representation {self!r}" - def __eq__(self, other: CdsShortTimestamp): - return (self.ccsds_days == other.ccsds_days) and ( - self.ms_of_day == other.ms_of_day - ) + def __eq__(self, other: object): + if isinstance(other, CdsShortTimestamp): + return (self.ccsds_days == other.ccsds_days) and ( + self.ms_of_day == other.ms_of_day + ) + return False def __add__(self, timedelta: datetime.timedelta): """Allows adding timedelta to the CDS timestamp provider. @@ -200,10 +202,20 @@ def __add__(self, timedelta: datetime.timedelta): return self @classmethod - def from_now(cls) -> CdsShortTimestamp: + def now(cls) -> CdsShortTimestamp: """Returns a seven byte CDS short timestamp with the current time.""" return cls.from_date_time(datetime.datetime.now(tz=datetime.timezone.utc)) + @classmethod + @deprecation.deprecated( + deprecated_in="0.24.0", + current_version=get_version(), + details="use now instead", + ) + def from_now(cls) -> CdsShortTimestamp: + """Returns a seven byte CDS short timestamp with the current time.""" + return cls.now() + @classmethod @deprecation.deprecated( deprecated_in="0.14.0rc1", @@ -211,12 +223,12 @@ def from_now(cls) -> CdsShortTimestamp: details="use from_now instead", ) def from_current_time(cls) -> CdsShortTimestamp: - return cls.from_now() + return cls.now() @classmethod - def from_date_time(cls, dt: datetime.datetime) -> CdsShortTimestamp: + def from_datetime(cls, dt: datetime.datetime) -> CdsShortTimestamp: instance = cls.empty(False) - instance._date_time = dt + instance._datetime = dt instance._unix_seconds = dt.timestamp() full_unix_secs = int(math.floor(instance._unix_seconds)) subsec_millis = int((instance._unix_seconds - full_unix_secs) * 1000) @@ -226,6 +238,15 @@ def from_date_time(cls, dt: datetime.datetime) -> CdsShortTimestamp: instance._ccsds_days = convert_unix_days_to_ccsds_days(unix_days) return instance + @classmethod + @deprecation.deprecated( + deprecated_in="0.24.0", + current_version=get_version(), + details="use from_datetime instead", + ) + def from_date_time(cls, dt: datetime.datetime) -> CdsShortTimestamp: + return cls.from_datetime(dt) + @staticmethod def ms_of_today(seconds_since_epoch: Optional[float] = None): if seconds_since_epoch is None: @@ -238,5 +259,13 @@ def ms_of_today(seconds_since_epoch: Optional[float] = None): def as_unix_seconds(self) -> float: return self._unix_seconds + def as_datetime(self) -> datetime.datetime: + return self._datetime + + @deprecation.deprecated( + deprecated_in="0.24.0", + current_version=get_version(), + details="use as_datetime instead", + ) def as_date_time(self) -> datetime.datetime: - return self._date_time + return self.as_datetime() diff --git a/spacepackets/ccsds/time/common.py b/spacepackets/ccsds/time/common.py index 83b6c30..e1b8f24 100644 --- a/spacepackets/ccsds/time/common.py +++ b/spacepackets/ccsds/time/common.py @@ -72,7 +72,7 @@ def len(self) -> int: return self.len_packed @abstractmethod - def pack(self) -> bytearray: + def pack(self) -> bytes: pass @abstractmethod diff --git a/spacepackets/ecss/pus_17_test.py b/spacepackets/ecss/pus_17_test.py index b4ff5ac..febc762 100644 --- a/spacepackets/ecss/pus_17_test.py +++ b/spacepackets/ecss/pus_17_test.py @@ -1,10 +1,8 @@ from __future__ import annotations import enum -from typing import Optional from spacepackets import SpacePacketHeader from spacepackets.ccsds.spacepacket import PacketId, PacketSeqCtrl -from spacepackets.ccsds.time import CcsdsTimeProvider from spacepackets.ecss.conf import FETCH_GLOBAL_APID from spacepackets.ecss.defs import PusService from spacepackets.ecss.tm import PusTm, AbstractPusTm @@ -19,7 +17,7 @@ class Service17Tm(AbstractPusTm): def __init__( self, subservice: int, - time_provider: Optional[CcsdsTimeProvider], + timestamp: bytes, ssc: int = 0, source_data: bytes = bytes(), apid: int = FETCH_GLOBAL_APID, @@ -30,7 +28,7 @@ def __init__( self.pus_tm = PusTm( service=PusService.S17_TEST, subservice=subservice, - time_provider=time_provider, + timestamp=timestamp, seq_count=ssc, source_data=source_data, apid=apid, @@ -60,8 +58,8 @@ def service(self) -> int: return self.pus_tm.service @property - def time_provider(self) -> Optional[CcsdsTimeProvider]: - return self.pus_tm.time_provider + def timestamp(self) -> bytes: + return self.pus_tm.timestamp @property def subservice(self) -> int: @@ -75,19 +73,17 @@ def pack(self) -> bytearray: return self.pus_tm.pack() @classmethod - def __empty(cls, time_provider: Optional[CcsdsTimeProvider]) -> Service17Tm: - return cls(subservice=0, time_provider=time_provider) + def __empty(cls) -> Service17Tm: + return cls(subservice=0, timestamp=bytes()) @classmethod - def unpack( - cls, data: bytes, time_reader: Optional[CcsdsTimeProvider] - ) -> Service17Tm: + def unpack(cls, data: bytes, timestamp_len: int) -> Service17Tm: """ :raises BytesTooShortError: Passed bytestream too short. :raises ValueError: Unsupported PUS version. :raises InvalidTmCrc16: Invalid CRC16. """ - service_17_tm = cls.__empty(time_provider=time_reader) - service_17_tm.pus_tm = PusTm.unpack(data=data, time_reader=time_reader) + service_17_tm = cls.__empty() + service_17_tm.pus_tm = PusTm.unpack(data=data, timestamp_len=timestamp_len) return service_17_tm diff --git a/spacepackets/ecss/pus_1_verification.py b/spacepackets/ecss/pus_1_verification.py index e957310..01d0759 100644 --- a/spacepackets/ecss/pus_1_verification.py +++ b/spacepackets/ecss/pus_1_verification.py @@ -8,7 +8,6 @@ from spacepackets.ccsds import SpacePacketHeader from spacepackets.ccsds.spacepacket import PacketId, PacketSeqCtrl -from spacepackets.ccsds.time import CcsdsTimeProvider from spacepackets.ecss import PusTc from spacepackets.ecss.conf import FETCH_GLOBAL_APID from spacepackets.ecss.defs import PusService @@ -67,7 +66,7 @@ def __repr__(self): @dataclass class UnpackParams: - time_reader: Optional[CcsdsTimeProvider] + timestamp_len: int bytes_step_id: int = 1 bytes_err_code: int = 1 @@ -124,7 +123,7 @@ class Service1Tm(AbstractPusTm): def __init__( self, subservice: Subservice, - time_provider: Optional[CcsdsTimeProvider], + timestamp: bytes, verif_params: Optional[VerificationParams] = None, seq_count: int = 0, apid: int = FETCH_GLOBAL_APID, @@ -139,7 +138,7 @@ def __init__( self.pus_tm = PusTm( service=PusService.S1_VERIFICATION, subservice=subservice, - time_provider=time_provider, + timestamp=timestamp, seq_count=seq_count, apid=apid, packet_version=packet_version, @@ -154,12 +153,12 @@ def pack(self) -> bytearray: return self.pus_tm.pack() @classmethod - def __empty(cls, time_provider: Optional[CcsdsTimeProvider]) -> Service1Tm: - return cls(subservice=Subservice.INVALID, time_provider=time_provider) + def __empty(cls) -> Service1Tm: + return cls(subservice=Subservice.INVALID, timestamp=bytes()) @classmethod def from_tm(cls, tm: PusTm, params: UnpackParams) -> Service1Tm: - service_1_tm = cls.__empty(params.time_reader) + service_1_tm = cls.__empty() service_1_tm.pus_tm = tm cls._unpack_raw_tm(service_1_tm, params) return service_1_tm @@ -175,14 +174,16 @@ def unpack(cls, data: bytes, params: UnpackParams) -> Service1Tm: :raises TmSourceDataTooShortError: TM source data too short. :return: """ - service_1_tm = cls.__empty(params.time_reader) - service_1_tm.pus_tm = PusTm.unpack(data=data, time_reader=params.time_reader) + service_1_tm = cls.__empty() + service_1_tm.pus_tm = PusTm.unpack( + data=data, timestamp_len=params.timestamp_len + ) cls._unpack_raw_tm(service_1_tm, params) return service_1_tm @property - def time_provider(self) -> Optional[CcsdsTimeProvider]: - return self.pus_tm.time_provider + def timestamp(self) -> bytes: + return self.pus_tm.timestamp @property def ccsds_version(self) -> int: @@ -277,6 +278,7 @@ def tc_req_id(self, value): @property def error_code(self) -> Optional[ErrorCode]: if self.has_failure_notice: + assert self._verif_params.failure_notice is not None return self._verif_params.failure_notice.code else: return None @@ -293,26 +295,26 @@ def step_id(self) -> Optional[StepId]: """Retrieve the step number. Returns NONE if this packet does not have a step ID""" return self._verif_params.step_id - def __eq__(self, other: Service1Tm): - return (self.pus_tm == other.pus_tm) and ( - self._verif_params == other._verif_params - ) + def __eq__(self, other: object): + if isinstance(other, Service1Tm): + return (self.pus_tm == other.pus_tm) and ( + self._verif_params == other._verif_params + ) + return False -def create_acceptance_success_tm( - pus_tc: PusTc, time_provider: Optional[CcsdsTimeProvider] -) -> Service1Tm: +def create_acceptance_success_tm(pus_tc: PusTc, timestamp: bytes) -> Service1Tm: return Service1Tm( subservice=Subservice.TM_ACCEPTANCE_SUCCESS, verif_params=VerificationParams(RequestId.from_sp_header(pus_tc.sp_header)), - time_provider=time_provider, + timestamp=timestamp, ) def create_acceptance_failure_tm( pus_tc: PusTc, failure_notice: FailureNotice, - time_provider: Optional[CcsdsTimeProvider], + timestamp: bytes, ) -> Service1Tm: return Service1Tm( subservice=Subservice.TM_ACCEPTANCE_FAILURE, @@ -320,24 +322,22 @@ def create_acceptance_failure_tm( req_id=RequestId.from_sp_header(pus_tc.sp_header), failure_notice=failure_notice, ), - time_provider=time_provider, + timestamp=timestamp, ) -def create_start_success_tm( - pus_tc: PusTc, time_provider: Optional[CcsdsTimeProvider] -) -> Service1Tm: +def create_start_success_tm(pus_tc: PusTc, timestamp: bytes) -> Service1Tm: return Service1Tm( subservice=Subservice.TM_START_SUCCESS, verif_params=VerificationParams(RequestId.from_sp_header(pus_tc.sp_header)), - time_provider=time_provider, + timestamp=timestamp, ) def create_start_failure_tm( pus_tc: PusTc, failure_notice: FailureNotice, - time_provider: Optional[CcsdsTimeProvider], + timestamp: bytes, ) -> Service1Tm: return Service1Tm( subservice=Subservice.TM_START_FAILURE, @@ -345,21 +345,21 @@ def create_start_failure_tm( req_id=RequestId.from_sp_header(pus_tc.sp_header), failure_notice=failure_notice, ), - time_provider=time_provider, + timestamp=timestamp, ) def create_step_success_tm( pus_tc: PusTc, step_id: PacketFieldEnum, - time_provider: Optional[CcsdsTimeProvider], + timestamp: bytes, ) -> Service1Tm: return Service1Tm( subservice=Subservice.TM_STEP_SUCCESS, verif_params=VerificationParams( req_id=RequestId.from_sp_header(pus_tc.sp_header), step_id=step_id ), - time_provider=time_provider, + timestamp=timestamp, ) @@ -367,7 +367,7 @@ def create_step_failure_tm( pus_tc: PusTc, step_id: PacketFieldEnum, failure_notice: FailureNotice, - time_provider: Optional[CcsdsTimeProvider], + timestamp: bytes, ) -> Service1Tm: return Service1Tm( subservice=Subservice.TM_STEP_FAILURE, @@ -376,24 +376,22 @@ def create_step_failure_tm( step_id=step_id, failure_notice=failure_notice, ), - time_provider=time_provider, + timestamp=timestamp, ) -def create_completion_success_tm( - pus_tc: PusTc, time_provider: Optional[CcsdsTimeProvider] -) -> Service1Tm: +def create_completion_success_tm(pus_tc: PusTc, timestamp: bytes) -> Service1Tm: return Service1Tm( subservice=Subservice.TM_COMPLETION_SUCCESS, verif_params=VerificationParams(RequestId.from_sp_header(pus_tc.sp_header)), - time_provider=time_provider, + timestamp=timestamp, ) def create_completion_failure_tm( pus_tc: PusTc, failure_notice: FailureNotice, - time_provider: Optional[CcsdsTimeProvider], + timestamp: bytes, ) -> Service1Tm: return Service1Tm( subservice=Subservice.TM_COMPLETION_FAILURE, @@ -401,5 +399,5 @@ def create_completion_failure_tm( req_id=RequestId.from_sp_header(pus_tc.sp_header), failure_notice=failure_notice, ), - time_provider=time_provider, + timestamp=timestamp, ) diff --git a/spacepackets/ecss/tm.py b/spacepackets/ecss/tm.py index 3efd122..a96ea8c 100644 --- a/spacepackets/ecss/tm.py +++ b/spacepackets/ecss/tm.py @@ -12,20 +12,20 @@ from .exceptions import TmSrcDataTooShortError # noqa # re-export from spacepackets.version import get_version -from spacepackets.ccsds.time.common import read_p_field from spacepackets.exceptions import BytesTooShortError from spacepackets.util import PrintFormats, get_printable_data_string from spacepackets.ccsds.spacepacket import ( PacketSeqCtrl, SpacePacketHeader, SPACE_PACKET_HEADER_SIZE, + CCSDS_HEADER_LEN, get_total_space_packet_len_from_len_field, PacketType, SpacePacket, AbstractSpacePacket, SequenceFlags, ) -from spacepackets.ccsds.time import CdsShortTimestamp, CcsdsTimeProvider +from spacepackets.ccsds.time import CdsShortTimestamp from spacepackets.ecss.conf import ( PusVersion, get_default_tm_apid, @@ -57,7 +57,7 @@ def service(self) -> int: @property @abstractmethod - def time_provider(self) -> Optional[CcsdsTimeProvider]: + def timestamp(self) -> bytes: pass @property @@ -81,7 +81,7 @@ def __init__( self, service: int, subservice: int, - time_provider: Optional[CcsdsTimeProvider], + timestamp: bytes, message_counter: int, dest_id: int = 0, spacecraft_time_ref: int = 0, @@ -109,14 +109,14 @@ def __init__( ) self.message_counter = message_counter self.dest_id = dest_id - self.time_provider = time_provider + self.timestamp = timestamp @classmethod def __empty(cls) -> PusTmSecondaryHeader: return PusTmSecondaryHeader( service=0, subservice=0, - time_provider=CdsShortTimestamp.from_now(), + timestamp=bytes(), message_counter=0, ) @@ -127,20 +127,16 @@ def pack(self) -> bytearray: secondary_header.append(self.subservice) secondary_header.extend(struct.pack("!H", self.message_counter)) secondary_header.extend(struct.pack("!H", self.dest_id)) - if self.time_provider: - secondary_header.extend(self.time_provider.pack()) + secondary_header.extend(self.timestamp) return secondary_header @classmethod - def unpack( - cls, data: bytes, time_reader: Optional[CcsdsTimeProvider] - ) -> PusTmSecondaryHeader: + def unpack(cls, data: bytes, timestamp_len: int) -> PusTmSecondaryHeader: """Unpack the PUS TM secondary header from the raw packet starting at the header index. :param data: Raw data. Please note that the passed buffer should start where the actual header start is. - :param time_reader: Generic time reader which knows the time stamp size and how to interpret - the raw timestamp + :param timestamp_len: Expected timestamp length. :raises ValueError: bytearray too short or PUS version missmatch. :return: """ @@ -170,22 +166,13 @@ def unpack( "!H", data[current_idx : current_idx + 2] )[0] current_idx += 2 - # If other time formats are supported in the future, this information can be used - # to unpack the correct time code - time_code_id = read_p_field(data[current_idx]) - if time_code_id: - pass - if time_reader: - time_reader.read_from_raw( - data[current_idx : current_idx + time_reader.len_packed] - ) - secondary_header.time_provider = time_reader + secondary_header.timestamp = data[current_idx : current_idx + timestamp_len] return secondary_header def __repr__(self): return ( f"{self.__class__.__name__}(service={self.service!r}," - f" subservice={self.subservice!r}, time={self.time_provider!r}," + f" subservice={self.subservice!r}, time={self.timestamp!r}," f" message_counter={self.message_counter!r}, dest_id={self.dest_id!r}," f" spacecraft_time_ref={self.spacecraft_time_ref!r}," f" pus_version={self.pus_version!r})" @@ -199,11 +186,14 @@ def __eq__(self, other: object): @property def header_size(self) -> int: base_len = 7 - if self.time_provider: - base_len += self.time_provider.len_packed + if self.timestamp: + base_len += len(self.timestamp) return base_len +PUS_TM_TIMESTAMP_OFFSET = CCSDS_HEADER_LEN + PusTmSecondaryHeader.MIN_LEN + + class InvalidTmCrc16(Exception): def __init__(self, tm: PusTm): self.tm = tm @@ -216,10 +206,15 @@ class PusTm(AbstractPusTm): or to deserialize TM packets from a raw byte stream using the :py:meth:`unpack` method. This implementation only supports PUS C. + Deserialization of PUS telemetry requires the timestamp length to be known. If the size of the + timestamp is variable but can be determined from the data, a look-ahead should be performed on + the raw data. The :py:const:`PUS_TM_TIMESTAMP_OFFSET` (13) can be used to do this, assuming + that the timestamp length can be extracted from the timestamp itself. + The following doc example cuts off the timestamp (7 byte CDS Short) and the CRC16 from the ping packet because those change regularly. - >>> ping_tm = PusTm(service=17, subservice=2, seq_count=5, apid=0x01, time_provider=CdsShortTimestamp.empty()) # noqa + >>> ping_tm = PusTm(service=17, subservice=2, seq_count=5, apid=0x01, timestamp=CdsShortTimestamp.now().pack()) # noqa >>> ping_tm.service 17 >>> ping_tm.subservice @@ -236,8 +231,8 @@ def __init__( self, service: int, subservice: int, - time_provider: Optional[CcsdsTimeProvider], - source_data: bytes = bytes([]), + timestamp: bytes, + source_data: bytes = bytes(), seq_count: int = 0, apid: int = FETCH_GLOBAL_APID, message_counter: int = 0, @@ -248,9 +243,7 @@ def __init__( if apid == FETCH_GLOBAL_APID: apid = get_default_tm_apid() self._source_data = source_data - len_stamp = 0 - if time_provider: - len_stamp += time_provider.len_packed + len_stamp = len(timestamp) data_length = self.data_len_from_src_len_timestamp_len( timestamp_len=len_stamp, source_data_len=len(self._source_data), @@ -270,13 +263,15 @@ def __init__( message_counter=message_counter, dest_id=destination_id, spacecraft_time_ref=space_time_ref, - time_provider=time_provider, + timestamp=timestamp, ) self._crc16: Optional[bytes] = None @classmethod def empty(cls) -> PusTm: - return PusTm(service=0, subservice=0, time_provider=CdsShortTimestamp.empty()) + return PusTm( + service=0, subservice=0, timestamp=CdsShortTimestamp.empty().pack() + ) def pack(self, recalc_crc: bool = True) -> bytearray: """Serializes the packet into a raw bytearray. @@ -303,7 +298,7 @@ def calc_crc(self): self._crc16 = struct.pack("!H", crc.crcValue) @classmethod - def unpack(cls, data: bytes, time_reader: Optional[CcsdsTimeProvider]) -> PusTm: + def unpack(cls, data: bytes, timestamp_len: int) -> PusTm: """Attempts to construct a generic PusTelemetry class given a raw bytearray. :param data: Raw bytes containing the PUS telemetry packet. @@ -324,7 +319,7 @@ def unpack(cls, data: bytes, time_reader: Optional[CcsdsTimeProvider]) -> PusTm: raise BytesTooShortError(expected_packet_len, len(data)) pus_tm.pus_tm_sec_header = PusTmSecondaryHeader.unpack( data=data[SPACE_PACKET_HEADER_SIZE:], - time_reader=time_reader, + timestamp_len=timestamp_len, ) if ( expected_packet_len @@ -417,8 +412,8 @@ def ccsds_version(self) -> int: return self.space_packet_header.ccsds_version @property - def time_provider(self) -> Optional[CcsdsTimeProvider]: - return self.pus_tm_sec_header.time_provider + def timestamp(self) -> bytes: + return self.pus_tm_sec_header.timestamp @property def service(self) -> int: @@ -449,8 +444,8 @@ def tm_data(self) -> bytes: def tm_data(self, data: bytes): self._source_data = data stamp_len = 0 - if self.pus_tm_sec_header.time_provider: - stamp_len += self.pus_tm_sec_header.time_provider.len_packed + if self.pus_tm_sec_header.timestamp: + stamp_len += len(self.pus_tm_sec_header.timestamp) self.space_packet_header.data_len = self.data_len_from_src_len_timestamp_len( stamp_len, len(data) ) diff --git a/tests/ccsds/test_sp_parser.py b/tests/ccsds/test_sp_parser.py index f6060b1..19831b5 100644 --- a/tests/ccsds/test_sp_parser.py +++ b/tests/ccsds/test_sp_parser.py @@ -9,7 +9,7 @@ class TestSpParser(TestCase): def setUp(self) -> None: self.tm_packet = PusTm( - service=17, subservice=2, time_provider=CdsShortTimestamp.empty() + service=17, subservice=2, timestamp=CdsShortTimestamp.empty().pack() ) self.packet_ids = (self.tm_packet.packet_id,) self.tm_packet_raw = self.tm_packet.pack() @@ -30,7 +30,7 @@ def test_sp_parser_crap_data_is_skipped(self): service=8, subservice=128, source_data=bytearray(64), - time_provider=CdsShortTimestamp.empty(), + timestamp=CdsShortTimestamp.empty().pack(), ) other_larger_packet_raw = other_larger_packet.pack() self.packet_deque.append(self.tm_packet_raw) diff --git a/tests/ccsds/test_time.py b/tests/ccsds/test_time.py index 4450984..3a0b378 100644 --- a/tests/ccsds/test_time.py +++ b/tests/ccsds/test_time.py @@ -82,7 +82,7 @@ def test_dt_is_utc(self): self.assertEqual(dt.tzinfo.utcoffset(dt), datetime.timedelta(0)) def test_compare_from_now_against_manually_created(self): - stamp = CdsShortTimestamp.from_now() + stamp = CdsShortTimestamp.now() ccsds_days = stamp.ccsds_days ms_of_day = stamp.ms_of_day new_stamp = CdsShortTimestamp(ccsds_days, ms_of_day) diff --git a/tests/ecss/test_pus_tm.py b/tests/ecss/test_pus_tm.py index 6e6ee8a..3b3f84d 100755 --- a/tests/ecss/test_pus_tm.py +++ b/tests/ecss/test_pus_tm.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 import struct from unittest import TestCase -from unittest.mock import MagicMock from crcmod.predefined import mkPredefinedCrcFun @@ -25,21 +24,18 @@ from spacepackets.ecss.pus_1_verification import ( RequestId, ) -from .common import generic_time_provider_mock, TEST_STAMP +from .common import TEST_STAMP class TestTelemetry(TestCase): def setUp(self) -> None: - self.time_stamp_provider = MagicMock(spec=CdsShortTimestamp) - - self.time_stamp_provider = generic_time_provider_mock(TEST_STAMP) self.ping_reply = PusTm( service=17, subservice=2, apid=0x123, seq_count=0x234, source_data=bytearray(), - time_provider=self.time_stamp_provider, + timestamp=TEST_STAMP, ) self.ping_reply_raw = self.ping_reply.pack() @@ -72,9 +68,9 @@ def test_no_timestamp(self): apid=0x123, seq_count=0x234, source_data=bytearray(), - time_provider=None, + timestamp=bytes(), ) - self.assertEqual(self.ping_reply.pus_tm_sec_header.time_provider, None) + self.assertEqual(self.ping_reply.pus_tm_sec_header.timestamp, bytes()) tm_raw = self.ping_reply.pack() self.assertEqual(self.ping_reply.packet_len, 15) self.assertEqual(len(tm_raw), 15) @@ -115,7 +111,8 @@ def test_state_setting(self): self.ping_reply.tm_data = source_data self.assertEqual(self.ping_reply.apid, 0x22) self.assertTrue(isinstance(self.ping_reply.crc16, bytes)) - self.assertTrue(len(self.ping_reply.crc16), 2) + assert self.ping_reply.crc16 is not None + self.assertEqual(len(self.ping_reply.crc16), 2) self.assertEqual( self.ping_reply.pus_tm_sec_header.pus_version, PusVersion.PUS_C ) @@ -162,6 +159,7 @@ def test_full_printout(self): sp_header_as_str = raw_space_packet_header.hex(sep=",") raw_secondary_packet_header = self.ping_reply.pus_tm_sec_header.pack() second_header_as_str = raw_secondary_packet_header.hex(sep=",") + assert crc16 is not None expected_printout = ( f"hex [{sp_header_as_str},{second_header_as_str},{crc16.hex(sep=',')}]" ) @@ -185,7 +183,7 @@ def test_unpack(self): self.ping_reply_raw = self.ping_reply.pack() # self.time_stamp_provider.read_from_raw = MagicMock() pus_17_tm_unpacked = PusTm.unpack( - data=self.ping_reply_raw, time_reader=self.time_stamp_provider + data=self.ping_reply_raw, timestamp_len=len(TEST_STAMP) ) self.assertEqual(self.ping_reply.crc16, pus_17_tm_unpacked.crc16) self.assertEqual(pus_17_tm_unpacked, self.ping_reply) @@ -194,10 +192,6 @@ def test_unpack(self): self.assertEqual( pus_17_tm_unpacked.pus_tm_sec_header.pus_version, PusVersion.PUS_C ) - pus_17_tm_unpacked.pus_tm_sec_header.time_provider.read_from_raw.assert_called_once() - pus_17_tm_unpacked.pus_tm_sec_header.time_provider.read_from_raw.assert_called_with( - TEST_STAMP - ) self.assertEqual(pus_17_tm_unpacked.tm_data, source_data) self.assertEqual(pus_17_tm_unpacked.packet_id.raw(), 0x0822) @@ -209,7 +203,7 @@ def test_unpack(self): ValueError, PusTm.unpack, self.ping_reply_raw, - CdsShortTimestamp.empty(), + len(TEST_STAMP), ) self.ping_reply_raw[4] = 0xFF self.ping_reply_raw[5] = 0xFF @@ -229,12 +223,10 @@ def test_unpack(self): self.ping_reply_raw[4] = (incorrect_size & 0xFF00) >> 8 self.ping_reply_raw[5] = incorrect_size & 0xFF with self.assertRaises(InvalidTmCrc16): - PusTm.unpack(data=self.ping_reply_raw, time_reader=self.time_stamp_provider) + PusTm.unpack(data=self.ping_reply_raw, timestamp_len=len(TEST_STAMP)) def test_calc_crc(self): - new_ping_tm = PusTm( - service=17, subservice=2, time_provider=self.time_stamp_provider - ) + new_ping_tm = PusTm(service=17, subservice=2, timestamp=TEST_STAMP) self.assertIsNone(new_ping_tm.crc16) new_ping_tm.calc_crc() self.assertIsNotNone(new_ping_tm.crc16) @@ -242,14 +234,12 @@ def test_calc_crc(self): self.assertEqual(len(new_ping_tm.crc16), 2) def test_crc_always_calced_if_none(self): - new_ping_tm = PusTm( - service=17, subservice=2, time_provider=self.time_stamp_provider - ) + new_ping_tm = PusTm(service=17, subservice=2, timestamp=TEST_STAMP) self.assertIsNone(new_ping_tm.crc16) # Should still calculate CRC tc_raw = new_ping_tm.pack(recalc_crc=False) # Will throw invalid CRC16 error if CRC was not calculated - tc_unpacked = PusTm.unpack(tc_raw, time_reader=self.time_stamp_provider) + tc_unpacked = PusTm.unpack(tc_raw, timestamp_len=len(TEST_STAMP)) self.assertEqual(tc_unpacked, new_ping_tm) def test_faulty_unpack(self): @@ -287,7 +277,7 @@ def test_faulty_sec_header(self): PusTmSecondaryHeader( service=0, subservice=0, - time_provider=CdsShortTimestamp.from_now(), + timestamp=CdsShortTimestamp.now().pack(), message_counter=129302, ) @@ -295,7 +285,9 @@ def test_invalid_raw_size(self): # Set length field invalid self.ping_reply_raw[4] = 0x00 self.ping_reply_raw[5] = 0x00 - self.assertRaises(ValueError, PusTm.unpack, self.ping_reply_raw, None) + self.assertRaises( + ValueError, PusTm.unpack, self.ping_reply_raw, len(TEST_STAMP) + ) def test_req_id(self): tc_packet_id = PacketId(ptype=PacketType.TC, sec_header_flag=True, apid=0x42) diff --git a/tests/ecss/test_srv1.py b/tests/ecss/test_srv1.py index 32c6fdf..cd0b6f7 100644 --- a/tests/ecss/test_srv1.py +++ b/tests/ecss/test_srv1.py @@ -21,14 +21,13 @@ ErrorCode, StepId, ) -from tests.ecss.common import generic_time_provider_mock, TEST_STAMP +from tests.ecss.common import TEST_STAMP class Service1TmTest(TestCase): def setUp(self) -> None: ping_tc = PusTc(service=17, subservice=1) - self.srv1_tm = create_start_success_tm(ping_tc, None) - self.time_stamp_provider = generic_time_provider_mock(TEST_STAMP) + self.srv1_tm = create_start_success_tm(ping_tc, TEST_STAMP) def test_failure_notice_invalid_creation(self): with self.assertRaises(ValueError): @@ -38,14 +37,16 @@ def test_basic(self): self.assertEqual( self.srv1_tm.sp_header, self.srv1_tm.pus_tm.space_packet_header ) - self.assertEqual(self.srv1_tm.time_provider, None) + self.assertEqual(self.srv1_tm.timestamp, TEST_STAMP) self.assertEqual(self.srv1_tm.is_step_reply, False) self.assertEqual(self.srv1_tm.service, 1) self.assertEqual(self.srv1_tm.subservice, 3) self.assertEqual(self.srv1_tm.error_code, None) def test_other_ctor(self): - srv1_tm = Service1Tm.from_tm(self.srv1_tm.pus_tm, UnpackParams(None)) + srv1_tm = Service1Tm.from_tm( + self.srv1_tm.pus_tm, UnpackParams(timestamp_len=len(TEST_STAMP)) + ) self.assertEqual(srv1_tm, self.srv1_tm) def test_failure_notice(self): @@ -95,20 +96,14 @@ def _generic_test_srv_1_success(self, subservice: Subservice): helper_created = None step_id = None if subservice == Subservice.TM_ACCEPTANCE_SUCCESS: - helper_created = create_acceptance_success_tm( - pus_tc, self.time_stamp_provider - ) + helper_created = create_acceptance_success_tm(pus_tc, TEST_STAMP) elif subservice == Subservice.TM_START_SUCCESS: - helper_created = create_start_success_tm(pus_tc, self.time_stamp_provider) + helper_created = create_start_success_tm(pus_tc, TEST_STAMP) elif subservice == Subservice.TM_STEP_SUCCESS: step_id = PacketFieldEnum.with_byte_size(1, 4) - helper_created = create_step_success_tm( - pus_tc, step_id, self.time_stamp_provider - ) + helper_created = create_step_success_tm(pus_tc, step_id, TEST_STAMP) elif subservice == Subservice.TM_COMPLETION_SUCCESS: - helper_created = create_completion_success_tm( - pus_tc, time_provider=self.time_stamp_provider - ) + helper_created = create_completion_success_tm(pus_tc, timestamp=TEST_STAMP) self._test_srv_1_success_tm( pus_tc, Service1Tm( @@ -117,7 +112,7 @@ def _generic_test_srv_1_success(self, subservice: Subservice): req_id=RequestId(pus_tc.packet_id, pus_tc.packet_seq_control), step_id=step_id, ), - time_provider=CdsShortTimestamp.empty(), + timestamp=CdsShortTimestamp.empty().pack(), ), subservice, ) @@ -148,7 +143,7 @@ def _test_srv_1_success_tm( self.assertEqual(srv_1_tm.tc_req_id.tc_psc, pus_tc.packet_seq_control) srv_1_tm_raw = srv_1_tm.pack() srv_1_tm_unpacked = Service1Tm.unpack( - srv_1_tm_raw, UnpackParams(self.time_stamp_provider) + srv_1_tm_raw, UnpackParams(len(TEST_STAMP)) ) self.assertEqual( srv_1_tm_unpacked.tc_req_id.tc_packet_id.raw(), pus_tc.packet_id.raw() @@ -168,23 +163,21 @@ def _generic_test_srv_1_failure(self, subservice: Subservice): step_id = None if subservice == Subservice.TM_ACCEPTANCE_FAILURE: helper_created = create_acceptance_failure_tm( - pus_tc, failure_notice, self.time_stamp_provider + pus_tc, failure_notice, TEST_STAMP ) elif subservice == Subservice.TM_START_FAILURE: - helper_created = create_start_failure_tm( - pus_tc, failure_notice, self.time_stamp_provider - ) + helper_created = create_start_failure_tm(pus_tc, failure_notice, TEST_STAMP) elif subservice == Subservice.TM_STEP_FAILURE: step_id = PacketFieldEnum.with_byte_size(2, 12) helper_created = create_step_failure_tm( pus_tc, failure_notice=failure_notice, step_id=step_id, - time_provider=self.time_stamp_provider, + timestamp=TEST_STAMP, ) elif subservice == Subservice.TM_COMPLETION_FAILURE: helper_created = create_completion_failure_tm( - pus_tc, failure_notice, self.time_stamp_provider + pus_tc, failure_notice, TEST_STAMP ) self._test_srv_1_failure_comparison_helper( pus_tc, @@ -195,7 +188,7 @@ def _generic_test_srv_1_failure(self, subservice: Subservice): failure_notice=failure_notice, step_id=step_id, ), - time_provider=self.time_stamp_provider, + timestamp=TEST_STAMP, ), subservice=subservice, failure_notice=failure_notice, @@ -222,7 +215,7 @@ def _test_srv_1_failure_comparison_helper( self.assertEqual(srv_1_tm.tc_req_id.tc_packet_id, pus_tc.packet_id) self.assertEqual(srv_1_tm.tc_req_id.tc_psc, pus_tc.packet_seq_control) srv_1_tm_raw = srv_1_tm.pack() - unpack_params = UnpackParams(self.time_stamp_provider) + unpack_params = UnpackParams(len(TEST_STAMP)) if failure_notice is not None: unpack_params.bytes_err_code = failure_notice.code.len() if step_id is not None: diff --git a/tests/ecss/test_srv17.py b/tests/ecss/test_srv17.py index 1ca2102..34414bb 100644 --- a/tests/ecss/test_srv17.py +++ b/tests/ecss/test_srv17.py @@ -9,7 +9,7 @@ class TestSrv17Tm(TestCase): def setUp(self) -> None: - self.srv17_tm = Service17Tm(subservice=1, time_provider=None) + self.srv17_tm = Service17Tm(subservice=1, timestamp=bytes()) self.srv17_tm.pus_tm.apid = 0x72 self.time_stamp_provider = generic_time_provider_mock(TEST_STAMP) @@ -19,7 +19,7 @@ def test_state(self): ) self.assertEqual(self.srv17_tm.service, PusService.S17_TEST) self.assertEqual(self.srv17_tm.subservice, 1) - self.assertEqual(self.srv17_tm.time_provider, None) + self.assertEqual(self.srv17_tm.timestamp, bytes()) self.assertEqual(self.srv17_tm.apid, 0x72) self.assertEqual(self.srv17_tm.seq_count, 0) self.assertEqual(self.srv17_tm.seq_flags, SequenceFlags.UNSEGMENTED) @@ -30,17 +30,20 @@ def test_state(self): def test_other_state(self): srv17_with_data = Service17Tm( subservice=128, - time_provider=CdsShortTimestamp(0, 0), + timestamp=CdsShortTimestamp(0, 0).pack(), source_data=bytes([0, 1, 2]), ) self.assertEqual(srv17_with_data.source_data, bytes([0, 1, 2])) - self.assertEqual(srv17_with_data.time_provider, CdsShortTimestamp(0, 0)) + + self.assertEqual( + CdsShortTimestamp.unpack(srv17_with_data.timestamp), CdsShortTimestamp(0, 0) + ) def test_service_17_tm(self): - srv_17_tm = Service17Tm(subservice=2, time_provider=self.time_stamp_provider) + srv_17_tm = Service17Tm(subservice=2, timestamp=TEST_STAMP) self.assertEqual(srv_17_tm.pus_tm.subservice, 2) srv_17_tm_raw = srv_17_tm.pack() srv_17_tm_unpacked = Service17Tm.unpack( - data=srv_17_tm_raw, time_reader=self.time_stamp_provider + data=srv_17_tm_raw, timestamp_len=len(TEST_STAMP) ) self.assertEqual(srv_17_tm_unpacked.pus_tm.subservice, 2) diff --git a/tests/test_pus_verificator.py b/tests/test_pus_verificator.py index 882e05f..08a6c24 100644 --- a/tests/test_pus_verificator.py +++ b/tests/test_pus_verificator.py @@ -28,17 +28,15 @@ class SuccessSet: def __init__(self, pus_tc: PusTc): self.pus_tc = pus_tc self.req_id = RequestId.from_pus_tc(pus_tc) - self.time_reader = CdsShortTimestamp.empty() - self.acc_suc_tm = create_acceptance_success_tm(pus_tc, self.time_reader) - self.sta_suc_tm = create_start_success_tm(pus_tc, self.time_reader) + self.empty_stamp = CdsShortTimestamp.empty() + self.acc_suc_tm = create_acceptance_success_tm(pus_tc, self.empty_stamp.pack()) + self.sta_suc_tm = create_start_success_tm(pus_tc, self.empty_stamp.pack()) self.ste_suc_tm = create_step_success_tm( pus_tc=pus_tc, step_id=StepId.with_byte_size(1, 1), - time_provider=self.time_reader, - ) - self.fin_suc_tm = create_completion_success_tm( - pus_tc, CdsShortTimestamp.empty() + timestamp=self.empty_stamp.pack(), ) + self.fin_suc_tm = create_completion_success_tm(pus_tc, self.empty_stamp.pack()) class FailureSet: @@ -46,21 +44,21 @@ def __init__(self, pus_tc: PusTc, failure_notice: FailureNotice): self.suc_set = SuccessSet(pus_tc) self.failure_notice = failure_notice self.acc_fail_tm = create_acceptance_failure_tm( - pus_tc, self.failure_notice, CdsShortTimestamp.empty() + pus_tc, self.failure_notice, CdsShortTimestamp.empty().pack() ) self.sta_fail_tm = create_start_failure_tm( - pus_tc, self.failure_notice, CdsShortTimestamp.empty() + pus_tc, self.failure_notice, CdsShortTimestamp.empty().pack() ) self.ste_fail_tm = create_step_failure_tm( pus_tc, failure_notice=self.failure_notice, step_id=StepId.with_byte_size(1, 1), - time_provider=CdsShortTimestamp.empty(), + timestamp=CdsShortTimestamp.empty().pack(), ) self.fin_fail_tm = create_completion_failure_tm( failure_notice=self.failure_notice, pus_tc=self.suc_set.pus_tc, - time_provider=CdsShortTimestamp.empty(), + timestamp=CdsShortTimestamp.empty().pack(), ) @property