Make compression codec configurable for columnar shuffle
andygrove committed Jan 1, 2025
1 parent f66bced commit 0b2f0e9
Showing 6 changed files with 59 additions and 10 deletions.
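
For context: before this commit, the columnar shuffle spill path hard-coded Zstd level 1 (see the removed lines in row.rs below); the writers now read the codec and level from CometConf and pass them through JNI to the native writer. A hedged Scala sketch of selecting the codec at runtime, e.g. in spark-shell; it assumes the two config entries referenced in this diff expose their key the way Spark config entries do and are settable on a live session:

import org.apache.comet.CometConf

// Illustrative only: pick the columnar shuffle codec and level for this session.
spark.conf.set(CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC.key, "zstd") // "zstd", "lz4", or "snappy"
spark.conf.set(CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL.key, "3")    // level used by zstd
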
17 changes: 16 additions & 1 deletion native/core/src/execution/jni_api.rs
@@ -58,7 +58,7 @@ use jni::{
use tokio::runtime::Runtime;

use crate::execution::operators::ScanExec;
use crate::execution::shuffle::read_ipc_compressed;
use crate::execution::shuffle::{read_ipc_compressed, CompressionCodec};
use crate::execution::spark_plan::SparkPlan;
use log::info;

@@ -467,6 +467,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative
checksum_enabled: jboolean,
checksum_algo: jint,
current_checksum: jlong,
compression_codec: jstring,
compression_level: jint,
) -> jlongArray {
try_unwrap_or_throw(&e, |mut env| unsafe {
let data_types = convert_datatype_arrays(&mut env, serialized_datatypes)?;
@@ -494,6 +496,18 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative
Some(current_checksum as u32)
};

let compression_codec: String = env
.get_string(&JString::from_raw(compression_codec))
.unwrap()
.into();

let compression_codec = match compression_codec.as_str() {
"zstd" => CompressionCodec::Zstd(compression_level.into()),
"lz4" => CompressionCodec::Lz4Frame,
"snappy" => CompressionCodec::Snappy,
_ => CompressionCodec::Lz4Frame,
};

let (written_bytes, checksum) = process_sorted_row_partition(
row_num,
batch_size as usize,
@@ -505,6 +519,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative
checksum_enabled,
checksum_algo,
current_checksum,
&compression_codec,
)?;
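
The match above defines the contract of the two new JNI arguments: the codec arrives as a plain string, "zstd" uses the supplied level, and any unrecognized name silently falls back to LZ4 frame compression. A hedged Scala sketch of guarding the name on the JVM side before it reaches the native call; the helper is hypothetical and not part of this commit:

// Hypothetical JVM-side guard (not in this commit): fail fast instead of
// relying on the native side's silent fallback to LZ4.
def validatedShuffleCodec(name: String): String = name match {
  case "zstd" | "lz4" | "snappy" => name
  case other =>
    throw new IllegalArgumentException(
      s"Unsupported shuffle compression codec '$other'; expected zstd, lz4 or snappy")
}
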

let checksum = if let Some(checksum) = checksum {
5 changes: 2 additions & 3 deletions native/core/src/execution/shuffle/row.rs
@@ -3297,6 +3297,7 @@ pub fn process_sorted_row_partition(
// this is the initial checksum for this method, as it also gets updated iteratively
// inside the loop within the method across batches.
initial_checksum: Option<u32>,
codec: &CompressionCodec,
) -> Result<(i64, Option<u32>), CometError> {
// TODO: We can tune this parameter automatically based on row size and cache size.
let row_step = 10;
@@ -3359,9 +3360,7 @@

// we do not collect metrics in Native_writeSortedFileNative
let ipc_time = Time::default();
// compression codec is not configurable for CometBypassMergeSortShuffleWriter
let codec = CompressionCodec::Zstd(1);
written += write_ipc_compressed(&batch, &mut cursor, &codec, &ipc_time)?;
written += write_ipc_compressed(&batch, &mut cursor, codec, &ipc_time)?;

if let Some(checksum) = &mut current_checksum {
checksum.update(&mut cursor)?;
@@ -107,6 +107,8 @@ public final class CometShuffleExternalSorter implements CometShuffleChecksumSupport
private final long[] partitionChecksums;

private final String checksumAlgorithm;
private final String compressionCodec;
private final int compressionLevel;

// The memory allocator for this sorter. It is used to allocate/free memory pages for this sorter.
// Because we need to allocate off-heap memory regardless of configured Spark memory mode
@@ -153,6 +155,8 @@ public CometShuffleExternalSorter(
this.peakMemoryUsedBytes = getMemoryUsage();
this.partitionChecksums = createPartitionChecksums(numPartitions, conf);
this.checksumAlgorithm = getChecksumAlgorithm(conf);
this.compressionCodec = CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC().get();
this.compressionLevel = (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL().get();

this.initialSize = initialSize;
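
The two fields added in this constructor are read from CometConf ($.MODULE$ is just the Java view of the Scala object). A rough Scala equivalent of the same reads, for reference; the returned types come from the config entries themselves, which is an assumption here:

import org.apache.comet.CometConf

// Sketch only (not part of this commit): the same two reads from Scala.
val compressionCodec = CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC.get()
val compressionLevel = CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL.get()
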

@@ -556,7 +560,9 @@ public void writeSortedFileNative(boolean isLastFile) throws IOException {
spillInfo.file,
rowPartition,
writeMetricsToUse,
preferDictionaryRatio);
preferDictionaryRatio,
compressionCodec,
compressionLevel);
spillInfo.partitionLengths[currentPartition] = written;

// Store the checksum for the current partition.
@@ -578,7 +584,13 @@ public void writeSortedFileNative(boolean isLastFile) throws IOException {
if (currentPartition != -1) {
long written =
doSpilling(
dataTypes, spillInfo.file, rowPartition, writeMetricsToUse, preferDictionaryRatio);
dataTypes,
spillInfo.file,
rowPartition,
writeMetricsToUse,
preferDictionaryRatio,
compressionCodec,
compressionLevel);
spillInfo.partitionLengths[currentPartition] = written;

synchronized (spills) {
@@ -103,6 +103,8 @@ public final class CometDiskBlockWriter {
private long totalWritten = 0L;
private boolean initialized = false;
private final int columnarBatchSize;
private final String compressionCodec;
private final int compressionLevel;
private final boolean isAsync;
private final int asyncThreadNum;
private final ExecutorService threadPool;
@@ -153,6 +155,8 @@ public final class CometDiskBlockWriter {
this.threadPool = threadPool;

this.columnarBatchSize = (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_BATCH_SIZE().get();
this.compressionCodec = CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC().get();
this.compressionLevel = (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL().get();

this.numElementsForSpillThreshold =
(int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD().get();
@@ -397,7 +401,14 @@ long doSpilling(boolean isLast) throws IOException {
synchronized (file) {
outputRecords += rowPartition.getNumRows();
written =
doSpilling(dataTypes, file, rowPartition, writeMetricsToUse, preferDictionaryRatio);
doSpilling(
dataTypes,
file,
rowPartition,
writeMetricsToUse,
preferDictionaryRatio,
compressionCodec,
compressionLevel);
}

// Update metrics
@@ -171,7 +171,9 @@ protected long doSpilling(
File file,
RowPartition rowPartition,
ShuffleWriteMetricsReporter writeMetricsToUse,
double preferDictionaryRatio) {
double preferDictionaryRatio,
String compressionCodec,
int compressionLevel) {
long[] addresses = rowPartition.getRowAddresses();
int[] sizes = rowPartition.getRowSizes();

@@ -190,7 +192,9 @@
batchSize,
checksumEnabled,
checksumAlgo,
currentChecksum);
currentChecksum,
compressionCodec,
compressionLevel);

long written = results[0];
checksum = results[1];
10 changes: 9 additions & 1 deletion spark/src/main/scala/org/apache/comet/Native.scala
@@ -118,9 +118,14 @@ class Native extends NativeBase {
* @param currentChecksum
* the current checksum of the file. As the checksum is computed incrementally, this is used
* to resume the computation of checksum for previous written data.
* @param compressionCodec
* the compression codec
* @param compressionLevel
* the compression level
* @return
* [the number of bytes written to disk, the checksum]
*/
// scalastyle:off
@native def writeSortedFileNative(
addresses: Array[Long],
rowSizes: Array[Int],
@@ -130,7 +135,10 @@
batchSize: Int,
checksumEnabled: Boolean,
checksumAlgo: Int,
currentChecksum: Long): Array[Long]
currentChecksum: Long,
compressionCodec: String,
compressionLevel: Int): Array[Long]
// scalastyle:on

/**
* Sorts partition ids of Spark unsafe rows in place. Used by Comet shuffle external sorter.