Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[runners-flink] Fix watermark emission for empty splits (#29816) #30969

Merged
merged 1 commit into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainerWithoutAccumulator;
import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
Expand All @@ -44,11 +43,13 @@
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.metrics.Counter;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -74,9 +75,6 @@ public abstract class FlinkSourceReaderBase<T, OutputT>
private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceReaderBase.class);
protected static final CompletableFuture<Void> AVAILABLE_NOW =
CompletableFuture.completedFuture(null);
// Some dummy instances to make the annotation checker happy with AtomicReference.
protected static final CompletableFuture<Void> DUMMY_FUTURE = new CompletableFuture<>();
protected static final Exception NO_EXCEPTION = new Exception();

protected final PipelineOptions pipelineOptions;
protected final @Nullable Function<OutputT, Long> timestampExtractor;
Expand All @@ -90,9 +88,10 @@ public abstract class FlinkSourceReaderBase<T, OutputT>
protected final Counter numRecordsInCounter;
protected final long idleTimeoutMs;
private final CompletableFuture<Void> idleTimeoutFuture;
private final AtomicReference<Throwable> exception;
private final AtomicReference<@Nullable Throwable> exception;
private boolean idleTimeoutCountingDown;
private CompletableFuture<Void> waitingForSplitChangeFuture;
private final AtomicReference<CompletableFuture<Void>> waitingForSplitChangeFuture =
new AtomicReference<>(new CompletableFuture<>());
private boolean noMoreSplits;

