Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chunk interpolation to select calibration data #2634

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
150 changes: 126 additions & 24 deletions src/ctapipe/monitoring/interpolation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@
import astropy.units as u
import numpy as np
import tables
from astropy.table import Table
from astropy.time import Time
from scipy.interpolate import interp1d

from ctapipe.core import Component, traits

__all__ = [
"Interpolator",
"PointingInterpolator",
]
__all__ = ["PointingInterpolator", "ChunkInterpolator"]
maxnoe marked this conversation as resolved.
Show resolved Hide resolved


class Interpolator(Component, metaclass=ABCMeta):
Expand All @@ -22,7 +20,7 @@
Parameters
----------
h5file : None | tables.File
A open hdf5 file with read access.
An open hdf5 file with read access.
"""

bounds_error = traits.Bool(
Expand All @@ -40,7 +38,7 @@
required_columns = set()
expected_units = {}

def __init__(self, h5file=None, **kwargs):
def __init__(self, h5file: None | tables.File = None, **kwargs: Any) -> None:
super().__init__(**kwargs)

if h5file is not None and not isinstance(h5file, tables.File):
Expand All @@ -60,26 +58,25 @@
self._interpolators = {}

@abstractmethod
def add_table(self, tel_id, input_table):
def add_table(self, tel_id: int, input_table: Table) -> None:
"""
Add a table to this interpolator
Add a table to this interpolator.
This method reads input tables and creates instances of the needed interpolators
to be added to _interpolators. The first index of _interpolators needs to be
tel_id, the second needs to be the name of the parameter that is to be interpolated
tel_id, the second needs to be the name of the parameter that is to be interpolated.

Parameters
----------
tel_id : int
Telescope id
Telescope id.
input_table : astropy.table.Table
Table of pointing values, expected columns
are always ``time`` as ``Time`` column and
other columns for the data that is to be interpolated
other columns for the data that is to be interpolated.
"""

pass

def _check_tables(self, input_table):
def _check_tables(self, input_table: Table) -> None:
missing = self.required_columns - set(input_table.colnames)
if len(missing) > 0:
raise ValueError(f"Table is missing required column(s): {missing}")
Expand All @@ -98,14 +95,14 @@
f"{col} must have units compatible with '{self.expected_units[col].name}'"
)

def _check_interpolators(self, tel_id):
def _check_interpolators(self, tel_id: int) -> None:
if tel_id not in self._interpolators:
if self.h5file is not None:
self._read_parameter_table(tel_id) # might need to be removed
else:
raise KeyError(f"No table available for tel_id {tel_id}")

def _read_parameter_table(self, tel_id):
def _read_parameter_table(self, tel_id: int) -> None:
# prevent circular import between io and monitoring
from ..io import read_table

Expand All @@ -118,30 +115,30 @@

class PointingInterpolator(Interpolator):
"""
Interpolator for pointing and pointing correction data
Interpolator for pointing and pointing correction data.
"""

telescope_data_group = "/dl0/monitoring/telescope/pointing"
required_columns = frozenset(["time", "azimuth", "altitude"])
expected_units = {"azimuth": u.rad, "altitude": u.rad}

def __call__(self, tel_id, time):
def __call__(self, tel_id: int, time: Time) -> tuple[u.Quantity, u.Quantity]:
"""
Interpolate alt/az for given time and tel_id.

Parameters
----------
tel_id : int
telescope id
Telescope id.
time : astropy.time.Time
time for which to interpolate the pointing
Time for which to interpolate the pointing.

Returns
-------
altitude : astropy.units.Quantity[deg]
interpolated altitude angle
Interpolated altitude angle.
azimuth : astropy.units.Quantity[deg]
interpolated azimuth angle
Interpolated azimuth angle.
"""

self._check_interpolators(tel_id)
Expand All @@ -151,14 +148,14 @@
alt = u.Quantity(self._interpolators[tel_id]["alt"](mjd), u.rad, copy=False)
return alt, az

def add_table(self, tel_id, input_table):
def add_table(self, tel_id: int, input_table: Table) -> None:
"""
Add a table to this interpolator
Add a table to this interpolator.

