Skip to content

Commit

Permalink
BBA: error estimates and propagation
Browse files Browse the repository at this point in the history
    leads also to reduction of needed parameters
  • Loading branch information
lmalina committed Aug 16, 2023
1 parent 29320ad commit e6a4dcf
Showing 1 changed file with 68 additions and 117 deletions.
185 changes: 68 additions & 117 deletions pySC/correction/bba.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,18 @@
from pySC.utils import logging_tools
from pySC.core.classes import DotDict
from pySC.core.constants import TRACKING_MODES, TRACK_TBT, NUM_TO_AB
from pySC.utils.stats import weighted_mean, weighted_error, effective_sample_size, weights_from_errors

LOGGER = logging_tools.get_logger(__name__)

"""
Pieces of found function calls from ALS-U SR, i.e. nothing in PETRA IV
qOrd = SCgetOrds(SC.RING,'QF')
SC,errorFlags = SCBBA(SC, repmat(SC.ORD.BPM,2,1),QuadOrds, mode='TBT', fakeMeasForFailures=True,
quadOrdPhaseAdvance=qOrd[0], quadStrengthPhaseAdvance=[0.95 0.8 1.05], magSPvec=magSPvec, plotResults=SC.plot)
SC = SCBBA(SC, BPMordsQuadBBA, QuadOrds, mode='ORB',fakeMeasForFailures=True, outlierRejectionAt=200E-6, RMstruct=RMstruct,
plotResults=SC.plot, magSPvec=magSPvec, minSlopeForFit=0.005, minBPMrangeAtBBABBPM=1*20E-6,
BBABPMtarget=5*50E-6, minBPMrangeOtherBPM=0)
# Orbit correction (without weights)
SC = performOrbitCorr_ALSU_SR(SC,RMstruct,'weight=[])
# BBA on sextupoles (quad trim coils)
SC = SCBBA(SC, BPMordsSextBBA, sextOrds, mode='ORB',fakeMeasForFailures=True, outlierRejectionAt=200E-6,
RMstruct=RMstruct, plotResults=SC.plot, switch_off_sextupoles=True, setpoint_method='abs', magSPvec=magSPvecSext,
minSlopeForFit=0.005, minBPMrangeAtBBABBPM=1*10E-6, BBABPMtarget=5*50E-6, minBPMrangeOtherBPM=0)
"""


def bba(SC, bpm_ords, mag_ords, **kwargs):
par = DotDict(dict(mode=SC.INJ.trackMode, outlierRejectionAt=np.inf, nSteps=10, fit_order=1, magnet_order=1,
par = DotDict(dict(mode=SC.INJ.trackMode, nSteps=10, fit_order=1, magnet_order=1, skewness=False,
magSPvec=np.array([0.95, 1.05]), setpoint_method='rel', RMstruct=[], orbBumpWindow=5, BBABPMtarget=1E-3,
minBPMrangeAtBBABBPM=500E-6, minBPMrangeOtherBPM=100E-6, maxStdForFittedCenters=600E-6,
nXPointsNeededAtMeasBPM=3, maxNumOfDownstreamBPMs=len(SC.ORD.BPM), minSlopeForFit=0.03,
maxNumOfDownstreamBPMs=len(SC.ORD.BPM),
maxTrajChangeAtInjection=np.array([0.9E-3, 0.9E-3]), quadOrdPhaseAdvance=np.array([8]),
quadStrengthPhaseAdvance=np.array([0.95, 1.05]), fakeMeasForFailures=False, dipole_compensation=True,
skewness=False, switch_off_sextupoles=False, useBPMreadingsForOrbBumpRef=False, plotResults=False))
useBPMreadingsForOrbBumpRef=False, plotResults=False))
par.update(**kwargs)
if bpm_ords.shape != mag_ords.shape: # both in shape 2 x N
raise ValueError('Input arrays for BPMs and magnets must be same size.')
Expand All @@ -63,49 +43,37 @@ def bba(SC, bpm_ords, mag_ords, **kwargs):
quad_k.append(quads_strengths)
scalings.append(all_scalings)
bpm_ranges.append(all_bpm_ranges)
kick_vec0 = par.maxTrajChangeAtInjection.reshape(2, 1) * np.linspace(-1, 1, par.nSteps)

