Skip to content

Commit

Permalink
Made BPM readings more realistic (#36)
Browse files Browse the repository at this point in the history
* Introduced charge scaling of BPM noise
        When averaged, orbit is also weighted by charge

* Function returns also SUM signal

* Option to calculate trajectory measurement errors
        Weighted RMS when all shots reached given BPM

* Optimised memory when fraction of BPMs is used
  • Loading branch information
lmalina authored Aug 17, 2023
1 parent 79776a7 commit 44c5a17
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 58 deletions.
90 changes: 60 additions & 30 deletions pySC/core/beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from numpy import ndarray

from pySC.core.simulated_commissioning import SimulatedCommissioning
from pySC.core.constants import TRACK_ORB, TRACK_PORB
from pySC.core.constants import TRACK_ORB, TRACK_PORB, TRACK_TBT
from pySC.utils.sc_tools import SCrandnc
from pySC.utils.at_wrapper import atgetfieldvalues, atpass, findorbit6, findspos
import warnings
Expand All @@ -21,7 +21,7 @@
LOGGER = logging_tools.get_logger(__name__)


def bpm_reading(SC: SimulatedCommissioning, bpm_ords: ndarray = None) -> ndarray:
def bpm_reading(SC: SimulatedCommissioning, bpm_ords: ndarray = None, calculate_errors: bool = False) -> Tuple:
"""
Calculates BPM readings with current injection setup `SC.INJ` and included all BPM uncertainties.
Included uncertainties are offsets, rolls, calibration errors, and position noise.
Expand All @@ -33,25 +33,46 @@ def bpm_reading(SC: SimulatedCommissioning, bpm_ords: ndarray = None) -> ndarray
SC: SimulatedCommissioning instance
bpm_ords: array of element indices of registered BPMs for which to calculate readings
(for convenience, otherwise `SC.ORD.BPM` is used)
calculate_errors: If true orbit errors are calculated
Returns:
Array of horizontal and vertical BPM readings (2, T x B) for T turns and B BPMs
Array of fractional transmission (eq. BPM sum signal) (2, T x B) for T turns and B BPMs
Optionally, in TBT mode and calculate_errors is True:
Array of measured errors for horizontal and vertical BPM readings (2, T x B) for T turns and B BPMs
"""
all_bpm_orbits_4d = np.full((2, len(SC.ORD.BPM), SC.INJ.nTurns, SC.INJ.nShots), np.nan)
bpm_inds = np.arange(len(SC.ORD.BPM), dtype=int) if bpm_ords is None else _only_registered_bpms(SC, bpm_ords)
bpm_orbits_4d = np.full((2, len(bpm_inds), SC.INJ.nTurns, SC.INJ.nShots), np.nan)
bpm_sums_4d = np.full((2, len(bpm_inds), SC.INJ.nTurns, SC.INJ.nShots), np.nan)
for shot_num in range(SC.INJ.nShots):
tracking_4d = _tracking(SC, SC.ORD.BPM)
all_bpm_orbits_4d[:, :, :, shot_num] = _real_bpm_reading(SC, tracking_4d)
tracking_4d = _tracking(SC, SC.ORD.BPM[bpm_inds])
bpm_orbits_4d[:, :, :, shot_num], bpm_sums_4d[:, :, :, shot_num] = _real_bpm_reading(SC, tracking_4d, bpm_inds)

# mean_bpm_orbits_3d is 3D (dim, BPM, turn)
mean_bpm_orbits_3d = np.average(np.ma.array(bpm_orbits_4d, mask=np.isnan(bpm_orbits_4d)),
weights=np.ma.array(bpm_sums_4d, mask=np.isnan(bpm_sums_4d)), axis=3)
# averaging "charge" also when the beam did not reach the location
mean_bpm_sums_3d = np.nansum(bpm_sums_4d, axis=3) / SC.INJ.nShots

mean_bpm_orbits_3d = np.nanmean(all_bpm_orbits_4d, axis=3) # mean_bpm_orbits_3d is 3D (dim, BPM, turn)
if SC.plot:
_plot_bpm_reading(SC, mean_bpm_orbits_3d)
_plot_bpm_reading(SC, mean_bpm_orbits_3d, bpm_inds)
if SC.INJ.trackMode == TRACK_PORB: # ORB averaged over low amount of turns
mean_bpm_orbits_3d = np.nanmean(mean_bpm_orbits_3d, axis=2, keepdims=True)
if bpm_ords is not None:
ind = _only_registered_bpms(SC, bpm_ords)
mean_bpm_orbits_3d = mean_bpm_orbits_3d[:, ind, :]
# Organising the array 2 x (nturns x nbpms) sorted by "arrival time"
return _reshape_3d_to_matlab_like_2d(mean_bpm_orbits_3d)
mean_bpm_orbits_3d = np.average(np.ma.array(mean_bpm_orbits_3d, mask=np.isnan(mean_bpm_orbits_3d)),
weights=np.ma.array(mean_bpm_sums_3d, mask=np.isnan(mean_bpm_sums_3d)), axis=2,
keepdims=True)
mean_bpm_sums_3d = np.nansum(mean_bpm_sums_3d, axis=2, keepdims=True) / SC.INJ.nTurns
if calculate_errors and SC.INJ.trackMode == TRACK_TBT:
bpm_orbits_4d[np.sum(np.isnan(bpm_orbits_4d), axis=3) > 0, :] = np.nan
squared_orbit_diffs = np.square(bpm_orbits_4d - mean_bpm_orbits_3d)
err_bpm_orbits_3d = np.sqrt(np.average(np.ma.array(squared_orbit_diffs), mask=np.isnan(bpm_orbits_4d),
weights=np.ma.array(bpm_sums_4d, mask=np.isnan(bpm_orbits_4d)), axis=3))
# Organising the array 2 x (nturns x nbpms) sorted by "arrival time"
# TODO keep in 3D when the response matrices are changed
return (_reshape_3d_to_matlab_like_2d(mean_bpm_orbits_3d),
_reshape_3d_to_matlab_like_2d(mean_bpm_sums_3d),
_reshape_3d_to_matlab_like_2d(err_bpm_orbits_3d))
return (_reshape_3d_to_matlab_like_2d(mean_bpm_orbits_3d),
_reshape_3d_to_matlab_like_2d(mean_bpm_sums_3d))


def all_elements_reading(SC: SimulatedCommissioning) -> Tuple[ndarray, ndarray]:
Expand All @@ -75,13 +96,13 @@ def all_elements_reading(SC: SimulatedCommissioning) -> Tuple[ndarray, ndarray]:
all_bpm_orbits_4d = np.full((2, len(SC.ORD.BPM), SC.INJ.nTurns, SC.INJ.nShots), np.nan)
for shot_num in range(SC.INJ.nShots):
tracking_4d = _tracking(SC, np.arange(n_refs))
all_bpm_orbits_4d[:, :, :, shot_num] = _real_bpm_reading(SC, tracking_4d[:, :, SC.ORD.BPM, :])
all_bpm_orbits_4d[:, :, :, shot_num] = _real_bpm_reading(SC, tracking_4d[:, :, SC.ORD.BPM, :])[0]
tracking_4d[:, np.isnan(tracking_4d[0, :])] = np.nan
all_readings_5d[:, :, :, :, shot_num] = tracking_4d[:, :, :, :]

mean_bpm_orbits_3d = np.nanmean(all_bpm_orbits_4d, axis=3) # mean_bpm_orbits_3d is 3D (dim, BPM, turn)
if SC.plot:
_plot_bpm_reading(SC, mean_bpm_orbits_3d, all_readings_5d)
_plot_bpm_reading(SC, mean_bpm_orbits_3d, None, all_readings_5d)
return _reshape_3d_to_matlab_like_2d(mean_bpm_orbits_3d), all_readings_5d


Expand Down Expand Up @@ -159,21 +180,25 @@ def plot_transmission(ax, fraction_survived, n_turns, beam_lost_at):
return ax


def _real_bpm_reading(SC, track_mat): # track_mat should be only x,y over all particles only at BPM positions
nBpms, nTurns = track_mat.shape[2:]
bpm_noise = np.transpose(atgetfieldvalues(SC.RING, SC.ORD.BPM, ('NoiseCO' if SC.INJ.trackMode == 'ORB' else "Noise")))
bpm_noise = bpm_noise[:, :, np.newaxis] * SCrandnc(2, (2, nBpms, nTurns))
bpm_offset = np.transpose(atgetfieldvalues(SC.RING, SC.ORD.BPM, 'Offset') + atgetfieldvalues(SC.RING, SC.ORD.BPM, 'SupportOffset'))
bpm_cal_error = np.transpose(atgetfieldvalues(SC.RING, SC.ORD.BPM, 'CalError'))
bpm_roll = np.squeeze(atgetfieldvalues(SC.RING, SC.ORD.BPM, 'Roll') + atgetfieldvalues(SC.RING, SC.ORD.BPM, 'SupportRoll'))
bpm_sum_error = np.transpose(atgetfieldvalues(SC.RING, SC.ORD.BPM, 'SumError'))[:, np.newaxis] * SCrandnc(2, (nBpms, nTurns))
def _real_bpm_reading(SC, track_mat, bpm_inds=None): # track_mat should be only x,y over all particles only at BPM positions
n_bpms, nTurns = track_mat.shape[2:]
bpm_ords = SC.ORD.BPM if bpm_inds is None else SC.ORD.BPM[bpm_inds]
bpm_noise = np.transpose(atgetfieldvalues(SC.RING, bpm_ords, ('NoiseCO' if SC.INJ.trackMode == 'ORB' else "Noise")))
bpm_noise = bpm_noise[:, :, np.newaxis] * SCrandnc(2, (2, n_bpms, nTurns))
bpm_offset = np.transpose(atgetfieldvalues(SC.RING, bpm_ords, 'Offset') + atgetfieldvalues(SC.RING, bpm_ords, 'SupportOffset'))
bpm_cal_error = np.transpose(atgetfieldvalues(SC.RING, bpm_ords, 'CalError'))
bpm_roll = np.squeeze(atgetfieldvalues(SC.RING, bpm_ords, 'Roll') + atgetfieldvalues(SC.RING, bpm_ords, 'SupportRoll'), axis=1)
bpm_sum_error = np.transpose(atgetfieldvalues(SC.RING, bpm_ords, 'SumError'))[:, np.newaxis] * SCrandnc(2, (n_bpms, nTurns))
# averaging the X and Y positions at BPMs over particles
mean_orbit = np.nanmean(track_mat, axis=1)
beam_lost = np.nonzero(np.mean(np.isnan(track_mat[0, :, :, :]), axis=0) * (1 + bpm_sum_error) > SC.INJ.beamLostAt)
transmission = np.mean(~np.isnan(track_mat[0, :, :, :]), axis=0) * (1 + bpm_sum_error)
beam_lost = np.nonzero(transmission < 1 - SC.INJ.beamLostAt)
mean_orbit[:, beam_lost[0], beam_lost[1]] = np.nan
transmission[beam_lost[0], beam_lost[1]] = np.nan

rolled_mean_orbit = np.einsum("ijk,jkl->ikl", _rotation_matrix(bpm_roll), mean_orbit)
return (rolled_mean_orbit - bpm_offset[:, :, np.newaxis]) * (1 + bpm_cal_error[:, :, np.newaxis]) + bpm_noise
return ((rolled_mean_orbit - bpm_offset[:, :, np.newaxis]) * (1 + bpm_cal_error[:, :, np.newaxis]) +
bpm_noise / transmission), np.tile(transmission, (2, 1, 1))


def _rotation_matrix(a):
Expand All @@ -186,14 +211,19 @@ def _tracking(SC: SimulatedCommissioning, refs: ndarray) -> ndarray:
# lattice_pass output: (6, N, R, T) coordinates of N particles at R reference points for T turns.
# findorbit second output value: (R, 6) closed orbit vector at each specified location
if SC.INJ.trackMode == TRACK_ORB:
return np.transpose(findorbit6(SC.RING, refs, keep_lattice=False)[1])[[0, 2], :].reshape(2, 1, len(refs), 1)
return atpass(SC.RING, generate_bunches(SC), SC.INJ.nTurns, refs, keep_lattice=False)[[0, 2], :, :, :]
pos = np.transpose(findorbit6(SC.RING, refs, keep_lattice=False)[1])[[0, 2], :].reshape(2, 1, len(refs), 1)
else:
pos = atpass(SC.RING, generate_bunches(SC), SC.INJ.nTurns, refs, keep_lattice=False)[[0, 2], :, :, :]
pos[1, np.isnan(pos[0, :, :, :])] = np.nan
return pos


def _only_registered_bpms(SC: SimulatedCommissioning, bpm_ords: ndarray) -> ndarray:
ind = np.where(np.isin(SC.ORD.BPM, bpm_ords))[0]
if len(ind) != len(bpm_ords):
LOGGER.warning('Not all specified ordinates are registered BPMs.')
if not len(ind):
raise ValueError("No registered BPMs specified.")
return ind


Expand All @@ -202,17 +232,17 @@ def _reshape_3d_to_matlab_like_2d(mean_bpm_orbits_3d: ndarray) -> ndarray:
return np.transpose(mean_bpm_orbits_3d, axes=(0, 2, 1)).reshape((2, np.prod(mean_bpm_orbits_3d.shape[1:])))


def _plot_bpm_reading(SC, bpm_orbits_3d, all_readings_5d=None):
def _plot_bpm_reading(SC, bpm_orbits_3d, bpm_inds=None, all_readings_5d=None):
ap_ords, apers = _get_ring_aperture(SC)
fig, ax = plt.subplots(num=1, nrows=2, ncols=1, sharex="all", figsize=(8, 6), dpi=100, facecolor="w")
s_pos = findspos(SC.RING)
circumference = s_pos[-1]

bpms = SC.ORD.BPM if bpm_inds is None else SC.ORD.BPM[bpm_inds]
if all_readings_5d is not None:
x = np.ravel(np.arange(SC.INJ.nTurns)[:, np.newaxis] * circumference + s_pos)
ax = _plot_all_trajectories(ax, x, all_readings_5d)
for n_dim in range(2):
x = np.ravel(np.arange(SC.INJ.nTurns)[:, np.newaxis] * circumference + s_pos[SC.ORD.BPM])
x = np.ravel(np.arange(SC.INJ.nTurns)[:, np.newaxis] * circumference + s_pos[bpms])
y = 1E3 * np.ravel(bpm_orbits_3d[n_dim, :, :].T)
ax[n_dim].plot(x, y, 'ro', label="BPM reading")
if len(ap_ords):
Expand Down
6 changes: 3 additions & 3 deletions pySC/correction/injection_fit.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def fit_injection_drift(SC: SimulatedCommissioning, n_dims: ndarray = np.array([
# uses SC.INJ.nShots
s_pos = findspos(SC.RING)
bpm_inds = np.arange(2, dtype=int) + len(SC.ORD.BPM) - 1
bref = bpm_reading(SC)[:, bpm_inds]
bref = bpm_reading(SC)[0][:, bpm_inds]
s_bpm = np.array([s_pos[SC.ORD.BPM[-1]] - s_pos[-1], s_pos[SC.ORD.BPM[0]]])
delta_z0 = _fit_bpm_data(s_bpm, bref)
if np.sum(np.isnan(delta_z0)):
Expand All @@ -37,7 +37,7 @@ def fit_injection_drift(SC: SimulatedCommissioning, n_dims: ndarray = np.array([
def fit_injection_trajectory(SC: SimulatedCommissioning, bpm_inds: ndarray = np.array([0, 1, 2]), plot: bool = False):
# uses SC.INJ.nShots
s_pos = findspos(SC.RING)
bref = bpm_reading(SC)[:, bpm_inds]
bref = bpm_reading(SC)[0][:, bpm_inds]
delta_z0 = np.zeros(6)
delta_z0[0:4] = -fmin(_merit_function, np.zeros(4), args=(SC, bref, SC.ORD.BPM[bpm_inds]))
if np.sum(np.isnan(delta_z0)):
Expand All @@ -47,7 +47,7 @@ def fit_injection_trajectory(SC: SimulatedCommissioning, bpm_inds: ndarray = np.
if plot:
fig, ax = plt.subplots(nrows=2, num=342, sharex="all")
SC.INJ.Z0 += delta_z0
bnew = bpm_reading(SC)[:, bpm_inds]
bnew = bpm_reading(SC)[0][:, bpm_inds]
SC.INJ.Z0 -= delta_z0
s_bpm = s_pos[SC.ORD.BPM[bpm_inds]]
for n_dim in range(2):
Expand Down
2 changes: 1 addition & 1 deletion pySC/correction/orbit_trajectory.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def _check_ords(SC, Mplus, reference, BPMords, CMords):


def _bpm_reading_and_logging(SC, BPMords, ind_history=None, orb_history=None):
bpm_readings = bpm_reading(SC, bpm_ords=BPMords)
bpm_readings = bpm_reading(SC, bpm_ords=BPMords)[0]
bpms_reached = ~np.isnan(bpm_readings[0])
if ind_history is None or orb_history is None:
return bpm_readings, [np.sum(bpms_reached)], [np.sqrt(np.mean(np.square(bpm_readings[:, bpms_reached]), axis=1))]
Expand Down
2 changes: 1 addition & 1 deletion pySC/correction/rf.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def _check_data(test_vec, bpm_shift, bpm_shift_err, min_num_points=6):


def _get_tbt_energy_shift(SC, bpm_ords):
bpm_readings = bpm_reading(SC, bpm_ords)
bpm_readings = bpm_reading(SC, bpm_ords)[0]
x_reading = np.reshape(bpm_readings[0, :], (SC.INJ.nTurns, len(bpm_ords)))
mean_tbt = np.mean(x_reading - x_reading[0, :], axis=1)
mean_tbt_err = np.std(x_reading, axis=1) / np.sqrt(x_reading.shape[1])
Expand Down
12 changes: 6 additions & 6 deletions pySC/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ def _marker(name):
# RF cavity correction
SC.INJ.nTurns = 5
for nIter in range(2):
SC = SCsynchPhaseCorrection(SC, nSteps=25, plotResults=False, plotProgress=False)
SC = correct_rf_phase(SC, n_steps=25, plot_results=False, plot_progress=False)

SC = SCsynchEnergyCorrection(SC, f_range=40E3 * np.array([-1, 1]), # Frequency range [kHz]
nSteps=15, # Number of frequency steps
plotResults=False, plotProgress=False)
SC = correct_rf_frequency(SC, f_range=40E3 * np.array([-1, 1]), # Frequency range [kHz]
n_steps=15, # Number of frequency steps
plot_results=False, plot_progress=False)

# Plot phasespace after RF correction
plot_phase_space(SC, nParticles=10, nTurns=100)
Expand All @@ -168,8 +168,8 @@ def _marker(name):
CUR = SCfeedbackRun(SC, MinvCO, target=0, maxsteps=50, scaleDisp=1E8)
except RuntimeError:
break
B0rms = np.sqrt(np.mean(np.square(bpm_reading(SC)), axis=1))
Brms = np.sqrt(np.mean(np.square(bpm_reading(CUR)), axis=1))
B0rms = np.sqrt(np.mean(np.square(bpm_reading(SC)[0]), axis=1))
Brms = np.sqrt(np.mean(np.square(bpm_reading(CUR)[0]), axis=1))
if np.mean(B0rms) < np.mean(Brms):
break
SC = CUR
Expand Down
14 changes: 7 additions & 7 deletions pySC/lattice_properties/response_measurement.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def response_matrix(SC, amp, bpm_ords, cm_ords, mode='fixedKick', n_steps=2, fit
rm = np.full((2 * SC.INJ.nTurns * bpm_ords.shape[0], n_hcm + n_vcm), np.nan)
error = np.full((2 * SC.INJ.nTurns * bpm_ords.shape[0], n_hcm + n_vcm), np.nan)
cm_steps = [np.zeros((n_steps, n_hcm)), np.zeros((n_steps, n_vcm))]
bref = np.ravel(bpm_reading(SC, bpm_ords=bpm_ords))
bref = np.ravel(bpm_reading(SC, bpm_ords=bpm_ords)[0])
if SC.INJ.trackMode == 'ORB' and np.sum(np.isnan(bref)):
raise ValueError('No closed orbit found.')
i = 0
Expand All @@ -38,7 +38,7 @@ def response_matrix(SC, amp, bpm_ords, cm_ords, mode='fixedKick', n_steps=2, fit
if cm_step_vec[n_step] != 0 and cm_step_vec[n_step] != max_step:
SC = set_cm_setpoints(SC, cm_ords[n_dim][nCM], cmstart[nCM] + cm_step_vec[n_step], skewness=bool(n_dim))
real_cm_setpoint[n_step] = get_cm_setpoints(SC, cm_ords[n_dim][nCM], skewness=bool(n_dim))
gradient[n_step, :] = np.ravel(bpm_reading(SC, bpm_ords=bpm_ords)) - bref
gradient[n_step, :] = np.ravel(bpm_reading(SC, bpm_ords=bpm_ords)[0]) - bref
dCM = real_cm_setpoint - cmstart[nCM]
cm_steps[n_dim][:, nCM] = dCM
rm[:, i] = gradient / dCM
Expand All @@ -59,10 +59,10 @@ def response_matrix(SC, amp, bpm_ords, cm_ords, mode='fixedKick', n_steps=2, fit

def dispersion(SC, rf_step, bpm_ords=None, cav_ords=None, n_steps=2):
bpm_ords, cav_ords = _check_ords(SC, bpm_ords, cav_ords)
bref = np.ravel(bpm_reading(SC, bpm_ords=bpm_ords))
bref = np.ravel(bpm_reading(SC, bpm_ords=bpm_ords)[0])
if n_steps == 2:
SC = set_cavity_setpoints(SC, cav_ords, rf_step, 'Frequency', 'add')
B = np.ravel(bpm_reading(SC, bpm_ords=bpm_ords))
B = np.ravel(bpm_reading(SC, bpm_ords=bpm_ords)[0])
SC = set_cavity_setpoints(SC, cav_ords, -rf_step, 'Frequency', 'add')
return (B - bref) / rf_step
rf_steps = np.zeros((len(cav_ords), n_steps))
Expand All @@ -72,7 +72,7 @@ def dispersion(SC, rf_step, bpm_ords=None, cav_ords=None, n_steps=2):
rf0 = atgetfieldvalues(SC.RING, cav_ords, "FrequencySetPoint")
for nStep in range(n_steps):
SC = set_cavity_setpoints(SC, cav_ords, rf_steps[:, nStep], 'Frequency', 'abs')
dB[nStep, :] = np.ravel(bpm_reading(SC, bpm_ords=bpm_ords)) - bref
dB[nStep, :] = np.ravel(bpm_reading(SC, bpm_ords=bpm_ords)[0]) - bref
SC = set_cavity_setpoints(SC, cav_ords, rf0, 'Frequency', 'abs')
return np.linalg.lstsq(np.linspace(-rf_step, rf_step, n_steps), dB)[0]

Expand Down Expand Up @@ -103,7 +103,7 @@ def _kick_amplitude(SC, bref, bpm_ords, cm_ord, amp, skewness: bool, mode):
LOGGER.debug(
f'Insufficient beam reach ({max_pos:d}/{max_pos_ref:d}). '
f'cm_step reduced to {1E6 * max_step:.1f}urad.')
return max_step, np.ravel(bpm_reading(SC, bpm_ords=bpm_ords)) - bref
return max_step, np.ravel(bpm_reading(SC, bpm_ords=bpm_ords)[0]) - bref


def _try_setpoint(SC, bpm_ords, cm_ord, cmstart, max_step, skewness):
Expand All @@ -113,4 +113,4 @@ def _try_setpoint(SC, bpm_ords, cm_ord, cmstart, max_step, skewness):
LOGGER.debug('CM clipped. Using different CM direction.')
max_step *= -1
SC = set_cm_setpoints(SC, cm_ord, cmstart + max_step, skewness)
return SC, max_step, np.ravel(bpm_reading(SC, bpm_ords=bpm_ords))
return SC, max_step, np.ravel(bpm_reading(SC, bpm_ords=bpm_ords)[0])
4 changes: 2 additions & 2 deletions pySC/utils/matlab_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def SCgetBeamTransmission(SC: SimulatedCommissioning, /, *, nParticles: int = No


def SCgetBPMreading(SC, /, *, BPMords=None):
return bpm_reading(SC, bpm_ords=BPMords)
return bpm_reading(SC, bpm_ords=BPMords)[0]


def SCgetCMSetPoints(SC: SimulatedCommissioning, CMords: ndarray, nDim: int) -> ndarray:
Expand Down Expand Up @@ -200,7 +200,7 @@ def SCparticlesIn3D(*args):
def SCplotBPMreading(SC, B=None, T=None):
init_plot = SC.plot
SC.plot = True
bpm_reading(SC)
_ = bpm_reading(SC)[0]
SC.plot = init_plot
return SC

Expand Down
10 changes: 4 additions & 6 deletions tests/test_beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,16 @@ def test_all_reading_basic(sc):


def test_bpm_reading_basic(sc):
bpm_readings = bpm_reading(sc)
bpm_readings = bpm_reading(sc)[0]
assert bpm_readings.shape == (2, sc.INJ.nTurns * sc.ORD.BPM.shape[0])
bpm_readings2 = bpm_reading(sc)
bpm_readings2 = bpm_reading(sc)[0]
assert not np.allclose(bpm_readings2, bpm_readings)
bpm_readings = bpm_reading(sc, np.arange(3))
assert bpm_readings.shape == (2, 0)
sc.INJ.trackMode = "PORB"
sc.INJ.nTurns = 3
bpm_readings = bpm_reading(sc, np.arange(18))
bpm_readings = bpm_reading(sc, np.arange(18))[0]
assert bpm_readings.shape == (2, 1)
sc.plot = True
bpm_readings = bpm_reading(sc, 20 * np.ones(2))
bpm_readings = bpm_reading(sc, 20 * np.ones(2))[0]
assert bpm_readings.shape == (2, 1)


Expand Down
4 changes: 2 additions & 2 deletions tests/test_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ def test_example(at_lattice):
cur = SCfeedbackRun(sc, minv_co, target=0, maxsteps=50, scaleDisp=1E8)
except RuntimeError:
break
B0rms = np.sqrt(np.mean(np.square(bpm_reading(sc)), axis=1))
Brms = np.sqrt(np.mean(np.square(bpm_reading(cur)), axis=1))
B0rms = np.sqrt(np.mean(np.square(bpm_reading(sc)[0]), axis=1))
Brms = np.sqrt(np.mean(np.square(bpm_reading(cur)[0]), axis=1))
if np.mean(B0rms) < np.mean(Brms):
break
sc = cur
Expand Down

0 comments on commit 44c5a17

Please sign in to comment.