
Commit bc0dde5

feedback
damccorm committed Sep 30, 2024
1 parent c19dc93 commit bc0dde5
Showing 1 changed file with 6 additions and 2 deletions.
sdks/python/apache_beam/io/gcp/bigquery.py (8 changes: 6 additions & 2 deletions)
@@ -1671,8 +1671,7 @@ def _flush_batch(self, destination):
         len(rows_and_insert_ids_with_windows))
     self.batch_size_metric.update(len(rows_and_insert_ids_with_windows))

-    rows_and_insert_ids = [r[0] for r in rows_and_insert_ids_with_windows]
-    window_values = [r[1] for r in rows_and_insert_ids_with_windows]
+    rows_and_insert_ids, window_values = zip(*rows_and_insert_ids_with_windows)
     rows = [r[0] for r in rows_and_insert_ids]
     if self.ignore_insert_ids:
       insert_ids = [None for r in rows_and_insert_ids]
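
The first hunk replaces two list comprehensions (two passes over the buffer) with a single zip(*...) transposition. A minimal standalone sketch of the idiom, using hypothetical sample data rather than real Beam buffers:

    # Hypothetical stand-in for the buffered ((row, insert_id), window) pairs
    # that _flush_batch reads from self._rows_buffer[destination].
    rows_and_insert_ids_with_windows = [
        (({'a': 1}, 'id-0'), 'w0'),
        (({'a': 2}, 'id-1'), 'w1'),
    ]

    # zip(*pairs) transposes a sequence of pairs into two parallel tuples in
    # one pass; both results are only read afterwards, so tuples (rather than
    # the lists the old comprehensions built) are sufficient.
    rows_and_insert_ids, window_values = zip(*rows_and_insert_ids_with_windows)

    assert rows_and_insert_ids == (({'a': 1}, 'id-0'), ({'a': 2}, 'id-1'))
    assert window_values == ('w0', 'w1')

One edge case worth noting: on an empty buffer the old comprehensions yielded two empty lists, while unpacking zip(*[]) raises ValueError, so the rewrite assumes _flush_batch never runs with an empty buffer.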
@@ -1694,6 +1693,7 @@ def _flush_batch(self, destination):
         failed_rows = [(
             rows[entry['index']], entry["errors"], window_values[entry['index']])
                        for entry in errors]
+        failed_insert_ids = [insert_ids[entry['index']] for entry in errors]
         retry_backoff = next(self._backoff_calculator, None)

         # If retry_backoff is None, then we will not retry and must log.
@@ -1724,7 +1724,11 @@ def _flush_batch(self, destination):
         _LOGGER.info(
             'Sleeping %s seconds before retrying insertion.', retry_backoff)
         time.sleep(retry_backoff)
+        # We can now safely discard all information about successful rows and
+        # just focus on the failed ones
         rows = [fr[0] for fr in failed_rows]
+        window_values = [fr[2] for fr in failed_rows]
+        insert_ids = failed_insert_ids
         self._throttled_secs.inc(retry_backoff)

     self._total_buffered_rows -= len(self._rows_buffer[destination])
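
Read together, the second and third hunks fix an alignment problem in the retry loop: the error entries returned by BigQuery index into the batch that was just submitted, so every sequence consulted by that index (rows, insert_ids, window_values) must be narrowed to the failed subset before the next attempt; the diff suggests that previously only rows was narrowed. A minimal sketch of the pattern, with insert_batch as a hypothetical stand-in for the real BigQuery client call and a simplified error shape:

    import random

    def insert_batch(rows):
      # Hypothetical stand-in for bigquery_wrapper.insert_rows: returns error
      # dicts whose 'index' fields point into the batch just submitted.
      return [{'index': i, 'errors': ['transient']}
              for i in range(len(rows)) if random.random() < 0.3]

    def flush_with_retries(rows, insert_ids, window_values, max_attempts=5):
      failed = []
      for _ in range(max_attempts):
        errors = insert_batch(rows)
        if not errors:
          return []
        # Narrow every index-aligned sequence to the failed entries, so the
        # indices reported by the *next* attempt still line up.
        failed = [(rows[e['index']], insert_ids[e['index']],
                   window_values[e['index']]) for e in errors]
        rows = [f[0] for f in failed]
        insert_ids = [f[1] for f in failed]
        window_values = [f[2] for f in failed]
      return failed  # still-failing (row, insert_id, window) triples

Narrowing insert_ids rather than regenerating them also means retried rows keep their original ids, which is what BigQuery's best-effort streaming deduplication relies on.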
