diff --git a/ctapipe/ml/apply.py b/ctapipe/ml/apply.py index 2bfd48c5d66..39025683e38 100644 --- a/ctapipe/ml/apply.py +++ b/ctapipe/ml/apply.py @@ -131,7 +131,7 @@ class ParticleIdClassifier(ClassificationReconstructor): def __call__(self, event: ArrayEventContainer) -> None: for tel_id in event.trigger.tels_with_trigger: features = self._collect_features(event, tel_id) - prediction, valid = self.model.predict( + prediction, valid = self.model.predict_score( self.subarray.tel[tel_id], features, ) diff --git a/ctapipe/ml/conftest.py b/ctapipe/ml/conftest.py index 46ff3a4c012..4145162264b 100644 --- a/ctapipe/ml/conftest.py +++ b/ctapipe/ml/conftest.py @@ -2,12 +2,13 @@ from ctapipe.utils.datasets import resource_file from ctapipe.core import run_tool -@pytest.fixture(scope='session') + +@pytest.fixture(scope="session") def model_tmp_path(tmp_path_factory): return tmp_path_factory.mktemp("models") -@pytest.fixture(scope='session') +@pytest.fixture(scope="session") def energy_regressor_path(model_tmp_path): from ctapipe.ml.tools.train_energy_regressor import TrainEnergyRegressor @@ -25,3 +26,24 @@ def energy_regressor_path(model_tmp_path): ) assert ret == 0 return out_file + + +@pytest.fixture(scope="session") +def particle_classifier_path(model_tmp_path): + from ctapipe.ml.tools.train_particle_classifier import TrainParticleIdClassifier + + tool = TrainParticleIdClassifier() + config = resource_file("ml-config.yaml") + out_file = model_tmp_path / "particle_classifier.pkl" + ret = run_tool( + tool, + argv=[ + "--input-background=dataset://proton_dl2_train_small.dl2.h5", + "--input-signal=dataset://gamma_diffuse_dl2_train_small.dl2.h5", + f"--output={out_file}", + f"--config={config}", + "--log-level=INFO", + ], + ) + assert ret == 0 + return out_file diff --git a/ctapipe/ml/tools/__init__.py b/ctapipe/ml/tools/__init__.py index 9150e3327eb..46c4f672467 100644 --- a/ctapipe/ml/tools/__init__.py +++ b/ctapipe/ml/tools/__init__.py @@ -1,3 +1,4 @@ from .train_energy_regressor import TrainEnergyRegressor +from .train_particle_classifier import TrainParticleIdClassifier -__all__ = ["TrainEnergyRegressor"] +__all__ = ["TrainEnergyRegressor", "TrainParticleIdClassifier"] diff --git a/ctapipe/ml/tools/apply_particle_classifier.py b/ctapipe/ml/tools/apply_particle_classifier.py new file mode 100644 index 00000000000..136bed0b784 --- /dev/null +++ b/ctapipe/ml/tools/apply_particle_classifier.py @@ -0,0 +1,145 @@ +from astropy.table.operations import vstack +import tables +from ctapipe.core.tool import Tool +from ctapipe.core.traits import Bool, Path, flag, create_class_enum_trait +from ctapipe.io import TableLoader, write_table +from tqdm.auto import tqdm +from ctapipe.io.tableio import TelListToMaskTransform +import numpy as np + +from ..sklearn import Classifier +from ..apply import ParticleIdClassifier +from ..stereo_combination import StereoCombiner + + +class ApplyParticleIdClassifier(Tool): + + overwrite = Bool(default_value=False).tag(config=True) + + input_url = Path( + default_value=None, + allow_none=False, + directory_ok=False, + exists=True, + ).tag(config=True) + + model_path = Path( + default_value=None, allow_none=False, exists=True, directory_ok=False + ).tag(config=True) + + stereo_combiner_type = create_class_enum_trait( + base_class=StereoCombiner, default_value="StereoMeanCombiner" + ).tag(config=True) + + aliases = { + ("i", "input"): "ApplyParticleIdClassifier.input_url", + ("m", "model"): "ApplyParticleIdClassifier.model_path", + } + + flags = { + **flag( + "overwrite", + "ApplyParticleIdClassifier.overwrite", + "Overwrite tables in output file if it exists", + "Don't overwrite tables in output file if it exists", + ), + "f": ( + {"ApplyParticleIdClassifier": {"overwrite": True}}, + "Overwrite output file if it exists", + ), + } + + classes = [ + TableLoader, + Classifier, + StereoCombiner, + ] + + def setup(self): + """""" + self.h5file = tables.open_file(self.input_url, mode="r+") + self.loader = TableLoader( + parent=self, + h5file=self.h5file, + load_dl1_images=False, + load_dl1_parameters=True, + load_dl2=True, + load_simulated=True, + load_instrument=True, + ) + self.estimator = ParticleIdClassifier.read( + self.model_path, + self.loader.subarray, + parent=self, + ) + self.combine = StereoCombiner.from_name( + self.stereo_combiner_type, + combine_property="classification", + algorithm=self.estimator.model.model_cls, + parent=self, + ) + + def start(self): + self.log.info("Applying model") + + tables = [] + for tel_id, tel in tqdm(self.loader.subarray.tel.items()): + if tel not in self.estimator.model.models: + self.log.warning( + "No model for telescope type %s, skipping tel %d", + tel, + tel_id, + ) + continue + + table = self.loader.read_telescope_events([tel_id]) + if len(table) == 0: + self.log.warning("No events for telescope %d", tel_id) + continue + + prediction, valid = self.estimator.predict(tel, table) + prefix = self.estimator.model.model_cls + + class_col = f"{prefix}_prediction" + valid_col = f"{prefix}_is_valid" + table[class_col] = prediction + table[valid_col] = valid + + write_table( + table[["obs_id", "event_id", "tel_id", class_col, valid_col]], + self.loader.input_url, + f"/dl2/event/telescope/classification/{prefix}/tel_{tel_id:03d}", + mode="a", + overwrite=self.overwrite, + ) + tables.append(table) + + if len(tables) == 0: + raise ValueError("No predictions made for any telescope") + + mono_predictions = vstack(tables) + stereo_predictions = self.combine.predict(mono_predictions) + trafo = TelListToMaskTransform(self.loader.subarray) + for c in filter( + lambda c: c.name.endswith("tel_ids"), stereo_predictions.columns.values() + ): + stereo_predictions[c.name] = np.array([trafo(r) for r in c]) + + write_table( + stereo_predictions, + self.loader.input_url, + f"/dl2/event/subarray/classification/{self.estimator.model.model_cls}", + mode="a", + overwrite=self.overwrite, + ) + + def finish(self): + self.h5file.close() + + +def main(): + ApplyParticleIdClassifier().run() + + +if __name__ == "__main__": + main() diff --git a/ctapipe/ml/tools/tests/test_apply.py b/ctapipe/ml/tools/tests/test_apply.py index 63c51c591a6..a966111b54c 100644 --- a/ctapipe/ml/tools/tests/test_apply.py +++ b/ctapipe/ml/tools/tests/test_apply.py @@ -3,15 +3,13 @@ import shutil -def test_apply_energy_regressor( - energy_regressor_path, dl2_shower_geometry_file, tmp_path -): +def test_apply_energy_regressor(energy_regressor_path, dl1_parameters_file, tmp_path): from ctapipe.ml.tools.apply_energy_regressor import ApplyEnergyRegressor - input_path = tmp_path / dl2_shower_geometry_file.name + input_path = tmp_path / dl1_parameters_file.name # create copy to not mutate common test file - shutil.copy2(dl2_shower_geometry_file, input_path) + shutil.copy2(dl1_parameters_file, input_path) ret = run_tool( ApplyEnergyRegressor(), @@ -38,3 +36,40 @@ def test_apply_energy_regressor( assert "ExtraTreesRegressor_energy_mono" in events.colnames assert "ExtraTreesRegressor_is_valid_mono" in events.colnames + + +def test_apply_particle_classifier( + particle_classifier_path, dl1_parameters_file, tmp_path +): + from ctapipe.ml.tools.apply_particle_classifier import ApplyParticleIdClassifier + + input_path = tmp_path / dl1_parameters_file.name + + # create copy to not mutate common test file + shutil.copy2(dl1_parameters_file, input_path) + + ret = run_tool( + ApplyParticleIdClassifier(), + argv=[ + f"--input={input_path}", + f"--model={particle_classifier_path}", + "--ApplyParticleIdClassifier.StereoMeanCombiner.weights=konrad", + ], + ) + assert ret == 0 + + loader = TableLoader(input_path, load_dl2=True) + events = loader.read_subarray_events() + assert "ExtraTreesClassifier_prediction" in events.colnames + assert "ExtraTreesClassifier_tel_ids" in events.colnames + assert "ExtraTreesClassifier_is_valid" in events.colnames + assert "ExtraTreesClassifier_goodness_of_fit" in events.colnames + + events = loader.read_telescope_events() + assert "ExtraTreesClassifier_prediction" in events.colnames + assert "ExtraTreesClassifier_tel_ids" in events.colnames + assert "ExtraTreesClassifier_is_valid" in events.colnames + assert "ExtraTreesClassifier_goodness_of_fit" in events.colnames + + assert "ExtraTreesClassifier_prediction_mono" in events.colnames + assert "ExtraTreesClassifier_is_valid_mono" in events.colnames diff --git a/ctapipe/ml/tools/tests/test_process.py b/ctapipe/ml/tools/tests/test_process.py index 515db2a9765..4ac9072379b 100644 --- a/ctapipe/ml/tools/tests/test_process.py +++ b/ctapipe/ml/tools/tests/test_process.py @@ -43,8 +43,7 @@ def test_process_apply_energy(tmp_path, energy_regressor_path): f"--input={input_url}", f"--output={output}", "--write-images", - "--write-stereo-shower", - "--write-mono-shower", + "--write-showers", f"--energy-regressor={energy_regressor_path}", f"--config={config_path}", ] @@ -52,3 +51,56 @@ def test_process_apply_energy(tmp_path, energy_regressor_path): print(read_table(output, "/dl2/event/telescope/energy/ExtraTreesRegressor/tel_004")) print(read_table(output, "/dl2/event/subarray/energy/ExtraTreesRegressor")) + + +def test_process_apply_classification(tmp_path, particle_classifier_path): + from ctapipe.tools.process import ProcessorTool + from ctapipe.io import SimTelEventSource + + output = tmp_path / "gamma_prod5.dl2_energy.h5" + + config_path = tmp_path / "config.json" + + input_url = "dataset://gamma_prod5.simtel.zst" + + with SimTelEventSource(input_url) as s: + subarray = s.subarray + + allowed_tels = subarray.get_tel_ids_for_type( + "LST_LST_LSTCam" + ) + subarray.get_tel_ids_for_type("MST_MST_NectarCam") + + config = { + "ProcessorTool": { + "EventSource": { + "allowed_tels": allowed_tels, + }, + "stereo_combiner_configs": [ + { + "type": "StereoMeanCombiner", + "combine_property": "classification", + "algorithm": "ExtraTreesClassifier", + } + ], + } + } + + with config_path.open("w") as f: + json.dump(config, f) + + argv = [ + f"--input={input_url}", + f"--output={output}", + "--write-images", + "--write-showers", + f"--particle-classifier={particle_classifier_path}", + f"--config={config_path}", + ] + assert run_tool(ProcessorTool(), argv=argv, cwd=tmp_path) == 0 + + print( + read_table( + output, "/dl2/event/telescope/classification/ExtraTreesClassifier/tel_004" + ) + ) + print(read_table(output, "/dl2/event/subarray/classification/ExtraTreesClassifier")) diff --git a/ctapipe/ml/tools/tests/test_train.py b/ctapipe/ml/tools/tests/test_train.py index f5b40cf79a8..f63be41cdcf 100644 --- a/ctapipe/ml/tools/tests/test_train.py +++ b/ctapipe/ml/tools/tests/test_train.py @@ -1,3 +1,10 @@ def test_train_energy_regressor(energy_regressor_path): from ctapipe.ml.sklearn import Regressor + Regressor.load(energy_regressor_path) + + +def test_train_particle_classifier(particle_classifier_path): + from ctapipe.ml.sklearn import Classifier + + Classifier.load(particle_classifier_path) diff --git a/ctapipe/ml/tools/train_particle_classifier.py b/ctapipe/ml/tools/train_particle_classifier.py new file mode 100644 index 00000000000..592a112fa09 --- /dev/null +++ b/ctapipe/ml/tools/train_particle_classifier.py @@ -0,0 +1,159 @@ +import numpy as np +from ctapipe.core.tool import Tool +from ctapipe.core.traits import Path, Int +from ctapipe.io import TableLoader +from sklearn import metrics +from sklearn.model_selection import StratifiedKFold +from tqdm.auto import tqdm +from astropy.table import vstack + +from ..preprocessing import check_valid_rows +from ..sklearn import Classifier + + +class TrainParticleIdClassifier(Tool): + input_signal_path = Path( + default_value=None, + allow_none=False, + directory_ok=False, + ).tag(config=True) + input_background_path = Path( + default_value=None, + allow_none=False, + directory_ok=False, + ).tag(config=True) + output_path = Path( + default_value=None, + allow_none=False, + directory_ok=False, + ).tag(config=True) + + n_cross_validation = Int(default_value=5).tag(config=True) + n_signal = Int(default_value=None, allow_none=True).tag(config=True) + n_background = Int(default_value=None, allow_none=True).tag(config=True) + random_seed = Int(default_value=0).tag(config=True) + + aliases = { + ("input-background"): "TrainParticleIdClassifier.input_background_path", + ("input-signal"): "TrainParticleIdClassifier.input_signal_path", + ("o", "output"): "TrainParticleIdClassifier.output_path", + } + + classes = [ + TableLoader, + Classifier, + ] + + def setup(self): + """""" + self.signal_loader = TableLoader( + parent=self, + input_url=self.input_signal_path, + load_dl1_images=False, + load_dl1_parameters=True, + load_dl2=True, + load_simulated=True, + load_instrument=True, + ) + self.background_loader = TableLoader( + parent=self, + input_url=self.input_background_path, + load_dl1_images=False, + load_dl1_parameters=True, + load_dl2=True, + load_simulated=True, + load_instrument=True, + ) + self.model = Classifier( + parent=self, + target="hadronness", + ) + self.rng = np.random.default_rng(self.random_seed) + + def start(self): + + # By construction both loaders have the same types defined + types = self.signal_loader.subarray.telescope_types + self.log.info("Signal input-file: %s", self.signal_loader.input_url) + self.log.info("Background input-file: %s", self.background_loader.input_url) + self.log.info("Training models for %d types", len(types)) + for tel_type in types: + self.log.info("Loading events for %s", tel_type) + table = self._read_input_data(tel_type) + self._cross_validate(tel_type, table) + + self.log.info("Performing final fit for %s", tel_type) + self.model.fit(tel_type, table) + self.log.info("done") + + def _read_table(self, telescope_type, loader, n_events=None): + table = loader.read_telescope_events([telescope_type]) + table["hadronness"] = (table["true_shower_primary_id"] != 0).astype(np.int8) + table = table[self.model.features + ["hadronness"]] + + valid = check_valid_rows(table) + self.log.warning("Dropping non-predictable events.") + table = table[valid] + + if n_events is not None: + n_events = min(n_events, len(table)) + idx = self.rng.choice(len(table), n_events, replace=False) + idx.sort() + table = table[idx] + return table + + def _read_input_data(self, tel_type): + signal = self._read_table(tel_type, self.signal_loader, self.n_signal) + background = self._read_table( + tel_type, self.background_loader, self.n_background + ) + table = vstack([signal, background]) + self.log.info( + "Train on %s signal and %s background events", len(signal), len(background) + ) + return table + + def _cross_validate(self, telescope_type, table): + n_cv = self.n_cross_validation + self.log.info(f"Starting cross-validation with {n_cv} folds.") + + scores = [] + + kfold = StratifiedKFold( + n_splits=n_cv, + shuffle=True, + # sklearn does not support numpy's new random API yet + random_state=self.rng.integers(0, 2**31 - 1), + ) + + for (train_indices, test_indices) in tqdm( + kfold.split(table, table["hadronness"]), total=n_cv + ): + train = table[train_indices] + test = table[test_indices] + self.model.fit(telescope_type, train) + prediction, _ = self.model.predict(telescope_type, test) + scores.append(metrics.roc_auc_score(test["hadronness"], prediction)) + + scores = np.array(scores) + + self.log.info(f"Cross validated ROC AUC scores: {scores}") + self.log.info( + "Mean ROC AUC score from CV: %s ± %s", + scores.mean(), + scores.std(), + ) + + def finish(self): + self.log.info("Writing output") + self.model.write(self.output_path) + self.signal_loader.close() + self.background_loader.close() + + +def main(): + TrainParticleIdClassifier().run() + + +if __name__ == "__main__": + main() diff --git a/ctapipe/resources/ml-config.yaml b/ctapipe/resources/ml-config.yaml index 816ed0b90ed..b2e9f78030e 100644 --- a/ctapipe/resources/ml-config.yaml +++ b/ctapipe/resources/ml-config.yaml @@ -21,5 +21,30 @@ TrainEnergyRegressor: - concentration_cog - concentration_core - morphology_num_pixels - - HillasReconstructor_average_intensity + - mirror_area + + +TrainParticleIdClassifier: + n_cross_validation: 5 + Classifier: + model_cls: ExtraTreesClassifier + model_config: + n_estimators: 10 + max_depth: 10 + n_jobs: -1 + + features: + - hillas_intensity + - hillas_length + - hillas_width + - hillas_r + - hillas_skewness + - timing_slope + - leakage_intensity_width_1 + - leakage_intensity_width_2 + - leakage_pixels_width_1 + - leakage_pixels_width_2 + - concentration_cog + - concentration_core + - morphology_num_pixels - mirror_area diff --git a/ctapipe/tools/process.py b/ctapipe/tools/process.py index abfe3d49051..8cb8aff60d9 100644 --- a/ctapipe/tools/process.py +++ b/ctapipe/tools/process.py @@ -23,7 +23,7 @@ from ..reco import ShowerProcessor from ..utils import EventTypeFilter from ..io import metadata -from ..ml import EnergyRegressor, StereoCombiner +from ..ml import EnergyRegressor, ParticleIdClassifier, StereoCombiner COMPATIBLE_DATALEVELS = [ DataLevel.R1, @@ -73,6 +73,14 @@ class ProcessorTool(Tool): help="Path to a trained energy regression model (see ctapipe-ml-train-energy-regressor)", ).tag(config=True) + particle_classifier_path = Path( + default_value=None, + allow_none=True, + exists=True, + directory_ok=False, + help="Path to a trained particle id classifier model (see ctapipe-ml-train-particle-classifier)", + ).tag(config=True) + force_recompute_dl2 = Bool( help="Enforce dl2 recomputation even if already present in the input file", default_value=False, @@ -86,6 +94,7 @@ class ProcessorTool(Tool): ("t", "allowed-tels"): "EventSource.allowed_tels", ("m", "max-events"): "EventSource.max_events", ("e", "energy-regressor"): "ProcessorTool.energy_regressor_path", + ("particle-classifier"): "ProcessorTool.particle_classifier_path", "image-cleaner-type": "ImageProcessor.image_cleaner_type", } @@ -198,6 +207,13 @@ def setup(self): self.event_source.subarray, parent=self, ) + self.particle_classifier = None + if self.particle_classifier_path is not None: + self.particle_classifier = ParticleIdClassifier.read( + self.particle_classifier_path, + self.event_source.subarray, + parent=self, + ) self.stereo_combiners = [] for stereo_combiner in self.stereo_combiner_configs: @@ -228,6 +244,7 @@ def should_compute_dl2(self): return ( self.write.write_showers or self.energy_regressor_path + or self.particle_classifier_path ) @property @@ -310,6 +327,8 @@ def start(self): self.process_shower(event) if self.energy_regressor is not None: self.energy_regressor(event) + if self.particle_classifier is not None: + self.particle_classifier(event) for combiner in self.stereo_combiners: combiner(event) diff --git a/setup.py b/setup.py index 17ccfa0739d..65d030082b7 100755 --- a/setup.py +++ b/setup.py @@ -21,6 +21,8 @@ "ctapipe-quickstart = ctapipe.tools.quickstart:main", "ctapipe-ml-train-energy-regressor = ctapipe.ml.tools.train_energy_regressor:main", "ctapipe-ml-apply-energy-regressor = ctapipe.ml.tools.apply_energy_regressor:main", + "ctapipe-ml-train-particle-classifier = ctapipe.ml.tools.train_particle_classifier:main", + "ctapipe-ml-apply-particle-classifier = ctapipe.ml.tools.apply_particle_classifier:main", ] tests_require = [ "pytest",