From 7301774eeb036e898f3893d6aab1111827d00b1e Mon Sep 17 00:00:00 2001 From: En-Jan Chou Date: Wed, 4 Jul 2018 10:08:24 +0000 Subject: [PATCH] operator-client bi-directional event system in WebSocket When sending transactions, the receiver client needs readltime notifications because it needs to confirm that the history has been received. And due to the clients may not always have public ip accessibility, it's easier to set a persistent connection between the server and a client, and let the server push notifications to clients. Now, the child_chain_client connects to the server in its constructor, and provides emit, on event system for easier communications. Also, the client can register its identity, like address, after connecting to the server, and the server can identify each client to send or relay messages. For example: Alice: ``` c = container.get_child_chain_client() c.emit('join', 'alice') c.on('relay', print) ``` Bob: ``` c = container.get_child_chain_client() c.emit('join', 'bob') c.emit('relay', {'dest': 'alice', 'message': 'hello'}) ``` And due to a WebSocket framework was introduced on server side, the server startup command was changed to ``` python -m plasma_cash.child_chain ``` the orignal `flask run` command will failed to start the WebSocket service. --- README.md | 2 +- plasma_cash/child_chain/__init__.py | 3 +++ plasma_cash/child_chain/__main__.py | 22 +++++++++++++++++++++ plasma_cash/child_chain/server.py | 25 ++++++++++++++++++++++++ plasma_cash/client/child_chain_client.py | 21 +++++++++++++++++++- plasma_cash/dependency_config.py | 5 ++++- requirements.txt | 2 ++ 7 files changed, 77 insertions(+), 3 deletions(-) create mode 100644 plasma_cash/child_chain/__main__.py diff --git a/README.md b/README.md index 9472fa6..65992db 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ python deployment.py Run child chain: ``` -FLASK_APP=plasma_cash/child_chain flask run --port=8546 +python -m plasma_cash.child_chain ``` Client: diff --git a/plasma_cash/child_chain/__init__.py b/plasma_cash/child_chain/__init__.py index e05ade5..dd1c07b 100644 --- a/plasma_cash/child_chain/__init__.py +++ b/plasma_cash/child_chain/__init__.py @@ -1,8 +1,10 @@ from flask import Flask +from flask_sockets import Sockets def create_app(is_unit_test=False): app = Flask(__name__) + sockets = Sockets(app) if not is_unit_test: from plasma_cash.dependency_config import container @@ -11,5 +13,6 @@ def create_app(is_unit_test=False): from . import server app.register_blueprint(server.bp) + sockets.register_blueprint(server.ws) return app diff --git a/plasma_cash/child_chain/__main__.py b/plasma_cash/child_chain/__main__.py new file mode 100644 index 0000000..ffcb305 --- /dev/null +++ b/plasma_cash/child_chain/__main__.py @@ -0,0 +1,22 @@ +import argparse + +from gevent import pywsgi +from geventwebsocket.handler import WebSocketHandler + +from plasma_cash.child_chain import create_app + +if __name__ == '__main__': + app = create_app() + + parser = argparse.ArgumentParser(prog=__package__) + parser.add_argument('--host', default='127.0.0.1', help='Hostname to listen on.') + parser.add_argument('--port', default='8546', help='Port number to listen on.') + args = parser.parse_args() + + print('Listening on ' + args.host + ':' + args.port) + server = pywsgi.WSGIServer((args.host, int(args.port)), app, handler_class=WebSocketHandler) + + try: + server.serve_forever() + except KeyboardInterrupt: + pass diff --git a/plasma_cash/child_chain/server.py b/plasma_cash/child_chain/server.py index a57a9d9..d7b23e0 100644 --- a/plasma_cash/child_chain/server.py +++ b/plasma_cash/child_chain/server.py @@ -1,8 +1,14 @@ +import json + from flask import Blueprint, request +from geventwebsocket.exceptions import WebSocketError from plasma_cash.dependency_config import container bp = Blueprint('api', __name__) +ws = Blueprint('ws', __name__) + +clients = {} @bp.route('/block', methods=['GET']) @@ -32,3 +38,22 @@ def submit_block(): def send_tx(): tx = request.form['tx'] return container.get_child_chain().apply_transaction(tx) + + +@ws.route('/') +def websocket(socket): + global clients + try: + while True: + data = json.loads(socket.receive()) + + if data['event'] == 'join': + clients[data['arg']] = socket + elif data['event'] == 'left': + del clients[data['arg']] + elif data['event'] == 'relay': + dest = clients[data['arg']['dest']] + msg = data['arg']['message'] + dest.send(json.dumps({'event': 'relay', 'arg': msg})) + except (WebSocketError, TypeError) as e: + clients = {addr: sock for addr, sock in clients.items() if sock is not socket} diff --git a/plasma_cash/client/child_chain_client.py b/plasma_cash/client/child_chain_client.py index fb98c2d..2d99eff 100644 --- a/plasma_cash/client/child_chain_client.py +++ b/plasma_cash/client/child_chain_client.py @@ -1,15 +1,34 @@ +import json +import threading + import requests +import websocket from .exceptions import RequestFailedException class ChildChainClient(object): - def __init__(self, base_url, verify=False, timeout=5): + def __init__(self, base_url, ws_url, verify=False, timeout=5): self.base_url = base_url self.verify = verify self.timeout = timeout + self.ws = websocket.WebSocketApp(ws_url, on_message=self.ws_on_message) + self.ws_callback = {} + threading.Thread(target=self.ws.run_forever).start() + + def ws_on_message(self, ws, message): + data = json.loads(message) + if callable(self.ws_callback[data['event']]): + self.ws_callback[data['event']](data['arg']) + + def emit(self, event, arg): + self.ws.send(json.dumps({'event': event, 'arg': arg})) + + def on(self, event, callback): + self.ws_callback[event] = callback + def request(self, end_point, method, params=None, data=None, headers=None): url = self.base_url + end_point diff --git a/plasma_cash/dependency_config.py b/plasma_cash/dependency_config.py index 097b8e1..4f5de0e 100644 --- a/plasma_cash/dependency_config.py +++ b/plasma_cash/dependency_config.py @@ -48,7 +48,10 @@ def get_child_chain(self): def get_child_chain_client(self): if self._child_chain_client is None: - self._child_chain_client = ChildChainClient('http://localhost:8546') + self._child_chain_client = ChildChainClient( + 'http://localhost:8546', + 'ws://localhost:8546' + ) return self._child_chain_client def get_client(self): diff --git a/requirements.txt b/requirements.txt index ecdc8a8..8571ee7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,8 @@ py-solc==2.1.0 web3==4.2.0 Flask==1.0.2 +Flask-Sockets==0.2.1 +websocket-client==0.48.0 requests==2.18.4 rlp==0.6.0 ethereum==2.3.1