Enable Dataflow managed service for Python tests (#33134)
* Enable Python Managed Tests on Dataflow

* trigger tests

* fix

* copy over annotations

* write to GCS

* fix lint error

* fix format error

* trigger direct runner tests

* lint
ahmedabu98 authored Nov 18, 2024
1 parent 366d89f commit de6965a
Showing 4 changed files with 16 additions and 19 deletions.
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 2
+  "modification": 3
 }
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 1
+  "modification": 2
 }
1 change: 1 addition & 0 deletions sdks/python/apache_beam/transforms/external.py
@@ -923,6 +923,7 @@ def _normalize(coder_proto):
             for tag,
             pcoll in self._expanded_transform.outputs.items()
         },
+        annotations=self._expanded_transform.annotations,
         environment_id=self._expanded_transform.environment_id)
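The single added line above copies the expanded transform's annotations into the runner API proto instead of dropping them ("copy over annotations" in the commit message). As a loose, illustrative sketch only, not code from this PR: a Python PTransform can expose such annotations by overriding annotations(); the class name and values below are hypothetical.

import apache_beam as beam


class AnnotatedTransform(beam.PTransform):
  """Hypothetical transform that attaches metadata to the pipeline proto."""
  def annotations(self):
    # Arbitrary string/bytes metadata; runners can inspect it at submission time.
    return {'example_annotation': 'managed-iceberg-demo'}

  def expand(self, pcoll):
    # Identity body; only the annotations matter for this sketch.
    return pcoll | beam.Map(lambda x: x)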
30 changes: 13 additions & 17 deletions sdks/python/apache_beam/transforms/managed_iceberg_it_test.py
@@ -16,15 +16,13 @@
 #
 
 import os
-import secrets
-import shutil
-import tempfile
 import time
 import unittest
 
 import pytest
 
 import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 
@@ -35,17 +33,15 @@
     "EXPANSION_JARS environment var is not provided, "
     "indicating that jars have not been built")
 class ManagedIcebergIT(unittest.TestCase):
-  def setUp(self):
-    self._tempdir = tempfile.mkdtemp()
-    if not os.path.exists(self._tempdir):
-      os.mkdir(self._tempdir)
-    test_warehouse_name = 'test_warehouse_%d_%s' % (
-        int(time.time()), secrets.token_hex(3))
-    self.warehouse_path = os.path.join(self._tempdir, test_warehouse_name)
-    os.mkdir(self.warehouse_path)
+  WAREHOUSE = "gs://temp-storage-for-end-to-end-tests/xlang-python-using-java"
 
-  def tearDown(self):
-    shutil.rmtree(self._tempdir, ignore_errors=False)
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.args = self.test_pipeline.get_full_options_as_args()
+    self.args.extend([
+        '--experiments=enable_managed_transforms',
+        '--dataflow_endpoint=https://dataflow-staging.sandbox.googleapis.com',
+    ])
 
   def _create_row(self, num: int):
     return beam.Row(
@@ -57,24 +53,24 @@ def _create_row(self, num: int):
 
   def test_write_read_pipeline(self):
     iceberg_config = {
-        "table": "test.write_read",
+        "table": "test_iceberg_write_read.test_" + str(int(time.time())),
         "catalog_name": "default",
         "catalog_properties": {
             "type": "hadoop",
-            "warehouse": f"file://{self.warehouse_path}",
+            "warehouse": self.WAREHOUSE,
         }
     }
 
     rows = [self._create_row(i) for i in range(100)]
     expected_dicts = [row.as_dict() for row in rows]
 
-    with beam.Pipeline() as write_pipeline:
+    with beam.Pipeline(argv=self.args) as write_pipeline:
       _ = (
           write_pipeline
           | beam.Create(rows)
           | beam.managed.Write(beam.managed.ICEBERG, config=iceberg_config))
 
-    with beam.Pipeline() as read_pipeline:
+    with beam.Pipeline(argv=self.args) as read_pipeline:
       output_dicts = (
          read_pipeline
          | beam.managed.Read(beam.managed.ICEBERG, config=iceberg_config)
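With this change the test writes to a GCS warehouse and submits both pipelines with the extra Dataflow options assembled in setUp, rather than running against a local temp directory. A rough standalone sketch of the same write path follows; the project, region, bucket, and table names are placeholders, not values from this PR.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder options; a real run needs a valid GCP project, region, and bucket.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--region=us-central1',
    '--temp_location=gs://my-bucket/tmp',
    '--experiments=enable_managed_transforms',
])

iceberg_config = {
    "table": "my_db.my_table",  # placeholder table
    "catalog_name": "default",
    "catalog_properties": {
        "type": "hadoop",
        "warehouse": "gs://my-bucket/warehouse",  # placeholder warehouse path
    },
}

with beam.Pipeline(options=options) as p:
  _ = (
      p
      | beam.Create([beam.Row(int_field=i, str_field=str(i)) for i in range(10)])
      | beam.managed.Write(beam.managed.ICEBERG, config=iceberg_config))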
