From 09a6fc5c5f3db65592ce45d25832d69206e6973d Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Wed, 21 Feb 2024 16:54:27 -0800 Subject: [PATCH 1/2] Fix updating of annotations for explicitly named transforms. --- sdks/python/apache_beam/transforms/ptransform.py | 9 +++++++++ sdks/python/apache_beam/transforms/ptransform_test.py | 10 ++++++++++ 2 files changed, 19 insertions(+) diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index c99d90e184e2..79d411790ba6 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -1112,6 +1112,9 @@ def __ror__(self, pvalueish, _unused=None): def expand(self, pvalue): raise RuntimeError("Should never be expanded directly.") + def annotations(self): + return self.transform.annotations() + def __getattr__(self, attr): transform_attr = getattr(self.transform, attr) if callable(transform_attr): @@ -1128,6 +1131,12 @@ def wrapper(*args, **kwargs): else: return transform_attr + def __setattr__(self, attr, value): + if attr == 'annotations': + self.transform.annotations = value + else: + super().__setattr__(attr, value) + # Defined here to avoid circular import issues for Beam library transforms. def annotate_yaml(constructor): diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 3134659549ed..ca7349ff840f 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -101,6 +101,16 @@ def test_str(self): """inputs=('ci',) side_inputs=('cs',)>""", str(inputs_tr)) + def test_named_annotations(self): + t = beam.Impulse() + t.annotations = lambda: {'test': 'value'} + named_t = 'Name' >> t + self.assertEqual(named_t.annotations(), {'test': 'value'}) + original_annotations = named_t.annotations() + named_t.annotations = lambda: {'another': 'value', **original_annotations} + # This is what gets used in apply. + self.assertEqual(t.annotations(), {'test': 'value', 'another': 'value'}) + def test_do_with_do_fn(self): class AddNDoFn(beam.DoFn): def process(self, element, addon): From e6e91b99542689d14859a055c4362a77b5754290 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Wed, 28 Feb 2024 10:46:19 -0800 Subject: [PATCH 2/2] Expand on comment. --- sdks/python/apache_beam/transforms/ptransform_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index ca7349ff840f..4bcd07ad7b2c 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -108,7 +108,8 @@ def test_named_annotations(self): self.assertEqual(named_t.annotations(), {'test': 'value'}) original_annotations = named_t.annotations() named_t.annotations = lambda: {'another': 'value', **original_annotations} - # This is what gets used in apply. + # Verify this is reflected on the original transform, + # which is what gets used in apply. self.assertEqual(t.annotations(), {'test': 'value', 'another': 'value'}) def test_do_with_do_fn(self):