Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flink] #31390 emit watermark with empty source #31391

Merged
merged 1 commit into from
May 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -339,13 +339,14 @@ public SourceOutput<OutputT> getAndMaybeCreateSplitOutput(ReaderOutput<OutputT>
return outputForSplit;
}

public boolean startOrAdvance() throws IOException {
public boolean startOrAdvance(ReaderOutput<OutputT> output) throws IOException {
if (started) {
// associate output with the split
getAndMaybeCreateSplitOutput(output);
return invocationUtil.invokeAdvance(reader);
} else {
started = true;
return invocationUtil.invokeStart(reader);
}
started = true;
return invocationUtil.invokeStart(reader);
}

public @Nullable SourceOutput<OutputT> sourceOutput() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public InputStatus pollNext(ReaderOutput<WindowedValue<ValueWithRecordId<T>>> ou
maybeEmitWatermark();
maybeCreateReaderForNewSplits();

ReaderAndOutput reader = nextReaderWithData();
ReaderAndOutput reader = nextReaderWithData(output);
if (reader != null) {
emitRecord(reader, output);
return InputStatus.MORE_AVAILABLE;
Expand Down Expand Up @@ -300,12 +300,14 @@ private void maybeCreateReaderForNewSplits() throws Exception {
}
}

private @Nullable ReaderAndOutput nextReaderWithData() throws IOException {
private @Nullable ReaderAndOutput nextReaderWithData(
ReaderOutput<WindowedValue<ValueWithRecordId<T>>> output) throws IOException {

int numReaders = readers.size();
for (int i = 0; i < numReaders; i++) {
ReaderAndOutput readerAndOutput = readers.get(currentReaderIndex);
currentReaderIndex = (currentReaderIndex + 1) % numReaders;
if (readerAndOutput.startOrAdvance()) {
if (readerAndOutput.startOrAdvance(output)) {
return readerAndOutput;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ public List<? extends EmptyUnboundedSource<T>> split(

@Override
public UnboundedReader<T> createReader(
PipelineOptions options, @Nullable DummyCheckpointMark checkpointMark) throws IOException {
PipelineOptions options, @Nullable DummyCheckpointMark checkpointMark) {
return new UnboundedReader<T>() {
@Override
public boolean start() throws IOException {
return advance();
}

@Override
public boolean advance() throws IOException {
public boolean advance() {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,16 @@
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.metrics.Gauge;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.junit.Test;

/** Unite tests for {@link FlinkUnboundedSourceReader}. */
Expand Down Expand Up @@ -228,6 +232,38 @@ public void testWatermark() throws Exception {
public void testWatermarkOnEmptySource() throws Exception {
ManuallyTriggeredScheduledExecutorService executor =
new ManuallyTriggeredScheduledExecutorService();
AtomicReference<Instant> watermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
ReaderOutput<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>> output =
new ReaderOutput<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>() {
@Override
public void collect(WindowedValue<ValueWithRecordId<KV<Integer, Integer>>> unused) {}

@Override
public void collect(
WindowedValue<ValueWithRecordId<KV<Integer, Integer>>> unused, long l) {}

@Override
public void emitWatermark(Watermark w) {
watermark.compareAndSet(
BoundedWindow.TIMESTAMP_MIN_VALUE, Instant.ofEpochMilli(w.getTimestamp()));
}

@Override
public void markIdle() {}

@Override
public SourceOutput<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
createOutputForSplit(String s) {
return this;
}

@Override
public void releaseOutputForSplit(String s) {}

@Override
public void markActive() {}
};
Instant now = Instant.now();
try (FlinkUnboundedSourceReader<KV<Integer, Integer>> reader =
(FlinkUnboundedSourceReader<KV<Integer, Integer>>) createReader(executor, -1L)) {
List<FlinkSourceSplit<KV<Integer, Integer>>> splits = createEmptySplits(2);
Expand All @@ -236,22 +272,34 @@ public void testWatermarkOnEmptySource() throws Exception {
reader.notifyNoMoreSplits();

for (int i = 0; i < 4; i++) {
assertEquals(InputStatus.NOTHING_AVAILABLE, reader.pollNext(null));
assertEquals(InputStatus.NOTHING_AVAILABLE, reader.pollNext(output));
}

// move first reader to 'now'
((EmptyUnboundedSource<KV<Integer, Integer>>) splits.get(0).getBeamSplitSource())
.setWatermark(now);
// force trigger timeout
executor.triggerScheduledTasks();
for (int i = 0; i < 4; i++) {
assertEquals(InputStatus.NOTHING_AVAILABLE, reader.pollNext(output));
}

// check we have emitted watermark
assertEquals(now, watermark.get());

// move first reader to end of time
((EmptyUnboundedSource<KV<Integer, Integer>>) splits.get(0).getBeamSplitSource())
.setWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);

for (int i = 0; i < 4; i++) {
assertEquals(InputStatus.NOTHING_AVAILABLE, reader.pollNext(null));
assertEquals(InputStatus.NOTHING_AVAILABLE, reader.pollNext(output));
}

// move the second reader to end of time
((EmptyUnboundedSource<KV<Integer, Integer>>) splits.get(1).getBeamSplitSource())
.setWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);

assertEquals(InputStatus.END_OF_INPUT, reader.pollNext(null));
assertEquals(InputStatus.END_OF_INPUT, reader.pollNext(output));
}
}

Expand Down
Loading