[coverage] bump version 0.0.6 changelog update: Much more specs and tests, coverage increased to `96.91%` (#15)

* [coverage] bump version 0.0.6 changelog update: Much more specs and tests, coverage increased to `96.91%`

* [coverage] bump version 0.0.6 changelog update

* [coverage] Draft4Validator FormatChecker comeback

* [coverage] test_init s3 import farewell

* [coverage] let anything but InvalidOperation raised Exception slip by Draft4Validator FormatChecker

* [coverage] lower coverage threshold to 90%

* [coverage] s3 tests & spec, `96.91%` coverage back for good
ome9ax authored Aug 17, 2021
1 parent 09e8d3b commit 3339bfa
Showing 10 changed files with 246 additions and 69 deletions.
25 changes: 17 additions & 8 deletions CHANGELOG.md
@@ -1,16 +1,25 @@
# Change Log

-## [v0.0.5.2](https://github.com/ome9ax/target-s3-jsonl/tree/v0.0.5.2) (2021-08-13)
-[Full Changelog](https://github.com/ome9ax/target-s3-jsonl/tree/v0.0.5.1...v0.0.5.2)
+## [0.0.6](https://github.com/ome9ax/target-s3-jsonl/tree/0.0.6) (2021-08-17)
+[Full Changelog](https://github.com/ome9ax/target-s3-jsonl/tree/0.0.5.2...0.0.6)
+
+### Closed issues:
+- Much more specs and tests, coverage increased to `96.91%`
+
+### Merged pull requests:
+- [[coverage] bump version 0.0.6 changelog update: Much more specs and tests, coverage increased to `96.91%`](https://github.com/ome9ax/target-s3-jsonl/pull/15)
+
+## [0.0.5.2](https://github.com/ome9ax/target-s3-jsonl/tree/0.0.5.2) (2021-08-13)
+[Full Changelog](https://github.com/ome9ax/target-s3-jsonl/tree/0.0.5.1...0.0.5.2)

### New features:
- replace `io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')` with `sys.stdin` as it's already natively defined as `<_io.TextIOWrapper name='<stdin>' mode='r' encoding='utf-8'>`

### Merged pull requests:
- [[readlines] replace `io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')` with `sys.stdin`](https://github.com/ome9ax/target-s3-jsonl/pull/13)

-## [v0.0.5.1](https://github.com/ome9ax/target-s3-jsonl/tree/v0.0.5.1) (2021-08-12)
-[Full Changelog](https://github.com/ome9ax/target-s3-jsonl/tree/v0.0.5...v0.0.5.1)
+## [0.0.5.1](https://github.com/ome9ax/target-s3-jsonl/tree/0.0.5.1) (2021-08-12)
+[Full Changelog](https://github.com/ome9ax/target-s3-jsonl/tree/0.0.5...0.0.5.1)

### Fixed bugs:
- Issue to decompress archived files
@@ -21,8 +30,8 @@
### Merged pull requests:
- [[compression] fix compression management](https://github.com/ome9ax/target-s3-jsonl/pull/12)

-## [v0.0.5](https://github.com/ome9ax/target-s3-jsonl/tree/v0.0.5) (2021-08-12)
-[Full Changelog](https://github.com/ome9ax/target-s3-jsonl/tree/v0.0.4...v0.0.5)
+## [0.0.5](https://github.com/ome9ax/target-s3-jsonl/tree/0.0.5) (2021-08-12)
+[Full Changelog](https://github.com/ome9ax/target-s3-jsonl/tree/0.0.4...0.0.5)

### New features:
- I now store the rows in an array in memory, and unload the array into the file in batches. By default the batch size is 64 MB, configurable with the `memory_buffer` config option.
@@ -36,8 +45,8 @@
- [[File load buffer] unload the data from a 64Mb memory buffer](https://github.com/ome9ax/target-s3-jsonl/pull/8)
- [[Metadata] manage tap Metadata _sdc columns according to the stitch documentation](https://github.com/ome9ax/target-s3-jsonl/pull/9)

-## [v0.0.4](https://github.com/ome9ax/target-s3-jsonl/tree/v0.0.4) (2021-08-09)
-[Full Changelog](https://github.com/ome9ax/target-s3-jsonl/tree/v0.0.0...v0.0.4)
+## [0.0.4](https://github.com/ome9ax/target-s3-jsonl/tree/0.0.4) (2021-08-09)
+[Full Changelog](https://github.com/ome9ax/target-s3-jsonl/tree/0.0.0...0.0.4)

### New features:
- Initial release
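The 0.0.5 entry above describes buffering rows in memory and unloading them into the file in batches. Below is a minimal sketch of that idea, not the target's actual implementation: the `flush_if_needed` helper is hypothetical, while the 64e6-byte default and the `sys.getsizeof` size check mirror what `persist_lines` does further down this diff.

```python
import json
import sys


def flush_if_needed(rows, path, max_bytes=64e6):
    """Hypothetical helper: append buffered rows to a JSONL file once the buffer exceeds max_bytes."""
    # Mirrors the check in persist_lines: sys.getsizeof() on the list of buffered rows.
    if sys.getsizeof(rows) > max_bytes:
        with open(path, 'a', encoding='utf-8') as output:
            output.writelines(json.dumps(row) + '\n' for row in rows)
        rows.clear()


config = {'memory_buffer': 64e6}  # same default as config.get('memory_buffer', 64e6)
buffer = []
for record in ({'id': i} for i in range(1000)):
    buffer.append(record)
    flush_if_needed(buffer, 'users.jsonl', config.get('memory_buffer', 64e6))

# Final flush of whatever is left, as persist_lines saves the remaining rows at the end.
flush_if_needed(buffer, 'users.jsonl', max_bytes=0)
```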
4 changes: 2 additions & 2 deletions README.md
@@ -93,8 +93,8 @@ Full list of options in `config.json`:
| s3_key_prefix | String | | (Default: None) A static prefix before the generated S3 key names. Using prefixes you can
| encryption_type | String | | (Default: 'none') The type of encryption to use. Current supported options are: 'none' and 'KMS'. |
| encryption_key | String | | A reference to the encryption key to use for data encryption. For KMS encryption, this should be the name of the KMS encryption key ID (e.g. '1234abcd-1234-1234-1234-1234abcd1234'). This field is ignored if 'encryption_type' is none or blank. |
-| compression | String | | The type of compression to apply before uploading. Supported options are `none` (default), `gzip`, and `lzma`. For gzipped files, the file extension will automatically be changed to `.jsonl.gz` for all files. For `lzma` compression, the file extension will automatically be changed to `.jsonl.xz` for all files. |
-| naming_convention | String | | (Default: None) Custom naming convention of the s3 key. Replaces tokens `date`, `stream`, and `timestamp` with the appropriate values. <br><br>Supports "folders" in s3 keys e.g. `folder/folder2/{stream}/export_date={date}/{timestamp}.jsonl`. <br><br>Honors the `s3_key_prefix`, if set, by prepending the "filename". E.g. naming_convention = `folder1/my_file.jsonl` and s3_key_prefix = `prefix_` results in `folder1/prefix_my_file.jsonl` |
+| compression | String | | The type of compression to apply before uploading. Supported options are `none` (default), `gzip`, and `lzma`. For gzipped files, the file extension will automatically be changed to `.json.gz` for all files. For `lzma` compression, the file extension will automatically be changed to `.json.xz` for all files. |
+| naming_convention | String | | (Default: None) Custom naming convention of the s3 key. Replaces tokens `date`, `stream`, and `timestamp` with the appropriate values. <br><br>Supports "folders" in s3 keys e.g. `folder/folder2/{stream}/export_date={date}/{timestamp}.json`. <br><br>Honors the `s3_key_prefix`, if set, by prepending the "filename". E.g. naming_convention = `folder1/my_file.json` and s3_key_prefix = `prefix_` results in `folder1/prefix_my_file.json` |
| timezone_offset | Integer | | Use offset `0` hours if you want the `naming_convention` to use the `utc` time zone. The value `null` is used by default. |
| temp_dir | String | | (Default: platform-dependent) Directory of temporary JSONL files with RECORD messages. |
| local | Boolean | | Keep the files in the `temp_dir` directory without uploading them to `s3`. |
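For reference, the options documented in this README excerpt map onto the target's config. The dict below is a hypothetical example with placeholder values, limited to keys visible in this excerpt and in the test fixtures further down; the S3 bucket and any other required settings are outside the part of the README shown in this diff.

```python
# Hypothetical configuration, placeholder values only; real deployments also need the
# S3 bucket settings, which are not shown in this diff.
config = {
    'aws_access_key_id': 'ACCESS-KEY',     # as in tests/resources/config.json
    'aws_secret_access_key': 'SECRET',
    's3_key_prefix': 'prefix_',
    'naming_convention': 'folder1/{stream}/export_date={date}/{timestamp}.json',
    'compression': 'gzip',                 # `none` (default), `gzip`, or `lzma`
    'encryption_type': 'none',             # 'none' or 'KMS'
    'encryption_key': None,                # KMS key ID when encryption_type is 'KMS'
    'timezone_offset': 0,                  # 0 makes naming_convention timestamps UTC
    'temp_dir': '/tmp/target-s3-jsonl',    # where temporary JSONL files are written
    'local': False,                        # True keeps files in temp_dir, no S3 upload
    'memory_buffer': 64e6,                 # flush threshold in bytes (see CHANGELOG 0.0.5)
    'add_metadata_columns': False,
}
```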
2 changes: 1 addition & 1 deletion setup.cfg
@@ -49,7 +49,7 @@ console_scripts =
target-s3-jsonl = target_s3_jsonl:main

[tool:pytest]
-addopts = -v --cov=target_s3_jsonl --cov-fail-under 60 --cov-report annotate --cov-report xml --cov-report term --cov-report html:htmlcov --doctest-modules
+addopts = -v --cov=target_s3_jsonl --cov-fail-under 95 --cov-report annotate --cov-report xml --cov-report term --cov-report html:htmlcov --doctest-modules
testpaths = tests

[coverage:run]
42 changes: 21 additions & 21 deletions target_s3_jsonl/__init__.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

-__version__ = '0.0.5.2'
+__version__ = '0.0.6'

import argparse
import gzip
@@ -181,26 +181,27 @@ def persist_lines(messages, config):
raise
message_type = o['type']
if message_type == 'RECORD':
-if 'stream' not in o: # pragma: no cover
+if 'stream' not in o:
raise Exception("Line is missing required key 'stream': {}".format(message))
stream = o['stream']
-if stream not in schemas: # pragma: no cover
+if stream not in schemas:
raise Exception('A record for stream {} was encountered before a corresponding schema'.format(stream))

# NOTE: Validate record
record_to_load = o['record']
try:
validators[stream].validate(float_to_decimal(record_to_load))
except Exception as ex:
-if type(ex).__name__ == "InvalidOperation": # pragma: no cover
+# NOTE: let anything but 'InvalidOperation' raised Exception slip by
+if type(ex).__name__ == "InvalidOperation": # TODO pragma: no cover
LOGGER.error(
"Data validation failed and cannot load to destination. RECORD: {}\n"
"'multipleOf' validations that allows long precisions are not supported"
" (i.e. with 15 digits or more). Try removing 'multipleOf' methods from JSON schema."
.format(record_to_load))
raise ex

-if config.get('add_metadata_columns'): # pragma: no cover
+if config.get('add_metadata_columns'):
record_to_load = add_metadata_values_to_record(o, {}, now.timestamp())
else:
record_to_load = remove_metadata_values_from_record(o)
@@ -209,46 +210,45 @@ def persist_lines(messages, config):

# NOTE: write temporary file
# Use 64Mb default memory buffer
-if sys.getsizeof(file_data[stream]['file_data']) > config.get('memory_buffer', 64e6): # pragma: no cover
+if sys.getsizeof(file_data[stream]['file_data']) > config.get('memory_buffer', 64e6):
save_file(file_data[stream], open_func)

state = None
elif message_type == 'STATE':
LOGGER.debug('Setting state to {}'.format(o['value']))
state = o['value']
elif message_type == 'SCHEMA':
-if 'stream' not in o: # pragma: no cover
+if 'stream' not in o:
raise Exception("Line is missing required key 'stream': {}".format(message))
stream = o['stream']

-if config.get('add_metadata_columns'): # pragma: no cover
+if config.get('add_metadata_columns'):
schemas[stream] = add_metadata_columns_to_schema(o)
else:
schemas[stream] = float_to_decimal(o['schema'])

validators[stream] = Draft4Validator(schemas[stream], format_checker=FormatChecker())

-if 'key_properties' not in o: # pragma: no cover
+if 'key_properties' not in o:
raise Exception('key_properties field is required')
key_properties[stream] = o['key_properties']
LOGGER.debug('Setting schema for {}'.format(stream))

# NOTE: get the s3 file key
-if stream not in file_data: # pragma: no cover
-file_data[stream] = {
-'target_key': get_target_key(
-o,
-naming_convention=naming_convention,
-timestamp=now_formatted,
-prefix=config.get('s3_key_prefix', ''),
-timezone=timezone),
-'file_name': temp_dir / naming_convention_default.format(stream=stream, timestamp=now_formatted),
-'file_data': []}
+file_data[stream] = {
+'target_key': get_target_key(
+o,
+naming_convention=naming_convention,
+timestamp=now_formatted,
+prefix=config.get('s3_key_prefix', ''),
+timezone=timezone),
+'file_name': temp_dir / naming_convention_default.format(stream=stream, timestamp=now_formatted),
+'file_data': []}

elif message_type == 'ACTIVATE_VERSION':
LOGGER.debug('ACTIVATE_VERSION {}'.format(message))
-else: # pragma: no cover
-LOGGER.warning('Unknown message type {} in message {}'.format(o['type'], o))
+else:
+LOGGER.warning('Unknown message type "{}" in message "{}"'.format(o['type'], o))

for _, file_info in file_data.items():
save_file(file_info, open_func)
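For context on the validation path above: `persist_lines` builds one `jsonschema` `Draft4Validator` per stream, with a `FormatChecker`, and validates every `RECORD` before buffering it (the target's `float_to_decimal` conversion is omitted here). A standalone sketch using the `users` schema from `tests/resources/messages.json`:

```python
from jsonschema import Draft4Validator, FormatChecker
from jsonschema.exceptions import ValidationError

# Schema from the SCHEMA message for the "users" stream in tests/resources/messages.json
schema = {'required': ['id'], 'type': 'object', 'properties': {'id': {'type': 'integer'}}}
validator = Draft4Validator(schema, format_checker=FormatChecker())

validator.validate({'id': 1, 'name': 'Eddy'})  # passes; extra keys are allowed

try:
    validator.validate({'name': 'Sabrina'})    # fails: 'id' is required
except ValidationError as err:
    print(err.message)                         # "'id' is a required property"
```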
8 changes: 4 additions & 4 deletions target_s3_jsonl/s3.py
@@ -41,9 +41,9 @@ def create_client(config):
)
# AWS Profile based authentication
else:
-aws_session = boto3.session.Session(profile_name=aws_profile) # pragma: no cover
+aws_session = boto3.session.Session(profile_name=aws_profile) # TODO pragma: no cover
if aws_endpoint_url:
-s3 = aws_session.client('s3', endpoint_url=aws_endpoint_url) # pragma: no cover
+s3 = aws_session.client('s3', endpoint_url=aws_endpoint_url) # TODO pragma: no cover
else:
s3 = aws_session.client('s3')
return s3
@@ -59,7 +59,7 @@ def upload_file(filename, s3_client, bucket, s3_key,
encryption_desc = ""
encryption_args = None
else:
-if encryption_type.lower() == "kms": # pragma: no cover
+if encryption_type.lower() == "kms": # TODO pragma: no cover
encryption_args = {"ServerSideEncryption": "aws:kms"}
if encryption_key:
encryption_desc = (
@@ -70,7 +70,7 @@
else:
encryption_desc = " using default KMS encryption"
else:
-raise NotImplementedError( # pragma: no cover
+raise NotImplementedError(
"Encryption type '{}' is not supported. "
"Expected: 'none' or 'KMS'"
.format(encryption_type)
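As a rough illustration of what the KMS branch above sets up, the sketch below shows the equivalent `boto3` upload call. The bucket, key, file name, and the assumption that the KMS key ID is passed via the standard `SSEKMSKeyId` transfer argument are mine, not taken from this hunk.

```python
import boto3

s3_client = boto3.session.Session().client('s3')

encryption_key = '1234abcd-1234-1234-1234-1234abcd1234'  # illustrative KMS key ID
extra_args = {'ServerSideEncryption': 'aws:kms'}         # as set in upload_file() above
if encryption_key:
    extra_args['SSEKMSKeyId'] = encryption_key           # assumed way of passing the key

# boto3's upload_file accepts ExtraArgs for server-side encryption settings
s3_client.upload_file(
    Filename='users-20210817.json.gz',    # placeholder local file
    Bucket='my-bucket',                   # placeholder bucket
    Key='prefix_users-20210817.json.gz',  # placeholder S3 key
    ExtraArgs=extra_args,
)
```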
1 change: 0 additions & 1 deletion tests/resources/config.json
@@ -1,5 +1,4 @@
{
-"local": true,
"add_metadata_columns": false,
"aws_access_key_id": "ACCESS-KEY",
"aws_secret_access_key": "SECRET",
Expand Down
(additional changed file; file name not shown in this view)
@@ -1,5 +1,5 @@
{
-"local": false,
+"local": true,
"add_metadata_columns": false,
"aws_access_key_id": "ACCESS-KEY",
"aws_secret_access_key": "SECRET",
6 changes: 6 additions & 0 deletions tests/resources/messages.json
@@ -0,0 +1,6 @@
+{"type": "SCHEMA", "stream": "users", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}
+{"type": "RECORD", "stream": "users", "record": {"id": 1, "name": "Eddy"}}
+{"type": "RECORD", "stream": "users", "record": {"id": 2, "name": "Sabrina"}}
+{"type": "SCHEMA", "stream": "locations", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}
+{"type": "RECORD", "stream": "locations", "record": {"id": 1, "name": "Everywhere"}}
+{"type": "STATE", "value": {"users": 2, "locations": 1}}
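The fixture above is a stream of Singer messages (SCHEMA, RECORD, STATE), one JSON object per line. A hedged sketch of feeding it through `persist_lines(messages, config)` follows: the config keys are modelled on `tests/resources/config.json`, `local: true` keeps the output in `temp_dir`, and any further keys the target requires (and whether the final state is returned) are not visible in this diff.

```python
from pathlib import Path

from target_s3_jsonl import persist_lines

# Assumed minimal config, modelled on tests/resources/config.json; with 'local': True the
# files stay in temp_dir, so no AWS credentials or bucket are needed for this sketch.
config = {
    'local': True,
    'add_metadata_columns': False,
    'temp_dir': 'output',
}
Path(config['temp_dir']).mkdir(exist_ok=True)

with Path('tests/resources/messages.json').open(encoding='utf-8') as messages:
    state = persist_lines(messages, config)  # assumption: the final STATE value is returned

print(state)  # expected: {'users': 2, 'locations': 1}
```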
