diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java index cb8e226fc1dd..13315b054c21 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java @@ -26,6 +26,8 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.options.ExperimentalOptions; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.BagState; import org.apache.beam.sdk.state.CombiningState; import org.apache.beam.sdk.state.StateSpec; @@ -110,6 +112,10 @@ public class GroupIntoBatches extends PTransform>, PCollection>>> { + /** Name of the experiment that, when present in the pipeline options, makes {@code GroupIntoBatchesDoFn} set its prefetch frequency to {@code Long.MAX_VALUE} (intended to disable prefetching). NOTE(review): the string value does not match this constant's name - confirm that reusing this experiment string is intended. */ + public static final String AVOID_POSSIBLY_UNNECESSARY_PREFETCHING = + "unsafely_attempt_to_process_unbounded_data_in_batch_mode"; + /** * Wrapper class for batching parameters supplied by users. Shared by both {@link * GroupIntoBatches} and {@link GroupIntoBatches.WithShardedKey}. @@ -343,7 +349,8 @@ public PCollection>> expand(PCollection> in weigher, params.getMaxBufferingDuration(), allowedLateness, - valueCoder))); + valueCoder, + input.getPipeline().getOptions()))); } @VisibleForTesting @@ -418,7 +425,8 @@ private static class GroupIntoBatchesDoFn @Nullable SerializableFunction weigher, Duration maxBufferingDuration, Duration allowedLateness, - Coder inputValueCoder) { + Coder inputValueCoder, + PipelineOptions options) { this.batchSize = batchSize; this.batchSizeBytes = batchSizeBytes; this.weigher = weigher; @@ -457,8 +465,14 @@ public long apply(long left, long right) { this.timerTsSpec = StateSpecs.value(); this.minBufferedTsSpec = StateSpecs.combining(minCombineFn); - // Prefetch every 20% of batchSize elements. 
Do not prefetch if batchSize is too little - this.prefetchFrequency = ((batchSize / 5) <= 1) ? Long.MAX_VALUE : (batchSize / 5); + // Prefetch every 20% of batchSize elements. Do not prefetch ... + if (ExperimentalOptions.hasExperiment(options, AVOID_POSSIBLY_UNNECESSARY_PREFETCHING)) { + // ... if prefetching has been disabled through the experiment + this.prefetchFrequency = Long.MAX_VALUE; + } else { + // ... or if the batchSize is too small (a fifth of it is at most 1) + this.prefetchFrequency = ((batchSize / 5) <= 1) ? Long.MAX_VALUE : (batchSize / 5); + } } @Override