Skip to content

Commit

Permalink
Merge pull request #111 from watsonjj/thread_safe
Browse files Browse the repository at this point in the history
Alter TIOReader to be thread safe
  • Loading branch information
watsonjj authored Aug 12, 2019
2 parents b5f1010 + c0f0753 commit d7fa959
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 112 deletions.
4 changes: 2 additions & 2 deletions CHECLabPy/calib/amplitude_matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ def _extract_amplitude_array(self, r0_path):
n_pixels = reader.n_pixels
amplitude = np.zeros((n_events, n_pixels))
for wfs in reader:
iev = reader.index
fci = reader.first_cell_ids
iev = wfs.iev
fci = wfs.first_cell_id
amplitude[iev] = self._extract_amplitude(wfs, fci)
return amplitude

Expand Down
7 changes: 4 additions & 3 deletions CHECLabPy/calib/waveform.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ def __init__(self, pedestal_path, n_pixels, n_samples, sn_list=None):
else:
from target_calib import CalibratorMultiFile
self.calibrator = CalibratorMultiFile(pedestal_path, sn_list)
self.calibrated_wfs = np.zeros((n_pixels, n_samples), dtype=np.float32)
self.n_pixels = n_pixels
self.n_samples = n_samples

def __call__(self, waveforms, fci):
self.calibrator.ApplyEvent(waveforms, fci, self.calibrated_wfs)
return self.calibrated_wfs
calibrated_wfs = waveforms.astype(np.float32, copy=True)
self.calibrator.ApplyEvent(waveforms, fci, calibrated_wfs)
calibrated_wfs.r1 = True
return calibrated_wfs

@classmethod
def from_tio_reader(cls, pedestal_path, tio_reader):
Expand Down
61 changes: 33 additions & 28 deletions CHECLabPy/core/io/simtel.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from CHECLabPy.core.io.waveform import WaveformReader
from CHECLabPy.core.io.waveform import WaveformReader, Waveform
from CHECLabPy.utils.mapping import get_clp_mapping_from_tc_mapping, \
get_row_column
import numpy as np
Expand All @@ -7,6 +7,12 @@
import gzip


class SimtelWaveform(Waveform):
@property
def t_cpu(self):
return self._t_cpu_container


