Skip to content

Commit

Permalink
Merge pull request #2 from OpenVisualCloud/master
Browse files Browse the repository at this point in the history
Merge from OpenVisualCloud
  • Loading branch information
dahanhan authored May 1, 2020
2 parents 8f4f977 + 6859dd3 commit 5778286
Show file tree
Hide file tree
Showing 33 changed files with 278 additions and 339 deletions.
99 changes: 31 additions & 68 deletions analytics/common/runva.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,76 +31,39 @@ def stop(self):
GLib.timeout_add(10,self._noop)
self._stop=True

def loop(self, sensor, location, uri, topic, algorithm, algorithmName, resolution={}, zonemap=[]):
if algorithmName=="crowd-counting":
pid,msg=PipelineManager.create_instance(self._pipeline,self._version,{
"source": {
"uri": uri,
"type":"uri"
},
"destination": {
"type": "mqtt",
"host": mqtthost,
"clientid": algorithm,
"topic": topic
},
"tags": {
"sensor": sensor,
"location": location,
"algorithm": algorithm,
"office": {
"lat": office[0],
"lon": office[1],
},
},
"destination": {
"type": "mqtt",
"host": mqtthost,
"clientid": algorithm,
"topic": topic,
},
"parameters": {
"crowd_count": {
"width": resolution["width"],
"height": resolution["height"],
"zonemap": zonemap
},
"every-nth-frame": every_nth_frame,
"recording_prefix": "/tmp/" + sensor,
}
})
else:
pid,msg=PipelineManager.create_instance(self._pipeline,self._version,{
"source": {
"uri": uri,
"type":"uri"
},
"destination": {
"type": "mqtt",
"host": mqtthost,
"clientid": algorithm,
"topic": topic
},
"tags": {
"sensor": sensor,
"location": location,
"algorithm": algorithm,
"office": {
"lat": office[0],
"lon": office[1],
},
},
"destination": {
"type": "mqtt",
"host": mqtthost,
"clientid": algorithm,
"topic": topic,
def loop(self, sensor, location, uri, algorithm, algorithmName, resolution={"width":0,"height":0}, zonemap=[], topic="analytics"):

pid,msg=PipelineManager.create_instance(self._pipeline,self._version,{
"source": {
"uri": uri,
"type":"uri"
},
"destination": {
"type": "mqtt",
"host": mqtthost,
"clientid": algorithm,
"topic": topic,
},
"tags": {
"sensor": sensor,
"location": location,
"algorithm": algorithm,
"office": {
"lat": office[0],
"lon": office[1],
},
"parameters": {
"every-nth-frame": every_nth_frame,
"recording_prefix": "/tmp/" + sensor,
},
"parameters": {
"crowd_count": { # crowd-counting only
"width": resolution["width"],
"height": resolution["height"],
"zonemap": zonemap
},
})
"every-nth-frame": every_nth_frame,
"recording_prefix": "/tmp/" + sensor,
},
})

if pid is None:
print("Exception: "+str(msg), flush=True)
return
Expand Down
10 changes: 10 additions & 0 deletions analytics/crowd/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,17 @@

DIR=$(dirname $(readlink -f "$0"))
SCENARIO="${2:-stadium}"
FRAMEWORK="${6:-gst}"

case "$FRAMEWORK" in
gst)
;;
*)
echo "Not Implemented"
exit -1
;;
esac

case "$SCENARIO" in
*stadium*)
. "$DIR/../../script/build.sh"
Expand Down
10 changes: 2 additions & 8 deletions analytics/crowd/count-crowd.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from db_query import DBQuery
from signal import signal, SIGTERM
from concurrent.futures import ThreadPoolExecutor
from mqtt2db import MQTT2DB
from rec2db import Rec2DB
from runva import RunVA
import os
Expand All @@ -19,30 +18,25 @@
dbhost = os.environ["DBHOST"]
every_nth_frame = int(os.environ["EVERY_NTH_FRAME"])

mqtt2db=None
rec2db=None
runva=None
stop=False

def connect(sensor, location, uri, algorithm, algorithmName, resolution, zonemap):
global mqtt2db, rec2db, runva
global rec2db, runva

try:
mqtt2db=MQTT2DB(algorithm) # this waits for mqtt
rec2db=Rec2DB(sensor)
runva=RunVA("crowd_counting")

topic=str(uuid.uuid4()) # topic must be different as camera may reconnect
with ThreadPoolExecutor(2) as e:
e.submit(mqtt2db.loop, topic)
e.submit(rec2db.loop)

# any VA exit indicates a camera disconnect
with ThreadPoolExecutor(1) as e1:
e1.submit(runva.loop, sensor, location, uri, topic, algorithm, algorithmName, resolution, zonemap)
e1.submit(runva.loop, sensor, location, uri, algorithm, algorithmName, resolution, zonemap)

if not stop:
mqtt2db.stop()
rec2db.stop()
raise Exception("VA exited. This should not happen.")

