diff --git a/docs/changes/737.feature.rst b/docs/changes/737.feature.rst new file mode 100644 index 000000000..70e8eb97b --- /dev/null +++ b/docs/changes/737.feature.rst @@ -0,0 +1,2 @@ +- Implemented the Lomb Scargle Fourier Transform (fast and slow versions) +- Using which wrote the corresponding :class:`LombScargleCrossspectrum` and :class:`LombScarglePowerspectrum` \ No newline at end of file diff --git a/docs/core.rst b/docs/core.rst index e9f3353f7..ac15d1c16 100644 --- a/docs/core.rst +++ b/docs/core.rst @@ -1,9 +1,9 @@ Core Stingray Functionality *************************** -Here we show how many of the core Stingray classes and methods -work in practice. We start with basic data constructs for -event data and light curve data, and then show how to produce +Here we show how many of the core Stingray classes and methods +work in practice. We start with basic data constructs for +event data and light curve data, and then show how to produce various Fourier products from these data sets. Working with Event Data @@ -85,3 +85,20 @@ Multi-taper Periodogram :maxdepth: 2 notebooks/Multitaper/multitaper_example.ipynb + + +Lomb Scargle Crossspectrum +-------------------------- +.. toctree:: + :maxdepth: 2 + + notebooks/LombScargle/LombScargleCrossspectrum_tutorial.ipynb + + +Lomb Scargle Powerspectrum +-------------------------- + +.. toctree:: + :maxdepth: 2 + + notebooks/LombScargle/LombScarglePowerspectrum_tutorial.ipynb diff --git a/docs/dataexplo.rst b/docs/dataexplo.rst index 7183275c1..df9b98a85 100644 --- a/docs/dataexplo.rst +++ b/docs/dataexplo.rst @@ -26,3 +26,15 @@ black hole binary using NICER data. :maxdepth: 2 notebooks/Spectral Timing/Spectral Timing Exploration.ipynb + + +Studying very slow variability with the Lomb-Scargle periodogram +================================================================ + +In this Tutorial, we will show an example of how to use the Lomb-Scargle +periodogram and cross spectrum to study very slow variability in a light curve. + +.. toctree:: + :maxdepth: 2 + + notebooks/LombScargle/Very slow variability with Lomb-Scargle methods.ipynb diff --git a/docs/index.rst b/docs/index.rst index 57760105f..b250d1297 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -21,26 +21,35 @@ Features Current Capabilities -------------------- -Currently implemented functionality in this library comprises: +1. Data handling and simulation +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * loading event lists from fits files of a few missions (RXTE/PCA, NuSTAR/FPM, XMM-Newton/EPIC, NICER/XTI) * constructing light curves from event data, various operations on light curves (e.g. addition, subtraction, joining, and truncation) +* simulating a light curve with a given power spectrum +* simulating a light curve from another light curve and a 1-d (time) or 2-d (time-energy) impulse response +* simulating an event list from a given light curve _and_ with a given energy spectrum * Good Time Interval operations -* power spectra in Leahy, rms normalization, absolute rms and no normalization -* averaged power spectra -* dynamical power spectra + +2. Fourier methods +~~~~~~~~~~~~~~~~~~ +* power spectra and cross spectra in Leahy, rms normalization, absolute rms and no normalization +* averaged power spectra and cross spectra +* dynamical power spectra and cross spectra * maximum likelihood fitting of periodograms/parametric models * (averaged) cross spectra * coherence, time lags -* cross correlation functions -* RMS spectra and lags (time vs energy, time vs frequency); *needs testing* +* Variability-Energy spectra, like covariance spectra and lags *needs testing* * covariance spectra; *needs testing* * bispectra; *needs testing* * (Bayesian) quasi-periodic oscillation searches -* simulating a light curve with a given power spectrum -* simulating a light curve from another light curve and a 1-d (time) or 2-d (time-energy) impulse response -* simulating an event list from a given light curve _and_ with a given energy spectrum +* Lomb-Scargle periodograms and cross spectra + +3. Other time series methods +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * pulsar searches with Epoch Folding, :math:`Z^2_n` test +* Gaussian Processes for QPO studies +* cross correlation functions Future Plans ------------ diff --git a/docs/notebooks b/docs/notebooks index fc7804914..4742e8ff0 160000 --- a/docs/notebooks +++ b/docs/notebooks @@ -1 +1 @@ -Subproject commit fc780491476f7981a1331feee6837ad047b44999 +Subproject commit 4742e8ff0795fd5ea94e04d3f4581a42c9c6924c diff --git a/stingray/__init__.py b/stingray/__init__.py index 33420352e..4131ebd45 100644 --- a/stingray/__init__.py +++ b/stingray/__init__.py @@ -12,6 +12,7 @@ from stingray.events import * from stingray.lightcurve import * from stingray.utils import * + from stingray.lombscargle import * from stingray.powerspectrum import * from stingray.crossspectrum import * from stingray.multitaper import * @@ -22,3 +23,4 @@ from stingray.stats import * from stingray.bispectrum import * from stingray.varenergyspectrum import * + from stingray.lombscargle import * diff --git a/stingray/fourier.py b/stingray/fourier.py index b298e2bd5..2210c0fa2 100644 --- a/stingray/fourier.py +++ b/stingray/fourier.py @@ -1,15 +1,18 @@ import copy import warnings from collections.abc import Iterable +from typing import Optional import numpy as np +import numpy.typing as npt from astropy.table import Table +from astropy.timeseries.periodograms.lombscargle.implementations.utils import trig_sum from .gti import ( generate_indices_of_segment_boundaries_binned, generate_indices_of_segment_boundaries_unbinned, ) -from .utils import histogram, show_progress, sum_if_not_none_or_initialize, fft, fftfreq +from .utils import fft, fftfreq, histogram, show_progress, sum_if_not_none_or_initialize def positive_fft_bins(n_bin, include_zero=False): @@ -1998,3 +2001,218 @@ def avg_cs_from_events( if results is not None: results.meta["gti"] = gti return results + + +def lsft_fast( + y: npt.ArrayLike, + t: npt.ArrayLike, + freqs: npt.ArrayLike, + sign: Optional[int] = 1, + oversampling: Optional[int] = 5, +) -> npt.NDArray: + """ + Calculates the Lomb-Scargle Fourier transform of a light curve. + Only considers non-negative frequencies. + Subtracts mean from data as it is required for the working of the algorithm. + + Parameters + ---------- + y : a :class:`numpy.array` of floats + Observations to be transformed. + + t : :class:`numpy.array` of floats + Times of the observations + + freqs : :class:`numpy.array` + An array of frequencies at which the transform is sampled. + + sign : int, optional, default: 1 + The sign of the fourier transform. 1 implies positive sign and -1 implies negative sign. + + fullspec : bool, optional, default: False + Return LSFT values for full frequency array (True) or just positive frequencies (False). + + oversampling : float, optional, default: 5 + Interpolation Oversampling Factor + + Returns + ------- + ft_res : numpy.ndarray + An array of Fourier transformed data. + """ + y_ = copy.deepcopy(y) - np.mean(y) + freqs = freqs[freqs >= 0] + # Constants initialization + sum_xx = np.sum(y_) + num_xt = len(y_) + num_ww = len(freqs) + + # Arrays initialization + ft_real = ft_imag = np.zeros((num_ww)) + f0, df, N = freqs[0], freqs[1] - freqs[0], len(freqs) + + # Sum (y_i * cos(wt - wtau)) + Sh, Ch = trig_sum(t, y_, df, N, f0, oversampling=oversampling) + + # Angular Frequency + w = freqs * 2 * np.pi + + # Summation of cos(2wt) and sin(2wt) + csum, ssum = trig_sum( + t, np.ones_like(y_) / len(y_), df, N, f0, freq_factor=2, oversampling=oversampling + ) + + wtau = 0.5 * np.arctan2(ssum, csum) + + S2, C2 = trig_sum( + t, + np.ones_like(y_), + df, + N, + f0, + freq_factor=2, + oversampling=oversampling, + ) + + const = np.sqrt(0.5) * np.sqrt(num_ww) + + coswtau = np.cos(wtau) + sinwtau = np.sin(wtau) + + sumr = Ch * coswtau + Sh * sinwtau + sumi = Sh * coswtau - Ch * sinwtau + + cos2wtau = np.cos(2 * wtau) + sin2wtau = np.sin(2 * wtau) + + scos2 = 0.5 * (N + C2 * cos2wtau + S2 * sin2wtau) + ssin2 = 0.5 * (N - C2 * cos2wtau - S2 * sin2wtau) + + ft_real = const * sumr / np.sqrt(scos2) + ft_imag = const * np.sign(sign) * sumi / np.sqrt(ssin2) + + ft_real[freqs == 0] = sum_xx / np.sqrt(num_xt) + ft_imag[freqs == 0] = 0 + + phase = wtau - w * t[0] + + ft_res = np.complex128(ft_real + (1j * ft_imag)) * np.exp(-1j * phase) + + return ft_res + + +def lsft_slow( + y: npt.ArrayLike, + t: npt.ArrayLike, + freqs: npt.ArrayLike, + sign: Optional[int] = 1, +) -> npt.NDArray: + """ + Calculates the Lomb-Scargle Fourier transform of a light curve. + Only considers non-negative frequencies. + Subtracts mean from data as it is required for the working of the algorithm. + + Parameters + ---------- + y : a `:class:numpy.array` of floats + Observations to be transformed. + + t : `:class:numpy.array` of floats + Times of the observations + + freqs : numpy.ndarray + An array of frequencies at which the transform is sampled. + + sign : int, optional, default: 1 + The sign of the fourier transform. 1 implies positive sign and -1 implies negative sign. + + fullspec : bool, optional, default: False + Return LSFT values for full frequency array (True) or just positive frequencies (False). + + Returns + ------- + ft_res : numpy.ndarray + An array of Fourier transformed data. + """ + y_ = y - np.mean(y) + freqs = np.asarray(freqs[np.asarray(freqs) >= 0]) + + ft_real = np.zeros_like(freqs) + ft_imag = np.zeros_like(freqs) + ft_res = np.zeros_like(freqs, dtype=np.complex128) + + num_y = y_.shape[0] + num_freqs = freqs.shape[0] + sum_y = np.sum(y_) + const1 = np.sqrt(0.5 * num_y) + const2 = const1 * np.sign(sign) + ft_real = ft_imag = np.zeros(num_freqs) + ft_res = np.zeros(num_freqs, dtype=np.complex128) + + for i in range(num_freqs): + wrun = freqs[i] * 2 * np.pi + if wrun == 0: + ft_real = sum_y / np.sqrt(num_y) + ft_imag = 0 + phase_this = 0 + else: + # Calculation of \omega \tau (II.5) -- + csum = np.sum(np.cos(2.0 * wrun * t)) + ssum = np.sum(np.sin(2.0 * wrun * t)) + + watan = np.arctan2(ssum, csum) + wtau = 0.5 * watan + # -- + # In the following, instead of t'_n we are using \omega t'_n = \omega t - \omega\tau + + # Terms of kind X_n * cos or sin (II.1) -- + sumr = np.sum(y_ * np.cos(wrun * t - wtau)) + sumi = np.sum(y_ * np.sin(wrun * t - wtau)) + # -- + + # A and B before the square root and inversion in (II.3) -- + scos2 = np.sum(np.power(np.cos(wrun * t - wtau), 2)) + ssin2 = np.sum(np.power(np.sin(wrun * t - wtau), 2)) + + # const2 is const1 times the sign. + # It's the F0 in II.2 without the phase factor + # The sign decides whether we are calculating the direct or inverse transform + ft_real = const1 * sumr / np.sqrt(scos2) + ft_imag = const2 * sumi / np.sqrt(ssin2) + + phase_this = wtau - wrun * t[0] + + ft_res[i] = np.complex128(ft_real + (1j * ft_imag)) * np.exp(-1j * phase_this) + return ft_res + + +def impose_symmetry_lsft( + ft_res: npt.ArrayLike, + sum_y: float, + len_y: int, + freqs: npt.ArrayLike, +) -> npt.ArrayLike: + """ + Impose symmetry on the input fourier transform. + + Parameters + ---------- + ft_res : np.array + The Fourier transform of the signal. + sum_y : float + The sum of the values of the signal. + len_y : int + The length of the signal. + freqs : np.array + An array of frequencies at which the transform is sampled. + + Returns + ------- + lsft_res : np.array + The Fourier transform of the signal with symmetry imposed. + freqs_new : np.array + The new frequencies + """ + ft_res_new = np.concatenate([np.conjugate(np.flip(ft_res)), [0.0], ft_res]) + freqs_new = np.concatenate([np.flip(-freqs), [0.0], freqs]) + return ft_res_new, freqs_new diff --git a/stingray/lightcurve.py b/stingray/lightcurve.py index 9617e3e0a..9b76965f2 100644 --- a/stingray/lightcurve.py +++ b/stingray/lightcurve.py @@ -591,6 +591,7 @@ def check_lightcurve(self): "Bin sizes in input time array aren't equal throughout! " "This could cause problems with Fourier transforms. " "Please make the input time evenly sampled." + "Only use with LombScargleCrossspectrum, LombScarglePowerspectrum and QPO using GPResult" ) def _operation_with_other_lc(self, other, operation): diff --git a/stingray/lombscargle.py b/stingray/lombscargle.py new file mode 100644 index 000000000..1018eed69 --- /dev/null +++ b/stingray/lombscargle.py @@ -0,0 +1,690 @@ +import copy +import warnings +from typing import Optional, Union + +import numpy as np +import numpy.typing as npt +from astropy.timeseries.periodograms import LombScargle + +from .crossspectrum import Crossspectrum +from .events import EventList +from .exceptions import StingrayError +from .fourier import impose_symmetry_lsft, lsft_fast, lsft_slow +from .lightcurve import Lightcurve +from .utils import simon + + +__all__ = ["LombScarglePowerspectrum", "LombScargleCrossspectrum"] + + +def _autofrequency(min_freq=None, max_freq=None, df=None, dt=None, length=None, nyquist_factor=1): + """Decide the frequency grid for the periodogram if not provided explicitly. + + Parameters + ---------- + min_freq : float + Minimum frequency to take the Lomb-Scargle Fourier Transform + max_freq : float + Maximum frequency to take the Lomb-Scargle Fourier Transform + df : float + The frequency resolution of the final periodogram. Defaults to 1 / length. + dt : float + The time resolution of the light curve. + length : float + The total length of the light curve. + + Returns + ------- + freq : numpy.ndarray + The array of mid-bin frequencies that the Fourier transform samples + + Examples + -------- + >>> freqs = _autofrequency(min_freq=0.1, max_freq=0.5, df=0.1) + >>> np.allclose(freqs, [0.1, 0.2, 0.3, 0.4, 0.5]) + True + >>> freqs = _autofrequency(min_freq=0.1, max_freq=0.5, length=10) + >>> np.allclose(freqs, [0.1, 0.2, 0.3, 0.4, 0.5]) + True + >>> freqs = _autofrequency(min_freq=0.1, dt=1, length=10) + >>> np.allclose(freqs, [0.1, 0.2, 0.3, 0.4, 0.5]) + True + >>> freqs = _autofrequency(max_freq=0.5, df=0.2) + >>> np.allclose(freqs, [0.1, 0.3, 0.5]) + True + """ + + if (df is None or df <= 0) and length is None: + raise ValueError("Either df or length must be specified.") + elif df is None or df <= 0: + df = 1 / length + + if max_freq is None and (dt is None or dt == 0): + raise ValueError("Either max_freq or dt must be specified.") + elif max_freq is None: + max_freq = nyquist_factor * 0.5 / dt + + if min_freq is None: + min_freq = df / 2 + elif min_freq <= 0: + warnings.warn("min_freq must be positive and >0. Setting to df / 2.") + min_freq = df / 2 + + freq = np.arange(min_freq, max_freq + df, df) + return freq + + +class LombScargleCrossspectrum(Crossspectrum): + main_array_attr = "freq" + type = "crossspectrum" + """ + Make a cross spectrum from an unevenly sampled (binned) light curve. + You can also make an empty :class:`LombScargleCrossspectrum` object to populate with your + own Fourier-transformed data (this can sometimes be useful when making + binned power spectra). + + Parameters + ---------- + data1: :class:`stingray.lightcurve.Lightcurve` or :class:`stingray.events.EventList`, optional, default ``None`` + The dataset for the first channel/band of interest. + + data2: :class:`stingray.lightcurve.Lightcurve` or :class:`stingray.events.EventList`, optional, default ``None`` + The dataset for the second, or "reference", band. + + norm: {``frac``, ``abs``, ``leahy``, ``none``}, string, optional, default ``none`` + The normalization of the cross spectrum. + + power_type: {``real``, ``absolute``, ``all`}, string, optional, default ``all`` + Parameter to choose among complete, real part and magnitude of the cross spectrum. + + fullspec: boolean, optional, default ``False`` + If False, keep only the positive frequencies, or if True, keep all of them . + + Other Parameters + ---------------- + dt: float + The time resolution of the light curve. Only needed when constructing + light curves in the case where ``data1``, ``data2`` are + :class:`EventList` objects + + skip_checks: bool + Skip initial checks, for speed or other reasons (you need to trust your + inputs!) + + min_freq : float + Minimum frequency to take the Lomb-Scargle Fourier Transform + + max_freq: float + Maximum frequency to take the Lomb-Scargle Fourier Transform + + df : float + The frequency resolution of the final periodogram. + + method : str + The method to be used by the Lomb-Scargle Fourier Transformation function. `fast` + and `slow` are the allowed values. Default is `fast`. fast uses the optimized Press + and Rybicki O(n*log(n)) + + oversampling : float, optional, default: 5 + Interpolation Oversampling Factor (for the fast algorithm) + + Attributes + ---------- + freq: numpy.ndarray + The array of mid-bin frequencies that the Fourier transform samples + + power: numpy.ndarray + The array of cross spectra (complex numbers) + + power_err: numpy.ndarray + The uncertainties of ``power``. + An approximation for each bin given by ``power_err= power/sqrt(m)``. + Where ``m`` is the number of power averaged in each bin (by frequency + binning, or averaging more than one spectra). Note that for a single + realization (``m=1``) the error is equal to the power. + + df: float + The frequency resolution + + m: int + The number of averaged cross-spectra amplitudes in each bin. + + n: int + The number of data points/time bins in one segment of the light + curves. + + k: array of int + The rebinning scheme if the object has been rebinned otherwise is set to 1. + + nphots1: float + The total number of photons in light curve 1 + + nphots2: float + The total number of photons in light curve 2 + + References + ---------- + .. [1] Scargle, J. D. , "Studies in astronomical time series analysis. III - Fourier + transforms, autocorrelation functions, and cross-correlation + functions of unevenly spaced data". ApJ 1:343, p874-887, 1989 + .. [2] Press W.H. and Rybicki, G.B, "Fast algorithm for spectral analysis + of unevenly sampled data". ApJ 1:338, p277, 1989 + """ + + def __init__( + self, + data1: Optional[Union[EventList, Lightcurve]] = None, + data2: Optional[Union[EventList, Lightcurve]] = None, + norm: Optional[str] = "none", + power_type: Optional[str] = "all", + dt: Optional[float] = None, + fullspec: Optional[bool] = False, + skip_checks: bool = False, + min_freq: float = None, + max_freq: float = None, + df: float = None, + method: str = "fast", + oversampling: int = 5, + ): + self._type = None + + if data1 is None and data2 is None: + self._initialize_empty() + return + + if dt is None: + if isinstance(data1, Lightcurve) or isinstance(data2, EventList): + dt = data1.dt + elif isinstance(data2, Lightcurve) or isinstance(data2, EventList) and dt is None: + dt = data2.dt + + if not skip_checks: + good_input = self.initial_checks( + data1=data1, + data2=data2, + norm=norm, + power_type=power_type, + dt=dt, + fullspec=fullspec, + min_freq=min_freq, + max_freq=max_freq, + df=df, + method=method, + oversampling=oversampling, + ) + + if data1 is not None and data2 is not None: + self._initialize_from_any_input( + data1, + data2, + dt=dt, + norm=norm, + power_type=power_type, + fullspec=fullspec, + min_freq=min_freq, + max_freq=max_freq, + df=None, + method=method, + oversampling=oversampling, + ) + + def initial_checks( + self, + data1, + data2, + norm, + power_type, + dt, + min_freq, + max_freq, + fullspec, + df, + method, + oversampling, + ): + if not isinstance(norm, str): + raise TypeError("norm must be a string") + + if not isinstance(power_type, str): + raise TypeError("power_type must be a string") + + if norm.lower() not in ["frac", "abs", "leahy", "none"]: + raise ValueError("norm must be one of ['frac','abs','leahy','none']") + + if power_type not in ["all", "absolute", "real"]: + raise ValueError("power_type must be one of ['all','absolute','real']") + + if data1 is None or data2 is None: + if data1 is not None or data2 is not None: + raise ValueError("You can't do a cross spectrum with just one lightcurve") + + if min_freq is not None and min_freq < 0: + raise ValueError("min_freq must be non-negative") + + if max_freq is not None and max_freq < 0: + raise ValueError("max_freq must be non-negative") + + if max_freq is not None and min_freq is not None: + if max_freq <= min_freq: + raise ValueError("max_freq must be greater than min_freq") + + if method not in ["fast", "slow"]: + raise ValueError("method must be one of ['fast','slow']") + + if not isinstance(oversampling, int): + raise TypeError("oversampling must be an integer") + + if not isinstance(fullspec, bool): + raise TypeError("fullspec must be a boolean") + + dt_is_invalid = (dt is None) or (dt <= np.finfo(float).resolution) + if type(data1) != type(data2): + raise TypeError("data1 and data2 must be of the same kind") + + if isinstance(data1, EventList): + if dt_is_invalid: + raise ValueError( + "If using event lists, please specify the bin time to generate lightcurves." + ) + elif isinstance(data1, Lightcurve): + if data1.err_dist.lower() != data2.err_dist.lower(): + simon( + "Your lightcurves have different statistics." + "The errors in the Crossspectrum will be incorrect." + ) + else: + raise TypeError("Input data are invalid") + return True + + def _initialize_from_any_input( + self, + data1, + data2, + dt, + norm, + power_type, + fullspec, + min_freq, + max_freq, + df, + method, + oversampling, + ): + """Not required for unevenly sampled data""" + if isinstance(data1, EventList): + data1 = data1.to_lc(dt) + if isinstance(data2, EventList): + data2 = data2.to_lc(dt) + + self.lc1 = data1.apply_gtis(inplace=False) + self.lc2 = data2.apply_gtis(inplace=False) + + spec = lscrossspectrum_from_lightcurve( + self.lc1, + self.lc2, + norm=norm, + power_type=power_type, + fullspec=fullspec, + min_freq=min_freq, + max_freq=max_freq, + df=df, + method=method, + oversampling=oversampling, + ) + + for key, val in spec.__dict__.items(): + setattr(self, key, val) + + def _initialize_empty(self): + self.freq = None + self.power = None + self.power_err = None + self.unnorm_power = None + self.unnorm_power_err = None + self.df = None + self.dt = None + self.nphots1 = None + self.nphots2 = None + self.m = 1 + self.n = None + self.fullspec = None + self.k = 1 + self.err_dist = None + self.method = None + self.meancounts1 = None + self.meancounts2 = None + self.oversampling = None + self.variance1 = None + self.variance2 = None + return + + def time_lag(self): + super().__doc__ + return self.phase_lag() / (2 * np.pi * self.freq) + + def classical_significances(self): + """Not applicable for unevenly sampled data""" + raise AttributeError( + "Object has no attribute named 'classical_significances' ! Not applicable for unevenly sampled data" + ) + + def from_time_array(self): + """Not applicable for unevenly sampled data""" + raise AttributeError( + "Object has no attribute named 'from_time_array' ! Not applicable for unevenly sampled data" + ) + + def from_events(self): + """Not applicable for unevenly sampled data""" + raise AttributeError( + "Object has no attribute named 'from_events' ! Not applicable for unevenly sampled data" + ) + + def from_lightcurve(self): + """Not applicable for unevenly sampled data""" + raise AttributeError( + "Object has no attribute named 'from_lightcurve' ! Not applicable for unevenly sampled data" + ) + + def from_lc_iterable(self): + """Not applicable for unevenly sampled data""" + raise AttributeError( + "Object has no attribute named 'from_lc_iterable' ! Not applicable for unevenly sampled data" + ) + + +class LombScarglePowerspectrum(LombScargleCrossspectrum): + type = "powerspectrum" + """ + Make a :class:`LombScarglePowerspectrum` (also called periodogram) from a unevenly sampled (binned) + light curve. Periodograms can be normalized by either Leahy normalization, + fractional rms normalization, absolute rms normalization, or not at all. + + You can also make an empty :class:`LombScarglePowerspectrum` object to populate with + your own fourier-transformed data (this can sometimes be useful when making + binned power spectra). + + Parameters + ---------- + data: :class:`stingray.lightcurve.Lightcurve` or :class:`stingray.events.EventList` object, optional, default ``None`` + The light curve data to be Fourier-transformed. + + norm: {``frac``, ``abs``, ``leahy``, ``none``}, string, optional, default ``none`` + The normalization of the power spectrum. + + power_type: {``real``, ``absolute``, ``all`}, string, optional, default ``all`` + Parameter to choose among complete, real part and magnitude of the power spectrum. + + fullspec: boolean, optional, default ``False`` + If False, keep only the positive frequencies, or if True, keep all of them . + + Other Parameters + ---------------- + dt: float + The time resolution of the light curve. Only needed when constructing + light curves in the case where ``data`` is a + :class:`EventList` object + + skip_checks: bool + Skip initial checks, for speed or other reasons (you need to trust your + inputs!). + + min_freq : float + Minimum frequency to take the Lomb-Scargle Fourier Transform + + max_freq: float + Maximum frequency to take the Lomb-Scargle Fourier Transform + + df : float + The time resolution of the light curve. Only needed where ``data`` is a :class`stingray.Eventlist` object + + method : str + The method to be used by the Lomb-Scargle Fourier Transformation function. `fast` + and `slow` are the allowed values. Default is `fast`. fast uses the optimized Press + and Rybicki O(n*log(n)) + + oversampling : float, optional, default: 5 + Interpolation Oversampling Factor (for the fast algorithm) + + Attributes + ---------- + freq: numpy.ndarray + The array of mid-bin frequencies that the Fourier transform samples. + + power: numpy.ndarray + The array of normalized squared absolute values of Fourier + amplitudes. + + power_err: numpy.ndarray + The uncertainties of ``power``. + An approximation for each bin given by ``power_err= power/sqrt(m)``. + Where ``m`` is the number of power averaged in each bin (by frequency + binning, or averaging power spectra of segments of a light curve). + Note that for a single realization (``m=1``) the error is equal to the + power. + + df: float + The frequency resolution. + + m: int + The number of averaged powers in each bin. + + n: int + The number of data points in the light curve. + + nphots: float + The total number of photons in the light curve. + """ + + def __init__( + self, + data: Optional[Union[Lightcurve, EventList]] = None, + norm: Optional[str] = "frac", + power_type: Optional[str] = "all", + dt: Optional[float] = None, + fullspec: Optional[bool] = False, + skip_checks: Optional[bool] = False, + min_freq: Optional[float] = None, + max_freq: Optional[float] = None, + df: Optional[float] = None, + method: Optional[str] = "fast", + oversampling: Optional[int] = 5, + ): + self._type = None + if data is None: + return self._initialize_empty() + good_input = True + if not skip_checks: + good_input = self.initial_checks( + data, + data, + norm, + power_type, + dt, + min_freq, + max_freq, + fullspec, + df, + method, + oversampling, + ) + + self._initialize_from_any_input( + data1=data, + data2=data, + dt=dt, + norm=norm, + power_type=power_type, + fullspec=fullspec, + min_freq=min_freq, + max_freq=max_freq, + df=df, + method=method, + oversampling=oversampling, + ) + self.nphots = self.nphots1 + self.dt = dt + + +def lscrossspectrum_from_lightcurve( + lc1, + lc2, + norm="frac", + power_type="all", + fullspec=False, + min_freq=None, + max_freq=None, + df=None, + method="fast", + oversampling=5, + nyquist_factor=1, +): + """Creates a Lomb Scargle Cross Spectrum from two light curves + Parameters + ---------- + lc1: :class:`stingray.lightcurve.Lightcurve` object + Light curve from channel 1. + lc2 : :class:`stingray.lightcurve.Lightcurve` object + Light curve from channel 2. + + Other parameters + ---------------- + norm : str, default "none" + The normalization of the periodogram. "frac" is fractional rms,"abs" is absolute + rms, "leahy" is Leahy normalization, and "none" is the unnormalized periodogram + + power_type : str, default "all" + Parameter to choose among complete, real part and magnitude of the spectrum + + fullspec : bool, default False + If False, keep only the positive frequencies, or if True, keep all of them + + min_freq : float, default 0 + Minimum frequency to take the Lomb-Scargle Fourier Transform + + max_freq : float, default None + Maximum frequency to take the Lomb-Scargle Fourier Transform + + method : str, default "fast" + The method to be used by the Lomb-Scargle Fourier Transformation function. + `fast` and `slow` are the allowed values. Default is `fast`. fast uses the + optimized Press and Rybicki O(n*log(n)) algorithm, while slow uses the original + O(n^2) algorithm. + + oversampling : int, default 5 + Interpolation Oversampling Factor (for the fast algorithm) + nyquist_factor : int, default 1 + How many times the Nyquist frequency to use as the maximum frequency + """ + lscs = LombScargleCrossspectrum() + + length = max(lc1.time[-1], lc2.time[-1]) - min(lc1.time[0], lc2.time[0]) + dt = np.min([lc1.dt, lc2.dt]) + + freq = _autofrequency( + min_freq=min_freq, + max_freq=max_freq, + df=df, + dt=dt, + length=length, + nyquist_factor=nyquist_factor, + ) + freq, cross = _ls_cross( + lc1, + lc2, + freq=freq, + fullspec=fullspec, + method=method, + oversampling=oversampling, + ) + lscs.unnorm_power = cross + lscs.freq = freq + lscs.lc1 = lc1 + lscs.lc2 = lc2 + lscs.norm = norm + lscs.power_type = power_type + lscs.fullspec = fullspec + lscs.min_freq = min_freq + lscs.max_freq = max_freq + lscs.oversampling = oversampling + lscs.nphots1 = lc1.counts.sum() + lscs.nphots2 = lc2.counts.sum() + lscs.dt = lc1.dt + lscs.n = lc1.n + lscs.method = method + lscs.err_dist = "poisson" + + if lc1.err_dist == "poisson": + lscs.variance1 = lc1.meancounts + else: + lscs.variance1 = np.mean(lc1.counts_err) ** 2 + lscs.err_dist = "gauss" + if lc2.err_dist == "poisson": + lscs.variance2 = lc2.meancounts + else: + lscs.variance2 = np.mean(lc2.counts_err) ** 2 + lscs.err_dist = "gauss" + + lscs.power = lscs._normalize_crossspectrum(lscs.unnorm_power) + + if power_type == "real": + lscs.power = np.real(lscs.power) + lscs.unnorm_power = np.real(lscs.power) + elif power_type == "absolute": + lscs.power = np.abs(lscs.power) + lscs.unnorm_power = np.abs(lscs.power) + return lscs + + +def _ls_cross( + lc1, + lc2, + freq=None, + fullspec=False, + method="fast", + oversampling=5, +): + """ + Lomb-Scargle Fourier transform the two light curves, then compute the cross spectrum. + Computed as CS = lc1 x lc2* (where lc2 is the one that gets + complex-conjugated). The user has the option to either get just the + positive frequencies or the full spectrum. + + Parameters + ---------- + lc1: :class:`stingray.lightcurve.Lightcurve` object + One light curve to be Lomb-Scargle Fourier transformed. Ths is the band of + interest or channel of interest. + + lc2: :class:`stingray.lightcurve.Lightcurve` object + Another light curve to be Fourier transformed. + This is the reference band. + + fullspec: boolean. Default is False. + If True, return the whole array of frequencies, or only positive frequencies (False). + + method : str + The method to be used by the Lomb-Scargle Fourier Transformation function. `fast` + and `slow` are the allowed values. Default is `fast`. fast uses the optimized Press + and Rybicki O(n*log(n)) + + Returns + ------- + freq: numpy.ndarray + The frequency grid at which the LSFT was evaluated + + cross: numpy.ndarray + The cross spectrum value at each frequency. + + """ + if method == "slow": + lsft1 = lsft_slow(lc1.counts, lc1.time, freq) + lsft2 = lsft_slow(lc2.counts, lc2.time, freq) + elif method == "fast": + lsft1 = lsft_fast(lc1.counts, lc1.time, freq, oversampling=oversampling) + lsft2 = lsft_fast(lc2.counts, lc2.time, freq, oversampling=oversampling) + if fullspec: + lsft1, _ = impose_symmetry_lsft(lsft1, np.sum((lc1.counts)), lc1.n, freq) + lsft2, freq = impose_symmetry_lsft(lsft2, np.sum(lc2.counts), lc2.n, freq) + cross = np.multiply(lsft1, np.conjugate(lsft2)) + return freq, cross diff --git a/stingray/tests/test_fourier.py b/stingray/tests/test_fourier.py index 80d4447e8..98e6d500d 100644 --- a/stingray/tests/test_fourier.py +++ b/stingray/tests/test_fourier.py @@ -1,6 +1,8 @@ import os from pickle import FALSE + import pytest + from stingray.fourier import * from stingray.utils import check_allclose_and_print @@ -598,3 +600,63 @@ def test_unnormalize_poisson_noise(self, norm, power_type): noise_notnorm = poisson_level("none", self.meanrate, self.nph) assert np.isclose(noise_notnorm, unnorm_noise) + + +@pytest.mark.parametrize("phlag", [0.05, 0.1, 0.2, 0.4]) +def test_lags(phlag): + freq = 1.1123232252 + + def func(time, phase=0): + return 2 + np.sin(2 * np.pi * (time * freq - phase)) + + time = np.sort(np.random.uniform(0, 100, 3000)) + ft0 = lsft_slow(func(time, 0), time, np.array([freq])) + ft1 = lsft_slow(func(time, phlag), time, np.array([freq])) + measured_lag = (np.angle(ft0) - np.angle(ft1)) / 2 / np.pi + while measured_lag > 0.5: + measured_lag -= 0.5 + while measured_lag <= -0.5: + measured_lag += 0.5 + + assert np.isclose((np.angle(ft1) - np.angle(ft0)) / 2 / np.pi, phlag, atol=0.02, rtol=0.02) + + +def test_lsft_slow_fast(): + np.random.seed(0) + rand = np.random.default_rng(42) + n = 1000 + t = np.sort(rand.random(n)) * np.sqrt(n) + y = np.sin(2 * np.pi * 3.0 * t) + sub = np.min(y) + y -= sub + freqs = np.fft.fftfreq(n, np.median(np.diff(t, 1))) + freqs = freqs[freqs >= 0] + lsftslow = lsft_slow(y, t, freqs, sign=1) + lsftfast = lsft_fast(y, t, freqs, sign=1, oversampling=10) + assert np.argmax(lsftslow) == np.argmax(lsftfast) + assert round(freqs[np.argmax(lsftslow)], 1) == round(freqs[np.argmax(lsftfast)], 1) == 3.0 + assert np.allclose((lsftslow * np.conjugate(lsftslow)).imag, [0]) & np.allclose( + (lsftfast * np.conjugate(lsftfast)).imag, 0 + ) + + +def test_impose_symmetry_lsft(): + np.random.seed(0) + rand = np.random.default_rng(42) + n = 1000 + t = np.sort(rand.random(n)) * np.sqrt(n) + y = np.sin(2 * np.pi * 3.0 * t) + sub = np.min(y) + y -= sub + freqs = np.fft.fftfreq(n, np.median(np.diff(t, 1))) + freqs = freqs[freqs >= 0] + lsftslow = lsft_slow(y, t, freqs, sign=1) + lsftfast = lsft_fast(y, t, freqs, sign=1, oversampling=5) + imp_sym_slow, freqs_new_slow = impose_symmetry_lsft(lsftslow, 0, n, freqs) + imp_sym_fast, freqs_new_fast = impose_symmetry_lsft(lsftfast, 0, n, freqs) + assert imp_sym_slow.shape == imp_sym_fast.shape == freqs_new_fast.shape == freqs_new_slow.shape + assert np.all((imp_sym_slow.real) == np.flip(imp_sym_slow.real)) + assert np.all((imp_sym_slow.imag) == -np.flip(imp_sym_slow.imag)) + assert np.all((imp_sym_fast.real) == np.flip(imp_sym_fast.real)) + assert np.all((imp_sym_fast.imag) == (-np.flip(imp_sym_fast.imag))) + assert np.all(freqs_new_slow == freqs_new_fast) diff --git a/stingray/tests/test_lombscargle.py b/stingray/tests/test_lombscargle.py new file mode 100644 index 000000000..d352f7076 --- /dev/null +++ b/stingray/tests/test_lombscargle.py @@ -0,0 +1,286 @@ +import copy + +import numpy as np +import pytest +from scipy.interpolate import interp1d + +from stingray.events import EventList +from stingray.exceptions import StingrayError +from stingray.lightcurve import Lightcurve +from stingray.lombscargle import LombScargleCrossspectrum, LombScarglePowerspectrum +from stingray.lombscargle import _autofrequency +from stingray.simulator import Simulator + + +def test_autofrequency(): + freqs = _autofrequency(min_freq=0.1, max_freq=0.5, df=0.1) + assert np.allclose(freqs, [0.1, 0.2, 0.3, 0.4, 0.5]) + freqs = _autofrequency(min_freq=0.1, max_freq=0.5, length=10) + assert np.allclose(freqs, [0.1, 0.2, 0.3, 0.4, 0.5]) + freqs = _autofrequency(max_freq=0.5, df=0.2) + assert np.allclose(freqs, [0.1, 0.3, 0.5]) + freqs = _autofrequency(min_freq=0.1, dt=1, length=10) + assert np.allclose(freqs, [0.1, 0.2, 0.3, 0.4, 0.5]) + with pytest.raises(ValueError, match="Either df or length must be specified."): + _autofrequency(min_freq=0.01, max_freq=0.5) + with pytest.raises(ValueError, match="Either max_freq or dt must be"): + _autofrequency(min_freq=0.01, df=1) + with pytest.warns(UserWarning, match="min_freq must be positive and >0."): + freqs = _autofrequency(min_freq=-0.1, max_freq=0.5, df=0.1) + + +class TestLombScargleCrossspectrum: + def setup_class(self): + sim = Simulator(0.0001, 50, 100, 1, random_state=42, tstart=0) + lc1 = sim.simulate(0) + lc2 = sim.simulate(0) + self.rate1 = lc1.countrate + self.rate2 = lc2.countrate + low, high = lc1.time.min(), lc1.time.max() + s1 = lc1.counts + s2 = lc2.counts + t = lc1.time + self.time = lc1.time + t_new = t.copy() + t_new[1:-1] = t[1:-1] + (np.random.rand(len(t) - 2) / (high - low)) + s1_new = interp1d(t, s1, fill_value="extrapolate")(t_new) + s2_new = interp1d(t, s2, fill_value="extrapolate")(t_new) + self.lc1 = Lightcurve(t, s1_new, dt=lc1.dt) + self.lc2 = Lightcurve(t, s2_new, dt=lc2.dt) + self.lscs = LombScargleCrossspectrum(lc1, lc2) + + def test_eventlist(self): + counts = np.random.poisson(10, 1000) + times = np.arange(0, 1000, 1) + lc1 = Lightcurve(times, counts, dt=1) + lc2 = Lightcurve(times, counts, dt=1) + ev1 = EventList.from_lc(lc1) + ev2 = EventList.from_lc(lc2) + ev_lscs = LombScargleCrossspectrum(ev1, ev2, dt=1) + lc_lscs = LombScargleCrossspectrum(lc1, lc2, dt=1) + + assert np.argmax(lc_lscs) == np.argmax(ev_lscs) + assert np.all(ev_lscs.freq == lc_lscs.freq) + assert np.all(ev_lscs.power == lc_lscs.power) + assert ev_lscs.freq[np.argmax(ev_lscs.power)] == lc_lscs.freq[np.argmax(lc_lscs.power)] != 0 + + @pytest.mark.parametrize("skip_checks", [True, False]) + def test_initialize_empty(self, skip_checks): + lscs = LombScargleCrossspectrum(skip_checks=skip_checks) + lscs.freq is None + lscs.power is None + + def test_make_empty_crossspectrum(self): + lscs = LombScargleCrossspectrum() + assert lscs.freq is None + assert lscs.power is None + assert lscs.df is None + assert lscs.nphots1 is None + assert lscs.nphots2 is None + assert lscs.m == 1 + assert lscs.n is None + assert lscs.power_err is None + assert lscs.dt is None + assert lscs.err_dist is None + assert lscs.variance1 is None + assert lscs.variance2 is None + assert lscs.meancounts1 is None + assert lscs.meancounts2 is None + assert lscs.oversampling is None + assert lscs.method is None + + def test_bad_input(self): + with pytest.raises(TypeError): + lscs = LombScargleCrossspectrum(1, self.lc1) + with pytest.raises(TypeError): + lscs = LombScargleCrossspectrum("smooth", "criminal") + + def test_one_lightcurve(self): + with pytest.raises(ValueError): + lscs = LombScargleCrossspectrum(self.lc1, None) + + def test_init_with_norm_not_str(self): + with pytest.raises(TypeError): + lscs = LombScargleCrossspectrum(self.lc1, self.lc2, norm=1) + + def test_init_with_invalid_norm(self): + with pytest.raises(ValueError): + lscs = LombScargleCrossspectrum(self.lc1, self.lc2, norm="frabs") + + def test_init_with_power_type_not_str(self): + with pytest.raises(TypeError): + lscs = LombScargleCrossspectrum(self.lc1, self.lc2, power_type=3) + + def test_init_with_invalid_power_type(self): + with pytest.raises(ValueError): + lscs = LombScargleCrossspectrum(self.lc1, self.lc2, power_type="reel") + + def test_init_with_wrong_lc_instance(self): + lc1_ = {"a": 1, "b": 2} + lc2_ = {"a": 1, "b": 2} + with pytest.raises(TypeError): + lscs = LombScargleCrossspectrum(lc1_, lc2_, dt=1) + + def test_init_with_wrong_lc2_instance(self): + lc_ = {"a": 1, "b": 2} + with pytest.raises(TypeError): + lscs = LombScargleCrossspectrum(self.lc1, lc_) + + def test_init_with_wrong_lc1_instance(self): + lc_ = {"a": 1, "b": 2} + with pytest.raises(TypeError): + lscs = LombScargleCrossspectrum(lc_, self.lc2) + + def test_init_with_invalid_min_freq(self): + with pytest.raises(ValueError): + lscs = LombScargleCrossspectrum(self.lc1, self.lc2, min_freq=-1) + + def test_init_with_invalid_max_freq(self): + with pytest.raises(ValueError): + lscs = LombScargleCrossspectrum(self.lc1, self.lc2, max_freq=1, min_freq=3) + + def test_init_with_negative_max_freq(self): + with pytest.raises(ValueError): + lscs = LombScargleCrossspectrum(self.lc1, self.lc2, max_freq=-1) + + def test_make_crossspectrum_diff_lc_counts_shape(self): + lc_ = Simulator(0.0001, 103, 100, 1, random_state=42, tstart=0).simulate(0) + with pytest.warns(UserWarning) as record: + lscs = LombScargleCrossspectrum(self.lc1, lc_) + assert np.any(["different statistics" in r.message.args[0] for r in record]) + + def test_make_crossspectrum_diff_lc_stat(self): + lc_ = copy.deepcopy(self.lc1) + lc_.err_dist = "gauss" + with pytest.warns(UserWarning) as record: + cs = LombScargleCrossspectrum(self.lc1, lc_) + assert np.any(["different statistics" in r.message.args[0] for r in record]) + + @pytest.mark.parametrize("power_type", ["real", "absolute", "all"]) + def test_power_type(self, power_type): + lscs = LombScargleCrossspectrum(self.lc1, self.lc2, power_type=power_type) + assert lscs.power_type == power_type + + @pytest.mark.parametrize("method", ["fft", "randommethod"]) + def test_init_with_invalid_method(self, method): + with pytest.raises(ValueError): + lscs = LombScargleCrossspectrum(self.lc1, self.lc2, method=method) + + def test_with_invalid_fullspec(self): + with pytest.raises(TypeError): + lscs = LombScargleCrossspectrum(self.lc1, self.lc2, fullspec=1) + + def test_with_invalid_oversampling(self): + with pytest.raises(TypeError): + lscs = LombScargleCrossspectrum(self.lc1, self.lc2, oversampling="invalid") + + def test_invalid_mixed_data(self): + data2 = EventList(self.lc2.time[3:], np.ones_like(self.lc2.time[3:])) + with pytest.raises(TypeError): + lscs = LombScargleCrossspectrum(self.lc1, data2) + with pytest.raises(TypeError): + lscs = LombScargleCrossspectrum(data2, self.lc1) + + def test_fullspec(self): + lscs = LombScargleCrossspectrum(self.lc1, self.lc2, fullspec=True) + assert lscs.fullspec + + def test_valid_method(self): + lscs_s = LombScargleCrossspectrum(self.lc1, self.lc2, method="slow") + assert lscs_s.method == "slow" + lscs_f = LombScargleCrossspectrum(self.lc1, self.lc2, method="fast", oversampling=5) + assert lscs_f.method == "fast" + assert ( + np.sum(np.isclose(lscs_f.unnorm_power, lscs_s.unnorm_power, rtol=0.1, atol=1)) + / lscs_f.power.shape[0] + > 0.9 + ) + + @pytest.mark.parametrize( + "func_name", + [ + "classical_significances", + "from_time_array", + "from_events", + "from_lightcurve", + "from_lc_iterable", + ], + ) + def test_raise_on_invalid_function(self, func_name): + with pytest.raises(AttributeError): + lscs = LombScargleCrossspectrum(self.lc1, self.lc2) + func = getattr(lscs, func_name) + func() + + def test_no_dt(self): + el1 = EventList(self.lc1.counts, self.lc1.time, dt=None) + el2 = EventList(self.lc2.counts, self.lc2.time, dt=None) + with pytest.raises(ValueError): + lscs = LombScargleCrossspectrum(el1, el2) + + @pytest.mark.parametrize("phase_lag", [0.05, 0.1, 0.2, 0.4]) + def test_time_phase_lag(self, phase_lag): + freq = 1.112323232252 + + def func(time, phase=0): + return 2 + np.sin(2 * np.pi * (time * freq - phase)) + + time = np.sort(np.random.uniform(0, 100, 3000)) + + with pytest.warns(UserWarning): + lc1 = Lightcurve(time, func(time, 0)) + lc2 = Lightcurve(time, func(time, phase_lag)) + + lscs = LombScargleCrossspectrum(lc1, lc2) + measured_time_lag = lscs.time_lag() * 2 * np.pi * lscs.freq[lscs.freq >= 0] + measured_phase_lag = lscs.phase_lag() + measured_time_lag[np.isnan(measured_time_lag)] = measured_phase_lag[ + np.isnan(measured_time_lag) + ] + assert np.allclose(measured_phase_lag, measured_time_lag, atol=1e-1) + + +class TestLombScarglePowerspectrum: + def setup_class(self): + sim = Simulator(0.0001, 100, 100, 1, random_state=42, tstart=0) + lc = sim.simulate(0) + self.rate = lc.countrate + low, high = lc.time.min(), lc.time.max() + s1 = lc.counts + t = lc.time + t_new = t.copy() + t_new[1:-1] = t[1:-1] + (np.random.rand(len(t) - 2) / (high - low)) + s_new = interp1d(t, s1, fill_value="extrapolate")(t_new) + self.lc = Lightcurve(t, s_new, dt=lc.dt) + + @pytest.mark.parametrize("norm", ["leahy", "frac", "abs", "none"]) + def test_normalize_powerspectrum(self, norm): + lps = LombScarglePowerspectrum(self.lc, norm=norm) + assert lps.norm == norm + + @pytest.mark.parametrize("skip_checks", [True, False]) + def test_init_empty(self, skip_checks): + ps = LombScarglePowerspectrum(skip_checks=skip_checks) + assert ps.freq is None + assert ps.power is None + assert ps.power_err is None + assert ps.df is None + assert ps.m == 1 + + def test_make_empty_powerspectrum(self): + ps = LombScarglePowerspectrum() + assert ps.freq is None + assert ps.power is None + assert ps.power_err is None + assert ps.df is None + assert ps.m == 1 + assert ps.nphots1 is None + assert ps.nphots2 is None + assert ps.method is None + + def test_ps_real(self): + counts = np.random.poisson(10, 1000) + times = np.arange(0, 1000, 1) + lc = Lightcurve(times, counts, dt=1) + ps = LombScarglePowerspectrum(lc) + assert np.allclose(ps.power.imag, np.zeros_like(ps.power.imag), atol=1e-4)