From bd2e33b3ecaf4cdd194996a645c9ed76b0c8c0e0 Mon Sep 17 00:00:00 2001
From: John Gerlock
Date: Mon, 28 Aug 2017 14:46:01 -0400
Subject: [PATCH 1/3] Add nginx monitor to scalyr, add nginx access log with
 json parser to scalyr, remove scalyr-agent process metrics

---
 service/nginx/nginx.conf          |  7 ++++++-
 service/scalyr-agent-2/agent.json | 16 +++++++++++++++-
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/service/nginx/nginx.conf b/service/nginx/nginx.conf
index 798f165..a6fec4a 100644
--- a/service/nginx/nginx.conf
+++ b/service/nginx/nginx.conf
@@ -38,7 +38,7 @@ http {
                      '"upstream_response_length": $upstream_response_length'
                      '}';
 
-    access_log /dev/stdout json_combined buffer=32k flush=5s;
+    access_log /var/log/nginx/access.log json_combined buffer=32k flush=5s;
 
     keepalive_timeout 65;
     keepalive_requests 10000;
@@ -94,5 +94,10 @@ http {
       return 403;
     }
 
+    location /nginx_status {
+      stub_status on;
+      allow 127.0.0.1;
+      deny all;
+    }
   }
 }
diff --git a/service/scalyr-agent-2/agent.json b/service/scalyr-agent-2/agent.json
index ad44240..ad2b26d 100644
--- a/service/scalyr-agent-2/agent.json
+++ b/service/scalyr-agent-2/agent.json
@@ -1,8 +1,16 @@
 {
   "import_vars": [ "SCALYR_API_KEY","ENVIRONMENT" ],
   "api_key": "$SCALYR_API_KEY",
-  "server_attributes":{"tier": "$ENVIRONMENT"},
+  "server_attributes":{
+    "tier": "$ENVIRONMENT",
+    "serverHost": "$scalyr_hostname"
+  },
+  "implicit_agent_process_metrics_monitor": false,
   "monitors": [
+    {
+      "module": "scalyr_agent.builtin_monitors.nginx_monitor",
+      "status_url": "http://localhost:80/nginx_status"
+    },
     {
       "module": "scalyr_agent.builtin_monitors.graphite_monitor"
     },
@@ -11,5 +19,11 @@
       "id": "jussi-process",
       "commandline": ".*jussi.*"
     }
+  ],
+  "logs": [
+    {
+      "path": "/var/log/nginx/access.log",
+      "parser": "json"
+    }
   ]
 }

From a1956b8c548dc40febfa2f5d8b922b05bc3c7e3d Mon Sep 17 00:00:00 2001
From: Johan Nordberg
Date: Tue, 5 Sep 2017 23:30:32 +0200
Subject: [PATCH 2/3] Add CORS headers

---
 service/nginx/nginx.conf | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/service/nginx/nginx.conf b/service/nginx/nginx.conf
index 798f165..4f7a4e8 100644
--- a/service/nginx/nginx.conf
+++ b/service/nginx/nginx.conf
@@ -65,7 +65,9 @@ http {
     listen 8080;
     server_name jussi;
 
-    add_header 'Access-Control-Allow-Origin' '*';
+    add_header Access-Control-Allow-Origin "*";
+    add_header Access-Control-Allow-Methods "GET, POST, OPTIONS";
+    add_header Access-Control-Allow-Headers "DNT,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Content-Range,Range";
     add_header Strict-Transport-Security "max-age=31557600; includeSubDomains; preload" always;
     add_header 'Content-Security-Policy' 'upgrade-insecure-requests';
 

From e617e8bf6c12a1240c0da2dc05860bb6adc32657 Mon Sep 17 00:00:00 2001
From: John Gerlock
Date: Thu, 7 Sep 2017 01:21:18 -0400
Subject: [PATCH 3/3] format all logs as json, add gzip/deflate handling, draft
 async client

---
 .editorconfig                     |   2 +-
 .gitignore                        |   1 +
 .pre-commit-config.yaml           |   6 -
 Makefile                          |   3 +-
 Pipfile                           |   5 +
 jussi/listeners.py                |  67 ++++----
 jussi/logging_config.py           |  69 ++++++--
 jussi/middlewares.py              |  35 ++++
 jussi/serializers.py              |   1 -
 jussi/serve.py                    |   1 -
 service/nginx/nginx.conf          |   3 +-
 service/scalyr-agent-2/agent.json |  10 --
 tests/async_http_client.py        | 274 ++++++++++++++++++++++++++++++
 tests/docker-compose.yml          |   2 +-
 tests/http_client.py              |  10 --
 15 files changed, 412 insertions(+), 77 deletions(-)
 create mode 100644 tests/async_http_client.py

diff --git a/.editorconfig b/.editorconfig
index
05b08f0..b4cff70 100644 --- a/.editorconfig +++ b/.editorconfig @@ -14,7 +14,7 @@ indent_style = space indent_size = 4 # isort config force_single_line=True -known_third_party=aiocache,aiohttp,funcy,pygtrie,ujson,sanic,statsd,websockets +known_third_party=aiocache,aiohttp,funcy,pygtrie,ujson,sanic,statsd,websockets,statsd,janus,aiojobs # Tab indentation (no size specified) diff --git a/.gitignore b/.gitignore index 669de1b..7a26c04 100644 --- a/.gitignore +++ b/.gitignore @@ -116,3 +116,4 @@ tests/failed_blocks/ /deploy/ /tests/async_deco.py /tests/perf/*.json +service/*/supervise/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 64bd6b5..49939a4 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -21,12 +21,6 @@ - id: python-import-sorter args: - --silent-overwrite -- repo: git://github.com/pre-commit/mirrors-yapf - sha: v0.16.3 - hooks: - - id: yapf - args: - - -p - repo: git://github.com/Lucas-C/pre-commit-hooks sha: v1.0.1 hooks: diff --git a/Makefile b/Makefile index bb91443..8f834ab 100644 --- a/Makefile +++ b/Makefile @@ -28,6 +28,7 @@ clean: -rm -rf .mypy_cache -rm -rf *.egg-info -rm -rf *.log + -rm -rf service/*/supervise build: clean clean-perf docker build -t $(PROJECT_DOCKER_TAG) . @@ -39,7 +40,7 @@ build-then-run: build docker run $(PROJECT_DOCKER_RUN_ARGS) $(PROJECT_DOCKER_TAG) run-local: - env LOG_LEVEL=DEBUG pipenv run python3 -m jussi.serve --server_workers=1 + pipenv run python3 -m jussi.serve --server_workers=1 test: pipenv run pytest diff --git a/Pipfile b/Pipfile index b2e3a7c..2ad43e5 100644 --- a/Pipfile +++ b/Pipfile @@ -52,6 +52,11 @@ aiojobs = "*" jsonrpcserver = "*" cytoolz = "*" janus = "*" +yapf = "*" +python-json-logger = "*" +progress = "*" +pre_commit = "*" + [requires] python_version = "3.6" diff --git a/jussi/listeners.py b/jussi/listeners.py index f0293d0..ff0f6d8 100644 --- a/jussi/listeners.py +++ b/jussi/listeners.py @@ -1,14 +1,13 @@ # -*- coding: utf-8 -*- -import logging -import os +import asyncio import aiohttp +import aiojobs +import janus import statsd import ujson import websockets -import aiojobs -import janus import jussi.cache import jussi.jobs import jussi.jsonrpc_method_cache_settings @@ -21,22 +20,9 @@ def setup_listeners(app: WebApp) -> WebApp: # pylint: disable=unused-argument, unused-variable - - @app.listener('before_server_start') - def setup_logging(app: WebApp, loop) -> WebApp: - # init logging - root_logger = logging.getLogger() - root_logger.handlers = [] - LOG_LEVEL = getattr(logging, os.environ.get('LOG_LEVEL', 'INFO')) - jussi.logging_config.LOGGING['loggers']['sanic']['level'] = LOG_LEVEL - jussi.logging_config.LOGGING['loggers']['network']['level'] = LOG_LEVEL - app.config.logger = logging.getLogger('jussi') - return app - - logger = app.config.logger - @app.listener('before_server_start') def setup_cache(app: WebApp, loop) -> None: + logger = app.config.logger logger.info('before_server_start -> setup_cache') caches = jussi.cache.setup_caches(app, loop) @@ -53,12 +39,14 @@ def setup_cache(app: WebApp, loop) -> None: @app.listener('before_server_start') def setup_jsonrpc_method_cache_settings(app: WebApp, loop) -> None: + logger = app.config.logger logger.info( 'before_server_start -> setup_jsonrpc_method_cache_settings') app.config.method_ttls = jussi.jsonrpc_method_cache_settings.TTLS @app.listener('before_server_start') def setup_jsonrpc_method_url_settings(app: WebApp, loop) -> None: + logger = app.config.logger logger.info('before_server_start -> setup_jsonrpc_method_url_settings') 
args = app.config.args mapping = {} @@ -72,6 +60,7 @@ def setup_jsonrpc_method_url_settings(app: WebApp, loop) -> None: def setup_aiohttp_session(app: WebApp, loop) -> None: """use one session for http connection pooling """ + logger = app.config.logger logger.info('before_server_start -> setup_aiohttp_session') aio = dict(session=aiohttp.ClientSession( skip_auto_headers=['User-Agent'], @@ -84,6 +73,7 @@ def setup_aiohttp_session(app: WebApp, loop) -> None: async def setup_websocket_connection(app: WebApp, loop) -> None: """use one ws connection (per worker) to avoid reconnection """ + logger = app.config.logger logger.info('before_server_start -> setup_ws_client') args = app.config.args app.config.websocket_kwargs = dict( @@ -97,6 +87,7 @@ async def setup_websocket_connection(app: WebApp, loop) -> None: @app.listener('before_server_start') async def setup_statsd(app: WebApp, loop) -> None: """setup statsd client and queue""" + logger = app.config.logger logger.info('before_server_start -> setup_statsd') app.config.statsd_client = statsd.StatsClient() stats_queue = janus.Queue(loop=loop) @@ -107,7 +98,9 @@ async def setup_statsd(app: WebApp, loop) -> None: # after server start @app.listener('after_server_start') async def setup_job_scheduler(app: WebApp, loop) -> None: - logger.info('before_server_start -> setup_job_scheduler') + logger = app.config.logger + logger.info('after_server_start -> setup_job_scheduler') + app.config.last_irreversible_block_num = 0 app.config.scheduler = await aiojobs.create_scheduler() await app.config.scheduler.spawn( @@ -120,31 +113,39 @@ async def setup_job_scheduler(app: WebApp, loop) -> None: 'before_server_start -> setup_job_scheduler scheduled jussi.jobs.flush_stats' ) - # before server stop - @app.listener('before_server_stop') - async def stop_job_scheduler(app: WebApp, loop) -> None: - logger.info('before_server_stop -> stop_job_scheduler') - await app.config.scheduler.close() - - @app.listener('before_server_stop') - def close_websocket_connection(app: WebApp, loop) -> None: - logger.info('before_server_stop -> close_websocket_connection') + @app.listener('after_server_stop') + async def close_websocket_connection(app: WebApp, loop) -> None: + logger = app.config.logger + logger.info('after_server_stop -> close_websocket_connection') + if not app.config.scheduler.closed: + await asyncio.shield(app.config.scheduler.close()) client = app.config.websocket_client client.close() - @app.listener('before_server_stop') - def close_aiohttp_session(app: WebApp, loop) -> None: - logger.info('before_server_stop -> close_aiohttp_session') + @app.listener('after_server_stop') + async def close_aiohttp_session(app: WebApp, loop) -> None: + logger = app.config.logger + logger.info('after_server_stop -> close_aiohttp_session') + if not app.config.scheduler.closed: + await asyncio.shield(app.config.scheduler.close()) session = app.config.aiohttp['session'] session.close() @app.listener('before_server_stop') async def close_stats_queue(app: WebApp, loop) -> None: - logger.info('before_server_stop -> close_stats_queue') + logger = app.config.logger + logger.info('after_server_stop -> close_stats_queue') if not app.config.scheduler.closed: await app.config.scheduler.close() stats = app.config.stats statsd_client = app.config.statsd_client - await stats.final_flush(statsd_client) + await asyncio.shield(stats.final_flush(statsd_client)) + + @app.listener('after_server_stop') + async def close_cache_connections(app: WebApp, loop) -> None: + logger = app.config.logger + 
logger.info('after_server_stop -> close_cache_connections') + for cache in app.config.caches: + await cache.close() return app diff --git a/jussi/logging_config.py b/jussi/logging_config.py index 21d6d9c..fb4a00a 100644 --- a/jussi/logging_config.py +++ b/jussi/logging_config.py @@ -7,15 +7,43 @@ from jussi.typedefs import WebApp +LOG_DATETIME_FORMAT = r'%Y-%m-%dT%H:%M:%S.%s%Z' +SUPPORTED_LOG_MESSAGE_KEYS = ( + 'levelname', + 'asctime', + #'created', + 'filename', + # 'levelno', + 'module', + 'funcName', + 'lineno', + 'msecs', + 'message', + 'name', + 'pathname', + 'process', + 'processName', + # 'relativeCreated', + #'thread', + 'threadName') + +JSON_LOG_FORMAT = ' '.join( + ['%({0:s})'.format(i) for i in SUPPORTED_LOG_MESSAGE_KEYS]) + +#JSON_FORMATTER.converter = time.gmtime + def setup_logging(app: WebApp) -> WebApp: # init logging - root_logger = logging.getLogger() - root_logger.handlers = [] + #root_logger = logging.getLogger() + #root_logger.handlers = [] LOG_LEVEL = getattr(logging, os.environ.get('LOG_LEVEL', 'INFO')) LOGGING['loggers']['sanic']['level'] = LOG_LEVEL LOGGING['loggers']['network']['level'] = LOG_LEVEL - app.config.logger = logging.getLogger('jussi') + LOGGING['loggers']['jussi']['level'] = LOG_LEVEL + logger = logging.getLogger('jussi') + logger.info('configuring jussi logger') + app.config.logger = logger return app @@ -33,35 +61,48 @@ def setup_logging(app: WebApp) -> WebApp: }, 'formatters': { 'simple': { - 'format': '%(asctime)s - (%(name)s)[%(levelname)s]: %(message)s', - 'datefmt': '%Y-%m-%d %H:%M:%S' + '()': 'pythonjsonlogger.jsonlogger.JsonFormatter', + 'format': '%(asctime)s %(name) %(levelname) %(message)', + 'datefmt': LOG_DATETIME_FORMAT }, - 'access': { + 'json_access': { + '()': + 'pythonjsonlogger.jsonlogger.JsonFormatter', 'format': - '%(asctime)s - (%(name)s)[%(levelname)s][%(host)s]: ' + - '%(request)s %(message)s %(status)d %(byte)d', + '%(asctime) %(name) %(levelname) %(host) ' + + '%(request) %(message) %(status) %(byte)', 'datefmt': - '%Y-%m-%d %H:%M:%S' + LOG_DATETIME_FORMAT + }, + 'json': { + '()': 'pythonjsonlogger.jsonlogger.JsonFormatter', + 'format': JSON_LOG_FORMAT, + 'datefmt': LOG_DATETIME_FORMAT } }, 'handlers': { 'internal': { 'class': 'logging.StreamHandler', 'filters': ['accessFilter'], - 'formatter': 'simple', + 'formatter': 'json', 'stream': sys.stderr }, 'accessStream': { 'class': 'logging.StreamHandler', 'filters': ['accessFilter'], - 'formatter': 'access', + 'formatter': 'json', 'stream': sys.stderr }, 'errorStream': { 'class': 'logging.StreamHandler', 'filters': ['errorFilter'], - 'formatter': 'simple', + 'formatter': 'json', 'stream': sys.stderr + }, + 'jussi_hdlr': { + 'class': 'logging.StreamHandler', + 'stream': sys.stderr, + 'formatter': 'json' } }, 'loggers': { @@ -72,6 +113,10 @@ def setup_logging(app: WebApp) -> WebApp: 'network': { 'level': logging.DEBUG, 'handlers': ['accessStream'] + }, + 'jussi': { + 'level': logging.DEBUG, + 'handlers': ['jussi_hdlr'] } } } diff --git a/jussi/middlewares.py b/jussi/middlewares.py index 8321f11..0508758 100644 --- a/jussi/middlewares.py +++ b/jussi/middlewares.py @@ -1,5 +1,8 @@ # -*- coding: utf-8 -*- +import gzip import logging +import zlib +from io import BytesIO from typing import Optional from sanic import response @@ -24,9 +27,28 @@ logger = logging.getLogger('sanic') +def decode_gzip(data): + gzipper = gzip.GzipFile(fileobj=BytesIO(data)) + return gzipper.read() + + +def decode_deflate(data): + try: + return zlib.decompress(data) + except zlib.error: + return 
zlib.decompress(data, -zlib.MAX_WBITS) + + +CONTENT_DECODERS = { + 'gzip': decode_gzip, + 'deflate': decode_deflate, +} + + def setup_middlewares(app): logger = app.config.logger logger.info('before_server_start -> setup_middlewares') + app.request_middleware.append(handle_gzip) app.request_middleware.append(validate_jsonrpc_request) app.request_middleware.append(request_stats) app.request_middleware.append(caching_middleware) @@ -34,6 +56,19 @@ def setup_middlewares(app): return app +@handle_middleware_exceptions +async def handle_gzip(request: HTTPRequest) -> Optional[HTTPResponse]: + content_encoding = request.headers.get('content-encoding') + decoder = CONTENT_DECODERS.get(content_encoding) + try: + if decoder: + request.body = decoder(request.body) + except (IOError, zlib.error) as e: + logger.error(e) + return response.json( + ParseError(sanic_request=request, exception=e).to_dict()) + + @handle_middleware_exceptions @async_exclude_methods(exclude_http_methods=('GET', )) async def validate_jsonrpc_request( diff --git a/jussi/serializers.py b/jussi/serializers.py index 040b99d..df7baab 100644 --- a/jussi/serializers.py +++ b/jussi/serializers.py @@ -23,4 +23,3 @@ def dumps(self, value: Union[AnyStr, dict]) -> bytes: def loads(self, value: Optional[bytes]) -> Optional[dict]: if value: return ujson.loads(zlib.decompress(value)) - return None diff --git a/jussi/serve.py b/jussi/serve.py index 072cd45..2041f1b 100644 --- a/jussi/serve.py +++ b/jussi/serve.py @@ -58,7 +58,6 @@ def main(): app.run( host=app.config.args.server_host, port=app.config.args.server_port, - log_config=jussi.logging_config.LOGGING, workers=app.config.args.server_workers) diff --git a/service/nginx/nginx.conf b/service/nginx/nginx.conf index ae6e35c..fec9cbc 100644 --- a/service/nginx/nginx.conf +++ b/service/nginx/nginx.conf @@ -38,7 +38,8 @@ http { '"upstream_response_length": $upstream_response_length' '}'; - access_log /var/log/nginx/access.log json_combined buffer=32k flush=5s; + + access_log /dev/stdout json_combined buffer=32k flush=5s; keepalive_timeout 65; keepalive_requests 10000; diff --git a/service/scalyr-agent-2/agent.json b/service/scalyr-agent-2/agent.json index 7b52a50..e5803ba 100644 --- a/service/scalyr-agent-2/agent.json +++ b/service/scalyr-agent-2/agent.json @@ -7,10 +7,6 @@ }, "implicit_agent_process_metrics_monitor": false, "monitors": [ - { - "module": "scalyr_agent.builtin_monitors.nginx_monitor", - "status_url": "http://localhost:80/nginx_status" - }, { "module": "scalyr_agent.builtin_monitors.graphite_monitor" }, @@ -19,11 +15,5 @@ "id": "jussi-process", "commandline": ".*jussi.*" } - ], - "logs": [ - { - "path": "/var/log/nginx/access.log", - "parser": "json" - } ] } diff --git a/tests/async_http_client.py b/tests/async_http_client.py new file mode 100644 index 0000000..be9fe8d --- /dev/null +++ b/tests/async_http_client.py @@ -0,0 +1,274 @@ +# -*- coding: utf-8 -*- +# pylint: skip-file +import asyncio +import logging +import os +import time +import zlib +from collections import deque +from itertools import islice + +import aiohttp +import ujson +from funcy.colls import get_in + +import uvloop +from progress.bar import Bar + +asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +CORRECT_BATCH_TEST_RESPONSE=''' 
+[{"id":1,"result":{"previous":"000000b0c668dad57f55172da54899754aeba74b","timestamp":"2016-03-24T16:14:21","witness":"initminer","transaction_merkle_root":"0000000000000000000000000000000000000000","extensions":[],"witness_signature":"2036fd4ff7838ba32d6d27637576e1b1e82fd2858ac97e6e65b7451275218cbd2b64411b0a5d74edbde790c17ef704b8ce5d9de268cb43783b499284c77f7d9f5e","transactions":[],"block_id":"000000b13707dfaad7c2452294d4cfa7c2098db4","signing_key":"STM8GC13uCZbP44HzMLV6zPZGwVQ8Nt4Kji8PapsPiNq1BK153XTX","transaction_ids":[]}},{"id":2,"result":{"previous":"000000b0c668dad57f55172da54899754aeba74b","timestamp":"2016-03-24T16:14:21","witness":"initminer","transaction_merkle_root":"0000000000000000000000000000000000000000","extensions":[],"witness_signature":"2036fd4ff7838ba32d6d27637576e1b1e82fd2858ac97e6e65b7451275218cbd2b64411b0a5d74edbde790c17ef704b8ce5d9de268cb43783b499284c77f7d9f5e","transactions":[],"block_id":"000000b13707dfaad7c2452294d4cfa7c2098db4","signing_key":"STM8GC13uCZbP44HzMLV6zPZGwVQ8Nt4Kji8PapsPiNq1BK153XTX","transaction_ids":[]}}] +''' +NO_BATCH_SUPPORT_RESPONSE = '7 bad_cast_exception: Bad Cast' + +class RateBar(Bar): + suffix = '%(index)d (%(rate)d/sec) time remaining: %(eta_td)s' + + @property + def rate(self): + elapsed = self.elapsed or 0.000000001 + return self.index/elapsed + +def chunkify(iterable, chunksize=3000): + i = 0 + chunk = [] + for item in iterable: + chunk.append(item) + i += 1 + if i == chunksize: + yield chunk + i = 0 + chunk = [] + if chunk: + yield chunk + + +class SimpleSteemAPIClient(object): + def __init__(self, *, url=None, **kwargs): + self.url = url or os.environ.get('STEEMD_HTTP_URL', 'https://steemd.steemitdev.com') + self.kwargs = kwargs + self.session = kwargs.get('session', None) + self.connector = get_in(kwargs, ['session','connector']) + + if not self.connector: + self.connector = self._new_connector() + if not self.session: + self.session = self._new_session() + + self._batch_request_size = self.kwargs.get('batch_request_size', 150) + self._concurrent_tasks_limit = self.kwargs.get('concurrent_tasks_limit', 10) + + self._perf_history = deque(maxlen=2000) + self._batch_request_count = 0 + self._request_count = 0 + + + def _new_connector(self, connector_kwargs=None): + connector_kwargs = connector_kwargs or self._connector_kwargs + return aiohttp.TCPConnector(**connector_kwargs) + + def _new_session(self, session_kwargs=None): + session_kwargs = session_kwargs or self._session_kwargs + return aiohttp.ClientSession(**session_kwargs) + + async def fetch(self,request_data): + if isinstance(request_data, list): + self._batch_request_count += 1 + self._request_count += len(request_data) + async with self.session.post(self.url, json=request_data, compress='gzip') as response: + try: + response_data = await response.json() + except Exception as e: + logger.error(e) + logger.error(await response.text()) + return + return response_data + + async def get_blocks(self, block_nums): + requests = ({'jsonrpc':'2.0','id':block_num,'method':'get_block','params':[block_num]} for _id,block_num in enumerate(block_nums)) + batched_requests = chunkify(requests, self.batch_request_size) + coros = (self.fetch(batch) for batch in batched_requests) + first_coros = islice(coros,0,self.concurrent_tasks_limit) + futures = [asyncio.ensure_future(c) for c in first_coros] + + logger.debug(f'inital futures:{len(futures)}') + start = time.perf_counter() + + while futures: + await asyncio.sleep(0) + for f in futures: + try: + if f.done(): + 
self._perf_history.append(time.perf_counter() - start) + result = f.result() + futures.remove(f) + logger.debug(f'futures:{len(futures)}') + try: + futures.append(asyncio.ensure_future(next(coros))) + except StopIteration as e: + logger.debug('StopIteration') + start = time.perf_counter() + yield result + except Exception as e: + logger.error(e) + + + async def test_batch_support(self, url): + batch_request = [{"id":1,"jsonrpc":"2.0","method":"get_block","params":[1]},{"id":2,"jsonrpc":"2.0","method":"get_block","params":[1]}] + try: + async with self.session.post(self.url, json=batch_request) as response: + response_data = await response.text() + if response_data.startswith(NO_BATCH_SUPPORT_RESPONSE): + return False + return ujson.loads(response_data) == CORRECT_BATCH_TEST_RESPONSE + except Exception as e: + logger.error(e) + return False + + @property + def _session_kwargs(self): + session_kwargs = self.kwargs.get('session_kwargs', {}) + session_kwargs['skip_auto_headers'] = session_kwargs.get('skip_auto_headers', ['User-Agent']) + session_kwargs['json_serialize'] = session_kwargs.get('json_serialize', ujson.dumps) + session_kwargs['headers'] = session_kwargs.get('headers', {'Content-Type': 'application/json'}) + session_kwargs['connector'] = session_kwargs.get('connector', None) + return session_kwargs + + @property + def _connector_kwargs(self): + connector_kwargs = self.kwargs.get('connector_kwargs', {}) + connector_kwargs['keepalive_timeout'] = connector_kwargs.get('keepalive_timeout', 60) + connector_kwargs['limit'] = connector_kwargs.get('limit', 100) + return connector_kwargs + + @property + def concurrent_connections(self): + """number of tcp connections to steemd""" + return self.connector.limit + + @property + def batch_request_size(self): + """number of individual jsonrpc requests to combine into a jsonrpc batch request""" + return self._batch_request_size + + @property + def concurrent_tasks_limit(self): + """number of jsonrpc batch requests tasks to submit to event loop at any one time""" + return self._concurrent_tasks_limit + + + def close(self): + self.session.close() + for task in asyncio.Task.all_tasks(): + task.cancel() + +if __name__ == '__main__': + import argparse + logging.basicConfig(level=logging.DEBUG) + logger = logging.getLogger('async_http_client_main') + parser = argparse.ArgumentParser('jussi perf test script') + parser.add_argument('--blocks', type=int, default=20000) + parser.add_argument('--offset', type=int, default=0) + parser.add_argument('--url', type=str, default='https://api.steemitdev.com') + parser.add_argument('--batch_request_size', type=int, default=1000) + parser.add_argument('--concurrent_tasks_limit', type=int, default=5) + parser.add_argument('--concurrent_connections', type=int, default=0) + parser.add_argument('--print', type=bool, default=False) + args = parser.parse_args() + block_nums = list(range(args.offset, args.blocks)) + loop = asyncio.get_event_loop() + loop.set_debug(True) + + + + async def run(block_nums, url=None, batch_request_size=None, concurrent_tasks_limit=None, concurrent_connections=None): + client = SimpleSteemAPIClient(url=url, + batch_request_size=batch_request_size, + concurrent_tasks_limit=concurrent_tasks_limit, + connector_kwargs={'limit':concurrent_connections}) + logger.debug(f'blocks:{len(block_nums)} {client.batch_request_size} concurrent_tasks_limit:{client.concurrent_tasks_limit} concurrent_connections:{client.concurrent_connections}') + responses = [] + start = time.perf_counter() + + bar = 
RateBar('Fetching blocks', max=len(block_nums)) + try: + start = time.perf_counter() + async for result in client.get_blocks(block_nums): + if result: + bar.next(n=len(result)) + #responses.extend(result) + + + except Exception as e: + logger.error(e) + finally: + bar.finish() + elapsed = time.perf_counter() - start + client.close() + + return client, responses, elapsed + + try: + client, responses, elapsed = loop.run_until_complete(run(block_nums, + url=args.url, + batch_request_size=args.batch_request_size, + concurrent_tasks_limit=args.concurrent_tasks_limit, + concurrent_connections=args.concurrent_connections)) + finally: + pass + #loop.run_until_complete(loop.shutdown_asyncgens()) + #loop.close() + + history = client._perf_history + request_count = client._request_count + total_time = elapsed + total_time_sequential = sum(client._perf_history) + total_get_block_requests = client._request_count + total_batch_requests = client._batch_request_count + + get_block_time = total_time/total_get_block_requests + batch_request_time = total_time/total_batch_requests + + hours_to_sync = (14000000 * get_block_time)/360 + hours_to_sync2 = ((14000000/args.batch_request_size) * batch_request_time)/360 + + concurrency = 1/(total_time / total_time_sequential) + + def verify_response(response): + try: + assert 'result' in response + assert 'error' not in response + return True + except Exception as e: + print(f'error:{e} response:{response}') + return False + + print() + #print(responses[0]) + #_ids = set([r['id'] for r in responses]) + #block_nums = set(block_nums) + #print(block_nums - _ids) + + + print() + print(f'batch request_size:\t\t{client.batch_request_size}') + print(f'concurrent_tasks_limit:\t\t{client.concurrent_tasks_limit}') + print(f'concurrent_connections:\t\t{client.concurrent_connections}') + print() + print(f'total time:\t\t\t{total_time}') + print(f'total time sequential:\t\t{total_time_sequential}') + print(f'concurrency:\t\t\t{concurrency}') + print(f'total get_block requests:\t{total_get_block_requests}') + print(f'total get_block responses:\t{len(responses)}') + print(f'total batch_requests:\t\t{total_batch_requests}') + print(f'get_block requests/s:\t\t{total_get_block_requests/total_time}') + print(f'batch requests/s:\t\t{total_batch_requests/total_time}') + print(f'avg get_block request time:\t{total_time/total_get_block_requests}') + print(f'avg batch request time:\t\t{total_time/total_batch_requests}') + print(f'est. hours to sync: \t\t{hours_to_sync}') + print(f'est. 
hours to sync2: \t\t{hours_to_sync2}')
diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml
index f4cf383..857eb6e 100644
--- a/tests/docker-compose.yml
+++ b/tests/docker-compose.yml
@@ -6,7 +6,7 @@ services:
     ports:
       - "8080:8080"
     environment:
-      LOG_LEVEL: INFO
+      LOG_LEVEL: WARNING
       JUSSI_REDIS_HOST: redis
     env_file:
       - ../.env
diff --git a/tests/http_client.py b/tests/http_client.py
index 5411e1f..1240638 100644
--- a/tests/http_client.py
+++ b/tests/http_client.py
@@ -1,5 +1,4 @@
 # -*- coding: utf-8 -*-
-import concurrent.futures
 import json
 import logging
 import os
@@ -205,15 +204,6 @@ def exec_batch(self, name, params):
         for resp in batch_response:
             yield json.dumps(resp)
 
-    def exec_batch_with_futures(self, name, params, max_workers=None):
-        with concurrent.futures.ThreadPoolExecutor(
-                max_workers=max_workers) as executor:
-            futures = []
-            for chunk in chunkify(params):
-                futures.append(executor.submit(self.exec_batch, name, chunk))
-            for future in concurrent.futures.as_completed(futures):
-                for item in future.result():
-                    yield item
 
     get_block = partialmethod(exec, 'get_block')
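
Note (not part of the patch series above): a quick way to exercise the new gzip/deflate request handling end to end is to POST a gzip-compressed JSON-RPC request and confirm the response still carries the CORS headers added in PATCH 2/3. The sketch below uses only the standard library; the http://localhost:8080 endpoint is an assumption taken from the port mapping in tests/docker-compose.yml, and get_block is one of the methods already used in tests/async_http_client.py.

# Sketch: send a gzip-compressed JSON-RPC request to a locally running jussi.
# Assumes jussi/nginx is reachable at http://localhost:8080 (tests/docker-compose.yml).
import gzip
import json
import urllib.request

payload = json.dumps(
    {'jsonrpc': '2.0', 'id': 1, 'method': 'get_block', 'params': [1]}).encode()

req = urllib.request.Request(
    'http://localhost:8080',
    data=gzip.compress(payload),
    headers={
        'Content-Type': 'application/json',
        # Content-Encoding is what routes the body through decode_gzip()
        # in the new handle_gzip request middleware
        'Content-Encoding': 'gzip',
    })

with urllib.request.urlopen(req) as resp:
    # expect "*" from the Access-Control-Allow-Origin header added in PATCH 2/3
    print(resp.getheader('Access-Control-Allow-Origin'))
    print(json.loads(resp.read().decode())['result']['block_id'])

If the middleware rejects the body, it should answer with a JSON-RPC ParseError document rather than an HTTP 500, which is easy to check from the same script.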