-
Notifications
You must be signed in to change notification settings - Fork 4
/
run.py
161 lines (125 loc) · 6.31 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# Fluid sim demo prototype
import numpy as np
import cv2
from numba import jit
from util.Camera import Camera
from util.FPS_counter import FPS_counter
from pynput import keyboard
import util.Options as Options
# from configuration import Sim
import configuration
# Characters of keys released since the last frame. The pynput listener
# thread appends here and the main loop in run_sim() consumes the entries,
# which decouples key events from the option updates.
pressed_keys = []


def on_release(key):
    """Queue the character of a released key for the main loop.

    Intended as the ``on_release`` callback of a ``pynput.keyboard.Listener``.

    Args:
        key: The released key object. Only objects exposing a ``char``
            attribute that is not ``None`` are queued; keys without a
            printable character are ignored so the consumer never has to
            deal with ``None`` entries.
    """
    char = getattr(key, 'char', None)
    if char is not None:
        pressed_keys.append(char)
# Create the output window up front. cv2.WINDOW_NORMAL is the window *flag*
# for a resizable window; cv2.WND_PROP_FULLSCREEN is a property id meant for
# cv2.setWindowProperty and must not be used as a creation flag (the two
# constants merely share the same numeric value).
cv2.startWindowThread()
cv2.namedWindow("window", flags=cv2.WINDOW_NORMAL)

# Open the capture device selected in the configuration module.
# NOTE(review): no_cam_allowed=True presumably lets construction succeed
# without a webcam; the script checks camera.active before running the sim.
camera = Camera(camera_index=configuration.camera_index, no_cam_allowed=True)
# Initialise the run-time configurable options. Initial values come from the
# configuration module.
#   Options.Cycle(display name, key, list of choice labels, initial value)
#   Options.Range(display name, pair of keys, [min, max], step, initial value)
# NOTE(review): for Range, the first key presumably decreases and the second
# increases the value -- confirm against util.Options.
fullscreen = Options.Cycle('Fullscreen', 'f', ['Window', 'Fullscreen'],
                           configuration.fullscreen)
mirror_screen = Options.Cycle('Mirror Screen', 'g',
                              ['Normal', 'Horizontal', 'Vertical', 'Both'],
                              configuration.mirror_screen)
render_mask = Options.Cycle('Render Mask', 'm', ['false', 'true'],
                            configuration.render_mask)
bg_mode = Options.Cycle('BG', 'b', ['white', 'black', 'hue', 'bg subtract'],
                        configuration.bg_mode)
mask_level = Options.Range('Mask Threshold', ['1', '2'], [0, 1], 0.03,
                           configuration.mask_level)
mask_width = Options.Range('Mask Width', ['3', '4'], [0, 0.5], 0.01,
                           configuration.mask_width)
optical_flow = Options.Cycle('Optical Flow', 'o', ['false', 'true'],
                             configuration.optical_flow)
sim_res_multiplier = Options.Range('Sim Res', ['9', '0'], [0.1, 2.0], 0.1,
                                   configuration.sim_res_multiplier)
flow_speed = Options.Range('Flow Speed', ['-', '='], [0.02, 1], 0.02,
                           configuration.flow_speed)
flow_direction = Options.Cycle('Flow Direction', 'p',
                               ['right', 'down', 'left', 'up'],
                               configuration.flow_direction)
num_smoke_streams = Options.Range('Smoke Streams', ['[', ']'], [1, 50], 1,
                                  configuration.num_smoke_streams)
smoke_percentage = Options.Range('Smoke Amount', ['\'', '#'], [0.1, 1], 0.1,
                                 configuration.smoke_percentage)
debugMode = Options.Cycle('Mode', 'd', ['Normal', 'Debug'], 0)

