diff --git a/CHANGELOG.md b/CHANGELOG.md index 2433a26..808dc4b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,11 +8,17 @@ and this project adheres to [Semantic Versioning](http://semver.org/). # [unreleased] +# [v0.26.0] 2024-11-27 + ## Changed - `MetadataPdu` options have to be specified as an optional list of abstract TLVs now. A new getter method `options_as_tlv` can be used to retrieve a list of concrete TLV objects. +## Fixed + +- `CrcError` exception constructor was previously named `__int__` by accident. + # [v0.25.0] 2024-10-29 ## Changed diff --git a/pyproject.toml b/pyproject.toml index da373b1..aab1600 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" name = "spacepackets" description = "Various CCSDS and ECSS packet implementations" readme = "README.md" -version = "0.25.0" +version = "0.26.0" requires-python = ">=3.9" license = {text = "Apache-2.0"} authors = [ @@ -82,7 +82,8 @@ ignore = [ "S101", # Use of assert, should be changed in the future "ANN204", # Do not use return typing on __init__, __new__ and __call__ methods "E111", # Recommended to be disabled when using the ruff formatter - "E114" # Recommended to be disabled when using the ruff formatter + "E114", # Recommended to be disabled when using the ruff formatter + "PLR2004" # This lint is a bit too conservative. Not every number needs to be a named constant. ] [tool.ruff.lint.extend-per-file-ignores] diff --git a/spacepackets/__init__.py b/spacepackets/__init__.py index 5df3389..83a5c7a 100644 --- a/spacepackets/__init__.py +++ b/spacepackets/__init__.py @@ -9,6 +9,15 @@ ) from spacepackets.exceptions import BytesTooShortError +__all__ = [ + "BytesTooShortError", + "PacketType", + "SequenceFlags", + "SpHeader", + "SpacePacket", + "SpacePacketHeader", +] + __LIB_LOGGER = logging.getLogger(__name__) diff --git a/spacepackets/ccsds/__init__.py b/spacepackets/ccsds/__init__.py index eacb83f..4071275 100644 --- a/spacepackets/ccsds/__init__.py +++ b/spacepackets/ccsds/__init__.py @@ -13,3 +13,16 @@ get_total_space_packet_len_from_len_field, ) from .time import * # noqa: F403 # re-export + +__all__ = [ + "SPACE_PACKET_HEADER_SIZE", + "AbstractSpacePacket", + "PacketId", + "PacketSeqCtrl", + "PacketType", + "SequenceFlags", + "SpHeader", + "SpacePacket", + "SpacePacketHeader", + "get_total_space_packet_len_from_len_field", +] diff --git a/spacepackets/ccsds/spacepacket.py b/spacepackets/ccsds/spacepacket.py index a6d62b4..4b5b05b 100644 --- a/spacepackets/ccsds/spacepacket.py +++ b/spacepackets/ccsds/spacepacket.py @@ -6,11 +6,14 @@ import enum import struct from abc import ABC, abstractmethod -from collections.abc import Sequence -from typing import Deque, Final, List, Optional, Tuple +from typing import TYPE_CHECKING, Final from spacepackets.exceptions import BytesTooShortError +if TYPE_CHECKING: + from collections import deque + from collections.abc import Sequence + CCSDS_HEADER_LEN: Final[int] = 6 SPACE_PACKET_HEADER_SIZE: Final[int] = CCSDS_HEADER_LEN SEQ_FLAG_MASK: Final[int] = 0xC000 @@ -66,7 +69,7 @@ def raw(self) -> int: return self.seq_flags << 14 | self.seq_count @classmethod - def empty(cls): + def empty(cls) -> PacketSeqCtrl: return cls(seq_flags=SequenceFlags.CONTINUATION_SEGMENT, seq_count=0) def __eq__(self, other: object) -> bool: @@ -75,7 +78,7 @@ def __eq__(self, other: object) -> bool: return False @classmethod - def from_raw(cls, raw: int): + def from_raw(cls, raw: int) -> PacketSeqCtrl: return cls(seq_flags=SequenceFlags((raw >> 14) & 0b11), seq_count=raw & ~SEQ_FLAG_MASK) @@ -91,7 +94,7 @@ def __init__(self, ptype: PacketType, sec_header_flag: bool, apid: int): self.apid = apid @classmethod - def empty(cls): + def empty(cls) -> PacketId: return cls(ptype=PacketType.TM, sec_header_flag=False, apid=0) def __repr__(self): @@ -166,7 +169,8 @@ def pack(self) -> bytearray: class SpacePacketHeader(AbstractSpacePacket): - """This class encapsulates the space packet header. Packet reference: Blue Book CCSDS 133.0-B-2""" + """This class encapsulates the space packet header. Packet reference: Blue Book + CCSDS 133.0-B-2""" def __init__( self, @@ -320,7 +324,7 @@ def packet_type(self) -> PacketType: return self.packet_id.ptype @packet_type.setter - def packet_type(self, packet_type): + def packet_type(self, packet_type: PacketType) -> None: self.packet_id.ptype = packet_type @property @@ -336,14 +340,14 @@ def sec_header_flag(self) -> bool: return self._packet_id.sec_header_flag @sec_header_flag.setter - def sec_header_flag(self, value): + def sec_header_flag(self, value: bool) -> None: self._packet_id.sec_header_flag = value @property - def seq_count(self): + def seq_count(self) -> int: return self._psc.seq_count - def set_data_len_from_packet_len(self, packet_len: int): + def set_data_len_from_packet_len(self, packet_len: int) -> None: """Sets the data length field from the given total packet length. The total packet length must be at least 7 bytes. @@ -357,15 +361,15 @@ def set_data_len_from_packet_len(self, packet_len: int): self.data_len = packet_len - CCSDS_HEADER_LEN - 1 @seq_count.setter - def seq_count(self, seq_cnt): + def seq_count(self, seq_cnt: int) -> None: self._psc.seq_count = seq_cnt @property - def seq_flags(self): + def seq_flags(self) -> SequenceFlags: return self._psc.seq_flags @seq_flags.setter - def seq_flags(self, value): + def seq_flags(self, value: SequenceFlags) -> None: self._psc.seq_flags = value @property @@ -373,7 +377,7 @@ def header_len(self) -> int: return CCSDS_HEADER_LEN @apid.setter - def apid(self, apid): + def apid(self, apid: int) -> None: self.packet_id.apid = apid @property @@ -441,8 +445,8 @@ class SpacePacket: def __init__( self, sp_header: SpacePacketHeader, - sec_header: Optional[bytes], - user_data: Optional[bytes], + sec_header: bytes | None, + user_data: bytes | None, ): self.sp_header = sp_header self.sec_header = sec_header @@ -475,15 +479,15 @@ def pack(self) -> bytearray: return packet @property - def apid(self): + def apid(self) -> int: return self.sp_header.apid @property - def seq_count(self): + def seq_count(self) -> int: return self.sp_header.seq_count @property - def sec_header_flag(self): + def sec_header_flag(self) -> bool: return self.sp_header.sec_header_flag def __eq__(self, other: object): @@ -501,7 +505,7 @@ def get_space_packet_id_bytes( secondary_header_flag: bool, apid: int, version: int = 0b000, -) -> Tuple[int, int]: +) -> tuple[int, int]: """This function also includes the first three bits reserved for the version. :param version: Version field of the packet ID. Defined to be 0b000 in the space packet standard @@ -542,7 +546,7 @@ def get_apid_from_raw_space_packet(raw_packet: bytes) -> int: return ((raw_packet[0] & 0x7) << 8) | raw_packet[1] -def get_total_space_packet_len_from_len_field(len_field: int): +def get_total_space_packet_len_from_len_field(len_field: int) -> int: """Definition of length field is: C = (Octets in data field - 1). Therefore, octets in data field in len_field plus one. The total space packet length is therefore len_field plus one plus the space packet header size (6)""" @@ -550,8 +554,8 @@ def get_total_space_packet_len_from_len_field(len_field: int): def parse_space_packets( - analysis_queue: Deque[bytearray], packet_ids: Sequence[PacketId] -) -> List[bytearray]: + analysis_queue: deque[bytearray], packet_ids: Sequence[PacketId] +) -> list[bytearray]: """Given a deque of bytearrays, parse for space packets. This funtion expects the deque to be filled on the right side, for example with :py:meth:`collections.deque.append`. If a split packet with a valid header is detected, this function will re-insert the header into @@ -598,10 +602,10 @@ def parse_space_packets( def __handle_packet_id_match( concatenated_packets: bytearray, - analysis_queue: Deque[bytearray], + analysis_queue: deque[bytearray], current_idx: int, - tm_list: List[bytearray], -) -> Tuple[int, int]: + tm_list: list[bytearray], +) -> tuple[int, int]: total_packet_len = get_total_space_packet_len_from_len_field( struct.unpack("!H", concatenated_packets[current_idx + 4 : current_idx + 6])[0] ) diff --git a/spacepackets/ccsds/time/__init__.py b/spacepackets/ccsds/time/__init__.py index 27392d1..5e6011d 100644 --- a/spacepackets/ccsds/time/__init__.py +++ b/spacepackets/ccsds/time/__init__.py @@ -2,3 +2,11 @@ from .cds import CdsShortTimestamp from .common import MS_PER_DAY, SECONDS_PER_DAY, CcsdsTimeCodeId, CcsdsTimeProvider + +__all__ = [ + "MS_PER_DAY", + "SECONDS_PER_DAY", + "CcsdsTimeCodeId", + "CcsdsTimeProvider", + "CdsShortTimestamp", +] diff --git a/spacepackets/ccsds/time/cds.py b/spacepackets/ccsds/time/cds.py index c8bbe73..9367e30 100644 --- a/spacepackets/ccsds/time/cds.py +++ b/spacepackets/ccsds/time/cds.py @@ -5,7 +5,6 @@ import math import struct import time -from typing import Optional, Tuple import deprecation @@ -59,7 +58,7 @@ def __init__(self, ccsds_days: int, ms_of_day: int, init_dt_unix_stamp: bool = T -4383 >>> CdsShortTimestamp(0x0102, 0x03040506).pack().hex(sep=',') '40,01,02,03,04,05,06' - """ + """ # noqa: E501 self.__p_field = bytes([CdsShortTimestamp.CDS_SHORT_ID << 4]) # CCSDS recommends a 1958 Januar 1 epoch, which is different from the Unix epoch self._ccsds_days = ccsds_days @@ -68,11 +67,11 @@ def __init__(self, ccsds_days: int, ms_of_day: int, init_dt_unix_stamp: bool = T if init_dt_unix_stamp: self._setup() - def _setup(self): + def _setup(self) -> None: self._calculate_unix_seconds() self._calculate_date_time() - def _calculate_unix_seconds(self): + def _calculate_unix_seconds(self) -> None: unix_days = convert_ccsds_days_to_unix_days(self._ccsds_days) self._unix_seconds = unix_days * SECONDS_PER_DAY seconds_of_day = self._ms_of_day / 1000.0 @@ -81,7 +80,7 @@ def _calculate_unix_seconds(self): else: self._unix_seconds += seconds_of_day - def _calculate_date_time(self): + def _calculate_date_time(self) -> None: if self._unix_seconds < 0: self._datetime = datetime.datetime( 1970, 1, 1, tzinfo=datetime.timezone.utc @@ -122,7 +121,7 @@ def from_unix_days(cls, unix_days: int, ms_of_day: int) -> CdsShortTimestamp: ) @classmethod - def empty(cls, init_dt_unix_stamp: bool = True): + def empty(cls, init_dt_unix_stamp: bool = True) -> CdsShortTimestamp: """Empty instance containing only zero for all fields. :return: @@ -134,7 +133,7 @@ def unpack(cls, data: bytes) -> CdsShortTimestamp: ccsds_days, ms_of_day = CdsShortTimestamp.unpack_from_raw(data) return cls(ccsds_days=ccsds_days, ms_of_day=ms_of_day) - def read_from_raw(self, data: bytes): + def read_from_raw(self, data: bytes) -> CdsShortTimestamp: """Updates the instance from a given raw CDS short timestamp :param data: @@ -144,7 +143,7 @@ def read_from_raw(self, data: bytes): self._setup() @staticmethod - def unpack_from_raw(data: bytes) -> Tuple[int, int]: + def unpack_from_raw(data: bytes) -> tuple[int, int]: if len(data) < CdsShortTimestamp.TIMESTAMP_SIZE: raise BytesTooShortError(CdsShortTimestamp.TIMESTAMP_SIZE, len(data)) p_field = data[0] @@ -241,7 +240,7 @@ def from_date_time(cls, dt: datetime.datetime) -> CdsShortTimestamp: return cls.from_datetime(dt) @staticmethod - def ms_of_today(seconds_since_epoch: Optional[float] = None): + def ms_of_today(seconds_since_epoch: float | None = None) -> int: if seconds_since_epoch is None: seconds_since_epoch = time.time() fraction_ms = seconds_since_epoch - math.floor(seconds_since_epoch) diff --git a/spacepackets/ccsds/time/common.py b/spacepackets/ccsds/time/common.py index 25834b3..981fd58 100644 --- a/spacepackets/ccsds/time/common.py +++ b/spacepackets/ccsds/time/common.py @@ -78,7 +78,7 @@ def pack(self) -> bytes: pass @abstractmethod - def read_from_raw(self, timestamp: bytes): + def read_from_raw(self, timestamp: bytes) -> CcsdsTimeProvider: pass @abstractmethod diff --git a/spacepackets/cfdp/__init__.py b/spacepackets/cfdp/__init__.py index dc5a27f..400e7bf 100644 --- a/spacepackets/cfdp/__init__.py +++ b/spacepackets/cfdp/__init__.py @@ -43,3 +43,40 @@ TlvHolder, TlvType, ) + +__all__ = [ + "NULL_CHECKSUM_U32", + "CfdpLv", + "CfdpTlv", + "ChecksumType", + "ConditionCode", + "CrcFlag", + "DeliveryCode", + "Direction", + "DirectiveType", + "EntityIdTlv", + "FaultHandlerCode", + "FaultHandlerOverrideTlv", + "FileStatus", + "FileStoreRequestTlv", + "FileStoreResponseTlv", + "FilestoreActionCode", + "FilestoreResponseStatusCode", + "FinishedParams", + "FlowLabelTlv", + "GenericPduPacket", + "InvalidCrc", + "LargeFileFlag", + "MessageToUserTlv", + "PduConfig", + "PduFactory", + "PduHolder", + "PduType", + "SegmentMetadataFlag", + "SegmentationControl", + "TlvHolder", + "TlvType", + "TlvTypeMissmatch", + "TransactionId", + "TransmissionMode", +] diff --git a/spacepackets/cfdp/conf.py b/spacepackets/cfdp/conf.py index 570b3b3..4e2c53e 100644 --- a/spacepackets/cfdp/conf.py +++ b/spacepackets/cfdp/conf.py @@ -1,7 +1,7 @@ from __future__ import annotations from dataclasses import dataclass -from typing import Tuple, TypedDict +from typing import TypedDict from spacepackets.cfdp.defs import ( CrcFlag, @@ -41,7 +41,7 @@ def empty(cls) -> PduConfig: ) @classmethod - def default(cls): + def default(cls) -> PduConfig: """Valid PDU configuration""" return PduConfig( transaction_seq_num=ByteFieldU8(0), @@ -62,7 +62,7 @@ def header_len(self) -> int: class CfdpDict(TypedDict): - source_dest_entity_ids: Tuple[bytes, bytes] + source_dest_entity_ids: tuple[bytes, bytes] # TODO: Protect dict access with a dedicated lock for thread-safety @@ -71,10 +71,10 @@ class CfdpDict(TypedDict): } -def set_entity_ids(source_entity_id: bytes, dest_entity_id: bytes): +def set_entity_ids(source_entity_id: bytes, dest_entity_id: bytes) -> None: __CFDP_DICT["source_dest_entity_ids"] = (source_entity_id, dest_entity_id) -def get_entity_ids() -> Tuple[bytes, bytes]: +def get_entity_ids() -> tuple[bytes, bytes]: """Return a tuple where the first entry is the source entity ID""" return __CFDP_DICT["source_dest_entity_ids"] diff --git a/spacepackets/cfdp/defs.py b/spacepackets/cfdp/defs.py index 771749c..adb0f90 100644 --- a/spacepackets/cfdp/defs.py +++ b/spacepackets/cfdp/defs.py @@ -1,8 +1,10 @@ from __future__ import annotations import enum +from typing import TYPE_CHECKING -from spacepackets.util import UnsignedByteField +if TYPE_CHECKING: + from spacepackets.util import UnsignedByteField CFDP_VERSION_2_NAME = "CCSDS 727.0-B-5" # Second version of the protocol, only this one is supported here diff --git a/spacepackets/cfdp/exceptions.py b/spacepackets/cfdp/exceptions.py index af8a784..4e4590f 100644 --- a/spacepackets/cfdp/exceptions.py +++ b/spacepackets/cfdp/exceptions.py @@ -4,7 +4,7 @@ class InvalidCrc(Exception): - def __int__(self, crc16: int, message: Optional[str] = None): + def __init__(self, crc16: int, message: Optional[str] = None): self.crc16 = crc16 self.message = message if self.message is None: diff --git a/spacepackets/cfdp/lv.py b/spacepackets/cfdp/lv.py index 065c5e5..f5105a4 100644 --- a/spacepackets/cfdp/lv.py +++ b/spacepackets/cfdp/lv.py @@ -1,6 +1,9 @@ from __future__ import annotations -from pathlib import Path +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from pathlib import Path class CfdpLv: @@ -27,7 +30,7 @@ def from_path(cls, path: Path) -> CfdpLv: return cls.from_str(str(path)) @property - def packet_len(self): + def packet_len(self) -> int: """Returns length of full LV packet""" return self.value_len + 1 @@ -57,5 +60,7 @@ def __repr__(self): def __str__(self): return f"CFDP LV with data 0x[{self.value.hex(sep=',')}] of length" f" {len(self.value)}" - def __eq__(self, other): + def __eq__(self, other: object): + if not isinstance(other, CfdpLv): + return False return self.value == other.value diff --git a/spacepackets/cfdp/pdu/__init__.py b/spacepackets/cfdp/pdu/__init__.py index add35cc..e823027 100644 --- a/spacepackets/cfdp/pdu/__init__.py +++ b/spacepackets/cfdp/pdu/__init__.py @@ -20,3 +20,30 @@ from .metadata import MetadataParams, MetadataPdu from .nak import NakPdu from .prompt import PromptPdu + +__all__ = [ + "AbstractFileDirectiveBase", + "AckPdu", + "DeliveryCode", + "DirectiveType", + "EofPdu", + "FileDataParams", + "FileDataPdu", + "FileDirectivePduBase", + "FileStatus", + "FinishedParams", + "FinishedPdu", + "GenericPduPacket", + "KeepAlivePdu", + "MetadataParams", + "MetadataPdu", + "NakPdu", + "PduConfig", + "PduFactory", + "PduHeader", + "PduHolder", + "PduType", + "PromptPdu", + "SegmentMetadataFlag", + "TransactionStatus", +] diff --git a/spacepackets/cfdp/pdu/ack.py b/spacepackets/cfdp/pdu/ack.py index 9a43eeb..89e590a 100644 --- a/spacepackets/cfdp/pdu/ack.py +++ b/spacepackets/cfdp/pdu/ack.py @@ -3,6 +3,7 @@ import copy import enum import struct +from typing import TYPE_CHECKING from spacepackets.cfdp.conf import PduConfig from spacepackets.cfdp.defs import ConditionCode, CrcFlag, Direction @@ -11,9 +12,11 @@ DirectiveType, FileDirectivePduBase, ) -from spacepackets.cfdp.pdu.header import PduHeader from spacepackets.crc import CRC16_CCITT_FUNC +if TYPE_CHECKING: + from spacepackets.cfdp.pdu.header import PduHeader + class TransactionStatus(enum.IntEnum): """For more detailed information: CCSDS 727.0-B-5 p.81""" @@ -101,7 +104,7 @@ def pack(self) -> bytearray: packet.extend(struct.pack("!H", CRC16_CCITT_FUNC(packet))) return packet - def _calculate_directive_field_len(self): + def _calculate_directive_field_len(self) -> None: directive_param_field_len = 2 if self.pdu_file_directive.pdu_conf.crc_flag == CrcFlag.WITH_CRC: directive_param_field_len += 2 diff --git a/spacepackets/cfdp/pdu/eof.py b/spacepackets/cfdp/pdu/eof.py index d42d293..157fa3d 100644 --- a/spacepackets/cfdp/pdu/eof.py +++ b/spacepackets/cfdp/pdu/eof.py @@ -2,7 +2,7 @@ import copy import struct -from typing import Optional +from typing import TYPE_CHECKING from spacepackets.cfdp.conf import PduConfig from spacepackets.cfdp.defs import ConditionCode, CrcFlag, Direction @@ -11,11 +11,13 @@ DirectiveType, FileDirectivePduBase, ) -from spacepackets.cfdp.pdu.header import PduHeader from spacepackets.cfdp.tlv.tlv import EntityIdTlv from spacepackets.crc import CRC16_CCITT_FUNC from spacepackets.exceptions import BytesTooShortError +if TYPE_CHECKING: + from spacepackets.cfdp.pdu.header import PduHeader + class EofPdu(AbstractFileDirectiveBase): """Encapsulates the EOF file directive PDU, see CCSDS 727.0-B-5 p.79""" @@ -25,7 +27,7 @@ def __init__( pdu_conf: PduConfig, file_checksum: bytes, file_size: int, - fault_location: Optional[EntityIdTlv] = None, + fault_location: EntityIdTlv | None = None, condition_code: ConditionCode = ConditionCode.NO_ERROR, ): """Constructor for an EOF PDU. @@ -78,15 +80,15 @@ def packet_len(self) -> int: return self.pdu_file_directive.packet_len @property - def fault_location(self): + def fault_location(self) -> EntityIdTlv | None: return self._fault_location @fault_location.setter - def fault_location(self, fault_location: Optional[EntityIdTlv]): + def fault_location(self, fault_location: EntityIdTlv | None) -> None: self._fault_location = fault_location self._calculate_directive_param_field_len() - def _calculate_directive_param_field_len(self): + def _calculate_directive_param_field_len(self) -> None: directive_param_field_len = 9 if self.pdu_file_directive.pdu_header.large_file_flag_set: directive_param_field_len = 13 diff --git a/spacepackets/cfdp/pdu/file_data.py b/spacepackets/cfdp/pdu/file_data.py index 2a610e8..ee61b0c 100644 --- a/spacepackets/cfdp/pdu/file_data.py +++ b/spacepackets/cfdp/pdu/file_data.py @@ -4,7 +4,7 @@ import enum import struct from dataclasses import dataclass -from typing import Optional +from typing import TYPE_CHECKING from spacepackets.cfdp import CrcFlag, LargeFileFlag from spacepackets.cfdp.conf import PduConfig @@ -13,14 +13,16 @@ from spacepackets.cfdp.pdu.header import AbstractPduBase, PduHeader from spacepackets.crc import CRC16_CCITT_FUNC from spacepackets.exceptions import BytesTooShortError -from spacepackets.util import UnsignedByteField + +if TYPE_CHECKING: + from spacepackets.util import UnsignedByteField def get_max_file_seg_len_for_max_packet_len_and_pdu_cfg( pdu_conf: PduConfig, max_packet_len: int, - segment_metadata: Optional[SegmentMetadata] = None, -): + segment_metadata: SegmentMetadata | None = None, +) -> int: """This function can be used to calculate the maximum allowed file segment size for a given maximum packet length and the segment metadata if there is any.""" subtract = pdu_conf.header_len() @@ -61,7 +63,7 @@ class SegmentMetadata: class FileDataParams: file_data: bytes offset: int - segment_metadata: Optional[SegmentMetadata] = None + segment_metadata: SegmentMetadata | None = None @classmethod def empty(cls) -> FileDataParams: @@ -140,17 +142,17 @@ def dest_entity_id(self) -> UnsignedByteField: return self.pdu_header.dest_entity_id @property - def record_cont_state(self) -> Optional[RecordContinuationState]: + def record_cont_state(self) -> RecordContinuationState | None: if self._params.segment_metadata is None: return None return self._params.segment_metadata.record_cont_state @property - def offset(self): + def offset(self) -> int: return self._params.offset @property - def crc_flag(self): + def crc_flag(self) -> CrcFlag: return self.pdu_header.crc_flag @property @@ -158,11 +160,11 @@ def has_segment_metadata(self) -> bool: return self._pdu_header.segment_metadata_flag == SegmentMetadataFlag.PRESENT @property - def segment_metadata(self) -> Optional[SegmentMetadata]: + def segment_metadata(self) -> SegmentMetadata | None: return self._params.segment_metadata @segment_metadata.setter - def segment_metadata(self, segment_metadata: Optional[SegmentMetadata]): + def segment_metadata(self, segment_metadata: SegmentMetadata | None) -> None: self._params.segment_metadata = segment_metadata if segment_metadata is None: self._pdu_header.segment_metadata_flag = SegmentMetadataFlag.NOT_PRESENT @@ -171,15 +173,15 @@ def segment_metadata(self, segment_metadata: Optional[SegmentMetadata]): self._calculate_pdu_data_field_len() @property - def file_data(self): + def file_data(self) -> bytes: return self._params.file_data @file_data.setter - def file_data(self, file_data: bytes): + def file_data(self, file_data: bytes) -> None: self._params.file_data = file_data self._calculate_pdu_data_field_len() - def _calculate_pdu_data_field_len(self): + def _calculate_pdu_data_field_len(self) -> None: pdu_data_field_len = 0 if self.segment_metadata is not None: pdu_data_field_len = 1 + len(self.segment_metadata.metadata) @@ -258,7 +260,7 @@ def unpack(cls, data: bytes) -> FileDataPdu: return file_data_packet @property - def packet_len(self): + def packet_len(self) -> int: return self.pdu_header.packet_len def __eq__(self, other: FileDataPdu): diff --git a/spacepackets/cfdp/pdu/file_directive.py b/spacepackets/cfdp/pdu/file_directive.py index 2f41233..528418e 100644 --- a/spacepackets/cfdp/pdu/file_directive.py +++ b/spacepackets/cfdp/pdu/file_directive.py @@ -3,7 +3,7 @@ import abc import enum import struct -from typing import Tuple +from typing import TYPE_CHECKING from spacepackets.cfdp.conf import PduConfig from spacepackets.cfdp.defs import CrcFlag, Direction, LargeFileFlag, TransmissionMode @@ -14,7 +14,9 @@ SegmentMetadataFlag, ) from spacepackets.exceptions import BytesTooShortError -from spacepackets.util import UnsignedByteField + +if TYPE_CHECKING: + from spacepackets.util import UnsignedByteField class DirectiveType(enum.IntEnum): @@ -53,19 +55,19 @@ def transmission_mode(self) -> TransmissionMode: return self.pdu_header.transmission_mode @file_flag.setter - def file_flag(self, field_len: LargeFileFlag): + def file_flag(self, field_len: LargeFileFlag) -> None: self.pdu_header.file_flag = field_len @property - def crc_flag(self): + def crc_flag(self) -> CrcFlag: return self.pdu_header.crc_flag @crc_flag.setter - def crc_flag(self, crc_flag: CrcFlag): + def crc_flag(self, crc_flag: CrcFlag) -> None: self.pdu_header.crc_flag = crc_flag @property - def pdu_data_field_len(self): + def pdu_data_field_len(self) -> int: return self.pdu_header.pdu_data_field_len @property @@ -81,7 +83,7 @@ def dest_entity_id(self) -> UnsignedByteField: return self.pdu_header.dest_entity_id @pdu_data_field_len.setter - def pdu_data_field_len(self, pdu_data_field_len: int): + def pdu_data_field_len(self, pdu_data_field_len: int) -> None: self.pdu_header.pdu_data_field_len = pdu_data_field_len @property @@ -131,7 +133,7 @@ def __init__( ) self._directive_type = directive_code - def verify_length_and_checksum(self, data: bytes): + def verify_length_and_checksum(self, data: bytes) -> None: self.pdu_header.verify_length_and_checksum(data) @property @@ -147,11 +149,11 @@ def directive_type(self) -> DirectiveType: return self._directive_type @property - def directive_param_field_len(self): + def directive_param_field_len(self) -> int: return self.pdu_header.pdu_data_field_len - 1 @directive_param_field_len.setter - def directive_param_field_len(self, directive_param_field_len: int): + def directive_param_field_len(self, directive_param_field_len: int) -> None: self.pdu_header.pdu_data_field_len = directive_param_field_len + 1 @classmethod @@ -186,14 +188,14 @@ def unpack(cls, raw_packet: bytes) -> FileDirectivePduBase: file_directive._directive_type = raw_packet[header_len - 1] return file_directive - def _verify_file_len(self, file_size: int): + def _verify_file_len(self, file_size: int) -> None: """Can be used by subclasses to verify a given file size""" if self.pdu_header.file_flag == LargeFileFlag.LARGE and file_size > pow(2, 64): raise ValueError(f"File size {file_size} larger than 64 bit field") if self.pdu_header.file_flag == LargeFileFlag.NORMAL and file_size > pow(2, 32): raise ValueError(f"File size {file_size} larger than 32 bit field") - def parse_fss_field(self, raw_packet: bytes, current_idx: int) -> Tuple[int, int]: + def parse_fss_field(self, raw_packet: bytes, current_idx: int) -> tuple[int, int]: """Parse the FSS field, which has different size depending on the large file flag being set or not. Returns the current index incremented and the parsed file size. diff --git a/spacepackets/cfdp/pdu/finished.py b/spacepackets/cfdp/pdu/finished.py index 8e0e678..fb9f733 100644 --- a/spacepackets/cfdp/pdu/finished.py +++ b/spacepackets/cfdp/pdu/finished.py @@ -3,7 +3,7 @@ import copy import struct from dataclasses import dataclass, field -from typing import List, Optional +from typing import TYPE_CHECKING from spacepackets.cfdp.conf import PduConfig from spacepackets.cfdp.defs import ( @@ -18,20 +18,22 @@ DirectiveType, FileDirectivePduBase, ) -from spacepackets.cfdp.pdu.header import PduHeader from spacepackets.cfdp.tlv.defs import TlvType from spacepackets.cfdp.tlv.tlv import EntityIdTlv, FileStoreResponseTlv from spacepackets.crc import CRC16_CCITT_FUNC from spacepackets.exceptions import BytesTooShortError +if TYPE_CHECKING: + from spacepackets.cfdp.pdu.header import PduHeader + @dataclass class FinishedParams: condition_code: ConditionCode delivery_code: DeliveryCode file_status: FileStatus - file_store_responses: List[FileStoreResponseTlv] = field(default_factory=list) - fault_location: Optional[EntityIdTlv] = None + file_store_responses: list[FileStoreResponseTlv] = field(default_factory=list) + fault_location: EntityIdTlv | None = None @classmethod def empty(cls) -> FinishedParams: @@ -99,7 +101,7 @@ def condition_code(self) -> ConditionCode: return self._params.condition_code @condition_code.setter - def condition_code(self, condition_code: ConditionCode): + def condition_code(self, condition_code: ConditionCode) -> None: self._params.condition_code = condition_code @property @@ -115,7 +117,7 @@ def packet_len(self) -> int: return self.pdu_file_directive.packet_len @property - def file_store_responses(self) -> List[FileStoreResponseTlv]: + def file_store_responses(self) -> list[FileStoreResponseTlv]: return self._params.file_store_responses @property @@ -123,16 +125,14 @@ def finished_params(self) -> FinishedParams: return self._params @property - def might_have_fault_location(self): - if self._params.condition_code in [ + def might_have_fault_location(self) -> bool: + return self._params.condition_code not in [ ConditionCode.NO_ERROR, ConditionCode.UNSUPPORTED_CHECKSUM_TYPE, - ]: - return False - return True + ] @file_store_responses.setter - def file_store_responses(self, file_store_responses: Optional[List[FileStoreResponseTlv]]): + def file_store_responses(self, file_store_responses: list[FileStoreResponseTlv] | None) -> None: """Setter function for the file store responses :param file_store_responses: :raises ValueError: TLV type is not a filestore response @@ -145,7 +145,7 @@ def file_store_responses(self, file_store_responses: Optional[List[FileStoreResp self._calculate_directive_field_len() @property - def file_store_responses_len(self): + def file_store_responses_len(self) -> int: if not self._params.file_store_responses: return 0 file_store_responses_len = 0 @@ -154,23 +154,20 @@ def file_store_responses_len(self): return file_store_responses_len @property - def fault_location(self) -> Optional[EntityIdTlv]: + def fault_location(self) -> EntityIdTlv | None: return self._params.fault_location @fault_location.setter - def fault_location(self, fault_location: Optional[EntityIdTlv]): + def fault_location(self, fault_location: EntityIdTlv | None) -> None: """Setter function for the fault location. :raises ValueError: Type ID is not entity ID (0x06) """ self._params.fault_location = fault_location self._calculate_directive_field_len() - def _calculate_directive_field_len(self): + def _calculate_directive_field_len(self) -> None: base_len = 1 - if self.fault_location is None: - fault_loc_len = 0 - else: - fault_loc_len = self.fault_location_len + fault_loc_len = 0 if self.fault_location is None else self.fault_location_len if self.pdu_file_directive.pdu_conf.crc_flag == CrcFlag.WITH_CRC: base_len += 2 self.pdu_file_directive.directive_param_field_len = ( @@ -178,7 +175,7 @@ def _calculate_directive_field_len(self): ) @property - def fault_location_len(self): + def fault_location_len(self) -> int: if self._params.fault_location is None: return 0 return self._params.fault_location.packet_len diff --git a/spacepackets/cfdp/pdu/header.py b/spacepackets/cfdp/pdu/header.py index ffe38f6..0610a35 100644 --- a/spacepackets/cfdp/pdu/header.py +++ b/spacepackets/cfdp/pdu/header.py @@ -64,7 +64,7 @@ def file_flag(self) -> LargeFileFlag: @file_flag.setter @abc.abstractmethod - def file_flag(self, file_flag: LargeFileFlag): + def file_flag(self, file_flag: LargeFileFlag) -> None: pass @property @@ -104,12 +104,12 @@ def pdu_data_field_len(self, pdu_data_field_len: int) -> int: @property @abc.abstractmethod - def crc_flag(self): + def crc_flag(self) -> CrcFlag: pass @crc_flag.setter @abc.abstractmethod - def crc_flag(self, crc_flag: CrcFlag): + def crc_flag(self, crc_flag: CrcFlag) -> None: pass @property @@ -131,7 +131,7 @@ def __eq__(self, other: AbstractPduBase): ) @staticmethod - def header_len_from_raw(data: bytes): + def header_len_from_raw(data: bytes) -> int: entity_id_len = ((data[3] >> 4) & 0b111) + 1 seq_num_len = (data[3] & 0b111) + 1 return AbstractPduBase.FIXED_LENGTH + 2 * entity_id_len + seq_num_len @@ -175,24 +175,24 @@ def pdu_type(self) -> PduType: return self._pdu_type @pdu_type.setter - def pdu_type(self, pdu_type: PduType): + def pdu_type(self, pdu_type: PduType) -> None: self._pdu_type = pdu_type @property - def source_entity_id(self): + def source_entity_id(self) -> UnsignedByteField: return self.pdu_conf.source_entity_id @property - def dest_entity_id(self): + def dest_entity_id(self) -> UnsignedByteField: return self.pdu_conf.dest_entity_id @property - def transmission_mode(self): + def transmission_mode(self) -> TransmissionMode: return self.pdu_conf.trans_mode def set_entity_ids( self, source_entity_id: UnsignedByteField, dest_entity_id: UnsignedByteField - ): + ) -> None: """Both IDs must be set at once because they must have the same length as well :param source_entity_id: :param dest_entity_id: @@ -204,55 +204,55 @@ def set_entity_ids( self.pdu_conf.dest_entity_id = dest_entity_id @property - def transaction_seq_num(self): + def transaction_seq_num(self) -> UnsignedByteField: return self.pdu_conf.transaction_seq_num @transaction_seq_num.setter - def transaction_seq_num(self, transaction_seq_num: UnsignedByteField): + def transaction_seq_num(self, transaction_seq_num: UnsignedByteField) -> None: self.pdu_conf.transaction_seq_num = transaction_seq_num @property - def file_flag(self): + def file_flag(self) -> LargeFileFlag: return self.pdu_conf.file_flag @file_flag.setter - def file_flag(self, file_flag: LargeFileFlag): + def file_flag(self, file_flag: LargeFileFlag) -> None: self.pdu_conf.file_flag = file_flag @property - def crc_flag(self): + def crc_flag(self) -> CrcFlag: return self.pdu_conf.crc_flag @crc_flag.setter - def crc_flag(self, crc_flag: CrcFlag): + def crc_flag(self, crc_flag: CrcFlag) -> None: self.pdu_conf.crc_flag = crc_flag @transmission_mode.setter - def transmission_mode(self, trans_mode: TransmissionMode): + def transmission_mode(self, trans_mode: TransmissionMode) -> None: self.pdu_conf.trans_mode = trans_mode @property - def direction(self): + def direction(self) -> Direction: return self.pdu_conf.direction @direction.setter - def direction(self, direction: Direction): + def direction(self, direction: Direction) -> None: self.pdu_conf.direction = direction @property - def seg_ctrl(self): + def seg_ctrl(self) -> SegmentationControl: return self.pdu_conf.seg_ctrl @seg_ctrl.setter - def seg_ctrl(self, seg_ctrl: SegmentationControl): + def seg_ctrl(self, seg_ctrl: SegmentationControl) -> None: self.pdu_conf.seg_ctrl = seg_ctrl @property - def pdu_data_field_len(self): + def pdu_data_field_len(self) -> int: return self._pdu_data_field_len @pdu_data_field_len.setter - def pdu_data_field_len(self, new_len: int): + def pdu_data_field_len(self, new_len: int) -> None: """Set the PDU data field length. :param new_len: @@ -355,11 +355,11 @@ def unpack(cls, data: bytes) -> PduHeader: def verify_length_and_checksum(self, data: bytes) -> int: if len(data) < self.packet_len: raise BytesTooShortError(self.packet_len, len(data)) - if self.pdu_conf.crc_flag == CrcFlag.WITH_CRC: - if CRC16_CCITT_FUNC(data[: self.packet_len]) != 0: - raise InvalidCrc( - struct.unpack("!H", data[self.packet_len - 2 : self.packet_len])[0] - ) + if ( + self.pdu_conf.crc_flag == CrcFlag.WITH_CRC + and CRC16_CCITT_FUNC(data[: self.packet_len]) != 0 + ): + raise InvalidCrc(struct.unpack("!H", data[self.packet_len - 2 : self.packet_len])[0]) return self.packet_len @staticmethod diff --git a/spacepackets/cfdp/pdu/helper.py b/spacepackets/cfdp/pdu/helper.py index b5fda9c..3029c0b 100644 --- a/spacepackets/cfdp/pdu/helper.py +++ b/spacepackets/cfdp/pdu/helper.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any, Optional, Type, Union, cast +from typing import Any, NoReturn, Union, cast import deprecation @@ -26,7 +26,7 @@ class PduHolder: """Helper type to store arbitrary PDU types and cast them to a concrete PDU type conveniently""" - def __init__(self, pdu: Optional[GenericPduPacket]): + def __init__(self, pdu: GenericPduPacket | None): self.pdu = pdu def pack(self) -> bytearray: @@ -40,7 +40,7 @@ def pack(self) -> bytearray: current_version=get_version(), details="use packet member instead", ) - def base(self): + def base(self) -> GenericPduPacket | None: return self.pdu @base.setter @@ -49,7 +49,7 @@ def base(self): current_version=get_version(), details="use packet member instead", ) - def base(self, base: GenericPduPacket): + def base(self, base: GenericPduPacket) -> None: self.pdu = base @property @@ -64,11 +64,11 @@ def pdu_type(self) -> PduType: return self.pdu.pdu_type @property - def is_file_directive(self): + def is_file_directive(self) -> bool: return self.pdu_type == PduType.FILE_DIRECTIVE @property - def pdu_directive_type(self) -> Optional[DirectiveType]: + def pdu_directive_type(self) -> DirectiveType | None: """If the contained type is not a PDU file directive, returns None. Otherwise, returns the directive type """ @@ -80,10 +80,10 @@ def pdu_directive_type(self) -> Optional[DirectiveType]: def __repr__(self): return f"{self.__class__.__name__}(base={self.pdu!r}" - def _raise_not_target_exception(self, pdu_type: Type[Any]): + def _raise_not_target_exception(self, pdu_type: type[Any]) -> NoReturn: raise TypeError(f"Stored PDU is not {pdu_type.__name__!r}: {self.pdu!r}") - def _cast_to_concrete_file_directive(self, pdu_type: Type[Any], dir_type: DirectiveType) -> Any: + def _cast_to_concrete_file_directive(self, pdu_type: type[Any], dir_type: DirectiveType) -> Any: # noqa: ANN401 if ( isinstance(self.pdu, AbstractFileDirectiveBase) and self.pdu.pdu_type == PduType.FILE_DIRECTIVE # type: ignore @@ -92,11 +92,13 @@ def _cast_to_concrete_file_directive(self, pdu_type: Type[Any], dir_type: Direct if pdu_base.directive_type == dir_type: return cast(pdu_type, self.pdu) self._raise_not_target_exception(pdu_type) + return None def to_file_data_pdu(self) -> FileDataPdu: # type: ignore if isinstance(self.pdu, AbstractPduBase) and self.pdu.pdu_type == PduType.FILE_DATA: return cast(FileDataPdu, self.pdu) self._raise_not_target_exception(FileDataPdu) + return None def to_metadata_pdu(self) -> MetadataPdu: return self._cast_to_concrete_file_directive(MetadataPdu, DirectiveType.METADATA_PDU) @@ -124,7 +126,7 @@ class PduFactory: """Helper class to generate PDUs and retrieve PDU information from a raw bytestream""" @staticmethod - def from_raw(data: bytes) -> Optional[GenericPduPacket]: + def from_raw(data: bytes) -> GenericPduPacket | None: # noqa: PLR0911 if not PduFactory.is_file_directive(data): return FileDataPdu.unpack(data) directive = PduFactory.pdu_directive_type(data) @@ -157,7 +159,7 @@ def is_file_directive(data: bytes) -> bool: return PduFactory.pdu_type(data) == PduType.FILE_DIRECTIVE @staticmethod - def pdu_directive_type(data: bytes) -> Optional[DirectiveType]: + def pdu_directive_type(data: bytes) -> DirectiveType | None: """Retrieve the PDU directive type from a raw bytestream. :raises ValueError: Invalid directive type. diff --git a/spacepackets/cfdp/pdu/keep_alive.py b/spacepackets/cfdp/pdu/keep_alive.py index 5c9b318..d9004d8 100644 --- a/spacepackets/cfdp/pdu/keep_alive.py +++ b/spacepackets/cfdp/pdu/keep_alive.py @@ -2,11 +2,11 @@ import copy import struct +from typing import TYPE_CHECKING from spacepackets.cfdp import CrcFlag from spacepackets.cfdp.conf import LargeFileFlag, PduConfig from spacepackets.cfdp.defs import Direction -from spacepackets.cfdp.pdu import PduHeader from spacepackets.cfdp.pdu.file_directive import ( AbstractFileDirectiveBase, DirectiveType, @@ -14,6 +14,9 @@ ) from spacepackets.crc import CRC16_CCITT_FUNC +if TYPE_CHECKING: + from spacepackets.cfdp.pdu import PduHeader + class KeepAlivePdu(AbstractFileDirectiveBase): """Encapsulates the Keep Alive file directive PDU, see CCSDS 727.0-B-5 p.85""" @@ -43,11 +46,11 @@ def pdu_header(self) -> PduHeader: return self.pdu_file_directive.pdu_header @property - def file_flag(self): + def file_flag(self) -> LargeFileFlag: return self.pdu_file_directive.pdu_header.file_flag @file_flag.setter - def file_flag(self, file_size: LargeFileFlag): + def file_flag(self, file_size: LargeFileFlag) -> None: directive_param_field_len = 4 if file_size == LargeFileFlag.LARGE: directive_param_field_len = 8 @@ -103,7 +106,7 @@ def unpack(cls, data: bytes) -> KeepAlivePdu: return keep_alive_pdu @property - def packet_len(self): + def packet_len(self) -> int: return self.pdu_file_directive.packet_len def __eq__(self, other: KeepAlivePdu): diff --git a/spacepackets/cfdp/pdu/metadata.py b/spacepackets/cfdp/pdu/metadata.py index fb00dd7..2dbdf32 100644 --- a/spacepackets/cfdp/pdu/metadata.py +++ b/spacepackets/cfdp/pdu/metadata.py @@ -3,12 +3,11 @@ import copy import dataclasses import struct -from typing import List, Optional +from typing import TYPE_CHECKING from spacepackets.cfdp.conf import LargeFileFlag, PduConfig from spacepackets.cfdp.defs import ChecksumType, CrcFlag, Direction from spacepackets.cfdp.lv import CfdpLv -from spacepackets.cfdp.pdu import PduHeader from spacepackets.cfdp.pdu.file_directive import ( AbstractFileDirectiveBase, DirectiveType, @@ -18,14 +17,17 @@ from spacepackets.crc import CRC16_CCITT_FUNC from spacepackets.exceptions import BytesTooShortError +if TYPE_CHECKING: + from spacepackets.cfdp.pdu import PduHeader + @dataclasses.dataclass class MetadataParams: closure_requested: bool checksum_type: ChecksumType file_size: int - source_file_name: Optional[str] - dest_file_name: Optional[str] + source_file_name: str | None + dest_file_name: str | None class MetadataPdu(AbstractFileDirectiveBase): @@ -53,7 +55,7 @@ def __init__( self, pdu_conf: PduConfig, params: MetadataParams, - options: Optional[TlvList] = None, + options: TlvList | None = None, ): pdu_conf = copy.copy(pdu_conf) self.params = params @@ -109,28 +111,26 @@ def __empty(cls) -> MetadataPdu: ) @property - def options(self) -> Optional[TlvList]: + def options(self) -> TlvList | None: return self._options @options.setter - def options(self, options: Optional[TlvList]): + def options(self, options: TlvList | None) -> None: self._options = options self._calculate_directive_field_len() - def options_as_tlv(self) -> Optional[List[CfdpTlv]]: - """Returns :py:meth:`options` converted to a list of concrete :py:class:`CfdpTlv` objects.""" + def options_as_tlv(self) -> list[CfdpTlv] | None: + """Returns :py:meth:`options` converted to a list of concrete :py:class:`CfdpTlv` + objects.""" if self.options is None: return None - cfdp_tlv_list = [] - for option in self.options: - cfdp_tlv_list.append(CfdpTlv(option.tlv_type, option.value)) - return cfdp_tlv_list + return [CfdpTlv(option.tlv_type, option.value) for option in self.options] @property - def directive_param_field_len(self): + def directive_param_field_len(self) -> int: return self.pdu_file_directive.directive_param_field_len - def _calculate_directive_field_len(self): + def _calculate_directive_field_len(self) -> None: directive_param_field_len = ( 5 + self._source_file_name_lv.packet_len + self._dest_file_name_lv.packet_len ) @@ -144,7 +144,7 @@ def _calculate_directive_field_len(self): self.pdu_file_directive.directive_param_field_len = directive_param_field_len @property - def source_file_name(self) -> Optional[str]: + def source_file_name(self) -> str | None: """If there is no associated source file, for example for messages used for Proxy Operations, this function will return None """ @@ -153,7 +153,7 @@ def source_file_name(self) -> Optional[str]: return self._source_file_name_lv.value.decode() @source_file_name.setter - def source_file_name(self, source_file_name: Optional[str]): + def source_file_name(self, source_file_name: str | None) -> None: if source_file_name is None: self._source_file_name_lv = CfdpLv(value=b"") else: @@ -162,7 +162,7 @@ def source_file_name(self, source_file_name: Optional[str]): self._calculate_directive_field_len() @property - def dest_file_name(self) -> Optional[str]: + def dest_file_name(self) -> str | None: """If there is no associated source file, for example for messages used for Proxy Operations, this function will return None """ @@ -171,7 +171,7 @@ def dest_file_name(self) -> Optional[str]: return self._dest_file_name_lv.value.decode() @dest_file_name.setter - def dest_file_name(self, dest_file_name: Optional[str]): + def dest_file_name(self, dest_file_name: str | None) -> None: if dest_file_name is None: self._dest_file_name_lv = CfdpLv(value=b"") else: @@ -242,7 +242,7 @@ def unpack(cls, data: bytes) -> MetadataPdu: metadata_pdu._parse_options(raw_packet=data, start_idx=current_idx) return metadata_pdu - def _parse_options(self, raw_packet: bytes, start_idx: int): + def _parse_options(self, raw_packet: bytes, start_idx: int) -> None: self._options = [] current_idx = start_idx while True: diff --git a/spacepackets/cfdp/pdu/nak.py b/spacepackets/cfdp/pdu/nak.py index 5f0e62c..879d493 100644 --- a/spacepackets/cfdp/pdu/nak.py +++ b/spacepackets/cfdp/pdu/nak.py @@ -1,12 +1,11 @@ from __future__ import annotations import struct -from typing import List, Optional, Tuple +from typing import TYPE_CHECKING from spacepackets.cfdp import CrcFlag from spacepackets.cfdp.conf import PduConfig from spacepackets.cfdp.defs import Direction -from spacepackets.cfdp.pdu import PduHeader from spacepackets.cfdp.pdu.file_directive import ( AbstractFileDirectiveBase, DirectiveType, @@ -15,6 +14,9 @@ ) from spacepackets.crc import CRC16_CCITT_FUNC +if TYPE_CHECKING: + from spacepackets.cfdp.pdu import PduHeader + def get_max_seg_reqs_for_max_packet_size_and_pdu_cfg( max_packet_size: int, pdu_conf: PduConfig @@ -104,7 +106,7 @@ def __init__( pdu_conf: PduConfig, start_of_scope: int, end_of_scope: int, - segment_requests: Optional[List[Tuple[int, int]]] = None, + segment_requests: list[tuple[int, int]] | None = None, ): """Create a NAK PDU object instance. @@ -142,8 +144,9 @@ def __empty(cls) -> NakPdu: return cls(start_of_scope=0, end_of_scope=0, segment_requests=[], pdu_conf=empty_conf) def get_max_seg_reqs_for_max_packet_size(self, max_packet_size: int) -> int: - """Member method which forwards to :py:meth:`get_max_seg_reqs_for_max_packet_size_and_pdu_cfg`, - passing the internal PDU configuration field.""" + """Member method which forwards to + :py:meth:`get_max_seg_reqs_for_max_packet_size_and_pdu_cfg`, passing the internal PDU + configuration field.""" return get_max_seg_reqs_for_max_packet_size_and_pdu_cfg( max_packet_size, self.pdu_file_directive.pdu_conf ) @@ -157,17 +160,17 @@ def pdu_header(self) -> PduHeader: return self.pdu_file_directive.pdu_header @property - def file_flag(self): + def file_flag(self) -> LargeFileFlag: return self.pdu_file_directive.file_flag @file_flag.setter - def file_flag(self, file_flag: LargeFileFlag): + def file_flag(self, file_flag: LargeFileFlag) -> None: """Set the file size. This changes the length of the packet when packed as well which is handled by this function""" self.pdu_file_directive.file_flag = file_flag self._calculate_directive_field_len() - def _calculate_directive_field_len(self): + def _calculate_directive_field_len(self) -> None: if self.pdu_file_directive.file_flag == LargeFileFlag.NORMAL: directive_param_field_len = 8 + len(self._segment_requests) * 8 elif self.pdu_file_directive.file_flag == LargeFileFlag.LARGE: @@ -179,7 +182,7 @@ def _calculate_directive_field_len(self): self.pdu_file_directive.directive_param_field_len = directive_param_field_len @property - def segment_requests(self) -> List[Tuple[int, int]]: + def segment_requests(self) -> list[tuple[int, int]]: """An optional list of segment request pair tuples, where the first entry of list element is the start offset and the second entry is the end offset. If the start and end offset are both 0, the metadata is re-requested. @@ -187,13 +190,13 @@ def segment_requests(self) -> List[Tuple[int, int]]: return self._segment_requests @segment_requests.setter - def segment_requests(self, segment_requests: Optional[List[Tuple[int, int]]]): + def segment_requests(self, segment_requests: list[tuple[int, int]] | None) -> None: """Update the segment requests. This changes the length of the packet when packed as well which is handled by this function.""" if segment_requests is None: self._segment_requests = [] else: - self._segment_requests: List[Tuple[int, int]] = segment_requests # type: ignore + self._segment_requests: list[tuple[int, int]] = segment_requests # type: ignore self._calculate_directive_field_len() def pack(self) -> bytearray: diff --git a/spacepackets/cfdp/tlv/__init__.py b/spacepackets/cfdp/tlv/__init__.py index 85bc260..2d994f1 100644 --- a/spacepackets/cfdp/tlv/__init__.py +++ b/spacepackets/cfdp/tlv/__init__.py @@ -45,3 +45,39 @@ map_enum_status_code_to_int, map_int_status_code_to_enum, ) + +__all__ = [ + "ORIGINATING_TRANSACTION_ID_MSG_TYPE_ID", + "CfdpTlv", + "DirListingOptions", + "DirectoryListingParameters", + "DirectoryListingRequest", + "DirectoryListingResponse", + "DirectoryOperationMessageType", + "DirectoryParams", + "EntityIdTlv", + "FaultHandlerOverrideTlv", + "FileStoreRequestTlv", + "FileStoreResponseTlv", + "FilestoreActionCode", + "FilestoreResponseStatusCode", + "FlowLabelTlv", + "MessageToUserTlv", + "OriginatingTransactionId", + "ProxyCancelRequest", + "ProxyClosureRequest", + "ProxyMessageType", + "ProxyPutRequest", + "ProxyPutRequestParams", + "ProxyPutResponse", + "ProxyPutResponseParams", + "ProxyTransmissionMode", + "ReservedCfdpMessage", + "TlvHolder", + "TlvList", + "TlvType", + "create_cfdp_proxy_and_dir_op_message_marker", + "map_enum_status_code_to_action_status_code", + "map_enum_status_code_to_int", + "map_int_status_code_to_enum", +] diff --git a/spacepackets/cfdp/tlv/base.py b/spacepackets/cfdp/tlv/base.py index 90dac08..30d1678 100644 --- a/spacepackets/cfdp/tlv/base.py +++ b/spacepackets/cfdp/tlv/base.py @@ -1,10 +1,12 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import List +from typing import TYPE_CHECKING from spacepackets.cfdp.exceptions import TlvTypeMissmatch -from spacepackets.cfdp.tlv.defs import TlvType + +if TYPE_CHECKING: + from spacepackets.cfdp.tlv.defs import TlvType class AbstractTlvBase(ABC): @@ -35,9 +37,9 @@ def __eq__(self, other: object): return False return self.tlv_type == other.tlv_type and self.value == other.value - def check_type(self, tlv_type: TlvType): + def check_type(self, tlv_type: TlvType) -> None: if self.tlv_type != tlv_type: raise TlvTypeMissmatch(found=tlv_type, expected=self.tlv_type) -TlvList = List[AbstractTlvBase] +TlvList = list[AbstractTlvBase] diff --git a/spacepackets/cfdp/tlv/holder.py b/spacepackets/cfdp/tlv/holder.py index ec6ca8a..ca4696a 100644 --- a/spacepackets/cfdp/tlv/holder.py +++ b/spacepackets/cfdp/tlv/holder.py @@ -1,4 +1,4 @@ -from typing import Any, Optional, Type, cast +from typing import Any, Optional, cast from spacepackets.cfdp.tlv.base import TlvType from spacepackets.cfdp.tlv.msg_to_user import MessageToUserTlv @@ -18,15 +18,16 @@ def __init__(self, tlv: Optional[AbstractTlvBase]): self.tlv = tlv @property - def tlv_type(self): + def tlv_type(self) -> None | TlvType: if self.tlv is not None: return self.tlv.tlv_type + return None def __cast_internally( self, - obj_type: Type[AbstractTlvBase], + obj_type: type[AbstractTlvBase], expected_type: TlvType, - ) -> Any: + ) -> Any: # noqa: ANN401 assert self.tlv is not None if self.tlv.tlv_type != expected_type: raise TypeError(f"Invalid object {self.tlv} for type {self.tlv.tlv_type}") diff --git a/spacepackets/cfdp/tlv/msg_to_user.py b/spacepackets/cfdp/tlv/msg_to_user.py index d8b091e..f262e1c 100644 --- a/spacepackets/cfdp/tlv/msg_to_user.py +++ b/spacepackets/cfdp/tlv/msg_to_user.py @@ -6,7 +6,7 @@ import dataclasses from pathlib import Path -from typing import Optional, Tuple +from typing import TYPE_CHECKING from spacepackets.cfdp.defs import ( ConditionCode, @@ -17,7 +17,6 @@ ) from spacepackets.cfdp.exceptions import TlvTypeMissmatch from spacepackets.cfdp.lv import CfdpLv -from spacepackets.cfdp.pdu.finished import FinishedParams from spacepackets.cfdp.tlv.base import AbstractTlvBase from spacepackets.cfdp.tlv.defs import ( ORIGINATING_TRANSACTION_ID_MSG_TYPE_ID, @@ -28,6 +27,9 @@ from spacepackets.cfdp.tlv.tlv import CfdpTlv from spacepackets.util import UnsignedByteField +if TYPE_CHECKING: + from spacepackets.cfdp.pdu.finished import FinishedParams + class MessageToUserTlv(AbstractTlvBase): """Message to User TLV implementation as specified in CCSDS 727.0-B-5 5.4.3""" @@ -41,7 +43,7 @@ def pack(self) -> bytearray: return self.tlv.pack() @property - def packet_len(self): + def packet_len(self) -> int: return self.tlv.packet_len @property @@ -53,11 +55,9 @@ def tlv_type(self) -> TlvType: return MessageToUserTlv.TLV_TYPE def is_reserved_cfdp_message(self) -> bool: - if len(self.tlv.value) >= 5 and self.tlv.value[0:4].decode() == "cfdp": - return True - return False + return bool(len(self.tlv.value) >= 5 and self.tlv.value[0:4].decode() == "cfdp") - def to_reserved_msg_tlv(self) -> Optional[ReservedCfdpMessage]: + def to_reserved_msg_tlv(self) -> ReservedCfdpMessage | None: """Attempt to convert to a reserved CFDP message. Please note that this operation will fail if the message if not a reserved CFDP message and will then return None. This method is especially useful to have access to the more specialized @@ -67,7 +67,7 @@ def to_reserved_msg_tlv(self) -> Optional[ReservedCfdpMessage]: return ReservedCfdpMessage(self.tlv.value[4], self.tlv.value[5:]) @classmethod - def __empty(cls): + def __empty(cls) -> MessageToUserTlv: return cls(b"") @classmethod @@ -115,7 +115,7 @@ def to_generic_msg_to_user_tlv(self) -> MessageToUserTlv: return MessageToUserTlv.from_tlv(self.tlv) @property - def packet_len(self): + def packet_len(self) -> int: return self.tlv.packet_len @property @@ -146,19 +146,19 @@ def is_directory_operation(self) -> bool: def is_originating_transaction_id(self) -> bool: return self.get_reserved_cfdp_message_type() == ORIGINATING_TRANSACTION_ID_MSG_TYPE_ID - def get_cfdp_proxy_message_type(self) -> Optional[ProxyMessageType]: + def get_cfdp_proxy_message_type(self) -> ProxyMessageType | None: if not self.is_cfdp_proxy_operation(): return None return ProxyMessageType(self.get_reserved_cfdp_message_type()) - def get_directory_operation_type(self) -> Optional[DirectoryOperationMessageType]: + def get_directory_operation_type(self) -> DirectoryOperationMessageType | None: if not self.is_directory_operation(): return None return DirectoryOperationMessageType(self.get_reserved_cfdp_message_type()) def get_originating_transaction_id( self, - ) -> Optional[TransactionId]: + ) -> TransactionId | None: if not self.is_originating_transaction_id(): return None if len(self.value) < 1: @@ -176,7 +176,7 @@ def get_originating_transaction_id( UnsignedByteField.from_bytes(seq_num), ) - def get_proxy_put_request_params(self) -> Optional[ProxyPutRequestParams]: + def get_proxy_put_request_params(self) -> ProxyPutRequestParams | None: """This function extract the proxy put request parameters from the raw value if applicable. If the value format is invalid, this function will return None.""" if ( @@ -200,7 +200,7 @@ def get_proxy_put_request_params(self) -> Optional[ProxyPutRequestParams]: dest_name_lv, ) - def get_proxy_put_response_params(self) -> Optional[ProxyPutResponseParams]: + def get_proxy_put_response_params(self) -> ProxyPutResponseParams | None: if ( not self.is_cfdp_proxy_operation() or self.get_cfdp_proxy_message_type() != ProxyMessageType.PUT_RESPONSE @@ -211,7 +211,7 @@ def get_proxy_put_response_params(self) -> Optional[ProxyPutResponseParams]: file_status = FileStatus(self.value[5] & 0b11) return ProxyPutResponseParams(condition_code, delivery_code, file_status) - def get_proxy_closure_requested(self) -> Optional[bool]: + def get_proxy_closure_requested(self) -> bool | None: if ( not self.is_cfdp_proxy_operation() or self.get_cfdp_proxy_message_type() != ProxyMessageType.CLOSURE_REQUEST @@ -219,7 +219,7 @@ def get_proxy_closure_requested(self) -> Optional[bool]: return None return self.value[5] & 0b1 - def get_proxy_transmission_mode(self) -> Optional[TransmissionMode]: + def get_proxy_transmission_mode(self) -> TransmissionMode | None: if ( not self.is_cfdp_proxy_operation() or self.get_cfdp_proxy_message_type() != ProxyMessageType.TRANSMISSION_MODE @@ -227,7 +227,7 @@ def get_proxy_transmission_mode(self) -> Optional[TransmissionMode]: return None return TransmissionMode(self.value[5] & 0b1) - def get_dir_listing_request_params(self) -> Optional[DirectoryParams]: + def get_dir_listing_request_params(self) -> DirectoryParams | None: if ( not self.is_directory_operation() or self.get_directory_operation_type() != DirectoryOperationMessageType.LISTING_REQUEST @@ -237,7 +237,7 @@ def get_dir_listing_request_params(self) -> Optional[DirectoryParams]: dir_file_name_lv = CfdpLv.unpack(self.value[5 + dir_path_lv.packet_len :]) return DirectoryParams(dir_path_lv, dir_file_name_lv) - def get_dir_listing_response_params(self) -> Optional[Tuple[bool, DirectoryParams]]: + def get_dir_listing_response_params(self) -> tuple[bool, DirectoryParams] | None: """ Returns --------- @@ -259,7 +259,7 @@ def get_dir_listing_response_params(self) -> Optional[Tuple[bool, DirectoryParam dir_file_name_lv = CfdpLv.unpack(self.value[6 + dir_path_lv.packet_len :]) return listing_success, DirectoryParams(dir_path_lv, dir_file_name_lv) - def get_dir_listing_options(self) -> Optional[DirListingOptions]: + def get_dir_listing_options(self) -> DirListingOptions | None: if ( not self.is_directory_operation() or self.get_directory_operation_type() diff --git a/spacepackets/cfdp/tlv/tlv.py b/spacepackets/cfdp/tlv/tlv.py index 781aa38..67ea174 100644 --- a/spacepackets/cfdp/tlv/tlv.py +++ b/spacepackets/cfdp/tlv/tlv.py @@ -1,7 +1,5 @@ from __future__ import annotations -from typing import Optional, Tuple - from spacepackets.cfdp.defs import ( ConditionCode, FaultHandlerCode, @@ -24,7 +22,7 @@ def map_enum_status_code_to_int(status_code: FilestoreResponseStatusCode) -> int def map_enum_status_code_to_action_status_code( status_code_enum: FilestoreResponseStatusCode, -) -> Tuple[FilestoreActionCode, int]: +) -> tuple[FilestoreActionCode, int]: """Map a given file store response status code to the action code and the corresponding 4 bit status code. the status code will be 0x00 for a SUCCESS operation and 0b1111 if the operation was not performed. @@ -45,8 +43,7 @@ def map_int_status_code_to_enum( code was detected """ try: - status_code = FilestoreResponseStatusCode(action_code << 4 | status_code) - return status_code + return FilestoreResponseStatusCode(action_code << 4 | status_code) except (IndexError, ValueError): return FilestoreResponseStatusCode.INVALID @@ -74,11 +71,11 @@ def __init__(self, tlv_type: TlvType, value: bytes): self._value = value @property - def tlv_type(self): + def tlv_type(self) -> TlvType: return self._tlv_type @tlv_type.setter - def tlv_type(self, tlv_type: TlvType): + def tlv_type(self, tlv_type: TlvType) -> None: self._tlv_type = tlv_type @property @@ -105,10 +102,10 @@ def unpack(cls, data: bytes) -> CfdpTlv: raise BytesTooShortError(2, len(data)) try: tlv_type = TlvType(data[0]) - except ValueError: + except ValueError as err: raise ValueError( f"TLV field invalid, found value {data[0]} is not a possible TLV" " parameter" - ) + ) from err value = bytearray() if len(data) > 2: @@ -151,7 +148,7 @@ def pack(self) -> bytearray: return self.tlv.pack() @property - def packet_len(self): + def packet_len(self) -> int: return self.tlv.packet_len @property @@ -203,14 +200,14 @@ def __init__(self, flow_label: bytes): self.tlv = CfdpTlv(tlv_type=self.tlv_type, value=flow_label) @classmethod - def __empty(cls): + def __empty(cls) -> FlowLabelTlv: return cls(b"") def pack(self) -> bytearray: return self.tlv.pack() @property - def packet_len(self): + def packet_len(self) -> int: return self.tlv.packet_len @property @@ -249,7 +246,7 @@ def __init__( self.action_code = action_code self.first_file_name = first_file_name self.second_file_name = second_file_name - self.tlv: Optional[CfdpTlv] = None + self.tlv: CfdpTlv | None = None def _common_packer(self, status_code: int) -> bytearray: tlv_value = bytearray() @@ -277,18 +274,18 @@ def common_packet_len(self) -> int: return expected_len @staticmethod - def _check_raw_tlv_field(first_byte: int, expected: TlvType): + def _check_raw_tlv_field(first_byte: int, expected: TlvType) -> None: try: raw_tlv_type = TlvType(first_byte) if raw_tlv_type != expected: raise TlvTypeMissmatch(raw_tlv_type, expected) - except IndexError: - raise ValueError(f"No TLV type for raw field {first_byte}") + except IndexError as err: + raise ValueError(f"No TLV type for raw field {first_byte}") from err @staticmethod def _common_unpacker( raw_bytes: bytes, - ) -> Tuple[FilestoreActionCode, str, int, int, Optional[str]]: + ) -> tuple[FilestoreActionCode, str, int, int, str | None]: """Does only unpack common fields, does not unpack the filestore message of a Filestore Response package @@ -300,10 +297,10 @@ def _common_unpacker( action_code_as_int = (raw_bytes[value_idx] >> 4) & 0x0F try: action_code = FilestoreActionCode(action_code_as_int) - except ValueError: + except ValueError as err: raise ValueError( "Invalid action code in file store response with value" f" {action_code_as_int}" - ) + ) from err status_code_as_int = raw_bytes[value_idx] & 0x0F value_idx += 1 first_lv = CfdpLv.unpack(raw_bytes=raw_bytes[value_idx:]) @@ -343,7 +340,7 @@ def __init__( second_file_name=second_file_name, ) - def generate_tlv(self): + def generate_tlv(self) -> None: if self.tlv is None: self.tlv = self._build_tlv() @@ -352,7 +349,7 @@ def pack(self) -> bytearray: return self.tlv.pack() @property - def packet_len(self): + def packet_len(self) -> int: return self.common_packet_len() @property @@ -392,7 +389,7 @@ def from_tlv(cls, cfdp_tlv: CfdpTlv) -> FileStoreRequestTlv: return fs_response @classmethod - def _set_fields(cls, instance: FileStoreRequestTlv, raw_data: bytes): + def _set_fields(cls, instance: FileStoreRequestTlv, raw_data: bytes) -> FileStoreRequestTlv: action_code, first_name, _, _, second_name = cls._common_unpacker(raw_bytes=raw_data) instance.action_code = action_code instance.first_file_name = first_name @@ -410,8 +407,10 @@ def __init__( status_code: FilestoreResponseStatusCode, first_file_name: str, second_file_name: str = "", - filestore_msg: CfdpLv = CfdpLv(value=b""), + filestore_msg: None | CfdpLv = None, ): + if filestore_msg is None: + filestore_msg = CfdpLv(value=b"") super().__init__( action_code=action_code, first_file_name=first_file_name, @@ -420,7 +419,7 @@ def __init__( self.status_code = status_code self.filestore_msg = filestore_msg - def generate_tlv(self): + def generate_tlv(self) -> None: if self.tlv is None: self.tlv = self._build_tlv() @@ -434,7 +433,7 @@ def value(self) -> bytes: return self.tlv.value # type: ignore @property - def packet_len(self): + def packet_len(self) -> int: return self.common_packet_len() + self.filestore_msg.packet_len @property @@ -472,7 +471,7 @@ def from_tlv(cls, cfdp_tlv: CfdpTlv) -> FileStoreResponseTlv: return fs_response @classmethod - def _set_fields(cls, instance: FileStoreResponseTlv, data: bytes): + def _set_fields(cls, instance: FileStoreResponseTlv, data: bytes) -> None: action_code, first_name, status_code, idx, second_name = cls._common_unpacker( raw_bytes=data ) @@ -480,11 +479,11 @@ def _set_fields(cls, instance: FileStoreResponseTlv, data: bytes): instance.first_file_name = first_name try: status_code_named = FilestoreResponseStatusCode(action_code << 4 | status_code) - except ValueError: + except ValueError as err: raise ValueError( "Invalid status code in file store response with value" f" {status_code} for action code {action_code}" - ) + ) from err instance.status_code = status_code_named if second_name is not None: instance.second_file_name = second_name @@ -492,8 +491,8 @@ def _set_fields(cls, instance: FileStoreResponseTlv, data: bytes): class EntityIdTlv(AbstractTlvBase): - """This helper class has a :py:meth:`__eq__` implementation which only compares the numerical value - of the entity IDs""" + """This helper class has a :py:meth:`__eq__` implementation which only compares the numerical + value of the entity IDs""" TLV_TYPE = TlvType.ENTITY_ID @@ -504,7 +503,7 @@ def pack(self) -> bytearray: return self.tlv.pack() @property - def packet_len(self): + def packet_len(self) -> int: return self.tlv.packet_len @property diff --git a/spacepackets/countdown.py b/spacepackets/countdown.py index de4dd88..a221917 100644 --- a/spacepackets/countdown.py +++ b/spacepackets/countdown.py @@ -2,7 +2,6 @@ import time from datetime import timedelta -from typing import Optional def time_ms() -> int: @@ -14,7 +13,7 @@ class Countdown: """Utility class for counting down time. Exposes a simple API to initiate it with an initial timeout and to check whether is has expired.""" - def __init__(self, init_timeout: Optional[timedelta]): + def __init__(self, init_timeout: timedelta | None): if init_timeout is not None: self._timeout_ms = int(init_timeout / timedelta(milliseconds=1)) self._start_time_ms = time_ms() @@ -40,27 +39,25 @@ def timeout(self) -> timedelta: return timedelta(milliseconds=self._timeout_ms) @timeout.setter - def timeout(self, timeout: timedelta): + def timeout(self, timeout: timedelta) -> None: """Set a new timeout for the countdown instance.""" self._timeout_ms = round(timeout / timedelta(milliseconds=1)) def timed_out(self) -> bool: - if round(time_ms() - self._start_time_ms) >= self._timeout_ms: - return True - return False + return round(time_ms() - self._start_time_ms) >= self._timeout_ms def busy(self) -> bool: return not self.timed_out() - def reset(self, new_timeout: Optional[timedelta] = None): + def reset(self, new_timeout: timedelta | None = None) -> None: if new_timeout is not None: self.timeout = new_timeout self.start() - def start(self): + def start(self) -> None: self._start_time_ms = time_ms() - def time_out(self): + def time_out(self) -> None: self._start_time_ms = 0 def remaining_time(self) -> timedelta: diff --git a/spacepackets/ecss/__init__.py b/spacepackets/ecss/__init__.py index a41eae5..1775616 100644 --- a/spacepackets/ecss/__init__.py +++ b/spacepackets/ecss/__init__.py @@ -17,6 +17,28 @@ from .tc import PusTc, PusTcDataFieldHeader, PusTelecommand from .tm import PusTelemetry, PusTm, PusTmSecondaryHeader +__all__ = [ + "PacketFieldBase", + "PacketFieldEnum", + "PacketFieldU8", + "PacketFieldU16", + "PacketFieldU32", + "PfcReal", + "PfcSigned", + "PfcUnsigned", + "Ptc", + "PusService", + "PusTc", + "PusTcDataFieldHeader", + "PusTelecommand", + "PusTelemetry", + "PusTm", + "PusTmSecondaryHeader", + "PusVerificator", + "PusVersion", + "RequestId", +] + def check_pus_crc(tc_packet: bytes) -> bool: """Checks the CRC of a given raw PUS packet. It is expected that the passed packet is the exact diff --git a/spacepackets/ecss/exceptions.py b/spacepackets/ecss/exceptions.py index abc208a..c48c923 100644 --- a/spacepackets/ecss/exceptions.py +++ b/spacepackets/ecss/exceptions.py @@ -4,4 +4,3 @@ class TmSrcDataTooShortError(BytesTooShortError): """Similar to the :py:class:`BytesTooShortError`, but specifies that the source data field of the PUS telemetry packet is too short.""" - diff --git a/spacepackets/ecss/fields.py b/spacepackets/ecss/fields.py index e6912ec..8125f8b 100644 --- a/spacepackets/ecss/fields.py +++ b/spacepackets/ecss/fields.py @@ -87,20 +87,20 @@ def __init__(self, pfc: int, val: int): self.val = val @classmethod - def with_byte_size(cls, num_bytes: int, val: int): + def with_byte_size(cls, num_bytes: int, val: int) -> PacketFieldEnum: return cls(num_bytes * 8, val) def pack(self) -> bytearray: num_bytes = self.check_pfc(self.pfc) return bytearray(IntByteConversion.to_unsigned(num_bytes, self.val)) - def len(self): + def len(self) -> int: """Return the length in bytes. This will raise a ValueError for non-byte-aligned PFC values.""" return self.check_pfc(self.pfc) @classmethod - def unpack(cls, data: bytes, pfc: int): + def unpack(cls, data: bytes, pfc: int) -> PacketFieldEnum: """Construct from a raw bytestream. :raises BytesTooShortError: Raw bytestream too short. diff --git a/spacepackets/ecss/pus_17_test.py b/spacepackets/ecss/pus_17_test.py index eeb620a..756d538 100644 --- a/spacepackets/ecss/pus_17_test.py +++ b/spacepackets/ecss/pus_17_test.py @@ -1,12 +1,15 @@ from __future__ import annotations import enum +from typing import TYPE_CHECKING -from spacepackets import SpacePacketHeader -from spacepackets.ccsds.spacepacket import PacketId, PacketSeqCtrl from spacepackets.ecss.defs import PusService from spacepackets.ecss.tm import AbstractPusTm, PusTm +if TYPE_CHECKING: + from spacepackets import SpacePacketHeader + from spacepackets.ccsds.spacepacket import PacketId, PacketSeqCtrl + class Subservice(enum.IntEnum): TC_PING = 1 diff --git a/spacepackets/ecss/pus_1_verification.py b/spacepackets/ecss/pus_1_verification.py index 0a2d6fb..4e2514f 100644 --- a/spacepackets/ecss/pus_1_verification.py +++ b/spacepackets/ecss/pus_1_verification.py @@ -4,19 +4,21 @@ import enum from dataclasses import dataclass -from typing import Optional +from typing import TYPE_CHECKING -from spacepackets.ccsds import SpacePacketHeader -from spacepackets.ccsds.spacepacket import PacketId, PacketSeqCtrl -from spacepackets.ecss import PusTc +from spacepackets import BytesTooShortError from spacepackets.ecss.defs import PusService from spacepackets.ecss.fields import PacketFieldEnum from spacepackets.ecss.tm import AbstractPusTm, PusTm -from .. import BytesTooShortError from .exceptions import TmSrcDataTooShortError from .req_id import RequestId +if TYPE_CHECKING: + from spacepackets.ccsds import SpacePacketHeader + from spacepackets.ccsds.spacepacket import PacketId, PacketSeqCtrl + from spacepackets.ecss import PusTc + class Subservice(enum.IntEnum): INVALID = 0 @@ -42,13 +44,15 @@ def __init__(self, code: ErrorCode, data: bytes): def pack(self) -> bytes: data = self.code.pack() data.extend(self.data) - return data + return bytes(data) - def len(self): + def len(self) -> int: return self.code.len() + len(self.data) @classmethod - def unpack(cls, data: bytes, num_bytes_err_code: int, num_bytes_data: Optional[int] = None): + def unpack( + cls, data: bytes, num_bytes_err_code: int, num_bytes_data: int | None = None + ) -> FailureNotice: pfc = num_bytes_err_code * 8 if num_bytes_data is None: num_bytes_data = len(data) - num_bytes_err_code @@ -74,8 +78,8 @@ class UnpackParams: @dataclass class VerificationParams: req_id: RequestId - step_id: Optional[StepId] = None - failure_notice: Optional[FailureNotice] = None + step_id: StepId | None = None + failure_notice: FailureNotice | None = None def pack(self) -> bytearray: data = bytearray(self.req_id.pack()) @@ -85,7 +89,7 @@ def pack(self) -> bytearray: data.extend(self.failure_notice.pack()) return data - def len(self): + def len(self) -> int: init_len = 4 if self.step_id is not None: init_len += self.step_id.len() @@ -93,7 +97,7 @@ def len(self): init_len += self.failure_notice.len() return init_len - def verify_against_subservice(self, subservice: Subservice): + def verify_against_subservice(self, subservice: Subservice) -> None: if subservice % 2 == 0: if self.failure_notice is None: raise InvalidVerifParams("Failure Notice should be something") @@ -122,7 +126,7 @@ def __init__( apid: int, subservice: Subservice, timestamp: bytes, - verif_params: Optional[VerificationParams] = None, + verif_params: VerificationParams | None = None, seq_count: int = 0, packet_version: int = 0b000, space_time_ref: int = 0b0000, @@ -197,11 +201,11 @@ def sp_header(self) -> SpacePacketHeader: return self.pus_tm.space_packet_header @property - def service(self): + def service(self) -> int: return self.pus_tm.service @property - def subservice(self): + def subservice(self) -> int: return self.pus_tm.subservice @property @@ -209,7 +213,7 @@ def source_data(self) -> bytes: return self.pus_tm.source_data @classmethod - def _unpack_raw_tm(cls, instance: Service1Tm, params: UnpackParams): + def _unpack_raw_tm(cls, instance: Service1Tm, params: UnpackParams) -> None: tm_data = instance.pus_tm.tm_data if len(tm_data) < 4: raise TmSrcDataTooShortError(4, len(tm_data)) @@ -219,7 +223,7 @@ def _unpack_raw_tm(cls, instance: Service1Tm, params: UnpackParams): else: instance._unpack_success_verification(params) - def _unpack_failure_verification(self, unpack_cfg: UnpackParams): + def _unpack_failure_verification(self, unpack_cfg: UnpackParams) -> None: """Handle parsing a verification failure packet, subservice ID 2, 4, 6 or 8""" tm_data = self.pus_tm.tm_data subservice = self.pus_tm.subservice @@ -240,7 +244,7 @@ def _unpack_failure_verification(self, unpack_cfg: UnpackParams): tm_data[current_idx:], unpack_cfg.bytes_err_code, len(tm_data) - current_idx ) - def _unpack_success_verification(self, unpack_cfg: UnpackParams): + def _unpack_success_verification(self, unpack_cfg: UnpackParams) -> None: if self.pus_tm.subservice == Subservice.TM_STEP_SUCCESS: try: self._verif_params.step_id = StepId.unpack( @@ -248,12 +252,12 @@ def _unpack_success_verification(self, unpack_cfg: UnpackParams): data=self.pus_tm.tm_data[4 : 4 + unpack_cfg.bytes_step_id], ) except BytesTooShortError as e: - raise TmSrcDataTooShortError(e.expected_len, e.bytes_len) + raise TmSrcDataTooShortError(e.expected_len, e.bytes_len) from e elif self.pus_tm.subservice not in [1, 3, 7]: raise ValueError(f"invalid subservice {self.pus_tm.subservice}, not in [1, 3, 7]") @property - def failure_notice(self) -> Optional[FailureNotice]: + def failure_notice(self) -> FailureNotice | None: return self._verif_params.failure_notice @property @@ -261,15 +265,15 @@ def has_failure_notice(self) -> bool: return (self.subservice % 2) == 0 @property - def tc_req_id(self): + def tc_req_id(self) -> RequestId: return self._verif_params.req_id @tc_req_id.setter - def tc_req_id(self, value): + def tc_req_id(self, value: RequestId) -> None: self._verif_params.req_id = value @property - def error_code(self) -> Optional[ErrorCode]: + def error_code(self) -> ErrorCode | None: if self.has_failure_notice: assert self._verif_params.failure_notice is not None return self._verif_params.failure_notice.code @@ -277,13 +281,10 @@ def error_code(self) -> Optional[ErrorCode]: @property def is_step_reply(self) -> bool: - return ( - self.subservice == Subservice.TM_STEP_FAILURE - or self.subservice == Subservice.TM_STEP_SUCCESS - ) + return self.subservice in (Subservice.TM_STEP_FAILURE, Subservice.TM_STEP_SUCCESS) @property - def step_id(self) -> Optional[StepId]: + def step_id(self) -> StepId | None: """Retrieve the step number. Returns NONE if this packet does not have a step ID""" return self._verif_params.step_id diff --git a/spacepackets/ecss/pus_verificator.py b/spacepackets/ecss/pus_verificator.py index 8ac6350..2852970 100644 --- a/spacepackets/ecss/pus_verificator.py +++ b/spacepackets/ecss/pus_verificator.py @@ -1,6 +1,6 @@ import enum from dataclasses import dataclass, field -from typing import Dict, List, Optional +from typing import Optional from spacepackets.ecss import PusTc from spacepackets.ecss.pus_1_verification import RequestId, Service1Tm, Subservice @@ -18,11 +18,11 @@ class VerificationStatus: accepted: StatusField = StatusField.UNSET started: StatusField = StatusField.UNSET step: StatusField = StatusField.UNSET - step_list: List[int] = field(default_factory=list) + step_list: list[int] = field(default_factory=list) completed: StatusField = StatusField.UNSET -VerifDictT = Dict[RequestId, VerificationStatus] +VerifDictT = dict[RequestId, VerificationStatus] @dataclass @@ -54,7 +54,7 @@ class PusVerificator: """ def __init__(self): - self._verif_dict: VerifDictT = dict() + self._verif_dict: VerifDictT = {} def add_tc(self, tc: PusTc) -> bool: req_id = RequestId.from_sp_header(tc.sp_header) @@ -116,23 +116,23 @@ def _check_subservice( return res @property - def verif_dict(self): + def verif_dict(self) -> VerifDictT: return self._verif_dict def _handle_step_failure( self, verif_status: VerificationStatus, res: TmCheckResult, pus_1_tm: Service1Tm - ): + ) -> None: self._check_all_replies_recvd_after_step(verif_status) verif_status.step = StatusField.FAILURE verif_status.step_list.append(pus_1_tm.step_id.val) res.completed = True @staticmethod - def _check_all_replies_recvd_after_step(verif_stat: VerificationStatus): - if verif_stat.accepted != StatusField.UNSET and verif_stat.started != StatusField.UNSET: + def _check_all_replies_recvd_after_step(verif_stat: VerificationStatus) -> None: + if StatusField.UNSET not in (verif_stat.accepted, verif_stat.started): verif_stat.all_verifs_recvd = True - def remove_completed_entries(self): + def remove_completed_entries(self) -> None: self._verif_dict = { key: val for key, val in self._verif_dict.items() if not val.all_verifs_recvd } diff --git a/spacepackets/ecss/req_id.py b/spacepackets/ecss/req_id.py index c2f98fb..94d891f 100644 --- a/spacepackets/ecss/req_id.py +++ b/spacepackets/ecss/req_id.py @@ -1,10 +1,13 @@ from __future__ import annotations import struct +from typing import TYPE_CHECKING from spacepackets import BytesTooShortError from spacepackets.ccsds.spacepacket import PacketId, PacketSeqCtrl, SpacePacketHeader -from spacepackets.ecss.tc import PusTc + +if TYPE_CHECKING: + from spacepackets.ecss.tc import PusTc class RequestId: @@ -29,7 +32,7 @@ def __init__(self, tc_packet_id: PacketId, tc_psc: PacketSeqCtrl, ccsds_version: self.ccsds_version = ccsds_version @classmethod - def empty(cls): + def empty(cls) -> RequestId: return cls(PacketId.empty(), PacketSeqCtrl.empty()) @classmethod @@ -45,7 +48,7 @@ def unpack(cls, tm_data: bytes) -> RequestId: ) @classmethod - def from_pus_tc(cls, pus_tc: PusTc): + def from_pus_tc(cls, pus_tc: PusTc) -> RequestId: return cls.from_sp_header(pus_tc.sp_header) @classmethod @@ -61,9 +64,9 @@ def pack(self) -> bytes: packet_id_and_version = (self.ccsds_version << 13) | self.tc_packet_id.raw() raw.extend(struct.pack("!H", packet_id_and_version)) raw.extend(struct.pack("!H", self.tc_psc.raw())) - return raw + return bytes(raw) - def as_u32(self): + def as_u32(self) -> int: packet_id_and_version = (self.ccsds_version << 13) | self.tc_packet_id.raw() return (packet_id_and_version << 16) | self.tc_psc.raw() diff --git a/spacepackets/ecss/tc.py b/spacepackets/ecss/tc.py index e548afd..d3e4330 100644 --- a/spacepackets/ecss/tc.py +++ b/spacepackets/ecss/tc.py @@ -5,7 +5,6 @@ from __future__ import annotations import struct -from typing import Optional, Tuple import deprecation from crcmod.predefined import PredefinedCrc @@ -95,7 +94,7 @@ def __eq__(self, other: object): return False @classmethod - def get_header_size(cls): + def get_header_size(cls) -> int: return cls.PUS_C_SEC_HEADER_LEN @@ -161,7 +160,7 @@ def __init__( ) self._app_data = app_data self._valid = True - self._crc16: Optional[bytes] = None + self._crc16: bytes | None = None @classmethod def from_sp_header( @@ -172,7 +171,7 @@ def from_sp_header( app_data: bytes = bytes([]), source_id: int = 0, ack_flags: int = 0b1111, - ): + ) -> PusTc: pus_tc = cls.empty() sp_header.packet_type = PacketType.TC sp_header.sec_header_flag = True @@ -244,7 +243,7 @@ def to_space_packet(self) -> SpacePacket: user_data.extend(self._crc16) # type: ignore return SpacePacket(self.sp_header, self.pus_tc_sec_header.pack(), user_data) - def calc_crc(self): + def calc_crc(self) -> None: """Can be called to calculate the CRC16. Also sets the internal CRC16 field.""" crc = PredefinedCrc(crc_name="crc-ccitt-false") crc.update(self.sp_header.pack()) @@ -305,20 +304,18 @@ def get_data_length(app_data_len: int, secondary_header_len: int) -> int: The size of the TC packet is the size of the packet secondary header with source ID + the length of the application data + length of the CRC16 checksum - 1 """ - data_length = secondary_header_len + app_data_len + 1 - return data_length + return secondary_header_len + app_data_len + 1 @deprecation.deprecated( deprecated_in="v0.14.0rc3", current_version=get_version(), details="use pack and the class itself to build this instead", ) - def pack_command_tuple(self) -> Tuple[bytearray, PusTc]: + def pack_command_tuple(self) -> tuple[bytearray, PusTc]: """Pack a tuple consisting of the raw packet as the first entry and the class representation as the second entry """ - command_tuple = (self.pack(), self) - return command_tuple + return (self.pack(), self) @property def service(self) -> int: @@ -333,7 +330,7 @@ def source_id(self) -> int: return self.pus_tc_sec_header.source_id @source_id.setter - def source_id(self, source_id: int): + def source_id(self, source_id: int) -> None: self.pus_tc_sec_header.source_id = source_id @property @@ -361,22 +358,22 @@ def app_data(self) -> bytes: return self._app_data @app_data.setter - def app_data(self, app_data: bytes): + def app_data(self, app_data: bytes) -> None: self._app_data = app_data @property - def crc16(self) -> Optional[bytes]: + def crc16(self) -> bytes | None: """Will be the raw CRC16 if the telecommand was created using :py:meth:`unpack`, :py:meth:`pack` was called at least once or :py:meth:`calc_crc` was called at least once.""" return self._crc16 @seq_count.setter - def seq_count(self, value): + def seq_count(self, value: int) -> None: self.sp_header.seq_count = value @apid.setter - def apid(self, apid): + def apid(self, apid: int) -> None: self.sp_header.apid = apid @@ -391,7 +388,7 @@ def generate_packet_crc(tc_packet: bytearray) -> bytes: crc = CRC16_CCITT_FUNC(tc_packet[0 : len(tc_packet) - 2]) tc_packet[len(tc_packet) - 2] = (crc & 0xFF00) >> 8 tc_packet[len(tc_packet) - 1] = crc & 0xFF - return tc_packet + return bytes(tc_packet) def generate_crc(data: bytearray) -> bytes: @@ -400,4 +397,4 @@ def generate_crc(data: bytearray) -> bytes: data_with_crc += data crc = CRC16_CCITT_FUNC(data) data_with_crc.extend(struct.pack("!H", crc)) - return data_with_crc + return bytes(data_with_crc) diff --git a/spacepackets/ecss/tm.py b/spacepackets/ecss/tm.py index 121a0e1..f8e1116 100644 --- a/spacepackets/ecss/tm.py +++ b/spacepackets/ecss/tm.py @@ -6,7 +6,6 @@ import struct from abc import abstractmethod -from typing import Optional import deprecation from crcmod.predefined import PredefinedCrc @@ -15,6 +14,7 @@ CCSDS_HEADER_LEN, SPACE_PACKET_HEADER_SIZE, AbstractSpacePacket, + PacketId, PacketSeqCtrl, PacketType, SequenceFlags, @@ -210,14 +210,14 @@ class PusTm(AbstractPusTm): The following doc example cuts off the timestamp (7 byte CDS Short) and the CRC16 from the ping packet because those change regularly. - >>> ping_tm = PusTm(service=17, subservice=2, seq_count=5, apid=0x01, timestamp=CdsShortTimestamp.now().pack()) # noqa + >>> ping_tm = PusTm(service=17, subservice=2, seq_count=5, apid=0x01, timestamp=CdsShortTimestamp.now().pack()) >>> ping_tm.service 17 >>> ping_tm.subservice 2 >>> ping_tm.pack()[:-9].hex(sep=',') '08,01,c0,05,00,0f,20,11,02,00,00,00,00' - """ + """ # noqa: E501 CDS_SHORT_SIZE = 7 PUS_TIMESTAMP_SIZE = CDS_SHORT_SIZE @@ -258,7 +258,7 @@ def __init__( spacecraft_time_ref=space_time_ref, timestamp=timestamp, ) - self._crc16: Optional[bytes] = None + self._crc16: bytes | None = None @classmethod def empty(cls) -> PusTm: @@ -280,7 +280,7 @@ def pack(self, recalc_crc: bool = True) -> bytearray: tm_packet_raw.extend(self._crc16) return tm_packet_raw - def calc_crc(self): + def calc_crc(self) -> None: """Can be called to calculate the CRC16""" crc = PredefinedCrc(crc_name="crc-ccitt-false") crc.update(self.space_packet_header.pack()) @@ -424,7 +424,7 @@ def tm_data(self) -> bytes: return self._source_data @tm_data.setter - def tm_data(self, data: bytes): + def tm_data(self, data: bytes) -> None: self._source_data = data stamp_len = len(self.pus_tm_sec_header.timestamp) self.space_packet_header.data_len = self.data_len_from_src_len_timestamp_len( @@ -432,23 +432,23 @@ def tm_data(self, data: bytes): ) @property - def apid(self): + def apid(self) -> int: return self.space_packet_header.apid @apid.setter - def apid(self, apid: int): + def apid(self, apid: int) -> None: self.space_packet_header.apid = apid @property - def seq_flags(self): + def seq_flags(self) -> SequenceFlags: return self.space_packet_header.seq_flags @seq_flags.setter - def seq_flags(self, seq_flags): + def seq_flags(self, seq_flags: SequenceFlags) -> None: self.space_packet_header.seq_flags = seq_flags @property - def packet_id(self): + def packet_id(self) -> PacketId: return self.space_packet_header.packet_id @staticmethod @@ -481,7 +481,7 @@ def seq_count(self) -> int: return self.space_packet_header.seq_count @property - def crc16(self) -> Optional[bytes]: + def crc16(self) -> bytes | None: """Will be the raw CRC16 if the telecommand was created using :py:meth:`unpack` or :py:meth:`pack` was called at least once.""" return self._crc16 @@ -504,7 +504,7 @@ def get_full_packet_string(self, print_format: PrintFormats = PrintFormats.HEX) "use pack and get_printable_data_string or the hex method on bytearray" " instead" ), ) - def print_full_packet_string(self, print_format: PrintFormats = PrintFormats.HEX): + def print_full_packet_string(self, print_format: PrintFormats = PrintFormats.HEX) -> None: """Print the full TM packet in a clean format.""" print(self.get_full_packet_string(print_format=print_format)) @@ -513,7 +513,7 @@ def print_full_packet_string(self, print_format: PrintFormats = PrintFormats.HEX current_version=get_version(), details=("use print, the source_data property and the hex method on bytearray" " instead"), ) - def print_source_data(self, print_format: PrintFormats = PrintFormats.HEX): + def print_source_data(self, print_format: PrintFormats = PrintFormats.HEX) -> None: """Prints the TM source data in a clean format""" print(self.get_source_data_string(print_format=print_format)) diff --git a/spacepackets/seqcount.py b/spacepackets/seqcount.py index 8f504c2..e5339fe 100644 --- a/spacepackets/seqcount.py +++ b/spacepackets/seqcount.py @@ -14,7 +14,7 @@ def max_bit_width(self) -> int: @max_bit_width.setter @abstractmethod - def max_bit_width(self, width: int): + def max_bit_width(self, width: int) -> None: pass @abstractmethod @@ -44,10 +44,10 @@ def max_bit_width(self) -> int: return self._max_bit_width @max_bit_width.setter - def max_bit_width(self, width: int): + def max_bit_width(self, width: int) -> None: self._max_bit_width = width - def create_new(self): + def create_new(self) -> None: with open(self.file_name, "w") as file: file.write("0\n") @@ -101,7 +101,7 @@ def max_bit_width(self) -> int: return self._max_bit_width @max_bit_width.setter - def max_bit_width(self, width: int): + def max_bit_width(self, width: int) -> None: self._max_bit_width = width def get_and_increment(self) -> int: diff --git a/spacepackets/uslp/__init__.py b/spacepackets/uslp/__init__.py index 2e3d32c..019c7fa 100644 --- a/spacepackets/uslp/__init__.py +++ b/spacepackets/uslp/__init__.py @@ -13,3 +13,16 @@ SourceOrDestField, TruncatedPrimaryHeader, ) + +__all__ = [ + "BypassSequenceControlFlag", + "FrameHeaderT", + "PrimaryHeader", + "ProtocolCommandFlag", + "SourceOrDestField", + "TfdzConstructionRules", + "TransferFrame", + "TransferFrameDataField", + "TruncatedPrimaryHeader", + "UslpProtocolIdentifier", +] diff --git a/spacepackets/uslp/frame.py b/spacepackets/uslp/frame.py index fcccf64..9f106e7 100644 --- a/spacepackets/uslp/frame.py +++ b/spacepackets/uslp/frame.py @@ -2,7 +2,7 @@ import enum import struct -from typing import Optional, Union +from typing import Union from .defs import ( UslpFhpVhopFieldMissing, @@ -41,8 +41,8 @@ def __init__( self, has_insert_zone: bool, has_fecf: bool, - insert_zone_len: Optional[int] = None, - fecf_len: Optional[int] = None, + insert_zone_len: int | None = None, + fecf_len: int | None = None, ): if has_insert_zone and insert_zone_len is None: raise ValueError @@ -60,8 +60,8 @@ def __init__( fixed_len: int, has_insert_zone: bool, has_fecf: bool, - insert_zone_len: Optional[int] = None, - fecf_len: Optional[int] = None, + insert_zone_len: int | None = None, + fecf_len: int | None = None, ): """Contains properties required when unpacking fixed USLP frames. These properties can not be determined by parsing the frame. The standard refers to these properties @@ -87,8 +87,8 @@ def __init__( has_insert_zone: bool, has_fecf: bool, truncated_frame_len: int, - insert_zone_len: Optional[int] = None, - fecf_len: Optional[int] = None, + insert_zone_len: int | None = None, + fecf_len: int | None = None, ): """Contains properties required when unpacking variable USLP frames. These properties can not be determined by parsing the frame. The standard refers to these properties @@ -165,7 +165,7 @@ def __init__( tfdz_cnstr_rules: TfdzConstructionRules, uslp_ident: UslpProtocolIdentifier, tfdz: bytes, - fhp_or_lvop: Optional[int] = None, + fhp_or_lvop: int | None = None, ): """ Notes on the FHP or LVOP field. For more details, refer to CCSDS 732.1-B-2. p.92: @@ -205,21 +205,21 @@ def __init__( raise ValueError @property - def tfdz(self): + def tfdz(self) -> bytes: return self._tfdz @tfdz.setter - def tfdz(self, tfdz: bytes): + def tfdz(self, tfdz: bytes) -> None: self._tfdz = tfdz self._size = self.header_len() + len(tfdz) def header_len(self) -> int: return 1 if self.fhp_or_lvop is None else 3 - def len(self): + def len(self) -> int: return self._size - def pack(self, truncated: bool = False, frame_type: Optional[FrameType] = None) -> bytearray: + def pack(self, truncated: bool = False, frame_type: FrameType | None = None) -> bytearray: packet = bytearray() packet.append(self.tfdz_contr_rules << 5 | self.uslp_ident) if frame_type is None: @@ -235,53 +235,49 @@ def pack(self, truncated: bool = False, frame_type: Optional[FrameType] = None) packet.extend(self.tfdz) return packet - def should_have_fhp_or_lvp_field( - self, truncated: bool, frame_type: Optional[FrameType] - ) -> bool: + def should_have_fhp_or_lvp_field(self, truncated: bool, frame_type: FrameType | None) -> bool: if frame_type is not None and frame_type == FrameType.VARIABLE: return False - if not truncated and self.tfdz_contr_rules in [ - TfdzConstructionRules.FpPacketSpanningMultipleFrames, - TfdzConstructionRules.FpContinuingPortionOfMapaSDU, - TfdzConstructionRules.FpFixedStartOfMapaSDU, - ]: - return True - return False + return bool( + not truncated + and self.tfdz_contr_rules + in [ + TfdzConstructionRules.FpPacketSpanningMultipleFrames, + TfdzConstructionRules.FpContinuingPortionOfMapaSDU, + TfdzConstructionRules.FpFixedStartOfMapaSDU, + ] + ) def verify_frame_type(self, frame_type: FrameType) -> bool: - if (frame_type == FrameType.FIXED and self.__cnstr_rules_for_fp()) or (frame_type == FrameType.VARIABLE and self.__cnstr_rules_for_vp()): - return True - return False + return bool( + (frame_type == FrameType.FIXED and self.__cnstr_rules_for_fp()) + or (frame_type == FrameType.VARIABLE and self.__cnstr_rules_for_vp()) + ) def __cnstr_rules_for_fp(self) -> bool: - if self.tfdz_contr_rules in [ + return self.tfdz_contr_rules in [ TfdzConstructionRules.FpPacketSpanningMultipleFrames, TfdzConstructionRules.FpContinuingPortionOfMapaSDU, TfdzConstructionRules.FpFixedStartOfMapaSDU, - ]: - return True - return False + ] def __cnstr_rules_for_vp(self) -> bool: - if self.tfdz_contr_rules in [ + return self.tfdz_contr_rules in [ TfdzConstructionRules.VpContinuingSegment, TfdzConstructionRules.VpLastSegment, TfdzConstructionRules.VpOctetStream, TfdzConstructionRules.VpNoSegmentation, TfdzConstructionRules.VpStartingSegment, - ]: - return True - return False + ] @classmethod def __empty(cls) -> TransferFrameDataField: - empty = TransferFrameDataField( + return TransferFrameDataField( tfdz_cnstr_rules=TfdzConstructionRules.FpPacketSpanningMultipleFrames, uslp_ident=UslpProtocolIdentifier.SPACE_PACKETS_ENCAPSULATION_PACKETS, fhp_or_lvop=None, tfdz=bytearray(), ) - return empty @classmethod def unpack( @@ -289,7 +285,7 @@ def unpack( raw_tfdf: bytes, truncated: bool, exact_len: int, - frame_type: Optional[FrameType], + frame_type: FrameType | None, ) -> TransferFrameDataField: """Unpack a TFDF, given a raw bytearray. @@ -307,9 +303,8 @@ def unpack( raise UslpInvalidRawPacketOrFrameLen tfdf.tfdz_contr_rules = (raw_tfdf[0] >> 5) & 0b111 tfdf.uslp_ident = raw_tfdf[0] & 0b11111 - if frame_type is not None: - if not tfdf.verify_frame_type(frame_type=frame_type): - raise UslpInvalidConstructionRules + if frame_type is not None and not tfdf.verify_frame_type(frame_type=frame_type): + raise UslpInvalidConstructionRules if tfdf.should_have_fhp_or_lvp_field(truncated=truncated, frame_type=frame_type): tfdf.fhp_or_lvop = (raw_tfdf[1] << 8) | raw_tfdf[2] tfdz_start = 3 @@ -327,9 +322,9 @@ def __init__( self, header: FrameHeaderT, tfdf: TransferFrameDataField, - insert_zone: Optional[bytes] = None, - op_ctrl_field: Optional[bytes] = None, - fecf: Optional[bytes] = None, + insert_zone: bytes | None = None, + op_ctrl_field: bytes | None = None, + fecf: bytes | None = None, ): self.header = header self.tfdf = tfdf @@ -337,7 +332,7 @@ def __init__( self.op_ctrl_field = op_ctrl_field self.fecf = fecf - def pack(self, truncated: bool = False, frame_type: Optional[FrameType] = None) -> bytearray: + def pack(self, truncated: bool = False, frame_type: FrameType | None = None) -> bytearray: frame = bytearray() frame.extend(self.header.pack()) if self.insert_zone is not None: @@ -355,14 +350,14 @@ def pack(self, truncated: bool = False, frame_type: Optional[FrameType] = None) frame.extend(self.fecf) return frame - def set_frame_len_in_header(self): + def set_frame_len_in_header(self) -> None: # According to the standard, the frame length field will contain the length of of the # frame minus 1. Also check whether this is a regular header and not a truncated one, # as the truncated one does not have the frame length field if isinstance(self.header, PrimaryHeader): self.header.frame_len = self.len() - 1 - def len(self): + def len(self) -> int: size = self.header.len() + self.tfdf.len() if self.insert_zone is not None: size += len(self.insert_zone) @@ -381,19 +376,19 @@ def __empty(cls) -> TransferFrame: tfdz_cnstr_rules=TfdzConstructionRules.FpPacketSpanningMultipleFrames, uslp_ident=UslpProtocolIdentifier.SPACE_PACKETS_ENCAPSULATION_PACKETS, fhp_or_lvop=None, - tfdz=bytearray(), + tfdz=b"", ) - empty = TransferFrame( + return TransferFrame( header=empty_header, tfdf=empty_data_field, insert_zone=None, op_ctrl_field=None, fecf=None, ) - return empty + # TODO: Fix lint by creating helper methods. @classmethod - def unpack( + def unpack( # noqa: PLR0912 too many branches cls, raw_frame: bytes, frame_type: FrameType, frame_properties: FramePropertiesT ) -> TransferFrame: """Unpack a USLP transfer frame from a raw bytearray. All managed parameters have @@ -476,7 +471,7 @@ def __get_tfdf_len( raw_frame_len: int, header: UslpHeaderT, properties: FramePropertiesT, - ): + ) -> int: """This helper function calculates the initial value for expected TFDF length and subtracts all (optional) fields lengths if they are present. diff --git a/spacepackets/uslp/header.py b/spacepackets/uslp/header.py index 8ba9ec6..17835f3 100644 --- a/spacepackets/uslp/header.py +++ b/spacepackets/uslp/header.py @@ -3,7 +3,6 @@ import enum import struct from abc import abstractmethod -from typing import Optional, Tuple from .defs import ( UslpInvalidRawPacketOrFrameLen, @@ -74,7 +73,7 @@ def _unpack_raw_header_base_fields( raw_packet: bytes, truncated: bool = False, uslp_version: int = USLP_VERSION_NUMBER, - ) -> Tuple[int, SourceOrDestField, int, int]: + ) -> tuple[int, SourceOrDestField, int, int]: if len(raw_packet) < 4: raise UslpInvalidRawPacketOrFrameLen version_number = (raw_packet[0] & 0xF0) >> 4 @@ -122,7 +121,7 @@ def __empty(cls) -> TruncatedPrimaryHeader: scid=0x00, src_dest=SourceOrDestField.DEST, vcid=0x00, map_id=0x00 ) - def len(self): + def len(self) -> int: return 4 def truncated(self) -> bool: @@ -188,7 +187,7 @@ def __init__( prot_ctrl_cmd_flag: ProtocolCommandFlag, op_ctrl_flag: bool, vcf_count_len: int = 0, - vcf_count: Optional[int] = None, + vcf_count: int | None = None, ): super().__init__(scid, src_dest, vcid, map_id) self.frame_len = frame_len @@ -281,7 +280,7 @@ def unpack(cls, raw_packet: bytes, uslp_version: int = USLP_VERSION_NUMBER) -> P end -= 1 return packet - def len(self): + def len(self) -> int: return 7 + self.vcf_count_len diff --git a/spacepackets/util.py b/spacepackets/util.py index badf735..d04ad1a 100644 --- a/spacepackets/util.py +++ b/spacepackets/util.py @@ -46,6 +46,7 @@ def get_printable_data_string(print_format: PrintFormats, data: bytes) -> str: return get_dec_data_string(data) if print_format == PrintFormats.BIN: return get_bin_data_string(data) + return None class IntByteConversion: @@ -53,11 +54,11 @@ class IntByteConversion: def signed_struct_specifier(byte_num: int) -> str: if byte_num == 1: return "!b" - if byte_num == 2: # noqa: PLR2004 + if byte_num == 2: return "!h" - if byte_num == 4: # noqa: PLR2004 + if byte_num == 4: return "!i" - if byte_num == 8: # noqa: PLR2004 + if byte_num == 8: return "!q" raise ValueError("Invalid byte number, must be one of [1, 2, 4, 8]") @@ -65,11 +66,11 @@ def signed_struct_specifier(byte_num: int) -> str: def unsigned_struct_specifier(byte_num: int) -> str: if byte_num == 1: return "!B" - if byte_num == 2: # noqa: PLR2004 + if byte_num == 2: return "!H" - if byte_num == 4: # noqa: PLR2004 + if byte_num == 4: return "!I" - if byte_num == 8: # noqa: PLR2004 + if byte_num == 8: return "!Q" raise ValueError(f"invalid byte number {byte_num}, must be one of [1, 2, 4, 8]") @@ -125,32 +126,32 @@ def __init__(self, val: int, byte_len: int): self._val_as_bytes = IntByteConversion.to_unsigned(self.byte_len, self.value) @classmethod - def from_bytes(cls, raw: bytes): + def from_bytes(cls, raw: bytes) -> UnsignedByteField: return cls( struct.unpack(IntByteConversion.unsigned_struct_specifier(len(raw)), raw)[0], len(raw), ) @property - def byte_len(self): + def byte_len(self) -> int: return self._byte_len @byte_len.setter - def byte_len(self, byte_len: int): + def byte_len(self, byte_len: int) -> None: UnsignedByteField.verify_byte_len(byte_len) self._byte_len = byte_len @property - def value(self): + def value(self) -> int: return self._val @value.setter - def value(self, val: int | bytes | bytearray): + def value(self, val: int | bytes | bytearray) -> None: if isinstance(val, int): self._verify_int_value(val) self._val = val self._val_as_bytes = IntByteConversion.to_unsigned(self.byte_len, self.value) - elif isinstance(val, bytes) or isinstance(val, bytearray): + elif isinstance(val, (bytes, bytearray)): self._val, self._val_as_bytes = self._verify_bytes_value(bytes(val)) @property @@ -158,12 +159,12 @@ def as_bytes(self) -> bytes: return self._val_as_bytes @staticmethod - def verify_byte_len(byte_len: int): + def verify_byte_len(byte_len: int) -> None: if byte_len not in [0, 1, 2, 4, 8]: # I really have no idea why anyone would use other values than these raise ValueError("Only 0, 1, 2, 4 and 8 bytes are allowed as an entity ID length") - def _verify_int_value(self, val: int): + def _verify_int_value(self, val: int) -> None: if val > pow(2, self.byte_len * 8) - 1 or val < 0: raise ValueError( f"Passed value {val} larger than allowed" @@ -181,7 +182,7 @@ def _verify_bytes_value(self, val: bytes) -> tuple[int, bytes]: return int_val, val[0 : self.byte_len] @property - def hex_str(self): + def hex_str(self) -> str | None: if self.byte_len == 1: return f"{self.value:#04x}" if self.byte_len == 2: @@ -190,6 +191,7 @@ def hex_str(self): return f"{self.value:#010x}" if self.byte_len == 8: return f"{self.value:#018x}" + return None def __repr__(self): return f"{self.__class__.__name__}(val={self.value!r}, byte_len={self.byte_len!r})" @@ -214,7 +216,7 @@ def __hash__(self): """Makes all unsigned byte fields usable as dictionary keys""" return hash((self.value, self.byte_len)) - def default_string(self, prefix): + def default_string(self, prefix: str) -> str: return f"{prefix}({self.value}, 0x[{self.as_bytes.hex(sep=',')}])" diff --git a/tests/cfdp/test_transaction_id.py b/tests/cfdp/test_transaction_id.py index 662f794..971bccd 100644 --- a/tests/cfdp/test_transaction_id.py +++ b/tests/cfdp/test_transaction_id.py @@ -19,7 +19,7 @@ def test_eq(self): self.assertNotEqual(self.transaction_id_0, self.transaction_id_1) def test_repr(self): - repr = self.transaction_id_0.__repr__() - self.assertTrue("TransactionId" in repr) - self.assertTrue("source_entity_id=" in repr) - self.assertTrue("transaction_seq_num=" in repr) + repr_str = self.transaction_id_0.__repr__() + self.assertTrue("TransactionId" in repr_str) + self.assertTrue("source_entity_id=" in repr_str) + self.assertTrue("transaction_seq_num=" in repr_str)