q0 = SC.RING[par.quadOrdPhaseAdvance].SetPointB[1]
for j_bpm in range(bpm_ords.shape[1]): # j_bpm: Index of BPM adjacent to magnet for BBA
LOGGER.info(f"BPM number {j_bpm}")
for n_dim in range(bpm_ords.shape[0]):
LOGGER.debug(f'BBA-BPM {j_bpm}/{bpm_ords.shape[1]}, n_dim = {n_dim}')
bpm_ind = np.where(bpm_ords[n_dim, j_bpm] == SC.ORD.BPM)[0][0]
m_ord = mag_ords[n_dim, j_bpm]
if par.switch_off_sextupoles:
SC0 = _very_deep_copy(SC)
SC = set_magnet_setpoints(SC, m_ord, setpoints=np.zeros(1), skewness=False, order=2, method='abs')
SC = SCfeedbackRun(SC, par.RMstruct.MinvCO, BPMords=par.RMstruct.BPMords, CMords=par.RMstruct.CMords,
target=0, maxsteps=50, scaleDisp=par.RMstruct.scaleDisp, eps=1E-6)
if par.mode == 'ORB':
# TODO very deep copy
SC0 = _very_deep_copy(SC)
bpm_pos, tmpTra = _data_measurement_orb(SC, m_ord, bpm_ind, j_bpm, n_dim, par,
*_get_orbit_bump(SC, m_ord, bpm_ords[n_dim, j_bpm], n_dim, par))
else:
set_ind = np.argmax(bpm_ranges[n_dim][:, bpm_ind])
if bpm_ranges[n_dim][set_ind, bpm_ind] < par.BBABPMtarget:
LOGGER.warning(f"Small range at BPM[{n_dim},{j_bpm}]")

SC = set_magnet_setpoints(SC, par.quadOrdPhaseAdvance, quad_k[n_dim][set_ind], False, 1, method='rel', dipole_compensation=True)
kick_vec = scalings[n_dim][set_ind] * par.maxTrajChangeAtInjection.reshape(2, 1) * np.linspace(-1, 1, par.nSteps)
bpm_pos, tmpTra = _data_measurement_tbt(SC, m_ord, bpm_ind, j_bpm, n_dim, par, kick_vec)
bpm_pos, tmpTra, mag_vec = _data_measurement_tbt(SC, m_ord, bpm_ind, j_bpm, n_dim, par, kick_vec)
SC = set_magnet_setpoints(SC, par.quadOrdPhaseAdvance, q0, False, 1, method='abs', dipole_compensation=True)

offset_changes[n_dim, j_bpm], error_flags[n_dim, j_bpm] = _data_evaluation(SC, bpm_pos, tmpTra, n_dim, m_ord, par)
if par.switch_off_sextupoles:
SC = SC0
# if not np.isnan(offset_change):
# SC.RING[bpm_ords[n_dim, j_bpm]].Offset[n_dim] += offset_change
# if par.plotResults:
# plot_bba_results(SC, init_offset_errors, error_flags, j_bpm, bpm_ords, mag_ords)

