Skip to content

Commit

Permalink
Merge pull request #29958 [YAML] Better support for inline PyTransforms.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb authored Jan 12, 2024
2 parents 812684f + 0fb42e5 commit cfec512
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from apache_beam.typehints.native_type_compatibility import convert_to_typing_type
from apache_beam.typehints.schemas import named_fields_to_schema
from apache_beam.typehints.trivial_inference import instance_to_type
from apache_beam.utils import python_callable

PYTHON_FULLY_QUALIFIED_NAMED_TRANSFORM_URN = (
'beam:transforms:python:fully_qualified_named')
Expand Down Expand Up @@ -63,6 +64,8 @@ def expand(self, pinput):
args = self._args
kwargs = dict(self._kwargs)
source = kwargs.pop('source')
if isinstance(source, str):
source = python_callable.PythonCallableWithSource(source)

if self._constructor == '__constructor__':
transform = source(*args, **kwargs)
Expand Down Expand Up @@ -127,7 +130,15 @@ def to_runner_api_parameter(self, unused_context):
@staticmethod
def from_runner_api_parameter(unused_ptransform, payload, unused_context):
row = coders.RowCoder(payload.schema).decode(payload.payload)
maybe_as_dict = lambda x: x._asdict() if x else {}

def maybe_as_dict(x):
  """Normalizes an optional decoded field into a dict.

  A dict is handed back as-is, any other truthy value is converted through
  its ``_asdict()`` method, and a falsy value yields an empty dict.
  """
  if isinstance(x, dict):
    return x
  return x._asdict() if x else {}

