From d674a8ed41764f6d37f8a6f1981c8dea8c2530ae Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 27 Sep 2024 13:12:56 -0400 Subject: [PATCH] Always write to BQ from global window --- .github/trigger_files/beam_PostCommit_Python.json | 2 +- .../beam_PostCommit_Python_ValidatesContainer_Dataflow.json | 3 ++- sdks/python/apache_beam/io/gcp/bigquery.py | 3 +++ sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py | 4 ++-- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 30ee463ad4e9..1eb60f6e4959 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 2 + "modification": 3 } diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json index d6c608f6daba..4897480d69ad 100644 --- a/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json @@ -1,3 +1,4 @@ { - "comment": "Modify this file in a trivial way to cause this test suite to run" + "comment": "Modify this file in a trivial way to cause this test suite to run", + "modification": 1 } \ No newline at end of file diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index b897df2d32ab..a5f84dc441e2 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -1864,6 +1864,9 @@ def _restore_table_ref(sharded_table_ref_elems_kv): return ( tagged_data | 'FromHashableTableRef' >> beam.Map(_restore_table_ref) + # Use global window for writes since we're outputting back into the + # global window. + | 'Window into Global Window' >> beam.WindowInto(GlobalWindows()) | 'StreamInsertRows' >> ParDo( bigquery_write_fn, *self.schema_side_inputs).with_outputs( BigQueryWriteFn.FAILED_ROWS, diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py index b0140793cf79..bb5c36a3e9b9 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py @@ -470,7 +470,7 @@ def test_big_query_write_insert_non_transient_api_call_error(self): input_data = [{ 'number': 1, 'str': 'some_string', - }] + }]*500 table_schema = { "fields": [{ @@ -483,7 +483,7 @@ def test_big_query_write_insert_non_transient_api_call_error(self): bq_result_errors = [({ 'number': 1, 'str': 'some_string', - }, "Not Found")] + }, "Not Found")]*500 args = self.test_pipeline.get_full_options_as_args()