Skip to content

Commit

Permalink
Merge pull request apache#30862 Better handling of no-output transforms.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb authored Apr 5, 2024
2 parents ffc96bc + a841c55 commit 040dba1
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
15 changes: 9 additions & 6 deletions sdks/python/apache_beam/yaml/yaml_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ def expand_leaf_transform(spec, scope):
return {f'out{ix}': pcoll for (ix, pcoll) in enumerate(outputs)}
elif isinstance(outputs, beam.PCollection):
return {'out': outputs}
elif outputs is None:
elif outputs is None or isinstance(outputs, beam.pvalue.PDone):
return {}
else:
raise ValueError(
Expand All @@ -518,10 +518,13 @@ class CompositePTransform(beam.PTransform):
@staticmethod
def expand(inputs):
inner_scope.compute_all()
return {
key: inner_scope.get_pcollection(value)
for (key, value) in spec['output'].items()
}
if '__implicit_outputs__' in spec['output']:
return inner_scope.get_outputs(spec['output']['__implicit_outputs__'])
else:
return {
key: inner_scope.get_pcollection(value)
for (key, value) in spec['output'].items()
}

if 'name' not in spec:
spec['name'] = 'Composite'
Expand Down Expand Up @@ -596,7 +599,7 @@ def is_not_output_of_last_transform(new_transforms, value):
for (key, value) in composite_spec['output'].items()
}
else:
composite_spec['output'] = last_transform
composite_spec['output'] = {'__implicit_outputs__': last_transform}
if 'name' not in composite_spec:
composite_spec['name'] = 'Chain'
composite_spec['type'] = 'composite'
Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ def test_chain_as_composite(self):
config:
fn: 'lambda x: x*x'
input: {spec['transforms'][0]['__uuid__']}
output: {spec['transforms'][1]['__uuid__']}
output:
'__implicit_outputs__': {spec['transforms'][1]['__uuid__']}
'''
self.assertYaml(expected, result)

Expand Down

0 comments on commit 040dba1

Please sign in to comment.