From 71d97b6896a7de7fdbb48f0f7835081e411964fc Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Tue, 17 Sep 2024 15:11:09 -0700 Subject: [PATCH] Add note about FlattenWith to the documentation. --- .../apache_beam/examples/snippets/snippets.py | 33 +++++++++++++++++++ .../examples/snippets/snippets_test.py | 6 ++++ .../en/documentation/programming-guide.md | 27 ++++++++++++++- 3 files changed, 65 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 715011d302d2..2636f7d2637d 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -1143,6 +1143,39 @@ def model_multiple_pcollections_flatten(contents, output_path): merged | beam.io.WriteToText(output_path) +def model_multiple_pcollections_flatten_with(contents, output_path): + """Merging a PCollection with FlattenWith.""" + some_hash_fn = lambda s: ord(s[0]) + partition_fn = lambda element, partitions: some_hash_fn(element) % partitions + import apache_beam as beam + with TestPipeline() as pipeline: # Use TestPipeline for testing. + + # Partition into 3 groups + partitioned = pipeline | beam.Create(contents) | beam.Partition( + partition_fn, 3) + pcoll1 = partitioned[0] + pcoll2 = partitioned[1] + pcoll3 = partitioned[2] + SomeTransform = lambda: beam.Map(lambda x: x) + SomeOtherTransform = lambda: beam.Map(lambda x: x) + + # Flatten them back into 1 + + # A collection of PCollection objects can be represented simply + # as a tuple (or list) of PCollections. + # (The SDK for Python has no separate type to store multiple + # PCollection objects, whether containing the same or different + # types.) 
+ # [START model_multiple_pcollections_flatten_with] + merged = ( + pcoll1 + | SomeTransform() + | beam.FlattenWith(pcoll2, pcoll3) + | SomeOtherTransform()) + # [END model_multiple_pcollections_flatten_with] + merged | beam.io.WriteToText(output_path) + + def model_multiple_pcollections_partition(contents, output_path): """Splitting a PCollection with Partition.""" some_hash_fn = lambda s: ord(s[0]) diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index e8cb8960cf4d..0560e9710f03 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -917,6 +917,12 @@ def test_model_multiple_pcollections_flatten(self): snippets.model_multiple_pcollections_flatten(contents, result_path) self.assertEqual(contents, self.get_output(result_path)) + def test_model_multiple_pcollections_flatten_with(self): + contents = ['a', 'b', 'c', 'd', 'e', 'f'] + result_path = self.create_temp_file() + snippets.model_multiple_pcollections_flatten_with(contents, result_path) + self.assertEqual(contents, self.get_output(result_path)) + def test_model_multiple_pcollections_partition(self): contents = [17, 42, 64, 32, 0, 99, 53, 89] result_path = self.create_temp_file() diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index c716c7554db4..cdf82d566a4f 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -2024,7 +2024,7 @@ playerAccuracies := ... // PCollection #### 4.2.5. 
Flatten {#flatten} [`Flatten`](https://beam.apache.org/releases/javadoc/{{< param release_latest >}}/index.html?org/apache/beam/sdk/transforms/Flatten.html) -[`Flatten`](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py) +[`Flatten`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.Flatten) [`Flatten`](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/flatten.go) `Flatten` is a Beam transform for `PCollection` objects that store the same data type. @@ -2045,6 +2045,22 @@ PCollectionList collections = PCollectionList.of(pc1).and(pc2).and(pc3); PCollection merged = collections.apply(Flatten.pCollections()); {{< /highlight >}} +{{< paragraph class="language-java" >}} +One can also use the [`FlattenWith`](https://beam.apache.org/releases/javadoc/{{< param release_latest >}}/index.html?org/apache/beam/sdk/transforms/FlattenWith.html) +transform to merge PCollections into an output PCollection in a manner more compatible with chaining. +{{< /paragraph >}} + +{{< highlight java >}} +PCollection merged = pc1 + .apply(...) + // Merges the elements of pc2 in at this point... + .apply(FlattenWith.of(pc2)) + .apply(...) + // and the elements of pc3 at this point. + .apply(FlattenWith.of(pc3)) + .apply(...); +{{< /highlight >}} + {{< highlight py >}} # Flatten takes a tuple of PCollection objects. @@ -2052,6 +2068,15 @@ PCollection merged = collections.apply(Flatten.pCollections()); {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_multiple_pcollections_flatten >}} {{< /highlight >}} +{{< paragraph class="language-py" >}} +One can also use the [`FlattenWith`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.FlattenWith) +transform to merge PCollections into an output PCollection in a manner more compatible with chaining. 
+{{< /paragraph >}} + +{{< highlight py >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_multiple_pcollections_flatten_with >}} +{{< /highlight >}} + {{< highlight go >}} // Flatten accepts any number of PCollections of the same element type. // Returns a single PCollection that contains all of the elements in input PCollections.