Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rf setup upgrade #34

Merged
merged 3 commits into from
Aug 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 137 additions & 124 deletions pySC/correction/rf.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,161 +12,174 @@
from pySC.core.beam import bpm_reading
from pySC.core.lattice_setting import set_cavity_setpoints
from pySC.utils import logging_tools
from pySC.core.constants import SPEED_OF_LIGHT

LOGGER = logging_tools.get_logger(__name__)
MIN_TURNS_FOR_LINEAR_FIT = 3
MIN_TURNS_FOR_SINE_FIT = 6


def SCsynchPhaseCorrection(SC, cavOrd=None, nSteps=15, plotResults=False, plotProgress=False):
# TODO minimum number of turns as in energy correction?
def correct_rf_phase(SC, cav_ords=None, bpm_ords=None, n_steps=15, plot_results=False, plot_progress=False):
def _sin_fit_fun(x, a, b, c):
return a * np.sin(2 * np.pi * x + b) + c

LOGGER.debug(f'Calibrate RF phase with: \n {SC.INJ.nParticles} Particles \n {SC.INJ.nTurns} Turns '
f'\n {SC.INJ.nShots} Shots \n {nSteps} Phase steps.\n\n')
if cavOrd is None:
cavOrd = SC.ORD.RF
if turns_required := 2 > SC.INJ.nTurns:
raise ValueError(f"Not enough turns for the for the tracking: {turns_required=} set it to SC.INJ.nTurns")
if SC.INJ.trackMode != 'TBT':
raise ValueError('Trackmode should be turn-by-turn (''SC.INJ.trackmode=TBT'').')
bpm_shift = np.full(nSteps, np.nan)
lamb = 299792458 / SC.RING[cavOrd[0]].Frequency
l_test_vec = 1 / 2 * lamb * np.linspace(-1, 1, nSteps)
f'\n {SC.INJ.nShots} Shots \n {n_steps} Phase steps.\n\n')
cav_ords, bpm_ords = _check_inputs(SC, cav_ords, bpm_ords, n_steps)
lamb = SPEED_OF_LIGHT / SC.RING[cav_ords[0]].FrequencySetPoint
bpm_shift, bpm_shift_err = np.full(n_steps, np.nan), np.full(n_steps, np.nan)
test_vec = lamb * np.linspace(-0.5, 0.5, n_steps)

# Main loop
for nL in range(len(l_test_vec)):
SC = set_cavity_setpoints(SC, cavOrd, l_test_vec[nL], 'TimeLag', 'add')
bpm_shift[nL], TBTdE = _get_tbt_energy_shift(SC)
SC = set_cavity_setpoints(SC, cavOrd, -l_test_vec[nL], 'TimeLag', 'add')
if plotProgress:
_fun_plot_progress(TBTdE, bpm_shift, l_test_vec, nL, phase=True)

# Check data
l_test_vec = l_test_vec[~np.isnan(bpm_shift)]
bpm_shift = bpm_shift[~np.isnan(bpm_shift)]
if len(bpm_shift) < 3:
raise RuntimeError('Not enough data points for fit.')
if not (max(bpm_shift) > 0 > min(bpm_shift)):
raise RuntimeError('Zero crossing not within data set.\n')
for step in range(n_steps):
SC = set_cavity_setpoints(SC, cav_ords, test_vec[step], 'TimeLag', 'add')
bpm_shift[step], bpm_shift_err[step], mean_tbt_orbit = _get_tbt_energy_shift(SC, bpm_ords)
SC = set_cavity_setpoints(SC, cav_ords, -test_vec[step], 'TimeLag', 'add')
if plot_progress:
_plot_progress((test_vec, bpm_shift, bpm_shift_err), mean_tbt_orbit, phase=True)

# Fit sinusoidal function to data
param, param_cov = curve_fit(_sin_fit_fun, l_test_vec/lamb, bpm_shift, p0=np.array([max(bpm_shift)-np.mean(bpm_shift), 3.14, np.mean(bpm_shift)]))
sol = lambda x: _sin_fit_fun(x/lamb, param[0], param[1], param[2])
test_vec, bpm_shift, bpm_shift_err = _check_data(test_vec, bpm_shift, bpm_shift_err)

