diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index c99d90e184e2..79d411790ba6 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -1112,6 +1112,9 @@ def __ror__(self, pvalueish, _unused=None): def expand(self, pvalue): raise RuntimeError("Should never be expanded directly.") + def annotations(self): + return self.transform.annotations() + def __getattr__(self, attr): transform_attr = getattr(self.transform, attr) if callable(transform_attr): @@ -1128,6 +1131,12 @@ def wrapper(*args, **kwargs): else: return transform_attr + def __setattr__(self, attr, value): + if attr == 'annotations': + self.transform.annotations = value + else: + super().__setattr__(attr, value) + # Defined here to avoid circular import issues for Beam library transforms. def annotate_yaml(constructor): diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 3134659549ed..4bcd07ad7b2c 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -101,6 +101,17 @@ def test_str(self): """inputs=('ci',) side_inputs=('cs',)>""", str(inputs_tr)) + def test_named_annotations(self): + t = beam.Impulse() + t.annotations = lambda: {'test': 'value'} + named_t = 'Name' >> t + self.assertEqual(named_t.annotations(), {'test': 'value'}) + original_annotations = named_t.annotations() + named_t.annotations = lambda: {'another': 'value', **original_annotations} + # Verify this is reflected on the original transform, + # which is what gets used in apply. + self.assertEqual(t.annotations(), {'test': 'value', 'another': 'value'}) + def test_do_with_do_fn(self): class AddNDoFn(beam.DoFn): def process(self, element, addon):