diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index e8417c1ad08e..d2171d27a142 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -34,6 +34,7 @@
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import org.apache.beam.runners.flink.translation.functions.ImpulseSourceFunction;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
@@ -308,27 +309,27 @@ void translateNode(Impulse transform, FlinkStreamingTranslationContext context)
             WindowedValue.getFullCoder(ByteArrayCoder.of(), GlobalWindow.Coder.INSTANCE),
             context.getPipelineOptions());
 
-    FlinkBoundedSource<byte[]> impulseSource;
-    WatermarkStrategy<WindowedValue<byte[]>> watermarkStrategy;
+    final SingleOutputStreamOperator<WindowedValue<byte[]>> impulseOperator;
     if (context.isStreaming()) {
       long shutdownAfterIdleSourcesMs =
           context
               .getPipelineOptions()
               .as(FlinkPipelineOptions.class)
               .getShutdownSourcesAfterIdleMs();
-      impulseSource = FlinkSource.unboundedImpulse(shutdownAfterIdleSourcesMs);
-      watermarkStrategy = WatermarkStrategy.forMonotonousTimestamps();
+      impulseOperator =
+          context
+              .getExecutionEnvironment()
+              .addSource(new ImpulseSourceFunction(shutdownAfterIdleSourcesMs), "Impulse")
+              .returns(typeInfo);
     } else {
-      impulseSource = FlinkSource.boundedImpulse();
-      watermarkStrategy = WatermarkStrategy.noWatermarks();
+      FlinkBoundedSource<byte[]> impulseSource = FlinkSource.boundedImpulse();
+      impulseOperator =
+          context
+              .getExecutionEnvironment()
+              .fromSource(impulseSource, WatermarkStrategy.noWatermarks(), "Impulse")
+              .returns(typeInfo);
     }
-    SingleOutputStreamOperator<WindowedValue<byte[]>> source =
-        context
-            .getExecutionEnvironment()
-            .fromSource(impulseSource, watermarkStrategy, "Impulse")
-            .returns(typeInfo);
-
-    context.setOutputDataStream(context.getOutput(transform), source);
+    context.setOutputDataStream(context.getOutput(transform), impulseOperator);
   }
 }
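Note on the streaming branch above: ImpulseSourceFunction is the legacy SourceFunction-based impulse, which emits a single element and then stays alive until the configured shutdown-after-idle timeout elapses. Below is a minimal sketch of that pattern against Flink's legacy SourceFunction API; the class and its body are illustrative, not Beam's actual ImpulseSourceFunction, which additionally participates in checkpointing.

    // Illustrative sketch only: a one-shot impulse as a legacy Flink SourceFunction.
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;
    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    class OneShotImpulse implements SourceFunction<WindowedValue<byte[]>> {
      private volatile boolean running = true;

      @Override
      public void run(SourceContext<WindowedValue<byte[]>> ctx) throws Exception {
        // Emit the single impulse element at the minimum timestamp.
        synchronized (ctx.getCheckpointLock()) {
          ctx.collect(
              WindowedValue.of(
                  new byte[0],
                  BoundedWindow.TIMESTAMP_MIN_VALUE,
                  GlobalWindow.INSTANCE,
                  PaneInfo.NO_FIRING));
        }
        // Keep the source alive so the streaming job does not finish immediately;
        // the real implementation waits for the idle timeout instead of looping forever.
        while (running) {
          Thread.sleep(100);
        }
      }

      @Override
      public void cancel() {
        running = false;
      }
    }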
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java
index 0b9fdd9dcd7c..506b651da68f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java
@@ -29,6 +29,8 @@
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSource;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.construction.UnboundedReadFromBoundedSource;
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
@@ -71,22 +73,16 @@ public static <T> FlinkUnboundedSource<T> unbounded(
     return new FlinkUnboundedSource<>(stepName, source, serializablePipelineOptions, numSplits);
   }
 
-  public static FlinkBoundedSource<byte[]> unboundedImpulse(long shutdownSourceAfterIdleMs) {
+  public static FlinkUnboundedSource<byte[]> unboundedImpulse(long shutdownSourceAfterIdleMs) {
     FlinkPipelineOptions flinkPipelineOptions = FlinkPipelineOptions.defaults();
     flinkPipelineOptions.setShutdownSourcesAfterIdleMs(shutdownSourceAfterIdleMs);
-    // Here we wrap the BeamImpulseSource with a FlinkBoundedSource, but overriding its
-    // boundedness to CONTINUOUS_UNBOUNDED. By doing so, the Flink engine will treat this
-    // source as an unbounded source and execute the job in streaming mode. This also
-    // works well with checkpoint, because the FlinkSourceSplit containing the
-    // BeamImpulseSource will be discarded after the impulse emission. So the streaming
-    // job won't see another impulse after failover.
-    return new FlinkBoundedSource<>(
+    return new FlinkUnboundedSource<>(
         "Impulse",
-        new BeamImpulseSource(),
+        new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>(
+            new BeamImpulseSource()),
         new SerializablePipelineOptions(flinkPipelineOptions),
-        Boundedness.CONTINUOUS_UNBOUNDED,
         1,
-        record -> Watermark.MAX_WATERMARK.getTimestamp());
+        record -> BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
   }
 
   public static FlinkBoundedSource<byte[]> boundedImpulse() {
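Note on unboundedImpulse above: rather than forcing Boundedness.CONTINUOUS_UNBOUNDED onto a bounded Flink source, the bounded BeamImpulseSource is now wrapped in Beam's UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter, which exposes a BoundedSource through the UnboundedSource API, and the record timestamp becomes BoundedWindow.TIMESTAMP_MIN_VALUE, matching the timestamp the impulse element itself carries. A hedged sketch of the adapter call, with an illustrative helper method:

    import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.impulse.BeamImpulseSource;
    import org.apache.beam.sdk.io.BoundedSource;
    import org.apache.beam.sdk.io.UnboundedSource;
    import org.apache.beam.sdk.util.construction.UnboundedReadFromBoundedSource;

    class AdapterSketch {
      // Wraps a bounded source so it can run where an UnboundedSource is required.
      static UnboundedSource<byte[], ?> impulseAsUnbounded() {
        BoundedSource<byte[]> bounded = new BeamImpulseSource();
        // The adapter checkpoints how far the bounded source has been read, so a
        // restored job resumes instead of re-emitting already-consumed elements.
        return new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>(bounded);
      }
    }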
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java
index 6d4fda74b095..e46cab776670 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceReaderBase.java
@@ -147,7 +147,8 @@ public List<FlinkSourceSplit<T>> snapshotState(long checkpointId) {
     beamSourceReaders.forEach(
         (splitId, readerAndOutput) -> {
           try {
-            splitsState.add(getReaderCheckpoint(splitId, readerAndOutput));
+            FlinkSourceSplit<T> checkpoint = getReaderCheckpoint(splitId, readerAndOutput);
+            splitsState.add(checkpoint);
           } catch (IOException e) {
             throw new IllegalStateException(
                 String.format("Failed to get checkpoint for split %d", splitId), e);
@@ -176,7 +177,8 @@ public CompletableFuture<Void> isAvailable() {
       checkIdleTimeoutAndMaybeStartCountdown();
       return idleTimeoutFuture;
     } else {
-      // There is no live readers, waiting for new split assignments or no more splits notification.
+      // There are no live readers, waiting for new split assignments or no more splits
+      // notification.
       if (waitingForSplitChangeFuture.isDone()) {
         waitingForSplitChangeFuture = new CompletableFuture<>();
       }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java
index 32fcd23344d9..768a474f14f8 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplit.java
@@ -21,6 +21,7 @@
 import javax.annotation.Nullable;
 import org.apache.beam.runners.flink.translation.utils.SerdeUtils;
 import org.apache.beam.sdk.io.Source;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -67,7 +68,11 @@ public String splitId() {
   @Override
   public String toString() {
-    return String.format("[SplitIndex: %d, BeamSource: %s]", splitIndex, beamSplitSource);
+    return MoreObjects.toStringHelper(this)
+        .add("splitIndex", splitIndex)
+        .add("beamSource", beamSplitSource)
+        .add("splitState.isNull", splitState == null)
+        .toString();
   }
 
   public static SimpleVersionedSerializer<FlinkSourceSplit<T>> serializer() {
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java
index 8ceab393533d..66aae984253a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java
@@ -76,6 +76,7 @@ public void start() {
     context.callAsync(
         () -> {
           try {
+            LOG.info("Starting source {}", beamSource);
             List<? extends Source<T>> beamSplitSourceList = splitBeamSource();
             Map<Integer, List<FlinkSourceSplit<T>>> flinkSourceSplitsList = new HashMap<>();
             int i = 0;
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSource.java
index ab9a6cc03cd5..685bb870c5fd 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSource.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSource.java
@@ -56,6 +56,7 @@ public FlinkBoundedSource(
       Boundedness boundedness,
       int numSplits,
       @Nullable TimestampExtractor<WindowedValue<T>> timestampExtractor) {
+    super(stepName, beamSource, serializablePipelineOptions, boundedness, numSplits);
     this.timestampExtractor = timestampExtractor;
   }
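Note on the toString change above: MoreObjects.toStringHelper (vendored Guava) yields the conventional ClassName{field=value, ...} layout and makes it easy to log whether split state was captured without dumping the bytes. A small sketch of the output shape, using an illustrative class:

    import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;

    class SplitInfo {
      private final int splitIndex = 0;
      private final byte[] splitState = null;

      @Override
      public String toString() {
        // Produces e.g. "SplitInfo{splitIndex=0, splitState.isNull=true}"
        return MoreObjects.toStringHelper(this)
            .add("splitIndex", splitIndex)
            .add("splitState.isNull", splitState == null)
            .toString();
      }
    }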
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java
index a25964af809d..e4bd4496ae90 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReader.java
@@ -18,8 +18,6 @@
 package org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
@@ -29,17 +27,13 @@
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceReaderBase;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceReaderContext;
@@ -61,8 +55,6 @@
  */
 public class FlinkBoundedSourceReader<T> extends FlinkSourceReaderBase<T, WindowedValue<T>> {
   private static final Logger LOG = LoggerFactory.getLogger(FlinkBoundedSourceReader.class);
-  private static final VarLongCoder LONG_CODER = VarLongCoder.of();
-  private final Map<Integer, Long> consumedFromSplit = new HashMap<>();
   private @Nullable Source.Reader<T> currentReader;
   private int currentSplitId;
 
@@ -71,6 +63,7 @@ public FlinkBoundedSourceReader(
       SourceReaderContext context,
       PipelineOptions pipelineOptions,
       @Nullable Function<WindowedValue<T>, Long> timestampExtractor) {
+    super(stepName, context, pipelineOptions, timestampExtractor);
     currentSplitId = -1;
   }
 
@@ -82,33 +75,16 @@ protected FlinkSourceSplit<T> getReaderCheckpoint(int splitId, ReaderAndOutput readerAndOutput)
     // stream."
     // For bounded source, the checkpoint granularity is the entire source split.
     // So, in case of failure, all the data from this split will be consumed again.
-    return new FlinkSourceSplit<>(
-        splitId, readerAndOutput.reader.getCurrentSource(), asBytes(consumedFromSplit(splitId)));
+    return new FlinkSourceSplit<>(splitId, readerAndOutput.reader.getCurrentSource());
   }
 
   @Override
   protected Source.Reader<T> createReader(@Nonnull FlinkSourceSplit<T> sourceSplit)
       throws IOException {
     Source<T> beamSource = sourceSplit.getBeamSplitSource();
-    byte[] state = sourceSplit.getSplitState();
-    if (state != null) {
-      consumedFromSplit.put(Integer.parseInt(sourceSplit.splitId()), fromBytes(state));
-    }
     return ((BoundedSource<T>) beamSource).createReader(pipelineOptions);
   }
 
-  private byte[] asBytes(long l) throws CoderException {
-    return CoderUtils.encodeToByteArray(LONG_CODER, l);
-  }
-
-  private long fromBytes(byte[] b) throws CoderException {
-    return CoderUtils.decodeFromByteArray(LONG_CODER, b);
-  }
-
-  private long consumedFromSplit(int splitId) {
-    return consumedFromSplit.getOrDefault(splitId, 0L);
-  }
-
   @VisibleForTesting
   protected FlinkBoundedSourceReader(
       String stepName,
@@ -116,6 +92,7 @@ protected FlinkBoundedSourceReader(
       PipelineOptions pipelineOptions,
       ScheduledExecutorService executor,
       @Nullable Function<WindowedValue<T>, Long> timestampExtractor) {
+    super(stepName, executor, context, pipelineOptions, timestampExtractor);
     currentSplitId = -1;
   }
 
@@ -141,12 +118,11 @@ public InputStatus pollNext(ReaderOutput<WindowedValue<T>> output) throws Exception {
     if (currentReader != null) {
       // make null checks happy
       final @Nonnull Source.Reader<T> splitReader = currentReader;
-      // store number of processed elements from this split
-      consumedFromSplit.compute(currentSplitId, (k, v) -> v == null ? 1 : v + 1);
       T record = splitReader.getCurrent();
       WindowedValue<T> windowedValue =
           WindowedValue.of(
               record, splitReader.getCurrentTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+
       if (timestampExtractor == null) {
         output.collect(windowedValue);
       } else {
@@ -158,7 +134,6 @@ public InputStatus pollNext(ReaderOutput<WindowedValue<T>> output) throws Exception {
       // the beginning. So the failover granularity is the entire Flink job.
       if (!invocationUtil.invokeAdvance(splitReader)) {
         finishSplit(currentSplitId);
-        consumedFromSplit.remove(currentSplitId);
         LOG.debug("Finished reading from {}", currentSplitId);
         currentReader = null;
         currentSplitId = -1;
@@ -188,12 +163,6 @@ private boolean moveToNextNonEmptyReader() throws IOException {
       if (invocationUtil.invokeStart(rao.reader)) {
         currentSplitId = Integer.parseInt(rao.splitId);
         currentReader = rao.reader;
-        long toSkipAfterStart =
-            MoreObjects.firstNonNull(consumedFromSplit.remove(currentSplitId), 0L);
-        @Nonnull Source.Reader<T> reader = Preconditions.checkArgumentNotNull(currentReader);
-        while (toSkipAfterStart > 0 && reader.advance()) {
-          toSkipAfterStart--;
-        }
         return true;
       } else {
         finishSplit(Integer.parseInt(rao.splitId));
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/BeamImpulseSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/BeamImpulseSource.java
index cbf1871dfbaf..20dc9ed6c995 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/BeamImpulseSource.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/impulse/BeamImpulseSource.java
@@ -21,6 +21,8 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -46,38 +48,33 @@ public BoundedReader<byte[]> createReader(PipelineOptions options) throws IOException {
     return new ImpulseReader(this);
   }
 
+  @Override
+  public Coder<byte[]> getOutputCoder() {
+    return ByteArrayCoder.of();
+  }
+
   private static class ImpulseReader extends BoundedSource.BoundedReader<byte[]> {
     private final BeamImpulseSource source;
-    private boolean started;
     private int index;
 
     private ImpulseReader(BeamImpulseSource source) {
       this.source = source;
-      this.started = false;
       this.index = 0;
     }
 
     @Override
-    public boolean start() throws IOException {
-      started = true;
-      return true;
+    public boolean start() {
+      return advance();
     }
 
     @Override
-    public boolean advance() throws IOException {
-      if (!started) {
-        throw new IllegalStateException("start() should be called before calling advance()");
-      }
-      index++;
-      return false;
+    public boolean advance() {
+      return index++ == 0;
     }
 
     @Override
     public byte[] getCurrent() throws NoSuchElementException {
-      if (!started) {
-        throw new IllegalStateException("The reader hasn't started.");
-      }
-      if (index == 0) {
+      if (index == 1) {
         return new byte[0];
       } else {
         throw new NoSuchElementException("No element is available.");
@@ -91,10 +88,7 @@ public BoundedSource<byte[]> getCurrentSource() {
 
     @Override
     public Instant getCurrentTimestamp() throws NoSuchElementException {
-      if (!started) {
-        throw new IllegalStateException("The reader hasn't started.");
-      }
-      if (index == 0) {
+      if (index == 1) {
         return BoundedWindow.TIMESTAMP_MIN_VALUE;
       } else {
         throw new NoSuchElementException("No element is available.");
@@ -102,6 +96,6 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
     }
 
     @Override
-    public void close() throws IOException {}
+    public void close() {}
   }
 }
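Note on the simplified ImpulseReader above: it relies on the standard Beam Source.Reader contract, where start() positions the reader at the first element, each advance() moves to the next, and getCurrent() is valid only while the most recent start()/advance() returned true. With advance() implemented as index++ == 0, the reader yields exactly one empty byte array; and since the bounded reader's checkpoint is now the whole split, a restored split is simply re-read from the beginning. A hedged sketch of the canonical read loop (the harness class is illustrative):

    import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.impulse.BeamImpulseSource;
    import org.apache.beam.sdk.io.BoundedSource;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    class ImpulseReadLoopSketch {
      static int countImpulseElements() throws Exception {
        PipelineOptions options = PipelineOptionsFactory.create();
        BoundedSource<byte[]> source = new BeamImpulseSource();
        int count = 0;
        try (BoundedSource.BoundedReader<byte[]> reader = source.createReader(options)) {
          // start() reads the first element; advance() reads each subsequent one.
          for (boolean more = reader.start(); more; more = reader.advance()) {
            byte[] element = reader.getCurrent(); // valid only after a true start()/advance()
            count += element.length == 0 ? 1 : 0; // the impulse element is an empty array
          }
        }
        return count; // == 1 for BeamImpulseSource
      }
    }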
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java
index 022f1abde826..1cd83663b1d4 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/bounded/FlinkBoundedSourceReaderTest.java
@@ -147,6 +147,7 @@ protected FlinkBoundedSourceReader<KV<Integer, Integer>> createReader(
       long idleTimeoutMs,
       @Nullable Function<WindowedValue<KV<Integer, Integer>>, Long> timestampExtractor,
       TestMetricGroup testMetricGroup) {
+
     FlinkPipelineOptions pipelineOptions = FlinkPipelineOptions.defaults();
     pipelineOptions.setShutdownSourcesAfterIdleMs(idleTimeoutMs);
     SourceReaderContext mockContext = createSourceReaderContext(testMetricGroup);