Skip to content

Commit

Permalink
Merge pull request #19 from legend-exp/hdf5_settings
Browse files Browse the repository at this point in the history
Configure waveform compression and HDF5 settings in `build_raw()` config files
  • Loading branch information
gipert authored Nov 13, 2023
2 parents cf62519 + 13dbab3 commit 2cb5b56
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 36 deletions.
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ classifiers =
[options]
packages = find:
install_requires =
dspeed~=1.1
dspeed>=1.1
h5py>=3.2.0
hdf5plugin
legend-pydataobj~=1.1
legend-pydataobj>=1.4.1
numpy>=1.21
pyfcutils
tqdm>=4.27
Expand Down
24 changes: 21 additions & 3 deletions src/daq2lh5/buffer_processor/buffer_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,17 @@ def buffer_processor(rb: RawBuffer) -> Table:
``"dtype_conv": {"lgdo": "dtype" [, ...]}`` `(dict)`
Casts `lgdo` to the requested data type.
``"compression": { "lgdo": "codec_name" [, ...]}`` `(dict)`
``"compression": {"lgdo": "codec_name" [, ...]}`` `(dict)`
Updates the `compression` attribute of `lgdo` to `codec_name`. The
attribute sets the compression algorithm applied by
:func:`~.lgdo.lh5_store.LH5Store.read_object` before writing `lgdo` to
disk. Can be used to apply custom waveform compression algorithms from
:mod:`lgdo.compression`.
``"hdf5_settings": {"lgdo": { <HDF5 settings> }}`` `(dict)`
Updates the `hdf5_settings` attribute of `lgdo`. The attribute sets the
HDF5 dataset options applied by
:func:`~.lgdo.lh5_store.LH5Store.read_object` before writing `lgdo` to
disk.
Parameters
Expand Down Expand Up @@ -102,7 +109,9 @@ def buffer_processor(rb: RawBuffer) -> Table:
,}
"compression": {
"windowed_waveform/values": RadwareSigcompress(codec_shift=-32768),
"presummed_waveform/values": ULEB128ZigZagDiff(),
}
"hdf5_settings": {
"presummed_waveform/values": {"shuffle": True, "compression": "lzf"},
}
}
},
Expand Down Expand Up @@ -143,7 +152,7 @@ def buffer_processor(rb: RawBuffer) -> Table:
if "drop" in rb.proc_spec.keys():
process_drop(rb, tmp_table)

# at last, assign compression attributes
# assign compression attributes
if "compression" in rb.proc_spec.keys():
for name, codec in rb.proc_spec["compression"].items():
ptr = tmp_table
Expand All @@ -154,6 +163,15 @@ def buffer_processor(rb: RawBuffer) -> Table:
codec if isinstance(codec, WaveformCodec) else str2wfcodec(codec)
)

# and HDF5 settings
if "hdf5_settings" in rb.proc_spec.keys():
for name, settings in rb.proc_spec["hdf5_settings"].items():
ptr = tmp_table
for word in name.split("/"):
ptr = ptr[word]

ptr.attrs["hdf5_settings"] = settings

return tmp_table


Expand Down
16 changes: 9 additions & 7 deletions src/daq2lh5/build_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
import os
import time

import hdf5plugin
import lgdo
import numpy as np
from lgdo.lh5_store import DEFAULT_HDF5_COMPRESSION
from tqdm.auto import tqdm

from .compass.compass_streamer import CompassStreamer
Expand All @@ -28,7 +26,7 @@ def build_raw(
n_max: int = np.inf,
overwrite: bool = False,
compass_config_file: str = None,
hdf5_compression: str | dict | hdf5plugin.filters.Filter = DEFAULT_HDF5_COMPRESSION,
hdf5_settings: dict[str, ...] = None,
**kwargs,
) -> None:
"""Convert data into LEGEND HDF5 raw-tier format.
Expand Down Expand Up @@ -77,12 +75,16 @@ def build_raw(
json-shorthand for the output specification (see
:mod:`.compass.compass_event_decoder`).
hdf5_compression
forwarded to :meth:`~.lgdo.lh5_store.LH5Store.write_object`.
hdf5_settings
keyword arguments (as a dict) forwarded to
:meth:`~.lgdo.lh5_store.LH5Store.write_object`.
**kwargs
sent to :class:`.RawBufferLibrary` generation as `kw_dict` argument.
"""
if hdf5_settings is None:
hdf5_settings = {}

