Skip to content

Commit

Permalink
operator-client bi-directional event system in WebSocket
Browse files Browse the repository at this point in the history
When sending transactions, the receiver client needs readltime
notifications because it needs to confirm that the history has been
received. And due to the clients may not always have public ip accessibility,
it's easier to set a persistent connection between the server and a client,
and let the server push notifications to clients.

Now, the child_chain_client connects to the server in its constructor,
and provides emit, on event system for easier communications.
Also, the client can register its identity, like address, after connecting
to the server, and the server can identify each client to send or relay
messages. For example:

Alice:
```
c = container.get_child_chain_client()
c.emit('join', 'alice')
c.on('relay', print)
```

Bob:
```
c = container.get_child_chain_client()
c.emit('join', 'bob')
c.emit('relay', {'dest': 'alice', 'message': 'hello'})
```

And due to a WebSocket framework was introduced on server side,
the server startup command was changed to
```
python -m plasma_cash.child_chain
```
the orignal `flask run` command will failed to start the WebSocket
service.
  • Loading branch information
zhouer authored and boolafish committed Jul 14, 2018
1 parent 2652408 commit 7301774
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 3 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ python deployment.py

Run child chain:
```
FLASK_APP=plasma_cash/child_chain flask run --port=8546
python -m plasma_cash.child_chain
```

Client:
Expand Down
3 changes: 3 additions & 0 deletions plasma_cash/child_chain/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from flask import Flask
from flask_sockets import Sockets


def create_app(is_unit_test=False):
app = Flask(__name__)
sockets = Sockets(app)

if not is_unit_test:
from plasma_cash.dependency_config import container
Expand All @@ -11,5 +13,6 @@ def create_app(is_unit_test=False):

from . import server
app.register_blueprint(server.bp)
sockets.register_blueprint(server.ws)

return app
22 changes: 22 additions & 0 deletions plasma_cash/child_chain/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import argparse

from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler

from plasma_cash.child_chain import create_app

if __name__ == '__main__':
app = create_app()

parser = argparse.ArgumentParser(prog=__package__)
parser.add_argument('--host', default='127.0.0.1', help='Hostname to listen on.')
parser.add_argument('--port', default='8546', help='Port number to listen on.')
args = parser.parse_args()

print('Listening on ' + args.host + ':' + args.port)
server = pywsgi.WSGIServer((args.host, int(args.port)), app, handler_class=WebSocketHandler)

try:
server.serve_forever()
except KeyboardInterrupt:
pass
25 changes: 25 additions & 0 deletions plasma_cash/child_chain/server.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import json

from flask import Blueprint, request
from geventwebsocket.exceptions import WebSocketError

from plasma_cash.dependency_config import container

bp = Blueprint('api', __name__)
ws = Blueprint('ws', __name__)

clients = {}


@bp.route('/block', methods=['GET'])
Expand Down Expand Up @@ -32,3 +38,22 @@ def submit_block():
def send_tx():
tx = request.form['tx']
return container.get_child_chain().apply_transaction(tx)


@ws.route('/')
def websocket(socket):
global clients
try:
while True:
data = json.loads(socket.receive())

if data['event'] == 'join':
clients[data['arg']] = socket
elif data['event'] == 'left':
del clients[data['arg']]
elif data['event'] == 'relay':
dest = clients[data['arg']['dest']]
msg = data['arg']['message']
dest.send(json.dumps({'event': 'relay', 'arg': msg}))
except (WebSocketError, TypeError) as e:
clients = {addr: sock for addr, sock in clients.items() if sock is not socket}
21 changes: 20 additions & 1 deletion plasma_cash/client/child_chain_client.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,34 @@
import json
import threading

import requests
import websocket

from .exceptions import RequestFailedException


class ChildChainClient(object):

def __init__(self, base_url, verify=False, timeout=5):
def __init__(self, base_url, ws_url, verify=False, timeout=5):
self.base_url = base_url
self.verify = verify
self.timeout = timeout

self.ws = websocket.WebSocketApp(ws_url, on_message=self.ws_on_message)
self.ws_callback = {}
threading.Thread(target=self.ws.run_forever).start()

def ws_on_message(self, ws, message):
data = json.loads(message)
if callable(self.ws_callback[data['event']]):
self.ws_callback[data['event']](data['arg'])

def emit(self, event, arg):
self.ws.send(json.dumps({'event': event, 'arg': arg}))

def on(self, event, callback):
self.ws_callback[event] = callback

def request(self, end_point, method, params=None, data=None, headers=None):
url = self.base_url + end_point

Expand Down
5 changes: 4 additions & 1 deletion plasma_cash/dependency_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ def get_child_chain(self):

def get_child_chain_client(self):
if self._child_chain_client is None:
self._child_chain_client = ChildChainClient('http://localhost:8546')
self._child_chain_client = ChildChainClient(
'http://localhost:8546',
'ws://localhost:8546'
)
return self._child_chain_client

def get_client(self):
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
py-solc==2.1.0
web3==4.2.0
Flask==1.0.2
Flask-Sockets==0.2.1
websocket-client==0.48.0
requests==2.18.4
rlp==0.6.0
ethereum==2.3.1
Expand Down

0 comments on commit 7301774

Please sign in to comment.