Skip to content

Commit

Permalink
ADD: MQTT source
Browse files Browse the repository at this point in the history
  • Loading branch information
marionleborgne committed Aug 27, 2018
1 parent c304b16 commit 2fe848e
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 39 deletions.
2 changes: 1 addition & 1 deletion examples/sink.mqtt.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"modules": [
{
"name": "MQTTSink",
"name": "MQTTConverterSink",
"package": "cloudbrain.modules.sinks.mqtt",
"options": {
"mqtt_routing_key": "YOUR_MQTT_ROUTING_KEY"
Expand Down
32 changes: 32 additions & 0 deletions examples/source.mqtt.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"modules": [
{
"name": "MQTTConverterSource",
"package": "cloudbrain.modules.sources.mqtt",
"options": {
"mqtt_routing_key": "YOUR_ROUTING_KEY",
"username": "YOUR_EMAIL",
"password": "YOUR_PASSWORD"
},
"publishers": [
{
"name": "PikaPublisher",
"package": "cloudbrain.publishers.rabbitmq",
"options": {
"rabbitmq_user": "YOUR_EMAIL",
"rabbitmq_pwd": "YOUR_PASSWORD"
},
"base_routing_key": "some_unique_key",
"metrics": [
{
"metric_name": "eeg",
"num_channels": 8,
"buffer_size": 10
}
]
}
],
"subscribers": []
}
]
}
14 changes: 8 additions & 6 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@
# Get __version__ and set other constants.
# Source: https://stackoverflow.com/a/16084844
with open(os.path.join('src', 'cloudbrain', 'version.py'), 'r') as f:
exec(f.read())
exec (f.read())
URL = 'https://github.com/cloudbrain/cloudbrain'
DOWNLOAD_URL='%s/archive/%s.tar.gz' % (URL, __version__)
DOWNLOAD_URL = '%s/archive/%s.tar.gz' % (URL, __version__)
DESCRIPTION = open('README.rst').read()


# Helper function for requirements parsing by requirement type
def parse_reqs(req_type):
reqs_file = os.path.join('requirements', '%s.txt' % req_type)
install_reqs = parse_requirements(reqs_file, session=PipSession())
reqs = [str(ir.req) for ir in install_reqs]
return reqs


# Get requirements for all types
REQUIREMENT_TYPES = ['core', 'analytics', 'muse']
reqs = {req_type: parse_reqs(req_type) for req_type in REQUIREMENT_TYPES}
Expand All @@ -44,8 +46,8 @@ def parse_reqs(req_type):
'muse:python_version>="3"': reqs['muse'],
'analytics': reqs['analytics']
},
entry_points = {
'console_scripts':
['cloudbrain=cloudbrain.run:main']
}
entry_points={
'console_scripts':
['cloudbrain=cloudbrain.run:main']
}
)
52 changes: 27 additions & 25 deletions src/cloudbrain/modules/sinks/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,29 @@
_LOGGER = logging.getLogger(__name__)


