diff --git a/docs/make.bat b/docs/make.bat new file mode 100644 index 00000000..d6b86778 --- /dev/null +++ b/docs/make.bat @@ -0,0 +1,263 @@ +@ECHO OFF + +REM Command file for Sphinx documentation + +if "%SPHINXBUILD%" == "" ( + set SPHINXBUILD=sphinx-build +) +set BUILDDIR=_build +set ALLSPHINXOPTS=-d %BUILDDIR%/doctrees %SPHINXOPTS% . +set I18NSPHINXOPTS=%SPHINXOPTS% . +if NOT "%PAPER%" == "" ( + set ALLSPHINXOPTS=-D latex_paper_size=%PAPER% %ALLSPHINXOPTS% + set I18NSPHINXOPTS=-D latex_paper_size=%PAPER% %I18NSPHINXOPTS% +) + +if "%1" == "" goto help + +if "%1" == "help" ( + :help + echo.Please use `make ^` where ^ is one of + echo. html to make standalone HTML files + echo. dirhtml to make HTML files named index.html in directories + echo. singlehtml to make a single large HTML file + echo. pickle to make pickle files + echo. json to make JSON files + echo. htmlhelp to make HTML files and a HTML help project + echo. qthelp to make HTML files and a qthelp project + echo. devhelp to make HTML files and a Devhelp project + echo. epub to make an epub + echo. latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter + echo. text to make text files + echo. man to make manual pages + echo. texinfo to make Texinfo files + echo. gettext to make PO message catalogs + echo. changes to make an overview over all changed/added/deprecated items + echo. xml to make Docutils-native XML files + echo. pseudoxml to make pseudoxml-XML files for display purposes + echo. linkcheck to check all external links for integrity + echo. doctest to run all doctests embedded in the documentation if enabled + echo. coverage to run coverage check of the documentation if enabled + goto end +) + +if "%1" == "clean" ( + for /d %%i in (%BUILDDIR%\*) do rmdir /q /s %%i + del /q /s %BUILDDIR%\* + goto end +) + + +REM Check if sphinx-build is available and fallback to Python version if any +%SPHINXBUILD% 1>NUL 2>NUL +if errorlevel 9009 goto sphinx_python +goto sphinx_ok + +:sphinx_python + +set SPHINXBUILD=python -m sphinx.__init__ +%SPHINXBUILD% 2> nul +if errorlevel 9009 ( + echo. + echo.The 'sphinx-build' command was not found. Make sure you have Sphinx + echo.installed, then set the SPHINXBUILD environment variable to point + echo.to the full path of the 'sphinx-build' executable. Alternatively you + echo.may add the Sphinx directory to PATH. + echo. + echo.If you don't have Sphinx installed, grab it from + echo.http://sphinx-doc.org/ + exit /b 1 +) + +:sphinx_ok + + +if "%1" == "html" ( + %SPHINXBUILD% -b html %ALLSPHINXOPTS% %BUILDDIR%/html + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The HTML pages are in %BUILDDIR%/html. + goto end +) + +if "%1" == "dirhtml" ( + %SPHINXBUILD% -b dirhtml %ALLSPHINXOPTS% %BUILDDIR%/dirhtml + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The HTML pages are in %BUILDDIR%/dirhtml. + goto end +) + +if "%1" == "singlehtml" ( + %SPHINXBUILD% -b singlehtml %ALLSPHINXOPTS% %BUILDDIR%/singlehtml + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The HTML pages are in %BUILDDIR%/singlehtml. + goto end +) + +if "%1" == "pickle" ( + %SPHINXBUILD% -b pickle %ALLSPHINXOPTS% %BUILDDIR%/pickle + if errorlevel 1 exit /b 1 + echo. + echo.Build finished; now you can process the pickle files. + goto end +) + +if "%1" == "json" ( + %SPHINXBUILD% -b json %ALLSPHINXOPTS% %BUILDDIR%/json + if errorlevel 1 exit /b 1 + echo. + echo.Build finished; now you can process the JSON files. 
+ goto end +) + +if "%1" == "htmlhelp" ( + %SPHINXBUILD% -b htmlhelp %ALLSPHINXOPTS% %BUILDDIR%/htmlhelp + if errorlevel 1 exit /b 1 + echo. + echo.Build finished; now you can run HTML Help Workshop with the ^ +.hhp project file in %BUILDDIR%/htmlhelp. + goto end +) + +if "%1" == "qthelp" ( + %SPHINXBUILD% -b qthelp %ALLSPHINXOPTS% %BUILDDIR%/qthelp + if errorlevel 1 exit /b 1 + echo. + echo.Build finished; now you can run "qcollectiongenerator" with the ^ +.qhcp project file in %BUILDDIR%/qthelp, like this: + echo.^> qcollectiongenerator %BUILDDIR%\qthelp\zengine.qhcp + echo.To view the help file: + echo.^> assistant -collectionFile %BUILDDIR%\qthelp\zengine.ghc + goto end +) + +if "%1" == "devhelp" ( + %SPHINXBUILD% -b devhelp %ALLSPHINXOPTS% %BUILDDIR%/devhelp + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. + goto end +) + +if "%1" == "epub" ( + %SPHINXBUILD% -b epub %ALLSPHINXOPTS% %BUILDDIR%/epub + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The epub file is in %BUILDDIR%/epub. + goto end +) + +if "%1" == "latex" ( + %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex + if errorlevel 1 exit /b 1 + echo. + echo.Build finished; the LaTeX files are in %BUILDDIR%/latex. + goto end +) + +if "%1" == "latexpdf" ( + %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex + cd %BUILDDIR%/latex + make all-pdf + cd %~dp0 + echo. + echo.Build finished; the PDF files are in %BUILDDIR%/latex. + goto end +) + +if "%1" == "latexpdfja" ( + %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex + cd %BUILDDIR%/latex + make all-pdf-ja + cd %~dp0 + echo. + echo.Build finished; the PDF files are in %BUILDDIR%/latex. + goto end +) + +if "%1" == "text" ( + %SPHINXBUILD% -b text %ALLSPHINXOPTS% %BUILDDIR%/text + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The text files are in %BUILDDIR%/text. + goto end +) + +if "%1" == "man" ( + %SPHINXBUILD% -b man %ALLSPHINXOPTS% %BUILDDIR%/man + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The manual pages are in %BUILDDIR%/man. + goto end +) + +if "%1" == "texinfo" ( + %SPHINXBUILD% -b texinfo %ALLSPHINXOPTS% %BUILDDIR%/texinfo + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The Texinfo files are in %BUILDDIR%/texinfo. + goto end +) + +if "%1" == "gettext" ( + %SPHINXBUILD% -b gettext %I18NSPHINXOPTS% %BUILDDIR%/locale + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The message catalogs are in %BUILDDIR%/locale. + goto end +) + +if "%1" == "changes" ( + %SPHINXBUILD% -b changes %ALLSPHINXOPTS% %BUILDDIR%/changes + if errorlevel 1 exit /b 1 + echo. + echo.The overview file is in %BUILDDIR%/changes. + goto end +) + +if "%1" == "linkcheck" ( + %SPHINXBUILD% -b linkcheck %ALLSPHINXOPTS% %BUILDDIR%/linkcheck + if errorlevel 1 exit /b 1 + echo. + echo.Link check complete; look for any errors in the above output ^ +or in %BUILDDIR%/linkcheck/output.txt. + goto end +) + +if "%1" == "doctest" ( + %SPHINXBUILD% -b doctest %ALLSPHINXOPTS% %BUILDDIR%/doctest + if errorlevel 1 exit /b 1 + echo. + echo.Testing of doctests in the sources finished, look at the ^ +results in %BUILDDIR%/doctest/output.txt. + goto end +) + +if "%1" == "coverage" ( + %SPHINXBUILD% -b coverage %ALLSPHINXOPTS% %BUILDDIR%/coverage + if errorlevel 1 exit /b 1 + echo. + echo.Testing of coverage in the sources finished, look at the ^ +results in %BUILDDIR%/coverage/python.txt. + goto end +) + +if "%1" == "xml" ( + %SPHINXBUILD% -b xml %ALLSPHINXOPTS% %BUILDDIR%/xml + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. 
The XML files are in %BUILDDIR%/xml. + goto end +) + +if "%1" == "pseudoxml" ( + %SPHINXBUILD% -b pseudoxml %ALLSPHINXOPTS% %BUILDDIR%/pseudoxml + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The pseudo-XML files are in %BUILDDIR%/pseudoxml. + goto end +) + +:end diff --git a/docs/zengine.messaging.rst b/docs/zengine.messaging.rst new file mode 100644 index 00000000..8889eed0 --- /dev/null +++ b/docs/zengine.messaging.rst @@ -0,0 +1,36 @@ +zengine.messaging package +========================= + + +zengine.messaging.lib module +---------------------------- + +.. automodule:: zengine.messaging.lib + :members: + :undoc-members: + :show-inheritance: + +zengine.messaging.model module +------------------------------ + +.. automodule:: zengine.messaging.model + :members: + :undoc-members: + :show-inheritance: + +zengine.messaging.views module +------------------------------ + +.. automodule:: zengine.messaging.views + :members: + :undoc-members: + :show-inheritance: + + +Module contents +--------------- + +.. automodule:: zengine.messaging + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/zengine.models.rst b/docs/zengine.models.rst new file mode 100644 index 00000000..16553fe6 --- /dev/null +++ b/docs/zengine.models.rst @@ -0,0 +1,22 @@ +zengine.models package +====================== + +Submodules +---------- + +zengine.models.auth module +-------------------------- + +.. automodule:: zengine.models.auth + :members: + :undoc-members: + :show-inheritance: + + +Module contents +--------------- + +.. automodule:: zengine.models + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/zengine.rst b/docs/zengine.rst index 92a03a82..ef5616c2 100644 --- a/docs/zengine.rst +++ b/docs/zengine.rst @@ -14,6 +14,8 @@ zengine package zengine.dispatch zengine.forms zengine.lib + zengine.messaging + zengine.models zengine.tornado_server zengine.views @@ -37,6 +39,14 @@ zengine.engine module :undoc-members: :show-inheritance: +zengine.messaging module +--------------------- + +.. automodule:: zengine.messaging + :members: + :undoc-members: + :show-inheritance: + zengine.log module ------------------ diff --git a/requirements/local_dev.txt b/requirements/local_dev.txt index e8fa70a4..ea222090 100644 --- a/requirements/local_dev.txt +++ b/requirements/local_dev.txt @@ -10,6 +10,5 @@ lazy_object_proxy enum34 werkzeug pytest -celery -werkzeug -pytest +pika +tornado diff --git a/tests/activities/async_amqp/messaging_tests.py b/tests/activities/async_amqp/messaging_tests.py new file mode 100644 index 00000000..467f66e4 --- /dev/null +++ b/tests/activities/async_amqp/messaging_tests.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +""" +""" + +# Copyright (C) 2015 ZetaOps Inc. +# +# This file is licensed under the GNU General Public License v3 +# (GPLv3). See LICENSE.txt for details. +from zengine.lib.concurrent_amqp_test_client import ConcurrentTestCase + + +class TestCase(ConcurrentTestCase): + def test_channel_list(self): + self.ws1.client_to_backend({"view": "_zops_list_channels"}, + self.success_test_callback) + + def test_search_user(self): + self.ws1.client_to_backend({"view": "_zops_search_user", + "query":"x"}, + self.success_test_callback) + + + + + + + +if __name__ == '__main__': + TestCase() diff --git a/tests/async_amqp/messaging_tests.py b/tests/async_amqp/messaging_tests.py new file mode 100644 index 00000000..ba21c4d5 --- /dev/null +++ b/tests/async_amqp/messaging_tests.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +""" +""" + +# Copyright (C) 2015 ZetaOps Inc. 
+# +# This file is licensed under the GNU General Public License v3 +# (GPLv3). See LICENSE.txt for details. +from pprint import pprint + +from zengine.lib.concurrent_amqp_test_client import ConcurrentTestCase, TestQueueManager + + +class TestCase(ConcurrentTestCase): + def __init__(self, *args, **kwargs): + super(TestCase, self).__init__(*args, **kwargs) + + def test_channel_list(self): + self.post('ulakbus', dict(view="_zops_list_channels"), self.show_channel) + + def test_search_user(self): + self.post('ulakbus', + dict(view="_zops_search_user", + query="u")) + + def show_channel(self, res, req): + ch_key = res['channels'][0]['key'] + self.post('ulakbus', + dict(view="_zops_show_channel", + key=ch_key), + callback=self.create_message) + + def create_message(self, res, req=None): + self.post('ulakbus', + {"view": "_zops_create_message", + "message": dict( + body='test_body', title='testtitle', + channel=res['key'], + receiver='', + type=2 + )}) + + def cmd_user_status(self, res, req=None): + print("CMD: user_status:") + pprint(res) + + def cmd_message(self, res, req=None): + print("MESSAGE RECEIVED") + pprint(res) + + +def main(): + from tornado import ioloop + # initiate amqp manager + ioloop = ioloop.IOLoop.instance() + qm = TestQueueManager(io_loop=ioloop) + + # initiate test case + qm.set_test_class(TestCase) + + qm.connect() + ioloop.start() + + +if __name__ == '__main__': + main() diff --git a/tests/test_callactivity.py b/tests/test_callactivity.py index 7d64983b..40f9f66b 100644 --- a/tests/test_callactivity.py +++ b/tests/test_callactivity.py @@ -12,7 +12,7 @@ from zengine.lib.exceptions import HTTPError from zengine.lib.test_utils import BaseTestCase from zengine.models import User -from zengine.notifications.model import NotificationMessage +from zengine.messaging.model import Message from zengine.signals import lane_user_change diff --git a/tests/test_jump_to_wf.py b/tests/test_jump_to_wf.py index 897c46e5..7e4a434b 100644 --- a/tests/test_jump_to_wf.py +++ b/tests/test_jump_to_wf.py @@ -12,7 +12,7 @@ from zengine.lib.exceptions import HTTPError from zengine.lib.test_utils import BaseTestCase from zengine.models import User -from zengine.notifications.model import NotificationMessage +from zengine.messaging.model import Message from zengine.signals import lane_user_change diff --git a/tests/test_multi_user.py b/tests/test_multi_user.py index 821fc820..f100b32e 100644 --- a/tests/test_multi_user.py +++ b/tests/test_multi_user.py @@ -12,7 +12,7 @@ from zengine.lib.exceptions import HTTPError from zengine.lib.test_utils import BaseTestCase from zengine.models import User -from zengine.notifications.model import NotificationMessage +from zengine.messaging.model import Message from zengine.signals import lane_user_change @@ -20,7 +20,7 @@ class TestCase(BaseTestCase): def test_multi_user_mono(self): test_user = User.objects.get(username='test_user') self.prepare_client('/multi_user2/', user=test_user) - with BlockSave(NotificationMessage): + with BlockSave(Message): resp = self.client.post() assert resp.json['msgbox']['title'] == settings.MESSAGES['lane_change_message_title'] token, user = self.get_user_token('test_user2') @@ -37,12 +37,12 @@ def mock(sender, *args, **kwargs): self.old_lane = kwargs['old_lane'] self.owner = list(kwargs['possible_owners'])[0] - NotificationMessage.objects.delete() + Message.objects.delete() lane_user_change.connect(mock) wf_name = '/multi_user/' self.prepare_client(wf_name, username='test_user') - with BlockSave(NotificationMessage): + with 
BlockSave(Message): self.client.post() token, user = self.get_user_token('test_user') assert self.owner.username == 'test_user' diff --git a/zengine/client_queue.py b/zengine/client_queue.py index 6f0f0ea6..9c9dceb9 100644 --- a/zengine/client_queue.py +++ b/zengine/client_queue.py @@ -13,7 +13,7 @@ import time from pika.exceptions import ConnectionClosed, ChannelClosed -from zengine.lib.cache import UserSessionID + BLOCKING_MQ_PARAMS = pika.ConnectionParameters( host=settings.MQ_HOST, @@ -22,6 +22,14 @@ heartbeat_interval=0, credentials=pika.PlainCredentials(settings.MQ_USER, settings.MQ_PASS) ) +from zengine.log import log + +def get_mq_connection(): + connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS) + channel = connection.channel() + if not channel.is_open: + channel.open() + return connection, channel class ClientQueue(object): @@ -30,10 +38,10 @@ class ClientQueue(object): """ def __init__(self, user_id=None, sess_id=None): - self.user_id = user_id + # self.user_id = user_id self.connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS) self.channel = self.connection.channel() - self.sess_id = sess_id + # self.sess_id = sess_id def close(self): self.channel.close() @@ -48,37 +56,33 @@ def get_channel(self): self.channel = pika.BlockingConnection(BLOCKING_MQ_PARAMS) return self.channel - def get_sess_id(self): - if not self.sess_id: - self.sess_id = UserSessionID(self.user_id).get() - return self.sess_id + def send_to_default_exchange(self, sess_id, message=None): + """ + Send messages through RabbitMQ's default exchange, + which will be delivered through routing_key (sess_id). - def send_to_queue(self, message=None, json_message=None): - self.get_channel().basic_publish(exchange='', - routing_key=self.get_sess_id(), - body=json_message or json.dumps(message)) + This method only used for un-authenticated users, i.e. login process. - def old_to_new_queue(self, old_sess_id): + Args: + sess_id string: Session id + message dict: Message object. """ - Somehow if users old (obsolete) queue has - undelivered messages, we should redirect them to - current queue. + msg = json.dumps(message) + log.debug("Sending following message to %s queue through default exchange:\n%s" % ( + sess_id, msg)) + self.get_channel().publish(exchange='', routing_key=sess_id, body=msg) + + def send_to_prv_exchange(self, user_id, message=None): """ - old_input_channel = self.connection.channel() - while True: - try: - method_frame, header_frame, body = old_input_channel.basic_get(old_sess_id) - if method_frame: - self.send_to_queue(json_message=body) - old_input_channel.basic_ack(method_frame.delivery_tag) - else: - old_input_channel.queue_delete(old_sess_id) - old_input_channel.close() - break - except ChannelClosed as e: - if e[0] == 404: - break - # e => (404, "NOT_FOUND - no queue 'sess_id' in vhost '/'") - else: - raise - # old_input_channel = self.connection.channel() + Send messages through logged in users private exchange. 
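A minimal usage sketch of this method ('AbC123XyZ' is a placeholder user key; a reachable RabbitMQ broker where the user's prv_* exchange has already been declared is assumed):

.. code-block:: python

    from zengine.client_queue import ClientQueue

    cq = ClientQueue()
    # JSON-encodes the dict and publishes it to the fanout exchange "prv_abc123xyz"
    cq.send_to_prv_exchange('AbC123XyZ', message={'cmd': 'notification',
                                                  'title': 'Hi',
                                                  'body': 'Hello world'})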
+ + Args: + user_id string: User key + message dict: Message object + + """ + exchange = 'prv_%s' % user_id.lower() + msg = json.dumps(message) + log.debug("Sending the following message to user's \"%s\" exchange:\n%s " % (exchange, msg)) + self.get_channel().publish(exchange=exchange, routing_key='', body=msg) + diff --git a/zengine/current.py b/zengine/current.py index 227cc592..efddca7c 100644 --- a/zengine/current.py +++ b/zengine/current.py @@ -24,7 +24,7 @@ from zengine.lib.cache import WFCache from zengine.lib.camunda_parser import CamundaBMPNParser from zengine.log import log -from zengine.notifications import Notify +from zengine.messaging import Notify DEFAULT_LANE_CHANGE_MSG = { 'title': settings.MESSAGES['lane_change_message_title'], diff --git a/zengine/lib/concurrent_amqp_test_client.py b/zengine/lib/concurrent_amqp_test_client.py new file mode 100644 index 00000000..234c5f1c --- /dev/null +++ b/zengine/lib/concurrent_amqp_test_client.py @@ -0,0 +1,234 @@ +# -*- coding: utf-8 -*- +""" +When a user is not online, AMQP messages sent to that +user are discarded at the user's private exchange. +When the user comes back, the messages sent while offline will be loaded +from the DB and sent to the user's private exchange. + +Because of this, we need to fully simulate 2 online users to test real-time chat behaviour. + +""" + +# Copyright (C) 2015 ZetaOps Inc. +# +# This file is licensed under the GNU General Public License v3 +# (GPLv3). See LICENSE.txt for details. +from __future__ import print_function + +import inspect +import uuid +from pprint import pprint, pformat + +import pika +from tornado.escape import json_encode, json_decode + +from pyoko.conf import settings +from pyoko.lib.utils import get_object_from_path +from zengine.current import Current +from zengine.lib.cache import Session +from zengine.log import log +from zengine.tornado_server.ws_to_queue import QueueManager, NON_BLOCKING_MQ_PARAMS +import sys + +from zengine.views.auth import Login + +sys.sessid_to_userid = {} +UserModel = get_object_from_path(settings.USER_MODEL) + + +class TestQueueManager(QueueManager): + def on_input_queue_declare(self, queue): + """ + AMQP callback, called when the input queue has been declared. + Starts the test run via run_after_connection().
+ + Args: + queue: The declared input queue + """ + log.info("input queue declared") + super(TestQueueManager, self).on_input_queue_declare(queue) + self.run_after_connection() + + def __init__(self, *args, **kwargs): + super(TestQueueManager, self).__init__(*args, **kwargs) + log.info("queue manager init") + self.test_class = lambda qm: 1 + + def conn_err(self, *args, **kwargs): + log.error("connection error: %s %s" % (args, kwargs)) + + def run_after_connection(self): + log.info("run after connect") + self.test_class(self) + + def set_test_class(self, kls): + log.info("test class set: %s" % kls) + self.test_class = kls + + +class TestWSClient(object): + def __init__(self, queue_manager, username, sess_id=None): + self.message_callbacks = {} + self.message_stack = {} + self.user = UserModel.objects.get(username=username) + + self.request = type('MockWSRequestObject', (object,), {'remote_ip': '127.0.0.1'}) + self.queue_manager = queue_manager + self.sess_id = sess_id or uuid.uuid4().hex + sys.sessid_to_userid[self.sess_id] = self.user.key.lower() + self.queue_manager.register_websocket(self.sess_id, self) + self.login_user() + # mimic tornado ws object + # zengine.tornado_server.ws_to_queue.QueueManager will call write_message() method + self.write_message = self.backend_to_client + + def login_user(self): + session = Session(self.sess_id) + current = Current(session=session, input={}) + current.auth.set_user(self.user) + Login(current)._do_upgrade() + + def backend_to_client(self, body): + """ + Handles a message delivered from the backend to this simulated client. + """ + try: + body = json_decode(body) + if 'callbackID' in body: + self.message_stack[body['callbackID']] = body + self.message_callbacks[body['callbackID']](body) + elif 'cmd' in body: + self.message_callbacks[body['cmd']](body) + except: + import traceback + print("\nException BODY: %s \n" % pformat(body)) + traceback.print_exc() + + log.info("WRITE MESSAGE TO CLIENT:\n%s" % (pformat(body),)) + + def client_to_backend(self, message, callback, caller_fn_name): + """ + Sends a message from this simulated client to the backend and registers its callback. + """ + cbid = uuid.uuid4().hex + message = json_encode({"callbackID": cbid, "data": message}) + def cb(res): + print("API Request: %s :: " % caller_fn_name, end='') + result = callback(res, message) + if ConcurrentTestCase.stc == callback and not result: + FAIL = 'FAIL' + else: + FAIL = '--> %s' % callback.__name__ + print('PASS' if result else FAIL) + # self.message_callbacks[cbid] = lambda res: callable(res, message) + self.message_callbacks[cbid] = cb + log.info("GOT MESSAGE FOR BACKEND %s: %s" % (self.sess_id, message)) + self.queue_manager.redirect_incoming_message(self.sess_id, message, self.request) + + +class ConcurrentTestCase(object): + """ + Extend this class and define your test methods with the "test_" prefix.
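A minimal sketch of a concrete test case; 'testuser' is a placeholder for a username that exists in your fixtures, and the view names are the ones exposed by zengine.messaging.views:

.. code-block:: python

    class MyMessagingTests(ConcurrentTestCase):
        def test_list_channels(self):
            # when no callback is given, self.post() falls back to self.stc,
            # which only checks for a 200/201 response code
            self.post('testuser', {'view': '_zops_list_channels'})

        def test_search_user(self):
            # pstc behaves like stc but also prints the request/response pair
            self.post('testuser', {'view': '_zops_search_user', 'query': 'x'}, self.pstc)

        def cmd_message(self, res, req=None):
            # "cmd_" prefixed methods handle commands pushed from the backend
            print(res)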
+ + + + """ + + def __init__(self, queue_manager): + log.info("ConcurrentTestCase class init with %s" % queue_manager) + self.cmds = {} + self.register_cmds() + self.queue_manager = queue_manager + self.clients = {} + self.make_client('ulakbus') + self.run_tests() + + def make_client(self, username): + """ + Args: + username: username for this client instance + + Returns: + Logged in TestWSClient instance for given username + """ + self.clients[username] = TestWSClient(self.queue_manager, username) + + + + def post(self, username, data, callback=None): + if username not in self.clients: + self.make_client(username) + self.clients[username].message_callbacks.update(self.cmds) + callback = callback or self.stc + view_name = data['view'] if 'view' in data else sys._getframe(1).f_code.co_name + self.clients[username].client_to_backend(data, callback, view_name) + + def register_cmds(self): + for name in sorted(self.__class__.__dict__): + if name.startswith("cmd_"): + self.cmds[name[4:]] = getattr(self, name) + + def run_tests(self): + for name in sorted(self.__class__.__dict__): + if name.startswith("test_"): + try: + getattr(self, name)() + except: + import traceback + traceback.print_exc() + + def process_error_reponse(self, resp): + if 'error' in resp: + print(resp['error'].replace('\\n','\n').replace('u\\', '')) + return True + + def stc(self, response, request=None): + """ + STC means Success Test Callback. Looks for 200 or 201 codes in response code. + + Args: + response: + request: + """ + try: + if not response['code'] in (200, 201): + print("FAILED: Response not successful: \n") + if not self.process_error_reponse(response): + print("\nRESP:\n%s") + print("\nREQ:\n %s" % (response, request)) + else: + return True + except Exception as e: + log.exception("\n===========>\nFAILED API REQUEST\n<===========\n%s\n" % e) + log.info("Response: \n%s\n\n" % response) + + def pstc(self, response, request=None): + """ + Same as self.stc() (success request callback) but printing response/request + for debugging purposes + + Args: + response: + request: + + """ + self.stc(response, request) + print("\n\n=================\n\nRESPONSE: %s \n\nREQUEST: %s\n" % (response, request)) + + + +def main(): + from tornado import ioloop + # initiate amqp manager + ioloop = ioloop.IOLoop.instance() + qm = TestQueueManager(io_loop=ioloop) + + # initiate test case + qm.set_test_class(ConcurrentTestCase) + + qm.connect() + ioloop.start() + + +if __name__ == '__main__': + main() diff --git a/zengine/lib/test_utils.py b/zengine/lib/test_utils.py index 703731f3..f0e0704e 100644 --- a/zengine/lib/test_utils.py +++ b/zengine/lib/test_utils.py @@ -12,10 +12,11 @@ from zengine.lib.cache import ClearCache from zengine.lib.exceptions import HTTPError from zengine.log import log +from zengine.tornado_server.ws_to_queue import BlockingConnectionForHTTP from zengine.wf_daemon import Worker from zengine.models import User -from zengine.notifications.model import NotificationMessage +from zengine.messaging.model import Message class ResponseWrapper(object): @@ -60,7 +61,7 @@ def raw(self): pprint(self.content) -class TestClient(Worker): +class BaseTestClient(Worker): """ TestClient to simplify writing API tests for Zengine based apps. 
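A rough sketch of how this client is typically driven from a BaseTestCase subclass (it mirrors the existing tests; '/multi_user2/' and 'test_user' come from this repo's tests and fixtures and may differ in your app):

.. code-block:: python

    class TestCase(BaseTestCase):
        def test_lane_change_message(self):
            # prepare_client() creates the client, logs the user in and sets the wf path
            self.prepare_client('/multi_user2/', username='test_user')
            resp = self.client.post()
            assert 'msgbox' in resp.json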
""" @@ -71,7 +72,7 @@ def __init__(self, path, *args, **kwargs): :param str path: Request uri """ - super(TestClient, self).__init__(*args, **kwargs) + super(BaseTestClient, self).__init__(*args, **kwargs) self.test_client_sessid = None self.response_wrapper = None self.set_path(path, None) @@ -93,7 +94,7 @@ def set_path(self, path, token=''): self.path = path self.token = token - def post(self, **data): + def _prepare_post(self, data): """ by default data dict encoded as json and content type set as application/json @@ -119,14 +120,33 @@ def post(self, **data): post_data = {'data': data, '_zops_remote_ip': '127.0.0.1'} log.info("PostData : %s" % post_data) print("PostData : %s" % post_data) - post_data = json.dumps(post_data) + return post_data + + def post(self, **data): + post_data = json.dumps(self._prepare_post(data)) fake_method = type('FakeMethod', (object,), {'routing_key': self.sess_id}) self.handle_message(None, fake_method, None, post_data) # update client token from response self.token = self.response_wrapper.token return self.response_wrapper - def send_output(self, output, sessid): + +class AMQPTestClient(BaseTestClient): + def apost(self, **data): + """ + AMQP based post method + Args: + **data: Post data + + Returns: + + """ + post_data = self._prepare_post(data) + BlockingConnectionForHTTP().send_message(self.sess_id, post_data) + + +class TestClient(BaseTestClient): + def send_output(self, output): self.response_wrapper = ResponseWrapper(output) @@ -168,7 +188,7 @@ def setup_method(self, method): sleep(2) else: print( - "\nREPORT:: Test case does not have a fixture file like %s" % fixture_guess) + "\nREPORT:: Test case does not have a fixture file like %s" % fixture_guess) else: print("\nREPORT:: Fixture loading disabled by user. (by --ignore=fixture)") @@ -219,11 +239,12 @@ def _do_login(self): assert all([(field in req_fields) for field in ('username', 'password')]) resp = self.client.post(username=self.client.username or self.client.user.username, password="123", cmd="do") + log.debug("login result :\n%s" % resp.json) assert resp.json['cmd'] == 'upgrade' @staticmethod def get_user_token(username): user = User.objects.get(username=username) - msg = NotificationMessage.objects.filter(receiver=user)[0] + msg = Message.objects.filter(receiver=user)[0] token = msg.url.split('/')[-1] return token, user diff --git a/zengine/lib/utils.py b/zengine/lib/utils.py index fd971131..7acd46bb 100644 --- a/zengine/lib/utils.py +++ b/zengine/lib/utils.py @@ -1,4 +1,4 @@ - +# -*- coding: utf-8 -*- class DotDict(dict): """ @@ -6,6 +6,7 @@ class DotDict(dict): Slower that pure dict. 
""" + def __getattr__(self, attr): return self.get(attr, None) @@ -17,10 +18,24 @@ def date_to_solr(d): """ converts DD-MM-YYYY to YYYY-MM-DDT00:00:00Z""" return "{y}-{m}-{day}T00:00:00Z".format(day=d[:2], m=d[3:5], y=d[6:]) if d else d + def solr_to_date(d): """ converts YYYY-MM-DDT00:00:00Z to DD-MM-YYYY """ return "{day}:{m}:{y}".format(y=d[:4], m=d[5:7], day=d[8:10]) if d else d + def solr_to_year(d): """ converts YYYY-MM-DDT00:00:00Z to DD-MM-YYYY """ return d[:4] + +import re +def to_safe_str(s): + """ + converts some (tr) non-ascii chars to ascii counterparts, + then return the result as lowercase + """ + # TODO: This is insufficient as it doesn't do anything for other non-ascii chars + return re.sub(r'[^0-9a-zA-Z]+', '_', s.strip().replace(u'ğ', 'g').replace(u'ö', 'o').replace( + u'ç', 'c').replace(u'Ç','c').replace(u'Ö', u'O').replace(u'Ş', 's').replace( + u'Ü', 'u').replace(u'ı', 'i').replace(u'İ','i').replace(u'Ğ', 'g').replace( + u'ö', 'o').replace(u'ş', 's').replace(u'ü', 'u').lower(), re.UNICODE) diff --git a/zengine/management_commands.py b/zengine/management_commands.py index ee017898..fca3a793 100644 --- a/zengine/management_commands.py +++ b/zengine/management_commands.py @@ -8,7 +8,9 @@ # (GPLv3). See LICENSE.txt for details. import six +from pyoko.db.adapter.db_riak import BlockSave from pyoko.exceptions import ObjectDoesNotExist +from pyoko.lib.utils import get_object_from_path from pyoko.manage import * from zengine.views.crud import SelectBoxCache @@ -183,3 +185,42 @@ def run(self): else: worker = Worker() worker.run() + + +class PrepareMQ(Command): + """ + Creates necessary exchanges, queues and bindings + """ + CMD_NAME = 'preparemq' + HELP = 'Creates necessary exchanges, queues and bindings for messaging subsystem' + + def run(self): + self.create_user_channels() + self.create_channel_exchanges() + + def create_user_channels(self): + from zengine.messaging.model import Channel, Subscriber + user_model = get_object_from_path(settings.USER_MODEL) + with BlockSave(Channel): + for usr in user_model.objects.filter(): + # create private exchange of user + ch, new = Channel.objects.get_or_create(owner=usr, typ=5) + print("%s exchange: %s" % ('created' if new else 'existing', ch.code_name)) + # create notification subscription to private exchange + sb, new = Subscriber.objects.get_or_create(channel=ch, + user=usr, + read_only=True, + name='Notifications', + can_manage=True, + can_leave=False + ) + print("%s notify sub: %s" % ('created' if new else 'existing', ch.code_name)) + + + + def create_channel_exchanges(self): + from zengine.messaging.model import Channel + for ch in Channel.objects.filter(): + print("(re)creation exchange: %s" % ch.code_name) + ch.create_exchange() + diff --git a/zengine/notifications/__init__.py b/zengine/messaging/__init__.py similarity index 85% rename from zengine/notifications/__init__.py rename to zengine/messaging/__init__.py index 0fca11cf..588e4ac4 100644 --- a/zengine/notifications/__init__.py +++ b/zengine/messaging/__init__.py @@ -16,7 +16,7 @@ import time import six from zengine.lib.cache import Cache, KeepAlive -from .model import NotificationMessage + class Notify(Cache, ClientQueue): """ @@ -54,12 +54,13 @@ def _delayed_send(self, offline_messages): self.remove_item(n) def set_message(self, title, msg, typ, url=None, sender=None): + from .model import Message message = {'title': title, 'body': msg, 'type': typ, 'url': url, 'id': uuid4().hex} if sender and isinstance(sender, six.string_types): - sender = 
NotificationMessage.sender.objects.get(sender) - receiver = NotificationMessage.receiver.objects.get(self.user_id) - NotificationMessage(typ=typ, msg_title=title, body=msg, url=url, - sender=sender, receiver=receiver).save() + sender = Message.sender.objects.get(sender) + receiver = Message.receiver.objects.get(self.user_id) + Message(typ=typ, msg_title=title, body=msg, url=url, + sender=sender, receiver=receiver).save() if KeepAlive(user_id=self.user_id).is_alive(): client_message = {'cmd': 'notification', 'notifications': [message, ]} self.send_to_queue(client_message) diff --git a/zengine/messaging/lib.py b/zengine/messaging/lib.py new file mode 100644 index 00000000..7214ae9a --- /dev/null +++ b/zengine/messaging/lib.py @@ -0,0 +1,156 @@ +# -*- coding: utf-8 -*- +""" +""" + +# Copyright (C) 2015 ZetaOps Inc. +# +# This file is licensed under the GNU General Public License v3 +# (GPLv3). See LICENSE.txt for details. +import json + +import pika +from passlib.handlers.pbkdf2 import pbkdf2_sha512 + +from pyoko.conf import settings +from zengine.client_queue import BLOCKING_MQ_PARAMS, get_mq_connection +from zengine.lib.cache import Cache +from zengine.log import log + + +class ConnectionStatus(Cache): + """ + Cache object for workflow instances. + + Args: + wf_token: Token of the workflow instance. + """ + PREFIX = 'ONOFF' + + def __init__(self, user_id): + super(ConnectionStatus, self).__init__(user_id) + + +class BaseUser(object): + mq_connection = None + mq_channel = None + + @classmethod + def _connect_mq(cls): + if cls.mq_connection is None or cls.mq_connection.is_closed: + cls.mq_connection, cls.mq_channel = get_mq_connection() + return cls.mq_channel + + def get_avatar_url(self): + """ + Bu metot kullanıcıya ait avatar url'ini üretir. + + Returns: + str: kullanıcı avatar url + """ + return "%s%s" % (settings.S3_PUBLIC_URL, self.avatar) + + def __unicode__(self): + return "User %s" % self.username + + def set_password(self, raw_password): + """ + Kullanıcı şifresini encrypt ederek set eder. + + Args: + raw_password (str) + """ + self.password = pbkdf2_sha512.encrypt(raw_password, rounds=10000, + salt_size=10) + + def is_online(self, status=None): + if status is None: + return ConnectionStatus(self.key).get() or False + else: + mq_channel = self._connect_mq() + for sbs in self.subscriptions.objects.filter(): + if sbs.channel.typ == 10: + mq_channel.basic_publish(exchange=sbs.channel.code_name, + routing_key='', + body=json.dumps({ + 'cmd': 'user_status', + 'channel_key': sbs.channel.key, + 'channel_name': sbs.name, + 'avatar_url': self.get_avatar_url(), + 'is_online': status, + })) + ConnectionStatus(self.key).set(status) + + def encrypt_password(self): + """ encrypt password if not already encrypted """ + if self.password and not self.password.startswith('$pbkdf2'): + self.set_password(self.password) + + def prepare_channels(self): + from zengine.messaging.model import Channel, Subscriber + # create private channel of user + ch, new = Channel.objects.get_or_create(owner=self, typ=5) + # create subscription entry for notification messages + sb, new = Subscriber.objects.get_or_create(channel=ch, user=self, is_visible=True, + can_leave=False, inform_me=False) + + def check_password(self, raw_password): + """ + Verilen encrypt edilmemiş şifreyle kullanıcıya ait encrypt + edilmiş şifreyi karşılaştırır. + + Args: + raw_password (str) + + Returns: + bool: Değerler aynı olması halinde True, değilse False + döner. 
+ """ + return pbkdf2_sha512.verify(raw_password, self.password) + + def get_role(self, role_id): + """ + Retrieves user's roles. + + Args: + role_id (int) + + Returns: + dict: Role nesnesi + + """ + return self.role_set.node_dict[role_id] + + @property + def full_name(self): + return self.username + + @property + def prv_exchange(self): + return 'prv_%s' % str(self.key).lower() + + def bind_private_channel(self, sess_id): + mq_channel = self._connect_mq() + mq_channel.queue_declare(queue=sess_id, arguments={'x-expires': 40000}) + log.debug("Binding private exchange to client queue: Q:%s --> E:%s" % (sess_id, + self.prv_exchange)) + mq_channel.queue_bind(exchange=self.prv_exchange, queue=sess_id) + + def send_notification(self, title, message, typ=1, url=None): + """ + sends message to users private mq exchange + Args: + title: + message: + sender: + url: + typ: + + + """ + self.created_channels.channel.add_message( + channel_key=self.prv_exchange, + body=message, + title=title, + typ=typ, + url=url + ) diff --git a/zengine/messaging/model.py b/zengine/messaging/model.py new file mode 100644 index 00000000..500421c0 --- /dev/null +++ b/zengine/messaging/model.py @@ -0,0 +1,418 @@ +# -*- coding: utf-8 -*- +""" +""" + +# Copyright (C) 2015 ZetaOps Inc. +# +# This file is licensed under the GNU General Public License v3 +# (GPLv3). See LICENSE.txt for details. +import json +from uuid import uuid4 + +import pika + +from pyoko import Model, field, ListNode +from pyoko.conf import settings +from pyoko.db.adapter.db_riak import BlockSave +from pyoko.exceptions import IntegrityError +from pyoko.fields import DATE_TIME_FORMAT +from pyoko.lib.utils import get_object_from_path +from zengine.client_queue import BLOCKING_MQ_PARAMS, get_mq_connection +from zengine.lib.utils import to_safe_str + +UserModel = get_object_from_path(settings.USER_MODEL) + + + + +CHANNEL_TYPES = ( + # users private message hub + (5, "Private"), + # a One-To-One communication between 2 user + (10, "Direct"), + # public chat rooms + (15, "Public"), +) + + +class Channel(Model): + """ + Represents MQ exchanges. 
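A rough sketch of programmatic use ('some_user' is a placeholder for an existing user instance; a reachable RabbitMQ broker is assumed, since saving a channel declares its exchange):

.. code-block:: python

    # 15 = Public channel (see CHANNEL_TYPES)
    ch = Channel(name='General', owner=some_user, typ=15).save()
    # 4 = Channel Message (see MSG_TYPES)
    Channel.add_message(channel_key=ch.key, body='Hello!', title='greeting',
                        sender=some_user, typ=4)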
+ + is_private: Represents users exchange hub + Each user have a durable private exchange, + which their code_name composed from user key prefixed with "prv_" + + is_direct: Represents a user-to-user direct message exchange + """ + mq_channel = None + mq_connection = None + + typ = field.Integer("Type", choices=CHANNEL_TYPES) + name = field.String("Name") + code_name = field.String("Internal name") + description = field.String("Description") + owner = UserModel(reverse_name='created_channels', null=True) + + def __unicode__(self): + return "%s (%s's %s channel)" % (self.name or '', + self.owner.full_name, + self.get_typ_display()) + + # + # class Managers(ListNode): + # user = UserModel() + + def is_private(self): + return self.typ == 5 + + @classmethod + def get_or_create_direct_channel(cls, initiator_key, receiver_key): + """ + Creates a direct messaging channel between two user + + Args: + initiator: User, who want's to make first contact + receiver: User, other party + + Returns: + (Channel, receiver_name) + """ + existing = cls.objects.OR().filter( + code_name='%s_%s' % (initiator_key, receiver_key)).filter( + code_name='%s_%s' % (receiver_key, initiator_key)) + receiver_name = UserModel.objects.get(receiver_key).full_name + if existing: + channel = existing[0] + else: + channel_name = '%s_%s' % (initiator_key, receiver_key) + channel = cls(is_direct=True, code_name=channel_name, typ=10).blocking_save() + with BlockSave(Subscriber): + Subscriber.objects.get_or_create(channel=channel, + user_id=initiator_key, + name=receiver_name) + Subscriber.objects.get_or_create(channel=channel, + user_id=receiver_key, + name=UserModel.objects.get(initiator_key).full_name) + return channel, receiver_name + + def get_avatar(self, user): + if self.typ == 10: + return self.subscriber_set.objects.exclude(user=user)[0].user.get_avatar_url() + else: + return None + + @classmethod + def add_message(cls, channel_key, body, title=None, sender=None, url=None, typ=2, + receiver=None): + mq_channel = cls._connect_mq() + msg_object = Message(sender=sender, body=body, msg_title=title, url=url, + typ=typ, channel_id=channel_key, receiver=receiver, key=uuid4().hex) + msg_object.setattr('unsaved', True) + mq_channel.basic_publish(exchange=channel_key, + routing_key='', + body=json.dumps(msg_object.serialize())) + return msg_object.save() + + def get_subscription_for_user(self, user_id): + return self.subscriber_set.objects.get(user_id=user_id) + + def get_last_messages(self): + # TODO: Try to refactor this with https://github.com/rabbitmq/rabbitmq-recent-history-exchange + return self.message_set.objects.filter().set_params(sort="updated_at desc")[:20] + + @classmethod + def _connect_mq(cls): + if cls.mq_connection is None or cls.mq_connection.is_closed: + cls.mq_connection, cls.mq_channel = get_mq_connection() + return cls.mq_channel + + def create_exchange(self): + """ + Creates MQ exchange for this channel + Needs to be defined only once. 
+ """ + mq_channel = self._connect_mq() + mq_channel.exchange_declare(exchange=self.code_name, + exchange_type='fanout', + durable=True) + + + def pre_creation(self): + if not self.code_name: + if self.name: + self.code_name = to_safe_str(self.name) + self.key = self.code_name + return + if self.owner and self.is_private: + self.code_name = self.owner.prv_exchange + self.key = self.code_name + return + raise IntegrityError('Non-private and non-direct channels should have a "name".') + else: + self.key = self.code_name + + def post_save(self): + self.create_exchange() + # self.subscribe_owner() + + +class Subscriber(Model): + """ + Permission model + """ + + mq_channel = None + mq_connection = None + + channel = Channel() + name = field.String("Subscription Name") + user = UserModel(reverse_name='subscriptions') + is_muted = field.Boolean("Mute the channel", default=False) + pinned = field.Boolean("Pin channel to top", default=False) + inform_me = field.Boolean("Inform when I'm mentioned", default=True) + read_only = field.Boolean("This is a read-only subscription (to a broadcast channel)", + default=False) + is_visible = field.Boolean("Show under user's channel list", default=True) + can_manage = field.Boolean("Can manage this channel", default=False) + can_leave = field.Boolean("Membership is not obligatory", default=True) + last_seen_msg_time = field.TimeStamp("Last seen message's time") + + # status = field.Integer("Status", choices=SUBSCRIPTION_STATUS) + + def __unicode__(self): + return "%s subscription of %s" % (self.name, self.user) + + + @classmethod + def _connect_mq(cls): + if cls.mq_connection is None or cls.mq_connection.is_closed: + cls.mq_connection, cls.mq_channel = get_mq_connection() + return cls.mq_channel + + def get_actions(self): + actions = [ + ('Pin', '_zops_pin_channel'), + # ('Mute', '_zops_mute_channel'), + ] + if self.channel.owner == self.user or self.can_manage: + actions.extend([ + ('Delete', '_zops_delete_channel'), + ('Edit', '_zops_edit_channel'), + ('Add Users', '_zops_add_members'), + ('Add Unit', '_zops_add_unit_to_channel') + ]) + return actions + + def is_online(self): + # TODO: Cache this method + if self.channel.typ == 10: + return self.channel.subscriber_set.objects.exclude( + user=self.user).get().user.is_online() + + def unread_count(self): + # FIXME: track and return actual unread message count + if self.last_seen_msg_time: + return self.channel.message_set.objects.filter( + timestamp__gt=self.last_seen_msg_time).count() + else: + return self.channel.message_set.objects.filter().count() + + def create_exchange(self): + """ + Creates user's private exchange + + Actually user's private channel needed to be defined only once, + and this should be happened when user first created. + But since this has a little performance cost, + to be safe we always call it before binding to the channel we currently subscribe + """ + channel = self._connect_mq() + channel.exchange_declare(exchange=self.user.prv_exchange, + exchange_type='fanout', + durable=True) + + @classmethod + def mark_seen(cls, key, datetime_str): + cls.objects.filter(key=key).update(last_seen=datetime_str) + + def bind_to_channel(self): + """ + Binds (subscribes) users private exchange to channel exchange + Automatically called at creation of subscription record. 
+ """ + if self.channel.code_name != self.user.prv_exchange: + channel = self._connect_mq() + channel.exchange_bind(source=self.channel.code_name, destination=self.user.prv_exchange) + + def post_creation(self): + self.create_exchange() + self.bind_to_channel() + + def pre_creation(self): + if not self.name: + self.name = self.channel.name + + +MSG_TYPES = ( + (1, "Info Notification"), + (11, "Error Notification"), + (111, "Success Notification"), + (2, "Direct Message"), + (3, "Broadcast Message"), + (4, "Channel Message") +) +MESSAGE_STATUS = ( + (1, "Created"), + (11, "Transmitted"), + (22, "Seen"), + (33, "Read"), + (44, "Archived"), + +) + + +class Message(Model): + """ + Message model + + Notes: + Never use directly for creating new messages! Use these methods: + - Channel objects's **add_message()** method. + - User object's **set_message()** method. (which also uses channel.add_message) + """ + channel = Channel() + sender = UserModel(reverse_name='sent_messages') + receiver = UserModel(reverse_name='received_messages') + typ = field.Integer("Type", choices=MSG_TYPES, default=1) + status = field.Integer("Status", choices=MESSAGE_STATUS, default=1) + msg_title = field.String("Title") + body = field.String("Body") + url = field.String("URL") + + def get_actions_for(self, user): + actions = [] + if Favorite.objects.filter(user=user, + channel=self.channel, + message=self).count(): + actions.append(('Remove from favorites', '_zops_remove_from_favorites')) + else: + actions.append(('Add to favorites', '_zops_favorite_message')) + + + if user: + if FlaggedMessage.objects.filter(user=user, message=self).count(): + actions.append(('Remove Flag', '_zops_unflag_message')) + else: + actions.append(('Flag Message', '_zops_flag_message')) + if self.sender == user: + actions.extend([ + ('Delete', '_zops_delete_message'), + ('Edit', '_zops_edit_message') + ]) + return actions + + def serialize(self, user=None): + """ + Serializes message for given user. + + Note: + Should be called before first save(). Otherwise "is_update" will get wrong value. + + Args: + user: User object + + Returns: + Dict. JSON serialization ready dictionary object + """ + return { + 'content': self.body, + 'type': self.typ, + 'updated_at': self.updated_at, + 'timestamp': self.updated_at, + 'is_update': hasattr(self, 'unsaved'), + 'attachments': [attachment.serialize() for attachment in self.attachment_set], + 'title': self.msg_title, + 'sender_name': self.sender.full_name, + 'sender_key': self.sender.key, + 'channel_key': self.channel.key, + 'cmd': 'message', + 'avatar_url': self.sender.avatar, + 'key': self.key, + } + + def __unicode__(self): + content = self.msg_title or self.body + return "%s%s" % (content[:30], '...' 
if len(content) > 30 else '') + + def _republish(self): + """ + Re-publishes updated message + """ + mq_channel = self.channel._connect_mq() + mq_channel.basic_publish(exchange=self.channel.key, routing_key='', + body=json.dumps(self.serialize())) + + def pre_save(self): + if not hasattr(self, 'unsaved'): + self._republish() + + +ATTACHMENT_TYPES = ( + (1, "Document"), + (11, "Spreadsheet"), + (22, "Image"), + (33, "PDF"), + +) + + +class Attachment(Model): + """ + A model to store message attachments + """ + file = field.File("File", random_name=True, required=False) + typ = field.Integer("Type", choices=ATTACHMENT_TYPES) + name = field.String("File Name") + description = field.String("Description") + channel = Channel() + message = Message() + + def serialize(self): + return { + 'description': self.description, + 'file_name': self.name, + 'url': "%s%s" % (settings.S3_PUBLIC_URL, self.file) + } + + def __unicode__(self): + return self.name + + +class Favorite(Model): + """ + A model to store users bookmarked messages + """ + channel = Channel() + user = UserModel() + message = Message() + summary = field.String("Message Summary") + channel_name = field.String("Channel Name") + + def pre_creation(self): + if not self.channel: + self.channel = self.message.channel + self.summary = self.message.body[:60] + self.channel_name = self.channel.name + + +class FlaggedMessage(Model): + """ + A model to store users bookmarked messages + """ + channel = Channel() + user = UserModel() + message = Message() + + def pre_creation(self): + self.channel = self.message.channel diff --git a/zengine/messaging/permissions.py b/zengine/messaging/permissions.py new file mode 100644 index 00000000..0e422d7e --- /dev/null +++ b/zengine/messaging/permissions.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- +""" +""" + +# Copyright (C) 2015 ZetaOps Inc. +# +# This file is licensed under the GNU General Public License v3 +# (GPLv3). See LICENSE.txt for details. +from zengine.auth.permissions import CustomPermission + +CustomPermission.add_multi( + # ('code_name', 'human_readable_name', 'description'), + [ + ('messaging.can_invite_user_by_unit', 'Can invite all users of a unit', ''), + ('messaging.can_invite_user_by_searching', 'Can invite any user by searching on name', ''), + ]) diff --git a/zengine/messaging/views.py b/zengine/messaging/views.py new file mode 100644 index 00000000..aeeffbc5 --- /dev/null +++ b/zengine/messaging/views.py @@ -0,0 +1,925 @@ +# -*- coding: utf-8 -*- +""" +""" + +# Copyright (C) 2015 ZetaOps Inc. +# +# This file is licensed under the GNU General Public License v3 +# (GPLv3). See LICENSE.txt for details. +from pyoko.conf import settings +from pyoko.db.adapter.db_riak import BlockSave +from pyoko.exceptions import ObjectDoesNotExist +from pyoko.lib.utils import get_object_from_path +from zengine.log import log +from zengine.lib.exceptions import HTTPError +from zengine.messaging.model import Channel, Attachment, Subscriber, Message, Favorite, \ + FlaggedMessage + +UserModel = get_object_from_path(settings.USER_MODEL) +UnitModel = get_object_from_path(settings.UNIT_MODEL) + +""" + +.. 
code-block:: python + + MSG_DICT = { + 'content': string, + 'title': string, + 'timestamp': datetime, + 'updated_at': datetime, + 'is_update': boolean, # false for new messages + # true if this is an updated message + 'channel_key': key, + 'sender_name': string, + 'sender_key': key, + 'type': int, + 'avatar_url': string, + 'key': key, + 'cmd': 'message', + 'attachments': [{ + 'description': string, + 'file_name': string, + 'url': string, + },] +} + + + USER_STATUS_UPDATE = { + 'cmd': 'user_status', + 'channel_key': key, + 'channel_name': string, + 'avatar_url': string, + 'is_online': boolean, + } +""" + + +def _dedect_file_type(name, content): + # FIXME: Implement attachment type detection + return 1 # Return as Document for now + + +def _paginate(self, current_page, query_set, per_page=10): + """ + Handles pagination of object listings. + + Args: + current_page int: + Current page number + query_set (:class:`QuerySet`): + Object listing queryset. + per_page int: + Objects per page. + + Returns: + QuerySet object, pagination data dict as a tuple + """ + total_objects = query_set.count() + total_pages = int(total_objects / per_page or 1) + # add orphans to last page + current_per_page = per_page + ( + total_objects % per_page if current_page == total_pages else 0) + pagination_data = dict(page=current_page, + total_pages=total_pages, + total_objects=total_objects, + per_page=current_per_page) + query_set = query_set.set_params(rows=current_per_page, start=(current_page - 1) * per_page) + return query_set, pagination_data + + +def create_message(current): + """ + Creates a message for the given channel. + + .. code-block:: python + + # request: + { + 'view':'_zops_create_message', + 'message': { + 'channel': key, # of channel + 'body': string, # message text., + 'type': int, # zengine.messaging.model.MSG_TYPES, + 'attachments': [{ + 'description': string, # can be blank, + 'name': string, # file name with extension, + 'content': string, # base64 encoded file content + }]} + # response: + { + 'status': 'Created', + 'code': 201, + 'msg_key': key, # key of the message object, + } + + """ + msg = current.input['message'] + msg_obj = Channel.add_message(msg['channel'], body=msg['body'], typ=msg['type'], + sender=current.user, + title=msg['title'], receiver=msg['receiver'] or None) + current.output = { + 'msg_key': msg_obj.key, + 'status': 'Created', + 'code': 201 + } + if 'attachment' in msg: + for atch in msg['attachments']: + typ = current._dedect_file_type(atch['name'], atch['content']) + Attachment(channel_id=msg['channel'], msg=msg_obj, name=atch['name'], + file=atch['content'], description=atch['description'], typ=typ).save() + + +def show_channel(current, waited=False): + """ + Initial display of channel content. + Returns channel description, members, no of members, last 20 messages etc. + + + .. 
code-block:: python + + # request: + { + 'view':'_zops_show_channel', + 'key': key, + } + + # response: + { + 'channel_key': key, + 'description': string, + 'no_of_members': int, + 'member_list': [ + {'name': string, + 'is_online': bool, + 'avatar_url': string, + }], + 'name': string, + 'last_messages': [MSG_DICT] + 'status': 'OK', + 'code': 200 + } + """ + ch = Channel(current).objects.get(current.input['key']) + sbs = ch.get_subscription_for_user(current.user_id) + current.output = {'key': current.input['key'], + 'description': ch.description, + 'name': sbs.name, + 'actions': sbs.get_actions(), + 'avatar_url': ch.get_avatar(current.user), + 'no_of_members': len(ch.subscriber_set), + 'member_list': [{'name': sb.user.full_name, + 'is_online': sb.user.is_online(), + 'avatar_url': sb.user.get_avatar_url() + } for sb in ch.subscriber_set.objects.filter()], + 'last_messages': [], + 'status': 'OK', + 'code': 200 + } + for msg in ch.get_last_messages(): + current.output['last_messages'].insert(0, msg.serialize(current.user)) + + +def channel_history(current): + """ + Get old messages for a channel. 20 messages per request + + .. code-block:: python + + # request: + { + 'view':'_zops_channel_history, + 'channel_key': key, + 'timestamp': datetime, # timestamp data of oldest shown message + } + + # response: + { + 'messages': [MSG_DICT, ], + 'status': 'OK', + 'code': 200 + } + """ + current.output = { + 'status': 'OK', + 'code': 201, + 'messages': [] + } + + for msg in Message.objects.filter(channel_id=current.input['channel_key'], + updated_at__lt=current.input['timestamp'])[:20]: + current.output['messages'].insert(0, msg.serialize(current.user)) + + +def report_last_seen_message(current): + """ + Push timestamp of latest message of an ACTIVE channel. + + This view should be called with timestamp of latest message; + - When user opens (clicks on) a channel. + - Periodically (eg: setInterval for 15secs) while user staying in a channel. + + + .. code-block:: python + + # request: + { + 'view':'_zops_last_seen_msg', + 'channel_key': key, + 'key': key, + 'timestamp': datetime, + } + + # response: + { + 'status': 'OK', + 'code': 200, + } + """ + sbs = Subscriber(current).objects.filter(channel_id=current.input['channel_key'], + user_id=current.user_id)[0] + sbs.last_seen_msg_time=current.input['timestamp'] + sbs.save() + current.output = { + 'status': 'OK', + 'code': 200} + +def list_channels(current): + """ + List channel memberships of current user + + + .. 
code-block:: python + + # request: + { + 'view':'_zops_list_channels', + } + + # response: + { + 'channels': [ + {'name': string, # name of channel + 'key': key, # key of channel + 'unread': int, # unread message count + 'type': int, # channel type, + # 15: public channels (chat room/broadcast channel distinction + comes from "read_only" flag) + # 10: direct channels + # 5: one and only private channel which is "Notifications" + 'read_only': boolean, + # true if this is a read-only subscription to a broadcast channel + # false if it's a public chat room + + 'actions':[('action name', 'view name'),] + },] + } + """ + current.output = { + 'status': 'OK', + 'code': 200, + 'channels': []} + for sbs in current.user.subscriptions.objects.filter(is_visible=True): + try: + current.output['channels'].append({'name': sbs.name, + 'key': sbs.channel.key, + 'type': sbs.channel.typ, + 'read_only': sbs.read_only, + 'is_online': sbs.is_online(), + 'actions': sbs.get_actions(), + 'unread': sbs.unread_count()}) + except ObjectDoesNotExist: + sbs.delete() + +def unread_count(current): + """ + Number of unread messages for current user + + + .. code-block:: python + + # request: + { + 'view':'_zops_unread_count', + } + + # response: + { + 'status': 'OK', + 'code': 200, + 'notifications': int, + 'messages': int, + } + """ + unread_ntf = 0 + unread_msg = 0 + for sbs in current.user.subscriptions.objects.filter(is_visible=True): + try: + if sbs.channel.key == current.user.prv_exchange: + unread_ntf += sbs.unread_count() + else: + unread_msg += sbs.unread_count() + except ObjectDoesNotExist: + sbs.delete() + current.output = { + 'status': 'OK', + 'code': 200, + 'notifications': unread_ntf, + 'messages': unread_msg + } + + +def create_channel(current): + """ + Create a public channel. Can be a broadcast channel or normal chat room. + + Chat room and broadcast distinction will be made at user subscription phase. + + .. code-block:: python + + # request: + { + 'view':'_zops_create_channel', + 'name': string, + 'description': string, + } + + # response: + { + 'description': string, + 'name': string, + 'no_of_members': int, + 'member_list': [ + {'name': string, + 'is_online': bool, + 'avatar_url': string, + }], + 'last_messages': [MSG_DICT] + 'status': 'Created', + 'code': 201, + 'key': key, # of just created channel + } + """ + channel = Channel(name=current.input['name'], + description=current.input['description'], + owner=current.user, + typ=15).save() + with BlockSave(Subscriber): + Subscriber.objects.get_or_create(user=channel.owner, + channel=channel, + can_manage=True, + can_leave=False) + current.input['key'] = channel.key + show_channel(current) + current.output.update({ + 'status': 'Created', + 'code': 201 + }) + + +def add_members(current): + """ + Subscribe member(s) to a channel + + .. 
code-block:: python + + # request: + { + 'view':'_zops_add_members', + 'channel_key': key, + 'read_only': boolean, # true if this is a Broadcast channel, + # false if it's a normal chat room + 'members': [key, key], + } + + # response: + { + 'existing': [key,], # existing members + 'newly_added': [key,], # newly added members + 'status': 'Created', + 'code': 201 + } + """ + newly_added, existing = [], [] + read_only = current.input['read_only'] + for member_key in current.input['members']: + sb, new = Subscriber(current).objects.get_or_create(user_id=member_key, + read_only=read_only, + channel_id=current.input['channel_key']) + if new: + newly_added.append(member_key) + else: + existing.append(member_key) + + current.output = { + 'existing': existing, + 'newly_added': newly_added, + 'status': 'OK', + 'code': 201 + } + + +def add_unit_to_channel(current): + """ + Subscribe users of a given unit to given channel + + JSON API: + .. code-block:: python + + # request: + { + 'view':'_zops_add_unit_to_channel', + 'unit_key': key, + 'channel_key': key, + 'read_only': boolean, # true if this is a Broadcast channel, + # false if it's a normal chat room + + } + + # response: + { + 'existing': [key,], # existing members + 'newly_added': [key,], # newly added members + 'status': 'Created', + 'code': 201 + } + """ + read_only = current.input['read_only'] + newly_added, existing = [], [] + for member_key in UnitModel.get_user_keys(current, current.input['unit_key']): + sb, new = Subscriber(current).objects.get_or_create(user_id=member_key, + read_only=read_only, + channel_id=current.input['channel_key']) + if new: + newly_added.append(member_key) + else: + existing.append(member_key) + + current.output = { + 'existing': existing, + 'newly_added': newly_added, + 'status': 'OK', + 'code': 201 + } + + +def search_user(current): + """ + Search users for adding to a public room + or creating one to one direct messaging + + .. code-block:: python + + # request: + { + 'view':'_zops_search_user', + 'query': string, + } + + # response: + { + 'results': [('full_name', 'key', 'avatar_url'), ], + 'status': 'OK', + 'code': 200 + } + """ + current.output = { + 'results': [], + 'status': 'OK', + 'code': 201 + } + qs = UserModel(current).objects.exclude(key=current.user_id).search_on( + *settings.MESSAGING_USER_SEARCH_FIELDS, + contains=current.input['query']) + # FIXME: somehow exclude(key=current.user_id) not working with search_on() + + for user in qs: + if user.key != current.user_id: + current.output['results'].append((user.full_name, user.key, user.get_avatar_url())) + + +def search_unit(current): + """ + Search on units for subscribing it's users to a channel + + .. code-block:: python + + # request: + { + 'view':'_zops_search_unit', + 'query': string, + } + + # response: + { + 'results': [('name', 'key'), ], + 'status': 'OK', + 'code': 200 + } + """ + current.output = { + 'results': [], + 'status': 'OK', + 'code': 201 + } + for user in UnitModel(current).objects.search_on(*settings.MESSAGING_UNIT_SEARCH_FIELDS, + contains=current.input['query']): + current.output['results'].append((user.name, user.key)) + + +def create_direct_channel(current): + """ + Create a One-To-One channel between current and selected user. + + + .. 
code-block:: python
+
+        # request:
+        {
+        'view':'_zops_create_direct_channel',
+        'user_key': key,
+        }
+
+        # response:
+        {
+        'description': string,
+        'no_of_members': int,
+        'member_list': [
+            {'name': string,
+             'is_online': bool,
+             'avatar_url': string,
+            }],
+        'last_messages': [MSG_DICT],
+        'status': 'Created',
+        'code': 201,
+        'channel_key': key, # of just created channel
+        'name': string, # name of subscribed channel
+        }
+    """
+    channel, sub_name = Channel.get_or_create_direct_channel(current.user_id,
+                                                             current.input['user_key'])
+    current.input['key'] = channel.key
+    show_channel(current)
+    current.output.update({
+        'status': 'Created',
+        'code': 201
+    })
+
+
+def find_message(current):
+    """
+    Search in messages. If "channel_key" is given, the search is limited to that channel;
+    otherwise it covers all of the user's subscribed channels.
+
+    .. code-block:: python
+
+        # request:
+        {
+        'view':'_zops_find_message',
+        'channel_key': key,
+        'query': string,
+        'page': int,
+        }
+
+        # response:
+        {
+        'results': [MSG_DICT, ],
+        'pagination': {
+            'page': int, # current page
+            'total_pages': int,
+            'total_objects': int,
+            'per_page': int, # objects per page
+            },
+        'status': 'OK',
+        'code': 200
+        }
+    """
+    current.output = {
+        'results': [],
+        'status': 'OK',
+        'code': 200
+    }
+    query_set = Message(current).objects.search_on(['msg_title', 'body', 'url'],
+                                                   contains=current.input['query'])
+    if current.input['channel_key']:
+        query_set = query_set.filter(channel_id=current.input['channel_key'])
+    else:
+        subscribed_channels = Subscriber.objects.filter(user_id=current.user_id).values_list(
+            "channel_id", flatten=True)
+        query_set = query_set.filter(channel_id__in=subscribed_channels)
+
+    query_set, pagination_data = _paginate(current_page=current.input['page'], query_set=query_set)
+    current.output['pagination'] = pagination_data
+    for msg in query_set:
+        current.output['results'].append(msg.serialize(current.user))
+
+
+def delete_channel(current):
+    """
+    Delete a channel
+
+    .. code-block:: python
+
+        # request:
+        {
+        'view':'_zops_delete_channel',
+        'channel_key': key,
+        }
+
+        # response:
+        {
+        'status': 'Deleted',
+        'code': 200
+        }
+    """
+    ch = Channel(current).objects.get(owner_id=current.user_id,
+                                      key=current.input['channel_key'])
+    for sbs in ch.subscriber_set.objects.filter():
+        sbs.delete()
+    for msg in ch.message_set.objects.filter():
+        msg.delete()
+    try:
+        ch.delete()
+    except:
+        log.exception("Channel %s could not be deleted" % ch.key)
+    current.output = {'status': 'Deleted', 'code': 200}
+
+
+def edit_channel(current):
+    """
+    Update channel name or description
+
+    .. code-block:: python
+
+        # request:
+        {
+        'view':'_zops_edit_channel',
+        'channel_key': key,
+        'name': string,
+        'description': string,
+        }
+
+        # response:
+        {
+        'status': 'OK',
+        'code': 200
+        }
+    """
+    ch = Channel(current).objects.get(owner_id=current.user_id,
+                                      key=current.input['channel_key'])
+    ch.name = current.input['name']
+    ch.description = current.input['description']
+    ch.save()
+    for sbs in ch.subscriber_set.objects.filter():
+        sbs.name = ch.name
+        sbs.save()
+    current.output = {'status': 'OK', 'code': 200}
+
+
+def pin_channel(current):
+    """
+    Pin a channel to top of channel list
+
+    ..
code-block:: python
+
+        # request:
+        {
+        'view':'_zops_pin_channel',
+        'channel_key': key,
+        }
+
+        # response:
+        {
+        'status': 'OK',
+        'code': 200
+        }
+    """
+    try:
+        Subscriber(current).objects.filter(user_id=current.user_id,
+                                           channel_id=current.input['channel_key']).update(
+            pinned=True)
+        current.output = {'status': 'OK', 'code': 200}
+    except ObjectDoesNotExist:
+        raise HTTPError(404, "")
+
+
+def delete_message(current):
+    """
+    Delete a message
+
+    .. code-block:: python
+
+        # request:
+        {
+        'view':'_zops_delete_message',
+        'key': key,
+        }
+
+        # response:
+        {
+        'key': key,
+        'status': 'Deleted',
+        'code': 200
+        }
+    """
+    try:
+        Message(current).objects.get(sender_id=current.user_id,
+                                     key=current.input['key']).delete()
+        current.output = {'status': 'Deleted', 'code': 200, 'key': current.input['key']}
+    except ObjectDoesNotExist:
+        raise HTTPError(404, "")
+
+
+def edit_message(current):
+    """
+    Edit a message the user owns.
+
+    .. code-block:: python
+
+        # request:
+        {
+        'view':'_zops_edit_message',
+        'message': {
+            'body': string,     # message text
+            'key': key
+            }
+        }
+        # response:
+        {
+        'status': string,   # 'OK' for success
+        'code': int,        # 200 for success
+        }
+
+    """
+    current.output = {'status': 'OK', 'code': 200}
+    msg_data = current.input['message']
+    try:
+        msg = Message(current).objects.get(sender_id=current.user_id, key=msg_data['key'])
+        msg.body = msg_data['body']
+        msg.save()
+    except ObjectDoesNotExist:
+        raise HTTPError(404, "")
+
+
+def flag_message(current):
+    """
+    Flag inappropriate messages
+
+    .. code-block:: python
+
+        # request:
+        {
+        'view':'_zops_flag_message',
+        'key': key,
+        }
+        # response:
+        {
+        'status': 'Created',
+        'code': 201,
+        }
+
+    """
+    current.output = {'status': 'Created', 'code': 201}
+    FlaggedMessage.objects.get_or_create(user_id=current.user_id,
+                                         message_id=current.input['key'])
+
+
+def unflag_message(current):
+    """
+    Remove the flag from a message
+
+    .. code-block:: python
+
+        # request:
+        {
+        'view':'_zops_unflag_message',
+        'key': key,
+        }
+        # response:
+        {
+        'status': 'OK',
+        'code': 200,
+        }
+
+    """
+    current.output = {'status': 'OK', 'code': 200}
+
+    FlaggedMessage(current).objects.filter(user_id=current.user_id,
+                                           message_id=current.input['key']).delete()
+
+
+def get_message_actions(current):
+    """
+    Returns the applicable actions of the given message for the current user
+
+    .. code-block:: python
+
+        # request:
+        {
+        'view':'_zops_get_message_actions',
+        'key': key,
+        }
+        # response:
+        {
+        'actions':[('name_string', 'cmd_string'),],
+        'status': string,   # 'OK' for success
+        'code': int,        # 200 for success
+        }
+
+    """
+    current.output = {'status': 'OK',
+                      'code': 200,
+                      'actions': Message.objects.get(
+                          current.input['key']).get_actions_for(current.user)}
+
+
+def add_to_favorites(current):
+    """
+    Favorite a message
+
+    .. code-block:: python
+
+        # request:
+        {
+        'view':'_zops_add_to_favorites',
+        'message_key': key,
+        }
+
+        # response:
+        {
+        'status': 'Created',
+        'code': 201,
+        'favorite_key': key
+        }
+
+    """
+    msg = Message.objects.get(current.input['message_key'])
+    current.output = {'status': 'Created', 'code': 201}
+    fav, new = Favorite.objects.get_or_create(user_id=current.user_id, message=msg)
+    current.output['favorite_key'] = fav.key
+
+
+def remove_from_favorites(current):
+    """
+    Remove a message from favorites
+
+    ..
code-block:: python + + # request: + { + 'view':'_zops_remove_from_favorites, + 'key': key, + } + + # response: + { + 'status': 'OK', + 'code': 200 + } + + """ + try: + current.output = {'status': 'OK', 'code': 200} + Favorite(current).objects.get(user_id=current.user_id, + key=current.input['key']).delete() + except ObjectDoesNotExist: + raise HTTPError(404, "") + + +def list_favorites(current): + """ + List user's favorites. If "channel_key" given, will return favorites belong to that channel. + + .. code-block:: python + + # request: + { + 'view':'_zops_list_favorites, + 'channel_key': key, + } + + # response: + { + 'status': 'OK', + 'code': 200 + 'favorites':[{'key': key, + 'channel_key': key, + 'message_key': key, + 'message_summary': string, # max 60 char + 'channel_name': string, + },] + } + + """ + current.output = {'status': 'OK', 'code': 200, 'favorites': []} + query_set = Favorite(current).objects.filter(user_id=current.user_id) + if current.input['channel_key']: + query_set = query_set.filter(channel_id=current.input['channel_key']) + current.output['favorites'] = [{ + 'key': fav.key, + 'channel_key': fav.channel.key, + 'message_key': fav.message.key, + 'message_summary': fav.summary, + 'channel_name': fav.channel_name + } for fav in query_set] diff --git a/zengine/models/__init__.py b/zengine/models/__init__.py index 9fd91777..6ac86ac8 100644 --- a/zengine/models/__init__.py +++ b/zengine/models/__init__.py @@ -7,4 +7,4 @@ # This file is licensed under the GNU General Public License v3 # (GPLv3). See LICENSE.txt for details. from .auth import * -from ..notifications.model import * +from ..messaging.model import * diff --git a/zengine/models/auth.py b/zengine/models/auth.py index 6c7d489b..2df93ed7 100644 --- a/zengine/models/auth.py +++ b/zengine/models/auth.py @@ -7,8 +7,37 @@ # This file is licensed under the GNU General Public License v3 # (GPLv3). See LICENSE.txt for details. -from pyoko import Model, field, ListNode +from pyoko import Model, field, ListNode, LinkProxy from passlib.hash import pbkdf2_sha512 +from zengine.messaging.lib import BaseUser + + +class Unit(Model): + """Unit model + + Can be used to group users according to their physical or organizational position + + """ + name = field.String("İsim", index=True) + parent = LinkProxy('Unit', verbose_name='Parent Unit', reverse_name='sub_units') + + class Meta: + verbose_name = "Unit" + verbose_name_plural = "Units" + search_fields = ['name'] + list_fields = ['name',] + + def __unicode__(self): + return '%s' % self.name + + + @classmethod + def get_user_keys(cls, unit_key): + stack = User.objects.filter(unit_id=unit_key).values_list('key', flatten=True) + for unit_key in cls.objects.filter(parent_id=unit_key).values_list('key', flatten=True): + stack.extend(cls.get_user_keys(unit_key)) + return stack + class Permission(Model): @@ -41,48 +70,26 @@ def get_permitted_roles(self): return [rset.role for rset in self.role_set] -class User(Model): +class User(Model, BaseUser): """ Basic User model """ username = field.String("Username", index=True) password = field.String("Password") superuser = field.Boolean("Super user", default=False) + avatar = field.File("Avatar", random_name=True, required=False) + unit = Unit() class Meta: """ meta class """ list_fields = ['username', 'superuser'] - def __unicode__(self): - return "User %s" % self.username + def pre_save(self): + self.encrypt_password() - def __repr__(self): - return "User_%s" % self.key - - def set_password(self, raw_password): - """ - Encrypts user password. 
- - Args: - raw_password: Clean password string. - - """ - self.password = pbkdf2_sha512.encrypt(raw_password, - rounds=10000, - salt_size=10) - - def check_password(self, raw_password): - """ - Checks given clean password against stored encrtyped password. - - Args: - raw_password: Clean password. - - Returns: - Boolean. True if given password match. - """ - return pbkdf2_sha512.verify(raw_password, self.password) + def post_creation(self): + self.prepare_channels() def get_permissions(self): """ @@ -94,23 +101,6 @@ def get_permissions(self): users_primary_role = self.role_set[0].role return users_primary_role.get_permissions() - def get_role(self, role_id): - """ - Gets the first role of the user with given key. - - Args: - role_id: Key of the Role object. - - Returns: - :class:`Role` object - """ - return self.role_set.node_dict[role_id] - - def send_message(self, title, message, sender=None): - from zengine.notifications import Notify - Notify(self.key).set_message(title, message, typ=Notify.Message, sender=sender) - - class Role(Model): """ This model binds group of Permissions with a certain User. diff --git a/zengine/notifications/model.py b/zengine/notifications/model.py deleted file mode 100644 index 83bb3337..00000000 --- a/zengine/notifications/model.py +++ /dev/null @@ -1,49 +0,0 @@ -# -*- coding: utf-8 -*- -""" -""" - -# Copyright (C) 2015 ZetaOps Inc. -# -# This file is licensed under the GNU General Public License v3 -# (GPLv3). See LICENSE.txt for details. -from pyoko import Model, field, ListNode -from pyoko.conf import settings -from pyoko.lib.utils import get_object_from_path - -UserModel = get_object_from_path(settings.USER_MODEL) - -NOTIFY_MSG_TYPES = ( - (1, "Info"), - (11, "Error"), - (111, "Success"), - (2, "User Message"), - (3, "Broadcast Message") -) - - -NOTIFICATION_STATUS = ( - (1, "Created"), - (11, "Transmitted"), - (22, "Seen"), - (33, "Read"), - (44, "Archived"), - -) - -class NotificationMessage(Model): - """ - Permission model - """ - - typ = field.Integer("Message Type", choices=NOTIFY_MSG_TYPES) - status = field.Integer("Status", choices=NOTIFICATION_STATUS) - msg_title = field.String("Title") - body = field.String("Body") - url = field.String("URL") - sender = UserModel(reverse_name='sent_messages') - receiver = UserModel(reverse_name='received_messages') - - def __unicode__(self): - content = self.msg_title or self.body - return "%s%s" % (content[:30], '...' 
if len(content) > 30 else '') - diff --git a/zengine/receivers.py b/zengine/receivers.py index 965b2fdf..db3262bb 100644 --- a/zengine/receivers.py +++ b/zengine/receivers.py @@ -11,14 +11,10 @@ 'set_password', ] - - - from pyoko.conf import settings from zengine.dispatch.dispatcher import receiver from zengine.signals import lane_user_change, crud_post_save - DEFAULT_LANE_CHANGE_INVITE_MSG = { 'title': settings.MESSAGES['lane_change_invite_title'], 'body': settings.MESSAGES['lane_change_invite_body'], @@ -37,7 +33,7 @@ def send_message_for_lane_change(sender, *args, **kwargs): from zengine.lib.catalog_data import gettxt as _ from pyoko.lib.utils import get_object_from_path UserModel = get_object_from_path(settings.USER_MODEL) - from zengine.notifications import Notify + from zengine.messaging import Notify current = kwargs['current'] old_lane = kwargs['old_lane'] owners = kwargs['possible_owners'] @@ -48,11 +44,11 @@ def send_message_for_lane_change(sender, *args, **kwargs): for recipient in owners: if not isinstance(recipient, UserModel): recipient = recipient.get_user() - Notify(recipient.key).set_message(title=_(msg_context['title']), - msg=_(msg_context['body']), - typ=Notify.TaskInfo, - url=current.get_wf_link() - ) + recipient.send_notification(title=_(msg_context['title']), + message=_(msg_context['body']), + typ=Notify.TaskInfo, + url=current.get_wf_link() + ) # encrypting password on save diff --git a/zengine/settings.py b/zengine/settings.py index 774d6158..5796f08e 100644 --- a/zengine/settings.py +++ b/zengine/settings.py @@ -36,6 +36,9 @@ #: Role model ROLE_MODEL = 'zengine.models.Role' +#: Unit model +UNIT_MODEL = 'zengine.models.Unit' + MQ_HOST = os.getenv('MQ_HOST', 'localhost') MQ_PORT = int(os.getenv('MQ_PORT', '5672')) MQ_USER = os.getenv('MQ_USER', 'guest') @@ -111,10 +114,37 @@ #: View URL list for non-workflow views. 
#: -#: ('falcon URI template', 'python path to view method/class'), +#: ('URI template', 'python path to view method/class'), VIEW_URLS = { 'dashboard': 'zengine.views.menu.Menu', + 'sessid_to_userid': 'zengine.views.system.sessid_to_userid', + 'mark_offline_user': 'zengine.views.system.mark_offline_user', 'ping': 'zengine.views.dev_utils.Ping', + '_zops_create_message': 'zengine.messaging.views.create_message', + '_zops_show_channel': 'zengine.messaging.views.show_channel', + '_zops_list_channels': 'zengine.messaging.views.list_channels', + '_zops_channel_history': 'zengine.messaging.views.channel_history', + '_zops_report_last_seen_message': 'zengine.messaging.views.report_last_seen_message', + '_zops_create_channel': 'zengine.messaging.views.create_channel', + '_zops_add_members': 'zengine.messaging.views.add_members', + '_zops_add_unit_to_channel': 'zengine.messaging.views.add_unit_to_channel', + '_zops_search_user': 'zengine.messaging.views.search_user', + '_zops_search_unit': 'zengine.messaging.views.search_unit', + '_zops_create_direct_channel': 'zengine.messaging.views.create_direct_channel', + '_zops_find_message': 'zengine.messaging.views.find_message', + '_zops_delete_message': 'zengine.messaging.views.delete_message', + '_zops_edit_message': 'zengine.messaging.views.edit_message', + '_zops_get_message_actions': 'zengine.messaging.views.get_message_actions', + '_zops_add_to_favorites': 'zengine.messaging.views.add_to_favorites', + '_zops_remove_from_favorites': 'zengine.messaging.views.remove_from_favorites', + '_zops_list_favorites': 'zengine.messaging.views.list_favorites', + '_zops_edit_channel': 'zengine.messaging.views.edit_channel', + '_zops_delete_channel': 'zengine.messaging.views.delete_channel', + '_zops_pin_channel': 'zengine.messaging.views.pin_channel', + '_zops_flag_message': 'zengine.messaging.views.flag_message', + '_zops_unflag_message': 'zengine.messaging.views.unflag_message', + '_zops_unread_count': 'zengine.messaging.views.unread_count', + # '_zops_': 'zengine.messaging.views.', } if DEBUG: @@ -188,3 +218,11 @@ #: These models will not flushed when running tests TEST_FLUSHING_EXCLUDES = 'Permission,User,Role' + + +#: User search method of messaging subsystem will work on these fields +MESSAGING_USER_SEARCH_FIELDS = ['username', 'name', 'surname'] + +#: Unit search method of messaging subsystem will work on these fields +MESSAGING_UNIT_SEARCH_FIELDS = ['name',] + diff --git a/zengine/tornado_server/get_logger.py b/zengine/tornado_server/get_logger.py index 8fa1aa61..a0a84036 100644 --- a/zengine/tornado_server/get_logger.py +++ b/zengine/tornado_server/get_logger.py @@ -23,8 +23,13 @@ def get_logger(settings): # ch.setLevel(logging.DEBUG) # create formatter - formatter = logging.Formatter( - '%(asctime)s - %(process)d - %(pathname)s:%(lineno)d [%(module)s > %(funcName)s] - %(name)s - %(levelname)s - %(message)s') + if settings.DEBUG: + # make log messages more readable at development + format_str = '%(asctime)s - %(filename)s:%(lineno)d %(module)s.%(funcName)s \n> %(message)s\n\n' + else: + format_str = '%(asctime)s - %(process)d - %(pathname)s:%(lineno)d [%(module)s > %(funcName)s] - %(name)s - %(levelname)s - %(message)s' + + formatter = logging.Formatter(format_str) # add formatter to ch ch.setFormatter(formatter) diff --git a/zengine/tornado_server/server.py b/zengine/tornado_server/server.py index 671d535f..9753f57b 100644 --- a/zengine/tornado_server/server.py +++ b/zengine/tornado_server/server.py @@ -15,7 +15,7 @@ from tornado.httpclient import 
HTTPError sys.path.insert(0, os.path.realpath(os.path.dirname(__file__))) -from queue_manager import QueueManager, BlockingConnectionForHTTP, log +from ws_to_queue import QueueManager, BlockingConnectionForHTTP, log COOKIE_NAME = 'zopsess' DEBUG = os.getenv("DEBUG", False) @@ -137,7 +137,7 @@ def post(self, view_name): (r'/(\w+)', HttpHandler), ] -app = web.Application(URL_CONFS, debug=DEBUG) +app = web.Application(URL_CONFS, debug=DEBUG, autoreload=False) def runserver(host=None, port=None): diff --git a/zengine/tornado_server/queue_manager.py b/zengine/tornado_server/ws_to_queue.py similarity index 63% rename from zengine/tornado_server/queue_manager.py rename to zengine/tornado_server/ws_to_queue.py index 982e3fce..ec7d9aa3 100644 --- a/zengine/tornado_server/queue_manager.py +++ b/zengine/tornado_server/ws_to_queue.py @@ -9,8 +9,8 @@ import json from uuid import uuid4 -import os - +import os, sys +sys.sessid_to_userid = {} import pika import time from pika.adapters import TornadoConnection, BaseConnection @@ -30,6 +30,7 @@ 'MQ_PORT': int(os.environ.get('MQ_PORT', '5672')), 'MQ_USER': os.environ.get('MQ_USER', 'guest'), 'MQ_PASS': os.environ.get('MQ_PASS', 'guest'), + 'DEBUG': os.environ.get('DEBUG', False), 'MQ_VHOST': os.environ.get('MQ_VHOST', '/'), }) log = get_logger(settings) @@ -51,7 +52,7 @@ class BlockingConnectionForHTTP(object): - REPLY_TIMEOUT = 10 # sec + REPLY_TIMEOUT = 5 # sec def __init__(self): self.connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS) @@ -66,22 +67,32 @@ def create_channel(self): def _send_message(self, sess_id, input_data): log.info("sending data for %s" % sess_id) - self.input_channel.basic_publish(exchange='tornado_input', + self.input_channel.basic_publish(exchange='input_exc', routing_key=sess_id, body=json_encode(input_data)) + def _store_user_id(self, sess_id, body): + log.debug("SET SESSUSERS: %s" % sys.sessid_to_userid) + sys.sessid_to_userid[sess_id[5:]] = json_decode(body)['user_id'].lower() + def _wait_for_reply(self, sess_id, input_data): channel = self.create_channel() - channel.queue_declare(queue=sess_id, auto_delete=True) + channel.queue_declare(queue=sess_id, + arguments={'x-expires': 4000} + # auto_delete=True + ) timeout_start = time.time() while 1: method_frame, header_frame, body = channel.basic_get(sess_id) + log.debug("\n%s\n%s\n%s\n%s" % (sess_id, method_frame, header_frame, body)) if method_frame: reply = json_decode(body) if 'callbackID' in reply and reply['callbackID'] == input_data['callbackID']: channel.basic_ack(method_frame.delivery_tag) channel.close() log.info('Returned view message for %s: %s' % (sess_id, body)) + if 'upgrade' in body: + self._store_user_id(sess_id, body) return body else: if time.time() - json_decode(body)['reply_timestamp'] > self.REPLY_TIMEOUT: @@ -90,7 +101,7 @@ def _wait_for_reply(self, sess_id, input_data): if time.time() - timeout_start > self.REPLY_TIMEOUT: break else: - time.sleep(0.4) + time.sleep(1) log.info('No message returned for %s' % sess_id) channel.close() @@ -113,7 +124,7 @@ class QueueManager(object): """ INPUT_QUEUE_NAME = 'in_queue' - def __init__(self, io_loop): + def __init__(self, io_loop=None): log.info('PikaClient: __init__') self.io_loop = io_loop self.connected = False @@ -123,7 +134,7 @@ def __init__(self, io_loop): self.out_channels = {} self.out_channel = None self.websockets = {} - self.connect() + # self.connect() def connect(self): """ @@ -137,6 +148,8 @@ def connect(self): self.connecting = True self.connection = TornadoConnection(NON_BLOCKING_MQ_PARAMS, + 
stop_ioloop_on_close=False, + custom_ioloop=self.io_loop, on_open_callback=self.on_connected) def on_connected(self, connection): @@ -149,10 +162,9 @@ def on_connected(self, connection): """ log.info('PikaClient: connected to RabbitMQ') self.connected = True - self.connection = connection - self.in_channel = self.connection.channel(self.on_conn_open) + self.in_channel = self.connection.channel(self.on_channel_open) - def on_conn_open(self, channel): + def on_channel_open(self, channel): """ Input channel creation callback Queue declaration done here @@ -160,7 +172,7 @@ def on_conn_open(self, channel): Args: channel: input channel """ - self.in_channel.exchange_declare(exchange='tornado_input', type='topic') + self.in_channel.exchange_declare(exchange='input_exc', type='topic', durable=True) channel.queue_declare(callback=self.on_input_queue_declare, queue=self.INPUT_QUEUE_NAME) def on_input_queue_declare(self, queue): @@ -172,9 +184,15 @@ def on_input_queue_declare(self, queue): queue: input queue """ self.in_channel.queue_bind(callback=None, - exchange='tornado_input', + exchange='input_exc', queue=self.INPUT_QUEUE_NAME, routing_key="#") + def ask_for_user_id(self, sess_id): + log.debug(sess_id) + # TODO: add remote ip + self.publish_incoming_message(dict(_zops_remote_ip='', + data={'view': 'sessid_to_userid'}), sess_id) + def register_websocket(self, sess_id, ws): """ @@ -183,45 +201,80 @@ def register_websocket(self, sess_id, ws): sess_id: ws: """ - self.websockets[sess_id] = ws - channel = self.create_out_channel(sess_id) + log.debug("GET SESSUSERS: %s" % sys.sessid_to_userid) + try: + user_id = sys.sessid_to_userid[sess_id] + self.websockets[user_id] = ws + except KeyError: + self.ask_for_user_id(sess_id) + self.websockets[sess_id] = ws + user_id = sess_id + self.create_out_channel(sess_id, user_id) + + def inform_disconnection(self, sess_id): + self.in_channel.basic_publish(exchange='input_exc', + routing_key=sess_id, + body=json_encode(dict(data={ + 'view': 'mark_offline_user', + 'sess_id': sess_id,}, + _zops_remote_ip=''))) def unregister_websocket(self, sess_id): + user_id = sys.sessid_to_userid.get(sess_id, None) try: - del self.websockets[sess_id] + self.inform_disconnection(sess_id) + del self.websockets[user_id] except KeyError: - log.exception("Non-existent websocket") + log.exception("Non-existent websocket for %s" % user_id) if sess_id in self.out_channels: try: self.out_channels[sess_id].close() except ChannelClosed: log.exception("Pika client (out) channel already closed") - def create_out_channel(self, sess_id): + def create_out_channel(self, sess_id, user_id): def _on_output_channel_creation(channel): def _on_output_queue_decleration(queue): - channel.basic_consume(self.on_message, queue=sess_id) - + channel.basic_consume(self.on_message, queue=sess_id, consumer_tag=user_id) + log.debug("BIND QUEUE TO WS Q.%s on Ch.%s WS.%s" % (sess_id, + channel.consumer_tags[0], + user_id)) self.out_channels[sess_id] = channel + channel.queue_declare(callback=_on_output_queue_decleration, queue=sess_id, + arguments={'x-expires': 40000}, # auto_delete=True, # exclusive=True ) self.connection.channel(_on_output_channel_creation) + def redirect_incoming_message(self, sess_id, message, request): - message = message[:-1] + ',"_zops_remote_ip":"%s"}' % request.remote_ip - self.in_channel.basic_publish(exchange='tornado_input', + message = json_decode(message) + message['_zops_sess_id'] = sess_id + message['_zops_remote_ip'] = request.remote_ip + self.publish_incoming_message(message, 
sess_id) + + def publish_incoming_message(self, message, sess_id): + self.in_channel.basic_publish(exchange='input_exc', routing_key=sess_id, - body=message) + body=json_encode(message)) def on_message(self, channel, method, header, body): - sess_id = method.routing_key - if sess_id in self.websockets: - self.websockets[sess_id].write_message(body) - log.debug("WS RPLY for %s: %s" % (sess_id, body)) - channel.basic_ack(delivery_tag=method.delivery_tag) + user_id = method.consumer_tag + log.debug("WS RPLY for %s: %s" % (user_id, body)) + if user_id in self.websockets: + log.info("write msg to client") + self.websockets[user_id].write_message(body) + # channel.basic_ack(delivery_tag=method.delivery_tag) + elif 'sessid_to_userid' in body: + reply = json_decode(body) + sys.sessid_to_userid[reply['sess_id']] = reply['user_id'] + self.websockets[reply['user_id']] = self.websockets[reply['sess_id']] + del self.websockets[reply['sess_id']] + channel.basic_ack(delivery_tag=method.delivery_tag) + # else: # channel.basic_reject(delivery_tag=method.delivery_tag) diff --git a/zengine/views/auth.py b/zengine/views/auth.py index 1c8f0b2c..8f2bb95e 100644 --- a/zengine/views/auth.py +++ b/zengine/views/auth.py @@ -10,7 +10,7 @@ from pyoko import fields from zengine.forms.json_form import JsonForm from zengine.lib.cache import UserSessionID, KeepAlive -from zengine.notifications import Notify +from zengine.messaging import Notify from zengine.views.base import SimpleView @@ -30,9 +30,7 @@ def logout(current): Args: current: :attr:`~zengine.engine.WFCurrent` object. """ - user_id = current.session.get('user_id') - if user_id: - KeepAlive(user_id).delete() + current.user.is_online(False) current.session.delete() @@ -53,13 +51,27 @@ class Login(SimpleView): does the authentication at ``do`` stage. """ + def _user_is_online(self): + self.current.user.is_online(True) + + def _do_upgrade(self): + """ open websocket connection """ + self.current.output['cmd'] = 'upgrade' + self.current.output['user_id'] = self.current.user_id + self.current.user.is_online(True) + self.current.user.bind_private_channel(self.current.session.sess_id) + user_sess = UserSessionID(self.current.user_id) + user_sess.set(self.current.session.sess_id) + def do_view(self): """ Authenticate user with given credentials. + Connects user's queue and exchange """ + self.current.output['login_process'] = True self.current.task_data['login_successful'] = False if self.current.is_auth: - self.current.output['cmd'] = 'upgrade' + self._do_upgrade() else: try: auth_result = self.current.auth.authenticate( @@ -67,14 +79,13 @@ def do_view(self): self.current.input['password']) self.current.task_data['login_successful'] = auth_result if auth_result: - user_sess = UserSessionID(self.current.user_id) - old_sess_id = user_sess.get() - user_sess.set(self.current.session.sess_id) - notify = Notify(self.current.user_id) - notify.cache_to_queue() - if old_sess_id: - notify.old_to_new_queue(old_sess_id) - self.current.output['cmd'] = 'upgrade' + self._do_upgrade() + + # old_sess_id = user_sess.get() + # notify = Notify(self.current.user_id) + # notify.cache_to_queue() + # if old_sess_id: + # notify.old_to_new_queue(old_sess_id) except: raise self.current.log.exception("Wrong username or another error occurred") @@ -87,7 +98,8 @@ def show_view(self): """ Show :attr:`LoginForm` form. 
""" + self.current.output['login_process'] = True if self.current.is_auth: - self.current.output['cmd'] = 'upgrade' + self._do_upgrade() else: self.current.output['forms'] = LoginForm(current=self.current).serialize() diff --git a/zengine/views/catalog_datas.py b/zengine/views/catalog_datas.py index e5f9a45f..8d786c64 100644 --- a/zengine/views/catalog_datas.py +++ b/zengine/views/catalog_datas.py @@ -13,13 +13,14 @@ from zengine.views.crud import CrudView from zengine import forms from zengine.forms import fields -from falcon import HTTPBadRequest +from zengine.lib.exceptions import HTTPError class CatalogSelectForm(forms.JsonForm): """ Generates Form object for catalog select view. """ + class Meta: title = 'Choose Catalog Data' help_text = "Type and choose existing catalog data to edit. Or if you want to add one type the name of the catalog data you want to add." @@ -121,6 +122,6 @@ def save_catalog(self): self.output["notify"] = "catalog: %s successfully updated." % self.input[ "object_key"] except: - raise HTTPBadRequest("Form object could not be saved") + raise HTTPError(500, "Form object could not be saved") if self.input["cmd"] == 'cancel': self.output["notify"] = "catalog: %s canceled." % self.input["object_key"] diff --git a/zengine/views/system.py b/zengine/views/system.py new file mode 100644 index 00000000..a592faf2 --- /dev/null +++ b/zengine/views/system.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- +""" +""" + +# Copyright (C) 2015 ZetaOps Inc. +# +# This file is licensed under the GNU General Public License v3 +# (GPLv3). See LICENSE.txt for details. + + +def sessid_to_userid(current): + current.output['user_id'] = current.user_id.lower() + current.output['sess_id'] = current.session.sess_id + current.user.bind_private_channel(current.session.sess_id) + current.output['sessid_to_userid'] = True + +def mark_offline_user(current): + current.user.is_online(False) diff --git a/zengine/wf_daemon.py b/zengine/wf_daemon.py index 7f320eab..5ad8efd7 100755 --- a/zengine/wf_daemon.py +++ b/zengine/wf_daemon.py @@ -36,7 +36,7 @@ class Worker(object): Workflow runner worker object """ INPUT_QUEUE_NAME = 'in_queue' - INPUT_EXCHANGE = 'tornado_input' + INPUT_EXCHANGE = 'input_exc' def __init__(self): self.connect() @@ -62,10 +62,13 @@ def connect(self): self.client_queue = ClientQueue() self.input_channel = self.connection.channel() - self.input_channel.exchange_declare(exchange=self.INPUT_EXCHANGE, type='topic') + self.input_channel.exchange_declare(exchange=self.INPUT_EXCHANGE, + type='topic', + durable=True) self.input_channel.queue_declare(queue=self.INPUT_QUEUE_NAME) self.input_channel.queue_bind(exchange=self.INPUT_EXCHANGE, queue=self.INPUT_QUEUE_NAME) - log.info("Bind to queue named '%s' queue with exchange '%s'" % (self.INPUT_QUEUE_NAME, self.INPUT_EXCHANGE)) + log.info("Bind to queue named '%s' queue with exchange '%s'" % (self.INPUT_QUEUE_NAME, + self.INPUT_EXCHANGE)) def run(self): """ @@ -119,7 +122,7 @@ def _handle_workflow(self, session, data, headers): def handle_message(self, ch, method, properties, body): """ this is a pika.basic_consumer callback - handles client inputs, runs appropriate workflows + handles client inputs, runs appropriate workflows and views Args: ch: amqp channel @@ -129,7 +132,7 @@ def handle_message(self, ch, method, properties, body): """ input = {} try: - sessid = method.routing_key + self.sessid = method.routing_key input = json_decode(body) data = input['data'] @@ -141,9 +144,9 @@ def handle_message(self, ch, method, properties, body): 
data['view'] = data['path'] else: data['wf'] = data['path'] - session = Session(sessid[5:]) # clip "HTTP_" prefix from sessid + session = Session(self.sessid[5:]) # clip "HTTP_" prefix from sessid else: - session = Session(sessid) + session = Session(self.sessid) headers = {'remote_ip': input['_zops_remote_ip']} @@ -163,20 +166,22 @@ def handle_message(self, ch, method, properties, body): raise err = traceback.format_exc() output = {'error': self._prepare_error_msg(err), "code": 500} - log.exception("Worker error occurred") + log.exception("Worker error occurred with messsage body:\n%s" % body) if 'callbackID' in input: output['callbackID'] = input['callbackID'] - log.info("OUTPUT for %s: %s" % (sessid, output)) + log.info("OUTPUT for %s: %s" % (self.sessid, output)) output['reply_timestamp'] = time() - self.send_output(output, sessid) - - def send_output(self, output, sessid): - self.client_queue.sess_id = sessid - self.client_queue.send_to_queue(output) - # self.output_channel.basic_publish(exchange='', - # routing_key=sessid, - # body=json.dumps(output)) - # except ConnectionClosed: + self.send_output(output) + + def send_output(self, output): + # TODO: This is ugly, we should separate login process + # log.debug("SEND_OUTPUT: %s" % output) + if self.current.user_id is None or 'login_process' in output: + self.client_queue.send_to_default_exchange(self.sessid, output) + else: + self.client_queue.send_to_prv_exchange(self.current.user_id, output) + + def run_workers(no_subprocess):
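
The `_zops_*` views documented above all share one transport: the client sends a JSON frame over the websocket with a `data` dict carrying the `view` name and its parameters, plus a `callbackID` that the worker echoes back in its reply. The snippet below is a minimal client sketch, not part of this patch; it assumes the `websocket-client` package, a Tornado websocket endpoint at ws://localhost:9001/ws, a `zopsess` session cookie obtained from a prior HTTP login, and that the serialized message dict (MSG_DICT) exposes a `timestamp` field — adjust those to your deployment.

import json
import uuid

import websocket  # pip install websocket-client

WS_URL = "ws://localhost:9001/ws"           # assumed endpoint, not defined in this patch
SESSION_COOKIE = "zopsess=YOUR_SESSION_ID"  # cookie name comes from COOKIE_NAME in server.py


def call_view(ws, view, **params):
    """Send a view request and block until the reply carrying the same callbackID arrives."""
    callback_id = uuid.uuid4().hex
    ws.send(json.dumps({"callbackID": callback_id, "data": dict(view=view, **params)}))
    while True:
        reply = json.loads(ws.recv())
        if reply.get("callbackID") == callback_id:
            return reply


if __name__ == "__main__":
    ws = websocket.create_connection(WS_URL, cookie=SESSION_COOKIE)

    # List current subscriptions, then page through the history of the first one.
    channels = call_view(ws, "_zops_list_channels")["channels"]
    if channels:
        key = channels[0]["key"]
        detail = call_view(ws, "_zops_show_channel", key=key)
        if detail["last_messages"]:
            # Assumption: the message dict exposes a 'timestamp' field; adjust to
            # whatever Message.serialize() actually returns in your tree.
            oldest = detail["last_messages"][0]["timestamp"]
            older = call_view(ws, "_zops_channel_history",
                              channel_key=key, timestamp=oldest)
            print("%s older messages" % len(older["messages"]))
    ws.close()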
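
On the broker side, the patch standardizes on a durable topic exchange named `input_exc` feeding a shared `in_queue` (bound with routing key `#`), while replies flow through per-session output queues declared with an `x-expires` argument. For poking at that topology locally, here is a sketch using plain pika; it assumes RabbitMQ on localhost with the default guest account, and it spells the keyword `exchange_type` (the patch itself passes `type=`, the spelling of the older pika releases it targets).

import json

import pika

params = pika.ConnectionParameters(host="localhost", port=5672,
                                    virtual_host="/",
                                    credentials=pika.PlainCredentials("guest", "guest"))
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Same declarations the worker and the Tornado proxy perform.
channel.exchange_declare(exchange="input_exc", exchange_type="topic", durable=True)
channel.queue_declare(queue="in_queue")
channel.queue_bind(exchange="input_exc", queue="in_queue", routing_key="#")

# Anything published to input_exc lands in in_queue; the routing key is the session id.
channel.basic_publish(exchange="input_exc",
                      routing_key="fake_sess_id",
                      body=json.dumps({"data": {"view": "ping"},
                                       "_zops_remote_ip": "127.0.0.1"}))
connection.close()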
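
The last hunk's `send_output` shows how replies are routed once messaging becomes user-centric rather than session-centric: output produced before authentication completes (or explicitly flagged with `login_process`) is addressed by session id on the default exchange, while everything else goes to the user's private exchange so that every connected client of that user receives it. A condensed sketch of that rule, assuming only the `ClientQueue` interface referenced in the patch:

def route_reply(client_queue, output, sess_id, user_id=None):
    """Sketch of the routing decision in Worker.send_output (not the patch itself).

    Before the session is upgraded there is no user_id and replies must go back
    over the default exchange keyed by session id; afterwards they are published
    to the user's private exchange.
    """
    if user_id is None or 'login_process' in output:
        client_queue.send_to_default_exchange(sess_id, output)
    else:
        client_queue.send_to_prv_exchange(user_id, output)

Keeping this decision in one small function makes it easier to exercise the login handshake separately from regular post-login traffic.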