Skip to content

Commit

Permalink
fix: handle column order mismatches (#117)
Browse files Browse the repository at this point in the history
Closes #116

I created a new test that creates a table and then sends Singer records whose
columns are in a different order. The current implementation causes the test to
fail
(https://github.com/MeltanoLabs/target-snowflake/actions/runs/6160685807/job/16718219357)
because it's trying to put a string in a boolean column. After implementing
Derek's suggested fix to explicitly define column names in the `COPY INTO`
statement, the tests pass.

cc @visch
  • Loading branch information
pnadolny13 authored Sep 12, 2023
1 parent f560958 commit 8184ffc
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 4 deletions.
28 changes: 24 additions & 4 deletions target_snowflake/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,19 @@ def _get_put_statement(self, sync_id: str, file_uri: str) -> Tuple[text, dict]:
"""Get Snowflake PUT statement."""
return (text(f"put :file_uri '@~/target-snowflake/{sync_id}'"), {})

@staticmethod
def _format_column_selections(column_selections: dict, format: str) -> str:
if format == "json_casting":
return ', '.join(
[
f"$1:{col['clean_property_name']}::{col['sql_type']} as {col['clean_alias']}" for col in column_selections
]
)
elif format == "col_alias":
return f"({', '.join([col['clean_alias'] for col in column_selections])})"
else:
raise NotImplementedError(f"Column format not implemented: {format}")

def _get_column_selections(self, schema: dict, formatter: SnowflakeIdentifierPreparer) -> list:
column_selections = []
for property_name, property_def in schema["properties"].items():
Expand All @@ -306,7 +319,11 @@ def _get_column_selections(self, schema: dict, formatter: SnowflakeIdentifierPre
if '"' in clean_property_name:
clean_alias = clean_property_name.upper()
column_selections.append(
f"$1:{clean_property_name}::{self.to_sql_type(property_def)} as {clean_alias}"
{
"clean_property_name": clean_property_name,
"sql_type": self.to_sql_type(property_def),
"clean_alias": clean_alias,
}
)
return column_selections

Expand All @@ -317,6 +334,7 @@ def _get_merge_from_stage_statement(

formatter = SnowflakeIdentifierPreparer(SnowflakeDialect())
column_selections = self._get_column_selections(schema, formatter)
json_casting_selects = self._format_column_selections(column_selections, "json_casting")

# use UPPER from here onwards
formatted_properties = [formatter.format_collation(col) for col in schema["properties"].keys()]
Expand All @@ -336,7 +354,7 @@ def _get_merge_from_stage_statement(
return (
text(
f"merge into {full_table_name} d using "
+ f"(select {', '.join(column_selections)} from '@~/target-snowflake/{sync_id}'"
+ f"(select {json_casting_selects} from '@~/target-snowflake/{sync_id}'"
+ f"(file_format => {file_format}) {dedup}) s "
+ f"on {join_expr} "
+ f"when matched then update set {matched_clause} "
Expand All @@ -350,10 +368,12 @@ def _get_copy_statement(self, full_table_name, schema, sync_id, file_format):
"""Get Snowflake COPY statement."""
formatter = SnowflakeIdentifierPreparer(SnowflakeDialect())
column_selections = self._get_column_selections(schema, formatter)
json_casting_selects = self._format_column_selections(column_selections, "json_casting")
col_alias_selects = self._format_column_selections(column_selections, "col_alias")
return (
text(
f"copy into {full_table_name} from "
+ f"(select {', '.join(column_selections)} from "
f"copy into {full_table_name} {col_alias_selects} from "
+ f"(select {json_casting_selects} from "
+ f"'@~/target-snowflake/{sync_id}')"
+ f"file_format = (format_name='{file_format}')"
),
Expand Down
26 changes: 26 additions & 0 deletions tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,31 @@ def validate(self) -> None:
assert column.name in expected_types
isinstance(column.type, expected_types[column.name])



class SnowflakeTargetColumnOrderMismatch(TargetFileTestTemplate):
    """Target test that loads a stream into a pre-created table.

    The table is created up front with its columns declared in the order
    COL1, COL3, COL2, before the ``column_order_mismatch`` Singer file is
    processed.
    """

    name = "column_order_mismatch"

    def setup(self) -> None:
        """Create (or replace) the destination table ahead of the load."""
        config = self.target.config
        connector = self.target.default_sink_class.connector_class(config)
        qualified_name = f"{config['database']}.{config['default_target_schema']}.{self.name}"
        table = qualified_name.upper()
        # Seed the 2 columns from the tap schema plus an unused third column,
        # so the test can assert that explicit column inserts are working.
        connector.connection.execute(
            f"""
CREATE OR REPLACE TABLE {table} (
COL1 VARCHAR(16777216),
COL3 TIMESTAMP_NTZ(9),
COL2 BOOLEAN
)
"""
        )

    @property
    def singer_filepath(self) -> Path:
        """Path to this test's ``.singer`` input, located next to this module."""
        streams_dir = Path(__file__).resolve().parent / "target_test_streams"
        return streams_dir / f"{self.name}.singer"

target_tests = TestSuite(
kind="target",
tests=[
Expand Down Expand Up @@ -475,5 +500,6 @@ def validate(self) -> None:
SnowflakeTargetExistingTable,
SnowflakeTargetExistingTableAlter,
SnowflakeTargetTypeEdgeCasesTest,
SnowflakeTargetColumnOrderMismatch,
],
)
2 changes: 2 additions & 0 deletions tests/target_test_streams/column_order_mismatch.singer
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"type": "SCHEMA", "stream": "column_order_mismatch", "schema": {"properties": {"COL2": {"type": "boolean"}, "COL1": {"type": ["string", "null"]}}}, "key_properties": [], "bookmark_properties": []}
{"type": "RECORD", "stream": "column_order_mismatch", "record": {"COL2": true, "COL1": "foo"}}

0 comments on commit 8184ffc

Please sign in to comment.