return FullyQualifiedNamedTransform(
row.constructor,
tuple(getattr(row, 'args', ())),
Expand Down
180 changes: 180 additions & 0 deletions sdks/python/apache_beam/yaml/inline_python.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Using PyTransform from YAML

Beam YAML provides the ability to easily invoke Python transforms via the
`PyTransform` type, simply referencing them by fully qualified name.
For example,

```
- type: PyTransform
config:
constructor: apache_beam.pkg.module.SomeTransform
args: [1, 'foo']
kwargs:
baz: 3
```

will invoke the transform `apache_beam.pkg.module.SomeTransform(1, 'foo', baz=3)`.
This fully qualified name can be any PTransform class or other callable that
returns a PTransform. Note, however, that PTransforms that do not accept or
return schema'd data may not be as easy to use from YAML.
Restoring the schema-ness after a non-schema returning transform can be done
by using the `callable` option on `MapToFields` which takes the entire element
as an input, e.g.

```
- type: PyTransform
config:
constructor: apache_beam.pkg.module.SomeTransform
args: [1, 'foo']
kwargs:
baz: 3
- type: MapToFields
config:
language: python
fields:
col1:
callable: 'lambda element: element.col1'
output_type: string
col2:
callable: 'lambda element: element.col2'
output_type: integer
```


## Defining a transform inline using `__constructor__`

If the desired transform does not exist, one can define it inline as well.
This is done with the special `__constructor__` keyword,
similar to how cross-language transforms are done.

With the `__constructor__` keyword, one defines a Python callable that, on
invocation, *returns* the desired transform. The first argument (or `source`
keyword argument, if there are no positional arguments)
is interpreted as the Python code. For example

```
- type: PyTransform
config:
constructor: __constructor__
kwargs:
source: |
import apache_beam as beam
def create_my_transform(inc):
return beam.Map(lambda x: beam.Row(a=x.col2 + inc))
inc: 10
```

will apply `beam.Map(lambda x: beam.Row(a=x.col2 + 10))` to the incoming
PCollection.

As a class object can be invoked as its own constructor, this allows one to
define a `beam.PTransform` inline, e.g.

```
- type: PyTransform
config:
constructor: __constructor__
kwargs:
source: |
class MyPTransform(beam.PTransform):
def __init__(self, inc):
self._inc = inc
def expand(self, pcoll):
return pcoll | beam.Map(lambda x: beam.Row(a=x.col2 + self._inc))
inc: 10
```

which works exactly as one would expect.


## Defining a transform inline using `__callable__`

The `__callable__` keyword works similarly, but instead of defining a
callable that returns an applicable `PTransform` one simply defines the
expansion to be performed as a callable. This is analogous to Beam Python's
`ptransform.ptransform_fn` decorator.

In this case one can simply write

```
- type: PyTransform
config:
constructor: __callable__
kwargs:
source: |
def my_ptransform(pcoll, inc):
return pcoll | beam.Map(lambda x: beam.Row(a=x.col2 + inc))
inc: 10
```


# External transforms

One can also invoke PTransforms defined elsewhere via a `python` provider,
for example

```
pipeline:
transforms:
- ...
- type: MyTransform
config:
kwarg: whatever
providers:
- ...
- type: python
input: ...
config:
packages:
- 'some_pypi_package>=version'
transforms:
MyTransform: 'pkg.module.MyTransform'
```

These can be defined inline as well, with or without dependencies, e.g.

```
pipeline:
transforms:
- ...
- type: ToCase
input: ...
config:
upper: True
providers:
- type: python
config: {}
transforms:
'ToCase': |
@beam.ptransform_fn
def ToCase(pcoll, upper):
if upper:
return pcoll | beam.Map(lambda x: str(x).upper())
else:
return pcoll | beam.Map(lambda x: str(x).lower())
```
20 changes: 19 additions & 1 deletion sdks/python/apache_beam/yaml/readme_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,10 @@ def replace_recursive(spec, transform_type, arg_name, arg_value):


def create_test_method(test_type, test_name, test_yaml):
test_yaml = test_yaml.replace('pkg.module.fn', 'str')
test_yaml = test_yaml.replace(
'pkg.module.', 'apache_beam.yaml.readme_test._Fakes.')
test_yaml = test_yaml.replace(
'apache_beam.pkg.module.', 'apache_beam.yaml.readme_test._Fakes.')

def test(self):
with TestEnvironment() as env:
Expand Down Expand Up @@ -265,6 +268,17 @@ def createTestSuite(name, path):
return type(name, (unittest.TestCase, ), dict(parse_test_methods(readme)))


class _Fakes:
  """Stand-ins for the placeholder names that appear in documentation snippets.

  ``create_test_method`` rewrites ``pkg.module.`` references in the snippets
  so that they resolve to attributes of this class.
  """
  # Stand-in for a documented callable named ``fn``.
  fn = str

  class SomeTransform(beam.PTransform):
    """Identity transform that accepts, and ignores, any constructor arguments."""
    def __init__(self, *args, **kwargs):
      # The documented example arguments only need to be accepted; they are
      # deliberately not forwarded to the base initializer.
      super().__init__()

    def expand(self, pcoll):
      # Pass the input through untouched.
      return pcoll


ReadMeTest = createTestSuite(
'ReadMeTest', os.path.join(os.path.dirname(__file__), 'README.md'))

Expand All @@ -275,6 +289,10 @@ def createTestSuite(name, path):
CombineTest = createTestSuite(
'CombineTest', os.path.join(os.path.dirname(__file__), 'yaml_combine.md'))

InlinePythonTest = createTestSuite(
'InlinePythonTest',
os.path.join(os.path.dirname(__file__), 'inline_python.md'))

if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--render_dir', default=None)
Expand Down
61 changes: 39 additions & 22 deletions sdks/python/apache_beam/yaml/yaml_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import logging
import os
import re
import shutil
import subprocess
import sys
import urllib.parse
Expand Down Expand Up @@ -204,7 +205,7 @@ def provider_from_spec(cls, spec):
raise ValueError(
f'Missing {required} in provider '
f'at line {SafeLineLoader.get_line(spec)}')
urns = spec['transforms']
urns = SafeLineLoader.strip_metadata(spec['transforms'])
type = spec['type']
config = SafeLineLoader.strip_metadata(spec.get('config', {}))
extra_params = set(SafeLineLoader.strip_metadata(spec).keys()) - set(
Expand Down Expand Up @@ -328,8 +329,7 @@ def python(urns, packages=()):
else:
return InlineProvider({
name:
python_callable.PythonCallableWithSource.load_from_fully_qualified_name(
constructor)
python_callable.PythonCallableWithSource.load_from_source(constructor)
for (name, constructor) in urns.items()
})

Expand All @@ -347,6 +347,10 @@ def cache_artifacts(self):

def create_external_transform(self, urn, args):
# Python transforms are "registered" by fully qualified name.
if not re.match(r'^[\w.]*$', urn):
# Treat it as source.
args = {'source': urn, **args}
urn = '__constructor__'
return external.ExternalTransform(
"beam:transforms:python:fully_qualified_named",
external.ImplicitSchemaPayloadBuilder({
Expand Down Expand Up @@ -730,36 +734,49 @@ def _path(cls, base_python, packages):
def _create_venv_from_scratch(cls, base_python, packages):
venv = cls._path(base_python, packages)
if not os.path.exists(venv):
subprocess.run([base_python, '-m', 'venv', venv], check=True)
venv_python = os.path.join(venv, 'bin', 'python')
subprocess.run([venv_python, '-m', 'ensurepip'], check=True)
subprocess.run([venv_python, '-m', 'pip', 'install'] + packages,
check=True)
with open(venv + '-requirements.txt', 'w') as fout:
fout.write('\n'.join(packages))
try:
subprocess.run([base_python, '-m', 'venv', venv], check=True)
venv_python = os.path.join(venv, 'bin', 'python')
venv_pip = os.path.join(venv, 'bin', 'pip')
subprocess.run([venv_python, '-m', 'ensurepip'], check=True)
subprocess.run([venv_pip, 'install'] + packages, check=True)
with open(venv + '-requirements.txt', 'w') as fout:
fout.write('\n'.join(packages))
except: # pylint: disable=bare-except
if os.path.exists(venv):
shutil.rmtree(venv, ignore_errors=True)
raise
return venv

@classmethod
def _create_venv_from_clone(cls, base_python, packages):
venv = cls._path(base_python, packages)
if not os.path.exists(venv):
clonable_venv = cls._create_venv_to_clone(base_python)
clonable_python = os.path.join(clonable_venv, 'bin', 'python')
subprocess.run(
[clonable_python, '-m', 'clonevirtualenv', clonable_venv, venv],
check=True)
venv_binary = os.path.join(venv, 'bin', 'python')
subprocess.run([venv_binary, '-m', 'pip', 'install'] + packages,
check=True)
with open(venv + '-requirements.txt', 'w') as fout:
fout.write('\n'.join(packages))
try:
clonable_venv = cls._create_venv_to_clone(base_python)
clonable_python = os.path.join(clonable_venv, 'bin', 'python')
subprocess.run(
[clonable_python, '-m', 'clonevirtualenv', clonable_venv, venv],
check=True)
venv_pip = os.path.join(venv, 'bin', 'pip')
subprocess.run([venv_pip, 'install'] + packages, check=True)
with open(venv + '-requirements.txt', 'w') as fout:
fout.write('\n'.join(packages))
except: # pylint: disable=bare-except
if os.path.exists(venv):
shutil.rmtree(venv, ignore_errors=True)
raise
return venv

@classmethod
def _create_venv_to_clone(cls, base_python):
  """Returns the path of a virtual environment that can serve as clone source.

  When ``beam_version`` contains ``.dev``, the directory two levels above
  ``base_python`` is returned directly (presumably the environment that holds
  ``bin/python`` -- TODO confirm that callers always pass such a path).
  Otherwise an environment pinned to ``beam_version`` and including
  ``virtualenv-clone`` is built, or reused, via
  ``cls._create_venv_from_scratch``.

  Args:
    base_python: path of the Python executable to base the environment on.

  Returns:
    The path of the environment to clone from.
  """
  if '.dev' in beam_version:
    base_venv = os.path.dirname(os.path.dirname(base_python))
    print('Cloning dev environment from', base_venv)
    # Hand back the environment just announced; without this return the
    # computed path was discarded and a fresh environment built regardless.
    return base_venv
  return cls._create_venv_from_scratch(
      base_python,
      [
          'apache_beam[dataframe,gcp,test,yaml]==' + beam_version,
          'virtualenv-clone'
      ])

Expand Down
1 change: 1 addition & 0 deletions sdks/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ def get_portability_package_data():
'yaml': [
'docstring-parser>=0.15,<1.0',
'pyyaml>=3.12,<7.0.0',
'virtualenv-clone>=0.5,<1.0',
] + dataframe_dependency
},
zip_safe=False,
Expand Down

0 comments on commit cfec512

Please sign in to comment.