diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py
index 8f4a2118c236..11cec247df55 100644
--- a/sdks/python/apache_beam/yaml/yaml_mapping.py
+++ b/sdks/python/apache_beam/yaml/yaml_mapping.py
@@ -297,10 +297,17 @@ def _expand_python_mapping_func(
     # TODO(robertwb): Consider constructing a single callable that takes
     # the row and returns the new row, rather than invoking (and unpacking)
     # for each field individually.
-    source = '\n'.join(['def fn(__row__):'] + [
-        f'  {name} = __row__.{name}'
+    # Wrap the generated body in try/except so that a reference to an unknown
+    # name is reported together with the list of available field names. The
+    # list is embedded as a quoted literal (via repr) so that quotes or braces
+    # in field names cannot corrupt the generated source.
+    source = '\n'.join(['def fn(__row__):'] + ['  try:'] + [
+        f'    {name} = __row__.{name}'
         for name in original_fields if name in expression
-    ] + ['  return (' + expression + ')'])
+    ] + [f'    return ({expression})'] + ['  except NameError as e:'] + [
+        '    raise ValueError(str(e) + ". Valid values include " + '
+        f'{str(original_fields)!r}) from e'
+    ])
 
   else:
     source = callable
diff --git a/sdks/python/apache_beam/yaml/yaml_mapping_test.py b/sdks/python/apache_beam/yaml/yaml_mapping_test.py
index 2c5feec18278..63a7e8cacf1a 100644
--- a/sdks/python/apache_beam/yaml/yaml_mapping_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_mapping_test.py
@@ -76,6 +76,30 @@ def test_drop(self):
               beam.Row(label='389a', rank=2),
           ]))
 
+  def test_name_error(self):
+    # An expression that references an unknown name should fail with a
+    # ValueError that lists the fields that are available on the input rows.
+    with self.assertRaisesRegex(
+        ValueError,
+        f".*error_field.*is not defined.*{'.*'.join(DATA[0].as_dict().keys())}"
+    ):
+      with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+          pickle_library='cloudpickle')) as p:
+        output = (
+            p
+            | beam.Create(DATA)
+            | YamlTransform(
+                '''
+                type: MapToFields
+                config:
+                  language: python
+                  fields:
+                    new_field: error_field
+                  append: true
+                  drop: [conductor]
+                '''))
+        self.assertFalse(output)
+
   def test_filter(self):
     with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
         pickle_library='cloudpickle')) as p: