Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plumb Redistribute "allow duplicates" property to Dataflow #31490

Merged
merged 38 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
e090217
Add new DataflowGroupByKey transform
celeste-zeng Jun 4, 2024
379a8e9
Add DataflowGroupByKeyTranslation
celeste-zeng Jun 4, 2024
2354e45
Add Urn for DataflowGBK
celeste-zeng Jun 4, 2024
72bb33c
Add DataflowGBK to runner proto
celeste-zeng Jun 4, 2024
d485a2c
Expand Redistribute into DataflowGBK
celeste-zeng Jun 4, 2024
7ec7c93
Add translator for DataflowGBK
celeste-zeng Jun 4, 2024
8fa5e10
Touch one test file
celeste-zeng Jun 4, 2024
8daf8a6
Eliminate nullability errors
celeste-zeng Jun 4, 2024
1461de5
Fix formatting
celeste-zeng Jun 4, 2024
748c345
Create RedistributeByKeyOverrideFactory.java
celeste-zeng Jun 7, 2024
0081190
Add overrideBuilder to DataflowRunner.java
celeste-zeng Jun 7, 2024
58aa3ca
Revert Redistribute.java
celeste-zeng Jun 7, 2024
73cc455
Merge branch 'apache:master' into dataflow-gbk
celeste-zeng Jun 7, 2024
d30c9b3
Add missing import.
celeste-zeng Jun 7, 2024
e6d9a27
Fix check failure
celeste-zeng Jun 7, 2024
bf8d056
mark create public.
celeste-zeng Jun 7, 2024
1c36214
fix commnent
celeste-zeng Jun 7, 2024
99bf842
make changes based on check failure
celeste-zeng Jun 7, 2024
16543e4
Remove surpress for DataflowGBK Translation
celeste-zeng Jun 7, 2024
e14a5be
Add suppress back due to lots of check faliure.
celeste-zeng Jun 7, 2024
35e12b6
Fix check failure
celeste-zeng Jun 7, 2024
ac8fc22
Remove unused methods, like fewKeys and the RedistributeByKey translator
celeste-zeng Jun 8, 2024
5ad5c28
fix check failure
celeste-zeng Jun 8, 2024
e1a37c1
Merge branch 'apache:master' into dataflow-gbk
celeste-zeng Jun 8, 2024
92041a2
remove unused validation
celeste-zeng Jun 8, 2024
7082ff2
Remove unused import
celeste-zeng Jun 8, 2024
6452750
Made changes based on comments
celeste-zeng Jun 10, 2024
b8b5459
supress rawtypes warnings
celeste-zeng Jun 10, 2024
0619f06
Surpress nullness
celeste-zeng Jun 10, 2024
1bf8b9c
Filter null values to pass null check failure
celeste-zeng Jun 10, 2024
e038bde
Fix check failure
celeste-zeng Jun 10, 2024
1da17e6
add suppress to specific place
celeste-zeng Jun 10, 2024
b42d0d1
Merge branch 'apache:master' into dataflow-gbk
celeste-zeng Jun 11, 2024
f39fcd8
Try if delay assignment can resolve type mismatch
celeste-zeng Jun 11, 2024
f804251
fix formatting
celeste-zeng Jun 11, 2024
7c4321b
Touch remaining test files.
celeste-zeng Jun 12, 2024
3b94fa4
touch all test files
celeste-zeng Jun 12, 2024
1a02260
Change location of DataflowGBK, delete DataflowGBKTranslation
celeste-zeng Jun 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test"
"https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test",
"https://github.com/apache/beam/pull/31490": "noting that PR #31450 should run this test"
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,21 @@
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DataflowGroupByKey;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Redistribute.RedistributeByKey;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.resourcehints.ResourceHint;
import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.CoderUtils;
Expand Down Expand Up @@ -920,37 +919,30 @@ private <K1, K2, V> void groupByKeyAndSortValuesHelper(
});

