From 3e76843b9b681d98f020d7aa017ca8dfec7d8618 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Eddy=20=E2=88=86?=
Date: Thu, 12 Aug 2021 16:41:25 +0100
Subject: [PATCH] [compression] fix compression management (#12)

---
 CHANGELOG.md                | 14 ++++++++++-
 pyproject.toml              |  2 +-
 target_s3_jsonl/__init__.py | 21 ++++++++---------
 tests/test_init.py          | 46 ++++++++++++++++++++++++++++++++-----
 4 files changed, 63 insertions(+), 20 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4568e89..b26182a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,19 @@
 # Change Log
 
+## [v0.0.5.1](https://github.com/ome9ax/target-s3-jsonl/tree/v0.0.5.1) (2021-08-09)
+[Full Changelog](https://github.com/ome9ax/target-s3-jsonl/tree/v0.0.5...v0.0.5.1)
+
+**Fixed bugs:**
+- Issue to decompress archived files
+
+**Closed issues:**
+- See PR
+
+**Merged pull requests:**
+- [[compression] fix compression management](https://github.com/ome9ax/target-s3-jsonl/pull/12)
+
 ## [v0.0.5](https://github.com/ome9ax/target-s3-jsonl/tree/v0.0.5) (2021-08-12)
+[Full Changelog](https://github.com/ome9ax/target-s3-jsonl/tree/v0.0.4...v0.0.5)
 
 **New features:**
 - I now store the rows in an Array on memory, and unload the Array into the file by batches. By default the batch size is 64Mb configurable with the `memory_buffer` config option.
@@ -15,7 +28,6 @@
 - [[Metadata] manage tap Metadata _sdc columns according to the stitch documentation](https://github.com/ome9ax/target-s3-jsonl/pull/9)
 
 ## [v0.0.4](https://github.com/ome9ax/target-s3-jsonl/tree/v0.0.4) (2021-08-09)
-[Full Changelog](https://github.com/ome9ax/target-s3-jsonl/tree/v0.0.0...v0.0.0)
 
 **New features:**
 - Initial release
diff --git a/pyproject.toml b/pyproject.toml
index b5a3c46..374b58c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -3,4 +3,4 @@ requires = [
     "setuptools>=42",
     "wheel"
 ]
-build-backend = "setuptools.build_meta"
\ No newline at end of file
+build-backend = "setuptools.build_meta"
diff --git a/target_s3_jsonl/__init__.py b/target_s3_jsonl/__init__.py
index 7f94c39..9c4c5cb 100644
--- a/target_s3_jsonl/__init__.py
+++ b/target_s3_jsonl/__init__.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python3
 
-__version__ = '0.0.5'
+__version__ = '0.0.5.1'
 
 import argparse
 import gzip
@@ -113,17 +113,14 @@ def get_target_key(message, naming_convention=None, timestamp=None, prefix=None,
 
 def save_file(file_data, compression):
     if any(file_data['file_data']):
-        if compression == 'gzip':
-            with open(file_data['file_name'], 'ab') as output_file:
-                with gzip.open(output_file, 'wt', encoding='utf-8') as output_data:
-                    output_data.writelines(file_data['file_data'])
-        if compression == 'lzma':
-            with open(file_data['file_name'], 'ab') as output_file:
-                with lzma.open(output_file, 'wt', encoding='utf-8') as output_data:
-                    output_data.writelines(file_data['file_data'])
+        if compression is not None:
+            with compression.open(file_data['file_name'], 'at', encoding='utf-8') as output_file:
+                output_file.writelines(file_data['file_data'])
+
         else:
-            with open(file_data['file_name'], 'a', encoding='utf-8') as output_file:
+            with open(file_data['file_name'], 'at', encoding='utf-8') as output_file:
                 output_file.writelines(file_data['file_data'])
+
         del file_data['file_data'][:]
 
         LOGGER.debug("'{}' file saved using compression '{}'".format(file_data['file_name'], compression or 'none'))
@@ -157,12 +154,12 @@ def persist_lines(messages, config):
     timezone = datetime.timezone(datetime.timedelta(hours=config.get('timezone_offset'))) if config.get('timezone_offset') is not None else None
 
     if f"{config.get('compression')}".lower() == 'gzip':
f"{config.get('compression')}".lower() == 'gzip': - compression = 'gzip' + compression = gzip naming_convention_default = f"{naming_convention_default}.gz" naming_convention = f"{naming_convention}.gz" elif f"{config.get('compression')}".lower() == 'lzma': - compression = 'lzma' + compression = lzma naming_convention_default = f"{naming_convention_default}.xz" naming_convention = f"{naming_convention}.xz" diff --git a/tests/test_init.py b/tests/test_init.py index 913b940..8ff8d9a 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -1,5 +1,6 @@ '''Tests for the target_s3_jsonl.main module''' # Standard library imports +from copy import deepcopy from datetime import datetime as dt, timezone # Third party imports @@ -10,6 +11,8 @@ Decimal, datetime, Path, + gzip, + lzma, json, s3, emit_state, @@ -231,12 +234,43 @@ def test_get_target_key(): def test_save_file(config, file_metadata): '''TEST : simple save_file call''' Path(config['temp_dir']).mkdir(parents=True, exist_ok=True) - for _, file_info in file_metadata.items(): - save_file(file_info, 'none') - assert not file_metadata['tap_dummy_test-test_table_one']['file_name'].exists() - assert not file_metadata['tap_dummy_test-test_table_two']['file_name'].exists() - assert file_metadata['tap_dummy_test-test_table_three']['file_name'].exists() + file_metadata_no_compression = deepcopy(file_metadata) + for _, file_info in file_metadata_no_compression.items(): + save_file(file_info, None) + + assert not file_metadata_no_compression['tap_dummy_test-test_table_one']['file_name'].exists() + assert not file_metadata_no_compression['tap_dummy_test-test_table_two']['file_name'].exists() + assert file_metadata_no_compression['tap_dummy_test-test_table_three']['file_name'].exists() + + with open(file_metadata_no_compression['tap_dummy_test-test_table_three']['file_name'], 'rt', encoding='utf-8') as input_file: + assert [item for item in input_file] == file_metadata['tap_dummy_test-test_table_three']['file_data'] + + # NOTE: test gzip compression saved file + file_metadata_gzip = deepcopy(file_metadata) + for _, file_info in file_metadata_gzip.items(): + file_info['file_name'] = file_info['file_name'].parent / f"{file_info['file_name'].name}.gz" + save_file(file_info, gzip) + + assert not file_metadata_gzip['tap_dummy_test-test_table_one']['file_name'].exists() + assert not file_metadata_gzip['tap_dummy_test-test_table_two']['file_name'].exists() + assert file_metadata_gzip['tap_dummy_test-test_table_three']['file_name'].exists() + + with gzip.open(file_metadata_gzip['tap_dummy_test-test_table_three']['file_name'], 'rt', encoding='utf-8') as input_file: + assert [item for item in input_file] == file_metadata['tap_dummy_test-test_table_three']['file_data'] + + # NOTE: test gzip compression saved file + file_metadata_lzma = deepcopy(file_metadata) + for _, file_info in file_metadata_lzma.items(): + file_info['file_name'] = file_info['file_name'].parent / f"{file_info['file_name'].name}.xz" + save_file(file_info, lzma) + + assert not file_metadata_lzma['tap_dummy_test-test_table_one']['file_name'].exists() + assert not file_metadata_lzma['tap_dummy_test-test_table_two']['file_name'].exists() + assert file_metadata_lzma['tap_dummy_test-test_table_three']['file_name'].exists() + + with lzma.open(file_metadata_lzma['tap_dummy_test-test_table_three']['file_name'], 'rt', encoding='utf-8') as input_file: + assert [item for item in input_file] == file_metadata['tap_dummy_test-test_table_three']['file_data'] clear_dir(Path(config['temp_dir'])) @@ -252,7 +286,7 
@@ -252,7 +286,7 @@ def test_upload_files(monkeypatch, config, file_metadata):
 
     Path(config['temp_dir']).mkdir(parents=True, exist_ok=True)
     for _, file_info in file_metadata.items():
-        save_file(file_info, 'none')
+        save_file(file_info, None)
 
     upload_files(file_metadata, config)
 
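
Note: the core change above relies on gzip and lzma exposing a compatible
module-level open() (text modes plus an encoding keyword), so save_file can
receive the module object itself and call compression.open(...) with no
per-codec branching. The sketch below illustrates just that pattern in
isolation; demo_save and the file names are hypothetical, not part of the
package.

    #!/usr/bin/env python3
    # Minimal sketch of the duck-typed compression handle used by this patch.
    import gzip
    import lzma

    def demo_save(file_name, lines, compression=None):
        # gzip.open, lzma.open and the builtin open all accept
        # mode 'at' (append, text) with an encoding keyword.
        opener = compression.open if compression is not None else open
        with opener(file_name, 'at', encoding='utf-8') as output_file:
            output_file.writelines(lines)

    demo_save('demo.jsonl', ['{"id": 1}\n'])           # uncompressed
    demo_save('demo.jsonl.gz', ['{"id": 1}\n'], gzip)  # gzip-compressed
    demo_save('demo.jsonl.xz', ['{"id": 1}\n'], lzma)  # lzma-compressed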