Skip to content

Commit

Permalink
Add handling of multiple device ids and device names in addition to m…
Browse files Browse the repository at this point in the history
…etrics
  • Loading branch information
William Wnekowicz authored and marionleborgne committed Aug 28, 2018
1 parent d829328 commit bd64a05
Showing 1 changed file with 31 additions and 16 deletions.
47 changes: 31 additions & 16 deletions cloudbrain/rt_server/rt_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
import logging
import argparse

from collections import defaultdict
from sockjs.tornado.conn import SockJSConnection
from sockjs.tornado import SockJSRouter
from tornado.ioloop import PeriodicCallback, IOLoop
from tornado.web import RequestHandler, Application

SERVER_PORT = 31415
logging.getLogger().setLevel(logging.INFO)
recursivedict = lambda: defaultdict(recursivedict)


def get_args_parser():
Expand Down Expand Up @@ -41,17 +43,19 @@ class RtStreamConnection(SockJSConnection):

def __init__(self, session):
super(self.__class__, self).__init__(session)
self.subscribers = {}
self.subscribers = recursivedict()


def send_probe_factory(self, metric_name):
def send_probe_factory(self, device_id, device_name, metric):

def send_probe(body):
logging.debug("GOT: " + body)
buffer_content = json.loads(body)

for record in buffer_content:
record["metric"] = metric_name
record["device_id"] = device_id
record["device_name"] = device_name
record["metric"] = metric
self.send(json.dumps(record))

return send_probe
Expand Down Expand Up @@ -86,29 +90,36 @@ def handle_channel_subscription(self, stream_configuration):
device_id = stream_configuration['deviceId']
metric = stream_configuration['metric']

if metric not in self.subscribers:
self.subscribers[metric] = TornadoSubscriber(
callback=self.send_probe_factory(metric),
if not self.metric_exists(device_id, device_name, metric):
self.subscribers[device_id][device_name][metric] = TornadoSubscriber(
callback=self.send_probe_factory(device_id, device_name, metric),
device_name=device_name,
device_id=device_id,
rabbitmq_address=RABBITMQ_ADDRESS,
metric_name=metric)

self.subscribers[metric].connect()
self.subscribers[device_id][device_name][metric].connect()

def handle_channel_unsubscription(self, unsubscription_msg):
logging.info("Unsubscription received for %s"
% unsubscription_msg['metric'])
if unsubscription_msg['metric'] in self.subscribers:
self.subscribers[unsubscription_msg['metric']].disconnect()
device_name = unsubscription_msg['deviceName']
device_id = unsubscription_msg['deviceId']
metric = unsubscription_msg['metric']

logging.info("Unsubscription received for device_id: %s, device_name: %s, metric: %s"
% (device_id, device_name, metric))
if self.metric_exists(device_id, device_name, metric):
self.subscribers[device_id][device_name][metric].disconnect()

def on_close(self):
logging.info("Disconnecting client...")
for metric in self.subscribers.keys():
subscriber = self.subscribers[metric]
if subscriber is not None:
logging.info("Disconnecting subscriber for metric: %s" % metric)
subscriber.disconnect()
for device_id in self.subscribers:
for device_name in self.subscribers[device_id]:
for metric in self.subscribers[device_id][device_name]:
subscriber = self.subscribers[device_id][device_name][metric]
if subscriber is not None:
logging.info("Disconnecting subscriber for device_id: %s, device_name: %s, metric: %s"
% (device_id, device_name, metric))
subscriber.disconnect()

self.subscribers = {}
#self.timeout.stop()
Expand All @@ -118,6 +129,10 @@ def on_close(self):
def send_heartbeat(self):
self.broadcast(self.clients, 'message')

def metric_exists(self, device_id, device_name, metric):
return (self.subscribers.has_key(device_id)
and self.subscribers[device_id].has_key(device_name)
and self.subscribers[device_id][device_name].has_key(metric))

class MockHandler(RequestHandler):
"""Just a mock page to test it out..."""
Expand Down

0 comments on commit bd64a05

Please sign in to comment.