From e7e140ec2169d9908472e551e33f5c9c86bac7dc Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 15 Sep 2023 19:44:59 -0400 Subject: [PATCH 1/4] Implement unittest --- .../portability/fn_api_runner/translations.py | 1 + .../fn_api_runner/translations_test.py | 29 +++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py index 13f1dc635965..1f4d78581d7c 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py @@ -1367,6 +1367,7 @@ def lifted_stages(stage): payload=transform.spec.payload), inputs={'in': grouped_pcoll_id}, outputs={'out': merged_pcoll_id}, + annotations=transform.annotations, environment_id=transform.environment_id)) yield make_stage( diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py index 144f067900f3..d52ec65afc52 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py @@ -384,6 +384,35 @@ def expand(self, pcoll): 'multiple-small-combines/min-4-globally/CombinePerKey', optimized_stage_names) + def test_combineperkey_annotation_propagation(self): + """ + Test that the CPK component transforms inherit annotations from the + source CPK + """ + class MyCombinePerKey(beam.CombinePerKey): + def annotations(self): + return {"my_annotation":b""} + # Verify the results are as expected. + with TestPipeline() as pipeline: + pipeline | beam.Create([(1, 2)]) | MyCombinePerKey(min) + + # Verify the optimization is as expected. + proto = pipeline.to_runner_api( + default_environment=environments.EmbeddedPythonEnvironment( + capabilities=environments.python_sdk_capabilities())) + optimized = translations.optimize_pipeline( + proto, + phases=[translations.lift_combiners], + known_runner_urns=frozenset(), + partial=True) + for transform_id in [ + 'MyCombinePerKey(min)/Precombine', + 'MyCombinePerKey(min)/Group', + 'MyCombinePerKey(min)/Merge', + 'MyCombinePerKey(min)/ExtractOutputs']: + assert ("my_annotation" in + optimized.components.transforms[transform_id].annotations) + def test_conditionally_packed_combiners(self): class RecursiveCombine(beam.PTransform): def __init__(self, labels): From f37d86014ae0e6ec0b671bad5c62e29c611348d7 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 15 Sep 2023 19:46:25 -0400 Subject: [PATCH 2/4] Implement fix --- .../runners/portability/fn_api_runner/translations.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py index 1f4d78581d7c..cc1494fc7ae2 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py @@ -1346,6 +1346,7 @@ def lifted_stages(stage): payload=transform.spec.payload), inputs=transform.inputs, outputs={'out': precombined_pcoll_id}, + annotations=transform.annotations, environment_id=transform.environment_id)) yield make_stage( @@ -1355,6 +1356,7 @@ def lifted_stages(stage): spec=beam_runner_api_pb2.FunctionSpec( urn=common_urns.primitives.GROUP_BY_KEY.urn), inputs={'in': precombined_pcoll_id}, + annotations=transform.annotations, outputs={'out': grouped_pcoll_id})) yield make_stage( @@ -1380,6 +1382,7 @@ def lifted_stages(stage): payload=transform.spec.payload), inputs={'in': merged_pcoll_id}, outputs=transform.outputs, + annotations=transform.annotations, environment_id=transform.environment_id)) def unlifted_stages(stage): From 4a5a0076f35a0ff46a7df7de2e311e0c43014598 Mon Sep 17 00:00:00 2001 From: Hai Joey Tran Date: Fri, 15 Sep 2023 19:55:42 -0400 Subject: [PATCH 3/4] Update translations_test.py --- .../runners/portability/fn_api_runner/translations_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py index d52ec65afc52..b06ebcc4b2f4 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py @@ -392,11 +392,11 @@ def test_combineperkey_annotation_propagation(self): class MyCombinePerKey(beam.CombinePerKey): def annotations(self): return {"my_annotation":b""} - # Verify the results are as expected. with TestPipeline() as pipeline: pipeline | beam.Create([(1, 2)]) | MyCombinePerKey(min) - # Verify the optimization is as expected. + # Verify the annotations are propagated to the split up + # CPK transforms proto = pipeline.to_runner_api( default_environment=environments.EmbeddedPythonEnvironment( capabilities=environments.python_sdk_capabilities())) From b3a8e1531cabf42beab6fda79bce533d4b4ddfd5 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 15 Sep 2023 22:37:36 -0400 Subject: [PATCH 4/4] yapf --- .../fn_api_runner/translations_test.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py index b06ebcc4b2f4..3ff2421e6265 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py @@ -391,9 +391,10 @@ def test_combineperkey_annotation_propagation(self): """ class MyCombinePerKey(beam.CombinePerKey): def annotations(self): - return {"my_annotation":b""} + return {"my_annotation": b""} + with TestPipeline() as pipeline: - pipeline | beam.Create([(1, 2)]) | MyCombinePerKey(min) + _ = pipeline | beam.Create([(1, 2)]) | MyCombinePerKey(min) # Verify the annotations are propagated to the split up # CPK transforms @@ -405,13 +406,13 @@ def annotations(self): phases=[translations.lift_combiners], known_runner_urns=frozenset(), partial=True) - for transform_id in [ - 'MyCombinePerKey(min)/Precombine', - 'MyCombinePerKey(min)/Group', - 'MyCombinePerKey(min)/Merge', - 'MyCombinePerKey(min)/ExtractOutputs']: - assert ("my_annotation" in - optimized.components.transforms[transform_id].annotations) + for transform_id in ['MyCombinePerKey(min)/Precombine', + 'MyCombinePerKey(min)/Group', + 'MyCombinePerKey(min)/Merge', + 'MyCombinePerKey(min)/ExtractOutputs']: + assert ( + "my_annotation" in + optimized.components.transforms[transform_id].annotations) def test_conditionally_packed_combiners(self): class RecursiveCombine(beam.PTransform):