diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 0a710e6..6e13033 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -5,7 +5,7 @@ updates: - package-ecosystem: pip directory: "/" schedule: - interval: weekly + interval: daily day: monday timezone: Europe/London allow: diff --git a/CHANGELOG.md b/CHANGELOG.md index e980f6a..8ad29c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Change Log +## [2.0.1](https://github.com/ome9ax/target-s3-jsonl/tree/2.0.1) (2022-10-09) + +### What's Changed +* ThreadPoolExecutor concurrent & parallel s3 files upload by @ome9ax in #75 + +**Full Changelog**: https://github.com/ome9ax/target-s3-jsonl/compare/2.0.0...2.0.1 + ## [2.0.0](https://github.com/ome9ax/target-s3-jsonl/tree/2.0.0) (2022-09-29) ### What's Changed diff --git a/src/target_s3_json/__init__.py b/src/target_s3_json/__init__.py index 724324a..0e43e52 100644 --- a/src/target_s3_json/__init__.py +++ b/src/target_s3_json/__init__.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -__version__ = '2.0.0' +__version__ = '2.0.1' # from pathlib import Path diff --git a/src/target_s3_json/s3.py b/src/target_s3_json/s3.py index 7b7a46e..b5c1f10 100644 --- a/src/target_s3_json/s3.py +++ b/src/target_s3_json/s3.py @@ -6,10 +6,11 @@ import json import gzip import lzma -import backoff from typing import Callable, Dict, Any, List, TextIO from asyncio import to_thread +from concurrent.futures import ThreadPoolExecutor, Future +import backoff from boto3.session import Session from botocore.exceptions import ClientError from botocore.client import BaseClient @@ -103,8 +104,8 @@ def create_session(config: Dict) -> Session: def get_encryption_args(config: Dict[str, Any]) -> tuple: if config.get('encryption_type', 'none').lower() == "none": # NOTE: No encryption config (defaults to settings on the bucket): - encryption_desc = '' - encryption_args = {} + encryption_desc: str = '' + encryption_args: dict = {} elif config.get('encryption_type', 'none').lower() == 'kms': if config.get('encryption_key'): encryption_desc = " using KMS encryption key ID '{}'".format(config.get('encryption_key')) @@ -121,42 +122,52 @@ def get_encryption_args(config: Dict[str, Any]) -> tuple: @_retry_pattern() -async def put_object(config: Dict[str, Any], file_metadata: Dict, stream_data: List, client: BaseClient) -> None: +def put_object(config: Dict[str, Any], file_metadata: Dict, stream_data: List) -> None: encryption_desc, encryption_args = get_encryption_args(config) - LOGGER.info("Uploading %s to bucket %s at %s%s", - str(file_metadata['absolute_path']), config.get('s3_bucket'), file_metadata['relative_path'], encryption_desc) + config['client'].put_object( + Body=config['open_func']( # NOTE: stream compression with gzip.compress, lzma.compress + b''.join(json.dumps(record, ensure_ascii=False).encode('utf-8') + b'\n' for record in stream_data)), + Bucket=config.get('s3_bucket'), + Key=file_metadata['relative_path'], + **encryption_args.get('ExtraArgs', {})) - await to_thread(client.put_object, - Body=config['open_func']( # NOTE: stream compression with gzip.compress, lzma.compress - b''.join(json.dumps(record, ensure_ascii=False).encode('utf-8') + b'\n' for record in stream_data)), - Bucket=config.get('s3_bucket'), - Key=file_metadata['relative_path'], - **encryption_args.get('ExtraArgs', {})) + LOGGER.info("%s uploaded to bucket %s at %s%s", + file_metadata['absolute_path'].as_posix(), config.get('s3_bucket'), file_metadata['relative_path'], encryption_desc) @_retry_pattern() -async def upload_file(config: Dict[str, Any], file_metadata: Dict) -> None: +def upload_file(config: Dict[str, Any], file_metadata: Dict) -> None: if not config.get('local', False) and (file_metadata['absolute_path'].stat().st_size if file_metadata['absolute_path'].exists() else 0) > 0: encryption_desc, encryption_args = get_encryption_args(config) - await to_thread(config['client'].upload_file, - str(file_metadata['absolute_path']), - config.get('s3_bucket'), - file_metadata['relative_path'], - **encryption_args) + config['client'].upload_file( + file_metadata['absolute_path'].as_posix(), + config.get('s3_bucket'), + file_metadata['relative_path'], + **encryption_args) LOGGER.info('%s uploaded to bucket %s at %s%s', - str(file_metadata['absolute_path']), config.get('s3_bucket'), file_metadata['relative_path'], encryption_desc) + file_metadata['absolute_path'].as_posix(), config.get('s3_bucket'), file_metadata['relative_path'], encryption_desc) if config.get('remove_file', True): # NOTE: Remove the local file(s) file_metadata['absolute_path'].unlink() # missing_ok=False +async def upload_thread(config: Dict[str, Any], file_metadata: Dict) -> Future: + + return await to_thread( + config['executor'].submit, + upload_file, + config, + file_metadata) + + def config_s3(config_default: Dict[str, Any], datetime_format: Dict[str, str] = { 'date_time_format': ':%Y%m%dT%H%M%S', 'date_format': ':%Y%m%d'}) -> Dict[str, Any]: + # NOTE: to_snake = lambda s: '_'.join(findall(r'[A-Z]?[a-z]+|\d+|[A-Z]{1,}(?=[A-Z][a-z]|\W|\d|$)', line)).lower() if 'temp_dir' in config_default: LOGGER.warning('`temp_dir` configuration option is deprecated and support will be removed in the future, use `work_dir` instead.') @@ -185,11 +196,12 @@ def main(lines: TextIO = sys.stdin) -> None: parser.add_argument('-c', '--config', help='Config file', required=True) args = parser.parse_args() config = file.config_compression(config_file(config_s3(json.loads(Path(args.config).read_text(encoding='utf-8'))))) - save_s3: Callable = partial(save_json, post_processing=upload_file) + save_s3: Callable = partial(save_json, post_processing=upload_thread) client: BaseClient = create_session(config).client('s3', **({'endpoint_url': config.get('aws_endpoint_url')} if config.get('aws_endpoint_url') else {})) - Loader(config | {'client': client}, writeline=save_s3).run(lines) + with ThreadPoolExecutor() as executor: + Loader(config | {'client': client, 'executor': executor}, writeline=save_s3).run(lines) # from io import BytesIO diff --git a/tests/test_s3.py b/tests/test_s3.py index 5754dbf..a6e164f 100644 --- a/tests/test_s3.py +++ b/tests/test_s3.py @@ -2,7 +2,6 @@ # Standard library imports import sys from os import environ -from asyncio import run from copy import deepcopy from re import match import lzma @@ -318,11 +317,10 @@ def test_put_object(config): {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_time": "07:15:00"}, {"c_pk": 3, "c_varchar": "3", "c_int": 3, "c_time": "23:00:03"}] - run(put_object( - config, + put_object( + config | {'client': client}, file_metadata, - stream_data, - client)) + stream_data) head = client.head_object(Bucket=config.get('s3_bucket'), Key=file_metadata['relative_path']) assert head['ResponseMetadata']['HTTPStatusCode'] == 200 @@ -333,11 +331,10 @@ def test_put_object(config): file_metadata = { 'absolute_path': Path('tests', 'resources', 'messages.json.gz'), 'relative_path': 'dummy/messages_kms.json.gz'} - run(put_object( - config | {'encryption_type': 'kms', 'encryption_key': None}, + put_object( + config | {'client': client, 'encryption_type': 'kms', 'encryption_key': None}, file_metadata, - stream_data, - client)) + stream_data) head = client.head_object(Bucket=config.get('s3_bucket'), Key=file_metadata['relative_path']) assert head['ResponseMetadata']['HTTPStatusCode'] == 200 @@ -348,11 +345,10 @@ def test_put_object(config): file_metadata = { 'absolute_path': Path('tests', 'resources', 'messages.json.gz'), 'relative_path': 'dummy/messages_kms.json.gz'} - run(put_object( - config | {'encryption_type': 'kms', 'encryption_key': 'xXx'}, + put_object( + config | {'client': client, 'encryption_type': 'kms', 'encryption_key': 'xXx'}, file_metadata, - stream_data, - client)) + stream_data) head = client.head_object(Bucket=config.get('s3_bucket'), Key=file_metadata['relative_path']) assert head['ResponseMetadata']['HTTPStatusCode'] == 200 @@ -361,11 +357,10 @@ def test_put_object(config): # NOTE: 'dummy' encryption_type with raises(Exception): - run(put_object( - config | {'encryption_type': 'dummy'}, + put_object( + config | {'client': client, 'encryption_type': 'dummy'}, file_metadata | {'relative_path': 'dummy/messages_dummy.json.gz'}, - stream_data, - client)) + stream_data) @mock_s3 @@ -384,16 +379,16 @@ def test_upload_file(config, temp_path): 'absolute_path': temp_file, 'relative_path': 'dummy/messages.json'} - run(upload_file( + upload_file( config | {'local': True, 'client': client, 'remove_file': False}, - file_metadata)) + file_metadata) assert 'Contents' not in client.list_objects_v2(Bucket=config['s3_bucket'], Prefix=file_metadata['relative_path'], MaxKeys=1) assert file_metadata['absolute_path'].exists() - run(upload_file( + upload_file( config | {'client': client, 'remove_file': False}, - file_metadata)) + file_metadata) head = client.head_object(Bucket=config.get('s3_bucket'), Key=file_metadata['relative_path']) assert head['ResponseMetadata']['HTTPStatusCode'] == 200 @@ -404,9 +399,9 @@ def test_upload_file(config, temp_path): file_metadata = { 'absolute_path': temp_file, 'relative_path': 'dummy/messages_kms.json'} - run(upload_file( + upload_file( config | {'client': client, 'remove_file': False, 'encryption_type': 'kms', 'encryption_key': None}, - file_metadata)) + file_metadata) head = client.head_object(Bucket=config.get('s3_bucket'), Key=file_metadata['relative_path']) assert head['ResponseMetadata']['HTTPStatusCode'] == 200 @@ -417,9 +412,9 @@ def test_upload_file(config, temp_path): file_metadata = { 'absolute_path': temp_file, 'relative_path': 'dummy/messages_kms.json'} - run(upload_file( + upload_file( config | {'client': client, 'encryption_type': 'kms', 'encryption_key': 'xXx'}, - file_metadata)) + file_metadata) head = client.head_object(Bucket=config.get('s3_bucket'), Key=file_metadata['relative_path']) assert head['ResponseMetadata']['HTTPStatusCode'] == 200 @@ -430,9 +425,9 @@ def test_upload_file(config, temp_path): # NOTE: 'dummy' encryption_type # with raises(Exception): - # run(upload_file( + # upload_file( # config | {'client': client, 'encryption_type': 'dummy'}, - # file_metadata | {'relative_path': 'dummy/messages_dummy.json'})) + # file_metadata | {'relative_path': 'dummy/messages_dummy.json'}) def test_config_s3(config_raw):