From 6a74832c2d41ef3c286c8b5dc078d5ab1888bc69 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 20 Dec 2024 11:11:14 -0700 Subject: [PATCH] feat: Make native shuffle compression configurable and respect `spark.shuffle.compress` (#1185) * Make shuffle compression codec and level configurable * remove lz4 references * docs * update comment * clippy * fix benches * clippy * clippy * disable test for miri * remove lz4 reference from proto --- .../scala/org/apache/comet/CometConf.scala | 14 ++- .../shuffle/IpcInputStreamIterator.scala | 6 +- .../execution/shuffle/ShuffleUtils.scala | 31 ++++--- docs/source/user-guide/configs.md | 3 +- docs/source/user-guide/tuning.md | 6 ++ native/core/benches/shuffle_writer.rs | 87 +++++++++++++----- native/core/src/execution/planner.rs | 17 +++- native/core/src/execution/shuffle/mod.rs | 2 +- native/core/src/execution/shuffle/row.rs | 5 +- .../src/execution/shuffle/shuffle_writer.rs | 90 +++++++++++++++---- native/proto/src/proto/operator.proto | 7 ++ .../shuffle/CometShuffleExchangeExec.scala | 14 ++- .../shuffle/CometShuffleManager.scala | 2 +- 13 files changed, 221 insertions(+), 63 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index b602d7cf1..8815ac4eb 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -272,13 +272,21 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) - val COMET_EXEC_SHUFFLE_CODEC: ConfigEntry[String] = conf( - s"$COMET_EXEC_CONFIG_PREFIX.shuffle.codec") + val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] = conf( + s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec") .doc( - "The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported.") + "The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. " + + "Compression can be disabled by setting spark.shuffle.compress=false.") .stringConf + .checkValues(Set("zstd")) .createWithDefault("zstd") + val COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL: ConfigEntry[Int] = + conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.level") + .doc("The compression level to use when compression shuffle files.") + .intConf + .createWithDefault(1) + val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.columnar.shuffle.async.enabled") .doc("Whether to enable asynchronous shuffle for Arrow-based shuffle.") diff --git a/common/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/IpcInputStreamIterator.scala b/common/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/IpcInputStreamIterator.scala index 281c48108..d1d5af350 100644 --- a/common/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/IpcInputStreamIterator.scala +++ b/common/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/IpcInputStreamIterator.scala @@ -110,8 +110,10 @@ case class IpcInputStreamIterator( currentLimitedInputStream = is if (decompressingNeeded) { - val zs = ShuffleUtils.compressionCodecForShuffling.compressedInputStream(is) - Channels.newChannel(zs) + ShuffleUtils.compressionCodecForShuffling match { + case Some(codec) => Channels.newChannel(codec.compressedInputStream(is)) + case _ => Channels.newChannel(is) + } } else { Channels.newChannel(is) } diff --git a/common/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/ShuffleUtils.scala b/common/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/ShuffleUtils.scala index eea134ab5..23b4a5ec2 100644 --- a/common/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/ShuffleUtils.scala +++ b/common/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/ShuffleUtils.scala @@ -21,22 +21,33 @@ package org.apache.spark.sql.comet.execution.shuffle import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.IO_COMPRESSION_CODEC +import org.apache.spark.internal.config.{IO_COMPRESSION_CODEC, SHUFFLE_COMPRESS} import org.apache.spark.io.CompressionCodec -import org.apache.spark.sql.internal.SQLConf import org.apache.comet.CometConf private[spark] object ShuffleUtils extends Logging { - lazy val compressionCodecForShuffling: CompressionCodec = { + // optional compression codec to use when compressing shuffle files + lazy val compressionCodecForShuffling: Option[CompressionCodec] = { val sparkConf = SparkEnv.get.conf - val codecName = CometConf.COMET_EXEC_SHUFFLE_CODEC.get(SQLConf.get) - - // only zstd compression is supported at the moment - if (codecName != "zstd") { - logWarning( - s"Overriding config ${IO_COMPRESSION_CODEC}=${codecName} in shuffling, force using zstd") + val shuffleCompressionEnabled = sparkConf.getBoolean(SHUFFLE_COMPRESS.key, true) + val sparkShuffleCodec = sparkConf.get(IO_COMPRESSION_CODEC.key, "lz4") + val cometShuffleCodec = CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC.get() + if (shuffleCompressionEnabled) { + if (sparkShuffleCodec != cometShuffleCodec) { + logWarning( + s"Overriding config $IO_COMPRESSION_CODEC=$sparkShuffleCodec in shuffling, " + + s"force using $cometShuffleCodec") + } + cometShuffleCodec match { + case "zstd" => + Some(CompressionCodec.createCodec(sparkConf, "zstd")) + case other => + throw new UnsupportedOperationException( + s"Unsupported shuffle compression codec: $other") + } + } else { + None } - CompressionCodec.createCodec(sparkConf, "zstd") } } diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 69da79222..7881f0763 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -50,7 +50,8 @@ Comet provides the following configuration settings. | spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. | 0.7 | | spark.comet.exec.project.enabled | Whether to enable project by default. | true | | spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false | -| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd | +| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. Compression can be disabled by setting spark.shuffle.compress=false. | zstd | +| spark.comet.exec.shuffle.compression.level | The compression level to use when compression shuffle files. | 1 | | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true | | spark.comet.exec.sort.enabled | Whether to enable sort by default. | true | | spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true | diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index d68481d17..e04e750b4 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -103,6 +103,12 @@ native shuffle currently only supports `HashPartitioning` and `SinglePartitionin To enable native shuffle, set `spark.comet.exec.shuffle.mode` to `native`. If this mode is explicitly set, then any shuffle operations that cannot be supported in this mode will fall back to Spark. +### Shuffle Compression + +By default, Spark compresses shuffle files using LZ4 compression. Comet overrides this behavior with ZSTD compression. +Compression can be disabled by setting `spark.shuffle.compress=false`, which may result in faster shuffle times in +certain environments, such as single-node setups with fast NVMe drives, at the expense of increased disk space usage. + ## Explain Plan ### Extended Explain With Spark 4.0.0 and newer, Comet can provide extended explain plan information in the Spark UI. Currently this lists diff --git a/native/core/benches/shuffle_writer.rs b/native/core/benches/shuffle_writer.rs index 272887238..865ca73b4 100644 --- a/native/core/benches/shuffle_writer.rs +++ b/native/core/benches/shuffle_writer.rs @@ -15,36 +15,47 @@ // specific language governing permissions and limitations // under the License. +use arrow_array::builder::Int32Builder; use arrow_array::{builder::StringBuilder, RecordBatch}; use arrow_schema::{DataType, Field, Schema}; -use comet::execution::shuffle::ShuffleWriterExec; +use comet::execution::shuffle::{write_ipc_compressed, CompressionCodec, ShuffleWriterExec}; use criterion::{criterion_group, criterion_main, Criterion}; +use datafusion::physical_plan::metrics::Time; use datafusion::{ physical_plan::{common::collect, memory::MemoryExec, ExecutionPlan}, prelude::SessionContext, }; use datafusion_physical_expr::{expressions::Column, Partitioning}; +use std::io::Cursor; use std::sync::Arc; use tokio::runtime::Runtime; fn criterion_benchmark(c: &mut Criterion) { - let batch = create_batch(); - let mut batches = Vec::new(); - for _ in 0..10 { - batches.push(batch.clone()); - } - let partitions = &[batches]; - let exec = ShuffleWriterExec::try_new( - Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()), - Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16), - "/tmp/data.out".to_string(), - "/tmp/index.out".to_string(), - ) - .unwrap(); - let mut group = c.benchmark_group("shuffle_writer"); - group.bench_function("shuffle_writer", |b| { + group.bench_function("shuffle_writer: encode (no compression))", |b| { + let batch = create_batch(8192, true); + let mut buffer = vec![]; + let mut cursor = Cursor::new(&mut buffer); + let ipc_time = Time::default(); + b.iter(|| write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::None, &ipc_time)); + }); + group.bench_function("shuffle_writer: encode and compress (zstd level 1)", |b| { + let batch = create_batch(8192, true); + let mut buffer = vec![]; + let mut cursor = Cursor::new(&mut buffer); + let ipc_time = Time::default(); + b.iter(|| write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(1), &ipc_time)); + }); + group.bench_function("shuffle_writer: encode and compress (zstd level 6)", |b| { + let batch = create_batch(8192, true); + let mut buffer = vec![]; + let mut cursor = Cursor::new(&mut buffer); + let ipc_time = Time::default(); + b.iter(|| write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(6), &ipc_time)); + }); + group.bench_function("shuffle_writer: end to end", |b| { let ctx = SessionContext::new(); + let exec = create_shuffle_writer_exec(CompressionCodec::Zstd(1)); b.iter(|| { let task_ctx = ctx.task_ctx(); let stream = exec.execute(0, task_ctx).unwrap(); @@ -54,19 +65,47 @@ fn criterion_benchmark(c: &mut Criterion) { }); } -fn create_batch() -> RecordBatch { - let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)])); +fn create_shuffle_writer_exec(compression_codec: CompressionCodec) -> ShuffleWriterExec { + let batches = create_batches(8192, 10); + let schema = batches[0].schema(); + let partitions = &[batches]; + ShuffleWriterExec::try_new( + Arc::new(MemoryExec::try_new(partitions, schema, None).unwrap()), + Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16), + compression_codec, + "/tmp/data.out".to_string(), + "/tmp/index.out".to_string(), + ) + .unwrap() +} + +fn create_batches(size: usize, count: usize) -> Vec { + let batch = create_batch(size, true); + let mut batches = Vec::new(); + for _ in 0..count { + batches.push(batch.clone()); + } + batches +} + +fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ + Field::new("c0", DataType::Int32, true), + Field::new("c1", DataType::Utf8, true), + ])); + let mut a = Int32Builder::new(); let mut b = StringBuilder::new(); - for i in 0..8192 { - if i % 10 == 0 { + for i in 0..num_rows { + a.append_value(i as i32); + if allow_nulls && i % 10 == 0 { b.append_null(); } else { - b.append_value(format!("{i}")); + b.append_value(format!("this is string number {i}")); } } - let array = b.finish(); - - RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap() + let a = a.finish(); + let b = b.finish(); + RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap() } fn config() -> Criterion { diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index a48d03eb8..9e87f3755 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -68,6 +68,7 @@ use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr} use datafusion_functions_nested::concat::ArrayAppend; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; +use crate::execution::shuffle::CompressionCodec; use crate::execution::spark_plan::SparkPlan; use datafusion_comet_proto::{ spark_expression::{ @@ -76,8 +77,8 @@ use datafusion_comet_proto::{ }, spark_operator::{ self, lower_window_frame_bound::LowerFrameBoundStruct, operator::OpStruct, - upper_window_frame_bound::UpperFrameBoundStruct, BuildSide, JoinType, Operator, - WindowFrameType, + upper_window_frame_bound::UpperFrameBoundStruct, BuildSide, + CompressionCodec as SparkCompressionCodec, JoinType, Operator, WindowFrameType, }, spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning}, }; @@ -1064,9 +1065,21 @@ impl PhysicalPlanner { let partitioning = self .create_partitioning(writer.partitioning.as_ref().unwrap(), child.schema())?; + let codec = match writer.codec.try_into() { + Ok(SparkCompressionCodec::None) => Ok(CompressionCodec::None), + Ok(SparkCompressionCodec::Zstd) => { + Ok(CompressionCodec::Zstd(writer.compression_level)) + } + _ => Err(ExecutionError::GeneralError(format!( + "Unsupported shuffle compression codec: {:?}", + writer.codec + ))), + }?; + let shuffle_writer = Arc::new(ShuffleWriterExec::try_new( Arc::clone(&child.native_plan), partitioning, + codec, writer.output_data_file.clone(), writer.output_index_file.clone(), )?); diff --git a/native/core/src/execution/shuffle/mod.rs b/native/core/src/execution/shuffle/mod.rs index 8721ead74..8111f5eed 100644 --- a/native/core/src/execution/shuffle/mod.rs +++ b/native/core/src/execution/shuffle/mod.rs @@ -19,4 +19,4 @@ mod list; mod map; pub mod row; mod shuffle_writer; -pub use shuffle_writer::ShuffleWriterExec; +pub use shuffle_writer::{write_ipc_compressed, CompressionCodec, ShuffleWriterExec}; diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/row.rs index ecab77d96..405f64216 100644 --- a/native/core/src/execution/shuffle/row.rs +++ b/native/core/src/execution/shuffle/row.rs @@ -292,6 +292,7 @@ macro_rules! downcast_builder_ref { } // Expose the macro for other modules. +use crate::execution::shuffle::shuffle_writer::CompressionCodec; pub(crate) use downcast_builder_ref; /// Appends field of row to the given struct builder. `dt` is the data type of the field. @@ -3358,7 +3359,9 @@ pub fn process_sorted_row_partition( // we do not collect metrics in Native_writeSortedFileNative let ipc_time = Time::default(); - written += write_ipc_compressed(&batch, &mut cursor, &ipc_time)?; + // compression codec is not configurable for CometBypassMergeSortShuffleWriter + let codec = CompressionCodec::Zstd(1); + written += write_ipc_compressed(&batch, &mut cursor, &codec, &ipc_time)?; if let Some(checksum) = &mut current_checksum { checksum.update(&mut cursor)?; diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index fcc8c51f6..01117199e 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -90,6 +90,7 @@ pub struct ShuffleWriterExec { /// Metrics metrics: ExecutionPlanMetricsSet, cache: PlanProperties, + codec: CompressionCodec, } impl DisplayAs for ShuffleWriterExec { @@ -126,6 +127,7 @@ impl ExecutionPlan for ShuffleWriterExec { 1 => Ok(Arc::new(ShuffleWriterExec::try_new( Arc::clone(&children[0]), self.partitioning.clone(), + self.codec.clone(), self.output_data_file.clone(), self.output_index_file.clone(), )?)), @@ -152,6 +154,7 @@ impl ExecutionPlan for ShuffleWriterExec { self.partitioning.clone(), metrics, context, + self.codec.clone(), ) .map_err(|e| ArrowError::ExternalError(Box::new(e))), ) @@ -181,6 +184,7 @@ impl ShuffleWriterExec { pub fn try_new( input: Arc, partitioning: Partitioning, + codec: CompressionCodec, output_data_file: String, output_index_file: String, ) -> Result { @@ -197,6 +201,7 @@ impl ShuffleWriterExec { output_data_file, output_index_file, cache, + codec, }) } } @@ -217,6 +222,7 @@ struct PartitionBuffer { batch_size: usize, /// Memory reservation for this partition buffer. reservation: MemoryReservation, + codec: CompressionCodec, } impl PartitionBuffer { @@ -225,6 +231,7 @@ impl PartitionBuffer { batch_size: usize, partition_id: usize, runtime: &Arc, + codec: CompressionCodec, ) -> Self { let reservation = MemoryConsumer::new(format!("PartitionBuffer[{}]", partition_id)) .with_can_spill(true) @@ -238,6 +245,7 @@ impl PartitionBuffer { num_active_rows: 0, batch_size, reservation, + codec, } } @@ -337,7 +345,7 @@ impl PartitionBuffer { let frozen_capacity_old = self.frozen.capacity(); let mut cursor = Cursor::new(&mut self.frozen); cursor.seek(SeekFrom::End(0))?; - write_ipc_compressed(&frozen_batch, &mut cursor, ipc_time)?; + write_ipc_compressed(&frozen_batch, &mut cursor, &self.codec, ipc_time)?; mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize; Ok(mem_diff) @@ -687,6 +695,7 @@ impl ShuffleRepartitioner { metrics: ShuffleRepartitionerMetrics, runtime: Arc, batch_size: usize, + codec: CompressionCodec, ) -> Self { let num_output_partitions = partitioning.partition_count(); let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{}]", partition_id)) @@ -709,7 +718,13 @@ impl ShuffleRepartitioner { schema: Arc::clone(&schema), buffered_partitions: (0..num_output_partitions) .map(|partition_id| { - PartitionBuffer::new(Arc::clone(&schema), batch_size, partition_id, &runtime) + PartitionBuffer::new( + Arc::clone(&schema), + batch_size, + partition_id, + &runtime, + codec.clone(), + ) }) .collect::>(), spills: Mutex::new(vec![]), @@ -1129,6 +1144,7 @@ impl Debug for ShuffleRepartitioner { } } +#[allow(clippy::too_many_arguments)] async fn external_shuffle( mut input: SendableRecordBatchStream, partition_id: usize, @@ -1137,6 +1153,7 @@ async fn external_shuffle( partitioning: Partitioning, metrics: ShuffleRepartitionerMetrics, context: Arc, + codec: CompressionCodec, ) -> Result { let schema = input.schema(); let mut repartitioner = ShuffleRepartitioner::new( @@ -1148,6 +1165,7 @@ async fn external_shuffle( metrics, context.runtime_env(), context.session_config().batch_size(), + codec, ); while let Some(batch) = input.next().await { @@ -1526,11 +1544,18 @@ impl Checksum { } } +#[derive(Debug, Clone)] +pub enum CompressionCodec { + None, + Zstd(i32), +} + /// Writes given record batch as Arrow IPC bytes into given writer. /// Returns number of bytes written. -pub(crate) fn write_ipc_compressed( +pub fn write_ipc_compressed( batch: &RecordBatch, output: &mut W, + codec: &CompressionCodec, ipc_time: &Time, ) -> Result { if batch.num_rows() == 0 { @@ -1543,14 +1568,24 @@ pub(crate) fn write_ipc_compressed( // write ipc_length placeholder output.write_all(&[0u8; 8])?; - // write ipc data - // TODO: make compression level configurable - let mut arrow_writer = StreamWriter::try_new(zstd::Encoder::new(output, 1)?, &batch.schema())?; - arrow_writer.write(batch)?; - arrow_writer.finish()?; + let output = match codec { + CompressionCodec::None => { + let mut arrow_writer = StreamWriter::try_new(output, &batch.schema())?; + arrow_writer.write(batch)?; + arrow_writer.finish()?; + arrow_writer.into_inner()? + } + CompressionCodec::Zstd(level) => { + let encoder = zstd::Encoder::new(output, *level)?; + let mut arrow_writer = StreamWriter::try_new(encoder, &batch.schema())?; + arrow_writer.write(batch)?; + arrow_writer.finish()?; + let zstd_encoder = arrow_writer.into_inner()?; + zstd_encoder.finish()? + } + }; - let zwriter = arrow_writer.into_inner()?; - let output = zwriter.finish()?; + // fill ipc length let end_pos = output.stream_position()?; let ipc_length = end_pos - start_pos - 8; @@ -1611,6 +1646,22 @@ mod test { use datafusion_physical_expr::expressions::Column; use tokio::runtime::Runtime; + #[test] + #[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx` + fn write_ipc_zstd() { + let batch = create_batch(8192); + let mut output = vec![]; + let mut cursor = Cursor::new(&mut output); + write_ipc_compressed( + &batch, + &mut cursor, + &CompressionCodec::Zstd(1), + &Time::default(), + ) + .unwrap(); + assert_eq!(40218, output.len()); + } + #[test] fn test_slot_size() { let batch_size = 1usize; @@ -1673,13 +1724,7 @@ mod test { num_partitions: usize, memory_limit: Option, ) { - let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)])); - let mut b = StringBuilder::new(); - for i in 0..batch_size { - b.append_value(format!("{i}")); - } - let array = b.finish(); - let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap(); + let batch = create_batch(batch_size); let batches = (0..num_batches).map(|_| batch.clone()).collect::>(); @@ -1687,6 +1732,7 @@ mod test { let exec = ShuffleWriterExec::try_new( Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()), Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions), + CompressionCodec::Zstd(1), "/tmp/data.out".to_string(), "/tmp/index.out".to_string(), ) @@ -1707,6 +1753,16 @@ mod test { rt.block_on(collect(stream)).unwrap(); } + fn create_batch(batch_size: usize) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)])); + let mut b = StringBuilder::new(); + for i in 0..batch_size { + b.append_value(format!("{i}")); + } + let array = b.finish(); + RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap() + } + #[test] fn test_pmod() { let i: Vec = vec![0x99f0149d, 0x9c67b85d, 0xc8008529, 0xa05b5d7b, 0xcd1e64fb]; diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 74ec80cb5..5cb2802da 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -82,10 +82,17 @@ message Limit { int32 offset = 2; } +enum CompressionCodec { + None = 0; + Zstd = 1; +} + message ShuffleWriter { spark.spark_partitioning.Partitioning partitioning = 1; string output_data_file = 3; string output_index_file = 4; + CompressionCodec codec = 5; + int32 compression_level = 6; } enum AggregateMode { diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index 0cd8a9ce6..3a11b8b28 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -52,8 +52,9 @@ import org.apache.spark.util.random.XORShiftRandom import com.google.common.base.Objects +import org.apache.comet.CometConf import org.apache.comet.serde.{OperatorOuterClass, PartitioningOuterClass, QueryPlanSerde} -import org.apache.comet.serde.OperatorOuterClass.Operator +import org.apache.comet.serde.OperatorOuterClass.{CompressionCodec, Operator} import org.apache.comet.serde.QueryPlanSerde.serializeDataType import org.apache.comet.shims.ShimCometShuffleExchangeExec @@ -553,6 +554,17 @@ class CometShuffleWriteProcessor( shuffleWriterBuilder.setOutputDataFile(dataFile) shuffleWriterBuilder.setOutputIndexFile(indexFile) + if (SparkEnv.get.conf.getBoolean("spark.shuffle.compress", true)) { + val codec = CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC.get() match { + case "zstd" => CompressionCodec.Zstd + case other => throw new UnsupportedOperationException(s"invalid codec: $other") + } + shuffleWriterBuilder.setCodec(codec) + } else { + shuffleWriterBuilder.setCodec(CompressionCodec.None) + } + shuffleWriterBuilder.setCompressionLevel(CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL.get) + outputPartitioning match { case _: HashPartitioning => val hashPartitioning = outputPartitioning.asInstanceOf[HashPartitioning] diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala index ef67167c4..b2cc2c2ba 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala @@ -243,7 +243,7 @@ object CometShuffleManager extends Logging { lazy val compressionCodecForShuffling: CompressionCodec = { val sparkConf = SparkEnv.get.conf - val codecName = CometConf.COMET_EXEC_SHUFFLE_CODEC.get(SQLConf.get) + val codecName = CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC.get(SQLConf.get) // only zstd compression is supported at the moment if (codecName != "zstd") {