offset_changes[n_dim, j_bpm], error_flags[n_dim, j_bpm] = _data_evaluation(SC, bpm_pos, tmpTra, mag_vec, n_dim, m_ord, par)
errors = np.zeros(bpm_ords.shape, dtype=int)
for j_bpm in range(bpm_ords.shape[1]): # j_bpm: Index of BPM adjacent to magnet for BBA
for n_dim in range(bpm_ords.shape[0]):
if not np.isnan(offset_changes[n_dim, j_bpm]):
if not np.isnan(offset_changes[n_dim, j_bpm]) and np.abs(offset_changes[n_dim, j_bpm]) > 2.5 * error_flags[n_dim, j_bpm]:
SC.RING[bpm_ords[n_dim, j_bpm]].Offset[n_dim] += offset_changes[n_dim, j_bpm]
else:
errors[n_dim, j_bpm] = 1
LOGGER.warning(f"Poor resolution for BPM {j_bpm} in plane {n_dim}: {offset_changes[n_dim, j_bpm]}+-{error_flags[n_dim, j_bpm]}")
if par.plotResults:
plot_bba_results(SC, init_offset_errors, error_flags, bpm_ords, mag_ords)
if par.fakeMeasForFailures:
SC = _fake_measurement(SC, bpm_ords, mag_ords, error_flags)
SC = _fake_measurement(SC, bpm_ords, mag_ords, errors)
return SC, error_flags, offset_changes


Expand All @@ -121,7 +89,7 @@ def _very_deep_copy(SC):

