Skip to content

Commit

Permalink
Add an experiment enable_gbk_state_multiplexing for multiplexing Gr…
Browse files Browse the repository at this point in the history
…oupByKey state over a fixed number of sharding keys.

The number of sharding keys is fixed at 32k.
  • Loading branch information
arunpandianp committed Dec 6, 2024
1 parent 1712964 commit 5af575c
Show file tree
Hide file tree
Showing 5 changed files with 651 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import org.apache.beam.sdk.transforms.Combine.GroupedValues;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
Expand Down Expand Up @@ -214,6 +215,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
"unsafely_attempt_to_process_unbounded_data_in_batch_mode";

private static final Logger LOG = LoggerFactory.getLogger(DataflowRunner.class);
private static final String EXPERIMENT_ENABLE_GBK_STATE_MULTIPLEXING =
"enable_gbk_state_multiplexing";
/** Provided configuration options. */
private final DataflowPipelineOptions options;

Expand Down Expand Up @@ -596,6 +599,7 @@ protected DataflowRunner(DataflowPipelineOptions options) {

private static class AlwaysCreateViaRead<T>
implements PTransformOverrideFactory<PBegin, PCollection<T>, Create.Values<T>> {

@Override
public PTransformOverrideFactory.PTransformReplacement<PBegin, PCollection<T>>
getReplacementTransform(
Expand Down Expand Up @@ -797,6 +801,12 @@ private List<PTransformOverride> getOverrides(boolean streaming) {
new RedistributeByKeyOverrideFactory()));

if (streaming) {
if (DataflowRunner.hasExperiment(options, EXPERIMENT_ENABLE_GBK_STATE_MULTIPLEXING)) {
overridesBuilder.add(
PTransformOverride.of(
PTransformMatchers.classEqualTo(GroupByKey.class),
new StateMultiplexingGroupByKeyOverrideFactory<>()));
}
// For update compatibility, always use a Read for Create in streaming mode.
overridesBuilder
.add(
Expand Down Expand Up @@ -1180,6 +1190,7 @@ private List<RunnerApi.ArtifactInformation> getDefaultArtifacts() {
@VisibleForTesting
static boolean isMultiLanguagePipeline(Pipeline pipeline) {
class IsMultiLanguageVisitor extends PipelineVisitor.Defaults {

private boolean isMultiLanguage = false;

private void performMultiLanguageTest(Node node) {
Expand Down Expand Up @@ -1648,6 +1659,7 @@ private static EnvironmentInfo getEnvironmentInfoFromEnvironmentId(

@AutoValue
abstract static class EnvironmentInfo {

static EnvironmentInfo create(
String environmentId, String containerUrl, List<String> capabilities) {
return new AutoValue_DataflowRunner_EnvironmentInfo(
Expand Down Expand Up @@ -2105,6 +2117,7 @@ protected String getKindString() {
}

private static class StreamingPubsubSinkTranslators {

/** Rewrite {@link StreamingPubsubIOWrite} to the appropriate internal node. */
static class StreamingPubsubIOWriteTranslator
implements TransformTranslator<StreamingPubsubIOWrite> {
Expand Down Expand Up @@ -2161,6 +2174,7 @@ private static void translate(

private static class SingleOutputExpandableTransformTranslator
implements TransformTranslator<External.SingleOutputExpandableTransform> {

@Override
public void translate(
External.SingleOutputExpandableTransform transform, TranslationContext context) {
Expand All @@ -2178,6 +2192,7 @@ public void translate(

private static class MultiOutputExpandableTransformTranslator
implements TransformTranslator<External.MultiOutputExpandableTransform> {

@Override
public void translate(
External.MultiOutputExpandableTransform transform, TranslationContext context) {
Expand Down Expand Up @@ -2726,6 +2741,7 @@ static void verifyStateSupportForWindowingStrategy(WindowingStrategy strategy) {
*/
private static class DataflowPayloadTranslator
implements TransformPayloadTranslator<PTransform<?, ?>> {

@Override
public String getUrn(PTransform transform) {
return "dataflow_stub:" + transform.getClass().getName();
Expand All @@ -2750,6 +2766,7 @@ public RunnerApi.FunctionSpec translate(
})
@AutoService(TransformPayloadTranslatorRegistrar.class)
public static class DataflowTransformTranslator implements TransformPayloadTranslatorRegistrar {

@Override
public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
getTransformPayloadTranslators() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow;

import org.apache.beam.runners.dataflow.internal.StateMultiplexingGroupByKey;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.util.construction.PTransformReplacements;
import org.apache.beam.sdk.util.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class StateMultiplexingGroupByKeyOverrideFactory<K, V>
extends SingleInputOutputOverrideFactory<
PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>> {

@Override
public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
getReplacementTransform(
AppliedPTransform<
PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>>
transform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
StateMultiplexingGroupByKey.create(transform.getTransform().fewKeys()));
}
}
Loading

0 comments on commit 5af575c

Please sign in to comment.