
Commit

[Flink] finalize checkpoint marks in the new Flink source implementation (#30849) (#30987)

Co-authored-by: jto <[email protected]>
je-ik and jto authored Apr 16, 2024
1 parent ec31847 commit a62dfa7
Showing 3 changed files with 50 additions and 25 deletions.
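In broad strokes, the change makes the unbounded source reader remember which splits were snapshotted for each checkpoint and finalize their checkpoint marks once Flink reports that checkpoint as complete. Below is a minimal sketch of that bookkeeping pattern; CheckpointTracker, Split, onSnapshot, and finalizeSplit are simplified placeholders, not the actual Beam classes shown in the diff.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the reader's checkpoint bookkeeping.
class CheckpointTracker<Split> {
  // Flink checkpoint ids only grow, so insertion order matches id order.
  private final LinkedHashMap<Long, List<Split>> pending = new LinkedHashMap<>();

  // Called when a checkpoint is taken: remember the splits captured for it.
  void onSnapshot(long checkpointId, List<Split> splits) {
    pending.put(checkpointId, splits);
  }

  // Called when Flink confirms a checkpoint: finalize every split recorded
  // for that checkpoint or any earlier one, then drop those entries.
  void onCheckpointComplete(long checkpointId) {
    List<Long> finalized = new ArrayList<>();
    for (Map.Entry<Long, List<Split>> e : pending.entrySet()) {
      if (e.getKey() <= checkpointId) {
        e.getValue().forEach(this::finalizeSplit);
        finalized.add(e.getKey());
      }
    }
    finalized.forEach(pending::remove);
  }

  private void finalizeSplit(Split split) {
    // In the real reader this calls finalizeSourceSplit(s.getCheckpointMark()).
  }
}

The diff below implements the same idea directly in FlinkUnboundedSourceReader and adds a test that drives snapshotState and notifyCheckpointComplete by hand.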
FlinkSourceSplit.java
@@ -37,6 +37,9 @@
  */
 public class FlinkSourceSplit<T> implements SourceSplit, Serializable {
   // The index of the split.
+
+  private static final long serialVersionUID = 7458114818012108972L;
+
   private final int splitIndex;
   private final Source<T> beamSplitSource;
   private final byte @Nullable [] splitState;
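The FlinkSourceSplit change above pins an explicit serialVersionUID. Without one, Java derives the id from the class structure, so later edits to the class could make splits serialized into an earlier checkpoint fail to deserialize on restore. A tiny illustrative sketch of the idiom, using a hypothetical ExampleSplit class rather than Beam's:

import java.io.Serializable;

// Hypothetical example, not Beam code.
class ExampleSplit implements Serializable {
  // Pinning the id keeps previously serialized instances readable even after
  // compatible changes (new methods, new serializable fields) to the class.
  private static final long serialVersionUID = 1L;

  private final int index;

  ExampleSplit(int index) {
    this.index = index;
  }

  int index() {
    return index;
  }
}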
FlinkUnboundedSourceReader.java
@@ -21,11 +21,10 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.NavigableMap;
+import java.util.Map;
 import java.util.Optional;
-import java.util.SortedMap;
-import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
@@ -78,8 +77,8 @@ public class FlinkUnboundedSourceReader<T>
   private final List<ReaderAndOutput> readers = new ArrayList<>();
   private int currentReaderIndex = 0;
   private volatile boolean shouldEmitWatermark;
-  private final NavigableMap<Long, List<FlinkSourceSplit<T>>> unfinishedCheckpoints =
-      new TreeMap<>();
+  private final LinkedHashMap<Long, List<FlinkSourceSplit<T>>> pendingCheckpoints =
+      new LinkedHashMap<>();

   public FlinkUnboundedSourceReader(
       String stepName,
@@ -103,22 +102,22 @@ protected FlinkUnboundedSourceReader(
   protected void addSplitsToUnfinishedForCheckpoint(
       long checkpointId, List<FlinkSourceSplit<T>> flinkSourceSplits) {

-    unfinishedCheckpoints.put(checkpointId, flinkSourceSplits);
+    pendingCheckpoints.put(checkpointId, flinkSourceSplits);
   }

   @Override
   public void notifyCheckpointComplete(long checkpointId) throws Exception {
     super.notifyCheckpointComplete(checkpointId);
-    SortedMap<Long, List<FlinkSourceSplit<T>>> headMap =
-        unfinishedCheckpoints.headMap(checkpointId + 1);
-    for (List<FlinkSourceSplit<T>> splits : headMap.values()) {
-      for (FlinkSourceSplit<T> s : splits) {
-        finalizeSourceSplit(s.getCheckpointMark());
+    List<Long> finalized = new ArrayList<>();
+    for (Map.Entry<Long, List<FlinkSourceSplit<T>>> e : pendingCheckpoints.entrySet()) {
+      if (e.getKey() <= checkpointId) {
+        for (FlinkSourceSplit<T> s : e.getValue()) {
+          finalizeSourceSplit(s.getCheckpointMark());
+        }
+        finalized.add(e.getKey());
       }
     }
-    for (long checkpoint : new ArrayList<>(headMap.keySet())) {
-      unfinishedCheckpoints.remove(checkpoint);
-    }
+    finalized.forEach(pendingCheckpoints::remove);
   }

   @Override
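The notifyCheckpointComplete change above replaces the TreeMap view headMap(checkpointId + 1) with a linear scan over a LinkedHashMap that keeps entries whose key is <= checkpointId. Both select the same entries, and because Flink checkpoint ids only grow, the LinkedHashMap's insertion order also matches key order. A small standalone sketch of that equivalence, in a hypothetical demo class rather than Beam code:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.TreeMap;

// Hypothetical demo, not part of Beam.
public class HeadMapEquivalenceDemo {
  public static void main(String[] args) {
    TreeMap<Long, String> sorted = new TreeMap<>();
    LinkedHashMap<Long, String> insertionOrdered = new LinkedHashMap<>();
    // Checkpoint ids are registered in increasing order.
    for (long id : new long[] {1, 2, 3, 5}) {
      sorted.put(id, "splits-" + id);
      insertionOrdered.put(id, "splits-" + id);
    }
    long completed = 3L;
    // Old approach: every key strictly below checkpointId + 1.
    List<Long> viaHeadMap = new ArrayList<>(sorted.headMap(completed + 1).keySet());
    // New approach: linear scan keeping keys <= checkpointId.
    List<Long> viaScan = new ArrayList<>();
    for (Long id : insertionOrdered.keySet()) {
      if (id <= completed) {
        viaScan.add(id);
      }
    }
    System.out.println(viaHeadMap.equals(viaScan)); // prints: true
  }
}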
FlinkUnboundedSourceReaderTest.java
@@ -73,13 +73,8 @@ public void testSnapshotStateAndRestore() throws Exception {
         reader = createReader()) {
       pollAndValidate(reader, splits, validatingOutput, numSplits * numRecordsPerSplit / 2);
       snapshot = reader.snapshotState(0L);
-      // use higher checkpoint number to verify that we finalize everything that was created
-      // up to that checkpoint
-      reader.notifyCheckpointComplete(1L);
     }

-    assertEquals(numSplits, DummySource.numFinalizeCalled.size());
-
     // Create another reader, add the snapshot splits back.
     try (SourceReader<
             WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
@@ -299,15 +294,43 @@ public void testPendingBytesMetric() throws Exception {
     }
   }

-  // --------------- private helper classes -----------------
-  /** A source whose advance() method only returns true occasionally. */
-  private static class DummySource extends TestCountingSource {
+  @Test
+  public void testCheckMarksFinalized() throws Exception {

-    static List<Integer> numFinalizeCalled = new ArrayList<>();
+    final int numSplits = 2;
+    final int numRecordsPerSplit = 10;

-    static {
-      TestCountingSource.setFinalizeTracker(numFinalizeCalled);
+    List<FlinkSourceSplit<KV<Integer, Integer>>> splits =
+        createSplits(numSplits, numRecordsPerSplit, 0);
+    RecordsValidatingOutput validatingOutput = new RecordsValidatingOutput(splits);
+    // Create a reader, take a snapshot.
+    try (SourceReader<
+            WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+            FlinkSourceSplit<KV<Integer, Integer>>>
+        reader = createReader()) {
+      List<Integer> finalizeTracker = new ArrayList<>();
+      TestCountingSource.setFinalizeTracker(finalizeTracker);
+      pollAndValidate(reader, splits, validatingOutput, numSplits * numRecordsPerSplit / 2);
+      assertTrue(finalizeTracker.isEmpty());
+      reader.snapshotState(0L);
+      // notifyCheckpointComplete is normally called by the SourceOperator
+      reader.notifyCheckpointComplete(0L);
+      // every split should be finalized
+      assertEquals(numSplits, finalizeTracker.size());
+      pollAndValidate(reader, splits, validatingOutput, numSplits);
+      // no notifyCheckpointComplete here, assume the checkpoint failed
+      reader.snapshotState(1L);
+      pollAndValidate(reader, splits, validatingOutput, numSplits);
+      reader.snapshotState(2L);
+      reader.notifyCheckpointComplete(2L);
+      // 2 * numSplits more should be finalized
+      assertEquals(3 * numSplits, finalizeTracker.size());
     }
+  }

+  // --------------- private helper classes -----------------
+  /** A source whose advance() method only returns true occasionally. */
+  private static class DummySource extends TestCountingSource {
+
     public DummySource(int numMessagesPerShard) {
       super(numMessagesPerShard);
