Skip to content

Commit

Permalink
Added LOCO (#50)
Browse files Browse the repository at this point in the history
* Can include also BPM and corrector scalings in the calculation (not yet in correction)


---------

Co-authored-by: elafmusa <“[email protected]”>
Co-authored-by: malina <[email protected]>
  • Loading branch information
3 people authored Jan 22, 2024
1 parent b032216 commit 6bd2d5f
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 2 deletions.
168 changes: 168 additions & 0 deletions pySC/correction/loco.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import at
import numpy as np
import multiprocessing
from pySC.lattice_properties.response_model import SCgetModelRM, SCgetModelDispersion
from pySC.core.constants import SETTING_ADD, TRACK_ORB
from pySC.core.beam import bpm_reading
from pySC.utils.sc_tools import SCgetPinv
import matplotlib.pyplot as plt
from scipy.optimize import least_squares
from pySC.utils import logging_tools
LOGGER = logging_tools.get_logger(__name__)


def calculate_jacobian(SC, C_model, dkick, used_cor_ind, bpm_indexes, quads_ind, dk, trackMode=TRACK_ORB,
useIdealRing=True, skewness=False, order=1, method=SETTING_ADD, includeDispersion=False, rf_step=1E3,
cav_ords=None, full_jacobian=True):
pool = multiprocessing.Pool()
quad_args = [(quad_index, SC, C_model, dkick, used_cor_ind, bpm_indexes, dk, trackMode, useIdealRing,
skewness, order, method, includeDispersion, rf_step, cav_ords) for quad_index in quads_ind]
# results = []
# for quad_arg in quad_args:
# results.append(generating_quads_response_matrices(quad_arg))
results = pool.map(generating_quads_response_matrices, quad_args)
pool.close()
pool.join()

results = [result / dk for result in results]
if full_jacobian: # Construct the complete Jacobian matrix for the LOCO
# assuming only linear scaling errors of BPMs and corrector magnets
n_correctors = len(np.concatenate(used_cor_ind))
n_bpms = len(bpm_indexes) * 2 # in both planes
j_cor = np.zeros((n_correctors,) + C_model.shape)
for i in range(n_correctors):
j_cor[i, :, i] = C_model[:, i] # a single column of response matrix corresponding to a corrector
j_bpm = np.zeros((n_bpms,) + C_model.shape)
for i in range(n_bpms):
j_bpm[i, i, :] = C_model[i, :] # a single row of response matrix corresponding to a given plane of BPM
return np.concatenate((results, j_cor, j_bpm), axis=0)
return results


def generating_quads_response_matrices(args):
(quad_index, SC, C_model, correctors_kick, used_cor_indexes, used_bpm_indexes, dk, trackMode, useIdealRing,
skewness, order, method, includeDispersion, rf_step, cav_ords) = args
LOGGER.debug('generating response to quad of index', quad_index)
if not includeDispersion:
SC.set_magnet_setpoints(quad_index, dk, skewness, order, method)
C_measured = SCgetModelRM(SC, used_bpm_indexes, used_cor_indexes, dkick=correctors_kick,
useIdealRing=useIdealRing,
trackMode=trackMode)
SC.set_magnet_setpoints(quad_index, -dk, skewness, order, method)
return C_measured - C_model

dispersion_model = SCgetModelDispersion(SC, used_bpm_indexes, CAVords=cav_ords)
SC.set_magnet_setpoints(quad_index, dk, skewness, order, method)
C_measured = SCgetModelRM(SC, used_bpm_indexes, used_cor_indexes, dkick=correctors_kick, useIdealRing=useIdealRing,
trackMode=trackMode)
dispersion_meas = SCgetModelDispersion(SC, used_bpm_indexes, CAVords=cav_ords, rfStep=rf_step)
SC.set_magnet_setpoints(quad_index, -dk, skewness, order, method)
return np.hstack((C_measured - C_model, (dispersion_meas - dispersion_model).reshape(-1, 1)))


def measure_closed_orbit_response_matrix(SC, bpm_ords, cm_ords, dkick=1e-5):
LOGGER.info('Calculating Measure response matrix')
n_turns = 1
n_bpms = len(bpm_ords)
n_cms = len(cm_ords[0]) + len(cm_ords[1])
response_matrix = np.full((2 * n_bpms * n_turns, n_cms), np.nan)
SC.INJ.trackMode = TRACK_ORB # TODO may modify SC (not a pure function)!

closed_orbits0 = bpm_reading(SC, bpm_ords=bpm_ords)[0]
cnt = 0
for n_dim in range(2):
for cm_ord in cm_ords[n_dim]:
SC.set_cm_setpoints(cm_ord, dkick, skewness=bool(n_dim), method=SETTING_ADD)
closed_orbits1 = bpm_reading(SC, bpm_ords=bpm_ords)[0]
SC.set_cm_setpoints(cm_ord, -dkick, skewness=bool(n_dim), method=SETTING_ADD)
response_matrix[:, cnt] = np.ravel((closed_orbits1 - closed_orbits0) / dkick)
cnt = cnt + 1
return response_matrix


def loco_correction_lm(initial_guess0, orm_model, orm_measured, Jn, lengths, including_fit_parameters, bounds=(-np.inf, np.inf), weights=1,
verbose=2):
mask = _get_parameters_mask(including_fit_parameters, lengths)
result = least_squares(lambda delta_params: objective(delta_params, orm_measured - orm_model, Jn[mask, :, :], weights),
initial_guess0[mask], #bounds=bounds,
method="lm",
verbose=verbose)
return result.x


def loco_correction_ng(initial_guess0, orm_model, orm_measured, J, lengths, including_fit_parameters, s_cut, weights=1):
initial_guess = initial_guess0.copy()
mask = _get_parameters_mask(including_fit_parameters, lengths)
residuals = objective(initial_guess[mask], orm_measured - orm_model, J[mask, :, :], weights)
r = residuals.reshape(np.transpose(orm_model).shape)
t2 = np.zeros([len(initial_guess[mask]), 1])
for i in range(len(initial_guess[mask])):
t2[i] = np.sum(np.dot(np.dot(J[i].T, weights), r.T))
return get_inverse(J[mask, :, :], t2, s_cut, weights)


def objective(masked_params, orm_residuals, masked_jacobian, weights):
return np.dot(np.transpose(orm_residuals - np.einsum("ijk,i->jk", masked_jacobian, masked_params)),
np.sqrt(weights)).ravel()


def _get_parameters_mask(including_fit_parameters, lengths):
len_quads, len_corr, len_bpm = lengths
mask = np.zeros(len_quads + len_corr + len_bpm, dtype=bool)
mask[:len_quads] = 'quads' in including_fit_parameters
mask[len_quads:len_quads + len_corr] = 'cor' in including_fit_parameters
mask[len_quads + len_corr:] = 'bpm' in including_fit_parameters
return mask


def set_correction(SC, r, elem_ind, individuals=True, skewness=False, order=1, method=SETTING_ADD, dipole_compensation=True):
if individuals:
SC.set_magnet_setpoints(elem_ind, -r, skewness, order, method, dipole_compensation=dipole_compensation)
return SC

for fam_num, quad_fam in enumerate(elem_ind):
SC.set_magnet_setpoints(quad_fam, -r[fam_num], skewness, order, method, dipole_compensation=dipole_compensation)
return SC


def model_beta_beat(ring, twiss, elements_indexes, plot=False):
_, _, twiss_error = at.get_optics(ring, elements_indexes)
s_pos = twiss_error.s_pos
bx = np.array(twiss_error.beta[:, 0] / twiss.beta[:, 0] - 1)
by = np.array(twiss_error.beta[:, 1] / twiss.beta[:, 1] - 1)
bx_rms = np.sqrt(np.mean(bx ** 2))
by_rms = np.sqrt(np.mean(by ** 2))

if plot:
init_font = plt.rcParams["font.size"]
plt.rcParams.update({'font.size': 14})

fig, ax = plt.subplots(nrows=2, sharex="all")
betas = [bx, by]
letters = ("x", "y")
for i in range(2):
ax[i].plot(s_pos, betas[i])
ax[i].set_xlabel("s_pos [m]")
ax[i].set_ylabel(rf'$\Delta\beta_{letters[i]}$ / $\beta_{letters[i]}$')
ax[i].ticklabel_format(axis='y', style='sci', scilimits=(0, 0))
ax[i].grid(True, which='both', linestyle=':', color='gray')

fig.show()
plt.rcParams.update({'font.size': init_font})

return bx_rms, by_rms


def select_equally_spaced_elements(total_elements, num_elements):
step = len(total_elements) // (num_elements - 1)
return total_elements[::step]


def get_inverse(jacobian, B, s_cut, weights, plot=False):
n_resp_mats = len(jacobian)
sum_corr = np.sum(jacobian, axis=2) # Sum over i and j for all planes
matrix = np.dot(np.dot(sum_corr, weights), sum_corr.T)
inv_matrix = SCgetPinv(matrix, num_removed=n_resp_mats - min(n_resp_mats, s_cut), plot=plot)
results = np.ravel(np.dot(inv_matrix, B))
# e = np.ravel(np.dot(matrix, results)) - np.ravel(B)
return results
4 changes: 2 additions & 2 deletions pySC/correction/loco_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ def loco_model(SC, **kwargs):
))
flags.update(**kwargs)
ring_data, init = DotDict(), DotDict()
ring_data.CavityFrequency = SC.IDEALRING[SC.ORD.RF].Frequency
ring_data.CavityHarmNumber = SC.IDEALRING[SC.ORD.RF].HarmNumber
ring_data.CavityFrequency = SC.IDEALRING[SC.ORD.RF][0].Frequency
ring_data.CavityHarmNumber = SC.IDEALRING[SC.ORD.RF][0].HarmNumber
ring_data.Lattice = SC.IDEALRING
init.SC = SC
return ring_data, flags, init
Expand Down
Binary file added tests/inputs/hmba.mat
Binary file not shown.
105 changes: 105 additions & 0 deletions tests/test_loco.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import at
import numpy as np
import pytest
from pathlib import Path
from pySC.core.simulated_commissioning import SimulatedCommissioning
from pySC.utils.sc_tools import SCgetOrds
from pySC.utils import logging_tools
from pySC.correction import loco
from pySC.lattice_properties.response_model import SCgetModelRM, SCgetModelDispersion

