diff --git a/target_snowflake/connector.py b/target_snowflake/connector.py index 01910c4..74f3e73 100644 --- a/target_snowflake/connector.py +++ b/target_snowflake/connector.py @@ -553,7 +553,8 @@ def merge_from_stage( key_properties=key_properties, ) self.logger.debug("Merging with SQL: %s", merge_statement) - conn.execute(merge_statement, **kwargs) + result = conn.execute(merge_statement, **kwargs) + return result.rowcount def copy_from_stage( self, @@ -578,7 +579,8 @@ def copy_from_stage( file_format=file_format, ) self.logger.debug("Copying with SQL: %s", copy_statement) - conn.execute(copy_statement, **kwargs) + result = conn.execute(copy_statement, **kwargs) + return result.rowcount def drop_file_format(self, file_format: str) -> None: """Drop a file format in the schema. diff --git a/target_snowflake/sinks.py b/target_snowflake/sinks.py index 4c2313b..5fc1c5d 100644 --- a/target_snowflake/sinks.py +++ b/target_snowflake/sinks.py @@ -190,7 +190,7 @@ def insert_batch_files_via_internal_stage( if self.key_properties: # merge into destination table - self.connector.merge_from_stage( + record_count = self.connector.merge_from_stage( full_table_name=full_table_name, schema=self.schema, sync_id=sync_id, @@ -199,13 +199,16 @@ def insert_batch_files_via_internal_stage( ) else: - self.connector.copy_from_stage( + record_count = self.connector.copy_from_stage( full_table_name=full_table_name, schema=self.schema, sync_id=sync_id, file_format=file_format, ) + with self.record_counter_metric as counter: + counter.increment(record_count) + finally: self.logger.debug("Cleaning up after batch processing") self.connector.drop_file_format(file_format=file_format)