Skip to content

Commit

Permalink
Reduce the maximum size of input splits in Flink to better distribute work (#28045)
Browse files Browse the repository at this point in the history
  • Loading branch information
jto authored Sep 1, 2023
1 parent 3463aa3 commit 6b3ce09
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,13 @@ public interface FlinkPipelineOptions

void setFlinkConfDir(String confDir);

/**
 * Maximum size, in megabytes, of a single input split when reading from a
 * filesystem-backed ({@code FileBasedSource}) source. A value of 0 (the default)
 * means no maximum is applied. Capping split size spreads large files across more
 * splits, reducing skew when parallelism is lower than the number of input files.
 */
@Description(
"Set the maximum size of input split when data is read from a filesystem. 0 implies no max size.")
@Default.Long(0)
Long getFileInputSplitMaxSizeMB();

void setFileInputSplitMaxSizeMB(Long fileInputSplitMaxSizeMB);

static FlinkPipelineOptions defaults() {
return PipelineOptionsFactory.as(FlinkPipelineOptions.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import java.io.IOException;
import java.util.List;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileBasedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
Expand Down Expand Up @@ -108,12 +110,30 @@ public float getAverageRecordWidth() {
return null;
}

/**
 * Computes the desired size in bytes for each split of {@code initialSource}.
 *
 * <p>By default this is the source's estimated total size divided by the requested
 * number of splits. When the source is file-based and the
 * {@code fileInputSplitMaxSizeMB} pipeline option is set to a positive value, the
 * split size is additionally capped at that many megabytes to reduce skew.
 *
 * @param numSplits requested number of splits (typically the Flink parallelism)
 * @return the target split size in bytes
 * @throws Exception if the source fails to estimate its size
 */
private long getDesiredSizeBytes(int numSplits) throws Exception {
  // NOTE(review): options is passed here unguarded but null-checked below before
  // options.as(...) — getEstimatedSizeBytes presumably tolerates null; confirm.
  long totalSize = initialSource.getEstimatedSizeBytes(options);
  // Guard against numSplits <= 0 to avoid ArithmeticException on division.
  long defaultSplitSize = totalSize / Math.max(1, numSplits);
  long maxSplitSize = 0;
  if (options != null) {
    maxSplitSize = options.as(FlinkPipelineOptions.class).getFileInputSplitMaxSizeMB();
  }
  if (initialSource instanceof FileBasedSource && maxSplitSize > 0) {
    // Most of the time parallelism is < number of files in source.
    // Each file becomes a unique split which commonly creates skew.
    // Capping the split size reduces that skew. Use long literals so the
    // MB-to-bytes conversion is unambiguously done in 64-bit arithmetic.
    return Math.min(defaultSplitSize, maxSplitSize * 1024L * 1024L);
  } else {
    return defaultSplitSize;
  }
}

@Override
@SuppressWarnings("unchecked")
public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException {
try {
long desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits;
long desiredSizeBytes = getDesiredSizeBytes(numSplits);
List<? extends Source<T>> shards = initialSource.split(desiredSizeBytes, options);

int numShards = shards.size();
SourceInputSplit<T>[] sourceInputSplits = new SourceInputSplit[numShards];
for (int i = 0; i < numShards; i++) {
Expand Down

0 comments on commit 6b3ce09

Please sign in to comment.