LOGGER = logging_tools.get_logger(__name__)
INPUTS = Path(__file__).parent / "inputs"


def test_loco_hmba(at_ring):
np.random.seed(12345678)
sc = SimulatedCommissioning(at_ring)
sc.register_bpms(SCgetOrds(sc.RING, 'BPM'), Roll=0.0, CalError=1E-2 * np.ones(2), NoiseCO=np.array([1e-7, 1E-7]))
sc.register_magnets(SCgetOrds(sc.RING, 'QF|QD'), CalErrorB=np.array([0, 1E-2])) # relative
sc.register_magnets(SCgetOrds(sc.RING, 'CXY'), CalErrorA=np.array([1E-4, 0]), CalErrorB=np.array([1E-4, 0]))
sc.register_magnets(SCgetOrds(sc.RING, 'BEND'))
sc.register_magnets(SCgetOrds(sc.RING, 'SF|SD')) # [1/m]
sc.register_cavities(SCgetOrds(sc.RING, 'RFC'))
sc.apply_errors()
cor_ords = SCgetOrds(sc.RING, 'CXY')

used_correctors1 = loco.select_equally_spaced_elements(cor_ords, 10)
used_correctors2 = loco.select_equally_spaced_elements(cor_ords, 10)
used_cor_ords = [used_correctors1, used_correctors2]
used_bpms_ords = loco.select_equally_spaced_elements(sc.ORD.BPM, len(sc.ORD.BPM))
cav_ords = SCgetOrds(sc.RING, 'RFC')
quads_ords = [SCgetOrds(sc.RING, 'QF'), SCgetOrds(sc.RING, 'QD')]