def _convert_old_chunk(data_to_send, num_channels):
def _convert_old_chunk_to_new_chunk(old_chunk, num_channels):
"""
Convert old data chunk to chunk with new data model.
Convert old data chunk to new chunk (i.e. with new data model).
More on the new data model: https://github.com/NeuroJS/
eeg-stream-data-model/issues/1#issuecomment-309515243
:param data_to_send: (list of dict)
:param old_chunk: (list of dict)
Chunk of data with the old data model
:param num_channels: (int)
Number of channels of the input stream.
:return chunk: (dict)
:return new_chunk: (dict)
Chunk of data with the new data model.
"""
chunk = []
for data in data_to_send:
num_channels = len(data) - 1 # don't count the 'timestamp' key
sample = {
'timestamp': data['timestamp'],
'data': [data['channel_%s' % i] for i in range(num_channels)]
new_chunk = []
for old_sample in old_chunk:
num_channels = len(old_sample) - 1 # don't count the 'timestamp' key
new_sample = {
'timestamp': old_sample['timestamp'],
'data': [old_sample['channel_%s' % i] for i in range(num_channels)]
}
chunk.append(sample)
new_chunk.append(new_sample)

return {'chunk': chunk}
return {'chunk': new_chunk}


def _get_vhost(username, password):
Expand All @@ -53,18 +53,20 @@ def _setup_mqtt_channel(username, password, host, vhost, exchange):
return channel


MQTTPublisher = namedtuple('MQTTPublisher',
['host', 'username', 'password', 'vhost', 'channel'])
MQTTConnection = namedtuple('MQTTConnection',
['host', 'username', 'password', 'vhost',
'channel'])


class MQTTSink(ModuleInterface):
class MQTTConverterSink(ModuleInterface):
"""
This module subscribes to an AMQP stream, converts it into the
new data model and publishes it to a MQTT topic with a chosen routing key.
"""

def __init__(self, subscribers, publishers, mqtt_routing_key):

super(MQTTSink, self).__init__(subscribers, publishers)
super(MQTTConverterSink, self).__init__(subscribers, publishers)
_LOGGER.debug("Subscribers: %s" % self.subscribers)
_LOGGER.debug("Publishers: %s" % self.publishers)

Expand All @@ -84,18 +86,18 @@ def start(self):
vhost = _get_vhost(username, password)
channel = _setup_mqtt_channel(username, password,
host, vhost, self.exchange)
mqtt_publisher = MQTTPublisher(host, username,
password, vhost, channel)
callback = self.callback_factory(num_channels, mqtt_publisher)
mqtt_connection = MQTTConnection(host, username,
password, vhost, channel)
callback = self.callback_factory(num_channels, mqtt_connection)
subscriber.subscribe(metric_name, callback)

def callback_factory(self, num_channels, mqtt_publisher):
def callback_factory(self, num_channels, mqtt_connection):

def callback(unused_ch, unused_method, unused_properties, body):
data = json.loads(body)
chunk = _convert_old_chunk(data, num_channels)
mqtt_publisher.channel.basic_publish(exchange=self.exchange,
routing_key=self.routing_key,
body=json.dumps(chunk))
old_chunk = json.loads(body)
new_chunk = _convert_old_chunk_to_new_chunk(old_chunk, num_channels)
mqtt_connection.channel.basic_publish(exchange=self.exchange,
routing_key=self.routing_key,
body=json.dumps(new_chunk))

return callback
107 changes: 107 additions & 0 deletions src/cloudbrain/modules/sources/mqtt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import json
import logging
import pika
from uuid import uuid4

from cloudbrain.core.config import get_config
from cloudbrain.core.auth import CloudbrainAuth
from cloudbrain.modules.interface import ModuleInterface

_LOGGER = logging.getLogger(__name__)


def _convert_new_chunk_to_old_chunk(new_chunk):
"""
Convert new chunk (i.e. with new data model) to old chunk.
More on the new data model: https://github.com/NeuroJS/
eeg-stream-data-model/issues/1#issuecomment-309515243
:param new_chunk: (dict)
Chunk of data with the new data model.
:return old_chunk: (list of dicts)
Chunk of data with the old data model
"""
old_chunk = []
for sample in new_chunk['chunk']:
old_sample = {'timestamp': sample['timestamp']}
for i in range(len(sample['data'])):
old_sample['channel_%s' % i] = sample['data'][i]
old_chunk.append(old_sample)
return old_chunk


def _get_vhost(username, password):
    """
    Resolve the RabbitMQ virtual host for the given credentials.

    :param username: (string) auth-service username
    :param password: (string) auth-service password
    :return: (string) virtual host name reported by the auth service
    """
    auth_client = CloudbrainAuth(get_config()['authUrl'])
    return auth_client.get_vhost(username, password)


def _setup_mqtt_channel(username, password, host, vhost, exchange, routing_key,
                        queue_name):
    """
    Open a blocking AMQP connection and bind a queue for MQTT consumption.

    Declares the topic exchange (durable), declares an exclusive queue with
    the requested name, and binds it to the exchange under *routing_key*.

    :param username: (string) RabbitMQ username
    :param password: (string) RabbitMQ password
    :param host: (string) RabbitMQ host
    :param vhost: (string) RabbitMQ virtual host
    :param exchange: (string) topic exchange to declare and bind to
    :param routing_key: (string) routing key for the queue binding
    :param queue_name: (string) requested queue name
    :return: (pika channel) open channel with the bound queue
    """
    connection_params = pika.ConnectionParameters(
        host=host,
        credentials=pika.PlainCredentials(username, password),
        virtual_host=vhost)
    channel = pika.BlockingConnection(connection_params).channel()

    channel.exchange_declare(exchange=exchange, exchange_type='topic',
                             durable=True)

    # Bind to the name the broker actually assigned, not the requested one.
    declared = channel.queue_declare(queue_name, exclusive=True)
    bound_queue = declared.method.queue
    channel.queue_bind(exchange=exchange,
                       queue=bound_queue,
                       routing_key=routing_key)
    return channel


class MQTTConverterSource(ModuleInterface):
    """
    Subscribe to an MQTT topic with the given routing key, convert incoming
    data chunks from the NEW data model back to the OLD one, and republish
    them on the AMQP exchange via the configured publishers.

    NOTE(review): the callback calls _convert_new_chunk_to_old_chunk, i.e.
    the conversion is new -> old (the mirror of MQTTConverterSink).
    """

    def __init__(self,
                 subscribers,
                 publishers,
                 mqtt_routing_key,
                 username,
                 password):
        """
        :param subscribers: (list) subscriber modules (stored by the base
            class; not consumed in this module).
        :param publishers: (list) publisher modules used to re-emit the
            converted chunks.
        :param mqtt_routing_key: (string) MQTT routing key to bind the
            consuming queue to.
        :param username: (string) RabbitMQ / auth-service username.
        :param password: (string) RabbitMQ / auth-service password.
        """

        super(MQTTConverterSource, self).__init__(subscribers, publishers)
        _LOGGER.debug("Subscribers: %s" % self.subscribers)
        _LOGGER.debug("Publishers: %s" % self.publishers)

        self.threads = []
        self.mqtt_routing_key = mqtt_routing_key
        self.username = username
        self.password = password
        # presumably the RabbitMQ MQTT plugin's default topic exchange —
        # TODO confirm against broker configuration.
        self.exchange = 'amq.topic'
        vhost = _get_vhost(username, password)
        host = get_config()['rabbitHost']
        # Unique queue name so multiple source instances do not collide.
        self.queue_name = 'MQTTConverterSource-' + str(uuid4())
        # NOTE: opens a live AMQP connection at construction time.
        self.mqtt_channel = _setup_mqtt_channel(username, password, host, vhost,
                                                self.exchange, mqtt_routing_key,
                                                self.queue_name)

    def start(self):
        """Start consuming from the bound queue; blocks the calling thread."""

        self.mqtt_channel.basic_consume(self.callback,
                                        queue=self.queue_name,
                                        exclusive=True,
                                        no_ack=True)

        self.mqtt_channel.start_consuming()

    def callback(self, ch, method, properties, body):
        """
        Delivery callback: decode one chunk, convert it to the old data
        model, and republish it under '<base_routing_key>:<metric_name>'
        for every metric buffer of every publisher.

        :param ch: (channel) delivery channel (unused)
        :param method: (method frame) delivery metadata (unused)
        :param properties: (properties) message properties (unused)
        :param body: (bytes/str) JSON-encoded chunk in the new data model
        """
        new_chunk = json.loads(body)
        old_chunk = _convert_new_chunk_to_old_chunk(new_chunk)

        for publisher in self.publishers:
            for metric_buffer in publisher.metric_buffers.values():
                metric_name = metric_buffer.name
                routing_key = "%s:%s" % (
                    publisher.base_routing_key, metric_name)
                # NOTE(review): reaches into a publisher-private method;
                # consider exposing a public publish API instead.
                publisher._rabbitmq_publish(routing_key, old_chunk)
8 changes: 1 addition & 7 deletions src/cloudbrain/version.py
Original file line number Diff line number Diff line change
@@ -1,7 +1 @@
# Store the version here so:
# 1) we don't load dependencies by storing it in __init__.py
# 2) we can import it in setup.py for the same reason
# 3) we can import it into your module
#
# Source: https://stackoverflow.com/a/16084844
__version__ = '0.0.18'
__version__ = '0.0.20'

0 comments on commit 2fe848e

Please sign in to comment.