# convert any environment variables in in_stream so that we can check for readability
in_stream = os.path.expandvars(in_stream)
# later: fix if in_stream is not a file
Expand Down Expand Up @@ -223,7 +225,7 @@ def build_raw(

# Write header data
lh5_store = lgdo.LH5Store(keep_open=True)
write_to_lh5_and_clear(header_data, lh5_store, hdf5_compression=hdf5_compression)
write_to_lh5_and_clear(header_data, lh5_store, **hdf5_settings)

# Now loop through the data
n_bytes_last = streamer.n_bytes_read
Expand All @@ -248,7 +250,7 @@ def build_raw(
if log.getEffectiveLevel() <= logging.INFO and n_max < np.inf:
progress_bar.update(n_read)

write_to_lh5_and_clear(chunk_list, lh5_store, hdf5_compression=hdf5_compression)
write_to_lh5_and_clear(chunk_list, lh5_store, **hdf5_settings)

if n_max <= 0:
log.info(f"Wrote {n_max} rows, exiting...")
Expand Down
42 changes: 21 additions & 21 deletions src/daq2lh5/data_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,31 @@ class DataDecoder:
Any key-value entry in a configuration dictionary attached to an element
of `decoded_values` is typically interpreted as an attribute to be attached
to the corresponding LGDO. This feature can be for example exploited to
specify the data compression algorithm used by
specify HDF5 dataset settings used by
:meth:`~.lgdo.lh5_store.LH5Store.write_object` to write LGDOs to disk.
For example ::
from lgdo.compression import RadwareSigcompress
FCEventDecoder.decoded_values = {
"packet_id": {"dtype": "uint32", "compression": "gzip"},
"packet_id": {"dtype": "uint32", "hdf5_settings": {"compression": "gzip"}},
# ...
"waveform": {
"dtype": "uint16",
"datatype": "waveform",
# ...
"compression": {"values": RadwareSigcompress(codec_shift=-32768)},
            "hdf5_settings": {"t0": {"compression": "lzf", "shuffle": True}},
}
}
LGDOs corresponding to ``packet_id`` and ``waveform`` will have their
`compression` attribute set as ``"gzip"`` and
``RadwareSigcompress(codec_shift=-32768)``, respectively. Before being
written to disk, they will compressed with the HDF5 built-in Gzip filter
and with the :class:`~.lgdo.compression.radware.RadwareSigcompress`
waveform compressor.
The LGDO corresponding to ``packet_id`` will have its `hdf5_settings`
attribute set as ``{"compression": "gzip"}``, while ``waveform.values``
will have its `compression` attribute set to
``RadwareSigcompress(codec_shift=-32768)``. Before being written to disk,
they will be compressed with the HDF5 built-in Gzip filter and with the
:class:`~.lgdo.compression.radware.RadwareSigcompress` waveform compressor.
Examples
--------
Expand Down Expand Up @@ -178,7 +179,10 @@ def make_lgdo(self, key: int | str = None, size: int = None) -> LGDO:
dt = attrs.pop("dt")
dt_units = attrs.pop("dt_units")
wf_len = attrs.pop("wf_len")
compression = attrs.pop("compression", None)
settings = {
"compression": attrs.pop("compression", {}),
"hdf5_settings": attrs.pop("hdf5_settings", {}),
}

wf_table = lgdo.WaveformTable(
size=size,
Expand All @@ -190,18 +194,14 @@ def make_lgdo(self, key: int | str = None, size: int = None) -> LGDO:
dtype=dtype,
attrs=attrs,
)
if compression is not None:
if not isinstance(compression, dict):
raise RuntimeError(
"waveform/compression attribute must be a dictionary"
)

if "values" in compression:
wf_table.values.attrs["compression"] = compression["values"]
if "t0" in compression:
wf_table.t0.attrs["compression"] = compression["t0"]
if "dt" in compression:
wf_table.dt.attrs["compression"] = compression["dt"]

# attach compression/hdf5_settings to sub-fields
for el in ["values", "t0", "dt"]:
for settings_name in ("hdf5_settings", "compression"):
if el in settings[settings_name]:
wf_table[el].attrs[settings_name] = settings[settings_name][
el
]

data_obj.add_field(field, wf_table)
continue
Expand Down
5 changes: 4 additions & 1 deletion tests/buffer_processor/test_buffer_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from daq2lh5.build_raw import build_raw
from daq2lh5.fc.fc_event_decoder import fc_decoded_values

# skip compression in build_raw
# skip waveform compression in build_raw
fc_decoded_values["waveform"].pop("compression", None)

config_dir = Path(__file__).parent / "test_buffer_processor_configs"
Expand Down Expand Up @@ -1409,6 +1409,9 @@ def test_buffer_processor_compression_settings(lgnd_test_data, tmptestdir):
),
"presummed_waveform/values": ULEB128ZigZagDiff(),
},
"hdf5_settings": {
"presummed_waveform/values": {"shuffle": True},
},
},
}
}
Expand Down
62 changes: 60 additions & 2 deletions tests/buffer_processor/test_lh5_buffer_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys
from pathlib import Path

