From 6b3ce095b0a12a8385eacdb1748fb523247850c1 Mon Sep 17 00:00:00 2001 From: Julien Tournay Date: Fri, 1 Sep 2023 22:48:25 +0200 Subject: [PATCH] Reduce the maximum size of input splits in Flink to better distribute work (#28045) --- .../runners/flink/FlinkPipelineOptions.java | 7 ++++++ .../wrappers/SourceInputFormat.java | 22 ++++++++++++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index 2f32ab2c2ea6..1e01514fe8b6 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -313,6 +313,13 @@ public interface FlinkPipelineOptions void setFlinkConfDir(String confDir); + @Description( + "Set the maximum size of input split when data is read from a filesystem. 0 implies no max size.") + @Default.Long(0) + Long getFileInputSplitMaxSizeMB(); + + void setFileInputSplitMaxSizeMB(Long fileInputSplitMaxSizeMB); + static FlinkPipelineOptions defaults() { return PipelineOptionsFactory.as(FlinkPipelineOptions.class); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java index 7e1835eac721..a1b8bced7a1d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java @@ -20,9 +20,11 @@ import java.io.IOException; import java.util.List; import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.FileBasedSource; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -108,12 +110,30 @@ public float getAverageRecordWidth() { return null; } + private long getDesiredSizeBytes(int numSplits) throws Exception { + long totalSize = initialSource.getEstimatedSizeBytes(options); + long defaultSplitSize = totalSize / numSplits; + long maxSplitSize = 0; + if (options != null) { + maxSplitSize = options.as(FlinkPipelineOptions.class).getFileInputSplitMaxSizeMB(); + } + if (initialSource instanceof FileBasedSource && maxSplitSize > 0) { + // Most of the time parallelism is < number of files in source. + // Each file becomes a unique split which commonly create skew. + // This limits the size of splits to reduce skew. + return Math.min(defaultSplitSize, maxSplitSize * 1024 * 1024); + } else { + return defaultSplitSize; + } + } + @Override @SuppressWarnings("unchecked") public SourceInputSplit[] createInputSplits(int numSplits) throws IOException { try { - long desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits; + long desiredSizeBytes = getDesiredSizeBytes(numSplits); List> shards = initialSource.split(desiredSizeBytes, options); + int numShards = shards.size(); SourceInputSplit[] sourceInputSplits = new SourceInputSplit[numShards]; for (int i = 0; i < numShards; i++) {