From 7f112c438d8a151e20487794e191ec4f773803bf Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 12 Jun 2024 08:04:56 -0600 Subject: [PATCH 1/5] add benchmark for xxhash64 --- core/benches/hash.rs | 11 +++++++++++ core/src/execution/datafusion/mod.rs | 2 +- core/src/execution/datafusion/spark_hash.rs | 8 +++++++- 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/core/benches/hash.rs b/core/benches/hash.rs index dafad79dd..da25ece8e 100644 --- a/core/benches/hash.rs +++ b/core/benches/hash.rs @@ -19,6 +19,7 @@ mod common; use arrow_array::ArrayRef; +use comet::execution::datafusion::spark_hash::create_xxhash64_hashes; use comet::execution::kernels::hash; use common::*; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; @@ -95,6 +96,16 @@ fn criterion_benchmark(c: &mut Criterion) { }); }, ); + group.bench_function(BenchmarkId::new("xxhash64", BATCH_SIZE), |b| { + let input = vec![a3.clone(), a4.clone()]; + let mut dst = vec![0; BATCH_SIZE]; + + b.iter(|| { + for _ in 0..NUM_ITER { + create_xxhash64_hashes(&input, &mut dst).unwrap(); + } + }); + }); } fn config() -> Criterion { diff --git a/core/src/execution/datafusion/mod.rs b/core/src/execution/datafusion/mod.rs index 2c6b2bf10..e98e4ed41 100644 --- a/core/src/execution/datafusion/mod.rs +++ b/core/src/execution/datafusion/mod.rs @@ -21,5 +21,5 @@ pub mod expressions; mod operators; pub mod planner; pub mod shuffle_writer; -mod spark_hash; +pub mod spark_hash; mod util; diff --git a/core/src/execution/datafusion/spark_hash.rs b/core/src/execution/datafusion/spark_hash.rs index 4d91a87df..f828a76ec 100644 --- a/core/src/execution/datafusion/spark_hash.rs +++ b/core/src/execution/datafusion/spark_hash.rs @@ -35,6 +35,12 @@ use datafusion::{ error::{DataFusionError, Result}, }; +const PRIME64_1: u64 = 0x9E3779B185EBCA87; +const PRIME64_2: u64 = 0xC2B2AE3D27D4EB4F; +const PRIME64_3: u64 = 0x165667B19E3779F9; +const PRIME64_4: u64 = 0x85EBCA77C2B2AE63; +const PRIME64_5: u64 = 
0x27D4EB2F165667C5; + #[inline] pub(crate) fn spark_compatible_murmur3_hash<T: AsRef<[u8]>>(data: T, seed: u32) -> u32 { #[inline] @@ -481,7 +487,7 @@ pub(crate) fn create_murmur3_hashes<'a>( /// /// The number of rows to hash is determined by `hashes_buffer.len()`. /// `hashes_buffer` should be pre-sized appropriately -pub(crate) fn create_xxhash64_hashes<'a>( +pub fn create_xxhash64_hashes<'a>( arrays: &[ArrayRef], hashes_buffer: &'a mut [u64], ) -> Result<&'a mut [u64]> { From 45d6254ad4310c172dd192be4f402476120d069e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 12 Jun 2024 08:05:32 -0600 Subject: [PATCH 2/5] remove new code --- core/src/execution/datafusion/spark_hash.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/core/src/execution/datafusion/spark_hash.rs b/core/src/execution/datafusion/spark_hash.rs index f828a76ec..a31752d4a 100644 --- a/core/src/execution/datafusion/spark_hash.rs +++ b/core/src/execution/datafusion/spark_hash.rs @@ -35,12 +35,6 @@ use datafusion::{ error::{DataFusionError, Result}, }; -const PRIME64_1: u64 = 0x9E3779B185EBCA87; -const PRIME64_2: u64 = 0xC2B2AE3D27D4EB4F; -const PRIME64_3: u64 = 0x165667B19E3779F9; -const PRIME64_4: u64 = 0x85EBCA77C2B2AE63; -const PRIME64_5: u64 = 0x27D4EB2F165667C5; - #[inline] pub(crate) fn spark_compatible_murmur3_hash<T: AsRef<[u8]>>(data: T, seed: u32) -> u32 { #[inline] From 5b6bb9532218084bb6034880b4268711aa227885 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 13 Jun 2024 12:03:08 -0600 Subject: [PATCH 3/5] add murmur3 bench --- core/benches/hash.rs | 11 +++++++++++ .../execution/datafusion/expressions/scalar_funcs.rs | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/core/benches/hash.rs b/core/benches/hash.rs index da25ece8e..a09a61f0c 100644 --- a/core/benches/hash.rs +++ b/core/benches/hash.rs @@ -24,6 +24,9 @@ use comet::execution::kernels::hash; use common::*; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use std::sync::Arc; +use
datafusion_common::ScalarValue; +use datafusion_expr::ColumnarValue; +use comet::execution::datafusion::expressions::scalar_funcs::spark_murmur3_hash; const BATCH_SIZE: usize = 1024 * 8; const NUM_ITER: usize = 10; @@ -106,6 +109,14 @@ fn criterion_benchmark(c: &mut Criterion) { } }); }); + group.bench_function(BenchmarkId::new("murmur3", BATCH_SIZE), |b| { + let inputs = &[ColumnarValue::Array(a3.clone()), ColumnarValue::Array(a3.clone()), ColumnarValue::Scalar(ScalarValue::Int32(Some(42)))]; + b.iter(|| { + for _ in 0..NUM_ITER { + spark_murmur3_hash(inputs).unwrap(); + } + }); + }); } fn config() -> Criterion { diff --git a/core/src/execution/datafusion/expressions/scalar_funcs.rs b/core/src/execution/datafusion/expressions/scalar_funcs.rs index c50f06649..3c7af8676 100644 --- a/core/src/execution/datafusion/expressions/scalar_funcs.rs +++ b/core/src/execution/datafusion/expressions/scalar_funcs.rs @@ -636,7 +636,7 @@ fn spark_decimal_div( Ok(ColumnarValue::Array(Arc::new(result))) } -fn spark_murmur3_hash(args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> { +pub fn spark_murmur3_hash(args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> { let length = args.len(); let seed = &args[length - 1]; match seed { From 76c71a01d8331ff6853ecff37f18e157b849d5b2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 13 Jun 2024 12:22:10 -0600 Subject: [PATCH 4/5] format --- core/benches/hash.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/benches/hash.rs b/core/benches/hash.rs index a09a61f0c..14136e607 100644 --- a/core/benches/hash.rs +++ b/core/benches/hash.rs @@ -19,14 +19,14 @@ mod common; use arrow_array::ArrayRef; +use comet::execution::datafusion::expressions::scalar_funcs::spark_murmur3_hash; use comet::execution::datafusion::spark_hash::create_xxhash64_hashes; use comet::execution::kernels::hash; use common::*; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; -use std::sync::Arc; use datafusion_common::ScalarValue; use
datafusion_expr::ColumnarValue; -use comet::execution::datafusion::expressions::scalar_funcs::spark_murmur3_hash; +use std::sync::Arc; const BATCH_SIZE: usize = 1024 * 8; const NUM_ITER: usize = 10; @@ -110,7 +110,11 @@ fn criterion_benchmark(c: &mut Criterion) { }); }); group.bench_function(BenchmarkId::new("murmur3", BATCH_SIZE), |b| { - let inputs = &[ColumnarValue::Array(a3.clone()), ColumnarValue::Array(a3.clone()), ColumnarValue::Scalar(ScalarValue::Int32(Some(42)))]; + let inputs = &[ + ColumnarValue::Array(a3.clone()), + ColumnarValue::Array(a3.clone()), + ColumnarValue::Scalar(ScalarValue::Int32(Some(42))), + ]; b.iter(|| { for _ in 0..NUM_ITER { spark_murmur3_hash(inputs).unwrap(); From 73726122286d383c78f223997004a268a13ecdf5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 13 Jun 2024 13:04:34 -0600 Subject: [PATCH 5/5] fix --- core/benches/hash.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/benches/hash.rs b/core/benches/hash.rs index 14136e607..b878ebea5 100644 --- a/core/benches/hash.rs +++ b/core/benches/hash.rs @@ -112,7 +112,7 @@ fn criterion_benchmark(c: &mut Criterion) { group.bench_function(BenchmarkId::new("murmur3", BATCH_SIZE), |b| { let inputs = &[ ColumnarValue::Array(a3.clone()), - ColumnarValue::Array(a3.clone()), + ColumnarValue::Array(a4.clone()), ColumnarValue::Scalar(ScalarValue::Int32(Some(42))), ]; b.iter(|| {