diff --git a/target_snowflake/connector.py b/target_snowflake/connector.py index 8efdc6e..3b0b772 100644 --- a/target_snowflake/connector.py +++ b/target_snowflake/connector.py @@ -298,6 +298,19 @@ def _get_put_statement(self, sync_id: str, file_uri: str) -> Tuple[text, dict]: """Get Snowflake PUT statement.""" return (text(f"put :file_uri '@~/target-snowflake/{sync_id}'"), {}) + @staticmethod + def _format_column_selections(column_selections: dict, format: str) -> str: + if format == "json_casting": + return ', '.join( + [ + f"$1:{col['clean_property_name']}::{col['sql_type']} as {col['clean_alias']}" for col in column_selections + ] + ) + elif format == "col_alias": + return f"({', '.join([col['clean_alias'] for col in column_selections])})" + else: + raise NotImplementedError(f"Column format not implemented: {format}") + def _get_column_selections(self, schema: dict, formatter: SnowflakeIdentifierPreparer) -> list: column_selections = [] for property_name, property_def in schema["properties"].items(): @@ -306,7 +319,11 @@ def _get_column_selections(self, schema: dict, formatter: SnowflakeIdentifierPre if '"' in clean_property_name: clean_alias = clean_property_name.upper() column_selections.append( - f"$1:{clean_property_name}::{self.to_sql_type(property_def)} as {clean_alias}" + { + "clean_property_name": clean_property_name, + "sql_type": self.to_sql_type(property_def), + "clean_alias": clean_alias, + } ) return column_selections @@ -317,6 +334,7 @@ def _get_merge_from_stage_statement( formatter = SnowflakeIdentifierPreparer(SnowflakeDialect()) column_selections = self._get_column_selections(schema, formatter) + json_casting_selects = self._format_column_selections(column_selections, "json_casting") # use UPPER from here onwards formatted_properties = [formatter.format_collation(col) for col in schema["properties"].keys()] @@ -336,7 +354,7 @@ def _get_merge_from_stage_statement( return ( text( f"merge into {full_table_name} d using " - + f"(select {', '.join(column_selections)} from '@~/target-snowflake/{sync_id}'" + + f"(select {json_casting_selects} from '@~/target-snowflake/{sync_id}'" + f"(file_format => {file_format}) {dedup}) s " + f"on {join_expr} " + f"when matched then update set {matched_clause} " @@ -350,10 +368,12 @@ def _get_copy_statement(self, full_table_name, schema, sync_id, file_format): """Get Snowflake COPY statement.""" formatter = SnowflakeIdentifierPreparer(SnowflakeDialect()) column_selections = self._get_column_selections(schema, formatter) + json_casting_selects = self._format_column_selections(column_selections, "json_casting") + col_alias_selects = self._format_column_selections(column_selections, "col_alias") return ( text( - f"copy into {full_table_name} from " - + f"(select {', '.join(column_selections)} from " + f"copy into {full_table_name} {col_alias_selects} from " + + f"(select {json_casting_selects} from " + f"'@~/target-snowflake/{sync_id}')" + f"file_format = (format_name='{file_format}')" ), diff --git a/tests/core.py b/tests/core.py index b935c69..683a8e8 100644 --- a/tests/core.py +++ b/tests/core.py @@ -448,6 +448,31 @@ def validate(self) -> None: assert column.name in expected_types isinstance(column.type, expected_types[column.name]) + + +class SnowflakeTargetColumnOrderMismatch(TargetFileTestTemplate): + + name = "column_order_mismatch" + + def setup(self) -> None: + connector = self.target.default_sink_class.connector_class(self.target.config) + table = f"{self.target.config['database']}.{self.target.config['default_target_schema']}.{self.name}".upper() + # Seed the 2 columns from tap schema and an unused third column to assert explicit inserts are working + connector.connection.execute( + f""" + CREATE OR REPLACE TABLE {table} ( + COL1 VARCHAR(16777216), + COL3 TIMESTAMP_NTZ(9), + COL2 BOOLEAN + ) + """ + ) + + @property + def singer_filepath(self) -> Path: + current_dir = Path(__file__).resolve().parent + return current_dir / "target_test_streams" / f"{self.name}.singer" + target_tests = TestSuite( kind="target", tests=[ @@ -475,5 +500,6 @@ def validate(self) -> None: SnowflakeTargetExistingTable, SnowflakeTargetExistingTableAlter, SnowflakeTargetTypeEdgeCasesTest, + SnowflakeTargetColumnOrderMismatch, ], ) diff --git a/tests/target_test_streams/column_order_mismatch.singer b/tests/target_test_streams/column_order_mismatch.singer new file mode 100644 index 0000000..e82ec39 --- /dev/null +++ b/tests/target_test_streams/column_order_mismatch.singer @@ -0,0 +1,2 @@ +{"type": "SCHEMA", "stream": "column_order_mismatch", "schema": {"properties": {"COL2": {"type": "boolean"}, "COL1": {"type": ["string", "null"]}}}, "key_properties": [], "bookmark_properties": []} +{"type": "RECORD", "stream": "column_order_mismatch", "record": {"COL2": true, "COL1": "foo"}}