Skip to content

Commit

Permalink
flush buffer during drain operation for requiresStableInput operator (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
Kanishk Karanawat committed Oct 16, 2024
1 parent 5a3185c commit e175373
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,21 @@ public interface FlinkPipelineOptions

void setFlinkConfDir(String confDir);

/**
 * Upper bound on the size of an input split when reading from a filesystem.
 *
 * <p>Defaults to {@code 0}, which means no maximum is applied. The unit is presumably megabytes,
 * going by the {@code MB} suffix of the option name; the description string itself does not
 * state a unit.
 */
@Description(
"Set the maximum size of input split when data is read from a filesystem. 0 implies no max size.")
@Default.Long(0)
Long getFileInputSplitMaxSizeMB();

void setFileInputSplitMaxSizeMB(Long fileInputSplitMaxSizeMB);

/**
 * Opt-in switch that allows draining pipelines containing a {@code RequiresStableInput} operator.
 *
 * <p>Defaults to {@code false}. Enabling it is best-effort: the {@code RequiresStableInput}
 * contract may be violated if processing fails in the DoFn operator while draining.
 */
@Description(
"Allow drain operation for flink pipelines that contain RequiresStableInput operator. Note that at time of draining, "
+ "the RequiresStableInput contract might be violated if there are any processing related failures in the DoFn operator.")
@Default.Boolean(false)
Boolean getEnableStableInputDrain();

void setEnableStableInputDrain(Boolean enableStableInputDrain);

/** Returns a {@link FlinkPipelineOptions} view obtained from {@link PipelineOptionsFactory}. */
static FlinkPipelineOptions defaults() {
  final Class<FlinkPipelineOptions> optionsType = FlinkPipelineOptions.class;
  return PipelineOptionsFactory.as(optionsType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ public class DoFnOperator<InputT, OutputT>
/** If true, we must process elements only after a checkpoint is finished. */
final boolean requiresStableInput;

/**
* If both requiresStableInput and this parameter are true, we must flush the buffer during drain
* operation.
*/
final boolean enableStableInputDrain;

final int numConcurrentCheckpoints;

private final boolean usesOnWindowExpiration;
Expand Down Expand Up @@ -323,6 +329,8 @@ public DoFnOperator(
+ Math.max(0, flinkOptions.getMinPauseBetweenCheckpoints()));
}

this.enableStableInputDrain = flinkOptions.getEnableStableInputDrain();

this.numConcurrentCheckpoints = flinkOptions.getNumConcurrentCheckpoints();

this.finishBundleBeforeCheckpointing = flinkOptions.getFinishBundleBeforeCheckpointing();
Expand Down Expand Up @@ -626,6 +634,12 @@ void flushData() throws Exception {
while (bundleStarted) {
invokeFinishBundle();
}
if (requiresStableInput && enableStableInputDrain) {
// Flush any buffered events here before draining the pipeline. Note that this is best-effort
// and requiresStableInput contract might be violated in cases where buffer processing fails.
bufferingDoFnRunner.checkpointCompleted(Long.MAX_VALUE);
updateOutputWatermark();
}
if (currentOutputWatermark < Long.MAX_VALUE) {
throw new RuntimeException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2015,6 +2015,98 @@ public void finishBundle(FinishBundleContext context) {
WindowedValue.valueInGlobalWindow("finishBundle")));
}

/**
 * Verifies that, with {@code enableStableInputDrain} set, elements buffered for a
 * {@code @RequiresStableInput} DoFn are emitted by {@code flushData()} even though no
 * checkpoint-complete notification was delivered, and that a fresh operator afterwards has
 * nothing left to replay.
 */
@Test
public void testExactlyOnceBufferingFlushDuringDrain() throws Exception {
  FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
  options.setMaxBundleSize(2L);
  options.setCheckpointingInterval(1L);
  // Opt in to flushing the stable-input buffer when the operator is drained.
  options.setEnableStableInputDrain(true);

  TupleTag<String> outputTag = new TupleTag<>("main-output");
  WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
      WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());

  numStartBundleCalled = 0;
  DoFn<String, String> doFn =
      new DoFn<String, String>() {
        @StartBundle
        public void startBundle(StartBundleContext context) {
          numStartBundleCalled += 1;
        }

        @ProcessElement
        // Use RequiresStableInput to force buffering elements
        @RequiresStableInput
        public void processElement(ProcessContext context) {
          context.output(context.element());
        }

        @FinishBundle
        public void finishBundle(FinishBundleContext context) {
          context.output(
              "finishBundle", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
        }
      };

  DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
      new DoFnOperator.MultiOutputOutputManagerFactory<>(
          outputTag,
          WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE),
          new SerializablePipelineOptions(options));

  // A supplier is used so that a second, independent operator can be built further down.
  Supplier<DoFnOperator<String, String>> doFnOperatorSupplier =
      () ->
          new DoFnOperator<>(
              doFn,
              "stepName",
              windowedValueCoder,
              Collections.emptyMap(),
              outputTag,
              Collections.emptyList(),
              outputManagerFactory,
              WindowingStrategy.globalDefault(),
              new HashMap<>(), /* side-input mapping */
              Collections.emptyList(), /* side inputs */
              options,
              null,
              null,
              DoFnSchemaInformation.create(),
              Collections.emptyMap());

  DoFnOperator<String, String> doFnOperator = doFnOperatorSupplier.get();
  OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> testHarness =
      new OneInputStreamOperatorTestHarness<>(doFnOperator);

  testHarness.open();

  testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("a")));
  testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("b")));

  // Both elements are held back: nothing has been emitted and no bundle has been started.
  assertThat(Iterables.size(testHarness.getOutput()), is(0));
  assertThat(numStartBundleCalled, is(0));

  // Simulate pipeline drain scenario: take a snapshot, then flush without ever signalling
  // checkpoint completion. The returned snapshot state is deliberately not kept, because the
  // second operator below is not restored from it.
  testHarness.snapshot(0, 0);
  doFnOperator.flushData();

  // The drain pushed the buffered elements through a single bundle.
  assertThat(numStartBundleCalled, is(1));
  assertThat(
      stripStreamRecordFromWindowedValue(testHarness.getOutput()),
      contains(
          WindowedValue.valueInGlobalWindow("a"),
          WindowedValue.valueInGlobalWindow("b"),
          WindowedValue.valueInGlobalWindow("finishBundle")));

  // Start a brand-new operator (no state restored) and deliver a checkpoint-complete
  // notification: no further bundle may start and nothing may be emitted.
  doFnOperator = doFnOperatorSupplier.get();
  testHarness = new OneInputStreamOperatorTestHarness<>(doFnOperator);
  testHarness.open();

  doFnOperator.notifyCheckpointComplete(0L);

  assertThat(numStartBundleCalled, is(1));
  assertThat(stripStreamRecordFromWindowedValue(testHarness.getOutput()), emptyIterable());
}

