+import copy
+import warnings
+from collections.abc import Iterable, Iterator, Generator
+
+import numpy as np
+import scipy
+import scipy.optimize
+import scipy.stats
+import matplotlib.pyplot as plt
+
+from stingray.exceptions import StingrayError
+from stingray.utils import rebin_data, rebin_data_log, simon
+
+from .base import StingrayObject
+from .events import EventList
+from .gti import cross_two_gtis, time_intervals_from_gtis
+from .lightcurve import Lightcurve
+from .fourier import avg_cs_from_iterables, error_on_averaged_cross_spectrum
+from .fourier import avg_cs_from_timeseries, poisson_level
+from .fourier import normalize_periodograms, raw_coherence
+from .fourier import get_flux_iterable_from_segments
+from .fourier import get_rms_from_unnorm_periodogram
+from .power_colors import power_color
+
+from scipy.special import factorial
+
+
+__all__ = [
+ "Crossspectrum",
+ "AveragedCrossspectrum",
+ "DynamicalCrossspectrum",
+ "cospectra_pvalue",
+ "normalize_crossspectrum",
+ "time_lag",
+ "coherence",
+ "get_flux_generator",
+]
+
+
+def get_flux_generator(data, segment_size, dt=None):
+ """Get a flux generator from different segments of a data object
+
+ It is just a wrapper around
+ ``stingray.fourier.get_flux_iterable_from_segments``, providing
+ this method with the information it needs to create the iterables,
+ starting from an event list or a light curve.
+
+ Only accepts `Lightcurve`s and `EventList`s.
+
+ Parameters
+ ----------
+ data : `Lightcurve` or `EventList`
+ Input data
+ segment_size : float
+ Segment size in seconds
+
+ Other parameters
+ ----------------
+ dt : float, default None
+ Sampling time of the output flux iterables. Required if input data
+ is an event list, otherwise the light curve sampling time is selected.
+
+ Returns
+ -------
+ flux_iterable : ``generator``
+ Generator of flux arrays.
+
+ Examples
+ --------
+ >>> mean = 256
+ >>> length = 128
+ >>> times = np.sort(np.random.uniform(0, length, int(mean * length)))
+ >>> events = EventList(time=times, gti=[[0, length]])
+ >>> dt = 0.125
+ >>> segment_size = 4
+
+ Create a light curve
+ >>> lc = events.to_lc(dt=dt)
+
+ Create a light curve with a different error distribution
+ >>> lc_renorm = copy.deepcopy(lc)
+ >>> lc_renorm.counts = lc.counts / mean
+ >>> lc_renorm.counts_err = lc.counts_err / mean
+ >>> lc_renorm.err_dist = "gauss"
+
+ Create an iterable from events, forgetting ``dt``. Should fail
+ >>> get_flux_generator(events, segment_size, dt=None)
+ Traceback (most recent call last):
+ ...
+ ValueError: If data is an EventList, you need to specify...
+
+ Create an iterable from events
+ >>> iter_ev = get_flux_generator(events, segment_size, dt=dt)
+
+ Create an iterable from the light curve
+ >>> iter_lc = get_flux_generator(lc, segment_size, dt=dt)
+
+ Create an iterable from the non-poisson light curve
+ >>> iter_lc_nonpois = get_flux_generator(lc_renorm, segment_size, dt=dt)
+
+ Verify that they are equivalent
+ >>> for l1, l2 in zip(iter_ev, iter_lc): assert np.allclose(l1, l2)
+
+ Note that the iterable for non-Poissonian light curves also returns the uncertainty
+ >>> for l1, (l2, l2e) in zip(iter_lc, iter_lc_nonpois): assert np.allclose(l1, l2 * mean)
+
+ """
+ times = data.time
+ gti = data.gti
+
+ counts = err = None
+ if isinstance(data, Lightcurve):
+ counts = data.counts
+ N = counts.size
+ if data.err_dist.lower() != "poisson":
+ err = data.counts_err
+ elif isinstance(data, EventList):
+ if dt is None:
+ raise ValueError("If data is an EventList, you need to specify the bin time dt")
+ N = int(np.rint(segment_size / dt))
+
+ flux_iterable = get_flux_iterable_from_segments(
+ times, gti, segment_size, N, fluxes=counts, errors=err
+ )
+ return flux_iterable
+
+
+
+
[docs]
+
def coherence(lc1, lc2):
+
"""
+
Estimate coherence function of two light curves.
+
For details on the definition of the coherence, see Vaughan and Nowak,
+
1996 [#]_.
+
+
Parameters
+
----------
+
lc1: :class:`stingray.Lightcurve` object
+
The first light curve data for the channel of interest.
+
lc2: :class:`stingray.Lightcurve` object
+
The light curve data for reference band
+
+
Returns
+
-------
+
coh : ``np.ndarray``
+
The array of coherence versus frequency
+
+
References
+
----------
+
.. [#] https://iopscience.iop.org/article/10.1086/310430
+
"""
+
+
warnings.warn(
+
"The coherence function, as implemented, does not work as expected. "
+
"Please use the coherence function of AveragedCrossspectrum, with the "
+
"correct parameters.",
+
DeprecationWarning,
+
)
+
if not isinstance(lc1, Lightcurve):
+
raise TypeError("lc1 must be a lightcurve.Lightcurve object")
+
+
if not isinstance(lc2, Lightcurve):
+
raise TypeError("lc2 must be a lightcurve.Lightcurve object")
+
+
cs = Crossspectrum(lc1, lc2, norm="none")
+
+
return cs.coherence()
+
+
+
+def time_lag(lc1, lc2):
+ """
+ Estimate the time lag of two light curves.
+ Calculate time lag and uncertainty.
+ Equation from Bendat & Piersol, 2011 [bendat-2011]_.
+
+ Parameters
+ ----------
+ lc1: :class:`stingray.Lightcurve` object
+ The first light curve data for the channel of interest.
+ lc2: :class:`stingray.Lightcurve` object
+ The light curve data for reference band
+
+ Returns
+ -------
+ lag : np.ndarray
+ The time lag
+ lag_err : np.ndarray
+ The uncertainty in the time lag
+
+ References
+ ----------
+ .. [bendat-2011] https://www.wiley.com/en-us/Random+Data%3A+Analysis+and+Measurement+Procedures%2C+4th+Edition-p-9780470248775
+ """
+
+ warnings.warn(
+ "This standalone time_lag function is deprecated. "
+ "Please use the time_lag method of AveragedCrossspectrum, with the "
+ "correct parameters.",
+ DeprecationWarning,
+ )
+
+ if not isinstance(lc1, Lightcurve):
+ raise TypeError("lc1 must be a lightcurve.Lightcurve object")
+
+ if not isinstance(lc2, Lightcurve):
+ raise TypeError("lc2 must be a lightcurve.Lightcurve object")
+
+ cs = Crossspectrum(lc1, lc2, norm="none")
+ lag = cs.time_lag()
+
+ return lag
+
+
+def normalize_crossspectrum(
+ unnorm_power, tseg, nbins, nphots1, nphots2, norm="none", power_type="real"
+):
+ """
+ Normalize the real part of the cross spectrum to Leahy, absolute rms^2,
+ fractional rms^2 normalization, or not at all.
+
+ Here for API compatibility purposes. Will be removed in the next
+ major release.
+
+ Parameters
+ ----------
+ unnorm_power: numpy.ndarray
+ The unnormalized cross spectrum.
+
+ tseg: int
+ The length of the Fourier segment, in seconds.
+
+ nbins : int
+ Number of bins in the light curve
+
+ nphots1 : int
+ Number of photons in the light curve no. 1
+
+ nphots2 : int
+ Number of photons in the light curve no. 2
+
+ Other parameters
+ ----------------
+ norm : str
+ One of `'leahy'` (Leahy+83), `'frac'` (fractional rms), `'abs'`
+ (absolute rms)
+
+ power_type : str
+ One of `'real'` (real part), `'all'` (all complex powers), `'abs'`
+ (absolute value)
+
+ Returns
+ -------
+ power: numpy.nd.array
+ The normalized co-spectrum (real part of the cross spectrum). For
+ 'none' normalization, imaginary part is returned as well.
+ """
+ warnings.warn(
+ "normalize_crossspectrum is now deprecated and will be removed "
+ "in the next major release. Please use "
+ "stingray.fourier.normalize_periodograms instead.",
+ DeprecationWarning,
+ )
+ dt = tseg / nbins
+ nph = np.sqrt(nphots1 * nphots2)
+ mean = nph / nbins
+ return normalize_periodograms(
+ unnorm_power, dt, nbins, mean, n_ph=nph, norm=norm, power_type=power_type
+ )
+
+
+def normalize_crossspectrum_gauss(
+ unnorm_power, mean_flux, var, dt, N, norm="none", power_type="real"
+):
+ """
+ Normalize the real part of the cross spectrum to Leahy, absolute rms^2,
+ fractional rms^2 normalization, or not at all.
+
+ Here for API compatibility purposes. Will be removed in the next
+ major release.
+
+ Parameters
+ ----------
+ unnorm_power: numpy.ndarray
+ The unnormalized cross spectrum.
+
+ mean_flux: float
+ The mean flux of the light curve (if a cross spectrum, the geometrical
+ mean of the flux in the two channels)
+
+ var: float
+ The variance of the light curve (if a cross spectrum, the geometrical
+ mean of the variance in the two channels)
+
+ dt: float
+ The sampling time of the light curve
+
+ N: int
+ The number of bins in the light curve
+
+ Other parameters
+ ----------------
+ norm : str
+ One of `'leahy'` (Leahy+83), `'frac'` (fractional rms), `'abs'`
+ (absolute rms)
+
+ power_type : str
+ One of `'real'` (real part), `'all'` (all complex powers), `'abs'`
+ (absolute value)
+
+ Returns
+ -------
+ power: numpy.nd.array
+ The normalized co-spectrum (real part of the cross spectrum). For
+ 'none' normalization, imaginary part is returned as well.
+ """
+ warnings.warn(
+ "normalize_crossspectrum_gauss is now deprecated and will be "
+ "removed in the next major release. Please use "
+ "stingray.fourier.normalize_periodograms instead.",
+ DeprecationWarning,
+ )
+ mean = mean_flux * dt
+ return normalize_periodograms(
+ unnorm_power, dt, N, mean, variance=var, norm=norm, power_type=power_type
+ )
+
+
+def _averaged_cospectra_cdf(xcoord, n):
+ """
+ Function calculating the cumulative distribution function for
+ averaged cospectra, Equation 19 of Huppenkothen & Bachetti (2018).
+
+ Parameters
+ ----------
+ xcoord : float or iterable
+ The cospectral power for which to calculate the CDF.
+
+ n : int
+ The number of averaged cospectra
+
+ Returns
+ -------
+ cdf : float
+ The value of the CDF at `xcoord` for `n` averaged cospectra
+ """
+ if np.size(xcoord) == 1:
+ xcoord = [xcoord]
+
+ cdf = np.zeros_like(xcoord)
+
+ for i, x in enumerate(xcoord):
+ prefac_bottom1 = factorial(n - 1)
+ for j in range(n):
+ prefac_top = factorial(n - 1 + j)
+ prefac_bottom2 = factorial(n - 1 - j) * factorial(j)
+ prefac_bottom3 = 2.0 ** (n + j)
+
+ prefac = prefac_top / (prefac_bottom1 * prefac_bottom2 * prefac_bottom3)
+
+ gf = -j + n
+
+ first_fac = scipy.special.gamma(gf)
+ if x >= 0:
+ second_fac = scipy.special.gammaincc(gf, n * x) * first_fac
+ fac = 2.0 * first_fac - second_fac
+ else:
+ fac = scipy.special.gammaincc(gf, -n * x) * first_fac
+
+ cdf[i] += prefac * fac
+ if np.size(xcoord) == 1:
+ return cdf[i]
+
+ return cdf
+
+
+def cospectra_pvalue(power, nspec):
+ """
+ This function computes the single-trial p-value that the power was
+ observed under the null hypothesis that there is no signal in
+ the data.
+
+ Important: the underlying assumption that make this calculation valid
+ is that the powers in the cross spectrum follow a Laplace distribution,
+ and this requires that:
+
+ 1. the co-spectrum is normalized according to [Leahy 1983]_
+ 2. there is only white noise in the light curve. That is, there is no
+ aperiodic variability that would change the overall shape of the power
+ spectrum.
+
+ Also note that the p-value is for a *single trial*, i.e. the power
+ currently being tested. If more than one power or more than one power
+ spectrum are being tested, the resulting p-value must be corrected for the
+ number of trials (Bonferroni correction).
+
+ Mathematical formulation in [Huppenkothen 2017]_.
+
+ Parameters
+ ----------
+ power : float
+ The squared Fourier amplitude of a spectrum to be evaluated
+
+ nspec : int
+ The number of spectra or frequency bins averaged in ``power``.
+ This matters because averaging spectra or frequency bins increases
+ the signal-to-noise ratio, i.e. makes the statistical distributions
+ of the noise narrower, such that a smaller power might be very
+ significant in averaged spectra even though it would not be in a single
+ cross spectrum.
+
+ Returns
+ -------
+ pval : float
+ The classical p-value of the observed power being consistent with
+ the null hypothesis of white noise
+
+ References
+ ----------
+
+ * .. [Leahy 1983] https://ui.adsabs.harvard.edu/#abs/1983ApJ...266..160L/abstract
+ * .. [Huppenkothen 2017] http://adsabs.harvard.edu/abs/2018ApJS..236...13H
+
+ """
+ if not np.all(np.isfinite(power)):
+ raise ValueError("power must be a finite floating point number!")
+
+ # if power < 0:
+ # raise ValueError("power must be a positive real number!")
+
+ if not np.isfinite(nspec):
+ raise ValueError("nspec must be a finite integer number")
+
+ if not np.isclose(nspec % 1, 0):
+ raise ValueError("nspec must be an integer number!")
+
+ if nspec < 1:
+ raise ValueError("nspec must be larger or equal to 1")
+
+ elif nspec == 1:
+ lapl = scipy.stats.laplace(0, 1)
+ pval = lapl.sf(power)
+
+ elif nspec > 50:
+ exp_sigma = np.sqrt(2) / np.sqrt(nspec)
+ gauss = scipy.stats.norm(0, exp_sigma)
+ pval = gauss.sf(power)
+
+ else:
+ pval = 1.0 - _averaged_cospectra_cdf(power, nspec)
+
+ return pval
+
+
+
+
[docs]
+
class Crossspectrum(StingrayObject):
+
main_array_attr = "freq"
+
type = "crossspectrum"
+
+
"""
+
Make a cross spectrum from a (binned) light curve.
+
You can also make an empty :class:`Crossspectrum` object to populate with your
+
own Fourier-transformed data (this can sometimes be useful when making
+
binned power spectra). Stingray uses the scipy.fft standards for the sign
+
of the Nyquist frequency.
+
+
Parameters
+
----------
+
data1: :class:`stingray.Lightcurve` or :class:`stingray.events.EventList`, optional, default ``None``
+
The dataset for the first channel/band of interest.
+
+
data2: :class:`stingray.Lightcurve` or :class:`stingray.events.EventList`, optional, default ``None``
+
The dataset for the second, or "reference", band.
+
+
norm: {``frac``, ``abs``, ``leahy``, ``none``}, default ``none``
+
The normalization of the (real part of the) cross spectrum.
+
+
power_type: string, optional, default ``real``
+
Parameter to choose among complete, real part and magnitude of the cross spectrum.
+
+
fullspec: boolean, optional, default ``False``
+
If False, keep only the positive frequencies, or if True, keep all of them .
+
+
Other Parameters
+
----------------
+
gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
+
Good Time intervals. Defaults to the common GTIs from the two input
+
objects. Could throw errors if these GTIs have overlaps with the input
+
`Lightcurve` GTIs! If you're getting errors regarding your GTIs, don't
+
use this and only give GTIs to the `Lightcurve` objects before making
+
the cross spectrum.
+
+
lc1: :class:`stingray.Lightcurve`object OR iterable of :class:`stingray.Lightcurve` objects
+
For backwards compatibility only. Like ``data1``, but no
+
:class:`stingray.events.EventList` objects allowed
+
+
lc2: :class:`stingray.Lightcurve`object OR iterable of :class:`stingray.Lightcurve` objects
+
For backwards compatibility only. Like ``data2``, but no
+
:class:`stingray.events.EventList` objects allowed
+
+
dt: float
+
The time resolution of the light curve. Only needed when constructing
+
light curves in the case where ``data1``, ``data2`` are
+
:class:`EventList` objects
+
+
skip_checks: bool
+
Skip initial checks, for speed or other reasons (you need to trust your
+
inputs!)
+
+
+
Attributes
+
----------
+
freq: numpy.ndarray
+
The array of mid-bin frequencies that the Fourier transform samples
+
+
power: numpy.ndarray
+
The array of cross spectra (complex numbers)
+
+
power_err: numpy.ndarray
+
The uncertainties of ``power``.
+
An approximation for each bin given by ``power_err= power/sqrt(m)``.
+
Where ``m`` is the number of power averaged in each bin (by frequency
+
binning, or averaging more than one spectra). Note that for a single
+
realization (``m=1``) the error is equal to the power.
+
+
df: float
+
The frequency resolution
+
+
m: int
+
The number of averaged cross-spectra amplitudes in each bin.
+
+
n: int
+
The number of data points/time bins in one segment of the light
+
curves.
+
+
k: array of int
+
The rebinning scheme if the object has been rebinned otherwise is set to 1.
+
+
nphots1: float
+
The total number of photons in light curve 1
+
+
nphots2: float
+
The total number of photons in light curve 2
+
+
"""
+
+
def __init__(
+
self,
+
data1=None,
+
data2=None,
+
norm="frac",
+
gti=None,
+
lc1=None,
+
lc2=None,
+
power_type="all",
+
dt=None,
+
fullspec=False,
+
skip_checks=False,
+
save_all=False,
+
):
+
self._type = None
+
# for backwards compatibility
+
if data1 is None:
+
data1 = lc1
+
if data2 is None:
+
data2 = lc2
+
+
empty = data1 is None and data2 is None
+
good_input = not empty
+
+
if not skip_checks:
+
good_input = self.initial_checks(
+
data1=data1,
+
data2=data2,
+
norm=norm,
+
gti=gti,
+
lc1=lc1,
+
lc2=lc2,
+
power_type=power_type,
+
dt=dt,
+
fullspec=fullspec,
+
)
+
+
self.dt = dt
+
norm = norm.lower()
+
self.norm = norm
+
self.k = 1
+
+
if empty or not good_input:
+
return self._initialize_empty()
+
+
return self._initialize_from_any_input(
+
data1,
+
data2,
+
dt=dt,
+
norm=norm,
+
power_type=power_type,
+
fullspec=fullspec,
+
gti=gti,
+
save_all=save_all,
+
)
+
+
+
[docs]
+
def initial_checks(
+
self,
+
data1=None,
+
data2=None,
+
norm="frac",
+
gti=None,
+
lc1=None,
+
lc2=None,
+
segment_size=None,
+
power_type="real",
+
dt=None,
+
fullspec=False,
+
):
+
"""Run initial checks on the input.
+
+
Returns True if checks are passed, False if they are not.
+
+
Raises various errors for different bad inputs
+
+
Examples
+
--------
+
>>> times = np.arange(0, 10)
+
>>> counts = np.random.poisson(100, 10)
+
>>> lc1 = Lightcurve(times, counts, skip_checks=True)
+
>>> lc2 = Lightcurve(times, counts, skip_checks=True)
+
>>> ev1 = EventList(times)
+
>>> ev2 = EventList(times)
+
>>> c = Crossspectrum()
+
>>> ac = AveragedCrossspectrum()
+
+
If norm is not a string, raise a TypeError
+
>>> Crossspectrum.initial_checks(c, norm=1)
+
Traceback (most recent call last):
+
...
+
TypeError: norm must be a string...
+
+
If ``norm`` is not one of the valid norms, raise a ValueError
+
>>> Crossspectrum.initial_checks(c, norm="blabla")
+
Traceback (most recent call last):
+
...
+
ValueError: norm must be 'frac'...
+
+
If ``power_type`` is not one of the valid norms, raise a ValueError
+
>>> Crossspectrum.initial_checks(c, power_type="blabla")
+
Traceback (most recent call last):
+
...
+
ValueError: `power_type` not recognized!
+
+
If the user passes only one light curve, raise a ValueError
+
+
>>> Crossspectrum.initial_checks(c, data1=lc1, data2=None)
+
Traceback (most recent call last):
+
...
+
ValueError: You can't do a cross spectrum...
+
+
If the user passes an event list without dt, raise a ValueError
+
+
>>> Crossspectrum.initial_checks(c, data1=ev1, data2=ev2, dt=None)
+
Traceback (most recent call last):
+
...
+
ValueError: If using event lists, please specify...
+
"""
+
if isinstance(norm, str) is False:
+
raise TypeError("norm must be a string")
+
+
if norm.lower() not in ["frac", "abs", "leahy", "none"]:
+
raise ValueError("norm must be 'frac', 'abs', 'leahy', or 'none'!")
+
+
if power_type not in ["all", "absolute", "real"]:
+
raise ValueError("`power_type` not recognized!")
+
+
# check if input data is a Lightcurve object, if not make one or
+
# make an empty Crossspectrum object if lc1 == ``None`` or lc2 == ``None``
+
+
if lc1 is not None or lc2 is not None:
+
warnings.warn(
+
"The lcN keywords are now deprecated. Use dataN instead", DeprecationWarning
+
)
+
+
if data1 is None or data2 is None:
+
if data1 is not None or data2 is not None:
+
raise ValueError("You can't do a cross spectrum with just one light curve!")
+
else:
+
return False
+
+
dt_is_invalid = (dt is None) or (dt <= np.finfo(float).resolution)
+
+
if segment_size is None:
+
# checks to be run for non-averaged spectra
+
if gti is not None and len(gti) > 1:
+
raise TypeError("Non-averaged cross spectra need a single GTI")
+
+
if type(data1) != type(data2):
+
raise TypeError("Input data have to be of the same kind")
+
+
if isinstance(data1, EventList):
+
if dt_is_invalid:
+
raise ValueError(
+
"If using event lists, please specify the bin time to generate lightcurves."
+
)
+
elif isinstance(data1, Lightcurve):
+
if data1.err_dist.lower() != data2.err_dist.lower():
+
simon(
+
"Your lightcurves have different statistics."
+
"The errors in the Crossspectrum will be incorrect."
+
)
+
+
# If dt differs slightly, its propagated error must not be more than
+
# 1/100th of the bin
+
if not np.isclose(data1.dt, data2.dt, rtol=0.01 * data1.dt / data1.tseg):
+
raise StingrayError("Light curves do not have same time binning dt.")
+
+
if data1.tseg != data2.tseg:
+
simon(
+
"Lightcurves do not have same tseg. This means that the data"
+
"from the two channels are not completely in sync. This "
+
"might or might not be an issue. Keep an eye on it."
+
)
+
elif isinstance(data1, (list, tuple)):
+
if not isinstance(data1[0], Lightcurve) or not isinstance(data2[0], Lightcurve):
+
raise TypeError("Inputs lists have to contain light curve objects")
+
+
if data1[0].err_dist.lower() != data2[0].err_dist.lower():
+
simon(
+
"Your lightcurves have different statistics."
+
"The errors in the Crossspectrum will be incorrect."
+
)
+
elif isinstance(data1, (Generator, Iterator)):
+
pass
+
else:
+
raise TypeError("Input data are invalid")
+
+
return True
+
+
+
+
[docs]
+
def rebin(self, df=None, f=None, method="mean"):
+
"""
+
Rebin the cross spectrum to a new frequency resolution ``df``.
+
+
Parameters
+
----------
+
df: float
+
The new frequency resolution
+
+
Other Parameters
+
----------------
+
f: float
+
the rebin factor. If specified, it substitutes df with ``f*self.df``
+
+
Returns
+
-------
+
bin_cs = :class:`Crossspectrum` (or one of its subclasses) object
+
The newly binned cross spectrum or power spectrum.
+
Note: this object will be of the same type as the object
+
that called this method. For example, if this method is called
+
from :class:`AveragedPowerspectrum`, it will return an object of class
+
:class:`AveragedPowerspectrum`, too.
+
"""
+
+
if f is None and df is None:
+
raise ValueError("You need to specify at least one between f and df")
+
elif f is not None:
+
df = f * self.df
+
+
# rebin cross spectrum to new resolution
+
binfreq, bincs, binerr, step_size = rebin_data(
+
self.freq, self.power, df, self.power_err, method=method, dx=self.df
+
)
+
# make an empty cross spectrum object
+
# note: syntax deliberate to work with subclass Powerspectrum
+
bin_cs = copy.copy(self)
+
+
# store the binned periodogram in the new object
+
bin_cs.freq = binfreq
+
bin_cs.power = bincs
+
bin_cs.df = df
+
bin_cs.power_err = binerr
+
+
if hasattr(self, "unnorm_power") and self.unnorm_power is not None:
+
unnorm_power_err = None
+
if hasattr(self, "unnorm_power_err") and self.unnorm_power_err is not None:
+
unnorm_power_err = self.unnorm_power_err
+
+
_, binpower_unnorm, binpower_err_unnorm, _ = rebin_data(
+
self.freq, self.unnorm_power, df, dx=self.df, yerr=unnorm_power_err, method=method
+
)
+
+
if hasattr(self, "unnorm_power_err") and self.unnorm_power_err is not None:
+
bin_cs.unnorm_power_err = binpower_err_unnorm
+
+
bin_cs.unnorm_power = binpower_unnorm
+
+
if hasattr(self, "cs_all"):
+
cs_all = []
+
for c in self.cs_all:
+
cs_all.append(
+
rebin_data(self.freq, c, dx_new=df, yerr=None, method=method, dx=self.df)[1]
+
)
+
bin_cs.cs_all = cs_all
+
if hasattr(self, "pds1"):
+
bin_cs.pds1 = self.pds1.rebin(df=df, f=f, method=method)
+
if hasattr(self, "pds2"):
+
bin_cs.pds2 = self.pds2.rebin(df=df, f=f, method=method)
+
+
bin_cs.m = np.rint(step_size * self.m)
+
+
return bin_cs
+
+
+
+
[docs]
+
def to_norm(self, norm, inplace=False):
+
"""Convert Cross spectrum to new normalization.
+
+
Parameters
+
----------
+
norm : str
+
The new normalization of the spectrum
+
+
Other parameters
+
----------------
+
inplace: bool, default False
+
If True, change the current instance. Otherwise, return a new one
+
+
Returns
+
-------
+
new_spec : object, same class as input
+
The new, normalized, spectrum.
+
"""
+
if norm == self.norm:
+
return copy.deepcopy(self)
+
+
variance1 = variance2 = variance = None
+
if self.type == "powerspectrum":
+
# This is the case for Powerspectrum
+
mean = mean1 = mean2 = self.nphots / self.n
+
if hasattr(self, "err_dist") and self.err_dist != "poisson":
+
variance = self.variance
+
nph = self.nphots
+
else:
+
nph = np.sqrt(self.nphots1 * self.nphots2)
+
mean1 = self.nphots1 / self.n
+
mean2 = self.nphots2 / self.n
+
mean = nph / self.n
+
if hasattr(self, "err_dist") and self.err_dist != "poisson":
+
variance1 = self.variance1
+
variance2 = self.variance2
+
variance = np.sqrt(self.variance1 * self.variance2)
+
+
if inplace:
+
new_spec = self
+
else:
+
new_spec = copy.deepcopy(self)
+
+
power_type = "all"
+
if hasattr(self, "power_type"):
+
power_type = self.power_type
+
+
for attr in ["power", "power_err"]:
+
unnorm_attr = "unnorm_" + attr
+
if not hasattr(self, unnorm_attr) or getattr(self, unnorm_attr) is None:
+
continue
+
power = normalize_periodograms(
+
getattr(self, unnorm_attr),
+
self.dt,
+
self.n,
+
mean,
+
n_ph=nph,
+
variance=variance,
+
norm=norm,
+
power_type=power_type,
+
)
+
setattr(new_spec, attr, power)
+
new_spec.norm = norm
+
if hasattr(self, "pds1"):
+
p1 = normalize_periodograms(
+
getattr(self.pds1, unnorm_attr),
+
self.dt,
+
self.n,
+
mean1,
+
n_ph=self.nphots1,
+
variance=variance1,
+
norm=norm,
+
power_type=power_type,
+
)
+
setattr(new_spec.pds1, attr, p1)
+
p2 = normalize_periodograms(
+
getattr(self.pds2, unnorm_attr),
+
self.dt,
+
self.n,
+
mean2,
+
n_ph=self.nphots2,
+
variance=variance2,
+
norm=norm,
+
power_type=power_type,
+
)
+
setattr(new_spec.pds2, attr, p2)
+
new_spec.pds1.norm = new_spec.pds2.norm = norm
+
+
return new_spec
+
+
+
def _normalize_crossspectrum(self, unnorm_power):
+
"""
+
Normalize the real part of the cross spectrum to Leahy, absolute rms^2,
+
fractional rms^2 normalization, or not at all.
+
+
Parameters
+
----------
+
unnorm_power: numpy.ndarray
+
The unnormalized cross spectrum.
+
+
Returns
+
-------
+
power: numpy.nd.array
+
The normalized co-spectrum (real part of the cross spectrum). For
+
'none' normalization, imaginary part is returned as well.
+
"""
+
+
nph = np.sqrt(self.nphots1 * self.nphots2)
+
mean = nph / self.n
+
variance = None
+
if self.err_dist != "poisson":
+
variance = np.sqrt(self.variance1 * self.variance2)
+
return normalize_periodograms(
+
unnorm_power,
+
self.dt,
+
self.n,
+
mean,
+
n_ph=nph,
+
variance=variance,
+
norm=self.norm,
+
power_type=self.power_type,
+
)
+
+
+
[docs]
+
def rebin_log(self, f=0.01):
+
"""
+
Logarithmic rebin of the periodogram.
+
The new frequency depends on the previous frequency
+
modified by a factor f:
+
+
.. math::
+
+
d\\nu_j = d\\nu_{j-1} (1+f)
+
+
Parameters
+
----------
+
f: float, optional, default ``0.01``
+
parameter that steers the frequency resolution
+
+
+
Returns
+
-------
+
new_spec : :class:`Crossspectrum` (or one of its subclasses) object
+
The newly binned cross spectrum or power spectrum.
+
Note: this object will be of the same type as the object
+
that called this method. For example, if this method is called
+
from :class:`AveragedPowerspectrum`, it will return an object of class
+
"""
+
+
binfreq, binpower, binpower_err, nsamples = rebin_data_log(
+
self.freq, self.power, f, y_err=self.power_err, dx=self.df
+
)
+
+
new_spec = copy.copy(self)
+
new_spec.freq = binfreq
+
new_spec.power = binpower
+
new_spec.power_err = binpower_err
+
new_spec.m = nsamples * self.m
+
new_spec.dt = self.dt
+
new_spec.k = nsamples
+
+
if hasattr(self, "unnorm_power") and self.unnorm_power is not None:
+
unnorm_power_err = None
+
if hasattr(self, "unnorm_power_err") and self.unnorm_power_err is not None:
+
unnorm_power_err = self.unnorm_power_err
+
_, binpower_unnorm, binpower_err_unnorm, _ = rebin_data_log(
+
self.freq, self.unnorm_power, f, dx=self.df, y_err=unnorm_power_err
+
)
+
+
new_spec.unnorm_power = binpower_unnorm
+
if hasattr(self, "unnorm_power_err") and self.unnorm_power_err is not None:
+
new_spec.unnorm_power_err = binpower_err_unnorm
+
+
if hasattr(self, "pds1"):
+
new_spec.pds1 = self.pds1.rebin_log(f)
+
if hasattr(self, "pds2"):
+
new_spec.pds2 = self.pds2.rebin_log(f)
+
+
if hasattr(self, "cs_all"):
+
cs_all = []
+
+
for c in self.cs_all:
+
cs_all.append(rebin_data_log(self.freq, c, f, dx=self.df)[1])
+
new_spec.cs_all = cs_all
+
+
return new_spec
+
+
+
+
[docs]
+
def coherence(self):
+
"""Compute Coherence function of the cross spectrum.
+
+
Coherence is defined in Vaughan and Nowak, 1996 [#]_.
+
It is a Fourier frequency dependent measure of the linear correlation
+
between time series measured simultaneously in two energy channels.
+
+
Returns
+
-------
+
coh : numpy.ndarray
+
Coherence function
+
+
References
+
----------
+
.. [#] https://iopscience.iop.org/article/10.1086/310430
+
"""
+
# this computes the averaged power spectrum, but using the
+
# cross spectrum code to avoid circular imports
+
+
return raw_coherence(
+
self.unnorm_power, self.pds1.unnorm_power, self.pds2.unnorm_power, 0, 0, self.n
+
)
+
+
+
+
[docs]
+
def phase_lag(self):
+
"""Calculate the fourier phase lag of the cross spectrum.
+
+
This is defined as the argument of the complex cross spectrum, and gives
+
the delay at all frequencies, in cycles, of one input light curve with respect
+
to the other.
+
"""
+
return np.angle(self.unnorm_power)
+
+
+
+
[docs]
+
def time_lag(self):
+
r"""Calculate the fourier time lag of the cross spectrum.
+
The time lag is calculated by taking the phase lag :math:`\phi` and
+
+
..math::
+
+
\tau = \frac{\phi}{\two pi \nu}
+
+
where :math:`\nu` is the center of the frequency bins.
+
"""
+
if self.__class__ in [Crossspectrum, AveragedCrossspectrum]:
+
ph_lag = self.phase_lag()
+
+
return ph_lag / (2 * np.pi * self.freq)
+
else:
+
raise AttributeError("Object has no attribute named 'time_lag' !")
+
+
+
+
[docs]
+
def plot(
+
self, labels=None, axis=None, title=None, marker="-", save=False, filename=None, ax=None
+
):
+
"""
+
Plot the amplitude of the cross spectrum vs. the frequency using ``matplotlib``.
+
+
Parameters
+
----------
+
labels : iterable, default ``None``
+
A list of tuple with ``xlabel`` and ``ylabel`` as strings.
+
+
axis : list, tuple, string, default ``None``
+
Parameter to set axis properties of the ``matplotlib`` figure. For example
+
it can be a list like ``[xmin, xmax, ymin, ymax]`` or any other
+
acceptable argument for the``matplotlib.pyplot.axis()`` method.
+
+
title : str, default ``None``
+
The title of the plot.
+
+
marker : str, default '-'
+
Line style and color of the plot. Line styles and colors are
+
combined in a single format string, as in ``'bo'`` for blue
+
circles. See ``matplotlib.pyplot.plot`` for more options.
+
+
save : boolean, optional, default ``False``
+
If ``True``, save the figure with specified filename.
+
+
filename : str
+
File name of the image to save. Depends on the boolean ``save``.
+
+
ax : ``matplotlib.Axes`` object
+
An axes object to fill with the cross correlation plot.
+
"""
+
+
if ax is None:
+
fig = plt.figure("crossspectrum")
+
ax = fig.add_subplot(1, 1, 1)
+
+
ax2 = None
+
if np.any(np.iscomplex(self.power)):
+
ax.plot(self.freq, np.abs(self.power), marker, color="k", label="Amplitude")
+
+
ax2 = ax.twinx()
+
ax2.tick_params("y", colors="b")
+
ax2.plot(
+
self.freq, self.power.imag, marker, color="b", alpha=0.5, label="Imaginary Part"
+
)
+
+
ax.plot(self.freq, self.power.real, marker, color="r", alpha=0.5, label="Real Part")
+
+
lines, line_labels = ax.get_legend_handles_labels()
+
lines2, line_labels2 = ax2.get_legend_handles_labels()
+
lines = lines + lines2
+
line_labels = line_labels + line_labels2
+
+
else:
+
ax.plot(self.freq, np.abs(self.power), marker, color="b")
+
lines, line_labels = ax.get_legend_handles_labels()
+
+
xlabel = "Frequency (Hz)"
+
ylabel = f"Power ({self.norm})"
+
+
if labels is not None:
+
try:
+
xlabel = labels[0]
+
ylabel = labels[1]
+
+
except IndexError:
+
simon("``labels`` must have two labels for x and y axes.")
+
# Not raising here because in case of len(labels)==1, only
+
# x-axis will be labelled.
+
+
ax.set_xlabel(xlabel)
+
if ax2 is not None:
+
ax.set_ylabel(ylabel + "-Real")
+
ax2.set_ylabel(ylabel + "-Imaginary")
+
else:
+
ax.set_ylabel(ylabel)
+
+
ax.legend(lines, line_labels, loc="best")
+
+
if axis is not None:
+
ax.set_xlim(axis[0:2])
+
ax.set_ylim(axis[2:4])
+
if ax2 is not None:
+
ax2.set_ylim(axis[2:4])
+
if title is not None:
+
ax.set_title(title)
+
+
if save:
+
if filename is None:
+
plt.gcf().savefig("spec.png")
+
else:
+
plt.gcf().savefig(filename)
+
+
return ax
+
+
+
+
[docs]
+
def classical_significances(self, threshold=1, trial_correction=False):
+
"""
+
Compute the classical significances for the powers in the power
+
spectrum, assuming an underlying noise distribution that follows a
+
chi-square distributions with 2M degrees of freedom, where M is the
+
number of powers averaged in each bin.
+
+
Note that this function will *only* produce correct results when the
+
following underlying assumptions are fulfilled:
+
+
1. The power spectrum is Leahy-normalized
+
2. There is no source of variability in the data other than the
+
periodic signal to be determined with this method. This is important!
+
If there are other sources of (aperiodic) variability in the data, this
+
method will *not* produce correct results, but instead produce a large
+
number of spurious false positive detections!
+
3. There are no significant instrumental effects changing the
+
statistical distribution of the powers (e.g. pile-up or dead time)
+
+
By default, the method produces ``(index,p-values)`` for all powers in
+
the power spectrum, where index is the numerical index of the power in
+
question. If a ``threshold`` is set, then only powers with p-values
+
*below* that threshold with their respective indices. If
+
``trial_correction`` is set to ``True``, then the threshold will be corrected
+
for the number of trials (frequencies) in the power spectrum before
+
being used.
+
+
Parameters
+
----------
+
threshold : float, optional, default ``1``
+
The threshold to be used when reporting p-values of potentially
+
significant powers. Must be between 0 and 1.
+
Default is ``1`` (all p-values will be reported).
+
+
trial_correction : bool, optional, default ``False``
+
A Boolean flag that sets whether the ``threshold`` will be corrected
+
by the number of frequencies before being applied. This decreases
+
the ``threshold`` (p-values need to be lower to count as significant).
+
Default is ``False`` (report all powers) though for any application
+
where `threshold`` is set to something meaningful, this should also
+
be applied!
+
+
Returns
+
-------
+
pvals : iterable
+
A list of ``(index, p-value)`` tuples for all powers that have p-values
+
lower than the threshold specified in ``threshold``.
+
+
"""
+
if not self.norm == "leahy":
+
raise ValueError("This method only works on Leahy-normalized power spectra!")
+
+
if np.size(self.m) == 1:
+
# calculate p-values for all powers
+
# leave out zeroth power since it just encodes the number of photons!
+
pv = np.array([cospectra_pvalue(power, self.m) for power in self.power])
+
else:
+
pv = np.array([cospectra_pvalue(power, m) for power, m in zip(self.power, self.m)])
+
+
# if trial correction is used, then correct the threshold for
+
# the number of powers in the power spectrum
+
if trial_correction:
+
threshold /= self.power.shape[0]
+
+
# need to add 1 to the indices to make up for the fact that
+
# we left out the first power above!
+
indices = np.where(pv < threshold)[0]
+
+
pvals = np.vstack([pv[indices], indices])
+
+
return pvals
+
+
+
+
[docs]
+
@staticmethod
+
def from_time_array(
+
times1,
+
times2,
+
dt,
+
segment_size=None,
+
gti=None,
+
norm="none",
+
power_type="all",
+
silent=False,
+
fullspec=False,
+
use_common_mean=True,
+
):
+
"""Calculate AveragedCrossspectrum from two arrays of event times.
+
+
Parameters
+
----------
+
times1 : `np.array`
+
Event arrival times of channel 1
+
times2 : `np.array`
+
Event arrival times of channel 2
+
dt : float
+
The time resolution of the intermediate light curves
+
(sets the Nyquist frequency)
+
+
Other parameters
+
----------------
+
segment_size : float
+
The length, in seconds, of the light curve segments that will be
+
averaged. Only relevant (and required) for `AveragedCrossspectrum`.
+
gti : [[gti0, gti1], ...]
+
Good Time intervals. Defaults to the common GTIs from the two input
+
objects. Could throw errors if these GTIs have overlaps with the
+
input object GTIs! If you're getting errors regarding your GTIs,
+
don't use this and only give GTIs to the input objects before
+
making the cross spectrum.
+
norm : str, default "frac"
+
The normalization of the periodogram. "abs" is absolute rms, "frac" is
+
fractional rms, "leahy" is Leahy+83 normalization, and "none" is the
+
unnormalized periodogram
+
use_common_mean : bool, default True
+
The mean of the light curve can be estimated in each interval, or on
+
the full light curve. This gives different results (Alston+2013).
+
Here we assume the mean is calculated on the full light curve, but
+
the user can set ``use_common_mean`` to False to calculate it on a
+
per-segment basis.
+
fullspec : bool, default False
+
Return the full periodogram, including negative frequencies
+
silent : bool, default False
+
Silence the progress bars
+
power_type : str, default 'all'
+
If 'all', give complex powers. If 'abs', the absolute value; if 'real',
+
the real part
+
"""
+
+
return crossspectrum_from_time_array(
+
times1,
+
times2,
+
dt,
+
segment_size=segment_size,
+
gti=gti,
+
norm=norm,
+
power_type=power_type,
+
silent=silent,
+
fullspec=fullspec,
+
use_common_mean=use_common_mean,
+
)
+
+
+
+
[docs]
+
@staticmethod
+
def from_events(
+
events1,
+
events2,
+
dt,
+
segment_size=None,
+
norm="none",
+
power_type="all",
+
silent=False,
+
fullspec=False,
+
use_common_mean=True,
+
gti=None,
+
):
+
"""Calculate AveragedCrossspectrum from two event lists
+
+
Parameters
+
----------
+
events1 : `stingray.EventList`
+
Events from channel 1
+
events2 : `stingray.EventList`
+
Events from channel 2
+
dt : float
+
The time resolution of the intermediate light curves
+
(sets the Nyquist frequency)
+
+
Other parameters
+
----------------
+
segment_size : float
+
The length, in seconds, of the light curve segments that will be averaged.
+
Only relevant (and required) for AveragedCrossspectrum
+
norm : str, default "frac"
+
The normalization of the periodogram. "abs" is absolute rms, "frac" is
+
fractional rms, "leahy" is Leahy+83 normalization, and "none" is the
+
unnormalized periodogram
+
use_common_mean : bool, default True
+
The mean of the light curve can be estimated in each interval, or on
+
the full light curve. This gives different results (Alston+2013).
+
Here we assume the mean is calculated on the full light curve, but
+
the user can set ``use_common_mean`` to False to calculate it on a
+
per-segment basis.
+
fullspec : bool, default False
+
Return the full periodogram, including negative frequencies
+
silent : bool, default False
+
Silence the progress bars
+
power_type : str, default 'all'
+
If 'all', give complex powers. If 'abs', the absolute value; if 'real',
+
the real part
+
gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
+
Good Time intervals. Defaults to the common GTIs from the two input
+
objects. Could throw errors if these GTIs have overlaps with the
+
input object GTIs! If you're getting errors regarding your GTIs,
+
don't use this and only give GTIs to the input objects before
+
making the cross spectrum.
+
"""
+
+
return crossspectrum_from_events(
+
events1,
+
events2,
+
dt,
+
segment_size=segment_size,
+
norm=norm,
+
power_type=power_type,
+
silent=silent,
+
fullspec=fullspec,
+
use_common_mean=use_common_mean,
+
gti=gti,
+
)
+
+
+
+
[docs]
+
@staticmethod
+
def from_lightcurve(
+
lc1,
+
lc2,
+
segment_size=None,
+
norm="none",
+
power_type="all",
+
silent=False,
+
fullspec=False,
+
use_common_mean=True,
+
gti=None,
+
):
+
"""Calculate AveragedCrossspectrum from two light curves
+
+
Parameters
+
----------
+
lc1 : `stingray.Lightcurve`
+
Light curve from channel 1
+
lc2 : `stingray.Lightcurve`
+
Light curve from channel 2
+
+
Other parameters
+
----------------
+
segment_size : float
+
The length, in seconds, of the light curve segments that will be averaged.
+
Only relevant (and required) for AveragedCrossspectrum
+
norm : str, default "frac"
+
The normalization of the periodogram. "abs" is absolute rms, "frac" is
+
fractional rms, "leahy" is Leahy+83 normalization, and "none" is the
+
unnormalized periodogram
+
use_common_mean : bool, default True
+
The mean of the light curve can be estimated in each interval, or on
+
the full light curve. This gives different results (Alston+2013).
+
Here we assume the mean is calculated on the full light curve, but
+
the user can set ``use_common_mean`` to False to calculate it on a
+
per-segment basis.
+
fullspec : bool, default False
+
Return the full periodogram, including negative frequencies
+
silent : bool, default False
+
Silence the progress bars
+
power_type : str, default 'all'
+
If 'all', give complex powers. If 'abs', the absolute value; if 'real',
+
the real part
+
gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
+
Good Time intervals. Defaults to the common GTIs from the two input
+
objects. Could throw errors if these GTIs have overlaps with the
+
input object GTIs! If you're getting errors regarding your GTIs,
+
don't use this and only give GTIs to the input objects before
+
making the cross spectrum.
+
"""
+
return crossspectrum_from_lightcurve(
+
lc1,
+
lc2,
+
segment_size=segment_size,
+
norm=norm,
+
power_type=power_type,
+
silent=silent,
+
fullspec=fullspec,
+
use_common_mean=use_common_mean,
+
gti=gti,
+
)
+
+
+
+
[docs]
+
@staticmethod
+
def from_stingray_timeseries(
+
ts1,
+
ts2,
+
flux_attr,
+
error_flux_attr=None,
+
segment_size=None,
+
norm="none",
+
power_type="all",
+
silent=False,
+
fullspec=False,
+
use_common_mean=True,
+
gti=None,
+
):
+
"""Calculate AveragedCrossspectrum from two light curves
+
+
Parameters
+
----------
+
ts1 : `stingray.Timeseries`
+
Time series from channel 1
+
ts2 : `stingray.Timeseries`
+
Time series from channel 2
+
flux_attr : `str`
+
What attribute of the time series will be used.
+
+
Other parameters
+
----------------
+
error_flux_attr : `str`
+
What attribute of the time series will be used as error bar.
+
segment_size : float
+
The length, in seconds, of the light curve segments that will be averaged.
+
Only relevant (and required) for AveragedCrossspectrum
+
norm : str, default "frac"
+
The normalization of the periodogram. "abs" is absolute rms, "frac" is
+
fractional rms, "leahy" is Leahy+83 normalization, and "none" is the
+
unnormalized periodogram
+
use_common_mean : bool, default True
+
The mean of the light curve can be estimated in each interval, or on
+
the full light curve. This gives different results (Alston+2013).
+
Here we assume the mean is calculated on the full light curve, but
+
the user can set ``use_common_mean`` to False to calculate it on a
+
per-segment basis.
+
fullspec : bool, default False
+
Return the full periodogram, including negative frequencies
+
silent : bool, default False
+
Silence the progress bars
+
power_type : str, default 'all'
+
If 'all', give complex powers. If 'abs', the absolute value; if 'real',
+
the real part
+
gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
+
Good Time intervals. Defaults to the common GTIs from the two input
+
objects. Could throw errors if these GTIs have overlaps with the
+
input object GTIs! If you're getting errors regarding your GTIs,
+
don't use this and only give GTIs to the input objects before
+
making the cross spectrum.
+
"""
+
return crossspectrum_from_timeseries(
+
ts1,
+
ts2,
+
flux_attr=flux_attr,
+
error_flux_attr=error_flux_attr,
+
segment_size=segment_size,
+
norm=norm,
+
power_type=power_type,
+
silent=silent,
+
fullspec=fullspec,
+
use_common_mean=use_common_mean,
+
gti=gti,
+
)
+
+
+
+
[docs]
+
@staticmethod
+
def from_lc_iterable(
+
iter_lc1,
+
iter_lc2,
+
dt,
+
segment_size,
+
norm="none",
+
power_type="all",
+
silent=False,
+
fullspec=False,
+
use_common_mean=True,
+
gti=None,
+
):
+
"""Calculate AveragedCrossspectrum from two light curves
+
+
Parameters
+
----------
+
iter_lc1 : iterable of `stingray.Lightcurve` objects or `np.array`
+
Light curves from channel 1. If arrays, use them as counts
+
iter_lc1 : iterable of `stingray.Lightcurve` objects or `np.array`
+
Light curves from channel 2. If arrays, use them as counts
+
dt : float
+
The time resolution of the light curves
+
(sets the Nyquist frequency)
+
+
Other parameters
+
----------------
+
segment_size : float
+
The length, in seconds, of the light curve segments that will be averaged.
+
Only relevant (and required) for AveragedCrossspectrum
+
norm : str, default "frac"
+
The normalization of the periodogram. "abs" is absolute rms, "frac" is
+
fractional rms, "leahy" is Leahy+83 normalization, and "none" is the
+
unnormalized periodogram
+
use_common_mean : bool, default True
+
The mean of the light curve can be estimated in each interval, or on
+
the full light curve. This gives different results (Alston+2013).
+
Here we assume the mean is calculated on the full light curve, but
+
the user can set ``use_common_mean`` to False to calculate it on a
+
per-segment basis.
+
fullspec : bool, default False
+
Return the full periodogram, including negative frequencies
+
silent : bool, default False
+
Silence the progress bars
+
power_type : str, default 'all'
+
If 'all', give complex powers. If 'abs', the absolute value; if 'real',
+
the real part
+
gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
+
Good Time intervals. Defaults to the common GTIs from the two input
+
objects. Could throw errors if these GTIs have overlaps with the
+
input object GTIs! If you're getting errors regarding your GTIs,
+
don't use this and only give GTIs to the input objects before
+
making the cross spectrum.
+
save_all : bool, default False
+
If True, save the cross spectrum of each segment in the ``cs_all``
+
attribute of the output :class:`Crossspectrum` object.
+
"""
+
+
return crossspectrum_from_lc_iterable(
+
iter_lc1,
+
iter_lc2,
+
dt,
+
segment_size,
+
norm=norm,
+
power_type=power_type,
+
silent=silent,
+
fullspec=fullspec,
+
use_common_mean=use_common_mean,
+
gti=gti,
+
)
+
+
+
def _initialize_from_any_input(
+
self,
+
data1,
+
data2,
+
dt=None,
+
segment_size=None,
+
norm="frac",
+
power_type="all",
+
silent=False,
+
fullspec=False,
+
gti=None,
+
use_common_mean=True,
+
save_all=False,
+
):
+
"""Initialize the class, trying to understand the input types.
+
+
The input arguments are the same as ``__init__()``. Based on the type
+
of ``data1``, this method will call the appropriate
+
``crossspectrum_from_XXXX`` function, and initialize ``self`` with
+
the correct attributes.
+
"""
+
if segment_size is None and isinstance(data1, StingrayObject):
+
common_gti = cross_two_gtis(data1.gti, data2.gti)
+
data1.gti = common_gti
+
data2.gti = common_gti
+
data1 = data1.apply_gtis(inplace=False)
+
data2 = data2.apply_gtis(inplace=False)
+
+
data1_is_binned = (
+
"counts" in data1.array_attrs() or "_counts" in data1.internal_array_attrs()
+
)
+
data2_is_binned = (
+
"counts" in data2.array_attrs() or "_counts" in data2.internal_array_attrs()
+
)
+
+
if data1_is_binned and data2_is_binned:
+
assert data2.time.size == data1.time.size and np.allclose(
+
data2.time - data1.time, 0
+
), "Time arrays are not the same"
+
elif data1_is_binned or data2_is_binned:
+
raise ValueError("Please use input data of the same kind")
+
+
if isinstance(data1, EventList):
+
spec = crossspectrum_from_events(
+
data1,
+
data2,
+
dt,
+
segment_size,
+
norm=norm,
+
power_type=power_type,
+
silent=silent,
+
fullspec=fullspec,
+
use_common_mean=use_common_mean,
+
gti=gti,
+
save_all=save_all,
+
)
+
elif isinstance(data1, Lightcurve):
+
spec = crossspectrum_from_lightcurve(
+
data1,
+
data2,
+
segment_size,
+
norm=norm,
+
power_type=power_type,
+
silent=silent,
+
fullspec=fullspec,
+
use_common_mean=use_common_mean,
+
gti=gti,
+
save_all=save_all,
+
)
+
spec.lc1 = data1
+
spec.lc2 = data2
+
elif isinstance(data1, (tuple, list)):
+
dt = data1[0].dt
+
# This is a list of light curves.
+
spec = crossspectrum_from_lc_iterable(
+
data1,
+
data2,
+
dt,
+
segment_size,
+
norm=norm,
+
power_type=power_type,
+
silent=silent,
+
fullspec=fullspec,
+
gti=gti,
+
use_common_mean=use_common_mean,
+
save_all=save_all,
+
)
+
else: # pragma: no cover
+
raise TypeError(f"Bad inputs to Crosssspectrum: {type(data1)}")
+
+
for key, val in spec.__dict__.items():
+
setattr(self, key, val)
+
return
+
+
def _initialize_empty(self):
+
"""Set all attributes to None."""
+
self.freq = None
+
self.power = None
+
self.power_err = None
+
self.unnorm_power = None
+
self.unnorm_power_err = None
+
self.df = None
+
self.dt = None
+
self.nphots1 = None
+
self.nphots2 = None
+
self.m = 1
+
self.n = None
+
self.fullspec = None
+
self.k = 1
+
return
+
+
+
[docs]
+
def deadtime_correct(
+
self, dead_time, rate, background_rate=0, limit_k=200, n_approx=None, paralyzable=False
+
):
+
"""
+
Correct the power spectrum for dead time effects.
+
+
This correction is based on the formula given in Zhang et al. 2015, assuming
+
a constant dead time for all events.
+
For more advanced dead time corrections, see the FAD method from `stingray.deadtime.fad`
+
+
Parameters
+
----------
+
dead_time: float
+
The dead time of the detector.
+
rate : float
+
Detected source count rate
+
+
Other Parameters
+
----------------
+
background_rate : float, default 0
+
Detected background count rate. This is important to estimate when deadtime is given by the
+
combination of the source counts and background counts (e.g. in an imaging X-ray detector).
+
paralyzable: bool, default False
+
If True, the dead time correction is done assuming a paralyzable
+
dead time. If False, the correction is done assuming a non-paralyzable
+
(more common) dead time.
+
limit_k : int, default 200
+
Limit to this value the number of terms in the inner loops of
+
calculations. Check the plots returned by the `check_B` and
+
`check_A` functions to test that this number is adequate.
+
n_approx : int, default None
+
Number of bins to calculate the model power spectrum. If None, it will use the size of
+
the input frequency array. Relatively simple models (e.g., low count rates compared to
+
dead time) can use a smaller number of bins to speed up the calculation, and the final
+
power values will be interpolated.
+
+
Returns
+
-------
+
spectrum: :class:`Crossspectrum` or derivative.
+
The dead-time corrected spectrum.
+
"""
+
# I put it here to avoid circular imports
+
from stingray.deadtime import non_paralyzable_dead_time_model
+
+
if paralyzable:
+
raise NotImplementedError("Paralyzable dead time correction is not implemented yet.")
+
+
model = non_paralyzable_dead_time_model(
+
self.freq,
+
dead_time,
+
rate,
+
bin_time=self.dt,
+
limit_k=limit_k,
+
background_rate=background_rate,
+
n_approx=n_approx,
+
)
+
correction = 2 / model
+
new_spec = copy.deepcopy(self)
+
new_spec.power *= correction
+
+
# Now correct internal attributes for both the spectrum and its possible
+
# sub-spectra (PDSs from single channels, dynamical spectra, etc.)
+
objects = [new_spec]
+
if hasattr(new_spec, "pds1"):
+
objects += [new_spec.pds1, new_spec.pds2]
+
+
for obj in objects:
+
for attr in ["unnorm_power", "power_err", "unnorm_power_err"]:
+
if hasattr(obj, attr):
+
setattr(obj, attr, getattr(obj, attr) * correction)
+
+
for attr in ["cs_all", "unnorm_cs_all"]:
+
if hasattr(new_spec, attr):
+
dynsp = getattr(new_spec, attr)
+
for i, power in enumerate(dynsp):
+
dynsp[i] = power * correction
+
+
return new_spec
+
+
+
+
+
+
[docs]
+
class AveragedCrossspectrum(Crossspectrum):
+
type = "crossspectrum"
+
"""
+
Make an averaged cross spectrum from a light curve by segmenting two
+
light curves, Fourier-transforming each segment and then averaging the
+
resulting cross spectra.
+
+
Parameters
+
----------
+
data1: :class:`stingray.Lightcurve`object OR iterable of :class:`stingray.Lightcurve` objects OR :class:`stingray.EventList` object
+
A light curve from which to compute the cross spectrum. In some cases,
+
this would be the light curve of the wavelength/energy/frequency band
+
of interest.
+
+
data2: :class:`stingray.Lightcurve`object OR iterable of :class:`stingray.Lightcurve` objects OR :class:`stingray.EventList` object
+
A second light curve to use in the cross spectrum. In some cases, this
+
would be the wavelength/energy/frequency reference band to compare the
+
band of interest with.
+
+
segment_size: float
+
The size of each segment to average. Note that if the total duration of
+
each :class:`Lightcurve` object in ``lc1`` or ``lc2`` is not an
+
integer multiple of the ``segment_size``, then any fraction left-over
+
at the end of the time series will be lost. Otherwise you introduce
+
artifacts.
+
+
norm: {``frac``, ``abs``, ``leahy``, ``none``}, default ``none``
+
The normalization of the (real part of the) cross spectrum.
+
+
Other Parameters
+
----------------
+
gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
+
Good Time intervals. Defaults to the common GTIs from the two input
+
objects. Could throw errors if these GTIs have overlaps with the
+
input object GTIs! If you're getting errors regarding your GTIs,
+
don't use this and only give GTIs to the input objects before
+
making the cross spectrum.
+
+
dt : float
+
The time resolution of the light curve. Only needed when constructing
+
light curves in the case where data1 or data2 are of :class:EventList
+
+
power_type: string, optional, default ``all``
+
Parameter to choose among complete, real part and magnitude of
+
the cross spectrum.
+
+
silent : bool, default False
+
Do not show a progress bar when generating an averaged cross spectrum.
+
Useful for the batch execution of many spectra
+
+
lc1: :class:`stingray.Lightcurve`object OR iterable of :class:`stingray.Lightcurve` objects
+
For backwards compatibility only. Like ``data1``, but no
+
:class:`stingray.events.EventList` objects allowed
+
+
lc2: :class:`stingray.Lightcurve`object OR iterable of :class:`stingray.Lightcurve` objects
+
For backwards compatibility only. Like ``data2``, but no
+
:class:`stingray.events.EventList` objects allowed
+
+
fullspec: boolean, optional, default ``False``
+
If True, return the full array of frequencies, otherwise return just the
+
positive frequencies.
+
+
save_all : bool, default False
+
Save all intermediate PDSs used for the final average. Use with care.
+
This is likely to fill up your RAM on medium-sized datasets, and to
+
slow down the computation when rebinning.
+
+
skip_checks: bool
+
Skip initial checks, for speed or other reasons (you need to trust your
+
inputs!)
+
+
use_common_mean: bool
+
Averaged cross spectra are normalized in two possible ways: one is by normalizing
+
each of the single spectra that get averaged, the other is by normalizing after the
+
averaging. If `use_common_mean` is selected, the spectrum will be normalized
+
after the average.
+
+
gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
+
Good Time intervals. Defaults to the common GTIs from the two input
+
objects. Could throw errors if these GTIs have overlaps with the
+
input object GTIs! If you're getting errors regarding your GTIs,
+
don't use this and only give GTIs to the input objects before
+
making the cross spectrum.
+
+
Attributes
+
----------
+
freq: numpy.ndarray
+
The array of mid-bin frequencies that the Fourier transform samples.
+
+
power: numpy.ndarray
+
The array of cross spectra.
+
+
power_err: numpy.ndarray
+
The uncertainties of ``power``.
+
An approximation for each bin given by ``power_err= power/sqrt(m)``.
+
Where ``m`` is the number of power averaged in each bin (by frequency
+
binning, or averaging power spectra of segments of a light curve).
+
Note that for a single realization (``m=1``) the error is equal to the
+
power.
+
+
df: float
+
The frequency resolution.
+
+
m: int
+
The number of averaged cross spectra.
+
+
n: int
+
The number of time bins per segment of light curve.
+
+
nphots1: float
+
The total number of photons in the first (interest) light curve.
+
+
nphots2: float
+
The total number of photons in the second (reference) light curve.
+
+
gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
+
Good Time intervals.
+
"""
+
+
def __init__(
+
self,
+
data1=None,
+
data2=None,
+
segment_size=None,
+
norm="frac",
+
gti=None,
+
power_type="all",
+
silent=False,
+
lc1=None,
+
lc2=None,
+
dt=None,
+
fullspec=False,
+
save_all=False,
+
use_common_mean=True,
+
skip_checks=False,
+
):
+
self._type = None
+
# for backwards compatibility
+
if data1 is None:
+
data1 = lc1
+
if data2 is None:
+
data2 = lc2
+
+
empty = data1 is None and data2 is None
+
good_input = not empty
+
+
if not skip_checks:
+
good_input = self.initial_checks(
+
data1=data1,
+
data2=data2,
+
norm=norm,
+
gti=gti,
+
lc1=lc1,
+
lc2=lc2,
+
power_type=power_type,
+
dt=dt,
+
fullspec=fullspec,
+
segment_size=segment_size,
+
)
+
norm = norm.lower()
+
self.norm = norm
+
self.dt = dt
+
self.save_all = save_all
+
self.segment_size = segment_size
+
self.show_progress = not silent
+
+
if empty or not good_input:
+
return self._initialize_empty()
+
+
if isinstance(data1, Generator):
+
warnings.warn(
+
"The averaged Cross spectrum from a generator of "
+
"light curves pre-allocates the full list of light "
+
"curves, losing all advantage of lazy loading. If it "
+
"is important for you, use the "
+
"AveragedCrossspectrum.from_lc_iterable static "
+
"method, specifying the sampling time `dt`."
+
)
+
data1 = list(data1)
+
data2 = list(data2)
+
+
return self._initialize_from_any_input(
+
data1,
+
data2,
+
dt=dt,
+
segment_size=segment_size,
+
gti=gti,
+
norm=norm,
+
power_type=power_type,
+
silent=silent,
+
fullspec=fullspec,
+
use_common_mean=use_common_mean,
+
save_all=save_all,
+
)
+
+
+
[docs]
+
def initial_checks(self, data1, segment_size=None, **kwargs):
+
"""
+
+
Examples
+
--------
+
>>> times = np.arange(0, 10)
+
>>> ev1 = EventList(times)
+
>>> ev2 = EventList(times)
+
>>> ac = AveragedCrossspectrum()
+
+
If AveragedCrossspectrum, you need ``segment_size``
+
>>> AveragedCrossspectrum.initial_checks(ac, data1=ev1, data2=ev2, dt=1)
+
Traceback (most recent call last):
+
...
+
ValueError: segment_size must be specified...
+
+
And it needs to be finite!
+
>>> AveragedCrossspectrum.initial_checks(ac, data1=ev1, data2=ev2, dt=1., segment_size=np.nan)
+
Traceback (most recent call last):
+
...
+
ValueError: segment_size must be finite!
+
"""
+
good = Crossspectrum.initial_checks(self, data1, segment_size=segment_size, **kwargs)
+
if not good:
+
return False
+
if isinstance(self, AveragedCrossspectrum) and segment_size is None and data1 is not None:
+
raise ValueError("segment_size must be specified")
+
+
if (
+
isinstance(self, AveragedCrossspectrum)
+
and segment_size is not None
+
and not np.isfinite(segment_size)
+
):
+
raise ValueError("segment_size must be finite!")
+
return True
+
+
+
+
[docs]
+
def coherence(self):
+
"""Averaged Coherence function.
+
+
+
Coherence is defined in Vaughan and Nowak, 1996 [#]_.
+
It is a Fourier frequency dependent measure of the linear correlation
+
between time series measured simultaneously in two energy channels.
+
+
Compute an averaged Coherence function of cross spectrum by computing
+
coherence function of each segment and averaging them. The return type
+
is a tuple with first element as the coherence function and the second
+
element as the corresponding uncertainty associated with it.
+
+
Note : The uncertainty in coherence function is strictly valid for Gaussian \
+
statistics only.
+
+
Returns
+
-------
+
(coh, uncertainty) : tuple of np.ndarray
+
Tuple comprising the coherence function and uncertainty.
+
+
References
+
----------
+
.. [#] https://iopscience.iop.org/article/10.1086/310430
+
"""
+
if np.any(self.m < 50):
+
simon(
+
"Number of segments used in averaging is "
+
"significantly low. The result might not follow the "
+
"expected statistical distributions."
+
)
+
c = self.unnorm_power
+
p1 = self.pds1.unnorm_power
+
p2 = self.pds2.unnorm_power
+
+
meanrate1 = self.nphots1 / self.n / self.dt
+
meanrate2 = self.nphots2 / self.n / self.dt
+
+
P1noise = poisson_level(norm="none", meanrate=meanrate1, n_ph=self.nphots1)
+
P2noise = poisson_level(norm="none", meanrate=meanrate2, n_ph=self.nphots2)
+
+
coh = raw_coherence(c, p1, p2, P1noise, P2noise, self.n)
+
+
# Calculate uncertainty
+
uncertainty = (2**0.5 * coh * (1 - coh)) / (np.sqrt(coh) * self.m**0.5)
+
+
uncertainty[coh == 0] = 0.0
+
+
return (coh, uncertainty)
+
+
+
+
[docs]
+
def phase_lag(self):
+
"""Return the fourier phase lag of the cross spectrum."""
+
lag = np.angle(self.unnorm_power)
+
coh, uncert = self.coherence()
+
+
dum = (1.0 - coh) / (2.0 * coh)
+
+
dum[coh == 0] = 0.0
+
+
lag_err = np.sqrt(dum / self.m)
+
return lag, lag_err
+
+
+
+
[docs]
+
def time_lag(self):
+
"""Calculate time lag and uncertainty.
+
+
Equation from Bendat & Piersol, 2011 [bendat-2011]__.
+
+
Returns
+
-------
+
lag : np.ndarray
+
The time lag
+
+
lag_err : np.ndarray
+
The uncertainty in the time lag
+
"""
+
ph_lag, ph_lag_err = self.phase_lag()
+
+
lag = ph_lag / (2 * np.pi * self.freq)
+
lag_err = ph_lag_err / (2 * np.pi * self.freq)
+
+
return lag, lag_err
+
+
+
+
+class DynamicalCrossspectrum(AveragedCrossspectrum):
+ type = "crossspectrum"
+ """
+ Create a dynamical cross spectrum, also often called a *spectrogram*.
+
+ This class will divide two :class:`Lightcurve` objects into segments of
+ length ``segment_size``, create a cross spectrum for each segment and store
+ all powers in a matrix as a function of both time (using the mid-point of
+ each segment) and frequency.
+
+ This is often used to trace changes in period of a (quasi-)periodic signal
+ over time.
+
+ Parameters
+ ----------
+ data1 : :class:`stingray.Lightcurve` or :class:`stingray.EventList` object
+ The time series or event list from the subject band, or channel, for which
+ the dynamical cross spectrum is to be calculated. If :class:`stingray.EventList`, ``dt``
+ must be specified as well.
+
+ data2 : :class:`stingray.Lightcurve` or :class:`stingray.EventList` object
+ The time series or event list from the reference band, or channel, of the dynamical
+ cross spectrum. If :class:`stingray.EventList`, ``dt`` must be specified as well.
+
+ segment_size : float, default 1
+ Length of the segment of light curve, default value is 1 (in whatever
+ units the ``time`` array in the :class:`Lightcurve`` object uses).
+
+ norm: {"leahy" | "frac" | "abs" | "none" }, optional, default "frac"
+ The normaliation of the periodogram to be used.
+
+ Other Parameters
+ ----------------
+ gti: 2-d float array
+ ``[[gti0_0, gti0_1], [gti1_0, gti1_1], ...]`` -- Good Time intervals.
+ This choice overrides the GTIs in the single light curves. Use with
+ care, especially if these GTIs have overlaps with the input
+ object GTIs! If you're getting errors regarding your GTIs, don't
+ use this and only give GTIs to the input object before making
+ the cross spectrum.
+
+ sample_time: float
+ Compulsory for input :class:`stingray.EventList` data. The time resolution of the
+ lightcurve that is created internally from the input event lists. Drives the
+ Nyquist frequency.
+
+ Attributes
+ ----------
+ segment_size: float
+ The size of each segment to average. Note that if the total
+ duration of each input object in lc is not an integer multiple
+ of the ``segment_size``, then any fraction left-over at the end of the
+ time series will be lost.
+
+ dyn_ps : np.ndarray
+ The matrix of normalized squared absolute values of Fourier
+ amplitudes. The axis are given by the ``freq``
+ and ``time`` attributes.
+
+ norm: {``leahy`` | ``frac`` | ``abs`` | ``none``}
+ The normalization of the periodogram.
+
+ freq: numpy.ndarray
+ The array of mid-bin frequencies that the Fourier transform samples.
+
+ time: numpy.ndarray
+ The array of mid-point times of each interval used for the dynamical
+ power spectrum.
+
+ df: float
+ The frequency resolution.
+
+ dt: float
+ The time resolution of the dynamical spectrum. It is **not** the time resolution of the
+ input light curve. It is the integration time of each line of the dynamical power
+ spectrum (typically, an integer multiple of ``segment_size``).
+
+ m: int
+ The number of averaged powers in each spectral bin (initially 1, it changes after rebinning).
+ """
+
+ def __init__(
+ self, data1=None, data2=None, segment_size=None, norm="frac", gti=None, sample_time=None
+ ):
+ self.segment_size = segment_size
+ self.sample_time = sample_time
+ self.gti = gti
+ self.norm = norm
+
+ if segment_size is None and data1 is None and data2 is None:
+ self._initialize_empty()
+ self.dyn_ps = None
+ return
+
+ if segment_size is None or data1 is None or data2 is None:
+ raise TypeError("data1, data2, and segment_size must all be specified")
+
+ if isinstance(data1, EventList) and sample_time is None:
+ raise ValueError("To pass input event lists, please specify sample_time")
+ elif isinstance(data1, Lightcurve):
+ sample_time = data1.dt
+ if segment_size > data1.tseg:
+ raise ValueError(
+ "Length of the segment is too long to create "
+ "any segments of the light curve!"
+ )
+ if segment_size < 2 * sample_time:
+ raise ValueError("Length of the segment is too short to form a light curve!")
+
+ self._make_matrix(data1, data2)
+
+ def _make_matrix(self, data1, data2):
+ """
+ Create a matrix of powers for each time step and each frequency step.
+
+ Time increases with row index, frequency with column index.
+
+ Parameters
+ ----------
+ data1 : :class:`Lightcurve` or :class:`EventList`
+ The object providing the subject band/channel for the dynamical
+ cross spectrum.
+ data2 : :class:`Lightcurve` or :class:`EventList`
+ The object providing the reference band for the dynamical
+ cross spectrum.
+ """
+ avg = AveragedCrossspectrum(
+ data1,
+ data2,
+ dt=self.sample_time,
+ segment_size=self.segment_size,
+ norm=self.norm,
+ gti=self.gti,
+ save_all=True,
+ )
+ self.dyn_ps = np.array(avg.cs_all).T
+ conv = avg.cs_all / avg.unnorm_cs_all
+ self.unnorm_conversion = np.nanmean(conv)
+ self.freq = avg.freq
+ current_gti = avg.gti
+ self.nphots1 = avg.nphots1
+ self.nphots2 = avg.nphots2
+
+ tstart, _ = time_intervals_from_gtis(current_gti, self.segment_size)
+
+ self.time = tstart + 0.5 * self.segment_size
+ self.df = avg.df
+ self.dt = self.segment_size
+ self.m = 1
+
+ def rebin_frequency(self, df_new, method="average"):
+ """
+ Rebin the Dynamic Power Spectrum to a new frequency resolution.
+ Rebinning is an in-place operation, i.e. will replace the existing
+ ``dyn_ps`` attribute.
+
+ While the new resolution does not need to be an integer of the previous frequency
+ resolution, be aware that if this is the case, the last frequency bin will be cut
+ off by the fraction left over by the integer division
+
+ Parameters
+ ----------
+ df_new: float
+ The new frequency resolution of the dynamical power spectrum.
+ Must be larger than the frequency resolution of the old dynamical
+ power spectrum!
+
+ method: {"sum" | "mean" | "average"}, optional, default "average"
+ This keyword argument sets whether the powers in the new bins
+ should be summed or averaged.
+ """
+ new_dynspec_object = copy.deepcopy(self)
+ dynspec_new = []
+ for data in self.dyn_ps.T:
+ freq_new, bin_counts, bin_err, step = rebin_data(
+ self.freq, data, dx_new=df_new, method=method
+ )
+ dynspec_new.append(bin_counts)
+
+ new_dynspec_object.freq = freq_new
+ new_dynspec_object.dyn_ps = np.array(dynspec_new).T
+ new_dynspec_object.df = df_new
+ new_dynspec_object.m = int(step) * self.m
+
+ return new_dynspec_object
+
+ def rebin_time(self, dt_new, method="average"):
+ """
+ Rebin the Dynamic Power Spectrum to a new time resolution.
+
+ Note: this is *not* changing the time resolution of the input light
+ curve! ``dt`` is the integration time of each line of the dynamical power
+ spectrum (typically, an integer multiple of ``segment_size``).
+
+ While the new resolution does not need to be an integer of the previous time
+ resolution, be aware that if this is the case, the last time bin will be cut
+ off by the fraction left over by the integer division
+
+ Parameters
+ ----------
+ dt_new: float
+ The new time resolution of the dynamical power spectrum.
+ Must be larger than the time resolution of the old dynamical power
+ spectrum!
+
+ method: {"sum" | "mean" | "average"}, optional, default "average"
+ This keyword argument sets whether the powers in the new bins
+ should be summed or averaged.
+
+ Returns
+ -------
+ time_new: numpy.ndarray
+ Time axis with new rebinned time resolution.
+
+ dynspec_new: numpy.ndarray
+ New rebinned Dynamical Cross Spectrum.
+ """
+ if dt_new < self.dt:
+ raise ValueError("New time resolution must be larger than old time resolution!")
+
+ new_dynspec_object = copy.deepcopy(self)
+
+ dynspec_new = []
+ for data in self.dyn_ps:
+ time_new, bin_counts, _, step = rebin_data(
+ self.time, data, dt_new, method=method, dx=self.dt
+ )
+ dynspec_new.append(bin_counts)
+
+ new_dynspec_object.time = time_new
+ new_dynspec_object.dyn_ps = np.array(dynspec_new)
+ new_dynspec_object.dt = dt_new
+ new_dynspec_object.m = int(step) * self.m
+ return new_dynspec_object
+
+ def rebin_by_n_intervals(self, n, method="average"):
+ """
+ Rebin the Dynamic Power Spectrum to a new time resolution, by summing contiguous intervals.
+
+ This is different from meth:`DynamicalPowerspectrum.rebin_time` in that it averages ``n``
+ consecutive intervals regardless of their distance in time. ``rebin_time`` will instead
+ average intervals that are separated at most by a time ``dt_new``.
+
+ Parameters
+ ----------
+ n: int
+ The number of intervals to be combined into one.
+
+ method: {"sum" | "mean" | "average"}, optional, default "average"
+ This keyword argument sets whether the powers in the new bins
+ should be summed or averaged.
+
+ Returns
+ -------
+ time_new: numpy.ndarray
+ Time axis with new rebinned time resolution.
+
+ dynspec_new: numpy.ndarray
+ New rebinned Dynamical Cross Spectrum.
+ """
+ if not np.issubdtype(type(n), np.integer):
+ warnings.warn("n must be an integer. Casting to int")
+
+ n = int(n)
+
+ if n == 1:
+ return copy.deepcopy(self)
+ if n < 1:
+ raise ValueError("n must be >= 1")
+
+ new_dynspec_object = copy.deepcopy(self)
+
+ dynspec_new = []
+ time_new = []
+ for i, data in enumerate(self.dyn_ps.T):
+ if i % n == 0:
+ count = 1
+ bin_counts = data
+ bin_times = self.time[i]
+ else:
+ count += 1
+ bin_counts += data
+ bin_times += self.time[i]
+ if count == n:
+ if method in ["mean", "average"]:
+ bin_counts /= n
+ dynspec_new.append(bin_counts)
+ bin_times /= n
+ time_new.append(bin_times)
+
+ new_dynspec_object.time = time_new
+ new_dynspec_object.dyn_ps = np.array(dynspec_new).T
+ new_dynspec_object.dt *= n
+ new_dynspec_object.m = n * self.m
+
+ return new_dynspec_object
+
+ def trace_maximum(self, min_freq=None, max_freq=None):
+ """
+ Return the indices of the maximum powers in each segment
+ :class:`Powerspectrum` between specified frequencies.
+
+ Parameters
+ ----------
+ min_freq: float, default ``None``
+ The lower frequency bound.
+
+ max_freq: float, default ``None``
+ The upper frequency bound.
+
+ Returns
+ -------
+ max_positions : np.array
+ The array of indices of the maximum power in each segment having
+ frequency between ``min_freq`` and ``max_freq``.
+ """
+ if min_freq is None:
+ min_freq = np.min(self.freq)
+ if max_freq is None:
+ max_freq = np.max(self.freq)
+
+ max_positions = []
+ for ps in self.dyn_ps.T:
+ indices = np.logical_and(self.freq <= max_freq, min_freq <= self.freq)
+ max_power = np.max(ps[indices])
+ max_positions.append(np.where(ps == max_power)[0][0])
+
+ return np.array(max_positions)
+
+ def shift_and_add(self, f0_list, nbins=100, output_obj_type=AveragedCrossspectrum, rebin=None):
+ """Shift and add the dynamical cross spectrum.
+
+ This is the basic operation for the shift-and-add operation used to track
+ kHz QPOs in X-ray binaries (e.g. Méndez et al. 1998, ApJ, 494, 65).
+
+ Parameters
+ ----------
+ freqs : np.array
+ Array of frequencies, the same for all powers. Must be sorted and on a uniform
+ grid.
+ power_list : list of np.array
+ List of power spectra. Each power spectrum must have the same length
+ as the frequency array.
+ f0_list : list of float
+ List of central frequencies
+
+ Other parameters
+ ----------------
+ nbins : int, default 100
+ Number of bins to extract
+ rebin : int, default None
+ Rebin the final spectrum by this factor. At the moment, the rebinning
+ is linear.
+ output_obj_type : class, default :class:`AveragedCrossspectrum`
+ The type of the output object. Can be, e.g. :class:`AveragedCrossspectrum` or
+ :class:`AveragedPowerspectrum`.
+
+ Returns
+ -------
+ output: :class:`AveragedPowerspectrum` or :class:`AveragedCrossspectrum`
+ The final averaged power spectrum.
+
+ Examples
+ --------
+ >>> power_list = [[2, 5, 2, 2, 2], [1, 1, 5, 1, 1], [3, 3, 3, 5, 3]]
+ >>> power_list = np.array(power_list).T
+ >>> freqs = np.arange(5) * 0.1
+ >>> f0_list = [0.1, 0.2, 0.3, 0.4]
+ >>> dps = DynamicalCrossspectrum()
+ >>> dps.dyn_ps = power_list
+ >>> dps.freq = freqs
+ >>> dps.df = 0.1
+ >>> dps.m = 1
+ >>> output = dps.shift_and_add(f0_list, nbins=5)
+ >>> assert np.array_equal(output.m, [2, 3, 3, 3, 2])
+ >>> assert np.array_equal(output.power, [2. , 2. , 5. , 2. , 1.5])
+ >>> assert np.allclose(output.freq, [0.05, 0.15, 0.25, 0.35, 0.45])
+ """
+ from .fourier import shift_and_add
+
+ final_freqs, final_powers, count = shift_and_add(
+ self.freq, self.dyn_ps.T, f0_list, nbins=nbins, M=self.m, df=self.df, rebin=rebin
+ )
+
+ output = output_obj_type()
+ good = ~np.isnan(final_powers)
+
+ output.freq = final_freqs[good]
+ output.power = final_powers[good]
+ output.m = count[good]
+ output.df = self.df
+ output.norm = self.norm
+ output.gti = self.gti
+ output.segment_size = self.segment_size
+
+ return output
+
+ def power_colors(
+ self,
+ freq_edges=[1 / 256, 1 / 32, 0.25, 2.0, 16.0],
+ freqs_to_exclude=None,
+ poisson_power=0,
+ ):
+ """
+ Compute the power colors of the dynamical power spectrum.
+
+ See Heil et al. 2015, MNRAS, 448, 3348
+
+ Parameters
+ ----------
+ freq_edges: iterable
+ The edges of the frequency bins to be used for the power colors.
+
+ freqs_to_exclude : 1-d or 2-d iterable, optional, default None
+ The ranges of frequencies to exclude from the calculation of the power color.
+ For example, the frequencies containing strong QPOs.
+ A 1-d iterable should contain two values for the edges of a single range. (E.g.
+ ``[0.1, 0.2]``). ``[[0.1, 0.2], [3, 4]]`` will exclude the ranges 0.1-0.2 Hz and
+ 3-4 Hz.
+
+ poisson_power : float or iterable, optional, default 0
+ The Poisson noise level of the power spectrum. If iterable, it should have the same
+ length as ``freq``. (This might apply to the case of a power spectrum with a
+ strong dead time distortion)
+
+ Returns
+ -------
+ pc0: np.ndarray
+ pc0_err: np.ndarray
+ pc1: np.ndarray
+ pc1_err: np.ndarray
+ The power colors for each spectrum and their respective errors
+ """
+ power_colors = []
+ if np.iscomplexobj(self.dyn_ps):
+ warnings.warn("When using power_colors, complex powers will be cast to real.")
+
+ for ps in self.dyn_ps.T:
+ power_colors.append(
+ power_color(
+ self.freq,
+ ps.real,
+ freq_edges=freq_edges,
+ freqs_to_exclude=freqs_to_exclude,
+ df=self.df,
+ poisson_power=poisson_power,
+ m=self.m,
+ )
+ )
+
+ pc0, pc0_err, pc1, pc1_err = np.array(power_colors).T
+ return pc0, pc0_err, pc1, pc1_err
+
+ def compute_rms(self, min_freq, max_freq, poisson_noise_level=0):
+ """
+ Compute the fractional rms amplitude in the power spectrum
+ between two frequencies.
+
+ Parameters
+ ----------
+ min_freq: float
+ The lower frequency bound for the calculation.
+
+ max_freq: float
+ The upper frequency bound for the calculation.
+
+ Other parameters
+ ----------------
+ poisson_noise_level : float, default is None
+ This is the Poisson noise level of the PDS with same
+ normalization as the PDS.
+
+ Returns
+ -------
+ rms: float
+ The fractional rms amplitude contained between ``min_freq`` and
+ ``max_freq``.
+
+ rms_err: float
+ The error on the fractional rms amplitude.
+
+ """
+ dyn_ps_unnorm = self.dyn_ps / self.unnorm_conversion
+ poisson_noise_unnrom = poisson_noise_level / self.unnorm_conversion
+ if not hasattr(self, "nphots"):
+ nphots = (self.nphots1 * self.nphots2) ** 0.5
+ else:
+ nphots = self.nphots
+
+ good = (self.freq >= min_freq) & (self.freq <= max_freq)
+
+ M_freq = self.m
+ if isinstance(self.m, Iterable):
+ M_freq = self.m[good]
+
+ rmss = []
+ rms_errs = []
+
+ for unnorm_powers in dyn_ps_unnorm.T:
+ r, re = get_rms_from_unnorm_periodogram(
+ unnorm_powers[good],
+ nphots,
+ self.df,
+ M=M_freq,
+ poisson_noise_unnorm=poisson_noise_unnrom,
+ segment_size=self.segment_size,
+ kind="frac",
+ )
+
+ rmss.append(r)
+ rms_errs.append(re)
+ return rmss, rms_errs
+
+
+def crossspectrum_from_time_array(
+ times1,
+ times2,
+ dt,
+ segment_size=None,
+ gti=None,
+ norm="none",
+ power_type="all",
+ silent=False,
+ fullspec=False,
+ use_common_mean=True,
+ save_all=False,
+):
+ """Calculate AveragedCrossspectrum from two arrays of event times.
+
+ Parameters
+ ----------
+ times1 : `np.array`
+ Event arrival times of channel 1
+ times2 : `np.array`
+ Event arrival times of channel 2
+ dt : float
+ The time resolution of the intermediate light curves
+ (sets the Nyquist frequency)
+
+ Other parameters
+ ----------------
+ segment_size : float
+ The length, in seconds, of the light curve segments that will be averaged
+ gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
+ Good Time intervals. Defaults to the common GTIs from the two input
+ objects
+ norm : str, default "frac"
+ The normalization of the periodogram. "abs" is absolute rms, "frac" is
+ fractional rms, "leahy" is Leahy+83 normalization, and "none" is the
+ unnormalized periodogram
+ use_common_mean : bool, default True
+ The mean of the light curve can be estimated in each interval, or on
+ the full light curve. This gives different results (Alston+2013).
+ Here we assume the mean is calculated on the full light curve, but
+ the user can set ``use_common_mean`` to False to calculate it on a
+ per-segment basis.
+ fullspec : bool, default False
+ Return the full periodogram, including negative frequencies
+ silent : bool, default False
+ Silence the progress bars
+ power_type : str, default 'all'
+ If 'all', give complex powers. If 'abs', the absolute value; if 'real',
+ the real part
+
+ Returns
+ -------
+ spec : `AveragedCrossspectrum` or `Crossspectrum`
+ The output cross spectrum.
+ """
+ force_averaged = segment_size is not None
+ # Suppress progress bar for single periodogram
+ silent = silent or (segment_size is None)
+ results = avg_cs_from_timeseries(
+ times1,
+ times2,
+ gti,
+ segment_size,
+ dt,
+ norm=norm,
+ use_common_mean=use_common_mean,
+ fullspec=fullspec,
+ silent=silent,
+ power_type=power_type,
+ return_auxil=True,
+ return_subcs=save_all,
+ )
+
+ return _create_crossspectrum_from_result_table(results, force_averaged=force_averaged)
+
+
+def crossspectrum_from_events(
+ events1,
+ events2,
+ dt,
+ segment_size=None,
+ norm="none",
+ power_type="all",
+ silent=False,
+ fullspec=False,
+ use_common_mean=True,
+ gti=None,
+ save_all=False,
+):
+ """Calculate AveragedCrossspectrum from two event lists
+
+ Parameters
+ ----------
+ events1 : `stingray.EventList`
+ Events from channel 1
+ events2 : `stingray.EventList`
+ Events from channel 2
+ dt : float
+ The time resolution of the intermediate light curves
+ (sets the Nyquist frequency)
+
+ Other parameters
+ ----------------
+ segment_size : float, default None
+ The length, in seconds, of the light curve segments that will be averaged
+ norm : str, default "frac"
+ The normalization of the periodogram. "abs" is absolute rms, "frac" is
+ fractional rms, "leahy" is Leahy+83 normalization, and "none" is the
+ unnormalized periodogram
+ use_common_mean : bool, default True
+ The mean of the light curve can be estimated in each interval, or on
+ the full light curve. This gives different results (Alston+2013).
+ Here we assume the mean is calculated on the full light curve, but
+ the user can set ``use_common_mean`` to False to calculate it on a
+ per-segment basis.
+ fullspec : bool, default False
+ Return the full periodogram, including negative frequencies
+ silent : bool, default False
+ Silence the progress bars
+ power_type : str, default 'all'
+ If 'all', give complex powers. If 'abs', the absolute value; if 'real',
+ the real part
+ gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
+ Good Time intervals. Defaults to the common GTIs from the two input
+ objects
+
+ Returns
+ -------
+ spec : `AveragedCrossspectrum` or `Crossspectrum`
+ The output cross spectrum.
+ """
+
+ if gti is None:
+ gti = cross_two_gtis(events1.gti, events2.gti)
+
+ return crossspectrum_from_time_array(
+ events1.time,
+ events2.time,
+ dt,
+ segment_size,
+ gti,
+ norm=norm,
+ power_type=power_type,
+ silent=silent,
+ fullspec=fullspec,
+ use_common_mean=use_common_mean,
+ save_all=save_all,
+ )
+
+
+def crossspectrum_from_lightcurve(
+ lc1,
+ lc2,
+ segment_size=None,
+ norm="none",
+ power_type="all",
+ silent=False,
+ fullspec=False,
+ use_common_mean=True,
+ gti=None,
+ save_all=False,
+):
+ """Calculate AveragedCrossspectrum from two light curves
+
+ Parameters
+ ----------
+ lc1 : `stingray.Lightcurve`
+ Light curve from channel 1
+ lc2 : `stingray.Lightcurve`
+ Light curve from channel 2
+
+ Other parameters
+ ----------------
+ segment_size : float, default None
+ The length, in seconds, of the light curve segments that will be averaged
+ norm : str, default "frac"
+ The normalization of the periodogram. "abs" is absolute rms, "frac" is
+ fractional rms, "leahy" is Leahy+83 normalization, and "none" is the
+ unnormalized periodogram
+ use_common_mean : bool, default True
+ The mean of the light curve can be estimated in each interval, or on
+ the full light curve. This gives different results (Alston+2013).
+ Here we assume the mean is calculated on the full light curve, but
+ the user can set ``use_common_mean`` to False to calculate it on a
+ per-segment basis.
+ fullspec : bool, default False
+ Return the full periodogram, including negative frequencies
+ silent : bool, default False
+ Silence the progress bars
+ power_type : str, default 'all'
+ If 'all', give complex powers. If 'abs', the absolute value; if 'real',
+ the real part
+ gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
+ Good Time intervals. Defaults to the common GTIs from the two input
+ objects
+ save_all : bool, default False
+ Save all intermediate spectra used for the final average. Use with care.
+ This is likely to fill up your RAM on medium-sized datasets, and to
+ slow down the computation when rebinning.
+
+ Returns
+ -------
+ spec : `AveragedCrossspectrum` or `Crossspectrum`
+ The output cross spectrum.
+ """
+ error_flux_attr = None
+
+ if lc1.err_dist == "gauss":
+ error_flux_attr = "_counts_err"
+
+ return crossspectrum_from_timeseries(
+ lc1,
+ lc2,
+ "_counts",
+ error_flux_attr=error_flux_attr,
+ segment_size=segment_size,
+ norm=norm,
+ power_type=power_type,
+ silent=silent,
+ fullspec=fullspec,
+ use_common_mean=use_common_mean,
+ gti=gti,
+ save_all=save_all,
+ )
+
+
+def crossspectrum_from_timeseries(
+ ts1,
+ ts2,
+ flux_attr,
+ error_flux_attr=None,
+ segment_size=None,
+ norm="none",
+ power_type="all",
+ silent=False,
+ fullspec=False,
+ use_common_mean=True,
+ gti=None,
+ save_all=False,
+):
+ """Calculate AveragedCrossspectrum from two time series
+
+ Parameters
+ ----------
+ ts1 : `stingray.StingrayTimeseries`
+ Time series from channel 1
+ ts2 : `stingray.StingrayTimeseries`
+ Time series from channel 2
+ flux_attr : `str`
+ What attribute of the time series will be used.
+
+ Other parameters
+ ----------------
+ error_flux_attr : `str`
+ What attribute of the time series will be used as error bar.
+ segment_size : float, default None
+ The length, in seconds, of the light curve segments that will be averaged
+ norm : str, default "frac"
+ The normalization of the periodogram. "abs" is absolute rms, "frac" is
+ fractional rms, "leahy" is Leahy+83 normalization, and "none" is the
+ unnormalized periodogram
+ use_common_mean : bool, default True
+ The mean of the light curve can be estimated in each interval, or on
+ the full light curve. This gives different results (Alston+2013).
+ Here we assume the mean is calculated on the full light curve, but
+ the user can set ``use_common_mean`` to False to calculate it on a
+ per-segment basis.
+ fullspec : bool, default False
+ Return the full periodogram, including negative frequencies
+ silent : bool, default False
+ Silence the progress bars
+ power_type : str, default 'all'
+ If 'all', give complex powers. If 'abs', the absolute value; if 'real',
+ the real part
+ gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
+ Good Time intervals. Defaults to the common GTIs from the two input
+ objects
+ save_all : bool, default False
+ Save all intermediate spectra used for the final average. Use with care.
+ This is likely to fill up your RAM on medium-sized datasets, and to
+ slow down the computation when rebinning.
+
+ Returns
+ -------
+ spec : `AveragedCrossspectrum` or `Crossspectrum`
+ The output cross spectrum.
+ """
+ force_averaged = segment_size is not None
+ # Suppress progress bar for single periodogram
+ silent = silent or (segment_size is None)
+ if gti is None:
+ gti = cross_two_gtis(ts1.gti, ts2.gti)
+
+ err1 = err2 = None
+ if error_flux_attr is not None:
+ err1 = getattr(ts1, error_flux_attr)
+ err2 = getattr(ts2, error_flux_attr)
+
+ results = avg_cs_from_timeseries(
+ ts1.time,
+ ts2.time,
+ gti,
+ segment_size,
+ ts1.dt,
+ norm=norm,
+ use_common_mean=use_common_mean,
+ fullspec=fullspec,
+ silent=silent,
+ power_type=power_type,
+ fluxes1=getattr(ts1, flux_attr),
+ fluxes2=getattr(ts2, flux_attr),
+ errors1=err1,
+ errors2=err2,
+ return_auxil=True,
+ return_subcs=save_all,
+ )
+
+ return _create_crossspectrum_from_result_table(results, force_averaged=force_averaged)
+
+
+def crossspectrum_from_lc_iterable(
+ iter_lc1,
+ iter_lc2,
+ dt,
+ segment_size,
+ norm="none",
+ power_type="all",
+ silent=False,
+ fullspec=False,
+ use_common_mean=True,
+ gti=None,
+ save_all=False,
+):
+ """Calculate AveragedCrossspectrum from two light curves
+
+ Parameters
+ ----------
+ iter_lc1 : iterable of `stingray.Lightcurve` objects or `np.array`
+ Light curves from channel 1. If arrays, use them as counts
+ iter_lc1 : iterable of `stingray.Lightcurve` objects or `np.array`
+ Light curves from channel 2. If arrays, use them as counts
+ dt : float
+ The time resolution of the light curves
+ (sets the Nyquist frequency)
+ segment_size : float
+ The length, in seconds, of the light curve segments that will be averaged
+
+ Other parameters
+ ----------------
+ norm : str, default "frac"
+ The normalization of the periodogram. "abs" is absolute rms, "frac" is
+ fractional rms, "leahy" is Leahy+83 normalization, and "none" is the
+ unnormalized periodogram
+ use_common_mean : bool, default True
+ The mean of the light curve can be estimated in each interval, or on
+ the full light curve. This gives different results (Alston+2013).
+ Here we assume the mean is calculated on the full light curve, but
+ the user can set ``use_common_mean`` to False to calculate it on a
+ per-segment basis.
+ fullspec : bool, default False
+ Return the full periodogram, including negative frequencies
+ silent : bool, default False
+ Silence the progress bars
+ power_type : str, default 'all'
+ If 'all', give complex powers. If 'abs', the absolute value; if 'real',
+ the real part
+ gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
+ Good Time intervals. The GTIs of the input light curves are
+ intersected with these.
+
+ Returns
+ -------
+ spec : `AveragedCrossspectrum` or `Crossspectrum`
+ The output cross spectrum.
+ """
+
+ force_averaged = segment_size is not None
+ # Suppress progress bar for single periodogram
+ silent = silent or (segment_size is None)
+
+ common_gti = gti
+
+ def iterate_lc_counts(iter_lc):
+ for lc in iter_lc:
+ if hasattr(lc, "counts"):
+ n_bin = np.rint(segment_size / lc.dt).astype(int)
+
+ gti = lc.gti
+ if common_gti is not None:
+ gti = cross_two_gtis(common_gti, lc.gti)
+
+ err = None
+ if lc.err_dist == "gauss":
+ err = lc.counts_err
+
+ flux_iterable = get_flux_iterable_from_segments(
+ lc.time, gti, segment_size, n_bin, fluxes=lc.counts, errors=err
+ )
+ for out in flux_iterable:
+ yield out
+ else:
+ yield lc
+
+ results = avg_cs_from_iterables(
+ iterate_lc_counts(iter_lc1),
+ iterate_lc_counts(iter_lc2),
+ dt,
+ norm=norm,
+ use_common_mean=use_common_mean,
+ silent=silent,
+ fullspec=fullspec,
+ power_type=power_type,
+ return_auxil=True,
+ return_subcs=save_all,
+ )
+ return _create_crossspectrum_from_result_table(results, force_averaged=force_averaged)
+
+
+def _create_crossspectrum_from_result_table(table, force_averaged=False):
+ """Copy the columns and metadata from the results of
+ ``stingray.fourier.avg_cs_from_XX`` functions into
+ `AveragedCrossspectrum` or `Crossspectrum` objects.
+
+ By default, allocates a Crossspectrum object if the number of
+ averaged spectra is 1, otherwise an AveragedCrossspectrum.
+ If the user specifies ``force_averaged=True``, it always allocates
+ an AveragedCrossspectrum.
+
+ Parameters
+ ----------
+ table : `astropy.table.Table`
+ results of `avg_cs_from_iterables` or `avg_cs_from_iterables_quick`
+
+ Other parameters
+ ----------------
+ force_averaged : bool, default False
+
+ Returns
+ -------
+ spec : `AveragedCrossspectrum` or `Crossspectrum`
+ The output cross spectrum.
+ """
+ if table.meta["m"] > 1 or force_averaged:
+ cs = AveragedCrossspectrum()
+ cs.pds1 = AveragedCrossspectrum()
+ cs.pds2 = AveragedCrossspectrum()
+ else:
+ cs = Crossspectrum()
+ cs.pds1 = Crossspectrum()
+ cs.pds2 = Crossspectrum()
+
+ cs.freq = cs.pds1.freq = cs.pds2.freq = np.array(table["freq"])
+ cs.norm = cs.pds1.norm = cs.pds2.norm = table.meta["norm"]
+
+ cs.power = np.array(table["power"])
+ cs.pds1.power = np.array(table["pds1"])
+ cs.pds2.power = np.array(table["pds2"])
+ cs.unnorm_power = np.array(table["unnorm_power"])
+ cs.pds1.unnorm_power = np.array(table["unnorm_pds1"])
+ cs.pds2.unnorm_power = np.array(table["unnorm_pds2"])
+
+ cs.pds1.type = cs.pds2.type = "powerspectrum"
+
+ if "subcs" in table.meta:
+ cs.cs_all = np.array(table.meta["subcs"])
+ cs.unnorm_cs_all = np.array(table.meta["unnorm_subcs"])
+
+ for attr, val in table.meta.items():
+ if not attr.endswith("subcs"):
+ setattr(cs, attr, val)
+ setattr(cs.pds1, attr, val)
+ setattr(cs.pds2, attr, val)
+
+ cs.err_dist = "poisson"
+ if cs.variance is not None:
+ cs.err_dist = cs.pds1.err_dist = cs.pds2.err_dist = "gauss"
+
+ # Transform nphods1 in nphots for pds1, etc.
+ for attr, val in table.meta.items():
+ if attr.endswith("1"):
+ setattr(cs.pds1, attr[:-1], val)
+ if attr.endswith("2"):
+ setattr(cs.pds2, attr[:-1], val)
+
+ # I start from unnormalized, and I normalize after correcting for bad error values
+ P1noise = poisson_level(norm="none", meanrate=cs.countrate1, n_ph=cs.nphots1)
+ P2noise = poisson_level(norm="none", meanrate=cs.countrate2, n_ph=cs.nphots2)
+
+ dRe, dIm, _, _ = error_on_averaged_cross_spectrum(
+ cs.unnorm_power,
+ cs.pds1.unnorm_power,
+ cs.pds2.unnorm_power,
+ cs.m,
+ P1noise,
+ P2noise,
+ common_ref="False",
+ )
+
+ bad = np.isnan(dRe) | np.isnan(dIm)
+
+ if np.any(bad):
+ warnings.warn(
+ "Some error bars in the Averaged Crossspectrum are invalid."
+ "Defaulting to sqrt(2 / M) in Leahy norm, rescaled to the appropriate norm."
+ )
+
+ Nph = np.sqrt(cs.nphots1 * cs.nphots2)
+ default_err = np.sqrt(2 / cs.m) * Nph / 2
+
+ dRe[bad] = default_err
+ dIm[bad] = default_err
+
+ power_err = dRe + 1.0j * dIm
+
+ cs.unnorm_power_err = power_err
+
+ mean = table.meta["mean"]
+ nph = table.meta["nphots"]
+ cs.power_err = normalize_periodograms(
+ power_err, cs.dt, cs.n, mean, n_ph=nph, variance=cs.variance, norm=cs.norm
+ )
+
+ cs.pds1.power_err = cs.pds1.power / np.sqrt(cs.pds1.m)
+ cs.pds2.power_err = cs.pds2.power / np.sqrt(cs.pds2.m)
+ cs.pds1.unnorm_power_err = cs.pds1.unnorm_power / np.sqrt(cs.pds1.m)
+ cs.pds2.unnorm_power_err = cs.pds2.unnorm_power / np.sqrt(cs.pds2.m)
+
+ assert hasattr(cs, "df")
+ assert hasattr(cs, "dt")
+ return cs
+