-
Notifications
You must be signed in to change notification settings - Fork 290
/
dual_camera.py
executable file
·196 lines (167 loc) · 5.93 KB
/
dual_camera.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# MIT License
# Copyright (c) 2019-2022 JetsonHacks
# A simple code snippet
# Using two CSI cameras (such as the Raspberry Pi Version 2) connected to a
# NVIDIA Jetson Nano Developer Kit with two CSI ports (Jetson Nano, Jetson Xavier NX) via OpenCV
# Drivers for the camera and OpenCV are included in the base image in JetPack 4.3+
# This script will open a window and place the camera stream from each camera in a window
# arranged horizontally.
# The camera streams are each read in their own thread, as when done sequentially there
# is a noticeable lag
import cv2
import threading
import numpy as np
class CSI_Camera:
def __init__(self):
# Initialize instance variables
# OpenCV video capture element
self.video_capture = None
# The last captured image from the camera
self.frame = None
self.grabbed = False
# The thread where the video capture runs
self.read_thread = None
self.read_lock = threading.Lock()
self.running = False
def open(self, gstreamer_pipeline_string):
try:
self.video_capture = cv2.VideoCapture(
gstreamer_pipeline_string, cv2.CAP_GSTREAMER
)
# Grab the first frame to start the video capturing
self.grabbed, self.frame = self.video_capture.read()
except RuntimeError:
self.video_capture = None
print("Unable to open camera")
print("Pipeline: " + gstreamer_pipeline_string)
def start(self):
if self.running:
print('Video capturing is already running')
return None
# create a thread to read the camera image
if self.video_capture != None:
self.running = True
self.read_thread = threading.Thread(target=self.updateCamera)
self.read_thread.start()
return self
def stop(self):
self.running = False
# Kill the thread
self.read_thread.join()
self.read_thread = None
def updateCamera(self):
# This is the thread to read images from the camera
while self.running:
try:
grabbed, frame = self.video_capture.read()
with self.read_lock:
self.grabbed = grabbed
self.frame = frame
except RuntimeError:
print("Could not read image from camera")
# FIX ME - stop and cleanup thread
# Something bad happened
def read(self):
with self.read_lock:
frame = self.frame.copy()
grabbed = self.grabbed
return grabbed, frame
def release(self):
if self.video_capture != None:
self.video_capture.release()
self.video_capture = None
# Now kill the thread
if self.read_thread != None:
self.read_thread.join()
"""
gstreamer_pipeline returns a GStreamer pipeline for capturing from the CSI camera
Flip the image by setting the flip_method (most common values: 0 and 2)
display_width and display_height determine the size of each camera pane in the window on the screen
Default 1920x1080
"""
def gstreamer_pipeline(
sensor_id=0,
capture_width=1920,
capture_height=1080,
display_width=1920,
display_height=1080,
framerate=30,
flip_method=0,
):
return (
"nvarguscamerasrc sensor-id=%d ! "
"video/x-raw(memory:NVMM), width=(int)%d, height=(int)%d, framerate=(fraction)%d/1 ! "
"nvvidconv flip-method=%d ! "
"video/x-raw, width=(int)%d, height=(int)%d, format=(string)BGRx ! "
"videoconvert ! "
"video/x-raw, format=(string)BGR ! appsink"
% (
sensor_id,
capture_width,
capture_height,
framerate,
flip_method,
display_width,
display_height,
)
)
def run_cameras():
window_title = "Dual CSI Cameras"
left_camera = CSI_Camera()
left_camera.open(
gstreamer_pipeline(
sensor_id=0,
capture_width=1920,
capture_height=1080,
flip_method=0,
display_width=960,
display_height=540,
)
)
left_camera.start()
right_camera = CSI_Camera()
right_camera.open(
gstreamer_pipeline(
sensor_id=1,
capture_width=1920,
capture_height=1080,
flip_method=0,
display_width=960,
display_height=540,
)
)
right_camera.start()
if left_camera.video_capture.isOpened() and right_camera.video_capture.isOpened():
cv2.namedWindow(window_title, cv2.WINDOW_AUTOSIZE)
try:
while True:
_, left_image = left_camera.read()
_, right_image = right_camera.read()
# Use numpy to place images next to each other
camera_images = np.hstack((left_image, right_image))
# Check to see if the user closed the window
# Under GTK+ (Jetson Default), WND_PROP_VISIBLE does not work correctly. Under Qt it does
# GTK - Substitute WND_PROP_AUTOSIZE to detect if window has been closed by user
if cv2.getWindowProperty(window_title, cv2.WND_PROP_AUTOSIZE) >= 0:
cv2.imshow(window_title, camera_images)
else:
break
# This also acts as
keyCode = cv2.waitKey(30) & 0xFF
# Stop the program on the ESC key
if keyCode == 27:
break
finally:
left_camera.stop()
left_camera.release()
right_camera.stop()
right_camera.release()
cv2.destroyAllWindows()
else:
print("Error: Unable to open both cameras")
left_camera.stop()
left_camera.release()
right_camera.stop()
right_camera.release()
if __name__ == "__main__":
run_cameras()