registerTransformTranslator(
RedistributeByKey.class,
new TransformTranslator<RedistributeByKey>() {
DataflowGroupByKey.class,
new TransformTranslator<DataflowGroupByKey>() {
@Override
public void translate(RedistributeByKey transform, TranslationContext context) {
redistributeByKeyHelper(transform, context);
public void translate(DataflowGroupByKey transform, TranslationContext context) {
dataflowGroupByKeyHelper(transform, context);
}

private <K, V> void redistributeByKeyHelper(
RedistributeByKey<K, V> transform, TranslationContext context) {
private <K, V> void dataflowGroupByKeyHelper(
DataflowGroupByKey<K, V> transform, TranslationContext context) {
StepTranslationContext stepContext = context.addStep(transform, "GroupByKey");

PCollection<KV<K, V>> input = context.getInput(transform);
stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));

// Dataflow worker implements reshuffle by reading GBK with ReshuffleTrigger; that is
// the only part of
// the windowing strategy that should be observed.
WindowingStrategy<?, ?> windowingStrategy =
input.getWindowingStrategy().withTrigger(new ReshuffleTrigger<>());
WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, true);
stepContext.addInput(
PropertyNames.SERIALIZED_FN,
byteArrayToJsonString(
serializeWindowingStrategy(windowingStrategy, context.getPipelineOptions())));

// Many group by key options do not apply to redistribute but Dataflow doesn't
// understand
// that. We set them here to be sure to avoid any complex codepaths
stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, true);
stepContext.addInput(PropertyNames.IS_MERGING_WINDOW_FN, false);
stepContext.addInput(PropertyNames.ALLOW_DUPLICATES, transform.getAllowDuplicates());
stepContext.addInput(
PropertyNames.IS_MERGING_WINDOW_FN,
!windowingStrategy.getWindowFn().isNonMerging());
stepContext.addInput(PropertyNames.ALLOW_DUPLICATES, transform.allowDuplicates());
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Redistribute.RedistributeByKey;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
Expand Down Expand Up @@ -698,6 +699,11 @@ private List<PTransformOverride> getOverrides(boolean streaming) {
PTransformMatchers.classEqualTo(ParDo.SingleOutput.class),
new PrimitiveParDoSingleFactory()));

overridesBuilder.add(
PTransformOverride.of(
PTransformMatchers.classEqualTo(RedistributeByKey.class),
new RedistributeByKeyOverrideFactory()));

if (streaming) {
// For update compatibility, always use a Read for Create in streaming mode.
overridesBuilder
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow;

import java.util.Collections;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.PTransformOverrideFactory.PTransformReplacement;
import org.apache.beam.sdk.transforms.DataflowGroupByKey;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Redistribute.RedistributeByKey;
import org.apache.beam.sdk.transforms.Reify;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.IdentityWindowFn;
import org.apache.beam.sdk.util.construction.PTransformReplacements;
import org.apache.beam.sdk.util.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;

class RedistributeByKeyOverrideFactory<K, V>
extends SingleInputOutputOverrideFactory<
PCollection<KV<K, V>>, PCollection<KV<K, V>>, RedistributeByKey<K, V>> {

@Override
public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, V>>>
getReplacementTransform(
AppliedPTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>, RedistributeByKey<K, V>>
transform) {
return PTransformOverrideFactory.PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
new DataflowRedistributeByKey<>(transform.getTransform()));
}

/** Specialized implementation of {@link RedistributeByKey} for Dataflow pipelines. */
private static class DataflowRedistributeByKey<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {

private final RedistributeByKey<K, V> originalTransform;

private DataflowRedistributeByKey(RedistributeByKey<K, V> originalTransform) {
this.originalTransform = originalTransform;
}

@Override
public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
Window<KV<K, V>> rewindow =
Window.<KV<K, V>>into(
new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
.triggering(new ReshuffleTrigger<>())
.discardingFiredPanes()
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));

PCollection<KV<K, ValueInSingleWindow<V>>> reified =
input
.apply("SetIdentityWindow", rewindow)
.apply("ReifyOriginalMetadata", Reify.windowsInValue());

PCollection<KV<K, Iterable<ValueInSingleWindow<V>>>> grouped;
if (originalTransform.getAllowDuplicates()) {
grouped = reified.apply(DataflowGroupByKey.createWithAllowDuplicates());
} else {
grouped = reified.apply(DataflowGroupByKey.create());
}

return grouped
.apply(
"ExpandIterable",
ParDo.of(
new DoFn<
KV<K, Iterable<ValueInSingleWindow<V>>>, KV<K, ValueInSingleWindow<V>>>() {
@ProcessElement
public void processElement(
@Element KV<K, Iterable<ValueInSingleWindow<V>>> element,
OutputReceiver<KV<K, ValueInSingleWindow<V>>> r) {
K key = element.getKey();
for (ValueInSingleWindow<V> value : element.getValue()) {
r.output(KV.of(key, value));
}
}
}))
.apply("RestoreMetadata", new RestoreMetadata<>())
.setWindowingStrategyInternal(originalStrategy);
}
}

