Skip to content

Commit

Permalink
[YAML] Add a new StripErrorMetadata transform.
Browse files Browse the repository at this point in the history
Beam Yaml's error handling framework returns per-record errors as a
schema'd PCollection with associated error metadata (e.g. error messages,
tracebacks). Currently there is no way to "unnest" the nested rececords
(except for field by field) back to the top level if one wants to
re-process these records (or otherwise ignore the metadata).  Even if
there was a way to do this "up-one-level" unnesting it's not clear that
this would be obvious to users to find.  Worse, various forms of error
handling are not consistent in what the "bad records" schema is, or
even where the original record is found (though we do have a caveat in
the docs that this is still not set in stone).

This adds a simple, easy to identify transform that abstracts all of
these complexities away for the basic usecase.
  • Loading branch information
robertwb committed Nov 12, 2024
1 parent 628348b commit 886c70f
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 3 deletions.
61 changes: 61 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from apache_beam.typehints.row_type import RowTypeConstraint
from apache_beam.typehints.schemas import named_fields_from_element_type
from apache_beam.typehints.schemas import schema_from_element_type
from apache_beam.typehints.schemas import typing_from_runner_api
from apache_beam.utils import python_callable
from apache_beam.yaml import json_utils
from apache_beam.yaml import options
Expand Down Expand Up @@ -482,6 +483,65 @@ def expand(pcoll, error_handling=None, **kwargs):
return expand


class _StripErrorMetadata(beam.PTransform):
"""Strips error metadata from outputs returned via error handling.
Generally the error outputs for transformations return information about
the error encountered (e.g. error messages and tracebacks) in addition to the
failing element itself. This transformation attempts to remove that metadata
and returns the bad element alone which can be useful for re-processing.
For example, in the following pipeline snippet::
- name: MyMappingTransform
type: MapToFields
input: SomeInput
config:
language: python
fields:
...
error_handling:
output: errors
- name: RecoverOriginalElements
type: StripErrorMetadata
input: MyMappingTransform.errors
the output of `RecoverOriginalElements` will contain exactly those elements
from SomeInput that failed to processes (whereas `MyMappingTransform.errors`
would contain those elements paired with error information).
Note that this relies on the preceding transform actually returning the
failing input in a schema'd way. Most built-in transformation follow the
correct conventions.
"""

_ERROR_FIELD_NAMES = ('failed_row', 'element', 'record')

def expand(self, pcoll):
try:
existing_fields = {
fld.name: fld.type
for fld in schema_from_element_type(pcoll.element_type).fields
}
except TypeError:
fld = None
else:
for fld in self._ERROR_FIELD_NAMES:
if fld in existing_fields:
break
else:
raise ValueError(
f"No field name matches one of {self._ERROR_FIELD_NAMES}")

if fld is None:
# This handles with_exception_handling() that returns bare tuples.
return pcoll | beam.Map(lambda x: x[0])
else:
return pcoll | beam.Map(lambda x: getattr(x, fld)).with_output_types(
typing_from_runner_api(existing_fields[fld]))


class _Validate(beam.PTransform):
"""Validates each element of a PCollection against a json schema.
Expand Down Expand Up @@ -838,6 +898,7 @@ def create_mapping_providers():
'Partition-python': _Partition,
'Partition-javascript': _Partition,
'Partition-generic': _Partition,
'StripErrorMetadata': _StripErrorMetadata,
'ValidateWithSchema': _Validate,
}),
yaml_provider.SqlBackedProvider({
Expand Down
45 changes: 45 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_transform_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,51 @@ def test_error_handling_outputs(self):
assert_that(result['good'], equal_to(['a', 'b']), label="CheckGood")
assert_that(result['bad'], equal_to(["ValueError('biiiiig')"]))

def test_strip_error_metadata(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
result = p | YamlTransform(
'''
type: composite
transforms:
- type: Create
config:
elements: ['a', 'b', 'biiiiig']
- type: SizeLimiter
input: Create
config:
limit: 5
error_handling:
output: errors
- type: StripErrorMetadata
name: StripErrorMetadata1
input: SizeLimiter.errors
- type: MapToFields
input: Create
config:
language: python
fields:
out: "1/(1-len(element))"
error_handling:
output: errors
- type: StripErrorMetadata
name: StripErrorMetadata2
input: MapToFields.errors
output:
good: SizeLimiter
bad1: StripErrorMetadata1
bad2: StripErrorMetadata2
''',
providers=TEST_PROVIDERS)
assert_that(result['good'], equal_to(['a', 'b']), label="CheckGood")
assert_that(
result['bad1'] | beam.Map(lambda x: x.element), equal_to(['biiiiig']))
assert_that(
result['bad2'] | beam.Map(lambda x: x.element), equal_to(['a', 'b']))

def test_must_handle_error_output(self):
with self.assertRaisesRegex(Exception, 'Unconsumed error output .*line 7'):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
Expand Down
15 changes: 12 additions & 3 deletions website/www/site/content/en/documentation/sdks/yaml-errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ for a robust pipeline).
Note also that the exact format of the error outputs is still being finalized.
They can be safely printed and written to outputs, but their precise schema
may change in a future version of Beam and should not yet be depended on.
Currently it has, at the very least, an `element` field which holds the element
that caused the error.
It generally contains the failed record itself as well as information about
the error that was encountered (e.g. error messages and tracebacks).
To recover the bad record alone one can process the error output with the
`StripErrorMetadata` transformation.

Some transforms allow for extra arguments in their error_handling config, e.g.
for Python functions one can give a `threshold` which limits the relative number
Expand Down Expand Up @@ -139,9 +141,16 @@ pipeline:
error_handling:
output: my_error_output
- type: StripErrorMetadata
name: FailedRecordsWithoutMetadata
# Takes the error information from ComputeRatio and returns just the
# failing records themselves for another attempt with a different
# transform.
input: ComputeRatio.my_error_output
- type: MapToFields
name: ComputeRatioForBadRecords
input: ComputeRatio.my_error_output
input: FailedRecordsWithoutMetadata
config:
language: python
fields:
Expand Down

0 comments on commit 886c70f

Please sign in to comment.