if not (max(sol(l_test_vec)) > 0 > min(sol(l_test_vec))):
raise RuntimeError('Zero crossing not within fit function\n')
# Fit sinusoidal function to data
param, param_cov = curve_fit(_sin_fit_fun, test_vec / lamb, bpm_shift, sigma=bpm_shift_err, absolute_sigma=True,
p0=np.array([max(bpm_shift) - np.mean(bpm_shift), np.pi, np.mean(bpm_shift)]))
if np.abs(param[2]) > np.abs(param[0]): # fitted sin_wave does not cross zero
raise RuntimeError(f'Zero crossing not within fit function\n Consider increasing RF voltage to at least '
f'{SC.RING[cav_ords[0]].VoltageSetPoint * np.abs(param[2] / param[0])} V.')

# Find zero crossing of fitted function
delta_phi = fsolve(sol, l_test_vec[np.argmax(sol(l_test_vec))] - abs(l_test_vec[0]) / 2)
delta_phi = _fold_phase(delta_phi, lamb)
if np.isnan(delta_phi):
raise RuntimeError('SCsynchPhaseCorrection: ERROR (NaN phase)')

initial = _fold_phase((findorbit6(SC.RING)[0][5] - SC.INJ.Z0[5]) / lamb, lamb)
SC = set_cavity_setpoints(SC, cavOrd, delta_phi, 'TimeLag', 'add')
final = _fold_phase((findorbit6(SC.RING)[0][5] - SC.INJ.Z0[5]) / lamb, lamb)
LOGGER.debug(f'Time lag correction step: {delta_phi[0]:.3f} m\n')
LOGGER.debug(f'Static phase error corrected from {initial*360:.0f} deg to {final*360:.1f} deg')

if plotResults:
plt.plot(l_test_vec / lamb * 360, bpm_shift, 'o', color='red', label="data")
plt.plot(l_test_vec / lamb * 360, sol(l_test_vec), '--', color='blue', label="fit")
plt.plot(initial / lamb * 360, sol(initial), 'rD', markersize=12, label="Initial time lag")
plt.plot(delta_phi / lamb * 360, 0, 'kX', markersize=12, label="Final time lag")
plt.xlabel("RF phase [deg]")
plt.ylabel("BPM change [m]")
plt.legend()
plt.show()

sol = lambda x: _sin_fit_fun(x / lamb, param[0], param[1], param[2])
delta_phi = _fold_phase(fsolve(sol, test_vec[np.argmax(sol(test_vec))] - abs(test_vec[0]) / 2), lamb)
if np.abs(param[0]) < 2 * np.sqrt(param_cov[0, 0]) or np.abs(param[2]) < 2 * np.sqrt(param_cov[2, 2]):
LOGGER.warning("Measurements is not sufficiently precise to make a correction")
delta_phi = 0.0
if np.isnan(delta_phi): # TODO does this ever happen?
raise RuntimeError('RF phase correction unsuccessful: NaN phase')

SC = set_cavity_setpoints(SC, cav_ords, delta_phi, 'TimeLag', 'add')
LOGGER.info(f'Time lag correction step: {delta_phi[0]:.3f} m\n')
if plot_results:
_plot_results((test_vec, bpm_shift, bpm_shift_err), sol, delta_phi, x_scale=360 / lamb, phase=True)
return SC


