diff --git a/docs/changes/2634.feature.rst b/docs/changes/2634.feature.rst new file mode 100644 index 00000000000..f09235fb0bb --- /dev/null +++ b/docs/changes/2634.feature.rst @@ -0,0 +1 @@ +Add ChunkInterpolator to ctapipe.monitoring.interpolation as a tool to select data from chunks. The planned use for this is to select calibration data. diff --git a/src/ctapipe/monitoring/__init__.py b/src/ctapipe/monitoring/__init__.py index cb102f768cf..03d583abaab 100644 --- a/src/ctapipe/monitoring/__init__.py +++ b/src/ctapipe/monitoring/__init__.py @@ -2,7 +2,12 @@ Module for handling monitoring data. """ from .aggregator import PlainAggregator, SigmaClippingAggregator, StatisticsAggregator -from .interpolation import Interpolator, PointingInterpolator +from .interpolation import ( + ChunkInterpolator, + LinearInterpolator, + MonitoringInterpolator, + PointingInterpolator, +) from .outlier import ( MedianOutlierDetector, OutlierDetector, @@ -18,6 +23,8 @@ "RangeOutlierDetector", "MedianOutlierDetector", "StdOutlierDetector", - "Interpolator", + "MonitoringInterpolator", + "LinearInterpolator", "PointingInterpolator", + "ChunkInterpolator", ] diff --git a/src/ctapipe/monitoring/interpolation.py b/src/ctapipe/monitoring/interpolation.py index 84064cbc1a3..95e6456de1d 100644 --- a/src/ctapipe/monitoring/interpolation.py +++ b/src/ctapipe/monitoring/interpolation.py @@ -1,85 +1,81 @@ from abc import ABCMeta, abstractmethod +from functools import partial from typing import Any import astropy.units as u import numpy as np import tables +from astropy.table import Table from astropy.time import Time from scipy.interpolate import interp1d from ctapipe.core import Component, traits __all__ = [ - "Interpolator", + "MonitoringInterpolator", + "LinearInterpolator", "PointingInterpolator", + "ChunkInterpolator", ] -class Interpolator(Component, metaclass=ABCMeta): +class MonitoringInterpolator(Component, metaclass=ABCMeta): """ - Interpolator parent class. + MonitoringInterpolator parent class. Parameters ---------- h5file : None | tables.File - A open hdf5 file with read access. + An open hdf5 file with read access. """ - bounds_error = traits.Bool( - default_value=True, - help="If true, raises an exception when trying to extrapolate out of the given table", - ).tag(config=True) - - extrapolate = traits.Bool( - help="If bounds_error is False, this flag will specify whether values outside" - "the available values are filled with nan (False) or extrapolated (True).", - default_value=False, - ).tag(config=True) - - telescope_data_group = None - required_columns = set() - expected_units = {} - - def __init__(self, h5file=None, **kwargs): + def __init__(self, h5file: None | tables.File = None, **kwargs: Any) -> None: super().__init__(**kwargs) + self.telescope_data_group = None + self.required_columns = set() + self.expected_units = {} + self._interpolators = {} + if h5file is not None and not isinstance(h5file, tables.File): raise TypeError("h5file must be a tables.File") self.h5file = h5file - self.interp_options: dict[str, Any] = dict(assume_sorted=True, copy=False) - if self.bounds_error: - self.interp_options["bounds_error"] = True - elif self.extrapolate: - self.interp_options["bounds_error"] = False - self.interp_options["fill_value"] = "extrapolate" - else: - self.interp_options["bounds_error"] = False - self.interp_options["fill_value"] = np.nan + @abstractmethod + def __call__(self, tel_id: int, time: Time): + """ + Interpolates monitoring data for a given timestamp - self._interpolators = {} + Parameters + ---------- + tel_id : int + Telescope id. + time : astropy.time.Time + Time for which to interpolate the monitoring data. + + """ + pass @abstractmethod - def add_table(self, tel_id, input_table): + def add_table(self, tel_id: int, input_table: Table) -> None: """ - Add a table to this interpolator + Add a table to this interpolator. This method reads input tables and creates instances of the needed interpolators to be added to _interpolators. The first index of _interpolators needs to be - tel_id, the second needs to be the name of the parameter that is to be interpolated + tel_id, the second needs to be the name of the parameter that is to be interpolated. Parameters ---------- tel_id : int - Telescope id + Telescope id. input_table : astropy.table.Table Table of pointing values, expected columns are always ``time`` as ``Time`` column and - other columns for the data that is to be interpolated + other columns for the data that is to be interpolated. """ - pass - def _check_tables(self, input_table): + def _check_tables(self, input_table: Table) -> None: missing = self.required_columns - set(input_table.colnames) if len(missing) > 0: raise ValueError(f"Table is missing required column(s): {missing}") @@ -98,14 +94,14 @@ def _check_tables(self, input_table): f"{col} must have units compatible with '{self.expected_units[col].name}'" ) - def _check_interpolators(self, tel_id): + def _check_interpolators(self, tel_id: int) -> None: if tel_id not in self._interpolators: if self.h5file is not None: self._read_parameter_table(tel_id) # might need to be removed else: raise KeyError(f"No table available for tel_id {tel_id}") - def _read_parameter_table(self, tel_id): + def _read_parameter_table(self, tel_id: int) -> None: # prevent circular import between io and monitoring from ..io import read_table @@ -116,32 +112,69 @@ def _read_parameter_table(self, tel_id): self.add_table(tel_id, input_table) -class PointingInterpolator(Interpolator): +class LinearInterpolator(MonitoringInterpolator): """ - Interpolator for pointing and pointing correction data + LinearInterpolator parent class. + + Parameters + ---------- + h5file : None | tables.File + An open hdf5 file with read access. """ - telescope_data_group = "/dl0/monitoring/telescope/pointing" - required_columns = frozenset(["time", "azimuth", "altitude"]) - expected_units = {"azimuth": u.rad, "altitude": u.rad} + bounds_error = traits.Bool( + default_value=True, + help="If true, raises an exception when trying to extrapolate out of the given table", + ).tag(config=True) - def __call__(self, tel_id, time): + extrapolate = traits.Bool( + help="If bounds_error is False, this flag will specify whether values outside" + "the available values are filled with nan (False) or extrapolated (True).", + default_value=False, + ).tag(config=True) + + def __init__(self, h5file: None | tables.File = None, **kwargs: Any) -> None: + super().__init__(h5file, **kwargs) + + self.interp_options: dict[str, Any] = dict(assume_sorted=True, copy=False) + if self.bounds_error: + self.interp_options["bounds_error"] = True + elif self.extrapolate: + self.interp_options["bounds_error"] = False + self.interp_options["fill_value"] = "extrapolate" + else: + self.interp_options["bounds_error"] = False + self.interp_options["fill_value"] = np.nan + + +class PointingInterpolator(LinearInterpolator): + """ + Interpolator for pointing and pointing correction data. + """ + + def __init__(self, h5file: None | tables.File = None, **kwargs: Any) -> None: + super().__init__(h5file, **kwargs) + self.telescope_data_group = "/dl0/monitoring/telescope/pointing" + self.required_columns = frozenset(["time", "azimuth", "altitude"]) + self.expected_units = {"azimuth": u.rad, "altitude": u.rad} + + def __call__(self, tel_id: int, time: Time) -> tuple[u.Quantity, u.Quantity]: """ Interpolate alt/az for given time and tel_id. Parameters ---------- tel_id : int - telescope id + Telescope id. time : astropy.time.Time - time for which to interpolate the pointing + Time for which to interpolate the pointing. Returns ------- altitude : astropy.units.Quantity[deg] - interpolated altitude angle + Interpolated altitude angle. azimuth : astropy.units.Quantity[deg] - interpolated azimuth angle + Interpolated azimuth angle. """ self._check_interpolators(tel_id) @@ -151,20 +184,19 @@ def __call__(self, tel_id, time): alt = u.Quantity(self._interpolators[tel_id]["alt"](mjd), u.rad, copy=False) return alt, az - def add_table(self, tel_id, input_table): + def add_table(self, tel_id: int, input_table: Table) -> None: """ - Add a table to this interpolator + Add a table to this interpolator. Parameters ---------- tel_id : int - Telescope id + Telescope id. input_table : astropy.table.Table Table of pointing values, expected columns are ``time`` as ``Time`` column, ``azimuth`` and ``altitude`` as quantity columns for pointing and pointing correction data. """ - self._check_tables(input_table) if not isinstance(input_table["time"], Time): @@ -184,5 +216,135 @@ def add_table(self, tel_id, input_table): alt = input_table["altitude"].quantity.to_value(u.rad) mjd = input_table["time"].tai.mjd self._interpolators[tel_id] = {} - self._interpolators[tel_id]["az"] = interp1d(mjd, az, **self.interp_options) - self._interpolators[tel_id]["alt"] = interp1d(mjd, alt, **self.interp_options) + self._interpolators[tel_id]["az"] = interp1d( + mjd, az, kind="linear", **self.interp_options + ) + self._interpolators[tel_id]["alt"] = interp1d( + mjd, alt, kind="linear", **self.interp_options + ) + + +class ChunkInterpolator(MonitoringInterpolator): + """ + Simple interpolator for overlapping chunks of data. + """ + + required_columns = frozenset(["start_time", "end_time"]) + start_time = {} + end_time = {} + values = {} + + def __call__( + self, tel_id: int, time: Time, columns: str | list[str] + ) -> float | dict[str, float]: + """ + Interpolate overlapping chunks of data for a given time, tel_id, and column(s). + + Parameters + ---------- + tel_id : int + Telescope id. + time : astropy.time.Time + Time for which to interpolate the data. + columns : str or list of str + Name(s) of the column(s) to interpolate. + + Returns + ------- + interpolated : float or dict + Interpolated data for the specified column(s). + """ + + self._check_interpolators(tel_id) + + if isinstance(columns, str): + columns = [columns] + + result = {} + mjd = time.to_value("mjd") + for column in columns: + if column not in self._interpolators[tel_id]: + raise ValueError( + f"Column '{column}' not found in interpolators for tel_id {tel_id}" + ) + result[column] = self._interpolators[tel_id][column](mjd) + + if len(result) == 1: + return result[columns[0]] + return result + + def add_table(self, tel_id: int, input_table: Table, columns: list[str]) -> None: + """ + Add a table to this interpolator for specific columns. + + Parameters + ---------- + tel_id : int + Telescope id. + input_table : astropy.table.Table + Table of values to be interpolated, expected columns + are ``start_time`` as ``validity start Time`` column, + ``end_time`` as ``validity end Time`` and the specified columns + for the data of the chunks. + columns : list of str + Names of the columns to interpolate. + """ + + required_columns = set(self.required_columns) + required_columns.update(columns) + self.required_columns = frozenset(required_columns) + self._check_tables(input_table) + + input_table = input_table.copy() + input_table.sort("start_time") + + if tel_id not in self._interpolators: + self._interpolators[tel_id] = {} + self.values[tel_id] = {} + self.start_time[tel_id] = {} + self.end_time[tel_id] = {} + + for column in columns: + self.values[tel_id][column] = input_table[column] + self.start_time[tel_id][column] = input_table["start_time"].to_value("mjd") + self.end_time[tel_id][column] = input_table["end_time"].to_value("mjd") + self._interpolators[tel_id][column] = partial( + self._interpolate_chunk, tel_id, column + ) + + def _interpolate_chunk(self, tel_id, column, mjd: float) -> float: + """ + Interpolates overlapping chunks of data preferring earlier chunks if valid + + Parameters + ---------- + tel_id : int + tel_id for which data is to be interpolated + column : str + name of the column for which data is to be interpolated + mjd : float + Time for which to interpolate the data. + """ + + start_time = self.start_time[tel_id][column] + end_time = self.end_time[tel_id][column] + values = self.values[tel_id][column] + # Find the index of the closest preceding start time + preceding_index = np.searchsorted(start_time, mjd, side="right") - 1 + if preceding_index < 0: + return np.nan + + # Check if the time is within the valid range of the chunk + if start_time[preceding_index] <= mjd <= end_time[preceding_index]: + value = values[preceding_index] + if not np.isnan(value): + return value + + # If the closest preceding chunk has nan, check the next closest chunk + for i in range(preceding_index - 1, -1, -1): + if start_time[i] <= mjd <= end_time[i]: + value = values[i] + if not np.isnan(value): + return value + + return np.nan diff --git a/src/ctapipe/monitoring/tests/test_interpolator.py b/src/ctapipe/monitoring/tests/test_interpolator.py index 782aeae7435..696f4f4f209 100644 --- a/src/ctapipe/monitoring/tests/test_interpolator.py +++ b/src/ctapipe/monitoring/tests/test_interpolator.py @@ -5,11 +5,127 @@ from astropy.table import Table from astropy.time import Time -from ctapipe.monitoring.interpolation import PointingInterpolator +from ctapipe.monitoring.interpolation import ChunkInterpolator, PointingInterpolator t0 = Time("2022-01-01T00:00:00") +def test_chunk_selection(): + table = Table( + { + "start_time": t0 + [0, 1, 2, 6] * u.s, + "end_time": t0 + [2, 3, 4, 8] * u.s, + "values": [1, 2, 3, 4], + }, + ) + interpolator = ChunkInterpolator() + interpolator.add_table(1, table, ["values"]) + + val1 = interpolator(tel_id=1, time=t0 + 1.2 * u.s, columns="values") + val2 = interpolator(tel_id=1, time=t0 + 1.7 * u.s, columns="values") + val3 = interpolator(tel_id=1, time=t0 + 2.2 * u.s, columns="values") + + assert np.isclose(val1, 2) + assert np.isclose(val2, 2) + assert np.isclose(val3, 3) + + +def test_chunk_selection_multiple_columns(): + table = Table( + { + "start_time": t0 + [0, 1, 2, 6] * u.s, + "end_time": t0 + [2, 3, 4, 8] * u.s, + "values1": [1, 2, 3, 4], + "values2": [10, 20, 30, 40], + }, + ) + interpolator = ChunkInterpolator() + interpolator.add_table(1, table, ["values1", "values2"]) + + result1 = interpolator( + tel_id=1, time=t0 + 1.2 * u.s, columns=["values1", "values2"] + ) + result2 = interpolator( + tel_id=1, time=t0 + 1.7 * u.s, columns=["values1", "values2"] + ) + result3 = interpolator( + tel_id=1, time=t0 + 2.2 * u.s, columns=["values1", "values2"] + ) + + assert np.isclose(result1["values1"], 2) + assert np.isclose(result1["values2"], 20) + assert np.isclose(result2["values1"], 2) + assert np.isclose(result2["values2"], 20) + assert np.isclose(result3["values1"], 3) + assert np.isclose(result3["values2"], 30) + + +def test_nan_switch(): + table = Table( + { + "start_time": t0 + [0, 1, 2, 6] * u.s, + "end_time": t0 + [2, 3, 4, 8] * u.s, + "values": [1, np.nan, 3, 4], + }, + ) + interpolator = ChunkInterpolator() + interpolator.add_table(1, table, ["values"]) + + val = interpolator(tel_id=1, time=t0 + 1.2 * u.s, columns="values") + + assert np.isclose(val, 1) + + +def test_nan_switch_multiple_columns(): + table = Table( + { + "start_time": t0 + [0, 1, 2, 6] * u.s, + "end_time": t0 + [2, 3, 4, 8] * u.s, + "values1": [1, np.nan, 3, 4], + "values2": [10, 20, np.nan, 40], + }, + ) + interpolator = ChunkInterpolator() + interpolator.add_table(1, table, ["values1", "values2"]) + + result = interpolator(tel_id=1, time=t0 + 1.2 * u.s, columns=["values1", "values2"]) + + assert np.isclose(result["values1"], 1) + assert np.isclose(result["values2"], 20) + + +def test_no_valid_chunk(): + table = Table( + { + "start_time": t0 + [0, 1, 2, 6] * u.s, + "end_time": t0 + [2, 3, 4, 8] * u.s, + "values": [1, 2, 3, 4], + }, + ) + interpolator = ChunkInterpolator() + interpolator.add_table(1, table, ["values"]) + + val = interpolator(tel_id=1, time=t0 + 5.2 * u.s, columns="values") + assert np.isnan(val) + + +def test_no_valid_chunk_multiple_columns(): + table = Table( + { + "start_time": t0 + [0, 1, 2, 6] * u.s, + "end_time": t0 + [2, 3, 4, 8] * u.s, + "values1": [1, 2, 3, 4], + "values2": [10, 20, 30, 40], + }, + ) + interpolator = ChunkInterpolator() + interpolator.add_table(1, table, ["values1", "values2"]) + + result = interpolator(tel_id=1, time=t0 + 5.2 * u.s, columns=["values1", "values2"]) + assert np.isnan(result["values1"]) + assert np.isnan(result["values2"]) + + def test_azimuth_switchover(): """Test pointing interpolation"""