diff --git a/src/pygama/math/binned_fitting.py b/src/pygama/math/binned_fitting.py index aef242e74..81f6455d3 100644 --- a/src/pygama/math/binned_fitting.py +++ b/src/pygama/math/binned_fitting.py @@ -90,7 +90,9 @@ def fit_binned(func: Callable, hist: np.ndarray, bins: np.ndarray, var: np.ndarr m = Minuit(cost_func, *guess) if bounds is not None: - m.limits = bounds + if isinstance(bounds, dict): + for key, val in bounds.items(): + m.limits[key] = val if fixed is not None: for fix in fixed: m.fixed[fix] = True diff --git a/src/pygama/pargen/energy_cal.py b/src/pygama/pargen/energy_cal.py index 7983230a7..830b9cef1 100644 --- a/src/pygama/pargen/energy_cal.py +++ b/src/pygama/pargen/energy_cal.py @@ -409,11 +409,310 @@ def hpge_get_energy_peaks( def hpge_cal_energy_peak_tops( self, e_uncal, + n_sigmas=1.2, + peaks_kev=None, + default_n_bins=50, + n_events=None, + allowed_p_val=0.05, + update_cal_pars=True, ): """ - Calibrates using energy peak top fits + Perform energy calibration for HPGe detector using peak fitting. + + Args: + e_uncal (array-like): + Uncalibrated energy values. + n_sigmas (float, optional): + Number of standard deviations to use for peak fitting range. Defaults to 1.2. + peaks_kev (array-like, optional): + Known peak positions in keV. If not provided, uses self.peaks_kev. Defaults to None. + default_n_bins (int, optional): + Number of bins for histogram. Defaults to 50. + n_events (int, optional): + Number of events to consider for calibration. Defaults to None which uses all events. + allowed_p_val (float, optional): + Maximum p-value for a fit to be considered valid. Defaults to 0.05. + update_cal_pars (bool, optional): + Whether to update the calibration parameters. Defaults to True. """ + results_dict = {} + + # check no peaks in self.peaks_kev missing from peak_pars + if peaks_kev is None: + peaks_kev = self.peaks_kev + + peak_pars = [(peak, None, pgf.gauss_on_uniform) for peak in peaks_kev] + + # convert peak pars to array of tuples + tmp = np.empty(len(peak_pars), dtype=object) + tmp[:] = peak_pars + peak_pars = tmp + + peak_pars_lines = [i[0] for i in peak_pars] + peaks_mask = np.array( + [True if peak in peaks_kev else False for peak in peak_pars_lines], + dtype=bool, + ) + peak_pars = peak_pars[peaks_mask] + + fit_peaks_mask = np.array( + [True for i in peak_pars if i[2] is not None], + dtype=bool, + ) + peak_pars = peak_pars[fit_peaks_mask] + + # First calculate range around peaks to fit + + euc_min, euc_max = ( + (Polynomial(self.pars) - i).roots() + for i in (peaks_kev[0] * 0.9, peaks_kev[-1] * 1.1) + ) + euc_min = euc_min[np.logical_and(euc_min >= 0, euc_min <= max(euc_max))][0] + euc_max = euc_max[ + np.logical_and(euc_max >= euc_min, euc_max <= np.nanmax(e_uncal) * 1.1) + ][0] + + d_euc = 0.5 / self.pars[1] + if self.uncal_is_int: + euc_min, euc_max, d_euc = pgh.better_int_binning( + x_lo=euc_min, x_hi=euc_max, dx=d_euc + ) + + hist, bins, var = pgh.get_hist(e_uncal, range=(euc_min, euc_max), dx=d_euc) + + uncal_peak_pars = [] + for pars in peak_pars: + peak, fit_range, func = pars + + if peak in self.peaks_kev: + loc = self.peak_locs[np.where(peak == self.peaks_kev)][0] + else: + loc = (Polynomial(self.pars) - peak).roots()[0] + + # Need to do initial fit + pt_pars, _ = hpge_fit_energy_peak_tops(hist, bins, var, [loc], n_to_fit=7) + # Drop failed fits + if pt_pars[0] is not None: + range_uncal = (float(pt_pars[0][1]) * 20, float(pt_pars[0][1]) * 20) + n_bins = default_n_bins + else: + range_uncal = None + if range_uncal is not None: + uncal_peak_pars.append((peak, loc, range_uncal, n_bins, func)) + + fit_dict = {} + + for i_peak, uncal_peak_par in enumerate(uncal_peak_pars): + peak_kev, mode_guess, wwidth_i, n_bins_i, func_i = uncal_peak_par + wleft_i, wright_i = wwidth_i + euc_min = mode_guess - wleft_i + euc_max = mode_guess + wright_i + + if self.uncal_is_int is True: + euc_min, euc_max, n_bins_i = pgh.better_int_binning( + x_lo=euc_min, x_hi=euc_max, n_bins=n_bins_i + ) + + energies = e_uncal[(e_uncal > euc_min) & (e_uncal < euc_max)][:n_events] + binw_1 = (euc_max - euc_min) / n_bins_i + + x0 = get_hpge_energy_peak_par_guess( + energies, + func_i, + (euc_min, euc_max), + bin_width=binw_1, + mode_guess=mode_guess, + ) + + euc_min = x0["mu"] - n_sigmas * x0["sigma"] + euc_max = x0["mu"] + n_sigmas * x0["sigma"] + + energies = e_uncal[(e_uncal > euc_min) & (e_uncal < euc_max)][:n_events] + bin_width = (x0["sigma"]) * len(energies) ** (-1 / 3) + n_bins_i = int((euc_max - euc_min) / bin_width) + + if self.uncal_is_int is True: + euc_min, euc_max, n_bins_i = pgh.better_int_binning( + x_lo=euc_min, x_hi=euc_max, n_bins=n_bins_i + ) + + hist, bins, var = pgh.get_hist( + energies, bins=n_bins_i, range=(euc_min, euc_max) + ) + + x0["x_lo"] = euc_min + x0["x_hi"] = euc_max + + fixed, mask = get_hpge_energy_fixed(func_i) + fixed.append("n_bkg") + mask[np.where(x0 == "n_bkg")[0]] = True + bounds = get_hpge_energy_bounds(func_i, x0) + + pars_i, errs_i, cov_i = pgb.fit_binned( + func_i.cdf_ext, + hist, + bins, + var=var, + guess=x0, + cost_func="LL", + Extended=True, + fixed=fixed, + bounds=bounds, + ) + valid_fit = True + + csqr = pgb.goodness_of_fit( + hist, + bins, + None, + func_i.get_pdf, + pars_i, + method="Pearson", + scale_bins=True, + ) + csqr = (csqr[0], csqr[1] + len(np.where(mask)[0])) + + if np.isnan(pars_i).any(): + log.debug( + f"hpge_cal_energy_peak_tops: fit failed for i_peak={i_peak} at loc {mode_guess:g}, par is nan : {pars_i}" + ) + raise RuntimeError + + p_val = scipy.stats.chi2.sf(csqr[0], csqr[1]) + + if ( + cov_i is None + or cov_i.ndim == 0 + or sum(sum(c) for c in cov_i[mask, :][:, mask]) == np.inf + or sum(sum(c) for c in cov_i[mask, :][:, mask]) == 0 + or np.isnan(sum(sum(c) for c in cov_i[mask, :][:, mask])) + ): + log.debug( + f"hpge_cal_energy_peak_tops: cov estimation failed for i_peak={i_peak} at loc {mode_guess:g}" + ) + valid_pk = False + + elif valid_fit is False: + log.debug( + f"hpge_cal_energy_peak_tops: peak fitting failed for i_peak={i_peak} at loc {mode_guess:g}" + ) + valid_pk = False + + elif ( + errs_i is None + or pars_i is None + or np.abs(np.array(errs_i)[mask] / np.array(pars_i)[mask]) < 1e-7 + ).any() or np.isnan(np.array(errs_i)[mask]).any(): + log.debug( + f"hpge_cal_energy_peak_tops: failed for i_peak={i_peak} at loc {mode_guess:g}, parameter error too low" + ) + valid_pk = False + + elif p_val < allowed_p_val or np.isnan(p_val): + log.debug( + f"hpge_cal_energy_peak_tops: fit failed for i_peak={i_peak}, p-value too low: {p_val}" + ) + valid_pk = False + else: + valid_pk = True + + fit_dict[peak_kev] = { + "function": func_i, + "validity": valid_pk, + "parameters": pars_i, + "uncertainties": errs_i, + "covariance": cov_i, + "nbins": binw_1, + "range": [euc_min, euc_max], + "p_value": p_val, + } + + results_dict["peak_parameters"] = fit_dict + + fitted_peaks_kev = np.array( + [peak for peak in fit_dict if fit_dict[peak]["validity"]] + ) + + log.info(f"{len(fitted_peaks_kev)} peaks fitted:") + for peak, peak_dict in fit_dict.items(): + if peak_dict["validity"] is True: + varnames = peak_dict["function"].required_args() + pars = np.asarray(peak_dict["parameters"], dtype=float) + errors = np.asarray(peak_dict["uncertainties"], dtype=float) + log.info(f"\tEnergy: {str(peak)}") + log.info("\t\tParameter | Value +/- Sigma ") + for vari, pari, errorsi in zip(varnames, pars, errors): + log.info( + f'\t\t{str(vari).ljust(10)} | {("%4.2f" % pari).rjust(8)} +/- {("%4.2f" % errorsi).ljust(8)}' + ) + + # Drop failed fits + pk_funcs = [ + fit_dict[peak]["function"] + for peak in fit_dict + if fit_dict[peak]["validity"] + ] + pk_pars = [ + fit_dict[peak]["parameters"] + for peak in fit_dict + if fit_dict[peak]["validity"] + ] + pk_errors = [ + fit_dict[peak]["uncertainties"] + for peak in fit_dict + if fit_dict[peak]["validity"] + ] + + if len(pk_pars) == 0: + log.error("hpge_fit_energy_peaks: no peaks fitted") + self.update_results_dict(results_dict) + return + + # Do a second calibration to the results of the full peak fits + # First, calculate the peak positions + mus = [ + func_i.get_mu(pars_i, errors=errors_i) + for func_i, pars_i, errors_i in zip(pk_funcs, pk_pars, pk_errors) + ] + mus, mu_vars = zip(*mus) + + mus = results_dict["pk_pos"] = np.asarray(mus) + mu_vars = results_dict["pk_pos_uncertainties"] = np.asarray(mu_vars) ** 2 + + if update_cal_pars is False: + self.update_results_dict(results_dict) + return + + self.peaks_kev = np.asarray(fitted_peaks_kev) + self.peak_locs = np.asarray(mus) + + # Now fit the E scale + try: + pars, errs, cov = hpge_fit_energy_scale( + mus, mu_vars, fitted_peaks_kev, deg=self.deg, fixed=self.fixed + ) + + results_dict["pk_cal_pars"] = pars + results_dict["pk_cal_errs"] = errs + results_dict["pk_cal_cov"] = cov + + # Invert the E scale fit to get a calibration function + pars, errs, cov = hpge_fit_energy_cal_func( + mus, + mu_vars, + fitted_peaks_kev, + pars, + deg=self.deg, + fixed=self.fixed, + ) + self.pars = np.array(pars) + + except ValueError: + log.error("Failed to fit enough peaks to get accurate calibration") + + self.update_results_dict(results_dict) + def hpge_fit_energy_peaks( self, e_uncal, @@ -523,7 +822,7 @@ def hpge_fit_energy_peaks( euc_max >= euc_min, euc_max <= np.nanmax(e_uncal) * 1.1 ) ][0] - d_euc = 1 / self.pars[1] + d_euc = 0.5 / self.pars[1] if self.uncal_is_int: euc_min, euc_max, d_euc = pgh.better_int_binning( x_lo=euc_min, x_hi=euc_max, dx=d_euc @@ -532,10 +831,12 @@ def hpge_fit_energy_peaks( e_uncal, range=(euc_min, euc_max), dx=d_euc ) # Need to do initial fit - pt_pars, _ = hpge_fit_energy_peak_tops(hist, bins, var, loc, n_to_fit=7) + pt_pars, _ = hpge_fit_energy_peak_tops( + hist, bins, var, [loc], n_to_fit=7 + ) # Drop failed fits if pt_pars[0] is not None: - range_uncal = float(pt_pars[0][1]) * 20 + range_uncal = (float(pt_pars[0][1]) * 20, float(pt_pars[0][1]) * 20) n_bins = default_n_bins else: range_uncal = None @@ -624,7 +925,7 @@ def hpge_fit_energy_peaks( func_i.get_pdf, pars_i, method="Pearson", - scale_bins=False, + scale_bins=True, ) csqr = (csqr[0], csqr[1] + len(np.where(mask)[0])) @@ -1544,7 +1845,11 @@ def get_hpge_energy_peak_par_guess( hist, bins, var = pgh.get_hist(energy, dx=bin_width, range=fit_range) - if func == pgf.gauss_on_step or func == pgf.hpge_peak: + if ( + func == pgf.gauss_on_step + or func == pgf.hpge_peak + or func == pgf.gauss_on_uniform + ): # get mu and height from a gauss fit, also sigma as fallback pars, cov = pgb.gauss_mode_width_max( hist, bins, var, mode_guess=mode_guess, n_bins=5 @@ -1608,18 +1913,19 @@ def get_hpge_energy_peak_par_guess( ) n_bkg = np.sum(hist) - n_sig - hstep = step / (bg + np.mean(hist[:10])) - parguess = { "n_sig": n_sig, "mu": mu, "sigma": sigma, "n_bkg": n_bkg, - "hstep": hstep, "x_lo": bins[0], "x_hi": bins[-1], } + if func == pgf.gauss_on_step or func == pgf.hpge_peak: + hstep = step / (bg + np.mean(hist[:10])) + parguess["hstep"] = hstep + if func == pgf.hpge_peak: sigma = sigma * 0.8 # roughly remove some amount due to tail # for now hard-coded @@ -1657,14 +1963,14 @@ def get_hpge_energy_fixed(func): A boolean mask indicating which parameters are fixed (False) and which are not fixed (True). """ - if func == pgf.gauss_on_step: + if ( + func == pgf.gauss_on_step + or func == pgf.hpge_peak + or func == pgf.gauss_on_uniform + ): # pars are: n_sig, mu, sigma, n_bkg, hstep, components fixed = ["x_lo", "x_hi"] - elif func == pgf.hpge_peak: - # pars are: n_sig, mu, sigma, htail,tau, n_bkg, hstep, components - fixed = ["x_lo", "x_hi"] - else: log.error(f"get_hpge_energy_fixed not implemented for {func.__name__}") return None @@ -1696,6 +2002,17 @@ def get_hpge_energy_bounds(func, parguess): "x_lo": (None, None), "x_hi": (None, None), } + + if func == pgf.gauss_on_uniform: + return { + "n_sig": (0, None), + "mu": (parguess["x_lo"], parguess["x_hi"]), + "sigma": (0, None), + "n_bkg": (0, None), + "x_lo": (None, None), + "x_hi": (None, None), + } + else: log.error(f"get_hpge_energy_bounds not implemented for {func.__name__}") return [] diff --git a/tests/pargen/test_ecal.py b/tests/pargen/test_ecal.py index 1cef50598..564da9877 100644 --- a/tests/pargen/test_ecal.py +++ b/tests/pargen/test_ecal.py @@ -81,6 +81,12 @@ def test_hpge_cal(lgnd_test_data): assert approx(cal.pars[1], 0.1) == 0.15 assert cal.pars[0] == 0.0 + cal.hpge_cal_energy_peak_tops(energy) + + assert (cal.peaks_kev == glines).all() + assert approx(cal.pars[1], 0.1) == 0.15 + assert cal.pars[0] == 0.0 + cal.hpge_fit_energy_peaks(energy, peak_pars=pk_pars) assert (cal.peaks_kev == glines).all()