diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json index b26833333238..c537844dc84a 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 3 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json index e3d6056a5de9..b26833333238 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 1 + "modification": 2 } diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index 83c439ca8ddd..cdf5d874d7fa 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -923,6 +923,7 @@ def _normalize(coder_proto): for tag, pcoll in self._expanded_transform.outputs.items() }, + annotations=self._expanded_transform.annotations, environment_id=self._expanded_transform.environment_id) diff --git a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py index 0dfa2aa19c51..a09203f313eb 100644 --- a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py +++ b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py @@ -16,15 +16,13 @@ # import os -import secrets -import shutil -import tempfile import time import unittest import pytest import apache_beam as beam +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to @@ -35,17 +33,15 @@ "EXPANSION_JARS environment var is not provided, " "indicating that jars have not been built") class ManagedIcebergIT(unittest.TestCase): - def setUp(self): - self._tempdir = tempfile.mkdtemp() - if not os.path.exists(self._tempdir): - os.mkdir(self._tempdir) - test_warehouse_name = 'test_warehouse_%d_%s' % ( - int(time.time()), secrets.token_hex(3)) - self.warehouse_path = os.path.join(self._tempdir, test_warehouse_name) - os.mkdir(self.warehouse_path) + WAREHOUSE = "gs://temp-storage-for-end-to-end-tests/xlang-python-using-java" - def tearDown(self): - shutil.rmtree(self._tempdir, ignore_errors=False) + def setUp(self): + self.test_pipeline = TestPipeline(is_integration_test=True) + self.args = self.test_pipeline.get_full_options_as_args() + self.args.extend([ + '--experiments=enable_managed_transforms', + '--dataflow_endpoint=https://dataflow-staging.sandbox.googleapis.com', + ]) def _create_row(self, num: int): return beam.Row( @@ -57,24 +53,24 @@ def _create_row(self, num: int): def test_write_read_pipeline(self): iceberg_config = { - "table": "test.write_read", + "table": "test_iceberg_write_read.test_" + str(int(time.time())), "catalog_name": "default", "catalog_properties": { "type": "hadoop", - "warehouse": f"file://{self.warehouse_path}", + "warehouse": self.WAREHOUSE, } } rows = [self._create_row(i) for i in range(100)] expected_dicts = [row.as_dict() for row in rows] - with beam.Pipeline() as write_pipeline: + with beam.Pipeline(argv=self.args) as write_pipeline: _ = ( write_pipeline | beam.Create(rows) | beam.managed.Write(beam.managed.ICEBERG, config=iceberg_config)) - with beam.Pipeline() as read_pipeline: + with beam.Pipeline(argv=self.args) as read_pipeline: output_dicts = ( read_pipeline | beam.managed.Read(beam.managed.ICEBERG, config=iceberg_config)