people_counter.py (forked from nikmart/pi-object-detection)

# USAGE
# python people_counter.py --prototxt MobileNetSSD_deploy.prototxt.txt --model MobileNetSSD_deploy.caffemodel

# import the necessary packages
from imutils.video import VideoStream
from imutils.video import FPS
from multiprocessing import Process
from multiprocessing import Queue
import numpy as np
import argparse
import imutils
import time
import cv2
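
# note: classify_frame (below) runs in a separate child process so that the
# relatively slow net.forward() call does not block the frame-grabbing and
# display loop in the main process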
def classify_frame(net, inputQueue, outputQueue):
    # keep looping
    while True:
        # check to see if there is a frame in our input queue
        if not inputQueue.empty():
            # grab the frame from the input queue, resize it, and
            # construct a blob from it
            frame = inputQueue.get()
            frame = cv2.resize(frame, (300, 300))
            blob = cv2.dnn.blobFromImage(frame, 0.007843,
                (300, 300), 127.5)
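            # note: the scale factor 0.007843 is roughly 1/127.5, so together
            # with the 127.5 mean subtraction the pixel values end up in roughly
            # [-1, 1], the preprocessing this MobileNet SSD model expects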

            # set the blob as input to our deep learning object
            # detector and obtain the detections
            net.setInput(blob)
            detections = net.forward()

            # write the detections to the output queue
            outputQueue.put(detections)

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
ap.add_argument("-c", "--confidence", type=float, default=0.2,
    help="minimum probability to filter weak detections")
args = vars(ap.parse_args())

# initialize the list of class labels MobileNet SSD was trained to
# detect, then generate a set of bounding box colors for each class
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]
COLORS = np.random.uniform(0, 255, size=(len(CLASSES), 3))

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

# initialize the input queue (frames), output queue (detections),
# and the list of actual detections returned by the child process
inputQueue = Queue(maxsize=1)
outputQueue = Queue(maxsize=1)
detections = None
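# note: with maxsize=1 each queue holds at most one item, so the main loop only
# hands off a new frame once the detector has taken the previous one and always
# reads the most recent detections; the drawn boxes therefore lag the displayed
# frame slightly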

# construct a child process *independent* from our main process of
# execution
print("[INFO] starting process...")
p = Process(target=classify_frame, args=(net, inputQueue,
    outputQueue,))
p.daemon = True
p.start()
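# note: daemon=True means the child process is killed automatically when the
# main script exits, so no explicit join/terminate is needed on shutdown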

# initialize the video stream, allow the camera sensor to warm up,
# and initialize the FPS counter
print("[INFO] starting video stream...")
vs = VideoStream(src=0).start()
# vs = VideoStream(usePiCamera=True).start()
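# note: src=0 uses the first attached webcam; the commented-out line above can
# be used instead when running with the Raspberry Pi camera module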
time.sleep(2.0)
fps = FPS().start()

# loop over the frames from the video stream
while True:
    # grab the frame from the threaded video stream, resize it, and
    # grab its dimensions
    frame = vs.read()
    frame = imutils.resize(frame, width=400)
    (fH, fW) = frame.shape[:2]

    # if the input queue *is* empty, give the current frame to
    # classify
    if inputQueue.empty():
        inputQueue.put(frame)

    # if the output queue *is not* empty, grab the detections
    if not outputQueue.empty():
        detections = outputQueue.get()

    # initialize the people counter for each frame
    people_count = 0

    # check to see if our detections are not None (and if so, we'll
    # draw the detections on the frame)
    if detections is not None:
        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]
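            # note: the SSD output has shape (1, 1, N, 7); each of the N rows is
            # [image_id, class_id, confidence, startX, startY, endX, endY], with
            # box coordinates normalized to [0, 1]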

            # filter out weak detections by ensuring the `confidence`
            # is greater than the minimum confidence
            if confidence < args["confidence"]:
                continue

            # otherwise, extract the index of the class label from
            # the `detections`, then compute the (x, y)-coordinates
            # of the bounding box for the object
            idx = int(detections[0, 0, i, 1])
            dims = np.array([fW, fH, fW, fH])
            box = detections[0, 0, i, 3:7] * dims
            (startX, startY, endX, endY) = box.astype("int")

            # draw the prediction on the frame
            label = "{}: {:.2f}%".format(CLASSES[idx],
                confidence * 100)
            cv2.rectangle(frame, (startX, startY), (endX, endY),
                COLORS[idx], 2)
            y = startY - 15 if startY - 15 > 15 else startY + 15
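            # note: the label is drawn just above the bounding box, or just
            # below its top edge when the box starts too close to the top of
            # the frame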
            cv2.putText(frame, label, (startX, y),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, COLORS[idx], 2)

            # count the people
            if CLASSES[idx] == "person":
                people_count += 1

    cv2.putText(frame, str(people_count), (250, 250),
        cv2.FONT_HERSHEY_SIMPLEX, 10, (144, 0, 255), 10)
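    # note: the count's position (250, 250) and font scale 10 are hard-coded
    # for the 400-pixel-wide display frame; they may need adjusting for other
    # frame sizes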

    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF

    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break

    # update the FPS counter
    fps.update()

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

# do a bit of cleanup
cv2.destroyAllWindows()
vs.stop()
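
# example invocation (assuming the standard MobileNet SSD prototxt and
# caffemodel files sit in the current directory; --confidence is optional and
# defaults to 0.2):
#   python people_counter.py --prototxt MobileNetSSD_deploy.prototxt.txt \
#       --model MobileNetSSD_deploy.caffemodel --confidence 0.4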