Skip to content

Commit

Permalink
Merge pull request #32445 Add various utility meta-transforms to Beam.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb authored Oct 18, 2024
2 parents 3786c4a + 621cdfb commit 3b839d1
Show file tree
Hide file tree
Showing 13 changed files with 490 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@
* Added support for read with metadata in MqttIO (Java) ([#32195](https://github.com/apache/beam/issues/32195))
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Added support for processing events which use a global sequence to "ordered" extension (Java) [#32540](https://github.com/apache/beam/pull/32540)
* Added new meta-transforms FlattenWith and Tee that allow one to introduce branching
  without breaking the linear/chaining style of pipeline construction.

## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableLikeCoder;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionList;
Expand Down Expand Up @@ -82,6 +83,81 @@ public static <T> Iterables<T> iterables() {
return new Iterables<>();
}

  /**
   * Returns a {@link PTransform} that flattens the input {@link PCollection} with a given {@link
   * PCollection} resulting in a {@link PCollection} containing all the elements of both {@link
   * PCollection}s as its output.
   *
   * <p>This is equivalent to creating a {@link PCollectionList} containing both the input and
   * {@code other} and then applying {@link #pCollections()}, but has the advantage that it can be
   * more easily used inline.
   *
   * <p>Both {@code PCollections} must have equal {@link WindowFn}s. The output elements of {@code
   * Flatten<T>} are in the same windows and have the same timestamps as their corresponding input
   * elements. The output {@code PCollection} will have the same {@link WindowFn} as both inputs.
   *
   * @param other the other PCollection to flatten with the input
   * @param <T> the type of the elements in the input and output {@code PCollection}s.
   */
  public static <T> PTransform<PCollection<T>, PCollection<T>> with(PCollection<T> other) {
    return new FlattenWithPCollection<>(other);
  }

/** Implementation of {@link #with(PCollection)}. */
private static class FlattenWithPCollection<T>
extends PTransform<PCollection<T>, PCollection<T>> {
// We only need to access this at pipeline construction time.
private final transient PCollection<T> other;

public FlattenWithPCollection(PCollection<T> other) {
this.other = other;
}

@Override
public PCollection<T> expand(PCollection<T> input) {
return PCollectionList.of(input).and(other).apply(pCollections());
}

@Override
public String getKindString() {
return "Flatten.With";
}
}

  /**
   * Returns a {@link PTransform} that flattens the input {@link PCollection} with the output of
   * another {@link PTransform} resulting in a {@link PCollection} containing all the elements of
   * both the input {@link PCollection}s and the output of the given {@link PTransform} as its
   * output.
   *
   * <p>This is equivalent to creating a {@link PCollectionList} containing both the input and the
   * output of {@code other} and then applying {@link #pCollections()}, but has the advantage that
   * it can be more easily used inline.
   *
   * <p>Both {@code PCollections} must have equal {@link WindowFn}s. The output elements of {@code
   * Flatten<T>} are in the same windows and have the same timestamps as their corresponding input
   * elements. The output {@code PCollection} will have the same {@link WindowFn} as both inputs.
   *
   * @param <T> the type of the elements in the input and output {@code PCollection}s.
   * @param other a PTransform whose output should be flattened with the input
   */
  public static <T> PTransform<PCollection<T>, PCollection<T>> with(
      PTransform<PBegin, PCollection<T>> other) {
    return new PTransform<PCollection<T>, PCollection<T>>() {
      @Override
      public PCollection<T> expand(PCollection<T> input) {
        // Apply `other` at the pipeline root, then flatten its output with the input.
        return PCollectionList.of(input)
            .and(input.getPipeline().apply(other))
            .apply(pCollections());
      }

      @Override
      public String getKindString() {
        return "Flatten.With";
      }
    };
  }

/**
* A {@link PTransform} that flattens a {@link PCollectionList} into a {@link PCollection}
* containing all the elements of all the {@link PCollection}s in its input. Implements {@link
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms;

import java.util.function.Consumer;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;

/**
 * A PTransform that returns its input, but also applies its input to an auxiliary PTransform, akin
 * to the shell {@code tee} command, which is named after the T-splitter used in plumbing.
 *
 * <p>This can be useful to write out or otherwise process an intermediate transform without
 * breaking the linear flow of a chain of transforms, e.g.
 *
 * <pre><code>
 * {@literal PCollection<T>} input = ... ;
 * {@literal PCollection<T>} result =
 *     {@literal input.apply(...)}
 *         ...
 *         {@literal .apply(Tee.of(someSideTransform))}
 *         ...
 *         {@literal .apply(...)};
 * </code></pre>
 *
 * @param <T> the element type of the input PCollection
 */
public class Tee<T> extends PTransform<PCollection<T>, PCollection<T>> {
  private final PTransform<PCollection<T>, ?> consumer;

  /**
   * Returns a new Tee PTransform that will apply an auxiliary transform to the input as well as
   * pass it on.
   *
   * @param consumer An additional PTransform that should process the input PCollection. Its output
   *     will be ignored.
   * @param <T> the type of the elements in the input {@code PCollection}.
   */
  public static <T> Tee<T> of(PTransform<PCollection<T>, ?> consumer) {
    return new Tee<>(consumer);
  }

  /**
   * Returns a new Tee PTransform that will apply an auxiliary transform to the input as well as
   * pass it on.
   *
   * @param consumer An arbitrary {@link Consumer} that will be wrapped in a PTransform and applied
   *     to the input. Its output will be ignored.
   * @param <T> the type of the elements in the input {@code PCollection}.
   */
  public static <T> Tee<T> of(Consumer<PCollection<T>> consumer) {
    return of(
        new PTransform<PCollection<T>, PCollectionTuple>() {
          @Override
          public PCollectionTuple expand(PCollection<T> input) {
            // Invoke the consumer for its side effects; the empty tuple means this
            // wrapper transform contributes no outputs of its own.
            consumer.accept(input);
            return PCollectionTuple.empty(input.getPipeline());
          }
        });
  }

  private Tee(PTransform<PCollection<T>, ?> consumer) {
    this.consumer = consumer;
  }

  @Override
  public PCollection<T> expand(PCollection<T> input) {
    // Apply the auxiliary transform for its effects, discard its result, and
    // pass the input through unchanged.
    input.apply(consumer);
    return input;
  }

  @Override
  protected String getKindString() {
    return "Tee(" + consumer.getName() + ")";
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,32 @@ public void testFlattenWithDifferentInputAndOutputCoders2() {

/////////////////////////////////////////////////////////////////////////////

@Test
@Category(NeedsRunner.class)
public void testFlattenWithPCollection() {
PCollection<String> output =
p.apply(Create.of(LINES))
.apply("FlattenWithLines1", Flatten.with(p.apply("Create1", Create.of(LINES))))
.apply("FlattenWithLines2", Flatten.with(p.apply("Create2", Create.of(LINES2))));

PAssert.that(output).containsInAnyOrder(flattenLists(Arrays.asList(LINES, LINES2, LINES)));
p.run();
}

@Test
@Category(NeedsRunner.class)
public void testFlattenWithPTransform() {
PCollection<String> output =
p.apply(Create.of(LINES))
.apply("Create1", Flatten.with(Create.of(LINES)))
.apply("Create2", Flatten.with(Create.of(LINES2)));

PAssert.that(output).containsInAnyOrder(flattenLists(Arrays.asList(LINES, LINES2, LINES)));
p.run();
}

/////////////////////////////////////////////////////////////////////////////

@Test
@Category(NeedsRunner.class)
public void testEqualWindowFnPropagation() {
Expand Down Expand Up @@ -470,6 +496,7 @@ public void testIncompatibleWindowFnPropagationFailure() {
  public void testFlattenGetName() {
    Assert.assertEquals("Flatten.Iterables", Flatten.<String>iterables().getName());
    Assert.assertEquals("Flatten.PCollections", Flatten.<String>pCollections().getName());
    // Passing null is safe here: with() only stores the PCollection for use at
    // expansion time, and getName() never dereferences it.
    Assert.assertEquals("Flatten.With", Flatten.<String>with((PCollection<String>) null).getName());
  }

/////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Multimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Multimaps;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link Tee}: the input must pass through and the side transform must run. */
@RunWith(JUnit4.class)
public class TeeTest {

  @Rule public final transient TestPipeline p = TestPipeline.create();

  @Test
  @Category(NeedsRunner.class)
  public void testTee() {
    List<String> elements = Arrays.asList("a", "b", "c");
    CollectToMemory<String> collector = new CollectToMemory<>();
    PCollection<String> output = p.apply(Create.of(elements)).apply(Tee.of(collector));

    // Tee must return its input unchanged.
    PAssert.that(output).containsInAnyOrder(elements);
    p.run().waitUntilFinish();

    // Here we assert that this "sink" had the correct side effects.
    assertThat(collector.get(), containsInAnyOrder(elements.toArray(new String[3])));
  }

  /**
   * A test-only "sink" that records every element it sees in a static in-memory multimap.
   *
   * <p>The multimap is static so recorded elements remain reachable after the DoFn is serialized
   * and re-instantiated by the runner; entries are keyed by a per-instance UUID so concurrent
   * tests do not interfere. NOTE(review): this assumes the runner executes in the same JVM as the
   * test — confirm before relying on it with remote runners.
   */
  private static class CollectToMemory<T> extends PTransform<PCollection<T>, PCollection<Void>> {

    private static final Multimap<UUID, Object> ALL_ELEMENTS =
        Multimaps.synchronizedMultimap(HashMultimap.<UUID, Object>create());

    // Identifies this collector's entries in the shared static map.
    UUID uuid = UUID.randomUUID();

    @Override
    public PCollection<Void> expand(PCollection<T> input) {
      return input.apply(
          ParDo.of(
              new DoFn<T, Void>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                  ALL_ELEMENTS.put(uuid, c.element());
                }
              }));
    }

    // Unchecked cast is safe: only values of type T were inserted under this uuid.
    @SuppressWarnings("unchecked")
    public Collection<T> get() {
      return (Collection<T>) ALL_ELEMENTS.get(uuid);
    }
  }
}
54 changes: 54 additions & 0 deletions sdks/python/apache_beam/examples/snippets/snippets.py
Original file line number Diff line number Diff line change
Expand Up @@ -1143,6 +1143,60 @@ def model_multiple_pcollections_flatten(contents, output_path):
merged | beam.io.WriteToText(output_path)


def model_multiple_pcollections_flatten_with(contents, output_path):
  """Merging a PCollection with FlattenWith.

  Splits ``contents`` into three partitions, then merges them back into one
  PCollection inline, without breaking the linear chain of transforms.
  """
  some_hash_fn = lambda s: ord(s[0])
  partition_fn = lambda element, partitions: some_hash_fn(element) % partitions
  import apache_beam as beam
  with TestPipeline() as pipeline:  # Use TestPipeline for testing.

    # Split the input into three partitions.
    partitioned = pipeline | beam.Create(contents) | beam.Partition(
        partition_fn, 3)
    pcoll1 = partitioned[0]
    pcoll2 = partitioned[1]
    pcoll3 = partitioned[2]
    # Placeholder transforms standing in for real processing steps.
    SomeTransform = lambda: beam.Map(lambda x: x)
    SomeOtherTransform = lambda: beam.Map(lambda x: x)

    # Flatten them back into one PCollection.

    # FlattenWith takes the additional PCollections directly as arguments,
    # so the merge can be expressed inline rather than by collecting the
    # PCollections into a tuple/list and applying beam.Flatten to it.
    # [START model_multiple_pcollections_flatten_with]
    merged = (
        pcoll1
        | SomeTransform()
        | beam.FlattenWith(pcoll2, pcoll3)
        | SomeOtherTransform())
    # [END model_multiple_pcollections_flatten_with]
    merged | beam.io.WriteToText(output_path)


def model_multiple_pcollections_flatten_with_transform(contents, output_path):
  """Merging output of PTransform with FlattenWith.

  Demonstrates that FlattenWith also accepts a root PTransform (such as
  beam.Create), whose output is merged with the main PCollection.
  """
  # NOTE: the unused some_hash_fn/partition_fn lambdas copied from the
  # partition snippet have been removed; this snippet never partitions.
  import apache_beam as beam
  with TestPipeline() as pipeline:  # Use TestPipeline for testing.

    pcoll = pipeline | beam.Create(contents)
    # Placeholder transforms standing in for real processing steps.
    SomeTransform = lambda: beam.Map(lambda x: x)
    SomeOtherTransform = lambda: beam.Map(lambda x: x)

    # [START model_multiple_pcollections_flatten_with_transform]
    merged = (
        pcoll
        | SomeTransform()
        | beam.FlattenWith(beam.Create(['x', 'y', 'z']))
        | SomeOtherTransform())
    # [END model_multiple_pcollections_flatten_with_transform]
    merged | beam.io.WriteToText(output_path)


def model_multiple_pcollections_partition(contents, output_path):
"""Splitting a PCollection with Partition."""
some_hash_fn = lambda s: ord(s[0])
Expand Down
13 changes: 13 additions & 0 deletions sdks/python/apache_beam/examples/snippets/snippets_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,19 @@ def test_model_multiple_pcollections_flatten(self):
snippets.model_multiple_pcollections_flatten(contents, result_path)
self.assertEqual(contents, self.get_output(result_path))

def test_model_multiple_pcollections_flatten_with(self):
contents = ['a', 'b', 'c', 'd', 'e', 'f']
result_path = self.create_temp_file()
snippets.model_multiple_pcollections_flatten_with(contents, result_path)
self.assertEqual(contents, self.get_output(result_path))

def test_model_multiple_pcollections_flatten_with_transform(self):
contents = ['a', 'b', 'c', 'd', 'e', 'f']
result_path = self.create_temp_file()
snippets.model_multiple_pcollections_flatten_with_transform(
contents, result_path)
self.assertEqual(contents + ['x', 'y', 'z'], self.get_output(result_path))

def test_model_multiple_pcollections_partition(self):
contents = [17, 42, 64, 32, 0, 99, 53, 89]
result_path = self.create_temp_file()
Expand Down
Loading

0 comments on commit 3b839d1

Please sign in to comment.