From 2f3893259fa89829aa43069be19914beb87cdf3d Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Mon, 15 Apr 2024 17:05:33 +0200 Subject: [PATCH] [runners-flink] Finalize checkpoint marks after successful checkpoint (#29902) --- .../io/source/FlinkSourceReaderBase.java | 11 ++++ .../streaming/io/source/FlinkSourceSplit.java | 21 ++++++- .../unbounded/FlinkUnboundedSourceReader.java | 56 +++++++++++++++---- .../FlinkUnboundedSourceReaderTest.java | 11 ++++ 4 files changed, 87 insertions(+), 12 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java index a0c529c9b7c7..288f1f9511b1 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java @@ -152,6 +152,7 @@ public List> snapshotState(long checkpointId) { String.format("Failed to get checkpoint for split %d", splitId), e); } }); + addSplitsToUnfinishedForCheckpoint(checkpointId, splitsState); return splitsState; } @@ -226,6 +227,16 @@ protected abstract FlinkSourceSplit getReaderCheckpoint( protected abstract Source.Reader createReader(@Nonnull FlinkSourceSplit sourceSplit) throws IOException; + /** + * To be overridden in unbounded reader. Notify the reader of created splits that will be part of + * checkpoint. Will be processed during notifyCheckpointComplete to finalize the associated + * CheckpointMarks. + */ + protected void addSplitsToUnfinishedForCheckpoint( + long checkpointId, List> splits) { + // nop + } + // ----------------- protected helper methods for subclasses -------------------- protected final Optional createAndTrackNextReader() throws IOException { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java index 6bc4fdcfc198..9831ff9ee19f 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java @@ -20,7 +20,9 @@ import java.io.Serializable; import org.apache.beam.runners.flink.translation.utils.SerdeUtils; import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.checkerframework.checker.nullness.qual.Nullable; @@ -38,15 +40,25 @@ public class FlinkSourceSplit implements SourceSplit, Serializable { private final int splitIndex; private final Source beamSplitSource; private final byte @Nullable [] splitState; + private final transient UnboundedSource.@Nullable CheckpointMark checkpointMark; public FlinkSourceSplit(int splitIndex, Source beamSplitSource) { - this(splitIndex, beamSplitSource, null); + this(splitIndex, beamSplitSource, null, null); } - public FlinkSourceSplit(int splitIndex, Source beamSplitSource, byte @Nullable [] splitState) { + public FlinkSourceSplit( + int splitIndex, + Source beamSplitSource, + byte @Nullable [] splitState, + UnboundedSource.@Nullable CheckpointMark checkpointMark) { + this.splitIndex = splitIndex; this.beamSplitSource = beamSplitSource; this.splitState = splitState; + this.checkpointMark = checkpointMark; + + // if we have state, we need checkpoint mark that we will finalize + Preconditions.checkArgument(splitState == null || checkpointMark != null); } public int splitIndex() { @@ -66,12 +78,17 @@ public String splitId() { return Integer.toString(splitIndex); } + public UnboundedSource.@Nullable CheckpointMark getCheckpointMark() { + return checkpointMark; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) .add("splitIndex", splitIndex) .add("beamSource", beamSplitSource) .add("splitState.isNull", splitState == null) + .add("checkpointMark", checkpointMark) .toString(); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java index e603fd1eee19..2dcf1a3f594f 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java @@ -22,7 +22,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.NavigableMap; import java.util.Optional; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; @@ -75,6 +78,8 @@ public class FlinkUnboundedSourceReader private final List readers = new ArrayList<>(); private int currentReaderIndex = 0; private volatile boolean shouldEmitWatermark; + private final NavigableMap>> unfinishedCheckpoints = + new TreeMap<>(); public FlinkUnboundedSourceReader( String stepName, @@ -94,6 +99,28 @@ protected FlinkUnboundedSourceReader( super(stepName, executor, context, pipelineOptions, timestampExtractor); } + @Override + protected void addSplitsToUnfinishedForCheckpoint( + long checkpointId, List> flinkSourceSplits) { + + unfinishedCheckpoints.put(checkpointId, flinkSourceSplits); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + super.notifyCheckpointComplete(checkpointId); + SortedMap>> headMap = + unfinishedCheckpoints.headMap(checkpointId + 1); + for (List> splits : headMap.values()) { + for (FlinkSourceSplit s : splits) { + finalizeSourceSplit(s.getCheckpointMark()); + } + } + for (long checkpoint : new ArrayList<>(headMap.keySet())) { + unfinishedCheckpoints.remove(checkpoint); + } + } + @Override public void start() { createPendingBytesGauge(context); @@ -199,10 +226,16 @@ protected CompletableFuture isAvailableForAliveReaders() { @Override protected FlinkSourceSplit getReaderCheckpoint(int splitId, ReaderAndOutput readerAndOutput) { // The checkpoint for unbounded sources is fine granular. - byte[] checkpointState = - getAndEncodeCheckpointMark((UnboundedSource.UnboundedReader) readerAndOutput.reader); + UnboundedSource.UnboundedReader reader = + (UnboundedSource.UnboundedReader) readerAndOutput.reader; + UnboundedSource.CheckpointMark checkpointMark = reader.getCheckpointMark(); + @SuppressWarnings("unchecked") + Coder coder = + (Coder) reader.getCurrentSource().getCheckpointMarkCoder(); + byte[] checkpointState = encodeCheckpointMark(coder, checkpointMark); + return new FlinkSourceSplit<>( - splitId, readerAndOutput.reader.getCurrentSource(), checkpointState); + splitId, readerAndOutput.reader.getCurrentSource(), checkpointState, checkpointMark); } @Override @@ -308,13 +341,9 @@ private void createPendingBytesGauge(SourceReaderContext context) { }); } - @SuppressWarnings("unchecked") - private - byte[] getAndEncodeCheckpointMark(UnboundedSource.UnboundedReader reader) { - UnboundedSource source = - (UnboundedSource) reader.getCurrentSource(); - CheckpointMarkT checkpointMark = (CheckpointMarkT) reader.getCheckpointMark(); - Coder coder = source.getCheckpointMarkCoder(); + private byte[] encodeCheckpointMark( + Coder coder, CheckpointMarkT checkpointMark) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { coder.encode(checkpointMark, baos); return baos.toByteArray(); @@ -337,4 +366,11 @@ Source.Reader createUnboundedSourceReader( } } } + + private void finalizeSourceSplit(UnboundedSource.@Nullable CheckpointMark mark) + throws IOException { + if (mark != null) { + mark.finalizeCheckpoint(); + } + } } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java index 1b2c70dc2cda..295f250df3d6 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReaderTest.java @@ -73,8 +73,13 @@ public void testSnapshotStateAndRestore() throws Exception { reader = createReader()) { pollAndValidate(reader, splits, validatingOutput, numSplits * numRecordsPerSplit / 2); snapshot = reader.snapshotState(0L); + // use higher checkpoint number to verify that we finalize everything that was created + // up to that checkpoint + reader.notifyCheckpointComplete(1L); } + assertEquals(numSplits, DummySource.numFinalizeCalled.size()); + // Create another reader, add the snapshot splits back. try (SourceReader< WindowedValue>>, @@ -298,6 +303,12 @@ public void testPendingBytesMetric() throws Exception { /** A source whose advance() method only returns true occasionally. */ private static class DummySource extends TestCountingSource { + static List numFinalizeCalled = new ArrayList<>(); + + static { + TestCountingSource.setFinalizeTracker(numFinalizeCalled); + } + public DummySource(int numMessagesPerShard) { super(numMessagesPerShard); }