Skip to content

Commit

Permalink
ThreadPoolExecutor concurrent & parallel s3 files upload (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
ome9ax authored Oct 9, 2022
1 parent d7e27f7 commit 85f4746
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 50 deletions.
2 changes: 1 addition & 1 deletion .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ updates:
- package-ecosystem: pip
directory: "/"
schedule:
interval: weekly
interval: daily
day: monday
timezone: Europe/London
allow:
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Change Log

## [2.0.1](https://github.com/ome9ax/target-s3-jsonl/tree/2.0.1) (2022-10-09)

### What's Changed
* ThreadPoolExecutor concurrent & parallel s3 files upload by @ome9ax in #75

**Full Changelog**: https://github.com/ome9ax/target-s3-jsonl/compare/2.0.0...2.0.1

## [2.0.0](https://github.com/ome9ax/target-s3-jsonl/tree/2.0.0) (2022-09-29)

### What's Changed
Expand Down
2 changes: 1 addition & 1 deletion src/target_s3_json/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

__version__ = '2.0.0'
__version__ = '2.0.1'

# from pathlib import Path

Expand Down
54 changes: 33 additions & 21 deletions src/target_s3_json/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
import json
import gzip
import lzma
import backoff
from typing import Callable, Dict, Any, List, TextIO
from asyncio import to_thread
from concurrent.futures import ThreadPoolExecutor, Future

import backoff
from boto3.session import Session
from botocore.exceptions import ClientError
from botocore.client import BaseClient
Expand Down Expand Up @@ -103,8 +104,8 @@ def create_session(config: Dict) -> Session:
def get_encryption_args(config: Dict[str, Any]) -> tuple:
if config.get('encryption_type', 'none').lower() == "none":
# NOTE: No encryption config (defaults to settings on the bucket):
encryption_desc = ''
encryption_args = {}
encryption_desc: str = ''
encryption_args: dict = {}
elif config.get('encryption_type', 'none').lower() == 'kms':
if config.get('encryption_key'):
encryption_desc = " using KMS encryption key ID '{}'".format(config.get('encryption_key'))
Expand All @@ -121,42 +122,52 @@ def get_encryption_args(config: Dict[str, Any]) -> tuple:


@_retry_pattern()
async def put_object(config: Dict[str, Any], file_metadata: Dict, stream_data: List, client: BaseClient) -> None:
def put_object(config: Dict[str, Any], file_metadata: Dict, stream_data: List) -> None:
encryption_desc, encryption_args = get_encryption_args(config)

LOGGER.info("Uploading %s to bucket %s at %s%s",
str(file_metadata['absolute_path']), config.get('s3_bucket'), file_metadata['relative_path'], encryption_desc)
config['client'].put_object(
Body=config['open_func']( # NOTE: stream compression with gzip.compress, lzma.compress
b''.join(json.dumps(record, ensure_ascii=False).encode('utf-8') + b'\n' for record in stream_data)),
Bucket=config.get('s3_bucket'),
Key=file_metadata['relative_path'],
**encryption_args.get('ExtraArgs', {}))

await to_thread(client.put_object,
Body=config['open_func']( # NOTE: stream compression with gzip.compress, lzma.compress
b''.join(json.dumps(record, ensure_ascii=False).encode('utf-8') + b'\n' for record in stream_data)),
Bucket=config.get('s3_bucket'),
Key=file_metadata['relative_path'],
**encryption_args.get('ExtraArgs', {}))
LOGGER.info("%s uploaded to bucket %s at %s%s",
file_metadata['absolute_path'].as_posix(), config.get('s3_bucket'), file_metadata['relative_path'], encryption_desc)


@_retry_pattern()
async def upload_file(config: Dict[str, Any], file_metadata: Dict) -> None:
def upload_file(config: Dict[str, Any], file_metadata: Dict) -> None:
if not config.get('local', False) and (file_metadata['absolute_path'].stat().st_size if file_metadata['absolute_path'].exists() else 0) > 0:
encryption_desc, encryption_args = get_encryption_args(config)

await to_thread(config['client'].upload_file,
str(file_metadata['absolute_path']),
config.get('s3_bucket'),
file_metadata['relative_path'],
**encryption_args)
config['client'].upload_file(
file_metadata['absolute_path'].as_posix(),
config.get('s3_bucket'),
file_metadata['relative_path'],
**encryption_args)

LOGGER.info('%s uploaded to bucket %s at %s%s',
str(file_metadata['absolute_path']), config.get('s3_bucket'), file_metadata['relative_path'], encryption_desc)
file_metadata['absolute_path'].as_posix(), config.get('s3_bucket'), file_metadata['relative_path'], encryption_desc)

