From 895d69c43ee5378880bd495c5e2ba88d8e738d1c Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 15 Nov 2024 19:42:40 -0500 Subject: [PATCH 1/9] Enable Python Managed Tests on Dataflow --- .../apache_beam/transforms/managed_iceberg_it_test.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py index 0dfa2aa19c51..ff46e02a523c 100644 --- a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py +++ b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py @@ -27,6 +27,7 @@ import apache_beam as beam from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.testing.test_pipeline import TestPipeline @pytest.mark.uses_io_java_expansion_service @@ -36,6 +37,12 @@ "indicating that jars have not been built") class ManagedIcebergIT(unittest.TestCase): def setUp(self): + self.test_pipeline = TestPipeline(is_integration_test=True) + self.args = self.test_pipeline.get_full_options_as_args() + self.args.extend([ + '--experiments=enable_managed_transforms', + '--dataflow_endpoint=https://dataflow-staging.sandbox.googleapis.com' + ]) self._tempdir = tempfile.mkdtemp() if not os.path.exists(self._tempdir): os.mkdir(self._tempdir) @@ -68,13 +75,13 @@ def test_write_read_pipeline(self): rows = [self._create_row(i) for i in range(100)] expected_dicts = [row.as_dict() for row in rows] - with beam.Pipeline() as write_pipeline: + with beam.Pipeline(options=self.args) as write_pipeline: _ = ( write_pipeline | beam.Create(rows) | beam.managed.Write(beam.managed.ICEBERG, config=iceberg_config)) - with beam.Pipeline() as read_pipeline: + with beam.Pipeline(options=self.args) as read_pipeline: output_dicts = ( read_pipeline | beam.managed.Read(beam.managed.ICEBERG, config=iceberg_config) From 4b2694bb4a96de6b62f20ad46c50bb97362bcad6 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 15 Nov 2024 20:07:05 -0500 Subject: [PATCH 2/9] trigger tests --- .../trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json index b26833333238..c537844dc84a 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 3 } From 4a12d936f247b9767e54de5b5a1c3b350d7063f0 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 15 Nov 2024 20:24:37 -0500 Subject: [PATCH 3/9] fix --- sdks/python/apache_beam/transforms/managed_iceberg_it_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py index ff46e02a523c..2e259f22c8bf 100644 --- a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py +++ b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py @@ -75,13 +75,13 @@ def test_write_read_pipeline(self): rows = [self._create_row(i) for i in range(100)] expected_dicts = [row.as_dict() for row in rows] - with beam.Pipeline(options=self.args) as write_pipeline: + with beam.Pipeline(argv=self.args) as write_pipeline: _ = ( write_pipeline | beam.Create(rows) | beam.managed.Write(beam.managed.ICEBERG, config=iceberg_config)) - with beam.Pipeline(options=self.args) as read_pipeline: + with beam.Pipeline(argv=self.args) as read_pipeline: output_dicts = ( read_pipeline | beam.managed.Read(beam.managed.ICEBERG, config=iceberg_config) From 00568413619dd2232c2c7fd71d278a70f0618e0a Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 15 Nov 2024 21:17:11 -0500 Subject: [PATCH 4/9] copy over annotations --- sdks/python/apache_beam/transforms/external.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index 83c439ca8ddd..cdf5d874d7fa 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -923,6 +923,7 @@ def _normalize(coder_proto): for tag, pcoll in self._expanded_transform.outputs.items() }, + annotations=self._expanded_transform.annotations, environment_id=self._expanded_transform.environment_id) From 8e687b8f4b487766624b08b88440903a03ee1285 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Sat, 16 Nov 2024 20:09:06 -0500 Subject: [PATCH 5/9] write to GCS --- .../transforms/managed_iceberg_it_test.py | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py index 2e259f22c8bf..773bf116ae6c 100644 --- a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py +++ b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py @@ -16,9 +16,6 @@ # import os -import secrets -import shutil -import tempfile import time import unittest @@ -41,18 +38,8 @@ def setUp(self): self.args = self.test_pipeline.get_full_options_as_args() self.args.extend([ '--experiments=enable_managed_transforms', - '--dataflow_endpoint=https://dataflow-staging.sandbox.googleapis.com' + '--dataflow_endpoint=https://dataflow-staging.sandbox.googleapis.com', ]) - self._tempdir = tempfile.mkdtemp() - if not os.path.exists(self._tempdir): - os.mkdir(self._tempdir) - test_warehouse_name = 'test_warehouse_%d_%s' % ( - int(time.time()), secrets.token_hex(3)) - self.warehouse_path = os.path.join(self._tempdir, test_warehouse_name) - os.mkdir(self.warehouse_path) - - def tearDown(self): - shutil.rmtree(self._tempdir, ignore_errors=False) def _create_row(self, num: int): return beam.Row( @@ -64,11 +51,11 @@ def _create_row(self, num: int): def test_write_read_pipeline(self): iceberg_config = { - "table": "test.write_read", + "table": "test_iceberg_write_read.test_" + str(int(time.time())), "catalog_name": "default", "catalog_properties": { "type": "hadoop", - "warehouse": f"file://{self.warehouse_path}", + "warehouse": "gs://temp-storage-for-end-to-end-tests/xlang-python-using-java", } } From 30bc3a70cad086d06abcf1d92adb819c4be9c9cc Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 18 Nov 2024 10:17:21 -0500 Subject: [PATCH 6/9] fix lint error --- sdks/python/apache_beam/transforms/managed_iceberg_it_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py index 773bf116ae6c..404ebaf3a925 100644 --- a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py +++ b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py @@ -33,6 +33,7 @@ "EXPANSION_JARS environment var is not provided, " "indicating that jars have not been built") class ManagedIcebergIT(unittest.TestCase): + WAREHOUSE = "gs://temp-storage-for-end-to-end-tests/xlang-python-using-java" def setUp(self): self.test_pipeline = TestPipeline(is_integration_test=True) self.args = self.test_pipeline.get_full_options_as_args() @@ -55,7 +56,7 @@ def test_write_read_pipeline(self): "catalog_name": "default", "catalog_properties": { "type": "hadoop", - "warehouse": "gs://temp-storage-for-end-to-end-tests/xlang-python-using-java", + "warehouse": self.WAREHOUSE, } } From 4c031950d91691a5deace7e2c70ef7218e5ee517 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 18 Nov 2024 10:32:29 -0500 Subject: [PATCH 7/9] fix format error --- sdks/python/apache_beam/transforms/managed_iceberg_it_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py index 404ebaf3a925..6607971c434a 100644 --- a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py +++ b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py @@ -34,6 +34,7 @@ "indicating that jars have not been built") class ManagedIcebergIT(unittest.TestCase): WAREHOUSE = "gs://temp-storage-for-end-to-end-tests/xlang-python-using-java" + def setUp(self): self.test_pipeline = TestPipeline(is_integration_test=True) self.args = self.test_pipeline.get_full_options_as_args() From 8649e6eb46a8f4820eb1d553b7c4f625c02530e7 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 18 Nov 2024 10:37:44 -0500 Subject: [PATCH 8/9] trigger direct runner tests --- .../trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json index e3d6056a5de9..b26833333238 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 1 + "modification": 2 } From 93947751e30a6bb6dd2a6207ddfaea35b1cb7dd2 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 18 Nov 2024 11:05:02 -0500 Subject: [PATCH 9/9] lint --- sdks/python/apache_beam/transforms/managed_iceberg_it_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py index 6607971c434a..a09203f313eb 100644 --- a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py +++ b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py @@ -22,9 +22,9 @@ import pytest import apache_beam as beam +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to -from apache_beam.testing.test_pipeline import TestPipeline @pytest.mark.uses_io_java_expansion_service