def SCsynchEnergyCorrection(SC, cavOrd=None, f_range=(-1E3, 1E3), nSteps=15, minTurns=0, plotResults=False,
plotProgress=False):
def correct_rf_frequency(SC, cav_ords=None, bpm_ords=None, n_steps=15, f_range=(-1E3, 1E3), plot_results=False,
plot_progress=False):
LOGGER.debug(f'Correct energy error with: \n '
f'{SC.INJ.nParticles} Particles \n {SC.INJ.nTurns} Turns \n {SC.INJ.nShots} Shots \n {nSteps} '
f'{SC.INJ.nParticles} Particles \n {SC.INJ.nTurns} Turns \n {SC.INJ.nShots} Shots \n {n_steps} '
f'Frequency steps between {1E-3 * f_range[0]:.1f} and {1E-3 * f_range[1]:.1f} kHz.\n\n')
if turns_required := max(minTurns, 2) > SC.INJ.nTurns:
raise ValueError(f"Not enough turns for the for the tracking: {turns_required=} set it to SC.INJ.nTurns")
if SC.INJ.trackMode != 'TBT':
raise ValueError('Trackmode should be turn-by-turn (''SC.INJ.trackmode=TBT'').')
if cavOrd is None:
cavOrd = SC.ORD.RF
BPM_shift = np.full(nSteps, np.nan)
f_test_vec = np.linspace(f_range[0], f_range[1], nSteps)
cav_ords, bpm_ords = _check_inputs(SC, cav_ords, bpm_ords, n_steps)
bpm_shift, bpm_shift_err = np.full(n_steps, np.nan), np.full(n_steps, np.nan)
test_vec = np.linspace(f_range[0], f_range[1], n_steps)

# Main loop
for nE in range(len(f_test_vec)): # TODO later this does not have to set value back and forth in each step
SC = set_cavity_setpoints(SC, cavOrd, f_test_vec[nE], 'Frequency', 'add')
[BPM_shift[nE], TBTdE] = _get_tbt_energy_shift(SC, minTurns)
SC = set_cavity_setpoints(SC, cavOrd, -f_test_vec[nE], 'Frequency', 'add')
if plotProgress:
_fun_plot_progress(TBTdE, BPM_shift, f_test_vec, nE, phase=False)

# Check data
f_test_vec = f_test_vec[~np.isnan(BPM_shift)]
BPM_shift = BPM_shift[~np.isnan(BPM_shift)]
if len(BPM_shift) < 2:
raise RuntimeError('Not enough data points for fit.')

for step in range(n_steps):
SC = set_cavity_setpoints(SC, cav_ords, test_vec[step], 'Frequency', 'add')
bpm_shift[step], bpm_shift_err[step], mean_tbt_orbit = _get_tbt_energy_shift(SC, bpm_ords)
SC = set_cavity_setpoints(SC, cav_ords, -test_vec[step], 'Frequency', 'add')
if plot_progress:
_plot_progress((test_vec, bpm_shift, bpm_shift_err), mean_tbt_orbit, phase=False)

test_vec, bpm_shift, bpm_shift_err = _check_data(test_vec, bpm_shift, bpm_shift_err, min_num_points=3)
# Fit linear function to data
p = np.polyfit(f_test_vec, BPM_shift, 1)
delta_f = -p[1] / p[0]
param, param_cov = np.polyfit(test_vec, bpm_shift, 1, w=1 / bpm_shift_err, cov="unscaled")
delta_f = -param[1] / param[0]
if np.abs(param[0]) < 2 * np.sqrt(param_cov[0, 0]) or np.abs(param[1]) < 2 * np.sqrt(param_cov[1, 1]):
LOGGER.warning("Measurements is not sufficiently precise to make a correction")
delta_f = 0.0
if np.isnan(delta_f):
raise RuntimeError('SCsynchEnergyCorrection: ERROR (NaN frequency)')
raise RuntimeError('RF frequency correction unsuccessful: NaN frequency')

initial = SC.INJ.Z0[4] - findorbit6(SC.RING)[0][4]
SC = set_cavity_setpoints(SC, cavOrd, delta_f, 'Frequency', 'add')
final = SC.INJ.Z0[4] - findorbit6(SC.RING)[0][4]
SC = set_cavity_setpoints(SC, cav_ords, delta_f, 'Frequency', 'add')
LOGGER.info(f'Frequency correction step: {1E-3 * delta_f:.2f} kHz')
LOGGER.info(f'Energy error corrected from {1E2*initial:.2f}% to {1E2*final:.2f}%')
sol = lambda x: x * param[0] + param[1]
if plot_results:
_plot_results((test_vec, bpm_shift, bpm_shift_err), sol, delta_f, x_scale=1e-3, phase=False)
return SC

