Skip to content

Commit

Permalink
added TM frame module
Browse files Browse the repository at this point in the history
  • Loading branch information
robamu committed May 26, 2024
1 parent a514263 commit 0bef46c
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 3 deletions.
141 changes: 141 additions & 0 deletions spacepackets/ccsds/tm_frame.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional

from spacepackets.crc import CRC16_CCITT_FUNC
from spacepackets.exceptions import BytesTooShortError, InvalidCrcCcitt16


@dataclass
class MasterChannelId:
transfer_frame_version: int
spacecraft_id: int


@dataclass
class TransferFrameDataFieldStatus:
secondary_header_flag: bool
sync_flag: bool
packet_order_flag: bool
segment_len_id: int
first_header_pointer: int

@classmethod
def unpack(cls, data: bytes) -> TransferFrameDataFieldStatus:
if len(data) < 2:
raise BytesTooShortError(2, len(data))
return cls(
bool((data[0] >> 7) & 0b1),
bool((data[0] >> 6) & 0b1),
bool((data[0] >> 5) & 0b1),
(data[0] >> 4) & 0b11,
((data[0] >> 3 & 0b111) << 8) | data[1],
)


class TmFramePrimaryHeader:
def __init__(
self,
master_channel_id: MasterChannelId,
vc_id: int,
ocf_flag: bool,
master_ch_frame_count: int,
vc_frame_count: int,
frame_datafield_status: TransferFrameDataFieldStatus,
):
self.master_channel_id = master_channel_id
self.vc_id = vc_id
self.ocf_flag = ocf_flag
self.master_ch_frame_count = master_ch_frame_count
self.vc_frame_count = vc_frame_count
self.frame_datafield_status = frame_datafield_status

@classmethod
def unpack(cls, data: bytes) -> TmFramePrimaryHeader:
tf_version = (data[0] >> 6) & 0b11
spacecraft_id = (data[0] & 0b111111 << 8) | ((data[1] >> 4) & 0b1111)
master_channel_id = MasterChannelId(tf_version, spacecraft_id)
vc_id = (data[1] >> 1) & 0b111
ocf_flag = bool(data[1] & 0b1)
master_ch_frame_count = data[2]
vc_frame_count = data[3]
frame_datafield_status = TransferFrameDataFieldStatus.unpack(data[4:])
return cls(
master_channel_id,
vc_id,
ocf_flag,
master_ch_frame_count,
vc_frame_count,
frame_datafield_status,
)


@dataclass
class TransferFrameSecondaryHeader:
version_number: int
secondary_header_len: int
data_field: bytes

@classmethod
def unpack(cls, data: bytes) -> TransferFrameSecondaryHeader:
if len(data) < 1:
raise BytesTooShortError(1, len(data))
secondary_header_len = data[0] & 0b111111
return cls(
version_number=(data[0] >> 6) & 0b11,
secondary_header_len=secondary_header_len,
data_field=data[1 : 1 + secondary_header_len],
)


class TmTransferFrame:
def __init__(
self,
primary_header: TmFramePrimaryHeader,
secondary_header: Optional[TransferFrameSecondaryHeader],
data_field: bytes,
op_ctrl_field: Optional[bytes],
frame_error_control: Optional[bytes],
) -> None:
self.primary_header = primary_header
self.secondary_header = secondary_header
self.data_field = data_field
self.op_ctrl_field = op_ctrl_field
self.frame_error_control = frame_error_control

@classmethod
def unpack(cls, raw_frame: bytes, has_error_control_field: bool) -> TmTransferFrame:
primary_header = TmFramePrimaryHeader.unpack(raw_frame)
secondary_header = None
current_idx = 6
if primary_header.frame_datafield_status.secondary_header_flag:
secondary_header = TransferFrameSecondaryHeader.unpack(
raw_frame[current_idx:]
)
current_idx += secondary_header.secondary_header_len
data_end = len(raw_frame)
op_ctrl_field = None
frame_error_control = None
if has_error_control_field:
if current_idx + 2 > len(raw_frame):
raise BytesTooShortError(current_idx + 2, len(raw_frame))
frame_error_control = raw_frame[-2:]
# CRC16-CCITT checksum
if CRC16_CCITT_FUNC(raw_frame) != 0:
raise InvalidCrcCcitt16(raw_frame)
# Used for length checks.
current_idx += 2
data_end -= 2
if primary_header.ocf_flag:
if current_idx + 4 > len(raw_frame):
raise BytesTooShortError(current_idx + 4, len(raw_frame))
op_ctrl_field = raw_frame[-6:-2]
data_end -= 4
frame_data_field = raw_frame[current_idx:data_end]
return cls(
primary_header,
secondary_header,
frame_data_field,
op_ctrl_field,
frame_error_control,
)
5 changes: 5 additions & 0 deletions spacepackets/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,8 @@ def __init__(self, expected_len: int, bytes_len: int):
)
self.expected_len = expected_len
self.bytes_len = bytes_len


class InvalidCrcCcitt16(Exception):
def __init__(self, data: bytes):
self.data = data
6 changes: 3 additions & 3 deletions spacepackets/uslp/header.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations
from abc import abstractmethod
from typing import Optional
from typing import Optional, Tuple
import enum
import struct

Expand Down Expand Up @@ -77,7 +77,7 @@ def _unpack_raw_header_base_fields(
raw_packet: bytes,
truncated: bool = False,
uslp_version: int = USLP_VERSION_NUMBER,
) -> (int, SourceOrDestField, int, int):
) -> Tuple[int, SourceOrDestField, int, int]:
if len(raw_packet) < 4:
raise UslpInvalidRawPacketOrFrameLen
version_number = (raw_packet[0] & 0xF0) >> 4
Expand All @@ -88,7 +88,7 @@ def _unpack_raw_header_base_fields(
| (raw_packet[1] << 4)
| ((raw_packet[2] & 0xF0) >> 4)
)
src_dest = (raw_packet[2] & 0x08) >> 3
src_dest = SourceOrDestField((raw_packet[2] & 0x08) >> 3)
vcid = ((raw_packet[2] & 0b111) << 3) | ((raw_packet[3] >> 5) & 0b111)
map_id = (raw_packet[3] >> 1) & 0b1111
end_of_frame_primary_header = raw_packet[3] & 0b1
Expand Down

0 comments on commit 0bef46c

Please sign in to comment.