Skip to content

Commit

Permalink
Added implementation for AprilTag Detection and camera position estim…
Browse files Browse the repository at this point in the history
…ation
  • Loading branch information
JGHartel committed Dec 10, 2024
1 parent 1179368 commit 101f98b
Show file tree
Hide file tree
Showing 11 changed files with 15,197 additions and 3 deletions.
2 changes: 2 additions & 0 deletions pyneon/preprocess/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .preprocess import interpolate, window_average, concat_streams, concat_events
from .epoch import create_epoch, extract_event_times, construct_event_times, Epoch
from .filter import smooth_camera_positions

__all__ = [
"interpolate",
Expand All @@ -10,4 +11,5 @@
"extract_event_times",
"construct_event_times",
"Epoch",
"smooth_camera_positions",
]
99 changes: 99 additions & 0 deletions pyneon/preprocess/filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import numpy as np
import pandas as pd

from typing import Optional


def smooth_camera_positions(
camera_position_raw: pd.DataFrame,
state_dim: int = 3,
meas_dim: int = 3,
process_noise: float = 0.005,
measurement_noise: float = 0.005,
gating_threshold: float = 3.0
) -> pd.DataFrame:
"""
Apply a Kalman filter to smooth camera positions and gate outliers based on Mahalanobis distance.
Expects a DataFrame containing 'frame_idx' and 'camera_pos' columns, where 'camera_pos' is a
length-3 array-like object representing [x, y, z] coordinates.
Parameters
----------
camera_position_raw : pd.DataFrame
DataFrame containing 'frame_idx' and 'camera_pos' columns.
state_dim : int, optional
Dimensionality of the state vector. Default is 3 (x, y, z).
meas_dim : int, optional
Dimensionality of the measurement vector. Default is 3 (x, y, z).
process_noise : float, optional
Process noise covariance scaling factor. Default is 0.005.
measurement_noise : float, optional
Measurement noise covariance scaling factor. Default is 0.005.
gating_threshold : float, optional
Mahalanobis distance threshold for gating outliers. Default is 3.0.
Returns
-------
pd.DataFrame
A DataFrame with the same 'frame_idx' as input and an additional column 'smoothed_camera_pos'
containing the smoothed positions.
"""
# Ensure the DataFrame is sorted by frame_idx
camera_position_raw = camera_position_raw.sort_values('frame_idx')

# Extract positions and frame indices
positions = np.stack(camera_position_raw['camera_pos'].values)
frame_indices = camera_position_raw['frame_idx'].values

# Define Kalman filter matrices
F = np.eye(state_dim) # State transition: Identity
H = np.eye(meas_dim) # Measurement matrix: Identity

Q = process_noise * np.eye(state_dim) # Process noise covariance
R = measurement_noise * np.eye(meas_dim) # Measurement noise covariance

# Initial state from first measurement
x = positions[0].reshape(-1, 1)
P = 0.1 * np.eye(state_dim)

smoothed_positions = [x.flatten()]

for i in range(1, len(positions)):
# Prediction
x = F @ x
P = F @ P @ F.T + Q

# Measurement
z = positions[i].reshape(-1, 1)

# Compute innovation (residual) and innovation covariance
y = z - H @ x
S = H @ P @ H.T + R

# Mahalanobis distance for gating
d = np.sqrt((y.T @ np.linalg.inv(S) @ y).item())

if d < gating_threshold:
# Update step
K = P @ H.T @ np.linalg.inv(S)
x = x + K @ y
P = (np.eye(state_dim) - K @ H) @ P
else:
# Outlier detected, skip update
# (You could log or count occurrences if needed)
pass

smoothed_positions.append(x.flatten())

smoothed_positions = np.array(smoothed_positions)

# Create a new DataFrame with smoothed results
smoothed_df = pd.DataFrame({
'frame_idx': frame_indices,
'smoothed_camera_pos': list(smoothed_positions)
})

final_results = camera_position_raw.copy()
final_results['smoothed_camera_pos'] = smoothed_df['smoothed_camera_pos'].values

return final_results
146 changes: 144 additions & 2 deletions pyneon/recording.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pathlib import Path
from typing import Union, Literal, Optional
from typing import Union, Literal, Optional, Dict, List
import pandas as pd
import numpy as np
import json
from datetime import datetime
import copy
Expand All @@ -10,11 +11,13 @@

