Commit: current work

Naireen committed Jan 4, 2024
1 parent bfe902f commit ffd157c

Showing 9 changed files with 373 additions and 243 deletions.
@@ -25,6 +25,7 @@
import java.util.Set;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.transforms.AutoshardedKeyReshuffle;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
@@ -525,6 +526,25 @@ public String toString() {
};
}

public static PTransformMatcher autoshardableKeys() {
return new PTransformMatcher() {
@Override
public boolean matches(AppliedPTransform<?, ?, ?> application) {
return application.getTransform().getClass().equals(AutoshardedKeyReshuffle.ViaRandomKey.class);
}

@Override
public boolean matchesDuringValidation(AppliedPTransform<?, ?, ?> application) {
return false;
}

@Override
public String toString() {
return MoreObjects.toStringHelper("autoshardableKeysMatcher").toString();
}
};
}

public static PTransformMatcher writeWithRunnerDeterminedSharding() {
return application -> {
if (WRITE_FILES_TRANSFORM_URN.equals(
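Note: the autoshardableKeys() matcher added above only takes effect once it is paired with an override factory via a PTransformOverride; the DataflowRunner change later in this commit does exactly that. A minimal sketch of that wiring follows, with the PTransformMatchers import path and the runner parameter assumed for illustration rather than taken from this diff.

import org.apache.beam.runners.core.construction.PTransformMatchers; // package assumed
import org.apache.beam.sdk.runners.PTransformOverride;

// Illustrative sketch only, not part of this diff: pair the new matcher with the
// override factory so that matched AutoshardedKeyReshuffle.ViaRandomKey transforms
// are swapped for the streaming Dataflow replacement.
static PTransformOverride autoshardedKeyOverride(DataflowRunner runner) {
  return PTransformOverride.of(
      PTransformMatchers.autoshardableKeys(),
      new AutoshardedKeyReshuffleOverride.StreamingAutoshardedKeyReshuffleOverrideFactory<>(runner));
}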
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow;

import java.util.Map;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.AutoshardedKeyReshuffle;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

/**
 * Contains a {@link PTransformOverrideFactory} that overrides {@link
 * AutoshardedKeyReshuffle.ViaRandomKey} for streaming Dataflow pipelines. Dataflow has special
 * handling of the {@link ReshuffleTrigger}, which never buffers elements and outputs elements with
 * their original timestamp.
 */
public class AutoshardedKeyReshuffleOverride {

  static class StreamingAutoshardedKeyReshuffleOverrideFactory<K, V>
      implements PTransformOverrideFactory<
          PCollection<KV<K, V>>,
          PCollection<KV<ShardedKey<K>, V>>,
          AutoshardedKeyReshuffle.ViaRandomKey<K, V>> {
private final DataflowRunner runner;

StreamingAutoshardedKeyReshuffleOverrideFactory(DataflowRunner runner) {
this.runner = runner;
}

@Override
    public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, V>>>
        getReplacementTransform(
            AppliedPTransform<
                    PCollection<KV<K, V>>,
                    PCollection<KV<ShardedKey<K>, V>>,
                    AutoshardedKeyReshuffle.ViaRandomKey<K, V>>
                transform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
new StreamingAutoshardedKeyReshuffleOverride<>(
runner,
transform.getTransform(),
PTransformReplacements.getSingletonMainOutput(transform)));
}

@Override
public Map<PCollection<?>, ReplacementOutput> mapOutputs(
Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KV<ShardedKey<K>, V>> newOutput) {
return ReplacementOutputs.singleton(outputs, newOutput);
}

    /**
     * Specialized implementation of {@link AutoshardedKeyReshuffle.ViaRandomKey} for unbounded
     * Dataflow pipelines. The override does the same thing as the original transform but
     * additionally records the output in order to append required step properties during graph
     * translation.
     */
static class StreamingAutoshardedKeyReshuffleOverride<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, V>>> {

private final transient DataflowRunner runner;
      private final AutoshardedKeyReshuffle.ViaRandomKey<K, V> originalTransform;
private final transient PCollection<KV<ShardedKey<K>, V>> originalOutput;

public StreamingAutoshardedKeyReshuffleOverride(
DataflowRunner runner,
          AutoshardedKeyReshuffle.ViaRandomKey<K, V> original,
PCollection<KV<ShardedKey<K>, V>> output) {
this.runner = runner;
this.originalTransform = original;
this.originalOutput = output;
}

@Override
public PCollection<KV<ShardedKey<K>, V>> expand(PCollection<KV<K, V>> input) {
// Record the output PCollection of the original transform since the new output will be
// replaced by the original one when the replacement transform is wired to other nodes in the
// graph, although the old and the new outputs are effectively the same.
runner.maybeRecordPCollectionWithAutoSharding(originalOutput);
return input.apply(originalTransform);
}
}
}
}
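The override's expand() only records the original output; the decision to attach auto-sharding properties happens later, when the translator asks the runner about each output PCollection. A rough sketch of the bookkeeping this relies on, with guessed class and field names (the real logic lives in DataflowRunner and may differ):

import java.util.HashSet;
import java.util.Set;
import org.apache.beam.sdk.values.PCollection;

// Guessed shape of the runner-side bookkeeping, for orientation only.
class AutoShardingBookkeeping {
  private final Set<PCollection<?>> requiresAutoSharding = new HashSet<>();

  // Called from the override's expand() with the original output PCollection.
  void maybeRecordPCollectionWithAutoSharding(PCollection<?> pc) {
    requiresAutoSharding.add(pc);
  }

  // Consulted by the translator's addOutput() (next file in this diff) before it
  // appends the ALLOWS_SHARDABLE_STATE step property.
  boolean doesPCollectionRequireAutoSharding(PCollection<?> pc) {
    return requiresAutoSharding.contains(pc);
  }
}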


@@ -700,12 +700,18 @@ private void addOutput(String name, PValue value, Coder<?> valueCoder) {

// If the output requires runner determined sharding, also append necessary input properties.
    if (value instanceof PCollection) {
      if (translator.runner.doesPCollectionRequireAutoSharding((PCollection<?>) value)) {
        addInput(PropertyNames.ALLOWS_SHARDABLE_STATE, "true");
      }
      if (translator.runner.doesPCollectionPreserveKeys((PCollection<?>) value)) {
        addInput(PropertyNames.PRESERVES_KEYS, "true");
      }
    }

Map<String, Object> properties = getProperties();
@@ -128,6 +128,7 @@
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.MultimapState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.transforms.AutoshardedKeyReshuffle;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.GroupedValues;
@@ -691,8 +692,17 @@ private List<PTransformOverride> getOverrides(boolean streaming) {
new PrimitiveParDoSingleFactory()));

if (streaming) {
// overridesBuilder.add(
// PTransformOverride.of(
// PTransformMatchers.groupWithShardableStates(),
// new GroupIntoBatchesOverride.StreamingGroupIntoBatchesWithShardedKeyOverrideFactory(
// this)));
// For update compatibility, always use a Read for Create in streaming mode.
overridesBuilder
.add(
PTransformOverride.of(
PTransformMatchers.autoshardableKeys(),
                  new AutoshardedKeyReshuffleOverride.StreamingAutoshardedKeyReshuffleOverrideFactory<>(this)))
.add(
PTransformOverride.of(
PTransformMatchers.classEqualTo(Create.Values.class), new AlwaysCreateViaRead()))
@@ -36,7 +36,8 @@
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.UUID;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
/**
* <b>For internal use only; no backwards compatibility guarantees.</b>
*
@@ -64,8 +65,8 @@ public static <K, V> AutoshardedKeyReshuffle<K, V> of() {
* Encapsulates the sequence "take a keyed value, wrap its key as an autosharded key with a single
* shard, apply {@link Reshuffle#of}, and do NOT drop the key" commonly used to break fusion.
*/
-  public static <K, V> ViaRandomKey<K, V> viaRandomKey() {
-    return new ViaRandomKey<>();
+  public static <K, V> ViaRandomKey<K, V> viaRandomKey(Coder<K> keyCoder, Coder<V> valueCoder) {
+    return new ViaRandomKey<>(keyCoder, valueCoder);
}

@Override
@@ -110,12 +111,20 @@ public void processElement(

/** Implementation of {@link #viaRandomKey(Coder, Coder)}. */
public static class ViaRandomKey<K, V> extends PTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, V>>> {
-    public ViaRandomKey() {}
+    private final Coder<K> keyCoder;
+    private final Coder<V> valueCoder;
+
+    public ViaRandomKey(Coder<K> keyCoder, Coder<V> valueCoder) {
+      this.keyCoder = keyCoder;
+      this.valueCoder = valueCoder;
+    }

@Override
public PCollection<KV<ShardedKey<K>, V>> expand(PCollection<KV<K, V>> input) {
return input
-          .apply("Pair element with a single autoshardable key", ParDo.of(new AssignShardFn<>())) // assign a single autoshardable key
+          .apply("Pair element with a single autoshardable key", ParDo.of(new AssignShardFn<>())) // assign a single autoshardable key
+          .setCoder(KvCoder.of(ShardedKey.Coder.of(keyCoder), valueCoder)) // explicitly set the sharded-key output coder
.apply(AutoshardedKeyReshuffle.of()); // takes autosharded keyed input, ensures windows are correctly handled
}
}
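For context on the signature change above: callers of viaRandomKey now pass the key and value coders explicitly so that expand() can set the sharded-key output coder. A usage sketch follows; the input PCollection, its element types, and the concrete coders are illustrative only.

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.AutoshardedKeyReshuffle;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Illustrative usage only: reshuffle keyed counts onto autosharded keys, supplying the
// key and value coders used to build the ShardedKey output coder.
static PCollection<KV<ShardedKey<String>, Long>> reshard(PCollection<KV<String, Long>> counts) {
  return counts.apply(
      "AutoshardedKeyReshuffle",
      AutoshardedKeyReshuffle.<String, Long>viaRandomKey(StringUtf8Coder.of(), VarLongCoder.of()));
}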

This file was deleted.

@@ -162,7 +162,8 @@ public WriteResult expandInconsistent(
kmsKey,
usesCdc,
defaultMissingValueInterpretation,
-            allowAutosharding));
+            allowAutosharding,
+            destinationCoder));

PCollection<BigQueryStorageApiInsertError> insertErrors =
PCollectionList.of(convertMessagesResult.get(failedRowsTag))