if config.get('remove_file', True):
# NOTE: Remove the local file(s)
file_metadata['absolute_path'].unlink() # missing_ok=False


async def upload_thread(config: Dict[str, Any], file_metadata: Dict) -> Future:
    """Schedule an s3 upload of `file_metadata` on the shared thread pool.

    Submits ``upload_file(config, file_metadata)`` to the
    ``ThreadPoolExecutor`` stored at ``config['executor']`` and returns the
    resulting ``Future`` immediately, without waiting for the upload.

    :param config: target configuration; must contain an ``'executor'`` entry
        holding a live ``ThreadPoolExecutor`` (see ``main``).
    :param file_metadata: dict with at least ``'absolute_path'`` and
        ``'relative_path'`` keys describing the local file and its s3 key.
    :return: the ``Future`` tracking the background upload.
    """
    # NOTE: Executor.submit is non-blocking and thread-safe, so dispatching it
    # through asyncio.to_thread only burned an extra default-executor thread
    # per call; calling submit directly is equivalent and cheaper.
    return config['executor'].submit(
        upload_file,
        config,
        file_metadata)


def config_s3(config_default: Dict[str, Any], datetime_format: Dict[str, str] = {
'date_time_format': ':%Y%m%dT%H%M%S',
'date_format': ':%Y%m%d'}) -> Dict[str, Any]:
# NOTE: to_snake = lambda s: '_'.join(findall(r'[A-Z]?[a-z]+|\d+|[A-Z]{1,}(?=[A-Z][a-z]|\W|\d|$)', line)).lower()

if 'temp_dir' in config_default:
LOGGER.warning('`temp_dir` configuration option is deprecated and support will be removed in the future, use `work_dir` instead.')
Expand Down Expand Up @@ -185,11 +196,12 @@ def main(lines: TextIO = sys.stdin) -> None:
parser.add_argument('-c', '--config', help='Config file', required=True)
args = parser.parse_args()
config = file.config_compression(config_file(config_s3(json.loads(Path(args.config).read_text(encoding='utf-8')))))
save_s3: Callable = partial(save_json, post_processing=upload_file)
save_s3: Callable = partial(save_json, post_processing=upload_thread)
client: BaseClient = create_session(config).client('s3', **({'endpoint_url': config.get('aws_endpoint_url')}
if config.get('aws_endpoint_url') else {}))

Loader(config | {'client': client}, writeline=save_s3).run(lines)
with ThreadPoolExecutor() as executor:
Loader(config | {'client': client, 'executor': executor}, writeline=save_s3).run(lines)


# from io import BytesIO
Expand Down
49 changes: 22 additions & 27 deletions tests/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# Standard library imports
import sys
from os import environ
from asyncio import run
from copy import deepcopy
from re import match
import lzma
Expand Down Expand Up @@ -318,11 +317,10 @@ def test_put_object(config):
{"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_time": "07:15:00"},
{"c_pk": 3, "c_varchar": "3", "c_int": 3, "c_time": "23:00:03"}]

run(put_object(
config,
put_object(
config | {'client': client},
file_metadata,
stream_data,
client))
stream_data)

