[flink-runner] Improve Datastream for batch performances #32440

Status: Open. Wants to merge 32 commits into base: master.

The diff below shows changes from 1 commit; the full list of 32 commits:
df1d56b [Flink] Set return type of bounded sources (jto, Jul 23, 2024)
ed922af [Flink] Use a lazy split enumerator for bounded sources (jto, Jul 23, 2024)
24ce8fa [Flink] Default to maxParallelism = parallelism in batch (jto, Aug 19, 2024)
b12c87b [Flink] Avoid re-serializing trigger on every element (jto, Aug 20, 2024)
c8c3f96 [Flink] Avoid re-evaluating options every time a new state is stored (jto, Aug 20, 2024)
f002d49 [Flink] Only serialize states namespace keys if necessary (jto, Aug 21, 2024)
dd734b3 [Flink] Make ToKeyedWorkItem part of the DoFnOperator (jto, Aug 6, 2024)
89687e9 [Flink] Remove ToBinaryKV (jto, Aug 19, 2024)
b2727c8 [Flink] Refactor CombinePerKeyTranslator (jto, Aug 8, 2024)
feaa32e [Flink] Combine before Reduce (no side-input only) (jto, Aug 9, 2024)
0c5237f [Flink] Combine before GBK (jto, Aug 23, 2024) (see the combiner-lifting sketch after this list)
6749b9e [Flink] Combine before reduce (with side input) (jto, Aug 28, 2024)
85e58ac [Flink] Force slot sharing group in batch mode (jto, Aug 27, 2024)
ec10076 [Flink] Disable bundling in batch mode (jto, Aug 26, 2024)
275b4d8 [Flink] Lower default max bundle size in batch mode (jto, Aug 23, 2024)
d3c4483 [Flink] Code cleanup (jto, Aug 28, 2024)
352ea78 [Flink] fix WindowDoFnOperatorTest (jto, Aug 29, 2024)
4532df1 [Flink] spotless (jto, Aug 30, 2024)
73378a2 [Flink] fix broken tests (jto, Sep 10, 2024)
615ec19 [Flink] Remove 1.14 compat code (jto, Sep 12, 2024)
fae1902 [Flink] Fix flaky test (jto, Sep 12, 2024)
d81369e [Flink] Use a custom key type to better distribute load (jto, Oct 16, 2024)
8fb69f1 [Flink] Add post commit triggers (jto, Oct 16, 2024)
6c98da6 [Flink] licence (jto, Oct 16, 2024)
8169038 [Flink] spotless (jto, Oct 16, 2024)
687113a Additional Flink github action trigger files (kennknowles, Oct 18, 2024)
80a7e22 [Flink] fix GBK streaming with side input (jto, Nov 6, 2024)
ac2eddc [Flink] fix error management in lazy source (jto, Nov 12, 2024)
2c01ac0 [Flink] disable operator chaining in validatesRunner (jto, Nov 13, 2024)
e359eda [Flink] fix lazy source enumerator behaviour on error (jto, Nov 13, 2024)
777dcf7 [Flink] set validates runner parallelism to 1 (jto, Nov 14, 2024)
6161f4e [Flink] add org.apache.beam.sdk.transforms.ParDoTest to sickbay (jto, Nov 18, 2024)
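Several of the commits above ("Combine before Reduce", "Combine before GBK", "Refactor CombinePerKeyTranslator") rely on combiner lifting: folding values per key before the shuffle so that less data has to be grouped downstream. As a rough illustration of the idea at the Beam API level (a hypothetical stand-alone pipeline, not code from this PR), compare a plain GroupByKey against Combine.perKey, which a runner can pre-aggregate upstream of the shuffle:

// Hypothetical pipeline illustrating combiner lifting; not code from this PR.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class CombinerLiftingSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    PCollection<KV<String, Long>> counts =
        p.apply(
            Create.of(KV.of("a", 1L), KV.of("a", 2L), KV.of("b", 3L))
                .withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())));

    // Plain GBK: every (key, value) pair crosses the shuffle before anything is summed.
    counts.apply("GroupThenSumLater", GroupByKey.create());

    // Combine.perKey: the CombineFn is visible to the runner, so it can sum
    // partial results on each upstream worker and shuffle one accumulator
    // per key and bundle instead of every element.
    counts.apply("LiftedSum", Combine.perKey(Sum.ofLongs()));

    p.run();
  }
}

The commit messages suggest the PR's CombinePerKeyTranslator changes bring this pre-aggregation to the DataStream batch path; the snippet only shows why lifting shrinks the shuffled data.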
[Flink] Code cleanup
* spotless
* checkstyle
* spotless
jto committed Nov 6, 2024
commit d3c44839eb873e13cfd62876bd0b28f18bbcf4c5
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.core;
 
 import java.util.Collection;
-
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;

(Diffs for two files are large and not rendered by default.)
@@ -477,7 +477,10 @@ public void initializeState(StateInitializationContext context) throws Exception
     if (keyCoder != null) {
       keyedStateInternals =
           new FlinkStateInternals<>(
-              (KeyedStateBackend) getKeyedStateBackend(), keyCoder, windowingStrategy.getWindowFn().windowCoder(), serializedOptions);
+              (KeyedStateBackend) getKeyedStateBackend(),
+              keyCoder,
+              windowingStrategy.getWindowFn().windowCoder(),
+              serializedOptions);
 
     if (timerService == null) {
       timerService =
@@ -607,7 +610,10 @@ private void earlyBindStateIfNeeded() throws IllegalArgumentException, IllegalAc
     if (doFn != null) {
      DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
      FlinkStateInternals.EarlyBinder earlyBinder =
-          new FlinkStateInternals.EarlyBinder(getKeyedStateBackend(), serializedOptions, windowingStrategy.getWindowFn().windowCoder());
+          new FlinkStateInternals.EarlyBinder(
+              getKeyedStateBackend(),
+              serializedOptions,
+              windowingStrategy.getWindowFn().windowCoder());
      for (DoFnSignature.StateDeclaration value : signature.stateDeclarations().values()) {
        StateSpec<?> spec =
            (StateSpec<?>) signature.stateDeclarations().get(value.id()).field().get(doFn);
@@ -985,7 +991,7 @@ private void checkInvokeStartBundle() {
   @SuppressWarnings("NonAtomicVolatileUpdate")
   @SuppressFBWarnings("VO_VOLATILE_INCREMENT")
   private void checkInvokeFinishBundleByCount() {
-    if(!shoudBundleElements()) {
+    if (!shoudBundleElements()) {
       return;
     }
     // We do not access this statement concurrently, but we want to make sure that each thread
@@ -1001,7 +1007,7 @@ private void checkInvokeFinishBundleByCount() {
 
   /** Check whether invoke finishBundle by timeout. */
   private void checkInvokeFinishBundleByTime() {
-    if(!shoudBundleElements()) {
+    if (!shoudBundleElements()) {
       return;
     }
     long now = getProcessingTimeService().getCurrentProcessingTime();
@@ -1231,6 +1237,7 @@ public static class BufferedOutputManager<OutputT> implements DoFnRunners.Output
    * buffering. It will not be acquired during flushing the buffer.
    */
   private final Lock bufferLock;
+
   private final boolean isStreaming;
 
   private Map<Integer, TupleTag<?>> idsToTags;
@@ -1438,7 +1445,13 @@ public BufferedOutputManager<OutputT> create(
           NonKeyedPushedBackElementsHandler.create(listStateBuffer);
 
       return new BufferedOutputManager<>(
-          output, mainTag, tagsToOutputTags, tagsToIds, bufferLock, pushedBackElementsHandler, isStreaming);
+          output,
+          mainTag,
+          tagsToOutputTags,
+          tagsToIds,
+          bufferLock,
+          pushedBackElementsHandler,
+          isStreaming);
     }
 
     private TaggedKvCoder buildTaggedKvCoder() {
@@ -111,7 +111,6 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -17,9 +17,12 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
-import java.util.*;
-import java.util.stream.Collectors;
-
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import org.apache.beam.runners.flink.translation.functions.AbstractFlinkCombineRunner;
 import org.apache.beam.runners.flink.translation.functions.HashingFlinkCombineRunner;
 import org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner;
@@ -37,7 +40,6 @@
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ArrayListMultimap;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Multimap;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -146,13 +148,12 @@ public void initializeState(StateInitializationContext context) throws Exception
 
     ListStateDescriptor<WindowedValue<KV<K, InputT>>> descriptor =
         new ListStateDescriptor<>(
-            "buffered-elements",
-            new CoderTypeSerializer<>(windowedInputCoder, serializedOptions));
+            "buffered-elements", new CoderTypeSerializer<>(windowedInputCoder, serializedOptions));
 
     checkpointedState = context.getOperatorStateStore().getListState(descriptor);
 
-    if(context.isRestored() && this.checkpointedState != null) {
-      for(WindowedValue<KV<K, InputT>> wkv : this.checkpointedState.get()) {
+    if (context.isRestored() && this.checkpointedState != null) {
+      for (WindowedValue<KV<K, InputT>> wkv : this.checkpointedState.get()) {
         this.state.put(wkv.getValue().getKey(), wkv);
       }
     }
@@ -119,18 +119,12 @@ public Boundedness getBoundedness() {
   public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
       createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext) throws Exception {
 
-    if(boundedness == Boundedness.BOUNDED) {
+    if (boundedness == Boundedness.BOUNDED) {
       return new LazyFlinkSourceSplitEnumerator<>(
-          enumContext,
-          beamSource,
-          serializablePipelineOptions.get(),
-          numSplits);
+          enumContext, beamSource, serializablePipelineOptions.get(), numSplits);
     } else {
       return new FlinkSourceSplitEnumerator<>(
-          enumContext,
-          beamSource,
-          serializablePipelineOptions.get(),
-          numSplits);
+          enumContext, beamSource, serializablePipelineOptions.get(), numSplits);
     }
   }
 
@@ -141,7 +135,7 @@
       Map<Integer, List<FlinkSourceSplit<T>>> checkpoint)
       throws Exception {
     SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> enumerator =
-      createEnumerator(enumContext);
+        createEnumerator(enumContext);
     checkpoint.forEach(
         (subtaskId, splitsForSubtask) -> enumerator.addSplitsBack(splitsForSubtask, subtaskId));
     return enumerator;
@@ -19,26 +19,18 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-
 import javax.annotation.Nullable;
-
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplit;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSourceSplitEnumerator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.compat.SplitEnumeratorCompat;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.FileBasedSource;
 import org.apache.beam.sdk.io.Source;
-import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.api.connector.source.SplitsAssignment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,23 +84,23 @@ public void start() {
   @Override
   public void handleSplitRequest(int subtask, @Nullable String hostname) {
     if (!context.registeredReaders().containsKey(subtask)) {
-        // reader failed between sending the request and now. skip this request.
-        return;
+      // reader failed between sending the request and now. skip this request.
+      return;
     }
 
     if (LOG.isInfoEnabled()) {
-        final String hostInfo =
-            hostname == null ? "(no host locality info)" : "(on host '" + hostname + "')";
-        LOG.info("Subtask {} {} is requesting a file source split", subtask, hostInfo);
+      final String hostInfo =
+          hostname == null ? "(no host locality info)" : "(on host '" + hostname + "')";
+      LOG.info("Subtask {} {} is requesting a file source split", subtask, hostInfo);
     }
 
     if (!pendingSplits.isEmpty()) {
-        final FlinkSourceSplit<T> split = pendingSplits.remove(pendingSplits.size() - 1);
-        context.assignSplit(split, subtask);
-        LOG.info("Assigned split to subtask {} : {}", subtask, split);
+      final FlinkSourceSplit<T> split = pendingSplits.remove(pendingSplits.size() - 1);
+      context.assignSplit(split, subtask);
+      LOG.info("Assigned split to subtask {} : {}", subtask, split);
     } else {
-        context.signalNoMoreSplits(subtask);
-        LOG.info("No more splits available for subtask {}", subtask);
+      context.signalNoMoreSplits(subtask);
+      LOG.info("No more splits available for subtask {}", subtask);
     }
   }
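This hunk is the heart of the lazy enumerator: as the createEnumerator hunk earlier shows, bounded sources are routed to LazyFlinkSourceSplitEnumerator while unbounded sources keep the eager one, and here splits are handed out one at a time, only when a subtask asks. For readers unfamiliar with the pattern, below is a minimal, self-contained sketch of pull-based split assignment against Flink's SplitEnumerator interface; the class name and the flat pending list are hypothetical simplifications, not the PR's implementation.

// Hypothetical, simplified illustration of lazy (pull-based) split assignment.
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

class LazyEnumeratorSketch<SplitT extends SourceSplit>
    implements SplitEnumerator<SplitT, List<SplitT>> {

  private final SplitEnumeratorContext<SplitT> context;
  private final List<SplitT> pending; // splits not yet handed out

  LazyEnumeratorSketch(SplitEnumeratorContext<SplitT> context, List<SplitT> splits) {
    this.context = context;
    this.pending = new ArrayList<>(splits);
  }

  @Override
  public void start() {} // nothing is pushed eagerly

  // Readers pull: a split is assigned only when a subtask asks for one.
  @Override
  public void handleSplitRequest(int subtask, @Nullable String hostname) {
    if (pending.isEmpty()) {
      context.signalNoMoreSplits(subtask);
    } else {
      context.assignSplit(pending.remove(pending.size() - 1), subtask);
    }
  }

  @Override
  public void addSplitsBack(List<SplitT> splits, int subtask) {
    pending.addAll(splits); // a failed reader returns its splits
  }

  @Override
  public void addReader(int subtask) {} // wait for an explicit request

  @Override
  public List<SplitT> snapshotState(long checkpointId) {
    return new ArrayList<>(pending);
  }

  @Override
  public void close() {}
}

The design payoff is that a straggling subtask never sits on a queue of pre-assigned splits; whichever reader finishes first simply asks again, so work naturally flows to idle workers.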

@@ -101,7 +101,7 @@ protected FlinkBoundedSourceReader(
   public InputStatus pollNext(ReaderOutput<WindowedValue<T>> output) throws Exception {
     checkExceptionAndMaybeThrow();
 
-    if(currentReader == null && currentSplitId == -1) {
+    if (currentReader == null && currentSplitId == -1) {
       context.sendSplitRequest();
     }
 
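This is the reader half of the same pull protocol: when FlinkBoundedSourceReader has neither a current reader nor an assigned split, it calls context.sendSplitRequest(), which Flink delivers to the enumerator as the handleSplitRequest call shown earlier; the reader then waits for either a split assignment or a no-more-splits signal.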