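"""Pose-matching demo (summary inferred from the code in this file).

A background process runs a PicoDet pedestrian detector and a TinyPose
keypoint model on the webcam via det_keypoint_unite_infer.py, while the main
thread plays a reference video with pre-computed detections. Each limb of the
camera pose is compared against the reference pose by cosine similarity;
matching limbs are drawn green, mismatched limbs red with their names listed
on screen.
"""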
import json
import cv2
import subprocess
import threading
import numpy as np
import os
base_dir = os.path.dirname(__file__)
def camera_thread():
    # Run the detector + keypoint inference script on camera 0 in a subprocess.
    infer_path = os.path.join(base_dir, "python", "det_keypoint_unite_infer.py")
    det_model_dir = os.path.join(base_dir, "model", "picodet_v2_s_192_pedestrian")
    kpt_model_dir = os.path.join(base_dir, "model", "tinypose_128x96")
    # The command is passed as an argument list so paths with spaces survive
    # and no shell parsing is involved.
    subprocess.run(["python", infer_path,
                    "--det_model_dir=%s" % det_model_dir,
                    "--keypoint_model_dir=%s" % kpt_model_dir,
                    "--camera_id=0", "--device=GPU", "--save_res=True"])
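# NOTE (assumption inferred from the reads in video_player below): the
# inference process is expected to keep dumping the latest annotated camera
# frame to python/frame.npy and the latest detection/keypoint results to
# temp.json.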
def video_player(video_name):
    stem = video_name[:video_name.rfind('.')]
    json_path = os.path.join(base_dir, 'video', stem, stem + ".json")
    video_path = os.path.join(base_dir, 'video', stem, video_name)
    print(video_path)
    print(json_path)
    with open(json_path, "r") as f:
        data = json.load(f)  # per-frame detection results for the reference video
    video = cv2.VideoCapture(video_path)
    frame_id = 0
    # Truncate temp.json so a stale result from a previous run is not read.
    open("temp.json", "w").close()
    while True:
        ret, frame = video.read()
        if not ret:
            raise RuntimeError("failed to read video frame")
        frame_id += 1
        frame_res = data[frame_id]
        peo_boxes = frame_res[1]
        skel_list = frame_res[2][0]
        # Pick the person with the largest bounding-box area as the main subject.
        peo_area = []
        for box in peo_boxes:
            peo_area.append((box[2] - box[0]) * (box[3] - box[1]))
        main_idx = peo_area.index(max(peo_area))
        main_box = peo_boxes[main_idx]
        cv2.rectangle(frame,
                      (int(main_box[0]), int(main_box[1])),
                      (int(main_box[2]), int(main_box[3])),
                      color=[0, 0, 255], thickness=3)
        for skel_point in skel_list[main_idx]:
            cv2.circle(frame,
                       (int(skel_point[0]), int(skel_point[1])),
                       radius=4, color=[255, 0, 0], thickness=2)
        frame = cv2.resize(frame, (frame.shape[1] // 2, frame.shape[0] // 2))
        cv2.imshow("test", frame)
        k = cv2.waitKey(20)
        # press 'q' to quit
        if k & 0xff == ord('q'):
            break
        # Keep grabbing camera frames and comparing poses until the user's
        # pose matches the current video frame, then advance to the next frame.
        im = None
        while True:
            try:
                # latest annotated camera frame, written by the inference process
                im = np.load(os.path.join(base_dir, "python", "frame.npy"))
            except Exception:
                continue  # file may be mid-write or not created yet
            camera_res = None
            threshold = 0.97  # similarity threshold (unused; 0.95 literals are used below)
            video_KPT = skel_list[main_idx]
            video_box = peo_boxes[main_idx]
            # Poll until temp.json contains a complete, parseable result.
            while camera_res is None:
                try:
                    with open("temp.json", "r") as f:
                        camera_res = json.load(f)
                except Exception:
                    pass
            camera_KPT = camera_res[2][0][0]
            camera_box = camera_res[1][0]
            # camera_KPT = skel_list[peo_area.index(min(peo_area))]
            # Keep only (x, y) of the 17 keypoints; drop the confidence column.
            video_kpt = np.array(video_KPT)[0:17, 0:2]
            camera_kpt = np.array(camera_KPT)[0:17, 0:2]
            cosine = []
            attention_index = []  # indices of limbs that do not match
            kpt_tuple = [[6, 8], [5, 7], [8, 10], [7, 9], [5, 6], [12, 14], [11, 13], [14, 16], [13, 15]]
            # Chinese limb names kept from the original (unused; the English
            # labels below are the ones drawn on screen):
            kpt_tuple_name = ["左上臂", "右上臂", "左下臂", "右下臂", "两肩", "左大腿", "右大腿", "左小腿", "右小腿"]
            kpt_tuple_name_en = ["Left upper arm", "Right upper arm", "Left lower arm", "Right lower arm",
                                 "Two shoulders", "Left thigh", "Right thigh", "Left calf", "Right calf"]
            for i in range(len(kpt_tuple)):
                kpt_index = kpt_tuple[i]
                # Direction vector of the limb in the video pose and in the camera pose.
                video_vec = np.array([video_kpt[kpt_index[0]][0] - video_kpt[kpt_index[1]][0],
                                      video_kpt[kpt_index[0]][1] - video_kpt[kpt_index[1]][1]])
                camera_vec = np.array([camera_kpt[kpt_index[0]][0] - camera_kpt[kpt_index[1]][0],
                                       camera_kpt[kpt_index[0]][1] - camera_kpt[kpt_index[1]][1]])
                # cos(theta) = u.v / (|u||v|): 1.0 means the two limbs are parallel.
                cos_sim = video_vec.dot(camera_vec) / (np.linalg.norm(video_vec) * np.linalg.norm(camera_vec))
                cosine.append(cos_sim)
                if cosine[i] >= 0.95:
                    # limb matches: draw it green on the camera frame
                    cv2.line(im,
                             (int(camera_kpt[kpt_index[0]][0]), int(camera_kpt[kpt_index[0]][1])),
                             (int(camera_kpt[kpt_index[1]][0]), int(camera_kpt[kpt_index[1]][1])),
                             color=[0, 255, 0],
                             thickness=2)
                else:
                    # limb mismatches: draw it red and remember it for labelling
                    cv2.line(im,
                             (int(camera_kpt[kpt_index[0]][0]), int(camera_kpt[kpt_index[0]][1])),
                             (int(camera_kpt[kpt_index[1]][0]), int(camera_kpt[kpt_index[1]][1])),
                             color=[0, 0, 255],
                             thickness=2)
                    attention_index.append(i)
            # List the names of the mismatched limbs down the left edge.
            text_w = 3
            text_h = 1
            line_inter = im.shape[0] / 40.
            text_scale = max(0.5, im.shape[0] / 3000.)
            for i in attention_index:
                text_h += int(line_inter)
                text_loc = (text_w, text_h)
                cv2.putText(im,
                            kpt_tuple_name_en[i],
                            text_loc,
                            cv2.FONT_ITALIC,
                            text_scale, (0, 255, 255),
                            thickness=1)
            cv2.imshow("camera rec", im)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            # All nine limbs essentially parallel: move on to the next video frame.
            if sum(cosine) > (0.95 * 9):
                break
if __name__ == "__main__":
    # Camera inference runs on a daemon thread so it dies with the main program.
    t = threading.Thread(target=camera_thread)
    t.daemon = True
    t.start()
    # Replay the reference video forever (expects video/taiji/taiji.mp4 and
    # video/taiji/taiji.json next to this script).
    while True:
        video_player("taiji.mp4")