diff --git a/src/pint/profile/__init__.py b/src/pint/profile/__init__.py index 34a9b21364..cc81b3047b 100644 --- a/src/pint/profile/__init__.py +++ b/src/pint/profile/__init__.py @@ -1,23 +1,137 @@ +"""Tools for working with pulse profiles. +The key tool here is `fftfit`, which allows one to find the phase shift that +optimally aligns a template with a profile, but there are also tools here for +doing those shifts and generating useful profiles. +""" +import numpy as np +import scipy.stats import pint.profile.fftfit_aarchiba import pint.profile.fftfit_nustar import pint.profile.fftfit_presto + +__all__ = [ + "wrap", + "vonmises_profile", + "upsample", + "shift", + "fftfit_full", + "fftfit_basic", +] + + +def wrap(a): + """Wrap a floating-point number or array to the range -0.5 to 0.5.""" + return (a + 0.5) % 1 - 0.5 + + +def zap_nyquist(profile): + if len(profile) % 2: + return profile + else: + c = np.fft.rfft(profile) + c[-1] = 0 + return np.fft.irfft(c) + + +def vonmises_profile(kappa, n, phase=0): + """Generate a profile based on a von Mises distribution. + + The von Mises distribution is a cyclic analogue of a Gaussian distribution. The width is + specified by the parameter kappa, which for large kappa is approximately 1/(2*pi*sigma**2). + """ + return np.diff( + scipy.stats.vonmises(kappa).cdf( + np.linspace(-2 * np.pi * phase, 2 * np.pi * (1 - phase), n + 1) + ) + ) + + +def upsample(profile, factor): + """Produce an up-sampled version of a pulse profile. + + This uses a Fourier algorithm, with zero in the new Fourier coefficients. + """ + output_len = len(profile) * factor + if output_len % 2: + raise ValueError("Cannot cope with odd output profile lengths") + c = np.fft.rfft(profile) + output_c = np.zeros(output_len // 2 + 1, dtype=complex) + output_c[: len(c)] = c * factor + output = np.fft.irfft(output_c) + assert len(output) == output_len + return output + + +def shift(profile, phase): + """Shift a profile in phase. + + This is a shift towards later phases - if your profile has a 1 in bin zero + and apply a phase shift of 1/4, the 1 will now be in bin n/4. If the + profile has even length, do not modify the Nyquist component. + """ + c = np.fft.rfft(profile) + if len(profile) % 2: + c *= np.exp(-2.0j * np.pi * phase * np.arange(len(c))) + else: + c[:-1] *= np.exp(-2.0j * np.pi * phase * np.arange(len(c) - 1)) + return np.fft.irfft(c, len(profile)) + + +def irfft_value(c, phase, n=None): + """Evaluate the inverse real FFT at a particular position. + + If the phase is one of the usual grid points the result will agree with + the results of `np.fft.irfft` there. + + No promises if n is small enough to imply truncation. + """ + natural_n = (len(c) - 1) * 2 + if n is None: + n = natural_n + phase = np.asarray(phase) + s = phase.shape + phase = np.atleast_1d(phase) + c = np.array(c) + c[0] /= 2 + if n == natural_n: + c[-1] /= 2 + return ( + ( + c[:, None] + * np.exp(2.0j * np.pi * phase[None, :] * np.arange(len(c))[:, None]) + ) + .sum(axis=0) + .real + * 2 + / n + ).reshape(s) + + def fftfit_full(template, profile, code="aarchiba"): - if code=="aarchiba": + """Match template to profile and return match properties. + + The returned object has a `.shift` attribute indicating the optimal shift, + a `.uncertainty` attribute containting an estimate of the uncertainty, and + possibly certain other attributes depending on which version of the code is + run. + """ + if code == "aarchiba": return pint.profile.fftfit_aarchiba.fftfit_full(template, profile) - elif code=="nustar": + elif code == "nustar": return pint.profile.fftfit_nustar.fftfit_full(template, profile) - elif code=="presto": + elif code == "presto": if pint.profile.fftfit_presto.presto is None: raise ValueError("The PRESTO compiled code is not available") return pint.profile.fftfit_presto.fftfit_full(template, profile) else: raise ValueError("Unrecognized FFTFIT implementation {}".format(code)) + def fftfit_basic(template, profile, code="aarchiba"): - if code=="aarchiba": + """Return the optimal phase shift to match template to profile.""" + if code == "aarchiba": return pint.profile.fftfit_aarchiba.fftfit_basic(template, profile) else: return fftfit_full(template, profile, code=code).shift - diff --git a/src/pint/profile/fftfit_aarchiba.py b/src/pint/profile/fftfit_aarchiba.py index 67be9a0dc9..229cc67556 100644 --- a/src/pint/profile/fftfit_aarchiba.py +++ b/src/pint/profile/fftfit_aarchiba.py @@ -10,82 +10,7 @@ import scipy.optimize import scipy.stats - -def wrap(a): - return (a + 0.5) % 1 - 0.5 - - -def zap_nyquist(profile): - if len(profile) % 2: - return profile - else: - c = np.fft.rfft(profile) - c[-1] = 0 - return np.fft.irfft(c) - - -def vonmises_profile(kappa, n, phase=0): - return np.diff( - scipy.stats.vonmises(kappa).cdf( - np.linspace(-2 * np.pi * phase, 2 * np.pi * (1 - phase), n + 1) - ) - ) - - -def upsample(profile, factor): - """Produce an up-sampled version of a pulse profile""" - output_len = len(profile) * factor - if output_len % 2: - raise ValueError("Cannot cope with odd output profile lengths") - c = np.fft.rfft(profile) - output_c = np.zeros(output_len // 2 + 1, dtype=complex) - output_c[: len(c)] = c * factor - output = np.fft.irfft(output_c) - assert len(output) == output_len - return output - - -def shift(profile, phase): - """Shift a profile in phase - - If the profile has even length, do not modify the Nyquist component. - """ - c = np.fft.rfft(profile) - if len(profile) % 2: - c *= np.exp(-2.0j * np.pi * phase * np.arange(len(c))) - else: - c[:-1] *= np.exp(-2.0j * np.pi * phase * np.arange(len(c) - 1)) - return np.fft.irfft(c, len(profile)) - - -def irfft_value(c, phase, n=None): - """Evaluate the inverse real FFT at a particular position - - If the phase is one of the usual grid points the result will agree with - the results of `np.fft.irfft` there. - - No promises if n is small enough to imply truncation. - """ - natural_n = (len(c) - 1) * 2 - if n is None: - n = natural_n - phase = np.asarray(phase) - s = phase.shape - phase = np.atleast_1d(phase) - c = np.array(c) - c[0] /= 2 - if n == natural_n: - c[-1] /= 2 - return ( - ( - c[:, None] - * np.exp(2.0j * np.pi * phase[None, :] * np.arange(len(c))[:, None]) - ) - .sum(axis=0) - .real - * 2 - / n - ).reshape(s) +import pint.profile class FFTFITResult: @@ -121,7 +46,7 @@ def fftfit_full( l, r = x - 1 / len(ccf), x + 1 / len(ccf) def gof(x): - return -irfft_value(ccf_c, x, n_long) + return -pint.profile.irfft_value(ccf_c, x, n_long) res = scipy.optimize.minimize_scalar( gof, bounds=(l, r), method="Bounded", options=dict(xatol=1e-5 / n_c) @@ -155,7 +80,9 @@ def gof(x): if compute_uncertainty: if std is None: - resid = r.scale * shift(template, r.shift) + r.offset - profile + resid = ( + r.scale * pint.profile.shift(template, r.shift) + r.offset - profile + ) std = np.sqrt(np.mean(resid ** 2)) J = np.zeros((2 * len(s_c) - 2, 2)) diff --git a/src/pint/profile/fftfit_presto.py b/src/pint/profile/fftfit_presto.py index 2f482ebeec..405185d9fb 100644 --- a/src/pint/profile/fftfit_presto.py +++ b/src/pint/profile/fftfit_presto.py @@ -1,28 +1,38 @@ import numpy as np from numpy.fft import rfft -from pint.profile.fftfit_aarchiba import wrap +import pint.profile try: import presto.fftfit except ImportError: presto = None + class FFTFITResult: pass + def fftfit_full(template, profile): if len(template) != len(profile): - raise ValueError("template has length {} but profile has length {}".format(len(template),len(profile))) - if len(template) > 2**13: - raise ValueError("template has length {} which is too long".format(len(template))) + raise ValueError( + "template has length {} but profile has length {}".format( + len(template), len(profile) + ) + ) + if len(template) > 2 ** 13: + raise ValueError( + "template has length {} which is too long".format(len(template)) + ) tc = rfft(template) - shift, eshift, snr, esnr, b, errb, ngood = presto.fftfit.fftfit(profile, np.abs(tc)[1:], -np.angle(tc)[1:]) + shift, eshift, snr, esnr, b, errb, ngood = presto.fftfit.fftfit( + profile, np.abs(tc)[1:], -np.angle(tc)[1:] + ) r = FFTFITResult() # Need to add 1 to the shift for some reason - r.shift = wrap((shift + 1)/len(template)) - r.uncertainty = eshift/len(template) + r.shift = pint.profile.wrap((shift + 1) / len(template)) + r.uncertainty = eshift / len(template) return r + def fftfit_basic(template, profile): return fftfit_full(template, profile).shift - diff --git a/tests/test_fftfit.py b/tests/test_fftfit.py index e6d02e8869..223c76d9e7 100644 --- a/tests/test_fftfit.py +++ b/tests/test_fftfit.py @@ -10,26 +10,23 @@ complex_numbers, composite, floats, - fractions, integers, just, one_of, ) -from numpy.testing import assert_allclose, assert_array_almost_equal - -import pint.profile.fftfit_aarchiba as fftfit -from pint.profile import fftfit_aarchiba -from pint.profile import fftfit_nustar -from pint.profile import fftfit_presto -from pint.profile import fftfit_full, fftfit_basic - - -fftfit_basics = [fftfit_aarchiba.fftfit_basic, fftfit_nustar.fftfit_basic] -fftfit_fulls = [fftfit_aarchiba.fftfit_full, fftfit_nustar.fftfit_full] - -if fftfit_presto.presto is not None: - fftfit_basics.append(fftfit_presto.fftfit_basic) - fftfit_fulls.append(fftfit_presto.fftfit_full) +from numpy.testing import assert_allclose + +import pint.profile +from pint.profile import ( + wrap, + vonmises_profile, + irfft_value, + fftfit_aarchiba, + fftfit_basic, + fftfit_full, + fftfit_nustar, + fftfit_presto, +) NO_PRESTO = fftfit_presto.presto is None @@ -46,9 +43,9 @@ def assert_rms_close(a, b, rtol=1e-8, atol=1e-8, name=None): def assert_allclose_phase(a, b, atol=1e-8, name=None): __tracebackhide__ = True if name is not None: - target(np.abs(fftfit.wrap(a - b)).max(), label="{} max".format(name)) - target(np.abs(fftfit.wrap(a - b)).mean(), label="{} mean".format(name)) - assert np.all(np.abs(fftfit.wrap(a - b)) <= atol) + target(np.abs(wrap(a - b)).max(), label="{} max".format(name)) + target(np.abs(wrap(a - b)).mean(), label="{} mean".format(name)) + assert np.all(np.abs(wrap(a - b)) <= atol) ONE_SIGMA = 1 - 2 * scipy.stats.norm().sf(1) @@ -91,13 +88,13 @@ def powers_of_two(draw): @composite def vonmises_templates(draw, ns=powers_of_two(), phase=floats(0, 1)): - return fftfit.vonmises_profile(draw(floats(1, 1000)), draw(ns), draw(phase)) + return vonmises_profile(draw(floats(1, 1000)), draw(ns), draw(phase)) @composite def vonmises_templates_noisy(draw, ns=powers_of_two(), phase=floats(0, 1)): n = draw(ns) - return fftfit.vonmises_profile(draw(floats(1, 1000)), n, draw(phase)) + ( + return vonmises_profile(draw(floats(1, 1000)), n, draw(phase)) + ( 1e-3 / n ) * np.random.default_rng(0).standard_normal(n) @@ -166,7 +163,7 @@ def test_irfft_value(c, n): c[0] = c[0].real c[-1] = 0 xs = np.linspace(0, 1, n, endpoint=False) - assert_rms_close(np.fft.irfft(c, n), fftfit.irfft_value(c, xs, n)) + assert_rms_close(np.fft.irfft(c, n), irfft_value(c, xs, n)) @given( @@ -176,12 +173,14 @@ def test_irfft_value(c, n): ) def test_irfft_value_one(c, n, x): assume(n >= 2 * (len(c) - 1)) - fftfit.irfft_value(c, x, n) + irfft_value(c, x, n) @given(floats(0, 1), one_of(vonmises_templates_noisy(), random_templates())) def test_shift_invertible(s, template): - assert_allclose(template, fftfit.shift(fftfit.shift(template, s), -s), atol=1e-14) + assert_allclose( + template, pint.profile.shift(pint.profile.shift(template, s), -s), atol=1e-14 + ) @given(integers(0, 2 ** 20), floats(1, 1000), integers(5, 16), floats(0, 1)) @@ -208,12 +207,12 @@ def test_fftfit_basic_integer_vonmises(code, i, kappa, profile_length, phase): if code == "presto": assume(profile_length <= 13) n = 2 ** profile_length - template = fftfit.vonmises_profile(kappa, n, phase) + ( - 1e-3 / n - ) * np.random.default_rng(0).standard_normal(n) + template = vonmises_profile(kappa, n, phase) + (1e-3 / n) * np.random.default_rng( + 0 + ).standard_normal(n) assume(sum(template > 0.5 * template.max()) > 1) s = i / len(template) - rs = fftfit_basic(template, fftfit.shift(template, s), code=code) + rs = fftfit_basic(template, pint.profile.shift(template, s), code=code) assert_allclose_phase(s, rs, atol=1 / (32 * len(template)), name="shift") @@ -241,7 +240,7 @@ def test_fftfit_basic_integer(code, i, template): if code != "aarchiba": assume(len(template) >= 32) s = i / len(template) - rs = fftfit_basic(template, fftfit.shift(template, s), code=code) + rs = fftfit_basic(template, pint.profile.shift(template, s), code=code) assert_allclose_phase(s, rs, name="shift") @@ -267,7 +266,7 @@ def test_fftfit_basic_integer(code, i, template): ) def test_fftfit_basic_integer_fraction(code, i, template): s = i / len(template) / 2 ** 5 - rs = fftfit_basic(template, fftfit.shift(template, s), code=code) + rs = fftfit_basic(template, pint.profile.shift(template, s), code=code) assert_allclose_phase(rs, s, atol=1e-4 / len(template), name="shift") @@ -294,10 +293,10 @@ def test_fftfit_basic_integer_fraction(code, i, template): def test_fftfit_basic_subbin(code, s, kappa, n): if code != "aarchiba": assume(n >= 32) - template = fftfit.vonmises_profile(kappa, n) + (1e-3 / n) * np.random.default_rng( + template = vonmises_profile(kappa, n) + (1e-3 / n) * np.random.default_rng( 0 ).standard_normal(n) - rs = fftfit_basic(template, fftfit.shift(template, s / n), code=code) + rs = fftfit_basic(template, pint.profile.shift(template, s / n), code=code) assert_allclose_phase(rs, s / n, atol=1e-4 / len(template), name="shift") @@ -327,7 +326,7 @@ def test_fftfit_basic_subbin(code, s, kappa, n): def test_fftfit_basic_template(code, s, template): if code != "aarchiba": assume(len(template) >= 32) - rs = fftfit_basic(template, fftfit.shift(template, s), code=code) + rs = fftfit_basic(template, pint.profile.shift(template, s), code=code) assert_allclose_phase(rs, s, atol=1e-3 / len(template), name="shift") @@ -388,7 +387,7 @@ def test_fftfit_shift_equivalence(code, profile1, profile2): assume(len(profile1) >= 32) s = fftfit_basic(profile1, profile2, code=code) assert_allclose_phase( - fftfit_basic(fftfit.shift(profile1, s), profile2, code=code), + fftfit_basic(pint.profile.shift(profile1, s), profile2, code=code), 0, atol=1e-3 / min(len(profile1), len(profile2)), name="shift", @@ -402,14 +401,14 @@ def test_fftfit_shift_equivalence(code, profile1, profile2): one_of(just(0.0), floats(-1, 1), floats(-1e5, 1e5)), ) def test_fftfit_compute_scale(template, s, a, b): - profile = a * fftfit.shift(template, s) + b - r = fftfit.fftfit_full(template, profile) + profile = a * pint.profile.shift(template, s) + b + r = fftfit_full(template, profile) assert_allclose_phase(s, r.shift, atol=1e-3 / len(template), name="shift") assert_allclose(b, r.offset, atol=a * 1e-8) assert_allclose(a, r.scale, atol=(1 + abs(b)) * 1e-8) assert_rms_close( profile, - r.scale * fftfit.shift(template, r.shift) + r.offset, + r.scale * pint.profile.shift(template, r.shift) + r.offset, atol=1e-7, name="profile", ) @@ -418,12 +417,12 @@ def test_fftfit_compute_scale(template, s, a, b): @pytest.mark.parametrize("kappa,n,std", [(10, 64, 0.01), (100, 1024, 0.02)]) @randomized_test() def test_fftfit_uncertainty_template(kappa, n, std, state): - template = fftfit.vonmises_profile(kappa, n) - r = fftfit.fftfit_full(template, template, std=std) + template = vonmises_profile(kappa, n) + r = fftfit_aarchiba.fftfit_full(template, template, std=std) def gen_shift(): - return fftfit.wrap( - fftfit.fftfit_basic( + return wrap( + fftfit_basic( template, template + std * state.standard_normal((len(template),)) ) ) @@ -444,11 +443,13 @@ def gen_shift(): ) def test_fftfit_uncertainty_scaling_invariance(kappa, n, std, shift, scale, offset): state = np.random.default_rng(0) - template = fftfit.vonmises_profile(kappa, n) - profile = fftfit.shift(template, shift) + std * state.standard_normal(len(template)) + template = vonmises_profile(kappa, n) + profile = pint.profile.shift(template, shift) + std * state.standard_normal( + len(template) + ) - r_1 = fftfit.fftfit_full(template, profile) - r_2 = fftfit.fftfit_full(template, scale * profile + offset) + r_1 = fftfit_full(template, profile) + r_2 = fftfit_full(template, scale * profile + offset) assert_allclose_phase(r_2.shift, r_1.shift, 1.0 / (32 * n)) assert_allclose(r_2.uncertainty, r_1.uncertainty, rtol=1e-3) @@ -476,19 +477,19 @@ def test_fftfit_uncertainty_estimate( kappa, n, std, shift, scale, offset, estimate, state ): """Check the noise level estimation works.""" - template = fftfit.vonmises_profile(kappa, n) + template = vonmises_profile(kappa, n) def value_within_one_sigma(): profile = ( - fftfit.shift(template, shift) + pint.profile.shift(template, shift) + offset + std * state.standard_normal(len(template)) ) if estimate: - r = fftfit.fftfit_full(template, scale * profile) + r = fftfit_aarchiba.fftfit_full(template, scale * profile) else: - r = fftfit.fftfit_full(template, scale * profile, std=scale * std) - return np.abs(fftfit.wrap(r.shift - shift)) < r.uncertainty + r = fftfit_aarchiba.fftfit_full(template, scale * profile, std=scale * std) + return np.abs(wrap(r.shift - shift)) < r.uncertainty assert_happens_with_probability(value_within_one_sigma, ONE_SIGMA) @@ -614,23 +615,23 @@ def test_fftfit_value(kappa, n, std, shift, scale, offset, code, state): one sigma as defined by the uncertainty returned by the aarchiba version of the code (this is presumably a trusted uncertainty). """ - template = fftfit.vonmises_profile(kappa, n) + template = vonmises_profile(kappa, n) profile = ( - fftfit.shift(template, shift) + pint.profile.shift(template, shift) + offset + std * state.standard_normal(len(template)) ) - r_true = fftfit.fftfit_full(template, scale * profile, std=scale * std) + r_true = fftfit_aarchiba.fftfit_full(template, scale * profile, std=scale * std) assert r_true.uncertainty < 0.1, "This uncertainty is too big for accuracy" def value_within_one_sigma(): profile = ( - fftfit.shift(template, shift) + pint.profile.shift(template, shift) + offset + std * state.standard_normal(len(template)) ) r = fftfit_full(template, scale * profile, code=code) - return np.abs(fftfit.wrap(r.shift - shift)) < r_true.uncertainty + return np.abs(wrap(r.shift - shift)) < r_true.uncertainty assert_happens_with_probability(value_within_one_sigma, ONE_SIGMA) @@ -757,17 +758,17 @@ def value_within_one_sigma(): @randomized_test(tries=8) def test_fftfit_value_vs_uncertainty(kappa, n, std, shift, scale, offset, code, state): """Check if the scatter matches the claimed uncertainty.""" - template = fftfit.vonmises_profile(kappa, n) + template = vonmises_profile(kappa, n) def value_within_one_sigma(): profile = ( - fftfit.shift(template, shift) + pint.profile.shift(template, shift) + offset + std * state.standard_normal(len(template)) ) r = fftfit_full(template, scale * profile, code=code) assert r.uncertainty < 0.1, "This uncertainty is too big for accuracy" - return np.abs(fftfit.wrap(r.shift - shift)) < r.uncertainty + return np.abs(wrap(r.shift - shift)) < r.uncertainty assert_happens_with_probability(value_within_one_sigma, ONE_SIGMA) @@ -793,7 +794,10 @@ def value_within_one_sigma(): 256, 0.01, "presto", - marks=[pytest.mark.xfail(reason="bug?"), pytest.mark.skipif(NO_PRESTO, reason="PRESTO not available")], + marks=[ + pytest.mark.xfail(reason="bug?"), + pytest.mark.skipif(NO_PRESTO, reason="PRESTO not available"), + ], ), pytest.param( 10, @@ -817,7 +821,10 @@ def value_within_one_sigma(): 256, 0.01, "presto", - marks=[pytest.mark.xfail(reason="bug?"), pytest.mark.skipif(NO_PRESTO, reason="PRESTO not available")], + marks=[ + pytest.mark.xfail(reason="bug?"), + pytest.mark.skipif(NO_PRESTO, reason="PRESTO not available"), + ], ), pytest.param( 11, @@ -840,16 +847,16 @@ def value_within_one_sigma(): @randomized_test(tries=8) def test_fftfit_wrong_profile(kappa1, kappa2, n, std, code, state): """Check that the uncertainty is okay or pessimistic if the template is wrong.""" - template = fftfit.vonmises_profile(kappa1, n) - wrong_template = fftfit.vonmises_profile(kappa2, n) + template = vonmises_profile(kappa1, n) + wrong_template = vonmises_profile(kappa2, n) def value_within_one_sigma(): shift = state.uniform(0, 1) - profile = fftfit.shift(template, shift) + std * state.standard_normal( + profile = pint.profile.shift(template, shift) + std * state.standard_normal( len(template) ) r = fftfit_full(wrong_template, profile, code=code) - return np.abs(fftfit.wrap(r.shift - shift)) < r.uncertainty + return np.abs(wrap(r.shift - shift)) < r.uncertainty # Must be pessimistic assert_happens_with_probability(value_within_one_sigma, ONE_SIGMA, p_upper=1)