class SimtelReader(WaveformReader):
def __init__(self, path, max_events=None):
"""
Expand Down Expand Up @@ -48,7 +54,6 @@ def __init__(self, path, max_events=None):
shape = first_event.r0.tel[self.tel].waveform.shape
_, self.n_pixels, self.n_samples = shape
self.n_modules = self.n_pixels // 64
self.index = 0

n_modules = 32
camera_version = "1.1.0"
Expand All @@ -70,18 +75,32 @@ def __init__(self, path, max_events=None):
self.reference_pulse_path = self._camera_config.GetReferencePulsePath()
self.camera_version = self._camera_config.GetVersion()

self.gps_time = None
self.mc_true = None
self._iev = None
self._t_cpu = None
self.mc = None
self.pointing = None
self.mcheader = None

def _build_waveform(self, event):
self._fill_event_containers(event)
samples = event.r1.tel[self.tel].waveform[0][self.pixel_order]
mc_true = event.mc.tel[self.tel].photo_electron_image[self.pixel_order]
waveform = SimtelWaveform(
samples,
iev=self._iev,
is_r1=True,
mc_true=mc_true,
t_cpu_container=self._t_cpu
)
return waveform

def _get_event(self, iev):
event = self.seeker[iev]
self._fill_event_containers(event)
waveforms = event.r1.tel[self.tel].waveform[0]
waveforms = waveforms[self.pixel_order]
return waveforms
return self._build_waveform(event)

def __iter__(self):
for event in self.seeker:
yield self._build_waveform(event)

@staticmethod
def is_compatible(path):
Expand All @@ -98,24 +117,14 @@ def is_compatible(path):
int_marker, = struct.unpack('I', marker_bytes)
return int_marker == 3558836791 or int_marker == 931798996

def __iter__(self):
for event in self.seeker:
self._fill_event_containers(event)
waveforms = event.r1.tel[self.tel].waveform[0]
waveforms = waveforms[self.pixel_order]
yield waveforms

def _fill_event_containers(self, event):
self.index = event.count
self._iev = event.count
self._t_cpu = pd.to_datetime(event.trig.gps_time.value, unit='s')
self.run_id = event.r0.obs_id
self.gps_time = event.trig.gps_time

self.mc_true = event.mc.tel[self.tel].photo_electron_image
self.mc_true = self.mc_true[self.pixel_order]

self.mc = dict(
iev=self.index,
t_cpu=self.t_cpu,
iev=self._iev,
t_cpu=self._t_cpu,
energy=event.mc.energy.value,
alt=event.mc.alt.value,
az=event.mc.az.value,
Expand All @@ -127,8 +136,8 @@ def _fill_event_containers(self, event):
)

self.pointing = dict(
iev=self.index,
t_cpu=self.t_cpu,
iev=self._iev,
t_cpu=self._t_cpu,
azimuth_raw=event.mc.tel[self.tel].azimuth_raw,
altitude_raw=event.mc.tel[self.tel].altitude_raw,
azimuth_cor=event.mc.tel[self.tel].azimuth_cor,
Expand Down Expand Up @@ -178,7 +187,3 @@ def _fill_event_containers(self, event):
@property
def n_events(self):
return len(self.seeker)

@property
def t_cpu(self):
return pd.to_datetime(self.gps_time.value, unit='s')
56 changes: 19 additions & 37 deletions CHECLabPy/core/io/tio.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import numpy as np
from astropy.io import fits
import warnings
from CHECLabPy.core.io.waveform import WaveformReader
from CHECLabPy.core.io.waveform import WaveformReader, Waveform
from CHECLabPy.utils.mapping import get_clp_mapping_from_tc_mapping
import pandas as pd
import numpy as np
import gzip


Expand Down Expand Up @@ -68,41 +66,32 @@ def __init__(self, path, max_events=None,
self.camera_version = self._camera_config.GetVersion()
self.reference_pulse_path = self._camera_config.GetReferencePulsePath()

self.current_tack = None
self.current_cpu_ns = None
self.current_cpu_s = None

self.first_cell_ids = np.zeros(self.n_pixels, dtype=np.uint16)
self.stale = np.zeros(self.n_pixels, dtype=np.uint8)

if self.is_r1:
self.samples = np.zeros((self.n_pixels, self.n_samples),
dtype=np.float32)
self.dtype = np.float32
self.get_tio_event = self._reader.GetR1Event
else:
self.samples = np.zeros((self.n_pixels, self.n_samples),
dtype=np.uint16)
self.dtype = np.uint16
self.get_tio_event = self._reader.GetR0Event

if max_events and max_events < self._n_events:
self._n_events = max_events

def _get_event(self, iev):
self.index = iev
try: # TODO: Remove try in future version
self.get_tio_event(iev, self.samples, self.first_cell_ids,
self.stale)
except TypeError:
warnings.warn(
"This call to WaveformArrayReader has been deprecated. "
"Please update TargetIO",
SyntaxWarning
)
self.get_tio_event(iev, self.samples, self.first_cell_ids)
self.current_tack = self._reader.fCurrentTimeTack
self.current_cpu_ns = self._reader.fCurrentTimeNs
self.current_cpu_s = self._reader.fCurrentTimeSec
return self.samples
samples = np.zeros((self.n_pixels, self.n_samples), self.dtype)
first_cell_id = np.zeros(self.n_pixels, dtype=np.uint16)
stale = np.zeros(self.n_pixels, dtype=np.uint8)
self.get_tio_event(iev, samples, first_cell_id, stale)
t_tack, t_cpu_s, t_cpu_ns = self._reader.GetTimestamps(iev)
waveform = Waveform(
input_array=samples,
iev=iev,
is_r1=self.is_r1,
first_cell_id=first_cell_id,
stale=stale,
t_tack=t_tack,
t_cpu_container=(t_cpu_s, t_cpu_ns),
)
return waveform

@staticmethod
def is_compatible(path):
Expand All @@ -129,13 +118,6 @@ def is_compatible(path):
def n_events(self):
return self._n_events

@property
def t_cpu(self):
return pd.to_datetime(
np.int64(self.current_cpu_s * 1E9) + np.int64(self.current_cpu_ns),
unit='ns'
)

@property
def mapping(self):
return get_clp_mapping_from_tc_mapping(self.tc_mapping)
Expand Down
57 changes: 44 additions & 13 deletions CHECLabPy/core/io/waveform.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,44 @@
import os
import numpy as np
import pandas as pd
from CHECLabPy.core import child_subclasses
from abc import ABC, abstractmethod


class Waveform(np.ndarray):
def __new__(cls, input_array, iev, is_r1=False,
first_cell_id=0, stale=None, t_tack=0,
t_cpu_container=0, mc_true=None):
obj = np.asarray(input_array).view(cls)
obj.iev = iev
obj.is_r1 = is_r1
obj.first_cell_id = first_cell_id
obj.stale = stale
obj.t_tack = t_tack
obj._t_cpu_container = t_cpu_container
obj.mc_true = mc_true
return obj

def __array_finalize__(self, obj):
if obj is None:
return
self.iev = getattr(obj, 'iev', None)
self.is_r1 = getattr(obj, 'is_r1', None)
self.first_cell_id = getattr(obj, 'first_cell_id', None)
self.stale = getattr(obj, 'stale', None)
self.t_tack = getattr(obj, 't_tack', None)
self._t_cpu_container = getattr(obj, '_t_cpu_container', None)
self.mc_true = getattr(obj, 'mc_true', None)

@property
def t_cpu(self):
t_cpu_s, t_cpu_ns = self._t_cpu_container
return pd.to_datetime(
np.int64(t_cpu_s * 1E9) + np.int64(t_cpu_ns),
unit='ns'
)


class WaveformReader(ABC):
"""
Base class for waveform-file readers to define some common interface,
Expand All @@ -24,29 +59,25 @@ def __init__(self, path, max_events=None):
self.camera_version = ''
self.reference_pulse_path = None

