diff --git a/requirements/local_dev.txt b/requirements/local_dev.txt index e8fa70a4..ea222090 100644 --- a/requirements/local_dev.txt +++ b/requirements/local_dev.txt @@ -10,6 +10,5 @@ lazy_object_proxy enum34 werkzeug pytest -celery -werkzeug -pytest +pika +tornado diff --git a/zengine/client_queue.py b/zengine/client_queue.py index bf0d8536..fbbd4d8a 100644 --- a/zengine/client_queue.py +++ b/zengine/client_queue.py @@ -30,10 +30,10 @@ class ClientQueue(object): """ def __init__(self, user_id=None, sess_id=None): - self.user_id = user_id + # self.user_id = user_id self.connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS) self.channel = self.connection.channel() - self.sess_id = sess_id + # self.sess_id = sess_id def close(self): self.channel.close() @@ -47,42 +47,45 @@ def get_channel(self): self.connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS) self.channel = pika.BlockingConnection(BLOCKING_MQ_PARAMS) return self.channel + # + # def get_sess_id(self): + # if not self.sess_id: + # self.sess_id = UserSessionID(self.user_id).get() + # return self.sess_id - def get_sess_id(self): - if not self.sess_id: - self.sess_id = UserSessionID(self.user_id).get() - return self.sess_id + def send_to_default_exchange(self, sess_id, message=None): + msg = json.dumps(message) + log.debug("Sending following message to %s queue through default exchange:\n%s" % ( + sess_id, msg)) + self.get_channel().publish(exchange='', routing_key=sess_id, body=msg) - def send_to_queue(self, message=None, json_message=None): - exchange = self.user_id or '' - log.debug("Sending following message to %s queue, \"%s\" exchange:\n%s " % ( - self.sess_id, exchange, json_message or json.dumps(message))) + def send_to_prv_exchange(self, user_id, message=None): + exchange = 'prv_%s' % user_id.lower() + msg = json.dumps(message) + log.debug("Sending following users \"%s\" exchange:\n%s " % (exchange, msg)) + self.get_channel().publish(exchange=exchange, routing_key='', body=msg) - self.get_channel().publish(exchange=exchange, - routing_key=self.sess_id, - body=json_message or json.dumps(message)) - - def old_to_new_queue(self, old_sess_id): - """ - Somehow if users old (obsolete) queue has - undelivered messages, we should redirect them to - current queue. - """ - old_input_channel = self.connection.channel() - while True: - try: - method_frame, header_frame, body = old_input_channel.basic_get(old_sess_id) - if method_frame: - self.send_to_queue(json_message=body) - old_input_channel.basic_ack(method_frame.delivery_tag) - else: - old_input_channel.queue_delete(old_sess_id) - old_input_channel.close() - break - except ChannelClosed as e: - if e[0] == 404: - break - # e => (404, "NOT_FOUND - no queue 'sess_id' in vhost '/'") - else: - raise - # old_input_channel = self.connection.channel() + # def old_to_new_queue(self, old_sess_id): + # """ + # Somehow if users old (obsolete) queue has + # undelivered messages, we should redirect them to + # current queue. + # """ + # old_input_channel = self.connection.channel() + # while True: + # try: + # method_frame, header_frame, body = old_input_channel.basic_get(old_sess_id) + # if method_frame: + # self.send_to_queue(json_message=body) + # old_input_channel.basic_ack(method_frame.delivery_tag) + # else: + # old_input_channel.queue_delete(old_sess_id) + # old_input_channel.close() + # break + # except ChannelClosed as e: + # if e[0] == 404: + # break + # # e => (404, "NOT_FOUND - no queue 'sess_id' in vhost '/'") + # else: + # raise + # # old_input_channel = self.connection.channel() diff --git a/zengine/lib/test_utils.py b/zengine/lib/test_utils.py index bcb24288..8fa2299f 100644 --- a/zengine/lib/test_utils.py +++ b/zengine/lib/test_utils.py @@ -126,7 +126,7 @@ def post(self, **data): self.token = self.response_wrapper.token return self.response_wrapper - def send_output(self, output, sessid): + def send_output(self, output): self.response_wrapper = ResponseWrapper(output) @@ -219,6 +219,7 @@ def _do_login(self): assert all([(field in req_fields) for field in ('username', 'password')]) resp = self.client.post(username=self.client.username or self.client.user.username, password="123", cmd="do") + log.debug("login result :\n%s" % resp.json) assert resp.json['cmd'] == 'upgrade' @staticmethod diff --git a/zengine/messaging/lib.py b/zengine/messaging/lib.py index 8e3a97b6..3f397b60 100644 --- a/zengine/messaging/lib.py +++ b/zengine/messaging/lib.py @@ -14,7 +14,7 @@ from pyoko.conf import settings from zengine.client_queue import BLOCKING_MQ_PARAMS from zengine.lib.cache import Cache - +from zengine.log import log class ConnectionStatus(Cache): """ @@ -106,10 +106,16 @@ def get_role(self, role_id): def full_name(self): return self.username - @classmethod - def bind_private_channel(cls, sess_id): - mq_channel = self._connect_mq() - mq_channel.queue_bind(exchange='prv_%s' % self.key, queue=sess_id) + @property + def prv_exchange(self): + return 'prv_%s' % self.key.lower() + + def bind_private_channel(self, sess_id): + mq_channel = pika.BlockingConnection(BLOCKING_MQ_PARAMS).channel() + mq_channel.queue_declare(queue=sess_id, arguments={'x-expires': 40000}) + log.debug("Binding private exchange to client queue: Q:%s --> E:%s" % (sess_id, + self.prv_exchange)) + mq_channel.queue_bind(exchange=self.prv_exchange, queue=sess_id) def send_notification(self, title, message, typ=1, url=None): """ @@ -124,7 +130,7 @@ def send_notification(self, title, message, typ=1, url=None): """ self.channel_set.channel.__class__.add_message( - channel_key='prv_%s' % self.key, + channel_key=self.prv_exchange, body=message, title=title, typ=typ, diff --git a/zengine/messaging/model.py b/zengine/messaging/model.py index f2e734af..b69c4cd9 100644 --- a/zengine/messaging/model.py +++ b/zengine/messaging/model.py @@ -113,7 +113,9 @@ def create_exchange(self): Needs to be defined only once. """ mq_channel = self._connect_mq() - mq_channel.exchange_declare(exchange=self.code_name, exchange_type='fanout', durable=True) + mq_channel.exchange_declare(exchange=self.code_name, + exchange_type='fanout', + durable=True) def pre_creation(self): if not self.code_name: @@ -122,7 +124,7 @@ def pre_creation(self): self.key = self.code_name return if self.owner and self.is_private: - self.code_name = "prv_%s" % to_safe_str(self.owner.key) + self.code_name = self.owner.prv_exchange self.key = self.code_name return raise IntegrityError('Non-private and non-direct channels should have a "name".') @@ -161,12 +163,16 @@ def unread_count(self): def create_exchange(self): """ Creates user's private exchange - Actually needed to be defined only once. - but since we don't know if it's exists or not - we always call it before binding it to related channel + + Actually user's private channel needed to be defined only once, + and this should be happened when user first created. + But since this has a little performance cost, + to be safe we always call it before binding to the channel we currently subscribe """ channel = self._connect_mq() - channel.exchange_declare(exchange=self.user.key, exchange_type='fanout', durable=True) + channel.exchange_declare(exchange='prv_%s' % self.user.key.lower(), + exchange_type='fanout', + durable=True) @classmethod def mark_seen(cls, key, datetime_str): diff --git a/zengine/tornado_server/get_logger.py b/zengine/tornado_server/get_logger.py index 8fa1aa61..06e8cca8 100644 --- a/zengine/tornado_server/get_logger.py +++ b/zengine/tornado_server/get_logger.py @@ -23,8 +23,13 @@ def get_logger(settings): # ch.setLevel(logging.DEBUG) # create formatter - formatter = logging.Formatter( - '%(asctime)s - %(process)d - %(pathname)s:%(lineno)d [%(module)s > %(funcName)s] - %(name)s - %(levelname)s - %(message)s') + if settings.DEBUG: + # make log messages concise and readble for developemnt + format_str = '%(created)d - %(filename)s:%(lineno)d [%(module)s > %(funcName)s] - %(name)s - %(levelname)s - %(message)s' + else: + format_str = '%(asctime)s - %(process)d - %(pathname)s:%(lineno)d [%(module)s > %(funcName)s] - %(name)s - %(levelname)s - %(message)s' + + formatter = logging.Formatter(format_str) # add formatter to ch ch.setFormatter(formatter) diff --git a/zengine/tornado_server/queue_manager.py b/zengine/tornado_server/queue_manager.py index 841a5736..39b4bd05 100644 --- a/zengine/tornado_server/queue_manager.py +++ b/zengine/tornado_server/queue_manager.py @@ -9,8 +9,8 @@ import json from uuid import uuid4 -import os - +import os, sys +sys.sessid_to_userid = {} import pika import time from pika.adapters import TornadoConnection, BaseConnection @@ -30,6 +30,7 @@ 'MQ_PORT': int(os.environ.get('MQ_PORT', '5672')), 'MQ_USER': os.environ.get('MQ_USER', 'guest'), 'MQ_PASS': os.environ.get('MQ_PASS', 'guest'), + 'DEBUG': os.environ.get('DEBUG', False), 'MQ_VHOST': os.environ.get('MQ_VHOST', '/'), }) log = get_logger(settings) @@ -70,9 +71,15 @@ def _send_message(self, sess_id, input_data): routing_key=sess_id, body=json_encode(input_data)) + def _store_user_id(self, sess_id, body): + sys.sessid_to_userid[sess_id[5:]] = json_decode(body)['user_id'] + def _wait_for_reply(self, sess_id, input_data): channel = self.create_channel() - channel.queue_declare(queue=sess_id, auto_delete=True) + channel.queue_declare(queue=sess_id, + arguments={'x-expires': 4000} + # auto_delete=True + ) timeout_start = time.time() while 1: method_frame, header_frame, body = channel.basic_get(sess_id) @@ -83,6 +90,8 @@ def _wait_for_reply(self, sess_id, input_data): channel.basic_ack(method_frame.delivery_tag) channel.close() log.info('Returned view message for %s: %s' % (sess_id, body)) + if 'upgrade' in body: + self._store_user_id(sess_id, body) return body else: if time.time() - json_decode(body)['reply_timestamp'] > self.REPLY_TIMEOUT: @@ -184,6 +193,7 @@ def register_websocket(self, sess_id, ws): sess_id: ws: """ + user_id = sys.sessid_to_userid[sess_id] self.websockets[sess_id] = ws channel = self.create_out_channel(sess_id) @@ -192,8 +202,8 @@ def inform_disconnection(self, sess_id): routing_key=sess_id, body=json_encode(dict(data={ 'view': 'mark_offline_user', - 'sess_id': sess_id - }))) + 'sess_id': sess_id,}, + _zops_remote_ip=''))) def unregister_websocket(self, sess_id): try: @@ -211,16 +221,19 @@ def create_out_channel(self, sess_id): def _on_output_channel_creation(channel): def _on_output_queue_decleration(queue): channel.basic_consume(self.on_message, queue=sess_id) - + log.debug("BIND QUEUE TO WS Q.%s on Ch.%s" % (sess_id, channel.consumer_tags[0])) self.out_channels[sess_id] = channel + channel.queue_declare(callback=_on_output_queue_decleration, queue=sess_id, + arguments={'x-expires': 40000}, # auto_delete=True, # exclusive=True ) self.connection.channel(_on_output_channel_creation) + def redirect_incoming_message(self, sess_id, message, request): message = json_decode(message) message['_zops_sess_id'] = sess_id @@ -231,9 +244,10 @@ def redirect_incoming_message(self, sess_id, message, request): def on_message(self, channel, method, header, body): sess_id = method.routing_key + log.debug("WS RPLY for %s: %s" % (sess_id, body)) if sess_id in self.websockets: self.websockets[sess_id].write_message(body) - log.debug("WS RPLY for %s: %s" % (sess_id, body)) + channel.basic_ack(delivery_tag=method.delivery_tag) # else: # channel.basic_reject(delivery_tag=method.delivery_tag) diff --git a/zengine/views/auth.py b/zengine/views/auth.py index f84c9f8d..d8571fdf 100644 --- a/zengine/views/auth.py +++ b/zengine/views/auth.py @@ -56,6 +56,15 @@ class Login(SimpleView): def _user_is_online(self): self.current.user.is_online(True) + def _do_upgrade(self): + """ open websocket connection """ + self.current.output['cmd'] = 'upgrade' + self.current.output['user_id'] = self.current.user_id + self.current.user.is_online(True) + self.current.user.bind_private_channel(self.current.session.sess_id) + user_sess = UserSessionID(self.current.user_id) + user_sess.set(self.current.session.sess_id) + def do_view(self): """ Authenticate user with given credentials. @@ -64,7 +73,7 @@ def do_view(self): self.current.output['login_process'] = True self.current.task_data['login_successful'] = False if self.current.is_auth: - self.current.output['cmd'] = 'upgrade' + self._do_upgrade() else: try: auth_result = self.current.auth.authenticate( @@ -72,16 +81,13 @@ def do_view(self): self.current.input['password']) self.current.task_data['login_successful'] = auth_result if auth_result: - self.current.user.is_online(True) - self.current.user.bind_private_channel(self.current.session.sess_id) - user_sess = UserSessionID(self.current.user_id) - old_sess_id = user_sess.get() - user_sess.set(self.current.session.sess_id) - notify = Notify(self.current.user_id) - notify.cache_to_queue() - if old_sess_id: - notify.old_to_new_queue(old_sess_id) - self.current.output['cmd'] = 'upgrade' + self._do_upgrade() + + # old_sess_id = user_sess.get() + # notify = Notify(self.current.user_id) + # notify.cache_to_queue() + # if old_sess_id: + # notify.old_to_new_queue(old_sess_id) except: raise self.current.log.exception("Wrong username or another error occurred") @@ -96,7 +102,6 @@ def show_view(self): """ self.current.output['login_process'] = True if self.current.is_auth: - self.current.output['cmd'] = 'upgrade' + self._do_upgrade() else: - self.current.output['forms'] = LoginForm(current=self.current).serialize() diff --git a/zengine/wf_daemon.py b/zengine/wf_daemon.py index 4ddd5694..215b1271 100755 --- a/zengine/wf_daemon.py +++ b/zengine/wf_daemon.py @@ -132,7 +132,7 @@ def handle_message(self, ch, method, properties, body): """ input = {} try: - sessid = method.routing_key + self.sessid = method.routing_key input = json_decode(body) data = input['data'] @@ -144,9 +144,9 @@ def handle_message(self, ch, method, properties, body): data['view'] = data['path'] else: data['wf'] = data['path'] - session = Session(sessid[5:]) # clip "HTTP_" prefix from sessid + session = Session(self.sessid[5:]) # clip "HTTP_" prefix from sessid else: - session = Session(sessid) + session = Session(self.sessid) headers = {'remote_ip': input['_zops_remote_ip']} @@ -169,16 +169,19 @@ def handle_message(self, ch, method, properties, body): log.exception("Worker error occurred with messsage body:\n%s" % body) if 'callbackID' in input: output['callbackID'] = input['callbackID'] - log.info("OUTPUT for %s: %s" % (sessid, output)) + log.info("OUTPUT for %s: %s" % (self.sessid, output)) output['reply_timestamp'] = time() - self.send_output(output, sessid) - - def send_output(self, output, sessid): - self.client_queue.sess_id = sessid - # TODO: This is ugly - if 'login_process' not in output: - self.client_queue.user_id = self.current.user_id - self.client_queue.send_to_queue(output) + self.send_output(output) + + def send_output(self, output): + # TODO: This is ugly, we should separate login process + log.debug("SEND_OUTPUT: %s" % output) + if self.current.user_id is None or 'login_process' in output: + self.client_queue.send_to_default_exchange(self.sessid, output) + else: + self.client_queue.send_to_prv_exchange(self.current.user_id, output) + + def run_workers(no_subprocess):