if plotResults:
plt.plot(1E-3 * f_test_vec, 1E6 * BPM_shift, 'o', label='Measurement')
plt.plot(1E-3 * f_test_vec, 1E6 * (f_test_vec * p[0] + p[1]), '--', label='Fit')
plt.plot(1E-3 * delta_f, 0, 'kX', markersize=16, label='dE correction')
plt.xlabel(r'$\Delta f$ [$kHz$]')
plt.ylabel(r'$<\Delta x>$ [$\mu$m/turn]')
plt.show()

return SC
def phase_and_energy_error(SC, cav_ords):
"""This is not a simple observable in reality."""
lamb = SPEED_OF_LIGHT / SC.RING[cav_ords[0]].Frequency
orbit6 = findorbit6(SC.RING)[0]
phase_error = _fold_phase((orbit6[5] - SC.INJ.Z0[5]) / lamb, lamb) * 360
energy_error = SC.INJ.Z0[4] - orbit6[4]
LOGGER.info(f'Static phase error {phase_error:.0f} deg')
LOGGER.info(f'Energy error {1E2 * energy_error:.2f}%')
return phase_error, energy_error


def _sin_fit_fun(x, a, b, c):
return a * np.sin(2 * np.pi * x + b) + c
def _check_data(test_vec, bpm_shift, bpm_shift_err, min_num_points=6):
mask = ~np.isnan(bpm_shift)
if np.sum(~np.isnan(bpm_shift)) < min_num_points:
raise RuntimeError('Not enough data points for fit.')
return test_vec[mask], bpm_shift[mask], bpm_shift_err[mask]


def _get_tbt_energy_shift(SC, min_turns=2):
bpm_readings = bpm_reading(SC)
x_reading = np.reshape(bpm_readings[0, :], (SC.INJ.nTurns, len(SC.ORD.BPM)))
TBTdE = np.mean(x_reading - x_reading[0, :], axis=1)
if len(TBTdE[~np.isnan(TBTdE)]) < min_turns: # not enough data for linear fit
return np.nan, TBTdE
nan_mask = ~np.isnan(TBTdE)
return np.polyfit(np.arange(SC.INJ.nTurns)[nan_mask], TBTdE[nan_mask], 1)[0], TBTdE
def _get_tbt_energy_shift(SC, bpm_ords):
bpm_readings = bpm_reading(SC, bpm_ords)
x_reading = np.reshape(bpm_readings[0, :], (SC.INJ.nTurns, len(bpm_ords)))
mean_tbt = np.mean(x_reading - x_reading[0, :], axis=1)
mean_tbt_err = np.std(x_reading, axis=1) / np.sqrt(x_reading.shape[1])
mask = ~np.isnan(mean_tbt)
if np.sum(mask) < MIN_TURNS_FOR_LINEAR_FIT:
return np.nan, np.nan, mean_tbt
fit_result = np.polyfit(np.arange(SC.INJ.nTurns)[mask], mean_tbt[mask], 1,
w=1 / (mean_tbt_err[mask] + 1e-9), cov="unscaled")
slope, err_slope = fit_result[0][0], np.sqrt(fit_result[1][0, 0])
return slope, err_slope, mean_tbt


def _fun_plot_progress(TBTdE, BPM_shift, f_test_vec, nE, phase=True):
def _plot_progress(data, mean_tbt, phase=True):
test_vec, bpm_shift, bpm_shift_err = data
f, ax = plt.subplots(nrows=2, num=2)
ax[0].plot(TBTdE, 'o')
ax[0].plot(np.arange(len(TBTdE)) * BPM_shift[nE], '--')
y_scale = 1e6
ax[0].plot(y_scale * mean_tbt, 'o')
ax[0].plot(y_scale * np.arange(len(mean_tbt)) * bpm_shift[~np.isnan(bpm_shift)][-1], '--')
ax[0].set_xlabel('Number of turns')
ax[0].set_ylabel(r'$<\Delta x_\mathrm{TBT}>$ [m]')
ax[1].plot(f_test_vec[:nE], BPM_shift[:nE], 'o')
ax[1].set_xlabel(r'$\Delta \phi$ [m]' if phase else r'$\Delta f$ [m]')
ax[1].set_ylabel(r'$<\Delta x>$ [m/turn]')
plt.show()
ax[0].set_ylabel(r'$\overline{\Delta x}$ [$\mu$m / turn]')
ax[1].errorbar(test_vec, y_scale * bpm_shift, yerr=y_scale * bpm_shift_err, fmt='o')
ax[1].set_xlabel(r'$\Delta \phi$ [m]' if phase else r'$\Delta f$ [Hz]')
ax[1].set_ylabel(r'$\overline{\Delta x}$ [$\mu$m / turn]')
f.tight_layout()
f.show()


