diff --git a/core/Cargo.lock b/core/Cargo.lock index 0f262c03c..456d96966 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -492,16 +492,16 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.31" +version = "0.4.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b" dependencies = [ "android-tzdata", "iana-time-zone", "js-sys", "num-traits", "wasm-bindgen", - "windows-targets 0.48.5", + "windows-targets 0.52.0", ] [[package]] @@ -650,8 +650,8 @@ version = "7.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c64043d6c7b7a4c58e39e7efccfdea7b93d885a795d0c054a69dbbf4dd52686" dependencies = [ - "strum", - "strum_macros", + "strum 0.25.0", + "strum_macros 0.25.3", "unicode-width", ] @@ -833,9 +833,9 @@ dependencies = [ [[package]] name = "datafusion" -version = "35.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4328f5467f76d890fe3f924362dbc3a838c6a733f762b32d87f9e0b7bef5fb49" +checksum = "b2b360b692bf6c6d6e6b6dbaf41a3be0020daeceac0f406aed54c75331e50dbb" dependencies = [ "ahash", "arrow", @@ -849,6 +849,7 @@ dependencies = [ "datafusion-common", "datafusion-execution", "datafusion-expr", + "datafusion-functions", "datafusion-optimizer", "datafusion-physical-expr", "datafusion-physical-plan", @@ -874,9 +875,9 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "35.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d29a7752143b446db4a2cccd9a6517293c6b97e8c39e520ca43ccd07135a4f7e" +checksum = "37f343ccc298f440e25aa38ff82678291a7acc24061c7370ba6c0ff5cc811412" dependencies = [ "ahash", "arrow", @@ -893,9 +894,9 @@ dependencies = [ [[package]] name = "datafusion-execution" -version = "35.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d447650af16e138c31237f53ddaef6dd4f92f0e2d3f2f35d190e16c214ca496" +checksum = "3f9c93043081487e335399a21ebf8295626367a647ac5cb87d41d18afad7d0f7" dependencies = [ "arrow", "chrono", @@ -914,9 +915,9 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "35.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8d19598e48a498850fb79f97a9719b1f95e7deb64a7a06f93f313e8fa1d524b" +checksum = "e204d89909e678846b6a95f156aafc1ee5b36cb6c9e37ec2e1449b078a38c818" dependencies = [ "ahash", "arrow", @@ -924,15 +925,30 @@ dependencies = [ "datafusion-common", "paste", "sqlparser", - "strum", - "strum_macros", + "strum 0.26.1", + "strum_macros 0.26.1", +] + +[[package]] +name = "datafusion-functions" +version = "36.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98f1c73f7801b2b8ba2297b3ad78ffcf6c1fc6b8171f502987eb9ad5cb244ee7" +dependencies = [ + "arrow", + "base64", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "hex", + "log", ] [[package]] name = "datafusion-optimizer" -version = "35.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b7feb0391f1fc75575acb95b74bfd276903dc37a5409fcebe160bc7ddff2010" +checksum = "5ae27e07bf1f04d327be5c2a293470879801ab5535204dc3b16b062fda195496" dependencies = [ "arrow", "async-trait", @@ -948,9 +964,9 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "35.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e911bca609c89a54e8f014777449d8290327414d3e10c57a3e3c2122e38878d0" +checksum = "dde620cd9ef76a3bca9c754fb68854bd2349c49f55baf97e08001f9e967f6d6b" dependencies = [ "ahash", "arrow", @@ -958,11 +974,13 @@ dependencies = [ "arrow-buffer", "arrow-ord", "arrow-schema", + "arrow-string", "base64", "blake2", "blake3", "chrono", "datafusion-common", + "datafusion-execution", "datafusion-expr", "half 2.1.0", "hashbrown 0.14.3", @@ -982,9 +1000,9 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" -version = "35.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e96b546b8a02e9c2ab35ac6420d511f12a4701950c1eb2e568c122b4fefb0be3" +checksum = "9a4c75fba9ea99d64b2246cbd2fcae2e6fc973e6616b1015237a616036506dd4" dependencies = [ "ahash", "arrow", @@ -1013,9 +1031,9 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "35.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d18d36f260bbbd63aafdb55339213a23d540d3419810575850ef0a798a6b768" +checksum = "21474a95c3a62d113599d21b439fa15091b538bac06bd20be0bb2e7d22903c09" dependencies = [ "arrow", "arrow-schema", @@ -2516,9 +2534,9 @@ checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" [[package]] name = "sqlparser" -version = "0.41.0" +version = "0.43.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964" +checksum = "f95c4bae5aba7cd30bd506f7140026ade63cff5afd778af8854026f9606bf5d4" dependencies = [ "log", "sqlparser_derive", @@ -2558,8 +2576,14 @@ name = "strum" version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" + +[[package]] +name = "strum" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "723b93e8addf9aa965ebe2d11da6d7540fa2283fcea14b3371ff055f7ba13f5f" dependencies = [ - "strum_macros", + "strum_macros 0.26.1", ] [[package]] @@ -2575,6 +2599,19 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "strum_macros" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a3417fc93d76740d974a01654a09777cb500428cc874ca9f45edfe0c4d4cd18" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.48", +] + [[package]] name = "subtle" version = "2.5.0" @@ -2740,9 +2777,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.35.1" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ "backtrace", "bytes", diff --git a/core/Cargo.toml b/core/Cargo.toml index 14e271788..4dc5afe6f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -66,9 +66,9 @@ itertools = "0.11.0" chrono = { version = "0.4", default-features = false, features = ["clock"] } chrono-tz = { version = "0.8" } paste = "1.0.14" -datafusion-common = { version = "35.0.0" } -datafusion = { default-features = false, version = "35.0.0", features = ["unicode_expressions"] } -datafusion-physical-expr = { version = "35.0.0", default-features = false , features = ["unicode_expressions"] } +datafusion-common = { version = "36.0.0" } +datafusion = { default-features = false, version = "36.0.0", features = ["unicode_expressions"] } +datafusion-physical-expr = { version = "36.0.0", default-features = false , features = ["unicode_expressions"] } unicode-segmentation = "^1.10.1" once_cell = "1.18.0" regex = "1.9.6" diff --git a/core/benches/common.rs b/core/benches/common.rs index 059721698..15952b83c 100644 --- a/core/benches/common.rs +++ b/core/benches/common.rs @@ -45,6 +45,7 @@ pub fn create_int64_array(size: usize, null_density: f32, min: i64, max: i64) -> .collect() } +#[allow(dead_code)] pub fn create_primitive_array(size: usize, null_density: f32) -> PrimitiveArray where T: ArrowPrimitiveType, @@ -64,6 +65,7 @@ where /// Creates a dictionary with random keys and values, with value type `T`. /// Note here the keys are the dictionary indices. +#[allow(dead_code)] pub fn create_dictionary_array( size: usize, value_size: usize, diff --git a/core/src/execution/datafusion/expressions/avg.rs b/core/src/execution/datafusion/expressions/avg.rs index dc2b34747..1e04ab0e9 100644 --- a/core/src/execution/datafusion/expressions/avg.rs +++ b/core/src/execution/datafusion/expressions/avg.rs @@ -24,11 +24,11 @@ use arrow_array::{ Array, ArrayRef, ArrowNumericType, Int64Array, PrimitiveArray, }; use arrow_schema::{DataType, Field}; -use datafusion::logical_expr::{type_coercion::aggregates::avg_return_type, Accumulator}; -use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue}; -use datafusion_physical_expr::{ - expressions::format_state_name, AggregateExpr, EmitTo, GroupsAccumulator, PhysicalExpr, +use datafusion::logical_expr::{ + type_coercion::aggregates::avg_return_type, Accumulator, EmitTo, GroupsAccumulator, }; +use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue}; +use datafusion_physical_expr::{expressions::format_state_name, AggregateExpr, PhysicalExpr}; use std::{any::Any, sync::Arc}; use arrow_array::ArrowNativeTypeOp; @@ -146,7 +146,7 @@ pub struct AvgAccumulator { } impl Accumulator for AvgAccumulator { - fn state(&self) -> Result> { + fn state(&mut self) -> Result> { Ok(vec![ ScalarValue::Float64(self.sum), ScalarValue::from(self.count), @@ -175,7 +175,7 @@ impl Accumulator for AvgAccumulator { Ok(()) } - fn evaluate(&self) -> Result { + fn evaluate(&mut self) -> Result { Ok(ScalarValue::Float64( self.sum.map(|f| f / self.count as f64), )) diff --git a/core/src/execution/datafusion/expressions/avg_decimal.rs b/core/src/execution/datafusion/expressions/avg_decimal.rs index dc7bf1599..6fb558109 100644 --- a/core/src/execution/datafusion/expressions/avg_decimal.rs +++ b/core/src/execution/datafusion/expressions/avg_decimal.rs @@ -24,11 +24,9 @@ use arrow_array::{ Array, ArrayRef, Decimal128Array, Int64Array, PrimitiveArray, }; use arrow_schema::{DataType, Field}; -use datafusion::logical_expr::Accumulator; +use datafusion::logical_expr::{Accumulator, EmitTo, GroupsAccumulator}; use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue}; -use datafusion_physical_expr::{ - expressions::format_state_name, AggregateExpr, EmitTo, GroupsAccumulator, PhysicalExpr, -}; +use datafusion_physical_expr::{expressions::format_state_name, AggregateExpr, PhysicalExpr}; use std::{any::Any, sync::Arc}; use arrow_array::ArrowNativeTypeOp; @@ -214,7 +212,7 @@ impl AvgDecimalAccumulator { } impl Accumulator for AvgDecimalAccumulator { - fn state(&self) -> Result> { + fn state(&mut self) -> Result> { Ok(vec![ ScalarValue::Decimal128(self.sum, self.sum_precision, self.sum_scale), ScalarValue::from(self.count), @@ -266,7 +264,7 @@ impl Accumulator for AvgDecimalAccumulator { Ok(()) } - fn evaluate(&self) -> Result { + fn evaluate(&mut self) -> Result { fn make_decimal128(value: Option, precision: u8, scale: i8) -> ScalarValue { ScalarValue::Decimal128(value, precision, scale) } diff --git a/core/src/execution/datafusion/expressions/scalar_funcs.rs b/core/src/execution/datafusion/expressions/scalar_funcs.rs index 875956621..8ff13e125 100644 --- a/core/src/execution/datafusion/expressions/scalar_funcs.rs +++ b/core/src/execution/datafusion/expressions/scalar_funcs.rs @@ -31,13 +31,11 @@ use datafusion::{ physical_plan::ColumnarValue, }; use datafusion_common::{ - cast::as_generic_string_array, internal_err, DataFusionError, Result as DataFusionResult, - ScalarValue, + cast::as_generic_string_array, exec_err, internal_err, DataFusionError, + Result as DataFusionResult, ScalarValue, }; use datafusion_physical_expr::{ - execution_props::ExecutionProps, - functions::{create_physical_fun, make_scalar_function}, - math_expressions, + execution_props::ExecutionProps, functions::create_physical_fun, math_expressions, }; use num::{BigInt, Signed, ToPrimitive}; use unicode_segmentation::UnicodeSegmentation; @@ -366,7 +364,12 @@ fn spark_round( let (precision, scale) = get_precision_scale(data_type); make_decimal_array(array, precision, scale, &f) } - _ => make_scalar_function(math_expressions::round)(args), + DataType::Float32 | DataType::Float64 => { + Ok(ColumnarValue::Array(math_expressions::round(&[ + array.clone() + ])?)) + } + dt => exec_err!("Not supported datatype for ROUND: {dt}"), }, ColumnarValue::Scalar(a) => match a { ScalarValue::Int64(a) if *point < 0 => { @@ -386,7 +389,10 @@ fn spark_round( let (precision, scale) = get_precision_scale(data_type); make_decimal_scalar(a, precision, scale, &f) } - _ => make_scalar_function(math_expressions::round)(args), + ScalarValue::Float32(_) | ScalarValue::Float64(_) => Ok(ColumnarValue::Scalar( + ScalarValue::try_from_array(&math_expressions::round(&[a.to_array()?])?, 0)?, + )), + dt => exec_err!("Not supported datatype for ROUND: {dt}"), }, } } diff --git a/core/src/execution/datafusion/expressions/sum_decimal.rs b/core/src/execution/datafusion/expressions/sum_decimal.rs index a6da5f579..2afbbf011 100644 --- a/core/src/execution/datafusion/expressions/sum_decimal.rs +++ b/core/src/execution/datafusion/expressions/sum_decimal.rs @@ -24,11 +24,9 @@ use arrow_array::{ }; use arrow_data::decimal::validate_decimal_precision; use arrow_schema::{DataType, Field}; -use datafusion::logical_expr::Accumulator; +use datafusion::logical_expr::{Accumulator, EmitTo, GroupsAccumulator}; use datafusion_common::{Result as DFResult, ScalarValue}; -use datafusion_physical_expr::{ - aggregate::utils::down_cast_any_ref, AggregateExpr, EmitTo, GroupsAccumulator, PhysicalExpr, -}; +use datafusion_physical_expr::{aggregate::utils::down_cast_any_ref, AggregateExpr, PhysicalExpr}; use std::{any::Any, ops::BitAnd, sync::Arc}; use crate::unlikely; @@ -204,7 +202,7 @@ impl Accumulator for SumDecimalAccumulator { Ok(()) } - fn evaluate(&self) -> DFResult { + fn evaluate(&mut self) -> DFResult { // For each group: // 1. if `is_empty` is true, it means either there is no value or all values for the group // are null, in this case we'll return null @@ -224,7 +222,7 @@ impl Accumulator for SumDecimalAccumulator { std::mem::size_of_val(self) } - fn state(&self) -> DFResult> { + fn state(&mut self) -> DFResult> { let sum = if self.is_not_null { ScalarValue::try_new_decimal128(self.sum, self.precision, self.scale)? } else { diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs index 66a29cbb1..f4a0cec79 100644 --- a/core/src/execution/datafusion/planner.rs +++ b/core/src/execution/datafusion/planner.rs @@ -608,6 +608,7 @@ impl PhysicalPlanner { vec![left, right], data_type, None, + false, ))) } _ => Ok(Arc::new(BinaryExpr::new(left, op, right))), @@ -984,6 +985,7 @@ impl PhysicalPlanner { args.to_vec(), data_type, None, + args.is_empty(), )); Ok(scalar_expr) diff --git a/core/src/execution/operators/copy.rs b/core/src/execution/operators/copy.rs index c818d622d..996db2b47 100644 --- a/core/src/execution/operators/copy.rs +++ b/core/src/execution/operators/copy.rs @@ -28,7 +28,7 @@ use arrow_array::{ArrayRef, RecordBatch}; use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef}; use datafusion::{execution::TaskContext, physical_expr::*, physical_plan::*}; -use datafusion_common::{DataFusionError, Result as DataFusionResult}; +use datafusion_common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult}; use super::copy_or_cast_array; @@ -141,8 +141,7 @@ impl CopyStream { .iter() .map(|v| copy_or_cast_array(v)) .collect::, _>>()?; - RecordBatch::try_new(self.schema.clone(), vectors) - .map_err(|err| DataFusionError::ArrowError(err, None)) + RecordBatch::try_new(self.schema.clone(), vectors).map_err(|e| arrow_datafusion_err!(e)) } } diff --git a/core/src/execution/operators/scan.rs b/core/src/execution/operators/scan.rs index 9f85de80f..e31230c58 100644 --- a/core/src/execution/operators/scan.rs +++ b/core/src/execution/operators/scan.rs @@ -43,7 +43,7 @@ use datafusion::{ physical_expr::*, physical_plan::{ExecutionPlan, *}, }; -use datafusion_common::{DataFusionError, Result as DataFusionResult}; +use datafusion_common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult}; use jni::{ objects::{GlobalRef, JLongArray, JObject, ReleaseMode}, sys::jlongArray, @@ -325,7 +325,7 @@ impl ScanStream { let options = RecordBatchOptions::new().with_row_count(Some(num_rows)); RecordBatch::try_new_with_options(self.schema.clone(), new_columns, &options) - .map_err(|err| DataFusionError::ArrowError(err, None)) + .map_err(|e| arrow_datafusion_err!(e)) } }