Skip to content

Commit

Permalink
Format code with ruff
Browse files (browse the repository at this point in the history)
  • Loading branch information
github-actions[bot] committed Dec 10, 2024
1 parent 101f98b commit d87f415
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 139 deletions.
19 changes: 9 additions & 10 deletions pyneon/preprocess/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def smooth_camera_positions(
meas_dim: int = 3,
process_noise: float = 0.005,
measurement_noise: float = 0.005,
gating_threshold: float = 3.0
gating_threshold: float = 3.0,
) -> pd.DataFrame:
"""
Apply a Kalman filter to smooth camera positions and gate outliers based on Mahalanobis distance.
Expand Down Expand Up @@ -39,15 +39,15 @@ def smooth_camera_positions(
containing the smoothed positions.
"""
# Ensure the DataFrame is sorted by frame_idx
camera_position_raw = camera_position_raw.sort_values('frame_idx')
camera_position_raw = camera_position_raw.sort_values("frame_idx")

# Extract positions and frame indices
positions = np.stack(camera_position_raw['camera_pos'].values)
frame_indices = camera_position_raw['frame_idx'].values
positions = np.stack(camera_position_raw["camera_pos"].values)
frame_indices = camera_position_raw["frame_idx"].values

# Define Kalman filter matrices
F = np.eye(state_dim) # State transition: Identity
H = np.eye(meas_dim) # Measurement matrix: Identity
H = np.eye(meas_dim) # Measurement matrix: Identity