def _plot_results(data, fit, corrected, x_scale, phase=True):
test_vec, bpm_shift, bpm_shift_err = data
f, ax = plt.subplots(nrows=1)
y_scale = 1e6
ax.errorbar(x_scale * test_vec, y_scale * bpm_shift, yerr=y_scale * bpm_shift_err, fmt='o', label="Measurement")
ax.plot(x_scale * test_vec, y_scale * fit(test_vec), '--', label='Fit')
ax.plot(x_scale * 0, y_scale * fit(0), 'rD', markersize=12, label="Initial")
ax.plot(x_scale * corrected, y_scale * 0, 'kX', markersize=16, label='Corrected')
ax.set_xlabel(("RF phase [deg]" if phase else r'$\Delta f$ [$kHz$]'))
ax.set_ylabel(r'$\overline{\Delta x}$ [$\mu$m / turn]')
ax.legend()
f.tight_layout()
f.show()


def _fold_phase(delta_phi, lamb):
if np.abs(delta_phi) > lamb / 2:
return delta_phi - lamb * np.sign(delta_phi)
return delta_phi


# def inputCheck(SC, par):
# if SC.INJ.trackMode != 'TBT':
# raise ValueError('Trackmode should be turn-by-turn (''SC.INJ.trackmode=TBT'').')
# if nSteps < 2 or nTurns < 2:
# raise ValueError('Number of steps and number of turns must be larger than 2.')
# if not hasattr(SC.RING[cavOrd], 'Frequency'):
# raise ValueError('This is not a cavity (ord: %d)' % cavOrd)
# if not any(SC.RING[cavOrd].PassMethod in ['CavityPass', 'RFCavityPass']):
# raise ValueError('Cavity (ord: %d) seemed to be switched off.' % cavOrd)
dphi = np.remainder(delta_phi, lamb)
if np.abs(dphi) > lamb / 2:
return dphi - lamb * np.sign(dphi)
return dphi


def _check_inputs(SC, cav_ords, bpm_ords, n_steps):
if cav_ords is None:
cav_ords = SC.ORD.RF
if bpm_ords is None:
bpm_ords = SC.ORD.BPM
if SC.INJ.nTurns < MIN_TURNS_FOR_LINEAR_FIT:
raise ValueError(f"Not enough turns for the tracking: set at least {MIN_TURNS_FOR_LINEAR_FIT} to SC.INJ.nTurns")
if SC.INJ.trackMode != 'TBT':
raise ValueError('Trackmode should be turn-by-turn (''SC.INJ.trackMode=TBT'').')
if n_steps < MIN_TURNS_FOR_SINE_FIT:
raise ValueError(f'Number of steps should be at least {MIN_TURNS_FOR_SINE_FIT}.')
for cav_ord in cav_ords:
if not hasattr(SC.RING[cav_ord], 'Frequency'):
raise ValueError(f'This is not a cavity (ord: {cav_ord})')
if SC.RING[cav_ord].PassMethod not in ('CavityPass', 'RFCavityPass'):
raise ValueError(f'Cavity (ord: {cav_ord}) seems to be switched off.')
return cav_ords, bpm_ords
7 changes: 3 additions & 4 deletions pySC/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from pySC.plotting.plot_support import plot_support
from pySC.plotting.plot_lattice import plot_lattice
from pySC.core.lattice_setting import set_magnet_setpoints, switch_cavity_and_radiation
from pySC.correction.rf import SCsynchPhaseCorrection, SCsynchEnergyCorrection
from pySC.correction.rf import correct_rf_phase, correct_rf_frequency, phase_and_energy_error
from pySC.utils import logging_tools

