diff --git a/pySC/correction/bba.py b/pySC/correction/bba.py index 2a0a8d4..d1fbe4c 100644 --- a/pySC/correction/bba.py +++ b/pySC/correction/bba.py @@ -1,5 +1,4 @@ import matplotlib.pyplot as plt -from matplotlib.patches import Rectangle import numpy as np import copy from pySC.utils.at_wrapper import findspos, atgetfieldvalues @@ -9,7 +8,7 @@ from pySC.core.lattice_setting import set_cm_setpoints, set_magnet_setpoints, get_cm_setpoints from pySC.utils import logging_tools from pySC.core.classes import DotDict -from pySC.core.constants import TRACKING_MODES, TRACK_TBT +from pySC.core.constants import TRACKING_MODES, TRACK_TBT, NUM_TO_AB LOGGER = logging_tools.get_logger(__name__) @@ -41,8 +40,7 @@ def bba(SC, bpm_ords, mag_ords, **kwargs): nXPointsNeededAtMeasBPM=3, maxNumOfDownstreamBPMs=len(SC.ORD.BPM), minSlopeForFit=0.03, maxTrajChangeAtInjection=np.array([0.9E-3, 0.9E-3]), quadOrdPhaseAdvance=np.array([8]), quadStrengthPhaseAdvance=np.array([0.95, 1.05]), fakeMeasForFailures=False, dipole_compensation=True, - skewQuadrupole=False, switch_off_sextupoles=False, useBPMreadingsForOrbBumpRef=False, - plotLines=False, plotResults=False)) + skewness=False, switch_off_sextupoles=False, useBPMreadingsForOrbBumpRef=False, plotResults=False)) par.update(**kwargs) if bpm_ords.shape != mag_ords.shape: # both in shape 2 x N raise ValueError('Input arrays for BPMs and magnets must be same size.') @@ -56,36 +54,59 @@ def bba(SC, bpm_ords, mag_ords, **kwargs): init_offset_errors = _get_bpm_offset_from_mag(SC.RING, bpm_ords, mag_ords) error_flags = np.full(bpm_ords.shape, np.nan) + offset_changes = np.full(bpm_ords.shape, np.nan) + if par.mode == TRACK_TBT: + quad_k, scalings, bpm_ranges = [], [], [] + for n_dim in range(bpm_ords.shape[0]): + last_bpm_ind = np.where(bpm_ords[n_dim, -1] == SC.ORD.BPM)[0][0] + quads_strengths, all_scalings, all_bpm_ranges = single_phase_advance_injection_scan(SC, n_dim, last_bpm_ind, par) + quad_k.append(quads_strengths) + scalings.append(all_scalings) + bpm_ranges.append(all_bpm_ranges) kick_vec0 = par.maxTrajChangeAtInjection.reshape(2, 1) * np.linspace(-1, 1, par.nSteps) - + q0 = SC.RING[par.quadOrdPhaseAdvance].SetPointB[1] for j_bpm in range(bpm_ords.shape[1]): # j_bpm: Index of BPM adjacent to magnet for BBA + LOGGER.info(f"BPM number {j_bpm}") for n_dim in range(bpm_ords.shape[0]): LOGGER.debug(f'BBA-BPM {j_bpm}/{bpm_ords.shape[1]}, n_dim = {n_dim}') - SC0 = _very_deep_copy(SC) bpm_ind = np.where(bpm_ords[n_dim, j_bpm] == SC.ORD.BPM)[0][0] m_ord = mag_ords[n_dim, j_bpm] if par.switch_off_sextupoles: + SC0 = _very_deep_copy(SC) SC = set_magnet_setpoints(SC, m_ord, setpoints=np.zeros(1), skewness=False, order=2, method='abs') SC = SCfeedbackRun(SC, par.RMstruct.MinvCO, BPMords=par.RMstruct.BPMords, CMords=par.RMstruct.CMords, target=0, maxsteps=50, scaleDisp=par.RMstruct.scaleDisp, eps=1E-6) if par.mode == 'ORB': + # TODO very deep copy bpm_pos, tmpTra = _data_measurement_orb(SC, m_ord, bpm_ind, j_bpm, n_dim, par, *_get_orbit_bump(SC, m_ord, bpm_ords[n_dim, j_bpm], n_dim, par)) else: - kick_vec, bpm_range = _scale_injection_to_reach_bpm(SC, bpm_ind, n_dim, kick_vec0) - if par.quadOrdPhaseAdvance and bpm_range < par.BBABPMtarget: - SC, kick_vec = _scan_phase_advance(SC, bpm_ind, n_dim, kick_vec0, par) + set_ind = np.argmax(bpm_ranges[n_dim][:, bpm_ind]) + if bpm_ranges[n_dim][set_ind, bpm_ind] < par.BBABPMtarget: + LOGGER.warning(f"Small range at BPM[{n_dim},{j_bpm}]") + + SC = set_magnet_setpoints(SC, par.quadOrdPhaseAdvance, quad_k[n_dim][set_ind], False, 1, method='rel', dipole_compensation=True) + kick_vec = scalings[n_dim][set_ind] * par.maxTrajChangeAtInjection.reshape(2, 1) * np.linspace(-1, 1, par.nSteps) bpm_pos, tmpTra = _data_measurement_tbt(SC, m_ord, bpm_ind, j_bpm, n_dim, par, kick_vec) - offset_change, error_flags[n_dim, j_bpm] = _data_evaluation(SC, bpm_ords, j_bpm, bpm_pos, tmpTra, n_dim, m_ord, - par) - SC = SC0 - if not np.isnan(offset_change): - SC.RING[bpm_ords[n_dim, j_bpm]].Offset[n_dim] += offset_change - if par.plotResults: - plot_bba_results(SC, init_offset_errors, error_flags, j_bpm, bpm_ords, mag_ords) + SC = set_magnet_setpoints(SC, par.quadOrdPhaseAdvance, q0, False, 1, method='abs', dipole_compensation=True) + + offset_changes[n_dim, j_bpm], error_flags[n_dim, j_bpm] = _data_evaluation(SC, bpm_pos, tmpTra, n_dim, m_ord, par) + if par.switch_off_sextupoles: + SC = SC0 + # if not np.isnan(offset_change): + # SC.RING[bpm_ords[n_dim, j_bpm]].Offset[n_dim] += offset_change + # if par.plotResults: + # plot_bba_results(SC, init_offset_errors, error_flags, j_bpm, bpm_ords, mag_ords) + + for j_bpm in range(bpm_ords.shape[1]): # j_bpm: Index of BPM adjacent to magnet for BBA + for n_dim in range(bpm_ords.shape[0]): + if not np.isnan(offset_changes[n_dim, j_bpm]): + SC.RING[bpm_ords[n_dim, j_bpm]].Offset[n_dim] += offset_changes[n_dim, j_bpm] + if par.plotResults: + plot_bba_results(SC, init_offset_errors, error_flags, bpm_ords, mag_ords) if par.fakeMeasForFailures: SC = _fake_measurement(SC, bpm_ords, mag_ords, error_flags) - return SC, error_flags + return SC, error_flags, offset_changes def _very_deep_copy(SC): @@ -97,6 +118,7 @@ def _very_deep_copy(SC): new.IDEALRING[ind] = element.deepcopy() return new + def _get_bpm_offset_from_mag(ring, bpm_ords, mag_ords): offset = np.full(bpm_ords.shape, np.nan) for n_dim in range(2): @@ -128,77 +150,48 @@ def _fake_measurement(SC, bpm_ords, mag_ords, error_flags): def _data_measurement_tbt(SC, m_ord, bpm_ind, j_bpm, n_dim, par, kick_vec): - sPos = findspos(SC.RING) - measDim = 1 - n_dim if par.skewQuadrupole else n_dim + measDim = 1 - n_dim if par.skewness else n_dim initialZ0 = SC.INJ.Z0.copy() - print(f"Init Z0: {SC.INJ.Z0}") nMsteps = kick_vec.shape[1] tmpTra = np.full((nMsteps, len(par.magSPvec[n_dim, j_bpm]), par.maxNumOfDownstreamBPMs), np.nan) - BPMpos = np.full((nMsteps, len(par.magSPvec[n_dim, j_bpm])), np.nan) - init_setpoint = getattr(SC.RING[m_ord], f"SetPoint{'A' if par.skewQuadrupole else 'B'}")[par.magnet_order] - if par.plotLines: - f, ax = plt.subplots(nrows=len(par.magSPvec[n_dim, j_bpm]), num=99) + init_setpoint = getattr(SC.RING[m_ord], f"SetPoint{NUM_TO_AB[int(par.skewness)]}")[par.magnet_order] for nQ, setpointQ in enumerate(par.magSPvec[n_dim, j_bpm]): - SC = set_magnet_setpoints(SC, m_ord, setpointQ, par.skewQuadrupole, par.magnet_order, + SC = set_magnet_setpoints(SC, m_ord, setpointQ, par.skewness, par.magnet_order, method=par.setpoint_method, dipole_compensation=par.dipole_compensation) for nKick in range(nMsteps): SC.INJ.Z0[2 * n_dim:2 * n_dim + 2] = initialZ0[2 * n_dim:2 * n_dim + 2] + kick_vec[:, nKick] B = bpm_reading(SC) - if par.plotLines: - ax[nQ] = _plot_bba_step(SC, ax[nQ], bpm_ind, n_dim) BPMpos[nKick, nQ] = B[n_dim, bpm_ind] tmpTra[nKick, nQ, :] = B[measDim, bpm_ind:(bpm_ind + par.maxNumOfDownstreamBPMs)] - if par.plotLines: - ax[nQ].add_patch(Rectangle((sPos[m_ord], -1), width=sPos[m_ord + 1] - sPos[m_ord], height =2, facecolor=[0, 0.4470, 0.7410])) - ax[nQ].set_xlim(sPos[m_ord] + np.array([-10, 10])) - ax[nQ].set_ylim(1.3 * np.array([-1, 1])) - if par.plotLines: - f.show() SC.INJ.Z0 = initialZ0 - SC = set_magnet_setpoints(SC, m_ord, init_setpoint, par.skewQuadrupole, par.magnet_order, + SC = set_magnet_setpoints(SC, m_ord, init_setpoint, par.skewness, par.magnet_order, method="abs", dipole_compensation=par.dipole_compensation) - print(f"Final Z0: {SC.INJ.Z0}") return BPMpos, tmpTra def _data_measurement_orb(SC, mOrd, BPMind, j_bpm, n_dim, par, CMords, cm_vec): - s_pos = findspos(SC.RING) - meas_dim = 1 - n_dim if par.skewQuadrupole else n_dim + meas_dim = 1 - n_dim if par.skewness else n_dim initial_z0 = SC.INJ.Z0.copy() nMsteps = cm_vec[n_dim].shape[0] tmpTra = np.full((nMsteps, len(par.magSPvec[n_dim, j_bpm]), len(SC.ORD.BPM)), np.nan) - BPMpos = np.full((nMsteps, len(par.magSPvec[n_dim, j_bpm])), np.nan) - if par.plotLines: - f, ax = plt.subplots(nrows=len(par.magSPvec[n_dim, j_bpm]), num=99) for nQ, setpointQ in enumerate(par.magSPvec[n_dim, j_bpm]): - SC = set_magnet_setpoints(SC, mOrd, setpointQ, par.skewQuadrupole, par.magnet_order, method=par.setpoint_method, + SC = set_magnet_setpoints(SC, mOrd, setpointQ, par.skewness, par.magnet_order, method=par.setpoint_method, dipole_compensation=par.dipole_compensation) for nKick in range(nMsteps): for nD in range(2): SC = set_cm_setpoints(SC, CMords[nD], cm_vec[nD][nKick, :], bool(nD), method='abs') B = bpm_reading(SC) - if par.plotLines: - ax[nQ] = _plot_bba_step(SC, ax[nQ], BPMind, n_dim) BPMpos[nKick, nQ] = B[n_dim, BPMind] tmpTra[nKick, nQ, :] = B[meas_dim, :] - if par.plotLines: - ax[nQ].rectangle([s_pos[mOrd], -1, s_pos[mOrd + 1] - s_pos[mOrd], 1], facecolor=[0, 0.4470, 0.7410]) - ax[nQ].set_xlim(s_pos[mOrd] + np.array([-10, 10])) - ax[nQ].set_ylim(1.3 * np.array([-1, 1])) - plt.show() SC.INJ.Z0 = initial_z0 return BPMpos, tmpTra -def _data_evaluation(SC, bpm_ords, j_bpm, bpm_pos, tmpTra, n_dim, m_ord, par): - if par.plotLines: - plt.rcParams.update({'font.size': 18}) - fig, ax = plt.subplots(num=56, facecolor="w", projection="3d") - p1 = ax.plot(0, 1E6 * SC.RING[m_ord].T2[2 * n_dim - 1], 0, 'rD', MarkerSize=40, MarkerFaceColor='b') +def _data_evaluation(SC, bpm_pos, tmpTra, n_dim, m_ord, par): offset_change = np.nan Error = 5 tmpCenter = np.full(((tmpTra.shape[0] - 1) * par.maxNumOfDownstreamBPMs), np.nan) @@ -208,6 +201,7 @@ def _data_evaluation(SC, bpm_ords, j_bpm, bpm_pos, tmpTra, n_dim, m_ord, par): i = 0 for nBPM in range(par.maxNumOfDownstreamBPMs): y0 = np.diff(tmpTra[:, :, nBPM], 1, axis=1) + y0 = y0[:, np.sum(~np.isnan(y0), axis=0) > 0] x0 = np.tile(np.mean(bpm_pos, 1), (y0.shape[1], 1)).T for nKick in range(y0.shape[1]): i = i + 1 @@ -215,8 +209,8 @@ def _data_evaluation(SC, bpm_ords, j_bpm, bpm_pos, tmpTra, n_dim, m_ord, par): x = x0[:, nKick] x = x[~np.isnan(y)] y = y[~np.isnan(y)] - if len(x) == 0 or len(y) == 0: - continue + # if len(x) == 0 or len(y) == 0: + # continue tmpRangeX[i] = abs(np.min(x) - np.max(x)) tmpRangeY[i] = abs(np.min(y) - np.max(y)) sol = np.full((2,), np.nan) @@ -235,11 +229,7 @@ def _data_evaluation(SC, bpm_ords, j_bpm, bpm_pos, tmpTra, n_dim, m_ord, par): else: tmpCenter[i] = min(abs(np.roots(sol))) tmpNorm[i] = 1 / np.linalg.norm(np.polyval(sol, x) - y) - if par.plotLines: - p2 = ax.plot(np.tile(j_bpm + nBPM, x.shape), 1E6 * x, 1E3 * y, 'ko') - tmp = ax.plot(np.tile(j_bpm + nBPM, x.shape), 1E6 * x, 1E3 * np.polyval(sol, x), 'k-') - p3 = tmp[0] - p4 = plt.plot(j_bpm + nBPM, 1E6 * tmpCenter[nBPM], 0, 'Or', MarkerSize=10) + if np.max(tmpRangeX) < par.minBPMrangeAtBBABBPM: Error = 1 elif np.max(tmpRangeY) < par.minBPMrangeOtherBPM: @@ -259,68 +249,59 @@ def _data_evaluation(SC, bpm_ords, j_bpm, bpm_pos, tmpTra, n_dim, m_ord, par): if offset_change > par.outlierRejectionAt: offset_change = np.nan Error = 6 - if par.plotLines: - p5 = plt.plot(0, 1E6 * offset_change, 0, 'kD', MarkerSize=30, MarkerFaceColor='r') - p6 = plt.plot(0, 1E6 * (SC.RING[bpm_ords[n_dim, j_bpm]].Offset[n_dim] + SC.RING[bpm_ords[n_dim, j_bpm]].SupportOffset[ - n_dim] + offset_change), 0, 'kD', MarkerSize=30, MarkerFaceColor='g') - ax.title( - f'BBA-BPM: {j_bpm:d} \n mOrd: {m_ord:d} \n mFam: {SC.RING[m_ord].FamName} \n nDim: {n_dim:d} \n FinOffset = {1E6 * np.abs(SC.RING[bpm_ords[n_dim, j_bpm]].Offset[n_dim] + SC.RING[bpm_ords[n_dim, j_bpm]].SupportOffset[n_dim] + offset_change - SC.RING[m_ord].MagnetOffset[n_dim] - SC.RING[m_ord].SupportOffset[n_dim]):3.0f} $\\mu m$') - ax.legend((p1, p2, p3, p4, p5, p6), ( - 'Magnet center', 'Measured offset change', 'Line fit', 'Fitted BPM offset (individual)', - 'Fitted BPM offset (mean)', 'Predicted magnet center')) - ax.set_xlabel('Index of BPM') - ax.set_ylabel(r'BBA-BPM offset [$\mu$m]') - ax.set_zlabel('Offset change [mm]') - plt.show() return offset_change, Error -def _scale_injection_to_reach_bpm(SC, bpm_ind, n_dim, kick_vec0): - initial_z0 = SC.INJ.Z0.copy() - for scaling_factor in (1, 0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1): - tmp_bpm_pos = np.full(kick_vec0.shape[1], np.nan) - for nK in range(kick_vec0.shape[1]): - SC.INJ.Z0[2 * n_dim:2 * n_dim + 2] = initial_z0[2 * n_dim:2 * n_dim + 2] + scaling_factor * kick_vec0[:, nK] - tmp_bpm_pos[nK] = bpm_reading(SC, np.array([SC.ORD.BPM[bpm_ind]]))[n_dim, 0] - SC.INJ.Z0 = initial_z0.copy() +def single_phase_advance_injection_scan(SC, n_dim, last_bpm_ind, par): - if np.sum(np.isnan(tmp_bpm_pos)) == 0: - bpm_range = np.max(tmp_bpm_pos) - np.min(tmp_bpm_pos) - kick_vec = scaling_factor * kick_vec0 - LOGGER.debug(f'Initial trajectory variation scaled to [{100 * (kick_vec[0] / kick_vec0[0])}| ' - f'{100 * (kick_vec[-1] / kick_vec0[-1])}]% of its initial value, ' - f'BBA-BPM range {1E6 * bpm_range:.0f} um.') - return kick_vec, bpm_range - else: - LOGGER.warning('Something went wrong. No beam transmission at all(?)') - return kick_vec0, 0 - - -def _scan_phase_advance(SC, bpm_ind, n_dim, kick_vec0, par): + scaling, bpm_ranges = _scale_injection_to_reach_bpms(SC, n_dim, last_bpm_ind, par) + if not par.quadOrdPhaseAdvance: + return np.ones(1), np.array([scaling]), bpm_ranges[np.newaxis, :] q_ord = par.quadOrdPhaseAdvance q_vec = par.quadStrengthPhaseAdvance q0 = SC.RING[q_ord].SetPointB[1] - all_bpm_ranges = np.zeros(len(q_vec)) + all_bpm_ranges = np.zeros((len(q_vec) + 1, len(SC.ORD.BPM))) + all_scalings = np.zeros(len(q_vec) + 1) + quads_strengths = np.concatenate((np.ones(1), q_vec)) + all_bpm_ranges[0, :] = bpm_ranges + all_scalings[0] = scaling for nQ in range(len(q_vec)): - LOGGER.debug(f'BBA-BPM range too small, try to change phase advance with quad ord {q_ord} to {q_vec[nQ]:.2f} of nom. SP.') SC = set_magnet_setpoints(SC, q_ord, q_vec[nQ], False, 1, method='rel', dipole_compensation=True) - kick_vec, bpm_range = _scale_injection_to_reach_bpm(SC, bpm_ind, n_dim, kick_vec0) - if bpm_range >= par.BBABPMtarget: - LOGGER.debug(f'Change phase advance with quad ord {q_ord} successful. BBA-BPM range = {1E6 * bpm_range:.0f} um.') - return SC, kick_vec - all_bpm_ranges[nQ] = bpm_range - - if all_bpm_ranges[-1] < np.max(all_bpm_ranges): - LOGGER.debug(f'Changing phase advance of quad with ord {q_ord} NOT succesfull, ' - f'returning to best value with BBA-BPM range = {1E6 * max(all_bpm_ranges):.0f}um.') - SC = set_magnet_setpoints(SC, q_ord, np.max(q_vec), False, 1, method='rel', dipole_compensation=True) - kick_vec, _ = _scale_injection_to_reach_bpm(SC, bpm_ind, n_dim, kick_vec0) - return SC, kick_vec - LOGGER.debug(f'Changing phase advance of quad with ord {q_ord} NOT succesfull, returning to initial setpoint.') + all_scalings[nQ + 1], all_bpm_ranges[nQ + 1] = _scale_injection_to_reach_bpms(SC, n_dim, last_bpm_ind, par) SC = set_magnet_setpoints(SC, q_ord, q0, False, 1, method='abs', dipole_compensation=True) - kick_vec, _ = _scale_injection_to_reach_bpm(SC, bpm_ind, n_dim, kick_vec0) - return SC, kick_vec + return quads_strengths, all_scalings, all_bpm_ranges + +def _scale_injection_to_reach_bpms(SC, n_dim, last_bpm_ind, par): + n_steps = min(par.nSteps, 10) + kick_vec0 = par.maxTrajChangeAtInjection.reshape(2, 1) * np.linspace(-1, 1, n_steps) + initial_z0 = SC.INJ.Z0.copy() + initial_nturns = SC.INJ.nTurns + SC.INJ.nTurns = 1 + scaling_factor = 1.0 + mask = np.ones(len(SC.ORD.BPM), dtype=bool) + if last_bpm_ind + 1 < len(SC.ORD.BPM): + mask[last_bpm_ind + 1:] = False + for scaling_factor in (1, 0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1): #for _ in range(6): + tmp_bpm_pos = np.full((par.nSteps, len(SC.ORD.BPM)), np.nan) + + for nK in range(par.nSteps): + SC.INJ.Z0[2 * n_dim:2 * n_dim + 2] = initial_z0[2 * n_dim:2 * n_dim + 2] + scaling_factor * kick_vec0[:, nK] + tmp_bpm_pos[nK, :] = bpm_reading(SC)[n_dim, :] + SC.INJ.Z0 = initial_z0.copy() + + if np.sum(np.isnan(tmp_bpm_pos[:, mask])) == 0: + bpm_ranges = np.max(tmp_bpm_pos, axis=0) - np.min(tmp_bpm_pos, axis=0) + LOGGER.debug(f'Initial trajectory variation scaled to [{100 * scaling_factor}| ' + f'{100 * scaling_factor}]% of its initial value, ' + f'BBA-BPM range from {1E6 * np.min(bpm_ranges):.0f} um to {1E6 * np.max(bpm_ranges):.0f}.') + SC.INJ.nTurns = initial_nturns + return scaling_factor, bpm_ranges + #scaling_factor *= 0.8 + else: + LOGGER.warning('Something went wrong. No beam transmission at all(?)') + SC.INJ.nTurns = initial_nturns + return scaling_factor, np.zeros(len(SC.ORD.BPM)) def _get_orbit_bump(SC, cm_ord, bpm_ord, n_dim, par): tmpCMind = np.where(par.RMstruct.CMords[0] == cm_ord)[0] @@ -354,15 +335,15 @@ def _plot_bba_step(SC, ax, bpm_ind, n_dim): bpm_readings, all_elements_positions = all_elements_reading(SC) ax.plot(s_pos[SC.ORD.BPM], 1E3 * bpm_readings[n_dim, :len(SC.ORD.BPM)], marker='o') ax.plot(s_pos[SC.ORD.BPM[bpm_ind]], 1E3 * bpm_readings[n_dim, bpm_ind], marker='o', markersize=10, markerfacecolor='k') - ax.plot(s_pos, 1E3 * all_elements_positions[n_dim, 0, :, 0,0], linestyle='-') + ax.plot(s_pos, 1E3 * all_elements_positions[n_dim, 0, :, 0, 0], linestyle='-') return ax -def plot_bba_results(SC, init_offset_errors, error_flags, bpm_ind, bpm_ords, mag_ords): +def plot_bba_results(SC, init_offset_errors, error_flags, bpm_ords, mag_ords): plt.rcParams.update({'font.size': 10}) fom0 = init_offset_errors fom = _get_bpm_offset_from_mag(SC.RING, bpm_ords, mag_ords) - fom[:, bpm_ind + 1:] = np.nan + #fom[:, bpm_ind + 1:] = np.nan n_steps = 1 if bpm_ords.shape[1] == 1 else 1.1 * np.max(np.abs(fom0)) * np.linspace(-1, 1, int(np.floor(bpm_ords.shape[1] / 3))) f, ax = plt.subplots(nrows=3, num=90, figsize=(8, 8), facecolor="w") colors = ['#1f77b4', '#ff7f0e']