Q = process_noise * np.eye(state_dim) # Process noise covariance
R = measurement_noise * np.eye(meas_dim) # Measurement noise covariance
Expand Down Expand Up @@ -88,12 +88,11 @@ def smooth_camera_positions(
smoothed_positions = np.array(smoothed_positions)

# Create a new DataFrame with smoothed results
smoothed_df = pd.DataFrame({
'frame_idx': frame_indices,
'smoothed_camera_pos': list(smoothed_positions)
})
smoothed_df = pd.DataFrame(
{"frame_idx": frame_indices, "smoothed_camera_pos": list(smoothed_positions)}
)

final_results = camera_position_raw.copy()
final_results['smoothed_camera_pos'] = smoothed_df['smoothed_camera_pos'].values
final_results["smoothed_camera_pos"] = smoothed_df["smoothed_camera_pos"].values

return final_results
38 changes: 20 additions & 18 deletions pyneon/recording.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,14 +466,11 @@ def estimate_scanpath(
if (video := self.video) is None:
raise ValueError("Estimating scanpath requires video data.")
return estimate_scanpath(video, sync_gaze, lk_params)

def detect_apriltags(
self,
tag_family: str ='tag36h11'
) -> pd.DataFrame:

def detect_apriltags(self, tag_family: str = "tag36h11") -> pd.DataFrame:
"""
Detect AprilTags in a video and report their data for every frame using the apriltag library.
Parameters
----------
tag_family : str, optional
Expand All @@ -494,11 +491,12 @@ def detect_apriltags(

all_detections = detect_apriltags(self.video, tag_family)
# Save to JSON
all_detections.to_json(self.recording_dir / "apriltags.json", orient="records", lines=True)
all_detections.to_json(
self.recording_dir / "apriltags.json", orient="records", lines=True
)

return all_detections


def compute_camera_positions(
self,
tag_locations: Dict[int, List[float]],
Expand Down Expand Up @@ -536,21 +534,24 @@ def compute_camera_positions(
if (json_file := self.recording_dir / "camera_positions.json").is_file():
return pd.read_json(json_file, orient="records")

camera_positions = compute_camera_positions(self.video, tag_locations, tag_size, all_detections)
camera_positions = compute_camera_positions(
self.video, tag_locations, tag_size, all_detections
)
# Save to JSON
camera_positions.to_json(self.recording_dir / "camera_positions.json", orient="records")
camera_positions.to_json(
self.recording_dir / "camera_positions.json", orient="records"
)

return camera_positions


def smooth_camera_positions(
self,
camera_position_raw: pd.DataFrame = pd.DataFrame(),
state_dim: int = 3,
meas_dim: int = 3,
process_noise: float = 0.005,
measurement_noise: float = 0.005,
gating_threshold: float = 3.0
gating_threshold: float = 3.0,
) -> pd.DataFrame:
"""
Apply a Kalman filter to smooth camera positions and gate outliers based on Mahalanobis distance.
Expand Down Expand Up @@ -584,9 +585,9 @@ def smooth_camera_positions(
if (json_file := self.recording_dir / "camera_positions.json").is_file():
camera_position_raw = pd.read_json(json_file, orient="records")
# Ensure 'camera_pos' is parsed as NumPy arrays
camera_position_raw['camera_pos'] = camera_position_raw['camera_pos'].apply(
lambda pos: np.array(pos, dtype=float)
)
camera_position_raw["camera_pos"] = camera_position_raw[
"camera_pos"
].apply(lambda pos: np.array(pos, dtype=float))
else:
# Run the function to get the data
camera_position_raw = self.compute_camera_positions()
Expand All @@ -597,15 +598,16 @@ def smooth_camera_positions(
meas_dim,
process_noise,
measurement_noise,
gating_threshold
gating_threshold,
)

# Save to JSON
smoothed_positions.to_json(self.recording_dir / "camera_positions.json", orient="records")
smoothed_positions.to_json(
self.recording_dir / "camera_positions.json", orient="records"
)

return smoothed_positions


def plot_scanpath_on_video(
self,
scanpath: pd.DataFrame,
Expand Down
138 changes: 85 additions & 53 deletions pyneon/video/apriltags.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,11 @@
from ..recording import NeonRecording
from .video import NeonVideo

def detect_apriltags(
video: "NeonVideo",
tag_family: str ='tag36h11'
):


def detect_apriltags(video: "NeonVideo", tag_family: str = "tag36h11"):
"""
Detect AprilTags in a video and report their data for every frame using the apriltag library.
Parameters
----------
video : cv2.VideoCapture or similar video object
Expand All @@ -45,43 +42,46 @@ def detect_apriltags(

all_detections = []
frame_idx = 0

while True:
ret, frame = video.read()
if not ret:
break

# Convert frame to grayscale for detection
gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

# Detect AprilTags
detections = detector.detect(gray_frame)

for detection in detections:
# Extract the tag ID and corners
tag_id = detection.tag_id
corners = detection.corners

# Calculate the center of the tag
center = np.mean(corners, axis=0)

# Store the detection data
all_detections.append({
"frame_idx": frame_idx,
"tag_id": tag_id,
"corners": corners,
"center": center
})

all_detections.append(
{
"frame_idx": frame_idx,
"tag_id": tag_id,
"corners": corners,
"center": center,
}
)

frame_idx += 1

video.release()

# convert to pandas DataFrame
all_detections = pd.DataFrame(all_detections)

return all_detections


def compute_camera_positions(
video: "NeonVideo",
tag_locations: Dict[int, List[float]],
Expand All @@ -91,9 +91,9 @@ def compute_camera_positions(
"""
Compute the camera position for each frame using AprilTag detections stored in a DataFrame.
This function uses a pinhole camera model and the `cv2.solvePnP` function to determine the
camera translation and rotation vectors for each frame. The known 3D positions of each tag
and the detected 2D corners are used as correspondences. The resulting camera pose is then
This function uses a pinhole camera model and the `cv2.solvePnP` function to determine the
camera translation and rotation vectors for each frame. The known 3D positions of each tag
and the detected 2D corners are used as correspondences. The resulting camera pose is then
expressed in world coordinates.
Parameters
Expand Down Expand Up @@ -123,9 +123,9 @@ def compute_camera_positions(
Notes
-----
- The camera's intrinsic parameters are estimated from the field of view and resolution.
- The camera's intrinsic parameters are estimated from the field of view and resolution.
For more accurate results, use known camera intrinsics.
- The function assumes that each tag's known location is provided as the center of the tag,
- The function assumes that each tag's known location is provided as the center of the tag,
and constructs the tag's corners in 3D world coordinates by offsetting from its center.
Raises
Expand All @@ -134,14 +134,21 @@ def compute_camera_positions(
If not enough points are found to solve PnP for a given frame.
"""

#check if all_detections is empty
# check if all_detections is empty
if all_detections.empty:
# call the detect_apriltags function to get the detections
all_detections = detect_apriltags(video)
# if still empty, return an empty DataFrame
if all_detections.empty:
print("No AprilTag detections found in the video.")
return pd.DataFrame(columns=["frame_idx", "translation_vector", "rotation_vector", "camera_pos"])
return pd.DataFrame(
columns=[
"frame_idx",
"translation_vector",
"rotation_vector",
"camera_pos",
]
)

# Handle tag size inputs
if isinstance(tag_size, float):
Expand All @@ -152,11 +159,15 @@ def get_tag_half_size(tid):
# Different sizes per tag
def get_tag_half_size(tid):
if tid not in tag_size:
raise ValueError(f"Tag ID {tid} not found in provided tag_size dictionary.")
raise ValueError(
f"Tag ID {tid} not found in provided tag_size dictionary."
)
return tag_size[tid] / 2.0
else:
raise TypeError("tag_size must be either a float or a dictionary mapping tag IDs to sizes.")

raise TypeError(
"tag_size must be either a float or a dictionary mapping tag IDs to sizes."
)

camera_matrix = video.camera_matrix
dist_coeffs = video.dist_coeffs

Expand All @@ -167,8 +178,8 @@ def get_tag_half_size(tid):
results = []

# Process each unique frame
for frame in all_detections['frame_idx'].unique():
frame_detections = all_detections.loc[all_detections['frame_idx'] == frame]
for frame in all_detections["frame_idx"].unique():
frame_detections = all_detections.loc[all_detections["frame_idx"] == frame]

if frame_detections.empty:
# No tags detected in this frame, skip
Expand All @@ -179,8 +190,8 @@ def get_tag_half_size(tid):

# Collect all object-image correspondences for this frame
for _, row in frame_detections.iterrows():
tag_id = row['tag_id']
corners = row['corners'] # shape (4,2)
tag_id = row["tag_id"]
corners = row["corners"] # shape (4,2)

if tag_id not in tag_locations:
# If no known location for this tag is provided, skip it
Expand All @@ -191,12 +202,31 @@ def get_tag_half_size(tid):

# Compute the 3D corners of the tag from its center
# The tag plane orientation is assumed. Adjust as needed.
tag_3d_corners = np.array([
[tag_center_3d[0], tag_center_3d[1] - half_size, tag_center_3d[2] + half_size],
[tag_center_3d[0], tag_center_3d[1] + half_size, tag_center_3d[2] + half_size],
[tag_center_3d[0], tag_center_3d[1] + half_size, tag_center_3d[2] - half_size],
[tag_center_3d[0], tag_center_3d[1] - half_size, tag_center_3d[2] - half_size]
], dtype=np.float32)
tag_3d_corners = np.array(
[
[
tag_center_3d[0],
tag_center_3d[1] - half_size,
tag_center_3d[2] + half_size,
],
[
tag_center_3d[0],
tag_center_3d[1] + half_size,
tag_center_3d[2] + half_size,
],
[
tag_center_3d[0],
tag_center_3d[1] + half_size,
tag_center_3d[2] - half_size,
],
[
tag_center_3d[0],
tag_center_3d[1] - half_size,
tag_center_3d[2] - half_size,
],
],
dtype=np.float32,
)

object_points.extend(tag_3d_corners)
image_points.extend(corners)
Expand All @@ -210,10 +240,7 @@ def get_tag_half_size(tid):

# Solve the PnP problem to find rotation and translation vectors
success, rotation_vector, translation_vector = cv2.solvePnP(
object_points,
image_points,
camera_matrix,
dist_coeffs
object_points, image_points, camera_matrix, dist_coeffs
)

if not success:
Expand All @@ -222,18 +249,23 @@ def get_tag_half_size(tid):

# Convert rotation vector to rotation matrix
R, _ = cv2.Rodrigues(rotation_vector)

# Compute camera position in world coordinates
# World to camera: Pc = R * Pw + t
# Pw = R^T * (Pc - t), with Pc=0 (camera center)
# camera_pos = -R^T * t
camera_pos = -R.T @ translation_vector

results.append({
"frame_idx": frame,
"translation_vector": translation_vector.reshape(-1),
"rotation_vector": rotation_vector.reshape(-1),
"camera_pos": camera_pos.reshape(-1)
})
results.append(
{
"frame_idx": frame,
"translation_vector": translation_vector.reshape(-1),
"rotation_vector": rotation_vector.reshape(-1),
"camera_pos": camera_pos.reshape(-1),
}
)

return pd.DataFrame(results, columns=["frame_idx", "translation_vector", "rotation_vector", "camera_pos"])
return pd.DataFrame(
results,
columns=["frame_idx", "translation_vector", "rotation_vector", "camera_pos"],
)
Loading

0 comments on commit d87f415

Please sign in to comment.