self.index = 0
self.current_tack = 0
self.first_cell_ids = 0
self.stale = np.array([0])

def __iter__(self):
for iev in range(self.n_events):
yield self._get_event(iev)

def __getitem__(self, iev):
if isinstance(iev, slice):
if isinstance(iev, int):
if iev < 0:
iev += self.n_events
if iev < 0 or iev >= len(self):
raise IndexError(
"The requested event ({}) is out of range".format(iev)
)
return self._get_event(iev)
elif isinstance(iev, slice):
ev_list = [self[ii] for ii in range(*iev.indices(self.n_events))]
return np.array(ev_list)
elif isinstance(iev, list):
ev_list = [self[ii] for ii in iev]
return np.array(ev_list)
elif isinstance(iev, int):
if iev < 0:
iev += self.n_events
if iev < 0 or iev >= len(self):
raise IndexError("The requested event ({}) is out of range"
.format(iev))
return np.copy(self._get_event(iev))
else:
raise TypeError("Invalid argument type")

Expand Down
2 changes: 1 addition & 1 deletion CHECLabPy/core/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def test_readerr1_getitem():
reader = ReaderR1(get_file("chec_r1.tio"))
event = reader[1]
assert event.shape == (reader.n_pixels, reader.n_samples)
assert reader.index == 1
assert event.iev == 1


def test_readerr1_single_module():
Expand Down
21 changes: 12 additions & 9 deletions CHECLabPy/scripts/extract_dl1.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,14 @@ def main():
n_events_stale = 0
desc = "Processing events"
for waveforms in tqdm(reader, total=n_events, desc=desc):
iev = reader.index
t_cpu = reader.t_cpu

stale = reader.stale.any()
if stale:
iev = waveforms.iev
t_tack = waveforms.t_tack
t_cpu = waveforms.t_cpu
stale = waveforms.stale
first_cell_id = waveforms.first_cell_id
mc_true = waveforms.mc_true

if stale is not None and stale.any():
n_events_stale += 1
continue

Expand All @@ -117,13 +120,13 @@ def main():
iev=iev,
pixel=pixel_array,
t_cpu=t_cpu,
t_tack=reader.current_tack,
first_cell_id=reader.first_cell_ids,
t_tack=t_tack,
first_cell_id=first_cell_id,
baseline_subtracted=bs,
**chain.process(waveforms_bs),
)
if is_mc:
params['mc_true'] = reader.mc_true
if mc_true is not None:
params['mc_true'] = mc_true

writer.append(
pd.DataFrame(params), key='data',
Expand Down
6 changes: 3 additions & 3 deletions CHECLabPy/utils/waveform.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ def __init__(self, source, n_base=50, n_base_samples=16):

self.baseline_waveforms = np.zeros((n_base, n_pixels, n_base_samples))
for waveforms in source:
ev = source.index
if ev >= n_base:
iev = waveforms.iev
if iev >= n_base:
break
self.baseline_waveforms[ev] = waveforms[:, :n_base_samples]
self.baseline_waveforms[iev] = waveforms[:, :n_base_samples]
self.baseline = np.mean(self.baseline_waveforms, axis=(0, 2))

def update_baseline(self, waveforms):
Expand Down
2 changes: 1 addition & 1 deletion CHECLabPy/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '2.1.0'
__version__ = '3.0.0'
Loading

0 comments on commit d7fa959

Please sign in to comment.