LOGGER = logging_tools.get_logger(__name__)
Expand All @@ -40,9 +40,8 @@ def _marker(name):
rfc = at.RFCavity('RFCav', energy=2.5E9, voltage=2e6, frequency=500653404.8599995, harmonic_number=167, length=0)
new_ring.insert(0, rfc)
new_ring.enable_6d()
new_ring.set_cavity_phase()
new_ring.set_rf_frequency()

at.set_cavity_phase(new_ring)
at.set_rf_frequency(new_ring)
return new_ring


Expand Down
8 changes: 4 additions & 4 deletions pySC/utils/matlab_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from pySC.correction.orbit_trajectory import SCfeedbackFirstTurn as first_turn, SCfeedbackStitch as stitch, \
SCfeedbackRun as frun, SCfeedbackBalance as fbalance, SCpseudoBBA as pseudo_bba
from pySC.correction.ramp_errors import SCrampUpErrors as ramp_up_errors
from pySC.correction.rf import SCsynchPhaseCorrection as synch_phase_corr, SCsynchEnergyCorrection as synch_energy_corr
from pySC.correction.rf import correct_rf_phase, correct_rf_frequency
from pySC.correction.tune import tune_scan
from pySC.lattice_properties.apertures import SCdynamicAperture as dynamic_aperture, \
SCmomentumAperture as momentum_aperture
Expand Down Expand Up @@ -313,8 +313,8 @@ def SCsynchEnergyCorrection(SC, /, *, cavOrd=None, f_range=(-1E3, 1E3), nSteps=1
plotProgress=False, verbose=False):
init_nturns = SC.INJ.nTurns
SC.INJ.nTurns = nTurns
SC = synch_energy_corr(SC, cavOrd=cavOrd, f_range=f_range, nSteps=nSteps, minTurns=minTurns,
plotResults=plotResults, plotProgress=plotProgress, )
SC = correct_rf_frequency(SC, cav_ords=cavOrd, f_range=f_range, n_steps=nSteps, minTurns=minTurns,
plot_results=plotResults, plot_progress=plotProgress, )
SC.INJ.nTurns = init_nturns
return SC

Expand All @@ -323,7 +323,7 @@ def SCsynchPhaseCorrection(SC, /, *, cavOrd=None, nSteps=15, nTurns=20, plotResu
verbose=False):
init_nturns = SC.INJ.nTurns
SC.INJ.nTurns = nTurns
SC = synch_phase_corr(SC, cavOrd=cavOrd, nSteps=nSteps, plotResults=plotResults, plotProgress=plotProgress, )
SC = correct_rf_phase(SC, cav_ords=cavOrd, n_steps=nSteps, plot_results=plotResults, plot_progress=plotProgress, )
SC.INJ.nTurns = init_nturns
return SC

Expand Down
6 changes: 3 additions & 3 deletions tests/test_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from pySC.lattice_properties.response_model import SCgetModelRM, SCgetModelDispersion
from pySC.utils.sc_tools import SCgetOrds, SCgetPinv
from pySC.core.lattice_setting import set_magnet_setpoints, switch_cavity_and_radiation
from pySC.correction.rf import SCsynchPhaseCorrection, SCsynchEnergyCorrection
from pySC.correction.rf import correct_rf_phase, correct_rf_frequency


def test_example(at_lattice):
Expand Down Expand Up @@ -91,8 +91,8 @@ def test_example(at_lattice):

# RF cavity correction
sc.INJ.nTurns = 5
sc = SCsynchPhaseCorrection(sc, nSteps=15)
sc = SCsynchEnergyCorrection(sc, f_range=4E3 * np.array([-1, 1]), nSteps=15)
sc = correct_rf_phase(sc, n_steps=15)
sc = correct_rf_frequency(sc, n_steps=15, f_range=4E3 * np.array([-1, 1]))

sc = SCpseudoBBA(sc, np.tile(sc.ORD.BPM, (2, 1)), np.tile(SCgetOrds(sc.RING, 'QF|QD'), (2, 1)), np.array([50E-6]))

Expand Down