private static class RestoreMetadata<K, V>
extends PTransform<PCollection<KV<K, ValueInSingleWindow<V>>>, PCollection<KV<K, V>>> {
@Override
public PCollection<KV<K, V>> expand(PCollection<KV<K, ValueInSingleWindow<V>>> input) {
return input.apply(
ParDo.of(
new DoFn<KV<K, ValueInSingleWindow<V>>, KV<K, V>>() {
@Override
public Duration getAllowedTimestampSkew() {
return Duration.millis(Long.MAX_VALUE);
}

@ProcessElement
public void processElement(
@Element KV<K, ValueInSingleWindow<V>> kv, OutputReceiver<KV<K, V>> r) {
r.outputWindowedValue(
KV.of(kv.getKey(), kv.getValue().getValue()),
kv.getValue().getTimestamp(),
Collections.singleton(kv.getValue().getWindow()),
kv.getValue().getPane());
}
}));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.WindowingStrategy;

/**
* Specialized implementation of {@code GroupByKey} for translating Redistribute transform into
* Dataflow service protos.
*/
public class DataflowGroupByKey<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {

// Plumbed from Redistribute transform.
private final boolean allowDuplicates;

private DataflowGroupByKey(boolean allowDuplicates) {
this.allowDuplicates = allowDuplicates;
}

/**
* Returns a {@code DataflowGroupByKey<K, V>} {@code PTransform}.
*
* @param <K> the type of the keys of the input and output {@code PCollection}s
* @param <V> the type of the values of the input {@code PCollection} and the elements of the
* {@code Iterable}s in the output {@code PCollection}
*/
public static <K, V> DataflowGroupByKey<K, V> create() {
return new DataflowGroupByKey<>(false);
}

/**
* Returns a {@code DataflowGroupByKey<K, V>} {@code PTransform} that its output can have
* duplicated elements.
*
* @param <K> the type of the keys of the input and output {@code PCollection}s
* @param <V> the type of the values of the input {@code PCollection} and the elements of the
* {@code Iterable}s in the output {@code PCollection}
*/
public static <K, V> DataflowGroupByKey<K, V> createWithAllowDuplicates() {
return new DataflowGroupByKey<>(true);
}

/** Returns whether it allows duplicated elements in the output. */
public boolean allowDuplicates() {
return allowDuplicates;
}

/////////////////////////////////////////////////////////////////////////////

public static void applicableTo(PCollection<?> input) {
WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
// Verify that the input PCollection is bounded, or that there is windowing/triggering being
// used. Without this, the watermark (at end of global window) will never be reached.
if (windowingStrategy.getWindowFn() instanceof GlobalWindows
&& windowingStrategy.getTrigger() instanceof DefaultTrigger
&& input.isBounded() != IsBounded.BOUNDED) {
throw new IllegalStateException(
"DataflowGroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow"
+ " without a trigger. Use a Window.into or Window.triggering transform prior to"
+ " DataflowGroupByKey.");
}
}

public WindowingStrategy<?, ?> updateWindowingStrategy(WindowingStrategy<?, ?> inputStrategy) {
// If the WindowFn was merging, set the bit to indicate it is already merged.
// Switch to the continuation trigger associated with the current trigger.
return inputStrategy
.withAlreadyMerged(!inputStrategy.getWindowFn().isNonMerging())
.withTrigger(inputStrategy.getTrigger().getContinuationTrigger());
}

@Override
public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
applicableTo(input);

// Verify that the input Coder<KV<K, V>> is a KvCoder<K, V>, and that
// the key coder is deterministic.
Coder<K> keyCoder = getKeyCoder(input.getCoder());
try {
keyCoder.verifyDeterministic();
} catch (NonDeterministicException e) {
throw new IllegalStateException(
"the keyCoder of a DataflowGroupByKey must be deterministic", e);
}

// This primitive operation groups by the combination of key and window,
// merging windows as needed, using the windows assigned to the
// key/value input elements and the window merge operation of the
// window function associated with the input PCollection.
return PCollection.createPrimitiveOutputInternal(
input.getPipeline(),
updateWindowingStrategy(input.getWindowingStrategy()),
input.isBounded(),
getOutputKvCoder(input.getCoder()));
}

/**
* Returns the {@code Coder} of the input to this transform, which should be a {@code KvCoder}.
*/
@SuppressWarnings("unchecked")
static <K, V> KvCoder<K, V> getInputKvCoder(Coder<?> inputCoder) {
if (!(inputCoder instanceof KvCoder)) {
throw new IllegalStateException("DataflowGroupByKey requires its input to use KvCoder");
}
return (KvCoder<K, V>) inputCoder;
}

/////////////////////////////////////////////////////////////////////////////

/**
* Returns the {@code Coder} of the keys of the input to this transform, which is also used as the
* {@code Coder} of the keys of the output of this transform.
*/
static <K, V> Coder<K> getKeyCoder(Coder<KV<K, V>> inputCoder) {
return DataflowGroupByKey.<K, V>getInputKvCoder(inputCoder).getKeyCoder();
}

/** Returns the {@code Coder} of the values of the input to this transform. */
public static <K, V> Coder<V> getInputValueCoder(Coder<KV<K, V>> inputCoder) {
return DataflowGroupByKey.<K, V>getInputKvCoder(inputCoder).getValueCoder();
}

/** Returns the {@code Coder} of the {@code Iterable} values of the output of this transform. */
static <K, V> Coder<Iterable<V>> getOutputValueCoder(Coder<KV<K, V>> inputCoder) {
return IterableCoder.of(getInputValueCoder(inputCoder));
}

/** Returns the {@code Coder} of the output of this transform. */
public static <K, V> KvCoder<K, Iterable<V>> getOutputKvCoder(Coder<KV<K, V>> inputCoder) {
return KvCoder.of(getKeyCoder(inputCoder), getOutputValueCoder(inputCoder));
}
}
Loading
Loading