From 49cf0d7f32813b6cb5bd3999378c960bd1171fca Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 12 Dec 2024 07:45:06 -0700 Subject: [PATCH] chore: Move string kernels and expressions to spark-expr crate (#1164) * Move string kernels and expressions to spark-expr crate * remove unused hash kernel * remove unused dependencies --- native/Cargo.lock | 2 - native/core/Cargo.toml | 6 - native/core/benches/hash.rs | 137 ------------- .../execution/datafusion/expressions/mod.rs | 1 - .../core/src/execution/datafusion/planner.rs | 15 +- native/core/src/execution/kernels/hash.rs | 187 ------------------ native/core/src/execution/kernels/mod.rs | 23 --- native/core/src/execution/mod.rs | 3 - native/spark-expr/src/kernels/mod.rs | 1 + .../src}/kernels/strings.rs | 7 +- native/spark-expr/src/lib.rs | 2 + .../expressions => spark-expr/src}/strings.rs | 2 +- 12 files changed, 13 insertions(+), 373 deletions(-) delete mode 100644 native/core/benches/hash.rs delete mode 100644 native/core/src/execution/kernels/hash.rs delete mode 100644 native/core/src/execution/kernels/mod.rs rename native/{core/src/execution => spark-expr/src}/kernels/strings.rs (96%) rename native/{core/src/execution/datafusion/expressions => spark-expr/src}/strings.rs (99%) diff --git a/native/Cargo.lock b/native/Cargo.lock index 67d041a39..9a8eab83e 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -871,7 +871,6 @@ dependencies = [ name = "datafusion-comet" version = "0.5.0" dependencies = [ - "ahash", "arrow", "arrow-array", "arrow-buffer", @@ -893,7 +892,6 @@ dependencies = [ "datafusion-physical-expr", "flate2", "futures", - "half", "hex", "itertools 0.11.0", "jni", diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 4b9753ec5..489da46d4 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -41,7 +41,6 @@ arrow-buffer = { workspace = true } arrow-data = { workspace = true } arrow-schema = { workspace = true } parquet = { workspace = true, default-features = false, features = ["experimental"] } -half = { version = "2.4.1", default-features = false } futures = { workspace = true } mimalloc = { version = "*", default-features = false, optional = true } tokio = { version = "1", features = ["rt-multi-thread"] } @@ -62,7 +61,6 @@ rand = { workspace = true} num = { workspace = true } bytes = "1.5.0" tempfile = "3.8.0" -ahash = { version = "0.8", default-features = false } itertools = "0.11.0" paste = "1.0.14" datafusion-common = { workspace = true } @@ -102,10 +100,6 @@ harness = false name = "bit_util" harness = false -[[bench]] -name = "hash" -harness = false - [[bench]] name = "row_columnar" harness = false diff --git a/native/core/benches/hash.rs b/native/core/benches/hash.rs deleted file mode 100644 index 039a6d5d8..000000000 --- a/native/core/benches/hash.rs +++ /dev/null @@ -1,137 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -#[path = "common.rs"] -mod common; - -use arrow_array::ArrayRef; -use comet::execution::kernels::hash; -use common::*; -use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; -use datafusion_comet_spark_expr::scalar_funcs::{spark_murmur3_hash, spark_xxhash64}; -use datafusion_common::ScalarValue; -use datafusion_expr::ColumnarValue; -use std::sync::Arc; - -const BATCH_SIZE: usize = 1024 * 8; -const NUM_ITER: usize = 10; -const NULL_FRACTION: f32 = 0.1; - -fn criterion_benchmark(c: &mut Criterion) { - let mut group = c.benchmark_group("hash"); - - let a1: ArrayRef = Arc::new(create_int64_array(BATCH_SIZE, 0.0, 0, BATCH_SIZE as i64)); - let a2: ArrayRef = Arc::new(create_int64_array(BATCH_SIZE, 0.0, 0, BATCH_SIZE as i64)); - let a3: ArrayRef = Arc::new(create_int64_array( - BATCH_SIZE, - NULL_FRACTION, - 0, - BATCH_SIZE as i64, - )); - let a4: ArrayRef = Arc::new(create_int64_array( - BATCH_SIZE, - NULL_FRACTION, - 0, - BATCH_SIZE as i64, - )); - - group.bench_function( - BenchmarkId::new("hash_i64_single_nonnull", BATCH_SIZE), - |b| { - let input = vec![a1.clone()]; - let mut dst = vec![0; BATCH_SIZE]; - - b.iter(|| { - for _ in 0..NUM_ITER { - hash(&input, &mut dst); - } - }); - }, - ); - group.bench_function(BenchmarkId::new("hash_i64_single_null", BATCH_SIZE), |b| { - let input = vec![a3.clone()]; - let mut dst = vec![0; BATCH_SIZE]; - - b.iter(|| { - for _ in 0..NUM_ITER { - hash(&input, &mut dst); - } - }); - }); - group.bench_function( - BenchmarkId::new("hash_i64_multiple_nonnull", BATCH_SIZE), - |b| { - let input = vec![a1.clone(), a2.clone()]; - let mut dst = vec![0; BATCH_SIZE]; - - b.iter(|| { - for _ in 0..NUM_ITER { - hash(&input, &mut dst); - } - }); - }, - ); - group.bench_function( - BenchmarkId::new("hash_i64_multiple_null", BATCH_SIZE), - |b| { - let input = vec![a3.clone(), a4.clone()]; - let mut dst = vec![0; BATCH_SIZE]; - - b.iter(|| { - for _ in 0..NUM_ITER { - hash(&input, &mut dst); - } - }); - }, - ); - group.bench_function(BenchmarkId::new("xxhash64", BATCH_SIZE), |b| { - let inputs = &[ - ColumnarValue::Array(a3.clone()), - ColumnarValue::Array(a4.clone()), - ColumnarValue::Scalar(ScalarValue::Int64(Some(42i64))), - ]; - - b.iter(|| { - for _ in 0..NUM_ITER { - spark_xxhash64(inputs).unwrap(); - } - }); - }); - group.bench_function(BenchmarkId::new("murmur3", BATCH_SIZE), |b| { - let inputs = &[ - ColumnarValue::Array(a3.clone()), - ColumnarValue::Array(a4.clone()), - ColumnarValue::Scalar(ScalarValue::Int32(Some(42))), - ]; - b.iter(|| { - for _ in 0..NUM_ITER { - spark_murmur3_hash(inputs).unwrap(); - } - }); - }); -} - -fn config() -> Criterion { - Criterion::default() -} - -criterion_group! 
{ - name = benches; - config = config(); - targets = criterion_benchmark -} -criterion_main!(benches); diff --git a/native/core/src/execution/datafusion/expressions/mod.rs b/native/core/src/execution/datafusion/expressions/mod.rs index 2bb14df36..5f9f322b2 100644 --- a/native/core/src/execution/datafusion/expressions/mod.rs +++ b/native/core/src/execution/datafusion/expressions/mod.rs @@ -23,7 +23,6 @@ use crate::errors::CometError; pub mod bloom_filter_agg; pub mod bloom_filter_might_contain; pub mod negative; -pub mod strings; pub mod subquery; pub mod unbound; diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 5e77b3f65..0e64ed6af 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -25,12 +25,8 @@ use crate::{ datafusion::{ expressions::{ bloom_filter_agg::BloomFilterAgg, - bloom_filter_might_contain::BloomFilterMightContain, - checkoverflow::CheckOverflow, - negative, - strings::{Contains, EndsWith, Like, StartsWith, StringSpaceExpr, SubstringExpr}, - subquery::Subquery, - unbound::UnboundColumn, + bloom_filter_might_contain::BloomFilterMightContain, checkoverflow::CheckOverflow, + negative, subquery::Subquery, unbound::UnboundColumn, }, operators::expand::CometExpandExec, shuffle_writer::ShuffleWriterExec, @@ -90,9 +86,10 @@ use datafusion_comet_proto::{ spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning}, }; use datafusion_comet_spark_expr::{ - ArrayInsert, Avg, AvgDecimal, BitwiseNotExpr, Cast, Correlation, Covariance, CreateNamedStruct, - DateTruncExpr, GetArrayStructFields, GetStructField, HourExpr, IfExpr, ListExtract, MinuteExpr, - NormalizeNaNAndZero, RLike, SecondExpr, SparkCastOptions, Stddev, SumDecimal, + ArrayInsert, Avg, AvgDecimal, BitwiseNotExpr, Cast, Contains, Correlation, Covariance, + CreateNamedStruct, DateTruncExpr, EndsWith, GetArrayStructFields, GetStructField, HourExpr, + IfExpr, Like, ListExtract, MinuteExpr, NormalizeNaNAndZero, RLike, SecondExpr, + SparkCastOptions, StartsWith, Stddev, StringSpaceExpr, SubstringExpr, SumDecimal, TimestampTruncExpr, ToJson, Variance, }; use datafusion_common::scalar::ScalarStructBuilder; diff --git a/native/core/src/execution/kernels/hash.rs b/native/core/src/execution/kernels/hash.rs deleted file mode 100644 index b39fd6224..000000000 --- a/native/core/src/execution/kernels/hash.rs +++ /dev/null @@ -1,187 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
-
-use crate::common::bit;
-use ahash::RandomState;
-use arrow::datatypes::{i256, ArrowNativeType};
-use arrow_array::{
-    downcast_dictionary_array, downcast_primitive_array, Array, ArrayAccessor, ArrayRef,
-    ArrowPrimitiveType, PrimitiveArray,
-};
-use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
-use std::fmt::Debug;
-
-pub fn hash(src: &[ArrayRef], dst: &mut [u64]) {
-    let state = RandomState::with_seed(42);
-    src.iter().enumerate().for_each(|(idx, v)| {
-        downcast_dictionary_array!(
-            v => {
-                let keys = v.keys();
-                let values = v.values();
-                downcast_primitive_array!(
-                    values => hash_dict_typed(&state, idx > 0, keys, values, dst),
-                    dt => panic!("Expected only primitive type but found {}", dt)
-                )
-            },
-            dt => {
-                downcast_primitive_array!(
-                    v => hash_typed(&state, idx > 0, v, dst),
-                    _ => panic!("Expected only primitive type but found {}", dt)
-                )
-            }
-        )
-    });
-}
-
-fn hash_typed<T>(state: &RandomState, mix: bool, array: T, dst: &mut [u64])
-where
-    T: ArrayAccessor,
-    T::Item: Hashable + Debug,
-{
-    let nullable = array.null_count() > 0;
-    let num_values = array.len();
-    if nullable {
-        for i in 0..num_values {
-            if !array.is_null(i) {
-                unsafe {
-                    let value = array.value_unchecked(i);
-                    hash1(state, mix, i, value, dst);
-                }
-            }
-        }
-    } else {
-        for i in 0..num_values {
-            unsafe {
-                let value = array.value_unchecked(i);
-                hash1(state, mix, i, value, dst);
-            }
-        }
-    }
-}
-
-fn hash_dict_typed<K, V>(
-    state: &RandomState,
-    mix: bool,
-    keys: &PrimitiveArray<K>,
-    values: V,
-    dst: &mut [u64],
-) where
-    K: ArrowPrimitiveType,
-    V: ArrayAccessor,
-    V::Item: Hashable + Debug,
-{
-    let nullable = keys.null_count() > 0;
-    let num_keys = keys.len();
-    let mut value_hashes = vec![0; values.len()];
-
-    for (i, value_hash) in value_hashes.iter_mut().enumerate() {
-        unsafe {
-            *value_hash = values.value_unchecked(i).create_hash(state);
-        }
-    }
-
-    if nullable {
-        for i in 0..num_keys {
-            if !keys.is_null(i) {
-                unsafe {
-                    let idx = keys.value_unchecked(i);
-                    let hash = value_hashes[idx.as_usize()];
-                    hash1_helper(mix, i, hash, dst);
-                }
-            }
-        }
-    } else {
-        for i in 0..num_keys {
-            unsafe {
-                let idx = keys.value_unchecked(i);
-                let hash = value_hashes[idx.as_usize()];
-                hash1_helper(mix, i, hash, dst);
-            }
-        }
-    }
-}
-
-#[inline(always)]
-fn hash1<T: Hashable>(state: &RandomState, mix: bool, i: usize, value: T, dst: &mut [u64]) {
-    let hash = value.create_hash(state);
-    hash1_helper(mix, i, hash, dst);
-}
-
-#[inline(always)]
-fn hash1_helper(mix: bool, i: usize, hash: u64, dst: &mut [u64]) {
-    dst[i] = if mix {
-        bit::mix_hash(dst[i], hash)
-    } else {
-        hash
-    }
-}
-
-pub(crate) trait Hashable {
-    fn create_hash(&self, state: &RandomState) -> u64;
-}
-
-macro_rules!
impl_hashable { - ($($t:ty),+) => { - $(impl Hashable for $t { - #[inline] - fn create_hash(&self, state: &RandomState) -> u64 { - state.hash_one(self) - } - })+ - }; -} - -impl_hashable!(i8, i16, i32, u8, u16, u32, u64, i128, i256); - -impl Hashable for i64 { - fn create_hash(&self, state: &RandomState) -> u64 { - state.hash_one(self) - } -} - -impl Hashable for half::f16 { - fn create_hash(&self, _: &RandomState) -> u64 { - unimplemented!("hashing on f16 is not supported") - } -} - -impl Hashable for f32 { - fn create_hash(&self, state: &RandomState) -> u64 { - state.hash_one(u32::from_ne_bytes(self.to_ne_bytes())) - } -} - -impl Hashable for f64 { - fn create_hash(&self, state: &RandomState) -> u64 { - state.hash_one(u64::from_ne_bytes(self.to_ne_bytes())) - } -} - -impl Hashable for IntervalDayTime { - fn create_hash(&self, state: &RandomState) -> u64 { - state.hash_one(self.days); - state.hash_one(self.milliseconds) - } -} - -impl Hashable for IntervalMonthDayNano { - fn create_hash(&self, state: &RandomState) -> u64 { - state.hash_one(self.months); - state.hash_one(self.days); - state.hash_one(self.nanoseconds) - } -} diff --git a/native/core/src/execution/kernels/mod.rs b/native/core/src/execution/kernels/mod.rs deleted file mode 100644 index 675dcd489..000000000 --- a/native/core/src/execution/kernels/mod.rs +++ /dev/null @@ -1,23 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Kernels - -mod hash; -pub use hash::hash; - -pub(crate) mod strings; diff --git a/native/core/src/execution/mod.rs b/native/core/src/execution/mod.rs index f17935702..3dba747f2 100644 --- a/native/core/src/execution/mod.rs +++ b/native/core/src/execution/mod.rs @@ -18,9 +18,6 @@ //! PoC of vectorization execution through JNI to Rust. pub mod datafusion; pub mod jni_api; - -pub mod kernels; // for benchmarking - mod metrics; pub mod operators; pub mod serde; diff --git a/native/spark-expr/src/kernels/mod.rs b/native/spark-expr/src/kernels/mod.rs index 88aa34b1a..3669ff13a 100644 --- a/native/spark-expr/src/kernels/mod.rs +++ b/native/spark-expr/src/kernels/mod.rs @@ -17,4 +17,5 @@ //! 
Kernels
 
+pub mod strings;
 pub(crate) mod temporal;
diff --git a/native/core/src/execution/kernels/strings.rs b/native/spark-expr/src/kernels/strings.rs
similarity index 96%
rename from native/core/src/execution/kernels/strings.rs
rename to native/spark-expr/src/kernels/strings.rs
index d63b2c477..bb275fbb9 100644
--- a/native/core/src/execution/kernels/strings.rs
+++ b/native/spark-expr/src/kernels/strings.rs
@@ -25,15 +25,14 @@ use arrow::{
     compute::kernels::substring::{substring as arrow_substring, substring_by_char},
     datatypes::{DataType, Int32Type},
 };
-
-use crate::errors::ExpressionError;
+use datafusion_common::DataFusionError;
 
 /// Returns an ArrayRef with a string consisting of `length` spaces.
 ///
 /// # Preconditions
 ///
 /// - elements in `length` must not be negative
-pub fn string_space(length: &dyn Array) -> Result<ArrayRef, ExpressionError> {
+pub fn string_space(length: &dyn Array) -> Result<ArrayRef, DataFusionError> {
     match length.data_type() {
         DataType::Int32 => {
             let array = length.as_any().downcast_ref::<Int32Array>().unwrap();
@@ -52,7 +51,7 @@ pub fn string_space(length: &dyn Array) -> Result<ArrayRef, ExpressionError> {
     }
 }
 
-pub fn substring(array: &dyn Array, start: i64, length: u64) -> Result<ArrayRef, ExpressionError> {
+pub fn substring(array: &dyn Array, start: i64, length: u64) -> Result<ArrayRef, DataFusionError> {
     match array.data_type() {
         DataType::LargeUtf8 => substring_by_char(
             array
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index 15f446ef3..5dff6e0b8 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -33,6 +33,8 @@ mod correlation;
 pub use correlation::Correlation;
 mod covariance;
 pub use covariance::Covariance;
+mod strings;
+pub use strings::{Contains, EndsWith, Like, StartsWith, StringSpaceExpr, SubstringExpr};
 mod kernels;
 mod list;
 mod regexp;
diff --git a/native/core/src/execution/datafusion/expressions/strings.rs b/native/spark-expr/src/strings.rs
similarity index 99%
rename from native/core/src/execution/datafusion/expressions/strings.rs
rename to native/spark-expr/src/strings.rs
index 200b4ec5a..a8aab6aee 100644
--- a/native/core/src/execution/datafusion/expressions/strings.rs
+++ b/native/spark-expr/src/strings.rs
@@ -17,7 +17,7 @@
 
 #![allow(deprecated)]
 
-use crate::execution::kernels::strings::{string_space, substring};
+use crate::kernels::strings::{string_space, substring};
 use arrow::{
     compute::{
         contains_dyn, contains_utf8_scalar_dyn, ends_with_dyn, ends_with_utf8_scalar_dyn, like_dyn,
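
For reference, downstream usage after this change looks roughly like the sketch below (an illustration, not part of the diff): the string predicate expressions are re-exported from the spark-expr crate root (see the lib.rs hunk above), which is how the core planner now imports them, while the moved kernels (string_space, substring) stay crate-internal behind the private `mod kernels;`. The `erase` helper is hypothetical and only shows the new import path; constructors are elided (see planner.rs for how these expressions are built).

use std::sync::Arc;

// Previously imported from comet core's execution::datafusion::expressions::strings;
// after this patch the type comes from the spark-expr crate root.
use datafusion_comet_spark_expr::StartsWith;
use datafusion_physical_expr::PhysicalExpr;

// Hypothetical helper: StartsWith (like Contains, EndsWith, Like, StringSpaceExpr
// and SubstringExpr) implements PhysicalExpr, so a constructed value can be
// type-erased for use in a DataFusion physical plan.
fn erase(expr: StartsWith) -> Arc<dyn PhysicalExpr> {
    Arc::new(expr)
}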