cleanup + plumb fewKeys + make keys configurable
arunpandianp committed Dec 15, 2024
1 parent ede9dea commit 1e28c57
Showing 10 changed files with 141 additions and 195 deletions.
@@ -2,5 +2,6 @@
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test",
"https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test"
"https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test",
"https://github.com/apache/beam/pull/33318": "noting that PR #33318 should run this test"
}
@@ -2,5 +2,6 @@
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test",
"https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test"
"https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test",
"https://github.com/apache/beam/pull/33318": "noting that PR #33318 should run this test"
}
3 changes: 2 additions & 1 deletion runners/google-cloud-dataflow-java/build.gradle
@@ -462,7 +462,8 @@ task validatesRunnerStreaming {
description "Validates Dataflow runner forcing streaming mode"
dependsOn(createLegacyWorkerValidatesRunnerTest(
name: 'validatesRunnerLegacyWorkerTestStreaming',
pipelineOptions: legacyPipelineOptions + ['--streaming'],
pipelineOptions: legacyPipelineOptions + ['--streaming']
+ ['--experiments=enable_gbk_state_multiplexing'],
excludedCategories: [
'org.apache.beam.sdk.testing.UsesCommittedMetrics',
'org.apache.beam.sdk.testing.UsesMapState',
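For anyone enabling the flag outside this Gradle task, the same experiment can be set programmatically. A minimal sketch, assuming the standard Beam option classes (none of this code is added by the commit):

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Equivalent of passing --streaming --experiments=enable_gbk_state_multiplexing.
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
options.setStreaming(true);
ExperimentalOptions.addExperiment(
    options.as(ExperimentalOptions.class), "enable_gbk_state_multiplexing");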
@@ -935,7 +935,20 @@ private <K, V> void dataflowGroupByKeyHelper(
stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));

WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, true);
boolean isStreaming =
context.getPipelineOptions().as(StreamingOptions.class).isStreaming();
// TODO: do we set this for batch?
boolean allowCombinerLifting = false;
if (isStreaming) {
allowCombinerLifting =
!windowingStrategy.needsMerge()
&& windowingStrategy.getWindowFn().assignsToOneWindow();
allowCombinerLifting &= transform.fewKeys();
// TODO: Allow combiner lifting on the non-default trigger, as appropriate.
allowCombinerLifting &= (windowingStrategy.getTrigger() instanceof DefaultTrigger);
}
stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, !allowCombinerLifting);

stepContext.addInput(
PropertyNames.SERIALIZED_FN,
byteArrayToJsonString(
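Restated for clarity, the eligibility check above reduces to a single predicate. A sketch with an illustrative method name, not part of the translator API:

import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.values.WindowingStrategy;

// Combiner lifting requires streaming mode, non-merging windows that assign
// each element to exactly one window, a fewKeys grouping, and the default
// trigger. Mirrors the logic in dataflowGroupByKeyHelper above.
static boolean allowsCombinerLifting(
    WindowingStrategy<?, ?> strategy, boolean isStreaming, boolean fewKeys) {
  return isStreaming
      && !strategy.needsMerge()
      && strategy.getWindowFn().assignsToOneWindow()
      && fewKeys
      && strategy.getTrigger() instanceof DefaultTrigger;
}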
@@ -53,6 +53,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
@@ -64,6 +65,7 @@
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory;
import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
import org.apache.beam.runners.dataflow.internal.StateMultiplexingGroupByKey;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
@@ -215,8 +217,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
"unsafely_attempt_to_process_unbounded_data_in_batch_mode";

private static final Logger LOG = LoggerFactory.getLogger(DataflowRunner.class);
private static final String EXPERIMENT_ENABLE_GBK_STATE_MULTIPLEXING =
"enable_gbk_state_multiplexing";
/** Provided configuration options. */
private final DataflowPipelineOptions options;

@@ -801,11 +801,12 @@ private List<PTransformOverride> getOverrides(boolean streaming) {
new RedistributeByKeyOverrideFactory()));

if (streaming) {
if (DataflowRunner.hasExperiment(options, EXPERIMENT_ENABLE_GBK_STATE_MULTIPLEXING)) {
if (DataflowRunner.hasExperiment(
options, StateMultiplexingGroupByKey.EXPERIMENT_ENABLE_GBK_STATE_MULTIPLEXING)) {
overridesBuilder.add(
PTransformOverride.of(
PTransformMatchers.classEqualTo(GroupByKey.class),
new StateMultiplexingGroupByKeyOverrideFactory<>()));
new StateMultiplexingGroupByKeyOverrideFactory<>(options)));
}
// For update compatibility, always use a Read for Create in streaming mode.
overridesBuilder
@@ -1714,6 +1715,22 @@ public static boolean hasExperiment(DataflowPipelineDebugOptions options, String
return experiments.contains(experiment);
}

/** Returns the value for the specified experiment, or {@code Optional.empty()} if not present. */
public static Optional<String> getExperimentValue(
DataflowPipelineDebugOptions options, String experiment) {
List<String> experiments = options.getExperiments();
if (experiments == null) {
return Optional.empty();
}
String prefix = experiment + "=";
for (String experimentEntry : experiments) {
if (experimentEntry.startsWith(prefix)) {
return Optional.of(experimentEntry.substring(prefix.length()));
}
}
return Optional.empty();
}

/** Helper to configure the Dataflow Job Environment based on the user's job options. */
private static Map<String, Object> getEnvironmentVersion(DataflowPipelineOptions options) {
DataflowRunnerInfo runnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
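Given the commit title's "make keys configurable", a plausible consumer of the getExperimentValue helper above parses a numeric setting out of an experiment string. A hedged sketch; the experiment name and default below are illustrative assumptions, not taken from this diff:

// Hypothetical: read a key count from "--experiments=<name>=<n>". Both the
// experiment name and the fallback value are assumptions for illustration.
int numVirtualKeys =
    DataflowRunner.getExperimentValue(
            options.as(DataflowPipelineDebugOptions.class),
            "gbk_state_multiplexing_num_keys")
        .map(Integer::parseInt)
        .orElse(32768); // illustrative default only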
@@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow;

import org.apache.beam.runners.dataflow.internal.StateMultiplexingGroupByKey;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.util.construction.PTransformReplacements;
@@ -28,6 +29,11 @@
class StateMultiplexingGroupByKeyOverrideFactory<K, V>
extends SingleInputOutputOverrideFactory<
PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>> {
private final DataflowPipelineOptions options;

StateMultiplexingGroupByKeyOverrideFactory(DataflowPipelineOptions options) {
this.options = options;
}

@Override
public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
@@ -37,6 +43,6 @@ class StateMultiplexingGroupByKeyOverrideFactory<K, V>
transform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
StateMultiplexingGroupByKey.create(transform.getTransform().fewKeys()));
StateMultiplexingGroupByKey.create(options, transform.getTransform().fewKeys()));
}
}
@@ -47,8 +47,11 @@ public class DataflowGroupByKey<K, V>
// Plumbed from Redistribute transform.
private final boolean allowDuplicates;

private DataflowGroupByKey(boolean allowDuplicates) {
private final boolean fewKeys;

private DataflowGroupByKey(boolean allowDuplicates, boolean fewKeys) {
this.allowDuplicates = allowDuplicates;
this.fewKeys = fewKeys;
}

/**
@@ -59,7 +62,11 @@ private DataflowGroupByKey(boolean allowDuplicates) {
* {@code Iterable}s in the output {@code PCollection}
*/
public static <K, V> DataflowGroupByKey<K, V> create() {
return new DataflowGroupByKey<>(false);
return new DataflowGroupByKey<>(/*allowDuplicates=*/ false, /*fewKeys=*/ false);
}

static <K, V> DataflowGroupByKey<K, V> createWithFewKeys() {
return new DataflowGroupByKey<>(/*allowDuplicates=*/ false, /*fewKeys=*/ true);
}

/**
@@ -71,14 +78,18 @@ public static <K, V> DataflowGroupByKey<K, V> create() {
* {@code Iterable}s in the output {@code PCollection}
*/
public static <K, V> DataflowGroupByKey<K, V> createWithAllowDuplicates() {
return new DataflowGroupByKey<>(true);
return new DataflowGroupByKey<>(/*allowDuplicates=*/ true, /*fewKeys=*/ false);
}

/** Returns whether it allows duplicated elements in the output. */
public boolean allowDuplicates() {
return allowDuplicates;
}

/** Returns whether this transform is expected to group a small number of keys. */
public boolean fewKeys() {
return fewKeys;
}
/////////////////////////////////////////////////////////////////////////////

public static void applicableTo(PCollection<?> input) {
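Because createWithFewKeys() is package-private, the flag is set by runner internals (the StateMultiplexingGroupByKey override path above) rather than by user code. A same-package sketch of how the flag is read back:

// Same-package sketch: the translator consults fewKeys() when deciding whether
// combiner lifting is safe (see dataflowGroupByKeyHelper above).
DataflowGroupByKey<String, Long> gbk = DataflowGroupByKey.createWithFewKeys();
boolean liftingCandidate = gbk.fewKeys() && !gbk.allowDuplicates(); // true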
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms.windowing;
package org.apache.beam.runners.dataflow.internal;

import java.io.IOException;
import java.io.InputStream;
@@ -27,15 +27,18 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.runners.dataflow.util.ByteStringCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Preconditions;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
@@ -214,7 +217,7 @@ public static class KeyedWindowCoder<W extends BoundedWindow> extends Coder<Keye
private final KvCoder<ByteString, W> coder;

public KeyedWindowCoder(Coder<W> windowCoder) {
//:TODO consider swapping the order for improved state locality
// :TODO consider swapping the order for improved state locality
this.coder = KvCoder.of(ByteStringCoder.of(), windowCoder);
}

@@ -244,26 +247,4 @@ public boolean consistentWithEquals() {
return coder.getValueCoder().consistentWithEquals();
}
}

public static class ByteStringCoder extends AtomicCoder<ByteString> {
public static ByteStringCoder of() {
return INSTANCE;
}

private static final ByteStringCoder INSTANCE = new ByteStringCoder();

private ByteStringCoder() {}

@Override
public void encode(ByteString value, OutputStream os) throws IOException {
VarInt.encode(value.size(), os);
value.writeTo(os);
}

@Override
public ByteString decode(InputStream is) throws IOException {
int size = VarInt.decodeInt(is);
return ByteString.readFrom(ByteStreams.limit(is, size), size);
}
}
}
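The inlined ByteStringCoder above is deleted in favor of the shared org.apache.beam.runners.dataflow.util.ByteStringCoder imported earlier in this file. For reference, a hand-rolled round trip of the same length-prefixed scheme (an illustration, not the shared class):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;

// Encode a VarInt length prefix followed by the raw bytes, then decode it back.
static ByteString roundTrip(ByteString value) throws IOException {
  ByteArrayOutputStream out = new ByteArrayOutputStream();
  VarInt.encode(value.size(), out); // length prefix
  value.writeTo(out);               // payload bytes
  InputStream in = new ByteArrayInputStream(out.toByteArray());
  int size = VarInt.decodeInt(in);
  return ByteString.readFrom(ByteStreams.limit(in, size), size);
}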