Skip to content

Commit

Permalink
raw byte APIs accept both bytes and bytearray
Browse files Browse the repository at this point in the history
  • Loading branch information
robamu committed Nov 30, 2024
1 parent e70fb33 commit 1eea73e
Show file tree
Hide file tree
Showing 27 changed files with 99 additions and 88 deletions.
6 changes: 3 additions & 3 deletions spacepackets/ccsds/spacepacket.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ def packet_len(self) -> int:
return CCSDS_HEADER_LEN + self.data_len + 1

@classmethod
def unpack(cls, data: bytes) -> SpacePacketHeader:
def unpack(cls, data: bytes | bytearray) -> SpacePacketHeader:
"""Unpack a raw space packet into the space packet header instance.
:raise ValueError: Raw packet length invalid
Expand Down Expand Up @@ -445,8 +445,8 @@ class SpacePacket:
def __init__(
self,
sp_header: SpacePacketHeader,
sec_header: bytes | None,
user_data: bytes | None,
sec_header: bytes | bytearray | None,
user_data: bytes | bytearray | None,
):
self.sp_header = sp_header
self.sec_header = sec_header
Expand Down
4 changes: 2 additions & 2 deletions spacepackets/cfdp/lv.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@


class CfdpLv:
def __init__(self, value: bytes):
def __init__(self, value: bytes | bytearray):
"""This class encapsulates CFDP Length-Value (LV) fields.
Raises
Expand Down Expand Up @@ -42,7 +42,7 @@ def pack(self) -> bytearray:
return packet

@classmethod
def unpack(cls, raw_bytes: bytes) -> CfdpLv:
def unpack(cls, raw_bytes: bytes | bytearray) -> CfdpLv:
"""Parses LV field at the start of the given bytearray
:raise ValueError: Invalid length found
Expand Down
2 changes: 1 addition & 1 deletion spacepackets/cfdp/pdu/ack.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def __repr__(self):
)

@classmethod
def unpack(cls, data: bytes) -> AckPdu:
def unpack(cls, data: bytes | bytearray) -> AckPdu:
"""Generate an object instance from raw data. Care should be taken to check whether
the raw bytestream really contains an ACK PDU.
Expand Down
6 changes: 4 additions & 2 deletions spacepackets/cfdp/pdu/eof.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def pack(self) -> bytearray:
return eof_pdu

@classmethod
def unpack(cls, data: bytes) -> EofPdu:
def unpack(cls, data: bytes | bytearray) -> EofPdu:
"""Generate an object instance from raw data. Care should be taken to check whether
the raw bytestream really contains an EOF PDU.
Expand Down Expand Up @@ -156,7 +156,9 @@ def unpack(cls, data: bytes) -> EofPdu:
eof_pdu.fault_location = EntityIdTlv.unpack(data=data[current_idx:])
return eof_pdu

def __eq__(self, other: EofPdu):
def __eq__(self, other: object) -> bool:
if not isinstance(other, EofPdu):
return False
return (
self.pdu_file_directive == other.pdu_file_directive
and self.condition_code == other.condition_code
Expand Down
6 changes: 3 additions & 3 deletions spacepackets/cfdp/pdu/file_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ def pack(self) -> bytearray:
return file_data_pdu

@classmethod
def unpack(cls, data: bytes) -> FileDataPdu:
def unpack(cls, data: bytes | bytearray) -> FileDataPdu:
"""Generate an object instance from raw data. Care should be taken to check whether
the raw bytestream really contains a File Data PDU.
Expand All @@ -242,7 +242,7 @@ def unpack(cls, data: bytes) -> FileDataPdu:
metadata = data[current_idx : current_idx + segment_metadata_len]
current_idx += segment_metadata_len
file_data_packet.segment_metadata = SegmentMetadata(
record_cont_state=rec_cont_state, metadata=metadata
record_cont_state=rec_cont_state, metadata=bytes(metadata)
)
if not file_data_packet.pdu_header.large_file_flag_set:
struct_arg_tuple = ("!I", 4)
Expand All @@ -259,7 +259,7 @@ def unpack(cls, data: bytes) -> FileDataPdu:
if file_data_packet.pdu_header.crc_flag == CrcFlag.WITH_CRC:
data = data[:-2]
if current_idx < len(data):
file_data_packet._params.file_data = data[current_idx:]
file_data_packet._params.file_data = bytes(data[current_idx:])
return file_data_packet