# Every option that should react to key presses and be listed in the on-screen
# overlay; the overlay shows them in this order.
options = [
    fullscreen, mirror_screen, render_mask,
    bg_mode, mask_level, mask_width, optical_flow,
    sim_res_multiplier, flow_speed, flow_direction,
    num_smoke_streams, smoke_percentage,
    debugMode,
]
def run_sim():
    """Run the interactive fluid simulation loop until 'q' is released.

    Uses the module-level ``camera``, the option objects and ``options`` list,
    and the ``pressed_keys`` queue filled by ``on_release``. Each iteration:

    1. ticks the FPS counter and (re)creates the simulation when the
       resolution option changed,
    2. applies fullscreen / debug-mode changes,
    3. updates the camera and, if enabled, computes dense optical flow,
    4. feeds velocity and boundary into the simulation, steps and renders it,
    5. draws the text overlay and shows the frame,
    6. applies all queued key presses to the options ('q' quits, 'r' resets).

    The keyboard listener is always stopped on exit, even if the loop raises.
    """
    fps = FPS_counter(limit=30)
    # Time left to keep the full option list on screen after a change.
    # NOTE(review): presumably seconds, given it is decremented by fps.last_dt.
    display_counter = 0
    font = cv2.FONT_HERSHEY_SIMPLEX
    sim = None
    flow = None

    key_listener = keyboard.Listener(on_press=None, on_release=on_release)
    key_listener.start()
    try:
        run = True
        while run:
            fps.update()
            if display_counter > 0:
                display_counter -= fps.last_dt

            # Expected to be true on the first iteration. Sub-sample the
            # webcam image to fit the fluid sim resolution and rebuild when
            # the option changes. The flag is read (and reset) before the
            # `sim is None` fallback so it is never left pending.
            res_changed = sim_res_multiplier.get_has_changed(reset_change_flag=True)
            if res_changed or sim is None:
                sim = configuration.Sim(camera.shape, sim_res_multiplier.current, 0, 0)
                sim.mode = debugMode.current
                flow = np.zeros((sim.shape[1], sim.shape[0], 2))

            # Expected to be true on the first iteration. Apply the
            # fullscreen option using the named window-mode constants.
            if fullscreen.get_has_changed(reset_change_flag=True):
                if fullscreen.current:
                    window_mode = cv2.WINDOW_FULLSCREEN
                else:
                    window_mode = cv2.WINDOW_NORMAL
                cv2.setWindowProperty("window", cv2.WND_PROP_FULLSCREEN, window_mode)

            if debugMode.get_has_changed(reset_change_flag=True):
                sim.mode = debugMode.current

            # if flow direction changes, reset, else things get messy
            # if flow_direction.get_has_changed(reset_change_flag=True):
            #     sim.reset()
            #     camera.reset()

            # update input image
            camera.update(bg_mode.current, mirror_screen.current,
                          mask_level.current, mask_width.current)

            # Dense optical flow between the previous and current grey frame,
            # rescaled by cell size over frame time. Skipped when the frame
            # time is not positive, since dividing by it would fill the flow
            # field with inf/NaN.
            if optical_flow.current == 1 and fps.last_dt > 0:
                flow[:] = cv2.calcOpticalFlowFarneback(
                    cv2.resize(camera.last_grey, sim.shape),
                    cv2.resize(camera.current_grey, sim.shape),
                    None,  # no initial flow; OpenCV allocates the output
                    pyr_scale=0.5, levels=3, winsize=15, iterations=3,
                    poly_n=5, poly_sigma=1.0, flags=0)
                flow *= np.sqrt(sim._dx[0] * sim._dx[1]) / fps.last_dt
            else:
                flow[:] = 0

            sim.set_velocity(fps.last_dt, flow_speed.current,
                             flow_direction.current, num_smoke_streams.current,
                             smoke_percentage.current)

            # copy the webcam generated mask into the boundary
            boundary = np.array(cv2.resize(camera.mask, sim.shape).T, dtype=bool)
            sim.set_boundary(boundary, flow.T, flow_direction.current)

            # update and render the sim
            # NOTE(review): 'udpate' is kept verbatim; presumably it matches
            # the method name defined on configuration.Sim -- confirm before
            # renaming.
            sim.udpate(fps.last_dt)
            output = sim.render(camera,
                                render_mask_velocity=(optical_flow.current == 1),
                                render_mask=(render_mask.current == 1))
            height, width = np.shape(output)[:2]

            # add the GUI
            text_color = (0, 0, 255) if bg_mode.current == 0 else (255, 255, 0)
            if debugMode.current == 0 and display_counter <= 0:
                # Idle in normal mode: only hint at how to open the overlay.
                cv2.putText(output, 'd=Debug Mode', (30, height - 20),
                            font, 0.4, text_color)
            else:
                # One option per row, 20 px apart, starting at (30, 30).
                for row, option in enumerate(options):
                    cv2.putText(output, str(option), (30, 30 + 20 * row),
                                font, 0.4, text_color)
                cv2.putText(output, str(fps), (width - 80, height - 20),
                            font, 0.4, text_color)
                cv2.putText(output, 'q=Quit, r=Reset', (30, height - 20),
                            font, 0.4, text_color)

            # render the output
            cv2.imshow('window', output)

            # Drain the queue one entry at a time instead of iterating and
            # then clearing it: a key released by the listener thread while
            # we are processing is handled too rather than silently dropped.
            while pressed_keys:
                key = pressed_keys.pop(0)
                # update the options (poll for input, cycle)
                for option in options:
                    if option.update(key, fps.last_dt):
                        display_counter = 5
                # poll for quit, reset
                if key == 'q':
                    run = False
                elif key == 'r':
                    sim.reset()
                    camera.reset()
    finally:
        key_listener.stop()
# Script entry: run the simulation only when the camera reports itself active;
# otherwise tell the user why nothing is shown.
if not camera.active:
    print("ERROR: Couldn't capture frame. Is Webcam available/enabled?")
else:
    run_sim()

# close the window
cv2.destroyAllWindows()
# weirdly, this helps: give the GUI backend a few chances to process the
# pending close request.
for _ in range(3):
    cv2.waitKey(1)