From 0b2f0e9cce53a9626de833cd23b69da8132bde99 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 1 Jan 2025 12:41:00 -0700 Subject: [PATCH 1/4] Make compression codec configurable for columnar shuffle --- native/core/src/execution/jni_api.rs | 17 ++++++++++++++++- native/core/src/execution/shuffle/row.rs | 5 ++--- .../sort/CometShuffleExternalSorter.java | 16 ++++++++++++++-- .../execution/shuffle/CometDiskBlockWriter.java | 13 ++++++++++++- .../comet/execution/shuffle/SpillWriter.java | 8 ++++++-- .../main/scala/org/apache/comet/Native.scala | 10 +++++++++- 6 files changed, 59 insertions(+), 10 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index b5f744b08..7b58a77b5 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -58,7 +58,7 @@ use jni::{ use tokio::runtime::Runtime; use crate::execution::operators::ScanExec; -use crate::execution::shuffle::read_ipc_compressed; +use crate::execution::shuffle::{read_ipc_compressed, CompressionCodec}; use crate::execution::spark_plan::SparkPlan; use log::info; @@ -467,6 +467,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative checksum_enabled: jboolean, checksum_algo: jint, current_checksum: jlong, + compression_codec: jstring, + compression_level: jint, ) -> jlongArray { try_unwrap_or_throw(&e, |mut env| unsafe { let data_types = convert_datatype_arrays(&mut env, serialized_datatypes)?; @@ -494,6 +496,18 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative Some(current_checksum as u32) }; + let compression_codec: String = env + .get_string(&JString::from_raw(compression_codec)) + .unwrap() + .into(); + + let compression_codec = match compression_codec.as_str() { + "zstd" => CompressionCodec::Zstd(compression_level.into()), + "lz4" => CompressionCodec::Lz4Frame, + "snappy" => CompressionCodec::Snappy, + _ => CompressionCodec::Lz4Frame, + }; + let (written_bytes, checksum) = process_sorted_row_partition( row_num, batch_size as usize, @@ -505,6 +519,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative checksum_enabled, checksum_algo, current_checksum, + &compression_codec, )?; let checksum = if let Some(checksum) = checksum { diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/row.rs index 405f64216..9037bd794 100644 --- a/native/core/src/execution/shuffle/row.rs +++ b/native/core/src/execution/shuffle/row.rs @@ -3297,6 +3297,7 @@ pub fn process_sorted_row_partition( // this is the initial checksum for this method, as it also gets updated iteratively // inside the loop within the method across batches. initial_checksum: Option, + codec: &CompressionCodec, ) -> Result<(i64, Option), CometError> { // TODO: We can tune this parameter automatically based on row size and cache size. 
let row_step = 10; @@ -3359,9 +3360,7 @@ pub fn process_sorted_row_partition( // we do not collect metrics in Native_writeSortedFileNative let ipc_time = Time::default(); - // compression codec is not configurable for CometBypassMergeSortShuffleWriter - let codec = CompressionCodec::Zstd(1); - written += write_ipc_compressed(&batch, &mut cursor, &codec, &ipc_time)?; + written += write_ipc_compressed(&batch, &mut cursor, codec, &ipc_time)?; if let Some(checksum) = &mut current_checksum { checksum.update(&mut cursor)?; diff --git a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java index cc4495570..66ce64fa8 100644 --- a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java +++ b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java @@ -107,6 +107,8 @@ public final class CometShuffleExternalSorter implements CometShuffleChecksumSup private final long[] partitionChecksums; private final String checksumAlgorithm; + private final String compressionCodec; + private final int compressionLevel; // The memory allocator for this sorter. It is used to allocate/free memory pages for this sorter. // Because we need to allocate off-heap memory regardless of configured Spark memory mode @@ -153,6 +155,8 @@ public CometShuffleExternalSorter( this.peakMemoryUsedBytes = getMemoryUsage(); this.partitionChecksums = createPartitionChecksums(numPartitions, conf); this.checksumAlgorithm = getChecksumAlgorithm(conf); + this.compressionCodec = CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC().get(); + this.compressionLevel = (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL().get(); this.initialSize = initialSize; @@ -556,7 +560,9 @@ public void writeSortedFileNative(boolean isLastFile) throws IOException { spillInfo.file, rowPartition, writeMetricsToUse, - preferDictionaryRatio); + preferDictionaryRatio, + compressionCodec, + compressionLevel); spillInfo.partitionLengths[currentPartition] = written; // Store the checksum for the current partition. 
@@ -578,7 +584,13 @@ public void writeSortedFileNative(boolean isLastFile) throws IOException { if (currentPartition != -1) { long written = doSpilling( - dataTypes, spillInfo.file, rowPartition, writeMetricsToUse, preferDictionaryRatio); + dataTypes, + spillInfo.file, + rowPartition, + writeMetricsToUse, + preferDictionaryRatio, + compressionCodec, + compressionLevel); spillInfo.partitionLengths[currentPartition] = written; synchronized (spills) { diff --git a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java index dcb9d99d3..97f568e30 100644 --- a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java +++ b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java @@ -103,6 +103,8 @@ public final class CometDiskBlockWriter { private long totalWritten = 0L; private boolean initialized = false; private final int columnarBatchSize; + private final String compressionCodec; + private final int compressionLevel; private final boolean isAsync; private final int asyncThreadNum; private final ExecutorService threadPool; @@ -153,6 +155,8 @@ public final class CometDiskBlockWriter { this.threadPool = threadPool; this.columnarBatchSize = (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_BATCH_SIZE().get(); + this.compressionCodec = CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC().get(); + this.compressionLevel = (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL().get(); this.numElementsForSpillThreshold = (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD().get(); @@ -397,7 +401,14 @@ long doSpilling(boolean isLast) throws IOException { synchronized (file) { outputRecords += rowPartition.getNumRows(); written = - doSpilling(dataTypes, file, rowPartition, writeMetricsToUse, preferDictionaryRatio); + doSpilling( + dataTypes, + file, + rowPartition, + writeMetricsToUse, + preferDictionaryRatio, + compressionCodec, + compressionLevel); } // Update metrics diff --git a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/SpillWriter.java b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/SpillWriter.java index 3dc86b05b..a4f09b415 100644 --- a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/SpillWriter.java +++ b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/SpillWriter.java @@ -171,7 +171,9 @@ protected long doSpilling( File file, RowPartition rowPartition, ShuffleWriteMetricsReporter writeMetricsToUse, - double preferDictionaryRatio) { + double preferDictionaryRatio, + String compressionCodec, + int compressionLevel) { long[] addresses = rowPartition.getRowAddresses(); int[] sizes = rowPartition.getRowSizes(); @@ -190,7 +192,9 @@ protected long doSpilling( batchSize, checksumEnabled, checksumAlgo, - currentChecksum); + currentChecksum, + compressionCodec, + compressionLevel); long written = results[0]; checksum = results[1]; diff --git a/spark/src/main/scala/org/apache/comet/Native.scala b/spark/src/main/scala/org/apache/comet/Native.scala index 97948880f..50c61544d 100644 --- a/spark/src/main/scala/org/apache/comet/Native.scala +++ b/spark/src/main/scala/org/apache/comet/Native.scala @@ -118,9 +118,14 @@ class Native extends NativeBase { * @param currentChecksum * the current checksum of the file. 
As the checksum is computed incrementally, this is used * to resume the computation of checksum for previous written data. + * @param compressionCodec + * the compression codec + * @param compressionLevel + * the compression level * @return * [the number of bytes written to disk, the checksum] */ + // scalastyle:off @native def writeSortedFileNative( addresses: Array[Long], rowSizes: Array[Int], @@ -130,7 +135,10 @@ class Native extends NativeBase { batchSize: Int, checksumEnabled: Boolean, checksumAlgo: Int, - currentChecksum: Long): Array[Long] + currentChecksum: Long, + compressionCodec: String, + compressionLevel: Int): Array[Long] + // scalastyle:on /** * Sorts partition ids of Spark unsafe rows in place. Used by Comet shuffle external sorter. From ad1adc169efc1ec9c65273392dbfc73023ade5a2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 1 Jan 2025 12:45:09 -0700 Subject: [PATCH 2/4] clippy --- native/core/src/execution/jni_api.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 7b58a77b5..f8bb8ffbc 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -502,7 +502,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative .into(); let compression_codec = match compression_codec.as_str() { - "zstd" => CompressionCodec::Zstd(compression_level.into()), + "zstd" => CompressionCodec::Zstd(compression_level), "lz4" => CompressionCodec::Lz4Frame, "snappy" => CompressionCodec::Snappy, _ => CompressionCodec::Lz4Frame, From 3e15b12b808b8e2299467e5a9cbfc0d63b148242 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 1 Jan 2025 13:17:39 -0700 Subject: [PATCH 3/4] fix bench --- native/core/benches/row_columnar.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/native/core/benches/row_columnar.rs b/native/core/benches/row_columnar.rs index 60b41330e..e8d4185ba 100644 --- a/native/core/benches/row_columnar.rs +++ b/native/core/benches/row_columnar.rs @@ -21,6 +21,7 @@ use comet::execution::shuffle::row::{ }; use criterion::{criterion_group, criterion_main, Criterion}; use tempfile::Builder; +use comet::execution::shuffle::CompressionCodec; const NUM_ROWS: usize = 10000; const BATCH_SIZE: usize = 5000; @@ -77,6 +78,7 @@ fn benchmark(c: &mut Criterion) { false, 0, None, + &CompressionCodec::Zstd(1) ) .unwrap(); }); From 1c08a4bb8e12607240b6c2140a2f30ef5665d2f5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 1 Jan 2025 13:34:25 -0700 Subject: [PATCH 4/4] fmt --- native/core/benches/row_columnar.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/native/core/benches/row_columnar.rs b/native/core/benches/row_columnar.rs index e8d4185ba..a62574111 100644 --- a/native/core/benches/row_columnar.rs +++ b/native/core/benches/row_columnar.rs @@ -19,9 +19,9 @@ use arrow::datatypes::DataType as ArrowDataType; use comet::execution::shuffle::row::{ process_sorted_row_partition, SparkUnsafeObject, SparkUnsafeRow, }; +use comet::execution::shuffle::CompressionCodec; use criterion::{criterion_group, criterion_main, Criterion}; use tempfile::Builder; -use comet::execution::shuffle::CompressionCodec; const NUM_ROWS: usize = 10000; const BATCH_SIZE: usize = 5000; @@ -78,7 +78,7 @@ fn benchmark(c: &mut Criterion) { false, 0, None, - &CompressionCodec::Zstd(1) + &CompressionCodec::Zstd(1), ) .unwrap(); });
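---

Reviewer note: below is a minimal, self-contained Rust sketch of the codec-selection logic that patch 1 adds to Java_org_apache_comet_Native_writeSortedFileNative (as it reads after the clippy fix in patch 2). The codec name taken from COMET_EXEC_SHUFFLE_COMPRESSION_CODEC is matched case-sensitively, "zstd" carries the configured level, and any unrecognized name falls back to lz4. The enum and the parse_codec helper are re-declared locally purely for illustration; in the actual code CompressionCodec is imported from crate::execution::shuffle and the match is inlined in jni_api.rs rather than factored into a helper.

    // Illustrative stand-in for crate::execution::shuffle::CompressionCodec.
    #[derive(Debug, PartialEq)]
    enum CompressionCodec {
        Zstd(i32),
        Lz4Frame,
        Snappy,
    }

    // Map the codec name and level passed over JNI to a CompressionCodec,
    // mirroring the match added in jni_api.rs: unknown names fall back to lz4.
    fn parse_codec(name: &str, level: i32) -> CompressionCodec {
        match name {
            "zstd" => CompressionCodec::Zstd(level),
            "lz4" => CompressionCodec::Lz4Frame,
            "snappy" => CompressionCodec::Snappy,
            _ => CompressionCodec::Lz4Frame,
        }
    }

    fn main() {
        assert_eq!(parse_codec("zstd", 3), CompressionCodec::Zstd(3));
        assert_eq!(parse_codec("snappy", 0), CompressionCodec::Snappy);
        assert_eq!(parse_codec("unknown", 0), CompressionCodec::Lz4Frame);
    }

The same pair of values (codec name plus level) is threaded unchanged from CometShuffleExternalSorter and CometDiskBlockWriter through SpillWriter.doSpilling and the Native.writeSortedFileNative JNI signature, so only the Rust side interprets the string.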