From 7fd55c504b8b0f210e84e71953b5ddd6a00bb5a0 Mon Sep 17 00:00:00 2001 From: o_voievodin Date: Tue, 20 Feb 2024 15:51:17 -0800 Subject: [PATCH 1/6] Upgrade DF and arrow-rs --- core/Cargo.lock | 197 +++++++----------- core/Cargo.toml | 6 +- .../execution/datafusion/expressions/avg.rs | 12 +- .../datafusion/expressions/avg_decimal.rs | 10 +- .../datafusion/expressions/scalar_funcs.rs | 20 +- .../datafusion/expressions/sum_decimal.rs | 10 +- core/src/execution/datafusion/planner.rs | 2 + core/src/execution/operators/copy.rs | 5 +- core/src/execution/operators/scan.rs | 2 +- .../src/parquet/util/test_common/page_util.rs | 6 +- 10 files changed, 111 insertions(+), 159 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index 0f262c03c..faec69611 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -492,16 +492,16 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.31" +version = "0.4.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b" dependencies = [ "android-tzdata", "iana-time-zone", "js-sys", "num-traits", "wasm-bindgen", - "windows-targets 0.48.5", + "windows-targets 0.52.0", ] [[package]] @@ -650,8 +650,8 @@ version = "7.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c64043d6c7b7a4c58e39e7efccfdea7b93d885a795d0c054a69dbbf4dd52686" dependencies = [ - "strum", - "strum_macros", + "strum 0.25.0", + "strum_macros 0.25.3", "unicode-width", ] @@ -833,9 +833,9 @@ dependencies = [ [[package]] name = "datafusion" -version = "35.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4328f5467f76d890fe3f924362dbc3a838c6a733f762b32d87f9e0b7bef5fb49" +checksum = "b2b360b692bf6c6d6e6b6dbaf41a3be0020daeceac0f406aed54c75331e50dbb" dependencies = [ "ahash", "arrow", @@ -849,6 +849,7 @@ dependencies = [ "datafusion-common", "datafusion-execution", "datafusion-expr", + "datafusion-functions", "datafusion-optimizer", "datafusion-physical-expr", "datafusion-physical-plan", @@ -874,9 +875,9 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "35.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d29a7752143b446db4a2cccd9a6517293c6b97e8c39e520ca43ccd07135a4f7e" +checksum = "37f343ccc298f440e25aa38ff82678291a7acc24061c7370ba6c0ff5cc811412" dependencies = [ "ahash", "arrow", @@ -893,9 +894,9 @@ dependencies = [ [[package]] name = "datafusion-execution" -version = "35.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d447650af16e138c31237f53ddaef6dd4f92f0e2d3f2f35d190e16c214ca496" +checksum = "3f9c93043081487e335399a21ebf8295626367a647ac5cb87d41d18afad7d0f7" dependencies = [ "arrow", "chrono", @@ -914,9 +915,9 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "35.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8d19598e48a498850fb79f97a9719b1f95e7deb64a7a06f93f313e8fa1d524b" +checksum = "e204d89909e678846b6a95f156aafc1ee5b36cb6c9e37ec2e1449b078a38c818" dependencies = [ "ahash", "arrow", @@ -924,15 +925,30 @@ dependencies = [ "datafusion-common", "paste", "sqlparser", - "strum", - "strum_macros", + "strum 0.26.1", + "strum_macros 0.26.1", +] + +[[package]] +name = "datafusion-functions" +version = "36.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98f1c73f7801b2b8ba2297b3ad78ffcf6c1fc6b8171f502987eb9ad5cb244ee7" +dependencies = [ + "arrow", + "base64", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "hex", + "log", ] [[package]] name = "datafusion-optimizer" -version = "35.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b7feb0391f1fc75575acb95b74bfd276903dc37a5409fcebe160bc7ddff2010" +checksum = "5ae27e07bf1f04d327be5c2a293470879801ab5535204dc3b16b062fda195496" dependencies = [ "arrow", "async-trait", @@ -948,9 +964,9 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "35.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e911bca609c89a54e8f014777449d8290327414d3e10c57a3e3c2122e38878d0" +checksum = "dde620cd9ef76a3bca9c754fb68854bd2349c49f55baf97e08001f9e967f6d6b" dependencies = [ "ahash", "arrow", @@ -958,11 +974,13 @@ dependencies = [ "arrow-buffer", "arrow-ord", "arrow-schema", + "arrow-string", "base64", "blake2", "blake3", "chrono", "datafusion-common", + "datafusion-execution", "datafusion-expr", "half 2.1.0", "hashbrown 0.14.3", @@ -982,9 +1000,9 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" -version = "35.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e96b546b8a02e9c2ab35ac6420d511f12a4701950c1eb2e568c122b4fefb0be3" +checksum = "9a4c75fba9ea99d64b2246cbd2fcae2e6fc973e6616b1015237a616036506dd4" dependencies = [ "ahash", "arrow", @@ -1013,9 +1031,9 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "35.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d18d36f260bbbd63aafdb55339213a23d540d3419810575850ef0a798a6b768" +checksum = "21474a95c3a62d113599d21b439fa15091b538bac06bd20be0bb2e7d22903c09" dependencies = [ "arrow", "arrow-schema", @@ -1087,7 +1105,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys", ] [[package]] @@ -1336,7 +1354,7 @@ version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" dependencies = [ - "windows-sys 0.52.0", + "windows-sys", ] [[package]] @@ -1436,7 +1454,7 @@ checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455" dependencies = [ "hermit-abi", "rustix", - "windows-sys 0.52.0", + "windows-sys", ] [[package]] @@ -1472,32 +1490,18 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" -[[package]] -name = "java-locator" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90003f2fd9c52f212c21d8520f1128da0080bad6fff16b68fe6e7f2f0c3780c2" -dependencies = [ - "glob", - "lazy_static", -] - [[package]] name = "jni" -version = "0.21.1" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a87aa2bb7d2af34197c04845522473242e1aa17c12f4935d5856491a7fb8c97" +checksum = "c6df18c2e3db7e453d3c6ac5b3e9d5182664d28788126d39b91f2d1e22b017ec" dependencies = [ "cesu8", - "cfg-if", "combine", - "java-locator", "jni-sys", - "libloading", "log", "thiserror", "walkdir", - "windows-sys 0.45.0", ] [[package]] @@ -1600,16 +1604,6 @@ version = "0.2.151" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" -[[package]] -name = "libloading" -version = "0.7.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b67380fd3b2fbe7527a606e18729d21c6f3951633d0500574c4dc22d2d638b9f" -dependencies = [ - "cfg-if", - "winapi", -] - [[package]] name = "libm" version = "0.2.8" @@ -2353,7 +2347,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys", ] [[package]] @@ -2516,9 +2510,9 @@ checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" [[package]] name = "sqlparser" -version = "0.41.0" +version = "0.43.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964" +checksum = "f95c4bae5aba7cd30bd506f7140026ade63cff5afd778af8854026f9606bf5d4" dependencies = [ "log", "sqlparser_derive", @@ -2558,8 +2552,14 @@ name = "strum" version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" + +[[package]] +name = "strum" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "723b93e8addf9aa965ebe2d11da6d7540fa2283fcea14b3371ff055f7ba13f5f" dependencies = [ - "strum_macros", + "strum_macros 0.26.1", ] [[package]] @@ -2575,6 +2575,19 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "strum_macros" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a3417fc93d76740d974a01654a09777cb500428cc874ca9f45edfe0c4d4cd18" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.48", +] + [[package]] name = "subtle" version = "2.5.0" @@ -2636,7 +2649,7 @@ dependencies = [ "fastrand", "redox_syscall", "rustix", - "windows-sys 0.52.0", + "windows-sys", ] [[package]] @@ -2740,9 +2753,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.35.1" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ "backtrace", "bytes", @@ -3029,15 +3042,6 @@ dependencies = [ "windows-targets 0.52.0", ] -[[package]] -name = "windows-sys" -version = "0.45.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" -dependencies = [ - "windows-targets 0.42.2", -] - [[package]] name = "windows-sys" version = "0.52.0" @@ -3047,21 +3051,6 @@ dependencies = [ "windows-targets 0.52.0", ] -[[package]] -name = "windows-targets" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" -dependencies = [ - "windows_aarch64_gnullvm 0.42.2", - "windows_aarch64_msvc 0.42.2", - "windows_i686_gnu 0.42.2", - "windows_i686_msvc 0.42.2", - "windows_x86_64_gnu 0.42.2", - "windows_x86_64_gnullvm 0.42.2", - "windows_x86_64_msvc 0.42.2", -] - [[package]] name = "windows-targets" version = "0.48.5" @@ -3092,12 +3081,6 @@ dependencies = [ "windows_x86_64_msvc 0.52.0", ] -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" - [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -3110,12 +3093,6 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" -[[package]] -name = "windows_aarch64_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" - [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -3128,12 +3105,6 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" -[[package]] -name = "windows_i686_gnu" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" - [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -3146,12 +3117,6 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" -[[package]] -name = "windows_i686_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" - [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -3164,12 +3129,6 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" -[[package]] -name = "windows_x86_64_gnu" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" - [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -3182,12 +3141,6 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" - [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -3200,12 +3153,6 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" -[[package]] -name = "windows_x86_64_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" - [[package]] name = "windows_x86_64_msvc" version = "0.48.5" diff --git a/core/Cargo.toml b/core/Cargo.toml index 14e271788..4dc5afe6f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -66,9 +66,9 @@ itertools = "0.11.0" chrono = { version = "0.4", default-features = false, features = ["clock"] } chrono-tz = { version = "0.8" } paste = "1.0.14" -datafusion-common = { version = "35.0.0" } -datafusion = { default-features = false, version = "35.0.0", features = ["unicode_expressions"] } -datafusion-physical-expr = { version = "35.0.0", default-features = false , features = ["unicode_expressions"] } +datafusion-common = { version = "36.0.0" } +datafusion = { default-features = false, version = "36.0.0", features = ["unicode_expressions"] } +datafusion-physical-expr = { version = "36.0.0", default-features = false , features = ["unicode_expressions"] } unicode-segmentation = "^1.10.1" once_cell = "1.18.0" regex = "1.9.6" diff --git a/core/src/execution/datafusion/expressions/avg.rs b/core/src/execution/datafusion/expressions/avg.rs index dc2b34747..1e04ab0e9 100644 --- a/core/src/execution/datafusion/expressions/avg.rs +++ b/core/src/execution/datafusion/expressions/avg.rs @@ -24,11 +24,11 @@ use arrow_array::{ Array, ArrayRef, ArrowNumericType, Int64Array, PrimitiveArray, }; use arrow_schema::{DataType, Field}; -use datafusion::logical_expr::{type_coercion::aggregates::avg_return_type, Accumulator}; -use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue}; -use datafusion_physical_expr::{ - expressions::format_state_name, AggregateExpr, EmitTo, GroupsAccumulator, PhysicalExpr, +use datafusion::logical_expr::{ + type_coercion::aggregates::avg_return_type, Accumulator, EmitTo, GroupsAccumulator, }; +use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue}; +use datafusion_physical_expr::{expressions::format_state_name, AggregateExpr, PhysicalExpr}; use std::{any::Any, sync::Arc}; use arrow_array::ArrowNativeTypeOp; @@ -146,7 +146,7 @@ pub struct AvgAccumulator { } impl Accumulator for AvgAccumulator { - fn state(&self) -> Result> { + fn state(&mut self) -> Result> { Ok(vec![ ScalarValue::Float64(self.sum), ScalarValue::from(self.count), @@ -175,7 +175,7 @@ impl Accumulator for AvgAccumulator { Ok(()) } - fn evaluate(&self) -> Result { + fn evaluate(&mut self) -> Result { Ok(ScalarValue::Float64( self.sum.map(|f| f / self.count as f64), )) diff --git a/core/src/execution/datafusion/expressions/avg_decimal.rs b/core/src/execution/datafusion/expressions/avg_decimal.rs index dc7bf1599..6fb558109 100644 --- a/core/src/execution/datafusion/expressions/avg_decimal.rs +++ b/core/src/execution/datafusion/expressions/avg_decimal.rs @@ -24,11 +24,9 @@ use arrow_array::{ Array, ArrayRef, Decimal128Array, Int64Array, PrimitiveArray, }; use arrow_schema::{DataType, Field}; -use datafusion::logical_expr::Accumulator; +use datafusion::logical_expr::{Accumulator, EmitTo, GroupsAccumulator}; use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue}; -use datafusion_physical_expr::{ - expressions::format_state_name, AggregateExpr, EmitTo, GroupsAccumulator, PhysicalExpr, -}; +use datafusion_physical_expr::{expressions::format_state_name, AggregateExpr, PhysicalExpr}; use std::{any::Any, sync::Arc}; use arrow_array::ArrowNativeTypeOp; @@ -214,7 +212,7 @@ impl AvgDecimalAccumulator { } impl Accumulator for AvgDecimalAccumulator { - fn state(&self) -> Result> { + fn state(&mut self) -> Result> { Ok(vec![ ScalarValue::Decimal128(self.sum, self.sum_precision, self.sum_scale), ScalarValue::from(self.count), @@ -266,7 +264,7 @@ impl Accumulator for AvgDecimalAccumulator { Ok(()) } - fn evaluate(&self) -> Result { + fn evaluate(&mut self) -> Result { fn make_decimal128(value: Option, precision: u8, scale: i8) -> ScalarValue { ScalarValue::Decimal128(value, precision, scale) } diff --git a/core/src/execution/datafusion/expressions/scalar_funcs.rs b/core/src/execution/datafusion/expressions/scalar_funcs.rs index 875956621..f87d36493 100644 --- a/core/src/execution/datafusion/expressions/scalar_funcs.rs +++ b/core/src/execution/datafusion/expressions/scalar_funcs.rs @@ -31,13 +31,11 @@ use datafusion::{ physical_plan::ColumnarValue, }; use datafusion_common::{ - cast::as_generic_string_array, internal_err, DataFusionError, Result as DataFusionResult, - ScalarValue, + cast::as_generic_string_array, exec_err, internal_err, DataFusionError, + Result as DataFusionResult, ScalarValue, }; use datafusion_physical_expr::{ - execution_props::ExecutionProps, - functions::{create_physical_fun, make_scalar_function}, - math_expressions, + execution_props::ExecutionProps, functions::create_physical_fun, math_expressions, }; use num::{BigInt, Signed, ToPrimitive}; use unicode_segmentation::UnicodeSegmentation; @@ -366,7 +364,12 @@ fn spark_round( let (precision, scale) = get_precision_scale(data_type); make_decimal_array(array, precision, scale, &f) } - _ => make_scalar_function(math_expressions::round)(args), + DataType::Float32 | DataType::Float64 => { + Ok(ColumnarValue::Array(math_expressions::round(&[ + array.clone() + ])?)) + } + dt => exec_err!("Not supported datatype[{dt}] for ROUND"), }, ColumnarValue::Scalar(a) => match a { ScalarValue::Int64(a) if *point < 0 => { @@ -386,7 +389,10 @@ fn spark_round( let (precision, scale) = get_precision_scale(data_type); make_decimal_scalar(a, precision, scale, &f) } - _ => make_scalar_function(math_expressions::round)(args), + ScalarValue::Float32(_) | ScalarValue::Float64(_) => Ok(ColumnarValue::Scalar( + ScalarValue::try_from_array(&math_expressions::round(&[a.to_array()?])?, 0)?, + )), + dt => exec_err!("Not supported datatype[{dt}] for ROUND"), }, } } diff --git a/core/src/execution/datafusion/expressions/sum_decimal.rs b/core/src/execution/datafusion/expressions/sum_decimal.rs index a6da5f579..2afbbf011 100644 --- a/core/src/execution/datafusion/expressions/sum_decimal.rs +++ b/core/src/execution/datafusion/expressions/sum_decimal.rs @@ -24,11 +24,9 @@ use arrow_array::{ }; use arrow_data::decimal::validate_decimal_precision; use arrow_schema::{DataType, Field}; -use datafusion::logical_expr::Accumulator; +use datafusion::logical_expr::{Accumulator, EmitTo, GroupsAccumulator}; use datafusion_common::{Result as DFResult, ScalarValue}; -use datafusion_physical_expr::{ - aggregate::utils::down_cast_any_ref, AggregateExpr, EmitTo, GroupsAccumulator, PhysicalExpr, -}; +use datafusion_physical_expr::{aggregate::utils::down_cast_any_ref, AggregateExpr, PhysicalExpr}; use std::{any::Any, ops::BitAnd, sync::Arc}; use crate::unlikely; @@ -204,7 +202,7 @@ impl Accumulator for SumDecimalAccumulator { Ok(()) } - fn evaluate(&self) -> DFResult { + fn evaluate(&mut self) -> DFResult { // For each group: // 1. if `is_empty` is true, it means either there is no value or all values for the group // are null, in this case we'll return null @@ -224,7 +222,7 @@ impl Accumulator for SumDecimalAccumulator { std::mem::size_of_val(self) } - fn state(&self) -> DFResult> { + fn state(&mut self) -> DFResult> { let sum = if self.is_not_null { ScalarValue::try_new_decimal128(self.sum, self.precision, self.scale)? } else { diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs index 2feaacebf..dbf5542d4 100644 --- a/core/src/execution/datafusion/planner.rs +++ b/core/src/execution/datafusion/planner.rs @@ -607,6 +607,7 @@ impl PhysicalPlanner { vec![left, right], data_type, None, + false, ))) } _ => Ok(Arc::new(BinaryExpr::new(left, op, right))), @@ -961,6 +962,7 @@ impl PhysicalPlanner { args.to_vec(), data_type, None, + args.is_empty(), )); Ok(scalar_expr) diff --git a/core/src/execution/operators/copy.rs b/core/src/execution/operators/copy.rs index c818d622d..996db2b47 100644 --- a/core/src/execution/operators/copy.rs +++ b/core/src/execution/operators/copy.rs @@ -28,7 +28,7 @@ use arrow_array::{ArrayRef, RecordBatch}; use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef}; use datafusion::{execution::TaskContext, physical_expr::*, physical_plan::*}; -use datafusion_common::{DataFusionError, Result as DataFusionResult}; +use datafusion_common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult}; use super::copy_or_cast_array; @@ -141,8 +141,7 @@ impl CopyStream { .iter() .map(|v| copy_or_cast_array(v)) .collect::, _>>()?; - RecordBatch::try_new(self.schema.clone(), vectors) - .map_err(|err| DataFusionError::ArrowError(err, None)) + RecordBatch::try_new(self.schema.clone(), vectors).map_err(|e| arrow_datafusion_err!(e)) } } diff --git a/core/src/execution/operators/scan.rs b/core/src/execution/operators/scan.rs index 9f85de80f..efe21ac93 100644 --- a/core/src/execution/operators/scan.rs +++ b/core/src/execution/operators/scan.rs @@ -325,7 +325,7 @@ impl ScanStream { let options = RecordBatchOptions::new().with_row_count(Some(num_rows)); RecordBatch::try_new_with_options(self.schema.clone(), new_columns, &options) - .map_err(|err| DataFusionError::ArrowError(err, None)) + .map_err(|e| arrow_datafusion_err!(e)) } } diff --git a/core/src/parquet/util/test_common/page_util.rs b/core/src/parquet/util/test_common/page_util.rs index efd3f38e3..0c3a790ee 100644 --- a/core/src/parquet/util/test_common/page_util.rs +++ b/core/src/parquet/util/test_common/page_util.rs @@ -22,7 +22,7 @@ use rand::distributions::uniform::SampleUniform; use parquet::{ basic::Encoding, column::page::{Page, PageIterator, PageMetadata, PageReader}, - data_type::DataType, + data_type::{AsBytes, DataType}, encodings::{ encoding::{get_encoder, DictEncoder, Encoder}, levels::{max_buffer_size, LevelEncoder}, @@ -31,6 +31,8 @@ use parquet::{ schema::types::{ColumnDescPtr, SchemaDescPtr}, }; +use crate::parquet::util::memory::ByteBufferPtr; + use super::random_numbers_range; use bytes::Bytes; use zstd::zstd_safe::WriteBuf; @@ -290,7 +292,7 @@ pub fn make_pages( let indices = dict_encoder .write_indices() .expect("write_indices() should be OK"); - pb.add_indices(indices); + pb.add_indices(ByteBufferPtr::new(indices.as_bytes().to_vec())); } Encoding::PLAIN => { pb.add_values::(encoding, &values[value_range]); From d0c6daa541aa4a6a0bb8e35fb8ad0ab58e47aa2a Mon Sep 17 00:00:00 2001 From: o_voievodin Date: Tue, 20 Feb 2024 16:12:02 -0800 Subject: [PATCH 2/6] fix benches --- core/benches/common.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/benches/common.rs b/core/benches/common.rs index 059721698..15952b83c 100644 --- a/core/benches/common.rs +++ b/core/benches/common.rs @@ -45,6 +45,7 @@ pub fn create_int64_array(size: usize, null_density: f32, min: i64, max: i64) -> .collect() } +#[allow(dead_code)] pub fn create_primitive_array(size: usize, null_density: f32) -> PrimitiveArray where T: ArrowPrimitiveType, @@ -64,6 +65,7 @@ where /// Creates a dictionary with random keys and values, with value type `T`. /// Note here the keys are the dictionary indices. +#[allow(dead_code)] pub fn create_dictionary_array( size: usize, value_size: usize, From 2f83f68c63fac8872bdf6bb9d9a360de467c7436 Mon Sep 17 00:00:00 2001 From: o_voievodin Date: Wed, 21 Feb 2024 08:17:12 -0800 Subject: [PATCH 3/6] fix merge --- core/Cargo.lock | 104 ++++++++++++++++-- .../src/parquet/util/test_common/page_util.rs | 6 +- 2 files changed, 99 insertions(+), 11 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index faec69611..456d96966 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -1105,7 +1105,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -1354,7 +1354,7 @@ version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" dependencies = [ - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -1454,7 +1454,7 @@ checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455" dependencies = [ "hermit-abi", "rustix", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -1490,18 +1490,32 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" +[[package]] +name = "java-locator" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90003f2fd9c52f212c21d8520f1128da0080bad6fff16b68fe6e7f2f0c3780c2" +dependencies = [ + "glob", + "lazy_static", +] + [[package]] name = "jni" -version = "0.19.0" +version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6df18c2e3db7e453d3c6ac5b3e9d5182664d28788126d39b91f2d1e22b017ec" +checksum = "1a87aa2bb7d2af34197c04845522473242e1aa17c12f4935d5856491a7fb8c97" dependencies = [ "cesu8", + "cfg-if", "combine", + "java-locator", "jni-sys", + "libloading", "log", "thiserror", "walkdir", + "windows-sys 0.45.0", ] [[package]] @@ -1604,6 +1618,16 @@ version = "0.2.151" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" +[[package]] +name = "libloading" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b67380fd3b2fbe7527a606e18729d21c6f3951633d0500574c4dc22d2d638b9f" +dependencies = [ + "cfg-if", + "winapi", +] + [[package]] name = "libm" version = "0.2.8" @@ -2347,7 +2371,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -2649,7 +2673,7 @@ dependencies = [ "fastrand", "redox_syscall", "rustix", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -3042,6 +3066,15 @@ dependencies = [ "windows-targets 0.52.0", ] +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets 0.42.2", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -3051,6 +3084,21 @@ dependencies = [ "windows-targets 0.52.0", ] +[[package]] +name = "windows-targets" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + [[package]] name = "windows-targets" version = "0.48.5" @@ -3081,6 +3129,12 @@ dependencies = [ "windows_x86_64_msvc 0.52.0", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -3093,6 +3147,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -3105,6 +3165,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -3117,6 +3183,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -3129,6 +3201,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -3141,6 +3219,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -3153,6 +3237,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" diff --git a/core/src/parquet/util/test_common/page_util.rs b/core/src/parquet/util/test_common/page_util.rs index 0c3a790ee..efd3f38e3 100644 --- a/core/src/parquet/util/test_common/page_util.rs +++ b/core/src/parquet/util/test_common/page_util.rs @@ -22,7 +22,7 @@ use rand::distributions::uniform::SampleUniform; use parquet::{ basic::Encoding, column::page::{Page, PageIterator, PageMetadata, PageReader}, - data_type::{AsBytes, DataType}, + data_type::DataType, encodings::{ encoding::{get_encoder, DictEncoder, Encoder}, levels::{max_buffer_size, LevelEncoder}, @@ -31,8 +31,6 @@ use parquet::{ schema::types::{ColumnDescPtr, SchemaDescPtr}, }; -use crate::parquet::util::memory::ByteBufferPtr; - use super::random_numbers_range; use bytes::Bytes; use zstd::zstd_safe::WriteBuf; @@ -292,7 +290,7 @@ pub fn make_pages( let indices = dict_encoder .write_indices() .expect("write_indices() should be OK"); - pb.add_indices(ByteBufferPtr::new(indices.as_bytes().to_vec())); + pb.add_indices(indices); } Encoding::PLAIN => { pb.add_values::(encoding, &values[value_range]); From 726dee21c056c0de0f1e94d436338b6423416ca4 Mon Sep 17 00:00:00 2001 From: o_voievodin Date: Wed, 21 Feb 2024 10:43:53 -0800 Subject: [PATCH 4/6] fix merge --- core/src/execution/operators/scan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/execution/operators/scan.rs b/core/src/execution/operators/scan.rs index efe21ac93..e31230c58 100644 --- a/core/src/execution/operators/scan.rs +++ b/core/src/execution/operators/scan.rs @@ -43,7 +43,7 @@ use datafusion::{ physical_expr::*, physical_plan::{ExecutionPlan, *}, }; -use datafusion_common::{DataFusionError, Result as DataFusionResult}; +use datafusion_common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult}; use jni::{ objects::{GlobalRef, JLongArray, JObject, ReleaseMode}, sys::jlongArray, From cb2478708836f2fef9d5ea0052b42fdf69d0d7c6 Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 27 Feb 2024 14:46:45 -0800 Subject: [PATCH 5/6] Update core/src/execution/datafusion/expressions/scalar_funcs.rs Co-authored-by: Liang-Chi Hsieh --- core/src/execution/datafusion/expressions/scalar_funcs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/execution/datafusion/expressions/scalar_funcs.rs b/core/src/execution/datafusion/expressions/scalar_funcs.rs index f87d36493..4e951ad10 100644 --- a/core/src/execution/datafusion/expressions/scalar_funcs.rs +++ b/core/src/execution/datafusion/expressions/scalar_funcs.rs @@ -369,7 +369,7 @@ fn spark_round( array.clone() ])?)) } - dt => exec_err!("Not supported datatype[{dt}] for ROUND"), + dt => exec_err!("Not supported datatype for ROUND: {dt}"), }, ColumnarValue::Scalar(a) => match a { ScalarValue::Int64(a) if *point < 0 => { From 1aade96a2e1a6220621b5f1a7670e3f988547907 Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 27 Feb 2024 14:46:56 -0800 Subject: [PATCH 6/6] Update core/src/execution/datafusion/expressions/scalar_funcs.rs Co-authored-by: Liang-Chi Hsieh --- core/src/execution/datafusion/expressions/scalar_funcs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/execution/datafusion/expressions/scalar_funcs.rs b/core/src/execution/datafusion/expressions/scalar_funcs.rs index 4e951ad10..8ff13e125 100644 --- a/core/src/execution/datafusion/expressions/scalar_funcs.rs +++ b/core/src/execution/datafusion/expressions/scalar_funcs.rs @@ -392,7 +392,7 @@ fn spark_round( ScalarValue::Float32(_) | ScalarValue::Float64(_) => Ok(ColumnarValue::Scalar( ScalarValue::try_from_array(&math_expressions::round(&[a.to_array()?])?, 0)?, )), - dt => exec_err!("Not supported datatype[{dt}] for ROUND"), + dt => exec_err!("Not supported datatype for ROUND: {dt}"), }, } }