Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming Timeseries #838

Closed
wants to merge 13 commits into from
1 change: 1 addition & 0 deletions docs/changes/838.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add methods to stream timeseries data into chunks
225 changes: 181 additions & 44 deletions stingray/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from astropy.units import Quantity
from stingray.loggingconfig import setup_logger

from .io import _can_save_longdouble, _can_serialize_meta
from .io import _can_save_longdouble, _can_serialize_meta, DEFAULT_FORMAT
from .utils import (
sqsum,
assign_value_if_none,
Expand All @@ -36,6 +36,7 @@
get_total_gti_length,
bin_intervals_from_gtis,
time_intervals_from_gtis,
split_gtis_by_exposure,
)
from typing import TYPE_CHECKING, Type, TypeVar, Union

Expand Down Expand Up @@ -1075,7 +1076,7 @@ def __getitem__(self, index):
for attr in self.meta_attrs():
setattr(new_ts, attr, copy.deepcopy(getattr(self, attr)))

for attr in self.array_attrs() + [self.main_array_attr]:
for attr in self.array_attrs() + self.internal_array_attrs() + [self.main_array_attr]:
setattr(new_ts, attr, getattr(self, attr)[start:stop:step])

return new_ts
Expand Down Expand Up @@ -1402,26 +1403,149 @@ def split_by_gti(self, gti=None, min_points=2):
if gti is None:
gti = self.gti

list_of_tss = []
slices = []

start_bins, stop_bins = gti_border_bins(gti, self.time, self.dt)
for i in range(len(start_bins)):
start = start_bins[i]
stop = stop_bins[i]

if (stop - start) < min_points:
for s in self.stream_from_gti_lists([[g] for g in gti]):
if np.size(getattr(s, s.main_array_attr)) < min_points:
continue
slices.append(s)
return slices

def get_idx_from_time_range(self, start, stop):
lower_edge, upper_edge = np.searchsorted(self.time, [start, stop])
# Searchsorted will find the first number above stop. We want the last number below stop!
return lower_edge, upper_edge - 1

def stream_from_gti_lists(
self, new_gti_lists, root_file_name=None, fmt=DEFAULT_FORMAT, only_attrs=None
):
"""Split the event list into different files, each with a different GTI.

Parameters
----------
new_gti_lists : list of lists
A list of lists of GTIs. Each sublist should contain a list of GTIs
for a new file.

Other Parameters
----------------
root_file_name : str, default None
The root name of the output files. The file name will be appended with
"_00", "_01", etc.
If None, a generator is returned instead of writing the files.
fmt : str
The format of the output files. Default is 'hdf5'.

Yields
------
output_files : list of str
A list of the output file names.

"""

if only_attrs is not None and root_file_name is not None:
raise ValueError("You can only use only_attrs with a generator.")
new_gti_lists = np.asanyarray(new_gti_lists)
if len(new_gti_lists[0]) == len(self.gti) and np.all(
np.abs(np.asanyarray(new_gti_lists[0]).flatten() - self.gti.flatten()) < 1e-3
):
logger.info("No change of GTI")
if only_attrs is not None:
yield [copy.deepcopy(getattr(self, attr)) for attr in only_attrs]
else:
ev = self[:]
if root_file_name is None:
yield ev
else:
output_file = root_file_name + f"_00." + fmt.lstrip(".")
ev.write(output_file, fmt=fmt)
yield output_file
else:
for i, gti in enumerate(new_gti_lists):
if len(gti) == 0:
continue

new_gti = np.array([gti[i]])
mask = create_gti_mask(self.time, new_gti)
lower_edge, upper_edge = self.get_idx_from_time_range(gti[0, 0], gti[-1, 1])

# Note: GTIs are consistent with default in this case!
new_ts = self.apply_mask(mask)
new_ts.gti = new_gti
if only_attrs is not None:
yield [
copy.deepcopy(getattr(self, attr)[lower_edge : upper_edge + 1])
for attr in only_attrs
]
else:
ev = self[lower_edge : upper_edge + 1]
ev.gti = gti

if root_file_name is not None:
new_file = root_file_name + f"_{i:002d}." + fmt.lstrip(".")
logger.info(f"Writing {new_file}")
ev.write(new_file, fmt=fmt)
yield new_file
else:
yield ev

def stream_by_number_of_samples(
self, nsamples, root_file_name=None, fmt=DEFAULT_FORMAT, only_attrs=None
):
"""Split the event list into different files, each with approx. the given no. of photons.

Parameters
----------
nsamples : int
The number of photons in each output file.

list_of_tss.append(new_ts)
Other Parameters
----------------
root_file_name : str, default None
The root name of the output files. The file name will be appended with
"_00", "_01", etc.
If None, a generator is returned instead of writing the files.
fmt : str
The format of the output files. Default is 'hdf5'.

Yields
------
output_files : list of str
A list of the output file names.
"""
n_intervals = int(np.rint(self.n / nsamples))
exposure_per_interval = self.exposure / n_intervals
new_gti_lists = split_gtis_by_exposure(self.gti, exposure_per_interval)

return self.stream_from_gti_lists(
new_gti_lists, root_file_name=root_file_name, fmt=fmt, only_attrs=only_attrs
)

