Skip to content

Commit

Permalink
Disable Javascript UDFs for 2.51.0
Browse files Browse the repository at this point in the history
Signed-off-by: Jeffrey Kinard <[email protected]>
  • Loading branch information
Polber committed Sep 19, 2023
1 parent 6261a00 commit e737fdc
Showing 1 changed file with 31 additions and 10 deletions.
41 changes: 31 additions & 10 deletions sdks/python/apache_beam/yaml/yaml_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,44 @@ def __setstate__(self, state):
# ECMAScript 5 and 6
def _expand_javascript_mapping_func(
original_fields, expression=None, callable=None, path=None, name=None):
def _recursive_dict(d):
if str(type(d).__name__).startswith('BeamSchema_'):
d = dict(d._asdict())
for key in d.keys():
if isinstance(d.get(key), dict):
d[key] = _recursive_dict(d.get(key))
if isinstance(d.get(key), list):
for i in range(len(d.get(key))):
d[key][i] = _recursive_dict(d[key][i])
return d

def _normalize_output(result):
if isinstance(result, js2py.base.JsObjectWrapper):
# TODO(polber) - Convert js2py output types into Beam Row
r = result.to_dict()
return r
return result

if expression:
args = ', '.join(original_fields)
js_func = f'function fn({args}) {{return ({expression})}}'
js_callable = _CustomJsObjectWrapper(js2py.eval_js(js_func))
return lambda __row__: js_callable(*__row__._asdict().values())
js_expr_callable = _CustomJsObjectWrapper(js2py.eval_js(js_func))
fn = lambda __row__: js_expr_callable(*__row__._asdict().values())

elif callable:
js_callable = _CustomJsObjectWrapper(js2py.eval_js(callable))
return lambda __row__: js_callable(__row__._asdict())
fn = lambda __row__: js_callable(_recursive_dict(__row__))

else:
if not path.endswith('.js'):
raise ValueError(f'File "{path}" is not a valid .js file.')
udf_code = FileSystems.open(path).read().decode()
js = js2py.EvalJs()
js.eval(udf_code)
js_callable = _CustomJsObjectWrapper(getattr(js, name))
return lambda __row__: js_callable(__row__._asdict())
js_file_callable = _CustomJsObjectWrapper(getattr(js, name))
fn = lambda __row__: js_file_callable(__row__._asdict())

return lambda __row__: _normalize_output(fn(__row__))


def _expand_python_mapping_func(
Expand Down Expand Up @@ -129,14 +149,15 @@ def _as_callable(original_fields, expr, transform_name, language):

_check_mapping_arguments(transform_name, **expr)

if language == "javascript":
return _expand_javascript_mapping_func(original_fields, **expr)
elif language == "python":
# TODO(yaml) - Disable until JsObjectWrapper can be parsed into Beam Row
# if language == "javascript":
# return _expand_javascript_mapping_func(original_fields, **expr)
if language == "python":
return _expand_python_mapping_func(original_fields, **expr)
else:
raise ValueError(
f'Unknown language for mapping transform: {language}. '
'Supported languages are "javascript" and "python."')
'Supported languages are "python."')


# TODO(yaml): This should be available in all environments, in which case
Expand Down Expand Up @@ -322,7 +343,7 @@ def MapToFields(

return result

elif language == 'python' or language == 'javascript':
elif language == 'python':
return pcoll | yaml_create_transform({
'type': 'PyTransform',
'config': {
Expand Down

0 comments on commit e737fdc

Please sign in to comment.