Skip to content

Commit

Permalink
Add identity function as default to FlatMap (#30744)
Browse files Browse the repository at this point in the history
* Add identity function as default to FlatMap

* Update sdks/python/apache_beam/transforms/core_test.py

Co-authored-by: tvalentyn <[email protected]>

* update docstring

* yapf

* Add flatmap with no function example

* yapf

* Update flatmp_nofunction metadata

* Update website/www/site/content/en/documentation/transforms/python/elementwise/flatmap.md

Co-authored-by: tvalentyn <[email protected]>

* Update sdks/python/apache_beam/examples/snippets/transforms/elementwise/flatmap_nofunction.py

Co-authored-by: tvalentyn <[email protected]>

* isort

---------

Co-authored-by: tvalentyn <[email protected]>
  • Loading branch information
hjtran and tvalentyn authored Apr 5, 2024
1 parent 4452a6c commit 3c9e9c8
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# pytype: skip-file
# pylint:disable=line-too-long

# beam-playground:
# name: FlatMapNoFunction
# description: Demonstration of FlatMap transform usage without a function.
# multifile: false
# default_example: false
# context_line: 44
# categories:
# - Core Transforms
# complexity: BASIC
# tags:
# - transforms
# - strings
# - map


def flatmap_nofunction(test=None):
# [START flatmap_nofunction]
import apache_beam as beam

with beam.Pipeline() as pipeline:
plants = (
pipeline
| 'Gardening plants' >> beam.Create(
[['🍓Strawberry', '🥕Carrot', '🍆Eggplant'], ['🍅Tomato', '🥔Potato']])
| 'Split words' >> beam.FlatMap()
| beam.Map(print))
# [END flatmap_nofunction]
if test:
test(plants)


if __name__ == '__main__':
flatmap_nofunction()
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from . import flatmap_generator
from . import flatmap_lambda
from . import flatmap_multiple_arguments
from . import flatmap_nofunction
from . import flatmap_side_inputs_dict
from . import flatmap_side_inputs_iter
from . import flatmap_side_inputs_singleton
Expand Down Expand Up @@ -65,6 +66,9 @@ def check_valid_plants(actual):
@mock.patch(
'apache_beam.examples.snippets.transforms.elementwise.flatmap_function.print',
str)
@mock.patch(
'apache_beam.examples.snippets.transforms.elementwise.flatmap_nofunction.print',
str)
@mock.patch(
'apache_beam.examples.snippets.transforms.elementwise.flatmap_lambda.print',
str)
Expand Down Expand Up @@ -93,6 +97,9 @@ def test_flatmap_simple(self):
def test_flatmap_function(self):
flatmap_function.flatmap_function(check_plants)

def test_flatmap_nofunction(self):
flatmap_nofunction.flatmap_nofunction(check_plants)

def test_flatmap_lambda(self):
flatmap_lambda.flatmap_lambda(check_plants)

Expand Down
10 changes: 8 additions & 2 deletions sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1899,13 +1899,19 @@ def to_runner_api(self, unused_context):
return beam_runner_api_pb2.FunctionSpec(urn=self._urn)


def FlatMap(fn, *args, **kwargs): # pylint: disable=invalid-name
def identity(x: T) -> T:
return x


def FlatMap(fn=identity, *args, **kwargs): # pylint: disable=invalid-name
""":func:`FlatMap` is like :class:`ParDo` except it takes a callable to
specify the transformation.
The callable must return an iterable for each element of the input
:class:`~apache_beam.pvalue.PCollection`. The elements of these iterables will
be flattened into the output :class:`~apache_beam.pvalue.PCollection`.
be flattened into the output :class:`~apache_beam.pvalue.PCollection`. If
no callable is given, then all elements of the input PCollection must already
be iterables themselves and will be flattened into the output PCollection.
Args:
fn (callable): a callable object.
Expand Down
11 changes: 11 additions & 0 deletions sdks/python/apache_beam/transforms/core_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,17 @@ def test_flatten_mismatched_windows(self):
_ = (source1, source2, source3) | "flatten" >> beam.Flatten()


class FlatMapTest(unittest.TestCase):
def test_default(self):

with beam.Pipeline() as pipeline:
letters = (
pipeline
| beam.Create(['abc', 'def'], reshuffle=False)
| beam.FlatMap())
assert_that(letters, equal_to(['a', 'b', 'c', 'd', 'e', 'f']))


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,15 @@ We define a function `split_words` which splits an input `str` element using the
{{< playground_snippet language="py" path="SDK_PYTHON_FlatMapFunction" show="flatmap_function" >}}
{{< /playground >}}

### Example 3: FlatMap with a lambda function
### Example 3: FlatMap without a function

A common use case of `FlatMap` is to flatten a `PCollection` of iterables into a `PCollection` of elements. To do that, don't specify the function argument to `FlatMap`, which uses the identity mapping function.

{{< playground height="700px" >}}
{{< playground_snippet language="py" path="SDK_PYTHON_FlatMapNoFunction" show="flatmap_no_function" >}}
{{< /playground >}}

### Example 4: FlatMap with a lambda function

For this example, we want to flatten a `PCollection` of lists of `str`s into a `PCollection` of `str`s.
Each input element is already an `iterable`, where each element is what we want in the resulting `PCollection`.
Expand All @@ -59,7 +67,7 @@ We use a lambda function that returns the same input element it received.
{{< playground_snippet language="py" path="SDK_PYTHON_FlatMapLambda" show="flatmap_lambda" >}}
{{< /playground >}}

### Example 4: FlatMap with a generator
### Example 5: FlatMap with a generator

For this example, we want to flatten a `PCollection` of lists of `str`s into a `PCollection` of `str`s.
We use a generator to iterate over the input list and yield each of the elements.
Expand All @@ -69,7 +77,7 @@ Each yielded result in the generator is an element in the resulting `PCollection
{{< playground_snippet language="py" path="SDK_PYTHON_FlatMapGenerator" show="flatmap_generator" >}}
{{< /playground >}}

### Example 5: FlatMapTuple for key-value pairs
### Example 6: FlatMapTuple for key-value pairs

If your `PCollection` consists of `(key, value)` pairs,
you can use `FlatMapTuple` to unpack them into different function arguments.
Expand All @@ -78,7 +86,7 @@ you can use `FlatMapTuple` to unpack them into different function arguments.
{{< playground_snippet language="py" path="SDK_PYTHON_FlatMapTuple" show="flatmap_tuple" >}}
{{< /playground >}}

### Example 6: FlatMap with multiple arguments
### Example 7: FlatMap with multiple arguments

You can pass functions with multiple arguments to `FlatMap`.
They are passed as additional positional arguments or keyword arguments to the function.
Expand All @@ -89,7 +97,7 @@ In this example, `split_words` takes `text` and `delimiter` as arguments.
{{< playground_snippet language="py" path="SDK_PYTHON_FlatMapMultipleArguments" show="flatmap_multiple_arguments" >}}
{{< /playground >}}

### Example 7: FlatMap with side inputs as singletons
### Example 8: FlatMap with side inputs as singletons

If the `PCollection` has a single value, such as the average from another computation,
passing the `PCollection` as a *singleton* accesses that value.
Expand All @@ -101,7 +109,7 @@ We then use that value as the delimiter for the `str.split` method.
{{< playground_snippet language="py" path="SDK_PYTHON_FlatMapSideInputSingleton" show="flatmap_side_inputs_singleton" >}}
{{< /playground >}}

### Example 8: FlatMap with side inputs as iterators
### Example 9: FlatMap with side inputs as iterators

If the `PCollection` has multiple values, pass the `PCollection` as an *iterator*.
This accesses elements lazily as they are needed,
Expand All @@ -114,7 +122,7 @@ so it is possible to iterate over large `PCollection`s that won't fit into memor
> **Note**: You can pass the `PCollection` as a *list* with `beam.pvalue.AsList(pcollection)`,
> but this requires that all the elements fit into memory.
### Example 9: FlatMap with side inputs as dictionaries
### Example 10: FlatMap with side inputs as dictionaries

If a `PCollection` is small enough to fit into memory, then that `PCollection` can be passed as a *dictionary*.
Each element must be a `(key, value)` pair.
Expand Down

0 comments on commit 3c9e9c8

Please sign in to comment.