From 73dff6f8248670ef10e9d40870a3ac5cc5af7dcb Mon Sep 17 00:00:00 2001 From: mravi Date: Sun, 24 Nov 2024 15:36:07 -0800 Subject: [PATCH 1/5] Validate circular reference for yaml --- .../python/apache_beam/yaml/yaml_transform.py | 27 ++++++++++++ .../apache_beam/yaml/yaml_transform_test.py | 42 +++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index b8e49e81c579..4782a8e388b6 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -955,10 +955,37 @@ def preprocess_languages(spec): else: return spec + def validate_transform_references(spec): + if 'transforms' not in spec: + return spec + + for transform in spec['transforms']: + name = transform.get('name') + inputs = transform.get('input') + if name is None or inputs is None: + continue + + input_values = [] + if isinstance(inputs, str): + input_values = [inputs] + elif isinstance(inputs, list): + input_values = inputs + elif isinstance(inputs, dict): + input_values = list(inputs.values()) + + for input_value in input_values: + if isinstance(input_value, str) and input_value.lower() == name.lower(): + raise ValueError( + f"Circular reference detected: Transform {name} references itself as input" + f" in {identify_object(transform)}") + + return spec + for phase in [ ensure_transforms_have_types, normalize_mapping, normalize_combine, + validate_transform_references, preprocess_languages, ensure_transforms_have_providers, preprocess_source_sink, diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py index 7fcea7e2b662..78bce70fbdbf 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform_test.py +++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py @@ -259,6 +259,48 @@ def test_csv_to_json(self): lines=True).sort_values('rank').reindex() pd.testing.assert_frame_equal(data, result) + def test_circular_reference_validation(self): + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + with self.assertRaisesRegex(ValueError, r'Circular reference detected.*'): + result = p | YamlTransform( + ''' + type: composite + transforms: + - type: Create + name: Create + config: + elements: [0, 1, 3, 4] + input: Create + - type: PyMap + name: PyMap + config: + fn: "lambda row: row.element * row.element" + input: Create + output: PyMap + ''', + providers=TEST_PROVIDERS) + + def test_circular_reference_multi_inputs_validation(self): + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + with self.assertRaisesRegex(ValueError, r'Circular reference detected.*'): + result = p | YamlTransform( + ''' + type: composite + transforms: + - type: Create + name: Create + config: + elements: [0, 1, 3, 4] + - type: PyMap + name: PyMap + config: + fn: "lambda row: row.element * row.element" + input: [Create, PyMap] + output: PyMap + ''', providers=TEST_PROVIDERS) + def test_name_is_not_ambiguous(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle')) as p: From cf1257461aba507fe79b156ba4b2c58d735af5c8 Mon Sep 17 00:00:00 2001 From: mravi Date: Sun, 24 Nov 2024 18:30:09 -0800 Subject: [PATCH 2/5] fixup: lint fixes --- .../python/apache_beam/yaml/yaml_transform.py | 36 +++++++++---------- .../apache_beam/yaml/yaml_transform_test.py | 10 +++--- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index 4782a8e388b6..caa8d853d785 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -960,24 +960,24 @@ def validate_transform_references(spec): return spec for transform in spec['transforms']: - name = transform.get('name') - inputs = transform.get('input') - if name is None or inputs is None: - continue - - input_values = [] - if isinstance(inputs, str): - input_values = [inputs] - elif isinstance(inputs, list): - input_values = inputs - elif isinstance(inputs, dict): - input_values = list(inputs.values()) - - for input_value in input_values: - if isinstance(input_value, str) and input_value.lower() == name.lower(): - raise ValueError( - f"Circular reference detected: Transform {name} references itself as input" - f" in {identify_object(transform)}") + name = transform.get('name') + inputs = transform.get('input') + if name is None or inputs is None: + continue + + input_values = [] + if isinstance(inputs, str): + input_values = [inputs] + elif isinstance(inputs, list): + input_values = inputs + elif isinstance(inputs, dict): + input_values = list(inputs.values()) + + for input_value in input_values: + if isinstance(input_value, str) and input_value.lower() == name.lower(): + raise ValueError( + f"Circular reference detected: Transform {name} " + f"references itself as input in {identify_object(transform)}") return spec diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py index 78bce70fbdbf..ac6e22b8bd2d 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform_test.py +++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py @@ -262,9 +262,10 @@ def test_csv_to_json(self): def test_circular_reference_validation(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle')) as p: + # pylint: disable=expression-not-assigned with self.assertRaisesRegex(ValueError, r'Circular reference detected.*'): - result = p | YamlTransform( - ''' + p | YamlTransform( + ''' type: composite transforms: - type: Create @@ -284,9 +285,10 @@ def test_circular_reference_validation(self): def test_circular_reference_multi_inputs_validation(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle')) as p: + # pylint: disable=expression-not-assigned with self.assertRaisesRegex(ValueError, r'Circular reference detected.*'): - result = p | YamlTransform( - ''' + p | YamlTransform( + ''' type: composite transforms: - type: Create From b34f958f3343a6f86f70a75e3b40de521ecbd199 Mon Sep 17 00:00:00 2001 From: mravi Date: Sun, 24 Nov 2024 19:52:27 -0800 Subject: [PATCH 3/5] fixup: format fixes --- sdks/python/apache_beam/yaml/yaml_transform.py | 4 ++-- sdks/python/apache_beam/yaml/yaml_transform_test.py | 11 ++++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index caa8d853d785..1454ef76edd4 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -976,8 +976,8 @@ def validate_transform_references(spec): for input_value in input_values: if isinstance(input_value, str) and input_value.lower() == name.lower(): raise ValueError( - f"Circular reference detected: Transform {name} " - f"references itself as input in {identify_object(transform)}") + f"Circular reference detected: Transform {name} " + f"references itself as input in {identify_object(transform)}") return spec diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py index ac6e22b8bd2d..a13af58492df 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform_test.py +++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py @@ -261,7 +261,7 @@ def test_csv_to_json(self): def test_circular_reference_validation(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( - pickle_library='cloudpickle')) as p: + pickle_library='cloudpickle')) as p: # pylint: disable=expression-not-assigned with self.assertRaisesRegex(ValueError, r'Circular reference detected.*'): p | YamlTransform( @@ -280,15 +280,15 @@ def test_circular_reference_validation(self): input: Create output: PyMap ''', - providers=TEST_PROVIDERS) + providers=TEST_PROVIDERS) def test_circular_reference_multi_inputs_validation(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( - pickle_library='cloudpickle')) as p: + pickle_library='cloudpickle')) as p: # pylint: disable=expression-not-assigned with self.assertRaisesRegex(ValueError, r'Circular reference detected.*'): p | YamlTransform( - ''' + ''' type: composite transforms: - type: Create @@ -301,7 +301,8 @@ def test_circular_reference_multi_inputs_validation(self): fn: "lambda row: row.element * row.element" input: [Create, PyMap] output: PyMap - ''', providers=TEST_PROVIDERS) + ''', + providers=TEST_PROVIDERS) def test_name_is_not_ambiguous(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( From 499b227a2778142eaa23a2500ea5a3a3dbc3004b Mon Sep 17 00:00:00 2001 From: mravi Date: Sat, 7 Dec 2024 15:17:20 -0800 Subject: [PATCH 4/5] fixup: Review comments --- .../python/apache_beam/yaml/yaml_transform.py | 27 ++++++------------- .../apache_beam/yaml/yaml_transform_test.py | 2 +- 2 files changed, 9 insertions(+), 20 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index 1454ef76edd4..4c3abaf1b202 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -956,28 +956,17 @@ def preprocess_languages(spec): return spec def validate_transform_references(spec): - if 'transforms' not in spec: - return spec - - for transform in spec['transforms']: - name = transform.get('name') - inputs = transform.get('input') - if name is None or inputs is None: - continue - - input_values = [] - if isinstance(inputs, str): - input_values = [inputs] - elif isinstance(inputs, list): - input_values = inputs - elif isinstance(inputs, dict): - input_values = list(inputs.values()) + name = spec.get('name', '') + transform_type = spec.get('type') + inputs = spec.get('input').get('input', []) + if not is_empty(inputs): + input_values = [inputs] if isinstance(inputs, str) else inputs for input_value in input_values: - if isinstance(input_value, str) and input_value.lower() == name.lower(): + if input_value in (name, transform_type): raise ValueError( f"Circular reference detected: Transform {name} " - f"references itself as input in {identify_object(transform)}") + f"references itself as input in {identify_object(spec)}") return spec @@ -985,13 +974,13 @@ def validate_transform_references(spec): ensure_transforms_have_types, normalize_mapping, normalize_combine, - validate_transform_references, preprocess_languages, ensure_transforms_have_providers, preprocess_source_sink, preprocess_chain, tag_explicit_inputs, normalize_inputs_outputs, + validate_transform_references, preprocess_flattened_inputs, ensure_errors_consumed, preprocess_windowing, diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py index a13af58492df..b9caca4ca9f4 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform_test.py +++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py @@ -330,7 +330,7 @@ def test_name_is_ambiguous(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle')) as p: # pylint: disable=expression-not-assigned - with self.assertRaisesRegex(ValueError, r'Ambiguous.*'): + with self.assertRaisesRegex(ValueError, r'Circular reference detected.*'): p | YamlTransform( ''' type: composite From a3e87b619396f9284aeb5d72de1984f1df74a99c Mon Sep 17 00:00:00 2001 From: mravi Date: Sat, 14 Dec 2024 15:06:23 -0800 Subject: [PATCH 5/5] Trigger Build