Parameters
----------
tel_id : int
Telescope id
Telescope id.
input_table : astropy.table.Table
Table of pointing values, expected columns
are ``time`` as ``Time`` column, ``azimuth`` and ``altitude``
Expand Down Expand Up @@ -186,3 +183,108 @@
self._interpolators[tel_id] = {}
self._interpolators[tel_id]["az"] = interp1d(mjd, az, **self.interp_options)
self._interpolators[tel_id]["alt"] = interp1d(mjd, alt, **self.interp_options)


class ChunkInterpolator(Interpolator):
"""
Simple interpolator for overlapping chunks of data.
"""

required_columns = frozenset(["start_time", "end_time"])

def __call__(
self, tel_id: int, time: Time, columns: str | list[str]
) -> float | dict[str, float]:
"""
Interpolate overlapping chunks of data for a given time, tel_id, and column(s).

Parameters
----------
tel_id : int
Telescope id.
time : astropy.time.Time
Time for which to interpolate the data.
columns : str or list of str
Name(s) of the column(s) to interpolate.

Returns
-------
interpolated : float or dict
Interpolated data for the specified column(s).
"""

self._check_interpolators(tel_id)

if isinstance(columns, str):
columns = [columns]

result = {}
mjd = time.to_value("mjd")
for column in columns:
if column not in self._interpolators[tel_id]:
raise ValueError(
f"Column '{column}' not found in interpolators for tel_id {tel_id}"
)
result[column] = self._interpolators[tel_id][column](mjd)

if len(result) == 1:
return result[columns[0]]
return result

def add_table(self, tel_id: int, input_table: Table, columns: list[str]) -> None:

Check failure on line 234 in src/ctapipe/monitoring/interpolation.py

View check run for this annotation

CTAO-DPPS-SonarQube / ctapipe Sonarqube Results

src/ctapipe/monitoring/interpolation.py#L234

Refactor this function to reduce its Cognitive Complexity from 24 to the 15 allowed.
"""
Add a table to this interpolator for specific columns.

Parameters
----------
tel_id : int
Telescope id.
input_table : astropy.table.Table
Table of values to be interpolated, expected columns
are ``start_time`` as ``validity start Time`` column,
``end_time`` as ``validity end Time`` and the specified columns
for the data of the chunks.
columns : list of str
Names of the columns to interpolate.
"""

required_columns = set(self.required_columns)
required_columns.update(columns)
self.required_columns = frozenset(required_columns)
self._check_tables(input_table)

input_table = input_table.copy()
input_table.sort("start_time")
start_time = input_table["start_time"].to_value("mjd")
end_time = input_table["end_time"].to_value("mjd")

if tel_id not in self._interpolators:
self._interpolators[tel_id] = {}

for column in columns:
values = input_table[column]

def interpolate_chunk(
mjd: float, start_time=start_time, end_time=end_time, values=values
) -> float:
# Find the index of the closest preceding start time
preceding_index = np.searchsorted(start_time, mjd, side="right") - 1
if preceding_index < 0:
return np.nan

# Check if the time is within the valid range of the chunk
if start_time[preceding_index] <= mjd <= end_time[preceding_index]:
value = values[preceding_index]
if not np.isnan(value):
return value

# If the closest preceding chunk has nan, check the next closest chunk
for i in range(preceding_index - 1, -1, -1):
if start_time[i] <= mjd <= end_time[i]:
value = values[i]
if not np.isnan(value):
return value

return np.nan

self._interpolators[tel_id][column] = interpolate_chunk
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a bit weird here to store closures in a dict instead of just calling a method with tel_id and column that then checks if the necessary table exists.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, i'll change it to a method

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation follows the PointingInterpolator approach, but of course can be changed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so, _check_interpolators in the parent class assumes the current implementation. If i change it to what you describe i will need to make changes to the parent class or forgo this check for the ChunkInterpolator

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe keeping it like this is better.

Copy link
Member

@maxnoe maxnoe Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Looks good, the __call__ method is missing on the interface though

I am not sure if putting the private methods you listed here to the base class will work out in the end, but you'll see how much code can be shared between the chunk interpolator and the linear interpolator.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, i'll work on the implementation now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we take an opportunity and stop using interp1d? It is legacy, andscipy suggests this: https://docs.scipy.org/doc/scipy/tutorial/interpolate/1D.html#tutorial-interpolate-1dsection

Also, if we call a base class LinearInterpolator, I guess we should enforce linearity (in the example above, e.g. cubic spline can be applied)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no replacement for linear 1d interpolation for interp1d. It is legacy, but not deprecated.

