diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c2a25c..8e6153b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/). Renamed `PusFileSeqCountProvider` to `CcsdsFileSeqCountProvider` but keep old alias. +## Added + +- New `SpacePacketHeader.tc` and `SpacePacketHeader.tm` constructors which set the packet + type correctly + # [v0.24.2] 2024-10-15 ## Fixed diff --git a/README.md b/README.md index 849c881..7998c11 100644 --- a/README.md +++ b/README.md @@ -74,7 +74,8 @@ coverage run -m pytest # Documentation -The documentation is built with Sphinx +The documentation is built with Sphinx and new documentation should be written using the +[NumPy format](https://www.sphinx-doc.org/en/master/usage/extensions/example_numpy.html#example-numpy). Install the required dependencies first: diff --git a/docs/examples.rst b/docs/examples.rst index ea53994..9eecd66 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -1,40 +1,15 @@ Examples ========= -ECSS PUS packets ------------------ - -The following example shows how to generate PUS packets using the PUS ping telecommand and a -PUS ping telemetry reply without a timestamp. - -.. testcode:: pus - - from spacepackets.ecss.tc import PusTc - from spacepackets.ecss.tm import PusTm - - ping_cmd = PusTc(service=17, subservice=1, apid=0x01) - cmd_as_bytes = ping_cmd.pack() - print(f"Ping telecommand [17,1] (hex): [{cmd_as_bytes.hex(sep=',')}]") - - ping_reply = PusTm(service=17, subservice=2, apid=0x01, timestamp=bytes()) - tm_as_bytes = ping_reply.pack() - print(f"Ping reply [17,2] (hex): [{tm_as_bytes.hex(sep=',')}]") - -Output: - -.. testoutput:: pus - - Ping telecommand [17,1] (hex): [18,01,c0,00,00,06,2f,11,01,00,00,16,1d] - Ping reply [17,2] (hex): [08,01,c0,00,00,08,20,11,02,00,00,00,00,86,d7] - CCSDS Space Packet ------------------- -The following example shows how to generate a space packet header: +The following example shows how to generate a space packet header or a custom telecommand +based on CCSDS space packets. .. testcode:: ccsds - from spacepackets.ccsds.spacepacket import SpHeader, PacketType + from spacepackets.ccsds.spacepacket import SpHeader, PacketType, CCSDS_HEADER_LEN spacepacket_header = SpHeader( packet_type=PacketType.TC, apid=0x01, seq_count=0, data_len=0 @@ -42,11 +17,22 @@ The following example shows how to generate a space packet header: header_as_bytes = spacepacket_header.pack() print(f"Space packet header (hex): [{header_as_bytes.hex(sep=',')}]") + # Create CCSDS space packet telecommand with custom data. + custom_data = bytes([1, 2, 3, 4]) + tc_header = SpHeader.tc(apid=2, seq_count=5, data_len=0) + tc_header.set_data_len_from_packet_len(CCSDS_HEADER_LEN + len(custom_data)) + telecommand = tc_header.pack() + telecommand.extend(custom_data) + print(f"Space packet telecommand (hex): [{telecommand.hex(sep=',')}]") + + + Output: .. testoutput:: ccsds Space packet header (hex): [10,01,c0,00,00,00] + Space packet telecommand (hex): [10,02,c0,05,00,03,01,02,03,04] CFDP Packets ----------------- @@ -119,6 +105,32 @@ Output --- PDU 2 RAW --- 0x[24,00,0a,00,01,00,02,04,00,1c,29,1c,a3,00,00,00,0c] +ECSS PUS packets +----------------- + +The following example shows how to generate PUS packets using the PUS ping telecommand and a +PUS ping telemetry reply without a timestamp. + +.. testcode:: pus + + from spacepackets.ecss.tc import PusTc + from spacepackets.ecss.tm import PusTm + + ping_cmd = PusTc(service=17, subservice=1, apid=0x01) + cmd_as_bytes = ping_cmd.pack() + print(f"Ping telecommand [17,1] (hex): [{cmd_as_bytes.hex(sep=',')}]") + + ping_reply = PusTm(service=17, subservice=2, apid=0x01, timestamp=bytes()) + tm_as_bytes = ping_reply.pack() + print(f"Ping reply [17,2] (hex): [{tm_as_bytes.hex(sep=',')}]") + +Output: + +.. testoutput:: pus + + Ping telecommand [17,1] (hex): [18,01,c0,00,00,06,2f,11,01,00,00,16,1d] + Ping reply [17,2] (hex): [08,01,c0,00,00,08,20,11,02,00,00,00,00,86,d7] + USLP Frames ------------------- diff --git a/spacepackets/ccsds/spacepacket.py b/spacepackets/ccsds/spacepacket.py index 809d5d1..c14ac0f 100644 --- a/spacepackets/ccsds/spacepacket.py +++ b/spacepackets/ccsds/spacepacket.py @@ -172,8 +172,7 @@ def pack(self) -> bytearray: class SpacePacketHeader(AbstractSpacePacket): - """This class encapsulates the space packet header. - Packet reference: Blue Book CCSDS 133.0-B-2""" + """This class encapsulates the space packet header. Packet reference: Blue Book CCSDS 133.0-B-2""" def __init__( self, @@ -187,6 +186,10 @@ def __init__( ): """Create a space packet header with the given field parameters. + The data length field can also be set from the total packet length by using the + :py:meth:`set_data_len_from_packet_len` method after construction of the space packet + header object. + >>> sph = SpacePacketHeader(packet_type=PacketType.TC, apid=0x42, seq_count=0, data_len=12) >>> hex(sph.apid) '0x42' @@ -201,17 +204,28 @@ def __init__( >>> sph.packet_seq_control PacketSeqCtrl(seq_flags=, seq_count=0) - :param packet_type: 0 for Telemetery, 1 for Telecommands - :param apid: Application Process ID, should not be larger - than 11 bits, deciaml 2074 or hex 0x7ff - :param seq_count: Source sequence counter, should not be larger than 0x3fff or - decimal 16383 - :param data_len: Contains a length count C that equals one fewer than the length of the - packet data field. Should not be larger than 65535 bytes - :param ccsds_version: - :param sec_header_flag: Secondary header flag, or False by default. - :param seq_flags: - :raises ValueError: On invalid parameters + Parameters + ----------- + packet_type: PacketType + 0 for Telemetery, 1 for Telecommands + apid: int + Application Process ID, should not be larger than 11 bits, deciaml 2074 or hex 0x7ff + seq_count: int + Source sequence counter, should not be larger than 0x3fff or decimal 16383 + data_len: int + Contains a length count C that equals one fewer than the length of the packet data + field. Should not be larger than 65535 bytes + sec_header_flag: bool + Secondary header flag, or False by default. + seq_flags: + Sequence flags, defaults to unsegmented. + ccsds_version: int + Version of the CCSDS packet. Defaults to 0b000 + + Raises + -------- + ValueError + On invalid parameters """ if data_len > pow(2, 16) - 1 or data_len < 0: raise ValueError( @@ -225,6 +239,54 @@ def __init__( self._psc = PacketSeqCtrl(seq_flags=seq_flags, seq_count=seq_count) self.data_len = data_len + @classmethod + def tc( + cls, + apid: int, + seq_count: int, + data_len: int, + sec_header_flag: bool = False, + seq_flags: SequenceFlags = SequenceFlags.UNSEGMENTED, + ccsds_version: int = 0b000, + ) -> SpacePacketHeader: + """Create a space packet header with the given field parameters for a telecommand packet. + Calls the default constructor :py:meth:`SpacePacketHeader` with the packet type + set to :py:class:`PacketType.TC`. + """ + return cls( + packet_type=PacketType.TC, + apid=apid, + seq_count=seq_count, + data_len=data_len, + sec_header_flag=sec_header_flag, + seq_flags=seq_flags, + ccsds_version=ccsds_version, + ) + + @classmethod + def tm( + cls, + apid: int, + seq_count: int, + data_len: int, + sec_header_flag: bool = False, + seq_flags: SequenceFlags = SequenceFlags.UNSEGMENTED, + ccsds_version: int = 0b000, + ) -> SpacePacketHeader: + """Create a space packet header with the given field parameters for a telemetry packet. + Calls the default constructor :py:meth:`SpacePacketHeader` with the packet type + set to :py:class:`PacketType.TM`. + """ + return cls( + packet_type=PacketType.TM, + apid=apid, + seq_count=seq_count, + data_len=data_len, + sec_header_flag=sec_header_flag, + seq_flags=seq_flags, + ccsds_version=ccsds_version, + ) + @classmethod def from_composite_fields( cls, @@ -289,6 +351,19 @@ def sec_header_flag(self, value): def seq_count(self): return self._psc.seq_count + def set_data_len_from_packet_len(self, packet_len: int): + """Sets the data length field from the given total packet length. The total packet length + must be at least 7 bytes. + + Raises + ------- + ValueError + The passed packet length is smaller than the minimum expected 7 bytes. + """ + if packet_len < CCSDS_HEADER_LEN + 1: + raise ValueError("specified total packet length too short") + self.data_len = packet_len - CCSDS_HEADER_LEN - 1 + @seq_count.setter def seq_count(self, seq_cnt): self._psc.seq_count = seq_cnt @@ -313,7 +388,12 @@ def apid(self, apid): def packet_len(self) -> int: """Retrieve the full space packet size when packed. - :return: Size of the TM packet based on the space packet header data length field. + The full packet size is the data length field plus the :py:const:`CCSDS_HEADER_LEN` of 6 + bytes plus one. + + Returns + -------- + Size of the TM packet based on the space packet header data length field. """ return CCSDS_HEADER_LEN + self.data_len + 1 @@ -383,8 +463,12 @@ def __repr__(self): ) def pack(self) -> bytearray: - """Pack the raw byte representation of the space packet - :raises ValueError: Mandatory fields were not supplied properly""" + """Pack the raw byte representation of the space packet. + + Raises + -------- + ValueError + Mandatory fields were not supplied properly""" packet = self.sp_header.pack() if self.sp_header.sec_header_flag: if self.sec_header is None: diff --git a/tests/ccsds/test_space_packet.py b/tests/ccsds/test_space_packet.py index 25b7a7d..6343527 100644 --- a/tests/ccsds/test_space_packet.py +++ b/tests/ccsds/test_space_packet.py @@ -35,6 +35,44 @@ def test_basic(self): self.assertEqual(self.sp_header.data_len, 0x16) self.assertEqual(self.sp_header.packet_type, PacketType.TC) + def test_tm_header(self): + sp_header = SpacePacketHeader.tm(apid=0x03, data_len=16, seq_count=35) + self.assertEqual(sp_header.apid, 0x03) + self.assertEqual(sp_header.seq_flags, SequenceFlags.UNSEGMENTED) + self.assertEqual(sp_header.ccsds_version, 0b000) + self.assertEqual(sp_header.packet_id, PacketId(PacketType.TM, False, 0x03)) + self.assertEqual( + sp_header.packet_seq_control, + PacketSeqCtrl(SequenceFlags.UNSEGMENTED, 35), + ) + self.assertEqual(sp_header.seq_count, 35) + self.assertEqual(sp_header.data_len, 16) + self.assertEqual(sp_header.packet_type, PacketType.TM) + + def test_len_field_setter(self): + self.sp_header.set_data_len_from_packet_len(10) + # Total packet lenght minus the header lenght minus 1 + self.assertEqual(self.sp_header.data_len, 3) + + def test_invalid_len_field_setter_call(self): + for idx in range(7): + with self.assertRaises(ValueError): + self.sp_header.set_data_len_from_packet_len(idx) + + def test_tc_header(self): + sp_header = SpacePacketHeader.tc(apid=0x7FF, data_len=16, seq_count=0x3FFF) + self.assertEqual(sp_header.apid, 0x7FF) + self.assertEqual(sp_header.seq_flags, SequenceFlags.UNSEGMENTED) + self.assertEqual(sp_header.ccsds_version, 0b000) + self.assertEqual(sp_header.packet_id, PacketId(PacketType.TC, False, 0x7FF)) + self.assertEqual( + sp_header.packet_seq_control, + PacketSeqCtrl(SequenceFlags.UNSEGMENTED, 0x3FFF), + ) + self.assertEqual(sp_header.seq_count, 0x3FFF) + self.assertEqual(sp_header.data_len, 16) + self.assertEqual(sp_header.packet_type, PacketType.TC) + def test_raw_output(self): raw_output = self.sp_header.pack() self.assertEqual(