From 0fb42e5ee321300304621a4d022d42be4b7fcd94 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Tue, 9 Jan 2024 07:44:52 -0800 Subject: [PATCH] [YAML] Better support for inline PyTransforms. --- .../fully_qualified_named_transform.py | 13 +- sdks/python/apache_beam/yaml/inline_python.md | 180 ++++++++++++++++++ sdks/python/apache_beam/yaml/readme_test.py | 20 +- sdks/python/apache_beam/yaml/yaml_provider.py | 9 +- 4 files changed, 217 insertions(+), 5 deletions(-) create mode 100644 sdks/python/apache_beam/yaml/inline_python.md diff --git a/sdks/python/apache_beam/transforms/fully_qualified_named_transform.py b/sdks/python/apache_beam/transforms/fully_qualified_named_transform.py index ab2cadd166a9..03a8b2e7444f 100644 --- a/sdks/python/apache_beam/transforms/fully_qualified_named_transform.py +++ b/sdks/python/apache_beam/transforms/fully_qualified_named_transform.py @@ -27,6 +27,7 @@ from apache_beam.typehints.native_type_compatibility import convert_to_typing_type from apache_beam.typehints.schemas import named_fields_to_schema from apache_beam.typehints.trivial_inference import instance_to_type +from apache_beam.utils import python_callable PYTHON_FULLY_QUALIFIED_NAMED_TRANSFORM_URN = ( 'beam:transforms:python:fully_qualified_named') @@ -63,6 +64,8 @@ def expand(self, pinput): args = self._args kwargs = dict(self._kwargs) source = kwargs.pop('source') + if isinstance(source, str): + source = python_callable.PythonCallableWithSource(source) if self._constructor == '__constructor__': transform = source(*args, **kwargs) @@ -127,7 +130,15 @@ def to_runner_api_parameter(self, unused_context): @staticmethod def from_runner_api_parameter(unused_ptransform, payload, unused_context): row = coders.RowCoder(payload.schema).decode(payload.payload) - maybe_as_dict = lambda x: x._asdict() if x else {} + + def maybe_as_dict(x): + if isinstance(x, dict): + return x + elif x: + return x._asdict() + else: + return {} + return FullyQualifiedNamedTransform( row.constructor, 
tuple(getattr(row, 'args', ())), diff --git a/sdks/python/apache_beam/yaml/inline_python.md b/sdks/python/apache_beam/yaml/inline_python.md new file mode 100644 index 000000000000..2363b13bfff2 --- /dev/null +++ b/sdks/python/apache_beam/yaml/inline_python.md @@ -0,0 +1,180 @@ + + +# Using PyTransform from YAML + +Beam YAML provides the ability to easily invoke Python transforms via the +`PyTransform` type, simply referencing them by fully qualified name. +For example, + +``` +- type: PyTransform + config: + constructor: apache_beam.pkg.module.SomeTransform + args: [1, 'foo'] + kwargs: + baz: 3 +``` + +will invoke the transform `apache_beam.pkg.module.SomeTransform(1, 'foo', baz=3)`. +This fully qualified name can be any PTransform class or other callable that +returns a PTransform. Note, however, that PTransforms that do not accept or +return schema'd data may not be as easy to use from YAML. +Restoring the schema-ness after a non-schema returning transform can be done +by using the `callable` option on `MapToFields` which takes the entire element +as an input, e.g. + +``` +- type: PyTransform + config: + constructor: apache_beam.pkg.module.SomeTransform + args: [1, 'foo'] + kwargs: + baz: 3 +- type: MapToFields + config: + language: python + fields: + col1: + callable: 'lambda element: element.col1' + output_type: string + col2: + callable: 'lambda element: element.col2' + output_type: integer +``` + + +## Defining a transform inline using `__constructor__` + +If the desired transform does not exist, one can define it inline as well. +This is done with the special `__constructor__` keyword, +similar to how cross-language transforms are done. + +With the `__constructor__` keyword, one defines a Python callable that, on +invocation, *returns* the desired transform. The first argument (or `source` +keyword argument, if there are no positional arguments) +is interpreted as the Python code. 
For example + +``` +- type: PyTransform + config: + constructor: __constructor__ + kwargs: + source: | + import apache_beam as beam + + def create_my_transform(inc): + return beam.Map(lambda x: beam.Row(a=x.col2 + inc)) + + inc: 10 +``` + +will apply `beam.Map(lambda x: beam.Row(a=x.col2 + 10))` to the incoming +PCollection. + +As a class object can be invoked as its own constructor, this allows one to +define a `beam.PTransform` inline, e.g. + +``` +- type: PyTransform + config: + constructor: __constructor__ + kwargs: + source: | + class MyPTransform(beam.PTransform): + def __init__(self, inc): + self._inc = inc + def expand(self, pcoll): + return pcoll | beam.Map(lambda x: beam.Row(a=x.col2 + self._inc)) + + inc: 10 +``` + +which works exactly as one would expect. + + +## Defining a transform inline using `__callable__` + +The `__callable__` keyword works similarly, but instead of defining a +callable that returns an applicable `PTransform` one simply defines the +expansion to be performed as a callable. This is analogous to Beam Python's +`ptransform.ptransform_fn` decorator. + +In this case one can simply write + +``` +- type: PyTransform + config: + constructor: __callable__ + kwargs: + source: | + def my_ptransform(pcoll, inc): + return pcoll | beam.Map(lambda x: beam.Row(a=x.col2 + inc)) + + inc: 10 +``` + + +# External transforms + +One can also invoke PTransforms defined elsewhere via a `python` provider, +for example + +``` +pipeline: + transforms: + - ... + - type: MyTransform + config: + kwarg: whatever + +providers: + - ... + - type: python + input: ... + config: + packages: + - 'some_pypi_package>=version' + transforms: + MyTransform: 'pkg.module.MyTransform' +``` + +These can be defined inline as well, with or without dependencies, e.g. + +``` +pipeline: + transforms: + - ... + - type: ToCase + input: ... 
+ config: + upper: True + +providers: + - type: python + config: {} + transforms: + 'ToCase': | + @beam.ptransform_fn + def ToCase(pcoll, upper): + if upper: + return pcoll | beam.Map(lambda x: str(x).upper()) + else: + return pcoll | beam.Map(lambda x: str(x).lower()) +``` diff --git a/sdks/python/apache_beam/yaml/readme_test.py b/sdks/python/apache_beam/yaml/readme_test.py index 8ec9b22cc92f..da00b01f3ab3 100644 --- a/sdks/python/apache_beam/yaml/readme_test.py +++ b/sdks/python/apache_beam/yaml/readme_test.py @@ -185,7 +185,10 @@ def replace_recursive(spec, transform_type, arg_name, arg_value): def create_test_method(test_type, test_name, test_yaml): - test_yaml = test_yaml.replace('pkg.module.fn', 'str') + test_yaml = test_yaml.replace( + 'pkg.module.', 'apache_beam.yaml.readme_test._Fakes.') + test_yaml = test_yaml.replace( + 'apache_beam.pkg.module.', 'apache_beam.yaml.readme_test._Fakes.') def test(self): with TestEnvironment() as env: @@ -265,6 +268,17 @@ def createTestSuite(name, path): return type(name, (unittest.TestCase, ), dict(parse_test_methods(readme))) +class _Fakes: + fn = str + + class SomeTransform(beam.PTransform): + def __init__(*args, **kwargs): + pass + + def expand(self, pcoll): + return pcoll + + ReadMeTest = createTestSuite( 'ReadMeTest', os.path.join(os.path.dirname(__file__), 'README.md')) @@ -275,6 +289,10 @@ def createTestSuite(name, path): CombineTest = createTestSuite( 'CombineTest', os.path.join(os.path.dirname(__file__), 'yaml_combine.md')) +InlinePythonTest = createTestSuite( + 'InlinePythonTest', + os.path.join(os.path.dirname(__file__), 'inline_python.md')) + if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--render_dir', default=None) diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index a9cd504f08cd..fdfcf69ba663 100755 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -205,7 
+205,7 @@ def provider_from_spec(cls, spec): raise ValueError( f'Missing {required} in provider ' f'at line {SafeLineLoader.get_line(spec)}') - urns = spec['transforms'] + urns = SafeLineLoader.strip_metadata(spec['transforms']) type = spec['type'] config = SafeLineLoader.strip_metadata(spec.get('config', {})) extra_params = set(SafeLineLoader.strip_metadata(spec).keys()) - set( @@ -329,8 +329,7 @@ def python(urns, packages=()): else: return InlineProvider({ name: - python_callable.PythonCallableWithSource.load_from_fully_qualified_name( - constructor) + python_callable.PythonCallableWithSource.load_from_source(constructor) for (name, constructor) in urns.items() }) @@ -348,6 +347,10 @@ def cache_artifacts(self): def create_external_transform(self, urn, args): # Python transforms are "registered" by fully qualified name. + if not re.match(r'^[\w.]*$', urn): + # Treat it as source. + args = {'source': urn, **args} + urn = '__constructor__' return external.ExternalTransform( "beam:transforms:python:fully_qualified_named", external.ImplicitSchemaPayloadBuilder({