Make compression level configurable
andygrove committed Jul 4, 2024
1 parent b3977cd commit 625f877
Showing 7 changed files with 63 additions and 16 deletions.
6 changes: 6 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -175,6 +175,12 @@ object CometConf extends ShimCometConf {
.stringConf
.createWithDefault("zstd")

val COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL: ConfigEntry[Int] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compressionLevel")
.doc("Zstd compression level used in shuffle.")
.intConf
.createWithDefault(1)

val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] = conf(
"spark.comet.columnar.shuffle.async.enabled")
.doc(
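(Usage note: the new key follows the standard Comet conf pattern, so it can be supplied like any other Spark conf, e.g. `--conf spark.comet.exec.shuffle.compressionLevel=3` on spark-submit; 3 is an illustrative value, the committed default is 1.)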
9 changes: 8 additions & 1 deletion core/src/execution/datafusion/planner.rs
@@ -128,27 +128,32 @@ pub struct PhysicalPlanner {
exec_context_id: i64,
execution_props: ExecutionProps,
session_ctx: Arc<SessionContext>,
compression_level: i32,
}

#[cfg(test)]
impl Default for PhysicalPlanner {
/// Create default planner (for use in tests only)
fn default() -> Self {
let session_ctx = Arc::new(SessionContext::new());
let execution_props = ExecutionProps::new();
Self {
exec_context_id: TEST_EXEC_CONTEXT_ID,
execution_props,
session_ctx,
compression_level: 1,
}
}
}

impl PhysicalPlanner {
pub fn new(session_ctx: Arc<SessionContext>) -> Self {
pub fn new(session_ctx: Arc<SessionContext>, compression_level: i32) -> Self {
let execution_props = ExecutionProps::new();
Self {
exec_context_id: TEST_EXEC_CONTEXT_ID,
execution_props,
session_ctx,
compression_level,
}
}

@@ -157,6 +162,7 @@ impl PhysicalPlanner {
exec_context_id,
execution_props: self.execution_props,
session_ctx: self.session_ctx.clone(),
compression_level: self.compression_level,
}
}

@@ -863,6 +869,7 @@ impl PhysicalPlanner {
partitioning,
writer.output_data_file.clone(),
writer.output_index_file.clone(),
self.compression_level,
)?),
))
}
40 changes: 29 additions & 11 deletions core/src/execution/datafusion/shuffle_writer.rs
@@ -28,6 +28,11 @@ use std::{
task::{Context, Poll},
};

use crate::{
common::bit::ceil,
errors::{CometError, CometResult},
execution::datafusion::spark_hash::{create_murmur3_hashes, pmod},
};
use arrow::{datatypes::*, ipc::writer::StreamWriter};
use async_trait::async_trait;
use bytes::Buf;
@@ -59,12 +64,6 @@ use itertools::Itertools;
use simd_adler32::Adler32;
use tokio::task;

use crate::{
common::bit::ceil,
errors::{CometError, CometResult},
execution::datafusion::spark_hash::{create_murmur3_hashes, pmod},
};

/// The shuffle writer operator maps each input partition to M output partitions based on a
/// partitioning scheme. No guarantees are made about the order of the resulting partitions.
#[derive(Debug)]
@@ -80,6 +79,8 @@ pub struct ShuffleWriterExec {
/// Metrics
metrics: ExecutionPlanMetricsSet,
cache: PlanProperties,
/// zstd compression level
compression_level: i32,
}

impl DisplayAs for ShuffleWriterExec {
@@ -118,6 +119,7 @@ impl ExecutionPlan for ShuffleWriterExec {
self.partitioning.clone(),
self.output_data_file.clone(),
self.output_index_file.clone(),
self.compression_level,
)?)),
_ => panic!("ShuffleWriterExec wrong number of children"),
}
@@ -142,6 +144,7 @@ impl ExecutionPlan for ShuffleWriterExec {
self.partitioning.clone(),
metrics,
context,
self.compression_level,
)
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
)
@@ -169,6 +172,7 @@ impl ShuffleWriterExec {
partitioning: Partitioning,
output_data_file: String,
output_index_file: String,
compression_level: i32,
) -> Result<Self> {
let cache = PlanProperties::new(
EquivalenceProperties::new(input.schema().clone()),
@@ -183,6 +187,7 @@ impl ShuffleWriterExec {
output_data_file,
output_index_file,
cache,
compression_level,
})
}
}
@@ -201,17 +206,20 @@ struct PartitionBuffer {
/// The maximum number of rows in a batch. Once `num_active_rows` reaches `batch_size`,
/// the active array builders will be frozen and appended to frozen buffer `frozen`.
batch_size: usize,
/// zstd compression level
compression_level: i32,
}

impl PartitionBuffer {
fn new(schema: SchemaRef, batch_size: usize) -> Self {
fn new(schema: SchemaRef, batch_size: usize, compression_level: i32) -> Self {
Self {
schema,
frozen: vec![],
active: vec![],
active_slots_mem_size: 0,
num_active_rows: 0,
batch_size,
compression_level,
}
}

@@ -285,7 +293,7 @@ impl PartitionBuffer {
let frozen_capacity_old = self.frozen.capacity();
let mut cursor = Cursor::new(&mut self.frozen);
cursor.seek(SeekFrom::End(0))?;
write_ipc_compressed(&frozen_batch, &mut cursor)?;
write_ipc_compressed(&frozen_batch, &mut cursor, self.compression_level)?;

mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize;
Ok(mem_diff)
@@ -577,6 +585,8 @@ struct ShuffleRepartitioner {
partition_ids: Vec<u64>,
/// The configured batch size
batch_size: usize,
/// zstd compression level
compression_level: i32,
}

struct ShuffleRepartitionerMetrics {
@@ -611,6 +621,7 @@ impl ShuffleRepartitioner {
metrics: ShuffleRepartitionerMetrics,
runtime: Arc<RuntimeEnv>,
batch_size: usize,
compression_level: i32,
) -> Self {
let num_output_partitions = partitioning.partition_count();
let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{}]", partition_id))
@@ -633,7 +644,7 @@ impl ShuffleRepartitioner {
schema: schema.clone(),
buffered_partitions: Mutex::new(
(0..num_output_partitions)
.map(|_| PartitionBuffer::new(schema.clone(), batch_size))
.map(|_| PartitionBuffer::new(schema.clone(), batch_size, compression_level))
.collect::<Vec<_>>(),
),
spills: Mutex::new(vec![]),
Expand All @@ -645,6 +656,7 @@ impl ShuffleRepartitioner {
hashes_buf,
partition_ids,
batch_size,
compression_level,
}
}

@@ -963,6 +975,7 @@ async fn external_shuffle(
partitioning: Partitioning,
metrics: ShuffleRepartitionerMetrics,
context: Arc<TaskContext>,
compression_level: i32,
) -> Result<SendableRecordBatchStream> {
let schema = input.schema();
let mut repartitioner = ShuffleRepartitioner::new(
@@ -974,6 +987,7 @@
metrics,
context.runtime_env(),
context.session_config().batch_size(),
compression_level,
);

while let Some(batch) = input.next().await {
@@ -1353,6 +1367,7 @@ impl Checksum {
pub(crate) fn write_ipc_compressed<W: Write + Seek>(
batch: &RecordBatch,
output: &mut W,
compression_level: i32,
) -> Result<usize> {
if batch.num_rows() == 0 {
return Ok(0);
@@ -1363,8 +1378,10 @@
output.write_all(&[0u8; 8])?;

// write ipc data
// TODO: make compression level configurable
let mut arrow_writer = StreamWriter::try_new(zstd::Encoder::new(output, 1)?, &batch.schema())?;
let mut arrow_writer = StreamWriter::try_new(
zstd::Encoder::new(output, compression_level)?,
&batch.schema(),
)?;
arrow_writer.write(batch)?;
arrow_writer.finish()?;

@@ -1465,6 +1482,7 @@ mod test {
Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
"/tmp/data.out".to_string(),
"/tmp/index.out".to_string(),
1,
)
.unwrap();
let ctx = SessionContext::new();
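For context on the `write_ipc_compressed` change above: the hard-coded level `1` passed to `zstd::Encoder::new` is replaced by the caller-supplied `compression_level`. A minimal standalone sketch of that encode path, assuming only the `arrow` and `zstd` crates (the `compress_batch` helper and the sample batch are illustrative, not part of this commit):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

/// Compress a RecordBatch into an in-memory, zstd-compressed Arrow IPC stream.
fn compress_batch(batch: &RecordBatch, level: i32) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    // Same shape as write_ipc_compressed: wrap the sink in a zstd encoder,
    // then stream the batch through Arrow's IPC StreamWriter.
    let encoder = zstd::Encoder::new(Vec::new(), level)?;
    let mut writer = StreamWriter::try_new(encoder, &batch.schema())?;
    writer.write(batch)?;
    writer.finish()?;
    // Unwrap the IPC writer, then finish the zstd frame to obtain the bytes.
    let encoder = writer.into_inner()?;
    Ok(encoder.finish()?)
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let col: ArrayRef = Arc::new(Int32Array::from((0..1024).collect::<Vec<i32>>()));
    let batch = RecordBatch::try_new(schema, vec![col])?;
    // Level 1 (the new default) favors throughput; higher levels favor ratio.
    println!("level 1 -> {} bytes", compress_batch(&batch, 1)?.len());
    println!("level 9 -> {} bytes", compress_batch(&batch, 9)?.len());
    Ok(())
}
```

Levels behave as in stock zstd: larger values shrink the shuffle bytes at the cost of CPU on the write path.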
17 changes: 15 additions & 2 deletions core/src/execution/jni_api.rs
@@ -89,6 +89,8 @@ struct ExecutionContext {
pub session_ctx: Arc<SessionContext>,
/// Whether to enable additional debugging checks & messages
pub debug_native: bool,
/// zstd compression level
pub compression_level: i32,
}

/// Accept serialized query plan and return the address of the native query plan.
@@ -132,6 +134,11 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
.and_then(|x| x.parse::<bool>().ok())
.unwrap_or(false);

let compression_level = configs
.get("compression_level")
.and_then(|x| x.parse::<i32>().ok())
.unwrap_or(1);

// Use multi-threaded tokio runtime to prevent blocking spawned tasks if any
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
@@ -169,6 +176,7 @@
metrics,
session_ctx: Arc::new(session),
debug_native,
compression_level,
});

Ok(Box::into_raw(exec_context) as i64)
@@ -317,8 +325,11 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
// Because we don't know if input arrays are dictionary-encoded when we create
// query plan, we need to defer stream initialization to first time execution.
if exec_context.root_op.is_none() {
let planner = PhysicalPlanner::new(exec_context.session_ctx.clone())
.with_exec_id(exec_context_id);
let planner = PhysicalPlanner::new(
exec_context.session_ctx.clone(),
exec_context.compression_level,
)
.with_exec_id(exec_context_id);
let (scans, root_op) = planner.create_plan(
&exec_context.spark_plan,
&mut exec_context.input_sources.clone(),
@@ -455,6 +466,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative(
checksum_enabled: jboolean,
checksum_algo: jint,
current_checksum: jlong,
compression_level: jlong,
) -> jlongArray {
try_unwrap_or_throw(&e, |mut env| unsafe {
let data_types = convert_datatype_arrays(&mut env, serialized_datatypes)?;
Expand Down Expand Up @@ -493,6 +505,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative
checksum_enabled,
checksum_algo,
current_checksum,
compression_level as i32,
)?;

let checksum = if let Some(checksum) = checksum {
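The `createPlan` change reads the level from the flat string map handed over JNI, defaulting to 1 when the `compression_level` key is missing or unparsable. A self-contained sketch of that parse-with-fallback pattern (the free function and the map contents are illustrative):

```rust
use std::collections::HashMap;

// Mirrors the configs.get(...).and_then(parse).unwrap_or(1) chain in createPlan.
fn compression_level(configs: &HashMap<String, String>) -> i32 {
    configs
        .get("compression_level")
        .and_then(|x| x.parse::<i32>().ok())
        .unwrap_or(1)
}

fn main() {
    let mut configs: HashMap<String, String> = HashMap::new();
    assert_eq!(compression_level(&configs), 1); // key absent -> default 1
    configs.insert("compression_level".into(), "6".into());
    assert_eq!(compression_level(&configs), 6); // present and numeric
    configs.insert("compression_level".into(), "fast".into());
    assert_eq!(compression_level(&configs), 1); // unparsable -> default 1
}
```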
3 changes: 2 additions & 1 deletion core/src/execution/shuffle/row.rs
@@ -3296,6 +3296,7 @@ pub fn process_sorted_row_partition(
// this is the initial checksum for this method, as it also gets updated iteratively
// inside the loop within the method across batches.
initial_checksum: Option<u32>,
compression_level: i32,
) -> Result<(i64, Option<u32>), CometError> {
// TODO: We can tune this parameter automatically based on row size and cache size.
let row_step = 10;
@@ -3355,7 +3356,7 @@
let mut frozen: Vec<u8> = vec![];
let mut cursor = Cursor::new(&mut frozen);
cursor.seek(SeekFrom::End(0))?;
written += write_ipc_compressed(&batch, &mut cursor)?;
written += write_ipc_compressed(&batch, &mut cursor, compression_level)?;

if let Some(checksum) = &mut current_checksum {
checksum.update(&mut cursor)?;
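(The sorted-row spill path in `process_sorted_row_partition` reuses the same `write_ipc_compressed` helper, which is why it also threads the level through.)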
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -36,6 +36,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.enabled | Whether to enable Comet native vectorized execution for Spark. This controls whether Spark should convert operators into their Comet counterparts and execute them in native space. Note: each operator is associated with a separate config in the format of 'spark.comet.exec.<operator_name>.enabled' at the moment, and both the config and this need to be turned on, in order for the operator to be executed in native. By default, this config is false. | false |
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 |
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd |
| spark.comet.exec.shuffle.compressionLevel | Zstd compression level used in shuffle. | 1 |
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false |
| spark.comet.exec.shuffle.mode | The mode of Comet shuffle. This config is only effective if Comet shuffle is enabled. Available modes are 'native', 'jvm', and 'auto'. 'native' is for native shuffle which has best performance in general. 'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. 'auto' is for Comet to choose the best shuffle mode based on the query plan. By default, this config is 'jvm'. | jvm |
| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false |
3 changes: 2 additions & 1 deletion spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -23,7 +23,7 @@ import org.apache.spark._
import org.apache.spark.sql.comet.CometMetricNode
import org.apache.spark.sql.vectorized._

import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION}
import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL}
import org.apache.comet.vector.NativeUtil

/**
Expand Down Expand Up @@ -85,6 +85,7 @@ class CometExecIterator(
result.put("memory_fraction", String.valueOf(COMET_EXEC_MEMORY_FRACTION.get()))
result.put("batch_size", String.valueOf(COMET_BATCH_SIZE.get()))
result.put("debug_native", String.valueOf(COMET_DEBUG_ENABLED.get()))
result.put("compression_level", String.valueOf(COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL.get()))

// Strip mandatory prefix spark. which is not required for DataFusion session params
conf.getAll.foreach {
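Taken together, the plumbing is: the JVM side reads `COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL` into the `compression_level` entry of the native config map, `createPlan` parses it into the `ExecutionContext`, `PhysicalPlanner` forwards it into `ShuffleWriterExec` (and `writeSortedFileNative` passes it directly), and `write_ipc_compressed` finally hands it to the zstd encoder.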
