Skip to content

Commit

Permalink
#26395 Disabling possibly unnecessary prefetching during GroupIntoBatches by using an experimental flag
Browse files Browse the repository at this point in the history
  • Loading branch information
nbali committed May 10, 2023
1 parent 86f56e9 commit 1d62a65
Showing 1 changed file with 18 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.StateSpec;
Expand Down Expand Up @@ -110,6 +112,10 @@
public class GroupIntoBatches<K, InputT>
extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> {

/**
 * Name of the experiment used to "avoid possibly unnecessary prefetching" while batching.
 *
 * <p>When {@code ExperimentalOptions.hasExperiment} reports this experiment for the pipeline
 * options, {@code GroupIntoBatchesDoFn} sets its prefetch frequency to {@code Long.MAX_VALUE}
 * rather than deriving it from {@code batchSize}.
 *
 * <p>NOTE(review): the flag string below does not mention prefetching and reads like the name of
 * an unrelated experiment; confirm that this value is intended and not a copy-paste leftover.
 */
public static final String AVOID_POSSIBLY_UNNECESSARY_PREFETCHING =
"unsafely_attempt_to_process_unbounded_data_in_batch_mode";

/**
* Wrapper class for batching parameters supplied by users. Shared by both {@link
* GroupIntoBatches} and {@link GroupIntoBatches.WithShardedKey}.
Expand Down Expand Up @@ -343,7 +349,8 @@ public PCollection<KV<K, Iterable<InputT>>> expand(PCollection<KV<K, InputT>> in
weigher,
params.getMaxBufferingDuration(),
allowedLateness,
valueCoder)));
valueCoder,
input.getPipeline().getOptions())));
}

@VisibleForTesting
Expand Down Expand Up @@ -418,7 +425,8 @@ private static class GroupIntoBatchesDoFn<K, InputT>
@Nullable SerializableFunction<InputT, Long> weigher,
Duration maxBufferingDuration,
Duration allowedLateness,
Coder<InputT> inputValueCoder) {
Coder<InputT> inputValueCoder,
PipelineOptions options) {
this.batchSize = batchSize;
this.batchSizeBytes = batchSizeBytes;
this.weigher = weigher;
Expand Down Expand Up @@ -457,8 +465,14 @@ public long apply(long left, long right) {
this.timerTsSpec = StateSpecs.value();
this.minBufferedTsSpec = StateSpecs.combining(minCombineFn);

// Prefetch every 20% of batchSize elements. Do not prefetch if batchSize is too little
this.prefetchFrequency = ((batchSize / 5) <= 1) ? Long.MAX_VALUE : (batchSize / 5);
// Prefetch every 20% of batchSize elements. Do not prefetch ...
if (ExperimentalOptions.hasExperiment(options, AVOID_POSSIBLY_UNNECESSARY_PREFETCHING)) {
// ... if we have disabled prefetching
this.prefetchFrequency = Long.MAX_VALUE;
} else {
// ... or if the batchSize is too little
this.prefetchFrequency = ((batchSize / 5) <= 1) ? Long.MAX_VALUE : (batchSize / 5);
}
}

@Override
Expand Down

0 comments on commit 1d62a65

Please sign in to comment.