from .stream import NeonGaze, NeonIMU, NeonEyeStates, CustomStream
from .events import NeonBlinks, NeonFixations, NeonSaccades, NeonEvents
from .preprocess import concat_streams, concat_events
from .preprocess import concat_streams, concat_events, smooth_camera_positions
from .video import (
NeonVideo,
sync_gaze_to_video,
estimate_scanpath,
detect_apriltags,
compute_camera_positions,
)
from .vis import plot_distribution, plot_scanpath_on_video
from .export import export_motion_bids, export_eye_bids
Expand Down Expand Up @@ -463,6 +466,145 @@ def estimate_scanpath(
if (video := self.video) is None:
raise ValueError("Estimating scanpath requires video data.")
return estimate_scanpath(video, sync_gaze, lk_params)

def detect_apriltags(
self,
tag_family: str ='tag36h11'
) -> pd.DataFrame:
"""
Detect AprilTags in a video and report their data for every frame using the apriltag library.
Parameters
----------
tag_family : str, optional
The AprilTag family to detect (default is 'tag36h11').
Returns
-------
pd.DataFrame
A DataFrame containing AprilTag detections, with columns:
- 'frame_idx': The frame number
- 'tag_id': The ID of the detected AprilTag
- 'corners': A 4x2 array of the tag corner coordinates
- 'center': A 1x2 array with the tag center coordinates
"""
# Check if JSON already exists
if (json_file := self.recording_dir / "apriltags.json").is_file():
return pd.read_json(json_file, orient="records", lines=True)

all_detections = detect_apriltags(self.video, tag_family)
# Save to JSON
all_detections.to_json(self.recording_dir / "apriltags.json", orient="records", lines=True)

return all_detections


def compute_camera_positions(
self,
tag_locations: Dict[int, List[float]],
tag_size: Union[float, Dict[int, float]],
all_detections: pd.DataFrame = pd.DataFrame(),
) -> pd.DataFrame:
"""
Compute the camera positions from AprilTag detections in a video.
Parameters
----------
tag_locations : dict
A dictionary mapping AprilTag IDs to their 3D locations in the scene.
tag_size : float or dict
The size of the AprilTags in the scene. If a float, all tags are assumed to have the same size.
If a dictionary, the keys are the AprilTag IDs and the values are the sizes.
all_detections : pd.DataFrame, optional
DataFrame containing the AprilTag detections for each frame, with columns:
- 'frame_idx': The frame number
- 'tag_id': The ID of the detected AprilTag
- 'corners': A 4x2 array of the tag corner coordinates
- 'center': A 1x2 array with the tag center coordinates
If None, the detections are computed using the `detect_apriltags` function.
Returns
-------
pd.DataFrame
DataFrame containing the camera positions for each frame, with columns:
- 'frame_idx': The frame number
- 'position': A 1x3 array with the camera position
- 'rotation': A 1x3 array with the camera rotation
"""

# Check if JSON already exists
if (json_file := self.recording_dir / "camera_positions.json").is_file():
return pd.read_json(json_file, orient="records")

camera_positions = compute_camera_positions(self.video, tag_locations, tag_size, all_detections)
# Save to JSON
camera_positions.to_json(self.recording_dir / "camera_positions.json", orient="records")

return camera_positions


def smooth_camera_positions(
self,
camera_position_raw: pd.DataFrame = pd.DataFrame(),
state_dim: int = 3,
meas_dim: int = 3,
process_noise: float = 0.005,
measurement_noise: float = 0.005,
gating_threshold: float = 3.0
) -> pd.DataFrame:
"""
Apply a Kalman filter to smooth camera positions and gate outliers based on Mahalanobis distance.
Expects a DataFrame containing 'frame_idx' and 'camera_pos' columns, where 'camera_pos' is a
length-3 array-like object representing [x, y, z] coordinates.
Parameters
----------
camera_position_raw : pd.DataFrame
DataFrame containing 'frame_idx' and 'camera_pos' columns.
state_dim : int, optional
Dimensionality of the state vector. Default is 3 (x, y, z).
meas_dim : int, optional
Dimensionality of the measurement vector. Default is 3 (x, y, z).
process_noise : float, optional
Process noise covariance scaling factor. Default is 0.005.
measurement_noise : float, optional
Measurement noise covariance scaling factor. Default is 0.005.
gating_threshold : float, optional
Mahalanobis distance threshold for gating outliers. Default is 3.0.
Returns
-------
pd.DataFrame
A DataFrame with the same 'frame_idx' as input and an additional column 'smoothed_camera_pos'
containing the smoothed positions.
"""

if camera_position_raw.empty:
# Check if JSON already exists
if (json_file := self.recording_dir / "camera_positions.json").is_file():
camera_position_raw = pd.read_json(json_file, orient="records")
# Ensure 'camera_pos' is parsed as NumPy arrays
camera_position_raw['camera_pos'] = camera_position_raw['camera_pos'].apply(
lambda pos: np.array(pos, dtype=float)
)
else:
# Run the function to get the data
camera_position_raw = self.compute_camera_positions()

smoothed_positions = smooth_camera_positions(
camera_position_raw,
state_dim,
meas_dim,
process_noise,
measurement_noise,
gating_threshold
)

# Save to JSON
smoothed_positions.to_json(self.recording_dir / "camera_positions.json", orient="records")

return smoothed_positions


def plot_scanpath_on_video(
self,
Expand Down
3 changes: 3 additions & 0 deletions pyneon/video/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from .video import NeonVideo
from .mapping import sync_gaze_to_video, estimate_scanpath
from .apriltags import detect_apriltags, compute_camera_positions

__all__ = [
"NeonVideo",
"sync_gaze_to_video",
"estimate_scanpath",
"detect_apriltags",
"compute_camera_positions",
]
Loading

0 comments on commit 101f98b

Please sign in to comment.