Skip to content

Commit

Permalink
added more advanced implementation for camera pose estimation
Browse files Browse the repository at this point in the history
  • Loading branch information
JGHartel committed Dec 17, 2024
1 parent 101f98b commit a362c14
Show file tree
Hide file tree
Showing 13 changed files with 221,203 additions and 394 deletions.
4 changes: 2 additions & 2 deletions pyneon/preprocess/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from .preprocess import interpolate, window_average, concat_streams, concat_events
from .epoch import create_epoch, extract_event_times, construct_event_times, Epoch
from .filter import smooth_camera_positions
from .filter import smooth_camera_pose

__all__ = [
"interpolate",
Expand All @@ -11,5 +11,5 @@
"extract_event_times",
"construct_event_times",
"Epoch",
"smooth_camera_positions",
"smooth_camera_pose",
]
123 changes: 71 additions & 52 deletions pyneon/preprocess/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@

from typing import Optional

import numpy as np
import pandas as pd

def smooth_camera_positions(
def smooth_camera_pose(
camera_position_raw: pd.DataFrame,
state_dim: int = 3,
meas_dim: int = 3,
initial_state_noise: float = 0.1,
process_noise: float = 0.005,
measurement_noise: float = 0.005,
gating_threshold: float = 3.0
gating_threshold: float = 3.0,
bidirectional: bool = False
) -> pd.DataFrame:
"""
Apply a Kalman filter to smooth camera positions and gate outliers based on Mahalanobis distance.
Expects a DataFrame containing 'frame_idx' and 'camera_pos' columns, where 'camera_pos' is a
length-3 array-like object representing [x, y, z] coordinates.
Apply a Kalman filter to smooth camera positions, with optional forward-backward smoothing (RTS smoother).
Handles missing measurements and propagates predictions.
Parameters
----------
Expand All @@ -25,75 +28,91 @@ def smooth_camera_positions(
Dimensionality of the state vector. Default is 3 (x, y, z).
meas_dim : int, optional
Dimensionality of the measurement vector. Default is 3 (x, y, z).
initial_state_noise : float, optional
Initial state covariance scaling factor. Default is 0.1.
process_noise : float, optional
Process noise covariance scaling factor. Default is 0.005.
measurement_noise : float, optional
Measurement noise covariance scaling factor. Default is 0.005.
gating_threshold : float, optional
Mahalanobis distance threshold for gating outliers. Default is 3.0.
bidirectional : bool, optional
If True, applies forward-backward RTS smoothing. Default is False.
Returns
-------
pd.DataFrame
A DataFrame with the same 'frame_idx' as input and an additional column 'smoothed_camera_pos'
containing the smoothed positions.
A DataFrame with 'frame_idx' and 'smoothed_camera_pos'.
"""
# Ensure the DataFrame is sorted by frame_idx
camera_position_raw = camera_position_raw.sort_values('frame_idx')

# Extract positions and frame indices
positions = np.stack(camera_position_raw['camera_pos'].values)
frame_indices = camera_position_raw['frame_idx'].values
# Extract all frame indices and create a complete range
all_frames = np.arange(camera_position_raw['frame_idx'].min(),
camera_position_raw['frame_idx'].max() + 1)

# Create a lookup for frame detections
position_lookup = dict(zip(camera_position_raw['frame_idx'], camera_position_raw['camera_pos']))

# Define Kalman filter matrices
# Kalman filter matrices
F = np.eye(state_dim) # State transition: Identity
H = np.eye(meas_dim) # Measurement matrix: Identity

Q = process_noise * np.eye(state_dim) # Process noise covariance
R = measurement_noise * np.eye(meas_dim) # Measurement noise covariance

# Initial state from first measurement
x = positions[0].reshape(-1, 1)
P = 0.1 * np.eye(state_dim)
# Forward pass storage
x_fwd = [] # Forward state estimates
P_fwd = [] # Forward covariances

smoothed_positions = [x.flatten()]

for i in range(1, len(positions)):
# Prediction
# Initialize
x = np.array(position_lookup[all_frames[0]]).reshape(-1, 1)
P = initial_state_noise * np.eye(state_dim)

for frame in all_frames:
# Prediction step
x = F @ x
P = F @ P @ F.T + Q

# Measurement
z = positions[i].reshape(-1, 1)

# Compute innovation (residual) and innovation covariance
y = z - H @ x
S = H @ P @ H.T + R

# Mahalanobis distance for gating
d = np.sqrt((y.T @ np.linalg.inv(S) @ y).item())

if d < gating_threshold:
# Update step
K = P @ H.T @ np.linalg.inv(S)
x = x + K @ y
P = (np.eye(state_dim) - K @ H) @ P
else:
# Outlier detected, skip update
# (You could log or count occurrences if needed)
pass

smoothed_positions.append(x.flatten())

smoothed_positions = np.array(smoothed_positions)

# Create a new DataFrame with smoothed results
smoothed_df = pd.DataFrame({
'frame_idx': frame_indices,
'smoothed_camera_pos': list(smoothed_positions)
# Measurement update
if frame in position_lookup:
z = np.array(position_lookup[frame]).reshape(-1, 1)
y = z - H @ x
S = H @ P @ H.T + R
d = np.sqrt((y.T @ np.linalg.inv(S) @ y).item())

if d < gating_threshold:
K = P @ H.T @ np.linalg.inv(S)
x = x + K @ y
P = (np.eye(state_dim) - K @ H) @ P

x_fwd.append(x.copy())
P_fwd.append(P.copy())

# If bidirectional smoothing is not needed, return forward results
if not bidirectional:
smoothed_positions = [x.flatten() for x in x_fwd]
result_df = pd.DataFrame({
'frame_idx': all_frames,
'smoothed_camera_pos': smoothed_positions
})
return result_df

# Backward pass (RTS Smoother)
x_smooth = x_fwd.copy()
P_smooth = P_fwd.copy()

for k in reversed(range(len(all_frames) - 1)):
decay_factor = (1.0 - (k / len(all_frames)))**2 # Reduce smoothing at boundaries
G = decay_factor * (P_fwd[k] @ F.T @ np.linalg.inv(P_fwd[k + 1]))
x_smooth[k] = x_fwd[k] + G @ (x_smooth[k + 1] - F @ x_fwd[k])
P_smooth[k] = P_fwd[k] + G @ (P_smooth[k + 1] - P_fwd[k + 1]) @ G.T

# Final smoothed positions
smoothed_positions = [x.flatten() for x in x_smooth]

# Return results
result_df = pd.DataFrame({
'frame_idx': all_frames,
'smoothed_camera_pos': smoothed_positions
})

final_results = camera_position_raw.copy()
final_results['smoothed_camera_pos'] = smoothed_df['smoothed_camera_pos'].values

return final_results
return result_df
83 changes: 56 additions & 27 deletions pyneon/recording.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from .stream import NeonGaze, NeonIMU, NeonEyeStates, CustomStream
from .events import NeonBlinks, NeonFixations, NeonSaccades, NeonEvents
from .preprocess import concat_streams, concat_events, smooth_camera_positions
from .preprocess import concat_streams, concat_events, smooth_camera_pose
from .video import (
NeonVideo,
sync_gaze_to_video,
Expand Down Expand Up @@ -501,56 +501,83 @@ def detect_apriltags(

def compute_camera_positions(
self,
tag_locations: Dict[int, List[float]],
tag_size: Union[float, Dict[int, float]],
tag_locations_df: pd.DataFrame,
all_detections: pd.DataFrame = pd.DataFrame(),
overwrite: bool = False,
) -> pd.DataFrame:
"""
Compute the camera positions from AprilTag detections in a video.
Parameters
----------
tag_locations : dict
A dictionary mapping AprilTag IDs to their 3D locations in the scene.
tag_size : float or dict
The size of the AprilTags in the scene. If a float, all tags are assumed to have the same size.
If a dictionary, the keys are the AprilTag IDs and the values are the sizes.
tag_locations_df : pd.DataFrame
A DataFrame containing AprilTag 3D locations, orientations, and sizes.
Required columns:
- 'tag_id': int, ID of the tag
- 'x', 'y', 'z': float, coordinates of the tag's center
- 'normal_x', 'normal_y', 'normal_z': float, components of the tag's normal vector
- 'size': float, the side length of the tag in meters
all_detections : pd.DataFrame, optional
DataFrame containing the AprilTag detections for each frame, with columns:
- 'frame_idx': The frame number
- 'tag_id': The ID of the detected AprilTag
- 'corners': A 4x2 array of the tag corner coordinates
- 'center': A 1x2 array with the tag center coordinates
If None, the detections are computed using the `detect_apriltags` function.
DataFrame containing AprilTag detections for each frame, with columns:
- 'frame_idx': The frame number (int)
- 'tag_id': The ID of the detected AprilTag (int)
- 'corners': A (4x2) array of the tag corner pixel coordinates (np.ndarray)
- 'center': A (1x2) array of the tag center pixel coordinates (np.ndarray)
If empty, the detections are computed using the `detect_apriltags` function.
Returns
-------
pd.DataFrame
DataFrame containing the camera positions for each frame, with columns:
- 'frame_idx': The frame number
- 'position': A 1x3 array with the camera position
- 'rotation': A 1x3 array with the camera rotation
- 'translation_vector': A (3,) array with the camera translation vector
- 'rotation_vector': A (3,) array with the camera rotation vector (Rodrigues form)
- 'camera_pos': A (3,) array with the camera position in world coordinates
"""

# Check if JSON already exists
if (json_file := self.recording_dir / "camera_positions.json").is_file():
required_columns = {"tag_id", "x", "y", "z", "normal_x", "normal_y", "normal_z", "size"}
if not required_columns.issubset(tag_locations_df.columns):
missing = required_columns - set(tag_locations_df.columns)
raise ValueError(f"tag_locations_df is missing required columns: {missing}")

#check for detections dataframe
if all_detections.empty:
detection_file = self.recording_dir / "apriltags.json"
#open apriltags
if detection_file.is_file():
all_detections = pd.read_json(detection_file, orient="records", lines=True)
else:
all_detections = self.detect_apriltags()


# Check if result JSON already exists
json_file = self.recording_dir / "camera_positions.json"
if json_file.is_file() and not overwrite:
return pd.read_json(json_file, orient="records")

camera_positions = compute_camera_positions(self.video, tag_locations, tag_size, all_detections)
# Compute camera positions
camera_positions = compute_camera_positions(
video=self.video,
tag_locations_df=tag_locations_df,
all_detections=all_detections
)

# Save to JSON
camera_positions.to_json(self.recording_dir / "camera_positions.json", orient="records")
camera_positions.to_json(json_file, orient="records")

return camera_positions


def smooth_camera_positions(
def smooth_camera_pose(
self,
camera_position_raw: pd.DataFrame = pd.DataFrame(),
state_dim: int = 3,
meas_dim: int = 3,
initial_state_noise: float = 0.1,
process_noise: float = 0.005,
measurement_noise: float = 0.005,
gating_threshold: float = 3.0
gating_threshold: float = 3.0,
bidirectional: bool = False,
) -> pd.DataFrame:
"""
Apply a Kalman filter to smooth camera positions and gate outliers based on Mahalanobis distance.
Expand Down Expand Up @@ -591,20 +618,22 @@ def smooth_camera_positions(
# Run the function to get the data
camera_position_raw = self.compute_camera_positions()

smoothed_positions = smooth_camera_positions(
smoothed_positions = smooth_camera_pose(
camera_position_raw,
state_dim,
meas_dim,
initial_state_noise,
process_noise,
measurement_noise,
gating_threshold
gating_threshold,
bidirectional
)

# Save to JSON
smoothed_positions.to_json(self.recording_dir / "camera_positions.json", orient="records")
smoothed_positions.to_json(self.recording_dir / "smoothed_camera_positions.json", orient="records")

return smoothed_positions


def plot_scanpath_on_video(
self,
Expand Down
Loading

0 comments on commit a362c14

Please sign in to comment.