diff --git a/native/core/benches/shuffle_writer.rs b/native/core/benches/shuffle_writer.rs
index 272887238..c27ac8ce5 100644
--- a/native/core/benches/shuffle_writer.rs
+++ b/native/core/benches/shuffle_writer.rs
@@ -15,36 +15,82 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::ipc::writer::StreamWriter;
+use arrow_array::builder::Int32Builder;
 use arrow_array::{builder::StringBuilder, RecordBatch};
 use arrow_schema::{DataType, Field, Schema};
-use comet::execution::shuffle::ShuffleWriterExec;
+use comet::execution::shuffle::{calculate_partition_ids, write_ipc_compressed, ShuffleWriterExec};
 use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion::physical_plan::metrics::Time;
 use datafusion::{
     physical_plan::{common::collect, memory::MemoryExec, ExecutionPlan},
     prelude::SessionContext,
 };
 use datafusion_physical_expr::{expressions::Column, Partitioning};
+use std::io::Cursor;
 use std::sync::Arc;
 use tokio::runtime::Runtime;
 
 fn criterion_benchmark(c: &mut Criterion) {
-    let batch = create_batch();
-    let mut batches = Vec::new();
-    for _ in 0..10 {
-        batches.push(batch.clone());
-    }
-    let partitions = &[batches];
-    let exec = ShuffleWriterExec::try_new(
-        Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()),
-        Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
-        "/tmp/data.out".to_string(),
-        "/tmp/index.out".to_string(),
-    )
-    .unwrap();
-
     let mut group = c.benchmark_group("shuffle_writer");
-    group.bench_function("shuffle_writer", |b| {
+    group.bench_function("shuffle_writer: calculate partition ids", |b| {
+        let batch = create_batch(8192, true);
+        let arrays = batch.columns().to_vec();
+        let mut hashes_buf = vec![0; batch.num_rows()];
+        let mut partition_ids = vec![0; batch.num_rows()];
+        b.iter(|| {
+            calculate_partition_ids(&arrays, 200, &mut hashes_buf, &mut partition_ids).unwrap();
+        });
+    });
+    group.bench_function("shuffle_writer: encode with new writer each time", |b| {
+        let batches = create_batches();
+        let schema = batches[0].schema();
+        let mut output = vec![];
+        b.iter(|| {
+            output.clear();
+            for batch in &batches {
+                let cursor = Cursor::new(&mut output);
+                let mut arrow_writer =
+                    StreamWriter::try_new(zstd::Encoder::new(cursor, 1).unwrap(), &schema).unwrap();
+                arrow_writer.write(batch).unwrap();
+                arrow_writer.finish().unwrap();
+            }
+        });
+    });
+    group.bench_function("shuffle_writer: encode with single writer", |b| {
+        let batches = create_batches();
+        let schema = batches[0].schema();
+        let mut output = vec![];
+        b.iter(|| {
+            output.clear();
+            let cursor = Cursor::new(&mut output);
+            let mut arrow_writer =
+                StreamWriter::try_new(zstd::Encoder::new(cursor, 1).unwrap(), &schema).unwrap();
+            for batch in &batches {
+                arrow_writer.write(batch).unwrap();
+            }
+            arrow_writer.finish().unwrap();
+        });
+    });
+    group.bench_function("shuffle_writer: encode and compress", |b| {
+        let batch = create_batch(8192, true);
+        let mut buffer = vec![];
+        let mut cursor = Cursor::new(&mut buffer);
+        let ipc_time = Time::default();
+        b.iter(|| write_ipc_compressed(&batch, &mut cursor, &ipc_time));
+    });
+    group.bench_function("shuffle_writer: end to end", |b| {
         let ctx = SessionContext::new();
+        let batches = create_batches();
+        let schema = batches[0].schema();
+        let partitions = &[batches];
+        let exec = ShuffleWriterExec::try_new(
+            Arc::new(MemoryExec::try_new(partitions, schema, None).unwrap()),
+            Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
+            "/tmp/data.out".to_string(),
"/tmp/index.out".to_string(), + ) + .unwrap(); b.iter(|| { let task_ctx = ctx.task_ctx(); let stream = exec.execute(0, task_ctx).unwrap(); @@ -54,19 +100,33 @@ fn criterion_benchmark(c: &mut Criterion) { }); } -fn create_batch() -> RecordBatch { - let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)])); +fn create_batches() -> Vec { + let batch = create_batch(8192, true); + let mut batches = Vec::new(); + for _ in 0..10 { + batches.push(batch.clone()); + } + batches +} + +fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ + Field::new("c0", DataType::Int32, true), + Field::new("c1", DataType::Utf8, true), + ])); + let mut a = Int32Builder::new(); let mut b = StringBuilder::new(); - for i in 0..8192 { - if i % 10 == 0 { + for i in 0..num_rows { + a.append_value(i as i32); + if allow_nulls && i % 10 == 0 { b.append_null(); } else { - b.append_value(format!("{i}")); + b.append_value(format!("this is string number {i}")); } } - let array = b.finish(); - - RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap() + let a = a.finish(); + let b = b.finish(); + RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap() } fn config() -> Criterion { diff --git a/native/core/src/execution/shuffle/mod.rs b/native/core/src/execution/shuffle/mod.rs index 8721ead74..add1f5cbb 100644 --- a/native/core/src/execution/shuffle/mod.rs +++ b/native/core/src/execution/shuffle/mod.rs @@ -19,4 +19,4 @@ mod list; mod map; pub mod row; mod shuffle_writer; -pub use shuffle_writer::ShuffleWriterExec; +pub use shuffle_writer::{calculate_partition_ids, write_ipc_compressed, ShuffleWriterExec}; diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index fcc8c51f6..8c310aca2 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -773,23 +773,23 @@ impl ShuffleRepartitioner { Partitioning::Hash(exprs, _) => { let (partition_starts, shuffled_partition_ids): (Vec, Vec) = { let mut timer = self.metrics.repart_time.timer(); + + // evaluate partition expressions let arrays = exprs .iter() .map(|expr| expr.evaluate(&input)?.into_array(input.num_rows())) .collect::>>()?; - // use identical seed as spark hash partition - let hashes_buf = &mut self.hashes_buf[..arrays[0].len()]; - hashes_buf.fill(42_u32); - - // Hash arrays and compute buckets based on number of partitions - let partition_ids = &mut self.partition_ids[..arrays[0].len()]; - create_murmur3_hashes(&arrays, hashes_buf)? 
-                        .iter()
-                        .enumerate()
-                        .for_each(|(idx, hash)| {
-                            partition_ids[idx] = pmod(*hash, num_output_partitions) as u64
-                        });
+                    // calculate partition ids
+                    let num_rows = input.num_rows();
+                    let hashes_buf = &mut self.hashes_buf[..num_rows];
+                    let partition_ids = &mut self.partition_ids[..num_rows];
+                    calculate_partition_ids(
+                        &arrays,
+                        num_output_partitions,
+                        hashes_buf,
+                        partition_ids,
+                    )?;
 
                     // count each partition size
                     let mut partition_counters = vec![0usize; num_output_partitions];
@@ -1085,6 +1085,25 @@ impl ShuffleRepartitioner {
     }
 }
 
+/// Calculate the partition ID for each row in a batch
+#[inline]
+pub fn calculate_partition_ids(
+    arrays: &[ArrayRef],
+    num_output_partitions: usize,
+    hashes_buf: &mut [u32],
+    partition_ids: &mut [u64],
+) -> Result<(), DataFusionError> {
+    // use identical seed as spark hash partition
+    hashes_buf.fill(42_u32);
+
+    // Hash arrays and compute buckets based on number of partitions
+    create_murmur3_hashes(arrays, hashes_buf)?
+        .iter()
+        .enumerate()
+        .for_each(|(idx, hash)| partition_ids[idx] = pmod(*hash, num_output_partitions) as u64);
+    Ok(())
+}
+
 /// consume the `buffered_partitions` and do spill into a single temp shuffle output file
 fn spill_into(
     buffered_partitions: &mut [PartitionBuffer],
@@ -1528,7 +1547,7 @@ impl Checksum {
 
 /// Writes given record batch as Arrow IPC bytes into given writer.
 /// Returns number of bytes written.
-pub(crate) fn write_ipc_compressed<W: Write + Seek>(
+pub fn write_ipc_compressed<W: Write + Seek>(
     batch: &RecordBatch,
     output: &mut W,
     ipc_time: &Time,
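
Note for reviewers: below is a minimal sketch of how the two helpers this patch makes public compose when called outside the Criterion harness. It is not part of the patch. The import paths follow the new `pub use` in `shuffle/mod.rs`; the sample data, the binding of the return value of `write_ipc_compressed` to a byte count, and the use of `?` on both calls are my assumptions (the hunks above only show `calculate_partition_ids`'s full signature and the doc comment stating that `write_ipc_compressed` returns the number of bytes written).

```rust
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use comet::execution::shuffle::{calculate_partition_ids, write_ipc_compressed};
use datafusion::physical_plan::metrics::Time;
use std::io::Cursor;
use std::sync::Arc;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // One Int32 column standing in for the evaluated partition
    // expressions (hypothetical data, not from the patch).
    let a: ArrayRef = Arc::new(Int32Array::from_iter_values(0..8192));
    let batch = RecordBatch::try_from_iter([("a", Arc::clone(&a))])?;

    // Caller-owned scratch buffers sized to the batch, mirroring how the
    // repartitioner and the new benchmark pass them in.
    let mut hashes_buf = vec![0u32; batch.num_rows()];
    let mut partition_ids = vec![0u64; batch.num_rows()];
    calculate_partition_ids(&[a], 200, &mut hashes_buf, &mut partition_ids)?;

    // Encode the batch as compressed Arrow IPC. A Cursor is used because
    // the writer's type parameter needs Seek in addition to Write.
    let mut buffer = vec![];
    let mut cursor = Cursor::new(&mut buffer);
    let ipc_time = Time::default();
    let n = write_ipc_compressed(&batch, &mut cursor, &ipc_time)?;
    println!("row 0 -> partition {}, wrote {n} bytes", partition_ids[0]);
    Ok(())
}
```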