@property
Expand Down
6 changes: 3 additions & 3 deletions spacepackets/cfdp/pdu/file_directive.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def __init__(
)
self._directive_type = directive_code

def verify_length_and_checksum(self, data: bytes) -> None:
def verify_length_and_checksum(self, data: bytes | bytearray) -> None:
self.pdu_header.verify_length_and_checksum(data)

@property
Expand Down Expand Up @@ -172,7 +172,7 @@ def pack(self) -> bytearray:
return data

@classmethod
def unpack(cls, raw_packet: bytes) -> FileDirectivePduBase:
def unpack(cls, raw_packet: bytes | bytearray) -> FileDirectivePduBase:
"""Unpack a raw bytearray into the File Directive PDU object representation.
:param raw_packet: Unpack PDU file directive base
Expand All @@ -195,7 +195,7 @@ def _verify_file_len(self, file_size: int) -> None:
if self.pdu_header.file_flag == LargeFileFlag.NORMAL and file_size > pow(2, 32):
raise ValueError(f"File size {file_size} larger than 32 bit field")

def parse_fss_field(self, raw_packet: bytes, current_idx: int) -> tuple[int, int]:
def parse_fss_field(self, raw_packet: bytes | bytearray, current_idx: int) -> tuple[int, int]:
"""Parse the FSS field, which has different size depending on the large file flag being
set or not. Returns the current index incremented and the parsed file size.
Expand Down
6 changes: 3 additions & 3 deletions spacepackets/cfdp/pdu/finished.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def pack(self) -> bytearray:
return packet

@classmethod
def unpack(cls, data: bytes) -> FinishedPdu:
def unpack(cls, data: bytes | bytearray) -> FinishedPdu:
"""Generate an object instance from raw data. Care should be taken to check whether
the raw bytestream really contains a Finished PDU.
Expand Down Expand Up @@ -241,7 +241,7 @@ def unpack(cls, data: bytes) -> FinishedPdu:
finished_pdu._unpack_tlvs(rest_of_packet=data[current_idx:end_of_optional_tlvs_idx])
return finished_pdu

def _unpack_tlvs(self, rest_of_packet: bytes) -> int:
def _unpack_tlvs(self, rest_of_packet: bytes | bytearray) -> int:
current_idx = 0
fs_responses_list = []
fault_loc = None
Expand All @@ -266,7 +266,7 @@ def _unpack_tlvs(self, rest_of_packet: bytes) -> int:
self.fault_location = fault_loc
return current_idx

def __eq__(self, other: object):
def __eq__(self, other: object) -> bool:
if not isinstance(other, FinishedPdu):
return False
return self._params == other._params and self.pdu_file_directive == other.pdu_file_directive
Expand Down
12 changes: 7 additions & 5 deletions spacepackets/cfdp/pdu/header.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def file_flag(self) -> LargeFileFlag:

@file_flag.setter
@abc.abstractmethod
def file_flag(self, file_flag: LargeFileFlag) -> None:
def file_flag(self, field_len: LargeFileFlag) -> None:
pass

@property
Expand Down Expand Up @@ -120,7 +120,9 @@ def packet_len(self) -> int:
def large_file_flag_set(self) -> bool:
return self.file_flag == LargeFileFlag.LARGE

def __eq__(self, other: AbstractPduBase):
def __eq__(self, other: object) -> bool:
if not isinstance(other, AbstractPduBase):
return False
return (
self.pdu_type == other.pdu_type
and self.file_flag == other.file_flag
Expand All @@ -131,7 +133,7 @@ def __eq__(self, other: AbstractPduBase):
)

@staticmethod
def header_len_from_raw(data: bytes) -> int:
def header_len_from_raw(data: bytes | bytearray) -> int:
entity_id_len = ((data[3] >> 4) & 0b111) + 1
seq_num_len = (data[3] & 0b111) + 1
return AbstractPduBase.FIXED_LENGTH + 2 * entity_id_len + seq_num_len
Expand Down Expand Up @@ -306,7 +308,7 @@ def __empty(cls) -> PduHeader:
)

@classmethod
def unpack(cls, data: bytes) -> PduHeader:
def unpack(cls, data: bytes | bytearray) -> PduHeader:
"""Unpack a raw bytearray into the PDU header object representation.
:param data:
Expand Down Expand Up @@ -352,7 +354,7 @@ def unpack(cls, data: bytes) -> PduHeader:
pdu_header.set_entity_ids(source_entity_id=source_entity_id, dest_entity_id=dest_entity_id)
return pdu_header

