From e6a4dcfd9d01dcc3d828a2d3407aea49cebf77ed Mon Sep 17 00:00:00 2001 From: malina Date: Wed, 16 Aug 2023 13:58:12 +0200 Subject: [PATCH] BBA: error estimates and propagation leads also to reduction of needed parameters --- pySC/correction/bba.py | 185 +++++++++++++++-------------------------- 1 file changed, 68 insertions(+), 117 deletions(-) diff --git a/pySC/correction/bba.py b/pySC/correction/bba.py index d1fbe4c..9f15e6a 100644 --- a/pySC/correction/bba.py +++ b/pySC/correction/bba.py @@ -9,38 +9,18 @@ from pySC.utils import logging_tools from pySC.core.classes import DotDict from pySC.core.constants import TRACKING_MODES, TRACK_TBT, NUM_TO_AB +from pySC.utils.stats import weighted_mean, weighted_error, effective_sample_size, weights_from_errors LOGGER = logging_tools.get_logger(__name__) -""" -Pieces of found function calls from ALS-U SR, i.e. nothing in PETRA IV - -qOrd = SCgetOrds(SC.RING,'QF') -SC,errorFlags = SCBBA(SC, repmat(SC.ORD.BPM,2,1),QuadOrds, mode='TBT', fakeMeasForFailures=True, - quadOrdPhaseAdvance=qOrd[0], quadStrengthPhaseAdvance=[0.95 0.8 1.05], magSPvec=magSPvec, plotResults=SC.plot) - -SC = SCBBA(SC, BPMordsQuadBBA, QuadOrds, mode='ORB',fakeMeasForFailures=True, outlierRejectionAt=200E-6, RMstruct=RMstruct, - plotResults=SC.plot, magSPvec=magSPvec, minSlopeForFit=0.005, minBPMrangeAtBBABBPM=1*20E-6, - BBABPMtarget=5*50E-6, minBPMrangeOtherBPM=0) - -# Orbit correction (without weights) -SC = performOrbitCorr_ALSU_SR(SC,RMstruct,'weight=[]) - -# BBA on sextupoles (quad trim coils) -SC = SCBBA(SC, BPMordsSextBBA, sextOrds, mode='ORB',fakeMeasForFailures=True, outlierRejectionAt=200E-6, - RMstruct=RMstruct, plotResults=SC.plot, switch_off_sextupoles=True, setpoint_method='abs', magSPvec=magSPvecSext, - minSlopeForFit=0.005, minBPMrangeAtBBABBPM=1*10E-6, BBABPMtarget=5*50E-6, minBPMrangeOtherBPM=0) -""" - def bba(SC, bpm_ords, mag_ords, **kwargs): - par = DotDict(dict(mode=SC.INJ.trackMode, outlierRejectionAt=np.inf, nSteps=10, fit_order=1, magnet_order=1, + par = DotDict(dict(mode=SC.INJ.trackMode, nSteps=10, fit_order=1, magnet_order=1, skewness=False, magSPvec=np.array([0.95, 1.05]), setpoint_method='rel', RMstruct=[], orbBumpWindow=5, BBABPMtarget=1E-3, - minBPMrangeAtBBABBPM=500E-6, minBPMrangeOtherBPM=100E-6, maxStdForFittedCenters=600E-6, - nXPointsNeededAtMeasBPM=3, maxNumOfDownstreamBPMs=len(SC.ORD.BPM), minSlopeForFit=0.03, + maxNumOfDownstreamBPMs=len(SC.ORD.BPM), maxTrajChangeAtInjection=np.array([0.9E-3, 0.9E-3]), quadOrdPhaseAdvance=np.array([8]), quadStrengthPhaseAdvance=np.array([0.95, 1.05]), fakeMeasForFailures=False, dipole_compensation=True, - skewness=False, switch_off_sextupoles=False, useBPMreadingsForOrbBumpRef=False, plotResults=False)) + useBPMreadingsForOrbBumpRef=False, plotResults=False)) par.update(**kwargs) if bpm_ords.shape != mag_ords.shape: # both in shape 2 x N raise ValueError('Input arrays for BPMs and magnets must be same size.') @@ -63,7 +43,7 @@ def bba(SC, bpm_ords, mag_ords, **kwargs): quad_k.append(quads_strengths) scalings.append(all_scalings) bpm_ranges.append(all_bpm_ranges) - kick_vec0 = par.maxTrajChangeAtInjection.reshape(2, 1) * np.linspace(-1, 1, par.nSteps) + q0 = SC.RING[par.quadOrdPhaseAdvance].SetPointB[1] for j_bpm in range(bpm_ords.shape[1]): # j_bpm: Index of BPM adjacent to magnet for BBA LOGGER.info(f"BPM number {j_bpm}") @@ -71,41 +51,29 @@ def bba(SC, bpm_ords, mag_ords, **kwargs): LOGGER.debug(f'BBA-BPM {j_bpm}/{bpm_ords.shape[1]}, n_dim = {n_dim}') bpm_ind = np.where(bpm_ords[n_dim, j_bpm] == SC.ORD.BPM)[0][0] m_ord = mag_ords[n_dim, j_bpm] - if par.switch_off_sextupoles: - SC0 = _very_deep_copy(SC) - SC = set_magnet_setpoints(SC, m_ord, setpoints=np.zeros(1), skewness=False, order=2, method='abs') - SC = SCfeedbackRun(SC, par.RMstruct.MinvCO, BPMords=par.RMstruct.BPMords, CMords=par.RMstruct.CMords, - target=0, maxsteps=50, scaleDisp=par.RMstruct.scaleDisp, eps=1E-6) if par.mode == 'ORB': - # TODO very deep copy + SC0 = _very_deep_copy(SC) bpm_pos, tmpTra = _data_measurement_orb(SC, m_ord, bpm_ind, j_bpm, n_dim, par, *_get_orbit_bump(SC, m_ord, bpm_ords[n_dim, j_bpm], n_dim, par)) else: set_ind = np.argmax(bpm_ranges[n_dim][:, bpm_ind]) - if bpm_ranges[n_dim][set_ind, bpm_ind] < par.BBABPMtarget: - LOGGER.warning(f"Small range at BPM[{n_dim},{j_bpm}]") - SC = set_magnet_setpoints(SC, par.quadOrdPhaseAdvance, quad_k[n_dim][set_ind], False, 1, method='rel', dipole_compensation=True) kick_vec = scalings[n_dim][set_ind] * par.maxTrajChangeAtInjection.reshape(2, 1) * np.linspace(-1, 1, par.nSteps) - bpm_pos, tmpTra = _data_measurement_tbt(SC, m_ord, bpm_ind, j_bpm, n_dim, par, kick_vec) + bpm_pos, tmpTra, mag_vec = _data_measurement_tbt(SC, m_ord, bpm_ind, j_bpm, n_dim, par, kick_vec) SC = set_magnet_setpoints(SC, par.quadOrdPhaseAdvance, q0, False, 1, method='abs', dipole_compensation=True) - - offset_changes[n_dim, j_bpm], error_flags[n_dim, j_bpm] = _data_evaluation(SC, bpm_pos, tmpTra, n_dim, m_ord, par) - if par.switch_off_sextupoles: - SC = SC0 - # if not np.isnan(offset_change): - # SC.RING[bpm_ords[n_dim, j_bpm]].Offset[n_dim] += offset_change - # if par.plotResults: - # plot_bba_results(SC, init_offset_errors, error_flags, j_bpm, bpm_ords, mag_ords) - + offset_changes[n_dim, j_bpm], error_flags[n_dim, j_bpm] = _data_evaluation(SC, bpm_pos, tmpTra, mag_vec, n_dim, m_ord, par) + errors = np.zeros(bpm_ords.shape, dtype=int) for j_bpm in range(bpm_ords.shape[1]): # j_bpm: Index of BPM adjacent to magnet for BBA for n_dim in range(bpm_ords.shape[0]): - if not np.isnan(offset_changes[n_dim, j_bpm]): + if not np.isnan(offset_changes[n_dim, j_bpm]) and np.abs(offset_changes[n_dim, j_bpm]) > 2.5 * error_flags[n_dim, j_bpm]: SC.RING[bpm_ords[n_dim, j_bpm]].Offset[n_dim] += offset_changes[n_dim, j_bpm] + else: + errors[n_dim, j_bpm] = 1 + LOGGER.warning(f"Poor resolution for BPM {j_bpm} in plane {n_dim}: {offset_changes[n_dim, j_bpm]}+-{error_flags[n_dim, j_bpm]}") if par.plotResults: plot_bba_results(SC, init_offset_errors, error_flags, bpm_ords, mag_ords) if par.fakeMeasForFailures: - SC = _fake_measurement(SC, bpm_ords, mag_ords, error_flags) + SC = _fake_measurement(SC, bpm_ords, mag_ords, errors) return SC, error_flags, offset_changes @@ -121,7 +89,7 @@ def _very_deep_copy(SC): def _get_bpm_offset_from_mag(ring, bpm_ords, mag_ords): offset = np.full(bpm_ords.shape, np.nan) - for n_dim in range(2): + for n_dim in range(bpm_ords.shape[0]): offset[n_dim, :] = (atgetfieldvalues(ring, bpm_ords[n_dim, :], 'Offset', n_dim) + atgetfieldvalues(ring, bpm_ords[n_dim, :], 'SupportOffset', n_dim) - atgetfieldvalues(ring, mag_ords[n_dim, :], 'MagnetOffset', n_dim) @@ -168,7 +136,7 @@ def _data_measurement_tbt(SC, m_ord, bpm_ind, j_bpm, n_dim, par, kick_vec): SC.INJ.Z0 = initialZ0 SC = set_magnet_setpoints(SC, m_ord, init_setpoint, par.skewness, par.magnet_order, method="abs", dipole_compensation=par.dipole_compensation) - return BPMpos, tmpTra + return BPMpos, tmpTra, par.magSPvec[n_dim, j_bpm] def _data_measurement_orb(SC, mOrd, BPMind, j_bpm, n_dim, par, CMords, cm_vec): @@ -191,65 +159,47 @@ def _data_measurement_orb(SC, mOrd, BPMind, j_bpm, n_dim, par, CMords, cm_vec): return BPMpos, tmpTra -def _data_evaluation(SC, bpm_pos, tmpTra, n_dim, m_ord, par): - offset_change = np.nan - Error = 5 - tmpCenter = np.full(((tmpTra.shape[0] - 1) * par.maxNumOfDownstreamBPMs), np.nan) - tmpNorm = np.full(((tmpTra.shape[0] - 1) * par.maxNumOfDownstreamBPMs), np.nan) - tmpRangeX = np.zeros(((tmpTra.shape[0] - 1) * par.maxNumOfDownstreamBPMs)) - tmpRangeY = np.zeros(((tmpTra.shape[0] - 1) * par.maxNumOfDownstreamBPMs)) - i = 0 - for nBPM in range(par.maxNumOfDownstreamBPMs): - y0 = np.diff(tmpTra[:, :, nBPM], 1, axis=1) - y0 = y0[:, np.sum(~np.isnan(y0), axis=0) > 0] - x0 = np.tile(np.mean(bpm_pos, 1), (y0.shape[1], 1)).T - for nKick in range(y0.shape[1]): - i = i + 1 - y = y0[:, nKick] - x = x0[:, nKick] - x = x[~np.isnan(y)] - y = y[~np.isnan(y)] - # if len(x) == 0 or len(y) == 0: - # continue - tmpRangeX[i] = abs(np.min(x) - np.max(x)) - tmpRangeY[i] = abs(np.min(y) - np.max(y)) - sol = np.full((2,), np.nan) - if len(x) >= par.nXPointsNeededAtMeasBPM and tmpRangeX[i] > par.minBPMrangeAtBBABBPM and tmpRangeY[i] > par.minBPMrangeOtherBPM: - if par.fit_order == 1: - sol = np.linalg.lstsq(np.vstack((np.ones(x.shape), x)).T, y)[0] - sol = sol[[1, 0]] - if abs(sol[0]) < par.minSlopeForFit: - sol[0] = np.nan - tmpCenter[i] = -sol[1] / sol[0] - tmpNorm[i] = 1 / np.sqrt(np.sum((sol[0] * x + sol[1] - y) ** 2)) - else: - sol = np.polyfit(x, y, par.fit_order) - if par.fit_order == 2: - tmpCenter[i] = - (sol[1] / (2 * sol[0])) - else: - tmpCenter[i] = min(abs(np.roots(sol))) - tmpNorm[i] = 1 / np.linalg.norm(np.polyval(sol, x) - y) - - if np.max(tmpRangeX) < par.minBPMrangeAtBBABBPM: - Error = 1 - elif np.max(tmpRangeY) < par.minBPMrangeOtherBPM: - Error = 2 - elif np.std(tmpCenter, ddof=1) > par.maxStdForFittedCenters: - Error = 3 - elif len(np.where(~np.isnan(tmpCenter))[0]) == 0: - Error = 4 - else: - offset_change = np.nansum(tmpCenter * tmpNorm) / np.nansum(tmpNorm) - Error = 0 - if not par.dipole_compensation and n_dim == 1 and SC.RING[m_ord].NomPolynomB[1] != 0: - B = getattr(SC.RING[m_ord], 'BendingAngle', 0) - K = SC.RING[m_ord].NomPolynomB[1] - L = SC.RING[m_ord].Length - offset_change = offset_change + B / L / K - if offset_change > par.outlierRejectionAt: - offset_change = np.nan - Error = 6 - return offset_change, Error +def _data_evaluation(SC, bpm_pos, tmpTra, mag_vec, n_dim, m_ord, par): + x = np.mean(bpm_pos, axis=1) + x_mask = ~np.isnan(x) + err = np.mean(np.std(bpm_pos[x_mask, :], axis=1)) + x = x[x_mask] + new_tmp_tra = tmpTra[x_mask, :, :] + + tmp_slope = np.full((new_tmp_tra.shape[0], new_tmp_tra.shape[2]), np.nan) + tmp_slope_err = np.full((new_tmp_tra.shape[0], new_tmp_tra.shape[2]), np.nan) + center = np.full((new_tmp_tra.shape[2]), np.nan) + center_err = np.full((new_tmp_tra.shape[2]), np.nan) + for i in range(new_tmp_tra.shape[0]): + for j in range(new_tmp_tra.shape[2]): + y = new_tmp_tra[i, :, j] + y_mask = ~np.isnan(y) + if np.sum(y_mask) < 2: + continue + # TODO once the position errors are calculated and propagated, should be used + p, pcov = np.polyfit(mag_vec[y_mask], y[y_mask], 1, w=np.ones(int(np.sum(y_mask))) / err, cov='unscaled') + tmp_slope[i, j], tmp_slope_err[i, j] = p[0], pcov[0, 0] + for j in range(min(new_tmp_tra.shape[2], par.maxNumOfDownstreamBPMs)): + y = tmp_slope[:, j] + y_err = tmp_slope_err[:, j] + y_mask = ~np.isnan(y) + if np.sum(y_mask) <= par.fit_order: + continue + # TODO here do odr as the x values have also measurement errors + p, pcov = np.polyfit(x[y_mask], y[y_mask], par.fit_order, w=1 / y_err[y_mask], cov='unscaled') + if par.fit_order < 3: + center[j] = -p[1] / (par.fit_order * p[0]) # zero-crossing if linear, minimum is quadratic + center_err[j] = np.sqrt(center[j] ** 2 * (pcov[0,0]/p[0]**2 + pcov[1,1]/p[1]**2 - 2 * pcov[0,1] / p[0] / p[1])) + # TODO perhaps treat the unlikely case when quadratic would have a maximum + else: + raise NotImplementedError + + mask = ~np.isnan(center) + offset_change = weighted_mean(center[mask], center_err[mask]) + offset_change_error = weighted_error(center[mask]-offset_change, center_err[mask]) / np.sqrt(effective_sample_size(center[mask], weights_from_errors(center_err[mask]))) + if not par.dipole_compensation and n_dim == 0 and SC.RING[m_ord].NomPolynomB[1] != 0: + offset_change += getattr(SC.RING[m_ord], 'BendingAngle', 0) / SC.RING[m_ord].NomPolynomB[1] / SC.RING[m_ord].Length + return offset_change, offset_change_error def single_phase_advance_injection_scan(SC, n_dim, last_bpm_ind, par): @@ -303,6 +253,7 @@ def _scale_injection_to_reach_bpms(SC, n_dim, last_bpm_ind, par): SC.INJ.nTurns = initial_nturns return scaling_factor, np.zeros(len(SC.ORD.BPM)) + def _get_orbit_bump(SC, cm_ord, bpm_ord, n_dim, par): tmpCMind = np.where(par.RMstruct.CMords[0] == cm_ord)[0] if len(tmpCMind): @@ -330,20 +281,10 @@ def _get_orbit_bump(SC, cm_ord, bpm_ord, n_dim, par): return CMords, CMvec -def _plot_bba_step(SC, ax, bpm_ind, n_dim): - s_pos = findspos(SC.RING) - bpm_readings, all_elements_positions = all_elements_reading(SC) - ax.plot(s_pos[SC.ORD.BPM], 1E3 * bpm_readings[n_dim, :len(SC.ORD.BPM)], marker='o') - ax.plot(s_pos[SC.ORD.BPM[bpm_ind]], 1E3 * bpm_readings[n_dim, bpm_ind], marker='o', markersize=10, markerfacecolor='k') - ax.plot(s_pos, 1E3 * all_elements_positions[n_dim, 0, :, 0, 0], linestyle='-') - return ax - - def plot_bba_results(SC, init_offset_errors, error_flags, bpm_ords, mag_ords): plt.rcParams.update({'font.size': 10}) fom0 = init_offset_errors fom = _get_bpm_offset_from_mag(SC.RING, bpm_ords, mag_ords) - #fom[:, bpm_ind + 1:] = np.nan n_steps = 1 if bpm_ords.shape[1] == 1 else 1.1 * np.max(np.abs(fom0)) * np.linspace(-1, 1, int(np.floor(bpm_ords.shape[1] / 3))) f, ax = plt.subplots(nrows=3, num=90, figsize=(8, 8), facecolor="w") colors = ['#1f77b4', '#ff7f0e'] @@ -363,6 +304,7 @@ def plot_bba_results(SC, init_offset_errors, error_flags, bpm_ords, mag_ords): ax[0].set_ylabel('Occurrences') mask_errors = error_flags == 0 + mask_errors = ~np.isnan(error_flags) plabels = ("Horizontal", "Vertical") for n_dim in range(bpm_ords.shape[0]): x = np.where(np.in1d(SC.ORD.BPM, bpm_ords[n_dim, :]))[0] @@ -384,3 +326,12 @@ def plot_bba_results(SC, init_offset_errors, error_flags, bpm_ords, mag_ords): ax[2].legend() f.tight_layout() f.show() + + +def _plot_bba_step(SC, ax, bpm_ind, n_dim): + s_pos = findspos(SC.RING) + bpm_readings, all_elements_positions = all_elements_reading(SC) + ax.plot(s_pos[SC.ORD.BPM], 1E3 * bpm_readings[n_dim, :len(SC.ORD.BPM)], marker='o') + ax.plot(s_pos[SC.ORD.BPM[bpm_ind]], 1E3 * bpm_readings[n_dim, bpm_ind], marker='o', markersize=10, markerfacecolor='k') + ax.plot(s_pos, 1E3 * all_elements_positions[n_dim, 0, :, 0, 0], linestyle='-') + return ax