From d87f41576652c8c31084ccbefa53c334d9ddb298 Mon Sep 17 00:00:00 2001
From: "github-actions[bot]"
Date: Tue, 10 Dec 2024 15:40:51 +0000
Subject: [PATCH] Format code with ruff

---
 pyneon/preprocess/filter.py               |  19 ++-
 pyneon/recording.py                       |  38 +++---
 pyneon/video/apriltags.py                 | 138 +++++++++++++---------
 pyneon/video/video.py                     |   1 +
 source/tutorials/apriltag_detection.ipynb |  44 +++----
 source/tutorials/sensor_fusion.ipynb      | 136 +++++++++++++++------
 6 files changed, 237 insertions(+), 139 deletions(-)

diff --git a/pyneon/preprocess/filter.py b/pyneon/preprocess/filter.py
index 3a85c97..80c8e96 100644
--- a/pyneon/preprocess/filter.py
+++ b/pyneon/preprocess/filter.py
@@ -10,7 +10,7 @@ def smooth_camera_positions(
     meas_dim: int = 3,
     process_noise: float = 0.005,
     measurement_noise: float = 0.005,
-    gating_threshold: float = 3.0
+    gating_threshold: float = 3.0,
 ) -> pd.DataFrame:
     """
     Apply a Kalman filter to smooth camera positions and gate outliers based on Mahalanobis distance.
@@ -39,15 +39,15 @@ def smooth_camera_positions(
     containing the smoothed positions.
     """
     # Ensure the DataFrame is sorted by frame_idx
-    camera_position_raw = camera_position_raw.sort_values('frame_idx')
+    camera_position_raw = camera_position_raw.sort_values("frame_idx")
 
     # Extract positions and frame indices
-    positions = np.stack(camera_position_raw['camera_pos'].values)
-    frame_indices = camera_position_raw['frame_idx'].values
+    positions = np.stack(camera_position_raw["camera_pos"].values)
+    frame_indices = camera_position_raw["frame_idx"].values
 
     # Define Kalman filter matrices
     F = np.eye(state_dim)  # State transition: Identity
-    H = np.eye(meas_dim)   # Measurement matrix: Identity
+    H = np.eye(meas_dim)  # Measurement matrix: Identity
     Q = process_noise * np.eye(state_dim)  # Process noise covariance
     R = measurement_noise * np.eye(meas_dim)  # Measurement noise covariance
 
@@ -88,12 +88,11 @@ def smooth_camera_positions(
     smoothed_positions = np.array(smoothed_positions)
 
     # Create a new DataFrame with smoothed results
-    smoothed_df = pd.DataFrame({
-        'frame_idx': frame_indices,
-        'smoothed_camera_pos': list(smoothed_positions)
-    })
+    smoothed_df = pd.DataFrame(
+        {"frame_idx": frame_indices, "smoothed_camera_pos": list(smoothed_positions)}
+    )
 
     final_results = camera_position_raw.copy()
-    final_results['smoothed_camera_pos'] = smoothed_df['smoothed_camera_pos'].values
+    final_results["smoothed_camera_pos"] = smoothed_df["smoothed_camera_pos"].values
 
     return final_results
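The smoothing helper touched above is easiest to read next to a usage sketch. The following is illustrative only and not part of the patch: it assumes nothing beyond the `frame_idx`/`camera_pos` interface visible in the hunks, and the synthetic trajectory is invented for the example.

    # Sketch only: calling smooth_camera_positions on synthetic data,
    # assuming the interface shown in the hunks above.
    import numpy as np
    import pandas as pd

    from pyneon.preprocess.filter import smooth_camera_positions

    rng = np.random.default_rng(0)
    true_path = np.cumsum(rng.normal(0.0, 0.01, size=(100, 3)), axis=0)  # smooth walk
    noisy_path = true_path + rng.normal(0.0, 0.05, size=true_path.shape)  # add jitter

    raw = pd.DataFrame({"frame_idx": np.arange(100), "camera_pos": list(noisy_path)})
    smoothed = smooth_camera_positions(raw, gating_threshold=3.0)
    print(smoothed[["frame_idx", "smoothed_camera_pos"]].head())

Per the function's docstring, measurements whose Mahalanobis distance to the filter's prediction exceeds `gating_threshold` are gated as outliers rather than followed.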
diff --git a/pyneon/recording.py b/pyneon/recording.py
index 0789d24..3c06ca6 100644
--- a/pyneon/recording.py
+++ b/pyneon/recording.py
@@ -466,14 +466,11 @@ def estimate_scanpath(
         if (video := self.video) is None:
             raise ValueError("Estimating scanpath requires video data.")
         return estimate_scanpath(video, sync_gaze, lk_params)
-
-    def detect_apriltags(
-        self,
-        tag_family: str ='tag36h11'
-    ) -> pd.DataFrame:
+
+    def detect_apriltags(self, tag_family: str = "tag36h11") -> pd.DataFrame:
         """
         Detect AprilTags in a video and report their data for every frame using the apriltag library.
-
+
         Parameters
         ----------
         tag_family : str, optional
             The AprilTag family to detect (default is 'tag36h11').
@@ -494,11 +491,12 @@ def detect_apriltags(
         all_detections = detect_apriltags(self.video, tag_family)
 
         # Save to JSON
-        all_detections.to_json(self.recording_dir / "apriltags.json", orient="records", lines=True)
+        all_detections.to_json(
+            self.recording_dir / "apriltags.json", orient="records", lines=True
+        )
 
         return all_detections
 
-
     def compute_camera_positions(
         self,
         tag_locations: Dict[int, List[float]],
@@ -536,13 +534,16 @@ def detect_apriltags(
         if (json_file := self.recording_dir / "camera_positions.json").is_file():
             return pd.read_json(json_file, orient="records")
 
-        camera_positions = compute_camera_positions(self.video, tag_locations, tag_size, all_detections)
+        camera_positions = compute_camera_positions(
+            self.video, tag_locations, tag_size, all_detections
+        )
 
         # Save to JSON
-        camera_positions.to_json(self.recording_dir / "camera_positions.json", orient="records")
+        camera_positions.to_json(
+            self.recording_dir / "camera_positions.json", orient="records"
+        )
 
         return camera_positions
 
-
     def smooth_camera_positions(
         self,
         camera_position_raw: pd.DataFrame = pd.DataFrame(),
@@ -550,7 +551,7 @@ def smooth_camera_positions(
         meas_dim: int = 3,
         process_noise: float = 0.005,
         measurement_noise: float = 0.005,
-        gating_threshold: float = 3.0
+        gating_threshold: float = 3.0,
     ) -> pd.DataFrame:
         """
         Apply a Kalman filter to smooth camera positions and gate outliers based on Mahalanobis distance.
@@ -584,9 +585,9 @@ def smooth_camera_positions(
         if (json_file := self.recording_dir / "camera_positions.json").is_file():
             camera_position_raw = pd.read_json(json_file, orient="records")
             # Ensure 'camera_pos' is parsed as NumPy arrays
-            camera_position_raw['camera_pos'] = camera_position_raw['camera_pos'].apply(
-                lambda pos: np.array(pos, dtype=float)
-            )
+            camera_position_raw["camera_pos"] = camera_position_raw[
+                "camera_pos"
+            ].apply(lambda pos: np.array(pos, dtype=float))
         else:
             # Run the function to get the data
             camera_position_raw = self.compute_camera_positions()
@@ -597,15 +598,16 @@ def smooth_camera_positions(
             meas_dim,
             process_noise,
             measurement_noise,
-            gating_threshold
+            gating_threshold,
         )
 
         # Save to JSON
-        smoothed_positions.to_json(self.recording_dir / "camera_positions.json", orient="records")
+        smoothed_positions.to_json(
+            self.recording_dir / "camera_positions.json", orient="records"
+        )
 
         return smoothed_positions
 
-
     def plot_scanpath_on_video(
         self,
         scanpath: pd.DataFrame,
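The three `NeonRecording` methods reformatted above form one pipeline, and the hunks show the JSON caching they share (`apriltags.json`, `camera_positions.json` under the recording directory). A hedged sketch of the intended call chain; the recording path is hypothetical, and a top-level `NeonRecording` export is assumed:

    # Sketch only: chaining the methods shown above on a NeonRecording.
    from pyneon import NeonRecording  # assumed top-level export

    rec = NeonRecording("path/to/recording")  # hypothetical directory

    detections = rec.detect_apriltags(tag_family="tag36h11")  # saved to apriltags.json
    positions = rec.compute_camera_positions(
        tag_locations={0: [0.0, -0.25, 0.2]},  # tag ID -> known 3D center, meters
        tag_size=0.075,  # tag side length, meters
    )  # saved to camera_positions.json
    smoothed = rec.smooth_camera_positions()  # reloads the cached JSON if present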
diff --git a/pyneon/video/apriltags.py b/pyneon/video/apriltags.py
index a615951..6cdc048 100644
--- a/pyneon/video/apriltags.py
+++ b/pyneon/video/apriltags.py
@@ -11,14 +11,11 @@
 from ..recording import NeonRecording
 from .video import NeonVideo
 
-def detect_apriltags(
-    video: "NeonVideo",
-    tag_family: str ='tag36h11'
-    ):
-
+
+def detect_apriltags(video: "NeonVideo", tag_family: str = "tag36h11"):
     """
     Detect AprilTags in a video and report their data for every frame using the apriltag library.
-
+
     Parameters
     ----------
     video : NeonVideo
         The video object (a cv2.VideoCapture subclass) to process.
@@ -45,43 +42,46 @@
     all_detections = []
     frame_idx = 0
-    
+
     while True:
         ret, frame = video.read()
         if not ret:
             break
-        
+
         # Convert frame to grayscale for detection
         gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
-        
+
         # Detect AprilTags
         detections = detector.detect(gray_frame)
-        
+
         for detection in detections:
             # Extract the tag ID and corners
             tag_id = detection.tag_id
             corners = detection.corners
-            
+
             # Calculate the center of the tag
             center = np.mean(corners, axis=0)
-            
+
             # Store the detection data
-            all_detections.append({
-                "frame_idx": frame_idx,
-                "tag_id": tag_id,
-                "corners": corners,
-                "center": center
-            })
-        
+            all_detections.append(
+                {
+                    "frame_idx": frame_idx,
+                    "tag_id": tag_id,
+                    "corners": corners,
+                    "center": center,
+                }
+            )
+
         frame_idx += 1
-    
+
     video.release()
 
     # convert to pandas DataFrame
     all_detections = pd.DataFrame(all_detections)
-    
+
     return all_detections
 
+
 def compute_camera_positions(
     video: "NeonVideo",
     tag_locations: Dict[int, List[float]],
@@ -91,9 +91,9 @@
     """
     Compute the camera position for each frame using AprilTag detections stored in a DataFrame.
 
-    This function uses a pinhole camera model and the `cv2.solvePnP` function to determine the 
-    camera translation and rotation vectors for each frame. The known 3D positions of each tag 
-    and the detected 2D corners are used as correspondences. The resulting camera pose is then 
+    This function uses a pinhole camera model and the `cv2.solvePnP` function to determine the
+    camera translation and rotation vectors for each frame. The known 3D positions of each tag
+    and the detected 2D corners are used as correspondences. The resulting camera pose is then
     expressed in world coordinates.
 
     Parameters
@@ -123,9 +123,9 @@
     Notes
     -----
-    - The camera's intrinsic parameters are estimated from the field of view and resolution. 
+    - The camera's intrinsic parameters are estimated from the field of view and resolution.
       For more accurate results, use known camera intrinsics.
-    - The function assumes that each tag's known location is provided as the center of the tag, 
+    - The function assumes that each tag's known location is provided as the center of the tag,
       and constructs the tag's corners in 3D world coordinates by offsetting from its center.
 
     Raises
@@ -134,14 +134,21 @@
         If insufficient point correspondences are found to solve PnP for a given frame.
     """
 
-    #check if all_detections is empty
+    # check if all_detections is empty
    if all_detections.empty:
        # call the detect_apriltags function to get the detections
        all_detections = detect_apriltags(video)
        # if still empty, return an empty DataFrame
        if all_detections.empty:
            print("No AprilTag detections found in the video.")
-            return pd.DataFrame(columns=["frame_idx", "translation_vector", "rotation_vector", "camera_pos"])
+            return pd.DataFrame(
+                columns=[
+                    "frame_idx",
+                    "translation_vector",
+                    "rotation_vector",
+                    "camera_pos",
+                ]
+            )
 
     # Handle tag size inputs
     if isinstance(tag_size, float):
@@ -152,11 +159,15 @@ def get_tag_half_size(tid):
         # Different sizes per tag
         def get_tag_half_size(tid):
             if tid not in tag_size:
-                raise ValueError(f"Tag ID {tid} not found in provided tag_size dictionary.")
+                raise ValueError(
+                    f"Tag ID {tid} not found in provided tag_size dictionary."
+                )
             return tag_size[tid] / 2.0
     else:
-        raise TypeError("tag_size must be either a float or a dictionary mapping tag IDs to sizes.")
-    
+        raise TypeError(
+            "tag_size must be either a float or a dictionary mapping tag IDs to sizes."
+        )
+
     camera_matrix = video.camera_matrix
     dist_coeffs = video.dist_coeffs
 
@@ -167,8 +178,8 @@
     results = []
 
     # Process each unique frame
-    for frame in all_detections['frame_idx'].unique():
-        frame_detections = all_detections.loc[all_detections['frame_idx'] == frame]
+    for frame in all_detections["frame_idx"].unique():
+        frame_detections = all_detections.loc[all_detections["frame_idx"] == frame]
 
         if frame_detections.empty:
             # No tags detected in this frame, skip
@@ -179,8 +190,8 @@
 
         # Collect all object-image correspondences for this frame
         for _, row in frame_detections.iterrows():
-            tag_id = row['tag_id']
-            corners = row['corners']  # shape (4,2)
+            tag_id = row["tag_id"]
+            corners = row["corners"]  # shape (4,2)
 
             if tag_id not in tag_locations:
                 # If no known location for this tag is provided, skip it
@@ -191,12 +202,31 @@
 
             # Compute the 3D corners of the tag from its center
             # The tag plane orientation is assumed. Adjust as needed.
-            tag_3d_corners = np.array([
-                [tag_center_3d[0], tag_center_3d[1] - half_size, tag_center_3d[2] + half_size],
-                [tag_center_3d[0], tag_center_3d[1] + half_size, tag_center_3d[2] + half_size],
-                [tag_center_3d[0], tag_center_3d[1] + half_size, tag_center_3d[2] - half_size],
-                [tag_center_3d[0], tag_center_3d[1] - half_size, tag_center_3d[2] - half_size]
-            ], dtype=np.float32)
+            tag_3d_corners = np.array(
+                [
+                    [
+                        tag_center_3d[0],
+                        tag_center_3d[1] - half_size,
+                        tag_center_3d[2] + half_size,
+                    ],
+                    [
+                        tag_center_3d[0],
+                        tag_center_3d[1] + half_size,
+                        tag_center_3d[2] + half_size,
+                    ],
+                    [
+                        tag_center_3d[0],
+                        tag_center_3d[1] + half_size,
+                        tag_center_3d[2] - half_size,
+                    ],
+                    [
+                        tag_center_3d[0],
+                        tag_center_3d[1] - half_size,
+                        tag_center_3d[2] - half_size,
+                    ],
+                ],
+                dtype=np.float32,
+            )
 
             object_points.extend(tag_3d_corners)
             image_points.extend(corners)
@@ -210,10 +240,7 @@
 
         # Solve the PnP problem to find rotation and translation vectors
         success, rotation_vector, translation_vector = cv2.solvePnP(
-            object_points,
-            image_points,
-            camera_matrix,
-            dist_coeffs
+            object_points, image_points, camera_matrix, dist_coeffs
         )
 
         if not success:
@@ -222,18 +249,23 @@
 
         # Convert rotation vector to rotation matrix
         R, _ = cv2.Rodrigues(rotation_vector)
-        
+
         # Compute camera position in world coordinates
         # World to camera: Pc = R * Pw + t
         # Pw = R^T * (Pc - t), with Pc=0 (camera center)
         # camera_pos = -R^T * t
         camera_pos = -R.T @ translation_vector
 
-        results.append({
-            "frame_idx": frame,
-            "translation_vector": translation_vector.reshape(-1),
-            "rotation_vector": rotation_vector.reshape(-1),
-            "camera_pos": camera_pos.reshape(-1)
-        })
+        results.append(
+            {
+                "frame_idx": frame,
+                "translation_vector": translation_vector.reshape(-1),
+                "rotation_vector": rotation_vector.reshape(-1),
+                "camera_pos": camera_pos.reshape(-1),
+            }
+        )
 
-    return pd.DataFrame(results, columns=["frame_idx", "translation_vector", "rotation_vector", "camera_pos"])
+    return pd.DataFrame(
+        results,
+        columns=["frame_idx", "translation_vector", "rotation_vector", "camera_pos"],
+    )
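The comment block reformatted above compresses the key step of `compute_camera_positions`: `cv2.solvePnP` returns the world-to-camera transform, so the camera center in world coordinates is `-R^T t`. A self-contained sketch of that step; the correspondences and intrinsics are made up for illustration:

    # Sketch only: recovering the camera center from a solvePnP result,
    # mirroring the end of compute_camera_positions. All numbers are invented.
    import cv2
    import numpy as np

    half = 0.075 / 2.0  # half of a 7.5 cm tag
    object_points = np.array(  # one tag's corners in the x=0 world plane
        [[0, -half, half], [0, half, half], [0, half, -half], [0, -half, -half]],
        dtype=np.float32,
    )
    image_points = np.array(  # where those corners landed in the image (pixels)
        [[300.0, 200.0], [340.0, 200.0], [340.0, 240.0], [300.0, 240.0]],
        dtype=np.float32,
    )
    camera_matrix = np.array([[700.0, 0.0, 320.0], [0.0, 700.0, 240.0], [0.0, 0.0, 1.0]])
    dist_coeffs = np.zeros(4)

    ok, rvec, tvec = cv2.solvePnP(object_points, image_points, camera_matrix, dist_coeffs)
    R, _ = cv2.Rodrigues(rvec)  # 3x1 rotation vector -> 3x3 rotation matrix
    camera_pos = -R.T @ tvec  # world->camera is Pc = R @ Pw + t, so the center is -R^T t
    print(ok, camera_pos.ravel())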
diff --git a/pyneon/video/video.py b/pyneon/video/video.py
index b93b445..0931aff 100644
--- a/pyneon/video/video.py
+++ b/pyneon/video/video.py
@@ -8,6 +8,7 @@
 
 from ..vis import plot_frame
 
+
 class NeonVideo(cv2.VideoCapture):
     """
     Loaded video file with timestamps.
diff --git a/source/tutorials/apriltag_detection.ipynb b/source/tutorials/apriltag_detection.ipynb
index b3d8ce3..7dd7b87 100644
--- a/source/tutorials/apriltag_detection.ipynb
+++ b/source/tutorials/apriltag_detection.ipynb
@@ -57,17 +57,17 @@
    ],
    "source": [
     "tag_locations = {\n",
-    "    0: [0.0, -0.25, 0.2],\n",
-    "    1: [0.0, 0.25, 0.2],\n",
-    "    2: [0.0, -0.25, -0.2],\n",
-    "    3: [0.0, 0.25, -0.2]\n",
-    "    }\n",
+    "    0: [0.0, -0.25, 0.2],\n",
+    "    1: [0.0, 0.25, 0.2],\n",
+    "    2: [0.0, -0.25, -0.2],\n",
+    "    3: [0.0, 0.25, -0.2],\n",
+    "}\n",
     "\n",
     "# Size of each tag in meters (0.075 m = 7.5 cm per side)\n",
     "tag_size = 0.075\n",
     "\n",
     "camera_position = recording.compute_camera_positions(tag_locations, tag_size)\n",
-    "print(camera_position.columns)\n"
+    "print(camera_position.columns)"
    ]
   },
   {
@@ -87,21 +87,21 @@
    }
   ],
   "source": [
-    "#plot the trajectory in xy\n",
+    "# plot the trajectory in xy\n",
     "\n",
     "import matplotlib.pyplot as plt\n",
     "import numpy as np\n",
     "\n",
-    "x = (camera_position['camera_pos'].apply(lambda x: x[0]).values)  # Extract x values\n",
-    "y = camera_position['camera_pos'].apply(lambda x: x[1]).values  # Extract y values\n",
+    "x = camera_position[\"camera_pos\"].apply(lambda x: x[0]).values  # Extract x values\n",
+    "y = camera_position[\"camera_pos\"].apply(lambda x: x[1]).values  # Extract y values\n",
     "colors = np.arange(len(x))  # Create a color array based on the index\n",
     "\n",
     "fig = plt.figure()\n",
     "ax = fig.add_subplot(111)\n",
-    "scatter = ax.scatter(x, y, c=colors, cmap='viridis')\n",
-    "plt.colorbar(scatter, label='Index')\n",
-    "ax.set_aspect('equal', 'box')\n",
-    "plt.show()\n"
+    "scatter = ax.scatter(x, y, c=colors, cmap=\"viridis\")\n",
+    "plt.colorbar(scatter, label=\"Index\")\n",
+    "ax.set_aspect(\"equal\", \"box\")\n",
+    "plt.show()"
    ]
   },
   {
@@ -137,21 +137,25 @@
    }
   ],
   "source": [
-    "#plot the trajectory in xy\n",
+    "# plot the trajectory in xy\n",
     "\n",
     "import matplotlib.pyplot as plt\n",
     "import numpy as np\n",
     "\n",
-    "x = (camera_position['smoothed_camera_pos'].apply(lambda x: x[0]).values)  # Extract x values\n",
-    "y = camera_position['smoothed_camera_pos'].apply(lambda x: x[1]).values  # Extract y values\n",
+    "x = (\n",
+    "    camera_position[\"smoothed_camera_pos\"].apply(lambda x: x[0]).values\n",
+    ")  # Extract x values\n",
+    "y = (\n",
+    "    camera_position[\"smoothed_camera_pos\"].apply(lambda x: x[1]).values\n",
+    ")  # Extract y values\n",
     "colors = np.arange(len(x))  # Create a color array based on the index\n",
     "\n",
     "fig = plt.figure()\n",
     "ax = fig.add_subplot(111)\n",
-    "scatter = ax.scatter(x, y, c=colors, cmap='viridis')\n",
-    "plt.colorbar(scatter, label='Index')\n",
-    "ax.set_aspect('equal', 'box')\n",
-    "plt.show()\n"
+    "scatter = ax.scatter(x, y, c=colors, cmap=\"viridis\")\n",
+    "plt.colorbar(scatter, label=\"Index\")\n",
+    "ax.set_aspect(\"equal\", \"box\")\n",
+    "plt.show()"
    ]
   }
 ],
diff --git a/source/tutorials/sensor_fusion.ipynb b/source/tutorials/sensor_fusion.ipynb
index fbc8223..b5f23bf 100644
--- a/source/tutorials/sensor_fusion.ipynb
+++ b/source/tutorials/sensor_fusion.ipynb
@@ -25,13 +25,13 @@
    }
   ],
   "source": [
-    "#import all relevant libraries\n",
+    "# import all relevant libraries\n",
     "import pandas as pd\n",
     "import numpy as np\n",
     "import matplotlib.pyplot as plt\n",
     "\n",
-    "#load from csv\n",
-    "final_df = pd.read_csv('output/final_states.csv')\n",
+    "# load from csv\n",
+    "final_df = pd.read_csv(\"output/final_states.csv\")\n",
     "\n",
     "# Convert smoothed_positions to a NumPy array\n",
     "smoothed_positions_array = np.array(final_df)\n",
@@ -43,12 +43,12 @@
     "# Plotting\n",
     "fig = plt.figure()\n",
     "ax = fig.add_subplot(111)\n",
-    "scatter = ax.scatter(x_values, y_values, cmap='viridis', s=5)\n",
+    "scatter = ax.scatter(x_values, y_values, cmap=\"viridis\", s=5)\n",
     "plt.colorbar(scatter, label=\"Time (ns)\")\n",
     "ax.set_title(\"Smoothed Positions Over Time\")\n",
     "ax.set_xlabel(\"X Position\")\n",
     "ax.set_ylabel(\"Y Position\")\n",
-    "plt.show()\n"
+    "plt.show()"
    ]
   },
   {
@@ -87,14 +87,16 @@
     "# results: columns ['frame_idx', 'camera_pos'] where camera_pos is [x, y, z]\n",
     "\n",
     "# Create quick lookups\n",
-    "results_dict = {row['frame_idx']: row['smoothed_camera_pos'] for _, row in final_results.iterrows()}\n",
+    "results_dict = {\n",
+    "    row[\"frame_idx\"]: row[\"smoothed_camera_pos\"] for _, row in final_results.iterrows()\n",
+    "}\n",
     "\n",
     "detections_by_frame = {}\n",
     "for _, row in all_detections.iterrows():\n",
-    "    fidx = row['frame_idx']\n",
+    "    fidx = row[\"frame_idx\"]\n",
     "    if fidx not in detections_by_frame:\n",
     "        detections_by_frame[fidx] = []\n",
-    "    detections_by_frame[fidx].append((row['tag_id'], row['corners']))\n",
+    "    detections_by_frame[fidx].append((row[\"tag_id\"], row[\"corners\"]))\n",
     "\n",
     "cap = recording.video\n",
     "\n",
@@ -110,9 +112,9 @@
     "min_y, max_y = np.inf, -np.inf\n",
     "\n",
     "if len(results) > 0:\n",
-    "    all_positions = np.vstack(results['camera_pos'])\n",
-    "    min_x, max_x = np.min(all_positions[:,0]), np.max(all_positions[:,0])\n",
-    "    min_y, max_y = np.min(all_positions[:,1]), np.max(all_positions[:,1])\n",
+    "    all_positions = np.vstack(results[\"camera_pos\"])\n",
+    "    min_x, max_x = np.min(all_positions[:, 0]), np.max(all_positions[:, 0])\n",
+    "    min_y, max_y = np.min(all_positions[:, 1]), np.max(all_positions[:, 1])\n",
     "\n",
     "# Handle degenerate ranges\n",
     "if min_x == max_x:\n",
@@ -127,15 +129,24 @@
     "# Store all visited positions to show them dimly\n",
     "visited_positions = []\n",
     "\n",
+    "\n",
     "def draw_detections(frame, detections, color, thickness=2):\n",
-    "    for (tag_id, corners) in detections:\n",
+    "    for tag_id, corners in detections:\n",
     "        corners_int = corners.astype(int)\n",
     "        cv2.polylines(frame, [corners_int], True, color, thickness)\n",
     "        for c in corners_int:\n",
     "            cv2.circle(frame, tuple(c), 4, color, -1)\n",
-    "        corner_text_pos = (corners_int[0,0], corners_int[0,1] - 10)\n",
-    "        cv2.putText(frame, f\"ID: {tag_id}\", corner_text_pos, cv2.FONT_HERSHEY_SIMPLEX, \n",
-    "                    0.6, color, 2)\n",
+    "        corner_text_pos = (corners_int[0, 0], corners_int[0, 1] - 10)\n",
+    "        cv2.putText(\n",
+    "            frame,\n",
+    "            f\"ID: {tag_id}\",\n",
+    "            corner_text_pos,\n",
+    "            cv2.FONT_HERSHEY_SIMPLEX,\n",
+    "            0.6,\n",
+    "            color,\n",
+    "            2,\n",
+    "        )\n",
+    "\n",
     "\n",
     "def position_to_graph_coords(position, x0, y0, w, h, min_x, max_x, min_y, max_y):\n",
     "    x, y, z = position\n",
@@ -145,44 +156,65 @@
     "    pt_y = int(y0 + (1 - y_norm) * h)\n",
     "    return (pt_x, pt_y)\n",
     "\n",
+    "\n",
     "def draw_coordinate_cross(frame, x0, y0, w, h, min_x, max_x, min_y, max_y):\n",
     "    # Draw a black background\n",
-    "    cv2.rectangle(frame, (x0, y0), (x0+w, y0+h), (0, 0, 0), -1)\n",
+    "    cv2.rectangle(frame, (x0, y0), (x0 + w, y0 + h), (0, 0, 0), -1)\n",
     "\n",
     "    # Only draw axes if 0,0 is within the range\n",
     "    if min_x < 0 < max_x and min_y < 0 < max_y:\n",
     "        # Find the coordinates of the origin in the graph\n",
-    "        origin = position_to_graph_coords((0,0,0), x0, y0, w, h, min_x, max_x, min_y, max_y)\n",
+    "        origin = position_to_graph_coords(\n",
+    "            (0, 0, 0), x0, y0, w, h, min_x, max_x, min_y, max_y\n",
+    "        )\n",
     "\n",
     "        line_color = (200, 200, 200)\n",
     "        thickness = 1\n",
-    "        \n",
+    "\n",
     "        # Draw x-axis line (horizontal) across entire width\n",
-    "        cv2.line(frame, (x0, origin[1]), (x0+w, origin[1]), line_color, thickness)\n",
+    "        cv2.line(frame, (x0, origin[1]), (x0 + w, origin[1]), line_color, thickness)\n",
     "\n",
     "        # Draw y-axis line (vertical) across entire height\n",
-    "        cv2.line(frame, (origin[0], y0), (origin[0], y0+h), line_color, thickness)\n",
+    "        cv2.line(frame, (origin[0], y0), (origin[0], y0 + h), line_color, thickness)\n",
+    "\n",
     "\n",
-    "def draw_mini_graph(frame, current_position, detected, visited_positions, min_x, max_x, min_y, max_y):\n",
+    "def draw_mini_graph(\n",
+    "    frame, current_position, detected, visited_positions, min_x, max_x, min_y, max_y\n",
+    "):\n",
     "    h, w = frame.shape[:2]\n",
     "    x0 = w - graph_width - 10\n",
     "    y0 = h - graph_height - 10\n",
-    "    \n",
+    "\n",
     "    # Draw coordinate cross background and axes\n",
-    "    draw_coordinate_cross(frame, x0, y0, graph_width, graph_height, min_x, max_x, min_y, max_y)\n",
+    "    draw_coordinate_cross(\n",
+    "        frame, x0, y0, graph_width, graph_height, min_x, max_x, min_y, max_y\n",
+    "    )\n",
     "\n",
     "    # Draw all previously visited positions in a dim color\n",
     "    dim_color = (100, 100, 100)\n",
     "    for pos in visited_positions:\n",
-    "        pt = position_to_graph_coords(pos, x0, y0, graph_width, graph_height, min_x, max_x, min_y, max_y)\n",
+    "        pt = position_to_graph_coords(\n",
+    "            pos, x0, y0, graph_width, graph_height, min_x, max_x, min_y, max_y\n",
+    "        )\n",
     "        cv2.circle(frame, pt, 3, dim_color, -1)\n",
     "\n",
     "    # Draw the current position if available\n",
     "    if current_position is not None:\n",
-    "        pt = position_to_graph_coords(current_position, x0, y0, graph_width, graph_height, min_x, max_x, min_y, max_y)\n",
+    "        pt = position_to_graph_coords(\n",
+    "            current_position,\n",
+    "            x0,\n",
+    "            y0,\n",
+    "            graph_width,\n",
+    "            graph_height,\n",
+    "            min_x,\n",
+    "            max_x,\n",
+    "            min_y,\n",
+    "            max_y,\n",
+    "        )\n",
     "        color = (0, 255, 0) if detected else (0, 0, 255)\n",
     "        cv2.circle(frame, pt, 5, color, -1)\n",
     "\n",
+    "\n",
     "# Try reading a test frame to get video size\n",
     "ret, test_frame = cap.read()\n",
     "if not ret:\n",
@@ -195,11 +227,11 @@
     "cap.set(cv2.CAP_PROP_POS_FRAMES, 0)\n",
     "\n",
     "# Initialize VideoWriter to save output video\n",
-    "fourcc = cv2.VideoWriter_fourcc(*'mp4v')\n",
+    "fourcc = cv2.VideoWriter_fourcc(*\"mp4v\")\n",
     "fps = cap.get(cv2.CAP_PROP_FPS)\n",
     "if fps <= 0:\n",
     "    fps = 30  # fallback if FPS is not available\n",
-    "out = cv2.VideoWriter('output_with_overlays.mp4', fourcc, fps, (width, height))\n",
+    "out = cv2.VideoWriter(\"output_with_overlays.mp4\", fourcc, fps, (width, height))\n",
     "\n",
     "while True:\n",
     "    ret, frame = cap.read()\n",
@@ -213,36 +245,64 @@
     "    if current_position is not None:\n",
     "        # Store current position for future reference\n",
     "        visited_positions.append(current_position)\n",
-    "    \n",
+    "\n",
     "    if current_detections is not None:\n",
     "        # We have current detections\n",
     "        ever_detected = True\n",
     "        last_detections = current_detections\n",
-    "        last_position = current_position if current_position is not None else last_position\n",
+    "        last_position = (\n",
+    "            current_position if current_position is not None else last_position\n",
+    "        )\n",
     "\n",
     "        # Draw detections in green\n",
     "        draw_detections(frame, current_detections, (0, 255, 0))\n",
-    "    \n",
+    "\n",
     "        # Mini graph with current position in green\n",
-    "        draw_mini_graph(frame, current_position, True, visited_positions, min_x, max_x, min_y, max_y)\n",
+    "        draw_mini_graph(\n",
+    "            frame, current_position, True, visited_positions, min_x, max_x, min_y, max_y\n",
+    "        )\n",
     "    else:\n",
     "        # No current detections\n",
     "        if ever_detected and last_detections is not None:\n",
     "            # Draw last known detections in red\n",
     "            draw_detections(frame, last_detections, (0, 0, 255))\n",
-    "            \n",
+    "\n",
     "            # Mini graph with last known position in red\n",
-    "            draw_mini_graph(frame, last_position, False, visited_positions, min_x, max_x, min_y, max_y)\n",
+    "            draw_mini_graph(\n",
+    "                frame,\n",
+    "                last_position,\n",
+    "                False,\n",
+    "                visited_positions,\n",
+    "                min_x,\n",
+    "                max_x,\n",
+    "                min_y,\n",
+    "                max_y,\n",
+    "            )\n",
     "        else:\n",
     "            # Never had any detections: draw a green overlay\n",
     "            overlay = frame.copy()\n",
-    "            cv2.rectangle(overlay, (0,0), (frame.shape[1], frame.shape[0]), (0,255,0), thickness=20)\n",
+    "            cv2.rectangle(\n",
+    "                overlay,\n",
+    "                (0, 0),\n",
+    "                (frame.shape[1], frame.shape[0]),\n",
+    "                (0, 255, 0),\n",
+    "                thickness=20,\n",
+    "            )\n",
     "            alpha = 0.2\n",
     "            frame = cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)\n",
     "            # If we do have a last known position, still show it in green\n",
-    "            draw_mini_graph(frame, last_position, True, visited_positions, min_x, max_x, min_y, max_y)\n",
-    "\n",
-    "    cv2.imshow('Video with Overlays', frame)\n",
+    "            draw_mini_graph(\n",
+    "                frame,\n",
+    "                last_position,\n",
+    "                True,\n",
+    "                visited_positions,\n",
+    "                min_x,\n",
+    "                max_x,\n",
+    "                min_y,\n",
+    "                max_y,\n",
+    "            )\n",
+    "\n",
+    "    cv2.imshow(\"Video with Overlays\", frame)\n",
     "    out.write(frame)\n",
     "\n",
     "    key = cv2.waitKey(1) & 0xFF\n",
@@ -253,7 +313,7 @@
     "\n",
     "cap.release()\n",
     "out.release()\n",
-    "cv2.destroyAllWindows()\n"
+    "cv2.destroyAllWindows()"
    ]
   }
 ],
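The two tutorials reformatted above pair naturally: a quick way to see what the Kalman smoothing buys is to overlay the raw and smoothed xy trajectories in a single axes. A sketch, illustrative and not part of the patch, assuming the `camera_position` DataFrame from the apriltag_detection tutorial:

    # Sketch only: raw vs. smoothed xy trajectory from the camera_position
    # DataFrame produced in the apriltag_detection tutorial above.
    import matplotlib.pyplot as plt
    import numpy as np

    raw_xy = np.stack(camera_position["camera_pos"].to_numpy())[:, :2]
    smooth_xy = np.stack(camera_position["smoothed_camera_pos"].to_numpy())[:, :2]

    fig, ax = plt.subplots()
    ax.plot(raw_xy[:, 0], raw_xy[:, 1], ".", alpha=0.3, label="raw")
    ax.plot(smooth_xy[:, 0], smooth_xy[:, 1], "-", label="smoothed")
    ax.set_aspect("equal", "box")
    ax.legend()
    plt.show()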