diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
index 5072e6b2459f..bb794e04398d 100644
--- a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
+++ b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
@@ -20,7 +20,6 @@
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
-import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
 
 /** Compatibility layer for {@link AbstractStreamOperator} breaking changes. */
 public abstract class AbstractStreamOperatorCompat<OutputT>
@@ -45,18 +44,9 @@ protected int numProcessingTimeTimers() {
     return getTimeServiceManager()
         .map(
             manager -> {
-              InternalTimeServiceManager tsm = getTimeServiceManagerCompat();
-              if (tsm instanceof InternalTimeServiceManagerImpl) {
-                final InternalTimeServiceManagerImpl cast =
-                    (InternalTimeServiceManagerImpl) getTimeServiceManagerCompat();
-                return cast.numProcessingTimeTimers();
-              } else if (tsm instanceof BatchExecutionInternalTimeServiceManager) {
-                return 0;
-              } else {
-                throw new IllegalStateException(
-                    String.format(
-                        "Unknown implementation of InternalTimerServiceManager. %s", tsm));
-              }
+              final InternalTimeServiceManagerImpl cast =
+                  (InternalTimeServiceManagerImpl) getTimeServiceManagerCompat();
+              return cast.numProcessingTimeTimers();
             })
         .orElse(0);
   }
diff --git a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
index d8740964fda9..3b64612d6d19 100644
--- a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
+++ b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
@@ -20,7 +20,6 @@
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
-import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
 
 /** Compatibility layer for {@link AbstractStreamOperator} breaking changes. */
 public abstract class AbstractStreamOperatorCompat<OutputT>
@@ -45,18 +44,9 @@ protected int numProcessingTimeTimers() {
     return getTimeServiceManager()
         .map(
             manager -> {
-              InternalTimeServiceManager tsm = getTimeServiceManagerCompat();
-              if (tsm instanceof InternalTimeServiceManagerImpl) {
-                final InternalTimeServiceManagerImpl cast =
-                    (InternalTimeServiceManagerImpl) getTimeServiceManagerCompat();
-                return cast.numProcessingTimeTimers();
-              } else if (tsm instanceof BatchExecutionInternalTimeServiceManager) {
-                return 0;
-              } else {
-                throw new IllegalStateException(
-                    String.format(
-                        "Unknown implementation of InternalTimerServiceManager. %s", tsm));
-              }
+              final InternalTimeServiceManagerImpl cast =
+                  (InternalTimeServiceManagerImpl) getTimeServiceManagerCompat();
+              return cast.numProcessingTimeTimers();
             })
         .orElse(0);
   }
diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle
index 3bcbfdca0290..75f1034039e8 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -233,7 +233,6 @@ class ValidatesRunnerConfig {
   String name
   boolean streaming
   boolean checkpointing
-  boolean useDataStreamForBatch
   ArrayList sickbayTests
 }
 
@@ -252,7 +251,6 @@ def createValidatesRunnerTask(Map m) {
     description = "Validates the ${runnerType} runner"
    def pipelineOptionsArray = ["--runner=TestFlinkRunner",
                                "--streaming=${config.streaming}",
-                               "--useDataStreamForBatch=${config.useDataStreamForBatch}",
                                "--parallelism=2",
    ]
    if (config.checkpointing) {
@@ -315,17 +313,12 @@ def createValidatesRunnerTask(Map m) {
         excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate'
         // https://github.com/apache/beam/issues/20844
         excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating'
-        if (!config.streaming) {
-          // FlinkBatchExecutionInternalTimeService does not support timer registration on timer firing.
-          excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testOnTimerTimestampSkew'
-        }
       }
     }
   }
 }
 
 createValidatesRunnerTask(name: "validatesRunnerBatch", streaming: false, sickbayTests: sickbayTests)
-createValidatesRunnerTask(name: "validatesRunnerBatchWithDataStream", streaming: false, useDataStreamForBatch: true, sickbayTests: sickbayTests)
 createValidatesRunnerTask(name: "validatesRunnerStreaming", streaming: true, sickbayTests: sickbayTests)
 // We specifically have a variant which runs with checkpointing enabled for the
 // tests that require it since running a checkpoint variant is significantly
@@ -338,7 +331,6 @@ tasks.register('validatesRunner') {
   group = 'Verification'
   description "Validates Flink runner"
   dependsOn validatesRunnerBatch
-  dependsOn validatesRunnerBatchWithDataStream
   dependsOn validatesRunnerStreaming
   dependsOn validatesRunnerStreamingCheckpointing
 }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 029eff25a825..4250df2b6210 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -27,7 +27,6 @@
 import org.apache.beam.sdk.util.construction.resources.PipelineResources;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -102,17 +101,13 @@ public void translate(Pipeline pipeline) {
     prepareFilesToStageForRemoteClusterExecution(options);
 
     FlinkPipelineTranslator translator;
-    if (options.isStreaming() || options.getUseDataStreamForBatch()) {
+    if (options.isStreaming()) {
       this.flinkStreamEnv = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options);
       if (hasUnboundedOutput && !flinkStreamEnv.getCheckpointConfig().isCheckpointingEnabled()) {
         LOG.warn(
             "UnboundedSources present which rely on checkpointing, but checkpointing is disabled.");
       }
-      translator =
-          new FlinkStreamingPipelineTranslator(flinkStreamEnv, options, options.isStreaming());
-      if (!options.isStreaming()) {
-        flinkStreamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
-      }
+      translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options);
     } else {
       this.flinkBatchEnv = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options);
       translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index f0514c69891b..650768c7b44b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -32,11 +32,7 @@
  * requiring flink on the classpath (e.g. to use with the direct runner).
  */
 public interface FlinkPipelineOptions