@Test
public void testExactlyOnceBufferingKeyed() throws Exception {
FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
<td>Disable Beam metrics in Flink Runner</td>
<td>Default: <code>false</code></td>
</tr>
<tr>
<td><code>enableStableInputDrain</code></td>
<td>Allow drain operation for flink pipelines that contain RequiresStableInput operator. Note that at time of draining, the RequiresStableInput contract might be violated if there are any processing related failures in the DoFn operator.</td>
<td>Default: <code>false</code></td>
</tr>
<tr>
<td><code>executionModeForBatch</code></td>
<td>Flink mode for data exchange of batch pipelines. Reference {@link org.apache.flink.api.common.ExecutionMode}. Set this to BATCH_FORCED if pipelines get blocked, see https://issues.apache.org/jira/browse/FLINK-10672</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
<td>Disable Beam metrics in Flink Runner</td>
<td>Default: <code>false</code></td>
</tr>
<tr>
<td><code>enable_stable_input_drain</code></td>
<td>Allow drain operation for flink pipelines that contain RequiresStableInput operator. Note that at time of draining, the RequiresStableInput contract might be violated if there are any processing related failures in the DoFn operator.</td>
<td>Default: <code>false</code></td>
</tr>
<tr>
<td><code>execution_mode_for_batch</code></td>
<td>Flink mode for data exchange of batch pipelines. Reference {@link org.apache.flink.api.common.ExecutionMode}. Set this to BATCH_FORCED if pipelines get blocked, see https://issues.apache.org/jira/browse/FLINK-10672</td>
Expand Down

0 comments on commit e175373

Please sign in to comment.