[do not review] experimental support for lz4 compression (not working) #1181

Draft: wants to merge 13 commits into main
13 changes: 7 additions & 6 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -272,12 +272,13 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_EXEC_SHUFFLE_CODEC: ConfigEntry[String] = conf(
s"$COMET_EXEC_CONFIG_PREFIX.shuffle.codec")
.doc(
"The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported.")
.stringConf
.createWithDefault("zstd")
val COMET_EXEC_SHUFFLE_CODEC: ConfigEntry[String] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.codec")
.doc("The codec of Comet native shuffle used to compress shuffle data. " +
"Only lz4 and zstd are supported.")
.stringConf
.checkValues(Set("lz4", "zstd"))
.createWithDefault("zstd")

val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.columnar.shuffle.async.enabled")
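With `checkValues` in place, an invalid codec now fails fast at configuration time instead of being silently overridden to zstd (the old behavior in ShuffleUtils below). Selecting the codec is then the usual one-line Spark setting, e.g. with spark-submit (illustrative usage, not taken from this diff):

    --conf spark.comet.exec.shuffle.codec=lz4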
spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/ShuffleUtils.scala
@@ -21,7 +21,6 @@ package org.apache.spark.sql.comet.execution.shuffle

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.IO_COMPRESSION_CODEC
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.internal.SQLConf

@@ -31,12 +30,11 @@ private[spark] object ShuffleUtils extends Logging {
lazy val compressionCodecForShuffling: CompressionCodec = {
val sparkConf = SparkEnv.get.conf
val codecName = CometConf.COMET_EXEC_SHUFFLE_CODEC.get(SQLConf.get)

// only zstd compression is supported at the moment
if (codecName != "zstd") {
logWarning(
s"Overriding config ${IO_COMPRESSION_CODEC}=${codecName} in shuffling, force using zstd")
codecName match {
case "zstd" => CompressionCodec.createCodec(sparkConf, "zstd")
case "lz4" => CompressionCodec.createCodec(sparkConf, "lz4")
case other =>
throw new IllegalStateException(s"Unsupported shuffle compression codec: $other")
}
CompressionCodec.createCodec(sparkConf, "zstd")
}
}
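Since CometConf now rejects unknown codec names via `checkValues`, the `case other` arm here should be unreachable in practice; keeping it still seems worthwhile so that the JVM and native codec lists fail loudly if they ever drift apart.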
2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
@@ -50,7 +50,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. | 0.7 |
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd |
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only lz4 and zstd are supported. | zstd |
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true |
| spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |
| spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true |
12 changes: 10 additions & 2 deletions native/core/src/execution/planner.rs
@@ -68,6 +68,7 @@ use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr}
use datafusion_functions_nested::concat::ArrayAppend;
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};

use crate::execution::shuffle::CompressionCodec;
use crate::execution::spark_plan::SparkPlan;
use datafusion_comet_proto::{
spark_expression::{
@@ -76,8 +77,8 @@ use datafusion_comet_proto::{
},
spark_operator::{
self, lower_window_frame_bound::LowerFrameBoundStruct, operator::OpStruct,
upper_window_frame_bound::UpperFrameBoundStruct, BuildSide, JoinType, Operator,
WindowFrameType,
upper_window_frame_bound::UpperFrameBoundStruct, BuildSide,
CompressionCodec as SparkCompressionCodec, JoinType, Operator, WindowFrameType,
},
spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
};
@@ -1049,9 +1050,16 @@ impl PhysicalPlanner {
let partitioning = self
.create_partitioning(writer.partitioning.as_ref().unwrap(), child.schema())?;

let codec = match writer.codec.try_into() {
Ok(SparkCompressionCodec::Lz4) => CompressionCodec::Lz4,
Ok(SparkCompressionCodec::Zstd) => CompressionCodec::Zstd(1), // TODO make level configurable
_ => todo!(),
};

let shuffle_writer = Arc::new(ShuffleWriterExec::try_new(
Arc::clone(&child.native_plan),
partitioning,
codec,
writer.output_data_file.clone(),
writer.output_index_file.clone(),
)?);
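The `todo!()` arm in the codec match above will panic the native runtime if an unrecognized codec tag ever arrives (for example, from a newer plan version). Below is a self-contained sketch of the same mapping with an error instead of a panic. `SparkCompressionCodec` here is a hand-written stand-in for the generated proto enum (assuming the usual prost codegen, which models proto enum fields as a raw i32 on the message, hence the try_from round trip); the names mirror the diff, but none of this is Comet's actual code.

use std::convert::TryFrom;

// Stand-in for the prost-generated proto enum.
#[derive(Debug, Clone, Copy, PartialEq)]
enum SparkCompressionCodec {
    Lz4 = 0,
    Zstd = 1,
}

impl TryFrom<i32> for SparkCompressionCodec {
    type Error = String;
    fn try_from(v: i32) -> Result<Self, Self::Error> {
        match v {
            0 => Ok(Self::Lz4),
            1 => Ok(Self::Zstd),
            other => Err(format!("unknown codec tag: {other}")),
        }
    }
}

// Mirror of the native-side enum added in shuffle_writer.rs.
#[derive(Debug, Clone)]
enum CompressionCodec {
    Lz4,
    Zstd(i32),
}

fn map_codec(wire: i32) -> Result<CompressionCodec, String> {
    match SparkCompressionCodec::try_from(wire)? {
        SparkCompressionCodec::Lz4 => Ok(CompressionCodec::Lz4),
        // level 1 matches the hard-coded default in the diff above
        SparkCompressionCodec::Zstd => Ok(CompressionCodec::Zstd(1)),
    }
}

fn main() {
    assert!(matches!(map_codec(0), Ok(CompressionCodec::Lz4)));
    assert!(map_codec(42).is_err()); // errors instead of panicking via todo!()
}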
2 changes: 1 addition & 1 deletion native/core/src/execution/shuffle/mod.rs
@@ -19,4 +19,4 @@ mod list;
mod map;
pub mod row;
mod shuffle_writer;
pub use shuffle_writer::ShuffleWriterExec;
pub use shuffle_writer::{CompressionCodec, ShuffleWriterExec};
4 changes: 3 additions & 1 deletion native/core/src/execution/shuffle/row.rs
@@ -292,6 +292,7 @@ macro_rules! downcast_builder_ref {
}

// Expose the macro for other modules.
use crate::execution::shuffle::shuffle_writer::CompressionCodec;
pub(crate) use downcast_builder_ref;

/// Appends field of row to the given struct builder. `dt` is the data type of the field.
@@ -3358,7 +3359,8 @@ pub fn process_sorted_row_partition(

// we do not collect metrics in Native_writeSortedFileNative
let ipc_time = Time::default();
written += write_ipc_compressed(&batch, &mut cursor, &ipc_time)?;
let codec = CompressionCodec::Zstd(1); // TODO config
written += write_ipc_compressed(&batch, &mut cursor, &codec, &ipc_time)?;

if let Some(checksum) = &mut current_checksum {
checksum.update(&mut cursor)?;
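Worth noting: this sorted-row spill path still hard-codes `CompressionCodec::Zstd(1)` regardless of the new config (as the TODO admits). With `spark.comet.exec.shuffle.codec=lz4`, the JVM reader would be handed zstd-framed blocks from this path while expecting lz4, and that mismatch is presumably part of why the PR is marked as not working.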
69 changes: 60 additions & 9 deletions native/core/src/execution/shuffle/shuffle_writer.rs
@@ -53,6 +53,8 @@ use datafusion_physical_expr::EquivalenceProperties;
use futures::executor::block_on;
use futures::{lock::Mutex, Stream, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use lz4::liblz4::BlockChecksum;
use lz4::{BlockSize, ContentChecksum};
use simd_adler32::Adler32;
use std::io::Error;
use std::{
@@ -90,6 +92,7 @@ pub struct ShuffleWriterExec {
/// Metrics
metrics: ExecutionPlanMetricsSet,
cache: PlanProperties,
codec: CompressionCodec,
}

impl DisplayAs for ShuffleWriterExec {
@@ -126,6 +129,7 @@ impl ExecutionPlan for ShuffleWriterExec {
1 => Ok(Arc::new(ShuffleWriterExec::try_new(
Arc::clone(&children[0]),
self.partitioning.clone(),
self.codec.clone(),
self.output_data_file.clone(),
self.output_index_file.clone(),
)?)),
@@ -152,6 +156,7 @@
self.partitioning.clone(),
metrics,
context,
self.codec.clone(),
)
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
)
@@ -181,6 +186,7 @@ impl ShuffleWriterExec {
pub fn try_new(
input: Arc<dyn ExecutionPlan>,
partitioning: Partitioning,
codec: CompressionCodec,
output_data_file: String,
output_index_file: String,
) -> Result<Self> {
@@ -197,6 +203,7 @@
output_data_file,
output_index_file,
cache,
codec,
})
}
}
@@ -217,6 +224,7 @@
batch_size: usize,
/// Memory reservation for this partition buffer.
reservation: MemoryReservation,
codec: CompressionCodec,
}

impl PartitionBuffer {
@@ -225,6 +233,7 @@
batch_size: usize,
partition_id: usize,
runtime: &Arc<RuntimeEnv>,
codec: CompressionCodec,
) -> Self {
let reservation = MemoryConsumer::new(format!("PartitionBuffer[{}]", partition_id))
.with_can_spill(true)
@@ -238,6 +247,7 @@
num_active_rows: 0,
batch_size,
reservation,
codec,
}
}

@@ -337,7 +347,7 @@ impl PartitionBuffer {
let frozen_capacity_old = self.frozen.capacity();
let mut cursor = Cursor::new(&mut self.frozen);
cursor.seek(SeekFrom::End(0))?;
write_ipc_compressed(&frozen_batch, &mut cursor, ipc_time)?;
write_ipc_compressed(&frozen_batch, &mut cursor, &self.codec, ipc_time)?;

mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize;
Ok(mem_diff)
@@ -687,6 +697,7 @@ impl ShuffleRepartitioner {
metrics: ShuffleRepartitionerMetrics,
runtime: Arc<RuntimeEnv>,
batch_size: usize,
codec: CompressionCodec,
) -> Self {
let num_output_partitions = partitioning.partition_count();
let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{}]", partition_id))
@@ -709,7 +720,13 @@
schema: Arc::clone(&schema),
buffered_partitions: (0..num_output_partitions)
.map(|partition_id| {
PartitionBuffer::new(Arc::clone(&schema), batch_size, partition_id, &runtime)
PartitionBuffer::new(
Arc::clone(&schema),
batch_size,
partition_id,
&runtime,
codec.clone(),
)
})
.collect::<Vec<_>>(),
spills: Mutex::new(vec![]),
@@ -1137,6 +1154,7 @@ async fn external_shuffle(
partitioning: Partitioning,
metrics: ShuffleRepartitionerMetrics,
context: Arc<TaskContext>,
codec: CompressionCodec,
) -> Result<SendableRecordBatchStream> {
let schema = input.schema();
let mut repartitioner = ShuffleRepartitioner::new(
@@ -1148,6 +1166,7 @@
metrics,
context.runtime_env(),
context.session_config().batch_size(),
codec,
);

while let Some(batch) = input.next().await {
@@ -1526,11 +1545,18 @@ impl Checksum {
}
}

#[derive(Debug, Clone)]
pub enum CompressionCodec {
Lz4,
Zstd(i32),
}

/// Writes given record batch as Arrow IPC bytes into given writer.
/// Returns number of bytes written.
pub(crate) fn write_ipc_compressed<W: Write + Seek>(
batch: &RecordBatch,
output: &mut W,
codec: &CompressionCodec,
ipc_time: &Time,
) -> Result<usize> {
if batch.num_rows() == 0 {
@@ -1543,14 +1569,38 @@
// write ipc_length placeholder
output.write_all(&[0u8; 8])?;

// write ipc data
// TODO: make compression level configurable
let mut arrow_writer = StreamWriter::try_new(zstd::Encoder::new(output, 1)?, &batch.schema())?;
arrow_writer.write(batch)?;
arrow_writer.finish()?;
let output = match codec {
CompressionCodec::Lz4 => {
// write IPC first without compression
let mut buffer = vec![];
let mut arrow_writer = StreamWriter::try_new(&mut buffer, &batch.schema())?;
arrow_writer.write(batch)?;
arrow_writer.finish()?;
let ipc_encoded = arrow_writer.into_inner()?;

let mut encoder = lz4::EncoderBuilder::new()
.content_size(ipc_encoded.len() as u64)
.checksum(ContentChecksum::ChecksumEnabled)
.block_checksum(BlockChecksum::BlockChecksumEnabled)
.level(4)
.block_size(BlockSize::Default)
.auto_flush(true)
.build(&mut *output)?;
encoder.write_all(ipc_encoded.as_slice())?;
let (output, result) = encoder.finish();
result?;
output
}
CompressionCodec::Zstd(level) => {
let encoder = zstd::Encoder::new(output, *level)?;
let mut arrow_writer = StreamWriter::try_new(encoder, &batch.schema())?;

[review comment, on the StreamWriter::try_new line above]
I am not really familiar with the code, but shouldn't StreamWriter and encoder be created only once per stream instead of per batch?

[reply, PR author]
Yes, that is something I have been thinking about as well. We have the cost of writing the schema for each batch currently, and the schema is guaranteed to be the same for each batch.

[reply, PR author]
To add some more context here, we buffer rows per partition until we reach the desired batch size and then need to serialize that batch to bytes that can be read as one block by CometBlockStoreShuffleReader.

arrow_writer.write(batch)?;
arrow_writer.finish()?;
let zstd_encoder = arrow_writer.into_inner()?;
zstd_encoder.finish()?
}
};

let zwriter = arrow_writer.into_inner()?;
let output = zwriter.finish()?;
let end_pos = output.stream_position()?;
let ipc_length = end_pos - start_pos - 8;

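Two details of write_ipc_compressed are worth calling out: the lz4 arm buffers the whole IPC-encoded batch in memory before compressing (the zstd arm streams through the encoder instead), and both arms rely on the 8-byte ipc_length placeholder written above being backfilled once the compressed size is known. Below is a minimal, self-contained sketch of that length-prefix framing round trip, using the same `lz4` crate as the diff; the little-endian byte order is an assumption for illustration, since the backfill code sits outside the visible hunk.

use std::io::{Cursor, Read, Seek, SeekFrom, Write};

fn main() -> std::io::Result<()> {
    let payload = b"ipc-encoded bytes would go here";
    let mut cursor = Cursor::new(Vec::<u8>::new());

    // reserve the 8-byte ipc_length placeholder
    let start_pos = cursor.stream_position()?;
    cursor.write_all(&[0u8; 8])?;

    // compress the payload into the same stream
    let mut encoder = lz4::EncoderBuilder::new().level(4).build(&mut cursor)?;
    encoder.write_all(payload)?;
    let (_, result) = encoder.finish();
    result?;

    // backfill the placeholder now that the compressed size is known
    let end_pos = cursor.stream_position()?;
    let ipc_length = end_pos - start_pos - 8;
    cursor.seek(SeekFrom::Start(start_pos))?;
    cursor.write_all(&ipc_length.to_le_bytes())?;

    // reader side: length first, then decompress exactly that many bytes
    cursor.seek(SeekFrom::Start(start_pos))?;
    let mut len_buf = [0u8; 8];
    cursor.read_exact(&mut len_buf)?;
    let block_len = u64::from_le_bytes(len_buf);
    let mut decoder = lz4::Decoder::new(cursor.take(block_len))?;
    let mut roundtrip = Vec::new();
    decoder.read_to_end(&mut roundtrip)?;
    assert_eq!(roundtrip, payload);
    Ok(())
}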
@@ -1687,6 +1737,7 @@ mod test {
let exec = ShuffleWriterExec::try_new(
Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()),
Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], num_partitions),
CompressionCodec::Zstd(1),
"/tmp/data.out".to_string(),
"/tmp/index.out".to_string(),
)
6 changes: 6 additions & 0 deletions native/proto/src/proto/operator.proto
@@ -82,10 +82,16 @@ message Limit {
int32 offset = 2;
}

enum CompressionCodec {
Lz4 = 0;
Zstd = 1;
}

message ShuffleWriter {
spark.spark_partitioning.Partitioning partitioning = 1;
string output_data_file = 3;
string output_index_file = 4;
CompressionCodec codec = 5;
}

enum AggregateMode {
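One proto3 subtlety worth flagging: an unset enum field decodes as tag 0, i.e. Lz4 here, so a plan serialized by an older JVM without the codec field would come out as lz4 on the native side even though the config default is zstd. Making Zstd tag 0 instead (or validating the field's presence) might be the safer ordering.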
spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -52,8 +52,9 @@ import org.apache.spark.util.random.XORShiftRandom

import com.google.common.base.Objects

import org.apache.comet.CometConf
import org.apache.comet.serde.{OperatorOuterClass, PartitioningOuterClass, QueryPlanSerde}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.OperatorOuterClass.{CompressionCodec, Operator}
import org.apache.comet.serde.QueryPlanSerde.serializeDataType
import org.apache.comet.shims.ShimCometShuffleExchangeExec

Expand Down Expand Up @@ -553,6 +554,12 @@ class CometShuffleWriteProcessor(
shuffleWriterBuilder.setOutputDataFile(dataFile)
shuffleWriterBuilder.setOutputIndexFile(indexFile)

val codec = CometConf.COMET_EXEC_SHUFFLE_CODEC.get() match {
case "lz4" => CompressionCodec.Lz4
case "zstd" => CompressionCodec.Zstd
}
shuffleWriterBuilder.setCodec(codec)

outputPartitioning match {
case _: HashPartitioning =>
val hashPartitioning = outputPartitioning.asInstanceOf[HashPartitioning]
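A small footgun in the match above: it is total only because checkValues(Set("lz4", "zstd")) constrains the config upstream. If a codec is ever added on the native side without updating this list, the match throws a bare scala.MatchError rather than a descriptive error.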
spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
@@ -29,8 +29,6 @@ import org.apache.spark.SparkConf
import org.apache.spark.SparkEnv
import org.apache.spark.TaskContext
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.IO_COMPRESSION_CODEC
import org.apache.spark.io.CompressionCodec
import org.apache.spark.shuffle._
import org.apache.spark.shuffle.api.ShuffleExecutorComponents
import org.apache.spark.shuffle.sort.{BypassMergeSortShuffleHandle, SerializedShuffleHandle, SortShuffleManager, SortShuffleWriter}
@@ -241,18 +239,6 @@ object CometShuffleManager extends Logging {
executorComponents
}

lazy val compressionCodecForShuffling: CompressionCodec = {
val sparkConf = SparkEnv.get.conf
val codecName = CometConf.COMET_EXEC_SHUFFLE_CODEC.get(SQLConf.get)

// only zstd compression is supported at the moment
if (codecName != "zstd") {
logWarning(
s"Overriding config ${IO_COMPRESSION_CODEC}=${codecName} in shuffling, force using zstd")
}
CompressionCodec.createCodec(sparkConf, "zstd")
}

def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
// We cannot bypass sorting if we need to do map-side aggregation.
if (dep.mapSideCombine) {