head = client.head_object(Bucket=config.get('s3_bucket'), Key=file_metadata['relative_path'])
assert head['ResponseMetadata']['HTTPStatusCode'] == 200
Expand All @@ -333,11 +331,10 @@ def test_put_object(config):
file_metadata = {
'absolute_path': Path('tests', 'resources', 'messages.json.gz'),
'relative_path': 'dummy/messages_kms.json.gz'}
run(put_object(
config | {'encryption_type': 'kms', 'encryption_key': None},
put_object(
config | {'client': client, 'encryption_type': 'kms', 'encryption_key': None},
file_metadata,
stream_data,
client))
stream_data)

head = client.head_object(Bucket=config.get('s3_bucket'), Key=file_metadata['relative_path'])
assert head['ResponseMetadata']['HTTPStatusCode'] == 200
Expand All @@ -348,11 +345,10 @@ def test_put_object(config):
file_metadata = {
'absolute_path': Path('tests', 'resources', 'messages.json.gz'),
'relative_path': 'dummy/messages_kms.json.gz'}
run(put_object(
config | {'encryption_type': 'kms', 'encryption_key': 'xXx'},
put_object(
config | {'client': client, 'encryption_type': 'kms', 'encryption_key': 'xXx'},
file_metadata,
stream_data,
client))
stream_data)

head = client.head_object(Bucket=config.get('s3_bucket'), Key=file_metadata['relative_path'])
assert head['ResponseMetadata']['HTTPStatusCode'] == 200
Expand All @@ -361,11 +357,10 @@ def test_put_object(config):

# NOTE: 'dummy' encryption_type
with raises(Exception):
run(put_object(
config | {'encryption_type': 'dummy'},
put_object(
config | {'client': client, 'encryption_type': 'dummy'},
file_metadata | {'relative_path': 'dummy/messages_dummy.json.gz'},
stream_data,
client))
stream_data)


@mock_s3
Expand All @@ -384,16 +379,16 @@ def test_upload_file(config, temp_path):
'absolute_path': temp_file,
'relative_path': 'dummy/messages.json'}

run(upload_file(
upload_file(
config | {'local': True, 'client': client, 'remove_file': False},
file_metadata))
file_metadata)

assert 'Contents' not in client.list_objects_v2(Bucket=config['s3_bucket'], Prefix=file_metadata['relative_path'], MaxKeys=1)
assert file_metadata['absolute_path'].exists()

run(upload_file(
upload_file(
config | {'client': client, 'remove_file': False},
file_metadata))
file_metadata)

head = client.head_object(Bucket=config.get('s3_bucket'), Key=file_metadata['relative_path'])
assert head['ResponseMetadata']['HTTPStatusCode'] == 200
Expand All @@ -404,9 +399,9 @@ def test_upload_file(config, temp_path):
file_metadata = {
'absolute_path': temp_file,
'relative_path': 'dummy/messages_kms.json'}
run(upload_file(
upload_file(
config | {'client': client, 'remove_file': False, 'encryption_type': 'kms', 'encryption_key': None},
file_metadata))
file_metadata)

head = client.head_object(Bucket=config.get('s3_bucket'), Key=file_metadata['relative_path'])
assert head['ResponseMetadata']['HTTPStatusCode'] == 200
Expand All @@ -417,9 +412,9 @@ def test_upload_file(config, temp_path):
file_metadata = {
'absolute_path': temp_file,
'relative_path': 'dummy/messages_kms.json'}
run(upload_file(
upload_file(
config | {'client': client, 'encryption_type': 'kms', 'encryption_key': 'xXx'},
file_metadata))
file_metadata)

head = client.head_object(Bucket=config.get('s3_bucket'), Key=file_metadata['relative_path'])
assert head['ResponseMetadata']['HTTPStatusCode'] == 200
Expand All @@ -430,9 +425,9 @@ def test_upload_file(config, temp_path):

# NOTE: 'dummy' encryption_type
# with raises(Exception):
# run(upload_file(
# upload_file(
# config | {'client': client, 'encryption_type': 'dummy'},
# file_metadata | {'relative_path': 'dummy/messages_dummy.json'}))
# file_metadata | {'relative_path': 'dummy/messages_dummy.json'})


def test_config_s3(config_raw):
Expand Down

0 comments on commit 85f4746

Please sign in to comment.