Skip to content

Commit

Permalink
experimental support for lz4 compression (not working)
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Dec 18, 2024
1 parent e297d23 commit 7876c9e
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 42 deletions.
13 changes: 7 additions & 6 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,13 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_EXEC_SHUFFLE_CODEC: ConfigEntry[String] = conf(
s"$COMET_EXEC_CONFIG_PREFIX.shuffle.codec")
.doc(
"The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported.")
.stringConf
.createWithDefault("zstd")
/**
 * Name of the codec Comet native shuffle uses to compress shuffle data.
 *
 * The entry is built with `checkValues(Set("lz4", "zstd"))` and defaults to `"zstd"`.
 */
val COMET_EXEC_SHUFFLE_CODEC: ConfigEntry[String] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.codec")
.doc("The codec of Comet native shuffle used to compress shuffle data. " +
"Only lz4 and zstd are supported.")
.stringConf
.checkValues(Set("lz4", "zstd"))
.createWithDefault("zstd")

val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.columnar.shuffle.async.enabled")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package org.apache.spark.sql.comet.execution.shuffle

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.IO_COMPRESSION_CODEC
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.internal.SQLConf

Expand All @@ -31,12 +30,11 @@ private[spark] object ShuffleUtils extends Logging {
lazy val compressionCodecForShuffling: CompressionCodec = {
val sparkConf = SparkEnv.get.conf
val codecName = CometConf.COMET_EXEC_SHUFFLE_CODEC.get(SQLConf.get)

// only zstd compression is supported at the moment
if (codecName != "zstd") {
logWarning(
s"Overriding config ${IO_COMPRESSION_CODEC}=${codecName} in shuffling, force using zstd")
// Both supported names are passed to Spark unchanged, so one arm binds the matched
// name and forwards it; every other value is rejected.
codecName match {
  case supported @ ("zstd" | "lz4") =>
    CompressionCodec.createCodec(sparkConf, supported)
  case other =>
    throw new IllegalStateException(s"Unsupported shuffle compression codec: $other")
}
CompressionCodec.createCodec(sparkConf, "zstd")
}
}
2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. | 0.7 |
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd |
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only lz4 and zstd are supported. | zstd |
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true |
| spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |
| spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true |
Expand Down
12 changes: 10 additions & 2 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr}
use datafusion_functions_nested::concat::ArrayAppend;
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};