CMstep = np.array([1.e-4]) # correctors change [rad]
dk = 1.e-4 # quads change
RFstep = 1e3

_, _, twiss = at.get_optics(sc.IDEALRING, sc.ORD.BPM)
orbit_response_matrix_model = SCgetModelRM(sc, used_bpms_ords, used_cor_ords, trackMode='ORB', useIdealRing=True, dkick=CMstep)
model_dispersion = SCgetModelDispersion(sc, used_bpms_ords, cav_ords, trackMode='ORB', Z0=np.zeros(6), nTurns=1,
rfStep=RFstep, useIdealRing=True)
Jn = loco.calculate_jacobian(sc, orbit_response_matrix_model, CMstep, used_cor_ords, used_bpms_ords, np.concatenate(quads_ords), dk,
trackMode='ORB', useIdealRing=False, skewness=False, order=1, method='add',
includeDispersion=False, rf_step=RFstep, cav_ords=cav_ords)
weights = np.eye(len(used_bpms_ords) * 2)
n_singular_values = 20

_, _, twiss_err = at.get_optics(sc.RING, sc.ORD.BPM)
bx_rms_err, by_rms_err = loco.model_beta_beat(sc.RING, twiss, sc.ORD.BPM, plot=False)
info_tab = 14 * " "
LOGGER.info("RMS Beta-beating before LOCO:\n"
f"{info_tab}{bx_rms_err * 100:04.2f}% horizontal\n{info_tab}{by_rms_err * 100:04.2f}% vertical ")
n_iter = 3

