-
Notifications
You must be signed in to change notification settings - Fork 1
/
streaming_http.py
70 lines (57 loc) · 1.75 KB
/
streaming_http.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import cv2
import uvicorn
import numpy as np
from fastapi import FastAPI, Response, Request
from fastapi.responses import StreamingResponse
import sys
# FastAPI application object that the route decorators in this file register on.
app = FastAPI()
# Module-level capture handle opened on device index 0 at import time; it is
# read by generate_frames() and released by stop_stream().
camera = cv2.VideoCapture(0)
# Flag checked by generate_frames() on every iteration; the route handlers
# toggle it to start and stop the frame loop.
streaming = False
def generate_frames():
    """Yield JPEG-encoded webcam frames as multipart HTTP chunks.

    Loops while the module-level ``streaming`` flag is true. Every frame that
    is read successfully is also copied into the module-level ``img`` so the
    most recent frame stays available to other handlers.

    Yields:
        bytes: One ``--frame`` part holding a JPEG image, laid out for a
        ``multipart/x-mixed-replace; boundary=frame`` response.
    """
    global img
    while streaming:
        # Bind the handle locally: stop_stream() sets ``camera`` to None after
        # releasing it, and calling ``None.read()`` would raise.
        capture = camera
        if capture is None:
            break
        success, frame = capture.read()
        # Check the read result before touching the frame; a failed read
        # yields no image, so copying it first would raise AttributeError.
        if not success:
            break
        img = frame.copy()
        encoded, buffer = cv2.imencode('.jpg', frame)
        if not encoded:
            # Skip a frame that could not be encoded instead of sending it.
            continue
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
@app.get('/')
async def index():
    """Return the static welcome message served at the root path."""
    greeting = "Welcome to the webcam streaming application!"
    return {"message": greeting}
@app.get('/video_feed')
async def video_feed():
    """Turn the streaming flag on and return the multipart frame stream.

    The response body is produced by ``generate_frames()`` and is sent with
    the ``multipart/x-mixed-replace`` media type using ``frame`` as boundary.
    """
    global streaming
    streaming = True
    frame_source = generate_frames()
    return StreamingResponse(
        frame_source,
        media_type='multipart/x-mixed-replace; boundary=frame',
    )
@app.get('/stop_stream')
async def stop_stream():
    """Turn the streaming flag off and release the camera if one is held.

    After a release the module-level ``camera`` is reset to ``None``.
    """
    global streaming, camera
    reply = {"message": "Streaming stopped successfully."}
    streaming = False
    # Nothing to release when no camera handle is currently held.
    if not camera:
        return reply
    camera.release()
    camera = None
    return reply
@app.get('/restart_stream')
async def restart_stream():
    """Turn the streaming flag back on, reopening the camera when needed.

    ``stop_stream`` releases the capture device and sets ``camera`` to
    ``None``, so only flipping the flag would leave ``generate_frames`` with
    no usable device. The capture on device index 0 is therefore reopened
    when it is missing or no longer open.

    Returns:
        dict: A ``message`` entry confirming the restart.
    """
    global streaming, camera
    # Reopen only when required so a working capture handle is not replaced.
    if camera is None or not camera.isOpened():
        camera = cv2.VideoCapture(0)
    streaming = True
    return {"message": "Streaming restarted successfully."}
@app.get('/save_frame')
async def save_frame(filename: str):
    """Write the most recently captured frame to ``filename``.

    The frame is the module-level ``img`` that ``generate_frames`` updates
    while streaming.

    Args:
        filename: Destination path handed to ``cv2.imwrite``.

    Returns:
        dict: A ``message`` entry confirming where the frame was written.

    Raises:
        HTTPException: 409 when no frame has been captured yet, 400 when
            OpenCV rejects the file name, 500 when the image is not written.
    """
    from fastapi import HTTPException

    # NOTE(security): ``filename`` comes from the request and is used as a
    # filesystem path unchanged; confine it to a dedicated directory before
    # exposing this endpoint to untrusted clients.

    # ``img`` only exists once generate_frames() has captured a frame, so look
    # it up defensively instead of failing with NameError.
    latest_frame = globals().get("img")
    if latest_frame is None:
        raise HTTPException(status_code=409,
                            detail="No frame has been captured yet.")
    try:
        written = cv2.imwrite(filename, latest_frame)
    except cv2.error as err:
        # Raised by OpenCV, e.g. for an unsupported file extension.
        raise HTTPException(status_code=400,
                            detail=f"Cannot write image to {filename!r}.") from err
    if not written:
        raise HTTPException(status_code=500,
                            detail=f"Failed to save frame to {filename!r}.")
    return {"message": f"Frame saved to {filename}."}
# Demo: use @app.post to receive a value from the client (web browser).
@app.post('/demo')
async def demo_getvalue(value: str):
    """Store ``value`` in the module-level ``demo_value`` and echo it back."""
    global demo_value
    demo_value = value
    return value
@app.get('/rundemo')
async def rundemo():
    """Report the value most recently stored by the ``/demo`` endpoint.

    Returns:
        dict: A ``message`` entry containing the stored value, or a notice
        that nothing has been received when ``/demo`` was never called.
    """
    # ``demo_value`` is only created by demo_getvalue(); reading it before
    # that would raise NameError and surface as a server error.
    if "demo_value" not in globals():
        return {"message": "no value has been received yet"}
    return {"message": f"the value was received as value = {demo_value}"}
if __name__ == "__main__":
    # Serve the app on localhost:8000 when this file is executed directly.
    uvicorn.run(app, port=8000, host="localhost")