From e175373ea63d3d6b16d101831f33a2b181b7fbaa Mon Sep 17 00:00:00 2001
From: Kanishk Karanawat
Date: Sat, 21 Oct 2023 15:20:32 -0700
Subject: [PATCH] flush buffer during drain operation for requiresStableInput
 operator (#28554)

---
 .../runners/flink/FlinkPipelineOptions.java   | 15 +++
 .../wrappers/streaming/DoFnOperator.java      | 14 +++
 .../wrappers/streaming/DoFnOperatorTest.java  | 92 +++++++++++++++++++
 .../flink_java_pipeline_options.html          |  5 +
 .../flink_python_pipeline_options.html        |  5 +
 5 files changed, 131 insertions(+)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 29fe0543adf6..63b043916845 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -299,6 +299,21 @@ public interface FlinkPipelineOptions
 
   void setFlinkConfDir(String confDir);
 
+  @Description(
+      "Set the maximum size of input split when data is read from a filesystem. 0 implies no max size.")
+  @Default.Long(0)
+  Long getFileInputSplitMaxSizeMB();
+
+  void setFileInputSplitMaxSizeMB(Long fileInputSplitMaxSizeMB);
+
+  @Description(
+      "Allow drain operation for Flink pipelines that contain the RequiresStableInput operator. Note that at the time of draining, "
+          + "the RequiresStableInput contract might be violated if there are any processing related failures in the DoFn operator.")
+  @Default.Boolean(false)
+  Boolean getEnableStableInputDrain();
+
+  void setEnableStableInputDrain(Boolean enableStableInputDrain);
+
   static FlinkPipelineOptions defaults() {
     return PipelineOptionsFactory.as(FlinkPipelineOptions.class);
   }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 13c4e4e0a99f..3f7353ec97f9 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -199,6 +199,12 @@ public class DoFnOperator
   /** If true, we must process elements only after a checkpoint is finished. */
   final boolean requiresStableInput;
 
+  /**
+   * If both requiresStableInput and this parameter are true, we must flush the buffer during the
+   * drain operation.
+   */
+  final boolean enableStableInputDrain;
+
   final int numConcurrentCheckpoints;
 
   private final boolean usesOnWindowExpiration;
@@ -323,6 +329,8 @@ public DoFnOperator(
               + Math.max(0, flinkOptions.getMinPauseBetweenCheckpoints()));
     }
 
+    this.enableStableInputDrain = flinkOptions.getEnableStableInputDrain();
+
     this.numConcurrentCheckpoints = flinkOptions.getNumConcurrentCheckpoints();
 
     this.finishBundleBeforeCheckpointing = flinkOptions.getFinishBundleBeforeCheckpointing();
@@ -626,6 +634,12 @@ void flushData() throws Exception {
     while (bundleStarted) {
       invokeFinishBundle();
     }
+    if (requiresStableInput && enableStableInputDrain) {
+      // Flush any buffered events here before draining the pipeline. Note that this is best-effort
+      // and the requiresStableInput contract might be violated in cases where buffer processing fails.
+      bufferingDoFnRunner.checkpointCompleted(Long.MAX_VALUE);
+      updateOutputWatermark();
+    }
     if (currentOutputWatermark < Long.MAX_VALUE) {
       throw new RuntimeException(
           String.format(
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
index c3412206c1bc..ee5bc90093e8 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
@@ -2015,6 +2015,98 @@ public void finishBundle(FinishBundleContext context) {
             WindowedValue.valueInGlobalWindow("finishBundle")));
   }
 
+  @Test
+  public void testExactlyOnceBufferingFlushDuringDrain() throws Exception {
+    FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
+    options.setMaxBundleSize(2L);
+    options.setCheckpointingInterval(1L);
+    options.setEnableStableInputDrain(true);
+
+    TupleTag<String> outputTag = new TupleTag<>("main-output");
+    WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
+        WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
+
+    numStartBundleCalled = 0;
+    DoFn<String, String> doFn =
+        new DoFn<String, String>() {
+          @StartBundle
+          public void startBundle(StartBundleContext context) {
+            numStartBundleCalled += 1;
+          }
+
+          @ProcessElement
+          // Use RequiresStableInput to force buffering elements
+          @RequiresStableInput
+          public void processElement(ProcessContext context) {
+            context.output(context.element());
+          }
+
+          @FinishBundle
+          public void finishBundle(FinishBundleContext context) {
+            context.output(
+                "finishBundle", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
+          }
+        };
+
+    DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
+        new DoFnOperator.MultiOutputOutputManagerFactory<>(
+            outputTag,
+            WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE),
+            new SerializablePipelineOptions(options));
+
+    Supplier<DoFnOperator<String, String>> doFnOperatorSupplier =
+        () ->
+            new DoFnOperator<>(
+                doFn,
+                "stepName",
+                windowedValueCoder,
+                Collections.emptyMap(),
+                outputTag,
+                Collections.emptyList(),
+                outputManagerFactory,
+                WindowingStrategy.globalDefault(),
+                new HashMap<>(), /* side-input mapping */
+                Collections.emptyList(), /* side inputs */
+                options,
+                null,
+                null,
+                DoFnSchemaInformation.create(),
+                Collections.emptyMap());
+
+    DoFnOperator<String, String> doFnOperator = doFnOperatorSupplier.get();
+    OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> testHarness =
+        new OneInputStreamOperatorTestHarness<>(doFnOperator);
+
+    testHarness.open();
+
+    testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("a")));
+    testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("b")));
+
+    assertThat(Iterables.size(testHarness.getOutput()), is(0));
+    assertThat(numStartBundleCalled, is(0));
+
+    // Simulate pipeline drain scenario
+    OperatorSubtaskState backup = testHarness.snapshot(0, 0);
+    doFnOperator.flushData();
+
+    assertThat(numStartBundleCalled, is(1));
+    assertThat(
+        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        contains(
+            WindowedValue.valueInGlobalWindow("a"),
+            WindowedValue.valueInGlobalWindow("b"),
+            WindowedValue.valueInGlobalWindow("finishBundle")));
+
+    doFnOperator = doFnOperatorSupplier.get();
+    testHarness = new OneInputStreamOperatorTestHarness<>(doFnOperator);
+    testHarness.open();
+
+    doFnOperator.notifyCheckpointComplete(0L);
+
+    assertThat(numStartBundleCalled, is(1));
+    assertThat(stripStreamRecordFromWindowedValue(testHarness.getOutput()), emptyIterable());
+  }
+
   @Test
   public void testExactlyOnceBufferingKeyed() throws Exception {
     FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
diff --git a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
index 8314d5b6879d..40b7986df82d 100644
--- a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
+++ b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
@@ -52,6 +52,11 @@
   <td>Disable Beam metrics in Flink Runner</td>
   <td>Default: false</td>
 </tr>
+<tr>
+  <td>enableStableInputDrain</td>
+  <td>Allow drain operation for Flink pipelines that contain the RequiresStableInput operator. Note that at the time of draining, the RequiresStableInput contract might be violated if there are any processing related failures in the DoFn operator.</td>
+  <td>Default: false</td>
+</tr>
 <tr>
   <td>executionModeForBatch</td>
   <td>Flink mode for data exchange of batch pipelines. Reference {@link org.apache.flink.api.common.ExecutionMode}. Set this to BATCH_FORCED if pipelines get blocked, see https://issues.apache.org/jira/browse/FLINK-10672</td>
diff --git a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html
index 010d3c942824..84bac0c708f8 100644
--- a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html
+++ b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html
@@ -52,6 +52,11 @@
   <td>Disable Beam metrics in Flink Runner</td>
   <td>Default: false</td>
 </tr>
+<tr>
+  <td>enable_stable_input_drain</td>
+  <td>Allow drain operation for Flink pipelines that contain the RequiresStableInput operator. Note that at the time of draining, the RequiresStableInput contract might be violated if there are any processing related failures in the DoFn operator.</td>
+  <td>Default: false</td>
+</tr>
 <tr>
   <td>execution_mode_for_batch</td>
   <td>Flink mode for data exchange of batch pipelines. Reference {@link org.apache.flink.api.common.ExecutionMode}. Set this to BATCH_FORCED if pipelines get blocked, see https://issues.apache.org/jira/browse/FLINK-10672</td>
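
Usage sketch (not part of the diff above): based on the option added in FlinkPipelineOptions, the new behavior can be enabled with --enableStableInputDrain=true (Java) or --enable_stable_input_drain=true (Python), or set programmatically as below. The class name DrainEnabledPipeline is illustrative only; the accessors and factory calls are the standard Beam ones plus the setter introduced by this patch.

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class DrainEnabledPipeline {
      public static void main(String[] args) {
        // Parse standard Beam pipeline args, then opt into flushing
        // stable-input buffers when the pipeline is drained.
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
        options.setEnableStableInputDrain(true);

        Pipeline pipeline = Pipeline.create(options);
        // ... build the pipeline, e.g. with a @RequiresStableInput DoFn ...
        pipeline.run();
      }
    }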