[compression] fix compression management (#12)
ome9ax authored Aug 12, 2021
1 parent 443fb05 commit 3e76843
Showing 4 changed files with 63 additions and 20 deletions.
14 changes: 13 additions & 1 deletion CHANGELOG.md
@@ -1,6 +1,19 @@
 # Change Log

+## [v0.0.5.1](https://github.com/ome9ax/target-s3-jsonl/tree/v0.0.5.1) (2021-08-12)
+[Full Changelog](https://github.com/ome9ax/target-s3-jsonl/compare/v0.0.5...v0.0.5.1)
+
+**Fixed bugs:**
+- Archived files could not be decompressed: when gzip compression was configured, the plain-text `else` branch in `save_file` also ran, appending uncompressed data to the compressed file
+
+**Closed issues:**
+- See PR
+
+**Merged pull requests:**
+- [[compression] fix compression management](https://github.com/ome9ax/target-s3-jsonl/pull/12)
+
 ## [v0.0.5](https://github.com/ome9ax/target-s3-jsonl/tree/v0.0.5) (2021-08-12)
 [Full Changelog](https://github.com/ome9ax/target-s3-jsonl/tree/v0.0.4...v0.0.5)

 **New features:**
 - Rows are now buffered in an in-memory array and unloaded to the file in batches. The default batch size is 64 MB, configurable with the `memory_buffer` option (see the sketch after this diff).
@@ -15,7 +28,6 @@
 - [[Metadata] manage tap Metadata _sdc columns according to the stitch documentation](https://github.com/ome9ax/target-s3-jsonl/pull/9)

 ## [v0.0.4](https://github.com/ome9ax/target-s3-jsonl/tree/v0.0.4) (2021-08-09)
-[Full Changelog](https://github.com/ome9ax/target-s3-jsonl/tree/v0.0.0...v0.0.0)

 **New features:**
 - Initial release
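The `memory_buffer` batching mentioned in the v0.0.5 notes above is not touched by this commit; as a rough sketch of the idea (the helper and field names are illustrative assumptions, not the target's actual internals):

```python
import sys

MEMORY_BUFFER = 64 * 1024 * 1024  # 64 MB default, per the `memory_buffer` option

def buffer_record(file_data, record_line, flush):
    '''Accumulate serialized rows in memory, then unload the batch to disk.'''
    file_data['file_data'].append(record_line)
    file_data['buffer_size'] = file_data.get('buffer_size', 0) + sys.getsizeof(record_line)
    if file_data['buffer_size'] >= MEMORY_BUFFER:
        flush(file_data)              # e.g. lambda fd: save_file(fd, compression)
        file_data['buffer_size'] = 0  # save_file empties file_data['file_data']
```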
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -3,4 +3,4 @@ requires = [
     "setuptools>=42",
     "wheel"
 ]
-build-backend = "setuptools.build_meta"
+build-backend = "setuptools.build_meta"

(The removed and added lines are textually identical; the change is invisible trailing whitespace, most likely a newline at end of file.)
21 changes: 9 additions & 12 deletions target_s3_jsonl/__init__.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python3

-__version__ = '0.0.5'
+__version__ = '0.0.5.1'

 import argparse
 import gzip
@@ -113,17 +113,14 @@ def get_target_key(message, naming_convention=None, timestamp=None, prefix=None,

 def save_file(file_data, compression):
     if any(file_data['file_data']):
-        if compression == 'gzip':
-            with open(file_data['file_name'], 'ab') as output_file:
-                with gzip.open(output_file, 'wt', encoding='utf-8') as output_data:
-                    output_data.writelines(file_data['file_data'])
-        if compression == 'lzma':
-            with open(file_data['file_name'], 'ab') as output_file:
-                with lzma.open(output_file, 'wt', encoding='utf-8') as output_data:
-                    output_data.writelines(file_data['file_data'])
+        if compression is not None:
+            with compression.open(file_data['file_name'], 'at', encoding='utf-8') as output_file:
+                output_file.writelines(file_data['file_data'])
         else:
-            with open(file_data['file_name'], 'a', encoding='utf-8') as output_file:
+            with open(file_data['file_name'], 'at', encoding='utf-8') as output_file:
                 output_file.writelines(file_data['file_data'])

         del file_data['file_data'][:]
         LOGGER.debug("'{}' file saved using compression '{}'".format(file_data['file_name'], compression or 'none'))
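The rewrite above leans on `gzip` and `lzma` sharing the same stdlib `open()` signature, so `save_file` can take either module, or `None` for plain files, without string comparisons. A minimal standalone illustration (file names made up, fresh files assumed):

```python
import gzip
import lzma

lines = ['{"id": 1}\n', '{"id": 2}\n']

# Both modules accept append-text mode plus an encoding, matching the
# compression.open(...) call in save_file.
for module, file_name in ((gzip, 'rows.jsonl.gz'), (lzma, 'rows.jsonl.xz')):
    with module.open(file_name, 'at', encoding='utf-8') as output_file:
        output_file.writelines(lines)

with gzip.open('rows.jsonl.gz', 'rt', encoding='utf-8') as input_file:
    assert list(input_file) == lines
```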

@@ -157,12 +154,12 @@ def persist_lines(messages, config):

     timezone = datetime.timezone(datetime.timedelta(hours=config.get('timezone_offset'))) if config.get('timezone_offset') is not None else None

     if f"{config.get('compression')}".lower() == 'gzip':
-        compression = 'gzip'
+        compression = gzip
         naming_convention_default = f"{naming_convention_default}.gz"
         naming_convention = f"{naming_convention}.gz"

     elif f"{config.get('compression')}".lower() == 'lzma':
-        compression = 'lzma'
+        compression = lzma
         naming_convention_default = f"{naming_convention_default}.xz"
         naming_convention = f"{naming_convention}.xz"
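After this hunk, `compression` holds a module (or `None`) rather than a string, and the matching file suffix travels with it. A table-driven sketch of the same dispatch (the dict and helper are illustrative, not the target's code, which uses the if/elif chain above):

```python
import gzip
import lzma

COMPRESSION = {'gzip': (gzip, '.gz'), 'lzma': (lzma, '.xz')}

def resolve_compression(config):
    '''Return (module or None, filename suffix) for the configured compression.'''
    key = f"{config.get('compression')}".lower()
    return COMPRESSION.get(key, (None, ''))

module, suffix = resolve_compression({'compression': 'lzma'})
assert module is lzma and suffix == '.xz'
```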
46 changes: 40 additions & 6 deletions tests/test_init.py
@@ -1,5 +1,6 @@
 '''Tests for the target_s3_jsonl.main module'''
 # Standard library imports
+from copy import deepcopy
 from datetime import datetime as dt, timezone

 # Third party imports
@@ -10,6 +11,8 @@
     Decimal,
     datetime,
     Path,
+    gzip,
+    lzma,
     json,
     s3,
     emit_state,
@@ -231,12 +234,43 @@ def test_get_target_key():

 def test_save_file(config, file_metadata):
     '''TEST : simple save_file call'''
     Path(config['temp_dir']).mkdir(parents=True, exist_ok=True)
-    for _, file_info in file_metadata.items():
-        save_file(file_info, 'none')
-
-    assert not file_metadata['tap_dummy_test-test_table_one']['file_name'].exists()
-    assert not file_metadata['tap_dummy_test-test_table_two']['file_name'].exists()
-    assert file_metadata['tap_dummy_test-test_table_three']['file_name'].exists()
+    file_metadata_no_compression = deepcopy(file_metadata)
+    for _, file_info in file_metadata_no_compression.items():
+        save_file(file_info, None)
+
+    assert not file_metadata_no_compression['tap_dummy_test-test_table_one']['file_name'].exists()
+    assert not file_metadata_no_compression['tap_dummy_test-test_table_two']['file_name'].exists()
+    assert file_metadata_no_compression['tap_dummy_test-test_table_three']['file_name'].exists()
+
+    with open(file_metadata_no_compression['tap_dummy_test-test_table_three']['file_name'], 'rt', encoding='utf-8') as input_file:
+        assert [item for item in input_file] == file_metadata['tap_dummy_test-test_table_three']['file_data']
+
+    # NOTE: test gzip compression saved file
+    file_metadata_gzip = deepcopy(file_metadata)
+    for _, file_info in file_metadata_gzip.items():
+        file_info['file_name'] = file_info['file_name'].parent / f"{file_info['file_name'].name}.gz"
+        save_file(file_info, gzip)
+
+    assert not file_metadata_gzip['tap_dummy_test-test_table_one']['file_name'].exists()
+    assert not file_metadata_gzip['tap_dummy_test-test_table_two']['file_name'].exists()
+    assert file_metadata_gzip['tap_dummy_test-test_table_three']['file_name'].exists()
+
+    with gzip.open(file_metadata_gzip['tap_dummy_test-test_table_three']['file_name'], 'rt', encoding='utf-8') as input_file:
+        assert [item for item in input_file] == file_metadata['tap_dummy_test-test_table_three']['file_data']
+
+    # NOTE: test lzma compression saved file
+    file_metadata_lzma = deepcopy(file_metadata)
+    for _, file_info in file_metadata_lzma.items():
+        file_info['file_name'] = file_info['file_name'].parent / f"{file_info['file_name'].name}.xz"
+        save_file(file_info, lzma)
+
+    assert not file_metadata_lzma['tap_dummy_test-test_table_one']['file_name'].exists()
+    assert not file_metadata_lzma['tap_dummy_test-test_table_two']['file_name'].exists()
+    assert file_metadata_lzma['tap_dummy_test-test_table_three']['file_name'].exists()
+
+    with lzma.open(file_metadata_lzma['tap_dummy_test-test_table_three']['file_name'], 'rt', encoding='utf-8') as input_file:
+        assert [item for item in input_file] == file_metadata['tap_dummy_test-test_table_three']['file_data']

     clear_dir(Path(config['temp_dir']))

@@ -252,7 +286,7 @@ def test_upload_files(monkeypatch, config, file_metadata):

     Path(config['temp_dir']).mkdir(parents=True, exist_ok=True)
     for _, file_info in file_metadata.items():
-        save_file(file_info, 'none')
+        save_file(file_info, None)

     upload_files(file_metadata, config)
