Skip to content

Commit

Permalink
Add cargo bench for shuffle writer
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed May 16, 2024
1 parent ddf6a6f commit f8b3203
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 1 deletion.
4 changes: 4 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,7 @@ harness = false
[[bench]]
name = "cast_numeric"
harness = false

[[bench]]
name = "shuffle_writer"
harness = false
83 changes: 83 additions & 0 deletions core/benches/shuffle_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::{builder::StringBuilder, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use comet::execution::datafusion::shuffle_writer::ShuffleWriterExec;
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::{
physical_plan::{common::collect, memory::MemoryExec, ExecutionPlan},
prelude::SessionContext,
};
use datafusion_physical_expr::{expressions::Column, Partitioning};
use std::sync::Arc;
use tokio::runtime::Runtime;

fn criterion_benchmark(c: &mut Criterion) {
let batch = create_batch();
let mut batches = Vec::new();
for _ in 0..10 {
batches.push(batch.clone());
}
let partitions = &[batches];
let exec = ShuffleWriterExec::try_new(
Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()),
Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
"data.out".to_string(),
"index.out".to_string(),
)
.unwrap();

let mut group = c.benchmark_group("shuffle_writer");
group.bench_function("shuffle_writer", |b| {
let ctx = SessionContext::new();
b.iter(|| {
let task_ctx = ctx.task_ctx();
let stream = exec.execute(0, task_ctx).unwrap();
let rt = Runtime::new().unwrap();
criterion::black_box(rt.block_on(collect(stream)).unwrap());
});
});
}

fn create_batch() -> RecordBatch {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
let mut b = StringBuilder::new();
// TODO test with 81920 to trigger
// range end index 81920 out of range for slice of length 8192
for i in 0..8192 {
if i % 10 == 0 {
b.append_null();
} else {
b.append_value(format!("{i}"));
}
}
let array = b.finish();
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
batch
}

fn config() -> Criterion {
Criterion::default()
}

criterion_group! {
name = benches;
config = config();
targets = criterion_benchmark
}
criterion_main!(benches);
2 changes: 1 addition & 1 deletion core/src/execution/datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@
pub mod expressions;
mod operators;
pub mod planner;
pub(crate) mod shuffle_writer;
pub mod shuffle_writer;
mod spark_hash;
mod util;

0 comments on commit f8b3203

Please sign in to comment.