for x in range(n_iter): # optics correction using QF and QD
LOGGER.info(f'LOCO iteration {x}')
orbit_response_matrix_measured = loco.measure_closed_orbit_response_matrix(sc, used_bpms_ords, used_cor_ords, CMstep)
n_quads, n_corrs, n_bpms = len(np.concatenate(quads_ords)), len(np.concatenate(used_cor_ords)), len(used_bpms_ords) * 2
bx_rms_err, by_rms_err = loco.model_beta_beat(sc.RING, twiss, sc.ORD.BPM, plot=False)
total_length = n_bpms + n_corrs + n_quads
lengths = [n_quads, n_corrs, n_bpms]
including_fit_parameters = ['quads', 'cor', 'bpm']
initial_guess = np.zeros(total_length)

initial_guess[:lengths[0]] = 1e-6
initial_guess[lengths[0]:lengths[0] + lengths[1]] = 1e-6
initial_guess[lengths[0] + lengths[1]:] = 1e-6

# method lm (least squares)
fit_parameters = loco.loco_correction_lm(initial_guess, orbit_response_matrix_model,
orbit_response_matrix_measured, Jn, lengths,
including_fit_parameters, bounds=(-0.03, 0.03), weights=weights, verbose=2)

# method ng
# fit_parameters = loco.loco_correction_ng(initial_guess, orbit_response_matrix_model,
# orbit_response_matrix_measured, Jn, lengths,
# including_fit_parameters, n_singular_values, weights=weights)

dg = fit_parameters[:lengths[0]] if len(fit_parameters) > n_quads else fit_parameters
sc = loco.set_correction(sc, dg, np.concatenate(quads_ords))
bx_rms_cor, by_rms_cor = loco.model_beta_beat(sc.RING, twiss, sc.ORD.BPM, plot=False)
LOGGER.info(f"RMS Beta-beating after {x + 1} LOCO iterations:\n"
f"{info_tab}{bx_rms_cor * 100:04.2f}% horizontal\n{info_tab}{by_rms_cor * 100:04.2f}% vertical ")
LOGGER.info(f"Correction reduction: \n"
f" beta_x: {(1 - bx_rms_cor / bx_rms_err) * 100:.2f}%\n"
f" beta_y: {(1 - by_rms_cor / by_rms_err) * 100:.2f}%\n")
assert bx_rms_cor < 0.002
assert by_rms_cor < 0.002


@pytest.fixture
def at_ring():
ring = at.load_mat(f'{INPUTS}/hmba.mat')
bpm_indexes = at.get_refpts(ring, at.elements.Monitor)
for bpm_index in reversed(bpm_indexes):
corrector = at.elements.Corrector(f'CXY{bpm_index}', length=0, kick_angle=[0, 0], PolynomA=[0, 0],
PolynomB=[0, 0])
ring.insert(bpm_index + 1, corrector)
ring.enable_6d()
at.set_cavity_phase(ring)
at.set_rf_frequency(ring)

ring.tapering(niter=3, quadrupole=True, sextupole=True)
ring = at.Lattice(ring)
return ring

0 comments on commit 6bd2d5f

Please sign in to comment.