This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Fix MutableMapping for python 3.10 #160

Closed
wants to merge 6 commits
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -15,7 +15,7 @@ jobs:
strategy:
fail-fast: true
matrix:
python-version: [3.6, 3.7, 3.8]
python-version: ["3.9", "3.10", "3.11"]

steps:
- name: Checking out repo
12 changes: 12 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,15 @@
1.6.0-2 (SIGNAL AI) (2024-01-12)
--------------------------------

- Add zstd compression support on COPY

1.6.0-1 (SIGNAL AI) (2024-01-12)
--------------------------------

- Update dependencies (support Python 3.11)
- `object` types are converted to `SUPER` in Redshift
- Fix log (see https://github.com/transferwise/pipelinewise-target-redshift/pull/70)

1.6.0 (2020-08-03)
-------------------

7 changes: 7 additions & 0 deletions README.md
@@ -8,6 +8,13 @@

This is a [PipelineWise](https://transferwise.github.io/pipelinewise) compatible target connector.

## Signal changes

- Update dependencies (support Python 3.11)
- `object` types are converted to `SUPER` in Redshift
- Fix log (see https://github.com/transferwise/pipelinewise-target-redshift/pull/70)
- Support zstd compression on COPY

## How to use it

The recommended method of running this target is to use it from [PipelineWise](https://transferwise.github.io/pipelinewise). When running it from PipelineWise you don't need to configure this target with JSON files and most things are automated. Please check the related documentation at [Target Redshift](https://transferwise.github.io/pipelinewise/connectors/targets/redshift.html)
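
For standalone runs, the new `zstd` option is just another key in the target configuration. Below is a minimal sketch of driving the target directly, mirroring the integration tests; every key name other than `compression` is an assumption and may differ in your deployment.

```python
# Minimal sketch of running the target programmatically with zstd compression.
# Connection and S3 key names are assumptions for illustration only.
import target_redshift

config = {
    "host": "my-cluster.example.redshift.amazonaws.com",  # assumed key name
    "port": 5439,
    "user": "etl_user",
    "password": "********",
    "dbname": "analytics",
    "s3_bucket": "my-staging-bucket",                      # assumed key name
    "default_target_schema": "public",                     # assumed key name
    "compression": "zstd",                                 # option added in this fork
}

# Singer messages, one JSON document per line, as emitted by a tap.
with open("messages-with-three-streams.json") as tap_output:
    tap_lines = tap_output.readlines()

target_redshift.persist_lines(config, tap_lines)
```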
19 changes: 10 additions & 9 deletions setup.py
@@ -18,18 +18,19 @@
],
py_modules=["target_redshift"],
install_requires=[
'pipelinewise-singer-python==1.*',
'boto3==1.12.39',
'psycopg2-binary==2.8.5',
'inflection==0.4.0',
'joblib==0.16.0'
'pipelinewise-singer-python==2.*',
'boto3==1.34.17',
'psycopg[binary]==3.1.17',
'inflection==0.5.1',
'joblib==1.3.2',
'zstandard==0.22.0'
],
extras_require={
"test": [
"pylint==2.4.2",
"pytest==5.3.0",
"mock==3.0.5",
"coverage==4.5.4"
"pylint==3.0.3",
"pytest==7.4.4",
"mock==5.1.0",
"coverage==7.4.0"
]
},
entry_points="""
5 changes: 5 additions & 0 deletions target_redshift/__init__.py
@@ -8,6 +8,7 @@
import copy
import gzip
import bz2
import zstandard as zstd
from datetime import datetime
from decimal import Decimal
from tempfile import mkstemp
@@ -375,6 +376,7 @@ def flush_records(stream, records_to_load, row_count, db_sync, compression=None,
slices = slices or 1
use_gzip = compression == "gzip"
use_bzip2 = compression == "bzip2"
use_zstd = compression == "zstd"

if temp_dir:
temp_dir = os.path.expanduser(temp_dir)
@@ -388,6 +390,9 @@ def flush_records(stream, records_to_load, row_count, db_sync, compression=None,
elif use_bzip2:
open_method = bz2.open
file_extension = file_extension + ".bz2"
elif use_zstd:
open_method = zstd.open
file_extension = file_extension + ".zstd"

if not isinstance(slices, int):
raise Exception("The provided configuration value 'slices' was not an integer")
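
`zstandard.open` plays the same role for zstd that `gzip.open` and `bz2.open` play for the other codecs, so `flush_records` only has to switch the `open_method` and file extension. A small sketch of that pattern (the file name is illustrative):

```python
# Sketch: writing and re-reading a zstd-compressed CSV slice the way
# flush_records selects its open_method. Requires the zstandard package;
# the file name is illustrative.
import csv
import zstandard as zstd

open_method = zstd.open            # mirrors gzip.open / bz2.open
file_extension = ".csv" + ".zstd"

with open_method("batch_0001" + file_extension, "wt", encoding="utf-8", newline="") as out:
    writer = csv.writer(out)
    writer.writerow([1, "example record"])

with open_method("batch_0001" + file_extension, "rt", encoding="utf-8") as inp:
    print(inp.read())
```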
33 changes: 21 additions & 12 deletions target_redshift/db_sync.py
@@ -1,19 +1,24 @@
import collections

import sys

# pylint: disable=no-name-in-module
if sys.version_info.major == 3 and sys.version_info.minor >= 10:
from collections.abc import MutableMapping
else:
from collections import MutableMapping
# pylint: enable=no-name-in-module

import itertools
import json
import os
import re
import sys
import time

import boto3
import psycopg2
import psycopg2.extras

import inflection
import psycopg
from singer import get_logger


DEFAULT_VARCHAR_LENGTH = 10000
SHORT_VARCHAR_LENGTH = 256
LONG_VARCHAR_LENGTH = 65535
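
The `MutableMapping` ABC has lived in `collections.abc` since Python 3.3, and the old `collections.MutableMapping` alias was removed in 3.10, which is what the version gate above works around. A quick illustration of the behaviour the gate preserves:

```python
# Illustration: the ABC moved, the behaviour did not.
import sys

if sys.version_info >= (3, 10):
    from collections.abc import MutableMapping
else:
    from collections import MutableMapping  # alias removed in Python 3.10

record = {"id": 1, "meta": {"source": "tap"}}
print(isinstance(record["meta"], MutableMapping))  # True for plain dicts
```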
@@ -51,7 +56,9 @@ def column_type(schema_property, with_length=True):
varchar_length = DEFAULT_VARCHAR_LENGTH
if schema_property.get('maxLength', 0) > varchar_length:
varchar_length = LONG_VARCHAR_LENGTH
if 'object' in property_type or 'array' in property_type:
if 'object' in property_type:
column_type = 'super'
if 'array' in property_type:
column_type = 'character varying'
varchar_length = LONG_VARCHAR_LENGTH
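
With this change a JSON-schema `object` property becomes a Redshift `SUPER` column, while arrays keep the wide varchar. Roughly, mirroring the unit tests further down:

```python
# Sketch of the new type mapping, mirroring tests/unit/test_db_sync.py.
from target_redshift.db_sync import column_type

print(column_type({"type": ["object"]}))  # 'super'
print(column_type({"type": ["array"]}))   # 'character varying(65535)'
```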

@@ -161,7 +168,7 @@ def flatten_record(d, flatten_schema=None, parent_key=[], sep='__', level=0, max
items = []
for k, v in d.items():
new_key = flatten_key(k, parent_key, sep)
if isinstance(v, collections.MutableMapping) and level < max_level:
if isinstance(v, MutableMapping) and level < max_level:
items.extend(flatten_record(v, flatten_schema, parent_key + [k], sep=sep, level=level + 1, max_level=max_level).items())
else:
items.append((new_key, json.dumps(v) if _should_json_dump_value(k, v, flatten_schema) else v))
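
The `MutableMapping` check is what decides whether a nested document is flattened into `parent__child` columns or serialised as JSON. A sketch of the effect (the record and `max_level` value are illustrative):

```python
# Sketch: nested mappings are walked while isinstance(v, MutableMapping)
# holds and the nesting level stays below max_level.
from target_redshift.db_sync import flatten_record

record = {"id": 1, "meta": {"source": "tap"}}
print(flatten_record(record, max_level=10))
# roughly: {'id': 1, 'meta__source': 'tap'}
```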
@@ -326,12 +333,12 @@ def open_connection(self):
self.connection_config['port']
)

return psycopg2.connect(conn_string)
return psycopg.connect(conn_string)

def query(self, query, params=None):
self.logger.debug("Running query: {}".format(query))
with self.open_connection() as connection:
with connection.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
with connection.cursor(row_factory=psycopg.rows.dict_row) as cur:
cur.execute(
query,
params
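
psycopg 3 replaces psycopg2's `cursor_factory=DictCursor` with row factories, which is what the `row_factory=psycopg.rows.dict_row` argument above does. A minimal sketch of the same pattern (the DSN is illustrative):

```python
# Sketch of the psycopg 3 dict-row pattern used by query(); DSN values are
# placeholders.
import psycopg
from psycopg.rows import dict_row

conn_string = "host=localhost port=5439 dbname=analytics user=etl password=secret"

with psycopg.connect(conn_string) as connection:
    with connection.cursor(row_factory=dict_row) as cur:
        cur.execute("SELECT 1 AS answer")
        print(cur.fetchall())  # [{'answer': 1}]
```

Note that, unlike psycopg2, the psycopg 3 connection context manager also closes the connection when the block exits, not just the transaction.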
@@ -415,7 +422,7 @@ def load_csv(self, s3_key, count, size_bytes, compression=False):
]

with self.open_connection() as connection:
with connection.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
with connection.cursor(row_factory=psycopg.rows.dict_row) as cur:
inserts = 0
updates = 0

@@ -447,6 +454,8 @@ def load_csv(self, s3_key, count, size_bytes, compression=False):
compression_option = " GZIP"
elif compression == "bzip2":
compression_option = " BZIP2"
elif compression == "zstd":
compression_option = " ZSTD"
else:
compression_option = ""
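
The selected option is appended to the `COPY` statement, so a zstd-compressed slice is loaded with `... CSV ZSTD`. A sketch of the resulting statement; the table, bucket and credentials are placeholders, and the real `load_csv` adds its own column list and options:

```python
# Sketch of the COPY statement with the new ZSTD option; identifiers and
# credentials are placeholders.
compression_option = " ZSTD"  # chosen when config["compression"] == "zstd"

copy_sql = (
    "COPY staging.my_table "
    "FROM 's3://my-staging-bucket/batch_0001.csv.zstd' "
    "IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy' "
    "CSV" + compression_option
)
print(copy_sql)
```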

@@ -737,5 +746,5 @@ def sync_table(self):
self.logger.info("Table '{}' does not exist. Creating...".format(table_name_with_schema))
self.create_table_and_grant_privilege()
else:
self.logger.info("Table '{}' exists".format(self.schema_name))
self.logger.info("Table '{}' exists".format(table_name_with_schema))
self.update_columns()
9 changes: 9 additions & 0 deletions tests/integration/test_target_redshift.py
@@ -357,6 +357,15 @@ def test_loading_tables_with_bz2_compression(self):

self.assert_three_streams_are_loaded_in_redshift()

def test_loading_tables_with_zstd_compression(self):
"""Loading multiple tables from the same input tap with various columns types and zstd compression"""
tap_lines = test_utils.get_test_tap_lines("messages-with-three-streams.json")

self.config["compression"] = "zstd"
target_redshift.persist_lines(self.config, tap_lines)

self.assert_three_streams_are_loaded_in_redshift()

def test_loading_tables_with_hard_delete(self):
"""Loading multiple tables from the same input tap with deleted rows"""
tap_lines = test_utils.get_test_tap_lines("messages-with-three-streams.json")
5 changes: 2 additions & 3 deletions tests/unit/test_db_sync.py
@@ -1,4 +1,3 @@
import pytest
import target_redshift


@@ -72,7 +71,7 @@ def test_column_type_mapping(self):
json_bool = {"type": ["boolean"] }
json_obj = {"type": ["object"] }
json_arr = {"type": ["array"] }

# Mapping from JSON schema types to Redshift column types
assert mapper(json_str) == 'character varying(10000)'
assert mapper(json_str_or_null) == 'character varying(10000)'
@@ -84,7 +83,7 @@ def test_column_type_mapping(self):
assert mapper(json_int) == 'numeric'
assert mapper(json_int_or_str) == 'character varying(65535)'
assert mapper(json_bool) == 'boolean'
assert mapper(json_obj) == 'character varying(65535)'
assert mapper(json_obj) == 'super'
assert mapper(json_arr) == 'character varying(65535)'

