diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 99dfa6e..d8689ef 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -15,7 +15,7 @@ jobs:
     strategy:
       fail-fast: true
       matrix:
-        python-version: [3.6, 3.7, 3.8]
+        python-version: ["3.9", "3.10", "3.11"]
 
     steps:
       - name: Checking out repo
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 18d786e..97a4076 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,15 @@
+1.6.0-2 (SIGNAL AI) (2024-01-12)
+--------------------------------
+
+- Add zstd compression support on COPY
+
+1.6.0-1 (SIGNAL AI) (2024-01-12)
+--------------------------------
+
+- Update dependencies (support Python 3.11)
+- `object` types are converted to `SUPER` in Redshift
+- Fix the "table exists" log message to report the table name instead of the schema name (see https://github.com/transferwise/pipelinewise-target-redshift/pull/70)
+
 1.6.0 (2020-08-03)
 -------------------
 
diff --git a/README.md b/README.md
index ceba354..c6cbf50 100644
--- a/README.md
+++ b/README.md
@@ -8,6 +8,13 @@
 This is a [PipelineWise](https://transferwise.github.io/pipelinewise) compatible target connector.
 
+## Signal changes
+
+- Update dependencies (support Python 3.11)
+- `object` types are converted to `SUPER` in Redshift
+- Fix the "table exists" log message to report the table name instead of the schema name (see https://github.com/transferwise/pipelinewise-target-redshift/pull/70)
+- Support zstd compression on COPY
+
 ## How to use it
 
 The recommended method of running this target is to use it from [PipelineWise](https://transferwise.github.io/pipelinewise). When running it from PipelineWise you don't need to configure this tap with JSON files and most of things are automated. Please check the related documentation at [Target Redshift](https://transferwise.github.io/pipelinewise/connectors/targets/redshift.html)
 
diff --git a/setup.py b/setup.py
index 8213ecd..7fadae7 100644
--- a/setup.py
+++ b/setup.py
@@ -18,18 +18,19 @@
     ],
     py_modules=["target_redshift"],
     install_requires=[
-        'pipelinewise-singer-python==1.*',
-        'boto3==1.12.39',
-        'psycopg2-binary==2.8.5',
-        'inflection==0.4.0',
-        'joblib==0.16.0'
+        'pipelinewise-singer-python==2.*',
+        'boto3==1.34.17',
+        'psycopg[binary]==3.1.17',
+        'inflection==0.5.1',
+        'joblib==1.3.2',
+        'zstandard==0.22.0'
     ],
     extras_require={
         "test": [
-            "pylint==2.4.2",
-            "pytest==5.3.0",
-            "mock==3.0.5",
-            "coverage==4.5.4"
+            "pylint==3.0.3",
+            "pytest==7.4.4",
+            "mock==5.1.0",
+            "coverage==7.4.0"
         ]
     },
     entry_points="""
diff --git a/target_redshift/__init__.py b/target_redshift/__init__.py
index bf3c6a8..bb97cac 100644
--- a/target_redshift/__init__.py
+++ b/target_redshift/__init__.py
@@ -8,6 +8,7 @@
 import copy
 import gzip
 import bz2
+import zstandard as zstd
 from datetime import datetime
 from decimal import Decimal
 from tempfile import mkstemp
@@ -375,6 +376,7 @@ def flush_records(stream, records_to_load, row_count, db_sync, compression=None,
     slices = slices or 1
     use_gzip = compression == "gzip"
     use_bzip2 = compression == "bzip2"
+    use_zstd = compression == "zstd"
 
     if temp_dir:
         temp_dir = os.path.expanduser(temp_dir)
@@ -388,6 +390,9 @@ def flush_records(stream, records_to_load, row_count, db_sync, compression=None,
     elif use_bzip2:
         open_method = bz2.open
         file_extension = file_extension + ".bz2"
+    elif use_zstd:
+        open_method = zstd.open
+        file_extension = file_extension + ".zstd"
 
     if not isinstance(slices, int):
         raise Exception("The provided configuration value 'slices' was not an integer")
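Note on the `use_zstd` branch above: `zstandard.open()` mirrors the `gzip.open`/`bz2.open` interface, so all three compression modes stay interchangeable behind a single `open_method`. A minimal standalone sketch of that dispatch (the helper name, paths, and row data are illustrative, not taken from the tap):

```python
import bz2
import csv
import gzip

import zstandard as zstd


def open_compressed(path_base, compression=None):
    """Pick an opener and file extension the way flush_records does."""
    if compression == "gzip":
        return gzip.open(path_base + ".csv.gz", "wt"), ".csv.gz"
    if compression == "bzip2":
        return bz2.open(path_base + ".csv.bz2", "wt"), ".csv.bz2"
    if compression == "zstd":
        # zstandard.open() accepts text modes, like gzip.open and bz2.open
        return zstd.open(path_base + ".csv.zstd", "wt"), ".csv.zstd"
    return open(path_base + ".csv", "wt"), ".csv"


# Illustrative usage: write one CSV slice compressed with zstd
handle, extension = open_compressed("/tmp/example-slice-0", compression="zstd")
with handle:
    csv.writer(handle).writerow(["id", "name"])
```

Keeping the branches shape-identical matters because `load_csv` in `db_sync.py` later selects the matching COPY option from the same `compression` value.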
diff --git a/target_redshift/db_sync.py b/target_redshift/db_sync.py
index 4a79049..06a7888 100644
--- a/target_redshift/db_sync.py
+++ b/target_redshift/db_sync.py
@@ -1,19 +1,24 @@
-import collections
+
+import sys
+
+# pylint: disable=no-name-in-module
+if sys.version_info.major == 3 and sys.version_info.minor >= 10:
+    from collections.abc import MutableMapping
+else:
+    from collections import MutableMapping
+# pylint: enable=no-name-in-module
+
 import itertools
 import json
 import os
 import re
-import sys
 import time
 
 import boto3
-import psycopg2
-import psycopg2.extras
-
 import inflection
+import psycopg
 
 from singer import get_logger
-
 DEFAULT_VARCHAR_LENGTH = 10000
 SHORT_VARCHAR_LENGTH = 256
 LONG_VARCHAR_LENGTH = 65535
@@ -51,7 +56,9 @@ def column_type(schema_property, with_length=True):
     varchar_length = DEFAULT_VARCHAR_LENGTH
     if schema_property.get('maxLength', 0) > varchar_length:
         varchar_length = LONG_VARCHAR_LENGTH
 
-    if 'object' in property_type or 'array' in property_type:
+    if 'object' in property_type:
+        column_type = 'super'
+    if 'array' in property_type:
         column_type = 'character varying'
         varchar_length = LONG_VARCHAR_LENGTH
@@ -161,7 +168,7 @@ def flatten_record(d, flatten_schema=None, parent_key=[], sep='__', level=0, max
     items = []
     for k, v in d.items():
         new_key = flatten_key(k, parent_key, sep)
-        if isinstance(v, collections.MutableMapping) and level < max_level:
+        if isinstance(v, MutableMapping) and level < max_level:
             items.extend(flatten_record(v, flatten_schema, parent_key + [k], sep=sep, level=level + 1, max_level=max_level).items())
         else:
             items.append((new_key, json.dumps(v) if _should_json_dump_value(k, v, flatten_schema) else v))
@@ -326,12 +333,12 @@ def open_connection(self):
             self.connection_config['port']
         )
 
-        return psycopg2.connect(conn_string)
+        return psycopg.connect(conn_string)
 
     def query(self, query, params=None):
         self.logger.debug("Running query: {}".format(query))
         with self.open_connection() as connection:
-            with connection.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
+            with connection.cursor(row_factory=psycopg.rows.dict_row) as cur:
                 cur.execute(
                     query,
                     params
@@ -415,7 +422,7 @@ def load_csv(self, s3_key, count, size_bytes, compression=False):
         ]
 
         with self.open_connection() as connection:
-            with connection.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
+            with connection.cursor(row_factory=psycopg.rows.dict_row) as cur:
                 inserts = 0
                 updates = 0
 
@@ -447,6 +454,8 @@
                     compression_option = " GZIP"
                 elif compression == "bzip2":
                     compression_option = " BZIP2"
+                elif compression == "zstd":
+                    compression_option = " ZSTD"
                 else:
                     compression_option = ""
 
@@ -737,5 +746,5 @@ def sync_table(self):
             self.logger.info("Table '{}' does not exist. Creating...".format(table_name_with_schema))
             self.create_table_and_grant_privilege()
         else:
-            self.logger.info("Table '{}' exists".format(self.schema_name))
+            self.logger.info("Table '{}' exists".format(table_name_with_schema))
             self.update_columns()
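The cursor changes above are the standard psycopg2 to psycopg 3 migration: `cursor_factory=psycopg2.extras.DictCursor` becomes `row_factory=psycopg.rows.dict_row`. A small sketch of the new idiom (the DSN is a placeholder; the real string is built from `connection_config` in `open_connection`):

```python
import psycopg
from psycopg.rows import dict_row

# Placeholder DSN for illustration only
conn_string = "host=localhost port=5439 dbname=dev user=loader password=secret"

with psycopg.connect(conn_string) as connection:
    # Rows come back as plain dicts, replacing psycopg2's DictCursor
    with connection.cursor(row_factory=dict_row) as cur:
        cur.execute("SELECT 1 AS answer")
        print(cur.fetchone()["answer"])  # -> 1
```

One behavioural difference worth keeping in mind: in psycopg 3 the connection's `with` block also closes the connection on exit, whereas psycopg2's only committed or rolled back.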
Creating...".format(table_name_with_schema)) self.create_table_and_grant_privilege() else: - self.logger.info("Table '{}' exists".format(self.schema_name)) + self.logger.info("Table '{}' exists".format(table_name_with_schema)) self.update_columns() diff --git a/tests/integration/test_target_redshift.py b/tests/integration/test_target_redshift.py index 142fa35..773fb32 100644 --- a/tests/integration/test_target_redshift.py +++ b/tests/integration/test_target_redshift.py @@ -357,6 +357,15 @@ def test_loading_tables_with_bz2_compression(self): self.assert_three_streams_are_loaded_in_redshift() + def test_loading_tables_with_zstd_compression(self): + """Loading multiple tables from the same input tap with various columns types and zstd compression""" + tap_lines = test_utils.get_test_tap_lines("messages-with-three-streams.json") + + self.config["compression"] = "zstd" + target_redshift.persist_lines(self.config, tap_lines) + + self.assert_three_streams_are_loaded_in_redshift() + def test_loading_tables_with_hard_delete(self): """Loading multiple tables from the same input tap with deleted rows""" tap_lines = test_utils.get_test_tap_lines("messages-with-three-streams.json") diff --git a/tests/unit/test_db_sync.py b/tests/unit/test_db_sync.py index 8808b7f..4631fe4 100644 --- a/tests/unit/test_db_sync.py +++ b/tests/unit/test_db_sync.py @@ -1,4 +1,3 @@ -import pytest import target_redshift @@ -72,7 +71,7 @@ def test_column_type_mapping(self): json_bool = {"type": ["boolean"] } json_obj = {"type": ["object"] } json_arr = {"type": ["array"] } - + # Mapping from JSON schema types ot Redshift column types assert mapper(json_str) == 'character varying(10000)' assert mapper(json_str_or_null) == 'character varying(10000)' @@ -84,7 +83,7 @@ def test_column_type_mapping(self): assert mapper(json_int) == 'numeric' assert mapper(json_int_or_str) == 'character varying(65535)' assert mapper(json_bool) == 'boolean' - assert mapper(json_obj) == 'character varying(65535)' + assert mapper(json_obj) == 'super' assert mapper(json_arr) == 'character varying(65535)'