From e737fdcb90749916017d97f08ad961337829a53d Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Tue, 19 Sep 2023 17:30:25 -0400 Subject: [PATCH] Disable Javascript UDFs for 2.51.0 Signed-off-by: Jeffrey Kinard --- sdks/python/apache_beam/yaml/yaml_mapping.py | 41 +++++++++++++++----- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index b6dea894b3e9..eecf15a1fbad 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -67,15 +67,33 @@ def __setstate__(self, state): # ECMAScript 5 and 6 def _expand_javascript_mapping_func( original_fields, expression=None, callable=None, path=None, name=None): + def _recursive_dict(d): + if str(type(d).__name__).startswith('BeamSchema_'): + d = dict(d._asdict()) + for key in d.keys(): + if isinstance(d.get(key), dict): + d[key] = _recursive_dict(d.get(key)) + if isinstance(d.get(key), list): + for i in range(len(d.get(key))): + d[key][i] = _recursive_dict(d[key][i]) + return d + + def _normalize_output(result): + if isinstance(result, js2py.base.JsObjectWrapper): + # TODO(polber) - Convert js2py output types into Beam Row + r = result.to_dict() + return r + return result + if expression: args = ', '.join(original_fields) js_func = f'function fn({args}) {{return ({expression})}}' - js_callable = _CustomJsObjectWrapper(js2py.eval_js(js_func)) - return lambda __row__: js_callable(*__row__._asdict().values()) + js_expr_callable = _CustomJsObjectWrapper(js2py.eval_js(js_func)) + fn = lambda __row__: js_expr_callable(*__row__._asdict().values()) elif callable: js_callable = _CustomJsObjectWrapper(js2py.eval_js(callable)) - return lambda __row__: js_callable(__row__._asdict()) + fn = lambda __row__: js_callable(_recursive_dict(__row__)) else: if not path.endswith('.js'): @@ -83,8 +101,10 @@ def _expand_javascript_mapping_func( udf_code = FileSystems.open(path).read().decode() js = js2py.EvalJs() js.eval(udf_code) - js_callable = _CustomJsObjectWrapper(getattr(js, name)) - return lambda __row__: js_callable(__row__._asdict()) + js_file_callable = _CustomJsObjectWrapper(getattr(js, name)) + fn = lambda __row__: js_file_callable(__row__._asdict()) + + return lambda __row__: _normalize_output(fn(__row__)) def _expand_python_mapping_func( @@ -129,14 +149,15 @@ def _as_callable(original_fields, expr, transform_name, language): _check_mapping_arguments(transform_name, **expr) - if language == "javascript": - return _expand_javascript_mapping_func(original_fields, **expr) - elif language == "python": + # TODO(yaml) - Disable until JsObjectWrapper can be parsed into Beam Row + # if language == "javascript": + # return _expand_javascript_mapping_func(original_fields, **expr) + if language == "python": return _expand_python_mapping_func(original_fields, **expr) else: raise ValueError( f'Unknown language for mapping transform: {language}. ' - 'Supported languages are "javascript" and "python."') + 'Supported languages are "python."') # TODO(yaml): This should be available in all environments, in which case @@ -322,7 +343,7 @@ def MapToFields( return result - elif language == 'python' or language == 'javascript': + elif language == 'python': return pcoll | yaml_create_transform({ 'type': 'PyTransform', 'config': {