diff --git a/.editorconfig b/.editorconfig
index 61cf7fa..c5b983b 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -14,7 +14,8 @@ indent_style = space
 indent_size = 4
 
 # isort config
 force_single_line=True
-known_third_party=aiocache,aiohttp,funcy,pygtrie,ujson,sanic,statsd,websockets,pytest,requests,jsonschema
+
+known_third_party=aiocache,aiohttp,funcy,pygtrie,ujson,sanic,statsd,websockets,janus,aiojobs
 
 # Tab indentation (no size specified)
diff --git a/.gitignore b/.gitignore
index 669de1b..7a26c04 100644
--- a/.gitignore
+++ b/.gitignore
@@ -116,3 +116,4 @@ tests/failed_blocks/
 /deploy/
 /tests/async_deco.py
 /tests/perf/*.json
+service/*/supervise/
diff --git a/Makefile b/Makefile
index 5d2ea12..63f00a0 100644
--- a/Makefile
+++ b/Makefile
@@ -28,6 +28,7 @@ clean:
 	-rm -rf .mypy_cache
 	-rm -rf *.egg-info
 	-rm -rf *.log
+	-rm -rf service/*/supervise
 
 build: clean clean-perf
 	docker build -t $(PROJECT_DOCKER_TAG) .
@@ -39,7 +40,7 @@ build-then-run: build
 	docker run $(PROJECT_DOCKER_RUN_ARGS) $(PROJECT_DOCKER_TAG)
 
 run-local:
-	env LOG_LEVEL=DEBUG pipenv run python3 -m jussi.serve --server_workers=1
+	pipenv run python3 -m jussi.serve --server_workers=1
 
 test:
 	pipenv run pytest
diff --git a/Pipfile b/Pipfile
index 36b7a5e..3760b38 100644
--- a/Pipfile
+++ b/Pipfile
@@ -34,6 +34,7 @@ awscli = "*"
 pytest-mock = "*"
 pytest-asyncio = "*"
 asynctest = "*"
+yapf = "*"
 
 [packages]
 ujson = "*"
@@ -54,7 +55,9 @@ aiojobs = "*"
 jsonrpcserver = "*"
 cytoolz = "*"
 janus = "*"
-yapf = "*"
+python-json-logger = "*"
+progress = "*"
+
 
 [requires]
 python_version = "3.6"
diff --git a/jussi/listeners.py b/jussi/listeners.py
index 87671af..ff2b33d 100644
--- a/jussi/listeners.py
+++ b/jussi/listeners.py
@@ -1,16 +1,14 @@
 # -*- coding: utf-8 -*-
 import asyncio
-import logging
-import os
 
 import aiocache
 import aiohttp
+import aiojobs
+import janus
 import statsd
 import ujson
 from websockets import connect as websockets_connect
 
-import aiojobs
-import janus
 import jussi.cache
 import jussi.jobs
 import jussi.jsonrpc_method_cache_settings
@@ -24,22 +22,9 @@ def setup_listeners(app: WebApp) -> WebApp:
 
     # pylint: disable=unused-argument, unused-variable
-
-    @app.listener('before_server_start')
-    def setup_logging(app: WebApp, loop) -> WebApp:
-        # init logging
-        #root_logger = logging.getLogger()
-        #root_logger.handlers = []
-        LOG_LEVEL = getattr(logging, os.environ.get('LOG_LEVEL', 'INFO'))
-        jussi.logging_config.LOGGING['loggers']['sanic']['level'] = LOG_LEVEL
-        jussi.logging_config.LOGGING['loggers']['network']['level'] = LOG_LEVEL
-        app.config.logger = logging.getLogger('sanic')
-        return app
-
-    logger = logging.getLogger('sanic')
-
     @app.listener('before_server_start')
     def setup_cache(app: WebApp, loop) -> None:
+        logger = app.config.logger
         logger.info('before_server_start -> setup_cache')
 
         caches_config = jussi.cache.setup_caches(app, loop)
@@ -58,12 +43,14 @@ def setup_cache(app: WebApp, loop) -> None:
 
     @app.listener('before_server_start')
     def setup_jsonrpc_method_cache_settings(app: WebApp, loop) -> None:
+        logger = app.config.logger
         logger.info(
             'before_server_start -> setup_jsonrpc_method_cache_settings')
         app.config.method_ttls = jussi.jsonrpc_method_cache_settings.TTLS
 
     @app.listener('before_server_start')
     def setup_jsonrpc_method_url_settings(app: WebApp, loop) -> None:
+        logger = app.config.logger
         logger.info('before_server_start -> setup_jsonrpc_method_url_settings')
         args = app.config.args
         mapping = {}
@@ -77,6 +64,7 @@ def setup_jsonrpc_method_url_settings(app: WebApp, loop) -> None:
     def setup_aiohttp_session(app: WebApp, loop) -> None:
         """use one session for http connection pooling
         """
+        logger = app.config.logger
         logger.info('before_server_start -> setup_aiohttp_session')
         aio = dict(session=aiohttp.ClientSession(
             skip_auto_headers=['User-Agent'],
@@ -89,6 +77,7 @@ def setup_aiohttp_session(app: WebApp, loop) -> None:
     async def setup_websocket_connection(app: WebApp, loop) -> None:
         """use one ws connection (per worker) to avoid reconnection
         """
+        logger = app.config.logger
        logger.info('before_server_start -> setup_ws_client')
         args = app.config.args
         app.config.websocket_kwargs = dict(uri=args.steemd_websocket_url,
@@ -101,6 +90,7 @@ async def setup_websocket_connection(app: WebApp, loop) -> None:
     @app.listener('before_server_start')
     async def setup_statsd(app: WebApp, loop) -> None:
         """setup statsd client and queue"""
+        logger = app.config.logger
         logger.info('before_server_start -> setup_statsd')
         app.config.statsd_client = statsd.StatsClient()
         stats_queue = janus.Queue(loop=loop)
@@ -111,7 +101,9 @@ async def setup_statsd(app: WebApp, loop) -> None:
 
     # after server start
     @app.listener('after_server_start')
     async def setup_job_scheduler(app: WebApp, loop) -> None:
+        logger = app.config.logger
         logger.info('after_server_start -> setup_job_scheduler')
+        app.config.last_irreversible_block_num = 0
         app.config.scheduler = await aiojobs.create_scheduler()
         await app.config.scheduler.spawn(
@@ -125,22 +117,25 @@ async def setup_job_scheduler(app: WebApp, loop) -> None:
         )
 
     # after server stop
-
-
-
     @app.listener('after_server_stop')
     async def stop_job_scheduler(app: WebApp, loop) -> None:
+        logger = app.config.logger
         logger.info('after_server_stop -> stop_job_scheduler')
         await asyncio.shield(app.config.scheduler.close())
 
+    @app.listener('after_server_stop')
     async def close_websocket_connection(app: WebApp, loop) -> None:
+        logger = app.config.logger
         logger.info('after_server_stop -> close_websocket_connection')
+        if not app.config.scheduler.closed:
+            await asyncio.shield(app.config.scheduler.close())
         client = app.config.websocket_client
         await asyncio.shield(client.close())
 
     @app.listener('after_server_stop')
     async def close_aiohttp_session(app: WebApp, loop) -> None:
+        logger = app.config.logger
         logger.info('after_server_stop -> close_aiohttp_session')
         if not app.config.scheduler.closed:
             await asyncio.shield(app.config.scheduler.close())
@@ -149,6 +143,7 @@ async def close_aiohttp_session(app: WebApp, loop) -> None:
 
     @app.listener('after_server_stop')
     async def close_stats_queue(app: WebApp, loop) -> None:
+        logger = app.config.logger
         logger.info('after_server_stop -> close_stats_queue')
         if not app.config.scheduler.closed:
             await asyncio.shield(app.config.scheduler.close())
@@ -158,6 +153,7 @@ async def close_stats_queue(app: WebApp, loop) -> None:
 
     @app.listener('after_server_stop')
     async def close_cache_connections(app: WebApp, loop) -> None:
+        logger = app.config.logger
         logger.info('after_server_stop -> close_cache_connections')
         for cache in app.config.caches:
             await cache.close()
diff --git a/jussi/logging_config.py b/jussi/logging_config.py
index 21d6d9c..fb4a00a 100644
--- a/jussi/logging_config.py
+++ b/jussi/logging_config.py
@@ -7,15 +7,43 @@
 
 from jussi.typedefs import WebApp
 
+LOG_DATETIME_FORMAT = r'%Y-%m-%dT%H:%M:%S.%s%Z'
+SUPPORTED_LOG_MESSAGE_KEYS = (
+    'levelname',
+    'asctime',
+    #'created',
+    'filename',
+    # 'levelno',
+    'module',
+    'funcName',
+    'lineno',
+    'msecs',
+    'message',
+    'name',
+    'pathname',
+    'process',
+    'processName',
+    # 'relativeCreated',
+    #'thread',
+    'threadName')
+
+JSON_LOG_FORMAT = ' '.join(
+    ['%({0:s})'.format(i) for i in SUPPORTED_LOG_MESSAGE_KEYS])
+
+#JSON_FORMATTER.converter = time.gmtime
+
 
 def setup_logging(app: WebApp) -> WebApp:
     # init logging
-    root_logger = logging.getLogger()
-    root_logger.handlers = []
+    #root_logger = logging.getLogger()
+    #root_logger.handlers = []
     LOG_LEVEL = getattr(logging, os.environ.get('LOG_LEVEL', 'INFO'))
     LOGGING['loggers']['sanic']['level'] = LOG_LEVEL
     LOGGING['loggers']['network']['level'] = LOG_LEVEL
-    app.config.logger = logging.getLogger('jussi')
+    LOGGING['loggers']['jussi']['level'] = LOG_LEVEL
+    logger = logging.getLogger('jussi')
+    logger.info('configuring jussi logger')
+    app.config.logger = logger
     return app
@@ -33,35 +61,48 @@ def setup_logging(app: WebApp) -> WebApp:
     },
     'formatters': {
         'simple': {
-            'format': '%(asctime)s - (%(name)s)[%(levelname)s]: %(message)s',
-            'datefmt': '%Y-%m-%d %H:%M:%S'
+            '()': 'pythonjsonlogger.jsonlogger.JsonFormatter',
+            'format': '%(asctime)s %(name) %(levelname) %(message)',
+            'datefmt': LOG_DATETIME_FORMAT
         },
-        'access': {
+        'json_access': {
+            '()':
+            'pythonjsonlogger.jsonlogger.JsonFormatter',
             'format':
-            '%(asctime)s - (%(name)s)[%(levelname)s][%(host)s]: ' +
-            '%(request)s %(message)s %(status)d %(byte)d',
+            '%(asctime) %(name) %(levelname) %(host) ' +
+            '%(request) %(message) %(status) %(byte)',
             'datefmt':
-            '%Y-%m-%d %H:%M:%S'
+            LOG_DATETIME_FORMAT
+        },
+        'json': {
+            '()': 'pythonjsonlogger.jsonlogger.JsonFormatter',
+            'format': JSON_LOG_FORMAT,
+            'datefmt': LOG_DATETIME_FORMAT
         }
     },
     'handlers': {
         'internal': {
             'class': 'logging.StreamHandler',
             'filters': ['accessFilter'],
-            'formatter': 'simple',
+            'formatter': 'json',
             'stream': sys.stderr
         },
         'accessStream': {
             'class': 'logging.StreamHandler',
             'filters': ['accessFilter'],
-            'formatter': 'access',
+            'formatter': 'json',
             'stream': sys.stderr
         },
         'errorStream': {
             'class': 'logging.StreamHandler',
             'filters': ['errorFilter'],
-            'formatter': 'simple',
+            'formatter': 'json',
             'stream': sys.stderr
+        },
+        'jussi_hdlr': {
+            'class': 'logging.StreamHandler',
+            'stream': sys.stderr,
+            'formatter': 'json'
         }
     },
     'loggers': {
@@ -72,6 +113,10 @@ def setup_logging(app: WebApp) -> WebApp:
         'network': {
             'level': logging.DEBUG,
             'handlers': ['accessStream']
+        },
+        'jussi': {
+            'level': logging.DEBUG,
+            'handlers': ['jussi_hdlr']
         }
     }
 }
diff --git a/jussi/middlewares.py b/jussi/middlewares.py
index 9acc14c..93b29d0 100644
--- a/jussi/middlewares.py
+++ b/jussi/middlewares.py
@@ -1,5 +1,8 @@
 # -*- coding: utf-8 -*-
+import gzip
 import logging
+import zlib
+from io import BytesIO
 from typing import Optional
 
 from sanic import response
@@ -25,9 +28,28 @@
 logger = logging.getLogger('sanic')
 
 
+def decode_gzip(data):
+    gzipper = gzip.GzipFile(fileobj=BytesIO(data))
+    return gzipper.read()
+
+
+def decode_deflate(data):
+    try:
+        return zlib.decompress(data)
+    except zlib.error:
+        return zlib.decompress(data, -zlib.MAX_WBITS)
+
+
+CONTENT_DECODERS = {
+    'gzip': decode_gzip,
+    'deflate': decode_deflate,
+}
+
+
 def setup_middlewares(app):
     logger = app.config.logger
     logger.info('before_server_start -> setup_middlewares')
+    app.request_middleware.append(handle_gzip)
     app.request_middleware.append(validate_jsonrpc_request)
     app.request_middleware.append(request_stats)
     app.request_middleware.append(caching_middleware)
@@ -35,6 +57,19 @@ def setup_middlewares(app):
     return app
 
 
+@handle_middleware_exceptions
+async def handle_gzip(request: HTTPRequest) -> Optional[HTTPResponse]:
+    content_encoding = request.headers.get('content-encoding')
+    decoder = CONTENT_DECODERS.get(content_encoding)
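+    # decompress the request body in place when the client sent a supported Content-Encoding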
+    try:
+        if decoder:
+            request.body = decoder(request.body)
+    except (IOError, zlib.error) as e:
+        logger.error(e)
+        return response.json(
+            ParseError(sanic_request=request, exception=e).to_dict())
+
+
 @handle_middleware_exceptions
 @async_exclude_methods(exclude_http_methods=('GET', ))
 async def validate_jsonrpc_request(
diff --git a/jussi/serializers.py b/jussi/serializers.py
index 2495da3..88bdc52 100644
--- a/jussi/serializers.py
+++ b/jussi/serializers.py
@@ -1,5 +1,4 @@
 # -*- coding: utf-8 -*-
-import logging
 import zlib
 from typing import AnyStr
 from typing import Optional
@@ -8,8 +7,6 @@
 import ujson
 from aiocache.serializers import StringSerializer
 
-logger = logging.getLogger('sanic')
-
 
 class CompressionSerializer(StringSerializer):
 
@@ -21,13 +18,8 @@ class CompressionSerializer(StringSerializer):
     def dumps(self, value: Union[AnyStr, dict]) -> bytes:
         # FIXME handle structs with bytes vals, eg, [1, '2', b'3']
         # currently self.loads(self.dumps([1, '2', b'3'])) == [1, '2', '3']
-        logger.debug(f'dumps dumping {type(value)}')
         return zlib.compress(ujson.dumps(value).encode())
 
     def loads(self, value) -> Optional[dict]:
         if value:
-            logger.debug(f'loads loading {type(value)}')
-            value = ujson.loads(zlib.decompress(value).decode())
-            logger.debug(f'loads loaded {type(value)}')
-            return value
-        return None
+            return ujson.loads(zlib.decompress(value))
diff --git a/jussi/serve.py b/jussi/serve.py
index 072cd45..2041f1b 100644
--- a/jussi/serve.py
+++ b/jussi/serve.py
@@ -58,7 +58,6 @@ def main():
     app.run(
         host=app.config.args.server_host,
         port=app.config.args.server_port,
-        log_config=jussi.logging_config.LOGGING,
         workers=app.config.args.server_workers)
diff --git a/service/nginx/nginx.conf b/service/nginx/nginx.conf
index 33c8cc8..6ed55df 100644
--- a/service/nginx/nginx.conf
+++ b/service/nginx/nginx.conf
@@ -38,8 +38,8 @@ http {
             '"upstream_response_length": $upstream_response_length'
         '}';
 
-    access_log /var/log/nginx/access.json json_combined buffer=32k flush=5s;
-    access_log /var/log/nginx/access.log;
+    access_log /dev/stdout json_combined buffer=32k flush=5s;
+
     keepalive_timeout 65;
     keepalive_requests 10000;
 
@@ -66,7 +66,9 @@ http {
         listen 8080;
         server_name jussi;
 
-        add_header 'Access-Control-Allow-Origin' '*';
+        add_header Access-Control-Allow-Origin "*";
+        add_header Access-Control-Allow-Methods "GET, POST, OPTIONS";
+        add_header Access-Control-Allow-Headers "DNT,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Content-Range,Range";
         add_header Strict-Transport-Security "max-age=31557600; includeSubDomains; preload" always;
         add_header 'Content-Security-Policy' 'upgrade-insecure-requests';
 
@@ -96,9 +98,9 @@ http {
 
         location /nginx_status {
-          stub_status on;
-          allow 127.0.0.1;
-          deny all;
+            stub_status on;
+            allow 127.0.0.1;
+            deny all;
         }
     }
 }
diff --git a/tests/async_http_client.py b/tests/async_http_client.py
new file mode 100644
index 0000000..be9fe8d
--- /dev/null
+++ b/tests/async_http_client.py
@@ -0,0 +1,274 @@
+# -*- coding: utf-8 -*-
+# pylint: skip-file
+import asyncio
+import logging
+import os
+import time
+import zlib
+from collections import deque
+from itertools import islice
+
+import aiohttp
+import ujson
+from funcy.colls import get_in
+
+import uvloop
+from progress.bar import Bar
+
+asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+CORRECT_BATCH_TEST_RESPONSE='''
+[{"id":1,"result":{"previous":"000000b0c668dad57f55172da54899754aeba74b","timestamp":"2016-03-24T16:14:21","witness":"initminer","transaction_merkle_root":"0000000000000000000000000000000000000000","extensions":[],"witness_signature":"2036fd4ff7838ba32d6d27637576e1b1e82fd2858ac97e6e65b7451275218cbd2b64411b0a5d74edbde790c17ef704b8ce5d9de268cb43783b499284c77f7d9f5e","transactions":[],"block_id":"000000b13707dfaad7c2452294d4cfa7c2098db4","signing_key":"STM8GC13uCZbP44HzMLV6zPZGwVQ8Nt4Kji8PapsPiNq1BK153XTX","transaction_ids":[]}},{"id":2,"result":{"previous":"000000b0c668dad57f55172da54899754aeba74b","timestamp":"2016-03-24T16:14:21","witness":"initminer","transaction_merkle_root":"0000000000000000000000000000000000000000","extensions":[],"witness_signature":"2036fd4ff7838ba32d6d27637576e1b1e82fd2858ac97e6e65b7451275218cbd2b64411b0a5d74edbde790c17ef704b8ce5d9de268cb43783b499284c77f7d9f5e","transactions":[],"block_id":"000000b13707dfaad7c2452294d4cfa7c2098db4","signing_key":"STM8GC13uCZbP44HzMLV6zPZGwVQ8Nt4Kji8PapsPiNq1BK153XTX","transaction_ids":[]}}]
+'''
+NO_BATCH_SUPPORT_RESPONSE = '7 bad_cast_exception: Bad Cast'
+
+class RateBar(Bar):
+    suffix = '%(index)d (%(rate)d/sec) time remaining: %(eta_td)s'
+
+    @property
+    def rate(self):
+        elapsed = self.elapsed or 0.000000001
+        return self.index/elapsed
+
+def chunkify(iterable, chunksize=3000):
+    i = 0
+    chunk = []
+    for item in iterable:
+        chunk.append(item)
+        i += 1
+        if i == chunksize:
+            yield chunk
+            i = 0
+            chunk = []
+    if chunk:
+        yield chunk
+
+
+class SimpleSteemAPIClient(object):
+    def __init__(self, *, url=None, **kwargs):
+        self.url = url or os.environ.get('STEEMD_HTTP_URL', 'https://steemd.steemitdev.com')
+        self.kwargs = kwargs
+        self.session = kwargs.get('session', None)
+        self.connector = get_in(kwargs, ['session','connector'])
+
+        if not self.connector:
+            self.connector = self._new_connector()
+        if not self.session:
+            self.session = self._new_session()
+
+        self._batch_request_size = self.kwargs.get('batch_request_size', 150)
+        self._concurrent_tasks_limit = self.kwargs.get('concurrent_tasks_limit', 10)
+
+        self._perf_history = deque(maxlen=2000)
+        self._batch_request_count = 0
+        self._request_count = 0
+
+    def _new_connector(self, connector_kwargs=None):
+        connector_kwargs = connector_kwargs or self._connector_kwargs
+        return aiohttp.TCPConnector(**connector_kwargs)
+
+    def _new_session(self, session_kwargs=None):
+        session_kwargs = session_kwargs or self._session_kwargs
+        return aiohttp.ClientSession(**session_kwargs)
+
+    async def fetch(self, request_data):
+        if isinstance(request_data, list):
+            self._batch_request_count += 1
+            self._request_count += len(request_data)
+        async with self.session.post(self.url, json=request_data, compress='gzip') as response:
+            try:
+                response_data = await response.json()
+            except Exception as e:
+                logger.error(e)
+                logger.error(await response.text())
+                return
+            return response_data
+
+    async def get_blocks(self, block_nums):
+        requests = ({'jsonrpc':'2.0','id':block_num,'method':'get_block','params':[block_num]} for _id,block_num in enumerate(block_nums))
+        batched_requests = chunkify(requests, self.batch_request_size)
+        coros = (self.fetch(batch) for batch in batched_requests)
+        first_coros = islice(coros, 0, self.concurrent_tasks_limit)
+        futures = [asyncio.ensure_future(c) for c in first_coros]
+
+        logger.debug(f'inital futures:{len(futures)}')
+        start = time.perf_counter()
+
+        while futures:
+            await asyncio.sleep(0)
+            for f in futures:
+                try:
+                    if f.done():
+                        self._perf_history.append(time.perf_counter() - start)
+                        result = f.result()
+                        futures.remove(f)
+                        logger.debug(f'futures:{len(futures)}')
+                        try:
+                            futures.append(asyncio.ensure_future(next(coros)))
+                        except StopIteration as e:
+                            logger.debug('StopIteration')
+                        start = time.perf_counter()
+                        yield result
+                except Exception as e:
+                    logger.error(e)
+
+    async def test_batch_support(self, url):
+        batch_request = [{"id":1,"jsonrpc":"2.0","method":"get_block","params":[1]},{"id":2,"jsonrpc":"2.0","method":"get_block","params":[1]}]
+        try:
+            async with self.session.post(self.url, json=batch_request) as response:
+                response_data = await response.text()
+                if response_data.startswith(NO_BATCH_SUPPORT_RESPONSE):
+                    return False
+                return ujson.loads(response_data) == ujson.loads(CORRECT_BATCH_TEST_RESPONSE)
+        except Exception as e:
+            logger.error(e)
+        return False
+
+    @property
+    def _session_kwargs(self):
+        session_kwargs = self.kwargs.get('session_kwargs', {})
+        session_kwargs['skip_auto_headers'] = session_kwargs.get('skip_auto_headers', ['User-Agent'])
+        session_kwargs['json_serialize'] = session_kwargs.get('json_serialize', ujson.dumps)
+        session_kwargs['headers'] = session_kwargs.get('headers', {'Content-Type': 'application/json'})
+        session_kwargs['connector'] = session_kwargs.get('connector', None)
+        return session_kwargs
+
+    @property
+    def _connector_kwargs(self):
+        connector_kwargs = self.kwargs.get('connector_kwargs', {})
+        connector_kwargs['keepalive_timeout'] = connector_kwargs.get('keepalive_timeout', 60)
+        connector_kwargs['limit'] = connector_kwargs.get('limit', 100)
+        return connector_kwargs
+
+    @property
+    def concurrent_connections(self):
+        """number of tcp connections to steemd"""
+        return self.connector.limit
+
+    @property
+    def batch_request_size(self):
+        """number of individual jsonrpc requests to combine into a jsonrpc batch request"""
+        return self._batch_request_size
+
+    @property
+    def concurrent_tasks_limit(self):
+        """number of jsonrpc batch requests tasks to submit to event loop at any one time"""
+        return self._concurrent_tasks_limit
+
+    def close(self):
+        self.session.close()
+        for task in asyncio.Task.all_tasks():
+            task.cancel()
+
+if __name__ == '__main__':
+    import argparse
+    logging.basicConfig(level=logging.DEBUG)
+    logger = logging.getLogger('async_http_client_main')
+    parser = argparse.ArgumentParser('jussi perf test script')
+    parser.add_argument('--blocks', type=int, default=20000)
+    parser.add_argument('--offset', type=int, default=0)
+    parser.add_argument('--url', type=str, default='https://api.steemitdev.com')
+    parser.add_argument('--batch_request_size', type=int, default=1000)
+    parser.add_argument('--concurrent_tasks_limit', type=int, default=5)
+    parser.add_argument('--concurrent_connections', type=int, default=0)
+    parser.add_argument('--print', type=bool, default=False)
+    args = parser.parse_args()
+    block_nums = list(range(args.offset, args.blocks))
+    loop = asyncio.get_event_loop()
+    loop.set_debug(True)
+
+    async def run(block_nums, url=None, batch_request_size=None, concurrent_tasks_limit=None, concurrent_connections=None):
+        client = SimpleSteemAPIClient(url=url,
+                                      batch_request_size=batch_request_size,
+                                      concurrent_tasks_limit=concurrent_tasks_limit,
+                                      connector_kwargs={'limit':concurrent_connections})
+        logger.debug(f'blocks:{len(block_nums)} {client.batch_request_size} concurrent_tasks_limit:{client.concurrent_tasks_limit} concurrent_connections:{client.concurrent_connections}')
+        responses = []
+        start = time.perf_counter()
+
+        bar = RateBar('Fetching blocks', max=len(block_nums))
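+        # consume the async generator, advancing the progress bar by the number of responses in each completed batch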
+        try:
+            start = time.perf_counter()
+            async for result in client.get_blocks(block_nums):
+                if result:
+                    bar.next(n=len(result))
+                    #responses.extend(result)
+
+        except Exception as e:
+            logger.error(e)
+        finally:
+            bar.finish()
+            elapsed = time.perf_counter() - start
+            client.close()
+
+        return client, responses, elapsed
+
+    try:
+        client, responses, elapsed = loop.run_until_complete(run(block_nums,
+                                                                 url=args.url,
+                                                                 batch_request_size=args.batch_request_size,
+                                                                 concurrent_tasks_limit=args.concurrent_tasks_limit,
+                                                                 concurrent_connections=args.concurrent_connections))
+    finally:
+        pass
+        #loop.run_until_complete(loop.shutdown_asyncgens())
+        #loop.close()
+
+    history = client._perf_history
+    request_count = client._request_count
+    total_time = elapsed
+    total_time_sequential = sum(client._perf_history)
+    total_get_block_requests = client._request_count
+    total_batch_requests = client._batch_request_count
+
+    get_block_time = total_time/total_get_block_requests
+    batch_request_time = total_time/total_batch_requests
+
+    hours_to_sync = (14000000 * get_block_time)/3600
+    hours_to_sync2 = ((14000000/args.batch_request_size) * batch_request_time)/3600
+
+    concurrency = 1/(total_time / total_time_sequential)
+
+    def verify_response(response):
+        try:
+            assert 'result' in response
+            assert 'error' not in response
+            return True
+        except Exception as e:
+            print(f'error:{e} response:{response}')
+            return False
+
+    print()
+    #print(responses[0])
+    #_ids = set([r['id'] for r in responses])
+    #block_nums = set(block_nums)
+    #print(block_nums - _ids)
+
+    print()
+    print(f'batch request_size:\t\t{client.batch_request_size}')
+    print(f'concurrent_tasks_limit:\t\t{client.concurrent_tasks_limit}')
+    print(f'concurrent_connections:\t\t{client.concurrent_connections}')
+    print()
+    print(f'total time:\t\t\t{total_time}')
+    print(f'total time sequential:\t\t{total_time_sequential}')
+    print(f'concurrency:\t\t\t{concurrency}')
+    print(f'total get_block requests:\t{total_get_block_requests}')
+    print(f'total get_block responses:\t{len(responses)}')
+    print(f'total batch_requests:\t\t{total_batch_requests}')
+    print(f'get_block requests/s:\t\t{total_get_block_requests/total_time}')
+    print(f'batch requests/s:\t\t{total_batch_requests/total_time}')
+    print(f'avg get_block request time:\t{total_time/total_get_block_requests}')
+    print(f'avg batch request time:\t\t{total_time/total_batch_requests}')
+    print(f'est. hours to sync: \t\t{hours_to_sync}')
+    print(f'est. hours to sync2: \t\t{hours_to_sync2}')
diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml
index 2e403b1..37b0292 100644
--- a/tests/docker-compose.yml
+++ b/tests/docker-compose.yml
@@ -6,7 +6,7 @@ services:
     ports:
       - "8080:8080"
     environment:
-      LOG_LEVEL: INFO
+      LOG_LEVEL: WARNING
       JUSSI_REDIS_HOST: redis
     env_file:
       - ../.env
diff --git a/tests/http_client.py b/tests/http_client.py
index 666bd6f..db8a777 100644
--- a/tests/http_client.py
+++ b/tests/http_client.py
@@ -1,5 +1,4 @@
 # -*- coding: utf-8 -*-
-import concurrent.futures
 import json
 import logging
 import os
@@ -209,16 +208,7 @@ def exec_batch(self, name, params):
         for resp in batch_response:
             yield json.dumps(resp)
 
-    def exec_batch_with_futures(self, name, params, max_workers=None):
-        with concurrent.futures.ThreadPoolExecutor(
-                max_workers=max_workers) as executor:
-            futures = []
-            for chunk in chunkify(params):
-                futures.append(executor.submit(self.exec_batch, name, chunk))
-            for future in concurrent.futures.as_completed(futures):
-                for item in future.result():
-                    yield item
 
     def test_batch_support(self, url):
         batch_request = '[{"id":1,"jsonrpc":"2.0","method":"get_block","params":[1]},{"id":2,"jsonrpc":"2.0","method":"get_block","params":[1]}]'