See scipy/scipy#21258

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also got tricked by "may be removed in the future"

118 changes: 117 additions & 1 deletion src/ctapipe/monitoring/tests/test_interpolator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,127 @@
from astropy.table import Table
from astropy.time import Time

from ctapipe.monitoring.interpolation import PointingInterpolator
from ctapipe.monitoring.interpolation import ChunkInterpolator, PointingInterpolator

t0 = Time("2022-01-01T00:00:00")


def test_chunk_selection():
table = Table(
{
"start_time": t0 + [0, 1, 2, 6] * u.s,
"end_time": t0 + [2, 3, 4, 8] * u.s,
"values": [1, 2, 3, 4],
},
)
interpolator = ChunkInterpolator()
interpolator.add_table(1, table, ["values"])

val1 = interpolator(tel_id=1, time=t0 + 1.2 * u.s, columns="values")
val2 = interpolator(tel_id=1, time=t0 + 1.7 * u.s, columns="values")
val3 = interpolator(tel_id=1, time=t0 + 2.2 * u.s, columns="values")

assert np.isclose(val1, 2)
assert np.isclose(val2, 2)
assert np.isclose(val3, 3)


def test_chunk_selection_multiple_columns():
table = Table(
{
"start_time": t0 + [0, 1, 2, 6] * u.s,
"end_time": t0 + [2, 3, 4, 8] * u.s,
"values1": [1, 2, 3, 4],
"values2": [10, 20, 30, 40],
},
)
interpolator = ChunkInterpolator()
interpolator.add_table(1, table, ["values1", "values2"])

result1 = interpolator(
tel_id=1, time=t0 + 1.2 * u.s, columns=["values1", "values2"]
)
result2 = interpolator(
tel_id=1, time=t0 + 1.7 * u.s, columns=["values1", "values2"]
)
result3 = interpolator(
tel_id=1, time=t0 + 2.2 * u.s, columns=["values1", "values2"]
)

assert np.isclose(result1["values1"], 2)
assert np.isclose(result1["values2"], 20)
assert np.isclose(result2["values1"], 2)
assert np.isclose(result2["values2"], 20)
assert np.isclose(result3["values1"], 3)
assert np.isclose(result3["values2"], 30)


def test_nan_switch():
table = Table(
{
"start_time": t0 + [0, 1, 2, 6] * u.s,
"end_time": t0 + [2, 3, 4, 8] * u.s,
"values": [1, np.nan, 3, 4],
},
)
interpolator = ChunkInterpolator()
interpolator.add_table(1, table, ["values"])

val = interpolator(tel_id=1, time=t0 + 1.2 * u.s, columns="values")

assert np.isclose(val, 1)


def test_nan_switch_multiple_columns():
table = Table(
{
"start_time": t0 + [0, 1, 2, 6] * u.s,
"end_time": t0 + [2, 3, 4, 8] * u.s,
"values1": [1, np.nan, 3, 4],
"values2": [10, 20, np.nan, 40],
},
)
interpolator = ChunkInterpolator()
interpolator.add_table(1, table, ["values1", "values2"])

result = interpolator(tel_id=1, time=t0 + 1.2 * u.s, columns=["values1", "values2"])

assert np.isclose(result["values1"], 1)
assert np.isclose(result["values2"], 20)


def test_no_valid_chunk():
table = Table(
{
"start_time": t0 + [0, 1, 2, 6] * u.s,
"end_time": t0 + [2, 3, 4, 8] * u.s,
"values": [1, 2, 3, 4],
},
)
interpolator = ChunkInterpolator()
interpolator.add_table(1, table, ["values"])

val = interpolator(tel_id=1, time=t0 + 5.2 * u.s, columns="values")
assert np.isnan(val)


def test_no_valid_chunk_multiple_columns():
table = Table(
{
"start_time": t0 + [0, 1, 2, 6] * u.s,
"end_time": t0 + [2, 3, 4, 8] * u.s,
"values1": [1, 2, 3, 4],
"values2": [10, 20, 30, 40],
},
)
interpolator = ChunkInterpolator()
interpolator.add_table(1, table, ["values1", "values2"])

result = interpolator(tel_id=1, time=t0 + 5.2 * u.s, columns=["values1", "values2"])
assert np.isnan(result["values1"])
assert np.isnan(result["values2"])


def test_azimuth_switchover():
"""Test pointing interpolation"""

Expand Down
Loading