use crate::execution::shuffle::CompressionCodec;
use crate::execution::spark_plan::SparkPlan;
use datafusion_comet_proto::{
spark_expression::{
Expand All @@ -76,8 +77,8 @@ use datafusion_comet_proto::{
},
spark_operator::{
self, lower_window_frame_bound::LowerFrameBoundStruct, operator::OpStruct,
upper_window_frame_bound::UpperFrameBoundStruct, BuildSide, JoinType, Operator,
WindowFrameType,
upper_window_frame_bound::UpperFrameBoundStruct, BuildSide,
CompressionCodec as SparkCompressionCodec, JoinType, Operator, WindowFrameType,
},
spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
};
Expand Down Expand Up @@ -1049,9 +1050,16 @@ impl PhysicalPlanner {
let partitioning = self
.create_partitioning(writer.partitioning.as_ref().unwrap(), child.schema())?;

// Translate the protobuf codec carried by the ShuffleWriter message into the
// native writer's codec. `try_into` fails when the wire value does not map to a
// known enum variant; report that as an error instead of panicking via `todo!()`,
// since the surrounding code already propagates failures with `?`.
let codec = match writer.codec.try_into() {
    Ok(SparkCompressionCodec::Lz4) => Ok(CompressionCodec::Lz4),
    // TODO make level configurable
    Ok(SparkCompressionCodec::Zstd) => Ok(CompressionCodec::Zstd(1)),
    _ => Err(ExecutionError::GeneralError(format!(
        "Unsupported shuffle compression codec: {:?}",
        writer.codec
    ))),
}?;

let shuffle_writer = Arc::new(ShuffleWriterExec::try_new(
Arc::clone(&child.native_plan),
partitioning,
codec,
writer.output_data_file.clone(),
writer.output_index_file.clone(),
)?);
Expand Down
2 changes: 1 addition & 1 deletion native/core/src/execution/shuffle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ mod list;
mod map;
pub mod row;
mod shuffle_writer;
pub use shuffle_writer::ShuffleWriterExec;
pub use shuffle_writer::{CompressionCodec, ShuffleWriterExec};
4 changes: 3 additions & 1 deletion native/core/src/execution/shuffle/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ macro_rules! downcast_builder_ref {
}

// Expose the macro for other modules.
use crate::execution::shuffle::shuffle_writer::CompressionCodec;
pub(crate) use downcast_builder_ref;

/// Appends field of row to the given struct builder. `dt` is the data type of the field.
Expand Down Expand Up @@ -3358,7 +3359,8 @@ pub fn process_sorted_row_partition(

// we do not collect metrics in Native_writeSortedFileNative
let ipc_time = Time::default();
written += write_ipc_compressed(&batch, &mut cursor, &ipc_time)?;
let codec = CompressionCodec::Zstd(1); // TODO config
written += write_ipc_compressed(&batch, &mut cursor, &codec, &ipc_time)?;

if let Some(checksum) = &mut current_checksum {
checksum.update(&mut cursor)?;
Expand Down
61 changes: 52 additions & 9 deletions native/core/src/execution/shuffle/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pub struct ShuffleWriterExec {
/// Metrics
metrics: ExecutionPlanMetricsSet,
cache: PlanProperties,
codec: CompressionCodec,
}

impl DisplayAs for ShuffleWriterExec {
Expand Down Expand Up @@ -126,6 +127,7 @@ impl ExecutionPlan for ShuffleWriterExec {
1 => Ok(Arc::new(ShuffleWriterExec::try_new(
Arc::clone(&children[0]),
self.partitioning.clone(),
self.codec.clone(),
self.output_data_file.clone(),
self.output_index_file.clone(),
)?)),
Expand All @@ -152,6 +154,7 @@ impl ExecutionPlan for ShuffleWriterExec {
self.partitioning.clone(),
metrics,
context,
self.codec.clone(),
)
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
)
Expand Down Expand Up @@ -181,6 +184,7 @@ impl ShuffleWriterExec {
pub fn try_new(
input: Arc<dyn ExecutionPlan>,
partitioning: Partitioning,
codec: CompressionCodec,
output_data_file: String,
output_index_file: String,
) -> Result<Self> {
Expand All @@ -197,6 +201,7 @@ impl ShuffleWriterExec {
output_data_file,
output_index_file,
cache,
codec,
})
}
}
Expand All @@ -217,6 +222,7 @@ struct PartitionBuffer {
batch_size: usize,
/// Memory reservation for this partition buffer.
reservation: MemoryReservation,
codec: CompressionCodec,
}

impl PartitionBuffer {
Expand All @@ -225,6 +231,7 @@ impl PartitionBuffer {
batch_size: usize,
partition_id: usize,
runtime: &Arc<RuntimeEnv>,
codec: CompressionCodec,
) -> Self {
let reservation = MemoryConsumer::new(format!("PartitionBuffer[{}]", partition_id))
.with_can_spill(true)
Expand All @@ -238,6 +245,7 @@ impl PartitionBuffer {
num_active_rows: 0,
batch_size,
reservation,
codec,
}
}

Expand Down Expand Up @@ -337,7 +345,7 @@ impl PartitionBuffer {
let frozen_capacity_old = self.frozen.capacity();
let mut cursor = Cursor::new(&mut self.frozen);
cursor.seek(SeekFrom::End(0))?;
write_ipc_compressed(&frozen_batch, &mut cursor, ipc_time)?;
write_ipc_compressed(&frozen_batch, &mut cursor, &self.codec, ipc_time)?;

mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize;
Ok(mem_diff)
Expand Down Expand Up @@ -687,6 +695,7 @@ impl ShuffleRepartitioner {
metrics: ShuffleRepartitionerMetrics,
runtime: Arc<RuntimeEnv>,
batch_size: usize,
codec: CompressionCodec,
) -> Self {
let num_output_partitions = partitioning.partition_count();
let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{}]", partition_id))
Expand All @@ -709,7 +718,13 @@ impl ShuffleRepartitioner {
schema: Arc::clone(&schema),
buffered_partitions: (0..num_output_partitions)
.map(|partition_id| {
PartitionBuffer::new(Arc::clone(&schema), batch_size, partition_id, &runtime)
PartitionBuffer::new(
Arc::clone(&schema),
batch_size,
partition_id,
&runtime,
codec.clone(),
)
})
.collect::<Vec<_>>(),
spills: Mutex::new(vec![]),
Expand Down Expand Up @@ -1137,6 +1152,7 @@ async fn external_shuffle(
partitioning: Partitioning,
metrics: ShuffleRepartitionerMetrics,
context: Arc<TaskContext>,
codec: CompressionCodec,
) -> Result<SendableRecordBatchStream> {
let schema = input.schema();
let mut repartitioner = ShuffleRepartitioner::new(
Expand All @@ -1148,6 +1164,7 @@ async fn external_shuffle(
metrics,
context.runtime_env(),
context.session_config().batch_size(),
codec,
);

while let Some(batch) = input.next().await {
Expand Down Expand Up @@ -1526,11 +1543,18 @@ impl Checksum {
}
}

/// Compression codec selected for `write_ipc_compressed` when it writes a record
/// batch as Arrow IPC bytes.
#[derive(Debug, Clone)]
pub enum CompressionCodec {
/// LZ4 compression.
Lz4,
/// Zstandard compression; the `i32` is the level passed to `zstd::Encoder::new`.
Zstd(i32),
}

/// Writes given record batch as Arrow IPC bytes into given writer.
/// Returns number of bytes written.
pub(crate) fn write_ipc_compressed<W: Write + Seek>(
batch: &RecordBatch,
output: &mut W,
codec: &CompressionCodec,
ipc_time: &Time,
) -> Result<usize> {
if batch.num_rows() == 0 {
Expand All @@ -1543,14 +1567,32 @@ pub(crate) fn write_ipc_compressed<W: Write + Seek>(
// write ipc_length placeholder
output.write_all(&[0u8; 8])?;

// write ipc data
// TODO: make compression level configurable
let mut arrow_writer = StreamWriter::try_new(zstd::Encoder::new(output, 1)?, &batch.schema())?;
arrow_writer.write(batch)?;
arrow_writer.finish()?;
// Write the batch through an Arrow IPC `StreamWriter` layered on the selected
// compressor. Each arm finishes the IPC stream first, then finishes the compressor,
// and yields the underlying writer so the code after the match can read its
// stream position.
// NOTE(review): the commit message labels lz4 support as "not working" — confirm
// the read side can decode what the Lz4 arm produces.
let output = match codec {
CompressionCodec::Zstd(level) => {
// `level` is the compression level carried by the `Zstd` variant.
let encoder = zstd::Encoder::new(output, *level)?;
let mut arrow_writer = StreamWriter::try_new(encoder, &batch.schema())?;
arrow_writer.write(batch)?;
arrow_writer.finish()?;
// Recover the encoder from the IPC writer, then finish it to get `output` back.
let zstd_encoder = arrow_writer.into_inner()?;
zstd_encoder.finish()?
}
CompressionCodec::Lz4 => {
// Builder is used with no options set; the tuning calls below are disabled.
let encoder = lz4::EncoderBuilder::new()
// .block_size(BlockSize::Default)
// .checksum(ContentChecksum::ChecksumEnabled)
// .block_checksum(BlockChecksum::BlockChecksumEnabled)
// .favor_dec_speed(true)
.build(output)?;
let mut arrow_writer = StreamWriter::try_new(encoder, &batch.schema())?;
arrow_writer.write(batch)?;
arrow_writer.finish()?;
let lz4_encoder = arrow_writer.into_inner()?;
// Here `finish` hands back the writer together with a result; propagate any
// error from the result before returning the writer.
let (output, result) = lz4_encoder.finish();
result?;
output
}
};

let zwriter = arrow_writer.into_inner()?;
let output = zwriter.finish()?;
let end_pos = output.stream_position()?;
let ipc_length = end_pos - start_pos - 8;

Expand Down Expand Up @@ -1687,6 +1729,7 @@ mod test {
let exec = ShuffleWriterExec::try_new(
Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()),
Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions),
CompressionCodec::Zstd(1),
"/tmp/data.out".to_string(),
"/tmp/index.out".to_string(),
)
Expand Down
6 changes: 6 additions & 0 deletions native/proto/src/proto/operator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,16 @@ message Limit {
int32 offset = 2;
}

// Compression codec requested for a ShuffleWriter.
// NOTE(review): Lz4 carries tag 0; if this file uses proto3 semantics, a message
// that never sets `codec` will decode as Lz4 — confirm that is intended.
enum CompressionCodec {
Lz4 = 0;
Zstd = 1;
}

// Parameters for a shuffle write: the partitioning scheme, the output data and
// index file names, and the compression codec.
message ShuffleWriter {
spark.spark_partitioning.Partitioning partitioning = 1;
// Field number 2 is not assigned in this message.
string output_data_file = 3;
string output_index_file = 4;
// Codec used to compress the shuffle data.
CompressionCodec codec = 5;
}

enum AggregateMode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ import org.apache.spark.util.random.XORShiftRandom

import com.google.common.base.Objects

import org.apache.comet.CometConf
import org.apache.comet.serde.{OperatorOuterClass, PartitioningOuterClass, QueryPlanSerde}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.OperatorOuterClass.{CompressionCodec, Operator}
import org.apache.comet.serde.QueryPlanSerde.serializeDataType
import org.apache.comet.shims.ShimCometShuffleExchangeExec

Expand Down Expand Up @@ -553,6 +554,12 @@ class CometShuffleWriteProcessor(
shuffleWriterBuilder.setOutputDataFile(dataFile)
shuffleWriterBuilder.setOutputIndexFile(indexFile)

// Translate the configured codec name into the protobuf enum sent to native code.
// The config is expected to hold only "lz4" or "zstd"; any other value fails with a
// descriptive error instead of a bare MatchError.
val codec = CometConf.COMET_EXEC_SHUFFLE_CODEC.get() match {
  case "lz4" => CompressionCodec.Lz4
  case "zstd" => CompressionCodec.Zstd
  case other =>
    throw new IllegalStateException(s"Unsupported shuffle compression codec: $other")
}
shuffleWriterBuilder.setCodec(codec)

outputPartitioning match {
case _: HashPartitioning =>
val hashPartitioning = outputPartitioning.asInstanceOf[HashPartitioning]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import org.apache.spark.SparkConf
import org.apache.spark.SparkEnv
import org.apache.spark.TaskContext
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.IO_COMPRESSION_CODEC
import org.apache.spark.io.CompressionCodec
import org.apache.spark.shuffle._
import org.apache.spark.shuffle.api.ShuffleExecutorComponents
import org.apache.spark.shuffle.sort.{BypassMergeSortShuffleHandle, SerializedShuffleHandle, SortShuffleManager, SortShuffleWriter}
Expand Down Expand Up @@ -241,18 +239,6 @@ object CometShuffleManager extends Logging {
executorComponents
}

lazy val compressionCodecForShuffling: CompressionCodec = {
val sparkConf = SparkEnv.get.conf
val codecName = CometConf.COMET_EXEC_SHUFFLE_CODEC.get(SQLConf.get)

// only zstd compression is supported at the moment
if (codecName != "zstd") {
logWarning(
s"Overriding config ${IO_COMPRESSION_CODEC}=${codecName} in shuffling, force using zstd")
}
CompressionCodec.createCodec(sparkConf, "zstd")
}

def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
// We cannot bypass sorting if we need to do map-side aggregation.
if (dep.mapSideCombine) {
Expand Down

0 comments on commit 7876c9e

Please sign in to comment.