From 5d2c909f59caf90348096511d4a26e48a0ae8e3d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 28 Dec 2024 15:14:11 -0700 Subject: [PATCH] chore: Upgrade to DataFusion 44.0.0-rc2 (#1154) * move aggregate expressions to spark-expr crate * move more expressions * move benchmark * normalize_nan * bitwise not * comet scalar funcs * update bench imports * save * save * save * remove unused imports * clippy * implement more hashers * implement Hash and PartialEq * implement Hash and PartialEq * implement Hash and PartialEq * benches * fix ScalarUDFImpl.return_type failure * exclude test from miri * ignore correct test * ignore another test * remove miri checks * use return_type_from_exprs * Revert "use return_type_from_exprs" This reverts commit febc1f1ec1301f9b359fc23ad6a117224fce35b7. * use DF main branch * hacky workaround for regression in ScalarUDFImpl.return_type * fix repo url * pin to revision * bump to latest rev * bump to latest DF rev * bump DF to rev 9f530dd * add Cargo.lock * bump DF version * no default features * Revert "remove miri checks" This reverts commit 4638fe3aa5501966cd5d8b53acf26c698b10b3c9. * Update pin to DataFusion e99e02b9b9093ceb0c13a2dd32a2a89beba47930 * update pin * Update Cargo.toml Bump to 44.0.0-rc2 * update cargo lock * revert miri change --------- Co-authored-by: Andrew Lamb --- native/Cargo.lock | 598 +++++++++--------- native/Cargo.toml | 29 +- .../expressions/bloom_filter_might_contain.rs | 46 +- .../src/execution/expressions/subquery.rs | 23 +- native/core/src/execution/jni_api.rs | 9 +- native/core/src/execution/operators/copy.rs | 4 +- native/core/src/execution/operators/expand.rs | 6 +- native/core/src/execution/operators/filter.rs | 3 +- native/core/src/execution/operators/scan.rs | 4 +- native/core/src/execution/planner.rs | 55 +- .../src/execution/shuffle/shuffle_writer.rs | 6 +- .../src/execution/util/spark_bit_array.rs | 2 +- .../src/execution/util/spark_bloom_filter.rs | 2 +- native/spark-expr/Cargo.toml | 1 + native/spark-expr/benches/aggregate.rs | 3 +- native/spark-expr/src/avg.rs | 22 +- native/spark-expr/src/avg_decimal.rs | 20 +- native/spark-expr/src/bitwise_not.rs | 38 +- native/spark-expr/src/cast.rs | 42 +- native/spark-expr/src/checkoverflow.rs | 53 +- native/spark-expr/src/correlation.rs | 29 +- native/spark-expr/src/covariance.rs | 26 +- native/spark-expr/src/if_expr.rs | 49 +- native/spark-expr/src/list.rs | 127 ++-- native/spark-expr/src/negative.rs | 37 +- native/spark-expr/src/normalize_nan.rs | 45 +- native/spark-expr/src/regexp.rs | 27 +- native/spark-expr/src/stddev.rs | 30 +- native/spark-expr/src/strings.rs | 89 ++- native/spark-expr/src/structs.rs | 59 +- native/spark-expr/src/sum_decimal.rs | 28 +- native/spark-expr/src/temporal.rs | 175 +++-- native/spark-expr/src/to_json.rs | 23 +- native/spark-expr/src/unbound.rs | 21 +- native/spark-expr/src/variance.rs | 19 +- 35 files changed, 715 insertions(+), 1035 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index ad572acb9..bbc0ff97a 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -57,9 +57,9 @@ dependencies = [ [[package]] name = "allocator-api2" -version = "0.2.20" +version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45862d1c77f2228b9e10bc609d5bc203d86ebc9b87ad8d5d5167a6c9abf739d9" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" [[package]] name = "android-tzdata" @@ -90,9 +90,9 @@ checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" [[package]] name = "anyhow" -version = "1.0.93" +version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" +checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04" [[package]] name = "arc-swap" @@ -114,9 +114,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" -version = "53.2.0" +version = "53.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4caf25cdc4a985f91df42ed9e9308e1adbcd341a31a72605c697033fcef163e3" +checksum = "c91839b07e474b3995035fd8ac33ee54f9c9ccbbb1ea33d9909c71bffdf1259d" dependencies = [ "arrow-arith", "arrow-array", @@ -135,9 +135,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "53.2.0" +version = "53.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91f2dfd1a7ec0aca967dfaa616096aec49779adc8eccec005e2f5e4111b1192a" +checksum = "855c57c4efd26722b044dcd3e348252560e3e0333087fb9f6479dc0bf744054f" dependencies = [ "arrow-array", "arrow-buffer", @@ -150,9 +150,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "53.2.0" +version = "53.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d39387ca628be747394890a6e47f138ceac1aa912eab64f02519fed24b637af8" +checksum = "bd03279cea46569acf9295f6224fbc370c5df184b4d2ecfe97ccb131d5615a7f" dependencies = [ "ahash", "arrow-buffer", @@ -161,15 +161,15 @@ dependencies = [ "chrono", "chrono-tz 0.10.0", "half", - "hashbrown 0.14.5", + "hashbrown 0.15.2", "num", ] [[package]] name = "arrow-buffer" -version = "53.2.0" +version = "53.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e51e05228852ffe3eb391ce7178a0f97d2cf80cc6ef91d3c4a6b3cb688049ec" +checksum = "9e4a9b9b1d6d7117f6138e13bc4dd5daa7f94e671b70e8c9c4dc37b4f5ecfc16" dependencies = [ "bytes", "half", @@ -178,9 +178,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "53.2.0" +version = "53.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d09aea56ec9fa267f3f3f6cdab67d8a9974cbba90b3aa38c8fe9d0bb071bd8c1" +checksum = "bc70e39916e60c5b7af7a8e2719e3ae589326039e1e863675a008bee5ffe90fd" dependencies = [ "arrow-array", "arrow-buffer", @@ -199,9 +199,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "53.2.0" +version = "53.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c07b5232be87d115fde73e32f2ca7f1b353bff1b44ac422d3c6fc6ae38f11f0d" +checksum = "789b2af43c1049b03a8d088ff6b2257cdcea1756cd76b174b1f2600356771b97" dependencies = [ "arrow-array", "arrow-buffer", @@ -218,9 +218,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "53.2.0" +version = "53.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b98ae0af50890b494cebd7d6b04b35e896205c1d1df7b29a6272c5d0d0249ef5" +checksum = "e4e75edf21ffd53744a9b8e3ed11101f610e7ceb1a29860432824f1834a1f623" dependencies = [ "arrow-buffer", "arrow-schema", @@ -230,9 +230,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "53.2.0" +version = "53.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ed91bdeaff5a1c00d28d8f73466bcb64d32bbd7093b5a30156b4b9f4dba3eee" +checksum = "d186a909dece9160bf8312f5124d797884f608ef5435a36d9d608e0b2a9bcbf8" dependencies = [ "arrow-array", "arrow-buffer", @@ -245,9 +245,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "53.2.0" +version = "53.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0471f51260a5309307e5d409c9dc70aede1cd9cf1d4ff0f0a1e8e1a2dd0e0d3c" +checksum = "b66ff2fedc1222942d0bd2fd391cb14a85baa3857be95c9373179bd616753b85" dependencies = [ "arrow-array", "arrow-buffer", @@ -265,9 +265,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "53.2.0" +version = "53.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2883d7035e0b600fb4c30ce1e50e66e53d8656aa729f2bfa4b51d359cf3ded52" +checksum = "ece7b5bc1180e6d82d1a60e1688c199829e8842e38497563c3ab6ea813e527fd" dependencies = [ "arrow-array", "arrow-buffer", @@ -280,9 +280,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "53.2.0" +version = "53.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "552907e8e587a6fde4f8843fd7a27a576a260f65dab6c065741ea79f633fc5be" +checksum = "745c114c8f0e8ce211c83389270de6fbe96a9088a7b32c2a041258a443fe83ff" dependencies = [ "ahash", "arrow-array", @@ -294,18 +294,18 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "53.2.0" +version = "53.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "539ada65246b949bd99ffa0881a9a15a4a529448af1a07a9838dd78617dafab1" +checksum = "b95513080e728e4cec37f1ff5af4f12c9688d47795d17cda80b6ec2cf74d4678" dependencies = [ "bitflags 2.6.0", ] [[package]] name = "arrow-select" -version = "53.2.0" +version = "53.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6259e566b752da6dceab91766ed8b2e67bf6270eb9ad8a6e07a33c1bede2b125" +checksum = "8e415279094ea70323c032c6e739c48ad8d80e78a09bef7117b8718ad5bf3722" dependencies = [ "ahash", "arrow-array", @@ -317,9 +317,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "53.2.0" +version = "53.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3179ccbd18ebf04277a095ba7321b93fd1f774f18816bd5f6b3ce2f594edb6c" +checksum = "11d956cae7002eb8d83a27dbd34daaea1cf5b75852f0b84deb4d93a276e92bbf" dependencies = [ "arrow-array", "arrow-buffer", @@ -346,7 +346,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.92", ] [[package]] @@ -385,6 +385,19 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "bigdecimal" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f31f3af01c5c65a07985c804d3366560e6fa7883d640a122819b14ec327482c" +dependencies = [ + "autocfg", + "libm", + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -408,9 +421,9 @@ dependencies = [ [[package]] name = "blake3" -version = "1.5.4" +version = "1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d82033247fd8e890df8f740e407ad4d038debb9eb1f40533fffb32e7d17dc6f7" +checksum = "b8ee0c1824c4dea5b5f81736aff91bae041d2c07ee1192bec91054e10e3e601e" dependencies = [ "arrayref", "arrayvec", @@ -457,9 +470,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "bytemuck" -version = "1.19.0" +version = "1.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8334215b81e418a0a7bdb8ef0849474f40bb10c8b71f1c4ed315cff49f32494d" +checksum = "ef657dfab802224e671f5818e9a4935f9b1957ed18e58292690cc39e7a4092a3" [[package]] name = "byteorder" @@ -469,9 +482,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" +checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" [[package]] name = "cast" @@ -481,9 +494,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.2.1" +version = "1.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd9de9f2205d5ef3fd67e685b0df337994ddd4495e2a28d185500d0e1edfea47" +checksum = "8d6dbb628b8f8555f86d0323c2eb39e3ec81901f4b83e091db8a6a76d316a333" dependencies = [ "jobserver", "libc", @@ -504,9 +517,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.38" +version = "0.4.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" dependencies = [ "android-tzdata", "iana-time-zone", @@ -586,18 +599,18 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.21" +version = "4.5.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb3b4b9e5a7c7514dfa52869339ee98b3156b0bfb4e8a77c4ff4babb64b1604f" +checksum = "3135e7ec2ef7b10c6ed8950f0f792ed96ee093fa088608f1c76e569722700c84" dependencies = [ "clap_builder", ] [[package]] name = "clap_builder" -version = "4.5.21" +version = "4.5.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b17a95aa67cc7b5ebd32aa5370189aa0d79069ef1c64ce893bd30fb24bff20ec" +checksum = "30582fc632330df2bd26877bde0c1f4470d57c582bbc070376afcd04d8cb4838" dependencies = [ "anstyle", "clap_lex", @@ -605,9 +618,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.3" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afb84c814227b90d6895e01398aee0d8033c00e7466aca416fb6a8e0eb19d8a7" +checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" [[package]] name = "combine" @@ -673,9 +686,9 @@ dependencies = [ [[package]] name = "cpufeatures" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ca741a962e1b0bff6d724a1a0958b686406e853bb14061f218562e1896f95e6" +checksum = "16b80225097f2e5ae4e7179dd2266824648f3e2f49d9134d584b76389d31c4c3" dependencies = [ "libc", ] @@ -729,9 +742,9 @@ dependencies = [ [[package]] name = "crossbeam-deque" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" dependencies = [ "crossbeam-epoch", "crossbeam-utils", @@ -748,9 +761,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.20" +version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" [[package]] name = "crunchy" @@ -805,11 +818,9 @@ dependencies = [ [[package]] name = "datafusion" -version = "43.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbba0799cf6913b456ed07a94f0f3b6e12c62a5d88b10809e2284a0f2b915c05" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" dependencies = [ - "ahash", "arrow", "arrow-array", "arrow-ipc", @@ -825,6 +836,7 @@ dependencies = [ "datafusion-expr", "datafusion-functions", "datafusion-functions-aggregate", + "datafusion-functions-table", "datafusion-functions-window", "datafusion-optimizer", "datafusion-physical-expr", @@ -834,18 +846,13 @@ dependencies = [ "datafusion-sql", "futures", "glob", - "half", - "hashbrown 0.14.5", - "indexmap", "itertools 0.13.0", "log", - "num_cpus", "object_store", "parking_lot", "parquet", - "paste", - "pin-project-lite", "rand", + "regex", "sqlparser", "tempfile", "tokio", @@ -855,9 +862,8 @@ dependencies = [ [[package]] name = "datafusion-catalog" -version = "43.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7493c5c2d40eec435b13d92e5703554f4efc7059451fcb8d3a79580ff0e45560" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" dependencies = [ "arrow-schema", "async-trait", @@ -937,6 +943,7 @@ dependencies = [ "datafusion", "datafusion-common", "datafusion-expr", + "datafusion-expr-common", "datafusion-physical-expr", "futures", "num", @@ -945,57 +952,56 @@ dependencies = [ "regex", "thiserror", "tokio", - "twox-hash 2.0.1", + "twox-hash 2.1.0", ] [[package]] name = "datafusion-common" -version = "43.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24953049ebbd6f8964f91f60aa3514e121b5e81e068e33b60e77815ab369b25c" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" dependencies = [ "ahash", "arrow", "arrow-array", "arrow-buffer", "arrow-schema", - "chrono", "half", "hashbrown 0.14.5", "indexmap", - "instant", "libc", - "num_cpus", + "log", "object_store", "parquet", "paste", "sqlparser", "tokio", + "web-time", ] [[package]] name = "datafusion-common-runtime" -version = "43.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f06df4ef76872e11c924d3c814fd2a8dd09905ed2e2195f71c857d78abd19685" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" dependencies = [ "log", "tokio", ] +[[package]] +name = "datafusion-doc" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" + [[package]] name = "datafusion-execution" -version = "43.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bbdcb628d690f3ce5fea7de81642b514486d58ff9779a51f180a69a4eadb361" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" dependencies = [ "arrow", - "chrono", "dashmap", "datafusion-common", "datafusion-expr", "futures", - "hashbrown 0.14.5", "log", "object_store", "parking_lot", @@ -1006,16 +1012,13 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "43.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8036495980e3131f706b7d33ab00b4492d73dc714e3cb74d11b50f9602a73246" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" dependencies = [ - "ahash", "arrow", - "arrow-array", - "arrow-buffer", "chrono", "datafusion-common", + "datafusion-doc", "datafusion-expr-common", "datafusion-functions-aggregate-common", "datafusion-functions-window-common", @@ -1024,27 +1027,22 @@ dependencies = [ "paste", "serde_json", "sqlparser", - "strum", - "strum_macros", ] [[package]] name = "datafusion-expr-common" -version = "43.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4da0f3cb4669f9523b403d6b5a0ec85023e0ab3bf0183afd1517475b3e64fdd2" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" dependencies = [ "arrow", "datafusion-common", "itertools 0.13.0", - "paste", ] [[package]] name = "datafusion-functions" -version = "43.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f52c4012648b34853e40a2c6bcaa8772f837831019b68aca384fb38436dba162" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" dependencies = [ "arrow", "arrow-buffer", @@ -1053,8 +1051,11 @@ dependencies = [ "blake3", "chrono", "datafusion-common", + "datafusion-doc", "datafusion-execution", "datafusion-expr", + "datafusion-expr-common", + "datafusion-macros", "hashbrown 0.14.5", "hex", "itertools 0.13.0", @@ -1069,44 +1070,41 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" -version = "43.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5b8bb624597ba28ed7446df4a9bd7c7a7bde7c578b6b527da3f47371d5f6741" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" dependencies = [ "ahash", "arrow", "arrow-schema", "datafusion-common", + "datafusion-doc", "datafusion-execution", "datafusion-expr", "datafusion-functions-aggregate-common", + "datafusion-macros", "datafusion-physical-expr", "datafusion-physical-expr-common", "half", - "indexmap", "log", "paste", ] [[package]] name = "datafusion-functions-aggregate-common" -version = "43.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fb06208fc470bc8cf1ce2d9a1159d42db591f2c7264a8c1776b53ad8f675143" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" dependencies = [ "ahash", "arrow", "datafusion-common", "datafusion-expr-common", "datafusion-physical-expr-common", - "rand", ] [[package]] name = "datafusion-functions-nested" -version = "43.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fca25bbb87323716d05e54114666e942172ccca23c5a507e9c7851db6e965317" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" dependencies = [ "arrow", "arrow-array", @@ -1122,18 +1120,33 @@ dependencies = [ "itertools 0.13.0", "log", "paste", - "rand", +] + +[[package]] +name = "datafusion-functions-table" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" +dependencies = [ + "arrow", + "async-trait", + "datafusion-catalog", + "datafusion-common", + "datafusion-expr", + "datafusion-physical-plan", + "parking_lot", + "paste", ] [[package]] name = "datafusion-functions-window" -version = "43.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ae23356c634e54c59f7c51acb7a5b9f6240ffb2cf997049a1a24a8a88598dbe" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" dependencies = [ "datafusion-common", + "datafusion-doc", "datafusion-expr", "datafusion-functions-window-common", + "datafusion-macros", "datafusion-physical-expr", "datafusion-physical-expr-common", "log", @@ -1142,48 +1155,49 @@ dependencies = [ [[package]] name = "datafusion-functions-window-common" -version = "43.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4b3d6ff7794acea026de36007077a06b18b89e4f9c3fea7f2215f9f7dd9059b" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" dependencies = [ "datafusion-common", "datafusion-physical-expr-common", ] +[[package]] +name = "datafusion-macros" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" +dependencies = [ + "quote", + "syn 2.0.92", +] + [[package]] name = "datafusion-optimizer" -version = "43.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bec6241eb80c595fa0e1a8a6b69686b5cf3bd5fdacb8319582a0943b0bd788aa" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" dependencies = [ "arrow", - "async-trait", "chrono", "datafusion-common", "datafusion-expr", "datafusion-physical-expr", - "hashbrown 0.14.5", "indexmap", "itertools 0.13.0", "log", - "paste", + "regex", "regex-syntax", ] [[package]] name = "datafusion-physical-expr" -version = "43.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3370357b8fc75ec38577700644e5d1b0bc78f38babab99c0b8bd26bafb3e4335" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" dependencies = [ "ahash", "arrow", "arrow-array", "arrow-buffer", - "arrow-ord", "arrow-schema", - "arrow-string", - "chrono", "datafusion-common", "datafusion-expr", "datafusion-expr-common", @@ -1200,39 +1214,36 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" -version = "43.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8b7734d94bf2fa6f6e570935b0ddddd8421179ce200065be97874e13d46a47b" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" dependencies = [ "ahash", "arrow", "datafusion-common", "datafusion-expr-common", "hashbrown 0.14.5", - "rand", + "itertools 0.13.0", ] [[package]] name = "datafusion-physical-optimizer" -version = "43.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7eee8c479522df21d7b395640dff88c5ed05361852dce6544d7c98e9dbcebffe" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" dependencies = [ "arrow", - "arrow-schema", "datafusion-common", "datafusion-execution", "datafusion-expr-common", "datafusion-physical-expr", "datafusion-physical-plan", "itertools 0.13.0", + "log", ] [[package]] name = "datafusion-physical-plan" -version = "43.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17e1fc2e2c239d14e8556f2622b19a726bf6bc6962cc00c71fc52626274bee24" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" dependencies = [ "ahash", "arrow", @@ -1246,7 +1257,6 @@ dependencies = [ "datafusion-common-runtime", "datafusion-execution", "datafusion-expr", - "datafusion-functions-aggregate-common", "datafusion-functions-window-common", "datafusion-physical-expr", "datafusion-physical-expr-common", @@ -1256,29 +1266,26 @@ dependencies = [ "indexmap", "itertools 0.13.0", "log", - "once_cell", "parking_lot", "pin-project-lite", - "rand", "tokio", ] [[package]] name = "datafusion-sql" -version = "43.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63e3a4ed41dbee20a5d947a59ca035c225d67dc9cbe869c10f66dcdf25e7ce51" +version = "44.0.0" +source = "git+https://github.com/apache/datafusion.git?rev=44.0.0-rc2#3cc3fca31e6edc2d953e663bfd7f856bcb70d8c4" dependencies = [ "arrow", "arrow-array", "arrow-schema", + "bigdecimal", "datafusion-common", "datafusion-expr", "indexmap", "log", "regex", "sqlparser", - "strum", ] [[package]] @@ -1326,7 +1333,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.92", ] [[package]] @@ -1343,19 +1350,19 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.9" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" +checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] name = "fastrand" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "findshlibs" @@ -1377,9 +1384,9 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" [[package]] name = "flatbuffers" -version = "24.3.25" +version = "24.12.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8add37afff2d4ffa83bc748a70b4b1370984f6980768554182424ef71447c35f" +checksum = "4f1baf0dbf96932ec9a3038d57900329c015b0bfb7b63d904f3bc27e2b02a096" dependencies = [ "bitflags 1.3.2", "rustc_version", @@ -1466,7 +1473,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.92", ] [[package]] @@ -1555,9 +1562,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.15.1" +version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3" +checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" [[package]] name = "heck" @@ -1574,12 +1581,6 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" -[[package]] -name = "hermit-abi" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" - [[package]] name = "hermit-abi" version = "0.4.0" @@ -1594,11 +1595,11 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] name = "home" -version = "0.5.9" +version = "0.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" +checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1745,7 +1746,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.92", ] [[package]] @@ -1771,12 +1772,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.6.0" +version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" +checksum = "62f822373a4fe84d4bb149bf54e584a7f4abec90e072ed49cda0edea5b95471f" dependencies = [ "equivalent", - "hashbrown 0.15.1", + "hashbrown 0.15.2", ] [[package]] @@ -1797,18 +1798,6 @@ dependencies = [ "str_stack", ] -[[package]] -name = "instant" -version = "0.1.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" -dependencies = [ - "cfg-if", - "js-sys", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "integer-encoding" version = "3.0.4" @@ -1821,7 +1810,7 @@ version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "261f68e344040fbd0edea105bef17c66edf46f984ddb1115b775ce31be948f4b" dependencies = [ - "hermit-abi 0.4.0", + "hermit-abi", "libc", "windows-sys 0.52.0", ] @@ -1864,9 +1853,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.11" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" [[package]] name = "java-locator" @@ -1912,10 +1901,11 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.72" +version = "0.3.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9" +checksum = "6717b6b5b077764fb5966237269cb3c64edddde4b14ce42647430a78ced9e7b7" dependencies = [ + "once_cell", "wasm-bindgen", ] @@ -1927,9 +1917,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "lexical-core" -version = "1.0.2" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0431c65b318a590c1de6b8fd6e72798c92291d27762d94c9e6c37ed7a73d8458" +checksum = "b765c31809609075565a70b4b71402281283aeda7ecaf4818ac14a7b2ade8958" dependencies = [ "lexical-parse-float", "lexical-parse-integer", @@ -1940,9 +1930,9 @@ dependencies = [ [[package]] name = "lexical-parse-float" -version = "1.0.2" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb17a4bdb9b418051aa59d41d65b1c9be5affab314a872e5ad7f06231fb3b4e0" +checksum = "de6f9cb01fb0b08060209a057c048fcbab8717b4c1ecd2eac66ebfe39a65b0f2" dependencies = [ "lexical-parse-integer", "lexical-util", @@ -1951,9 +1941,9 @@ dependencies = [ [[package]] name = "lexical-parse-integer" -version = "1.0.2" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5df98f4a4ab53bf8b175b363a34c7af608fe31f93cc1fb1bf07130622ca4ef61" +checksum = "72207aae22fc0a121ba7b6d479e42cbfea549af1479c3f3a4f12c70dd66df12e" dependencies = [ "lexical-util", "static_assertions", @@ -1961,18 +1951,18 @@ dependencies = [ [[package]] name = "lexical-util" -version = "1.0.3" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85314db53332e5c192b6bca611fb10c114a80d1b831ddac0af1e9be1b9232ca0" +checksum = "5a82e24bf537fd24c177ffbbdc6ebcc8d54732c35b50a3f28cc3f4e4c949a0b3" dependencies = [ "static_assertions", ] [[package]] name = "lexical-write-float" -version = "1.0.2" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e7c3ad4e37db81c1cbe7cf34610340adc09c322871972f74877a712abc6c809" +checksum = "c5afc668a27f460fb45a81a757b6bf2f43c2d7e30cb5a2dcd3abf294c78d62bd" dependencies = [ "lexical-util", "lexical-write-integer", @@ -1981,9 +1971,9 @@ dependencies = [ [[package]] name = "lexical-write-integer" -version = "1.0.2" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb89e9f6958b83258afa3deed90b5de9ef68eef090ad5086c791cd2345610162" +checksum = "629ddff1a914a836fb245616a7888b62903aae58fa771e1d83943035efa0f978" dependencies = [ "lexical-util", "static_assertions", @@ -1991,9 +1981,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.162" +version = "0.2.169" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18d287de67fe55fd7e1581fe933d965a5a9477b38e949cfa9f8574ef01506398" +checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" [[package]] name = "libloading" @@ -2029,9 +2019,9 @@ checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" [[package]] name = "litemap" -version = "0.7.3" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "643cb0b8d4fcc284004d5fd0d67ccf61dfffadb7f75e1e71bc420f4688a3a704" +checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104" [[package]] name = "lock_api" @@ -2131,9 +2121,9 @@ dependencies = [ [[package]] name = "miniz_oxide" -version = "0.8.0" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +checksum = "4ffbe83022cedc1d264172192511ae958937694cd57ce297164951b8b3568394" dependencies = [ "adler2", ] @@ -2239,30 +2229,20 @@ dependencies = [ "libm", ] -[[package]] -name = "num_cpus" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" -dependencies = [ - "hermit-abi 0.3.9", - "libc", -] - [[package]] name = "object" -version = "0.36.5" +version = "0.36.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e" +checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" dependencies = [ "memchr", ] [[package]] name = "object_store" -version = "0.11.1" +version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6eb4c22c6154a1e759d7099f9ffad7cc5ef8245f9efbab4a41b92623079c82f3" +checksum = "3cfccb68961a56facde1163f9319e0d15743352344e7808a11795fb99698dcaf" dependencies = [ "async-trait", "bytes", @@ -2325,9 +2305,9 @@ dependencies = [ [[package]] name = "parquet" -version = "53.2.0" +version = "53.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dea02606ba6f5e856561d8d507dba8bac060aefca2a6c0f1aa1d361fed91ff3e" +checksum = "2b449890367085eb65d7d3321540abc3d7babbd179ce31df0016e90719114191" dependencies = [ "ahash", "arrow-array", @@ -2344,7 +2324,7 @@ dependencies = [ "flate2", "futures", "half", - "hashbrown 0.14.5", + "hashbrown 0.15.2", "lz4_flex", "num", "num-bigint", @@ -2506,9 +2486,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.89" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e" +checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0" dependencies = [ "unicode-ident", ] @@ -2576,7 +2556,7 @@ dependencies = [ "itertools 0.12.1", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.92", ] [[package]] @@ -2600,9 +2580,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.37" +version = "1.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af" +checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc" dependencies = [ "proc-macro2", ] @@ -2659,9 +2639,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.7" +version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" +checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834" dependencies = [ "bitflags 2.6.0", ] @@ -2721,22 +2701,22 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.40" +version = "0.38.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99e4ea3e1cdc4b559b8e5650f9c8e5998e3e5c1343b4eaf034565f32318d63c0" +checksum = "f93dc38ecbab2eb790ff964bb77fa94faf256fd3e73285fd7ba0903b76bedb85" dependencies = [ "bitflags 2.6.0", "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] name = "rustversion" -version = "1.0.18" +version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248" +checksum = "f7c45b9784283f1b2e7fb61b42047c2fd678ef0960d4f6f1eba131594cc369d4" [[package]] name = "ryu" @@ -2761,9 +2741,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "semver" -version = "1.0.23" +version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" +checksum = "3cb6eb87a131f756572d7fb904f6e7b68633f09cca868c5df1c4b8d1a694bbba" [[package]] name = "seq-macro" @@ -2773,9 +2753,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.215" +version = "1.0.217" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f" +checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" dependencies = [ "serde_derive", ] @@ -2792,20 +2772,20 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.215" +version = "1.0.217" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" +checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.92", ] [[package]] name = "serde_json" -version = "1.0.132" +version = "1.0.134" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" +checksum = "d00f4175c42ee48b15416f6193a959ba3a0d67fc699a0db9ad12df9f83991c7d" dependencies = [ "itoa", "memchr", @@ -2888,7 +2868,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.92", ] [[package]] @@ -2899,9 +2879,9 @@ checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" [[package]] name = "sqlparser" -version = "0.51.0" +version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fe11944a61da0da3f592e19a45ebe5ab92dc14a779907ff1f08fbb797bfefc7" +checksum = "05a528114c392209b3264855ad491fcce534b94a38771b0a0b97a79379275ce8" dependencies = [ "log", "sqlparser_derive", @@ -2909,13 +2889,13 @@ dependencies = [ [[package]] name = "sqlparser_derive" -version = "0.2.2" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" +checksum = "da5fc6819faabb412da764b99d3b713bb55083c11e7e0c00144d386cd6a1939c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.92", ] [[package]] @@ -2941,9 +2921,6 @@ name = "strum" version = "0.26.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" -dependencies = [ - "strum_macros", -] [[package]] name = "strum_macros" @@ -2955,7 +2932,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.87", + "syn 2.0.92", ] [[package]] @@ -2966,9 +2943,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "symbolic-common" -version = "12.12.1" +version = "12.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d4d73159efebfb389d819fd479afb2dbd57dcb3e3f4b7fcfa0e675f5a46c1cb" +checksum = "cd33e73f154e36ec223c18013f7064a2c120f1162fc086ac9933542def186b00" dependencies = [ "debugid", "memmap2", @@ -2978,9 +2955,9 @@ dependencies = [ [[package]] name = "symbolic-demangle" -version = "12.12.1" +version = "12.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a767859f6549c665011970874c3f541838b4835d5aaaa493d3ee383918be9f10" +checksum = "89e51191290147f071777e37fe111800bb82a9059f9c95b19d2dd41bfeddf477" dependencies = [ "cpp_demangle", "rustc-demangle", @@ -3000,9 +2977,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.87" +version = "2.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d" +checksum = "70ae51629bf965c5c098cc9e87908a3df5301051a9e087d6f9bef5c9771ed126" dependencies = [ "proc-macro2", "quote", @@ -3017,7 +2994,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.92", ] [[package]] @@ -3050,7 +3027,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.92", ] [[package]] @@ -3105,9 +3082,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.41.1" +version = "1.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33" +checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551" dependencies = [ "backtrace", "bytes", @@ -3123,14 +3100,14 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.92", ] [[package]] name = "tracing" -version = "0.1.40" +version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "pin-project-lite", "tracing-attributes", @@ -3139,20 +3116,20 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.27" +version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.92", ] [[package]] name = "tracing-core" -version = "0.1.32" +version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" dependencies = [ "once_cell", ] @@ -3169,9 +3146,9 @@ dependencies = [ [[package]] name = "twox-hash" -version = "2.0.1" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6db6856664807f43c17fbaf2718e2381ac1476a449aa104f5f64622defa1245" +checksum = "e7b17f197b3050ba473acf9181f7b1d3b66d1cf7356c6cc57886662276e65908" dependencies = [ "rand", ] @@ -3193,9 +3170,9 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" [[package]] name = "unicode-ident" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" +checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" [[package]] name = "unicode-segmentation" @@ -3226,9 +3203,9 @@ checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" [[package]] name = "url" -version = "2.5.3" +version = "2.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d157f1b96d14500ffdc1f10ba712e780825526c03d9a49b4d0324b0d9113ada" +checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" dependencies = [ "form_urlencoded", "idna", @@ -3280,9 +3257,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.95" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e" +checksum = "a474f6281d1d70c17ae7aa6a613c87fce69a127e2624002df63dcb39d6cf6396" dependencies = [ "cfg-if", "once_cell", @@ -3291,24 +3268,23 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.95" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358" +checksum = "5f89bb38646b4f81674e8f5c3fb81b562be1fd936d84320f3264486418519c79" dependencies = [ "bumpalo", "log", - "once_cell", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.92", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-macro" -version = "0.2.95" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56" +checksum = "2cc6181fd9a7492eef6fef1f33961e3695e4579b9872a6f7c83aee556666d4fe" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3316,28 +3292,38 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.95" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" +checksum = "30d7a95b763d3c45903ed6c81f156801839e5ee968bb07e534c44df0fcd330c2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.92", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.95" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" +checksum = "943aab3fdaaa029a6e0271b35ea10b72b943135afe9bffca82384098ad0e06a6" [[package]] name = "web-sys" -version = "0.3.72" +version = "0.3.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112" +checksum = "04dd7223427d52553d3702c004d3b2fe07c148165faa56313cb00211e31c12bc" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" dependencies = [ "js-sys", "wasm-bindgen", @@ -3557,9 +3543,9 @@ checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" [[package]] name = "yoke" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c5b1314b079b0930c31e3af543d8ee1757b1951ae1e1565ec704403a7240ca5" +checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40" dependencies = [ "serde", "stable_deref_trait", @@ -3569,13 +3555,13 @@ dependencies = [ [[package]] name = "yoke-derive" -version = "0.7.4" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95" +checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.92", "synstructure", ] @@ -3597,27 +3583,27 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.92", ] [[package]] name = "zerofrom" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91ec111ce797d0e0784a1116d0ddcdbea84322cd79e5d5ad173daeba4f93ab55" +checksum = "cff3ee08c995dee1859d998dea82f7374f2826091dd9cd47def953cae446cd2e" dependencies = [ "zerofrom-derive", ] [[package]] name = "zerofrom-derive" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5" +checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.92", "synstructure", ] @@ -3640,7 +3626,7 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.92", ] [[package]] diff --git a/native/Cargo.toml b/native/Cargo.toml index bd46cf0c9..cf4921ebe 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -33,20 +33,21 @@ edition = "2021" rust-version = "1.79" [workspace.dependencies] -arrow = { version = "53.2.0", features = ["prettyprint", "ffi", "chrono-tz"] } -arrow-array = { version = "53.2.0" } -arrow-buffer = { version = "53.2.0" } -arrow-data = { version = "53.2.0" } -arrow-schema = { version = "53.2.0" } -parquet = { version = "53.2.0", default-features = false, features = ["experimental"] } -datafusion = { version = "43.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions"] } -datafusion-common = { version = "43.0.0" } -datafusion-functions = { version = "43.0.0", features = ["crypto_expressions"] } -datafusion-functions-nested = { version = "43.0.0", default-features = false } -datafusion-expr = { version = "43.0.0", default-features = false } -datafusion-execution = { version = "43.0.0", default-features = false } -datafusion-physical-plan = { version = "43.0.0", default-features = false } -datafusion-physical-expr = { version = "43.0.0", default-features = false } +arrow = { version = "53.3.0", features = ["prettyprint", "ffi", "chrono-tz"] } +arrow-array = { version = "53.3.0" } +arrow-buffer = { version = "53.3.0" } +arrow-data = { version = "53.3.0" } +arrow-schema = { version = "53.3.0" } +parquet = { version = "53.3.0", default-features = false, features = ["experimental"] } +datafusion = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false, features = ["unicode_expressions", "crypto_expressions"] } +datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false } +datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false, features = ["crypto_expressions"] } +datafusion-functions-nested = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false } +datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false } +datafusion-expr-common = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false } +datafusion-execution = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false } +datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false } +datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false } datafusion-comet-spark-expr = { path = "spark-expr", version = "0.5.0" } datafusion-comet-proto = { path = "proto", version = "0.5.0" } chrono = { version = "0.4", default-features = false, features = ["clock"] } diff --git a/native/core/src/execution/expressions/bloom_filter_might_contain.rs b/native/core/src/execution/expressions/bloom_filter_might_contain.rs index af6a5a47a..b14fab62f 100644 --- a/native/core/src/execution/expressions/bloom_filter_might_contain.rs +++ b/native/core/src/execution/expressions/bloom_filter_might_contain.rs @@ -19,26 +19,37 @@ use crate::{execution::util::spark_bloom_filter::SparkBloomFilter, parquet::data use arrow::record_batch::RecordBatch; use arrow_array::cast::as_primitive_array; use arrow_schema::{DataType, Schema}; -use datafusion::physical_expr_common::physical_expr::down_cast_any_ref; use datafusion::physical_plan::ColumnarValue; use datafusion_common::{internal_err, Result, ScalarValue}; use datafusion_physical_expr::PhysicalExpr; -use std::{ - any::Any, - fmt::Display, - hash::{Hash, Hasher}, - sync::Arc, -}; +use std::hash::Hash; +use std::{any::Any, fmt::Display, sync::Arc}; /// A physical expression that checks if a value might be in a bloom filter. It corresponds to the /// Spark's `BloomFilterMightContain` expression. -#[derive(Debug, Hash)] +#[derive(Debug, Eq)] pub struct BloomFilterMightContain { pub bloom_filter_expr: Arc, pub value_expr: Arc, bloom_filter: Option, } +impl Hash for BloomFilterMightContain { + fn hash(&self, state: &mut H) { + self.bloom_filter_expr.hash(state); + self.value_expr.hash(state); + self.bloom_filter.hash(state); + } +} + +impl PartialEq for BloomFilterMightContain { + fn eq(&self, other: &Self) -> bool { + self.bloom_filter_expr.eq(&other.bloom_filter_expr) + && self.value_expr.eq(&other.value_expr) + && self.bloom_filter.eq(&other.bloom_filter) + } +} + impl Display for BloomFilterMightContain { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!( @@ -49,18 +60,6 @@ impl Display for BloomFilterMightContain { } } -impl PartialEq for BloomFilterMightContain { - fn eq(&self, _other: &dyn Any) -> bool { - down_cast_any_ref(_other) - .downcast_ref::() - .map(|other| { - self.bloom_filter_expr.eq(&other.bloom_filter_expr) - && self.value_expr.eq(&other.value_expr) - }) - .unwrap_or(false) - } -} - fn evaluate_bloom_filter( bloom_filter_expr: &Arc, ) -> Result> { @@ -141,11 +140,4 @@ impl PhysicalExpr for BloomFilterMightContain { Arc::clone(&children[1]), )?)) } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.bloom_filter_expr.hash(&mut s); - self.value_expr.hash(&mut s); - self.hash(&mut s); - } } diff --git a/native/core/src/execution/expressions/subquery.rs b/native/core/src/execution/expressions/subquery.rs index 3eeb29c16..d933a6096 100644 --- a/native/core/src/execution/expressions/subquery.rs +++ b/native/core/src/execution/expressions/subquery.rs @@ -22,7 +22,6 @@ use crate::{ use arrow_array::RecordBatch; use arrow_schema::{DataType, Schema, TimeUnit}; use datafusion::logical_expr::ColumnarValue; -use datafusion::physical_expr_common::physical_expr::down_cast_any_ref; use datafusion_common::{internal_err, ScalarValue}; use datafusion_physical_expr::PhysicalExpr; use jni::{ @@ -32,11 +31,11 @@ use jni::{ use std::{ any::Any, fmt::{Display, Formatter}, - hash::{Hash, Hasher}, + hash::Hash, sync::Arc, }; -#[derive(Debug, Hash)] +#[derive(Debug, Hash, PartialEq, Eq)] pub struct Subquery { /// The ID of the execution context that owns this subquery. We use this ID to retrieve the /// subquery result. @@ -63,19 +62,6 @@ impl Display for Subquery { } } -impl PartialEq for Subquery { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.id.eq(&x.id) - && self.data_type.eq(&x.data_type) - && self.exec_context_id.eq(&x.exec_context_id) - }) - .unwrap_or(false) - } -} - impl PhysicalExpr for Subquery { fn as_any(&self) -> &dyn Any { self @@ -209,9 +195,4 @@ impl PhysicalExpr for Subquery { ) -> datafusion_common::Result> { Ok(self) } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.hash(&mut s) - } } diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 2c1a55f48..09caf5e27 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -20,10 +20,7 @@ use arrow::datatypes::DataType as ArrowDataType; use arrow_array::RecordBatch; use datafusion::{ - execution::{ - disk_manager::DiskManagerConfig, - runtime_env::{RuntimeConfig, RuntimeEnv}, - }, + execution::{disk_manager::DiskManagerConfig, runtime_env::RuntimeEnv}, physical_plan::{display::DisplayableExecutionPlan, SendableRecordBatchStream}, prelude::{SessionConfig, SessionContext}, }; @@ -52,6 +49,7 @@ use crate::{ }; use datafusion_comet_proto::spark_operator::Operator; use datafusion_common::ScalarValue; +use datafusion_execution::runtime_env::RuntimeEnvBuilder; use futures::stream::StreamExt; use jni::{ objects::GlobalRef, @@ -188,7 +186,7 @@ fn prepare_datafusion_session_context( memory_fraction: f64, comet_task_memory_manager: Arc, ) -> CometResult { - let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs); + let mut rt_config = RuntimeEnvBuilder::new().with_disk_manager(DiskManagerConfig::NewOs); // Check if we are using unified memory manager integrated with Spark. if use_unified_memory_manager { @@ -216,6 +214,7 @@ fn prepare_datafusion_session_context( &ScalarValue::Float64(Some(1.1)), ); + #[allow(deprecated)] let runtime = RuntimeEnv::try_new(rt_config)?; let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime)); diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs index 8eeda8a5a..cec00eb28 100644 --- a/native/core/src/execution/operators/copy.rs +++ b/native/core/src/execution/operators/copy.rs @@ -30,6 +30,7 @@ use arrow_array::{ use arrow_data::transform::MutableArrayData; use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema, SchemaRef}; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion::{execution::TaskContext, physical_expr::*, physical_plan::*}; use datafusion_common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult}; @@ -78,7 +79,8 @@ impl CopyExec { let cache = PlanProperties::new( EquivalenceProperties::new(Arc::clone(&schema)), Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, + EmissionType::Final, + Boundedness::Bounded, ); Self { diff --git a/native/core/src/execution/operators/expand.rs b/native/core/src/execution/operators/expand.rs index fb43a6e49..f75822d40 100644 --- a/native/core/src/execution/operators/expand.rs +++ b/native/core/src/execution/operators/expand.rs @@ -17,10 +17,11 @@ use arrow_array::{RecordBatch, RecordBatchOptions}; use arrow_schema::SchemaRef; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::{ execution::TaskContext, physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties, + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, }, }; @@ -54,7 +55,8 @@ impl ExpandExec { let cache = PlanProperties::new( EquivalenceProperties::new(Arc::clone(&schema)), Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, + EmissionType::Final, + Boundedness::Bounded, ); Self { diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs index d9a54712d..eab30a356 100644 --- a/native/core/src/execution/operators/filter.rs +++ b/native/core/src/execution/operators/filter.rs @@ -210,7 +210,8 @@ impl FilterExec { Ok(PlanProperties::new( eq_properties, input.output_partitioning().clone(), // Output Partitioning - input.execution_mode(), // Execution Mode + input.pipeline_behavior(), + input.boundedness(), )) } } diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index a297f87c1..888cd2fdb 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -28,6 +28,7 @@ use arrow_data::ffi::FFI_ArrowArray; use arrow_data::ArrayData; use arrow_schema::ffi::FFI_ArrowSchema; use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time, }; @@ -122,7 +123,8 @@ impl ScanExec { // The partitioning is not important because we are not using DataFusion's // query planner or optimizer Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, + EmissionType::Final, + Boundedness::Bounded, ); Ok(Self { diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 0a7493354..5a35c62e3 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -94,7 +94,6 @@ use datafusion_common::{ tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter}, JoinType as DFJoinType, ScalarValue, }; -use datafusion_expr::expr::find_df_window_func; use datafusion_expr::{ AggregateUDF, ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, @@ -1515,10 +1514,7 @@ impl PhysicalPlanner { let builder = match datatype { DataType::Decimal128(_, _) => { - let func = AggregateUDF::new_from_impl(SumDecimal::try_new( - Arc::clone(&child), - datatype, - )?); + let func = AggregateUDF::new_from_impl(SumDecimal::try_new(datatype)?); AggregateExprBuilder::new(Arc::new(func), vec![child]) } _ => { @@ -1543,11 +1539,8 @@ impl PhysicalPlanner { let input_datatype = to_arrow_datatype(expr.sum_datatype.as_ref().unwrap()); let builder = match datatype { DataType::Decimal128(_, _) => { - let func = AggregateUDF::new_from_impl(AvgDecimal::new( - Arc::clone(&child), - datatype, - input_datatype, - )); + let func = + AggregateUDF::new_from_impl(AvgDecimal::new(datatype, input_datatype)); AggregateExprBuilder::new(Arc::new(func), vec![child]) } _ => { @@ -1556,11 +1549,7 @@ impl PhysicalPlanner { // failure since it should have already been checked at Spark side. let child: Arc = Arc::new(CastExpr::new(Arc::clone(&child), datatype.clone(), None)); - let func = AggregateUDF::new_from_impl(Avg::new( - Arc::clone(&child), - "avg", - datatype, - )); + let func = AggregateUDF::new_from_impl(Avg::new("avg", datatype)); AggregateExprBuilder::new(Arc::new(func), vec![child]) } }; @@ -1638,8 +1627,6 @@ impl PhysicalPlanner { match expr.stats_type { 0 => { let func = AggregateUDF::new_from_impl(Covariance::new( - Arc::clone(&child1), - Arc::clone(&child2), "covariance", datatype, StatsType::Sample, @@ -1655,8 +1642,6 @@ impl PhysicalPlanner { } 1 => { let func = AggregateUDF::new_from_impl(Covariance::new( - Arc::clone(&child1), - Arc::clone(&child2), "covariance_pop", datatype, StatsType::Population, @@ -1682,7 +1667,6 @@ impl PhysicalPlanner { match expr.stats_type { 0 => { let func = AggregateUDF::new_from_impl(Variance::new( - Arc::clone(&child), "variance", datatype, StatsType::Sample, @@ -1693,7 +1677,6 @@ impl PhysicalPlanner { } 1 => { let func = AggregateUDF::new_from_impl(Variance::new( - Arc::clone(&child), "variance_pop", datatype, StatsType::Population, @@ -1714,7 +1697,6 @@ impl PhysicalPlanner { match expr.stats_type { 0 => { let func = AggregateUDF::new_from_impl(Stddev::new( - Arc::clone(&child), "stddev", datatype, StatsType::Sample, @@ -1725,7 +1707,6 @@ impl PhysicalPlanner { } 1 => { let func = AggregateUDF::new_from_impl(Stddev::new( - Arc::clone(&child), "stddev_pop", datatype, StatsType::Population, @@ -1747,8 +1728,6 @@ impl PhysicalPlanner { self.create_expr(expr.child2.as_ref().unwrap(), Arc::clone(&schema))?; let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); let func = AggregateUDF::new_from_impl(Correlation::new( - Arc::clone(&child1), - Arc::clone(&child2), "correlation", datatype, expr.null_on_divide_by_zero, @@ -1935,7 +1914,7 @@ impl PhysicalPlanner { window_func_name, &window_args, partition_by, - sort_exprs, + &LexOrdering::new(sort_exprs.to_vec()), window_frame.into(), input_schema.as_ref(), false, // TODO: Ignore nulls @@ -1985,15 +1964,11 @@ impl PhysicalPlanner { /// Find DataFusion's built-in window function by name. fn find_df_window_function(&self, name: &str) -> Option { - if let Some(f) = find_df_window_func(name) { - Some(f) - } else { - let registry = &self.session_ctx.state(); - registry - .udaf(name) - .map(WindowFunctionDefinition::AggregateUDF) - .ok() - } + let registry = &self.session_ctx.state(); + registry + .udaf(name) + .map(WindowFunctionDefinition::AggregateUDF) + .ok() } /// Create a DataFusion physical partitioning from Spark physical partitioning @@ -2049,7 +2024,15 @@ impl PhysicalPlanner { .coerce_types(&input_expr_types) .unwrap_or_else(|_| input_expr_types.clone()); - let data_type = func.inner().return_type(&coerced_types)?; + let data_type = match fun_name { + // workaround for https://github.com/apache/datafusion/issues/13716 + "datepart" => DataType::Int32, + _ => { + // TODO need to call `return_type_from_exprs` instead + #[allow(deprecated)] + func.inner().return_type(&coerced_types)? + } + }; (data_type, coerced_types) } diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 01117199e..f3fa685b8 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -25,6 +25,7 @@ use arrow::{datatypes::*, ipc::writer::StreamWriter}; use async_trait::async_trait; use bytes::Buf; use crc32fast::Hasher; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::{ arrow::{ array::*, @@ -44,7 +45,7 @@ use datafusion::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time, }, stream::RecordBatchStreamAdapter, - DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties, + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }, }; @@ -191,7 +192,8 @@ impl ShuffleWriterExec { let cache = PlanProperties::new( EquivalenceProperties::new(Arc::clone(&input.schema())), partitioning.clone(), - ExecutionMode::Bounded, + EmissionType::Final, + Boundedness::Bounded, ); Ok(ShuffleWriterExec { diff --git a/native/core/src/execution/util/spark_bit_array.rs b/native/core/src/execution/util/spark_bit_array.rs index 6cfecc1bf..3ac8b199b 100644 --- a/native/core/src/execution/util/spark_bit_array.rs +++ b/native/core/src/execution/util/spark_bit_array.rs @@ -22,7 +22,7 @@ use std::iter::zip; /// A simple bit array implementation that simulates the behavior of Spark's BitArray which is /// used in the BloomFilter implementation. Some methods are not implemented as they are not /// required for the current use case. -#[derive(Debug, Hash)] +#[derive(Debug, Hash, PartialEq, Eq)] pub struct SparkBitArray { data: Vec, bit_count: usize, diff --git a/native/core/src/execution/util/spark_bloom_filter.rs b/native/core/src/execution/util/spark_bloom_filter.rs index 2c3af1691..61245757c 100644 --- a/native/core/src/execution/util/spark_bloom_filter.rs +++ b/native/core/src/execution/util/spark_bloom_filter.rs @@ -27,7 +27,7 @@ const SPARK_BLOOM_FILTER_VERSION_1: i32 = 1; /// A Bloom filter implementation that simulates the behavior of Spark's BloomFilter. /// It's not a complete implementation of Spark's BloomFilter, but just add the minimum /// methods to support mightContainsLong in the native side. -#[derive(Debug, Hash)] +#[derive(Debug, Hash, PartialEq, Eq)] pub struct SparkBloomFilter { bits: SparkBitArray, num_hash_functions: u32, diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml index 27367d83e..fc348f81b 100644 --- a/native/spark-expr/Cargo.toml +++ b/native/spark-expr/Cargo.toml @@ -36,6 +36,7 @@ chrono = { workspace = true } datafusion = { workspace = true, features = ["parquet"] } datafusion-common = { workspace = true } datafusion-expr = { workspace = true } +datafusion-expr-common = { workspace = true } datafusion-physical-expr = { workspace = true } chrono-tz = { workspace = true } num = { workspace = true } diff --git a/native/spark-expr/benches/aggregate.rs b/native/spark-expr/benches/aggregate.rs index 43194fdda..051ac5eb6 100644 --- a/native/spark-expr/benches/aggregate.rs +++ b/native/spark-expr/benches/aggregate.rs @@ -66,7 +66,6 @@ fn criterion_benchmark(c: &mut Criterion) { group.bench_function("avg_decimal_comet", |b| { let comet_avg_decimal = Arc::new(AggregateUDF::new_from_impl(AvgDecimal::new( - Arc::clone(&c1), DataType::Decimal128(38, 10), DataType::Decimal128(38, 10), ))); @@ -96,7 +95,7 @@ fn criterion_benchmark(c: &mut Criterion) { group.bench_function("sum_decimal_comet", |b| { let comet_sum_decimal = Arc::new(AggregateUDF::new_from_impl( - SumDecimal::try_new(Arc::clone(&c1), DataType::Decimal128(38, 10)).unwrap(), + SumDecimal::try_new(DataType::Decimal128(38, 10)).unwrap(), )); b.to_async(&rt).iter(|| { black_box(agg_test( diff --git a/native/spark-expr/src/avg.rs b/native/spark-expr/src/avg.rs index 7820497d4..816440ac9 100644 --- a/native/spark-expr/src/avg.rs +++ b/native/spark-expr/src/avg.rs @@ -27,11 +27,10 @@ use datafusion::logical_expr::{ type_coercion::aggregates::avg_return_type, Accumulator, EmitTo, GroupsAccumulator, Signature, }; use datafusion_common::{not_impl_err, Result, ScalarValue}; -use datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr}; +use datafusion_physical_expr::expressions::format_state_name; use std::{any::Any, sync::Arc}; use arrow_array::ArrowNativeTypeOp; -use datafusion::physical_expr_common::physical_expr::down_cast_any_ref; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::Volatility::Immutable; use datafusion_expr::{AggregateUDFImpl, ReversedUDAF}; @@ -42,20 +41,19 @@ use DataType::*; pub struct Avg { name: String, signature: Signature, - expr: Arc, + // expr: Arc, input_data_type: DataType, result_data_type: DataType, } impl Avg { /// Create a new AVG aggregate function - pub fn new(expr: Arc, name: impl Into, data_type: DataType) -> Self { + pub fn new(name: impl Into, data_type: DataType) -> Self { let result_data_type = avg_return_type("avg", &data_type).unwrap(); Self { name: name.into(), signature: Signature::user_defined(Immutable), - expr, input_data_type: data_type, result_data_type, } @@ -139,20 +137,6 @@ impl AggregateUDFImpl for Avg { } } -impl PartialEq for Avg { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.name == x.name - && self.input_data_type == x.input_data_type - && self.result_data_type == x.result_data_type - && self.expr.eq(&x.expr) - }) - .unwrap_or(false) - } -} - /// An accumulator to compute the average #[derive(Debug, Default)] pub struct AvgAccumulator { diff --git a/native/spark-expr/src/avg_decimal.rs b/native/spark-expr/src/avg_decimal.rs index 163e1560b..05fc28e58 100644 --- a/native/spark-expr/src/avg_decimal.rs +++ b/native/spark-expr/src/avg_decimal.rs @@ -25,14 +25,13 @@ use arrow_array::{ use arrow_schema::{DataType, Field}; use datafusion::logical_expr::{Accumulator, EmitTo, GroupsAccumulator, Signature}; use datafusion_common::{not_impl_err, Result, ScalarValue}; -use datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr}; +use datafusion_physical_expr::expressions::format_state_name; use std::{any::Any, sync::Arc}; use crate::utils::is_valid_decimal_precision; use arrow_array::ArrowNativeTypeOp; use arrow_data::decimal::{MAX_DECIMAL_FOR_EACH_PRECISION, MIN_DECIMAL_FOR_EACH_PRECISION}; use datafusion::logical_expr::Volatility::Immutable; -use datafusion::physical_expr_common::physical_expr::down_cast_any_ref; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::type_coercion::aggregates::avg_return_type; use datafusion_expr::{AggregateUDFImpl, ReversedUDAF}; @@ -43,17 +42,15 @@ use DataType::*; #[derive(Debug, Clone)] pub struct AvgDecimal { signature: Signature, - expr: Arc, sum_data_type: DataType, result_data_type: DataType, } impl AvgDecimal { /// Create a new AVG aggregate function - pub fn new(expr: Arc, result_type: DataType, sum_type: DataType) -> Self { + pub fn new(result_type: DataType, sum_type: DataType) -> Self { Self { signature: Signature::user_defined(Immutable), - expr, result_data_type: result_type, sum_data_type: sum_type, } @@ -156,19 +153,6 @@ impl AggregateUDFImpl for AvgDecimal { } } -impl PartialEq for AvgDecimal { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.sum_data_type == x.sum_data_type - && self.result_data_type == x.result_data_type - && self.expr.eq(&x.expr) - }) - .unwrap_or(false) - } -} - /// An accumulator to compute the average for decimals #[derive(Debug)] struct AvgDecimalAccumulator { diff --git a/native/spark-expr/src/bitwise_not.rs b/native/spark-expr/src/bitwise_not.rs index 36234935e..d7c31836f 100644 --- a/native/spark-expr/src/bitwise_not.rs +++ b/native/spark-expr/src/bitwise_not.rs @@ -15,21 +15,16 @@ // specific language governing permissions and limitations // under the License. -use std::{ - any::Any, - hash::{Hash, Hasher}, - sync::Arc, -}; - use arrow::{ array::*, datatypes::{DataType, Schema}, record_batch::RecordBatch, }; -use datafusion::physical_expr_common::physical_expr::down_cast_any_ref; use datafusion::{error::DataFusionError, logical_expr::ColumnarValue}; use datafusion_common::Result; use datafusion_physical_expr::PhysicalExpr; +use std::hash::Hash; +use std::{any::Any, sync::Arc}; macro_rules! compute_op { ($OPERAND:expr, $DT:ident) => {{ @@ -43,12 +38,24 @@ macro_rules! compute_op { } /// BitwiseNot expression -#[derive(Debug, Hash)] +#[derive(Debug, Eq)] pub struct BitwiseNotExpr { /// Input expression arg: Arc, } +impl Hash for BitwiseNotExpr { + fn hash(&self, state: &mut H) { + self.arg.hash(state); + } +} + +impl PartialEq for BitwiseNotExpr { + fn eq(&self, other: &Self) -> bool { + self.arg.eq(&other.arg) + } +} + impl BitwiseNotExpr { /// Create new bitwise not expression pub fn new(arg: Arc) -> Self { @@ -114,21 +121,6 @@ impl PhysicalExpr for BitwiseNotExpr { ) -> Result> { Ok(Arc::new(BitwiseNotExpr::new(Arc::clone(&children[0])))) } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.arg.hash(&mut s); - self.hash(&mut s); - } -} - -impl PartialEq for BitwiseNotExpr { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| self.arg.eq(&x.arg)) - .unwrap_or(false) - } } pub fn bitwise_not(arg: Arc) -> Result> { diff --git a/native/spark-expr/src/cast.rs b/native/spark-expr/src/cast.rs index d96bcbbdb..6e0e0915c 100644 --- a/native/spark-expr/src/cast.rs +++ b/native/spark-expr/src/cast.rs @@ -39,7 +39,6 @@ use arrow_array::builder::StringBuilder; use arrow_array::{DictionaryArray, StringArray, StructArray}; use arrow_schema::{DataType, Field, Schema}; use chrono::{NaiveDate, NaiveDateTime, TimeZone, Timelike}; -use datafusion::physical_expr_common::physical_expr::down_cast_any_ref; use datafusion_common::{ cast::as_generic_string_array, internal_err, Result as DataFusionResult, ScalarValue, }; @@ -54,7 +53,7 @@ use std::str::FromStr; use std::{ any::Any, fmt::{Debug, Display, Formatter}, - hash::{Hash, Hasher}, + hash::Hash, num::Wrapping, sync::Arc, }; @@ -131,13 +130,29 @@ impl TimeStampInfo { } } -#[derive(Debug, Hash)] +#[derive(Debug, Eq)] pub struct Cast { pub child: Arc, pub data_type: DataType, pub cast_options: SparkCastOptions, } +impl PartialEq for Cast { + fn eq(&self, other: &Self) -> bool { + self.child.eq(&other.child) + && self.data_type.eq(&other.data_type) + && self.cast_options.eq(&other.cast_options) + } +} + +impl Hash for Cast { + fn hash(&self, state: &mut H) { + self.child.hash(state); + self.data_type.hash(state); + self.cast_options.hash(state); + } +} + /// Determine if Comet supports a cast, taking options such as EvalMode and Timezone into account. pub fn cast_supported( from_type: &DataType, @@ -1681,19 +1696,6 @@ impl Display for Cast { } } -impl PartialEq for Cast { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.child.eq(&x.child) - && self.cast_options.eq(&x.cast_options) - && self.data_type.eq(&x.data_type) - }) - .unwrap_or(false) - } -} - impl PhysicalExpr for Cast { fn as_any(&self) -> &dyn Any { self @@ -1729,14 +1731,6 @@ impl PhysicalExpr for Cast { _ => internal_err!("Cast should have exactly one child"), } } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.child.hash(&mut s); - self.data_type.hash(&mut s); - self.cast_options.hash(&mut s); - self.hash(&mut s); - } } fn timestamp_parser( diff --git a/native/spark-expr/src/checkoverflow.rs b/native/spark-expr/src/checkoverflow.rs index e922171bd..528bbd5d9 100644 --- a/native/spark-expr/src/checkoverflow.rs +++ b/native/spark-expr/src/checkoverflow.rs @@ -15,13 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::{ - any::Any, - fmt::{Display, Formatter}, - hash::{Hash, Hasher}, - sync::Arc, -}; - use arrow::{ array::{as_primitive_array, Array, ArrayRef, Decimal128Array}, datatypes::{Decimal128Type, DecimalType}, @@ -29,21 +22,42 @@ use arrow::{ }; use arrow_schema::{DataType, Schema}; use datafusion::logical_expr::ColumnarValue; -use datafusion::physical_expr_common::physical_expr::down_cast_any_ref; use datafusion_common::{DataFusionError, ScalarValue}; use datafusion_physical_expr::PhysicalExpr; +use std::hash::Hash; +use std::{ + any::Any, + fmt::{Display, Formatter}, + sync::Arc, +}; /// This is from Spark `CheckOverflow` expression. Spark `CheckOverflow` expression rounds decimals /// to given scale and check if the decimals can fit in given precision. As `cast` kernel rounds /// decimals already, Comet `CheckOverflow` expression only checks if the decimals can fit in the /// precision. -#[derive(Debug, Hash)] +#[derive(Debug, Eq)] pub struct CheckOverflow { pub child: Arc, pub data_type: DataType, pub fail_on_error: bool, } +impl Hash for CheckOverflow { + fn hash(&self, state: &mut H) { + self.child.hash(state); + self.data_type.hash(state); + self.fail_on_error.hash(state); + } +} + +impl PartialEq for CheckOverflow { + fn eq(&self, other: &Self) -> bool { + self.child.eq(&other.child) + && self.data_type.eq(&other.data_type) + && self.fail_on_error.eq(&other.fail_on_error) + } +} + impl CheckOverflow { pub fn new(child: Arc, data_type: DataType, fail_on_error: bool) -> Self { Self { @@ -64,19 +78,6 @@ impl Display for CheckOverflow { } } -impl PartialEq for CheckOverflow { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.child.eq(&x.child) - && self.data_type.eq(&x.data_type) - && self.fail_on_error.eq(&x.fail_on_error) - }) - .unwrap_or(false) - } -} - impl PhysicalExpr for CheckOverflow { fn as_any(&self) -> &dyn Any { self @@ -162,12 +163,4 @@ impl PhysicalExpr for CheckOverflow { self.fail_on_error, ))) } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.child.hash(&mut s); - self.data_type.hash(&mut s); - self.fail_on_error.hash(&mut s); - self.hash(&mut s); - } } diff --git a/native/spark-expr/src/correlation.rs b/native/spark-expr/src/correlation.rs index e5f36c6f9..e4ddab95d 100644 --- a/native/spark-expr/src/correlation.rs +++ b/native/spark-expr/src/correlation.rs @@ -26,13 +26,12 @@ use arrow::{ datatypes::{DataType, Field}, }; use datafusion::logical_expr::Accumulator; -use datafusion::physical_expr_common::physical_expr::down_cast_any_ref; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::type_coercion::aggregates::NUMERICS; use datafusion_expr::{AggregateUDFImpl, Signature, Volatility}; +use datafusion_physical_expr::expressions::format_state_name; use datafusion_physical_expr::expressions::StatsType; -use datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr}; /// CORR aggregate expression /// The implementation mostly is the same as the DataFusion's implementation. The reason @@ -43,26 +42,16 @@ use datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr}; pub struct Correlation { name: String, signature: Signature, - expr1: Arc, - expr2: Arc, null_on_divide_by_zero: bool, } impl Correlation { - pub fn new( - expr1: Arc, - expr2: Arc, - name: impl Into, - data_type: DataType, - null_on_divide_by_zero: bool, - ) -> Self { + pub fn new(name: impl Into, data_type: DataType, null_on_divide_by_zero: bool) -> Self { // the result of correlation just support FLOAT64 data type. assert!(matches!(data_type, DataType::Float64)); Self { name: name.into(), signature: Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable), - expr1, - expr2, null_on_divide_by_zero, } } @@ -131,20 +120,6 @@ impl AggregateUDFImpl for Correlation { } } -impl PartialEq for Correlation { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.name == x.name - && self.expr1.eq(&x.expr1) - && self.expr2.eq(&x.expr2) - && self.null_on_divide_by_zero == x.null_on_divide_by_zero - }) - .unwrap_or(false) - } -} - /// An accumulator to compute correlation #[derive(Debug)] pub struct CorrelationAccumulator { diff --git a/native/spark-expr/src/covariance.rs b/native/spark-expr/src/covariance.rs index 9166e3976..fa3563cde 100644 --- a/native/spark-expr/src/covariance.rs +++ b/native/spark-expr/src/covariance.rs @@ -17,7 +17,7 @@ * under the License. */ -use std::{any::Any, sync::Arc}; +use std::any::Any; use arrow::{ array::{ArrayRef, Float64Array}, @@ -25,15 +25,14 @@ use arrow::{ datatypes::{DataType, Field}, }; use datafusion::logical_expr::Accumulator; -use datafusion::physical_expr_common::physical_expr::down_cast_any_ref; use datafusion_common::{ downcast_value, unwrap_or_internal_err, DataFusionError, Result, ScalarValue, }; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::type_coercion::aggregates::NUMERICS; use datafusion_expr::{AggregateUDFImpl, Signature, Volatility}; +use datafusion_physical_expr::expressions::format_state_name; use datafusion_physical_expr::expressions::StatsType; -use datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr}; /// COVAR_SAMP and COVAR_POP aggregate expression /// The implementation mostly is the same as the DataFusion's implementation. The reason @@ -43,8 +42,6 @@ use datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr}; pub struct Covariance { name: String, signature: Signature, - expr1: Arc, - expr2: Arc, stats_type: StatsType, null_on_divide_by_zero: bool, } @@ -52,8 +49,6 @@ pub struct Covariance { impl Covariance { /// Create a new COVAR aggregate function pub fn new( - expr1: Arc, - expr2: Arc, name: impl Into, data_type: DataType, stats_type: StatsType, @@ -64,8 +59,6 @@ impl Covariance { Self { name: name.into(), signature: Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable), - expr1, - expr2, stats_type, null_on_divide_by_zero, } @@ -126,21 +119,6 @@ impl AggregateUDFImpl for Covariance { } } -impl PartialEq for Covariance { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.name == x.name - && self.expr1.eq(&x.expr1) - && self.expr2.eq(&x.expr2) - && self.stats_type == x.stats_type - && self.null_on_divide_by_zero == x.null_on_divide_by_zero - }) - .unwrap_or(false) - } -} - /// An accumulator to compute covariance #[derive(Debug)] pub struct CovarianceAccumulator { diff --git a/native/spark-expr/src/if_expr.rs b/native/spark-expr/src/if_expr.rs index 193a90fb5..01c754ad6 100644 --- a/native/spark-expr/src/if_expr.rs +++ b/native/spark-expr/src/if_expr.rs @@ -15,24 +15,19 @@ // specific language governing permissions and limitations // under the License. -use std::{ - any::Any, - hash::{Hash, Hasher}, - sync::Arc, -}; - use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; use datafusion::logical_expr::ColumnarValue; -use datafusion::physical_expr_common::physical_expr::down_cast_any_ref; use datafusion_common::Result; use datafusion_physical_expr::{expressions::CaseExpr, PhysicalExpr}; +use std::hash::Hash; +use std::{any::Any, sync::Arc}; /// IfExpr is a wrapper around CaseExpr, because `IF(a, b, c)` is semantically equivalent to /// `CASE WHEN a THEN b ELSE c END`. -#[derive(Debug, Hash)] +#[derive(Debug, Eq)] pub struct IfExpr { if_expr: Arc, true_expr: Arc, @@ -41,6 +36,23 @@ pub struct IfExpr { case_expr: Arc, } +impl Hash for IfExpr { + fn hash(&self, state: &mut H) { + self.if_expr.hash(state); + self.true_expr.hash(state); + self.false_expr.hash(state); + self.case_expr.hash(state); + } +} +impl PartialEq for IfExpr { + fn eq(&self, other: &Self) -> bool { + self.if_expr.eq(&other.if_expr) + && self.true_expr.eq(&other.true_expr) + && self.false_expr.eq(&other.false_expr) + && self.case_expr.eq(&other.case_expr) + } +} + impl std::fmt::Display for IfExpr { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!( @@ -106,27 +118,6 @@ impl PhysicalExpr for IfExpr { Arc::clone(&children[2]), ))) } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.if_expr.hash(&mut s); - self.true_expr.hash(&mut s); - self.false_expr.hash(&mut s); - self.hash(&mut s); - } -} - -impl PartialEq for IfExpr { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.if_expr.eq(&x.if_expr) - && self.true_expr.eq(&x.true_expr) - && self.false_expr.eq(&x.false_expr) - }) - .unwrap_or(false) - } } #[cfg(test)] diff --git a/native/spark-expr/src/list.rs b/native/spark-expr/src/list.rs index 7dc17b568..fc31b11a0 100644 --- a/native/spark-expr/src/list.rs +++ b/native/spark-expr/src/list.rs @@ -26,16 +26,15 @@ use arrow_array::{ }; use arrow_schema::{DataType, Field, FieldRef, Schema}; use datafusion::logical_expr::ColumnarValue; -use datafusion::physical_expr_common::physical_expr::down_cast_any_ref; use datafusion_common::{ cast::{as_int32_array, as_large_list_array, as_list_array}, internal_err, DataFusionError, Result as DataFusionResult, ScalarValue, }; use datafusion_physical_expr::PhysicalExpr; +use std::hash::Hash; use std::{ any::Any, fmt::{Debug, Display, Formatter}, - hash::{Hash, Hasher}, sync::Arc, }; @@ -44,7 +43,7 @@ use std::{ // https://github.com/apache/spark/blob/master/common/utils/src/main/java/org/apache/spark/unsafe/array/ByteArrayUtils.java const MAX_ROUNDED_ARRAY_LENGTH: usize = 2147483632; -#[derive(Debug, Hash)] +#[derive(Debug, Eq)] pub struct ListExtract { child: Arc, ordinal: Arc, @@ -53,6 +52,25 @@ pub struct ListExtract { fail_on_error: bool, } +impl Hash for ListExtract { + fn hash(&self, state: &mut H) { + self.child.hash(state); + self.ordinal.hash(state); + self.default_value.hash(state); + self.one_based.hash(state); + self.fail_on_error.hash(state); + } +} +impl PartialEq for ListExtract { + fn eq(&self, other: &Self) -> bool { + self.child.eq(&other.child) + && self.ordinal.eq(&other.ordinal) + && self.default_value.eq(&other.default_value) + && self.one_based.eq(&other.one_based) + && self.fail_on_error.eq(&other.fail_on_error) + } +} + impl ListExtract { pub fn new( child: Arc, @@ -176,16 +194,6 @@ impl PhysicalExpr for ListExtract { _ => internal_err!("ListExtract should have exactly two children"), } } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.child.hash(&mut s); - self.ordinal.hash(&mut s); - self.default_value.hash(&mut s); - self.one_based.hash(&mut s); - self.fail_on_error.hash(&mut s); - self.hash(&mut s); - } } fn one_based_index(index: i32, len: usize) -> DataFusionResult> { @@ -267,33 +275,24 @@ impl Display for ListExtract { } } -impl PartialEq for ListExtract { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.child.eq(&x.child) - && self.ordinal.eq(&x.ordinal) - && (self.default_value.is_none() == x.default_value.is_none()) - && self - .default_value - .as_ref() - .zip(x.default_value.as_ref()) - .map(|(s, x)| s.eq(x)) - .unwrap_or(true) - && self.one_based.eq(&x.one_based) - && self.fail_on_error.eq(&x.fail_on_error) - }) - .unwrap_or(false) - } -} - -#[derive(Debug, Hash)] +#[derive(Debug, Eq)] pub struct GetArrayStructFields { child: Arc, ordinal: usize, } +impl Hash for GetArrayStructFields { + fn hash(&self, state: &mut H) { + self.child.hash(state); + self.ordinal.hash(state); + } +} +impl PartialEq for GetArrayStructFields { + fn eq(&self, other: &Self) -> bool { + self.child.eq(&other.child) && self.ordinal.eq(&other.ordinal) + } +} + impl GetArrayStructFields { pub fn new(child: Arc, ordinal: usize) -> Self { Self { child, ordinal } @@ -379,13 +378,6 @@ impl PhysicalExpr for GetArrayStructFields { _ => internal_err!("GetArrayStructFields should have exactly one child"), } } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.child.hash(&mut s); - self.ordinal.hash(&mut s); - self.hash(&mut s); - } } fn get_array_struct_fields( @@ -417,16 +409,7 @@ impl Display for GetArrayStructFields { } } -impl PartialEq for GetArrayStructFields { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| self.child.eq(&x.child) && self.ordinal.eq(&x.ordinal)) - .unwrap_or(false) - } -} - -#[derive(Debug, Hash)] +#[derive(Debug, Eq)] pub struct ArrayInsert { src_array_expr: Arc, pos_expr: Arc, @@ -434,6 +417,23 @@ pub struct ArrayInsert { legacy_negative_index: bool, } +impl Hash for ArrayInsert { + fn hash(&self, state: &mut H) { + self.src_array_expr.hash(state); + self.pos_expr.hash(state); + self.item_expr.hash(state); + self.legacy_negative_index.hash(state); + } +} +impl PartialEq for ArrayInsert { + fn eq(&self, other: &Self) -> bool { + self.src_array_expr.eq(&other.src_array_expr) + && self.pos_expr.eq(&other.pos_expr) + && self.item_expr.eq(&other.item_expr) + && self.legacy_negative_index.eq(&other.legacy_negative_index) + } +} + impl ArrayInsert { pub fn new( src_array_expr: Arc, @@ -555,15 +555,6 @@ impl PhysicalExpr for ArrayInsert { _ => internal_err!("ArrayInsert should have exactly three childrens"), } } - - fn dyn_hash(&self, _state: &mut dyn Hasher) { - let mut s = _state; - self.src_array_expr.hash(&mut s); - self.pos_expr.hash(&mut s); - self.item_expr.hash(&mut s); - self.legacy_negative_index.hash(&mut s); - self.hash(&mut s); - } } fn array_insert( @@ -694,20 +685,6 @@ impl Display for ArrayInsert { } } -impl PartialEq for ArrayInsert { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.src_array_expr.eq(&x.src_array_expr) - && self.pos_expr.eq(&x.pos_expr) - && self.item_expr.eq(&x.item_expr) - && self.legacy_negative_index.eq(&x.legacy_negative_index) - }) - .unwrap_or(false) - } -} - #[cfg(test)] mod test { use crate::list::{array_insert, list_extract, zero_based_index}; diff --git a/native/spark-expr/src/negative.rs b/native/spark-expr/src/negative.rs index 3d9063e78..7fb508917 100644 --- a/native/spark-expr/src/negative.rs +++ b/native/spark-expr/src/negative.rs @@ -21,18 +21,14 @@ use arrow::{compute::kernels::numeric::neg_wrapping, datatypes::IntervalDayTimeT use arrow_array::RecordBatch; use arrow_buffer::IntervalDayTime; use arrow_schema::{DataType, Schema}; -use datafusion::physical_expr_common::physical_expr::down_cast_any_ref; use datafusion::{ logical_expr::{interval_arithmetic::Interval, ColumnarValue}, physical_expr::PhysicalExpr, }; use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_expr::sort_properties::ExprProperties; -use std::{ - any::Any, - hash::{Hash, Hasher}, - sync::Arc, -}; +use std::hash::Hash; +use std::{any::Any, sync::Arc}; pub fn create_negate_expr( expr: Arc, @@ -42,13 +38,26 @@ pub fn create_negate_expr( } /// Negative expression -#[derive(Debug, Hash)] +#[derive(Debug, Eq)] pub struct NegativeExpr { /// Input expression arg: Arc, fail_on_error: bool, } +impl Hash for NegativeExpr { + fn hash(&self, state: &mut H) { + self.arg.hash(state); + self.fail_on_error.hash(state); + } +} + +impl PartialEq for NegativeExpr { + fn eq(&self, other: &Self) -> bool { + self.arg.eq(&other.arg) && self.fail_on_error.eq(&other.fail_on_error) + } +} + macro_rules! check_overflow { ($array:expr, $array_type:ty, $min_val:expr, $type_name:expr) => {{ let typed_array = $array @@ -204,11 +213,6 @@ impl PhysicalExpr for NegativeExpr { ))) } - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.hash(&mut s); - } - /// Given the child interval of a NegativeExpr, it calculates the NegativeExpr's interval. /// It replaces the upper and lower bounds after multiplying them with -1. /// Ex: `(a, b]` => `[-b, -a)` @@ -255,12 +259,3 @@ impl PhysicalExpr for NegativeExpr { Ok(properties) } } - -impl PartialEq for NegativeExpr { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| self.arg.eq(&x.arg)) - .unwrap_or(false) - } -} diff --git a/native/spark-expr/src/normalize_nan.rs b/native/spark-expr/src/normalize_nan.rs index c5331ad7b..078ce4b5a 100644 --- a/native/spark-expr/src/normalize_nan.rs +++ b/native/spark-expr/src/normalize_nan.rs @@ -15,13 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::{ - any::Any, - fmt::{Display, Formatter}, - hash::{Hash, Hasher}, - sync::Arc, -}; - use arrow::{ array::{as_primitive_array, ArrayAccessor, ArrayIter, Float32Array, Float64Array}, datatypes::{ArrowNativeType, Float32Type, Float64Type}, @@ -29,15 +22,33 @@ use arrow::{ }; use arrow_schema::{DataType, Schema}; use datafusion::logical_expr::ColumnarValue; -use datafusion::physical_expr_common::physical_expr::down_cast_any_ref; use datafusion_physical_expr::PhysicalExpr; +use std::hash::Hash; +use std::{ + any::Any, + fmt::{Display, Formatter}, + sync::Arc, +}; -#[derive(Debug, Hash)] +#[derive(Debug, Eq)] pub struct NormalizeNaNAndZero { pub data_type: DataType, pub child: Arc, } +impl PartialEq for NormalizeNaNAndZero { + fn eq(&self, other: &Self) -> bool { + self.child.eq(&other.child) && self.data_type.eq(&other.data_type) + } +} + +impl Hash for NormalizeNaNAndZero { + fn hash(&self, state: &mut H) { + self.child.hash(state); + self.data_type.hash(state); + } +} + impl NormalizeNaNAndZero { pub fn new(data_type: DataType, child: Arc) -> Self { Self { data_type, child } @@ -89,13 +100,6 @@ impl PhysicalExpr for NormalizeNaNAndZero { Arc::clone(&children[0]), ))) } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.child.hash(&mut s); - self.data_type.hash(&mut s); - self.hash(&mut s); - } } fn eval_typed>(input: T) -> Vec> { @@ -120,15 +124,6 @@ impl Display for NormalizeNaNAndZero { } } -impl PartialEq for NormalizeNaNAndZero { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| self.child.eq(&x.child) && self.data_type.eq(&x.data_type)) - .unwrap_or(false) - } -} - trait FloatDouble: ArrowNativeType { fn is_nan(&self) -> bool; fn nan(&self) -> Self; diff --git a/native/spark-expr/src/regexp.rs b/native/spark-expr/src/regexp.rs index c7626285a..7f367a8bb 100644 --- a/native/spark-expr/src/regexp.rs +++ b/native/spark-expr/src/regexp.rs @@ -21,7 +21,7 @@ use arrow_array::builder::BooleanBuilder; use arrow_array::types::Int32Type; use arrow_array::{Array, BooleanArray, DictionaryArray, RecordBatch, StringArray}; use arrow_schema::{DataType, Schema}; -use datafusion::physical_expr_common::physical_expr::down_cast_any_ref; +use datafusion::physical_expr_common::physical_expr::DynEq; use datafusion_common::{internal_err, Result}; use datafusion_expr::ColumnarValue; use datafusion_physical_expr::PhysicalExpr; @@ -53,6 +53,16 @@ impl Hash for RLike { } } +impl DynEq for RLike { + fn dyn_eq(&self, other: &dyn Any) -> bool { + if let Some(other) = other.downcast_ref::() { + self.pattern_str == other.pattern_str + } else { + false + } + } +} + impl RLike { pub fn try_new(child: Arc, pattern: &str) -> Result { Ok(Self { @@ -93,15 +103,6 @@ impl Display for RLike { } } -impl PartialEq for RLike { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| self.child.eq(&x.child) && self.pattern_str.eq(&x.pattern_str)) - .unwrap_or(false) - } -} - impl PhysicalExpr for RLike { fn as_any(&self) -> &dyn Any { self @@ -161,10 +162,4 @@ impl PhysicalExpr for RLike { &self.pattern_str, )?)) } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - use std::hash::Hash; - let mut s = state; - self.hash(&mut s); - } } diff --git a/native/spark-expr/src/stddev.rs b/native/spark-expr/src/stddev.rs index 3cf604da0..1ec5ffb69 100644 --- a/native/spark-expr/src/stddev.rs +++ b/native/spark-expr/src/stddev.rs @@ -23,12 +23,12 @@ use arrow::{ datatypes::{DataType, Field}, }; use datafusion::logical_expr::Accumulator; -use datafusion::physical_expr_common::physical_expr::down_cast_any_ref; +use datafusion_common::types::NativeType; use datafusion_common::{internal_err, Result, ScalarValue}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::{AggregateUDFImpl, Signature, Volatility}; +use datafusion_physical_expr::expressions::format_state_name; use datafusion_physical_expr::expressions::StatsType; -use datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr}; /// STDDEV and STDDEV_SAMP (standard deviation) aggregate expression /// The implementation mostly is the same as the DataFusion's implementation. The reason @@ -39,7 +39,6 @@ use datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr}; pub struct Stddev { name: String, signature: Signature, - expr: Arc, stats_type: StatsType, null_on_divide_by_zero: bool, } @@ -47,7 +46,6 @@ pub struct Stddev { impl Stddev { /// Create a new STDDEV aggregate function pub fn new( - expr: Arc, name: impl Into, data_type: DataType, stats_type: StatsType, @@ -57,8 +55,14 @@ impl Stddev { assert!(matches!(data_type, DataType::Float64)); Self { name: name.into(), - signature: Signature::coercible(vec![DataType::Float64], Volatility::Immutable), - expr, + signature: Signature::coercible( + vec![ + datafusion_expr_common::signature::TypeSignatureClass::Native(Arc::new( + NativeType::Float64, + )), + ], + Volatility::Immutable, + ), stats_type, null_on_divide_by_zero, } @@ -121,20 +125,6 @@ impl AggregateUDFImpl for Stddev { } } -impl PartialEq for Stddev { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.name == x.name - && self.expr.eq(&x.expr) - && self.null_on_divide_by_zero == x.null_on_divide_by_zero - && self.stats_type == x.stats_type - }) - .unwrap_or(false) - } -} - /// An accumulator to compute the standard deviation #[derive(Debug)] pub struct StddevAccumulator { diff --git a/native/spark-expr/src/strings.rs b/native/spark-expr/src/strings.rs index a8aab6aee..c2706b589 100644 --- a/native/spark-expr/src/strings.rs +++ b/native/spark-expr/src/strings.rs @@ -27,19 +27,18 @@ use arrow::{ }; use arrow_schema::{DataType, Schema}; use datafusion::logical_expr::ColumnarValue; -use datafusion::physical_expr_common::physical_expr::down_cast_any_ref; use datafusion_common::{DataFusionError, ScalarValue::Utf8}; use datafusion_physical_expr::PhysicalExpr; use std::{ any::Any, fmt::{Display, Formatter}, - hash::{Hash, Hasher}, + hash::Hash, sync::Arc, }; macro_rules! make_predicate_function { ($name: ident, $kernel: ident, $str_scalar_kernel: ident) => { - #[derive(Debug, Hash)] + #[derive(Debug, Eq)] pub struct $name { left: Arc, right: Arc, @@ -57,12 +56,16 @@ macro_rules! make_predicate_function { } } - impl PartialEq for $name { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| self.left.eq(&x.left) && self.right.eq(&x.right)) - .unwrap_or(false) + impl Hash for $name { + fn hash(&self, state: &mut H) { + self.left.hash(state); + self.right.hash(state); + } + } + + impl PartialEq for $name { + fn eq(&self, other: &Self) -> bool { + self.left.eq(&other.left) && self.right.eq(&other.right) } } @@ -122,13 +125,6 @@ macro_rules! make_predicate_function { children[1].clone(), ))) } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.left.hash(&mut s); - self.right.hash(&mut s); - self.hash(&mut s); - } } }; } @@ -141,18 +137,43 @@ make_predicate_function!(EndsWith, ends_with_dyn, ends_with_utf8_scalar_dyn); make_predicate_function!(Contains, contains_dyn, contains_utf8_scalar_dyn); -#[derive(Debug, Hash)] +#[derive(Debug, Eq)] pub struct SubstringExpr { pub child: Arc, pub start: i64, pub len: u64, } -#[derive(Debug, Hash)] +impl Hash for SubstringExpr { + fn hash(&self, state: &mut H) { + self.child.hash(state); + self.start.hash(state); + self.len.hash(state); + } +} + +impl PartialEq for SubstringExpr { + fn eq(&self, other: &Self) -> bool { + self.child.eq(&other.child) && self.start.eq(&other.start) && self.len.eq(&other.len) + } +} +#[derive(Debug, Eq)] pub struct StringSpaceExpr { pub child: Arc, } +impl Hash for StringSpaceExpr { + fn hash(&self, state: &mut H) { + self.child.hash(state); + } +} + +impl PartialEq for StringSpaceExpr { + fn eq(&self, other: &Self) -> bool { + self.child.eq(&other.child) + } +} + impl SubstringExpr { pub fn new(child: Arc, start: i64, len: u64) -> Self { Self { child, start, len } @@ -181,15 +202,6 @@ impl Display for StringSpaceExpr { } } -impl PartialEq for SubstringExpr { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| self.child.eq(&x.child) && self.start.eq(&x.start) && self.len.eq(&x.len)) - .unwrap_or(false) - } -} - impl PhysicalExpr for SubstringExpr { fn as_any(&self) -> &dyn Any { self @@ -231,23 +243,6 @@ impl PhysicalExpr for SubstringExpr { self.len, ))) } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.child.hash(&mut s); - self.start.hash(&mut s); - self.len.hash(&mut s); - self.hash(&mut s); - } -} - -impl PartialEq for StringSpaceExpr { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| self.child.eq(&x.child)) - .unwrap_or(false) - } } impl PhysicalExpr for StringSpaceExpr { @@ -292,10 +287,4 @@ impl PhysicalExpr for StringSpaceExpr { ) -> datafusion_common::Result> { Ok(Arc::new(StringSpaceExpr::new(Arc::clone(&children[0])))) } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.child.hash(&mut s); - self.hash(&mut s); - } } diff --git a/native/spark-expr/src/structs.rs b/native/spark-expr/src/structs.rs index cda8246d9..7cc49e428 100644 --- a/native/spark-expr/src/structs.rs +++ b/native/spark-expr/src/structs.rs @@ -19,17 +19,16 @@ use arrow::record_batch::RecordBatch; use arrow_array::{Array, StructArray}; use arrow_schema::{DataType, Field, Schema}; use datafusion::logical_expr::ColumnarValue; -use datafusion::physical_expr_common::physical_expr::down_cast_any_ref; use datafusion_common::{DataFusionError, Result as DataFusionResult, ScalarValue}; use datafusion_physical_expr::PhysicalExpr; use std::{ any::Any, fmt::{Display, Formatter}, - hash::{Hash, Hasher}, + hash::Hash, sync::Arc, }; -#[derive(Debug, Hash)] +#[derive(Debug, Hash, PartialEq, Eq)] pub struct CreateNamedStruct { values: Vec>, names: Vec, @@ -95,13 +94,6 @@ impl PhysicalExpr for CreateNamedStruct { self.names.clone(), ))) } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.values.hash(&mut s); - self.names.hash(&mut s); - self.hash(&mut s); - } } impl Display for CreateNamedStruct { @@ -114,29 +106,24 @@ impl Display for CreateNamedStruct { } } -impl PartialEq for CreateNamedStruct { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.values - .iter() - .zip(x.values.iter()) - .all(|(a, b)| a.eq(b)) - && self.values.len() == x.values.len() - && self.names.iter().zip(x.names.iter()).all(|(a, b)| a.eq(b)) - && self.names.len() == x.names.len() - }) - .unwrap_or(false) - } -} - -#[derive(Debug, Hash)] +#[derive(Debug, Eq)] pub struct GetStructField { child: Arc, ordinal: usize, } +impl Hash for GetStructField { + fn hash(&self, state: &mut H) { + self.child.hash(state); + self.ordinal.hash(state); + } +} +impl PartialEq for GetStructField { + fn eq(&self, other: &Self) -> bool { + self.child.eq(&other.child) && self.ordinal.eq(&other.ordinal) + } +} + impl GetStructField { pub fn new(child: Arc, ordinal: usize) -> Self { Self { child, ordinal } @@ -203,13 +190,6 @@ impl PhysicalExpr for GetStructField { self.ordinal, ))) } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.child.hash(&mut s); - self.ordinal.hash(&mut s); - self.hash(&mut s); - } } impl Display for GetStructField { @@ -222,15 +202,6 @@ impl Display for GetStructField { } } -impl PartialEq for GetStructField { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| self.child.eq(&x.child) && self.ordinal.eq(&x.ordinal)) - .unwrap_or(false) - } -} - #[cfg(test)] mod test { use super::CreateNamedStruct; diff --git a/native/spark-expr/src/sum_decimal.rs b/native/spark-expr/src/sum_decimal.rs index ab142aee6..f3f34d9bf 100644 --- a/native/spark-expr/src/sum_decimal.rs +++ b/native/spark-expr/src/sum_decimal.rs @@ -25,20 +25,16 @@ use arrow_array::{ }; use arrow_schema::{DataType, Field}; use datafusion::logical_expr::{Accumulator, EmitTo, GroupsAccumulator}; -use datafusion::physical_expr_common::physical_expr::down_cast_any_ref; use datafusion_common::{DataFusionError, Result as DFResult, ScalarValue}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::Volatility::Immutable; use datafusion_expr::{AggregateUDFImpl, ReversedUDAF, Signature}; -use datafusion_physical_expr::PhysicalExpr; use std::{any::Any, ops::BitAnd, sync::Arc}; #[derive(Debug)] pub struct SumDecimal { /// Aggregate function signature signature: Signature, - /// The expression that provides the input decimal values to be summed - expr: Arc, /// The data type of the SUM result. This will always be a decimal type /// with the same precision and scale as specified in this struct result_type: DataType, @@ -49,7 +45,7 @@ pub struct SumDecimal { } impl SumDecimal { - pub fn try_new(expr: Arc, data_type: DataType) -> DFResult { + pub fn try_new(data_type: DataType) -> DFResult { // The `data_type` is the SUM result type passed from Spark side let (precision, scale) = match data_type { DataType::Decimal128(p, s) => (p, s), @@ -61,7 +57,6 @@ impl SumDecimal { }; Ok(Self { signature: Signature::user_defined(Immutable), - expr, result_type: data_type, precision, scale, @@ -132,20 +127,6 @@ impl AggregateUDFImpl for SumDecimal { } } -impl PartialEq for SumDecimal { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - // note that we do not compare result_type because this - // is guaranteed to match if the precision and scale - // match - self.precision == x.precision && self.scale == x.scale && self.expr.eq(&x.expr) - }) - .unwrap_or(false) - } -} - #[derive(Debug)] struct SumDecimalAccumulator { sum: i128, @@ -491,13 +472,13 @@ mod tests { use datafusion_common::Result; use datafusion_expr::AggregateUDF; use datafusion_physical_expr::aggregate::AggregateExprBuilder; - use datafusion_physical_expr::expressions::{Column, Literal}; + use datafusion_physical_expr::expressions::Column; + use datafusion_physical_expr::PhysicalExpr; use futures::StreamExt; #[test] fn invalid_data_type() { - let expr = Arc::new(Literal::new(ScalarValue::Int32(Some(1)))); - assert!(SumDecimal::try_new(expr, DataType::Int32).is_err()); + assert!(SumDecimal::try_new(DataType::Int32).is_err()); } #[tokio::test] @@ -518,7 +499,6 @@ mod tests { Arc::new(MemoryExec::try_new(partitions, Arc::clone(&schema), None).unwrap()); let aggregate_udf = Arc::new(AggregateUDF::new_from_impl(SumDecimal::try_new( - Arc::clone(&c1), data_type.clone(), )?)); diff --git a/native/spark-expr/src/temporal.rs b/native/spark-expr/src/temporal.rs index 91953dd60..fb549f9ce 100644 --- a/native/spark-expr/src/temporal.rs +++ b/native/spark-expr/src/temporal.rs @@ -15,36 +15,45 @@ // specific language governing permissions and limitations // under the License. -use std::{ - any::Any, - fmt::{Debug, Display, Formatter}, - hash::{Hash, Hasher}, - sync::Arc, -}; - +use crate::utils::array_with_timezone; use arrow::{ compute::{date_part, DatePart}, record_batch::RecordBatch, }; use arrow_schema::{DataType, Schema, TimeUnit::Microsecond}; use datafusion::logical_expr::ColumnarValue; -use datafusion::physical_expr_common::physical_expr::down_cast_any_ref; use datafusion_common::{DataFusionError, ScalarValue::Utf8}; use datafusion_physical_expr::PhysicalExpr; - -use crate::utils::array_with_timezone; +use std::hash::Hash; +use std::{ + any::Any, + fmt::{Debug, Display, Formatter}, + sync::Arc, +}; use crate::kernels::temporal::{ date_trunc_array_fmt_dyn, date_trunc_dyn, timestamp_trunc_array_fmt_dyn, timestamp_trunc_dyn, }; -#[derive(Debug, Hash)] +#[derive(Debug, Eq)] pub struct HourExpr { /// An array with DataType::Timestamp(TimeUnit::Microsecond, None) child: Arc, timezone: String, } +impl Hash for HourExpr { + fn hash(&self, state: &mut H) { + self.child.hash(state); + self.timezone.hash(state); + } +} +impl PartialEq for HourExpr { + fn eq(&self, other: &Self) -> bool { + self.child.eq(&other.child) && self.timezone.eq(&other.timezone) + } +} + impl HourExpr { pub fn new(child: Arc, timezone: String) -> Self { HourExpr { child, timezone } @@ -61,15 +70,6 @@ impl Display for HourExpr { } } -impl PartialEq for HourExpr { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| self.child.eq(&x.child) && self.timezone.eq(&x.timezone)) - .unwrap_or(false) - } -} - impl PhysicalExpr for HourExpr { fn as_any(&self) -> &dyn Any { self @@ -123,22 +123,27 @@ impl PhysicalExpr for HourExpr { self.timezone.clone(), ))) } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.child.hash(&mut s); - self.timezone.hash(&mut s); - self.hash(&mut s); - } } -#[derive(Debug, Hash)] +#[derive(Debug, Eq)] pub struct MinuteExpr { /// An array with DataType::Timestamp(TimeUnit::Microsecond, None) child: Arc, timezone: String, } +impl Hash for MinuteExpr { + fn hash(&self, state: &mut H) { + self.child.hash(state); + self.timezone.hash(state); + } +} +impl PartialEq for MinuteExpr { + fn eq(&self, other: &Self) -> bool { + self.child.eq(&other.child) && self.timezone.eq(&other.timezone) + } +} + impl MinuteExpr { pub fn new(child: Arc, timezone: String) -> Self { MinuteExpr { child, timezone } @@ -155,15 +160,6 @@ impl Display for MinuteExpr { } } -impl PartialEq for MinuteExpr { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| self.child.eq(&x.child) && self.timezone.eq(&x.timezone)) - .unwrap_or(false) - } -} - impl PhysicalExpr for MinuteExpr { fn as_any(&self) -> &dyn Any { self @@ -217,22 +213,27 @@ impl PhysicalExpr for MinuteExpr { self.timezone.clone(), ))) } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.child.hash(&mut s); - self.timezone.hash(&mut s); - self.hash(&mut s); - } } -#[derive(Debug, Hash)] +#[derive(Debug, Eq)] pub struct SecondExpr { /// An array with DataType::Timestamp(TimeUnit::Microsecond, None) child: Arc, timezone: String, } +impl Hash for SecondExpr { + fn hash(&self, state: &mut H) { + self.child.hash(state); + self.timezone.hash(state); + } +} +impl PartialEq for SecondExpr { + fn eq(&self, other: &Self) -> bool { + self.child.eq(&other.child) && self.timezone.eq(&other.timezone) + } +} + impl SecondExpr { pub fn new(child: Arc, timezone: String) -> Self { SecondExpr { child, timezone } @@ -249,15 +250,6 @@ impl Display for SecondExpr { } } -impl PartialEq for SecondExpr { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| self.child.eq(&x.child) && self.timezone.eq(&x.timezone)) - .unwrap_or(false) - } -} - impl PhysicalExpr for SecondExpr { fn as_any(&self) -> &dyn Any { self @@ -311,16 +303,9 @@ impl PhysicalExpr for SecondExpr { self.timezone.clone(), ))) } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.child.hash(&mut s); - self.timezone.hash(&mut s); - self.hash(&mut s); - } } -#[derive(Debug, Hash)] +#[derive(Debug, Eq)] pub struct DateTruncExpr { /// An array with DataType::Date32 child: Arc, @@ -328,6 +313,18 @@ pub struct DateTruncExpr { format: Arc, } +impl Hash for DateTruncExpr { + fn hash(&self, state: &mut H) { + self.child.hash(state); + self.format.hash(state); + } +} +impl PartialEq for DateTruncExpr { + fn eq(&self, other: &Self) -> bool { + self.child.eq(&other.child) && self.format.eq(&other.format) + } +} + impl DateTruncExpr { pub fn new(child: Arc, format: Arc) -> Self { DateTruncExpr { child, format } @@ -344,15 +341,6 @@ impl Display for DateTruncExpr { } } -impl PartialEq for DateTruncExpr { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| self.child.eq(&x.child) && self.format.eq(&x.format)) - .unwrap_or(false) - } -} - impl PhysicalExpr for DateTruncExpr { fn as_any(&self) -> &dyn Any { self @@ -398,16 +386,9 @@ impl PhysicalExpr for DateTruncExpr { Arc::clone(&self.format), ))) } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.child.hash(&mut s); - self.format.hash(&mut s); - self.hash(&mut s); - } } -#[derive(Debug, Hash)] +#[derive(Debug, Eq)] pub struct TimestampTruncExpr { /// An array with DataType::Timestamp(TimeUnit::Microsecond, None) child: Arc, @@ -422,6 +403,21 @@ pub struct TimestampTruncExpr { timezone: String, } +impl Hash for TimestampTruncExpr { + fn hash(&self, state: &mut H) { + self.child.hash(state); + self.format.hash(state); + self.timezone.hash(state); + } +} +impl PartialEq for TimestampTruncExpr { + fn eq(&self, other: &Self) -> bool { + self.child.eq(&other.child) + && self.format.eq(&other.format) + && self.timezone.eq(&other.timezone) + } +} + impl TimestampTruncExpr { pub fn new( child: Arc, @@ -446,19 +442,6 @@ impl Display for TimestampTruncExpr { } } -impl PartialEq for TimestampTruncExpr { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.child.eq(&x.child) - && self.format.eq(&x.format) - && self.timezone.eq(&x.timezone) - }) - .unwrap_or(false) - } -} - impl PhysicalExpr for TimestampTruncExpr { fn as_any(&self) -> &dyn Any { self @@ -524,12 +507,4 @@ impl PhysicalExpr for TimestampTruncExpr { self.timezone.clone(), ))) } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.child.hash(&mut s); - self.format.hash(&mut s); - self.timezone.hash(&mut s); - self.hash(&mut s); - } } diff --git a/native/spark-expr/src/to_json.rs b/native/spark-expr/src/to_json.rs index 1f68eb860..91b46c6f0 100644 --- a/native/spark-expr/src/to_json.rs +++ b/native/spark-expr/src/to_json.rs @@ -29,11 +29,11 @@ use datafusion_expr::ColumnarValue; use datafusion_physical_expr::PhysicalExpr; use std::any::Any; use std::fmt::{Debug, Display, Formatter}; -use std::hash::{Hash, Hasher}; +use std::hash::Hash; use std::sync::Arc; /// to_json function -#[derive(Debug, Hash)] +#[derive(Debug, Eq)] pub struct ToJson { /// The input to convert to JSON expr: Arc, @@ -41,6 +41,18 @@ pub struct ToJson { timezone: String, } +impl Hash for ToJson { + fn hash(&self, state: &mut H) { + self.expr.hash(state); + self.timezone.hash(state); + } +} +impl PartialEq for ToJson { + fn eq(&self, other: &Self) -> bool { + self.expr.eq(&other.expr) && self.timezone.eq(&other.timezone) + } +} + impl ToJson { pub fn new(expr: Arc, timezone: &str) -> Self { Self { @@ -101,13 +113,6 @@ impl PhysicalExpr for ToJson { &self.timezone, ))) } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.expr.hash(&mut s); - self.timezone.hash(&mut s); - self.hash(&mut s); - } } /// Convert an array into a JSON value string representation diff --git a/native/spark-expr/src/unbound.rs b/native/spark-expr/src/unbound.rs index a6babd0f7..14f68c9cd 100644 --- a/native/spark-expr/src/unbound.rs +++ b/native/spark-expr/src/unbound.rs @@ -17,15 +17,10 @@ use arrow_array::RecordBatch; use arrow_schema::{DataType, Schema}; -use datafusion::physical_expr_common::physical_expr::down_cast_any_ref; use datafusion::physical_plan::ColumnarValue; use datafusion_common::{internal_err, Result}; use datafusion_physical_expr::PhysicalExpr; -use std::{ - any::Any, - hash::{Hash, Hasher}, - sync::Arc, -}; +use std::{hash::Hash, sync::Arc}; /// This is similar to `UnKnownColumn` in DataFusion, but it has data type. /// This is only used when the column is not bound to a schema, for example, the @@ -93,18 +88,4 @@ impl PhysicalExpr for UnboundColumn { ) -> Result> { Ok(self) } - - fn dyn_hash(&self, state: &mut dyn Hasher) { - let mut s = state; - self.hash(&mut s); - } -} - -impl PartialEq for UnboundColumn { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| self == x) - .unwrap_or(false) - } } diff --git a/native/spark-expr/src/variance.rs b/native/spark-expr/src/variance.rs index 4370d89ff..e71d713f5 100644 --- a/native/spark-expr/src/variance.rs +++ b/native/spark-expr/src/variance.rs @@ -15,20 +15,19 @@ // specific language governing permissions and limitations // under the License. -use std::{any::Any, sync::Arc}; +use std::any::Any; use arrow::{ array::{ArrayRef, Float64Array}, datatypes::{DataType, Field}, }; use datafusion::logical_expr::Accumulator; -use datafusion::physical_expr_common::physical_expr::down_cast_any_ref; use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::Volatility::Immutable; use datafusion_expr::{AggregateUDFImpl, Signature}; +use datafusion_physical_expr::expressions::format_state_name; use datafusion_physical_expr::expressions::StatsType; -use datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr}; /// VAR_SAMP and VAR_POP aggregate expression /// The implementation mostly is the same as the DataFusion's implementation. The reason @@ -39,7 +38,6 @@ use datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr}; pub struct Variance { name: String, signature: Signature, - expr: Arc, stats_type: StatsType, null_on_divide_by_zero: bool, } @@ -47,7 +45,6 @@ pub struct Variance { impl Variance { /// Create a new VARIANCE aggregate function pub fn new( - expr: Arc, name: impl Into, data_type: DataType, stats_type: StatsType, @@ -58,7 +55,6 @@ impl Variance { Self { name: name.into(), signature: Signature::numeric(1, Immutable), - expr, stats_type, null_on_divide_by_zero, } @@ -118,17 +114,6 @@ impl AggregateUDFImpl for Variance { } } -impl PartialEq for Variance { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.name == x.name && self.expr.eq(&x.expr) && self.stats_type == x.stats_type - }) - .unwrap_or(false) - } -} - /// An accumulator to compute variance #[derive(Debug)] pub struct VarianceAccumulator {