From 0d55231a19f890f3377fa3f523afbee0a0ea86d7 Mon Sep 17 00:00:00 2001 From: Ravi Magham Date: Tue, 26 Nov 2024 12:50:23 -0800 Subject: [PATCH] Better error messaging for chain-type pipeline (#33210) --- sdks/python/apache_beam/yaml/yaml_transform.py | 4 ++-- .../yaml/yaml_transform_unit_test.py | 17 +++++++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index ab86a2aaff56..0190fe20413f 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -576,8 +576,8 @@ def is_not_output_of_last_transform(new_transforms, value): pass else: raise ValueError( - f'Transform {identify_object(transform)} is part of a chain, ' - 'must have implicit inputs and outputs.') + f'Transform {identify_object(transform)} is part of a chain. ' + 'Cannot define explicit inputs on chain pipeline') if ix == 0: if is_explicitly_empty(transform.get('input', None)): pass diff --git a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py index bc0493509d5a..084e03cdb197 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py +++ b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py @@ -325,6 +325,23 @@ def test_chain_as_composite_with_input(self): self.assertEqual( chain_as_composite(spec)['transforms'][0]['input'], {"input": "input"}) + def test_chain_as_composite_with_transform_input(self): + spec = ''' + type: chain + transforms: + - type: Create + config: + elements: [0,1,2] + - type: LogForTesting + input: Create + ''' + spec = yaml.load(spec, Loader=SafeLineLoader) + with self.assertRaisesRegex( + ValueError, + r"Transform .* is part of a chain. " + r"Cannot define explicit inputs on chain pipeline"): + chain_as_composite(spec) + def test_normalize_source_sink(self): spec = ''' source: