Skip to content

Commit

Permalink
Merge branch 'master' into feature-scalyr-redis-logging
Browse files Browse the repository at this point in the history
  • Loading branch information
john-g-g authored Sep 8, 2017
2 parents ca5b08d + 57cf7e0 commit fd458d0
Show file tree
Hide file tree
Showing 13 changed files with 402 additions and 63 deletions.
3 changes: 2 additions & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ indent_style = space
indent_size = 4
# isort config
force_single_line=True
known_third_party=aiocache,aiohttp,funcy,pygtrie,ujson,sanic,statsd,websockets,pytest,requests,jsonschema

known_third_party=aiocache,aiohttp,funcy,pygtrie,ujson,sanic,statsd,websockets,statsd,janus,aiojobs


# Tab indentation (no size specified)
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,4 @@ tests/failed_blocks/
/deploy/
/tests/async_deco.py
/tests/perf/*.json
service/*/supervise/
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ clean:
-rm -rf .mypy_cache
-rm -rf *.egg-info
-rm -rf *.log
-rm -rf service/*/supervise

build: clean clean-perf
docker build -t $(PROJECT_DOCKER_TAG) .
Expand All @@ -39,7 +40,7 @@ build-then-run: build
docker run $(PROJECT_DOCKER_RUN_ARGS) $(PROJECT_DOCKER_TAG)

run-local:
env LOG_LEVEL=DEBUG pipenv run python3 -m jussi.serve --server_workers=1
pipenv run python3 -m jussi.serve --server_workers=1

test:
pipenv run pytest
Expand Down
5 changes: 4 additions & 1 deletion Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ awscli = "*"
pytest-mock = "*"
pytest-asyncio = "*"
asynctest = "*"
yapf = "*"

[packages]
ujson = "*"
Expand All @@ -54,7 +55,9 @@ aiojobs = "*"
jsonrpcserver = "*"
cytoolz = "*"
janus = "*"
yapf = "*"
python-json-logger = "*"
progress = "*"


[requires]
python_version = "3.6"
Expand Down
38 changes: 17 additions & 21 deletions jussi/listeners.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
# -*- coding: utf-8 -*-
import asyncio
import logging
import os

import aiocache
import aiohttp
import aiojobs
import janus
import statsd
import ujson
from websockets import connect as websockets_connect

import aiojobs
import janus
import jussi.cache
import jussi.jobs
import jussi.jsonrpc_method_cache_settings
Expand All @@ -24,22 +22,9 @@
def setup_listeners(app: WebApp) -> WebApp:

# pylint: disable=unused-argument, unused-variable

@app.listener('before_server_start')
def setup_logging(app: WebApp, loop) -> WebApp:
# init logging
#root_logger = logging.getLogger()
#root_logger.handlers = []
LOG_LEVEL = getattr(logging, os.environ.get('LOG_LEVEL', 'INFO'))
jussi.logging_config.LOGGING['loggers']['sanic']['level'] = LOG_LEVEL
jussi.logging_config.LOGGING['loggers']['network']['level'] = LOG_LEVEL
app.config.logger = logging.getLogger('sanic')
return app

logger = logging.getLogger('sanic')

@app.listener('before_server_start')
def setup_cache(app: WebApp, loop) -> None:
logger = app.config.logger
logger.info('before_server_start -> setup_cache')

caches_config = jussi.cache.setup_caches(app, loop)
Expand All @@ -58,12 +43,14 @@ def setup_cache(app: WebApp, loop) -> None:

@app.listener('before_server_start')
def setup_jsonrpc_method_cache_settings(app: WebApp, loop) -> None:
logger = app.config.logger
logger.info(
'before_server_start -> setup_jsonrpc_method_cache_settings')
app.config.method_ttls = jussi.jsonrpc_method_cache_settings.TTLS

@app.listener('before_server_start')
def setup_jsonrpc_method_url_settings(app: WebApp, loop) -> None:
logger = app.config.logger
logger.info('before_server_start -> setup_jsonrpc_method_url_settings')
args = app.config.args
mapping = {}
Expand All @@ -77,6 +64,7 @@ def setup_jsonrpc_method_url_settings(app: WebApp, loop) -> None:
def setup_aiohttp_session(app: WebApp, loop) -> None:
"""use one session for http connection pooling
"""
logger = app.config.logger
logger.info('before_server_start -> setup_aiohttp_session')
aio = dict(session=aiohttp.ClientSession(
skip_auto_headers=['User-Agent'],
Expand All @@ -89,6 +77,7 @@ def setup_aiohttp_session(app: WebApp, loop) -> None:
async def setup_websocket_connection(app: WebApp, loop) -> None:
"""use one ws connection (per worker) to avoid reconnection
"""
logger = app.config.logger
logger.info('before_server_start -> setup_ws_client')
args = app.config.args
app.config.websocket_kwargs = dict(uri=args.steemd_websocket_url,
Expand All @@ -101,6 +90,7 @@ async def setup_websocket_connection(app: WebApp, loop) -> None:
@app.listener('before_server_start')
async def setup_statsd(app: WebApp, loop) -> None:
"""setup statsd client and queue"""
logger = app.config.logger
logger.info('before_server_start -> setup_statsd')
app.config.statsd_client = statsd.StatsClient()
stats_queue = janus.Queue(loop=loop)
Expand All @@ -111,7 +101,9 @@ async def setup_statsd(app: WebApp, loop) -> None:
# after server start
@app.listener('after_server_start')
async def setup_job_scheduler(app: WebApp, loop) -> None:
logger = app.config.logger
logger.info('after_server_start -> setup_job_scheduler')

app.config.last_irreversible_block_num = 0
app.config.scheduler = await aiojobs.create_scheduler()
await app.config.scheduler.spawn(
Expand All @@ -125,22 +117,24 @@ async def setup_job_scheduler(app: WebApp, loop) -> None:
)

# after server stop



@app.listener('after_server_stop')
async def stop_job_scheduler(app: WebApp, loop) -> None:
logger.info('after_server_stop -> stop_job_scheduler')
await asyncio.shield(app.config.scheduler.close())


@app.listener('after_server_stop')
async def close_websocket_connection(app: WebApp, loop) -> None:
logger = app.config.logger
logger.info('after_server_stop -> close_websocket_connection')
if not app.config.scheduler.closed:
await asyncio.shield(app.config.scheduler.close())
client = app.config.websocket_client
await asyncio.shield(client.close())

@app.listener('after_server_stop')
async def close_aiohttp_session(app: WebApp, loop) -> None:
logger = app.config.logger
logger.info('after_server_stop -> close_aiohttp_session')
if not app.config.scheduler.closed:
await asyncio.shield(app.config.scheduler.close())
Expand All @@ -149,6 +143,7 @@ async def close_aiohttp_session(app: WebApp, loop) -> None:

@app.listener('after_server_stop')
async def close_stats_queue(app: WebApp, loop) -> None:
logger = app.config.logger
logger.info('after_server_stop -> close_stats_queue')
if not app.config.scheduler.closed:
await asyncio.shield(app.config.scheduler.close())
Expand All @@ -158,6 +153,7 @@ async def close_stats_queue(app: WebApp, loop) -> None:

@app.listener('after_server_stop')
async def close_cache_connections(app: WebApp, loop) -> None:
logger = app.config.logger
logger.info('after_server_stop -> close_cache_connections')
for cache in app.config.caches:
await cache.close()
Expand Down
69 changes: 57 additions & 12 deletions jussi/logging_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,43 @@

from jussi.typedefs import WebApp

LOG_DATETIME_FORMAT = r'%Y-%m-%dT%H:%M:%S.%s%Z'
SUPPORTED_LOG_MESSAGE_KEYS = (
'levelname',
'asctime',
#'created',
'filename',
# 'levelno',
'module',
'funcName',
'lineno',
'msecs',
'message',
'name',
'pathname',
'process',
'processName',
# 'relativeCreated',
#'thread',
'threadName')

JSON_LOG_FORMAT = ' '.join(
['%({0:s})'.format(i) for i in SUPPORTED_LOG_MESSAGE_KEYS])

#JSON_FORMATTER.converter = time.gmtime


def setup_logging(app: WebApp) -> WebApp:
# init logging
root_logger = logging.getLogger()
root_logger.handlers = []
#root_logger = logging.getLogger()
#root_logger.handlers = []
LOG_LEVEL = getattr(logging, os.environ.get('LOG_LEVEL', 'INFO'))
LOGGING['loggers']['sanic']['level'] = LOG_LEVEL
LOGGING['loggers']['network']['level'] = LOG_LEVEL
app.config.logger = logging.getLogger('jussi')
LOGGING['loggers']['jussi']['level'] = LOG_LEVEL
logger = logging.getLogger('jussi')
logger.info('configuring jussi logger')
app.config.logger = logger
return app


Expand All @@ -33,35 +61,48 @@ def setup_logging(app: WebApp) -> WebApp:
},
'formatters': {
'simple': {
'format': '%(asctime)s - (%(name)s)[%(levelname)s]: %(message)s',
'datefmt': '%Y-%m-%d %H:%M:%S'
'()': 'pythonjsonlogger.jsonlogger.JsonFormatter',
'format': '%(asctime)s %(name) %(levelname) %(message)',
'datefmt': LOG_DATETIME_FORMAT
},
'access': {
'json_access': {
'()':
'pythonjsonlogger.jsonlogger.JsonFormatter',
'format':
'%(asctime)s - (%(name)s)[%(levelname)s][%(host)s]: ' +
'%(request)s %(message)s %(status)d %(byte)d',
'%(asctime) %(name) %(levelname) %(host) ' +
'%(request) %(message) %(status) %(byte)',
'datefmt':
'%Y-%m-%d %H:%M:%S'
LOG_DATETIME_FORMAT
},
'json': {
'()': 'pythonjsonlogger.jsonlogger.JsonFormatter',
'format': JSON_LOG_FORMAT,
'datefmt': LOG_DATETIME_FORMAT
}
},
'handlers': {
'internal': {
'class': 'logging.StreamHandler',
'filters': ['accessFilter'],
'formatter': 'simple',
'formatter': 'json',
'stream': sys.stderr
},
'accessStream': {
'class': 'logging.StreamHandler',
'filters': ['accessFilter'],
'formatter': 'access',
'formatter': 'json',
'stream': sys.stderr
},
'errorStream': {
'class': 'logging.StreamHandler',
'filters': ['errorFilter'],
'formatter': 'simple',
'formatter': 'json',
'stream': sys.stderr
},
'jussi_hdlr': {
'class': 'logging.StreamHandler',
'stream': sys.stderr,
'formatter': 'json'
}
},
'loggers': {
Expand All @@ -72,6 +113,10 @@ def setup_logging(app: WebApp) -> WebApp:
'network': {
'level': logging.DEBUG,
'handlers': ['accessStream']
},
'jussi': {
'level': logging.DEBUG,
'handlers': ['jussi_hdlr']
}
}
}
35 changes: 35 additions & 0 deletions jussi/middlewares.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# -*- coding: utf-8 -*-
import gzip
import logging
import zlib
from io import BytesIO
from typing import Optional

from sanic import response
Expand All @@ -25,16 +28,48 @@
logger = logging.getLogger('sanic')


def decode_gzip(data):
gzipper = gzip.GzipFile(fileobj=BytesIO(data))
return gzipper.read()


def decode_deflate(data):
try:
return zlib.decompress(data)
except zlib.error:
return zlib.decompress(data, -zlib.MAX_WBITS)


CONTENT_DECODERS = {
'gzip': decode_gzip,
'deflate': decode_deflate,
}


def setup_middlewares(app):
logger = app.config.logger
logger.info('before_server_start -> setup_middlewares')
app.request_middleware.append(handle_gzip)
app.request_middleware.append(validate_jsonrpc_request)
app.request_middleware.append(request_stats)
app.request_middleware.append(caching_middleware)
app.response_middleware.append(finalize_request_stats)
return app


@handle_middleware_exceptions
async def handle_gzip(request: HTTPRequest) -> Optional[HTTPResponse]:
content_encoding = request.headers.get('content-encoding')
decoder = CONTENT_DECODERS.get(content_encoding)
try:
if decoder:
request.body = decoder(request.body)
except (IOError, zlib.error) as e:
logger.error(e)
return response.json(
ParseError(sanic_request=request, exception=e).to_dict())


@handle_middleware_exceptions
@async_exclude_methods(exclude_http_methods=('GET', ))
async def validate_jsonrpc_request(
Expand Down
10 changes: 1 addition & 9 deletions jussi/serializers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
import logging
import zlib
from typing import AnyStr
from typing import Optional
Expand All @@ -8,8 +7,6 @@
import ujson
from aiocache.serializers import StringSerializer

logger = logging.getLogger('sanic')


class CompressionSerializer(StringSerializer):

Expand All @@ -21,13 +18,8 @@ class CompressionSerializer(StringSerializer):
def dumps(self, value: Union[AnyStr, dict]) -> bytes:
# FIXME handle structs with bytes vals, eg, [1, '2', b'3']
# currently self.loads(self.dumps([1, '2', b'3'])) == [1, '2', '3']
logger.debug(f'dumps dumping {type(value)}')
return zlib.compress(ujson.dumps(value).encode())

def loads(self, value) -> Optional[dict]:
if value:
logger.debug(f'loads loading {type(value)}')
value = ujson.loads(zlib.decompress(value).decode())
logger.debug(f'loads loaded {type(value)}')
return value
return None
return ujson.loads(zlib.decompress(value))
1 change: 0 additions & 1 deletion jussi/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ def main():
app.run(
host=app.config.args.server_host,
port=app.config.args.server_port,
log_config=jussi.logging_config.LOGGING,
workers=app.config.args.server_workers)


Expand Down
Loading

0 comments on commit fd458d0

Please sign in to comment.