diff --git a/zengine/management_commands.py b/zengine/management_commands.py index ee017898..1af0d76a 100644 --- a/zengine/management_commands.py +++ b/zengine/management_commands.py @@ -183,3 +183,20 @@ def run(self): else: worker = Worker() worker.run() + + +class PrepareMQ(Command): + """ + Creates necessary exchanges, queues and bindings + """ + CMD_NAME = 'preparemq' + HELP = 'Creates necessary exchanges, queues and bindings' + + def run(self): + from zengine.wf_daemon import run_workers, Worker + worker_count = int(self.manager.args.workers or 1) + if worker_count > 1: + run_workers(worker_count) + else: + worker = Worker() + worker.run() diff --git a/zengine/messaging/lib.py b/zengine/messaging/lib.py new file mode 100644 index 00000000..a4de5db1 --- /dev/null +++ b/zengine/messaging/lib.py @@ -0,0 +1,27 @@ +# -*- coding: utf-8 -*- +""" +""" + +# Copyright (C) 2015 ZetaOps Inc. +# +# This file is licensed under the GNU General Public License v3 +# (GPLv3). See LICENSE.txt for details. +import pika + + +class BaseUser(object): + connection = None + channel = None + + + def _connect_mq(self): + if not self.connection is None or self.connection.is_closed: + self.connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS) + self.channel = selfconnection.channel() + return self.channel + + + def send_message(self, title, message, sender=None, url=None, typ=1): + channel = self._connect_mq() + mq_msg = json.dumps(dict(sender=sender, body=message, msg_title=title, url=url, typ=typ)) + channel.basic_publish(exchange=self.key, body=mq_msg) diff --git a/zengine/messaging/model.py b/zengine/messaging/model.py index e3182c34..b3360528 100644 --- a/zengine/messaging/model.py +++ b/zengine/messaging/model.py @@ -18,27 +18,34 @@ UserModel = get_object_from_path(settings.USER_MODEL) + def get_mq_connection(): connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS) channel = connection.channel() return connection, channel -CHANNEL_TYPES = ( - # (1, "Notification"), - (10, "System Broadcast"), - (15, "User Broadcast"), - (20, "Chat"), - (25, "Direct"), -) +# CHANNEL_TYPES = ( +# (1, "Notification"), + # (10, "System Broadcast"), + # (20, "Chat"), + # (25, "Direct"), +# ) class Channel(Model): + channel = None + connection = None + name = field.String("Name") code_name = field.String("Internal name") description = field.String("Description") owner = UserModel(reverse_name='created_channels') - typ = field.Integer("Type", choices=CHANNEL_TYPES) + # is this users private exchange + is_private = field.Boolean() + # is this a One-To-One channel + is_direct = field.Boolean() + # typ = field.Integer("Type", choices=CHANNEL_TYPES) class Managers(ListNode): user = UserModel(reverse_name='managed_channels') @@ -50,7 +57,8 @@ def add_message(self, body, title, sender=None, url=None, typ=2): Message(sender=sender, body=body, msg_title=title, url=url, typ=typ, channel=self).save() def _connect_mq(self): - self.connection, self.channel = get_mq_connection() + if not self.connection is None or self.connection.is_closed: + self.connection, self.channel = get_mq_connection() return self.channel def create_exchange(self): @@ -113,7 +121,7 @@ def __unicode__(self): (11, "Error"), (111, "Success"), (2, "Direct Message"), - (3, "Broadcast Message") + (3, "Broadcast Message"), (4, "Channel Message") ) MESSAGE_STATUS = ( @@ -130,7 +138,6 @@ class Message(Model): """ Permission model """ - typ = field.Integer("Type", choices=MSG_TYPES) status = field.Integer("Status", choices=MESSAGE_STATUS) msg_title = field.String("Title") @@ -172,7 +179,7 @@ def __unicode__(self): class Favorite(Model): """ - A model to store users favorited messages + A model to store users bookmarked messages """ channel = Channel() user = UserModel() diff --git a/zengine/tornado_server/queue_manager.py b/zengine/tornado_server/queue_manager.py index 982e3fce..a35427d3 100644 --- a/zengine/tornado_server/queue_manager.py +++ b/zengine/tornado_server/queue_manager.py @@ -66,7 +66,7 @@ def create_channel(self): def _send_message(self, sess_id, input_data): log.info("sending data for %s" % sess_id) - self.input_channel.basic_publish(exchange='tornado_input', + self.input_channel.basic_publish(exchange='input_exc', routing_key=sess_id, body=json_encode(input_data)) @@ -160,7 +160,7 @@ def on_conn_open(self, channel): Args: channel: input channel """ - self.in_channel.exchange_declare(exchange='tornado_input', type='topic') + self.in_channel.exchange_declare(exchange='input_exc', type='topic', durable=True) channel.queue_declare(callback=self.on_input_queue_declare, queue=self.INPUT_QUEUE_NAME) def on_input_queue_declare(self, queue): @@ -172,7 +172,7 @@ def on_input_queue_declare(self, queue): queue: input queue """ self.in_channel.queue_bind(callback=None, - exchange='tornado_input', + exchange='input_exc', queue=self.INPUT_QUEUE_NAME, routing_key="#") @@ -212,10 +212,12 @@ def _on_output_queue_decleration(queue): self.connection.channel(_on_output_channel_creation) def redirect_incoming_message(self, sess_id, message, request): - message = message[:-1] + ',"_zops_remote_ip":"%s"}' % request.remote_ip - self.in_channel.basic_publish(exchange='tornado_input', + message = json_decode(message) + message['_zops_sess_id'] = sess_id + message['_zops_remote_ip'] = request.remote_ip + self.in_channel.basic_publish(exchange='input_exc', routing_key=sess_id, - body=message) + body=json_encode(message)) def on_message(self, channel, method, header, body): sess_id = method.routing_key diff --git a/zengine/wf_daemon.py b/zengine/wf_daemon.py index 7f320eab..4002bda3 100755 --- a/zengine/wf_daemon.py +++ b/zengine/wf_daemon.py @@ -36,7 +36,7 @@ class Worker(object): Workflow runner worker object """ INPUT_QUEUE_NAME = 'in_queue' - INPUT_EXCHANGE = 'tornado_input' + INPUT_EXCHANGE = 'input_exc' def __init__(self): self.connect() @@ -62,7 +62,7 @@ def connect(self): self.client_queue = ClientQueue() self.input_channel = self.connection.channel() - self.input_channel.exchange_declare(exchange=self.INPUT_EXCHANGE, type='topic') + self.input_channel.exchange_declare(exchange=self.INPUT_EXCHANGE, type='topic', durable=True) self.input_channel.queue_declare(queue=self.INPUT_QUEUE_NAME) self.input_channel.queue_bind(exchange=self.INPUT_EXCHANGE, queue=self.INPUT_QUEUE_NAME) log.info("Bind to queue named '%s' queue with exchange '%s'" % (self.INPUT_QUEUE_NAME, self.INPUT_EXCHANGE))