From a341eb69abe38f9bef66cf9aaa6b52bc6de2e331 Mon Sep 17 00:00:00 2001 From: hardshah <72236623+hardshah@users.noreply.github.com> Date: Sun, 4 Feb 2024 17:59:03 -0700 Subject: [PATCH] Added default watermark generation interval (apache#30141) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Suggestion from PR Co-authored-by: Jan Lukavský styling --- .../unbounded/FlinkUnboundedSourceReader.java | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java index 99160a9689eb..39ef63c8f7e9 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java @@ -69,6 +69,7 @@ public class FlinkUnboundedSourceReader // This name is defined in FLIP-33. @VisibleForTesting protected static final String PENDING_BYTES_METRIC_NAME = "pendingBytes"; private static final long SLEEP_ON_IDLE_MS = 50L; + private static final long MIN_WATERMARK_EMIT_INTERVAL_MS = 10L; private final AtomicReference> dataAvailableFutureRef; private final List readers; private int currentReaderIndex; @@ -103,22 +104,29 @@ public void start() { createPendingBytesGauge(context); Long watermarkInterval = pipelineOptions.as(FlinkPipelineOptions.class).getAutoWatermarkInterval(); - if (watermarkInterval != null) { - scheduleTaskAtFixedRate( - () -> { - // Set the watermark emission flag first. - shouldEmitWatermark = true; - // Wake up the main thread if necessary. - CompletableFuture f = dataAvailableFutureRef.get(); - if (f != DUMMY_FUTURE) { - f.complete(null); - } - }, - watermarkInterval, + if (watermarkInterval == null) { + watermarkInterval = + (pipelineOptions.as(FlinkPipelineOptions.class).getMaxBundleTimeMills()) / 5L; + watermarkInterval = + (watermarkInterval > MIN_WATERMARK_EMIT_INTERVAL_MS) + ? watermarkInterval + : MIN_WATERMARK_EMIT_INTERVAL_MS; + LOG.warn( + "AutoWatermarkInterval is not set, watermarks will be emitted at a default interval of {} ms", watermarkInterval); - } else { - LOG.warn("AutoWatermarkInterval is not set, watermarks won't be emitted."); } + scheduleTaskAtFixedRate( + () -> { + // Set the watermark emission flag first. + shouldEmitWatermark = true; + // Wake up the main thread if necessary. + CompletableFuture f = dataAvailableFutureRef.get(); + if (f != DUMMY_FUTURE) { + f.complete(null); + } + }, + watermarkInterval, + watermarkInterval); } @Override