diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py index b06ebcc4b2f4..3ff2421e6265 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py @@ -391,9 +391,10 @@ def test_combineperkey_annotation_propagation(self): """ class MyCombinePerKey(beam.CombinePerKey): def annotations(self): - return {"my_annotation":b""} + return {"my_annotation": b""} + with TestPipeline() as pipeline: - pipeline | beam.Create([(1, 2)]) | MyCombinePerKey(min) + _ = pipeline | beam.Create([(1, 2)]) | MyCombinePerKey(min) # Verify the annotations are propagated to the split up # CPK transforms @@ -405,13 +406,13 @@ def annotations(self): phases=[translations.lift_combiners], known_runner_urns=frozenset(), partial=True) - for transform_id in [ - 'MyCombinePerKey(min)/Precombine', - 'MyCombinePerKey(min)/Group', - 'MyCombinePerKey(min)/Merge', - 'MyCombinePerKey(min)/ExtractOutputs']: - assert ("my_annotation" in - optimized.components.transforms[transform_id].annotations) + for transform_id in ['MyCombinePerKey(min)/Precombine', + 'MyCombinePerKey(min)/Group', + 'MyCombinePerKey(min)/Merge', + 'MyCombinePerKey(min)/ExtractOutputs']: + assert ( + "my_annotation" in + optimized.components.transforms[transform_id].annotations) def test_conditionally_packed_combiners(self): class RecursiveCombine(beam.PTransform):