def verify_length_and_checksum(self, data: bytes) -> int:
def verify_length_and_checksum(self, data: bytes | bytearray) -> int:
if len(data) < self.packet_len:
raise BytesTooShortError(self.packet_len, len(data))
if (
Expand Down
12 changes: 7 additions & 5 deletions spacepackets/cfdp/pdu/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ class PduFactory:
"""Helper class to generate PDUs and retrieve PDU information from a raw bytestream"""

@staticmethod
def from_raw(data: bytes) -> GenericPduPacket | None: # noqa: PLR0911
def from_raw(data: bytes | bytearray) -> GenericPduPacket | None: # noqa: PLR0911
if len(data) == 0:
return None
if not PduFactory.is_file_directive(data):
return FileDataPdu.unpack(data)
directive = PduFactory.pdu_directive_type(data)
Expand All @@ -147,19 +149,19 @@ def from_raw(data: bytes) -> GenericPduPacket | None: # noqa: PLR0911
return None

@staticmethod
def from_raw_to_holder(data: bytes) -> PduHolder:
def from_raw_to_holder(data: bytes | bytearray) -> PduHolder:
return PduHolder(PduFactory.from_raw(data))

@staticmethod
def pdu_type(data: bytes) -> PduType:
def pdu_type(data: bytes | bytearray) -> PduType:
return PduType((data[0] >> 4) & 0x01)

@staticmethod
def is_file_directive(data: bytes) -> bool:
def is_file_directive(data: bytes | bytearray) -> bool:
return PduFactory.pdu_type(data) == PduType.FILE_DIRECTIVE

@staticmethod
def pdu_directive_type(data: bytes) -> DirectiveType | None:
def pdu_directive_type(data: bytes | bytearray) -> DirectiveType | None:
"""Retrieve the PDU directive type from a raw bytestream.
:raises ValueError: Invalid directive type.
Expand Down
2 changes: 1 addition & 1 deletion spacepackets/cfdp/pdu/keep_alive.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def pack(self) -> bytearray:
return keep_alive_packet

@classmethod
def unpack(cls, data: bytes) -> KeepAlivePdu:
def unpack(cls, data: bytes | bytearray) -> KeepAlivePdu:
"""Generate an object instance from raw data. Care should be taken to check whether
the raw bytestream really contains a Keep Alive PDU.
Expand Down
4 changes: 2 additions & 2 deletions spacepackets/cfdp/pdu/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def pack(self) -> bytearray:
return packet

@classmethod
def unpack(cls, data: bytes) -> MetadataPdu:
def unpack(cls, data: bytes | bytearray) -> MetadataPdu:
"""Generate an object instance from raw data. Care should be taken to check whether
the raw bytestream really contains a Metadata PDU.
Expand Down Expand Up @@ -244,7 +244,7 @@ def unpack(cls, data: bytes) -> MetadataPdu:
metadata_pdu._parse_options(raw_packet=data, start_idx=current_idx)
return metadata_pdu

def _parse_options(self, raw_packet: bytes, start_idx: int) -> None:
def _parse_options(self, raw_packet: bytes | bytearray, start_idx: int) -> None:
self._options = []
current_idx = start_idx
while True:
Expand Down
2 changes: 1 addition & 1 deletion spacepackets/cfdp/pdu/nak.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def pack(self) -> bytearray:
return nak_pdu

@classmethod
def unpack(cls, data: bytes) -> NakPdu:
def unpack(cls, data: bytes | bytearray) -> NakPdu:
"""Generate an object instance from raw data. The user should take care to check whether
the raw bytestream really contains a NAK PDU.
Expand Down
4 changes: 2 additions & 2 deletions spacepackets/cfdp/pdu/prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __repr__(self):
)

