Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start adding tests for orca fc #16

Merged
merged 18 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions src/daq2lh5/orca/orca_packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,9 @@ def hex_dump(
as_short: bool = False,
id_dict: dict = None,
use_logging: bool = True,
return_output=False,
) -> None:
dump_cmd = print # noqa: T202
if use_logging:
dump_cmd = log.debug

output = []
data_id = get_data_id(packet, shift=shift_data_id)
n_words = get_n_words(packet)
if id_dict is not None:
Expand All @@ -62,9 +60,9 @@ def hex_dump(
else:
heading = f"data ID = {data_id}"
if print_n_words:
dump_cmd(f"{heading}: {n_words} words")
output.append(f"{heading}: {n_words} words")
else:
dump_cmd(f"{heading}:")
output.append(f"{heading}:")
n_to_print = int(np.minimum(n_words, max_words))
pad = int(np.ceil(np.log10(n_to_print)))
for i in range(n_to_print):
Expand All @@ -76,4 +74,13 @@ def hex_dump(
line += f" {packet[i]}"
if as_short:
line += f" {np.frombuffer(packet[i:i+1].tobytes(), dtype='uint16')}"
dump_cmd(line)
output.append(line)

dump_cmd = print # noqa: T202
if use_logging:
dump_cmd = log.debug
for line in output:
dump_cmd(line)

if return_output:
return output
140 changes: 126 additions & 14 deletions src/daq2lh5/orca/orca_streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,138 @@ class OrcaStreamer(DataStreamer):
def __init__(self) -> None:
super().__init__()
self.in_stream = None
self.packet_locs = []
self.buffer = np.empty(1024, dtype="uint32") # start with a 4 kB packet buffer
self.header = None
self.header_decoder = OrcaHeaderDecoder()
self.decoder_id_dict = {} # dict of data_id to decoder object
self.rbl_id_dict = {} # dict of RawBufferLists for each data_id
self.missing_decoders = []

def load_packet_header(self) -> np.uint32 | None:
"""Loads the packet header at the current read location into the buffer

and updates internal variables.
"""
pkt_hdr = self.buffer[:1]
n_bytes_read = self.in_stream.readinto(pkt_hdr) # buffer is at least 4 kB long
self.n_bytes_read += n_bytes_read
if n_bytes_read == 0: # EOF
return None
if n_bytes_read != 4:
raise RuntimeError(f"only got {n_bytes_read} bytes for packet header")

# packet is valid. Can set the packet_id and log its location
self.packet_id += 1
filepos = self.in_stream.tell() - n_bytes_read
if self.packet_id < len(self.packet_locs):
if self.packet_locs[self.packet_id] != filepos:
raise RuntimeError(
f"filepos for packet {self.packet_id} was {filepos} but {self.packet_locs[self.packet_id]} was expected"
)
else:
if len(self.packet_locs) != self.packet_id:
raise RuntimeError(
f"loaded packet {self.packet_id} after packet {len(self.packet_locs)-1}"
)
self.packet_locs.append(filepos)

return pkt_hdr

def skip_packet(self, n: int = 1) -> bool:
"""Skip a packets without loading it into the internal buffer.

Requires loading the header. Optionally skips n packets.

Returns
----------
succeeded
returns False if reached EOF, otherwise returns true
"""
if self.in_stream is None:
raise RuntimeError("self.in_stream is None")
if not int(n) >= 0:
raise ValueError(f"n must be a non-negative int, can't be {n}")
n = int(n)
while n > 0:
pkt_hdr = self.load_packet_header()
if pkt_hdr is None:
return False
self.in_stream.seek((orca_packet.get_n_words(pkt_hdr) - 1) * 4, 1)
n -= 1
return True

def build_packet_locs(self, saveloc=True) -> None:
loc = self.in_stream.tell()
pid = self.packet_id
if len(self.packet_locs) > 0:
self.in_stream.seek(self.packet_locs[-1])
self.packet_id = len(self.packet_locs) - 2
while self.skip_packet():
pass # builds the rest of the packet_locs list
if saveloc:
self.in_stream.seek(loc)
self.packet_id = pid

def count_packets(self, saveloc=True) -> None:
self.build_packet_locs(saveloc=saveloc)
return len(self.packet_locs)

# TODO: need to correct for endianness?
def load_packet(self, skip_unknown_ids: bool = False) -> np.uint32 | None:
def load_packet(
self, index: int = None, whence: int = 0, skip_unknown_ids: bool = False
) -> np.uint32 | None:
"""Loads the next packet into the internal buffer.

Returns packet as a :class:`numpy.uint32` view of the buffer (a slice),
returns ``None`` at EOF.

Parameters
----------
index
Optionally give an index of packet to skip to, relative to the
"whence" location. Can be positive or negative. If out-of-range for
the file, None will be returned.
whence
used when an index is supplied. Follows the file.seek() convention:
whence = 0 (default) means index is relative to the beginning of the
file; whence = 1 means index is relative to the current position in
the file; whence = 2 means relative to the end of the file.

Returns
----------
packet
a view of the internal buffer spanning the packet data (uint32
ndarray). If you want to hold on to the packet data while you load
more packets, you can call copy() on the view to make a copy.
"""
if self.in_stream is None:
raise RuntimeError("self.in_stream is None")

# read packet header
pkt_hdr = self.buffer[:1]
n_bytes_read = self.in_stream.readinto(pkt_hdr) # buffer is at least 4 kB long
self.n_bytes_read += n_bytes_read
if n_bytes_read == 0:
if index is not None:
if whence not in [0, 1, 2]:
raise ValueError(f"whence can't be {whence}")
index = int(index)
# convert whence 1 or 2 to whence = 0
if whence == 1: # index is relative to current position
index += self.packet_id - 1
elif whence == 2: # index is relative to end of file
self.build_packet_locs(saveloc=False)
index += len(self.packet_locs) - 2
if index < 0:
self.in_stream.seek(0)
self.packet_id = -1
return None
while index >= len(self.packet_locs):
if not self.skip_packet():
return None
self.in_stream.seek(self.packet_locs[index])
self.packet_id = index - 1

# load packet header
pkt_hdr = self.load_packet_header()
if pkt_hdr is None:
return None
if n_bytes_read != 4:
raise RuntimeError(f"only got {n_bytes_read} bytes for packet header")

# if it's a short packet, we are done
if orca_packet.is_short(pkt_hdr):
Expand All @@ -69,7 +177,6 @@ def load_packet(self, skip_unknown_ids: bool = False) -> np.uint32 | None:
not in self.decoder_id_dict
):
self.in_stream.seek((n_words - 1) * 4, 1)
self.n_bytes_read += (n_words - 1) * 4 # well, we didn't really read it...
return pkt_hdr

# load into buffer, resizing as necessary
Expand Down Expand Up @@ -204,15 +311,17 @@ def open_stream(
"""

self.set_in_stream(stream_name)
self.packet_id = -1

# read in the header
packet = self.load_packet()
if packet is None:
raise RuntimeError(f"no orca data in file {stream_name}")
if orca_packet.get_data_id(packet) != 0:
raise RuntimeError(
f"got data id {orca_packet.get_data_id(packet)} for header"
)

self.packet_id = 0
self.any_full |= self.header_decoder.decode_packet(packet, self.packet_id)
self.header = self.header_decoder.header

Expand Down Expand Up @@ -240,9 +349,7 @@ def open_stream(
name = id_to_dec_name_dict[data_id]
if name not in instantiated_decoders:
if name not in globals():
log.warning(
f"no implementation of {name}, corresponding packets will be skipped"
)
self.missing_decoders.append(data_id)
continue
decoder = globals()[name]
instantiated_decoders[name] = decoder(header=self.header)
Expand Down Expand Up @@ -296,13 +403,18 @@ def read_packet(self) -> bool:
packet = self.load_packet(skip_unknown_ids=True)
if packet is None:
return False
self.packet_id += 1

# look up the data id, decoder, and rbl
data_id = orca_packet.get_data_id(packet, shift=False)
log.debug(
f"packet {self.packet_id}: data_id = {data_id}, decoder = {'None' if data_id not in self.decoder_id_dict else type(self.decoder_id_dict[data_id]).__name__}"
)
if data_id in self.missing_decoders:
name = self.header.get_id_to_decoder_name_dict(shift_data_id=False)[
data_id
]
log.warning(f"no implementation of {name}, packets were skipped")
continue
if data_id in self.rbl_id_dict:
break

Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def tmptestdir():


def pytest_sessionfinish(session, exitstatus):
if exitstatus == 0:
if exitstatus == 0 and os.path.exists(_tmptestdir):
shutil.rmtree(_tmptestdir)


Expand Down
2 changes: 1 addition & 1 deletion tests/orca/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
def orca_stream(lgnd_test_data):
orstr = OrcaStreamer()
orstr.open_stream(
lgnd_test_data.get_path("orca/fc/L200-comm-20220519-phy-geds.orca")
lgnd_test_data.get_path("orca/fc/l200-p02-r008-phy-20230113T174010Z.orca")
)
return orstr
40 changes: 40 additions & 0 deletions tests/orca/test_orca_fc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import pytest

from daq2lh5.orca import orca_packet


@pytest.fixture(scope="module")
def fc_packets(orca_stream):
packets = []
packets.append(orca_stream.load_packet(3).copy()) # config
packets.append(orca_stream.load_packet(4).copy()) # status
packets.append(orca_stream.load_packet(13).copy()) # waveform
orca_stream.close_stream() # avoid warning that file is still open
return packets


def test_orfc_config_decoding(orca_stream, fc_packets):
config_packet = fc_packets[0]
assert config_packet is not None

data_id = orca_packet.get_data_id(config_packet)
name = orca_stream.header.get_id_to_decoder_name_dict()[data_id]
assert name == "ORFlashCamListenerConfigDecoder"


def test_orfc_status_decoding(orca_stream, fc_packets):
status_packet = fc_packets[1]
assert status_packet is not None

data_id = orca_packet.get_data_id(status_packet)
name = orca_stream.header.get_id_to_decoder_name_dict()[data_id]
assert name == "ORFlashCamListenerStatusDecoder"


def test_orfc_waveform_decoding(orca_stream, fc_packets):
wf_packet = fc_packets[2]
assert wf_packet is not None

data_id = orca_packet.get_data_id(wf_packet)
name = orca_stream.header.get_id_to_decoder_name_dict()[data_id]
assert name == "ORFlashCamWaveformDecoder"
43 changes: 41 additions & 2 deletions tests/orca/test_orca_packet.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,41 @@
def test_orca_packet_import():
pass
from daq2lh5.orca import orca_packet


def test_orca_packet_funcs(orca_stream):
# The values in this test are particular to the test orca file in
# legend-testdata and may need to be changed if that file is changed

assert orca_stream.count_packets() == 911

packet = orca_stream.load_packet()
assert orca_packet.is_short(packet) is False
assert orca_packet.get_data_id(packet) == 3
assert orca_packet.get_n_words(packet) == 4
assert orca_packet.hex_dump(packet, return_output=True)[-1] == "3 0x63c1977a"

id_dict = orca_stream.header.get_id_to_decoder_name_dict()
seen = []
for ii in range(100):
packet = orca_stream.load_packet(ii)
if packet is None:
break
name = id_dict[orca_packet.get_data_id(packet)]
# if ii < 20: print(ii, name)
if ii == 0:
assert name == "OrcaHeaderDecoder"
if ii == 1:
assert name == "ORRunDecoderForRun"
if ii == 910:
assert name == "ORRunDecoderForRun"
if name not in seen:
seen.append(name)
expected = [
"OrcaHeaderDecoder",
"ORRunDecoderForRun",
"ORFlashCamListenerConfigDecoder",
"ORFlashCamListenerStatusDecoder",
"ORFlashCamWaveformDecoder",
]
assert seen == expected

orca_stream.close_stream() # avoid warning that file is still open
Loading