return list_of_tss
def stream_from_time_intervals(
self, time_intervals, root_file_name=None, fmt=DEFAULT_FORMAT, only_attrs=None
):
"""Filter the event list at the given time intervals.

Parameters
----------
time_intervals : 2-d float array
List of time intervals of the form ``[[time0_0, time0_1], [time1_0, time1_1], ...]``

Other Parameters
----------------
root_file_name : str, default None
The root name of the output files. The file name will be appended with
"_00", "_01", etc.
If None, a generator is returned instead of writing the files.
fmt : str
The format of the output files. Default is 'hdf5'.

Yields
------
output_files : list of str
A list of the output file names.
"""
if len(np.shape(time_intervals)) == 1:
time_intervals = [time_intervals]
new_gti = [cross_two_gtis(self.gti, [t_int]) for t_int in time_intervals]
return self.stream_from_gti_lists(
new_gti, root_file_name=root_file_name, fmt=fmt, only_attrs=only_attrs
)

def to_astropy_timeseries(self) -> TimeSeries:
"""Save the ``StingrayTimeseries`` to an ``Astropy`` timeseries.
Expand Down Expand Up @@ -1707,9 +1831,13 @@ def __getitem__(self, index):
"""

new_ts = super().__getitem__(index)
step = 1

if isinstance(index, slice):
if index.start is None and index.stop is None and index.step is None:
return copy.deepcopy(new_ts)
step = assign_value_if_none(index.step, 1)
else:
step = 1

dt = self.dt
if np.isscalar(dt):
Expand Down Expand Up @@ -2579,6 +2707,36 @@ def estimate_segment_size(self, min_counts=None, min_samples=None, even_sampling

return segment_size

def get_segment_borders(self, segment_size=None, fraction_step=1):
"""Get the start and stop times of the segments for segment-by-segment analysis.

Parameters
----------
segment_size : float
Length in seconds of the light curve segments. If None, the full GTIs are considered
instead as segments.
fraction_step : float
If the step is not a full ``segment_size`` but less (e.g. a moving window),
this indicates the ratio between step step and ``segment_size`` (e.g.
0.5 means that the window shifts of half ``segment_size``)

Returns
-------
start_times : array
Lower time boundaries of all time segments.
stop_times : array
Upper time boundaries of all segments.

"""
if segment_size is None:
start_times = self.gti[:, 0]
stop_times = self.gti[:, 1]
else:
start_times, stop_times = time_intervals_from_gtis(
self.gti, segment_size, fraction_step=fraction_step
)
return start_times, stop_times

def analyze_segments(self, func, segment_size, fraction_step=1, **kwargs):
"""Analyze segments of the light curve with any function.

Expand Down Expand Up @@ -2631,40 +2789,19 @@ def analyze_segments(self, func, segment_size, fraction_step=1, **kwargs):
>>> np.allclose(res, 10)
True
"""

if segment_size is None:
start_times = self.gti[:, 0]
stop_times = self.gti[:, 1]
start = np.searchsorted(self.time, start_times)
stop = np.searchsorted(self.time, stop_times)
elif self.dt > 0:
start, stop = bin_intervals_from_gtis(
self.gti, segment_size, self.time, fraction_step=fraction_step, dt=self.dt
)
start_times = self.time[start] - 0.5 * self.dt
# Remember that stop is one element above the last element, because
# it's defined to be used in intervals start:stop
stop_times = self.time[stop - 1] + self.dt * 1.5
else:
start_times, stop_times = time_intervals_from_gtis(
self.gti, segment_size, fraction_step=fraction_step
)
start = np.searchsorted(self.time, start_times)
stop = np.searchsorted(self.time, stop_times)
start_times, stop_times = self.get_segment_borders(segment_size, fraction_step)

results = []

n_outs = 1
for i, (st, sp, tst, tsp) in enumerate(zip(start, stop, start_times, stop_times)):
if sp - st <= 1:
for i, lc_filt in enumerate(
self.stream_from_time_intervals(list(zip(start_times, stop_times)))
):
if lc_filt is None or len(lc_filt.time) <= 1:
warnings.warn(
f"Segment {i} ({tst}--{tsp}) has one data point or less. Skipping it "
f"Segment {i} ({start_times[i]}--{stop_times[i]}) has one data point or less. Skipping it "
)

continue
lc_filt = self[st:sp]
lc_filt.gti = np.asanyarray([[tst, tsp]])

res = func(lc_filt, **kwargs)
results.append(res)
if isinstance(res, Iterable) and not isinstance(res, str):
Expand Down
2 changes: 1 addition & 1 deletion stingray/fourier.py
Original file line number Diff line number Diff line change
Expand Up @@ -1824,7 +1824,7 @@ def local_show_progress(a):
ft1 = fft(flux1)
ft2 = fft(flux2)

# Calculate the sum of each light curve, to calculate the mean
# Calculate the sum of each light curve chunk, to calculate the mean
n_ph1 = flux1.sum()
n_ph2 = flux2.sum()
n_ph = np.sqrt(n_ph1 * n_ph2)
Expand Down
Loading
Loading