@classmethod
def unpack(cls, data: bytes) -> PromptPdu:
def unpack(cls, data: bytes | bytearray) -> PromptPdu:
"""Generate an object instance from raw data. Care should be taken to check whether
the raw bytestream really contains a Prompt PDU.
Expand All @@ -83,7 +83,7 @@ def unpack(cls, data: bytes) -> PromptPdu:
prompt_pdu.response_required = ResponseRequired((data[current_idx] & 0x80) >> 7)
return prompt_pdu

def __eq__(self, other: object):
def __eq__(self, other: object) -> bool:
if not isinstance(other, PromptPdu):
return False
return (
Expand Down
12 changes: 6 additions & 6 deletions spacepackets/cfdp/tlv/msg_to_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __empty(cls) -> MessageToUserTlv:
return cls(b"")

@classmethod
def unpack(cls, data: bytes) -> MessageToUserTlv:
def unpack(cls, data: bytes | bytearray) -> MessageToUserTlv:
msg_to_user_tlv = cls.__empty()
msg_to_user_tlv.tlv = CfdpTlv.unpack(data)
msg_to_user_tlv.check_type(MessageToUserTlv.TLV_TYPE)
Expand All @@ -98,7 +98,7 @@ class ReservedCfdpMessage(AbstractTlvBase):
conversion.
"""

def __init__(self, msg_type: int, value: bytes):
def __init__(self, msg_type: int, value: bytes | bytearray):
assert msg_type < pow(2, 8) - 1
full_value = bytearray(b"cfdp")
full_value.append(msg_type)
Expand Down Expand Up @@ -217,7 +217,7 @@ def get_proxy_closure_requested(self) -> bool | None:
or self.get_cfdp_proxy_message_type() != ProxyMessageType.CLOSURE_REQUEST
):
return None
return self.value[5] & 0b1
return bool(self.value[5] & 0b1)

def get_proxy_transmission_mode(self) -> TransmissionMode | None:
if (
Expand Down Expand Up @@ -270,7 +270,7 @@ def get_dir_listing_options(self) -> DirListingOptions | None:
raise ValueError(
f"value with length {len(self.value)} too small for dir listing options."
)
return DirListingOptions((self.value[5] >> 1) & 0b1, self.value[5] & 0b1)
return DirListingOptions(bool((self.value[5] >> 1) & 0b1), bool(self.value[5] & 0b1))


@dataclasses.dataclass
Expand All @@ -284,15 +284,15 @@ def source_file_as_str(self) -> str:
return self.source_file_name.value.decode()

@property
def source_file_as_path(self) -> str:
def source_file_as_path(self) -> Path:
return Path(self.source_file_as_str)

@property
def dest_file_as_str(self) -> str:
return self.dest_file_name.value.decode()

@property
def dest_file_as_path(self) -> str:
def dest_file_as_path(self) -> Path:
return Path(self.dest_file_as_str)


Expand Down
Loading

0 comments on commit 1eea73e

Please sign in to comment.