diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java index e46cab776670..a0c529c9b7c7 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java @@ -35,7 +35,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import javax.annotation.Nonnull; -import javax.annotation.Nullable; import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.metrics.FlinkMetricContainerWithoutAccumulator; import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; @@ -44,11 +43,13 @@ import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.flink.api.connector.source.ReaderOutput; import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.metrics.Counter; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,9 +75,6 @@ public abstract class FlinkSourceReaderBase private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceReaderBase.class); protected static final CompletableFuture AVAILABLE_NOW = CompletableFuture.completedFuture(null); - // Some dummy instances to make the annotation checker happy with AtomicReference. - protected static final CompletableFuture DUMMY_FUTURE = new CompletableFuture<>(); - protected static final Exception NO_EXCEPTION = new Exception(); protected final PipelineOptions pipelineOptions; protected final @Nullable Function timestampExtractor; @@ -90,9 +88,10 @@ public abstract class FlinkSourceReaderBase protected final Counter numRecordsInCounter; protected final long idleTimeoutMs; private final CompletableFuture idleTimeoutFuture; - private final AtomicReference exception; + private final AtomicReference<@Nullable Throwable> exception; private boolean idleTimeoutCountingDown; - private CompletableFuture waitingForSplitChangeFuture; + private final AtomicReference> waitingForSplitChangeFuture = + new AtomicReference<>(new CompletableFuture<>()); private boolean noMoreSplits; protected FlinkSourceReaderBase( @@ -119,12 +118,11 @@ protected FlinkSourceReaderBase( this.pipelineOptions = pipelineOptions; this.timestampExtractor = timestampExtractor; this.beamSourceReaders = new ConcurrentHashMap<>(); - this.exception = new AtomicReference<>(NO_EXCEPTION); + this.exception = new AtomicReference<>(); this.executor = executor; this.idleTimeoutMs = pipelineOptions.as(FlinkPipelineOptions.class).getShutdownSourcesAfterIdleMs(); this.idleTimeoutFuture = new CompletableFuture<>(); - this.waitingForSplitChangeFuture = new CompletableFuture<>(); this.idleTimeoutCountingDown = false; // TODO: Remove the casting and use SourceReaderMetricGroup after minimum FLink version is // upgraded to 1.14 and above. @@ -166,23 +164,23 @@ public CompletableFuture isAvailable() { // Regardless of whether there is data available from the alive readers, the // main thread needs to be woken up if there is a split change. Hence, we // need to combine the data available future with the split change future. - if (waitingForSplitChangeFuture.isDone()) { - waitingForSplitChangeFuture = new CompletableFuture<>(); + if (waitingForSplitChangeFuture.get().isDone()) { + waitingForSplitChangeFuture.set(new CompletableFuture<>()); } - return CompletableFuture.anyOf(aliveReaderAvailableFuture, waitingForSplitChangeFuture) + return CompletableFuture.anyOf(aliveReaderAvailableFuture, waitingForSplitChangeFuture.get()) .thenAccept(ignored -> {}); } else if (noMoreSplits) { // All the splits have been read, wait for idle timeout. - LOG.debug("All splits have been read, waiting for shutdown timeout {}", idleTimeoutMs); + LOG.info("All splits have been read, waiting for shutdown timeout {}", idleTimeoutMs); checkIdleTimeoutAndMaybeStartCountdown(); return idleTimeoutFuture; } else { // There are no live readers, waiting for new split assignments or no more splits // notification. - if (waitingForSplitChangeFuture.isDone()) { - waitingForSplitChangeFuture = new CompletableFuture<>(); + if (waitingForSplitChangeFuture.get().isDone()) { + waitingForSplitChangeFuture.set(new CompletableFuture<>()); } - return waitingForSplitChangeFuture; + return waitingForSplitChangeFuture.get(); } } @@ -191,7 +189,7 @@ public void notifyNoMoreSplits() { checkExceptionAndMaybeThrow(); LOG.info("Received NoMoreSplits signal from enumerator."); noMoreSplits = true; - waitingForSplitChangeFuture.complete(null); + waitingForSplitChangeFuture.get().complete(null); } @Override @@ -199,7 +197,7 @@ public void addSplits(List> splits) { checkExceptionAndMaybeThrow(); LOG.info("Adding splits {}", splits); sourceSplits.addAll(splits); - waitingForSplitChangeFuture.complete(null); + waitingForSplitChangeFuture.get().complete(null); } @Override @@ -282,19 +280,19 @@ protected void execute(Runnable runnable) { } protected void recordException(Throwable e) { - if (!exception.compareAndSet(NO_EXCEPTION, e)) { - exception.get().addSuppressed(e); + if (!exception.compareAndSet(null, e)) { + Optional.ofNullable(exception.get()).ifPresent(exc -> exc.addSuppressed(e)); } } protected void checkExceptionAndMaybeThrow() { - if (exception.get() != NO_EXCEPTION) { + if (exception.get() != null) { throw new RuntimeException("The source reader received exception.", exception.get()); } } protected boolean hasException() { - return exception.get() != NO_EXCEPTION; + return exception.get() != null; } protected Collection> sourceSplits() { @@ -344,6 +342,15 @@ public boolean startOrAdvance() throws IOException { public @Nullable SourceOutput sourceOutput() { return outputForSplit; } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("splitId", splitId) + .add("reader", reader) + .add("started", started) + .toString(); + } } private final class ErrorRecordingRunnable implements Runnable { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java index 768a474f14f8..6bc4fdcfc198 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java @@ -18,12 +18,12 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source; import java.io.Serializable; -import javax.annotation.Nullable; import org.apache.beam.runners.flink.translation.utils.SerdeUtils; import org.apache.beam.sdk.io.Source; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.checkerframework.checker.nullness.qual.Nullable; /** * A Flink {@link SourceSplit} implementation that encapsulates a Beam {@link Source}. This class @@ -37,13 +37,13 @@ public class FlinkSourceSplit implements SourceSplit, Serializable { // The index of the split. private final int splitIndex; private final Source beamSplitSource; - private final @Nullable byte[] splitState; + private final byte @Nullable [] splitState; public FlinkSourceSplit(int splitIndex, Source beamSplitSource) { this(splitIndex, beamSplitSource, null); } - public FlinkSourceSplit(int splitIndex, Source beamSplitSource, @Nullable byte[] splitState) { + public FlinkSourceSplit(int splitIndex, Source beamSplitSource, byte @Nullable [] splitState) { this.splitIndex = splitIndex; this.beamSplitSource = beamSplitSource; this.splitState = splitState; @@ -53,7 +53,7 @@ public int splitIndex() { return splitIndex; } - public @Nullable byte[] getSplitState() { + public byte @Nullable [] getSplitState() { return splitState; } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java index 66aae984253a..70afd76b611b 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java @@ -153,7 +153,10 @@ private List> splitBeamSource() throws Exception { long desiredSizeBytes = boundedSource.getEstimatedSizeBytes(pipelineOptions) / numSplits; return boundedSource.split(desiredSizeBytes, pipelineOptions); } else if (beamSource instanceof UnboundedSource) { - return ((UnboundedSource) beamSource).split(numSplits, pipelineOptions); + List> splits = + ((UnboundedSource) beamSource).split(numSplits, pipelineOptions); + LOG.info("Split source {} to {} splits", beamSource, splits); + return splits; } else { throw new IllegalStateException("Unknown source type " + beamSource.getClass()); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java index 39ef63c8f7e9..e603fd1eee19 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java @@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import javax.annotation.Nonnull; -import javax.annotation.Nullable; import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit; @@ -47,6 +46,7 @@ import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.core.io.InputStatus; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,9 +70,10 @@ public class FlinkUnboundedSourceReader @VisibleForTesting protected static final String PENDING_BYTES_METRIC_NAME = "pendingBytes"; private static final long SLEEP_ON_IDLE_MS = 50L; private static final long MIN_WATERMARK_EMIT_INTERVAL_MS = 10L; - private final AtomicReference> dataAvailableFutureRef; - private final List readers; - private int currentReaderIndex; + private final AtomicReference<@Nullable CompletableFuture> dataAvailableFutureRef = + new AtomicReference<>(); + private final List readers = new ArrayList<>(); + private int currentReaderIndex = 0; private volatile boolean shouldEmitWatermark; public FlinkUnboundedSourceReader( @@ -81,9 +82,6 @@ public FlinkUnboundedSourceReader( PipelineOptions pipelineOptions, @Nullable Function>, Long> timestampExtractor) { super(stepName, context, pipelineOptions, timestampExtractor); - this.readers = new ArrayList<>(); - this.dataAvailableFutureRef = new AtomicReference<>(DUMMY_FUTURE); - this.currentReaderIndex = 0; } @VisibleForTesting @@ -94,9 +92,6 @@ protected FlinkUnboundedSourceReader( ScheduledExecutorService executor, @Nullable Function>, Long> timestampExtractor) { super(stepName, executor, context, pipelineOptions, timestampExtractor); - this.readers = new ArrayList<>(); - this.dataAvailableFutureRef = new AtomicReference<>(DUMMY_FUTURE); - this.currentReaderIndex = 0; } @Override @@ -121,7 +116,7 @@ public void start() { shouldEmitWatermark = true; // Wake up the main thread if necessary. CompletableFuture f = dataAvailableFutureRef.get(); - if (f != DUMMY_FUTURE) { + if (f != null) { f.complete(null); } }, @@ -151,10 +146,10 @@ public InputStatus pollNext(ReaderOutput>> ou private boolean isEndOfAllReaders() { return allReaders().values().stream() - .mapToLong(r -> asUnbounded(r.reader).getWatermark().getMillis()) - .min() - .orElse(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) - >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis(); + .allMatch( + r -> + asUnbounded(r.reader).getWatermark().getMillis() + >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()); } /** @@ -169,7 +164,7 @@ private boolean isEndOfAllReaders() { @Override protected CompletableFuture isAvailableForAliveReaders() { CompletableFuture future = dataAvailableFutureRef.get(); - if (future == DUMMY_FUTURE) { + if (future == null) { CompletableFuture newFuture = new CompletableFuture<>(); // Need to set the future first to avoid the race condition of missing the watermark emission // notification. @@ -177,14 +172,14 @@ protected CompletableFuture isAvailableForAliveReaders() { if (shouldEmitWatermark || hasException()) { // There are exception after we set the new future, // immediately complete the future and return. - dataAvailableFutureRef.set(DUMMY_FUTURE); + dataAvailableFutureRef.set(null); newFuture.complete(null); } else { LOG.debug("There is no data available, scheduling the idle reader checker."); scheduleTask( () -> { CompletableFuture f = dataAvailableFutureRef.get(); - if (f != DUMMY_FUTURE) { + if (f != null) { f.complete(null); } }, @@ -193,7 +188,7 @@ protected CompletableFuture isAvailableForAliveReaders() { return newFuture; } else if (future.isDone()) { // The previous future is completed, just use it and reset the future ref. - dataAvailableFutureRef.getAndSet(DUMMY_FUTURE); + dataAvailableFutureRef.compareAndSet(future, null); return future; } else { // The previous future has not been completed, just use it. @@ -330,7 +325,7 @@ byte[] getAndEncodeCheckpointMark(UnboundedSource.UnboundedReader reader) { private Source.Reader createUnboundedSourceReader( - Source beamSource, @Nullable byte[] splitState) throws IOException { + Source beamSource, byte @Nullable [] splitState) throws IOException { UnboundedSource unboundedSource = (UnboundedSource) beamSource; Coder coder = unboundedSource.getCheckpointMarkCoder(); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java index 0ae5b407a157..1b2c70dc2cda 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java @@ -255,6 +255,18 @@ public void testWatermarkOnEmptySource() throws Exception { } } + @Test + public void testWatermarkOnNoSplits() throws Exception { + ManuallyTriggeredScheduledExecutorService executor = + new ManuallyTriggeredScheduledExecutorService(); + try (FlinkUnboundedSourceReader> reader = + (FlinkUnboundedSourceReader>) createReader(executor, -1L)) { + reader.start(); + reader.notifyNoMoreSplits(); + assertEquals(InputStatus.END_OF_INPUT, reader.pollNext(null)); + } + } + @Test public void testPendingBytesMetric() throws Exception { ManuallyTriggeredScheduledExecutorService executor =