Skip to content

Commit

Permalink
Simplifying lattice settings (#33)
Browse files Browse the repository at this point in the history
* Unified setting methods signatures (SC, ords, setpoints, ...)

* Expanded the allowed types for ords and setpoints (not only `np.array`) : simplified usage

* All setting methods return only the SC structure

* Added tests
  • Loading branch information
lmalina authored Aug 9, 2023
1 parent 76ac73e commit d594448
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 176 deletions.
197 changes: 92 additions & 105 deletions pySC/core/lattice_setting.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,154 +5,141 @@
This module contains the 'machine-based' functions to interact with lattice under study.
"""
import numpy as np
from typing import Tuple
from typing import Union, List

from at import Lattice
from numpy import ndarray

from pySC.core.constants import SETTING_METHODS, SETTING_ABS, SETTING_REL, SETTING_ADD
from pySC.core.constants import SETTING_METHODS, SETTING_ABS, SETTING_REL, SETTING_ADD, NUM_TO_AB
from pySC.core.simulated_commissioning import SimulatedCommissioning
from pySC.utils.at_wrapper import atgetfieldvalues
from pySC.utils import logging_tools

LOGGER = logging_tools.get_logger(__name__)


def set_cavity_setpoints(SC: SimulatedCommissioning, ords: ndarray, type: str, setpoints: ndarray,
method: str = SETTING_ABS) -> SimulatedCommissioning:
new_setpoints = _check_input_and_setpoints(method, ords, setpoints)
setpoint_str = f"{type}SetPoint"
for i, ord in enumerate(ords):
new_setpoint = new_setpoints[i]
if method == SETTING_REL:
new_setpoint *= getattr(SC.RING[ord], setpoint_str)
if method == SETTING_ADD:
new_setpoint += getattr(SC.RING[ord], setpoint_str)
setattr(SC.RING[ord], setpoint_str, new_setpoint)
SC.update_cavities(ords)
SETPOINT = "SetPoint"


def set_cavity_setpoints(SC: SimulatedCommissioning,
ords: Union[int, List[int], ndarray],
setpoints: Union[float, List[float], ndarray],
param: str, method: str = SETTING_ABS) -> SimulatedCommissioning:
ords_1d, setpoints_1d = _check_input_and_setpoints(method, ords, setpoints)
setpoint_str = f"{param}{SETPOINT}"
if method == SETTING_REL:
setpoints_1d *= atgetfieldvalues(SC.RING, ords_1d, setpoint_str)
if method == SETTING_ADD:
setpoints_1d += atgetfieldvalues(SC.RING, ords_1d, setpoint_str)
for i, ord in enumerate(ords_1d):
setattr(SC.RING[ord], setpoint_str, setpoints_1d[i])
SC.update_cavities(ords_1d)
return SC


def switch_rf(ring: Lattice, ords: ndarray, state: bool) -> Lattice:
cavs = [i for i in ords if hasattr(ring[i], 'Frequency')]
cavs = [i for i in np.ravel(np.array([ords], dtype=int)) if hasattr(ring[i], 'Frequency')]
for ind in cavs:
ring[ind].PassMethod = 'RFCavityPass' if state else 'IdentityPass'
return ring


def get_cm_setpoints(SC: SimulatedCommissioning, ords: ndarray, skewness: bool) -> ndarray:
setpoints = np.nan*np.ones(len(ords))
def get_cm_setpoints(SC: SimulatedCommissioning, ords: Union[int, List[int], ndarray], skewness: bool) -> ndarray:
ords_1d = np.ravel(np.array([ords], dtype=int))
order = 0
ndim = 1 if skewness else 0
for i, ord in enumerate(ords):
if SC.RING[ord].PassMethod == 'CorrectorPass':
norm_by = np.array([1, 1])
else:
ndim = int(skewness)
letter = NUM_TO_AB[ndim]
setpoints = atgetfieldvalues(SC.RING, ords_1d, f"{SETPOINT}{letter}", order)
for i, ord1d in enumerate(ords_1d):
if SC.RING[ord1d].PassMethod != 'CorrectorPass':
# positive setpoint -> positive kick -> negative horizontal field
norm_by = np.array([-1, 1]) * SC.RING[ord].Length
if skewness:
setpoints[i] = SC.RING[ord].SetPointA[order] * norm_by[ndim]
else:
setpoints[i] = SC.RING[ord].SetPointB[order] * norm_by[ndim]
setpoints[i] *= (-1) ** (ndim + 1) * SC.RING[ord1d].Length
return setpoints


def set_cm_setpoints(SC: SimulatedCommissioning, ords: ndarray, setpoints: ndarray, skewness: bool,
method: str = SETTING_ABS) -> Tuple[SimulatedCommissioning, ndarray]:
new_setpoints = _check_input_and_setpoints(method, ords, setpoints)
def set_cm_setpoints(SC: SimulatedCommissioning,
ords: Union[int, List[int], ndarray],
setpoints: Union[float, List[float], ndarray],
skewness: bool, method: str = SETTING_ABS) -> SimulatedCommissioning:
# TODO corrector does not have PolynomA/B in at?
ords_1d, setpoints_1d = _check_input_and_setpoints(method, ords, setpoints)
order = 0
ndim = 1 if skewness else 0
for i, ord in enumerate(ords):
if SC.RING[ord].PassMethod == 'CorrectorPass':
norm_by = np.array([1, 1])
else:
# positive setpoint -> positive kick -> negative horizontal field
norm_by = np.array([-1, 1]) * SC.RING[ord].Length
ndim = int(skewness)
letter = NUM_TO_AB[ndim]
for i, ord in enumerate(ords_1d):
# positive setpoint -> positive kick -> negative horizontal field
norm_by = (-1) ** (ndim + 1) * SC.RING[ord].Length if SC.RING[ord].PassMethod != 'CorrectorPass' else 1
if method == SETTING_REL:
new_setpoints[i] *= (SC.RING[ord].SetPointA[order] if skewness else SC.RING[ord].SetPointB[order]) * norm_by[ndim]
setpoints_1d[i] *= getattr(SC.RING[ord], f"{SETPOINT}{letter}")[order] * norm_by
if method == SETTING_ADD:
new_setpoints[i] += (SC.RING[ord].SetPointA[order] if skewness else SC.RING[ord].SetPointB[order]) * norm_by[ndim]

if hasattr(SC.RING[ord], 'CMlimit') and abs(new_setpoints[i]) > abs(SC.RING[ord].CMlimit[ndim]):
setpoints_1d[i] += getattr(SC.RING[ord], f"{SETPOINT}{letter}")[order] * norm_by
if hasattr(SC.RING[ord], 'CMlimit') and abs(setpoints_1d[i]) > abs(SC.RING[ord].CMlimit[ndim]):
LOGGER.info(f'CM (ord: {ord} / dim: {ndim}) is clipping')
new_setpoints[i] = np.sign(new_setpoints[i]) * SC.RING[ord].CMlimit[ndim]
if skewness:
SC.RING[ord].SetPointA[order] = new_setpoints[i] / norm_by[ndim]
else:
SC.RING[ord].SetPointB[order] = new_setpoints[i] / norm_by[ndim]
SC.update_magnets(ords)
return SC, new_setpoints


def set_magnet_setpoints(SC: SimulatedCommissioning, ords: ndarray, skewness: bool, order: int, setpoints: ndarray,
method: str = SETTING_ABS, dipole_compensation: bool = False) -> SimulatedCommissioning:
new_setpoints = _check_input_and_setpoints(method, ords, setpoints)
for i, ord in enumerate(ords):
if method == SETTING_REL:
new_setpoints[i] *= SC.RING[ord].NomPolynomA[order] if skewness else SC.RING[ord].NomPolynomB[order]
if method == SETTING_ADD:
new_setpoints[i] += SC.RING[ord].SetPointA[order] if skewness else SC.RING[ord].SetPointB[order]
if skewness and order == 1: # skew quad
if hasattr(SC.RING[ord], 'SkewQuadLimit') and abs(new_setpoints[i]) > abs(SC.RING[ord].SkewQuadLimit):
LOGGER.info(f'SC:SkewLim \n Skew quadrupole (ord: {ord}) is clipping')
new_setpoints[i] = np.sign(new_setpoints[i]) * SC.RING[ord].SkewQuadLimit
setpoints_1d[i] = np.sign(setpoints_1d[i]) * SC.RING[ord].CMlimit[ndim]
getattr(SC.RING[ord], f"{SETPOINT}{letter}")[order] = setpoints_1d[i] / norm_by

SC.update_magnets(ords_1d)
return SC


def set_magnet_setpoints(SC: SimulatedCommissioning,
ords: Union[int, List[int], ndarray],
setpoints: Union[float, List[float], ndarray],
skewness: bool, order: int, method: str = SETTING_ABS,
dipole_compensation: bool = False) -> SimulatedCommissioning:
ords_1d, setpoints_1d = _check_input_and_setpoints(method, ords, setpoints)
letter = NUM_TO_AB[int(skewness)]
if method == SETTING_REL:
setpoints_1d *= atgetfieldvalues(SC.RING, ords_1d, f"NomPolynom{letter}", order)
if method == SETTING_ADD:
setpoints_1d += atgetfieldvalues(SC.RING, ords_1d, f"{SETPOINT}{letter}", order)
for i, ord in enumerate(ords_1d):
if skewness and order == 1 and getattr(SC.RING[ord], 'SkewQuadLimit', np.inf) < np.abs(setpoints_1d[i]):
LOGGER.info(f'SkewLim \n Skew quadrupole (ord: {ord}) is clipping')
setpoints_1d[i] = np.sign(setpoints_1d[i]) * SC.RING[ord].SkewQuadLimit
# TODO should check CF magnets
if dipole_compensation and order == 1: # quad # TODO check also skewness?
SC = _dipole_compensation(SC, ord, new_setpoints[i])
if skewness:
SC.RING[ord].SetPointA[order] = new_setpoints[i]
else:
SC.RING[ord].SetPointB[order] = new_setpoints[i]
SC.update_magnets(ords)
SC = _dipole_compensation(SC, ord, setpoints_1d[i])
getattr(SC.RING[ord], f"{SETPOINT}{letter}")[order] = setpoints_1d[i]

SC.update_magnets(ords_1d)
return SC


def SCcronoff(ring: Lattice, *args: str) -> Lattice: # TODO some at methods do that?
def switch_cavity_and_radiation(ring: Lattice, *args: str) -> Lattice: # TODO some at methods do that?
valid_args = ('radiationoff', 'radiationon', 'cavityoff', 'cavityon')
if invalid_args := [arg for arg in args if arg not in valid_args]:
raise ValueError(f"Unknown arguments found: {invalid_args}"
f"Available options are: {valid_args}")
for mode in args:
if mode == 'radiationoff':
for ind in range(len(ring)):
if ring[ind].PassMethod == 'BndMPoleSymplectic4RadPass':
ring[ind].PassMethod = 'BndMPoleSymplectic4Pass'
elif ring[ind].PassMethod == 'BndMPoleSymplectic4E2RadPass':
ring[ind].PassMethod = 'BndMPoleSymplectic4E2Pass'
elif ring[ind].PassMethod == 'StrMPoleSymplectic4RadPass':
ring[ind].PassMethod = 'StrMPoleSymplectic4Pass'
elif mode == 'radiationon':
for ind in range(len(ring)):
if ring[ind].PassMethod == 'BndMPoleSymplectic4Pass':
ring[ind].PassMethod = 'BndMPoleSymplectic4RadPass'
elif ring[ind].PassMethod == 'BndMPoleSymplectic4E2Pass':
ring[ind].PassMethod = 'BndMPoleSymplectic4E2RadPass'
elif ring[ind].PassMethod == 'StrMPoleSymplectic4Pass':
ring[ind].PassMethod = 'StrMPoleSymplectic4RadPass'
elif mode == 'cavityoff':
for ind in range(len(ring)):
if hasattr(ring[ind], 'Frequency'):
ring[ind].PassMethod = 'IdentityPass'
elif mode == 'cavityon':
for ind in range(len(ring)):
if hasattr(ring[ind], 'Frequency'):
ring[ind].PassMethod = 'RFCavityPass'
non_rad_pass_methods = ['BndMPoleSymplectic4Pass', 'BndMPoleSymplectic4E2Pass', 'StrMPoleSymplectic4Pass']
rad_pass_methods = [method.replace("Pass", "RadPass") for method in non_rad_pass_methods]

if 'radiationoff' in args:
for ind in range(len(ring)):
if ring[ind].PassMethod in rad_pass_methods:
ring[ind].PassMethod = ring[ind].PassMethod.replace("Rad", "")
elif 'radiationon' in args:
for ind in range(len(ring)):
if ring[ind].PassMethod in non_rad_pass_methods:
ring[ind].PassMethod = ring[ind].PassMethod.replace("Pass", "RadPass")
if 'cavityoff' in args:
return switch_rf(ring, np.arange(len(ring)), False)
elif 'cavityon' in args:
return switch_rf(ring, np.arange(len(ring)), True)
return ring


def _dipole_compensation(SC, ord, setpoint):
if not (hasattr(SC.RING[ord], 'BendingAngle') and SC.RING[ord].BendingAngle != 0 and ord in SC.ORD.CM[0]):
return SC
ideal_kick_difference = ((setpoint - (SC.RING[ord].SetPointB[1] - SC.RING[ord].NomPolynomB[1])) /
SC.RING[ord].NomPolynomB[1] - 1) * SC.RING[ord].BendingAngle / SC.RING[ord].Length
SC, _ = set_cm_setpoints(SC, ord, ideal_kick_difference * SC.RING[ord].Length, skewness=False, method=SETTING_ADD)
if getattr(SC.RING[ord], 'BendingAngle', 0) != 0 and ord in SC.ORD.HCM:
return set_cm_setpoints(
SC, ord, (setpoint - SC.RING[ord].SetPointB[1]) / SC.RING[ord].NomPolynomB[1] * SC.RING[ord].BendingAngle,
skewness=False, method=SETTING_ADD)
return SC


def _check_input_and_setpoints(method, ords, setpoints):
if method not in SETTING_METHODS:
raise ValueError(f'Unsupported setpoint method: {method}. Allowed options are: {SETTING_METHODS}.')
if len(setpoints) not in (1, len(ords)) or np.prod(setpoints.shape) > len(ords):
ords_1d = np.ravel(np.array([ords], dtype=int))
setpoints_1d = np.ravel(np.array([setpoints]))
if len(setpoints_1d) not in (1, len(ords_1d)):
raise ValueError(f'Setpoints have to have length of 1 or matching to the length or ordinates.')
if len(setpoints) == 1:
return np.repeat(setpoints, len(ords))
return setpoints.copy()
return ords_1d, (np.repeat(setpoints_1d, len(ords_1d)) if len(setpoints_1d) == 1 else setpoints_1d)
12 changes: 6 additions & 6 deletions pySC/correction/bba.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def SCBBA(SC, bpm_ords, mag_ords, **kwargs):
BPMind = np.where(bpm_ords[nDim, jBPM] == SC.ORD.BPM)[0][0]
mOrd = mag_ords[nDim, jBPM]
if par.switchOffSext:
SC = set_magnet_setpoints(SC, mOrd, skewness=False, order=2, setpoints=np.zeros(1), method='abs')
SC = set_magnet_setpoints(SC, mOrd, setpoints=np.zeros(1), skewness=False, order=2, method='abs')
SC = SCfeedbackRun(SC, par.RMstruct.MinvCO, BPMords=par.RMstruct.BPMords, CMords=par.RMstruct.CMords,
target=0, maxsteps=50, scaleDisp=par.RMstruct.scaleDisp, eps=1E-6)
if par.mode == 'ORB':
Expand Down Expand Up @@ -131,12 +131,12 @@ def _data_measurement(SC, mOrd, BPMind, jBPM, nDim, par, varargin):
if par.plotLines:
f, ax = plt.subplots(nrows=len(par.magSPvec[nDim, jBPM]), num=99)
for nQ, setpointQ in enumerate(par.magSPvec[nDim, jBPM]):
SC = set_magnet_setpoints(SC, mOrd, par.skewQuadrupole, par.magOrder, setpointQ, method=par.magSPflag,
SC = set_magnet_setpoints(SC, mOrd, setpointQ, par.skewQuadrupole, par.magOrder, method=par.magSPflag,
dipole_compensation=par.dipCompensation)
for nKick in range(nMsteps):
if par.mode == 'ORB':
for nD in range(2):
SC, _ = set_cm_setpoints(SC, CMords[nD], CMvec[nD][nKick, :], bool(nD), method='abs')
SC = set_cm_setpoints(SC, CMords[nD], CMvec[nD][nKick, :], bool(nD), method='abs')
else:
SC.INJ.Z0[2 * nDim:2 * nDim + 2] = initialZ0[2 * nDim:2 * nDim + 2] + kickVec[:, nKick]
B = bpm_reading(SC)
Expand Down Expand Up @@ -269,7 +269,7 @@ def _scan_phase_advance(SC, BPMind, nDim, kickVec0, par):
for nQ in range(len(qVec)):
LOGGER.debug(f'BBA-BPM range to small, try to change phase advance with quad ord {par.quadOrdPhaseAdvance} '
f'to {qVec[nQ]:.2f} of nom. SP.')
SC = set_magnet_setpoints(SC, mOrd, False, 1, qVec[nQ], method='rel', dipole_compensation=True)
SC = set_magnet_setpoints(SC, mOrd, qVec[nQ], False, 1, method='rel', dipole_compensation=True)
kickVec, BPMrange = _scale_injection_to_reach_bpm(SC, BPMind, nDim, kickVec0)

if BPMrange >= par.BBABPMtarget:
Expand All @@ -281,9 +281,9 @@ def _scan_phase_advance(SC, BPMind, nDim, kickVec0, par):
if BPMrange < np.max(allBPMRange):
LOGGER.debug(f'Changing phase advance of quad with ord {mOrd} NOT succesfull, '
f'returning to best value with BBA-BPM range = {1E6 * max(allBPMRange):.0f}um.')
return set_magnet_setpoints(SC, mOrd, False, 1, np.max(qVec), method='rel', dipole_compensation=True), kickVec
return set_magnet_setpoints(SC, mOrd, np.max(qVec), False, 1, method='rel', dipole_compensation=True), kickVec
LOGGER.debug(f'Changing phase advance of quad with ord {mOrd} NOT succesfull, returning to initial setpoint.')
return set_magnet_setpoints(SC, mOrd, False, 1, q0, method='abs', dipole_compensation=True), kickVec
return set_magnet_setpoints(SC, mOrd, q0, False, 1, method='abs', dipole_compensation=True), kickVec


def _get_orbit_bump(SC, mOrd, BPMord, nDim, par):
Expand Down
7 changes: 4 additions & 3 deletions pySC/correction/chroma.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@ def fit_chroma(SC, s_ords, target_chroma=None, init_step_size=np.array([2, 2]),
return SC
if tune_knobs_ords is not None and tune_knobs_delta_k is not None:
for nFam in range(len(tune_knobs_ords)):
SC = set_magnet_setpoints(SC, tune_knobs_ords[nFam], False, 1, tune_knobs_delta_k[nFam], method='add') # TODO quads here?
SC = set_magnet_setpoints(SC, tune_knobs_ords[nFam], tune_knobs_delta_k[nFam], False, 1,
method='add') # TODO quads here?
LOGGER.debug(f'Fitting chromaticities from {atlinopt(SC.RING, 0, [])[2]} to {target_chroma}.') # first two elements
SP0 = np.zeros((len(s_ords), len(s_ords[0]))) # TODO can the lengts vary
for nFam in range(len(s_ords)):
for n in range(len(s_ords[nFam])):
SP0[nFam][n] = SC.RING[s_ords[nFam][n]].SetPointB[2]
fun = lambda x: _fit_chroma_fun(SC, s_ords, x, SP0, target_chroma)
sol = fmin(fun, init_step_size, xtol=xtol, ftol=ftol)
SC = set_magnet_setpoints(SC, s_ords, False, 1, sol + SP0, method='abs', dipole_compensation=True)
SC = set_magnet_setpoints(SC, s_ords, sol + SP0, False, 1, method='abs', dipole_compensation=True)
LOGGER.debug(f' Final chromaticity: {atlinopt(SC.RING, 0, [])[2]}\n Setpoints change: {sol}.') # first two elements
return SC


def _fit_chroma_fun(SC, q_ords, setpoints, init_setpoints, target):
SC = set_magnet_setpoints(SC, q_ords, False, 2, setpoints + init_setpoints, method='abs', dipole_compensation=True)
SC = set_magnet_setpoints(SC, q_ords, setpoints + init_setpoints, False, 2, method='abs', dipole_compensation=True)
_, _, nu = atlinopt(SC.RING, 0, [])
return np.sqrt(np.mean((nu - target) ** 2))
4 changes: 2 additions & 2 deletions pySC/correction/loco_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ def apply_lattice_correction(SC, fit_parameters, dipole_compensation=True, dampi
setpoint = fit_parameters.OrigValues[n_group] + damping * (
fit_parameters.IdealValues[n_group] - fit_parameters.Values[n_group])
if field == 'SetPointB': # Normal quadrupole
SC = set_magnet_setpoints(SC, ord, False, 1, setpoint, dipole_compensation=dipole_compensation)
SC = set_magnet_setpoints(SC, ord, setpoint, False, 1, dipole_compensation=dipole_compensation)
elif field == 'SetPointA': # Skew quadrupole
SC = set_magnet_setpoints(SC, ord, True, 1, setpoint)
SC = set_magnet_setpoints(SC, ord, setpoint, True, 1)
SC = SC.update_magnets(SC.ORD.Magnet)
return SC

Expand Down
Loading

0 comments on commit d594448

Please sign in to comment.