def _get_bpm_offset_from_mag(ring, bpm_ords, mag_ords):
offset = np.full(bpm_ords.shape, np.nan)
for n_dim in range(2):
for n_dim in range(bpm_ords.shape[0]):
offset[n_dim, :] = (atgetfieldvalues(ring, bpm_ords[n_dim, :], 'Offset', n_dim)
+ atgetfieldvalues(ring, bpm_ords[n_dim, :], 'SupportOffset', n_dim)
- atgetfieldvalues(ring, mag_ords[n_dim, :], 'MagnetOffset', n_dim)
Expand Down Expand Up @@ -168,7 +136,7 @@ def _data_measurement_tbt(SC, m_ord, bpm_ind, j_bpm, n_dim, par, kick_vec):
SC.INJ.Z0 = initialZ0
SC = set_magnet_setpoints(SC, m_ord, init_setpoint, par.skewness, par.magnet_order,
method="abs", dipole_compensation=par.dipole_compensation)
return BPMpos, tmpTra
return BPMpos, tmpTra, par.magSPvec[n_dim, j_bpm]


def _data_measurement_orb(SC, mOrd, BPMind, j_bpm, n_dim, par, CMords, cm_vec):
Expand All @@ -191,65 +159,47 @@ def _data_measurement_orb(SC, mOrd, BPMind, j_bpm, n_dim, par, CMords, cm_vec):
return BPMpos, tmpTra


def _data_evaluation(SC, bpm_pos, tmpTra, n_dim, m_ord, par):
offset_change = np.nan
Error = 5
tmpCenter = np.full(((tmpTra.shape[0] - 1) * par.maxNumOfDownstreamBPMs), np.nan)
tmpNorm = np.full(((tmpTra.shape[0] - 1) * par.maxNumOfDownstreamBPMs), np.nan)
tmpRangeX = np.zeros(((tmpTra.shape[0] - 1) * par.maxNumOfDownstreamBPMs))
tmpRangeY = np.zeros(((tmpTra.shape[0] - 1) * par.maxNumOfDownstreamBPMs))
i = 0
for nBPM in range(par.maxNumOfDownstreamBPMs):
y0 = np.diff(tmpTra[:, :, nBPM], 1, axis=1)
y0 = y0[:, np.sum(~np.isnan(y0), axis=0) > 0]
x0 = np.tile(np.mean(bpm_pos, 1), (y0.shape[1], 1)).T
for nKick in range(y0.shape[1]):
i = i + 1
y = y0[:, nKick]
x = x0[:, nKick]
x = x[~np.isnan(y)]
y = y[~np.isnan(y)]
# if len(x) == 0 or len(y) == 0:
# continue
tmpRangeX[i] = abs(np.min(x) - np.max(x))
tmpRangeY[i] = abs(np.min(y) - np.max(y))
sol = np.full((2,), np.nan)
if len(x) >= par.nXPointsNeededAtMeasBPM and tmpRangeX[i] > par.minBPMrangeAtBBABBPM and tmpRangeY[i] > par.minBPMrangeOtherBPM:
if par.fit_order == 1:
sol = np.linalg.lstsq(np.vstack((np.ones(x.shape), x)).T, y)[0]
sol = sol[[1, 0]]
if abs(sol[0]) < par.minSlopeForFit:
sol[0] = np.nan
tmpCenter[i] = -sol[1] / sol[0]
tmpNorm[i] = 1 / np.sqrt(np.sum((sol[0] * x + sol[1] - y) ** 2))
else:
sol = np.polyfit(x, y, par.fit_order)
if par.fit_order == 2:
tmpCenter[i] = - (sol[1] / (2 * sol[0]))
else:
tmpCenter[i] = min(abs(np.roots(sol)))
tmpNorm[i] = 1 / np.linalg.norm(np.polyval(sol, x) - y)

if np.max(tmpRangeX) < par.minBPMrangeAtBBABBPM:
Error = 1
elif np.max(tmpRangeY) < par.minBPMrangeOtherBPM:
Error = 2
elif np.std(tmpCenter, ddof=1) > par.maxStdForFittedCenters:
Error = 3
elif len(np.where(~np.isnan(tmpCenter))[0]) == 0:
Error = 4
else:
offset_change = np.nansum(tmpCenter * tmpNorm) / np.nansum(tmpNorm)
Error = 0
if not par.dipole_compensation and n_dim == 1 and SC.RING[m_ord].NomPolynomB[1] != 0:
B = getattr(SC.RING[m_ord], 'BendingAngle', 0)
K = SC.RING[m_ord].NomPolynomB[1]
L = SC.RING[m_ord].Length
offset_change = offset_change + B / L / K
if offset_change > par.outlierRejectionAt:
offset_change = np.nan
Error = 6
return offset_change, Error
def _data_evaluation(SC, bpm_pos, tmpTra, mag_vec, n_dim, m_ord, par):
x = np.mean(bpm_pos, axis=1)
x_mask = ~np.isnan(x)
err = np.mean(np.std(bpm_pos[x_mask, :], axis=1))
x = x[x_mask]
new_tmp_tra = tmpTra[x_mask, :, :]

tmp_slope = np.full((new_tmp_tra.shape[0], new_tmp_tra.shape[2]), np.nan)
tmp_slope_err = np.full((new_tmp_tra.shape[0], new_tmp_tra.shape[2]), np.nan)
center = np.full((new_tmp_tra.shape[2]), np.nan)
center_err = np.full((new_tmp_tra.shape[2]), np.nan)
for i in range(new_tmp_tra.shape[0]):
for j in range(new_tmp_tra.shape[2]):
y = new_tmp_tra[i, :, j]
y_mask = ~np.isnan(y)
if np.sum(y_mask) < 2:
continue
# TODO once the position errors are calculated and propagated, should be used
p, pcov = np.polyfit(mag_vec[y_mask], y[y_mask], 1, w=np.ones(int(np.sum(y_mask))) / err, cov='unscaled')
tmp_slope[i, j], tmp_slope_err[i, j] = p[0], pcov[0, 0]
for j in range(min(new_tmp_tra.shape[2], par.maxNumOfDownstreamBPMs)):
y = tmp_slope[:, j]
y_err = tmp_slope_err[:, j]
y_mask = ~np.isnan(y)
if np.sum(y_mask) <= par.fit_order:
continue
# TODO here do odr as the x values have also measurement errors
p, pcov = np.polyfit(x[y_mask], y[y_mask], par.fit_order, w=1 / y_err[y_mask], cov='unscaled')
if par.fit_order < 3:
center[j] = -p[1] / (par.fit_order * p[0]) # zero-crossing if linear, minimum is quadratic
center_err[j] = np.sqrt(center[j] ** 2 * (pcov[0,0]/p[0]**2 + pcov[1,1]/p[1]**2 - 2 * pcov[0,1] / p[0] / p[1]))
# TODO perhaps treat the unlikely case when quadratic would have a maximum
else:
raise NotImplementedError

mask = ~np.isnan(center)
offset_change = weighted_mean(center[mask], center_err[mask])
offset_change_error = weighted_error(center[mask]-offset_change, center_err[mask]) / np.sqrt(effective_sample_size(center[mask], weights_from_errors(center_err[mask])))
if not par.dipole_compensation and n_dim == 0 and SC.RING[m_ord].NomPolynomB[1] != 0:
offset_change += getattr(SC.RING[m_ord], 'BendingAngle', 0) / SC.RING[m_ord].NomPolynomB[1] / SC.RING[m_ord].Length
return offset_change, offset_change_error


def single_phase_advance_injection_scan(SC, n_dim, last_bpm_ind, par):
Expand Down Expand Up @@ -303,6 +253,7 @@ def _scale_injection_to_reach_bpms(SC, n_dim, last_bpm_ind, par):
SC.INJ.nTurns = initial_nturns
return scaling_factor, np.zeros(len(SC.ORD.BPM))


def _get_orbit_bump(SC, cm_ord, bpm_ord, n_dim, par):
tmpCMind = np.where(par.RMstruct.CMords[0] == cm_ord)[0]
if len(tmpCMind):
Expand Down Expand Up @@ -330,20 +281,10 @@ def _get_orbit_bump(SC, cm_ord, bpm_ord, n_dim, par):
return CMords, CMvec


def _plot_bba_step(SC, ax, bpm_ind, n_dim):
s_pos = findspos(SC.RING)
bpm_readings, all_elements_positions = all_elements_reading(SC)
ax.plot(s_pos[SC.ORD.BPM], 1E3 * bpm_readings[n_dim, :len(SC.ORD.BPM)], marker='o')
ax.plot(s_pos[SC.ORD.BPM[bpm_ind]], 1E3 * bpm_readings[n_dim, bpm_ind], marker='o', markersize=10, markerfacecolor='k')
ax.plot(s_pos, 1E3 * all_elements_positions[n_dim, 0, :, 0, 0], linestyle='-')
return ax


def plot_bba_results(SC, init_offset_errors, error_flags, bpm_ords, mag_ords):
plt.rcParams.update({'font.size': 10})
fom0 = init_offset_errors
fom = _get_bpm_offset_from_mag(SC.RING, bpm_ords, mag_ords)
#fom[:, bpm_ind + 1:] = np.nan
n_steps = 1 if bpm_ords.shape[1] == 1 else 1.1 * np.max(np.abs(fom0)) * np.linspace(-1, 1, int(np.floor(bpm_ords.shape[1] / 3)))
f, ax = plt.subplots(nrows=3, num=90, figsize=(8, 8), facecolor="w")
colors = ['#1f77b4', '#ff7f0e']
Expand All @@ -363,6 +304,7 @@ def plot_bba_results(SC, init_offset_errors, error_flags, bpm_ords, mag_ords):
ax[0].set_ylabel('Occurrences')

mask_errors = error_flags == 0
mask_errors = ~np.isnan(error_flags)
plabels = ("Horizontal", "Vertical")
for n_dim in range(bpm_ords.shape[0]):
x = np.where(np.in1d(SC.ORD.BPM, bpm_ords[n_dim, :]))[0]
Expand All @@ -384,3 +326,12 @@ def plot_bba_results(SC, init_offset_errors, error_flags, bpm_ords, mag_ords):
ax[2].legend()
f.tight_layout()
f.show()


def _plot_bba_step(SC, ax, bpm_ind, n_dim):
s_pos = findspos(SC.RING)
bpm_readings, all_elements_positions = all_elements_reading(SC)
ax.plot(s_pos[SC.ORD.BPM], 1E3 * bpm_readings[n_dim, :len(SC.ORD.BPM)], marker='o')
ax.plot(s_pos[SC.ORD.BPM[bpm_ind]], 1E3 * bpm_readings[n_dim, bpm_ind], marker='o', markersize=10, markerfacecolor='k')
ax.plot(s_pos, 1E3 * all_elements_positions[n_dim, 0, :, 0, 0], linestyle='-')
return ax

0 comments on commit e6a4dcf

Please sign in to comment.