From 2fe848ec4acd554585329a8e692ea46f5315bbb7 Mon Sep 17 00:00:00 2001
From: Marion Le Borgne
Date: Sun, 8 Oct 2017 04:37:55 -0700
Subject: [PATCH] ADD: MQTT source

---
 examples/sink.mqtt.json                |   2 +-
 examples/source.mqtt.json              |  32 ++++++++
 setup.py                               |  14 ++--
 src/cloudbrain/modules/sinks/mqtt.py   |  52 ++++++------
 src/cloudbrain/modules/sources/mqtt.py | 107 +++++++++++++++++++++++++
 src/cloudbrain/version.py              |   8 +-
 6 files changed, 176 insertions(+), 39 deletions(-)
 create mode 100644 examples/source.mqtt.json
 create mode 100644 src/cloudbrain/modules/sources/mqtt.py

diff --git a/examples/sink.mqtt.json b/examples/sink.mqtt.json
index a36c70d..02d3a96 100644
--- a/examples/sink.mqtt.json
+++ b/examples/sink.mqtt.json
@@ -1,7 +1,7 @@
 {
   "modules": [
     {
-      "name": "MQTTSink",
+      "name": "MQTTConverterSink",
       "package": "cloudbrain.modules.sinks.mqtt",
       "options": {
         "mqtt_routing_key": "YOUR_MQTT_ROUTING_KEY"
diff --git a/examples/source.mqtt.json b/examples/source.mqtt.json
new file mode 100644
index 0000000..28e807a
--- /dev/null
+++ b/examples/source.mqtt.json
@@ -0,0 +1,32 @@
+{
+  "modules": [
+    {
+      "name": "MQTTConverterSource",
+      "package": "cloudbrain.modules.sources.mqtt",
+      "options": {
+        "mqtt_routing_key": "YOUR_ROUTING_KEY",
+        "username": "YOUR_EMAIL",
+        "password": "YOUR_PASSWORD"
+      },
+      "publishers": [
+        {
+          "name": "PikaPublisher",
+          "package": "cloudbrain.publishers.rabbitmq",
+          "options": {
+            "rabbitmq_user": "YOUR_EMAIL",
+            "rabbitmq_pwd": "YOUR_PASSWORD"
+          },
+          "base_routing_key": "some_unique_key",
+          "metrics": [
+            {
+              "metric_name": "eeg",
+              "num_channels": 8,
+              "buffer_size": 10
+            }
+          ]
+        }
+      ],
+      "subscribers": []
+    }
+  ]
+}
diff --git a/setup.py b/setup.py
index db1fffb..c4204d5 100644
--- a/setup.py
+++ b/setup.py
@@ -6,11 +6,12 @@
 # Get __version__ and set other constants.
 # Source: https://stackoverflow.com/a/16084844
 with open(os.path.join('src', 'cloudbrain', 'version.py'), 'r') as f:
-    exec(f.read())
+    exec (f.read())
 
 URL = 'https://github.com/cloudbrain/cloudbrain'
-DOWNLOAD_URL='%s/archive/%s.tar.gz' % (URL, __version__)
+DOWNLOAD_URL = '%s/archive/%s.tar.gz' % (URL, __version__)
 DESCRIPTION = open('README.rst').read()
+
 # Helper function for requirements parsing by requirement type
 def parse_reqs(req_type):
     reqs_file = os.path.join('requirements', '%s.txt' % req_type)
@@ -18,6 +19,7 @@ def parse_reqs(req_type):
     reqs = [str(ir.req) for ir in install_reqs]
     return reqs
 
+
 # Get requirements for all types
 REQUIREMENT_TYPES = ['core', 'analytics', 'muse']
 reqs = {req_type: parse_reqs(req_type) for req_type in REQUIREMENT_TYPES}
@@ -44,8 +46,8 @@ def parse_reqs(req_type):
         'muse:python_version>="3"': reqs['muse'],
         'analytics': reqs['analytics']
     },
-    entry_points = {
-        'console_scripts':
-            ['cloudbrain=cloudbrain.run:main']
-    }
+    entry_points={
+        'console_scripts':
+            ['cloudbrain=cloudbrain.run:main']
+    }
 )
diff --git a/src/cloudbrain/modules/sinks/mqtt.py b/src/cloudbrain/modules/sinks/mqtt.py
index 0ee962e..5b72a99 100644
--- a/src/cloudbrain/modules/sinks/mqtt.py
+++ b/src/cloudbrain/modules/sinks/mqtt.py
@@ -10,29 +10,29 @@
 _LOGGER = logging.getLogger(__name__)
 
 
-def _convert_old_chunk(data_to_send, num_channels):
+def _convert_old_chunk_to_new_chunk(old_chunk, num_channels):
     """
-    Convert old data chunk to chunk with new data model.
+    Convert old data chunk to new chunk (i.e. with new data model).
     More on the new data model: https://github.com/NeuroJS/
     eeg-stream-data-model/issues/1#issuecomment-309515243
 
-    :param data_to_send: (list of dict)
+    :param old_chunk: (list of dict)
         Chunk of data with the old data model
     :param num_channels: (int)
         Number of channels of the input stream.
-    :return chunk: (dict)
+    :return new_chunk: (dict)
         Chunk of data with the new data model.
     """
-    chunk = []
-    for data in data_to_send:
-        num_channels = len(data) - 1  # don't count the 'timestamp' key
-        sample = {
-            'timestamp': data['timestamp'],
-            'data': [data['channel_%s' % i] for i in range(num_channels)]
+    new_chunk = []
+    for old_sample in old_chunk:
+        num_channels = len(old_sample) - 1  # don't count the 'timestamp' key
+        new_sample = {
+            'timestamp': old_sample['timestamp'],
+            'data': [old_sample['channel_%s' % i] for i in range(num_channels)]
         }
-        chunk.append(sample)
+        new_chunk.append(new_sample)
 
-    return {'chunk': chunk}
+    return {'chunk': new_chunk}
 
 
 def _get_vhost(username, password):
@@ -53,18 +53,20 @@ def _setup_mqtt_channel(username, password, host, vhost, exchange):
     return channel
 
 
-MQTTPublisher = namedtuple('MQTTPublisher',
-                           ['host', 'username', 'password', 'vhost', 'channel'])
+MQTTConnection = namedtuple('MQTTConnection',
+                            ['host', 'username', 'password', 'vhost',
+                             'channel'])
 
 
-class MQTTSink(ModuleInterface):
+class MQTTConverterSink(ModuleInterface):
     """
     This module subscribes to an AMQP stream, converts it into the new data
     model and publishes it to a MQTT topic with a chosen routing key.
     """
 
+
     def __init__(self, subscribers, publishers, mqtt_routing_key):
-        super(MQTTSink, self).__init__(subscribers, publishers)
+        super(MQTTConverterSink, self).__init__(subscribers, publishers)
 
         _LOGGER.debug("Subscribers: %s" % self.subscribers)
         _LOGGER.debug("Publishers: %s" % self.publishers)
@@ -84,18 +86,18 @@ def start(self):
             vhost = _get_vhost(username, password)
             channel = _setup_mqtt_channel(username, password, host, vhost,
                                           self.exchange)
-            mqtt_publisher = MQTTPublisher(host, username,
-                                           password, vhost, channel)
-            callback = self.callback_factory(num_channels, mqtt_publisher)
+            mqtt_connection = MQTTConnection(host, username,
+                                             password, vhost, channel)
+            callback = self.callback_factory(num_channels, mqtt_connection)
             subscriber.subscribe(metric_name, callback)
 
-    def callback_factory(self, num_channels, mqtt_publisher):
+    def callback_factory(self, num_channels, mqtt_connection):
         def callback(unused_ch, unused_method, unused_properties, body):
-            data = json.loads(body)
-            chunk = _convert_old_chunk(data, num_channels)
-            mqtt_publisher.channel.basic_publish(exchange=self.exchange,
-                                                 routing_key=self.routing_key,
-                                                 body=json.dumps(chunk))
+            old_chunk = json.loads(body)
+            new_chunk = _convert_old_chunk_to_new_chunk(old_chunk, num_channels)
+            mqtt_connection.channel.basic_publish(exchange=self.exchange,
+                                                  routing_key=self.routing_key,
+                                                  body=json.dumps(new_chunk))
 
         return callback
 
diff --git a/src/cloudbrain/modules/sources/mqtt.py b/src/cloudbrain/modules/sources/mqtt.py
new file mode 100644
index 0000000..9117831
--- /dev/null
+++ b/src/cloudbrain/modules/sources/mqtt.py
@@ -0,0 +1,107 @@
+import json
+import logging
+import pika
+from uuid import uuid4
+
+from cloudbrain.core.config import get_config
+from cloudbrain.core.auth import CloudbrainAuth
+from cloudbrain.modules.interface import ModuleInterface
+
+_LOGGER = logging.getLogger(__name__)
+
+
+def _convert_new_chunk_to_old_chunk(new_chunk):
+    """
+    Convert new chunk (i.e. with new data model) to old chunk.
+    More on the new data model: https://github.com/NeuroJS/
+    eeg-stream-data-model/issues/1#issuecomment-309515243
+
+    :param new_chunk: (dict)
+        Chunk of data with the new data model.
+    :return old_chunk: (list of dicts)
+        Chunk of data with the old data model
+    """
+    old_chunk = []
+    for sample in new_chunk['chunk']:
+        old_sample = {'timestamp': sample['timestamp']}
+        for i in range(len(sample['data'])):
+            old_sample['channel_%s' % i] = sample['data'][i]
+        old_chunk.append(old_sample)
+    return old_chunk
+
+
+def _get_vhost(username, password):
+    config = get_config()
+    auth = CloudbrainAuth(config['authUrl'])
+    return auth.get_vhost(username, password)
+
+
+def _setup_mqtt_channel(username, password, host, vhost, exchange, routing_key,
+                        queue_name):
+    credentials = pika.PlainCredentials(username, password)
+    connection = pika.BlockingConnection(
+        pika.ConnectionParameters(host=host,
+                                  credentials=credentials,
+                                  virtual_host=vhost))
+    channel = connection.channel()
+
+    channel.exchange_declare(exchange=exchange, exchange_type='topic',
+                             durable=True)
+
+    result = channel.queue_declare(queue_name, exclusive=True)
+    queue_name = result.method.queue
+
+    channel.queue_bind(exchange=exchange,
+                       queue=queue_name,
+                       routing_key=routing_key)
+    return channel
+
+
+class MQTTConverterSource(ModuleInterface):
+    """
+    Subscribe to MQTT topic with given routing key, convert data chunks
+    from the new data model to the old, and publish on AMQP exchange.
+    """
+
+    def __init__(self,
+                 subscribers,
+                 publishers,
+                 mqtt_routing_key,
+                 username,
+                 password):
+
+        super(MQTTConverterSource, self).__init__(subscribers, publishers)
+        _LOGGER.debug("Subscribers: %s" % self.subscribers)
+        _LOGGER.debug("Publishers: %s" % self.publishers)
+
+        self.threads = []
+        self.mqtt_routing_key = mqtt_routing_key
+        self.username = username
+        self.password = password
+        self.exchange = 'amq.topic'
+        vhost = _get_vhost(username, password)
+        host = get_config()['rabbitHost']
+        self.queue_name = 'MQTTConverterSource-' + str(uuid4())
+        self.mqtt_channel = _setup_mqtt_channel(username, password, host, vhost,
+                                                self.exchange, mqtt_routing_key,
+                                                self.queue_name)
+
+    def start(self):
+
+        self.mqtt_channel.basic_consume(self.callback,
+                                        queue=self.queue_name,
+                                        exclusive=True,
+                                        no_ack=True)
+
+        self.mqtt_channel.start_consuming()
+
+    def callback(self, ch, method, properties, body):
+        new_chunk = json.loads(body)
+        old_chunk = _convert_new_chunk_to_old_chunk(new_chunk)
+
+        for publisher in self.publishers:
+            for metric_buffer in publisher.metric_buffers.values():
+                metric_name = metric_buffer.name
+                routing_key = "%s:%s" % (
+                    publisher.base_routing_key, metric_name)
+                publisher._rabbitmq_publish(routing_key, old_chunk)
diff --git a/src/cloudbrain/version.py b/src/cloudbrain/version.py
index 72d0bbe..3106bbe 100644
--- a/src/cloudbrain/version.py
+++ b/src/cloudbrain/version.py
@@ -1,7 +1 @@
-# Store the version here so:
-# 1) we don't load dependencies by storing it in __init__.py
-# 2) we can import it in setup.py for the same reason
-# 3) we can import it into your module module
-#
-# Source: https://stackoverflow.com/a/16084844
-__version__ = '0.0.18'
+__version__ = '0.0.20'
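
Note for reviewers: the sink's _convert_old_chunk_to_new_chunk() and the new
source's _convert_new_chunk_to_old_chunk() are meant to be exact inverses.
(Also note that the sink's converter recomputes num_channels from each sample,
so its num_channels parameter is effectively unused.) Below is a minimal,
standalone sketch, not part of the patch, that mirrors the two converters and
checks the round trip without a RabbitMQ broker; the timestamps and channel
values are made up for illustration.

    def old_to_new(old_chunk):
        """Old model (flat 'channel_<i>' keys) -> new model ({'chunk': [...]})."""
        new_chunk = []
        for old_sample in old_chunk:
            num_channels = len(old_sample) - 1  # don't count the 'timestamp' key
            new_chunk.append({
                'timestamp': old_sample['timestamp'],
                'data': [old_sample['channel_%s' % i] for i in range(num_channels)]
            })
        return {'chunk': new_chunk}

    def new_to_old(new_chunk):
        """New model -> old model; the inverse of old_to_new()."""
        old_chunk = []
        for sample in new_chunk['chunk']:
            old_sample = {'timestamp': sample['timestamp']}
            for i, value in enumerate(sample['data']):
                old_sample['channel_%s' % i] = value
            old_chunk.append(old_sample)
        return old_chunk

    if __name__ == '__main__':
        # Two illustrative EEG samples, two channels each (values made up).
        old_chunk = [
            {'timestamp': 1507400000000, 'channel_0': 1.0, 'channel_1': 2.0},
            {'timestamp': 1507400000004, 'channel_0': 1.5, 'channel_1': 2.5},
        ]
        new_chunk = old_to_new(old_chunk)
        # -> {'chunk': [{'timestamp': 1507400000000, 'data': [1.0, 2.0]}, ...]}
        assert new_to_old(new_chunk) == old_chunk  # converters are inverses
        print(new_chunk)

With examples/source.mqtt.json as shipped, the source's callback republishes
each converted chunk to the AMQP routing key "some_unique_key:eeg", since the
routing key is built as "%s:%s" % (base_routing_key, metric_name).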