Skip to content

Commit

Permalink
Merge pull request #30141: Adding a default watermark emit interval f…
Browse files Browse the repository at this point in the history
…or FlinkUnboundedSourceReader
  • Loading branch information
je-ik authored Feb 6, 2024
2 parents d5aa44c + a341eb6 commit cd5f271
Showing 1 changed file with 22 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class FlinkUnboundedSourceReader<T>
// This name is defined in FLIP-33.
@VisibleForTesting protected static final String PENDING_BYTES_METRIC_NAME = "pendingBytes";
private static final long SLEEP_ON_IDLE_MS = 50L;
private static final long MIN_WATERMARK_EMIT_INTERVAL_MS = 10L;
private final AtomicReference<CompletableFuture<Void>> dataAvailableFutureRef;
private final List<ReaderAndOutput> readers;
private int currentReaderIndex;
Expand Down Expand Up @@ -103,22 +104,29 @@ public void start() {
createPendingBytesGauge(context);
Long watermarkInterval =
pipelineOptions.as(FlinkPipelineOptions.class).getAutoWatermarkInterval();
if (watermarkInterval != null) {
scheduleTaskAtFixedRate(
() -> {
// Set the watermark emission flag first.
shouldEmitWatermark = true;
// Wake up the main thread if necessary.
CompletableFuture<Void> f = dataAvailableFutureRef.get();
if (f != DUMMY_FUTURE) {
f.complete(null);
}
},
watermarkInterval,
if (watermarkInterval == null) {
watermarkInterval =
(pipelineOptions.as(FlinkPipelineOptions.class).getMaxBundleTimeMills()) / 5L;
watermarkInterval =
(watermarkInterval > MIN_WATERMARK_EMIT_INTERVAL_MS)
? watermarkInterval
: MIN_WATERMARK_EMIT_INTERVAL_MS;
LOG.warn(
"AutoWatermarkInterval is not set, watermarks will be emitted at a default interval of {} ms",
watermarkInterval);
} else {
LOG.warn("AutoWatermarkInterval is not set, watermarks won't be emitted.");
}
scheduleTaskAtFixedRate(
() -> {
// Set the watermark emission flag first.
shouldEmitWatermark = true;
// Wake up the main thread if necessary.
CompletableFuture<Void> f = dataAvailableFutureRef.get();
if (f != DUMMY_FUTURE) {
f.complete(null);
}
},
watermarkInterval,
watermarkInterval);
}

@Override
Expand Down

0 comments on commit cd5f271

Please sign in to comment.