Skip to content

Commit

Permalink
Merge pull request #30378 [YAML] Fix MapToFields error output type in…
Browse files Browse the repository at this point in the history
…ference
  • Loading branch information
robertwb authored Feb 23, 2024
2 parents 03866ab + d918004 commit 20675c8
Showing 1 changed file with 10 additions and 5 deletions.
15 changes: 10 additions & 5 deletions sdks/python/apache_beam/yaml/yaml_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from apache_beam.typehints import row_type
from apache_beam.typehints import schemas
from apache_beam.typehints import trivial_inference
from apache_beam.typehints.row_type import RowTypeConstraint
from apache_beam.typehints.schemas import named_fields_from_element_type
from apache_beam.utils import python_callable
from apache_beam.yaml import json_utils
Expand Down Expand Up @@ -319,18 +320,22 @@ def exception_handling_args(error_handling_spec):
return None


def _map_errors_to_standard_format():
def _map_errors_to_standard_format(input_type):
# TODO(https://github.com/apache/beam/issues/24755): Switch to MapTuple.

return beam.Map(
lambda x: beam.Row(element=x[0], msg=str(x[1][1]), stack=str(x[1][2])))
lambda x: beam.Row(element=x[0], msg=str(x[1][1]), stack=str(x[1][2]))
).with_output_types(
RowTypeConstraint.from_fields([("element", input_type), ("msg", str),
("stack", str)]))


def maybe_with_exception_handling(inner_expand):
def expand(self, pcoll):
wrapped_pcoll = beam.core._MaybePValueWithErrors(
pcoll, self._exception_handling_args)
return inner_expand(self, wrapped_pcoll).as_result(
_map_errors_to_standard_format())
_map_errors_to_standard_format(pcoll.element_type))

return expand

Expand All @@ -340,8 +345,8 @@ def maybe_with_exception_handling_transform_fn(transform_fn):
def expand(pcoll, error_handling=None, **kwargs):
wrapped_pcoll = beam.core._MaybePValueWithErrors(
pcoll, exception_handling_args(error_handling))
return transform_fn(wrapped_pcoll,
**kwargs).as_result(_map_errors_to_standard_format())
return transform_fn(wrapped_pcoll, **kwargs).as_result(
_map_errors_to_standard_format(pcoll.element_type))

original_signature = inspect.signature(transform_fn)
new_parameters = list(original_signature.parameters.values())
Expand Down

0 comments on commit 20675c8

Please sign in to comment.