import h5py
import lgdo
import numpy as np
from dspeed import build_processing_chain as bpc
Expand All @@ -11,7 +12,7 @@
from daq2lh5.build_raw import build_raw
from daq2lh5.fc.fc_event_decoder import fc_decoded_values

# skip compression in build_raw
# skip waveform compression in build_raw
fc_decoded_values["waveform"].pop("compression", None)

config_dir = Path(__file__).parent / "test_buffer_processor_configs"
Expand Down Expand Up @@ -308,7 +309,7 @@ def test_lh5_buffer_processor_separate_name_tables(lgnd_test_data):
# Set up I/O files, including config
daq_file = lgnd_test_data.get_path("fcio/L200-comm-20211130-phy-spms.fcio")
raw_file = daq_file.replace(
"L200-comm-20211130-phy-spms.fcio", " L200-comm-fake-geds-and-spms.lh5"
"L200-comm-20211130-phy-spms.fcio", "L200-comm-fake-geds-and-spms.lh5"
)
processed_file = raw_file.replace(
"L200-comm-fake-geds-and-spms.lh5", "L200-comm-fake-geds-and-spms_proc.lh5"
Expand Down Expand Up @@ -1089,3 +1090,60 @@ def test_buffer_processor_all_pass(lgnd_test_data):
proc_df = proc[obj].get_dataframe([str(sub_obj)])

assert raw_df.equals(proc_df)


def test_lh5_buffer_processor_hdf5_settings(lgnd_test_data):
    """Check that per-LGDO ``hdf5_settings`` given in ``proc_spec`` are
    applied as HDF5 dataset filters in the processed output file."""
    # Resolve the input DAQ file and derive the raw/processed output paths
    daq_file = lgnd_test_data.get_path("fcio/L200-comm-20211130-phy-spms.fcio")
    raw_file = daq_file.replace(
        "L200-comm-20211130-phy-spms.fcio", "L200-comm-20211130-phy-spms.lh5"
    )
    processed_file = raw_file.replace(
        "L200-comm-20211130-phy-spms.lh5", "L200-comm-20211130-phy-spms_proc.lh5"
    )

    # DSP chain: presum the waveform by a factor of 16
    dsp_config = {
        "outputs": ["presum_2rate", "presummed_waveform"],
        "processors": {
            "presum_rate, presummed_waveform": {
                "function": "presum",
                "module": "dspeed.processors",
                "args": [
                    "waveform",
                    0,
                    "presum_rate",
                    "presummed_waveform(shape=len(waveform)/16, period=waveform.period*16, offset=waveform.offset)",
                ],
                "unit": "ADC",
            }
        },
    }

    # window + presum, drop the original waveform, downcast the presummed
    # values and request LZF compression with shuffling disabled
    proc_spec = {
        "window": ["waveform", 1000, -1000, "windowed_waveform"],
        "dsp_config": dsp_config,
        "drop": ["waveform"],
        "dtype_conv": {
            "presummed_waveform/values": "uint32",
        },
        "hdf5_settings": {
            "presummed_waveform/values": {
                "compression": "lzf",
                "shuffle": False,
            },
        },
    }

    out_spec = {
        "FCEventDecoder": {
            "ch{key}": {
                "key_list": [[0, 6]],
                "out_stream": processed_file + ":{name}",
                "out_name": "raw",
                "proc_spec": proc_spec,
            }
        }
    }

    # Build the raw file
    build_raw(in_stream=daq_file, out_spec=out_spec, overwrite=True)

    # Inspect the written HDF5 datasets directly to confirm the settings took
    with h5py.File(processed_file) as f:
        values_dset = f["ch0"]["raw"]["presummed_waveform"]["values"]
        assert values_dset.compression == "lzf"
        assert values_dset.shuffle is False
Loading

0 comments on commit 2cb5b56

Please sign in to comment.