From c3e27c0405cd2ce3888dc1943323113735c7859e Mon Sep 17 00:00:00 2001
From: Emil Ejbyfeldt
Date: Fri, 7 Jun 2024 07:11:29 +0200
Subject: [PATCH 1/3] Fix overflow in date_parser (#529)

The date_parser was introduced in #383 and is mostly a direct port of code
in Spark. Since that code runs on the JVM, integer overflow is defined as
wrapping. The proposed fix is to use std::num::Wrapping to get the same
wrapping behavior in Rust.

The overflowed value will still be discarded in a later check that uses
`current_segment_digits`, so allowing the overflow does not lead to
correctness issues.

This resolves one of the overflows discussed in #481
---
 .../execution/datafusion/expressions/cast.rs | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

diff --git a/core/src/execution/datafusion/expressions/cast.rs b/core/src/execution/datafusion/expressions/cast.rs
index 7e8a96f28..a39587b6e 100644
--- a/core/src/execution/datafusion/expressions/cast.rs
+++ b/core/src/execution/datafusion/expressions/cast.rs
@@ -19,6 +19,7 @@ use std::{
     any::Any,
     fmt::{Debug, Display, Formatter},
     hash::{Hash, Hasher},
+    num::Wrapping,
     sync::Arc,
 };
 
@@ -1570,7 +1571,7 @@ fn date_parser(date_str: &str, eval_mode: EvalMode) -> CometResult<Option<i32>> {
     let mut date_segments = [1, 1, 1];
     let mut sign = 1;
     let mut current_segment = 0;
-    let mut current_segment_value = 0;
+    let mut current_segment_value = Wrapping(0);
     let mut current_segment_digits = 0;
     let bytes = date_str.as_bytes();
 
@@ -1597,16 +1598,16 @@ fn date_parser(date_str: &str, eval_mode: EvalMode) -> CometResult<Option<i32>> {
                 return return_result(date_str, eval_mode);
             }
             //if valid update corresponding segment with the current segment value.
-            date_segments[current_segment as usize] = current_segment_value;
-            current_segment_value = 0;
+            date_segments[current_segment as usize] = current_segment_value.0;
+            current_segment_value = Wrapping(0);
             current_segment_digits = 0;
             current_segment += 1;
         } else if !b.is_ascii_digit() {
             return return_result(date_str, eval_mode);
         } else {
             //increment value of current segment by the next digit
-            let parsed_value = (b - b'0') as i32;
-            current_segment_value = current_segment_value * 10 + parsed_value;
+            let parsed_value = Wrapping((b - b'0') as i32);
+            current_segment_value = current_segment_value * Wrapping(10) + parsed_value;
             current_segment_digits += 1;
         }
         j += 1;
@@ -1622,7 +1623,7 @@ fn date_parser(date_str: &str, eval_mode: EvalMode) -> CometResult<Option<i32>> {
         return return_result(date_str, eval_mode);
     }
 
-    date_segments[current_segment as usize] = current_segment_value;
+    date_segments[current_segment as usize] = current_segment_value.0;
 
     match NaiveDate::from_ymd_opt(
         sign * date_segments[0],
@@ -1836,6 +1837,8 @@ mod tests {
             Some(" 202 "),
             Some("\n 2020-\r8 "),
             Some("2020-01-01T"),
+            // Overflows i32
+            Some("-4607172990231812908"),
         ]));
 
         for eval_mode in &[EvalMode::Legacy, EvalMode::Try] {
@@ -1857,7 +1860,8 @@ mod tests {
                     None,
                     None,
                     None,
-                    Some(18262)
+                    Some(18262),
+                    None
                 ]
            );
         }
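[Editor's note] For readers unfamiliar with `std::num::Wrapping`, the standalone sketch below (not part of the patch) shows how it reproduces the JVM's two's-complement wrap-around during digit accumulation; the digit string is taken from the overflowing test input added above.

```rust
use std::num::Wrapping;

fn main() {
    // Accumulate digits the way date_parser does. Wrapping makes overflow
    // well-defined wrap-around instead of a debug-mode panic.
    let digits = "4607172990231812908";
    let mut value = Wrapping(0i32);
    for b in digits.bytes() {
        value = value * Wrapping(10) + Wrapping((b - b'0') as i32);
    }
    // The wrapped result is garbage, but date_parser later rejects the
    // segment via its digit-count check, so correctness is preserved.
    println!("wrapped accumulator = {}", value.0);
}
```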
From c6d387c79507a1e4a71ecae5a26f7fc437d4f1d2 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 6 Jun 2024 23:12:36 -0600
Subject: [PATCH 2/3] feat: Add fuzz testing for arithmetic expressions (#519)

* Add fuzz tests for arithmetic expressions

* add unary math

* add bit-wise expressions

* bug fix
---
 fuzz-testing/README.md                         |  1 -
 .../scala/org/apache/comet/fuzz/Meta.scala     |  4 +++
 .../org/apache/comet/fuzz/QueryGen.scala       | 31 ++++++++++++++++++-
 .../org/apache/comet/fuzz/QueryRunner.scala    |  6 ++--
 4 files changed, 38 insertions(+), 4 deletions(-)

diff --git a/fuzz-testing/README.md b/fuzz-testing/README.md
index 076ff6aea..0f5f4f606 100644
--- a/fuzz-testing/README.md
+++ b/fuzz-testing/README.md
@@ -32,7 +32,6 @@ Planned areas of improvement:
 
 - ANSI mode
 - Support for all data types, expressions, and operators supported by Comet
-- Unary and binary arithmetic expressions
 - IF and CASE WHEN expressions
 - Complex (nested) expressions
 - Literal scalar values in queries

diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Meta.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Meta.scala
index 13ebbf9ed..bbba5a443 100644
--- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Meta.scala
+++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Meta.scala
@@ -106,4 +106,8 @@ object Meta {
     Function("stddev_samp", 1),
     Function("corr", 2))
 
+  val unaryArithmeticOps: Seq[String] = Seq("+", "-")
+
+  val binaryArithmeticOps: Seq[String] = Seq("+", "-", "*", "/", "%", "&", "|", "^")
+
 }

diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryGen.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryGen.scala
index 7584e76ce..e75726d7b 100644
--- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryGen.scala
+++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryGen.scala
@@ -42,11 +42,13 @@ object QueryGen {
     val uniqueQueries = mutable.HashSet[String]()
 
     for (_ <- 0 until numQueries) {
-      val sql = r.nextInt().abs % 4 match {
+      val sql = r.nextInt().abs % 6 match {
         case 0 => generateJoin(r, spark, numFiles)
         case 1 => generateAggregate(r, spark, numFiles)
         case 2 => generateScalar(r, spark, numFiles)
         case 3 => generateCast(r, spark, numFiles)
+        case 4 => generateUnaryArithmetic(r, spark, numFiles)
+        case 5 => generateBinaryArithmetic(r, spark, numFiles)
       }
       if (!uniqueQueries.contains(sql)) {
         uniqueQueries += sql
@@ -92,6 +94,33 @@ object QueryGen {
       s"ORDER BY ${args.mkString(", ")};"
   }
 
+  private def generateUnaryArithmetic(r: Random, spark: SparkSession, numFiles: Int): String = {
+    val tableName = s"test${r.nextInt(numFiles)}"
+    val table = spark.table(tableName)
+
+    val op = Utils.randomChoice(Meta.unaryArithmeticOps, r)
+    val a = Utils.randomChoice(table.columns, r)
+
+    // Example SELECT a, -a FROM test0
+    s"SELECT $a, $op$a " +
+      s"FROM $tableName " +
+      s"ORDER BY $a;"
+  }
+
+  private def generateBinaryArithmetic(r: Random, spark: SparkSession, numFiles: Int): String = {
+    val tableName = s"test${r.nextInt(numFiles)}"
+    val table = spark.table(tableName)
+
+    val op = Utils.randomChoice(Meta.binaryArithmeticOps, r)
+    val a = Utils.randomChoice(table.columns, r)
+    val b = Utils.randomChoice(table.columns, r)
+
+    // Example SELECT a, b, a+b FROM test0
+    s"SELECT $a, $b, $a $op $b " +
+      s"FROM $tableName " +
+      s"ORDER BY $a, $b;"
+  }
+
   private def generateCast(r: Random, spark: SparkSession, numFiles: Int): String = {
     val tableName = s"test${r.nextInt(numFiles)}"
     val table = spark.table(tableName)

diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala
index b2ceae9d0..f928c93a2 100644
--- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala
+++ b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala
@@ -19,7 +19,7 @@
 
 package org.apache.comet.fuzz
 
-import java.io.{BufferedWriter, FileWriter, PrintWriter}
+import java.io.{BufferedWriter, FileWriter, PrintWriter, StringWriter}
 
 import scala.io.Source
 
@@ -111,9 +111,11 @@ object QueryRunner {
             showSQL(w, sql)
             w.write(s"[ERROR] Query failed in Comet: ${e.getMessage}:\n")
             w.write("```\n")
-            val p = new PrintWriter(w)
+            val sw = new StringWriter()
+            val p = new PrintWriter(sw)
             e.printStackTrace(p)
             p.close()
+            w.write(s"${sw.toString}\n")
             w.write("```\n")
           }
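[Editor's note] The generator pattern above — pick a random operator and random columns, then template a SQL string — is language-agnostic. A hypothetical Rust rendering (illustrative only; `generate_binary_arithmetic` and the deterministic stand-in for `Random.nextInt` are not part of the patch):

```rust
// Minimal sketch of the binary-arithmetic query generator; the caller
// supplies column names and a pseudo-random index source.
fn generate_binary_arithmetic(
    table: &str,
    columns: &[&str],
    rand: &mut impl FnMut(usize) -> usize,
) -> String {
    let ops = ["+", "-", "*", "/", "%", "&", "|", "^"];
    let op = ops[rand(ops.len())];
    let a = columns[rand(columns.len())];
    let b = columns[rand(columns.len())];
    // Example: SELECT a, b, a+b FROM test0 ORDER BY a, b;
    format!("SELECT {a}, {b}, {a} {op} {b} FROM {table} ORDER BY {a}, {b};")
}

fn main() {
    // Simple LCG so the sketch runs without external crates.
    let mut state = 7usize;
    let mut rand = |n: usize| {
        state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
        state % n
    };
    println!("{}", generate_binary_arithmetic("test0", &["c0", "c1", "c2"], &mut rand));
}
```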
From fd596ed985643be5be65fb71806b358219a146d9 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 7 Jun 2024 12:07:01 -0700
Subject: [PATCH 3/3] build: Switch back to official DataFusion repo and
 arrow-rs after Arrow Java 16 is released (#403)

* build: Switch back to released version of DataFusion and arrow-rs

* Exclude all arrow dependencies from Spark

* Revert "build: Switch back to released version of DataFusion and arrow-rs"

This reverts commit 29c89bfb25ddf4757ab17f951d3ccf17e55422da.

* Test

* Test

* Test arrow-rs fix

* Fix

* Use DataFusion repo

* Fix

* Fix

* Use 39.0.0-rc1
---
 core/Cargo.lock                                   | 240 ++++++++++++------
 core/Cargo.toml                                   |  25 +-
 .../datafusion/expressions/bitwise_not.rs         |   4 +-
 .../expressions/bloom_filter_might_contain.rs     |   4 +-
 .../execution/datafusion/expressions/cast.rs      |   4 +-
 .../datafusion/expressions/checkoverflow.rs       |   4 +-
 .../datafusion/expressions/if_expr.rs             |  12 +-
 .../datafusion/expressions/negative.rs            |  19 +-
 .../datafusion/expressions/normalize_nan.rs       |   2 +-
 .../datafusion/expressions/scalar_funcs.rs        |  37 +--
 .../datafusion/expressions/strings.rs             |  12 +-
 .../datafusion/expressions/subquery.rs            |   2 +-
 .../datafusion/expressions/temporal.rs            |  20 +-
 .../datafusion/expressions/unbound.rs             |   2 +-
 .../execution/datafusion/operators/expand.rs      |   4 +-
 core/src/execution/datafusion/planner.rs          |  66 ++---
 .../execution/datafusion/shuffle_writer.rs        |   4 +-
 core/src/execution/kernels/hash.rs                |  16 ++
 core/src/execution/operators/copy.rs              |   4 +-
 core/src/execution/operators/scan.rs              |   2 +-
 dev/ensure-jars-have-correct-contents.sh          |   8 +
 pom.xml                                           |  27 +-
 22 files changed, 301 insertions(+), 217 deletions(-)

diff --git a/core/Cargo.lock b/core/Cargo.lock
index 6c8d54ab7..59c46c92b 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -114,8 +114,9 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"
 
 [[package]]
 name = "arrow"
-version = "51.0.0"
-source = "git+https://github.com/viirya/arrow-rs.git?rev=3f1ae0c#3f1ae0c836b0769c88220d2180ef008b7a59158c"
+version = "52.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7ae9728f104939be6d8d9b368a354b4929b0569160ea1641f0721b55a861ce38"
 dependencies = [
  "arrow-arith",
  "arrow-array",
@@ -134,48 +135,52 @@
 [[package]]
 name = "arrow-arith"
-version = "51.0.0"
-source = "git+https://github.com/viirya/arrow-rs.git?rev=3f1ae0c#3f1ae0c836b0769c88220d2180ef008b7a59158c"
+version = "52.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a7029a5b3efbeafbf4a12d12dc16b8f9e9bff20a410b8c25c5d28acc089e1043"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
  "arrow-data",
  "arrow-schema",
  "chrono",
- "half 2.1.0",
+ "half 2.4.1",
  "num",
 ]
 
 [[package]]
 name = "arrow-array"
-version = "51.0.0"
-source = "git+https://github.com/viirya/arrow-rs.git?rev=3f1ae0c#3f1ae0c836b0769c88220d2180ef008b7a59158c"
+version = "52.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d33238427c60271710695f17742f45b1a5dc5bcfc5c15331c25ddfe7abf70d97"
 dependencies = [
  "ahash",
  "arrow-buffer",
  "arrow-data",
  "arrow-schema",
  "chrono",
- "chrono-tz",
+ "chrono-tz 0.9.0",
- "half 2.1.0",
+ "half 2.4.1",
  "hashbrown",
  "num",
 ]
 
 [[package]]
 name = "arrow-buffer"
-version = "51.0.0"
-source = 
"git+https://github.com/viirya/arrow-rs.git?rev=3f1ae0c#3f1ae0c836b0769c88220d2180ef008b7a59158c" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe9b95e825ae838efaf77e366c00d3fc8cca78134c9db497d6bda425f2e7b7c1" dependencies = [ "bytes", - "half 2.1.0", + "half 2.4.1", "num", ] [[package]] name = "arrow-cast" -version = "51.0.0" -source = "git+https://github.com/viirya/arrow-rs.git?rev=3f1ae0c#3f1ae0c836b0769c88220d2180ef008b7a59158c" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87cf8385a9d5b5fcde771661dd07652b79b9139fea66193eda6a88664400ccab" dependencies = [ "arrow-array", "arrow-buffer", @@ -186,7 +191,7 @@ dependencies = [ "base64", "chrono", "comfy-table", - "half 2.1.0", + "half 2.4.1", "lexical-core", "num", "ryu", @@ -194,8 +199,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "51.0.0" -source = "git+https://github.com/viirya/arrow-rs.git?rev=3f1ae0c#3f1ae0c836b0769c88220d2180ef008b7a59158c" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cea5068bef430a86690059665e40034625ec323ffa4dd21972048eebb0127adc" dependencies = [ "arrow-array", "arrow-buffer", @@ -212,19 +218,21 @@ dependencies = [ [[package]] name = "arrow-data" -version = "51.0.0" -source = "git+https://github.com/viirya/arrow-rs.git?rev=3f1ae0c#3f1ae0c836b0769c88220d2180ef008b7a59158c" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb29be98f987bcf217b070512bb7afba2f65180858bca462edf4a39d84a23e10" dependencies = [ "arrow-buffer", "arrow-schema", - "half 2.1.0", + "half 2.4.1", "num", ] [[package]] name = "arrow-ipc" -version = "51.0.0" -source = "git+https://github.com/viirya/arrow-rs.git?rev=3f1ae0c#3f1ae0c836b0769c88220d2180ef008b7a59158c" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffc68f6523970aa6f7ce1dc9a33a7d9284cfb9af77d4ad3e617dbe5d79cc6ec8" dependencies = [ "arrow-array", "arrow-buffer", @@ -237,8 +245,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "51.0.0" -source = "git+https://github.com/viirya/arrow-rs.git?rev=3f1ae0c#3f1ae0c836b0769c88220d2180ef008b7a59158c" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2041380f94bd6437ab648e6c2085a045e45a0c44f91a1b9a4fe3fed3d379bfb1" dependencies = [ "arrow-array", "arrow-buffer", @@ -246,7 +255,7 @@ dependencies = [ "arrow-data", "arrow-schema", "chrono", - "half 2.1.0", + "half 2.4.1", "indexmap", "lexical-core", "num", @@ -256,44 +265,48 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "51.0.0" -source = "git+https://github.com/viirya/arrow-rs.git?rev=3f1ae0c#3f1ae0c836b0769c88220d2180ef008b7a59158c" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcb56ed1547004e12203652f12fe12e824161ff9d1e5cf2a7dc4ff02ba94f413" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", "arrow-select", - "half 2.1.0", + "half 2.4.1", "num", ] [[package]] name = "arrow-row" -version = "51.0.0" -source = "git+https://github.com/viirya/arrow-rs.git?rev=3f1ae0c#3f1ae0c836b0769c88220d2180ef008b7a59158c" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "575b42f1fc588f2da6977b94a5ca565459f5ab07b60545e17243fb9a7ed6d43e" dependencies = [ "ahash", "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", - "half 2.1.0", + 
"half 2.4.1", "hashbrown", ] [[package]] name = "arrow-schema" -version = "51.0.0" -source = "git+https://github.com/viirya/arrow-rs.git?rev=3f1ae0c#3f1ae0c836b0769c88220d2180ef008b7a59158c" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32aae6a60458a2389c0da89c9de0b7932427776127da1a738e2efc21d32f3393" dependencies = [ "bitflags 2.5.0", ] [[package]] name = "arrow-select" -version = "51.0.0" -source = "git+https://github.com/viirya/arrow-rs.git?rev=3f1ae0c#3f1ae0c836b0769c88220d2180ef008b7a59158c" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de36abaef8767b4220d7b4a8c2fe5ffc78b47db81b03d77e2136091c3ba39102" dependencies = [ "ahash", "arrow-array", @@ -305,8 +318,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "51.0.0" -source = "git+https://github.com/viirya/arrow-rs.git?rev=3f1ae0c#3f1ae0c836b0769c88220d2180ef008b7a59158c" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e435ada8409bcafc910bc3e0077f532a4daa20e99060a496685c0e3e53cc2597" dependencies = [ "arrow-array", "arrow-buffer", @@ -508,7 +522,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d59ae0466b83e838b81a54256c39d5d7c20b9d7daa10510a242d9b75abd5936e" dependencies = [ "chrono", - "chrono-tz-build", + "chrono-tz-build 0.2.1", + "phf", +] + +[[package]] +name = "chrono-tz" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93698b29de5e97ad0ae26447b344c482a7284c737d9ddc5f9e52b74a336671bb" +dependencies = [ + "chrono", + "chrono-tz-build 0.3.0", "phf", ] @@ -523,6 +548,17 @@ dependencies = [ "phf_codegen", ] +[[package]] +name = "chrono-tz-build" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c088aee841df9c3041febbb73934cfc39708749bf96dc827e3359cd39ef11b1" +dependencies = [ + "parse-zoneinfo", + "phf", + "phf_codegen", +] + [[package]] name = "ciborium" version = "0.2.1" @@ -592,6 +628,7 @@ dependencies = [ "ahash", "arrow", "arrow-array", + "arrow-buffer", "arrow-data", "arrow-schema", "arrow-string", @@ -601,16 +638,18 @@ dependencies = [ "byteorder", "bytes", "chrono", - "chrono-tz", + "chrono-tz 0.8.6", "crc32fast", "criterion", "datafusion", "datafusion-common", + "datafusion-expr", "datafusion-functions", "datafusion-physical-expr", + "datafusion-physical-expr-common", "flate2", "futures", - "half 2.1.0", + "half 2.4.1", "hashbrown", "itertools 0.11.0", "jni", @@ -826,8 +865,8 @@ dependencies = [ [[package]] name = "datafusion" -version = "36.0.0" -source = "git+https://github.com/viirya/arrow-datafusion.git?rev=57b3be4#57b3be4297a47aa45094c16e37ddf0141d723bf0" +version = "39.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb" dependencies = [ "ahash", "arrow", @@ -843,13 +882,15 @@ dependencies = [ "datafusion-execution", "datafusion-expr", "datafusion-functions", + "datafusion-functions-aggregate", "datafusion-optimizer", "datafusion-physical-expr", + "datafusion-physical-expr-common", "datafusion-physical-plan", "datafusion-sql", "futures", "glob", - "half 2.1.0", + "half 2.4.1", "hashbrown", "indexmap", "itertools 0.12.1", @@ -857,6 +898,7 @@ dependencies = [ "num_cpus", "object_store", "parking_lot", + "paste", "pin-project-lite", "rand", "sqlparser", @@ -868,8 +910,8 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "36.0.0" 
-source = "git+https://github.com/viirya/arrow-datafusion.git?rev=57b3be4#57b3be4297a47aa45094c16e37ddf0141d723bf0" +version = "39.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb" dependencies = [ "ahash", "arrow", @@ -877,7 +919,8 @@ dependencies = [ "arrow-buffer", "arrow-schema", "chrono", - "half 2.1.0", + "half 2.4.1", + "hashbrown", "instant", "libc", "num_cpus", @@ -887,16 +930,16 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" -version = "36.0.0" -source = "git+https://github.com/viirya/arrow-datafusion.git?rev=57b3be4#57b3be4297a47aa45094c16e37ddf0141d723bf0" +version = "39.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb" dependencies = [ "tokio", ] [[package]] name = "datafusion-execution" -version = "36.0.0" -source = "git+https://github.com/viirya/arrow-datafusion.git?rev=57b3be4#57b3be4297a47aa45094c16e37ddf0141d723bf0" +version = "39.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb" dependencies = [ "arrow", "chrono", @@ -915,15 +958,17 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "36.0.0" -source = "git+https://github.com/viirya/arrow-datafusion.git?rev=57b3be4#57b3be4297a47aa45094c16e37ddf0141d723bf0" +version = "39.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb" dependencies = [ "ahash", "arrow", "arrow-array", + "arrow-buffer", "chrono", "datafusion-common", "paste", + "serde_json", "sqlparser", "strum", "strum_macros", @@ -931,8 +976,8 @@ dependencies = [ [[package]] name = "datafusion-functions" -version = "36.0.0" -source = "git+https://github.com/viirya/arrow-datafusion.git?rev=57b3be4#57b3be4297a47aa45094c16e37ddf0141d723bf0" +version = "39.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb" dependencies = [ "arrow", "base64", @@ -943,20 +988,39 @@ dependencies = [ "datafusion-execution", "datafusion-expr", "datafusion-physical-expr", + "hashbrown", "hex", "itertools 0.12.1", "log", "md-5", + "rand", "regex", "sha2", "unicode-segmentation", "uuid", ] +[[package]] +name = "datafusion-functions-aggregate" +version = "39.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb" +dependencies = [ + "ahash", + "arrow", + "arrow-schema", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-physical-expr-common", + "log", + "paste", + "sqlparser", +] + [[package]] name = "datafusion-optimizer" -version = "36.0.0" -source = "git+https://github.com/viirya/arrow-datafusion.git?rev=57b3be4#57b3be4297a47aa45094c16e37ddf0141d723bf0" +version = "39.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb" dependencies = [ "arrow", "async-trait", @@ -965,6 +1029,7 @@ dependencies = [ "datafusion-expr", "datafusion-physical-expr", "hashbrown", + "indexmap", "itertools 0.12.1", "log", "regex-syntax", @@ -972,8 +1037,8 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "36.0.0" -source = "git+https://github.com/viirya/arrow-datafusion.git?rev=57b3be4#57b3be4297a47aa45094c16e37ddf0141d723bf0" +version = "39.0.0" +source = 
"git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb" dependencies = [ "ahash", "arrow", @@ -983,36 +1048,44 @@ dependencies = [ "arrow-schema", "arrow-string", "base64", - "blake2", - "blake3", "chrono", "datafusion-common", "datafusion-execution", "datafusion-expr", - "half 2.1.0", + "datafusion-functions-aggregate", + "datafusion-physical-expr-common", + "half 2.4.1", "hashbrown", "hex", "indexmap", "itertools 0.12.1", "log", - "md-5", "paste", "petgraph", - "rand", "regex", - "sha2", - "unicode-segmentation", +] + +[[package]] +name = "datafusion-physical-expr-common" +version = "39.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb" +dependencies = [ + "arrow", + "datafusion-common", + "datafusion-expr", + "rand", ] [[package]] name = "datafusion-physical-plan" -version = "36.0.0" -source = "git+https://github.com/viirya/arrow-datafusion.git?rev=57b3be4#57b3be4297a47aa45094c16e37ddf0141d723bf0" +version = "39.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb" dependencies = [ "ahash", "arrow", "arrow-array", "arrow-buffer", + "arrow-ord", "arrow-schema", "async-trait", "chrono", @@ -1020,9 +1093,11 @@ dependencies = [ "datafusion-common-runtime", "datafusion-execution", "datafusion-expr", + "datafusion-functions-aggregate", "datafusion-physical-expr", + "datafusion-physical-expr-common", "futures", - "half 2.1.0", + "half 2.4.1", "hashbrown", "indexmap", "itertools 0.12.1", @@ -1036,8 +1111,8 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "36.0.0" -source = "git+https://github.com/viirya/arrow-datafusion.git?rev=57b3be4#57b3be4297a47aa45094c16e37ddf0141d723bf0" +version = "39.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb" dependencies = [ "arrow", "arrow-array", @@ -1045,6 +1120,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "log", + "regex", "sqlparser", "strum", ] @@ -1140,9 +1216,9 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" [[package]] name = "flatbuffers" -version = "23.5.26" +version = "24.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dac53e22462d78c16d64a1cd22371b54cc3fe94aa15e7886a2fa6e5d1ab8640" +checksum = "8add37afff2d4ffa83bc748a70b4b1370984f6980768554182424ef71447c35f" dependencies = [ "bitflags 1.3.2", "rustc_version", @@ -1303,10 +1379,11 @@ checksum = "1b43ede17f21864e81be2fa654110bf1e793774238d86ef8555c37e6519c0403" [[package]] name = "half" -version = "2.1.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad6a9459c9c30b177b925162351f97e7d967c7ea8bab3b8352805327daf45554" +checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" dependencies = [ + "cfg-if", "crunchy", "num-traits", ] @@ -1905,9 +1982,9 @@ dependencies = [ [[package]] name = "object_store" -version = "0.9.1" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8718f8b65fdf67a45108d1548347d4af7d71fb81ce727bbf9e3b2535e079db3" +checksum = "fbebfd32c213ba1907fa7a9c9138015a8de2b43e30c5aa45b18f7deb46786ad6" dependencies = [ "async-trait", "bytes", @@ -1979,13 +2056,14 @@ dependencies = [ [[package]] name = "parquet" -version = "51.0.0" -source = 
"git+https://github.com/viirya/arrow-rs.git?rev=3f1ae0c#3f1ae0c836b0769c88220d2180ef008b7a59158c"
+version = "52.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "29c3b5322cc1bbf67f11c079c42be41a55949099b78732f7dba9e15edde40eab"
 dependencies = [
  "ahash",
  "bytes",
  "chrono",
- "half 2.1.0",
+ "half 2.4.1",
  "hashbrown",
  "num",
  "num-bigint",
@@ -2535,9 +2613,9 @@ checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b"
 
 [[package]]
 name = "sqlparser"
-version = "0.44.0"
+version = "0.47.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aaf9c7ff146298ffda83a200f8d5084f08dcee1edfc135fcc1d646a45d50ffd6"
+checksum = "295e9930cd7a97e58ca2a070541a3ca502b17f5d1fa7157376d0fabd85324f25"
 dependencies = [
  "log",
  "sqlparser_derive",

diff --git a/core/Cargo.toml b/core/Cargo.toml
index 4584dffce..564c450cb 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -29,13 +29,14 @@ include = [
 
 [dependencies]
 parquet-format = "4.0.0" # This must be kept in sync with that from parquet crate
-arrow = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c", features = ["prettyprint", "ffi", "chrono-tz"] }
-arrow-array = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c" }
-arrow-data = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c" }
-arrow-schema = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c" }
-arrow-string = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c" }
-parquet = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c", default-features = false, features = ["experimental"] }
-half = { version = "~2.1", default-features = false }
+arrow = { version = "52.0.0", features = ["prettyprint", "ffi", "chrono-tz"] }
+arrow-array = { version = "52.0.0" }
+arrow-buffer = { version = "52.0.0" }
+arrow-data = { version = "52.0.0" }
+arrow-schema = { version = "52.0.0" }
+arrow-string = { version = "52.0.0" }
+parquet = { version = "52.0.0", default-features = false, features = ["experimental"] }
+half = { version = "2.4.1", default-features = false }
 futures = "0.3.28"
 mimalloc = { version = "*", default-features = false, optional = true }
 tokio = { version = "1", features = ["rt-multi-thread"] }
@@ -66,10 +67,12 @@ itertools = "0.11.0"
 chrono = { version = "0.4", default-features = false, features = ["clock"] }
 chrono-tz = { version = "0.8" }
 paste = "1.0.14"
-datafusion-common = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4" }
-datafusion = { default-features = false, git = "https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4", features = ["unicode_expressions", "crypto_expressions"] }
-datafusion-functions = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4", features = ["crypto_expressions"]}
-datafusion-physical-expr = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4", default-features = false, features = ["unicode_expressions"] }
+datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1" }
+datafusion = { default-features = false, git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", features = ["unicode_expressions", "crypto_expressions"] }
+datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", features = ["crypto_expressions"] }
+datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", default-features = false }
+datafusion-physical-expr-common = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", default-features = false }
+datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", default-features = false }
 unicode-segmentation = "^1.10.1"
 once_cell = "1.18.0"
 regex = "1.9.6"
diff --git a/core/src/execution/datafusion/expressions/bitwise_not.rs b/core/src/execution/datafusion/expressions/bitwise_not.rs
index f9f8ee392..06ead2670 100644
--- a/core/src/execution/datafusion/expressions/bitwise_not.rs
+++ b/core/src/execution/datafusion/expressions/bitwise_not.rs
@@ -105,8 +105,8 @@ impl PhysicalExpr for BitwiseNotExpr {
         }
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.arg.clone()]
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.arg]
     }
 
     fn with_new_children(

diff --git a/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs b/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs
index 6a4d07b89..b922119f8 100644
--- a/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs
+++ b/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs
@@ -129,8 +129,8 @@ impl PhysicalExpr for BloomFilterMightContain {
         })
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.bloom_filter_expr.clone(), self.value_expr.clone()]
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.bloom_filter_expr, &self.value_expr]
     }
 
     fn with_new_children(

diff --git a/core/src/execution/datafusion/expressions/cast.rs b/core/src/execution/datafusion/expressions/cast.rs
index a39587b6e..045626465 100644
--- a/core/src/execution/datafusion/expressions/cast.rs
+++ b/core/src/execution/datafusion/expressions/cast.rs
@@ -1292,8 +1292,8 @@ impl PhysicalExpr for Cast {
         }
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.child.clone()]
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.child]
     }
 
     fn with_new_children(

diff --git a/core/src/execution/datafusion/expressions/checkoverflow.rs b/core/src/execution/datafusion/expressions/checkoverflow.rs
index 1e4b5f333..044b366e3 100644
--- a/core/src/execution/datafusion/expressions/checkoverflow.rs
+++ b/core/src/execution/datafusion/expressions/checkoverflow.rs
@@ -165,8 +165,8 @@ impl PhysicalExpr for CheckOverflow {
         }
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.child.clone()]
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.child]
     }
 
     fn with_new_children(

diff --git a/core/src/execution/datafusion/expressions/if_expr.rs b/core/src/execution/datafusion/expressions/if_expr.rs
index 6f2ed6a54..fa235cc66 100644
--- a/core/src/execution/datafusion/expressions/if_expr.rs
+++ b/core/src/execution/datafusion/expressions/if_expr.rs
@@ -110,12 +110,8 @@ impl PhysicalExpr for IfExpr {
         Ok(ColumnarValue::Array(current_value))
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![
-            self.if_expr.clone(),
-            self.true_expr.clone(),
-            self.false_expr.clone(),
-        ]
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.if_expr, &self.true_expr, &self.false_expr]
     }
 
     fn with_new_children(
@@ -225,8 +221,8 @@ mod tests {
         let true_expr = lit(123i32);
         let false_expr = lit(999i32);
 
-        let expr = if_fn(if_expr, true_expr, false_expr);
-        let children = expr.unwrap().children();
+        let expr = if_fn(if_expr, true_expr, false_expr).unwrap();
+        let children = expr.children();
         assert_eq!(children.len(), 3);
         assert_eq!(children[0].to_string(), "true");
         assert_eq!(children[1].to_string(), "123");
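[Editor's note] The recurring change in these hunks is DataFusion 39's `PhysicalExpr::children` signature, which now returns borrowed `&Arc` handles instead of cloned `Arc`s. A self-contained sketch of the pattern with a simplified stand-in trait (the real `PhysicalExpr` has many more methods):

```rust
use std::sync::Arc;

// Simplified stand-in for DataFusion's PhysicalExpr trait.
trait Expr {
    fn children(&self) -> Vec<&Arc<dyn Expr>>;
}

struct Not {
    arg: Arc<dyn Expr>,
}

impl Expr for Not {
    // Returning `&Arc` avoids a refcount bump per child on every call;
    // callers clone only the children they actually keep.
    fn children(&self) -> Vec<&Arc<dyn Expr>> {
        vec![&self.arg]
    }
}

struct Leaf;
impl Expr for Leaf {
    fn children(&self) -> Vec<&Arc<dyn Expr>> {
        vec![]
    }
}

fn main() {
    let e = Not { arg: Arc::new(Leaf) };
    println!("children: {}", e.children().len());
}
```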
diff --git a/core/src/execution/datafusion/expressions/negative.rs b/core/src/execution/datafusion/expressions/negative.rs
index e7aa2ac64..a85cde89e 100644
--- a/core/src/execution/datafusion/expressions/negative.rs
+++ b/core/src/execution/datafusion/expressions/negative.rs
@@ -18,15 +18,15 @@
 use crate::errors::CometError;
 use arrow::{compute::kernels::numeric::neg_wrapping, datatypes::IntervalDayTimeType};
 use arrow_array::RecordBatch;
+use arrow_buffer::IntervalDayTime;
 use arrow_schema::{DataType, Schema};
 use datafusion::{
     logical_expr::{interval_arithmetic::Interval, ColumnarValue},
     physical_expr::PhysicalExpr,
 };
 use datafusion_common::{Result, ScalarValue};
-use datafusion_physical_expr::{
-    aggregate::utils::down_cast_any_ref, sort_properties::SortProperties,
-};
+use datafusion_expr::sort_properties::ExprProperties;
+use datafusion_physical_expr::aggregate::utils::down_cast_any_ref;
 use std::{
     any::Any,
     hash::{Hash, Hasher},
@@ -63,7 +63,7 @@ macro_rules! check_overflow {
             for i in 0..typed_array.len() {
                 if typed_array.value(i) == $min_val {
                     if $type_name == "byte" || $type_name == "short" {
-                        let value = typed_array.value(i).to_string() + " caused";
+                        let value = format!("{:?} caused", typed_array.value(i));
                         return Err(arithmetic_overflow_error(value.as_str()).into());
                     }
                     return Err(arithmetic_overflow_error($type_name).into());
@@ -135,7 +135,7 @@ impl PhysicalExpr for NegativeExpr {
                     arrow::datatypes::IntervalUnit::DayTime => check_overflow!(
                         array,
                         arrow::array::IntervalDayTimeArray,
-                        i64::MIN,
+                        IntervalDayTime::MIN,
                         "interval"
                     ),
                     arrow::datatypes::IntervalUnit::MonthDayNano => {
@@ -195,8 +195,8 @@ impl PhysicalExpr for NegativeExpr {
         }
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.arg.clone()]
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.arg]
     }
 
     fn with_new_children(
@@ -255,8 +255,9 @@ impl PhysicalExpr for NegativeExpr {
     }
 
     /// The ordering of a [`NegativeExpr`] is simply the reverse of its child.
-    fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
-        -children[0]
+    fn get_properties(&self, children: &[ExprProperties]) -> Result<ExprProperties> {
+        let properties = children[0].clone().with_order(children[0].sort_properties);
+        Ok(properties)
     }
 }
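[Editor's note] The `check_overflow!` guard above exists because negating the minimum value of a two's-complement type is unrepresentable. A std-only illustration of why `i32::MIN` (and, analogously, the interval minima) needs special handling:

```rust
fn main() {
    // Wrapping negation matches Spark's legacy (non-ANSI) behavior:
    // -(i32::MIN) wraps back to i32::MIN instead of erroring.
    assert_eq!(i32::MIN.wrapping_neg(), i32::MIN);

    // checked_neg expresses ANSI-style overflow detection: it yields
    // None exactly at the minimum value and Some elsewhere.
    assert_eq!(i32::MIN.checked_neg(), None);
    assert_eq!((-5i32).checked_neg(), Some(5));
    println!("negation overflow demo passed");
}
```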
diff --git a/core/src/execution/datafusion/expressions/normalize_nan.rs b/core/src/execution/datafusion/expressions/normalize_nan.rs
index 111a34d5d..3bd5feea5 100644
--- a/core/src/execution/datafusion/expressions/normalize_nan.rs
+++ b/core/src/execution/datafusion/expressions/normalize_nan.rs
@@ -77,7 +77,7 @@ impl PhysicalExpr for NormalizeNaNAndZero {
         }
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
         self.child.children()
     }
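[Editor's note] For context on what `NormalizeNaNAndZero` handles: floats have many NaN bit patterns and two zeros, which breaks bitwise grouping and hashing unless values are canonicalized first. A std-only illustration (the `normalize` closure is a sketch, not Comet's kernel):

```rust
fn main() {
    // f64 has many NaN encodings; comparing by bits would treat them as
    // distinct keys even though SQL semantics want a single NaN.
    let other_nan = f64::from_bits(f64::NAN.to_bits() | 1);
    assert_ne!(f64::NAN.to_bits(), other_nan.to_bits());

    // +0.0 and -0.0 compare equal but differ bitwise.
    assert_eq!(0.0f64, -0.0f64);
    assert_ne!(0.0f64.to_bits(), (-0.0f64).to_bits());

    // Canonicalize: every NaN maps to one NaN, -0.0 maps to 0.0.
    let normalize = |v: f64| if v.is_nan() { f64::NAN } else if v == 0.0 { 0.0 } else { v };
    assert_eq!(normalize(other_nan).to_bits(), f64::NAN.to_bits());
    assert_eq!(normalize(-0.0).to_bits(), 0.0f64.to_bits());
    println!("normalization demo passed");
}
```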
diff --git a/core/src/execution/datafusion/expressions/scalar_funcs.rs b/core/src/execution/datafusion/expressions/scalar_funcs.rs
index dc333e8be..5f98ce3f6 100644
--- a/core/src/execution/datafusion/expressions/scalar_funcs.rs
+++ b/core/src/execution/datafusion/expressions/scalar_funcs.rs
@@ -19,7 +19,6 @@ use std::{
     any::Any,
     cmp::min,
     fmt::{Debug, Write},
-    str::FromStr,
     sync::Arc,
 };
 
@@ -35,17 +34,15 @@ use arrow_array::{Array, ArrowNativeTypeOp, Decimal128Array, StringArray};
 use arrow_schema::DataType;
 use datafusion::{
     execution::FunctionRegistry,
-    logical_expr::{
-        BuiltinScalarFunction, ScalarFunctionDefinition, ScalarFunctionImplementation,
-        ScalarUDFImpl, Signature, Volatility,
-    },
+    functions::math::round::round,
+    logical_expr::{ScalarFunctionImplementation, ScalarUDFImpl, Signature, Volatility},
     physical_plan::ColumnarValue,
 };
 use datafusion_common::{
     cast::{as_binary_array, as_generic_string_array},
     exec_err, internal_err, DataFusionError, Result as DataFusionResult, ScalarValue,
 };
-use datafusion_physical_expr::{math_expressions, udf::ScalarUDF};
+use datafusion_expr::ScalarUDF;
 use num::{
     integer::{div_ceil, div_floor},
     BigInt, Signed, ToPrimitive,
@@ -66,9 +63,7 @@ macro_rules! make_comet_scalar_udf {
             $data_type.clone(),
             Arc::new(move |args| $func(args, &$data_type)),
         );
-        Ok(ScalarFunctionDefinition::UDF(Arc::new(
-            ScalarUDF::new_from_impl(scalar_func),
-        )))
+        Ok(Arc::new(ScalarUDF::new_from_impl(scalar_func)))
     }};
     ($name:expr, $func:expr, without $data_type:ident) => {{
         let scalar_func = CometScalarFunction::new(
@@ -77,9 +72,7 @@ macro_rules! make_comet_scalar_udf {
             $data_type,
             $func,
         );
-        Ok(ScalarFunctionDefinition::UDF(Arc::new(
-            ScalarUDF::new_from_impl(scalar_func),
-        )))
+        Ok(Arc::new(ScalarUDF::new_from_impl(scalar_func)))
     }};
 }
 
@@ -88,7 +81,7 @@ pub fn create_comet_physical_fun(
     fun_name: &str,
     data_type: DataType,
     registry: &dyn FunctionRegistry,
-) -> Result<ScalarFunctionDefinition, DataFusionError> {
+) -> Result<Arc<ScalarUDF>, DataFusionError> {
     let sha2_functions = ["sha224", "sha256", "sha384", "sha512"];
     match fun_name {
         "ceil" => {
@@ -140,13 +133,11 @@ pub fn create_comet_physical_fun(
             let spark_func_name = "spark".to_owned() + sha;
             make_comet_scalar_udf!(spark_func_name, wrapped_func, without data_type)
         }
-        _ => {
-            if let Ok(fun) = BuiltinScalarFunction::from_str(fun_name) {
-                Ok(ScalarFunctionDefinition::BuiltIn(fun))
-            } else {
-                Ok(ScalarFunctionDefinition::UDF(registry.udf(fun_name)?))
-            }
-        }
+        _ => registry.udf(fun_name).map_err(|e| {
+            DataFusionError::Execution(format!(
+                "Function {fun_name} not found in the registry: {e}",
+            ))
+        }),
     }
 }
 
@@ -509,9 +500,7 @@ fn spark_round(
             make_decimal_array(array, precision, scale, &f)
         }
         DataType::Float32 | DataType::Float64 => {
-            Ok(ColumnarValue::Array(math_expressions::round(&[
-                array.clone()
-            ])?))
+            Ok(ColumnarValue::Array(round(&[array.clone()])?))
         }
         dt => exec_err!("Not supported datatype for ROUND: {dt}"),
     },
@@ -534,7 +523,7 @@ fn spark_round(
             make_decimal_scalar(a, precision, scale, &f)
         }
         ScalarValue::Float32(_) | ScalarValue::Float64(_) => Ok(ColumnarValue::Scalar(
-            ScalarValue::try_from_array(&math_expressions::round(&[a.to_array()?])?, 0)?,
+            ScalarValue::try_from_array(&round(&[a.to_array()?])?, 0)?,
        )),
         dt => exec_err!("Not supported datatype for ROUND: {dt}"),
     },
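[Editor's note] For readers following `spark_round`: decimal values are rounded with integer arithmetic on the unscaled representation, never float math. A hedged, std-only sketch of half-up rounding at a reduced scale (the real code uses `num::integer::div_floor`/`div_ceil` plus precision checks; `round_half_up` is illustrative):

```rust
// Round a decimal's unscaled i128 value down by `scale_delta` digits,
// using half-up (away from zero) semantics like Spark's ROUND.
fn round_half_up(unscaled: i128, scale_delta: u32) -> i128 {
    let pow = 10i128.pow(scale_delta);
    let quotient = unscaled / pow; // truncates toward zero
    let remainder = unscaled % pow; // keeps the sign of `unscaled`
    if remainder.abs() * 2 >= pow {
        quotient + unscaled.signum()
    } else {
        quotient
    }
}

fn main() {
    // 1.25 at scale 2 -> 1.3 at scale 1 (unscaled 125 -> 13)
    assert_eq!(round_half_up(125, 1), 13);
    // Negative values round away from zero: -1.25 -> -1.3
    assert_eq!(round_half_up(-125, 1), -13);
    assert_eq!(round_half_up(124, 1), 12);
    println!("rounding demo passed");
}
```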
diff --git a/core/src/execution/datafusion/expressions/strings.rs b/core/src/execution/datafusion/expressions/strings.rs
index ee9a22212..cbbd4cfa4 100644
--- a/core/src/execution/datafusion/expressions/strings.rs
+++ b/core/src/execution/datafusion/expressions/strings.rs
@@ -111,8 +111,8 @@ macro_rules! make_predicate_function {
             Ok(ColumnarValue::Array(Arc::new(array)))
         }
 
-        fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-            vec![self.left.clone(), self.right.clone()]
+        fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+            vec![&self.left, &self.right]
         }
 
         fn with_new_children(
@@ -221,8 +221,8 @@ impl PhysicalExpr for SubstringExec {
         }
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.child.clone()]
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.child]
     }
 
     fn with_new_children(
@@ -286,8 +286,8 @@ impl PhysicalExpr for StringSpaceExec {
         }
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.child.clone()]
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.child]
     }
 
     fn with_new_children(

diff --git a/core/src/execution/datafusion/expressions/subquery.rs b/core/src/execution/datafusion/expressions/subquery.rs
index bf37cb895..9b1be2df6 100644
--- a/core/src/execution/datafusion/expressions/subquery.rs
+++ b/core/src/execution/datafusion/expressions/subquery.rs
@@ -199,7 +199,7 @@ impl PhysicalExpr for Subquery {
         }
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
         vec![]
     }

diff --git a/core/src/execution/datafusion/expressions/temporal.rs b/core/src/execution/datafusion/expressions/temporal.rs
index 4ae3c2605..22b4aee8a 100644
--- a/core/src/execution/datafusion/expressions/temporal.rs
+++ b/core/src/execution/datafusion/expressions/temporal.rs
@@ -111,8 +111,8 @@ impl PhysicalExpr for HourExec {
         }
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.child.clone()]
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.child]
     }
 
     fn with_new_children(
@@ -205,8 +205,8 @@ impl PhysicalExpr for MinuteExec {
         }
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.child.clone()]
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.child]
     }
 
     fn with_new_children(
@@ -299,8 +299,8 @@ impl PhysicalExpr for SecondExec {
         }
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.child.clone()]
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.child]
     }
 
     fn with_new_children(
@@ -386,8 +386,8 @@ impl PhysicalExpr for DateTruncExec {
         }
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.child.clone()]
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.child]
     }
 
     fn with_new_children(
@@ -511,8 +511,8 @@ impl PhysicalExpr for TimestampTruncExec {
         }
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.child.clone()]
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.child]
     }
 
     fn with_new_children(

diff --git a/core/src/execution/datafusion/expressions/unbound.rs b/core/src/execution/datafusion/expressions/unbound.rs
index 5387b1012..95f9912c9 100644
--- a/core/src/execution/datafusion/expressions/unbound.rs
+++ b/core/src/execution/datafusion/expressions/unbound.rs
@@ -83,7 +83,7 @@ impl PhysicalExpr for UnboundColumn {
         internal_err!("UnboundColumn::evaluate() should not be called")
     }
 
-    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
         vec![]
     }

diff --git a/core/src/execution/datafusion/operators/expand.rs b/core/src/execution/datafusion/operators/expand.rs
index ca3fdc1aa..5285dfb46 100644
--- a/core/src/execution/datafusion/operators/expand.rs
+++ b/core/src/execution/datafusion/operators/expand.rs
@@ -96,8 +96,8 @@ impl ExecutionPlan for CometExpandExec {
         self.schema.clone()
     }
 
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.child.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.child]
     }
 
     fn with_new_children(
diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs
index 7af5f6838..7d504f878 100644
--- a/core/src/execution/datafusion/planner.rs
+++ b/core/src/execution/datafusion/planner.rs
@@ -17,7 +17,7 @@
 
 //! Converts Spark physical plan to DataFusion physical plan
 
-use std::{collections::HashMap, str::FromStr, sync::Arc};
+use std::{collections::HashMap, sync::Arc};
 
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
 use datafusion::{
@@ -25,9 +25,7 @@ use datafusion::{
     common::DataFusionError,
     execution::FunctionRegistry,
     functions::math,
-    logical_expr::{
-        BuiltinScalarFunction, Operator as DataFusionOperator, ScalarFunctionDefinition,
-    },
+    logical_expr::Operator as DataFusionOperator,
     physical_expr::{
         execution_props::ExecutionProps,
         expressions::{
@@ -52,6 +50,7 @@ use datafusion_common::{
     tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter},
     JoinType as DFJoinType, ScalarValue,
 };
+use datafusion_physical_expr_common::aggregate::create_aggregate_expr;
 use itertools::Itertools;
 use jni::objects::GlobalRef;
 use num::{BigInt, ToPrimitive};
@@ -499,10 +498,7 @@ impl PhysicalPlanner {
                 let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema.clone())?;
                 let return_type = child.data_type(&input_schema)?;
                 let args = vec![child];
-                let scalar_def = ScalarFunctionDefinition::UDF(math::abs());
-
-                let expr =
-                    ScalarFunctionExpr::new("abs", scalar_def, args, return_type, None, false);
+                let expr = ScalarFunctionExpr::new("abs", math::abs(), args, return_type);
                 Ok(Arc::new(expr))
             }
             ExprStruct::CaseWhen(case_when) => {
@@ -690,8 +686,6 @@ impl PhysicalPlanner {
                     fun_expr,
                     vec![left, right],
                     data_type,
-                    None,
-                    false,
                 )))
             }
             _ => Ok(Arc::new(BinaryExpr::new(left, op, right))),
@@ -1209,26 +1203,18 @@ impl PhysicalPlanner {
                 }
             }
             AggExprStruct::First(expr) => {
-                let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?;
-                let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap());
-                Ok(Arc::new(FirstValue::new(
-                    child,
-                    "first",
-                    datatype,
-                    vec![],
-                    vec![],
-                )))
+                let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?;
+                let func = datafusion_expr::AggregateUDF::new_from_impl(FirstValue::new());
+
+                create_aggregate_expr(&func, &[child], &[], &[], &schema, "first", false, false)
+                    .map_err(|e| e.into())
             }
             AggExprStruct::Last(expr) => {
-                let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?;
-                let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap());
-                Ok(Arc::new(LastValue::new(
-                    child,
-                    "last",
-                    datatype,
-                    vec![],
-                    vec![],
-                )))
+                let child = self.create_expr(expr.child.as_ref().unwrap(), schema.clone())?;
+                let func = datafusion_expr::AggregateUDF::new_from_impl(LastValue::new());
+
+                create_aggregate_expr(&func, &[child], &[], &[], &schema, "last", false, false)
+                    .map_err(|e| e.into())
             }
             AggExprStruct::BitAndAgg(expr) => {
                 let child = self.create_expr(expr.child.as_ref().unwrap(), schema)?;
@@ -1373,21 +1359,11 @@
 
         let data_type = match expr.return_type.as_ref().map(to_arrow_datatype) {
             Some(t) => t,
-            None => {
-                // If no data type is provided from Spark, we'll use DF's return type from the
-                // scalar function
-                // Note this assumes the `fun_name` is a defined function in DF. Otherwise, it'll
-                // throw error.
-
-                if let Ok(fun) = BuiltinScalarFunction::from_str(fun_name) {
-                    fun.return_type(&input_expr_types)?
-                } else {
-                    self.session_ctx
-                        .udf(fun_name)?
-                        .inner()
-                        .return_type(&input_expr_types)?
-                }
-            }
+            None => self
+                .session_ctx
+                .udf(fun_name)?
+                .inner()
+                .return_type(&input_expr_types)?,
         };
 
         let fun_expr =
@@ -1398,8 +1374,6 @@
             fun_expr,
             args.to_vec(),
             data_type,
-            None,
-            args.is_empty(),
         ));
 
         Ok(scalar_expr)
@@ -1444,7 +1418,7 @@ fn expr_to_columns(
     let mut left_field_indices: Vec<usize> = vec![];
     let mut right_field_indices: Vec<usize> = vec![];
 
-    expr.apply(&mut |expr| {
+    expr.apply(&mut |expr: &Arc<dyn PhysicalExpr>| {
         Ok({
             if let Some(column) = expr.as_any().downcast_ref::<Column>() {
                 if column.index() > left_field_len + right_field_len {
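[Editor's note] The planner change above drops the `BuiltinScalarFunction` fallback: every scalar function must now resolve through the session's function registry. A minimal, self-contained sketch of that lookup-or-error pattern, with a plain `HashMap` standing in for DataFusion's `FunctionRegistry` (hypothetical types, not the real API):

```rust
use std::collections::HashMap;

type ScalarFn = fn(&[f64]) -> f64;

// Hypothetical registry; DataFusion's real FunctionRegistry hands back
// Arc<ScalarUDF> and its own error type.
struct Registry {
    udfs: HashMap<String, ScalarFn>,
}

impl Registry {
    fn udf(&self, name: &str) -> Result<ScalarFn, String> {
        self.udfs
            .get(name)
            .copied()
            // Mirror the patch: no built-in fallback, just a descriptive error.
            .ok_or_else(|| format!("Function {name} not found in the registry"))
    }
}

fn main() {
    let mut udfs: HashMap<String, ScalarFn> = HashMap::new();
    udfs.insert("abs".to_string(), |args| args[0].abs());
    let registry = Registry { udfs };

    assert_eq!(registry.udf("abs").unwrap()(&[-3.5]), 3.5);
    assert!(registry.udf("no_such_fn").is_err());
    println!("registry lookup demo passed");
}
```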
diff --git a/core/src/execution/datafusion/shuffle_writer.rs b/core/src/execution/datafusion/shuffle_writer.rs
index 99ac885b5..5afc9a53e 100644
--- a/core/src/execution/datafusion/shuffle_writer.rs
+++ b/core/src/execution/datafusion/shuffle_writer.rs
@@ -104,8 +104,8 @@ impl ExecutionPlan for ShuffleWriterExec {
         self.input.schema()
     }
 
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.input.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
     }
 
     fn with_new_children(

diff --git a/core/src/execution/kernels/hash.rs b/core/src/execution/kernels/hash.rs
index de30f74cd..b39fd6224 100644
--- a/core/src/execution/kernels/hash.rs
+++ b/core/src/execution/kernels/hash.rs
@@ -22,6 +22,7 @@ use arrow_array::{
     downcast_dictionary_array, downcast_primitive_array, Array, ArrayAccessor, ArrayRef,
     ArrowPrimitiveType, PrimitiveArray,
 };
+use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
 use std::fmt::Debug;
 
 pub fn hash(src: &[ArrayRef], dst: &mut [u64]) {
@@ -169,3 +170,18 @@ impl Hashable for f64 {
         state.hash_one(u64::from_ne_bytes(self.to_ne_bytes()))
     }
 }
+
+impl Hashable for IntervalDayTime {
+    fn create_hash(&self, state: &RandomState) -> u64 {
+        state.hash_one(self.days);
+        state.hash_one(self.milliseconds)
+    }
+}
+
+impl Hashable for IntervalMonthDayNano {
+    fn create_hash(&self, state: &RandomState) -> u64 {
+        state.hash_one(self.months);
+        state.hash_one(self.days);
+        state.hash_one(self.nanoseconds)
+    }
+}
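[Editor's note] In the new `Hashable` impls above, the leading `hash_one` calls are statements whose results are discarded; only the last field's hash becomes the return value. If mixing all fields is intended, hashing a tuple is one std-only alternative (illustrative sketch with a hypothetical struct, not the patch's code):

```rust
use std::collections::hash_map::RandomState;
use std::hash::BuildHasher;

// Hypothetical stand-in for arrow_buffer::IntervalDayTime.
struct IntervalDayTime {
    days: i32,
    milliseconds: i32,
}

fn main() {
    let state = RandomState::new();
    let v = IntervalDayTime { days: 3, milliseconds: 1500 };
    // Hashing the (days, milliseconds) tuple mixes both fields into one
    // 64-bit value instead of keeping only the last call's result.
    let h = state.hash_one((v.days, v.milliseconds));
    println!("combined hash = {h}");
}
```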
diff --git a/core/src/execution/operators/copy.rs b/core/src/execution/operators/copy.rs
index 96c244935..d011b3cb2 100644
--- a/core/src/execution/operators/copy.rs
+++ b/core/src/execution/operators/copy.rs
@@ -93,8 +93,8 @@ impl ExecutionPlan for CopyExec {
         self.schema.clone()
     }
 
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.input.clone()]
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
     }
 
     fn with_new_children(

diff --git a/core/src/execution/operators/scan.rs b/core/src/execution/operators/scan.rs
index 99c7c8391..bd518eda1 100644
--- a/core/src/execution/operators/scan.rs
+++ b/core/src/execution/operators/scan.rs
@@ -248,7 +248,7 @@ impl ExecutionPlan for ScanExec {
         scan_schema(input_batch, &self.data_types)
     }
 
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
         vec![]
     }

diff --git a/dev/ensure-jars-have-correct-contents.sh b/dev/ensure-jars-have-correct-contents.sh
index 12f555b8e..23d0be323 100755
--- a/dev/ensure-jars-have-correct-contents.sh
+++ b/dev/ensure-jars-have-correct-contents.sh
@@ -60,6 +60,14 @@ allowed_expr+="|^x86_64/libarrow_cdata_jni.so$"
 allowed_expr+="|^x86_64/libarrow_cdata_jni.dylib$"
 allowed_expr+="|^x86_64/arrow_cdata_jni.dll$"
 allowed_expr+="|^aarch_64/libarrow_cdata_jni.dylib$"
+
+allowed_expr+="|^arrow_cdata_jni/"
+allowed_expr+="|^arrow_cdata_jni/x86_64/"
+allowed_expr+="|^arrow_cdata_jni/aarch_64/"
+allowed_expr+="|^arrow_cdata_jni/x86_64/libarrow_cdata_jni.so$"
+allowed_expr+="|^arrow_cdata_jni/x86_64/libarrow_cdata_jni.dylib$"
+allowed_expr+="|^arrow_cdata_jni/x86_64/arrow_cdata_jni.dll$"
+allowed_expr+="|^arrow_cdata_jni/aarch_64/libarrow_cdata_jni.dylib$"
 # Two classes in Arrow C module: StructVectorLoader and StructVectorUnloader, are not
 # under org/apache/arrow/c, so we'll need to treat them specially.
 allowed_expr+="|^org/apache/arrow/$"

diff --git a/pom.xml b/pom.xml
index 34d949e14..fb42303c9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@ under the License.
     <protobuf.version>3.19.6</protobuf.version>
     <parquet.version>1.13.1</parquet.version>
    <spark.maven.scope>provided</spark.maven.scope>
-    <arrow.version>14.0.2</arrow.version>
+    <arrow.version>16.0.0</arrow.version>
     <codehaus.jackson.version>1.9.13</codehaus.jackson.version>
     <spotless.version>2.43.0</spotless.version>
     <jacoco.version>0.8.11</jacoco.version>
@@ -120,6 +120,11 @@ under the License.
           <groupId>commons-logging</groupId>
           <artifactId>commons-logging</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.arrow</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
       </exclusions>
@@ -138,7 +143,7 @@ under the License.
         <exclusion>
           <groupId>org.apache.arrow</groupId>
-          <artifactId>arrow-memory-netty</artifactId>
+          <artifactId>*</artifactId>
         </exclusion>
@@ -258,7 +263,7 @@ under the License.
         <exclusion>
           <groupId>org.apache.arrow</groupId>
-          <artifactId>arrow-memory-netty</artifactId>
+          <artifactId>*</artifactId>
         </exclusion>
@@ -284,7 +289,7 @@ under the License.
         <exclusion>
           <groupId>org.apache.arrow</groupId>
-          <artifactId>arrow-memory-netty</artifactId>
+          <artifactId>*</artifactId>
         </exclusion>
@@ -324,6 +329,11 @@ under the License.
           <groupId>commons-logging</groupId>
           <artifactId>commons-logging</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.arrow</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
       </exclusions>
@@ -340,6 +350,10 @@ under the License.
           <groupId>commons-logging</groupId>
          <artifactId>commons-logging</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.arrow</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
@@ -400,6 +414,11 @@ under the License.
           <groupId>commons-logging</groupId>
           <artifactId>commons-logging</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.arrow</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>