diff --git a/docs/api-reference/vectorization/index.rst b/docs/api-reference/vectorization/index.rst new file mode 100644 index 00000000000..640a28b2eb0 --- /dev/null +++ b/docs/api-reference/vectorization/index.rst @@ -0,0 +1,20 @@ +.. _vectorization: + +******************************************************************************* +Helper Functions for Vectorized Operations on Tables (`~ctapipe.vectorization`) +******************************************************************************* + +.. currentmodule:: ctapipe.vectorization + +This module contains helper functions for performing vectorized operations +on tables, as used in e.g. :ref:`reco_stereo_combination`. + + +Reference/API +============= + +.. automodapi:: ctapipe.vectorization + :no-inheritance-diagram: + +.. automodapi:: ctapipe.vectorization.aggregate + :no-inheritance-diagram: diff --git a/docs/changes/2658.bugfix.rst b/docs/changes/2658.bugfix.rst new file mode 100644 index 00000000000..cf89c761fca --- /dev/null +++ b/docs/changes/2658.bugfix.rst @@ -0,0 +1,2 @@ +Fix ``_uncert`` calculations in ``ctapipe.reco.StereoMeanCombiner``. +Add helper functions for vectorized numpy calculations as new ``ctapipe.vectorization`` module. diff --git a/src/ctapipe/reco/stereo_combination.py b/src/ctapipe/reco/stereo_combination.py index 19c561d89b2..5fc85f91e4f 100644 --- a/src/ctapipe/reco/stereo_combination.py +++ b/src/ctapipe/reco/stereo_combination.py @@ -17,6 +17,7 @@ ReconstructedEnergyContainer, ReconstructedGeometryContainer, ) +from ..vectorization import get_subarray_index, weighted_mean_std_ufunc from .utils import add_defaults_and_meta _containers = { @@ -31,38 +32,6 @@ ] -def _grouped_add(tel_data, n_array_events, indices): - """ - Calculate the group-wise sum for each array event over the - corresponding telescope events. ``indices`` is an array - that gives the index of the subarray event for each telescope event, - returned by - ``np.unique(tel_events[["obs_id", "event_id"]], return_inverse=True)`` - """ - combined_values = np.zeros(n_array_events) - np.add.at(combined_values, indices, tel_data) - return combined_values - - -def _weighted_mean_ufunc(tel_values, weights, n_array_events, indices): - # avoid numerical problems by very large or small weights - weights = weights / weights.max() - sum_prediction = _grouped_add( - tel_values * weights, - n_array_events, - indices, - ) - sum_of_weights = _grouped_add( - weights, - n_array_events, - indices, - ) - mean = np.full(n_array_events, np.nan) - valid = sum_of_weights > 0 - mean[valid] = sum_prediction[valid] / sum_of_weights[valid] - return mean - - class StereoCombiner(Component): """ Base Class for algorithms combining telescope-wise predictions to common prediction. @@ -299,26 +268,27 @@ def predict_table(self, mono_predictions: Table) -> Table: prefix = f"{self.prefix}_tel" # TODO: Integrate table quality query once its done valid = mono_predictions[f"{prefix}_is_valid"] - valid_predictions = mono_predictions[valid] - array_events, indices, multiplicity = np.unique( - mono_predictions[["obs_id", "event_id"]], - return_inverse=True, - return_counts=True, + obs_ids, event_ids, multiplicity, tel_to_array_indices = get_subarray_index( + mono_predictions ) - stereo_table = Table(array_events) + n_array_events = len(obs_ids) + stereo_table = Table({"obs_id": obs_ids, "event_id": event_ids}) # copy metadata for colname in ("obs_id", "event_id"): stereo_table[colname].description = mono_predictions[colname].description - n_array_events = len(array_events) - weights = self._calculate_weights(valid_predictions) + weights = self._calculate_weights(mono_predictions[valid]) if self.property is ReconstructionProperty.PARTICLE_TYPE: - if len(valid_predictions) > 0: - mono_predictions = valid_predictions[f"{prefix}_prediction"] - stereo_predictions = _weighted_mean_ufunc( - mono_predictions, weights, n_array_events, indices[valid] + if np.sum(valid) > 0: + stereo_predictions, _ = weighted_mean_std_ufunc( + mono_predictions[f"{prefix}_prediction"], + valid, + n_array_events, + tel_to_array_indices, + multiplicity, + weights=weights, ) else: stereo_predictions = np.full(n_array_events, np.nan) @@ -328,29 +298,21 @@ def predict_table(self, mono_predictions: Table) -> Table: stereo_table[f"{self.prefix}_goodness_of_fit"] = np.nan elif self.property is ReconstructionProperty.ENERGY: - if len(valid_predictions) > 0: - mono_energies = valid_predictions[f"{prefix}_energy"].quantity.to_value( + if np.sum(valid) > 0: + mono_energies = mono_predictions[f"{prefix}_energy"].quantity.to_value( u.TeV ) - if self.log_target: mono_energies = np.log(mono_energies) - stereo_energy = _weighted_mean_ufunc( + stereo_energy, std = weighted_mean_std_ufunc( mono_energies, - weights, + valid, n_array_events, - indices[valid], + tel_to_array_indices, + multiplicity, + weights=weights, ) - variance = _weighted_mean_ufunc( - (mono_energies - np.repeat(stereo_energy, multiplicity)[valid]) - ** 2, - weights, - n_array_events, - indices[valid], - ) - std = np.sqrt(variance) - if self.log_target: stereo_energy = np.exp(stereo_energy) std = np.exp(std) @@ -369,22 +331,37 @@ def predict_table(self, mono_predictions: Table) -> Table: stereo_table[f"{self.prefix}_goodness_of_fit"] = np.nan elif self.property is ReconstructionProperty.GEOMETRY: - if len(valid_predictions) > 0: + if np.sum(valid) > 0: coord = AltAz( - alt=valid_predictions[f"{prefix}_alt"], - az=valid_predictions[f"{prefix}_az"], + alt=mono_predictions[f"{prefix}_alt"], + az=mono_predictions[f"{prefix}_az"], ) # https://en.wikipedia.org/wiki/Von_Mises%E2%80%93Fisher_distribution#Mean_direction mono_x, mono_y, mono_z = coord.cartesian.get_xyz() - stereo_x = _weighted_mean_ufunc( - mono_x, weights, n_array_events, indices[valid] + stereo_x, _ = weighted_mean_std_ufunc( + mono_x, + valid, + n_array_events, + tel_to_array_indices, + multiplicity, + weights=weights, ) - stereo_y = _weighted_mean_ufunc( - mono_y, weights, n_array_events, indices[valid] + stereo_y, _ = weighted_mean_std_ufunc( + mono_y, + valid, + n_array_events, + tel_to_array_indices, + multiplicity, + weights=weights, ) - stereo_z = _weighted_mean_ufunc( - mono_z, weights, n_array_events, indices[valid] + stereo_z, _ = weighted_mean_std_ufunc( + mono_z, + valid, + n_array_events, + tel_to_array_indices, + multiplicity, + weights=weights, ) mean_cartesian = CartesianRepresentation( @@ -425,7 +402,9 @@ def predict_table(self, mono_predictions: Table) -> Table: tel_ids = [[] for _ in range(n_array_events)] - for index, tel_id in zip(indices[valid], valid_predictions["tel_id"]): + for index, tel_id in zip( + tel_to_array_indices[valid], mono_predictions["tel_id"][valid] + ): tel_ids[index].append(tel_id) stereo_table[f"{self.prefix}_telescopes"] = tel_ids diff --git a/src/ctapipe/reco/tests/test_stereo_combination.py b/src/ctapipe/reco/tests/test_stereo_combination.py index 7187262af27..63ced2a980c 100644 --- a/src/ctapipe/reco/tests/test_stereo_combination.py +++ b/src/ctapipe/reco/tests/test_stereo_combination.py @@ -84,8 +84,18 @@ def test_predict_mean_energy(weights, mono_table): assert_array_equal(stereo["event_id"], np.array([1, 2, 1])) if weights == "intensity": assert_array_equal(stereo["dummy_energy"], [7, 0.5, np.nan] * u.TeV) + assert_allclose( + stereo["dummy_energy_uncert"].quantity, + [4.242641, 0, np.nan] * u.TeV, + atol=1e-7, + ) elif weights == "none": assert_array_equal(stereo["dummy_energy"], [5, 0.5, np.nan] * u.TeV) + assert_allclose( + stereo["dummy_energy_uncert"].quantity, + [3.741657, 0, np.nan] * u.TeV, + atol=1e-7, + ) assert_array_equal(stereo["dummy_telescopes"][0], np.array([1, 2, 3])) assert_array_equal(stereo["dummy_telescopes"][1], 5) diff --git a/src/ctapipe/tools/apply_models.py b/src/ctapipe/tools/apply_models.py index 6ec237006e0..3caff55fed1 100644 --- a/src/ctapipe/tools/apply_models.py +++ b/src/ctapipe/tools/apply_models.py @@ -1,6 +1,7 @@ """ Tool to apply machine learning models in bulk (as opposed to event by event). """ + import numpy as np import tables from astropy.table import Table, join, vstack @@ -9,9 +10,8 @@ from ctapipe.core.tool import Tool from ctapipe.core.traits import Bool, Integer, List, Path, classes_with_traits, flag from ctapipe.io import HDF5Merger, TableLoader, write_table -from ctapipe.io.astropy_helpers import read_table +from ctapipe.io.astropy_helpers import join_allow_empty, read_table from ctapipe.io.tableio import TelListToMaskTransform -from ctapipe.io.tableloader import _join_subarray_events from ctapipe.reco import Reconstructor __all__ = [ @@ -218,6 +218,23 @@ def _apply(self, reconstructor, tel_tables, start, stop): def _combine(self, reconstructor, tel_tables, start, stop): stereo_table = vstack(list(tel_tables.values())) + # stacking the single telescope tables and joining + # potentially changes the order of the subarray events. + # to ensure events are stored in the correct order, + # we resort to trigger table order + trigger = read_table( + self.h5file, "/dl1/event/subarray/trigger", start=start, stop=stop + )[["obs_id", "event_id"]] + trigger["__sort_index__"] = np.arange(len(trigger)) + + stereo_table = join_allow_empty( + stereo_table, + trigger, + keys=["obs_id", "event_id"], + join_type="left", + ) + stereo_table.sort("__sort_index__") + combiner = reconstructor.stereo_combiner stereo_predictions = combiner.predict_table(stereo_table) del stereo_table @@ -230,19 +247,6 @@ def _combine(self, reconstructor, tel_tables, start, stop): stereo_predictions[c.name] = np.array([trafo(r) for r in c]) stereo_predictions[c.name].description = c.description - # stacking the single telescope tables and joining - # potentially changes the order of the subarray events. - # to ensure events are stored in the correct order, - # we resort to trigger table order - trigger = read_table( - self.h5file, "/dl1/event/subarray/trigger", start=start, stop=stop - )[["obs_id", "event_id"]] - trigger["__sort_index__"] = np.arange(len(trigger)) - - stereo_predictions = _join_subarray_events(trigger, stereo_predictions) - stereo_predictions.sort("__sort_index__") - del stereo_predictions["__sort_index__"] - write_table( stereo_predictions, self.output_path, diff --git a/src/ctapipe/vectorization/__init__.py b/src/ctapipe/vectorization/__init__.py new file mode 100644 index 00000000000..b31ac7d1c6a --- /dev/null +++ b/src/ctapipe/vectorization/__init__.py @@ -0,0 +1,6 @@ +from .aggregate import get_subarray_index, weighted_mean_std_ufunc + +__all__ = [ + "get_subarray_index", + "weighted_mean_std_ufunc", +] diff --git a/src/ctapipe/vectorization/aggregate.py b/src/ctapipe/vectorization/aggregate.py new file mode 100644 index 00000000000..2c5ffc3d3b1 --- /dev/null +++ b/src/ctapipe/vectorization/aggregate.py @@ -0,0 +1,145 @@ +"""Helper functions for vectorizing numpy operations.""" + +import numpy as np +from numba import njit, uint64 + +__all__ = ["get_subarray_index", "weighted_mean_std_ufunc"] + + +@njit +def _get_subarray_index(obs_ids, event_ids): + n_tel_events = len(obs_ids) + idx = np.zeros(n_tel_events, dtype=uint64) + current_idx = 0 + subarray_obs_index = [] + subarray_event_index = [] + multiplicities = [] + multiplicity = 0 + + if n_tel_events > 0: + subarray_obs_index.append(obs_ids[0]) + subarray_event_index.append(event_ids[0]) + multiplicity += 1 + + for i in range(1, n_tel_events): + if obs_ids[i] != obs_ids[i - 1] or event_ids[i] != event_ids[i - 1]: + # append to subarray events + multiplicities.append(multiplicity) + subarray_obs_index.append(obs_ids[i]) + subarray_event_index.append(event_ids[i]) + # reset state + current_idx += 1 + multiplicity = 0 + + multiplicity += 1 + idx[i] = current_idx + + # Append multiplicity of the last subarray event + if n_tel_events > 0: + multiplicities.append(multiplicity) + + return ( + np.asarray(subarray_obs_index), + np.asarray(subarray_event_index), + np.asarray(multiplicities), + idx, + ) + + +def get_subarray_index(tel_table): + """ + Get the obs_ids and event_ids of all subarray events contained + in a table of telescope events, their multiplicity and an array + giving the index of the subarray event for each telescope event. + This requires the telescope events to be SORTED by their corresponding + subarray events (meaning by ``["obs_id", "event_id"]``). + + Parameters + ---------- + tel_table: astropy.table.Table + table with telescope events as rows + + Returns + ------- + Tuple(np.ndarray, np.ndarray, np.ndarray, np.ndarray) + obs_ids of subarray events, event_ids of subarray events, + multiplicity of subarray events, index of the subarray event + for each telescope event + """ + obs_idx = tel_table["obs_id"] + event_idx = tel_table["event_id"] + return _get_subarray_index(obs_idx, event_idx) + + +def _grouped_add(tel_data, n_array_events, indices): + """ + Calculate the group-wise sum for each array event over the + corresponding telescope events. ``indices`` is an array + that gives the index of the subarray event for each telescope event. + """ + combined_values = np.zeros(n_array_events) + np.add.at(combined_values, indices, tel_data) + return combined_values + + +def weighted_mean_std_ufunc( + tel_values, + valid_tel, + n_array_events, + indices, + multiplicity, + weights=np.array([1]), +): + """ + Calculate the weighted mean and standard deviation for each array event + over the corresponding telescope events. + + Parameters + ---------- + tel_values: np.ndarray + values for each telescope event + valid_tel: array-like + boolean mask giving the valid values of ``tel_values`` + n_array_events: int + number of array events with corresponding telescope events in ``tel_values`` + indices: np.ndarray + index of the subarray event for each telescope event, returned as + the fourth return value of ``get_subarray_index`` + multiplicity: np.ndarray + multiplicity of the subarray events in the same order as the order of + subarray events in ``indices`` + weights: np.ndarray + weights used for averaging (equal/no weights are used by default) + + Returns + ------- + Tuple(np.ndarray, np.ndarray) + weighted mean and standard deviation for each array event + """ + # avoid numerical problems by very large or small weights + weights = weights / weights.max() + tel_values = tel_values[valid_tel] + indices = indices[valid_tel] + + sum_prediction = _grouped_add( + tel_values * weights, + n_array_events, + indices, + ) + sum_of_weights = _grouped_add( + weights, + n_array_events, + indices, + ) + mean = np.full(n_array_events, np.nan) + valid = sum_of_weights > 0 + mean[valid] = sum_prediction[valid] / sum_of_weights[valid] + + sum_sq_residulas = _grouped_add( + (tel_values - np.repeat(mean, multiplicity)[valid_tel]) ** 2 * weights, + n_array_events, + indices, + ) + variance = np.full(n_array_events, np.nan) + variance[valid] = sum_sq_residulas[valid] / sum_of_weights[valid] + return mean, np.sqrt(variance) diff --git a/src/ctapipe/vectorization/tests/test_aggregate.py b/src/ctapipe/vectorization/tests/test_aggregate.py new file mode 100644 index 00000000000..14c3f578c4f --- /dev/null +++ b/src/ctapipe/vectorization/tests/test_aggregate.py @@ -0,0 +1,50 @@ +import numpy as np +from astropy.table import Table + +from ctapipe.io import TableLoader, read_table +from ctapipe.io.tests.test_table_loader import check_equal_array_event_order + + +def test_get_subarray_index(dl1_parameters_file): + from ctapipe.vectorization import get_subarray_index + + opts = dict(simulated=False, true_parameters=False, dl2=False, pointing=False) + with TableLoader(dl1_parameters_file, **opts) as loader: + tel_events = loader.read_telescope_events() + + subarray_obs_ids, subarray_event_ids, _, _ = get_subarray_index(tel_events) + trigger = read_table(dl1_parameters_file, "/dl1/event/subarray/trigger") + + assert len(subarray_obs_ids) == len(subarray_event_ids) + assert len(subarray_obs_ids) == len(trigger) + check_equal_array_event_order( + Table({"obs_id": subarray_obs_ids, "event_id": subarray_event_ids}), trigger + ) + + +def test_mean_std_ufunc(dl1_parameters_file): + from ctapipe.vectorization import get_subarray_index, weighted_mean_std_ufunc + + opts = dict(simulated=False, true_parameters=False, dl2=False, pointing=False) + with TableLoader(dl1_parameters_file, **opts) as loader: + tel_events = loader.read_telescope_events() + + valid = np.isfinite(tel_events["hillas_length"]) + obs_ids, event_ids, m, tel_to_subarray_idx = get_subarray_index(tel_events) + n_array_events = len(obs_ids) + + # test only default uniform weights, + # other weights are tested in test_stereo_combination + mean, std = weighted_mean_std_ufunc( + tel_events["hillas_length"], valid, n_array_events, tel_to_subarray_idx, m + ) + + # check if result is identical with np.mean/np.std + true_mean = np.full(n_array_events, np.nan) + true_std = np.full(n_array_events, np.nan) + grouped = tel_events.group_by(["obs_id", "event_id"]) + true_mean = grouped["hillas_length"].groups.aggregate(np.nanmean) + true_std = grouped["hillas_length"].groups.aggregate(np.nanstd) + + assert np.allclose(mean, true_mean, equal_nan=True) + assert np.allclose(std, true_std, equal_nan=True)