From 09e8d3bfc43b294025792d9f22593da2e541bb8c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Eddy=20=E2=88=86?=
Date: Fri, 13 Aug 2021 17:43:30 +0100
Subject: [PATCH] [specs_tests] specs & test 100% coverage (#14)

* [specs_tests] specs & test 100% coverage

* [specs_tests] style

* [specs_tests] setup.py

* [specs_tests] s3 specs & tests
---
 setup.cfg                               |   5 +-
 target_s3_jsonl/__init__.py             |  22 ++--
 target_s3_jsonl/s3.py                   |  10 +-
 tests/resources/config.json             |   2 +-
 tests/resources/config_local_false.json |  12 ++
 tests/resources/config_no_bucket.json   |  11 ++
 tests/test_init.py                      | 154 +++++++++++++++++++-----
 7 files changed, 167 insertions(+), 49 deletions(-)
 create mode 100644 tests/resources/config_local_false.json
 create mode 100644 tests/resources/config_no_bucket.json

diff --git a/setup.cfg b/setup.cfg
index a781ee5..f78aa17 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -34,7 +34,9 @@ include_package_data = True
 target_s3_jsonl = logging.conf

 [options.extras_require]
-test = pytest-cov
+test =
+    pytest-cov
+    moto[s3]
 lint = flake8
 dist = wheel
@@ -54,7 +56,6 @@ testpaths = tests
 branch = True
 omit =
     venv/*
-    target_s3_jsonl/s3.py

 [coverage:report]
 show_missing = True
diff --git a/target_s3_jsonl/__init__.py b/target_s3_jsonl/__init__.py
index 1f9be54..6297718 100644
--- a/target_s3_jsonl/__init__.py
+++ b/target_s3_jsonl/__init__.py
@@ -181,10 +181,10 @@ def persist_lines(messages, config):
             raise
         message_type = o['type']
         if message_type == 'RECORD':
-            if 'stream' not in o:
+            if 'stream' not in o:  # pragma: no cover
                 raise Exception("Line is missing required key 'stream': {}".format(message))
             stream = o['stream']
-            if stream not in schemas:
+            if stream not in schemas:  # pragma: no cover
                 raise Exception('A record for stream {} was encountered before a corresponding schema'.format(stream))

             # NOTE: Validate record
@@ -192,7 +192,7 @@ def persist_lines(messages, config):
             try:
                 validators[stream].validate(float_to_decimal(record_to_load))
             except Exception as ex:
-                if type(ex).__name__ == "InvalidOperation":
+                if type(ex).__name__ == "InvalidOperation":  # pragma: no cover
                     LOGGER.error(
                         "Data validation failed and cannot load to destination. RECORD: {}\n"
                         "'multipleOf' validations that allows long precisions are not supported"
@@ -200,7 +200,7 @@ def persist_lines(messages, config):
                         .format(record_to_load))
                 raise ex

-            if config.get('add_metadata_columns'):
+            if config.get('add_metadata_columns'):  # pragma: no cover
                 record_to_load = add_metadata_values_to_record(o, {}, now.timestamp())
             else:
                 record_to_load = remove_metadata_values_from_record(o)
@@ -209,7 +209,7 @@ def persist_lines(messages, config):

             # NOTE: write temporary file
             # Use 64Mb default memory buffer
-            if sys.getsizeof(file_data[stream]['file_data']) > config.get('memory_buffer', 64e6):
+            if sys.getsizeof(file_data[stream]['file_data']) > config.get('memory_buffer', 64e6):  # pragma: no cover
                 save_file(file_data[stream], open_func)

             state = None
@@ -217,24 +217,24 @@ def persist_lines(messages, config):
             LOGGER.debug('Setting state to {}'.format(o['value']))
             state = o['value']
         elif message_type == 'SCHEMA':
-            if 'stream' not in o:
+            if 'stream' not in o:  # pragma: no cover
                 raise Exception("Line is missing required key 'stream': {}".format(message))
             stream = o['stream']

-            if config.get('add_metadata_columns'):
+            if config.get('add_metadata_columns'):  # pragma: no cover
                 schemas[stream] = add_metadata_columns_to_schema(o)
             else:
                 schemas[stream] = float_to_decimal(o['schema'])

             validators[stream] = Draft4Validator(schemas[stream], format_checker=FormatChecker())

-            if 'key_properties' not in o:
+            if 'key_properties' not in o:  # pragma: no cover
                 raise Exception('key_properties field is required')

             key_properties[stream] = o['key_properties']
             LOGGER.debug('Setting schema for {}'.format(stream))

             # NOTE: get the s3 file key
-            if stream not in file_data:
+            if stream not in file_data:  # pragma: no cover
                 file_data[stream] = {
                     'target_key': get_target_key(
                         o,
@@ -247,7 +247,7 @@ def persist_lines(messages, config):

         elif message_type == 'ACTIVATE_VERSION':
             LOGGER.debug('ACTIVATE_VERSION {}'.format(message))
-        else:
+        else:  # pragma: no cover
             LOGGER.warning('Unknown message type {} in message {}'.format(o['type'], o))

     for _, file_info in file_data.items():
@@ -278,5 +278,5 @@ def main():
     LOGGER.debug('Exiting normally')


-if __name__ == '__main__':
+if __name__ == '__main__':  # pragma: no cover
     main()
diff --git a/target_s3_jsonl/s3.py b/target_s3_jsonl/s3.py
index 81400eb..16ca078 100644
--- a/target_s3_jsonl/s3.py
+++ b/target_s3_jsonl/s3.py
@@ -18,7 +18,7 @@ def retry_pattern():


 def log_backoff_attempt(details):
-    LOGGER.info("Error detected communicating with Amazon, triggering backoff: %d try", details.get("tries"))
+    LOGGER.info("Error detected communicating with Amazon, triggering backoff: %d try", details.get("tries"))  # pragma: no cover


 @retry_pattern()
@@ -41,9 +41,9 @@ def create_client(config):
         )
     # AWS Profile based authentication
     else:
-        aws_session = boto3.session.Session(profile_name=aws_profile)
+        aws_session = boto3.session.Session(profile_name=aws_profile)  # pragma: no cover
     if aws_endpoint_url:
-        s3 = aws_session.client('s3', endpoint_url=aws_endpoint_url)
+        s3 = aws_session.client('s3', endpoint_url=aws_endpoint_url)  # pragma: no cover
     else:
         s3 = aws_session.client('s3')
     return s3
@@ -59,7 +59,7 @@ def upload_file(filename, s3_client, bucket, s3_key,
         encryption_desc = ""
         encryption_args = None
     else:
-        if encryption_type.lower() == "kms":
+        if encryption_type.lower() == "kms":  # pragma: no cover
             encryption_args = {"ServerSideEncryption": "aws:kms"}
             if encryption_key:
                 encryption_desc = (
@@ -70,7 +70,7 @@ def upload_file(filename, s3_client, bucket, s3_key,
             else:
                encryption_desc = " using default KMS encryption"
         else:
-            raise NotImplementedError(
+            raise NotImplementedError(  # pragma: no cover
                 "Encryption type '{}' is not supported. "
                 "Expected: 'none' or 'KMS'"
                 .format(encryption_type)
diff --git a/tests/resources/config.json b/tests/resources/config.json
index f2b7b94..bbae8ad 100644
--- a/tests/resources/config.json
+++ b/tests/resources/config.json
@@ -1,5 +1,5 @@
 {
-    "local": false,
+    "local": true,
     "add_metadata_columns": false,
     "aws_access_key_id": "ACCESS-KEY",
     "aws_secret_access_key": "SECRET",
diff --git a/tests/resources/config_local_false.json b/tests/resources/config_local_false.json
new file mode 100644
index 0000000..f2b7b94
--- /dev/null
+++ b/tests/resources/config_local_false.json
@@ -0,0 +1,12 @@
+{
+    "local": false,
+    "add_metadata_columns": false,
+    "aws_access_key_id": "ACCESS-KEY",
+    "aws_secret_access_key": "SECRET",
+    "s3_bucket": "BUCKET",
+    "temp_dir": "tests/output",
+    "memory_buffer": 2000000,
+    "compression": "none",
+    "timezone_offset": 0,
+    "naming_convention": "{stream}-{timestamp}.jsonl"
+}
diff --git a/tests/resources/config_no_bucket.json b/tests/resources/config_no_bucket.json
new file mode 100644
index 0000000..deb1b39
--- /dev/null
+++ b/tests/resources/config_no_bucket.json
@@ -0,0 +1,11 @@
+{
+    "local": true,
+    "add_metadata_columns": false,
+    "aws_access_key_id": "ACCESS-KEY",
+    "aws_secret_access_key": "SECRET",
+    "temp_dir": "tests/output",
+    "memory_buffer": 2000000,
+    "compression": "none",
+    "timezone_offset": 0,
+    "naming_convention": "{stream}-{timestamp}.jsonl"
+}
diff --git a/tests/test_init.py b/tests/test_init.py
index e76d792..9bc0168 100644
--- a/tests/test_init.py
+++ b/tests/test_init.py
@@ -4,16 +4,20 @@
 from datetime import datetime as dt, timezone

 # Third party imports
-from pytest import fixture
+from pytest import fixture, raises
+import boto3
+from moto import mock_s3

 # Package imports
 from target_s3_jsonl import (
+    sys,
     Decimal,
     datetime,
-    Path,
+    argparse,
     gzip,
     lzma,
     json,
+    Path,
     s3,
     add_metadata_columns_to_schema,
     add_metadata_values_to_record,
@@ -24,24 +28,17 @@
     save_file,
     upload_files,
     persist_lines,
+    main,
 )


-with open(Path('tests', 'resources', 'messages-with-three-streams.json'), 'r', encoding='utf-8') as input_file, \
-        open(Path('tests', 'resources', 'invalid-json.json'), 'r', encoding='utf-8') as invalid_row_file, \
-        open(Path('tests', 'resources', 'invalid-message-order.json'), 'r', encoding='utf-8') as invalid_order_file:
-    INPUT_DATA = [item for item in input_file]
-    INVALID_ROW_DATA = [item for item in invalid_row_file]
-    INVALID_ORDER_DATA = [item for item in invalid_order_file]
-
-
 @fixture
 def patch_datetime(monkeypatch):

     class mydatetime:
         @classmethod
         def utcnow(cls):
-            return dt.fromtimestamp(1628713605.321056, tz=timezone.utc).replace(tzinfo=None)
+            return dt.fromtimestamp(1628663978.321056, tz=timezone.utc).replace(tzinfo=None)

         @classmethod
         def now(cls, x=timezone.utc):
@@ -70,21 +67,24 @@ def config():
 def input_data():
     '''Use custom parameters set'''

-    return INPUT_DATA
+    with open(Path('tests', 'resources', 'messages-with-three-streams.json'), 'r', encoding='utf-8') as input_file:
+        return [item for item in input_file]


 @fixture
 def invalid_row_data():
     '''Use custom parameters set'''

-    return INVALID_ROW_DATA
+    with open(Path('tests', 'resources', 'invalid-json.json'), 'r', encoding='utf-8') as input_file:
+        return [item for item in input_file]


 @fixture
 def invalid_order_data():
     '''Use custom parameters set'''

-    return INVALID_ORDER_DATA
+    with open(Path('tests', 'resources', 'invalid-message-order.json'), 'r', encoding='utf-8') as input_file:
+        return [item for item in input_file]


 @fixture
@@ -105,16 +105,16 @@ def file_metadata():

     return {
         'tap_dummy_test-test_table_one': {
-            'target_key': 'tap_dummy_test-test_table_one-20210811T063938.jsonl',
-            'file_name': Path('tests/output/tap_dummy_test-test_table_one-20210811T063938.jsonl'),
+            'target_key': 'tap_dummy_test-test_table_one-20210811T063938.json',
+            'file_name': Path('tests/output/tap_dummy_test-test_table_one-20210811T063938.json'),
             'file_data': []},
         'tap_dummy_test-test_table_two': {
-            'target_key': 'tap_dummy_test-test_table_two-20210811T063938.jsonl',
-            'file_name': Path('tests/output/tap_dummy_test-test_table_two-20210811T063938.jsonl'),
+            'target_key': 'tap_dummy_test-test_table_two-20210811T063938.json',
+            'file_name': Path('tests/output/tap_dummy_test-test_table_two-20210811T063938.json'),
             'file_data': []},
         'tap_dummy_test-test_table_three': {
-            'target_key': 'tap_dummy_test-test_table_three-20210811T063938.jsonl',
-            'file_name': Path('tests/output/tap_dummy_test-test_table_three-20210811T063938.jsonl'),
+            'target_key': 'tap_dummy_test-test_table_three-20210811T063938.json',
+            'file_name': Path('tests/output/tap_dummy_test-test_table_three-20210811T063938.json'),
             'file_data': [
                 '{"c_pk": 1, "c_varchar": "1", "c_int": 1, "c_time": "04:00:00"}\n',
                 '{"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_time": "07:15:00"}\n',
@@ -134,6 +134,10 @@ def test_emit_state(capsys, state):
     captured = capsys.readouterr()
     assert captured.out == json.dumps(state) + '\n'

+    emit_state(None)
+    captured = capsys.readouterr()
+    assert captured.out == ''
+

 def test_add_metadata_columns_to_schema():
     '''TEST : simple add_metadata_columns_to_schema call'''
@@ -228,7 +232,8 @@ def test_float_to_decimal():

 def test_get_target_key():
     '''TEST : simple get_target_key call'''
-    assert get_target_key({'stream': 'dummy_stream'}, naming_convention='{stream}-{timestamp}.jsonl', timestamp='99') == 'dummy_stream-99.jsonl'
+    assert get_target_key({'stream': 'dummy_stream'}, timestamp='99') == 'dummy_stream-99.json'
+    assert get_target_key({'stream': 'dummy_stream'}, naming_convention='xxx-{stream}-{timestamp}.jsonl', timestamp='99') == 'xxx-dummy_stream-99.jsonl'


 def test_save_file(config, file_metadata):
@@ -254,19 +259,17 @@ def test_save_file(config, file_metadata):
     clear_dir(Path(config['temp_dir']))


+@mock_s3
 def test_upload_files(monkeypatch, config, file_metadata):
     '''TEST : simple upload_files call'''
-    monkeypatch.setattr(s3, 'create_client', lambda config: None)
-
-    monkeypatch.setattr(
-        s3, 'upload_file', lambda filename, s3_client, bucket, s3_key,
-        encryption_type=None, encryption_key=None: None)
-
     Path(config['temp_dir']).mkdir(parents=True, exist_ok=True)
     for _, file_info in file_metadata.items():
         save_file(file_info, open)

+    conn = boto3.resource('s3', region_name='us-east-1')
+    conn.create_bucket(Bucket=config['s3_bucket'])
+
     upload_files(file_metadata, config)

     assert not file_metadata['tap_dummy_test-test_table_three']['file_name'].exists()
@@ -274,18 +277,109 @@ def test_upload_files(monkeypatch, config, file_metadata):

     clear_dir(Path(config['temp_dir']))


-def test_persist_lines(config, input_data, state, file_metadata):
+def test_persist_lines(config, input_data, invalid_row_data, invalid_order_data, state, file_metadata):
     '''TEST : simple persist_lines call'''
     output_state, output_file_metadata = persist_lines(input_data, config)

     file_paths = set(path for path in Path(config['temp_dir']).iterdir())
-    assert len(file_paths) == 3
-    assert output_state == state
+    assert len(file_paths) == 3
+
+    assert len(set(str(values['file_name']) for _, values in output_file_metadata.items()) - set(str(path) for path in file_paths)) == 0

     with open(output_file_metadata['tap_dummy_test-test_table_three']['file_name'], 'r', encoding='utf-8') as input_file:
         assert [item for item in input_file] == file_metadata['tap_dummy_test-test_table_three']['file_data']

+    for compression, extension in {'gzip': '.gz', 'lzma': '.xz', 'none': ''}.items():
+        clear_dir(Path(config['temp_dir']))
+        config_copy = deepcopy(config)
+        config_copy['compression'] = compression
+        output_state, output_file_metadata = persist_lines(input_data, config_copy)
+        file_paths = set(path for path in Path(config['temp_dir']).iterdir())
+
+        assert len(file_paths) == 3
+
+        assert len(set(str(values['file_name']) for _, values in output_file_metadata.items()) - set(str(path) for path in file_paths)) == 0
+
+    clear_dir(Path(config['temp_dir']))
+
+    with raises(NotImplementedError):
+        config_copy = deepcopy(config)
+        config_copy['compression'] = 'dummy'
+        output_state, output_file_metadata = persist_lines(input_data, config_copy)
+
+    with raises(json.decoder.JSONDecodeError):
+        output_state, output_file_metadata = persist_lines(invalid_row_data, config)
+
+    with raises(Exception):
+        output_state, output_file_metadata = persist_lines(invalid_order_data, config)
+
+
+@mock_s3
+def test_main(monkeypatch, capsys, patch_datetime, input_data, config, state, file_metadata):
+    '''TEST : simple main call'''
+
+    class argument_parser:
+
+        def __init__(self):
+            self.config = str(Path('tests', 'resources', 'config.json'))
+
+        def add_argument(self, x, y, help='Dummy config file', required=False):
+            pass
+
+        def parse_args(self):
+            return self
+
+    monkeypatch.setattr(argparse, 'ArgumentParser', argument_parser)
+
+    monkeypatch.setattr(sys, 'stdin', input_data)
+
+    main()
+    captured = capsys.readouterr()
+    assert captured.out == json.dumps(state) + '\n'
+
+    for _, file_info in file_metadata.items():
+        assert file_info['file_name'].exists()
+    clear_dir(Path(config['temp_dir']))
+
+    with raises(Exception):
+
+        class argument_parser:
+
+            def __init__(self):
+                self.config = str(Path('tests', 'resources', 'config_no_bucket.json'))
+
+            def add_argument(self, x, y, help='Dummy config file', required=False):
+                pass
+
+            def parse_args(self):
+                return self
+
+        monkeypatch.setattr(argparse, 'ArgumentParser', argument_parser)
+
+        main()
+
+    class argument_parser:
+
+        def __init__(self):
+            self.config = str(Path('tests', 'resources', 'config_local_false.json'))
+
+        def add_argument(self, x, y, help='Dummy config file', required=False):
+            pass
+
+        def parse_args(self):
+            return self
+
+    monkeypatch.setattr(argparse, 'ArgumentParser', argument_parser)
+
+    monkeypatch.setattr(s3, 'create_client', lambda config: None)
+
+    monkeypatch.setattr(
+        s3, 'upload_file', lambda filename, s3_client, bucket, s3_key,
+        encryption_type=None, encryption_key=None: None)
+
+    main()
+    captured = capsys.readouterr()
+    assert captured.out == json.dumps(state) + '\n'