feat: Make native shuffle compression configurable and respect `spark.shuffle.compress` (#1185)

* Make shuffle compression codec and level configurable

* remove lz4 references

* docs

* update comment

* clippy

* fix benches

* clippy

* clippy

* disable test for miri

* remove lz4 reference from proto
andygrove authored Dec 20, 2024
1 parent 8f4a8a5 commit ea6d205
Showing 13 changed files with 221 additions and 63 deletions.
14 changes: 11 additions & 3 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -272,13 +272,21 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_EXEC_SHUFFLE_CODEC: ConfigEntry[String] = conf(
s"$COMET_EXEC_CONFIG_PREFIX.shuffle.codec")
val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] = conf(
s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
.doc(
"The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported.")
"The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. " +
"Compression can be disabled by setting spark.shuffle.compress=false.")
.stringConf
.checkValues(Set("zstd"))
.createWithDefault("zstd")

val COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL: ConfigEntry[Int] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.level")
.doc("The compression level to use when compression shuffle files.")
.intConf
.createWithDefault(1)

val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.columnar.shuffle.async.enabled")
.doc("Whether to enable asynchronous shuffle for Arrow-based shuffle.")
@@ -110,8 +110,10 @@ case class IpcInputStreamIterator(
currentLimitedInputStream = is

if (decompressingNeeded) {
val zs = ShuffleUtils.compressionCodecForShuffling.compressedInputStream(is)
Channels.newChannel(zs)
ShuffleUtils.compressionCodecForShuffling match {
case Some(codec) => Channels.newChannel(codec.compressedInputStream(is))
case _ => Channels.newChannel(is)
}
} else {
Channels.newChannel(is)
}
@@ -21,22 +21,33 @@ package org.apache.spark.sql.comet.execution.shuffle

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.IO_COMPRESSION_CODEC
import org.apache.spark.internal.config.{IO_COMPRESSION_CODEC, SHUFFLE_COMPRESS}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.internal.SQLConf

import org.apache.comet.CometConf

private[spark] object ShuffleUtils extends Logging {
lazy val compressionCodecForShuffling: CompressionCodec = {
// optional compression codec to use when compressing shuffle files
lazy val compressionCodecForShuffling: Option[CompressionCodec] = {
val sparkConf = SparkEnv.get.conf
val codecName = CometConf.COMET_EXEC_SHUFFLE_CODEC.get(SQLConf.get)

// only zstd compression is supported at the moment
if (codecName != "zstd") {
logWarning(
s"Overriding config ${IO_COMPRESSION_CODEC}=${codecName} in shuffling, force using zstd")
val shuffleCompressionEnabled = sparkConf.getBoolean(SHUFFLE_COMPRESS.key, true)
val sparkShuffleCodec = sparkConf.get(IO_COMPRESSION_CODEC.key, "lz4")
val cometShuffleCodec = CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC.get()
if (shuffleCompressionEnabled) {
if (sparkShuffleCodec != cometShuffleCodec) {
logWarning(
s"Overriding config $IO_COMPRESSION_CODEC=$sparkShuffleCodec in shuffling, " +
s"force using $cometShuffleCodec")
}
cometShuffleCodec match {
case "zstd" =>
Some(CompressionCodec.createCodec(sparkConf, "zstd"))
case other =>
throw new UnsupportedOperationException(
s"Unsupported shuffle compression codec: $other")
}
} else {
None
}
CompressionCodec.createCodec(sparkConf, "zstd")
}
}
3 changes: 2 additions & 1 deletion docs/source/user-guide/configs.md
@@ -50,7 +50,8 @@ Comet provides the following configuration settings.
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. | 0.7 |
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd |
| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. Compression can be disabled by setting spark.shuffle.compress=false. | zstd |
| spark.comet.exec.shuffle.compression.level | The compression level to use when compressing shuffle files. | 1 |
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true |
| spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |
| spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true |
6 changes: 6 additions & 0 deletions docs/source/user-guide/tuning.md
@@ -103,6 +103,12 @@ native shuffle currently only supports `HashPartitioning` and `SinglePartitioning`
To enable native shuffle, set `spark.comet.exec.shuffle.mode` to `native`. If this mode is explicitly set,
then any shuffle operations that cannot be supported in this mode will fall back to Spark.
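
As an illustrative sketch (the session builder and application name below are hypothetical; the config keys are the ones documented in this commit), native shuffle could be enabled from application code like this:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative sketch only: enable Comet native shuffle for a Spark application.
// `spark.shuffle.manager` must be set before the application starts and cannot
// be changed afterwards (see the configs table above).
val spark = SparkSession
  .builder()
  .appName("comet-native-shuffle-example") // hypothetical app name
  .config(
    "spark.shuffle.manager",
    "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
  .config("spark.comet.exec.shuffle.enabled", "true")
  .config("spark.comet.exec.shuffle.mode", "native")
  .getOrCreate()
```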

### Shuffle Compression

By default, Spark compresses shuffle files using LZ4 compression. Comet overrides this behavior with ZSTD compression.
Compression can be disabled by setting `spark.shuffle.compress=false`, which may result in faster shuffle times in
certain environments, such as single-node setups with fast NVMe drives, at the expense of increased disk space usage.
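
As a hedged sketch of the settings discussed above (the values are illustrative; `zstd` and level 1 are the defaults introduced by this change):

```scala
import org.apache.spark.SparkConf

// Illustrative sketch only: keep shuffle compression on and use Comet's zstd codec,
// optionally raising the level to trade CPU time for smaller shuffle files.
val compressed = new SparkConf()
  .set("spark.shuffle.compress", "true") // Spark's default
  .set("spark.comet.exec.shuffle.compression.codec", "zstd")
  .set("spark.comet.exec.shuffle.compression.level", "3") // default is 1

// Or skip compression entirely, e.g. on a single node with fast NVMe storage.
val uncompressed = new SparkConf()
  .set("spark.shuffle.compress", "false")
```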

## Explain Plan
### Extended Explain
With Spark 4.0.0 and newer, Comet can provide extended explain plan information in the Spark UI. Currently this lists
Expand Down
87 changes: 63 additions & 24 deletions native/core/benches/shuffle_writer.rs
@@ -15,36 +15,47 @@
// specific language governing permissions and limitations
// under the License.

use arrow_array::builder::Int32Builder;
use arrow_array::{builder::StringBuilder, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use comet::execution::shuffle::ShuffleWriterExec;
use comet::execution::shuffle::{write_ipc_compressed, CompressionCodec, ShuffleWriterExec};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::physical_plan::metrics::Time;
use datafusion::{
physical_plan::{common::collect, memory::MemoryExec, ExecutionPlan},
prelude::SessionContext,
};
use datafusion_physical_expr::{expressions::Column, Partitioning};
use std::io::Cursor;
use std::sync::Arc;
use tokio::runtime::Runtime;

fn criterion_benchmark(c: &mut Criterion) {
let batch = create_batch();
let mut batches = Vec::new();
for _ in 0..10 {
batches.push(batch.clone());
}
let partitions = &[batches];
let exec = ShuffleWriterExec::try_new(
Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()),
Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
"/tmp/data.out".to_string(),
"/tmp/index.out".to_string(),
)
.unwrap();

let mut group = c.benchmark_group("shuffle_writer");
group.bench_function("shuffle_writer", |b| {
group.bench_function("shuffle_writer: encode (no compression))", |b| {
let batch = create_batch(8192, true);
let mut buffer = vec![];
let mut cursor = Cursor::new(&mut buffer);
let ipc_time = Time::default();
b.iter(|| write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::None, &ipc_time));
});
group.bench_function("shuffle_writer: encode and compress (zstd level 1)", |b| {
let batch = create_batch(8192, true);
let mut buffer = vec![];
let mut cursor = Cursor::new(&mut buffer);
let ipc_time = Time::default();
b.iter(|| write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(1), &ipc_time));
});
group.bench_function("shuffle_writer: encode and compress (zstd level 6)", |b| {
let batch = create_batch(8192, true);
let mut buffer = vec![];
let mut cursor = Cursor::new(&mut buffer);
let ipc_time = Time::default();
b.iter(|| write_ipc_compressed(&batch, &mut cursor, &CompressionCodec::Zstd(6), &ipc_time));
});
group.bench_function("shuffle_writer: end to end", |b| {
let ctx = SessionContext::new();
let exec = create_shuffle_writer_exec(CompressionCodec::Zstd(1));
b.iter(|| {
let task_ctx = ctx.task_ctx();
let stream = exec.execute(0, task_ctx).unwrap();
@@ -54,19 +65,47 @@ fn criterion_benchmark(c: &mut Criterion) {
});
}

fn create_batch() -> RecordBatch {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
fn create_shuffle_writer_exec(compression_codec: CompressionCodec) -> ShuffleWriterExec {
let batches = create_batches(8192, 10);
let schema = batches[0].schema();
let partitions = &[batches];
ShuffleWriterExec::try_new(
Arc::new(MemoryExec::try_new(partitions, schema, None).unwrap()),
Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
compression_codec,
"/tmp/data.out".to_string(),
"/tmp/index.out".to_string(),
)
.unwrap()
}

fn create_batches(size: usize, count: usize) -> Vec<RecordBatch> {
let batch = create_batch(size, true);
let mut batches = Vec::new();
for _ in 0..count {
batches.push(batch.clone());
}
batches
}

fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("c0", DataType::Int32, true),
Field::new("c1", DataType::Utf8, true),
]));
let mut a = Int32Builder::new();
let mut b = StringBuilder::new();
for i in 0..8192 {
if i % 10 == 0 {
for i in 0..num_rows {
a.append_value(i as i32);
if allow_nulls && i % 10 == 0 {
b.append_null();
} else {
b.append_value(format!("{i}"));
b.append_value(format!("this is string number {i}"));
}
}
let array = b.finish();

RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap()
let a = a.finish();
let b = b.finish();
RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap()
}

fn config() -> Criterion {
17 changes: 15 additions & 2 deletions native/core/src/execution/planner.rs
@@ -68,6 +68,7 @@ use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr}
use datafusion_functions_nested::concat::ArrayAppend;
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};

use crate::execution::shuffle::CompressionCodec;
use crate::execution::spark_plan::SparkPlan;
use datafusion_comet_proto::{
spark_expression::{
@@ -76,8 +77,8 @@ use datafusion_comet_proto::{
},
spark_operator::{
self, lower_window_frame_bound::LowerFrameBoundStruct, operator::OpStruct,
upper_window_frame_bound::UpperFrameBoundStruct, BuildSide, JoinType, Operator,
WindowFrameType,
upper_window_frame_bound::UpperFrameBoundStruct, BuildSide,
CompressionCodec as SparkCompressionCodec, JoinType, Operator, WindowFrameType,
},
spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
};
@@ -1049,9 +1050,21 @@ impl PhysicalPlanner {
let partitioning = self
.create_partitioning(writer.partitioning.as_ref().unwrap(), child.schema())?;

let codec = match writer.codec.try_into() {
Ok(SparkCompressionCodec::None) => Ok(CompressionCodec::None),
Ok(SparkCompressionCodec::Zstd) => {
Ok(CompressionCodec::Zstd(writer.compression_level))
}
_ => Err(ExecutionError::GeneralError(format!(
"Unsupported shuffle compression codec: {:?}",
writer.codec
))),
}?;

let shuffle_writer = Arc::new(ShuffleWriterExec::try_new(
Arc::clone(&child.native_plan),
partitioning,
codec,
writer.output_data_file.clone(),
writer.output_index_file.clone(),
)?);
2 changes: 1 addition & 1 deletion native/core/src/execution/shuffle/mod.rs
@@ -19,4 +19,4 @@ mod list;
mod map;
pub mod row;
mod shuffle_writer;
pub use shuffle_writer::ShuffleWriterExec;
pub use shuffle_writer::{write_ipc_compressed, CompressionCodec, ShuffleWriterExec};
5 changes: 4 additions & 1 deletion native/core/src/execution/shuffle/row.rs
@@ -292,6 +292,7 @@ macro_rules! downcast_builder_ref {
}

// Expose the macro for other modules.
use crate::execution::shuffle::shuffle_writer::CompressionCodec;
pub(crate) use downcast_builder_ref;

/// Appends field of row to the given struct builder. `dt` is the data type of the field.
Expand Down Expand Up @@ -3358,7 +3359,9 @@ pub fn process_sorted_row_partition(

// we do not collect metrics in Native_writeSortedFileNative
let ipc_time = Time::default();
written += write_ipc_compressed(&batch, &mut cursor, &ipc_time)?;
// compression codec is not configurable for CometBypassMergeSortShuffleWriter
let codec = CompressionCodec::Zstd(1);
written += write_ipc_compressed(&batch, &mut cursor, &codec, &ipc_time)?;

if let Some(checksum) = &mut current_checksum {
checksum.update(&mut cursor)?;