Reapply "Add Redistribute transform to Java SDK"
This reverts commit f498cdf, adjusting sickbay lists
kennknowles committed May 10, 2024
1 parent bf347fe commit ef431ec
Showing 8 changed files with 739 additions and 0 deletions.
2 changes: 2 additions & 0 deletions runners/flink/job-server/flink_job_server.gradle
@@ -197,6 +197,8 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpoi
testFilter: {
// Flink reshuffle override does not preserve all metadata
excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata'
// TODO(https://github.com/apache/beam/issues/31231)
excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata'
// TODO(https://github.com/apache/beam/issues/20269)
excludeTestsMatching 'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2'
// TODO(https://github.com/apache/beam/issues/20843)
2 changes: 2 additions & 0 deletions runners/portability/java/build.gradle
@@ -209,6 +209,8 @@ def createUlrValidatesRunnerTask = { name, environmentType, dockerImageTask = ""

// TODO(https://github.com/apache/beam/issues/29973)
excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata'
// TODO(https://github.com/apache/beam/issues/31231)
excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata'

for (String test : sickbayTests) {
excludeTestsMatching test
2 changes: 2 additions & 0 deletions runners/samza/job-server/build.gradle
@@ -183,6 +183,8 @@ def portableValidatesRunnerTask(String name, boolean docker) {
excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState'
// TODO(https://github.com/apache/beam/issues/29973)
excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata'
// TODO(https://github.com/apache/beam/issues/31231)
excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata'
}
)
}
8 changes: 8 additions & 0 deletions runners/spark/job-server/spark_job_server.gradle
@@ -162,6 +162,14 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean docker,
excludeTestsMatching 'CombineTest$BasicTests.testHotKeyCombining'
// TODO(https://github.com/apache/beam/issues/29973)
excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata'
// TODO(https://github.com/apache/beam/issues/31231)
excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata'
// TODO(https://github.com/apache/beam/issues/31234) same reason as GroupByKeyTest and ReshuffleTest above
excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeAfterFixedWindows'
excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeAfterSlidingWindows'
excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeAfterFixedWindowsAndGroupByKey'
excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeAfterSessionsAndGroupByKey'
excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeAfterSlidingWindowsAndGroupByKey'
}
}
else {
7 changes: 7 additions & 0 deletions runners/spark/spark_runner.gradle
@@ -286,6 +286,11 @@ def applyBatchValidatesRunnerSetup = { Test it ->
def validatesRunnerBatch = tasks.register("validatesRunnerBatch", Test) {
applyBatchValidatesRunnerSetup(it)
systemProperties sparkTestProperties(["--enableSparkMetricSinks":"false"])

// TODO(https://github.com/apache/beam/issues/31231)
it.filter {
excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata'
}
}

def validatesRunnerBatchWithBoundedSDFExperiment = tasks.register("validatesRunnerBatchWithBoundedSDFExperiment", Test) {
@@ -322,6 +327,8 @@ def validatesRunnerStreaming = tasks.register("validatesRunnerStreaming", Test)
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testWindowedSideInputNotPresent'
// TODO(https://github.com/apache/beam/issues/29973)
excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata'
// TODO(https://github.com/apache/beam/issues/31231)
excludeTestsMatching 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata'
}

// TestStream using processing time is not supported in Spark
300 changes: 300 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java
@@ -0,0 +1,300 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms;

import com.google.auto.service.AutoService;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.IdentityWindowFn;
import org.apache.beam.sdk.util.construction.PTransformTranslation;
import org.apache.beam.sdk.util.construction.SdkComponents;
import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedInteger;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

/**
* A family of {@link PTransform PTransforms} that returns a {@link PCollection} equivalent to its
* input but functions as an operational hint to a runner that redistributing the data in some way
* is likely useful.
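*
* <p>A minimal usage sketch (illustrative only; the input {@code PCollection}s are assumed):
*
* <pre>{@code
* PCollection<KV<String, Long>> counts = ...;
* // Hint that the keyed data should be redistributed; keys, windows, and timestamps are kept.
* PCollection<KV<String, Long>> spread = counts.apply(Redistribute.byKey());
*
* PCollection<String> lines = ...;
* // Hint that the data may be rebalanced arbitrarily, optionally bounding the shard count.
* PCollection<String> rebalanced =
*     lines.apply(Redistribute.<String>arbitrarily().withNumBuckets(16));
* }</pre>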
*/
public class Redistribute {
/** @return a {@link RedistributeArbitrarily} transform with default configuration. */
public static <T> RedistributeArbitrarily<T> arbitrarily() {
return new RedistributeArbitrarily<>(null, false);
}

/** @return a {@link RedistributeByKey} transform with default configuration. */
public static <K, V> RedistributeByKey<K, V> byKey() {
return new RedistributeByKey<>(false);
}

/**
* Redistributes a keyed {@link PCollection} while preserving keys, windows, and timestamps.
*
* @param <K> The type of key by which the data is redistributed.
* @param <V> The type of value being redistributed.
*/
public static class RedistributeByKey<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {

private final boolean allowDuplicates;

private RedistributeByKey(boolean allowDuplicates) {
this.allowDuplicates = allowDuplicates;
}

public RedistributeByKey<K, V> withAllowDuplicates(boolean newAllowDuplicates) {
return new RedistributeByKey<>(newAllowDuplicates);
}

public boolean getAllowDuplicates() {
return allowDuplicates;
}

@Override
public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
// If the input has already had its windows merged, then the GBK that performed the merge
// will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
// here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
// The TimestampCombiner is set to ensure the GroupByKey does not shift elements forwards in
// time.
// Because this outputs as fast as possible, this should not hold the watermark.
Window<KV<K, V>> rewindow =
Window.<KV<K, V>>into(
new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
.triggering(new ReshuffleTrigger<>())
.discardingFiredPanes()
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));

PCollection<KV<K, ValueInSingleWindow<V>>> reified =
input
.apply("SetIdentityWindow", rewindow)
.apply("ReifyOriginalMetadata", Reify.windowsInValue());

PCollection<KV<K, Iterable<ValueInSingleWindow<V>>>> grouped =
reified.apply(GroupByKey.create());
return grouped
.apply(
"ExpandIterable",
ParDo.of(
new DoFn<
KV<K, Iterable<ValueInSingleWindow<V>>>, KV<K, ValueInSingleWindow<V>>>() {
@ProcessElement
public void processElement(
@Element KV<K, Iterable<ValueInSingleWindow<V>>> element,
OutputReceiver<KV<K, ValueInSingleWindow<V>>> r) {
K key = element.getKey();
for (ValueInSingleWindow<V> value : element.getValue()) {
r.output(KV.of(key, value));
}
}
}))
.apply("RestoreMetadata", new RestoreMetadata<>())
// Set the windowing strategy directly, so that it doesn't get counted as the user having
// set allowed lateness.
.setWindowingStrategyInternal(originalStrategy);
}
}

/**
* A variant of {@link RedistributeByKey} that permits the runner to produce duplicates.
*
* @param <K> The type of key by which the data is redistributed.
* @param <V> The type of value being redistributed.
*/
public static class RedistributeByKeyAllowingDuplicates<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {

@Override
public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
return input.apply(Redistribute.byKey());
}
}

/**
* No-op transform that hints to the runner to try to redistribute the work evenly, or via
* whatever clever strategy the runner comes up with.
*/
public static class RedistributeArbitrarily<T>
extends PTransform<PCollection<T>, PCollection<T>> {
// The number of buckets to shard into.
// A runner is free to ignore this (a runner may ignore the transform
// entirely!). This is a performance optimization to prevent having
// unit-sized bundles on the output. If unset, uses a random integer key.
private @Nullable Integer numBuckets = null;
private boolean allowDuplicates = false;

private RedistributeArbitrarily(@Nullable Integer numBuckets, boolean allowDuplicates) {
this.numBuckets = numBuckets;
this.allowDuplicates = allowDuplicates;
}

public RedistributeArbitrarily<T> withNumBuckets(@Nullable Integer numBuckets) {
return new RedistributeArbitrarily<>(numBuckets, this.allowDuplicates);
}

public RedistributeArbitrarily<T> withAllowDuplicates(boolean allowDuplicates) {
return new RedistributeArbitrarily<>(this.numBuckets, allowDuplicates);
}

public boolean getAllowDuplicates() {
return allowDuplicates;
}

@Override
public PCollection<T> expand(PCollection<T> input) {
return input
.apply("Pair with random key", ParDo.of(new AssignShardFn<>(numBuckets)))
.apply(Redistribute.<Integer, T>byKey().withAllowDuplicates(this.allowDuplicates))
.apply(Values.create());
}
}

private static class RestoreMetadata<K, V>
extends PTransform<PCollection<KV<K, ValueInSingleWindow<V>>>, PCollection<KV<K, V>>> {
@Override
public PCollection<KV<K, V>> expand(PCollection<KV<K, ValueInSingleWindow<V>>> input) {
return input.apply(
ParDo.of(
new DoFn<KV<K, ValueInSingleWindow<V>>, KV<K, V>>() {
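// Restoring the original element timestamps can move outputs earlier than the
// current element's timestamp, so unlimited timestamp skew is allowed below.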
@Override
public Duration getAllowedTimestampSkew() {
return Duration.millis(Long.MAX_VALUE);
}

@ProcessElement
public void processElement(
@Element KV<K, ValueInSingleWindow<V>> kv, OutputReceiver<KV<K, V>> r) {
r.outputWindowedValue(
KV.of(kv.getKey(), kv.getValue().getValue()),
kv.getValue().getTimestamp(),
Collections.singleton(kv.getValue().getWindow()),
kv.getValue().getPane());
}
}));
}
}

static class AssignShardFn<T> extends DoFn<T, KV<Integer, T>> {
private int shard;
private @Nullable Integer numBuckets;

public AssignShardFn(@Nullable Integer numBuckets) {
this.numBuckets = numBuckets;
}

@Setup
public void setup() {
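// Seed each DoFn instance with a random starting shard so that different
// instances produce different key sequences.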
shard = ThreadLocalRandom.current().nextInt();
}

@ProcessElement
public void processElement(@Element T element, OutputReceiver<KV<Integer, T>> r) {
++shard;
// Smear the shard into something more random-looking, to avoid issues
// with runners that don't properly hash the key being shuffled, but rely
// on it being random-looking. E.g. Spark takes the Java hashCode() of keys,
// which for Integer is a no-op, causing the issue described at
// http://hydronitrogen.com/poor-hash-partitioning-of-timestamps-integers-and-longs-in-spark.html
// This hashing strategy is copied from
// org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Hashing.smear().
int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
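// When a bucket count is set, reduce the smeared hash into [0, numBuckets) using
// unsigned arithmetic so that negative hash values still map to a valid bucket.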
if (numBuckets != null) {
UnsignedInteger unsignedNumBuckets = UnsignedInteger.fromIntBits(numBuckets);
hashOfShard = UnsignedInteger.fromIntBits(hashOfShard).mod(unsignedNumBuckets).intValue();
}
r.output(KV.of(hashOfShard, element));
}
}

static class RedistributeByKeyTranslator
implements PTransformTranslation.TransformPayloadTranslator<RedistributeByKey<?, ?>> {
@Override
public String getUrn() {
return PTransformTranslation.REDISTRIBUTE_BY_KEY_URN;
}

@Override
@SuppressWarnings("nullness") // Cannot figure out how to make this typecheck
public RunnerApi.FunctionSpec translate(
AppliedPTransform<?, ?, RedistributeByKey<?, ?>> transform, SdkComponents components) {
return RunnerApi.FunctionSpec.newBuilder()
.setUrn(getUrn(transform.getTransform()))
.setPayload(
RunnerApi.RedistributePayload.newBuilder()
.setAllowDuplicates(transform.getTransform().getAllowDuplicates())
.build()
.toByteString())
.build();
}
}

static class RedistributeArbitrarilyTranslator
implements PTransformTranslation.TransformPayloadTranslator<RedistributeArbitrarily<?>> {
@Override
public String getUrn() {
return PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN;
}

@Override
@SuppressWarnings("nullness") // Cannot figure out how to make this typecheck
public RunnerApi.FunctionSpec translate(
AppliedPTransform<?, ?, RedistributeArbitrarily<?>> transform, SdkComponents components) {
return RunnerApi.FunctionSpec.newBuilder()
.setUrn(getUrn(transform.getTransform()))
.setPayload(
RunnerApi.RedistributePayload.newBuilder()
.setAllowDuplicates(transform.getTransform().getAllowDuplicates())
.build()
.toByteString())
.build();
}
}

/** Registers translators for the Redistribute family of transforms. */
@Internal
@AutoService(TransformPayloadTranslatorRegistrar.class)
public static class Registrar implements TransformPayloadTranslatorRegistrar {

@Override
@SuppressWarnings("rawtypes")
public Map<
? extends Class<? extends PTransform>,
? extends PTransformTranslation.TransformPayloadTranslator>
getTransformPayloadTranslators() {
return ImmutableMap
.<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
.put(RedistributeByKey.class, new RedistributeByKeyTranslator())
.put(RedistributeArbitrarily.class, new RedistributeArbitrarilyTranslator())
.build();
}
}
}
8 changes: 8 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java
@@ -122,6 +122,9 @@ public class PTransformTranslation {
public static final String COMBINE_PER_KEY_TRANSFORM_URN = "beam:transform:combine_per_key:v1";
public static final String COMBINE_GLOBALLY_TRANSFORM_URN = "beam:transform:combine_globally:v1";
public static final String RESHUFFLE_URN = "beam:transform:reshuffle:v1";
public static final String REDISTRIBUTE_BY_KEY_URN = "beam:transform:redistribute_by_key:v1";
public static final String REDISTRIBUTE_ARBITRARILY_URN =
"beam:transform:redistribute_arbitrarily:v1";
public static final String WRITE_FILES_TRANSFORM_URN = "beam:transform:write_files:v1";
public static final String GROUP_INTO_BATCHES_WITH_SHARDED_KEY_URN =
"beam:transform:group_into_batches_with_sharded_key:v1";
@@ -200,6 +203,11 @@ public class PTransformTranslation {
COMBINE_GLOBALLY_TRANSFORM_URN.equals(
getUrn(StandardPTransforms.Composites.COMBINE_GLOBALLY)));
checkState(RESHUFFLE_URN.equals(getUrn(StandardPTransforms.Composites.RESHUFFLE)));
checkState(
REDISTRIBUTE_BY_KEY_URN.equals(getUrn(StandardPTransforms.Composites.REDISTRIBUTE_BY_KEY)));
checkState(
REDISTRIBUTE_ARBITRARILY_URN.equals(
getUrn(StandardPTransforms.Composites.REDISTRIBUTE_ARBITRARILY)));
checkState(
WRITE_FILES_TRANSFORM_URN.equals(getUrn(StandardPTransforms.Composites.WRITE_FILES)));
checkState(PUBSUB_READ.equals(getUrn(StandardPTransforms.Composites.PUBSUB_READ)));