Skip to content

Commit

Permalink
Output record count metric from batch files insert
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenFrankel committed Oct 3, 2024
1 parent 2b69ad8 commit 359db5c
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 4 deletions.
6 changes: 4 additions & 2 deletions target_snowflake/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,8 @@ def merge_from_stage(
key_properties=key_properties,
)
self.logger.debug("Merging with SQL: %s", merge_statement)
conn.execute(merge_statement, **kwargs)
result = conn.execute(merge_statement, **kwargs)
return result.rowcount

def copy_from_stage(
self,
Expand All @@ -578,7 +579,8 @@ def copy_from_stage(
file_format=file_format,
)
self.logger.debug("Copying with SQL: %s", copy_statement)
conn.execute(copy_statement, **kwargs)
result = conn.execute(copy_statement, **kwargs)
return result.rowcount

def drop_file_format(self, file_format: str) -> None:
"""Drop a file format in the schema.
Expand Down
7 changes: 5 additions & 2 deletions target_snowflake/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def insert_batch_files_via_internal_stage(

if self.key_properties:
# merge into destination table
self.connector.merge_from_stage(
record_count = self.connector.merge_from_stage(
full_table_name=full_table_name,
schema=self.schema,
sync_id=sync_id,
Expand All @@ -199,13 +199,16 @@ def insert_batch_files_via_internal_stage(
)

else:
self.connector.copy_from_stage(
record_count = self.connector.copy_from_stage(
full_table_name=full_table_name,
schema=self.schema,
sync_id=sync_id,
file_format=file_format,
)

with self.record_counter_metric as counter:
counter.increment(record_count)

finally:
self.logger.debug("Cleaning up after batch processing")
self.connector.drop_file_format(file_format=file_format)
Expand Down

0 comments on commit 359db5c

Please sign in to comment.