Skip to content

Commit

Permalink
Merge pull request #29169 [YAML] fix javascript UDF output in mapping…
Browse files Browse the repository at this point in the history
… transforms
  • Loading branch information
robertwb authored Nov 1, 2023
2 parents 4dd147d + b9725e3 commit ae86136
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 38 deletions.
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2260,7 +2260,7 @@ def __init__(self, pcoll, exception_handling_args, upstream_errors=()):

@property
def pipeline(self):
return self._pvalue.pipeline
return self._pcoll.pipeline

@property
def element_type(self):
Expand Down
80 changes: 72 additions & 8 deletions sdks/python/apache_beam/yaml/yaml_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

"""This module defines the basic MapToFields operation."""
import itertools
from collections import abc
from typing import Any
from typing import Callable
from typing import Collection
Expand All @@ -26,6 +27,9 @@
from typing import Union

import js2py
from js2py import base
from js2py.constructors import jsdate
from js2py.internals import simplex

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
Expand All @@ -38,6 +42,7 @@
from apache_beam.yaml import json_utils
from apache_beam.yaml import options
from apache_beam.yaml import yaml_provider
from apache_beam.yaml.yaml_provider import dicts_to_rows


def _check_mapping_arguments(
Expand Down Expand Up @@ -74,28 +79,87 @@ def __setstate__(self, state):
self.__dict__.update(state)


# TODO(yaml) Improve type inferencing for JS UDF's
def py_value_to_js_dict(py_value):
if ((isinstance(py_value, tuple) and hasattr(py_value, '_asdict')) or
isinstance(py_value, beam.Row)):
py_value = py_value._asdict()
if isinstance(py_value, dict):
return {key: py_value_to_js_dict(value) for key, value in py_value.items()}
elif not isinstance(py_value, str) and isinstance(py_value, abc.Iterable):
return [py_value_to_js_dict(value) for value in list(py_value)]
else:
return py_value


# TODO(yaml) Consider adding optional language version parameter to support
# ECMAScript 5 and 6
def _expand_javascript_mapping_func(
original_fields, expression=None, callable=None, path=None, name=None):

js_array_type = (
base.PyJsArray,
base.PyJsArrayBuffer,
base.PyJsInt8Array,
base.PyJsUint8Array,
base.PyJsUint8ClampedArray,
base.PyJsInt16Array,
base.PyJsUint16Array,
base.PyJsInt32Array,
base.PyJsUint32Array,
base.PyJsFloat32Array,
base.PyJsFloat64Array)

def _js_object_to_py_object(obj):
if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)):
return base.to_python(obj)
elif isinstance(obj, js_array_type):
return [_js_object_to_py_object(value) for value in obj.to_list()]
elif isinstance(obj, jsdate.PyJsDate):
return obj.to_utc_dt()
elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)):
return None
elif isinstance(obj, base.PyJsError):
raise RuntimeError(obj['message'])
elif isinstance(obj, base.PyJsObject):
return {
key: _js_object_to_py_object(value['value'])
for (key, value) in obj.own.items()
}
elif isinstance(obj, base.JsObjectWrapper):
return _js_object_to_py_object(obj._obj)

return obj

if expression:
args = ', '.join(original_fields)
js_func = f'function fn({args}) {{return ({expression})}}'
js_callable = _CustomJsObjectWrapper(js2py.eval_js(js_func))
return lambda __row__: js_callable(*__row__._asdict().values())
source = '\n'.join(['function(__row__) {'] + [
f' {name} = __row__.{name}'
for name in original_fields if name in expression
] + [' return (' + expression + ')'] + ['}'])
js_func = _CustomJsObjectWrapper(js2py.eval_js(source))

elif callable:
js_callable = _CustomJsObjectWrapper(js2py.eval_js(callable))
return lambda __row__: js_callable(__row__._asdict())
js_func = _CustomJsObjectWrapper(js2py.eval_js(callable))

else:
if not path.endswith('.js'):
raise ValueError(f'File "{path}" is not a valid .js file.')
udf_code = FileSystems.open(path).read().decode()
js = js2py.EvalJs()
js.eval(udf_code)
js_callable = _CustomJsObjectWrapper(getattr(js, name))
return lambda __row__: js_callable(__row__._asdict())
js_func = _CustomJsObjectWrapper(getattr(js, name))

def js_wrapper(row):
row_as_dict = py_value_to_js_dict(row)
try:
js_result = js_func(row_as_dict)
except simplex.JsException as exn:
raise RuntimeError(
f"Error evaluating javascript expression: "
f"{exn.mes['message']}") from exn
return dicts_to_rows(_js_object_to_py_object(js_result))

return js_wrapper


