diff --git a/src/pygama/pargen/AoE_cal.py b/src/pygama/pargen/AoE_cal.py index ec5ce2b5b..8b58a468e 100644 --- a/src/pygama/pargen/AoE_cal.py +++ b/src/pygama/pargen/AoE_cal.py @@ -4,7 +4,6 @@ from __future__ import annotations -import copy import logging import re from datetime import datetime @@ -21,17 +20,16 @@ import pygama.math.binned_fitting as pgf import pygama.math.histogram as pgh import pygama.pargen.energy_cal as pgc -from pygama.math.distributions import ( - exgauss, - gauss_on_exgauss, - gauss_on_step, - gaussian, - hpge_peak, - nb_erfc, -) +from pygama.math.distributions import exgauss, gauss_on_exgauss, gaussian, nb_erfc from pygama.math.functions.gauss import nb_gauss_amp from pygama.math.functions.hpge_peak import hpge_get_fwfm, hpge_get_fwhm, hpge_get_mode from pygama.math.functions.sum_dists import SumDists +from pygama.pargen.survival_fractions import ( + compton_sf, + compton_sf_sweep, + get_sf_sweep, + get_survival_fraction, +) from pygama.pargen.utils import convert_to_minuit, return_nans log = logging.getLogger(__name__) @@ -153,7 +151,7 @@ def aoe_peak_bounds(func, guess, **kwargs): "x_hi": (None, None), "n_sig": (0, None), "mu": (guess["x_lo"], guess["x_hi"]), - "sigma": (0, None), + "sigma": (0, (guess["x_hi"] - guess["x_lo"]) / 4), "n_bkg": (0, None), "tau": (0, None), } @@ -163,8 +161,8 @@ def aoe_peak_bounds(func, guess, **kwargs): "x_hi": (None, None), "n_sig": (0, None), "mu": (guess["x_lo"], guess["x_hi"]), - "sigma": (0, None), - "htail": (0, 1), + "sigma": (0, (guess["x_hi"] - guess["x_lo"]) / 4), + "htail": (0, 0.5), "tau_sig": (None, 0), "n_bkg": (0, None), "tau": (0, None), @@ -175,7 +173,7 @@ def aoe_peak_bounds(func, guess, **kwargs): "x_hi": (None, None), "area": (0, None), "mu": (guess["x_lo"], guess["x_hi"]), - "sigma": (0, None), + "sigma": (0, (guess["x_hi"] - guess["x_lo"]) / 4), "tau": (0, None), } elif func == gaussian: @@ -184,7 +182,7 @@ def aoe_peak_bounds(func, guess, **kwargs): "x_hi": (None, None), "area": (0, None), "mu": (guess["x_lo"], guess["x_hi"]), - "sigma": (0, None), + "sigma": (0, (guess["x_hi"] - guess["x_lo"]) / 4), } for item, value in kwargs.items(): @@ -474,8 +472,8 @@ def fit_time_means(tstamps, means, sigmas): """ Fit the time dependence of the means of the A/E distribution - Args: - + Parameters + ---------- tstamps: np.array Timestamps of the data means: np.array @@ -534,478 +532,14 @@ def fit_time_means(tstamps, means, sigmas): return out_dict -def energy_guess(energy, func_i, fit_range=None, bin_width=1, peak=None, eres=None): - """ - Simple guess for peak fitting - """ - if fit_range is None: - fit_range = (np.nanmin(energy), np.nanmax(energy)) - if func_i == hpge_peak or func_i == gauss_on_step: - parguess = pgc.get_hpge_energy_peak_par_guess( - energy, func_i, fit_range=fit_range - ) - - if peak is not None: - parguess["mu"] = peak - - if eres is not None: - parguess["sigma"] = eres / 2.355 - - for i, guess in enumerate(parguess): - if np.isnan(guess): - parguess[i] = 0 - - else: - log.error(f"energy_guess not implemented for {func_i}") - return None - return parguess - - -def fix_all_but_nevents(func): - """ - Returns: Sequence list of fixed indexes for fitting and mask for parameters - """ - - if func == gauss_on_step: - # pars are: n_sig, mu, sigma, n_bkg, hstep, lower, upper, components - fixed = ["x_lo", "x_hi", "mu", "sigma", "hstep"] - - elif func == hpge_peak: - # pars are: , components - fixed = ["x_lo", "x_hi", "mu", "sigma", "htail", "tau", "hstep"] - - else: - log.error(f"get_hpge_E_fixed not implemented for {func}") - return None, None - mask = ~np.in1d(func.required_args(), fixed) - return fixed, mask - - -def get_bounds(func, parguess): - if func == hpge_peak or func == gauss_on_step: - bounds = pgc.get_hpge_energy_bounds(func, parguess) - - bounds["mu"] = (parguess["mu"] - 1, parguess["mu"] + 1) - bounds["n_sig"] = (0, 2 * (parguess["n_sig"] + parguess["n_bkg"])) - bounds["n_bkg"] = (0, 2 * (parguess["n_sig"] + parguess["n_bkg"])) - - else: - log.error(f"get_bounds not implemented for {func}") - return None - return bounds - - -def get_peak_label(peak: float) -> str: - if peak == 2039: - return "CC @" - elif peak == 1592.5: - return "Tl DEP @" - elif peak == 1620.5: - return "Bi FEP @" - elif peak == 2103.53: - return "Tl SEP @" - elif peak == 2614.5: - return "Tl FEP @" - - -def pass_pdf_gos( - x, x_lo, x_hi, n_sig, epsilon_sig, n_bkg, epsilon_bkg, mu, sigma, hstep1, hstep2 -): - return gauss_on_step.pdf_ext( - x, x_lo, x_hi, n_sig * epsilon_sig, mu, sigma, n_bkg * epsilon_bkg, hstep1 - ) - - -def fail_pdf_gos( - x, x_lo, x_hi, n_sig, epsilon_sig, n_bkg, epsilon_bkg, mu, sigma, hstep1, hstep2 -): - return gauss_on_step.pdf_ext( - x, - x_lo, - x_hi, - n_sig * (1 - epsilon_sig), - mu, - sigma, - n_bkg * (1 - epsilon_bkg), - hstep2, - ) - - -def pass_pdf_hpge( - x, - x_lo, - x_hi, - n_sig, - epsilon_sig, - n_bkg, - epsilon_bkg, - mu, - sigma, - htail, - tau, - hstep1, - hstep2, -): - return hpge_peak.pdf_ext( - x, - x_lo, - x_hi, - n_sig * epsilon_sig, - mu, - sigma, - htail, - tau, - n_bkg * epsilon_bkg, - hstep1, - ) - - -def fail_pdf_hpge( - x, - x_lo, - x_hi, - n_sig, - epsilon_sig, - n_bkg, - epsilon_bkg, - mu, - sigma, - htail, - tau, - hstep1, - hstep2, -): - return hpge_peak.pdf_ext( - x, - x_lo, - x_hi, - n_sig * (1 - epsilon_sig), - mu, - sigma, - htail, - tau, - n_bkg * (1 - epsilon_bkg), - hstep2, - ) - - -def update_guess(func, parguess, energies): - if func == gauss_on_step or func == hpge_peak: - - total_events = len(energies) - parguess["n_sig"] = len( - energies[ - (energies > parguess["mu"] - 2 * parguess["sigma"]) - & (energies < parguess["mu"] + 2 * parguess["sigma"]) - ] - ) - parguess["n_sig"] -= len( - energies[ - (energies > parguess["x_lo"]) - & (energies < parguess["x_lo"] + 2 * parguess["sigma"]) - ] - ) - parguess["n_sig"] -= len( - energies[ - (energies > parguess["x_hi"] - 2 * parguess["sigma"]) - & (energies < parguess["x_hi"]) - ] - ) - parguess["n_bkg"] = total_events - parguess["n_sig"] - return parguess - - else: - log.error(f"update_guess not implemented for {func}") - return parguess - - -def get_survival_fraction( - energy, - cut_param, - cut_val, - peak, - eres_pars, - fit_range=None, - high_cut=None, - pars=None, - errs=None, - dt_mask=None, - mode="greater", - func=hpge_peak, - fix_step=False, - display=0, -): - if dt_mask is None: - dt_mask = np.full(len(cut_param), True, dtype=bool) - - if not isinstance(energy, np.ndarray): - energy = np.array(energy) - if not isinstance(cut_param, np.ndarray): - cut_param = np.array(cut_param) - - if fit_range is None: - fit_range = (np.nanmin(energy), np.nanmax(energy)) - - nan_idxs = np.isnan(cut_param) - if high_cut is not None: - idxs = (cut_param > cut_val) & (cut_param < high_cut) & dt_mask - else: - if mode == "greater": - idxs = (cut_param > cut_val) & dt_mask - elif mode == "less": - idxs = (cut_param < cut_val) & dt_mask - else: - raise ValueError("mode not recognised") - - if pars is None or errs is None: - (pars, errs, cov, _, func, _, _, _) = pgc.unbinned_staged_energy_fit( - energy, - func, - guess_func=energy_guess, - bounds_func=get_bounds, - guess_kwargs={"peak": peak, "eres": eres_pars}, - fit_range=fit_range, - ) - - guess_pars_surv = copy.deepcopy(pars) - - # add update guess here for n_sig and n_bkg - guess_pars_surv = update_guess(func, guess_pars_surv, energy[(~nan_idxs) & (idxs)]) - - parguess = { - "x_lo": pars["x_lo"], - "x_hi": pars["x_hi"], - "mu": pars["mu"], - "sigma": pars["sigma"], - "hstep1": pars["hstep"], - "hstep2": pars["hstep"], - "n_sig": pars["n_sig"], - "n_bkg": pars["n_bkg"], - "epsilon_sig": guess_pars_surv["n_sig"] / pars["n_sig"], - "epsilon_bkg": guess_pars_surv["n_bkg"] / pars["n_bkg"], - } - - bounds = { - "n_sig": (0, pars["n_sig"] + pars["n_bkg"]), - "epsilon_sig": (0, 1), - "n_bkg": (0, pars["n_bkg"] + pars["n_sig"]), - "epsilon_bkg": (0, 1), - "hstep1": (-1, 1), - "hstep2": (-1, 1), - } - - if func == hpge_peak: - parguess.update({"htail": pars["htail"], "tau": pars["tau"]}) - - if func == hpge_peak: - lh = cost.ExtendedUnbinnedNLL( - energy[(~nan_idxs) & (idxs)], pass_pdf_hpge - ) + cost.ExtendedUnbinnedNLL(energy[(~nan_idxs) & (~idxs)], fail_pdf_hpge) - elif func == gauss_on_step: - lh = cost.ExtendedUnbinnedNLL( - energy[(~nan_idxs) & (idxs)], pass_pdf_gos - ) + cost.ExtendedUnbinnedNLL(energy[(~nan_idxs) & (~idxs)], fail_pdf_gos) - - else: - raise ValueError("Unknown func") - - m = Minuit(lh, **parguess) - fixed = ["x_lo", "x_hi", "n_sig", "n_bkg", "mu", "sigma"] # "hstep" - if func == hpge_peak: - fixed += ["tau", "htail"] - if fix_step is True: - fixed += ["hstep1", "hstep2"] - - m.fixed[fixed] = True - for arg, val in bounds.items(): - m.limits[arg] = val - - m.simplex().migrad() - m.hesse() - - sf = m.values["epsilon_sig"] * 100 - err = m.errors["epsilon_sig"] * 100 - - if display > 1: - fig, (ax1, ax2) = plt.subplots(1, 2) - bins = np.arange(1552, 1612, 1) - ax1.hist(energy[(~nan_idxs) & (idxs)], bins=bins, histtype="step") - - ax2.hist(energy[(~nan_idxs) & (~idxs)], bins=bins, histtype="step") - - if func == hpge_peak: - ax1.plot(bins, pass_pdf_hpge(bins, **m.values.to_dict())[1]) - ax2.plot(bins, fail_pdf_hpge(bins, **m.values.to_dict())[1]) - elif func == gauss_on_step: - ax1.plot(bins, pass_pdf_gos(bins, **m.values.to_dict())[1]) - ax2.plot(bins, fail_pdf_gos(bins, **m.values.to_dict())[1]) - - plt.show() - - return sf, err, m.values, m.errors - - -def get_sf_sweep( - energy: np.array, - cut_param: np.array, - final_cut_value: float = None, - peak: float = 1592.5, - eres_pars: list = None, - dt_mask=None, - cut_range=(-5, 5), - n_samples=26, - mode="greater", - fit_range=None, - debug_mode=False, -) -> tuple(pd.DataFrame, float, float): - """ - Calculates survival fraction for gamma lines using fitting method as in cut determination - """ - - if dt_mask is None: - dt_mask = np.full(len(cut_param), True, dtype=bool) - - if not isinstance(energy, np.ndarray): - energy = np.array(energy) - if not isinstance(cut_param, np.ndarray): - cut_param = np.array(cut_param) - - cut_vals = np.linspace(cut_range[0], cut_range[1], n_samples) - out_df = pd.DataFrame() - - (pars, errs, _, _, func, _, _, _) = pgc.unbinned_staged_energy_fit( - energy, - hpge_peak, - guess_func=energy_guess, - bounds_func=get_bounds, - guess_kwargs={"peak": peak, "eres": eres_pars}, - fit_range=fit_range, - ) - - for cut_val in cut_vals: - try: - sf, err, _, _ = get_survival_fraction( - energy, - cut_param, - cut_val, - peak, - eres_pars, - fit_range=fit_range, - dt_mask=dt_mask, - mode=mode, - pars=pars, - func=func, - ) - out_df = pd.concat( - [out_df, pd.DataFrame([{"cut_val": cut_val, "sf": sf, "sf_err": err}])] - ) - except BaseException as e: - if e == KeyboardInterrupt: - raise (e) - elif debug_mode: - raise (e) - out_df.set_index("cut_val", inplace=True) - if final_cut_value is not None: - sf, sf_err, _, _ = get_survival_fraction( - energy, - cut_param, - final_cut_value, - peak, - eres_pars, - fit_range=fit_range, - dt_mask=dt_mask, - mode=mode, - pars=pars, - func=func, - ) - else: - sf = None - sf_err = None - return ( - out_df, - sf, - sf_err, - ) - - -def compton_sf(cut_param, low_cut_val, high_cut_val=None, mode="greater", dt_mask=None): - if dt_mask is None: - dt_mask = np.full(len(cut_param), True, dtype=bool) - - if not isinstance(cut_param, np.ndarray): - cut_param = np.array(cut_param) - - if high_cut_val is not None: - mask = (cut_param > low_cut_val) & (cut_param < high_cut_val) & dt_mask - else: - if mode == "greater": - mask = (cut_param > low_cut_val) & dt_mask - elif mode == "less": - mask = (cut_param < low_cut_val) & dt_mask - else: - raise ValueError("mode not recognised") - - ct_n = len(cut_param[~mask]) - ct_err = np.sqrt(len(cut_param[~mask])) - surv_n = len(cut_param[mask]) - surv_err = np.sqrt(len(cut_param[mask])) - - pc_n = ct_n + surv_n - - sf = surv_n / pc_n - err = 100 * sf * (1 - sf) * np.sqrt((ct_err / ct_n) ** 2 + (surv_err / surv_n) ** 2) - sf *= 100 - - return { - "low_cut": low_cut_val, - "sf": sf, - "sf_err": err, - "high_cut": high_cut_val, - } - - -def compton_sf_sweep( - energy: np.array, - cut_param: np.array, - final_cut_value: float, - peak: float, - eres: list[float, float] = None, - dt_mask: np.array = None, - cut_range=(-5, 5), - n_samples=51, - mode="greater", -) -> tuple(float, np.array, list): +class CalAoE: """ - Determines survival fraction for compton continuum by basic counting + Class for calibrating the A/E, + this follow 5 steps: + the time correction, drift time correction, energy correction, + the cut level calculation and the survival fraction calculation. """ - if not isinstance(energy, np.ndarray): - energy = np.array(energy) - if not isinstance(cut_param, np.ndarray): - cut_param = np.array(cut_param) - - cut_vals = np.linspace(cut_range[0], cut_range[1], n_samples) - out_df = pd.DataFrame() - for cut_val in cut_vals: - ct_dict = compton_sf(cut_param, cut_val, mode=mode, dt_mask=dt_mask) - df = pd.DataFrame( - [ - { - "cut_val": ct_dict["low_cut"], - "sf": ct_dict["sf"], - "sf_err": ct_dict["sf_err"], - } - ] - ) - out_df = pd.concat([out_df, df]) - out_df.set_index("cut_val", inplace=True) - - sf_dict = compton_sf(cut_param, final_cut_value, mode=mode, dt_mask=dt_mask) - - return out_df, sf_dict["sf"], sf_dict["sf_err"] - - -class CalAoE: def __init__( self, cal_dicts: dict = None, @@ -1017,12 +551,50 @@ def __init__( dep_correct: bool = False, dt_cut: dict = None, dt_param: str = "dt_eff", - high_cut_val: int = 3, + high_cut_val: float = 3, mean_func: Callable = Pol1, sigma_func: Callable = SigmaFit, - compt_bands_width: int = 20, + compt_bands_width: float = 20, debug_mode: bool = False, ): + """ + Parameters + ---------- + + cal_dicts: dict + Dictionary of calibration parameters can either be empty/None, for a single run or for multiple runs + keyed by timestamp in the format YYYYMMDDTHHMMSSZ + cal_energy_param: str + Calibrated energy parameter to use for A/E calibrations and for determining peak events + eres_func: callable + Function to determine the energy resolution should take in a single variable the calibrated energy + pdf: PDF + PDF to fit to the A/E distribution + selection_string: str + Selection string for the data that will be passed as a query to the data dataframe + dt_corr: bool + Whether to correct the drift time + dep_correct: bool + Whether to correct the double escape peak into the single site band before cut determination + dt_cut: dict + Dictionary of the drift time cut parameters in the form: + {"out_param": "dt_cut", "hard": False} + where the out_param is the name of the parameter to cut on in the dataframe (should have been precalculated) + and "hard" is whether to remove these events completely for survival fraction calculations + or whether they they should only be removed in the cut determination/ A/E calibration steps + but the survival fractions should be calculated with them included + high_cut_val: float + Value to cut the A/E distribution at on the high side + mean_func: Callable + Function to fit the energy dependence of the A/E mean, should be in the form of a class as above for Pol1 + sigma_func: Callable + Function to fit the energy dependence of the A/E sigma, should be in the form of a class as above for Sigma_Fit + compt_bands_width: float + Width of the compton bands to use for the energy correction + debug_mode: bool + If True will raise errors if the A/E calibration fails otherwise just return NaN values + + """ self.cal_dicts = cal_dicts if cal_dicts is not None else {} self.cal_energy_param = cal_energy_param self.eres_func = eres_func @@ -1047,6 +619,11 @@ def __init__( self.debug_mode = debug_mode def update_cal_dicts(self, update_dict): + """ + Util function for updating the calibration dictionaries + checks if the dictionary is in the format of a single run or multiple runs + and then updates the dictionary accordingly + """ if len(self.cal_dicts) > 0 and re.match( r"(\d{8})T(\d{6})Z", list(self.cal_dicts)[0] ): @@ -1059,8 +636,40 @@ def update_cal_dicts(self, update_dict): self.cal_dicts.update(update_dict) def time_correction( - self, df, aoe_param, mode="full", output_name="AoE_Timecorr", display=0 + self, + df: pd.DataFrame, + aoe_param: str, + mode: str = "full", + output_name: str = "AoE_Timecorr", + display: int = 0, ): + """ + Calculates the time correction for the A/E parameter by fitting the 1-1.3 MeV + Compton continuum and extracting the centroid. If only a single run is passed will + just perform a shift of the A/E parameter to 1 using the centroid otherwise for multiple + runs the shift will be determined on the mode given in. + + Parameters + ---------- + + df: pd.DataFrame + Dataframe containing the data + aoe_param: str + Name of the A/E parameter to use + mode: str + Mode to use for the time correction, can be "full", "partial" or "none": + + none: just use the mean of the a/e centroids to shift all the data + partial: iterate through the centroids if vary by less than 0.4 sigma + then group and take mean otherwise when a run higher than 0.4 sigma + is found if it is a single run set to nan otherwise start a new block + full : each run will be corrected individually + output_name: str + Name of the output parameter for the time corrected A/E in the dataframe and to + be added to the calibration dictionary + display: int + plot level + """ log.info("Starting A/E time correction") self.timecorr_df = pd.DataFrame() try: @@ -1177,7 +786,7 @@ def time_correction( log.info("A/E time correction finished") else: try: - pars, errs, cov = unbinned_aoe_fit( + pars, errs, _ = unbinned_aoe_fit( df.query( f"{self.fit_selection} & {self.cal_energy_param}>1000 & {self.cal_energy_param}<1300" )[aoe_param], @@ -1264,7 +873,23 @@ def drift_time_correction( display: int = 0, ): """ - Calculates the correction needed to align the two drift time regions for ICPC detectors + Calculates the correction needed to align the two drift time regions for ICPC detectors. + This is done by fitting the two drift time peaks in the drift time spectrum and then + fitting the A/E peaks in each of these regions. A simple linear correction is then applied + to align these regions. + + Parameters + ---------- + + data: pd.DataFrame + Dataframe containing the data + aoe_param: str + Name of the A/E parameter to use as starting point + output_name: str + Name of the output parameter for the drift time corrected A/E in the dataframe and to + be added to the calibration dictionary + display: int + plot level """ log.info("Starting A/E drift time correction") self.dt_res_dict = {} @@ -1388,13 +1013,31 @@ def energy_correction( self, data: pd.DataFrame, aoe_param: str, - corrected_param="AoE_Corrected", - classifier_param="AoE_Classifier", + corrected_param: str = "AoE_Corrected", + classifier_param: str = "AoE_Classifier", display: int = 0, ): """ Calculates the corrections needed for the energy dependence of the A/E. - Does this by fitting the compton continuum in slices and then applies fits to the centroid and variance. + Does this by fitting the compton continuum in slices and then applies fits + to the centroid and variance. + + Parameters + ---------- + + data: pd.DataFrame + Dataframe containing the data + aoe_param: str + Name of the A/E parameter to use as starting point + corrected_param: str + Name of the output parameter for the energy mean corrected A/E to + be added in the dataframe and to the calibration dictionary + classifier_param: str + Name of the output parameter for the full mean and sigma energy corrected A/E classifier + to be added in the dataframe and to the calibration dictionary + display: int + plot level + """ log.info("Starting A/E energy correction") @@ -1644,8 +1287,12 @@ def energy_correction( "expression": f"{aoe_param}/({self.mean_func.string_func(self.cal_energy_param)})", "parameters": mu_pars.to_dict(), }, + f"_{classifier_param}_intermediate": { + "expression": f"({aoe_param})-({self.mean_func.string_func(self.cal_energy_param)})", + "parameters": mu_pars.to_dict(), + }, classifier_param: { - "expression": f"({corrected_param}-1)/({self.sigma_func.string_func(self.cal_energy_param)})", + "expression": f"(_{classifier_param}_intermediate)/({self.sigma_func.string_func(self.cal_energy_param)})", "parameters": sig_pars.to_dict(), }, } @@ -1657,13 +1304,34 @@ def get_aoe_cut_fit( aoe_param: str, peak: float, ranges: tuple, - dep_acc: float, + dep_acc: float = 0.9, output_cut_param: str = "AoE_Low_Cut", display: int = 1, ): """ - Determines A/E cut by sweeping through values and for each one fitting the DEP to determine how many events survive. - Then interpolates to get cut value at desired DEP survival fraction (typically 90%) + Determines A/E cut by sweeping through values and for each one fitting the + DEP to determine how many events survive. + Fits the resulting distribution and + interpolates to get cut value at desired DEP survival fraction (typically 90%) + + Parameters + ---------- + + data: pd.DataFrame + Dataframe containing the data + aoe_param: str + Name of the A/E parameter to use as starting point + peak : float + Energy of the peak to use for the cut determination e.g. 1592.5 + ranges: tuple + Tuple of the range in keV below and above the peak to use for the cut determination e.g. (20,40) + dep_acc: float + Desired DEP survival fraction for final cut + output_cut_param: str + Name of the output parameter for the events passing A/E in the dataframe and to + be added to the calibration dictionary + display: int + plot level """ log.info("Starting A/E low cut determination") @@ -1688,7 +1356,7 @@ def get_aoe_cut_fit( peak, self.eres_func(peak), fit_range=erange, - dt_mask=None, + data_mask=None, cut_range=(-8, 0), n_samples=40, mode="greater", @@ -1750,14 +1418,37 @@ def get_aoe_cut_fit( def calculate_survival_fractions_sweep( self, - data, - aoe_param, - peaks, - fit_widths, + data: pd.DataFrame, + aoe_param: str, + peaks: list, + fit_widths: list[tuple], n_samples=26, cut_range=(-5, 5), mode="greater", ): + """ + Calculate survival fractions for the A/E cut for a list of peaks by sweeping through values + of the A/E cut to show how this varies + + Parameters + ---------- + + data: pd.DataFrame + Dataframe containing the data + aoe_param: str + Name of the parameter in the dataframe for the final A/E classifier + peaks: list + List of peaks to calculate the survival fractions for + fit_widths: list + List of tuples of the energy range to fit the peak in + n_samples: int + Number of samples to take in the sweep + cut_range: tuple + Range of the cut to sweep through + mode: str + mode to use for the cut determination, can be "greater" or "less" i.e. do we want to + keep events with A/E greater or less than the cut value + """ sfs = pd.DataFrame() peak_dfs = {} @@ -1778,12 +1469,10 @@ def calculate_survival_fractions_sweep( peak_df[self.cal_energy_param].to_numpy(), peak_df[aoe_param].to_numpy(), self.low_cut_val, - peak, - fwhm, cut_range=cut_range, n_samples=n_samples, mode=mode, - dt_mask=( + data_mask=( peak_df[self.dt_cut_param].to_numpy() if self.dt_cut_param is not None else None @@ -1812,7 +1501,7 @@ def calculate_survival_fractions_sweep( cut_range=cut_range, n_samples=n_samples, mode=mode, - dt_mask=( + data_mask=( peak_df[self.dt_cut_param].to_numpy() if self.dt_cut_param is not None else None @@ -1850,8 +1539,32 @@ def calculate_survival_fractions_sweep( return sfs, peak_dfs def calculate_survival_fractions( - self, data, aoe_param, peaks, fit_widths, mode="greater" + self, + data: pd.DataFrame, + aoe_param: str, + peaks: list, + fit_widths: list[tuple], + mode: str = "greater", ): + """ + Calculate survival fractions for the A/E cut for a list of peaks for the final + A/E cut value + + Parameters + ---------- + + data: pd.DataFrame + Dataframe containing the data + aoe_param: str + Name of the parameter in the dataframe for the final A/E classifier + peaks: list + List of peaks to calculate the survival fractions for + fit_widths: list + List of tuples of the energy range to fit the peak in + mode: str + mode to use for the cut determination, can be "greater" or "less" i.e. do we want to + keep events with A/E greater or less than the cut value + """ sfs = pd.DataFrame() for i, peak in enumerate(peaks): fwhm = self.eres_func(peak) @@ -1868,7 +1581,7 @@ def calculate_survival_fractions( self.low_cut_val, self.high_cut_val, mode=mode, - dt_mask=( + data_mask=( peak_df[self.dt_cut_param].to_numpy() if self.dt_cut_param is not None else None @@ -1897,7 +1610,7 @@ def calculate_survival_fractions( fit_range=fit_range, mode=mode, high_cut=self.high_cut_val, - dt_mask=( + data_mask=( peak_df[self.dt_cut_param].to_numpy() if self.dt_cut_param is not None else None @@ -1928,16 +1641,43 @@ def calculate_survival_fractions( def calibrate( self, - df, - initial_aoe_param, - peaks_of_interest=None, - fit_widths=None, - cut_peak_idx=0, - dep_acc=0.9, - sf_nsamples=11, - sf_cut_range=(-5, 5), - timecorr_mode="full", + df: pd.DataFrame, + initial_aoe_param: str, + peaks_of_interest: list = None, + fit_widths: list[tuple] = None, + cut_peak_idx: int = 0, + dep_acc: float = 0.9, + sf_nsamples: int = 11, + sf_cut_range: tuple = (-5, 5), + timecorr_mode: str = "full", ): + """ + Main function to run a full A/E calibration with all steps i.e. time correction, drift time correction, + energy correction, A/E cut determination and survival fraction calculation + + Parameters + ---------- + + df: pd.DataFrame + Dataframe containing the data + initial_aoe_param: str + Name of the A/E parameter in dataframe to use as starting point + peaks_of_interest: list + List of peaks to calculate the survival fractions for + fit_widths: list + List of tuples of the energy range to fit the peak in for survival fraction determination + cut_peak_idx: int + Index of the peak in peaks of interest to use for the cut determination + dep_acc: float + Desired survival fraction in the peak for final cut value + sf_nsamples: int + Number of samples to take in the survival fraction sweep + sf_cut_range: tuple + Range to use for the survival fraction sweep + timecorr_mode: str + Mode to use for the time correction, see time_correction function for details + + """ if peaks_of_interest is None: peaks_of_interest = [1592.5, 1620.5, 2039, 2103.53, 2614.50] if fit_widths is None: @@ -2035,6 +1775,22 @@ def calibrate( self.two_side_sfs_by_run = None +def get_peak_label(peak: float) -> str: + if peak == 2039: + return "CC @" + elif peak == 1592.5: + return "Tl DEP @" + elif peak == 1620.5: + return "Bi FEP @" + elif peak == 2103.53: + return "Tl SEP @" + elif peak == 2614.5: + return "Tl FEP @" + + +# below are a few plotting functions that can be used to plot the results of the A/E calibration + + def plot_aoe_mean_time( aoe_class, data, time_param="AoE_Timecorr", figsize=(12, 8), fontsize=12 ):