diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 5c14b0f5ea79..130bde75ed96 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -44,6 +44,7 @@ from apache_beam.typehints.row_type import RowTypeConstraint from apache_beam.typehints.schemas import named_fields_from_element_type from apache_beam.typehints.schemas import schema_from_element_type +from apache_beam.typehints.schemas import typing_from_runner_api from apache_beam.utils import python_callable from apache_beam.yaml import json_utils from apache_beam.yaml import options @@ -482,6 +483,65 @@ def expand(pcoll, error_handling=None, **kwargs): return expand +class _StripErrorMetadata(beam.PTransform): + """Strips error metadata from outputs returned via error handling. + + Generally the error outputs for transformations return information about + the error encountered (e.g. error messages and tracebacks) in addition to the + failing element itself. This transformation attempts to remove that metadata + and returns the bad element alone which can be useful for re-processing. + + For example, in the following pipeline snippet:: + + - name: MyMappingTransform + type: MapToFields + input: SomeInput + config: + language: python + fields: + ... + error_handling: + output: errors + + - name: RecoverOriginalElements + type: StripErrorMetadata + input: MyMappingTransform.errors + + the output of `RecoverOriginalElements` will contain exactly those elements + from SomeInput that failed to processes (whereas `MyMappingTransform.errors` + would contain those elements paired with error information). + + Note that this relies on the preceding transform actually returning the + failing input in a schema'd way. Most built-in transformation follow the + correct conventions. + """ + + _ERROR_FIELD_NAMES = ('failed_row', 'element', 'record') + + def expand(self, pcoll): + try: + existing_fields = { + fld.name: fld.type + for fld in schema_from_element_type(pcoll.element_type).fields + } + except TypeError: + fld = None + else: + for fld in self._ERROR_FIELD_NAMES: + if fld in existing_fields: + break + else: + raise ValueError( + f"No field name matches one of {self._ERROR_FIELD_NAMES}") + + if fld is None: + # This handles with_exception_handling() that returns bare tuples. + return pcoll | beam.Map(lambda x: x[0]) + else: + return pcoll | beam.Map(lambda x: getattr(x, fld)).with_output_types( + typing_from_runner_api(existing_fields[fld])) + + class _Validate(beam.PTransform): """Validates each element of a PCollection against a json schema. @@ -838,6 +898,7 @@ def create_mapping_providers(): 'Partition-python': _Partition, 'Partition-javascript': _Partition, 'Partition-generic': _Partition, + 'StripErrorMetadata': _StripErrorMetadata, 'ValidateWithSchema': _Validate, }), yaml_provider.SqlBackedProvider({ diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py index fbdae6679e96..7fcea7e2b662 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform_test.py +++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py @@ -401,6 +401,51 @@ def test_error_handling_outputs(self): assert_that(result['good'], equal_to(['a', 'b']), label="CheckGood") assert_that(result['bad'], equal_to(["ValueError('biiiiig')"])) + def test_strip_error_metadata(self): + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + result = p | YamlTransform( + ''' + type: composite + transforms: + - type: Create + config: + elements: ['a', 'b', 'biiiiig'] + + - type: SizeLimiter + input: Create + config: + limit: 5 + error_handling: + output: errors + - type: StripErrorMetadata + name: StripErrorMetadata1 + input: SizeLimiter.errors + + - type: MapToFields + input: Create + config: + language: python + fields: + out: "1/(1-len(element))" + error_handling: + output: errors + - type: StripErrorMetadata + name: StripErrorMetadata2 + input: MapToFields.errors + + output: + good: SizeLimiter + bad1: StripErrorMetadata1 + bad2: StripErrorMetadata2 + ''', + providers=TEST_PROVIDERS) + assert_that(result['good'], equal_to(['a', 'b']), label="CheckGood") + assert_that( + result['bad1'] | beam.Map(lambda x: x.element), equal_to(['biiiiig'])) + assert_that( + result['bad2'] | beam.Map(lambda x: x.element), equal_to(['a', 'b'])) + def test_must_handle_error_output(self): with self.assertRaisesRegex(Exception, 'Unconsumed error output .*line 7'): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( diff --git a/website/www/site/content/en/documentation/sdks/yaml-errors.md b/website/www/site/content/en/documentation/sdks/yaml-errors.md index 903e18d6b3c7..6edd1751a65b 100644 --- a/website/www/site/content/en/documentation/sdks/yaml-errors.md +++ b/website/www/site/content/en/documentation/sdks/yaml-errors.md @@ -78,8 +78,10 @@ for a robust pipeline). Note also that the exact format of the error outputs is still being finalized. They can be safely printed and written to outputs, but their precise schema may change in a future version of Beam and should not yet be depended on. -Currently it has, at the very least, an `element` field which holds the element -that caused the error. +It generally contains the failed record itself as well as information about +the error that was encountered (e.g. error messages and tracebacks). +To recover the bad record alone one can process the error output with the +`StripErrorMetadata` transformation. Some transforms allow for extra arguments in their error_handling config, e.g. for Python functions one can give a `threshold` which limits the relative number @@ -139,9 +141,16 @@ pipeline: error_handling: output: my_error_output + - type: StripErrorMetadata + name: FailedRecordsWithoutMetadata + # Takes the error information from ComputeRatio and returns just the + # failing records themselves for another attempt with a different + # transform. + input: ComputeRatio.my_error_output + - type: MapToFields name: ComputeRatioForBadRecords - input: ComputeRatio.my_error_output + input: FailedRecordsWithoutMetadata config: language: python fields: