Skip to content

Commit

Permalink
Merge pull request #38 from us-irs/rename-pdu-holder-member
Browse files Browse the repository at this point in the history
Rename packet holder member names
  • Loading branch information
robamu authored Oct 4, 2023
2 parents 1d09476 + 9a8103e commit 9965c3a
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 68 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

# [unreleased]

## Changed

- Renamed `TlvHolder` field `base` to `tlv` and `PduHolder` field `base` to `pdu`.

# [v0.18.0] 2023-09-08

## Changed
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Currently, this includes the following components:
It also contains various helper modules

- `PusVerificator` module to track the verification of sent telecommands
- PTC and PFC definitions for ECSS packets
- [PTC and PFC definitions](https://spacepackets.readthedocs.io/en/latest/api/ecss.html#module-spacepackets.ecss.fields) for ECSS packets

# Install

Expand Down
72 changes: 47 additions & 25 deletions spacepackets/cfdp/pdu/helper.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,67 @@
from __future__ import annotations
from typing import cast, Union, Type, Optional

from typing import Optional, Type, Union, cast, Any

import deprecation

from spacepackets.cfdp import PduType
from spacepackets.cfdp.pdu import (
MetadataPdu,
AbstractFileDirectiveBase,
DirectiveType,
AckPdu,
NakPdu,
FinishedPdu,
DirectiveType,
EofPdu,
FinishedPdu,
KeepAlivePdu,
MetadataPdu,
NakPdu,
PromptPdu,
)
from spacepackets.cfdp.pdu.file_data import FileDataPdu
from spacepackets.cfdp.pdu.header import AbstractPduBase
from spacepackets.version import get_version

GenericPduPacket = Union[AbstractFileDirectiveBase, AbstractPduBase]


class PduHolder:
"""Helper type to store arbitrary PDU types and cast them to a concrete PDU type conveniently"""

def __init__(self, base: Optional[GenericPduPacket]):
self.base = base
def __init__(self, pdu: Optional[GenericPduPacket]):
self.pdu = pdu

def pack(self) -> bytearray:
if self.base is None:
return bytearray()
return self.base.pack()

@property
@deprecation.deprecated(
deprecated_in="0.19.0",
current_version=get_version(),
details="use packet member instead",
)
def base(self):
return self.pdu

@base.setter
@deprecation.deprecated(
deprecated_in="0.19.0",
current_version=get_version(),
details="use packet member instead",
)
def base(self, base: GenericPduPacket):
self.pdu = base

@property
def packet_len(self) -> int:
if self.base is None:
if self.pdu is None:
return 0
return self.base.packet_len
return self.pdu.packet_len

@property
def pdu_type(self) -> PduType:
return self.base.pdu_header.pdu_type
assert self.pdu is not None
return self.pdu.pdu_type

@property
def is_file_directive(self):
Expand All @@ -51,35 +74,34 @@ def pdu_directive_type(self) -> Optional[DirectiveType]:
"""
if not self.is_file_directive:
return None
directive_base = cast(AbstractFileDirectiveBase, self.base)
directive_base = cast(AbstractFileDirectiveBase, self.pdu)
return directive_base.directive_type

def __repr__(self):
return f"{self.__class__.__name__}(base={self.base!r}"
return f"{self.__class__.__name__}(base={self.pdu!r}"

def _raise_not_target_exception(self, pdu_type: Type[any]):
raise TypeError(f"Stored PDU is not {pdu_type.__name__!r}: {self.base!r}")
def _raise_not_target_exception(self, pdu_type: Type[Any]):
raise TypeError(f"Stored PDU is not {pdu_type.__name__!r}: {self.pdu!r}")

def _cast_to_concrete_file_directive(
self, pdu_type: Type[any], dir_type: DirectiveType
):
self, pdu_type: Type[Any], dir_type: DirectiveType
) -> Any:
if (
isinstance(self.base, AbstractFileDirectiveBase)
and self.base.pdu_type == PduType.FILE_DIRECTIVE
and self.pdu.pdu_type == PduType.FILE_DIRECTIVE # type: ignore
):
pdu_base = cast(AbstractFileDirectiveBase, self.base)
pdu_base = cast(AbstractFileDirectiveBase, self.pdu)
if pdu_base.directive_type == dir_type:
return cast(pdu_type, self.base)
return cast(pdu_type, self.pdu)
self._raise_not_target_exception(pdu_type)

def to_file_data_pdu(self) -> FileDataPdu:
def to_file_data_pdu(self) -> FileDataPdu: # type: ignore
if (
isinstance(self.base, AbstractPduBase)
and self.base.pdu_type == PduType.FILE_DATA
isinstance(self.pdu, AbstractPduBase)
and self.pdu.pdu_type == PduType.FILE_DATA
):
return cast(FileDataPdu, self.base)
else:
self._raise_not_target_exception(FileDataPdu)
return cast(FileDataPdu, self.pdu)
self._raise_not_target_exception(FileDataPdu)

def to_metadata_pdu(self) -> MetadataPdu:
return self._cast_to_concrete_file_directive(
Expand Down
71 changes: 34 additions & 37 deletions spacepackets/cfdp/tlv.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import dataclasses
import struct
from abc import ABC, abstractmethod
from typing import Tuple, Optional, Type, Union, List, Any, cast
from typing import Tuple, Optional, Type, List, Any, cast
import enum
from spacepackets.cfdp.lv import CfdpLv
from spacepackets.cfdp.defs import ConditionCode, FaultHandlerCode
Expand Down Expand Up @@ -106,13 +106,11 @@ def map_enum_status_code_to_action_status_code(
) -> Tuple[FilestoreActionCode, int]:
"""Map a given file store response status code to the action code and the corresponding
4 bit status code. the status code will be 0x00 for a SUCCESS operation and 0b1111 if the
operation was not performed"""
try:
status_code = FilestoreActionCode((status_code_enum & 0xF0) >> 4)
except ValueError:
# Invalid status code
status_code = -1
return status_code, status_code_enum & 0x0F
operation was not performed.
:raise ValueError: Invalid filestore action code detected.
"""
return FilestoreActionCode((status_code_enum & 0xF0) >> 4), status_code_enum & 0x0F


def map_int_status_code_to_enum(
Expand All @@ -133,12 +131,10 @@ def map_int_status_code_to_enum(


class TlvTypeMissmatch(Exception):
def __init__(self, found: TlvType, expected: TlvType, *args, **kwards):
def __init__(self, found: TlvType, expected: TlvType):
self.found = found
self.expected = expected

def __str__(self):
return f"Expected TLV {self.expected}, found {self.found}"
super().__init__(f"Expected TLV {self.expected}, found {self.found}")


class AbstractTlvBase(ABC):
Expand Down Expand Up @@ -569,7 +565,7 @@ def generate_tlv(self):

def pack(self) -> bytearray:
self.generate_tlv()
return self.tlv.pack()
return self.tlv.pack() # type: ignore

@property
def packet_len(self):
Expand All @@ -578,7 +574,7 @@ def packet_len(self):
@property
def value(self) -> bytes:
self.generate_tlv()
return self.tlv.value
return self.tlv.value # type: ignore

@property
def tlv_type(self) -> TlvType:
Expand Down Expand Up @@ -613,7 +609,7 @@ def from_tlv(cls, cfdp_tlv: CfdpTlv) -> FileStoreRequestTlv:

@classmethod
def _set_fields(cls, instance: FileStoreRequestTlv, raw_data: bytes):
action_code, first_name, status_code, _, second_name = cls._common_unpacker(
action_code, first_name, _, _, second_name = cls._common_unpacker(
raw_bytes=raw_data
)
instance.action_code = action_code
Expand Down Expand Up @@ -648,12 +644,12 @@ def generate_tlv(self):

def pack(self) -> bytearray:
self.generate_tlv()
return self.tlv.pack()
return self.tlv.pack() # type: ignore

@property
def value(self) -> bytes:
self.generate_tlv()
return self.tlv.value
return self.tlv.value # type: ignore

@property
def packet_len(self):
Expand Down Expand Up @@ -715,7 +711,7 @@ def _set_fields(cls, instance: FileStoreResponseTlv, data: bytes):
instance.filestore_msg = CfdpLv.unpack(data[idx:])


TlvList = List[Union[AbstractTlvBase]]
TlvList = List[AbstractTlvBase]


class ProxyMessageType(enum.IntEnum):
Expand Down Expand Up @@ -826,50 +822,51 @@ def __init__(self, params: ProxyPutRequestParams):


class TlvHolder:
def __init__(self, tlv_base: Optional[AbstractTlvBase]):
self.base = tlv_base
def __init__(self, tlv: Optional[AbstractTlvBase]):
self.tlv = tlv

@property
def tlv_type(self):
if self.base is not None:
return self.base.tlv_type
if self.tlv is not None:
return self.tlv.tlv_type

def __cast_internally(
self,
obj_type: Type[AbstractTlvBase],
expected_type: TlvType,
) -> Any:
if self.base.tlv_type != expected_type:
raise TypeError(f"Invalid object {self.base} for type {self.base.tlv_type}")
return cast(obj_type, self.base)
assert self.tlv is not None
if self.tlv.tlv_type != expected_type:
raise TypeError(f"Invalid object {self.tlv} for type {self.tlv.tlv_type}")
return cast(obj_type, self.tlv)

def to_fs_request(self) -> FileStoreRequestTlv:
# Check this type first. It's a concrete type where we can not just use a simple cast
if isinstance(self.base, CfdpTlv):
return FileStoreRequestTlv.from_tlv(self.base)
if isinstance(self.tlv, CfdpTlv):
return FileStoreRequestTlv.from_tlv(self.tlv)
return self.__cast_internally(FileStoreRequestTlv, TlvType.FILESTORE_REQUEST)

def to_fs_response(self) -> FileStoreResponseTlv:
if isinstance(self.base, CfdpTlv):
return FileStoreResponseTlv.from_tlv(self.base)
if isinstance(self.tlv, CfdpTlv):
return FileStoreResponseTlv.from_tlv(self.tlv)
return self.__cast_internally(FileStoreResponseTlv, TlvType.FILESTORE_RESPONSE)

def to_msg_to_user(self) -> MessageToUserTlv:
if isinstance(self.base, CfdpTlv):
return MessageToUserTlv.from_tlv(self.base)
if isinstance(self.tlv, CfdpTlv):
return MessageToUserTlv.from_tlv(self.tlv)
return self.__cast_internally(MessageToUserTlv, TlvType.MESSAGE_TO_USER)

def to_fault_handler_override(self) -> FaultHandlerOverrideTlv:
if isinstance(self.base, CfdpTlv):
return FaultHandlerOverrideTlv.from_tlv(self.base)
if isinstance(self.tlv, CfdpTlv):
return FaultHandlerOverrideTlv.from_tlv(self.tlv)
return self.__cast_internally(FaultHandlerOverrideTlv, TlvType.FAULT_HANDLER)

def to_flow_label(self) -> FlowLabelTlv:
if isinstance(self.base, CfdpTlv):
return FlowLabelTlv.from_tlv(self.base)
if isinstance(self.tlv, CfdpTlv):
return FlowLabelTlv.from_tlv(self.tlv)
return self.__cast_internally(FlowLabelTlv, TlvType.FLOW_LABEL)

def to_entity_id(self) -> EntityIdTlv:
if isinstance(self.base, CfdpTlv):
return EntityIdTlv.from_tlv(self.base)
if isinstance(self.tlv, CfdpTlv):
return EntityIdTlv.from_tlv(self.tlv)
return self.__cast_internally(EntityIdTlv, TlvType.ENTITY_ID)
2 changes: 1 addition & 1 deletion tests/cfdp/pdus/test_pdu_holder.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def test_file_data(self):
segment_metadata_flag=False,
)
file_data_pdu = FileDataPdu(pdu_conf=self.pdu_conf, params=fd_params)
self.pdu_wrapper.base = file_data_pdu
self.pdu_wrapper.pdu = file_data_pdu
pdu_casted_back = self.pdu_wrapper.to_file_data_pdu()
self.assertEqual(pdu_casted_back, file_data_pdu)
self.assertEqual(self.pdu_wrapper.pdu_directive_type, None)
Expand Down
8 changes: 4 additions & 4 deletions tests/cfdp/tlvslvs/test_tlvs.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ def test_status_code_converters(self):
)
self.assertEqual(action_code, FilestoreActionCode.APPEND_FILE_SNP)
self.assertEqual(status_code, 0b1111)
action_code, status_code = map_enum_status_code_to_action_status_code(
FilestoreResponseStatusCode.INVALID
)
self.assertEqual(action_code, -1)
with self.assertRaises(ValueError):
map_enum_status_code_to_action_status_code(
FilestoreResponseStatusCode.INVALID
)
status_code = map_int_status_code_to_enum(
action_code=FilestoreActionCode.APPEND_FILE_SNP, status_code=0b1111
)
Expand Down

0 comments on commit 9965c3a

Please sign in to comment.