Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
Support zstd compression
Browse files Browse the repository at this point in the history
  • Loading branch information
Limess committed Jan 12, 2024
1 parent eba84b5 commit df17b46
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 4 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@

This is a [PipelineWise](https://transferwise.github.io/pipelinewise) compatible target connector.

## Signal changes

- Update dependencies (support Python 3.11)
- `object` types are converted to `SUPER` in Redshift
- Fix log (see https://github.com/transferwise/pipelinewise-target-redshift/pull/70)
- Support zstd compression on COPY

## How to use it

The recommended method of running this target is to use it from [PipelineWise](https://transferwise.github.io/pipelinewise). When running it from PipelineWise you don't need to configure this tap with JSON files and most of things are automated. Please check the related documentation at [Target Redshift](https://transferwise.github.io/pipelinewise/connectors/targets/redshift.html)
Expand Down
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
'boto3==1.34.17',
'psycopg[binary]==3.1.17',
'inflection==0.5.1',
'joblib==1.3.2'
'joblib==1.3.2',
'zstandard==0.22.0'
],
extras_require={
"test": [
Expand Down
5 changes: 5 additions & 0 deletions target_redshift/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import copy
import gzip
import bz2
import zstandard as zstd
from datetime import datetime
from decimal import Decimal
from tempfile import mkstemp
Expand Down Expand Up @@ -375,6 +376,7 @@ def flush_records(stream, records_to_load, row_count, db_sync, compression=None,
slices = slices or 1
use_gzip = compression == "gzip"
use_bzip2 = compression == "bzip2"
use_zstd = compression == "zstd"

if temp_dir:
temp_dir = os.path.expanduser(temp_dir)
Expand All @@ -388,6 +390,9 @@ def flush_records(stream, records_to_load, row_count, db_sync, compression=None,
elif use_bzip2:
open_method = bz2.open
file_extension = file_extension + ".bz2"
elif use_zstd:
open_method = zstd.open
file_extension = file_extension + ".zstd"

if not isinstance(slices, int):
raise Exception("The provided configuration value 'slices' was not an integer")
Expand Down
7 changes: 4 additions & 3 deletions target_redshift/db_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import boto3
import inflection
import psycopg
import psycopg.extras
from singer import get_logger

DEFAULT_VARCHAR_LENGTH = 10000
Expand Down Expand Up @@ -339,7 +338,7 @@ def open_connection(self):
def query(self, query, params=None):
self.logger.debug("Running query: {}".format(query))
with self.open_connection() as connection:
with connection.cursor(cursor_factory=psycopg.extras.DictCursor) as cur:
with connection.cursor(row_factory=psycopg.rows.dict_row) as cur:
cur.execute(
query,
params
Expand Down Expand Up @@ -423,7 +422,7 @@ def load_csv(self, s3_key, count, size_bytes, compression=False):
]

with self.open_connection() as connection:
with connection.cursor(cursor_factory=psycopg.extras.DictCursor) as cur:
with connection.cursor(row_factory=psycopg.rows.dict_row) as cur:
inserts = 0
updates = 0

Expand Down Expand Up @@ -455,6 +454,8 @@ def load_csv(self, s3_key, count, size_bytes, compression=False):
compression_option = " GZIP"
elif compression == "bzip2":
compression_option = " BZIP2"
elif compression == "zstd":
compression_option = " ZSTD"
else:
compression_option = ""

Expand Down
9 changes: 9 additions & 0 deletions tests/integration/test_target_redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,15 @@ def test_loading_tables_with_bz2_compression(self):

self.assert_three_streams_are_loaded_in_redshift()

def test_loading_tables_with_zstd_compression(self):
    """Load three streams from the same tap output with zstd compression and verify they land in Redshift."""
    # Enable zstd compression for the COPY step before replaying the tap output.
    self.config["compression"] = "zstd"
    target_redshift.persist_lines(
        self.config,
        test_utils.get_test_tap_lines("messages-with-three-streams.json"),
    )
    self.assert_three_streams_are_loaded_in_redshift()

def test_loading_tables_with_hard_delete(self):
"""Loading multiple tables from the same input tap with deleted rows"""
tap_lines = test_utils.get_test_tap_lines("messages-with-three-streams.json")
Expand Down

0 comments on commit df17b46

Please sign in to comment.