protected FlinkSourceReaderBase(
Expand All @@ -119,12 +118,11 @@ protected FlinkSourceReaderBase(
this.pipelineOptions = pipelineOptions;
this.timestampExtractor = timestampExtractor;
this.beamSourceReaders = new ConcurrentHashMap<>();
this.exception = new AtomicReference<>(NO_EXCEPTION);
this.exception = new AtomicReference<>();
this.executor = executor;
this.idleTimeoutMs =
pipelineOptions.as(FlinkPipelineOptions.class).getShutdownSourcesAfterIdleMs();
this.idleTimeoutFuture = new CompletableFuture<>();
this.waitingForSplitChangeFuture = new CompletableFuture<>();
this.idleTimeoutCountingDown = false;
// TODO: Remove the casting and use SourceReaderMetricGroup after minimum FLink version is
// upgraded to 1.14 and above.
Expand Down Expand Up @@ -166,23 +164,23 @@ public CompletableFuture<Void> isAvailable() {
// Regardless of whether there is data available from the alive readers, the
// main thread needs to be woken up if there is a split change. Hence, we
// need to combine the data available future with the split change future.
if (waitingForSplitChangeFuture.isDone()) {
waitingForSplitChangeFuture = new CompletableFuture<>();
if (waitingForSplitChangeFuture.get().isDone()) {
waitingForSplitChangeFuture.set(new CompletableFuture<>());
}
return CompletableFuture.anyOf(aliveReaderAvailableFuture, waitingForSplitChangeFuture)
return CompletableFuture.anyOf(aliveReaderAvailableFuture, waitingForSplitChangeFuture.get())
.thenAccept(ignored -> {});
} else if (noMoreSplits) {
// All the splits have been read, wait for idle timeout.
LOG.debug("All splits have been read, waiting for shutdown timeout {}", idleTimeoutMs);
LOG.info("All splits have been read, waiting for shutdown timeout {}", idleTimeoutMs);
checkIdleTimeoutAndMaybeStartCountdown();
return idleTimeoutFuture;
} else {
// There are no live readers, waiting for new split assignments or no more splits
// notification.
if (waitingForSplitChangeFuture.isDone()) {
waitingForSplitChangeFuture = new CompletableFuture<>();
if (waitingForSplitChangeFuture.get().isDone()) {
waitingForSplitChangeFuture.set(new CompletableFuture<>());
}
return waitingForSplitChangeFuture;
return waitingForSplitChangeFuture.get();
}
}

Expand All @@ -191,15 +189,15 @@ public void notifyNoMoreSplits() {
checkExceptionAndMaybeThrow();
LOG.info("Received NoMoreSplits signal from enumerator.");
noMoreSplits = true;
waitingForSplitChangeFuture.complete(null);
waitingForSplitChangeFuture.get().complete(null);
}

@Override
public void addSplits(List<FlinkSourceSplit<T>> splits) {
checkExceptionAndMaybeThrow();
LOG.info("Adding splits {}", splits);
sourceSplits.addAll(splits);
waitingForSplitChangeFuture.complete(null);
waitingForSplitChangeFuture.get().complete(null);
}

@Override
Expand Down Expand Up @@ -282,19 +280,19 @@ protected void execute(Runnable runnable) {
}

protected void recordException(Throwable e) {
if (!exception.compareAndSet(NO_EXCEPTION, e)) {
exception.get().addSuppressed(e);
if (!exception.compareAndSet(null, e)) {
Optional.ofNullable(exception.get()).ifPresent(exc -> exc.addSuppressed(e));
}
}

protected void checkExceptionAndMaybeThrow() {
if (exception.get() != NO_EXCEPTION) {
if (exception.get() != null) {
throw new RuntimeException("The source reader received exception.", exception.get());
}
}

protected boolean hasException() {
return exception.get() != NO_EXCEPTION;
return exception.get() != null;
}

protected Collection<FlinkSourceSplit<T>> sourceSplits() {
Expand Down Expand Up @@ -344,6 +342,15 @@ public boolean startOrAdvance() throws IOException {
public @Nullable SourceOutput<OutputT> sourceOutput() {
return outputForSplit;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("splitId", splitId)
.add("reader", reader)
.add("started", started)
.toString();
}
}

private final class ErrorRecordingRunnable implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source;

import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.runners.flink.translation.utils.SerdeUtils;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* A Flink {@link SourceSplit} implementation that encapsulates a Beam {@link Source}. This class
Expand All @@ -37,13 +37,13 @@ public class FlinkSourceSplit<T> implements SourceSplit, Serializable {
// The index of the split.
private final int splitIndex;
private final Source<T> beamSplitSource;
private final @Nullable byte[] splitState;
private final byte @Nullable [] splitState;

public FlinkSourceSplit(int splitIndex, Source<T> beamSplitSource) {
this(splitIndex, beamSplitSource, null);
}

public FlinkSourceSplit(int splitIndex, Source<T> beamSplitSource, @Nullable byte[] splitState) {
public FlinkSourceSplit(int splitIndex, Source<T> beamSplitSource, byte @Nullable [] splitState) {
this.splitIndex = splitIndex;
this.beamSplitSource = beamSplitSource;
this.splitState = splitState;
Expand All @@ -53,7 +53,7 @@ public int splitIndex() {
return splitIndex;
}

public @Nullable byte[] getSplitState() {
public byte @Nullable [] getSplitState() {
return splitState;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ private List<? extends Source<T>> splitBeamSource() throws Exception {
long desiredSizeBytes = boundedSource.getEstimatedSizeBytes(pipelineOptions) / numSplits;
return boundedSource.split(desiredSizeBytes, pipelineOptions);
} else if (beamSource instanceof UnboundedSource) {
return ((UnboundedSource<T, ?>) beamSource).split(numSplits, pipelineOptions);
List<? extends UnboundedSource<T, ?>> splits =
((UnboundedSource<T, ?>) beamSource).split(numSplits, pipelineOptions);
LOG.info("Split source {} to {} splits", beamSource, splits);
return splits;
} else {
throw new IllegalStateException("Unknown source type " + beamSource.getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
Expand All @@ -47,6 +46,7 @@
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -70,9 +70,10 @@ public class FlinkUnboundedSourceReader<T>
@VisibleForTesting protected static final String PENDING_BYTES_METRIC_NAME = "pendingBytes";
private static final long SLEEP_ON_IDLE_MS = 50L;
private static final long MIN_WATERMARK_EMIT_INTERVAL_MS = 10L;
private final AtomicReference<CompletableFuture<Void>> dataAvailableFutureRef;
private final List<ReaderAndOutput> readers;
private int currentReaderIndex;
private final AtomicReference<@Nullable CompletableFuture<Void>> dataAvailableFutureRef =
new AtomicReference<>();
private final List<ReaderAndOutput> readers = new ArrayList<>();
private int currentReaderIndex = 0;
private volatile boolean shouldEmitWatermark;

public FlinkUnboundedSourceReader(
Expand All @@ -81,9 +82,6 @@ public FlinkUnboundedSourceReader(
PipelineOptions pipelineOptions,
@Nullable Function<WindowedValue<ValueWithRecordId<T>>, Long> timestampExtractor) {
super(stepName, context, pipelineOptions, timestampExtractor);
this.readers = new ArrayList<>();
this.dataAvailableFutureRef = new AtomicReference<>(DUMMY_FUTURE);
this.currentReaderIndex = 0;
}

@VisibleForTesting
Expand All @@ -94,9 +92,6 @@ protected FlinkUnboundedSourceReader(
ScheduledExecutorService executor,
@Nullable Function<WindowedValue<ValueWithRecordId<T>>, Long> timestampExtractor) {
super(stepName, executor, context, pipelineOptions, timestampExtractor);
this.readers = new ArrayList<>();
this.dataAvailableFutureRef = new AtomicReference<>(DUMMY_FUTURE);
this.currentReaderIndex = 0;
}

@Override
Expand All @@ -121,7 +116,7 @@ public void start() {
shouldEmitWatermark = true;
// Wake up the main thread if necessary.
CompletableFuture<Void> f = dataAvailableFutureRef.get();
if (f != DUMMY_FUTURE) {
if (f != null) {
f.complete(null);
}
},
Expand Down Expand Up @@ -151,10 +146,10 @@ public InputStatus pollNext(ReaderOutput<WindowedValue<ValueWithRecordId<T>>> ou

private boolean isEndOfAllReaders() {
return allReaders().values().stream()
.mapToLong(r -> asUnbounded(r.reader).getWatermark().getMillis())
.min()
.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis())
>= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
.allMatch(
r ->
asUnbounded(r.reader).getWatermark().getMillis()
>= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the only relevant production code change. The rest is observability and some code cleanup.

}

/**
Expand All @@ -169,22 +164,22 @@ private boolean isEndOfAllReaders() {
@Override
protected CompletableFuture<Void> isAvailableForAliveReaders() {
CompletableFuture<Void> future = dataAvailableFutureRef.get();
if (future == DUMMY_FUTURE) {
if (future == null) {
CompletableFuture<Void> newFuture = new CompletableFuture<>();
// Need to set the future first to avoid the race condition of missing the watermark emission
// notification.
dataAvailableFutureRef.set(newFuture);
if (shouldEmitWatermark || hasException()) {
// There are exception after we set the new future,
// immediately complete the future and return.
dataAvailableFutureRef.set(DUMMY_FUTURE);
dataAvailableFutureRef.set(null);
newFuture.complete(null);
} else {
LOG.debug("There is no data available, scheduling the idle reader checker.");
scheduleTask(
() -> {
CompletableFuture<Void> f = dataAvailableFutureRef.get();
if (f != DUMMY_FUTURE) {
if (f != null) {
f.complete(null);
}
},
Expand All @@ -193,7 +188,7 @@ protected CompletableFuture<Void> isAvailableForAliveReaders() {
return newFuture;
} else if (future.isDone()) {
// The previous future is completed, just use it and reset the future ref.
dataAvailableFutureRef.getAndSet(DUMMY_FUTURE);
dataAvailableFutureRef.compareAndSet(future, null);
return future;
} else {
// The previous future has not been completed, just use it.
Expand Down Expand Up @@ -330,7 +325,7 @@ byte[] getAndEncodeCheckpointMark(UnboundedSource.UnboundedReader<T> reader) {

private <CheckpointMarkT extends UnboundedSource.CheckpointMark>
Source.Reader<T> createUnboundedSourceReader(
Source<T> beamSource, @Nullable byte[] splitState) throws IOException {
Source<T> beamSource, byte @Nullable [] splitState) throws IOException {
UnboundedSource<T, CheckpointMarkT> unboundedSource =
(UnboundedSource<T, CheckpointMarkT>) beamSource;
Coder<CheckpointMarkT> coder = unboundedSource.getCheckpointMarkCoder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,18 @@ public void testWatermarkOnEmptySource() throws Exception {
}
}

@Test
public void testWatermarkOnNoSplits() throws Exception {
ManuallyTriggeredScheduledExecutorService executor =
new ManuallyTriggeredScheduledExecutorService();
try (FlinkUnboundedSourceReader<KV<Integer, Integer>> reader =
(FlinkUnboundedSourceReader<KV<Integer, Integer>>) createReader(executor, -1L)) {
reader.start();
reader.notifyNoMoreSplits();
assertEquals(InputStatus.END_OF_INPUT, reader.pollNext(null));
}
}

@Test
public void testPendingBytesMetric() throws Exception {
ManuallyTriggeredScheduledExecutorService executor =
Expand Down
Loading