-    extends PipelineOptions,
-        ApplicationNameOptions,
-        StreamingOptions,
-        FileStagingOptions,
-        VersionDependentFlinkPipelineOptions {
+    extends PipelineOptions, ApplicationNameOptions, StreamingOptions, FileStagingOptions {
 
   String AUTO = "[auto]";
   String PIPELINED = "PIPELINED";
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index 3ed00a3c5ef2..a705d89894df 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -81,9 +81,8 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
 
   private int depth = 0;
 
-  public FlinkStreamingPipelineTranslator(
-      StreamExecutionEnvironment env, PipelineOptions options, boolean isStreaming) {
-    this.streamingContext = new FlinkStreamingTranslationContext(env, options, isStreaming);
+  public FlinkStreamingPipelineTranslator(StreamExecutionEnvironment env, PipelineOptions options) {
+    this.streamingContext = new FlinkStreamingTranslationContext(env, options);
   }
 
   @Override
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index e8417c1ad08e..2df2fc74424f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -34,6 +34,7 @@
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import org.apache.beam.runners.flink.translation.functions.ImpulseSourceFunction;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
@@ -47,9 +48,6 @@
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestStreamSource;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSource;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded.FlinkBoundedSource;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSource;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -84,6 +82,7 @@
 import org.apache.beam.sdk.util.construction.ReadTranslation;
 import org.apache.beam.sdk.util.construction.SplittableParDo;
 import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.util.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -97,7 +96,6 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -222,19 +220,16 @@ public void translateNode(
           context.getExecutionEnvironment().getMaxParallelism() > 0
               ? context.getExecutionEnvironment().getMaxParallelism()
               : context.getExecutionEnvironment().getParallelism();
-
-      FlinkUnboundedSource unboundedSource =
-          FlinkSource.unbounded(
-              transform.getName(),
-              rawSource,
-              new SerializablePipelineOptions(context.getPipelineOptions()),
-              parallelism);
+      UnboundedSourceWrapper sourceWrapper =
+          new UnboundedSourceWrapper<>(
+              fullName, context.getPipelineOptions(), rawSource, parallelism);
       nonDedupSource =
           context
               .getExecutionEnvironment()
-              .fromSource(
-                  unboundedSource, WatermarkStrategy.noWatermarks(), fullName, withIdTypeInfo)
-              .uid(fullName);
+              .addSource(sourceWrapper)
+              .name(fullName)
+              .uid(fullName)
+              .returns(withIdTypeInfo);
 
       if (rawSource.requiresDeduping()) {
         source =
@@ -308,24 +303,15 @@ void translateNode(Impulse transform, FlinkStreamingTranslationContext context)
               WindowedValue.getFullCoder(ByteArrayCoder.of(), GlobalWindow.Coder.INSTANCE),
               context.getPipelineOptions());
 
-      FlinkBoundedSource impulseSource;
-      WatermarkStrategy<WindowedValue<byte[]>> watermarkStrategy;
-      if (context.isStreaming()) {
-        long shutdownAfterIdleSourcesMs =
-            context
-                .getPipelineOptions()
-                .as(FlinkPipelineOptions.class)
-                .getShutdownSourcesAfterIdleMs();
-        impulseSource = FlinkSource.unboundedImpulse(shutdownAfterIdleSourcesMs);
-        watermarkStrategy = WatermarkStrategy.forMonotonousTimestamps();
-      } else {
-        impulseSource = FlinkSource.boundedImpulse();
-        watermarkStrategy = WatermarkStrategy.noWatermarks();
-      }
+      long shutdownAfterIdleSourcesMs =
+          context
+              .getPipelineOptions()
+              .as(FlinkPipelineOptions.class)
+              .getShutdownSourcesAfterIdleMs();
       SingleOutputStreamOperator<WindowedValue<byte[]>> source =
           context
               .getExecutionEnvironment()
-              .fromSource(impulseSource, watermarkStrategy, "Impulse")
+              .addSource(new ImpulseSourceFunction(shutdownAfterIdleSourcesMs), "Impulse")
               .returns(typeInfo);
 
       context.setOutputDataStream(context.getOutput(transform), source);
@@ -344,8 +330,7 @@ private static class ReadSourceTranslator<T>
   @Override
   void translateNode(
       PTransform<PBegin, PCollection<T>> transform, FlinkStreamingTranslationContext context) {
-    if (ReadTranslation.sourceIsBounded(context.getCurrentTransform())
-        == PCollection.IsBounded.BOUNDED) {
+    if (context.getOutput(transform).isBounded().equals(PCollection.IsBounded.BOUNDED)) {
       boundedTranslator.translateNode(transform, context);
     } else {
       unboundedTranslator.translateNode(transform, context);
@@ -376,26 +361,24 @@ public void translateNode(
       }
 
       String fullName = getCurrentTransformName(context);
-      int parallelism =
-          context.getExecutionEnvironment().getMaxParallelism() > 0
-              ? context.getExecutionEnvironment().getMaxParallelism()
-              : context.getExecutionEnvironment().getParallelism();
-
-      FlinkBoundedSource flinkBoundedSource =
-          FlinkSource.bounded(
-              transform.getName(),
-              rawSource,
-              new SerializablePipelineOptions(context.getPipelineOptions()),
-              parallelism);
-
+      UnboundedSource adaptedRawSource = new BoundedToUnboundedSourceAdapter<>(rawSource);
       DataStream<WindowedValue<T>> source;
       try {
+        int parallelism =
+            context.getExecutionEnvironment().getMaxParallelism() > 0
+                ? context.getExecutionEnvironment().getMaxParallelism()
+                : context.getExecutionEnvironment().getParallelism();
+        UnboundedSourceWrapperNoValueWithRecordId sourceWrapper =
+            new UnboundedSourceWrapperNoValueWithRecordId<>(
+                new UnboundedSourceWrapper<>(
+                    fullName, context.getPipelineOptions(), adaptedRawSource, parallelism));
         source =
             context
                 .getExecutionEnvironment()
-                .fromSource(
-                    flinkBoundedSource, WatermarkStrategy.noWatermarks(), fullName, outputTypeInfo)
-                .uid(fullName);
+                .addSource(sourceWrapper)
+                .name(fullName)
+                .uid(fullName)
+                .returns(outputTypeInfo);
       } catch (Exception e) {
         throw new RuntimeException("Error while translating BoundedSource: " + rawSource, e);
       }
@@ -562,9 +545,7 @@ static void translateParDo(
     KeySelector<WindowedValue<InputT>, ?> keySelector = null;
     boolean stateful = false;
     DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
-    if (!signature.stateDeclarations().isEmpty()
-        || !signature.timerDeclarations().isEmpty()
-        || !signature.timerFamilyDeclarations().isEmpty()) {
+    if (signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0) {
       // Based on the fact that the signature is stateful, DoFnSignatures ensures
       // that it is also keyed
       keyCoder = ((KvCoder) input.getCoder()).getKeyCoder();
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
index 10ba64a77148..acbd7d10da3d 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
@@ -51,7 +51,6 @@ class FlinkStreamingTranslationContext {
 
   private final StreamExecutionEnvironment env;
   private final PipelineOptions options;
-  private final boolean isStreaming;
 
   /**
    * Keeps a mapping between the output value of the PTransform and the Flink Operator that produced
@@ -63,11 +62,9 @@ class FlinkStreamingTranslationContext {
 
   private AppliedPTransform currentTransform;
 
-  public FlinkStreamingTranslationContext(
-      StreamExecutionEnvironment env, PipelineOptions options, boolean isStreaming) {
+  public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) {
     this.env = checkNotNull(env);
     this.options = checkNotNull(options);
-    this.isStreaming = isStreaming;
   }
 
   public StreamExecutionEnvironment getExecutionEnvironment() {
@@ -78,10 +75,6 @@ public PipelineOptions getPipelineOptions() {
     return options;
   }
 
-  public boolean isStreaming() {
-    return isStreaming;
-  }
-
   @SuppressWarnings("unchecked")
   public DataStream getInputDataStream(PValue value) {
     return (DataStream) dataStreams.get(value);
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
index 46458eccb83c..e78105633362 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
@@ -36,7 +36,7 @@ class FlinkTransformOverrides {
   static List getDefaultOverrides(FlinkPipelineOptions options) {
     ImmutableList.Builder builder = ImmutableList.builder();
-    if (options.isStreaming() || options.getUseDataStreamForBatch()) {
+    if (options.isStreaming()) {
       builder
           .add(
               PTransformOverride.of(
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/VersionDependentFlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/VersionDependentFlinkPipelineOptions.java
deleted file mode 100644
index 48ee15501156..000000000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/VersionDependentFlinkPipelineOptions.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-public interface VersionDependentFlinkPipelineOptions extends PipelineOptions {
-
-  @Description(
-      "When set to true, the batch job execution will use DataStream API. "
-          + "Otherwise, the batch job execution will use the legacy DataSet API.")
-  @Default.Boolean(false)
-  Boolean getUseDataStreamForBatch();
-
-  void setUseDataStreamForBatch(Boolean useDataStreamForBatch);
-}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 63f5ede00242..0a9731da8b56 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -110,12 +110,10 @@
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeService;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -688,7 +686,6 @@ protected final void setBundleFinishedCallback(Runnable callback) {
   @Override
   public final void processElement(StreamRecord<WindowedValue<InputT>> streamRecord) {
     checkInvokeStartBundle();
-    LOG.trace("Processing element {} in {}", streamRecord.getValue().getValue(), doFn.getClass());
     long oldHold = keyCoder != null ? keyedStateInternals.minWatermarkHoldMs() : -1L;
     doFnRunner.processElement(streamRecord.getValue());
     checkInvokeFinishBundleByCount();
@@ -771,7 +768,6 @@ public final void processElement2(StreamRecord streamRecord) thro
 
   @Override
   public final void processWatermark(Watermark mark) throws Exception {
-    LOG.trace("Processing watermark {} in {}", mark.getTimestamp(), doFn.getClass());
     processWatermark1(mark);
   }
 
@@ -1460,10 +1456,8 @@ private void populateOutputTimestampQueue(InternalTimerService timerS
     BiConsumerWithException consumer =
         (timerData, stamp) ->
             keyedStateInternals.addWatermarkHoldUsage(timerData.getOutputTimestamp());
-    if (timerService instanceof InternalTimerServiceImpl) {
-      timerService.forEachEventTimeTimer(consumer);
-      timerService.forEachProcessingTimeTimer(consumer);
-    }
+    timerService.forEachEventTimeTimer(consumer);
+    timerService.forEachProcessingTimeTimer(consumer);
   }
 
   private String constructTimerId(String timerFamilyId, String timerId) {
@@ -1514,7 +1508,6 @@ public void setTimer(TimerData timer) {
   }
 
   private void registerTimer(TimerData timer, String contextTimerId) throws Exception {
-    LOG.debug("Registering timer {}", timer);
     pendingTimersById.put(contextTimerId, timer);
     long time = timer.getTimestamp().getMillis();
     switch (timer.getDomain()) {
@@ -1625,31 +1618,7 @@ public Instant currentProcessingTime() {
 
   @Override
   public Instant currentInputWatermarkTime() {
-    if (timerService instanceof BatchExecutionInternalTimeService) {
-      // In batch mode, this method will only either return BoundedWindow.TIMESTAMP_MIN_VALUE,
-      // or BoundedWindow.TIMESTAMP_MAX_VALUE.
-      //
-      // For batch execution mode, the currentInputWatermark variable will never be updated
-      // until all the records are processed. However, every time when a record with a new
-      // key arrives, the Flink timer service watermark will be set to
-      // MAX_WATERMARK(LONG.MAX_VALUE) so that all the timers associated with the current
-      // key can fire. After that the Flink timer service watermark will be reset to
-      // LONG.MIN_VALUE, so the next key will start from a fresh env as if the previous
-      // records of a different key never existed. So the watermark is either Long.MIN_VALUE
-      // or long MAX_VALUE. So we should just use the Flink time service watermark in batch mode.
-      //
-      // In Flink the watermark ranges from
-      // [LONG.MIN_VALUE (-9223372036854775808), LONG.MAX_VALUE (9223372036854775807)] while the
-      // beam
-      // watermark range is [BoundedWindow.TIMESTAMP_MIN_VALUE (-9223372036854775),
-      // BoundedWindow.TIMESTAMP_MAX_VALUE (9223372036854775)]. To ensure the timestamp visible to
-      // the users follow the Beam convention, we just use the Beam range instead.
-      return timerService.currentWatermark() == Long.MAX_VALUE
-          ? new Instant(Long.MAX_VALUE)
-          : BoundedWindow.TIMESTAMP_MIN_VALUE;
-    } else {
-      return new Instant(getEffectiveInputWatermark());
-    }
+    return new Instant(getEffectiveInputWatermark());
   }
 
   @Override
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
index 6f2f473feddc..c4d82cb5c8ad 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
@@ -51,9 +51,4 @@ public Iterable timersIterable() {
   public Iterable<WindowedValue<ElemT>> elementsIterable() {
     return Collections.singletonList(value);
   }
-
-  @Override
-  public String toString() {
-    return String.format("{%s, [%s]}", key, value);
-  }
 }
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
index ec44d279586d..49d317d46ced 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
@@ -29,8 +29,6 @@
 import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -51,33 +49,17 @@
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 import org.powermock.reflect.Whitebox;
 
 /** Tests for {@link FlinkExecutionEnvironments}. */
-@RunWith(Parameterized.class)
 public class FlinkExecutionEnvironmentsTest {
 
   @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
   @Rule public ExpectedException expectedException = ExpectedException.none();
 
-  @Parameterized.Parameter public boolean useDataStreamForBatch;
-
-  @Parameterized.Parameters(name = "UseDataStreamForBatch = {0}")
-  public static Collection useDataStreamForBatchJobValues() {
-    return Arrays.asList(new Object[][] {{false}, {true}});
-  }
-
-  private FlinkPipelineOptions getDefaultPipelineOptions() {
-    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
-    options.setUseDataStreamForBatch(useDataStreamForBatch);
-    return options;
-  }
-
   @Test
   public void shouldSetParallelismBatch() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setParallelism(42);
 
@@ -89,7 +71,7 @@ public void shouldSetParallelismBatch() {
 
   @Test
   public void shouldSetParallelismStreaming() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setParallelism(42);
 
@@ -102,7 +84,7 @@ public void shouldSetParallelismStreaming() {
 
   @Test
   public void shouldSetMaxParallelismStreaming() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setMaxParallelism(42);
 
@@ -117,7 +99,7 @@ public void shouldInferParallelismFromEnvironmentBatch() throws IOException {
     String flinkConfDir = extractFlinkConfig();
 
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster("host:80");
 
@@ -133,7 +115,7 @@ public void shouldInferParallelismFromEnvironmentBatch() throws IOException {
   public void shouldInferParallelismFromEnvironmentStreaming() throws IOException {
     String confDir = extractFlinkConfig();
 
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster("host:80");
 
@@ -147,7 +129,7 @@ public void shouldInferParallelismFromEnvironmentStreaming() throws IOException
 
   @Test
   public void shouldFallbackToDefaultParallelismBatch() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster("host:80");
 
@@ -159,7 +141,7 @@ public void shouldFallbackToDefaultParallelismBatch() {
 
   @Test
   public void shouldFallbackToDefaultParallelismStreaming() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster("host:80");
 
@@ -172,7 +154,7 @@ public void shouldFallbackToDefaultParallelismStreaming() {
 
   @Test
   public void useDefaultParallelismFromContextBatch() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
 
     ExecutionEnvironment bev = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options);
@@ -184,7 +166,7 @@ public void useDefaultParallelismFromContextBatch() {
 
   @Test
   public void useDefaultParallelismFromContextStreaming() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
 
     StreamExecutionEnvironment sev =
@@ -197,7 +179,7 @@ public void useDefaultParallelismFromContextStreaming() {
 
   @Test
   public void shouldParsePortForRemoteEnvironmentBatch() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
     options.setFlinkMaster("host:1234");
 
@@ -209,7 +191,7 @@ public void shouldParsePortForRemoteEnvironmentBatch() {
 
   @Test
   public void shouldParsePortForRemoteEnvironmentStreaming() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
     options.setFlinkMaster("host:1234");
 
@@ -222,7 +204,7 @@ public void shouldParsePortForRemoteEnvironmentStreaming() {
 
   @Test
   public void shouldAllowPortOmissionForRemoteEnvironmentBatch() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
     options.setFlinkMaster("host");
 
@@ -234,7 +216,7 @@ public void shouldAllowPortOmissionForRemoteEnvironmentBatch() {
 
   @Test
   public void shouldAllowPortOmissionForRemoteEnvironmentStreaming() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
     options.setFlinkMaster("host");
 
@@ -247,7 +229,7 @@ public void shouldAllowPortOmissionForRemoteEnvironmentStreaming() {
 
   @Test
   public void shouldTreatAutoAndEmptyHostTheSameBatch() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
 
     ExecutionEnvironment sev = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options);
@@ -261,7 +243,7 @@ public void shouldTreatAutoAndEmptyHostTheSameBatch() {
 
   @Test
   public void shouldTreatAutoAndEmptyHostTheSameStreaming() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
 
     StreamExecutionEnvironment sev =
@@ -277,7 +259,7 @@ public void shouldTreatAutoAndEmptyHostTheSameStreaming() {
 
   @Test
   public void shouldDetectMalformedPortBatch() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
     options.setFlinkMaster("host:p0rt");
 
@@ -289,7 +271,7 @@ public void shouldDetectMalformedPortBatch() {
 
   @Test
   public void shouldDetectMalformedPortStreaming() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
     options.setFlinkMaster("host:p0rt");
 
@@ -301,7 +283,7 @@ public void shouldDetectMalformedPortStreaming() {
 
   @Test
   public void shouldSupportIPv4Batch() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
     options.setFlinkMaster("192.168.1.1:1234");
 
@@ -315,7 +297,7 @@ public void shouldSupportIPv4Batch() {
 
   @Test
   public void shouldSupportIPv4Streaming() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
     options.setFlinkMaster("192.168.1.1:1234");
 
@@ -329,7 +311,7 @@ public void shouldSupportIPv4Streaming() {
 
   @Test
   public void shouldSupportIPv6Batch() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
     options.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234");
 
@@ -344,7 +326,7 @@ public void shouldSupportIPv6Batch() {
 
   @Test
   public void shouldSupportIPv6Streaming() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
     options.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234");
 
@@ -360,7 +342,7 @@ public void shouldSupportIPv6Streaming() {
 
   @Test
   public void shouldRemoveHttpProtocolFromHostBatch() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
 
     for (String flinkMaster :
@@ -376,7 +358,7 @@ public void shouldRemoveHttpProtocolFromHostBatch() {
 
   @Test
   public void shouldRemoveHttpProtocolFromHostStreaming() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
 
     for (String flinkMaster :
@@ -400,7 +382,7 @@ private String extractFlinkConfig() throws IOException {
   @Test
   public void shouldAutoSetIdleSourcesFlagWithoutCheckpointing() {
     // Checkpointing disabled, shut down sources immediately
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     FlinkExecutionEnvironments.createStreamExecutionEnvironment(options);
     assertThat(options.getShutdownSourcesAfterIdleMs(), is(0L));
   }
@@ -408,7 +390,7 @@ public void shouldAutoSetIdleSourcesFlagWithoutCheckpointing() {
   @Test
   public void shouldAutoSetIdleSourcesFlagWithCheckpointing() {
     // Checkpointing is enabled, never shut down sources
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setCheckpointingInterval(1000L);
     FlinkExecutionEnvironments.createStreamExecutionEnvironment(options);
     assertThat(options.getShutdownSourcesAfterIdleMs(), is(Long.MAX_VALUE));
@@ -417,7 +399,7 @@ public void shouldAutoSetIdleSourcesFlagWithCheckpointing() {
   @Test
   public void shouldAcceptExplicitlySetIdleSourcesFlagWithoutCheckpointing() {
     // Checkpointing disabled, accept flag
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setShutdownSourcesAfterIdleMs(42L);
     FlinkExecutionEnvironments.createStreamExecutionEnvironment(options);
     assertThat(options.getShutdownSourcesAfterIdleMs(), is(42L));
@@ -426,7 +408,7 @@ public void shouldAcceptExplicitlySetIdleSourcesFlagWithoutCheckpointing() {
   @Test
   public void shouldAcceptExplicitlySetIdleSourcesFlagWithCheckpointing() {
     // Checkpointing enable, still accept flag
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setCheckpointingInterval(1000L);
     options.setShutdownSourcesAfterIdleMs(42L);
     FlinkExecutionEnvironments.createStreamExecutionEnvironment(options);
@@ -436,7 +418,7 @@ public void shouldAcceptExplicitlySetIdleSourcesFlagWithCheckpointing() {
   @Test
   public void shouldSetSavepointRestoreForRemoteStreaming() {
     String path = "fakePath";
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster("host:80");
     options.setSavepointPath(path);
@@ -450,7 +432,7 @@ public void shouldSetSavepointRestoreForRemoteStreaming() {
 
   @Test
   public void shouldFailOnUnknownStateBackend() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setStreaming(true);
     options.setStateBackend("unknown");
     options.setStateBackendStoragePath("/path");
@@ -463,7 +445,7 @@ public void shouldFailOnUnknownStateBackend() {
 
   @Test
   public void shouldFailOnNoStoragePathProvided() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setStreaming(true);
     options.setStateBackend("unknown");
 
@@ -475,7 +457,7 @@ public void shouldFailOnNoStoragePathProvided() {
 
   @Test
   public void shouldCreateFileSystemStateBackend() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setStreaming(true);
     options.setStateBackend("fileSystem");
     options.setStateBackendStoragePath(temporaryFolder.getRoot().toURI().toString());
@@ -488,7 +470,7 @@ public void shouldCreateFileSystemStateBackend() {
 
   @Test
   public void shouldCreateRocksDbStateBackend() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setStreaming(true);
     options.setStateBackend("rocksDB");
     options.setStateBackendStoragePath(temporaryFolder.getRoot().toURI().toString());
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
index 9d898ed53a89..0c9211ff8032 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
@@ -28,7 +28,6 @@
 import static org.hamcrest.core.Every.everyItem;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeFalse;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -39,8 +38,6 @@
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.Pipeline;
@@ -71,13 +68,13 @@
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.runners.JUnit4;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.powermock.reflect.Whitebox;
 
 /** Tests for {@link FlinkPipelineExecutionEnvironment}. */
-@RunWith(Parameterized.class)
+@RunWith(JUnit4.class)
 @SuppressWarnings({
   "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
 })
@@ -85,22 +82,9 @@ public class FlinkPipelineExecutionEnvironmentTest implements Serializable {
 
   @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
 
-  @Parameterized.Parameter public boolean useDataStreamForBatch;
-
-  @Parameterized.Parameters(name = "UseDataStreamForBatch = {0}")
-  public static Collection useDataStreamForBatchJobValues() {
-    return Arrays.asList(new Object[][] {{false}, {true}});
-  }
-
-  private FlinkPipelineOptions getDefaultPipelineOptions() {
-    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
-    options.setUseDataStreamForBatch(useDataStreamForBatch);
-    return options;
-  }
-
   @Test
   public void shouldRecognizeAndTranslateStreamingPipeline() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster("[auto]");
 
@@ -152,8 +136,6 @@ public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToAuto() throws IOEx
 
   @Test
   public void shouldNotPrepareFilesToStagewhenFlinkMasterIsSetToCollection() throws IOException {
-    // StreamingExecutionEnv does not support "collection" mode.
-    assumeFalse(useDataStreamForBatch);
     FlinkPipelineOptions options = testPreparingResourcesToStage("[collection]");
 
     assertThat(options.getFilesToStage().size(), is(2));
@@ -170,7 +152,7 @@ public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToLocal() throws IOE
 
   @Test
   public void shouldUseDefaultTempLocationIfNoneSet() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster("clusterAddress");
 
@@ -186,33 +168,42 @@ public void shouldUseDefaultTempLocationIfNoneSet() {
 
   @Test
   public void shouldUsePreparedFilesOnRemoteEnvironment() throws Exception {
-    shouldUsePreparedFilesOnRemoteStreamEnvironment(true);
-    shouldUsePreparedFilesOnRemoteStreamEnvironment(false);
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
+    options.setRunner(TestFlinkRunner.class);
+    options.setFlinkMaster("clusterAddress");
+
+    FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
+
+    Pipeline pipeline = Pipeline.create(options);
+    flinkEnv.translate(pipeline);
+
+    ExecutionEnvironment executionEnvironment = flinkEnv.getBatchExecutionEnvironment();
+    assertThat(executionEnvironment, instanceOf(RemoteEnvironment.class));
+
+    List jarFiles = getJars(executionEnvironment);
+
+    List urlConvertedStagedFiles = convertFilesToURLs(options.getFilesToStage());
+
+    assertThat(jarFiles, is(urlConvertedStagedFiles));
   }
 
-  public void shouldUsePreparedFilesOnRemoteStreamEnvironment(boolean streamingMode)
-      throws Exception {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+  @Test
+  public void shouldUsePreparedFilesOnRemoteStreamEnvironment() throws Exception {
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster("clusterAddress");
-    options.setStreaming(streamingMode);
+    options.setStreaming(true);
 
     FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
 
     Pipeline pipeline = Pipeline.create(options);
     flinkEnv.translate(pipeline);
 
-    List jarFiles;
-    if (streamingMode || options.getUseDataStreamForBatch()) {
-      StreamExecutionEnvironment streamExecutionEnvironment =
-          flinkEnv.getStreamExecutionEnvironment();
-      assertThat(streamExecutionEnvironment, instanceOf(RemoteStreamEnvironment.class));
-      jarFiles = getJars(streamExecutionEnvironment);
-    } else {
-      ExecutionEnvironment executionEnvironment = flinkEnv.getBatchExecutionEnvironment();
-      assertThat(executionEnvironment, instanceOf(RemoteEnvironment.class));
-      jarFiles = getJars(executionEnvironment);
-    }
+    StreamExecutionEnvironment streamExecutionEnvironment =
+        flinkEnv.getStreamExecutionEnvironment();
+    assertThat(streamExecutionEnvironment, instanceOf(RemoteStreamEnvironment.class));
+
+    List jarFiles = getJars(streamExecutionEnvironment);
 
     List urlConvertedStagedFiles = convertFilesToURLs(options.getFilesToStage());
 
@@ -223,7 +214,7 @@ public void shouldUsePreparedFilesOnRemoteStreamEnvironment(boolean streamingMod
   public void shouldUseTransformOverrides() {
     boolean[] testParameters = {true, false};
     for (boolean streaming : testParameters) {
-      FlinkPipelineOptions options = getDefaultPipelineOptions();
+      FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
       options.setStreaming(streaming);
       options.setRunner(FlinkRunner.class);
       FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
@@ -243,7 +234,7 @@ public void shouldUseTransformOverrides() {
 
   @Test
   public void shouldProvideParallelismToTransformOverrides() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setStreaming(true);
     options.setRunner(FlinkRunner.class);
     FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
@@ -287,7 +278,7 @@ public boolean matches(Object actual) {
 
   @Test
   public void shouldUseStreamingTransformOverridesWithUnboundedSources() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     // no explicit streaming mode set
     options.setRunner(FlinkRunner.class);
     FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
@@ -312,7 +303,7 @@ public void shouldUseStreamingTransformOverridesWithUnboundedSources() {
 
   @Test
   public void testTranslationModeOverrideWithUnboundedSources() {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(FlinkRunner.class);
     options.setStreaming(false);
 
@@ -328,7 +319,7 @@ public void testTranslationModeOverrideWithUnboundedSources() {
   public void testTranslationModeNoOverrideWithoutUnboundedSources() {
     boolean[] testArgs = new boolean[] {true, false};
     for (boolean streaming : testArgs) {
-      FlinkPipelineOptions options = getDefaultPipelineOptions();
+      FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
       options.setRunner(FlinkRunner.class);
       options.setStreaming(streaming);
 
@@ -417,7 +408,7 @@ private FlinkPipelineOptions testPreparingResourcesToStage(
 
   private FlinkPipelineOptions setPipelineOptions(
       String flinkMaster, String tempLocation, List filesToStage) {
-    FlinkPipelineOptions options = getDefaultPipelineOptions();
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
     options.setRunner(TestFlinkRunner.class);
     options.setFlinkMaster(flinkMaster);
     options.setTempLocation(tempLocation);
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java
index da8c560690a6..c2d9163aacc9 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java
@@ -94,7 +94,6 @@ public void testDefaults() {
     assertThat(options.getMaxBundleSize(), is(1000L));
     assertThat(options.getMaxBundleTimeMills(), is(1000L));
     assertThat(options.getExecutionModeForBatch(), is(ExecutionMode.PIPELINED.name()));
-    assertThat(options.getUseDataStreamForBatch(), is(false));
    assertThat(options.getSavepointPath(), is(nullValue()));
    assertThat(options.getAllowNonRestoredState(), is(false));
    assertThat(options.getDisableMetrics(), is(false));
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslatorTest.java
index 84f1dc3c6457..5d56e6ddbf67 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslatorTest.java
@@ -156,7 +156,7 @@ public void testStatefulParDoAfterCombineChaining() {
   private JobGraph getStatefulParDoAfterCombineChainingJobGraph(boolean stablePartitioning) {
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     final FlinkStreamingPipelineTranslator translator =
-        new FlinkStreamingPipelineTranslator(env, PipelineOptionsFactory.create(), true);
+        new FlinkStreamingPipelineTranslator(env, PipelineOptionsFactory.create());
     final PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
     pipelineOptions.setRunner(FlinkRunner.class);
     final Pipeline pipeline = Pipeline.create(pipelineOptions);
@@ -188,7 +188,7 @@ public void testStatefulParDoAfterGroupByKeyChaining() {
   private JobGraph getStatefulParDoAfterGroupByKeyChainingJobGraph(boolean stablePartitioning) {
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     final FlinkStreamingPipelineTranslator translator =
-        new FlinkStreamingPipelineTranslator(env, PipelineOptionsFactory.create(), true);
+        new FlinkStreamingPipelineTranslator(env, PipelineOptionsFactory.create());
     final PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
     pipelineOptions.setRunner(FlinkRunner.class);
     final Pipeline pipeline = Pipeline.create(pipelineOptions);
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java
index 733bf536634c..4310d5e0aa4f 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java
@@ -29,6 +29,8 @@
 import java.util.Map;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSource;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded.FlinkBoundedSource;
+import org.apache.beam.runners.flink.FlinkStreamingTransformTranslators.UnboundedSourceWrapperNoValueWithRecordId;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -49,8 +51,8 @@
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.transformations.SourceTransformation;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.junit.Test;
@@ -76,10 +78,11 @@ public void readSourceTranslatorBoundedWithMaxParallelism() {
     Object sourceTransform =
         applyReadSourceTransform(transform, PCollection.IsBounded.BOUNDED, env);
 
-    FlinkBoundedSource source =
-        (FlinkBoundedSource) ((SourceTransformation) sourceTransform).getSource();
+    UnboundedSourceWrapperNoValueWithRecordId source =
+        (UnboundedSourceWrapperNoValueWithRecordId)
+            ((LegacySourceTransformation) sourceTransform).getOperator().getUserFunction();
 
-    assertEquals(maxParallelism, source.getNumSplits());
+    assertEquals(maxParallelism, source.getUnderlyingSource().getSplitSources().size());
   }
 
   @Test
@@ -95,10 +98,11 @@ public void readSourceTranslatorBoundedWithoutMaxParallelism() {
     Object sourceTransform =
         applyReadSourceTransform(transform, PCollection.IsBounded.BOUNDED, env);
 
-    FlinkBoundedSource source =
-        (FlinkBoundedSource) ((SourceTransformation) sourceTransform).getSource();
+    UnboundedSourceWrapperNoValueWithRecordId source =
+        (UnboundedSourceWrapperNoValueWithRecordId)
+            ((LegacySourceTransformation) sourceTransform).getOperator().getUserFunction();
 
-    assertEquals(parallelism, source.getNumSplits());
+    assertEquals(parallelism, source.getUnderlyingSource().getSplitSources().size());
   }
 
   @Test
@@ -117,12 +121,13 @@ public void readSourceTranslatorUnboundedWithMaxParallelism() {
         (OneInputTransformation)
             applyReadSourceTransform(transform, PCollection.IsBounded.UNBOUNDED, env);
 
-    FlinkSource source =
-        (FlinkSource)
-            ((SourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs()))
-                .getSource();
+    UnboundedSourceWrapper source =
+        (UnboundedSourceWrapper)
+            ((LegacySourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs()))
+                .getOperator()
+                .getUserFunction();
 
-    assertEquals(maxParallelism, source.getNumSplits());
+    assertEquals(maxParallelism, source.getSplitSources().size());
   }
 
   @Test
@@ -139,12 +144,13 @@ public void readSourceTranslatorUnboundedWithoutMaxParallelism() {
         (OneInputTransformation)
             applyReadSourceTransform(transform, PCollection.IsBounded.UNBOUNDED, env);
 
-    FlinkSource source =
-        (FlinkSource)
-            ((SourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs()))
-                .getSource();
+    UnboundedSourceWrapper source =
+        (UnboundedSourceWrapper)
+            ((LegacySourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs()))
+                .getOperator()
+                .getUserFunction();
 
-    assertEquals(parallelism, source.getNumSplits());
+    assertEquals(parallelism, source.getSplitSources().size());
   }
 
   private Object applyReadSourceTransform(
@@ -153,7 +159,7 @@ private Object applyReadSourceTransform(
     FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<?, ?>> translator =
        getReadSourceTranslator();
     FlinkStreamingTranslationContext ctx =
-        new FlinkStreamingTranslationContext(env, PipelineOptionsFactory.create(), true);
+        new FlinkStreamingTranslationContext(env, PipelineOptionsFactory.create());
 
     Pipeline pipeline = Pipeline.create();
     PCollection pc =
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java
index 22a9ce4f39ab..21d9d05ad325 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java
@@ -72,8 +72,6 @@ public class FlinkSubmissionTest {
   /** Counter which keeps track of the number of jobs submitted. */
   private static int expectedNumberOfJobs;
 
-  public static boolean useDataStreamForBatch;
-
   @BeforeClass
   public static void beforeClass() throws Exception {
     Configuration config = new Configuration();
@@ -106,12 +104,6 @@ public void testSubmissionBatch() throws Exception {
     runSubmission(false, false);
   }
 
-  @Test
-  public void testSubmissionBatchUseDataStream() throws Exception {
-    FlinkSubmissionTest.useDataStreamForBatch = true;
-    runSubmission(false, false);
-  }
-
   @Test
   public void testSubmissionStreaming() throws Exception {
     runSubmission(false, true);
@@ -122,12 +114,6 @@ public void testDetachedSubmissionBatch() throws Exception {
     runSubmission(true, false);
   }
 
-  @Test
-  public void testDetachedSubmissionBatchUseDataStream() throws Exception {
-    FlinkSubmissionTest.useDataStreamForBatch = true;
-    runSubmission(true, false);
-  }
-
   @Test
   public void testDetachedSubmissionStreaming() throws Exception {
     runSubmission(true, true);
@@ -178,7 +164,6 @@ private void waitUntilJobIsCompleted() throws Exception {
   /** The Flink program which is executed by the CliFrontend. */
   public static void main(String[] args) {
     FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
-    options.setUseDataStreamForBatch(useDataStreamForBatch);
     options.setRunner(FlinkRunner.class);
     options.setStreaming(streaming);
     options.setParallelism(1);
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
index b8dc52f6cd4b..8da44d4b3a83 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
@@ -56,19 +56,13 @@ public void postSubmit() throws Exception {
   }
 
   @Test
-  public void testStreaming() {
-    runProgram(resultPath, true);
+  public void testProgram() throws Exception {
+    runProgram(resultPath);
   }
 
-  @Test
-  public void testBatch() {
-    runProgram(resultPath, false);
-  }
-
-  private static void runProgram(String resultPath, boolean streaming) {
+  private static void runProgram(String resultPath) {
 
-    Pipeline p =
-        streaming ? FlinkTestPipeline.createForStreaming() : FlinkTestPipeline.createForBatch();
+    Pipeline p = FlinkTestPipeline.createForStreaming();
 
     p.apply(GenerateSequence.from(0).to(10))
         .apply(