make up for tabular data part 1
qian-chu committed Oct 6, 2024
1 parent: 6592008 · commit: efd2065
Showing 10 changed files with 421 additions and 334 deletions.
17 changes: 13 additions & 4 deletions pyneon/events.py
@@ -1,4 +1,4 @@
-from .data import NeonTabular
+from .tabular import NeonTabular
 import pandas as pd


@@ -20,10 +20,11 @@ class NeonBlinks(NeonEV):
 
     def __init__(self, file):
         super().__init__(file)
+        if self.data.index.name != "start timestamp [ns]":
+            raise ValueError("Blink data should be indexed by `start timestamp [ns]`.")
         self.data = self.data.astype(
             {
                 "blink id": "Int32",
-                "start timestamp [ns]": "Int64",
                 "end timestamp [ns]": "Int64",
                 "duration [ms]": "Int64",
             }
@@ -35,10 +36,13 @@ class NeonFixations(NeonEV):
 
     def __init__(self, file):
         super().__init__(file)
+        if self.data.index.name != "start timestamp [ns]":
+            raise ValueError(
+                "Fixation data should be indexed by `start timestamp [ns]`."
+            )
         self.data = self.data.astype(
             {
                 "fixation id": "Int32",
-                "start timestamp [ns]": "Int64",
                 "end timestamp [ns]": "Int64",
                 "duration [ms]": "Int64",
                 "fixation x [px]": float,
@@ -54,6 +58,10 @@ class NeonSaccades(NeonEV):
 
     def __init__(self, file):
         super().__init__(file)
+        if self.data.index.name != "start timestamp [ns]":
+            raise ValueError(
+                "Saccade data should be indexed by `start timestamp [ns]`."
+            )
         self.data = self.data.astype(
             {
                 "saccade id": "Int32",
@@ -72,9 +80,10 @@ class NeonEvents(NeonEV):
 
     def __init__(self, file):
         super().__init__(file)
+        if self.data.index.name != "timestamp [ns]":
+            raise ValueError("Event data should be indexed by `timestamp [ns]`.")
         self.data = self.data.astype(
             {
-                "timestamp [ns]": "Int64",
                 "name": str,
                 "type": str,
             }
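The guard-then-cast pattern is the same across these event classes: verify the expected index name, then cast to pandas nullable integer dtypes. A minimal sketch of the blink case, assuming a hypothetical CSV export path (the column names follow the ones in the diff):

import pandas as pd

# Hypothetical export; "blinks.csv" stands in for a real Neon blink file.
df = pd.read_csv("blinks.csv", index_col="start timestamp [ns]")

# Mirrors the check added in NeonBlinks.__init__
if df.index.name != "start timestamp [ns]":
    raise ValueError("Blink data should be indexed by `start timestamp [ns]`.")

# Nullable dtypes ("Int32"/"Int64") keep integer columns integer even with gaps.
df = df.astype(
    {"blink id": "Int32", "end timestamp [ns]": "Int64", "duration [ms]": "Int64"}
)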
5 changes: 2 additions & 3 deletions pyneon/preprocess/__init__.py
@@ -1,13 +1,12 @@
-from .preprocess import crop, interpolate, concat_streams, concat_events, window_average
+from .preprocess import interpolate, window_average, concat_streams, concat_events
 from .mapping import map_gaze_to_video, estimate_scanpath, overlay_scanpath_on_video
 from .epoch import create_epoch, extract_event_times, construct_event_times, Epoch
 
 __all__ = [
-    "crop",
     "interpolate",
+    "window_average",
     "concat_streams",
     "concat_events",
-    "window_average",
     "map_gaze_to_video",
     "estimate_scanpath",
     "overlay_scanpath_on_video",
177 changes: 77 additions & 100 deletions pyneon/preprocess/preprocess.py
@@ -1,7 +1,7 @@
 import pandas as pd
 import numpy as np
 
-from typing import TYPE_CHECKING, Union, Literal
+from typing import TYPE_CHECKING, Union, Optional
 from scipy.interpolate import interp1d
 
 from numbers import Number
@@ -10,49 +10,14 @@
     from ..recording import NeonRecording
 
 
-def _check_data(data: pd.DataFrame, t_col_name: str = "timestamp [ns]") -> None:
-    if t_col_name not in data.columns:
-        raise ValueError(f"Data must contain a {t_col_name} column")
-    if np.any(np.diff(data[t_col_name]) < 0):
-        raise ValueError(f"{t_col_name} must be monotonically increasing")
-
-
-def crop(
-    data: pd.DataFrame,
-    tmin: Union[Number, None] = None,
-    tmax: Union[Number, None] = None,
-    by: Literal["timestamp", "time"] = "timestamp",
-) -> pd.DataFrame:
-    """
-    Crop data to a specific time range.
-
-    Parameters
-    ----------
-    data : pd.DataFrame
-        Data to crop. Must contain a monotonically increasing
-        ``timestamp [ns]`` or ``time [s]`` column.
-    tmin : number, optional
-        Start time or timestamp to crop the data to. If ``None``,
-        the minimum timestamp or time in the data is used. Defaults to ``None``.
-    tmax : number, optional
-        End time or timestamp to crop the data to. If ``None``,
-        the maximum timestamp or time in the data is used. Defaults to ``None``.
-    by : "timestamp" or "time", optional
-        Whether tmin and tmax are UTC timestamps in nanoseconds
-        or relative times in seconds. Defaults to "timestamp".
-
-    Returns
-    -------
-    pd.DataFrame
-        Cropped data.
-    """
-    if tmin is None and tmax is None:
-        raise ValueError("At least one of tmin or tmax must be provided")
-    t_col_name = "timestamp [ns]" if by == "timestamp" else "time [s]"
-    _check_data(data, t_col_name=t_col_name)
-    tmin = tmin if tmin is not None else data[t_col_name].min()
-    tmax = tmax if tmax is not None else data[t_col_name].max()
-    return data[(data[t_col_name] >= tmin) & (data[t_col_name] <= tmax)]
+# Data must be from a NeonStream object
+def _check_data(data: pd.DataFrame) -> None:
+    # Check if index name is timestamp [ns]
+    if data.index.name != "timestamp [ns]":
+        raise ValueError("Index name must be 'timestamp [ns]'")
+    # Check if index is sorted
+    if not data.index.is_monotonic_increasing:
+        raise ValueError("Index must be sorted in increasing order")
 
 
 def interpolate(
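For illustration, assuming the `_check_data` above is in scope, a frame with a sorted index named `timestamp [ns]` passes, while a misnamed index raises (constructed data, not a real recording):

import numpy as np
import pandas as pd

ts = np.array([0, 5_000_000, 10_000_000], dtype=np.int64)
good = pd.DataFrame(
    {"gaze x [px]": [0.1, 0.2, 0.3]},
    index=pd.Index(ts, name="timestamp [ns]"),
)
_check_data(good)  # passes silently

bad = good.rename_axis("time")
_check_data(bad)  # raises ValueError: Index name must be 'timestamp [ns]'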
@@ -66,43 +31,40 @@ def interpolate(
     Parameters
     ----------
-    new_ts : np.ndarray, optional
-        New timestamps to evaluate the interpolant at.
+    new_ts : np.ndarray
+        An array of new timestamps (in nanoseconds)
+        at which to evaluate the interpolant.
     data : pd.DataFrame
-        Data to interpolate. Must contain a monotonically increasing
-        ``timestamp [ns]`` column.
+        Data to interpolate. Must have a monotonically increasing
+        index named ``timestamp [ns]``.
     float_kind : str, optional
         Kind of interpolation applied on columns of float type,
-        by default "linear". For details see :class:`scipy.interpolate.interp1d`.
+        by default ``"linear"``. For details see :class:`scipy.interpolate.interp1d`.
     other_kind : str, optional
         Kind of interpolation applied on columns of other types,
-        by default "nearest".
+        by default ``"nearest"``. For details see :class:`scipy.interpolate.interp1d`.
 
     Returns
     -------
     pandas.DataFrame
         Interpolated data.
     """
     _check_data(data)
-    new_ts = np.sort(new_ts)
-    new_data = pd.DataFrame(data=new_ts, columns=["timestamp [ns]"], dtype="Int64")
-    new_data["time [s]"] = (new_ts - new_ts[0]) / 1e9
+    new_ts = np.sort(new_ts).astype(np.int64)
+    new_data = pd.DataFrame(index=new_ts, columns=data.columns)
     for col in data.columns:
-        # Skip time columns
-        if col == "timestamp [ns]" or col == "time [s]":
-            continue
         # Float columns are interpolated with float_kind
         if pd.api.types.is_float_dtype(data[col]):
             new_data[col] = interp1d(
-                data["timestamp [ns]"],
+                data.index,
                 data[col],
                 kind=float_kind,
                 bounds_error=False,
             )(new_ts)
         # Other columns are interpolated with other_kind
         else:
             new_data[col] = interp1d(
-                data["timestamp [ns]"],
+                data.index,
                 data[col],
                 kind=other_kind,
                 bounds_error=False,
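As a usage sketch of the rewritten function (synthetic ~200 Hz data resampled onto a 100 Hz grid; the column name and rates are made up for the example):

import numpy as np
import pandas as pd

old_ts = np.arange(0, 1_000_000_000, 5_000_000, dtype=np.int64)  # ~200 Hz over 1 s
data = pd.DataFrame(
    {"gaze x [px]": np.random.rand(old_ts.size)},
    index=pd.Index(old_ts, name="timestamp [ns]"),
)

new_ts = np.arange(0, 1_000_000_000, 10_000_000, dtype=np.int64)  # 100 Hz grid
resampled = interpolate(new_ts, data)  # float columns use "linear" by default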
@@ -115,24 +77,30 @@ def interpolate(
 def window_average(
     new_ts: np.ndarray,
     data: pd.DataFrame,
-    window_size: Union[int, None] = None,
+    window_size: Optional[int] = None,
 ) -> pd.DataFrame:
     """
     Take the average over a time window to obtain smoothed data at new timestamps.
 
     Parameters
     ----------
     new_ts : np.ndarray
-        New timestamps to evaluate the window average at. The median of the differences
-        between the new timestamps must be larger than the median of the differences
-        between the old timestamps. In other words, only downsampling is supported.
+        An array of new timestamps (in nanoseconds)
+        at which to compute the windowed averages.
+        The median interval between these new timestamps must be larger than
+        the median interval between the original data timestamps, i.e.,
+        ``np.median(np.diff(new_ts)) > np.median(np.diff(data.index))``.
+        In other words, only downsampling is supported.
     data : pd.DataFrame
-        Data to apply window average to. Must contain a monotonically increasing
-        ``timestamp [ns]`` column.
+        Data to apply window average to. Must have a monotonically increasing
+        index named ``timestamp [ns]``.
     window_size : int, optional
-        Size of the time window in nanoseconds. If ``None``, the window size is
-        set to the median of the differences between the new timestamps.
-        Defaults to ``None``.
+        The size of the time window (in nanoseconds)
+        over which to compute the average around each new timestamp.
+        If ``None`` (default), the window size is set to the median interval
+        between the new timestamps, i.e., ``np.median(np.diff(new_ts))``.
+        The window size must be larger than the median interval between the original data timestamps,
+        i.e., ``window_size > np.median(np.diff(data.index))``.
 
     Returns
     -------
@@ -142,36 +110,49 @@ def window_average(
     _check_data(data)
     new_ts = np.sort(new_ts)
     new_ts_median_diff = np.median(np.diff(new_ts))
-    # Assert that the new_ts has a lower sampling frequency than the old data
-    if new_ts_median_diff < np.mean(np.diff(data["timestamp [ns]"])):
+    original_ts_median_diff = np.median(np.diff(data.index))
+
+    # Check for downsampling
+    if new_ts_median_diff < original_ts_median_diff:
         raise ValueError(
             "new_ts must have a lower sampling frequency than the old data"
         )
 
     if window_size is None:
         window_size = int(new_ts_median_diff)
-    new_data = pd.DataFrame(data=new_ts, columns=["timestamp [ns]"], dtype="Int64")
-    new_data["time [s]"] = (new_ts - new_ts[0]) / 1e9
+
+    if window_size < original_ts_median_diff:
+        raise ValueError(
+            "Window size must be larger than the median difference "
+            "between the old timestamps"
+        )
+
+    new_data = pd.DataFrame(index=new_ts, columns=data.columns)
+
+    for ts in new_ts:
+        lower_bound = ts - window_size / 2
+        upper_bound = ts + window_size / 2
+
+        # Select data within the window
+        window_data = data[(data.index >= lower_bound) & (data.index <= upper_bound)]
+        if not window_data.empty:
+            # Compute mean for each column
+            mean_values = window_data.mean(axis=0)
+            # Assign mean values to new_data at index ts
+            new_data.loc[ts, :] = mean_values
+        else:
+            # Assign NaN to new_data at index ts
+            new_data.loc[ts, :] = np.nan
+
+    # Reset the dtypes of new_data columns to match those of the original data
     for col in data.columns:
-        # Skip time columns
-        if col == "timestamp [ns]" or col == "time [s]":
-            continue
-        new_values = []  # List to store the downsampled values
-        for ts in new_ts:
-            # Define the time window around the current new timestamp
-            lower_bound = ts - window_size / 2
-            upper_bound = ts + window_size / 2
-            # Select rows from old_data that fall within the time window
-            window_data = data[
-                (data["timestamp [ns]"] >= lower_bound)
-                & (data["timestamp [ns]"] <= upper_bound)
-            ]
-            # Append data within this window
-            if not window_data.empty:
-                new_values.append(window_data[col].mean())
-            else:
-                new_values.append(np.nan)
-        # Assign the downsampled values to the new DataFrame
-        new_data[col] = new_values
+        original_dtype = data[col].dtype
+        if pd.api.types.is_integer_dtype(original_dtype):
+            # For integer columns, round the mean values and convert to integers
+            new_data[col] = new_data[col].round().astype(original_dtype)
+        else:
+            # For other dtypes, cast to the original dtype if possible
+            new_data[col] = new_data[col].astype(original_dtype)
 
     return new_data
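A downsampling sketch under the same assumptions as the interpolate example (synthetic 200 Hz data averaged onto a 20 Hz grid; with `window_size=None` the window falls back to the 50 ms spacing of `new_ts`):

import numpy as np
import pandas as pd

old_ts = np.arange(0, 1_000_000_000, 5_000_000, dtype=np.int64)  # 200 Hz over 1 s
data = pd.DataFrame(
    {"pupil diameter [mm]": np.random.rand(old_ts.size)},
    index=pd.Index(old_ts, name="timestamp [ns]"),
)

new_ts = np.arange(0, 1_000_000_000, 50_000_000, dtype=np.int64)  # 20 Hz grid
smoothed = window_average(new_ts, data)  # ~10 samples averaged per window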

@@ -205,10 +186,9 @@ def concat_streams(
     sampling_freq : float or int or str, optional
         Sampling frequency of the concatenated streams.
         If numeric, the streams will be interpolated to this frequency.
-        If ``"min"``, the lowest nominal sampling frequency
+        If ``"min"`` (default), the lowest nominal sampling frequency
         of the selected streams will be used.
         If ``"max"``, the highest nominal sampling frequency will be used.
-        Defaults to ``"min"``.
     interp_float_kind : str, optional
         Kind of interpolation applied on columns of float type,
         Defaults to ``"linear"``. For details see :class:`scipy.interpolate.interp1d`.
@@ -339,17 +319,14 @@ def concat_streams(
             dtype=np.int64,
         )
 
-    concat_data = pd.DataFrame(data=new_ts, columns=["timestamp [ns]"], dtype="Int64")
-    concat_data["time [s]"] = (new_ts - new_ts[0]) / 1e9
+    concat_data = pd.DataFrame(index=new_ts)
     for stream in stream_info["stream"]:
         resamp_df = stream.interpolate(
             new_ts, interp_float_kind, interp_other_kind, inplace=inplace
-        )
+        ).data
         assert concat_data.shape[0] == resamp_df.shape[0]
-        assert concat_data["timestamp [ns]"].equals(resamp_df["timestamp [ns]"])
-        concat_data = pd.merge(
-            concat_data, resamp_df, on=["timestamp [ns]", "time [s]"], how="inner"
-        )
+        assert concat_data.index.equals(resamp_df.index)
+        concat_data = pd.concat([concat_data, resamp_df], axis=1)
         assert concat_data.shape[0] == resamp_df.shape[0]
     return concat_data
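The full `concat_streams` signature sits outside this diff, so the call below is only indicative: `rec` is assumed to be a NeonRecording, and the argument names other than `sampling_freq` are assumptions.

# Hypothetical usage; the stream names are placeholders.
concat_data = concat_streams(rec, stream_names=["gaze", "eye_states"], sampling_freq="min")
print(concat_data.head())  # one row per shared timestamp, columns from all streams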