def _expand_python_mapping_func(
Expand Down
121 changes: 92 additions & 29 deletions sdks/python/apache_beam/yaml/yaml_udf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,27 @@
from apache_beam.options import pipeline_options
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.yaml.yaml_mapping import py_value_to_js_dict
from apache_beam.yaml.yaml_provider import dicts_to_rows
from apache_beam.yaml.yaml_transform import YamlTransform


def AsRows():
return beam.Map(lambda named_tuple: beam.Row(**named_tuple._asdict()))
return beam.Map(
lambda named_tuple: dicts_to_rows(py_value_to_js_dict(named_tuple)))


class YamlUDFMappingTest(unittest.TestCase):
def __init__(self, method_name='runYamlMappingTest'):
super().__init__(method_name)
self.data = [
beam.Row(label='11a', conductor=11, rank=0),
beam.Row(label='37a', conductor=37, rank=1),
beam.Row(label='389a', conductor=389, rank=2),
beam.Row(
label='11a', conductor=11, row=beam.Row(rank=0, values=[1, 2, 3])),
beam.Row(
label='37a', conductor=37, row=beam.Row(rank=1, values=[4, 5, 6])),
beam.Row(
label='389a', conductor=389, row=beam.Row(rank=2, values=[7, 8,
9])),
]

def setUp(self):
Expand All @@ -60,16 +67,37 @@ def test_map_to_fields_filter_inline_js(self):
language: javascript
fields:
label:
callable: "function label_map(x) {return x.label + 'x'}"
callable: |
function label_map(x) {
return x.label + 'x'
}
conductor:
callable: "function conductor_map(x) {return x.conductor + 1}"
callable: |
function conductor_map(x) {
return x.conductor + 1
}
row:
callable: |
function row_map(x) {
x.row.values.push(x.row.rank + 10)
return x.row
}
''')
assert_that(
result,
equal_to([
beam.Row(label='11ax', conductor=12),
beam.Row(label='37ax', conductor=38),
beam.Row(label='389ax', conductor=390),
beam.Row(
label='11ax',
conductor=12,
row=beam.Row(rank=0, values=[1, 2, 3, 10])),
beam.Row(
label='37ax',
conductor=38,
row=beam.Row(rank=1, values=[4, 5, 6, 11])),
beam.Row(
label='389ax',
conductor=390,
row=beam.Row(rank=2, values=[7, 8, 9, 12])),
]))

def test_map_to_fields_filter_inline_py(self):
Expand All @@ -86,13 +114,15 @@ def test_map_to_fields_filter_inline_py(self):
callable: "lambda x: x.label + 'x'"
conductor:
callable: "lambda x: x.conductor + 1"
sum:
callable: "lambda x: sum(x.row.values)"
''')
assert_that(
result,
equal_to([
beam.Row(label='11ax', conductor=12),
beam.Row(label='37ax', conductor=38),
beam.Row(label='389ax', conductor=390),
beam.Row(label='11ax', conductor=12, sum=6),
beam.Row(label='37ax', conductor=38, sum=15),
beam.Row(label='389ax', conductor=390, sum=24),
]))

def test_filter_inline_js(self):
Expand All @@ -106,13 +136,22 @@ def test_filter_inline_js(self):
config:
language: javascript
keep:
callable: "function filter(x) {return x.rank > 0}"
callable: |
function filter(x) {
return x.row.rank > 0
}
''')
assert_that(
result | AsRows(),
equal_to([
beam.Row(label='37a', conductor=37, rank=1),
beam.Row(label='389a', conductor=389, rank=2),
beam.Row(
label='37a',
conductor=37,
row=beam.Row(rank=1, values=[4, 5, 6])),
beam.Row(
label='389a',
conductor=389,
row=beam.Row(rank=2, values=[7, 8, 9])),
]))

def test_filter_inline_py(self):
Expand All @@ -125,13 +164,19 @@ def test_filter_inline_py(self):
config:
language: python
keep:
callable: "lambda x: x.rank > 0"
callable: "lambda x: x.row.rank > 0"
''')
assert_that(
result | AsRows(),
equal_to([
beam.Row(label='37a', conductor=37, rank=1),
beam.Row(label='389a', conductor=389, rank=2),
beam.Row(
label='37a',
conductor=37,
row=beam.Row(rank=1, values=[4, 5, 6])),
beam.Row(
label='389a',
conductor=389,
row=beam.Row(rank=2, values=[7, 8, 9])),
]))

def test_filter_expression_js(self):
Expand All @@ -145,12 +190,15 @@ def test_filter_expression_js(self):
config:
language: javascript
keep:
expression: "label.toUpperCase().indexOf('3') == -1 && conductor"
expression: "label.toUpperCase().indexOf('3') == -1 && row.rank < 1"
''')
assert_that(
result | AsRows(),
equal_to([
beam.Row(label='11a', conductor=11, rank=0),
beam.Row(
label='11a',
conductor=11,
row=beam.Row(rank=0, values=[1, 2, 3])),
]))

def test_filter_expression_py(self):
Expand All @@ -168,17 +216,20 @@ def test_filter_expression_py(self):
assert_that(
result | AsRows(),
equal_to([
beam.Row(label='11a', conductor=11, rank=0),
beam.Row(
label='11a',
conductor=11,
row=beam.Row(rank=0, values=[1, 2, 3])),
]))

def test_filter_inline_js_file(self):
data = '''
function f(x) {
return x.rank > 0
return x.row.rank > 0
}
function g(x) {
return x.rank > 1
return x.row.rank > 1
}
'''.replace(' ', '')

Expand All @@ -201,17 +252,23 @@ def test_filter_inline_js_file(self):
assert_that(
result | AsRows(),
equal_to([
beam.Row(label='37a', conductor=37, rank=1),
beam.Row(label='389a', conductor=389, rank=2),
beam.Row(
label='37a',
conductor=37,
row=beam.Row(rank=1, values=[4, 5, 6])),
beam.Row(
label='389a',
conductor=389,
row=beam.Row(rank=2, values=[7, 8, 9])),
]))

def test_filter_inline_py_file(self):
data = '''
def f(x):
return x.rank > 0
return x.row.rank > 0
def g(x):
return x.rank > 1
return x.row.rank > 1
'''.replace(' ', '')

path = os.path.join(self.tmpdir, 'udf.py')
Expand All @@ -232,8 +289,14 @@ def g(x):
assert_that(
result | AsRows(),
equal_to([
beam.Row(label='37a', conductor=37, rank=1),
beam.Row(label='389a', conductor=389, rank=2),
beam.Row(
label='37a',
conductor=37,
row=beam.Row(rank=1, values=[4, 5, 6])),
beam.Row(
label='389a',
conductor=389,
row=beam.Row(rank=2, values=[7, 8, 9])),
]))


Expand Down

0 comments on commit ae86136

Please sign in to comment.