diff --git a/.github/dependabot.yml b/.github/dependabot.yml index d0b69f5..7f51bfb 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -5,9 +5,12 @@ updates: - package-ecosystem: pip directory: "/" schedule: - interval: weekly + interval: monthly day: monday timezone: Europe/London + allow: + # Allow only dependencies in the "Production dependency group" + - dependency-type: production reviewers: - ome9ax open-pull-requests-limit: 9 diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 44b0375..e4666db 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -20,7 +20,7 @@ jobs: matrix: project: ['target-s3-jsonl'] os: [ubuntu-latest] #, macos-latest, windows-latest - python-version: [3.8, 3.9, '3.10'] + python-version: [3.9, '3.10'] exclude: - os: macos-latest python-version: 3.9 @@ -68,12 +68,28 @@ jobs: python -m venv venv || virtualenv venv . venv/bin/activate pip install --upgrade pip # setuptools + # pip install .[test,lint,static,dist] pip install tox - name: Get pip cache dir id: pip-cache run: | echo "::set-output name=dir::$(pip cache dir)" + + # - name: Lint with flake8 + # run: | + # . venv/bin/activate + # # stop the build if there are Python syntax errors or undefined names + # # exit-zero treats all errors as warnings. The GitHub editor is 255 chars wide + # flake8 + # - name: Static typing with mypy + # run: | + # . venv/bin/activate + # mypy + - name: Lint with flake8 & Static typing with mypy + run: | + . venv/bin/activate + TOX_PARALLEL_NO_SPINNER=1 tox --parallel -e lint,static - name: pip cache uses: actions/cache@v3 with: @@ -82,16 +98,12 @@ jobs: restore-keys: | ${{ runner.os }}-pip- - - name: Lint with flake8 - run: | - . venv/bin/activate - # stop the build if there are Python syntax errors or undefined names - # exit-zero treats all errors as warnings. The GitHub editor is 255 chars wide - TOX_PARALLEL_NO_SPINNER=1 tox -e lint - name: Test run: | . venv/bin/activate - TOX_PARALLEL_NO_SPINNER=1 tox -e py + # pytest + # tox --parallel + tox -e py - name: Upload coverage test results to Codecov uses: codecov/codecov-action@v2 if: | @@ -109,12 +121,13 @@ jobs: - name: Build distribution package run: | . venv/bin/activate + # python setup.py sdist bdist_wheel pip install build python -m build ls -l dist - name: Publish distribution package to TestPyPI if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags') - uses: pypa/gh-action-pypi-publish@master + uses: pypa/gh-action-pypi-publish@release/v1 with: verify_metadata: true skip_existing: true @@ -124,7 +137,7 @@ jobs: - name: Publish distribution package if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags') - uses: pypa/gh-action-pypi-publish@master + uses: pypa/gh-action-pypi-publish@release/v1 with: verify_metadata: true skip_existing: true diff --git a/CHANGELOG.md b/CHANGELOG.md index 93e3244..e980f6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,33 @@ # Change Log +## [2.0.0](https://github.com/ome9ax/target-s3-jsonl/tree/2.0.0) (2022-09-29) + +### What's Changed +⚠️ 🚨 **BREAKING COMPATIBILITY: `Python 3.8` SUPPORT REMOVED** 🚨 ⚠️ +* [target-core] Move the core features and functions in common shared [`target-core`](https://gitlab.com/singer-core/target-core) package by @ome9ax in #35 + +All the core stream processing functionalities are combined into [`target-core`](https://gitlab.com/singer-core/target-core). 
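As an orientation for the migration, here is a minimal sketch of a 2.0.0-style `config.json` using the renamed options detailed below (`path_template`, `work_dir`, `file_size`); the bucket name, directory and size value are illustrative placeholders, and only `s3_bucket` is required:

```json
{
  "s3_bucket": "my-example-bucket",
  "work_dir": "/tmp/target-s3-jsonl",
  "compression": "gzip",
  "file_size": 64000000,
  "path_template": "{stream}_{date_time:%Y%m%d_%H%M%S}_part_{part:0>3}.json"
}
```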
+ +#### [`target-core`](https://gitlab.com/singer-core/target-core) core functionalities +- The stream processing library is using [`asyncio.to_thread`](https://docs.python.org/3/library/asyncio-task.html?highlight=to_thread#asyncio.to_thread) introduced in **`Python 3.9`**. +- Better isolation architecture comes now by design between singer stream protocol and output custom processing. This opens for more native processing modularity and flexibility (API, S3, ...). +- Uses `sys.stdin.buffer` input reader over `sys.stdin` for more efficient input stream management. + +#### `target-s3-jsonl` changes +- version `">=2.0"` developments will continue under **`Python 3.9`** and above. +- version `"~=1.0"` will keep living under the `legacy-v1` branch. +- Optimised memory and storage management: files are uploaded asynchronously and deleted on the fly, no longer all at once at the end. + +#### Config file updates +- changes (those will be automatically replaced during the deprecation period for backward compatibility): + - `path_template` replaces `naming_convention` (*deprecated*). Few changes as well in the `path_template` syntax: + - `{date_time}` replaces `{timestamp}` (*deprecated*). + - `{date_time:%Y%m%d}` replaces `{date}` (*deprecated*). + - `work_dir` replaces `temp_dir` (*deprecated*). +- New option `file_size` for file partitioning by size limit. The `path_template` must contain a part section for the part number. Example `"path_template": "{stream}_{date_time:%Y%m%d_%H%M%S}_part_{part:0>3}.json"`. + +**Full Changelog**: https://github.com/ome9ax/target-s3-jsonl/compare/1.2.2...2.0.0 + ## [1.2.2](https://github.com/ome9ax/target-s3-jsonl/tree/1.2.2) (2022-09-01) ### What's Changed @@ -17,7 +45,7 @@ ## [1.2.0](https://github.com/ome9ax/target-s3-jsonl/tree/1.2.0) (2022-04-11) ### What's Changed -* Upgrade version to 1.1.0: changelog by @ome9ax in https://github.com/ome9ax/target-s3-jsonl/pull/33 +* Upgrade version to 1.2.0: changelog by @ome9ax in https://github.com/ome9ax/target-s3-jsonl/pull/33 * [jsonschema] Remove the deprecated custom exception to Handle `multipleOf` overflow fixed in jsonschema v4.0.0 by @ome9ax in https://github.com/ome9ax/target-s3-jsonl/pull/34 * [jsonschema] remove validation exception catching by @ome9ax in https://github.com/ome9ax/target-s3-jsonl/pull/36 * [persist_lines] save_records argument by @ome9ax in https://github.com/ome9ax/target-s3-jsonl/pull/37 diff --git a/MANIFEST.in b/MANIFEST.in index d267794..cd31518 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1 +1 @@ -include requirements.txt LICENSE target_s3_jsonl/logging.conf \ No newline at end of file +include src/target_s3_json/logging.conf, LICENSE \ No newline at end of file diff --git a/README.md b/README.md index 6337d68..6cbc68b 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,8 @@ following the [Singer spec](https://github.com/singer-io/getting-started/blob/ma `target-s3-jsonl` is a [Singer](https://singer.io) Target which intend to work with regular [Singer](https://singer.io) Tap. It take the output of the tap and export it as a [JSON Lines](http://jsonlines.org/) files into an AWS S3 bucket. +This package is built over the [`target-core`](https://gitlab.com/singer-core/target-core). + ## Install First, make sure Python 3 is installed on your system or follow these @@ -35,21 +37,21 @@ pip install target-s3-jsonl python -m venv venv . 
venv/bin/activate pip install --upgrade pip -pip install --upgrade https://github.com/ome9ax/target-s3-jsonl/archive/main.tar.gz +pip install --upgrade git+https://github.com/ome9ax/target-s3-jsonl.git@main ``` ### Isolated virtual environment ```bash python -m venv ~/.virtualenvs/target-s3-jsonl -source ~/.virtualenvs/target-s3-jsonl/bin/activate -pip install target-s3-jsonl -deactivate +~/.virtualenvs/target-s3-jsonl/bin/pip install target-s3-jsonl ``` Alternative ```bash python -m venv ~/.virtualenvs/target-s3-jsonl -~/.virtualenvs/target-s3-jsonl/bin/pip install target-s3-jsonl +source ~/.virtualenvs/target-s3-jsonl/bin/activate +pip install target-s3-jsonl +deactivate ``` ### To run @@ -83,16 +85,23 @@ For non-profile based authentication set `aws_access_key_id` , `aws_secret_acces Full list of options in `config.json`: +#### Inherited from `target-core` + | Property | Type | Mandatory? | Description | |-------------------------------------|---------|------------|---------------------------------------------------------------| -| naming_convention | String | | (Default: None) Custom naming convention of the s3 key. Replaces tokens `date`, `stream`, and `timestamp` with the appropriate values.

Supports datetime and other python advanced string formatting e.g. `{stream:_>8}_{timestamp:%Y%m%d_%H%M%S}.json` or `{stream}/{timestamp:%Y}/{timestamp:%m}/{timestamp:%d}/{timestamp:%Y%m%d_%H%M%S_%f}.json`.

Supports "folders" in s3 keys e.g. `folder/folder2/{stream}/export_date={date}/{timestamp}.json`.

Honors the `s3_key_prefix`, if set, by prepending the "filename". E.g. naming_convention = `folder1/my_file.json` and s3_key_prefix = `prefix_` results in `folder1/prefix_my_file.json` |
-| timezone_offset | Integer | | Offset value in hour. Use offset `0` hours is you want the `naming_convention` to use `utc` time zone. The `null` values is used by default. |
-| memory_buffer | Integer | | Memory buffer's size used before storing the data into the temporary file. 64Mb used by default if unspecified. |
-| temp_dir | String | | (Default: platform-dependent) Directory of temporary JSONL files with RECORD messages. |
+| path_template | String | | (Default: None) Custom naming convention of the s3 key. Replaces tokens `stream` and `date_time` (plus `part` when `file_size` partitioning is used) with the appropriate values.

Supports datetime and other Python advanced string formatting e.g. `{stream:_>8}_{date_time:%Y%m%d_%H%M%S}.json` or `{stream}/{date_time:%Y}/{date_time:%m}/{date_time:%d}/{date_time:%Y%m%d_%H%M%S_%f}.json`.

Supports "folders" in s3 keys e.g. `folder/folder2/{stream}/export_date={date_time:%Y%m%d}/{date_time}.json`.

Honors the `s3_key_prefix`, if set, by prepending the "filename". E.g. path_template = `folder1/my_file.json` and s3_key_prefix = `prefix_` results in `folder1/prefix_my_file.json` | +| timezone_offset | Integer | | Offset value in hour. Use offset `0` hours is you want the `path_template` to use `utc` time zone. The `null` values is used by default. | +| memory_buffer | Integer | | Memory buffer's size used for non partitioned files before storing the data into the temporary file. 64Mb used by default if unspecified. | +| file_size | Integer | | File partitinoning by `size_limit`. File parts will be created. The `path_template` must contain a part section for the part number. Example `"path_template": "{stream}_{date_time:%Y%m%d_%H%M%S}_part_{part:0>3}.json"`. | +| work_dir | String | | (Default: platform-dependent) Directory of temporary JSONL files with RECORD messages. | | compression | String | | The type of compression to apply before uploading. Supported options are `none` (default), `gzip`, and `lzma`. For gzipped files, the file extension will automatically be changed to `.json.gz` for all files. For `lzma` compression, the file extension will automatically be changed to `.json.xz` for all files. | -| local | Boolean | | Keep the file in the `temp_dir` directory without uploading the files on `s3`. | + +#### Specific For `target-s3-jsonl` + +| Property | Type | Mandatory? | Description | +|-------------------------------------|---------|------------|---------------------------------------------------------------| +| local | Boolean | | Keep the file in the `work_dir` directory without uploading the files on `s3`. | | s3_bucket | String | Yes | S3 Bucket name | -| s3_key_prefix | String | | (Default: None) A static prefix before the generated S3 key names. | | aws_profile | String | | AWS profile name for profile based authentication. If not provided, `AWS_PROFILE` environment variable will be used. | | aws_endpoint_url | String | | AWS endpoint URL. | | aws_access_key_id | String | | S3 Access Key Id. If not provided, `AWS_ACCESS_KEY_ID` environment variable will be used. | @@ -104,17 +113,20 @@ Full list of options in `config.json`: ## Test Install the tools - ```bash pip install .[test,lint] ``` Run pytest - ```bash pytest -p no:cacheprovider ``` +## Lint +```bash +flake8 --show-source --statistics --count --extend-exclude .virtualenvs +``` + ## Release 1. Update the version number at the beginning of `target-s3-jsonl/target_s3_jsonl/__init__.py` 2. 
Merge the changes PR into `main` diff --git a/codecov.yml b/codecov.yml index 8e4f756..35b5871 100644 --- a/codecov.yml +++ b/codecov.yml @@ -1,5 +1,5 @@ ignore: - - target-s3-jsonl/__init__.py + - target-s3-json/__init__.py - tests/.* - ./setup.py diff --git a/config.sample.json b/config.sample.json deleted file mode 100644 index 9f45183..0000000 --- a/config.sample.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "aws_access_key_id": "ACCESS-KEY", - "aws_secret_access_key": "SECRET", - "s3_bucket": "BUCKET", - "s3_key_prefix": "SOME-PREFIX/", - "compression": "gzip", - "naming_convention": "{stream}-{timestamp}.jsonl", - "role_arn": "arn:aws:iam::000000000000:role/my_custom_role" -} diff --git a/pyproject.toml b/pyproject.toml index e587f24..d035320 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,7 +39,7 @@ strict_equality = true [[tool.mypy.overrides]] # Overrides for currently untyped modules module = [ - "target_s3_jsonl.*" + "target_s3_json.*" ] [[tool.mypy.overrides]] # Overrides for currently untyped modules diff --git a/setup.cfg b/setup.cfg index 081d729..1b0b5c0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,34 +1,33 @@ [metadata] name = target-s3-jsonl -version = attr: target_s3_jsonl.__version__ +version = attr: target_s3_json.__version__ description = Singer.io target for writing JSON Line files and upload to S3 long_description = file: README.md long_description_content_type = text/markdown author = Eddy ∆ author_email = edrdelta@gmail.com url = https://github.com/ome9ax/target-s3-jsonl -keywords = target-s3-jsonl, target-s3-json, singer, singer.io, tap, target, etl, json, jsonl, aws, s3 +keywords = target-core, target-s3-jsonl, target-s3-json, singer, singer.io, tap, target, etl, json, jsonl, aws, s3 license = Apache License 2.0 classifiers = Development Status :: 5 - Production/Stable Operating System :: OS Independent License :: OSI Approved :: Apache Software License - Programming Language :: Python :: 3.8 + Programming Language :: Python :: 3 Programming Language :: Python :: 3.9 Programming Language :: Python :: 3.10 project_urls = # Documentation = https://ome9ax.github.io/target-s3-jsonl Releases = https://github.com/ome9ax/target-s3-jsonl/releases - Changelog = https://github.com/ome9ax/target-s3-jsonl/blob/main/CHANGELOG.rst + Changelog = https://github.com/ome9ax/target-s3-jsonl/blob/main/CHANGELOG.md Issue Tracker = https://github.com/ome9ax/target-s3-jsonl/issues [options] -packages = find: package_dir = = src -# py_modules = target_s3_jsonl -python_requires = >=3.8 -# install_requires = file: requirements.txt +packages = find: +py_modules = target_s3_json +python_requires = >=3.9 # install_requires = # jsonschema==4.9.1 # boto3==1.24.52 @@ -36,29 +35,36 @@ python_requires = >=3.8 include_package_data = True platforms = any -[options.entry_points] -console_scripts = - target-s3-jsonl = target_s3_jsonl:main - [options.package_data] -target_s3_jsonl = logging.conf +target_s3_json = logging.conf [options.packages.find] where = src exclude = tests +[options.entry_points] +console_scripts = + target-s3-json = target_s3_json:main + target-s3-jsonl = target_s3_json:main + [options.extras_require] test = + pytest-asyncio pytest-cov - moto[s3,sts] + moto[s3] + # moto[s3,sts] lint = flake8 static = mypy -dist = build +dist = + setuptools + wheel + build deploy = twine +doc = sphinx-rtd-theme [tool:pytest] -addopts = -v --cov=target_s3_jsonl --cov-fail-under 95 --cov-report annotate --cov-report xml --cov-report term --cov-report html:htmlcov --doctest-modules +addopts = -v 
--cov=target_s3_json --cov-fail-under 95 --cov-report xml --cov-report term --cov-report html:htmlcov --doctest-modules testpaths = tests asyncio_mode = auto @@ -67,6 +73,7 @@ branch = True omit = ./setup.py tests/.* + docs/conf.py venv/* [coverage:report] @@ -80,7 +87,6 @@ count = True show-source = True statistics = True extend-exclude = venv - build ignore = C901 max-line-length = 160 max-complexity = 10 @@ -90,20 +96,20 @@ builder = html warning-is-error = true # keep-going = true project = 'Target S3 Jsonl' -version = attr: target_s3_jsonl.__version__ -release = attr: target_s3_jsonl.__version__ +version = attr: target_s3_json.__version__ +release = attr: target_s3_json.__version__ source-dir = 'docs' [tox:tox] passenv = TOXENV TOX_* CI_* GITLAB_* # requires = tox-pipenv -envlist = py{38,39,310} +envlist = py{39,310} # labels = # test = py{39,310,pi} # static = flake8, mypy # envlist = .virtualenvs/target-s3-jsonl isolated_build = True -# skipsdist = false +# skipsdist = True # parallel_show_output=True # requires = tox-pip-extensions @@ -113,14 +119,12 @@ isolated_build = True usedevelop = True extras = test commands = pytest {posargs} - # publish: python setup.py sdist upload --sign -r pypi - # publish: echo Publish that [testenv:lint] usedevelop = True skip_install = true -deps = flake8 -commands = flake8 {posargs} +deps=flake8 +commands=flake8 {posargs} [testenv:static] usedevelop = True @@ -128,59 +132,9 @@ skip_install = true deps = mypy commands = mypy {posargs} -[testenv:coverage] -usedevelop = True -passenv = CODECOV_TOKEN CI_* -skip_install = true -deps = codecov -# allowlist_externals = gpg -# install_command = echo Install codecov {packages} -# curl https://keybase.io/codecovsecurity/pgp_keys.asc | gpg --no-default-keyring --keyring trustedkeys.gpg --import # One-time step -# curl -Os https://uploader.codecov.io/latest/linux/codecov -# curl -Os https://uploader.codecov.io/latest/linux/codecov.SHA256SUM -# curl -Os https://uploader.codecov.io/latest/linux/codecov.SHA256SUM.sig -# gpgv codecov.SHA256SUM.sig codecov.SHA256SUM -# shasum -a 256 -c codecov.SHA256SUM -# chmod +x ./codecov -commands = - codecov \ - --file "{toxinidir}/coverage.xml" \ - --name "codecov-$CI_PROJECT_NAME" \ - --branch "$CI_COMMIT_BRANCH" \ - --commit "$CI_COMMIT_SHA" \ - --tag "$CI_COMMIT_TAG" \ - --flags "unittests" {posargs} || echo 'Codecov upload failed' - -[testenv:docs] -# https://packaging-guide.openastronomy.org/en/latest/docs.html -# Init -# sphinx-quickstart docs; cd docs -# edit index.rst >>> add modules -# sphinx-apidoc -o docs . 
-# sphinx-apidoc -o /source/_modules src -# sphinx-build docs docs/_build/html -W -j auto --color -b html -description = Invoke sphinx-build to build the HTML docs -usedevelop = True -extras = docs -# commands_pre = sphinx-build docs/source "{toxworkdir}/docs_out" -d "{toxworkdir}/docs_doctree" -b doctest {posargs:-E} -# commands = sphinx-build docs docs/_build/html -W -j auto --color -Ea -b html {posargs} -commands = sphinx-build docs/source "{toxworkdir}/docs_out" -d "{toxworkdir}/docs_doctree" -W -j auto --color -b html {posargs} -commands_post = python -c 'import pathlib; print("documentation available under file://\{0\}".format(pathlib.Path(r"{toxworkdir}") / "docs_out" / "index.html"))' - # sphinx-build docs/source "{toxworkdir}/docs_out" -d "{toxworkdir}/docs_doctree" -b linkcheck {posargs:-E} - -[testenv:dist] -deps = build -commands = python -m build - -[testenv:deploy] -usedevelop = True -skip_install = true -# depends = dist -passenv = TWINE_* -deps = - build - twine -commands_pre = - python -m build - twine check dist/* -commands = twine upload --skip-existing {posargs} dist/* +# [build-system] +# requires = [ +# "setuptools>=42", +# "wheel" +# ] +# build-backend = "setuptools.build_meta" diff --git a/setup.py b/setup.py index d7de081..f97146c 100644 --- a/setup.py +++ b/setup.py @@ -4,9 +4,7 @@ setup( install_requires=[ - 'adjust-precision-for-schema', - 'jsonschema==4.14.0', - 'boto3==1.24.62', - 'backoff==2.1.2' + 'target-core==0.0.7', + 'boto3==1.24.81' ] ) diff --git a/src/target_s3_json/__init__.py b/src/target_s3_json/__init__.py new file mode 100644 index 0000000..724324a --- /dev/null +++ b/src/target_s3_json/__init__.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 + +__version__ = '2.0.0' + +# from pathlib import Path + +# Package imports +# from target._logger import get_logger +from .s3 import main + +# LOGGER = get_logger(Path(__file__).with_name('logging.conf')) + +CONFIG_PARAMS = { + 'local', + 's3_bucket', + 's3_key_prefix', + 'aws_profile', + 'aws_endpoint_url', + 'aws_access_key_id', + 'aws_secret_access_key', + 'aws_session_token', + 'encryption_type', + 'encryption_key' +} + + +if __name__ == '__main__': + main() diff --git a/src/target_s3_jsonl/logging.conf b/src/target_s3_json/logging.conf similarity index 100% rename from src/target_s3_jsonl/logging.conf rename to src/target_s3_json/logging.conf diff --git a/src/target_s3_json/s3.py b/src/target_s3_json/s3.py new file mode 100644 index 0000000..7b7a46e --- /dev/null +++ b/src/target_s3_json/s3.py @@ -0,0 +1,233 @@ +from functools import partial +from os import environ +from pathlib import Path +import sys +import argparse +import json +import gzip +import lzma +import backoff +from typing import Callable, Dict, Any, List, TextIO +from asyncio import to_thread + +from boto3.session import Session +from botocore.exceptions import ClientError +from botocore.client import BaseClient + +from target.stream import Loader +from target import file +from target.file import config_file, save_json + +from target._logger import get_logger +LOGGER = get_logger() + + +def _log_backoff_attempt(details: Dict) -> None: + LOGGER.info("Error detected communicating with Amazon, triggering backoff: %d try", details.get("tries")) + + +def _retry_pattern() -> Callable: + return backoff.on_exception( + backoff.expo, + ClientError, + max_tries=5, + on_backoff=_log_backoff_attempt, + factor=10) + + +def config_compression(config_default: Dict) -> Dict: + config: Dict[str, Any] = { + 'compression': 'none' + } | config_default + + if 
f"{config.get('compression')}".lower() == 'gzip': + config['open_func'] = gzip.compress + config['path_template'] = config['path_template'] + '.gz' + + elif f"{config.get('compression')}".lower() == 'lzma': + config['open_func'] = lzma.compress + config['path_template'] = config['path_template'] + '.xz' + + elif f"{config.get('compression')}".lower() in {'', 'none'}: + config['open_func'] = open + + else: + raise NotImplementedError( + "Compression type '{}' is not supported. " + "Expected: 'none', 'gzip', or 'lzma'" + .format(f"{config.get('compression')}".lower())) + + return config + + +@_retry_pattern() +def create_session(config: Dict) -> Session: + LOGGER.debug('Attempting to create AWS session') + + # NOTE: Get the required parameters from config file and/or environment variables + aws_access_key_id = config.get('aws_access_key_id') or environ.get('AWS_ACCESS_KEY_ID') + aws_secret_access_key = config.get('aws_secret_access_key') or environ.get('AWS_SECRET_ACCESS_KEY') + aws_session_token = config.get('aws_session_token') or environ.get('AWS_SESSION_TOKEN') + aws_profile = config.get('aws_profile') or environ.get('AWS_PROFILE') + aws_endpoint_url = config.get('aws_endpoint_url') + role_arn = config.get('role_arn') + + endpoint_params = {'endpoint_url': aws_endpoint_url} if aws_endpoint_url else {} + + # NOTE: AWS credentials based authentication + if aws_access_key_id and aws_secret_access_key: + aws_session: Session = Session( + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + aws_session_token=aws_session_token) + # NOTE: AWS Profile based authentication + else: + aws_session = Session(profile_name=aws_profile) + + # NOTE: AWS credentials based authentication assuming specific IAM role + if role_arn: + role_name = role_arn.split('/', 1)[1] + sts: BaseClient = aws_session.client('sts', **endpoint_params) + resp = sts.assume_role(RoleArn=role_arn, RoleSessionName=f'role-name={role_name}-profile={aws_profile}') + credentials = { + 'aws_access_key_id': resp['Credentials']['AccessKeyId'], + 'aws_secret_access_key': resp['Credentials']['SecretAccessKey'], + 'aws_session_token': resp['Credentials']['SessionToken'], + } + aws_session = Session(**credentials) + LOGGER.info(f'Creating s3 session with role {role_name}') + + return aws_session + + +def get_encryption_args(config: Dict[str, Any]) -> tuple: + if config.get('encryption_type', 'none').lower() == "none": + # NOTE: No encryption config (defaults to settings on the bucket): + encryption_desc = '' + encryption_args = {} + elif config.get('encryption_type', 'none').lower() == 'kms': + if config.get('encryption_key'): + encryption_desc = " using KMS encryption key ID '{}'".format(config.get('encryption_key')) + encryption_args = {'ExtraArgs': {'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': config.get('encryption_key')}} + else: + encryption_desc = ' using default KMS encryption' + encryption_args = {'ExtraArgs': {'ServerSideEncryption': 'aws:kms'}} + else: + raise NotImplementedError( + "Encryption type '{}' is not supported. 
" + "Expected: 'none' or 'KMS'" + .format(config.get('encryption_type'))) + return encryption_desc, encryption_args + + +@_retry_pattern() +async def put_object(config: Dict[str, Any], file_metadata: Dict, stream_data: List, client: BaseClient) -> None: + encryption_desc, encryption_args = get_encryption_args(config) + + LOGGER.info("Uploading %s to bucket %s at %s%s", + str(file_metadata['absolute_path']), config.get('s3_bucket'), file_metadata['relative_path'], encryption_desc) + + await to_thread(client.put_object, + Body=config['open_func']( # NOTE: stream compression with gzip.compress, lzma.compress + b''.join(json.dumps(record, ensure_ascii=False).encode('utf-8') + b'\n' for record in stream_data)), + Bucket=config.get('s3_bucket'), + Key=file_metadata['relative_path'], + **encryption_args.get('ExtraArgs', {})) + + +@_retry_pattern() +async def upload_file(config: Dict[str, Any], file_metadata: Dict) -> None: + if not config.get('local', False) and (file_metadata['absolute_path'].stat().st_size if file_metadata['absolute_path'].exists() else 0) > 0: + encryption_desc, encryption_args = get_encryption_args(config) + + await to_thread(config['client'].upload_file, + str(file_metadata['absolute_path']), + config.get('s3_bucket'), + file_metadata['relative_path'], + **encryption_args) + + LOGGER.info('%s uploaded to bucket %s at %s%s', + str(file_metadata['absolute_path']), config.get('s3_bucket'), file_metadata['relative_path'], encryption_desc) + + if config.get('remove_file', True): + # NOTE: Remove the local file(s) + file_metadata['absolute_path'].unlink() # missing_ok=False + + +def config_s3(config_default: Dict[str, Any], datetime_format: Dict[str, str] = { + 'date_time_format': ':%Y%m%dT%H%M%S', + 'date_format': ':%Y%m%d'}) -> Dict[str, Any]: + + if 'temp_dir' in config_default: + LOGGER.warning('`temp_dir` configuration option is deprecated and support will be removed in the future, use `work_dir` instead.') + config_default['work_dir'] = config_default.pop('temp_dir') + + if 'naming_convention' in config_default: + LOGGER.warning( + '`naming_convention` configuration option is deprecated and support will be removed in the future, use `path_template` instead.' 
+ ', `{timestamp}` key pattern is now replaced by `{date_time}`' + ', and `{date}` key pattern is now replaced by `{date_time:%Y%m%d}`') + config_default['path_template'] = config_default.pop('naming_convention') \ + .replace('{timestamp:', '{date_time:').replace('{date:', '{date_time:') \ + .replace('{timestamp}', '{date_time%s}' % datetime_format['date_time_format']) \ + .replace('{date}', '{date_time%s}' % datetime_format['date_format']) + + missing_params = {'s3_bucket'} - set(config_default.keys()) + if missing_params: + raise Exception(f'Config is missing required settings: {missing_params}') + + return config_default + + +def main(lines: TextIO = sys.stdin) -> None: + '''Main''' + parser = argparse.ArgumentParser() + parser.add_argument('-c', '--config', help='Config file', required=True) + args = parser.parse_args() + config = file.config_compression(config_file(config_s3(json.loads(Path(args.config).read_text(encoding='utf-8'))))) + save_s3: Callable = partial(save_json, post_processing=upload_file) + client: BaseClient = create_session(config).client('s3', **({'endpoint_url': config.get('aws_endpoint_url')} + if config.get('aws_endpoint_url') else {})) + + Loader(config | {'client': client}, writeline=save_s3).run(lines) + + +# from io import BytesIO +# from pyarrow import parquet +# from pyarrow.json import read_json +# import pandas +# def write_parquet(config: Dict[str, Any], file_meta: Dict, file_data: List) -> None: +# s3_fs: S3FileSystem = S3FileSystem(anon=False, s3_additional_kwargs={'ACL': 'bucket-owner-full-control'}, asynchronous=False) + +# # NOTE: Create parquet file using Pandas +# pandas.json_normalize(file_data).to_parquet(path = file_meta['absolute_path'], filesystem = s3_fs) +# # NOTE: Synchronous Alternative without df middle step, read_json not yet as efficient as pandas. Worth keeping on check. 
+# with BytesIO(b''.join(json.dumps(record, ensure_ascii=False).encode('utf-8') + b'\n' for record in file_data)) as data: +# parquet.write_table(read_json(data), file_meta['relative_path'], filesystem=s3_fs) + + +# NOTE: https://github.com/aws/aws-cli/issues/3784 +# async def search(client: BaseClient, bucket: str, prefix: str, regex_path: str) -> Generator: +# ''' +# perform a flat listing of the files within a bucket +# ''' +# regex_pattern: Pattern = compile(regex_path) + +# paginator = client.get_paginator('list_objects_v2') +# files_metadata = paginator.paginate(Bucket=bucket, Prefix=prefix) +# for file_path in map(lambda x: x.get('Key', ''), await to_thread(files_metadata.search, 'Contents')): +# if match(regex_pattern, file_path): +# yield file_path + + +# async def sync( +# start_date: datetime = eval(config.get('date_time', 'datetime.now().astimezone(timezone.utc)')) +# client: BaseClient, semaphore: Semaphore, source_bucket: str, source_key: str, target_bucket: str, target_key: str, overwrite: bool = False) -> None: +# await gather(*[shield(search(client, semaphore, source_bucket, source_key, target_bucket, target_root + source_key.removeprefix(source_root), overwrite)) +# async for source_key in search(client, source_bucket, source_root, source_regexp)]) +# async with semaphore: +# if not overwrite and 'Contents' in client.list_objects_v2(Bucket=target_bucket, Prefix=target_key, MaxKeys=1): +# LOGGER.debug(f'S3 Bucket Sync - "s3://{target_bucket}/{target_key}" already exists.') +# else: +# await to_thread(client.copy, {'Bucket': source_bucket, 'Key': source_key}, target_bucket, target_key) +# LOGGER.info(f'S3 Bucket Sync - "s3://{source_bucket}/{source_key}" to "s3://{target_bucket}/{target_key}" copy completed.') diff --git a/src/target_s3_jsonl/__init__.py b/src/target_s3_jsonl/__init__.py deleted file mode 100644 index 3e464d5..0000000 --- a/src/target_s3_jsonl/__init__.py +++ /dev/null @@ -1,307 +0,0 @@ -#!/usr/bin/env python3 - -__version__ = '1.2.2' - -import argparse -import gzip -import lzma -import json -from pathlib import Path -import sys -import tempfile -import datetime -from uuid import uuid4 - -from jsonschema import Draft4Validator, FormatChecker -from adjust_precision_for_schema import adjust_decimal_precision_for_schema - -from target_s3_jsonl import s3 -from target_s3_jsonl.logger import get_logger - -LOGGER = get_logger() - - -def add_metadata_columns_to_schema(schema_message): - '''Metadata _sdc columns according to the stitch documentation at - https://www.stitchdata.com/docs/data-structure/integration-schemas#sdc-columns - - Metadata columns gives information about data injections - ''' - schema_message['schema']['properties'].update( - _sdc_batched_at={'type': ['null', 'string'], 'format': 'date-time'}, - _sdc_deleted_at={'type': ['null', 'string']}, - _sdc_extracted_at={'type': ['null', 'string'], 'format': 'date-time'}, - _sdc_primary_key={'type': ['null', 'string']}, - _sdc_received_at={'type': ['null', 'string'], 'format': 'date-time'}, - _sdc_sequence={'type': ['integer']}, - _sdc_table_version={'type': ['null', 'string']}) - - return schema_message - - -def add_metadata_values_to_record(record_message, schema_message, timestamp): - '''Populate metadata _sdc columns from incoming record message - The location of the required attributes are fixed in the stream - ''' - utcnow = timestamp.astimezone(datetime.timezone.utc).replace(tzinfo=None).isoformat() - record_message['record'].update( - _sdc_batched_at=utcnow, - 
_sdc_deleted_at=record_message.get('record', {}).get('_sdc_deleted_at'), - _sdc_extracted_at=record_message.get('time_extracted'), - _sdc_primary_key=schema_message.get('key_properties'), - _sdc_received_at=utcnow, - _sdc_sequence=int(timestamp.timestamp() * 1e3), - _sdc_table_version=record_message.get('version')) - - return record_message['record'] - - -def remove_metadata_values_from_record(record_message): - '''Removes every metadata _sdc column from a given record message - ''' - for key in { - '_sdc_batched_at', - '_sdc_deleted_at', - '_sdc_extracted_at', - '_sdc_primary_key', - '_sdc_received_at', - '_sdc_sequence', - '_sdc_table_version' - }: - - record_message['record'].pop(key, None) - - return record_message['record'] - - -def emit_state(state): - if state is not None: - line = json.dumps(state) - LOGGER.debug('Emitting state {}'.format(line)) - sys.stdout.write('{}\n'.format(line)) - sys.stdout.flush() - - -def get_target_key(stream, config, timestamp=None, prefix=None): - '''Creates and returns an S3 key for the stream''' - - # NOTE: Replace dynamic tokens - key = config.get('naming_convention').format( - stream=stream, - timestamp=timestamp, - date=timestamp, - uuid=uuid4()) - - return str(Path(key).parent / f'{prefix}{Path(key).name}') if prefix else key - - -def save_jsonl_file(file_data, open_func): - if any(file_data['file_data']): - with open_func(file_data['file_name'], 'at', encoding='utf-8') as output_file: - output_file.writelines(file_data['file_data']) - - del file_data['file_data'][:] - LOGGER.debug("'{}' file saved using open_func '{}'".format(file_data['file_name'], open_func.__name__)) - - -def persist_lines(messages, config, save_records=save_jsonl_file): - state = None - schemas = {} - key_properties = {} - validators = {} - - timezone = datetime.timezone(datetime.timedelta(hours=config.get('timezone_offset'))) if config.get('timezone_offset') is not None else None - - # NOTE: Use the system specific temp directory if no custom temp_dir provided - temp_dir = Path(config.get('temp_dir', tempfile.gettempdir())).expanduser() - - # NOTE: Create temp_dir if not exists - temp_dir.mkdir(parents=True, exist_ok=True) - - file_data = {} - now = datetime.datetime.now(timezone) - - for message in messages: - try: - o = json.loads(message) - except json.decoder.JSONDecodeError: - LOGGER.error('Unable to parse:\n{}'.format(message)) - raise - message_type = o['type'] - if message_type == 'RECORD': - if 'stream' not in o: - raise Exception("Line is missing required key 'stream': {}".format(message)) - stream = o['stream'] - if stream not in schemas: - raise Exception('A record for stream {} was encountered before a corresponding schema'.format(stream)) - - record_to_load = o['record'] - # NOTE: Validate record - validators[stream].validate(record_to_load) - - if config.get('add_metadata_columns'): - record_to_load = add_metadata_values_to_record(o, {}, now) - else: - record_to_load = remove_metadata_values_from_record(o) - - file_data[stream]['file_data'].append(json.dumps(record_to_load) + '\n') - - # NOTE: write the lines into the temporary file when received data over 64Mb default memory buffer - if sys.getsizeof(file_data[stream]['file_data']) > config.get('memory_buffer'): - save_records(file_data[stream], config.get('open_func')) - - state = None - elif message_type == 'STATE': - LOGGER.debug('Setting state to {}'.format(o['value'])) - state = o['value'] - elif message_type == 'SCHEMA': - if 'stream' not in o: - raise Exception("Line is missing required key 'stream': 
{}".format(message)) - stream = o['stream'] - schemas[stream] = o['schema'] - - schemas[stream] = add_metadata_columns_to_schema(o) if config.get('add_metadata_columns') else o - - adjust_decimal_precision_for_schema(schemas[stream]) - - # NOTE: prevent exception *** jsonschema.exceptions.UnknownType: Unknown type 'SCHEMA' for validator. - # 'type' is a key word for jsonschema validator which is different from `{'type': 'SCHEMA'}` as the message type. - schemas[stream].pop('type') - validators[stream] = Draft4Validator(schemas[stream], format_checker=FormatChecker()) - - if 'key_properties' not in o: - raise Exception('key_properties field is required') - key_properties[stream] = o['key_properties'] - LOGGER.debug('Setting schema for {}'.format(stream)) - - # NOTE: get the s3 file key. Persistent array data storage. - if stream not in file_data: - file_data[stream] = { - 'target_key': get_target_key( - stream=stream, - config=config, - timestamp=now, - prefix=config.get('s3_key_prefix', '')), - 'file_name': temp_dir / config['naming_convention_default'].format(stream=stream, timestamp=now), - 'file_data': []} - - elif message_type == 'ACTIVATE_VERSION': - LOGGER.debug('ACTIVATE_VERSION {}'.format(message)) - else: - LOGGER.warning('Unknown message type "{}" in message "{}"'.format(o['type'], o)) - - for _, file_info in file_data.items(): - save_records(file_info, config.get('open_func')) - - return state, file_data - - -def config_file(config_path): - datetime_format = { - 'timestamp_format': '%Y%m%dT%H%M%S', - 'date_format': '%Y%m%d' - } - - naming_convention_default = '{stream}-{timestamp}.json' \ - .replace('{timestamp}', '{timestamp:' + datetime_format['timestamp_format'] + '}') \ - .replace('{date}', '{date:' + datetime_format['date_format'] + '}') - - config = { - 'compression': 'none', - 'naming_convention': naming_convention_default, - 'memory_buffer': 64e6 - } - - with open(config_path) as input_file: - config.update(json.load(input_file)) - - missing_params = {'s3_bucket'} - set(config.keys()) - if missing_params: - raise Exception(f'Config is missing required settings: {missing_params}') - - unknown_params = set(config.keys()) - { - 'add_metadata_columns', - 'aws_access_key_id', - 'aws_secret_access_key', - 'aws_session_token', - 'aws_endpoint_url', - 'aws_profile', - 'role_arn', - 's3_bucket', - 's3_key_prefix', - 'encryption_type', - 'encryption_key', - 'compression', - 'naming_convention', - 'timezone_offset', - 'temp_dir', - 'local', - 'memory_buffer' - } - - if unknown_params: - raise Exception(f'Config unknown settings: {unknown_params}') - - config['naming_convention_default'] = naming_convention_default - config['naming_convention'] = config['naming_convention'] \ - .replace('{timestamp}', '{timestamp:' + datetime_format['timestamp_format'] + '}') \ - .replace('{date}', '{date:' + datetime_format['date_format'] + '}') - - if f"{config.get('compression')}".lower() == 'gzip': - config['open_func'] = gzip.open - config['naming_convention_default'] = config['naming_convention_default'] + '.gz' - config['naming_convention'] = config['naming_convention'] + '.gz' - - elif f"{config.get('compression')}".lower() == 'lzma': - config['open_func'] = lzma.open - config['naming_convention_default'] = config['naming_convention_default'] + '.xz' - config['naming_convention'] = config['naming_convention'] + '.xz' - - elif f"{config.get('compression')}".lower() == 'none': - config['open_func'] = open - - else: - raise NotImplementedError( - "Compression type '{}' is not supported. 
" - "Expected: 'none', 'gzip', or 'lzma'" - .format(f"{config.get('compression')}".lower())) - - return config - - -def upload_files(file_data, config): - if not config.get('local', False): - s3_client = s3.create_client(config) - for stream, file_info in file_data.items(): - if file_info['file_name'].exists(): - s3.upload_file( - s3_client, - str(file_info['file_name']), - config.get('s3_bucket'), - file_info['target_key'], - encryption_type=config.get('encryption_type'), - encryption_key=config.get('encryption_key')) - LOGGER.debug("{} file {} uploaded to {}".format(stream, file_info['target_key'], config.get('s3_bucket'))) - - # NOTE: Remove the local file(s) - file_info['file_name'].unlink() - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument('-c', '--config', help='Config file', required=True) - args = parser.parse_args() - - config = config_file(args.config) - - state, file_data = persist_lines(sys.stdin, config) - - # NOTE: Upload created files to S3 - upload_files(file_data, config) - - emit_state(state) - LOGGER.debug('Exiting normally') - - -if __name__ == '__main__': # pragma: no cover - main() diff --git a/src/target_s3_jsonl/logger.py b/src/target_s3_jsonl/logger.py deleted file mode 100644 index 30c250d..0000000 --- a/src/target_s3_jsonl/logger.py +++ /dev/null @@ -1,14 +0,0 @@ -from pathlib import Path -from logging import config, getLogger - - -def get_logger(): - '''Return a Logger instance appropriate for using in a Tap or a Target.''' - # See - # https://docs.python.org/3.5/library/logging.config.html#logging.config.fileConfig - # for a discussion of why or why not to set disable_existing_loggers - # to False. The long and short of it is that if you don't set it to - # False it ruins external module's abilities to use the logging - # facility. 
- config.fileConfig(Path(__file__).parent / 'logging.conf', disable_existing_loggers=False) - return getLogger() diff --git a/src/target_s3_jsonl/s3.py b/src/target_s3_jsonl/s3.py deleted file mode 100644 index cc92da0..0000000 --- a/src/target_s3_jsonl/s3.py +++ /dev/null @@ -1,88 +0,0 @@ -#!/usr/bin/env python3 -import os -import backoff -import boto3 -from botocore.exceptions import ClientError - -from target_s3_jsonl.logger import get_logger - -LOGGER = get_logger() - - -def retry_pattern(): - return backoff.on_exception( - backoff.expo, - ClientError, - max_tries=5, - on_backoff=log_backoff_attempt, - factor=10) - - -def log_backoff_attempt(details): - LOGGER.info("Error detected communicating with Amazon, triggering backoff: %d try", details.get("tries")) - - -@retry_pattern() -def create_client(config): - LOGGER.info("Attempting to create AWS session") - - # Get the required parameters from config file and/or environment variables - aws_access_key_id = config.get('aws_access_key_id') or os.environ.get('AWS_ACCESS_KEY_ID') - aws_secret_access_key = config.get('aws_secret_access_key') or os.environ.get('AWS_SECRET_ACCESS_KEY') - aws_session_token = config.get('aws_session_token') or os.environ.get('AWS_SESSION_TOKEN') - aws_profile = config.get('aws_profile') or os.environ.get('AWS_PROFILE') - aws_endpoint_url = config.get('aws_endpoint_url') - role_arn = config.get('role_arn') - - endpoint_params = {'endpoint_url': aws_endpoint_url} if aws_endpoint_url else {} - - # AWS credentials based authentication - if aws_access_key_id and aws_secret_access_key: - aws_session = boto3.session.Session( - aws_access_key_id=aws_access_key_id, - aws_secret_access_key=aws_secret_access_key, - aws_session_token=aws_session_token) - # AWS Profile based authentication - else: - aws_session = boto3.session.Session(profile_name=aws_profile) - - # config specifies a particular role to assume - # we create a session & s3-client with this role - if role_arn: - role_name = role_arn.split('/', 1)[1] - sts = aws_session.client('sts', **endpoint_params) - resp = sts.assume_role(RoleArn=role_arn, RoleSessionName=f'role-name={role_name}-profile={aws_profile}') - credentials = { - "aws_access_key_id": resp["Credentials"]["AccessKeyId"], - "aws_secret_access_key": resp["Credentials"]["SecretAccessKey"], - "aws_session_token": resp["Credentials"]["SessionToken"], - } - aws_session = boto3.Session(**credentials) - LOGGER.info(f"Creating s3 client with role {role_name}") - return aws_session.client('s3', **endpoint_params) - - -# pylint: disable=too-many-arguments -@retry_pattern() -def upload_file(s3_client, filename, bucket, s3_key, - encryption_type=None, encryption_key=None): - - if encryption_type is None or encryption_type.lower() == "none": - # No encryption config (defaults to settings on the bucket): - encryption_desc = "" - encryption_args = None - elif encryption_type.lower() == "kms": - if encryption_key: - encryption_desc = " using KMS encryption key ID '{}'".format(encryption_key) - encryption_args = {"ServerSideEncryption": "aws:kms", "SSEKMSKeyId": encryption_key} - else: - encryption_desc = " using default KMS encryption" - encryption_args = {"ServerSideEncryption": "aws:kms"} - else: - raise NotImplementedError( - "Encryption type '{}' is not supported. 
" - "Expected: 'none' or 'KMS'" - .format(encryption_type)) - - LOGGER.info("Uploading {} to bucket {} at {}{}".format(filename, bucket, s3_key, encryption_desc)) - s3_client.upload_file(filename, bucket, s3_key, ExtraArgs=encryption_args) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..0acf26b --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,146 @@ +from io import BufferedReader, TextIOWrapper +import sys +import datetime +from datetime import datetime as dt, timezone, tzinfo +from pathlib import Path +import argparse +import json + +from pytest import fixture + + +def clear_dir(dir_path): + for path in dir_path.iterdir(): + path.unlink() + dir_path.rmdir() + + +@fixture +def patch_datetime(monkeypatch): + + class mydatetime(dt): + @classmethod + def now(cls, tz: tzinfo = None): + # NOTE: timestamp dt.fromtimestamp(1628663978.321056, tz=timezone.utc) + d: dt = dt.strptime('2022-04-29 07:39:38.321056+01:00', '%Y-%m-%d %H:%M:%S.%f%z') + return d.astimezone(tz) if tz else d + + @classmethod + def utcnow(cls): + return cls.now(timezone.utc).replace(tzinfo=None) + + monkeypatch.setattr(datetime, 'datetime', mydatetime) + + +@fixture # (scope='session') +def temp_path(tmpdir_factory): + + return tmpdir_factory.mktemp('root_dir') + + +@fixture +def config_raw(temp_path): + '''Use custom configuration set''' + + return { + 'add_metadata_columns': False, + 'work_dir': f'{temp_path}/tests/output', + 'memory_buffer': 2000000, + 'compression': 'none', + 'timezone_offset': 0, + 'path_template': '{stream}-{date_time}.json' + } + + +@fixture +def config(patch_datetime, config_raw): + '''Use custom configuration set''' + + return config_raw | { + # 'date_time': dt.strptime('2021-08-11 06:39:38.321056+00:00', '%Y-%m-%d %H:%M:%S.%f%z'), + 'date_time': datetime.datetime.now(), + 'work_path': Path(config_raw['work_dir']), + 'open_func': open + } + + +@fixture +def file_metadata(temp_path): + '''Use expected metadata''' + + return { + 'tap_dummy_test-test_table_one': { + 'part': 1, + 'path': { + 1: {'relative_path': 'tap_dummy_test-test_table_one-2022-04-29 06:39:38.321056+00:00.json', + 'absolute_path': Path(f'{temp_path}/tests/output/tap_dummy_test-test_table_one-2022-04-29 06:39:38.321056+00:00.json')}}, + 'file_data': []}, + 'tap_dummy_test-test_table_two': { + 'part': 1, + 'path': { + 1: {'relative_path': 'tap_dummy_test-test_table_two-2022-04-29 06:39:38.321056+00:00.json', + 'absolute_path': Path(f'{temp_path}/tests/output/tap_dummy_test-test_table_two-2022-04-29 06:39:38.321056+00:00.json')}}, + 'file_data': []}, + 'tap_dummy_test-test_table_three': { + 'part': 1, + 'path': { + 1: {'relative_path': 'tap_dummy_test-test_table_three-2022-04-29 06:39:38.321056+00:00.json', + 'absolute_path': Path(f'{temp_path}/tests/output/tap_dummy_test-test_table_three-2022-04-29 06:39:38.321056+00:00.json')}}, + 'file_data': [ + {"c_pk": 1, "c_varchar": "1", "c_int": 1, "c_time": "04:00:00"}, + {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_time": "07:15:00"}, + {"c_pk": 3, "c_varchar": "3", "c_int": 3, "c_time": "23:00:03"}]}} + + +@fixture +def state(): + '''Use expected state''' + + return { + 'currently_syncing': None, + 'bookmarks': { + 'tap_dummy_test-test_table_one': {'initial_full_table_complete': True}, + 'tap_dummy_test-test_table_two': {'initial_full_table_complete': True}, + 'tap_dummy_test-test_table_three': {'initial_full_table_complete': True}}} + + +@fixture +def input_multi_stream_data(): + '''messages-with-three-streams.json''' + + return Path('tests', 
'resources', 'messages-with-three-streams.json').read_text(encoding='utf-8')[:-1].split('\n') + + +@fixture +def patch_argument_parser(monkeypatch, temp_path, config_raw): + + temp_file = temp_path.join('config.json') + temp_file.write_text(json.dumps(config_raw), encoding='utf-8') + + class argument_parser: + + def __init__(self): + self.config = str(temp_file) + + def add_argument(self, x, y, help='Dummy config file', required=False): + pass + + def parse_args(self): + return self + + monkeypatch.setattr(argparse, 'ArgumentParser', argument_parser) + + +@fixture # (scope='module') +def patch_sys_stdin(monkeypatch): + + # Get a file-like object in binary mode + input_file = Path('tests', 'resources', 'messages-with-three-streams.json').open('rb') + # Wrap it in a buffered reader with a 4096 byte buffer + # This can also be used to read later from the buffer independently without consuming the IOReader + buffered = BufferedReader(input_file, buffer_size=4096) + # Could then first_bytes = buffered.peek(2048) + # Wrap the buffered reader in a text io wrapper that can decode to unicode + decoded = TextIOWrapper(buffered, encoding='utf-8') + + monkeypatch.setattr(sys, 'stdin', decoded) diff --git a/tests/resources/config.json b/tests/resources/config.json index 0b70114..0ea01ce 100644 --- a/tests/resources/config.json +++ b/tests/resources/config.json @@ -3,9 +3,9 @@ "aws_access_key_id": "ACCESS-KEY", "aws_secret_access_key": "SECRET", "s3_bucket": "BUCKET", - "temp_dir": "tests/output", + "work_dir": "tests/output", "memory_buffer": 2000000, "compression": "none", "timezone_offset": 0, - "naming_convention": "{stream}-{timestamp:%Y%m%dT%H%M%S}.jsonl" + "path_template": "{stream}-{date_time}.jsonl" } diff --git a/tests/resources/config_assume_role.json b/tests/resources/config_assume_role.json deleted file mode 100644 index c07b07a..0000000 --- a/tests/resources/config_assume_role.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "add_metadata_columns": false, - "aws_access_key_id": "ACCESS-KEY", - "aws_secret_access_key": "SECRET", - "s3_bucket": "BUCKET", - "temp_dir": "tests/output", - "memory_buffer": 2000000, - "compression": "none", - "timezone_offset": 0, - "naming_convention": "{stream}-{timestamp:%Y%m%dT%H%M%S}.jsonl", - "role_arn": "arn:aws:iam::123456789012:role/TestAssumeRole" - -} diff --git a/tests/resources/config_compression_dummy.json b/tests/resources/config_compression_dummy.json deleted file mode 100644 index 81e08ff..0000000 --- a/tests/resources/config_compression_dummy.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "s3_bucket": "BUCKET", - "compression": "dummy" -} diff --git a/tests/resources/config_compression_gzip.json b/tests/resources/config_compression_gzip.json deleted file mode 100644 index c6dcc8b..0000000 --- a/tests/resources/config_compression_gzip.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "s3_bucket": "BUCKET", - "compression": "gzip" -} diff --git a/tests/resources/config_compression_lzma.json b/tests/resources/config_compression_lzma.json deleted file mode 100644 index a30a11a..0000000 --- a/tests/resources/config_compression_lzma.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "s3_bucket": "BUCKET", - "compression": "lzma" -} diff --git a/tests/resources/config_local.json b/tests/resources/config_local.json deleted file mode 100644 index b4c1fe4..0000000 --- a/tests/resources/config_local.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "local": true, - "add_metadata_columns": false, - "aws_access_key_id": "ACCESS-KEY", - "aws_secret_access_key": "SECRET", - "s3_bucket": "BUCKET", - 
"temp_dir": "tests/output", - "memory_buffer": 2000000, - "compression": "none", - "timezone_offset": 0, - "naming_convention": "{stream}-{timestamp:%Y%m%dT%H%M%S}.jsonl" -} diff --git a/tests/resources/config_naked.json b/tests/resources/config_naked.json deleted file mode 100644 index 7624804..0000000 --- a/tests/resources/config_naked.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "s3_bucket": "BUCKET" -} diff --git a/tests/resources/config_no_bucket.json b/tests/resources/config_no_bucket.json deleted file mode 100644 index 6fb3214..0000000 --- a/tests/resources/config_no_bucket.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "local": true, - "add_metadata_columns": false, - "aws_access_key_id": "ACCESS-KEY", - "aws_secret_access_key": "SECRET", - "temp_dir": "tests/output", - "memory_buffer": 2000000, - "compression": "none", - "timezone_offset": 0, - "naming_convention": "{stream}-{timestamp:%Y%m%dT%H%M%S}.jsonl" -} diff --git a/tests/resources/config_unknown_param.json b/tests/resources/config_unknown_param.json deleted file mode 100644 index cda8ed3..0000000 --- a/tests/resources/config_unknown_param.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "s3_bucket": "BUCKET", - "dummy": "dummy" -} diff --git a/tests/resources/invalid-json.json b/tests/resources/invalid-json.json deleted file mode 100644 index e3a0707..0000000 --- a/tests/resources/invalid-json.json +++ /dev/null @@ -1,4 +0,0 @@ -{"type": "STATE", "value": {"currently_syncing": "tap_dummy_test-test_table_one"}} -{"type": "SCHEMA", "stream": "tap_dummy_test-test_table_one", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}}, "type": "object"}, "key_properties": ["c_pk"]} -THIS IS A TEST INPUT FROM A TAP WITH A LINE WITH INVALID JSON -{"type": "ACTIVATE_VERSION", "stream": "tap_dummy_test-test_table_one", "version": 1} diff --git a/tests/resources/invalid-message-order.json b/tests/resources/invalid-message-order.json deleted file mode 100644 index 8f67d1f..0000000 --- a/tests/resources/invalid-message-order.json +++ /dev/null @@ -1,3 +0,0 @@ -{"type": "SCHEMA", "stream": "tap_dummy_test-test_table_one", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}}, "type": "object"}, "key_properties": ["c_pk"]} -{"type": "RECORD", "stream": "tap_dummy_test-test_table_one", "record": {"c_pk": 1, "c_varchar": "1", "c_int": 1}, "version": 1, "time_extracted": "2019-01-31T15:51:47.465408Z"} -{"type": "RECORD", "stream": "tap_dummy_test-test_table_two", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_date": "2019-02-10 02:00:00"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"} \ No newline at end of file diff --git a/tests/test_init.py b/tests/test_init.py deleted file mode 100644 index ca51e66..0000000 --- a/tests/test_init.py +++ /dev/null @@ -1,501 +0,0 @@ -'''Tests for the target_s3_jsonl.main module''' -# Standard library imports -from copy import deepcopy -from datetime import datetime as dt, timezone - -# Third party imports -from pytest import fixture, raises -import 
boto3 -from moto import mock_s3 - -# Package imports -from target_s3_jsonl import ( - sys, - datetime, - argparse, - gzip, - lzma, - json, - Path, - add_metadata_columns_to_schema, - add_metadata_values_to_record, - remove_metadata_values_from_record, - emit_state, - get_target_key, - save_jsonl_file, - upload_files, - persist_lines, - config_file, - main, -) - - -@fixture -def patch_datetime(monkeypatch): - - class mydatetime: - @classmethod - def utcnow(cls): - return dt.fromtimestamp(1628663978.321056, tz=timezone.utc).replace(tzinfo=None) - - @classmethod - def now(cls, x=timezone.utc, tz=None): - return cls.utcnow() - - @classmethod - def utcfromtimestamp(cls, x): - return cls.utcnow() - - @classmethod - def fromtimestamp(cls, x, format): - return cls.utcnow() - - @classmethod - def strptime(cls, x, format): - return dt.strptime(x, format) - - monkeypatch.setattr(datetime, 'datetime', mydatetime) - - -@fixture -def patch_argument_parser(monkeypatch): - - class argument_parser: - - def __init__(self): - self.config = str(Path('tests', 'resources', 'config.json')) - - def add_argument(self, x, y, help='Dummy config file', required=False): - pass - - def parse_args(self): - return self - - monkeypatch.setattr(argparse, 'ArgumentParser', argument_parser) - - -@fixture -def config(): - '''Use custom configuration set''' - - return { - 'add_metadata_columns': False, - 'aws_access_key_id': 'ACCESS-KEY', - 'aws_secret_access_key': 'SECRET', - 's3_bucket': 'BUCKET', - 'temp_dir': 'tests/output', - 'memory_buffer': 2000000, - 'compression': 'none', - 'timezone_offset': 0, - 'naming_convention': '{stream}-{timestamp:%Y%m%dT%H%M%S}.jsonl', - 'naming_convention_default': '{stream}-{timestamp:%Y%m%dT%H%M%S}.json', - 'open_func': open - } - - -@fixture -def input_data(): - '''Use custom parameters set''' - - with open(Path('tests', 'resources', 'messages.json'), 'r', encoding='utf-8') as input_file: - return [item for item in input_file] - - -@fixture -def input_multi_stream_data(): - '''Use custom parameters set''' - - with open(Path('tests', 'resources', 'messages-with-three-streams.json'), 'r', encoding='utf-8') as input_file: - return [item for item in input_file] - - -@fixture -def invalid_row_data(): - '''Use custom parameters set''' - - with open(Path('tests', 'resources', 'invalid-json.json'), 'r', encoding='utf-8') as input_file: - return [item for item in input_file] - - -@fixture -def invalid_order_data(): - '''Use custom parameters set''' - - with open(Path('tests', 'resources', 'invalid-message-order.json'), 'r', encoding='utf-8') as input_file: - return [item for item in input_file] - - -@fixture -def state(): - '''Use expected state''' - - return { - 'currently_syncing': None, - 'bookmarks': { - 'tap_dummy_test-test_table_one': {'initial_full_table_complete': True}, - 'tap_dummy_test-test_table_two': {'initial_full_table_complete': True}, - 'tap_dummy_test-test_table_three': {'initial_full_table_complete': True}}} - - -@fixture -def file_metadata(): - '''Use expected metadata''' - - return { - 'tap_dummy_test-test_table_one': { - 'target_key': 'tap_dummy_test-test_table_one-20210811T063938.json', - 'file_name': Path('tests/output/tap_dummy_test-test_table_one-20210811T063938.json'), - 'file_data': []}, - 'tap_dummy_test-test_table_two': { - 'target_key': 'tap_dummy_test-test_table_two-20210811T063938.json', - 'file_name': Path('tests/output/tap_dummy_test-test_table_two-20210811T063938.json'), - 'file_data': []}, - 'tap_dummy_test-test_table_three': { - 'target_key': 
'tap_dummy_test-test_table_three-20210811T063938.json', - 'file_name': Path('tests/output/tap_dummy_test-test_table_three-20210811T063938.json'), - 'file_data': [ - '{"c_pk": 1, "c_varchar": "1", "c_int": 1, "c_time": "04:00:00"}\n', - '{"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_time": "07:15:00"}\n', - '{"c_pk": 3, "c_varchar": "3", "c_int": 3, "c_time": "23:00:03"}\n']}} - - -def clear_dir(dir_path): - for path in dir_path.iterdir(): - path.unlink() - dir_path.rmdir() - - -def test_emit_state(capsys, state): - '''TEST : simple emit_state call''' - - emit_state(state) - captured = capsys.readouterr() - assert captured.out == json.dumps(state) + '\n' - - emit_state(None) - captured = capsys.readouterr() - assert captured.out == '' - - -def test_add_metadata_columns_to_schema(): - '''TEST : simple add_metadata_columns_to_schema call''' - - assert add_metadata_columns_to_schema({ - "type": "SCHEMA", - "stream": "tap_dummy_test-test_table_one", - "schema": { - "properties": { - "c_pk": { - "inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, - "type": ["null", "integer"]}, - "c_varchar": { - "inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, - "c_int": { - "inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, - "type": ["null", "integer"]}}, - "type": "object"}, - "key_properties": ["c_pk"]}) \ - == { - 'type': 'SCHEMA', - 'stream': 'tap_dummy_test-test_table_one', - 'schema': { - 'properties': { - 'c_pk': { - 'inclusion': 'automatic', 'minimum': -2147483648, 'maximum': 2147483647, - 'type': ['null', 'integer']}, - 'c_varchar': { - 'inclusion': 'available', 'maxLength': 16, 'type': ['null', 'string']}, - 'c_int': { - 'inclusion': 'available', 'minimum': -2147483648, 'maximum': 2147483647, - 'type': ['null', 'integer']}, - '_sdc_batched_at': { - 'type': ['null', 'string'], 'format': 'date-time'}, - '_sdc_deleted_at': {'type': ['null', 'string']}, - '_sdc_extracted_at': {'type': ['null', 'string'], 'format': 'date-time'}, - '_sdc_primary_key': {'type': ['null', 'string']}, - '_sdc_received_at': {'type': ['null', 'string'], 'format': 'date-time'}, - '_sdc_sequence': {'type': ['integer']}, - '_sdc_table_version': {'type': ['null', 'string']}}, - 'type': 'object'}, - 'key_properties': ['c_pk']} - - -def test_add_metadata_values_to_record(): - '''TEST : simple add_metadata_values_to_record call''' - - assert add_metadata_values_to_record({ - "type": "RECORD", - "stream": "tap_dummy_test-test_table_one", - "record": { - "c_pk": 1, "c_varchar": "1", "c_int": 1, "c_float": 1.99}, - "version": 1, "time_extracted": "2019-01-31T15:51:47.465408Z"}, {}, dt.fromtimestamp(1628713605.321056, tz=timezone.utc)) \ - == { - 'c_pk': 1, 'c_varchar': '1', 'c_int': 1, 'c_float': 1.99, - '_sdc_batched_at': '2021-08-11T20:26:45.321056', - '_sdc_deleted_at': None, - '_sdc_extracted_at': '2019-01-31T15:51:47.465408Z', - '_sdc_primary_key': None, - '_sdc_received_at': '2021-08-11T20:26:45.321056', - '_sdc_sequence': 1628713605321, - '_sdc_table_version': 1} - - -def test_remove_metadata_values_from_record(): - '''TEST : simple remove_metadata_values_from_record call''' - - assert remove_metadata_values_from_record({ - "type": "RECORD", - "stream": "tap_dummy_test-test_table_one", - "record": { - "c_pk": 1, "c_varchar": "1", "c_int": 1, "c_float": 1.99, - '_sdc_batched_at': '2021-08-11T21:16:22.420939', - '_sdc_deleted_at': None, - '_sdc_extracted_at': '2019-01-31T15:51:47.465408Z', - '_sdc_primary_key': None, - '_sdc_received_at': '2021-08-11T21:16:22.420939', 
- '_sdc_sequence': 1628712982421, - '_sdc_table_version': 1}, - "version": 1, "time_extracted": "2019-01-31T15:51:47.465408Z"}) \ - == { - 'c_pk': 1, 'c_varchar': '1', 'c_int': 1, 'c_float': 1.99} - - -def test_get_target_key(config): - '''TEST : simple get_target_key call''' - - timestamp = dt.strptime('20220407_062544', '%Y%m%d_%H%M%S') - assert get_target_key('dummy_stream', config, timestamp=timestamp, prefix='xxx_') == 'xxx_dummy_stream-20220407T062544.jsonl' - - config.update(naming_convention='{date:%Y-%m-%d}{stream:_>8}_{timestamp:%Y%m%d_%H%M%S}.jsonl') - assert get_target_key('my', config, timestamp=timestamp) == '2022-04-07______my_20220407_062544.jsonl' - - -def test_save_file(config, file_metadata): - '''TEST : simple save_jsonl_file call''' - Path(config['temp_dir']).mkdir(parents=True, exist_ok=True) - - # NOTE: test compression saved file - for open_func, extension in {open: '', gzip.open: '.gz', lzma.open: '.xz'}.items(): - file_metadata_copy = deepcopy(file_metadata) - for _, file_info in file_metadata_copy.items(): - file_info['file_name'] = file_info['file_name'].parent / f"{file_info['file_name'].name}{extension}" - save_jsonl_file(file_info, open_func) - - assert not file_metadata_copy['tap_dummy_test-test_table_one']['file_name'].exists() - assert not file_metadata_copy['tap_dummy_test-test_table_two']['file_name'].exists() - assert file_metadata_copy['tap_dummy_test-test_table_three']['file_name'].exists() - - with open_func(file_metadata_copy['tap_dummy_test-test_table_three']['file_name'], 'rt', encoding='utf-8') as input_file: - assert [item for item in input_file] == file_metadata['tap_dummy_test-test_table_three']['file_data'] - - del file_metadata_copy - - clear_dir(Path(config['temp_dir'])) - - -@mock_s3 -def test_upload_files(config, file_metadata): - '''TEST : simple upload_files call''' - - Path(config['temp_dir']).mkdir(parents=True, exist_ok=True) - for _, file_info in file_metadata.items(): - save_jsonl_file(file_info, open) - - conn = boto3.resource('s3', region_name='us-east-1') - conn.create_bucket(Bucket=config['s3_bucket']) - - upload_files(file_metadata, config) - - assert not file_metadata['tap_dummy_test-test_table_three']['file_name'].exists() - - clear_dir(Path(config['temp_dir'])) - - -def test_persist_lines(caplog, config, input_data, input_multi_stream_data, invalid_row_data, invalid_order_data, state, file_metadata): - '''TEST : simple persist_lines call''' - output_state, output_file_metadata = persist_lines(input_multi_stream_data, config) - file_paths = set(path for path in Path(config['temp_dir']).iterdir()) - - assert output_state == state - - assert len(file_paths) == 3 - - assert len(set(str(values['file_name']) for _, values in output_file_metadata.items()) - set(str(path) for path in file_paths)) == 0 - - with open(output_file_metadata['tap_dummy_test-test_table_three']['file_name'], 'r', encoding='utf-8') as input_file: - assert [item for item in input_file] == file_metadata['tap_dummy_test-test_table_three']['file_data'] - - for compression, extension in {'gzip': '.gz', 'lzma': '.xz', 'none': ''}.items(): - clear_dir(Path(config['temp_dir'])) - config_copy = deepcopy(config) - config_copy['compression'] = compression - output_state, output_file_metadata = persist_lines(input_multi_stream_data, config_copy) - file_paths = set(path for path in Path(config['temp_dir']).iterdir()) - - assert len(file_paths) == 3 - - assert len(set(str(values['file_name']) for _, values in output_file_metadata.items()) - set(str(path) for path in 
file_paths)) == 0 - - clear_dir(Path(config['temp_dir'])) - - config_copy = deepcopy(config) - config_copy['add_metadata_columns'] = True - output_state, output_file_metadata = persist_lines(input_multi_stream_data, config_copy) - - assert output_state == state - - clear_dir(Path(config['temp_dir'])) - - config_copy = deepcopy(config) - config_copy['memory_buffer'] = 9 - output_state, output_file_metadata = persist_lines(input_multi_stream_data, config_copy) - - assert output_state == state - - clear_dir(Path(config['temp_dir'])) - - dummy_type = '{"type": "DUMMY", "value": {"currently_syncing": "tap_dummy_test-test_table_one"}}' - output_state, output_file_metadata = persist_lines([dummy_type] + input_multi_stream_data, config) - - assert 'Unknown message type "{}" in message "{}"'.format(json.loads(dummy_type)['type'], dummy_type.replace('"', "'")) in caplog.text - - with raises(json.decoder.JSONDecodeError): - output_state, output_file_metadata = persist_lines(invalid_row_data, config) - - with raises(Exception): - output_state, output_file_metadata = persist_lines(invalid_order_data, config) - - record = { - "type": "RECORD", - "stream": "tap_dummy_test-test_table_one", - "record": {"c_pk": 1, "c_varchar": "1", "c_int": 1}, - "version": 1, - "time_extracted": "2019-01-31T15:51:47.465408Z"} - - with raises(Exception): - dummy_input_multi_stream_data = deepcopy(input_multi_stream_data) - dummy_record = deepcopy(record) - dummy_record.pop('stream') - dummy_input_multi_stream_data.insert(3, json.dumps(dummy_record)) - output_state, output_file_metadata = persist_lines(dummy_input_multi_stream_data, config) - - schema = { - "type": "SCHEMA", - "stream": "tap_dummy_test-test_table_one", - "schema": { - "properties": { - "c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, - "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, - "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}}, - "type": "object"}, - "key_properties": ["c_pk"]} - - with raises(Exception): - dummy_input_multi_stream_data = deepcopy(input_multi_stream_data) - dummy_schema = deepcopy(schema) - dummy_schema.pop('stream') - dummy_input_multi_stream_data.insert(1, json.dumps(dummy_schema)) - output_state, output_file_metadata = persist_lines(dummy_input_multi_stream_data, config) - - with raises(Exception): - dummy_input_multi_stream_data = deepcopy(input_multi_stream_data) - dummy_schema = deepcopy(schema) - dummy_schema.pop('key_properties') - dummy_input_multi_stream_data.insert(1, json.dumps(dummy_schema)) - output_state, output_file_metadata = persist_lines(dummy_input_multi_stream_data, config) - - # NOTE: 2 distant waves of the same stream - dummy_input_data = deepcopy(input_data) - for item in input_data[-4:-7:-1]: - dummy_input_data.insert(5, item) - output_state, output_file_metadata = persist_lines(dummy_input_data, config) - - assert output_state == json.loads(input_data[-1])['value'] - - with open(output_file_metadata['users']['file_name'], 'r', encoding='utf-8') as input_file: - assert [item for item in input_file] == [json.dumps(json.loads(item)['record']) + '\n' for item in input_data[1:3]] * 2 - - clear_dir(Path(config['temp_dir'])) - - -def test_get_config(config): - '''TEST : extract and enrich the configuration''' - - assert config_file(str(Path('tests', 'resources', 'config.json'))) == config - - assert config_file(str(Path('tests', 'resources', 
'config_naked.json'))) == { - 's3_bucket': 'BUCKET', - 'compression': 'none', - 'naming_convention': '{stream}-{timestamp:%Y%m%dT%H%M%S}.json', - 'memory_buffer': 64e6, - 'naming_convention_default': '{stream}-{timestamp:%Y%m%dT%H%M%S}.json', - 'open_func': open - } - - with raises(Exception): - config_file(str(Path('tests', 'resources', 'config_no_bucket.json'))) - - with raises(Exception): - config_file(str(Path('tests', 'resources', 'config_unknown_param.json'))) - - assert config_file(str(Path('tests', 'resources', 'config_compression_gzip.json'))) == { - 's3_bucket': 'BUCKET', - 'compression': 'gzip', - 'naming_convention': '{stream}-{timestamp:%Y%m%dT%H%M%S}.json.gz', - 'memory_buffer': 64e6, - 'naming_convention_default': '{stream}-{timestamp:%Y%m%dT%H%M%S}.json.gz', - 'open_func': gzip.open - } - - assert config_file(str(Path('tests', 'resources', 'config_compression_lzma.json'))) == { - 's3_bucket': 'BUCKET', - 'compression': 'lzma', - 'naming_convention': '{stream}-{timestamp:%Y%m%dT%H%M%S}.json.xz', - 'memory_buffer': 64e6, - 'naming_convention_default': '{stream}-{timestamp:%Y%m%dT%H%M%S}.json.xz', - 'open_func': lzma.open - } - - with raises(NotImplementedError): - config_file(str(Path('tests', 'resources', 'config_compression_dummy.json'))) - - -@mock_s3 -def test_main(monkeypatch, capsys, patch_datetime, patch_argument_parser, input_multi_stream_data, config, state, file_metadata): - '''TEST : simple main call''' - - monkeypatch.setattr(sys, 'stdin', input_multi_stream_data) - - conn = boto3.resource('s3', region_name='us-east-1') - conn.create_bucket(Bucket=config['s3_bucket']) - - main() - - captured = capsys.readouterr() - assert captured.out == json.dumps(state) + '\n' - - for _, file_info in file_metadata.items(): - assert not file_info['file_name'].exists() - - class argument_parser: - - def __init__(self): - self.config = str(Path('tests', 'resources', 'config_local.json')) - - def add_argument(self, x, y, help='Dummy config file', required=False): - pass - - def parse_args(self): - return self - - monkeypatch.setattr(argparse, 'ArgumentParser', argument_parser) - - main() - - captured = capsys.readouterr() - assert captured.out == json.dumps(state) + '\n' - - for _, file_info in file_metadata.items(): - assert file_info['file_name'].exists() - - clear_dir(Path(config['temp_dir'])) diff --git a/tests/test_s3.py b/tests/test_s3.py index d9bd77c..5754dbf 100644 --- a/tests/test_s3.py +++ b/tests/test_s3.py @@ -1,77 +1,267 @@ -'''Tests for the target_s3_jsonl.main module''' +'''Tests for the target_s3_json.s3 module''' # Standard library imports +import sys +from os import environ +from asyncio import run from copy import deepcopy +from re import match +import lzma from pathlib import Path +import datetime +import boto3 +from botocore.client import BaseClient +# from botocore.stub import Stubber +# from botocore.exceptions import ClientError +# from aiobotocore.session import get_session + +from io import BytesIO import json -import os +import gzip -import boto3 +# from pytest import patch +# Third party imports +from pytest import fixture, raises, mark from moto import mock_s3, mock_sts -from pytest import fixture, raises # Package imports -from target_s3_jsonl.s3 import create_client, upload_file, log_backoff_attempt +# from target.file import save_json +from target_s3_json.s3 import ( + _log_backoff_attempt, config_compression, create_session, get_encryption_args, put_object, upload_file, config_s3, main +) + +# from .conftest import clear_dir + +# import shutil +# 
import signal +# import subprocess as sp +# import sys +# import time +# import requests +# from aiohttp.web_exceptions import HTTPError +# from aiohttp import ClientSession, ClientResponse, BasicAuth, ClientResponseError, ClientError, TCPConnector + + +# _proxy_bypass = { +# "http": None, +# "https": None, +# } + + +# async def start_service(session, service_name, host, port): +# moto_svr_path = shutil.which("moto_server") +# args = [sys.executable, moto_svr_path, service_name, "-H", host, +# "-p", str(port)] +# process = sp.Popen(args, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.DEVNULL) +# url = "http://{host}:{port}".format(host=host, port=port) + +# for _ in range(30): +# if process.poll() is not None: +# break + +# async with session.request('get', url, timeout=0.1, proxies=_proxy_bypass) as _: +# try: +# # we need to bypass the proxies due to monkeypatches +# # requests.get(url, timeout=0.1, proxies=_proxy_bypass) +# # response.status +# break +# # except requests.exceptions.RequestException: +# except (HTTPError, ConnectionError): +# time.sleep(0.1) +# else: +# stop_process(process) +# raise AssertionError("Can not start service: {}".format(service_name)) + +# return process + + +# def stop_process(process, timeout=20): +# try: +# process.send_signal(signal.SIGTERM) +# process.communicate(timeout=timeout / 2) +# except sp.TimeoutExpired: +# process.kill() +# outs, errors = process.communicate(timeout=timeout / 2) +# exit_code = process.returncode +# msg = "Child process finished {} not in clean way: {} {}" \ +# .format(exit_code, outs, errors) +# raise RuntimeError(msg) + + +# @fixture(scope='session') +# def s3_server(): +# host = 'localhost' +# port = 5002 +# url = 'http://{host}:{port}'.format(host=host, port=port) + +# session = ClientSession( +# # NOTE: overwrite the default connector to customise the default connection settings applied to the queries +# connector=TCPConnector( +# # NOTE: max concurrent connections to the end point +# limit_per_host=16, +# # NOTE: limit on the client connections total count. 
100 by default +# # limit=limit_connections_count, +# # NOTE: live connection duration +# keepalive_timeout=30), +# connector_owner=True) + + +# process = run(start_service(session, 's3', host, port)) +# yield url +# stop_process(session, process) + + +# @fixture(scope='session') +# def s3_server(): +# host = 'localhost' +# port = 5002 +# url = 'http://{host}:{port}'.format(host=host, port=port) +# process = start_service('s3', host, port) +# yield url +# stop_process(process) + + +@fixture +def config_raw(temp_path): + '''Use custom configuration set''' + + return { + 's3_bucket': 'BUCKET', + 'aws_access_key_id': 'ACCESS-KEY', + 'aws_secret_access_key': 'SECRET', + 'add_metadata_columns': False, + 'work_dir': f'{temp_path}/tests/output', + 'memory_buffer': 2000000, + 'compression': 'none', + 'timezone_offset': 0, + 'path_template': '{stream}-{date_time}.json' + } @fixture -def config(): - '''Use custom parameters set''' +def config(patch_datetime, config_raw): + '''Use custom configuration set''' - with open(Path('tests', 'resources', 'config.json'), 'r', encoding='utf-8') as config_file: - return json.load(config_file) + return config_raw | { + # 'date_time': dt.strptime('2021-08-11 06:39:38.321056+00:00', '%Y-%m-%d %H:%M:%S.%f%z'), + 'date_time': datetime.datetime.utcnow(), + 'work_path': Path(config_raw['work_dir']), + 'open_func': open + } @fixture -def config_assume_role(): - with open(Path('tests', 'resources', 'config_assume_role.json'), 'r', encoding='utf-8') as f: - return json.load(f) +def config_assume_role(config): + '''Use custom configuration set''' + + return config | { + 'role_arn': 'arn:aws:iam::123456789012:role/TestAssumeRole' + } + + +# @fixture +# @mock_s3 +# def bucket(config): +# conn = boto3.resource('s3', region_name='us-east-1', endpoint_url='https://s3.amazonaws.com') +# # We need to create the bucket since this is all in Moto's 'virtual' AWS account +# conn.create_bucket(Bucket=config['s3_bucket']) +# return conn + +# NOTE: Explore https://github.com/aio-libs/aiobotocore/issues/440 @fixture(scope='module') def aws_credentials(): - """Mocked AWS Credentials for moto.""" + '''Mocked AWS Credentials for moto.''' moto_credentials_file_path = Path('tests', 'resources', 'aws_credentials') - os.environ['AWS_SHARED_CREDENTIALS_FILE'] = str(moto_credentials_file_path) + environ['AWS_SHARED_CREDENTIALS_FILE'] = str(moto_credentials_file_path) - os.environ['AWS_ACCESS_KEY_ID'] = 'that_key' - os.environ['AWS_SECRET_ACCESS_KEY'] = 'no_big_secret' + environ['AWS_ACCESS_KEY_ID'] = 'that_key' + environ['AWS_SECRET_ACCESS_KEY'] = 'no_big_secret' def test_log_backoff_attempt(caplog): - '''TEST : simple upload_files call''' + '''TEST : simple _log_backoff_attempt call''' + + _log_backoff_attempt({'tries': 99}) + pat = r'INFO root:s3.py:\d{2} Error detected communicating with Amazon, triggering backoff: 99 try\n' - log_backoff_attempt({'tries': 2}) + assert match(pat, caplog.text) - assert 'Error detected communicating with Amazon, triggering backoff: 2 try' in caplog.text + +def test_config_compression(config): + '''TEST : simple config_compression call''' + + assert f"{config.get('compression')}".lower() in {'', 'none', 'gzip', 'lzma'} + + with raises(Exception): + config_compression(config | {'compression': 'dummy'}) + + +@mark.parametrize("compression,extention,open_func", [('none', '', open), ('gzip', '.gz', gzip.compress), ('lzma', '.xz', lzma.compress)]) +def test_config_compression_open_func(config, compression, extention, open_func): + '''TEST : simple config_compression 
call''' + + assert config_compression(config | {'compression': compression}) == config | { + 'compression': compression, + 'path_template': config['path_template'] + extention, + 'open_func': open_func + } @mock_sts @mock_s3 -def test_create_client_with_assumed_role(config_assume_role, caplog): - """Assert client is created with assumed role when role_arn is specified""" - create_client(config_assume_role) - assert caplog.text.endswith('Creating s3 client with role TestAssumeRole\n') +def test_create_client_with_assumed_role(caplog, config_assume_role: dict): + '''Assert client is created with assumed role when role_arn is specified''' + + create_session(config_assume_role) + assert caplog.text.endswith('Creating s3 session with role TestAssumeRole\n') @mock_s3 -def test_create_client(aws_credentials, config): - '''TEST : simple upload_files call''' +def test_create_session(aws_credentials, config): + '''TEST : simple create_session call''' conn = boto3.resource('s3', region_name='us-east-1', endpoint_url='https://s3.amazonaws.com') # We need to create the bucket since this is all in Moto's 'virtual' AWS account conn.create_bucket(Bucket=config['s3_bucket']) - client = create_client(config) + # async with get_session().create_client('s3', region_name='us-east-1', end_point_url=s3_server) as client: + # with patch('aiobotocore.AioSession.create_client') as mock: + # mock.return_value = client + + client: BaseClient = create_session(config).client('s3') client.put_object(Bucket=config['s3_bucket'], Key='Eddy is', Body='awesome!') body = conn.Object(config['s3_bucket'], 'Eddy is').get()['Body'].read().decode("utf-8") assert body == 'awesome!' + # NOTE: Test jsonl upload + file_metadata = { + 'absolute_path': Path('tests', 'resources', 'messages.json'), + 'relative_path': 'dummy/messages.json'} + + stream_src = [ + {"c_pk": 1, "c_varchar": "1", "c_int": 1, "c_time": "04:00:00"}, + {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_time": "07:15:00"}, + {"c_pk": 3, "c_varchar": "3", "c_int": 3, "c_time": "23:00:03"}] + # stream_bin = b''.join(json.dumps(record, ensure_ascii=False).encode('utf-8') + b'\n' for record in stream_src) + # client.put_object(Body=gzip.GzipFile(fileobj=BytesIO(stream_bin), mode='w'), Bucket=config['s3_bucket'], Key=file_metadata['relative_path']) + + # with gzip.open(BytesIO([json.dumps(record, ensure_ascii=False).encode('utf-8') + b'\n' for record in stream_src]), 'wt', encoding='utf-8') as output_data: + # client.put_object(Body=output_data, Bucket=config['s3_bucket'], Key=file_metadata['relative_path']) + + stream_bin = b''.join(json.dumps(record, ensure_ascii=False).encode('utf-8') + b'\n' for record in stream_src) + client.put_object(Body=gzip.compress(stream_bin), Bucket=config['s3_bucket'], Key=file_metadata['relative_path']) + body = conn.Object(config['s3_bucket'], file_metadata['relative_path']).get()['Body'].read() + # stream_txt = gzip.decompress(body) + with gzip.open(BytesIO(body), 'rt', encoding='utf-8') as input_data: + assert [json.loads(item) for item in input_data] == stream_src + with raises(Exception): config_copy = deepcopy(config) config_copy['aws_endpoint_url'] = 'xXx' - client = create_client(config_copy) + client = create_session(config_copy).client('s3', endpoint_url=config_copy.get('aws_endpoint_url')) client.put_object(Bucket=config_copy['s3_bucket'], Key='Eddy is', Body='awesome!') body = conn.Object(config_copy['s3_bucket'], 'Eddy is').get()['Body'].read().decode("utf-8") @@ -80,75 +270,224 @@ def test_create_client(aws_credentials, config): 
config_copy['aws_profile'] = 'dummy' config_copy.pop('aws_access_key_id') config_copy.pop('aws_secret_access_key') - os.environ.pop('AWS_ACCESS_KEY_ID') - os.environ.pop('AWS_SECRET_ACCESS_KEY') + environ.pop('AWS_ACCESS_KEY_ID') + environ.pop('AWS_SECRET_ACCESS_KEY') - client = create_client(config_copy) + client = create_session(config_copy).client('s3') client.put_object(Bucket=config_copy['s3_bucket'], Key='Look!', Body='No access key!') body = conn.Object(config_copy['s3_bucket'], 'Look!').get()['Body'].read().decode("utf-8") assert body == 'No access key!' +def test_get_encryption_args(config): + '''TEST : simple get_encryption_args call''' + + encryption_desc, encryption_args = get_encryption_args(config) + assert encryption_args == {} + assert encryption_desc == '' + + encryption_desc, encryption_args = get_encryption_args(config | {'encryption_type': 'kms'}) + assert encryption_args == {'ExtraArgs': {'ServerSideEncryption': 'aws:kms'}} + assert encryption_desc == ' using default KMS encryption' + + encryption_desc, encryption_args = get_encryption_args(config | {'encryption_type': 'kms', 'encryption_key': 'SECRET'}) + assert encryption_args == {'ExtraArgs': {'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': 'SECRET'}} + assert encryption_desc == " using KMS encryption key ID '{}'".format('SECRET') + + with raises(Exception): + encryption_desc, encryption_args = get_encryption_args(config | {'encryption_type': 'dummy'}) + + @mock_s3 -def test_upload_file(config): - '''TEST : simple upload_files call''' +def test_put_object(config): + '''TEST : simple put_object call''' + config |= {'compression': 'gzip', 'open_func': gzip.compress} conn = boto3.resource('s3', region_name='us-east-1') conn.create_bucket(Bucket=config['s3_bucket']) - client = create_client(config) + client: BaseClient = create_session(config).client('s3') + + file_metadata = { + 'absolute_path': Path('tests', 'resources', 'messages.json.gz'), + 'relative_path': 'dummy/messages.json.gz'} + + stream_data = [ + {"c_pk": 1, "c_varchar": "1", "c_int": 1, "c_time": "04:00:00"}, + {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_time": "07:15:00"}, + {"c_pk": 3, "c_varchar": "3", "c_int": 3, "c_time": "23:00:03"}] - file_path = str(Path('tests', 'resources', 'messages.json')) - s3_key = 'dummy/messages.json' - upload_file( - client, - file_path, - config.get('s3_bucket'), - s3_key, - encryption_type=config.get('encryption_type'), - encryption_key=config.get('encryption_key')) + run(put_object( + config, + file_metadata, + stream_data, + client)) - head = client.head_object(Bucket=config.get('s3_bucket'), Key=s3_key) + head = client.head_object(Bucket=config.get('s3_bucket'), Key=file_metadata['relative_path']) + assert head['ResponseMetadata']['HTTPStatusCode'] == 200 + assert head['ContentLength'] == 102 + assert head['ResponseMetadata']['RetryAttempts'] == 0 + + # NOTE: 'kms' encryption_type with default encryption_key + file_metadata = { + 'absolute_path': Path('tests', 'resources', 'messages.json.gz'), + 'relative_path': 'dummy/messages_kms.json.gz'} + run(put_object( + config | {'encryption_type': 'kms', 'encryption_key': None}, + file_metadata, + stream_data, + client)) + + head = client.head_object(Bucket=config.get('s3_bucket'), Key=file_metadata['relative_path']) + assert head['ResponseMetadata']['HTTPStatusCode'] == 200 + assert head['ContentLength'] == 102 + assert head['ResponseMetadata']['RetryAttempts'] == 0 + + # NOTE: 'kms' encryption_type with encryption_key + file_metadata = { + 'absolute_path': Path('tests', 
'resources', 'messages.json.gz'), + 'relative_path': 'dummy/messages_kms.json.gz'} + run(put_object( + config | {'encryption_type': 'kms', 'encryption_key': 'xXx'}, + file_metadata, + stream_data, + client)) + + head = client.head_object(Bucket=config.get('s3_bucket'), Key=file_metadata['relative_path']) + assert head['ResponseMetadata']['HTTPStatusCode'] == 200 + assert head['ContentLength'] == 102 + assert head['ResponseMetadata']['RetryAttempts'] == 0 + + # NOTE: 'dummy' encryption_type + with raises(Exception): + run(put_object( + config | {'encryption_type': 'dummy'}, + file_metadata | {'relative_path': 'dummy/messages_dummy.json.gz'}, + stream_data, + client)) + + +@mock_s3 +def test_upload_file(config, temp_path): + '''TEST : simple upload_file call''' + + conn = boto3.resource('s3', region_name='us-east-1') + conn.create_bucket(Bucket=config['s3_bucket']) + + client: BaseClient = create_session(config).client('s3') + + temp_file: Path = Path(temp_path.join('temp_file.json')) + temp_file.write_bytes(Path('tests', 'resources', 'messages.json').read_bytes()) + + file_metadata = { + 'absolute_path': temp_file, + 'relative_path': 'dummy/messages.json'} + + run(upload_file( + config | {'local': True, 'client': client, 'remove_file': False}, + file_metadata)) + + assert 'Contents' not in client.list_objects_v2(Bucket=config['s3_bucket'], Prefix=file_metadata['relative_path'], MaxKeys=1) + assert file_metadata['absolute_path'].exists() + + run(upload_file( + config | {'client': client, 'remove_file': False}, + file_metadata)) + + head = client.head_object(Bucket=config.get('s3_bucket'), Key=file_metadata['relative_path']) assert head['ResponseMetadata']['HTTPStatusCode'] == 200 assert head['ContentLength'] == 613 assert head['ResponseMetadata']['RetryAttempts'] == 0 # NOTE: 'kms' encryption_type with default encryption_key - s3_key = 'dummy/messages_kms.json' - upload_file( - client, - file_path, - config.get('s3_bucket'), - s3_key, - encryption_type='kms') - - head = client.head_object(Bucket=config.get('s3_bucket'), Key=s3_key) + file_metadata = { + 'absolute_path': temp_file, + 'relative_path': 'dummy/messages_kms.json'} + run(upload_file( + config | {'client': client, 'remove_file': False, 'encryption_type': 'kms', 'encryption_key': None}, + file_metadata)) + + head = client.head_object(Bucket=config.get('s3_bucket'), Key=file_metadata['relative_path']) assert head['ResponseMetadata']['HTTPStatusCode'] == 200 assert head['ContentLength'] == 613 assert head['ResponseMetadata']['RetryAttempts'] == 0 # NOTE: 'kms' encryption_type with encryption_key - s3_key = 'dummy/messages_kms.json' - upload_file( - client, - file_path, - config.get('s3_bucket'), - s3_key, - encryption_type='kms', - encryption_key='xXx') - - head = client.head_object(Bucket=config.get('s3_bucket'), Key=s3_key) + file_metadata = { + 'absolute_path': temp_file, + 'relative_path': 'dummy/messages_kms.json'} + run(upload_file( + config | {'client': client, 'encryption_type': 'kms', 'encryption_key': 'xXx'}, + file_metadata)) + + head = client.head_object(Bucket=config.get('s3_bucket'), Key=file_metadata['relative_path']) assert head['ResponseMetadata']['HTTPStatusCode'] == 200 assert head['ContentLength'] == 613 assert head['ResponseMetadata']['RetryAttempts'] == 0 + assert not file_metadata['absolute_path'].exists() + # NOTE: 'dummy' encryption_type + # with raises(Exception): + # run(upload_file( + # config | {'client': client, 'encryption_type': 'dummy'}, + # file_metadata | {'relative_path': 
'dummy/messages_dummy.json'})) + + +def test_config_s3(config_raw): + + config = deepcopy(config_raw) + config['temp_dir'] = config.pop('work_dir') + config['naming_convention'] = config.pop('path_template') + assert config_s3(config) == config_raw + + config = deepcopy(config_raw) + config['temp_dir'] = config.pop('work_dir') + config.pop('path_template') + config['naming_convention'] = '{stream}-{timestamp}.json' + assert config_s3(config) == config_raw | {'path_template': '{stream}-{date_time:%Y%m%dT%H%M%S}.json'} + + config = deepcopy(config_raw) + config['temp_dir'] = config.pop('work_dir') + config.pop('path_template') + config['naming_convention'] = '{stream}-{date}.json' + assert config_s3(config) == config_raw | {'path_template': '{stream}-{date_time:%Y%m%d}.json'} + + config.pop('s3_bucket') with raises(Exception): - upload_file( - client, - file_path, - config.get('s3_bucket'), - 'dummy/messages_dummy.json', - encryption_type='dummy', - encryption_key=config.get('encryption_key')) + config_s3(config) + + +@mock_s3 +def test_main(capsys, patch_datetime, patch_sys_stdin, patch_argument_parser, config_raw, state, file_metadata): + '''TEST : simple main call''' + + conn = boto3.resource('s3', region_name='us-east-1', endpoint_url='https://s3.amazonaws.com') + # We need to create the bucket since this is all in Moto's 'virtual' AWS account + conn.create_bucket(Bucket=config_raw['s3_bucket']) + + main(lines=sys.stdin) + + captured = capsys.readouterr() + assert captured.out == json.dumps(state) + '\n' + + for file_info in file_metadata.values(): + assert not file_info['path'][1]['absolute_path'].exists() + + client: BaseClient = create_session(config_raw).client('s3') + + head = client.head_object(Bucket=config_raw.get('s3_bucket'), Key=file_metadata['tap_dummy_test-test_table_one']['path'][1]['relative_path']) + assert head['ResponseMetadata']['HTTPStatusCode'] == 200 + assert head['ContentLength'] == 42 + assert head['ResponseMetadata']['RetryAttempts'] == 0 + + head = client.head_object(Bucket=config_raw.get('s3_bucket'), Key=file_metadata['tap_dummy_test-test_table_two']['path'][1]['relative_path']) + assert head['ResponseMetadata']['HTTPStatusCode'] == 200 + assert head['ContentLength'] == 150 + assert head['ResponseMetadata']['RetryAttempts'] == 0 + + head = client.head_object(Bucket=config_raw.get('s3_bucket'), Key=file_metadata['tap_dummy_test-test_table_three']['path'][1]['relative_path']) + assert head['ResponseMetadata']['HTTPStatusCode'] == 200 + assert head['ContentLength'] == 192 + assert head['ResponseMetadata']['RetryAttempts'] == 0
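
For reference, a minimal usage sketch of the new async target_s3_json.s3 helpers that the rewritten tests exercise (create_session, upload_file, run from asyncio). The function names and the config/file_metadata shapes follow the test code above; the bucket name, object key, local path and the abbreviated config are illustrative assumptions, not values from this repository.

    from asyncio import run
    from pathlib import Path

    from target_s3_json.s3 import create_session, upload_file

    # Abbreviated config: the tests additionally set AWS credentials,
    # work_dir, memory_buffer, compression, timezone_offset, ...
    config = {
        's3_bucket': 'my-bucket',                      # illustrative
        'path_template': '{stream}-{date_time}.json',  # illustrative
    }

    # create_session builds the boto3 session; the S3 client is derived from it,
    # as in test_create_session and test_upload_file above.
    client = create_session(config).client('s3')

    file_metadata = {
        'absolute_path': Path('output/users-20220929.json'),  # local file to ship (illustrative)
        'relative_path': 'exports/users-20220929.json',       # target S3 object key (illustrative)
    }

    # upload_file is now a coroutine, so it is driven with asyncio.run();
    # 'remove_file': False keeps the local copy, mirroring the test setup.
    run(upload_file(config | {'client': client, 'remove_file': False}, file_metadata))
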