diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index df2fdbf6aaa1..debf6a63af26 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -491,7 +491,7 @@ def expand_leaf_transform(spec, scope): return {f'out{ix}': pcoll for (ix, pcoll) in enumerate(outputs)} elif isinstance(outputs, beam.PCollection): return {'out': outputs} - elif outputs is None: + elif outputs is None or isinstance(outputs, beam.pvalue.PDone): return {} else: raise ValueError( @@ -518,10 +518,13 @@ class CompositePTransform(beam.PTransform): @staticmethod def expand(inputs): inner_scope.compute_all() - return { - key: inner_scope.get_pcollection(value) - for (key, value) in spec['output'].items() - } + if '__implicit_outputs__' in spec['output']: + return inner_scope.get_outputs(spec['output']['__implicit_outputs__']) + else: + return { + key: inner_scope.get_pcollection(value) + for (key, value) in spec['output'].items() + } if 'name' not in spec: spec['name'] = 'Composite' @@ -596,7 +599,7 @@ def is_not_output_of_last_transform(new_transforms, value): for (key, value) in composite_spec['output'].items() } else: - composite_spec['output'] = last_transform + composite_spec['output'] = {'__implicit_outputs__': last_transform} if 'name' not in composite_spec: composite_spec['name'] = 'Chain' composite_spec['type'] = 'composite' diff --git a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py index 39f31619a741..c4b6899329a6 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py +++ b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py @@ -256,7 +256,8 @@ def test_chain_as_composite(self): config: fn: 'lambda x: x*x' input: {spec['transforms'][0]['__uuid__']} - output: {spec['transforms'][1]['__uuid__']} + output: + '__implicit_outputs__': {spec['transforms'][1]['__uuid__']} ''' self.assertYaml(expected, result)