diff --git a/spacepackets/cfdp/conf.py b/spacepackets/cfdp/conf.py index ee4292c..efb64a2 100644 --- a/spacepackets/cfdp/conf.py +++ b/spacepackets/cfdp/conf.py @@ -58,6 +58,14 @@ def default(cls): crc_flag=CrcFlag.NO_CRC, ) + def header_len_for_cfg(self) -> int: + return ( + 4 + + len(self.source_entity_id) + + len(self.dest_entity_id) + + len(self.transaction_seq_num) + ) + class CfdpDict(TypedDict): source_dest_entity_ids: Tuple[bytes, bytes] diff --git a/spacepackets/cfdp/pdu/nak.py b/spacepackets/cfdp/pdu/nak.py index 4387319..dc0cdf5 100644 --- a/spacepackets/cfdp/pdu/nak.py +++ b/spacepackets/cfdp/pdu/nak.py @@ -16,7 +16,56 @@ class NakPdu(AbstractFileDirectiveBase): - """Encapsulates the NAK file directive PDU, see CCSDS 727.0-B-5 p.84""" + """Encapsulates the NAK file directive PDU, see CCSDS 727.0-B-5 p.84. + + Please note that there is a distinction between a NAK PDU, and a NAK sequence which might + consist of multiple NAK PDUs. Generally, each NAK sequence will only consist of one NAK PDU, + but might consist of multiple ones if one NAK PDU is not sufficient to send all missing + segment requests while staying below a maximum allowed packet size for one PDU. + + The start of scope of a NAK sequence (not individual PDU) can be one of the following: + + - 0 if this is the first NAK sequence. + - 0 if the event which causes an issuance of the NAK PDU is the NAK timer expiry. + - The end-of-scope of the previous NAK sequence for this file transaction. + + The end of scope of a NAK sequence (not individual PDU) can be one of the following: + + - The whole file size if an EOF (No Error) PDU was already receiver. + - The current reception progress at the time of the event that causes issuance of a NAK + sequence. + + The start of scope for an individual NAK PDU is the start of scope of the NAK sequence for + the first NAK PDU inside the sequence. For every other NAK PDU in the sequence, it is the + end-of-scope of the previous NAK PDU. + + The end of scope for an individual NAK PDU is the end of scope of the NAK sequence for the + last NAK PDU inside the sequence. For every other NAK PDU in the sequence, it is the + end offset of the NAK PDU's last segment request. + + Examples + --------- + + Re-request metadata NAK PDU: + + >>> nak_pdu = NakPdu(PduConfig.empty(), 0, 0, [(0, 0)]) + >>> nak_pdu.start_of_scope + 0 + >>> nak_pdu.end_of_scope + 0 + >>> nak_pdu.segment_requests + [(0, 0)] + + Re-request two file segments NAK PDU: + + >>> nak_pdu = NakPdu(PduConfig.empty(), 0, 640, [(0, 128), (512, 640)]) + >>> nak_pdu.start_of_scope + 0 + >>> nak_pdu.end_of_scope + 640 + >>> nak_pdu.segment_requests + [(0, 128), (512, 640)] + """ def __init__( self, @@ -25,11 +74,15 @@ def __init__( end_of_scope: int, segment_requests: Optional[List[Tuple[int, int]]] = None, ): - """Create a NAK PDU object instance + """Create a NAK PDU object instance. - :param start_of_scope: - :param end_of_scope: - :param pdu_conf: Common PDU configuration + :param pdu_conf: Common PDU configuration. + :param start_of_scope: The value of this parameter depends on the start of the scope + of the whole NAK sequence and on the position of this PDU inside the NAK sequence. + See the class documentation for more details. + :param end_of_scope: The value of this parameter depends on the end of the scope + of the whole NAK sequence and on the position of this PDU inside the NAK sequence. + See the class documentation for more details. :param segment_requests: A list of segment request pair tuples, where the first entry of list element is the start offset and the second entry is the end offset. If the start and end offset are both 0, the metadata is re-requested. @@ -52,6 +105,36 @@ def __empty(cls) -> NakPdu: start_of_scope=0, end_of_scope=0, segment_requests=[], pdu_conf=empty_conf ) + @staticmethod + def get_max_seg_reqs_for_max_packet_size_and_pdu_cfg( + max_packet_size: int, pdu_conf: PduConfig + ): + """This function can be used to retrieve the maximum amount of segment request given + a PDU configuration to stay below a certain maximum packet size. This is useful + to calculate how many NAK PDUs are required inside a NAK sequence.""" + base_decrement = pdu_conf.header_len_for_cfg() - 1 + if pdu_conf.crc_flag: + base_decrement += 2 + if pdu_conf.file_flag == LargeFileFlag.NORMAL: + base_decrement += 8 + elif pdu_conf.file_flag == LargeFileFlag.LARGE: + base_decrement += 16 + if max_packet_size < base_decrement: + raise ValueError("maximum packet size too small to hold base packet") + max_packet_size -= base_decrement + if pdu_conf.file_flag == LargeFileFlag.NORMAL: + return max_packet_size // 8 + elif pdu_conf.file_flag == LargeFileFlag.LARGE: + return max_packet_size // 16 + raise ValueError("Invalid large file flag argument") + + def get_max_seg_reqs_for_max_packet_size(self, max_packet_size: int) -> int: + """Member method which forwards to :py:meth:`get_max_seg_reqs_for_max_packet_size_and_pdu_cfg`, + passing the internal PDU configuration field.""" + return NakPdu.get_max_seg_reqs_for_max_packet_size_and_pdu_cfg( + max_packet_size, self.pdu_file_directive.pdu_conf + ) + @property def directive_type(self) -> DirectiveType: return DirectiveType.NAK_PDU diff --git a/spacepackets/util.py b/spacepackets/util.py index 6da350e..2b6fcb4 100644 --- a/spacepackets/util.py +++ b/spacepackets/util.py @@ -1,7 +1,7 @@ from __future__ import annotations import enum import struct -from typing import Union +from typing import Tuple, Union class PrintFormats(enum.IntEnum): @@ -173,7 +173,7 @@ def _verify_int_value(self, val: int): f" {pow(2, self.byte_len * 8) - 1} or negative" ) - def _verify_bytes_value(self, val: bytes) -> (int, bytes): + def _verify_bytes_value(self, val: bytes) -> Tuple[int, bytes]: if len(val) < self.byte_len: raise ValueError( f"Passed byte object {val} smaller than byte length {self.byte_len}" @@ -207,6 +207,9 @@ def __str__(self): def __int__(self): return self.value + def __len__(self): + return self._byte_len + def __eq__(self, other: Union[UnsignedByteField, bytes]): if isinstance(other, UnsignedByteField): return self.value == other.value and self.byte_len == other.byte_len