Expand Down
78 changes: 0 additions & 78 deletions analytics/crowd/mqtt2db.py

This file was deleted.

4 changes: 2 additions & 2 deletions analytics/entrance/VCAC-A/gst/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ RUN apt-get update -qq && apt-get install -qq python3-paho-mqtt python3-ply pyt
COPY --from=smtc_common /home/*.py /home/
COPY *.py /home/
COPY models /home/models
COPY VCAC-A/gst/pipeline /home/pipelines/people_counting
COPY VCAC-A/gst/pipeline /home/pipelines/entrance_counting
COPY custom_transforms /home/custom_transforms
CMD ["/home/count-people.py"]
CMD ["/home/count-entrance.py"]
ENV PATH=${PATH}:/home/custom_transforms

####
Expand Down
4 changes: 2 additions & 2 deletions analytics/entrance/Xeon/gst/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ RUN apt-get update -qq && apt-get install -qq python3-paho-mqtt python3-ply pyt
COPY --from=smtc_common /home/*.py /home/
COPY *.py /home/
COPY models /home/models
COPY Xeon/gst/pipeline /home/pipelines/people_counting
COPY Xeon/gst/pipeline /home/pipelines/entrance_counting
COPY custom_transforms /home/custom_transforms
CMD ["/home/count-people.py"]
CMD ["/home/count-entrance.py"]
ENV PATH=${PATH}:/home/custom_transforms

####
Expand Down
10 changes: 10 additions & 0 deletions analytics/entrance/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

DIR=$(dirname $(readlink -f "$0"))
SCENARIO="${2:-stadium}"
FRAMEWORK="${6:-gst}"

case "$FRAMEWORK" in
gst)
;;
*)
echo "Not Implemented"
exit -1
;;
esac

case "$SCENARIO" in
*stadium*)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from db_query import DBQuery
from signal import signal, SIGTERM
from concurrent.futures import ThreadPoolExecutor
from mqtt2db import MQTT2DB
from rec2db import Rec2DB
from runva import RunVA
import os
Expand All @@ -15,30 +14,25 @@
dbhost = os.environ["DBHOST"]
every_nth_frame = int(os.environ["EVERY_NTH_FRAME"])

mqtt2db=None
rec2db=None
runva=None
stop=False

def connect(sensor, location, uri, algorithm, algorithmName):
global mqtt2db, rec2db, runva
global rec2db, runva

try:
mqtt2db=MQTT2DB(algorithm) # this waits for mqtt
rec2db=Rec2DB(sensor)
runva=RunVA("people_counting")
runva=RunVA("entrance_counting")

topic=str(uuid.uuid4()) # topic must be different as camera may reconnect
with ThreadPoolExecutor(2) as e:
e.submit(mqtt2db.loop, topic)
e.submit(rec2db.loop)

# any VA exit indicates a camera disconnect
with ThreadPoolExecutor(1) as e1:
e1.submit(runva.loop, sensor, location, uri, topic, algorithm, algorithmName)
e1.submit(runva.loop, sensor, location, uri, algorithm, algorithmName)

if not stop:
mqtt2db.stop()
rec2db.stop()
raise Exception("VA exited. This should not happen.")

Expand All @@ -58,7 +52,7 @@ def quit_service(signum, sigframe):
while not stop:
try:
algorithm=dba.ingest({
"name": "people-counting",
"name": "entrance-counting",
"office": {
"lat": office[0],
"lon": office[1],
Expand All @@ -68,14 +62,14 @@ def quit_service(signum, sigframe):
})["_id"]
break
except Exception as e:
print("Exception in count-people register algorithm: "+str(e), flush=True)
print("Exception in count-entrance register algorithm: "+str(e), flush=True)
time.sleep(10)

# compete for a sensor connection
while not stop:
try:
print("Searching...", flush=True)
for sensor in dbs.search("sensor:'camera' and status:'idle' and algorithm='people-counting' and office:["+str(office[0])+","+str(office[1])+"]"):
for sensor in dbs.search("sensor:'camera' and status:'idle' and algorithm='entrance-counting' and office:["+str(office[0])+","+str(office[1])+"]"):
try:
# compete (with other va instances) for a sensor
r=dbs.update(sensor["_id"],{"status":"streaming"},seq_no=sensor["_seq_no"],primary_term=sensor["_primary_term"])
Expand All @@ -89,10 +83,10 @@ def quit_service(signum, sigframe):
if stop: break

except Exception as e:
print("Exception in count-people search sensor: "+str(e), flush=True)
print("Exception in count-entrance search sensor: "+str(e), flush=True)

except Exception as e:
print("Exception in count-people sensor connection: "+str(e), flush=True)
print("Exception in count-entrance sensor connection: "+str(e), flush=True)

time.sleep(10)

Expand Down
Loading

0 comments on commit 5778286

Please sign in to comment.