diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 42fb5fb4c0..1b40c7cd00 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -401,6 +401,12 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)

+  val COMET_XXHASH64_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.xxhash64.enabled")
+      .doc("The xxhash64 implementation is not optimized yet and may cause performance issues.")
+      .booleanConf
+      .createWithDefault(false)
+
 }

 object ConfigHelpers {
diff --git a/core/Cargo.lock b/core/Cargo.lock
index ed17f14abb..217ab98c97 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -805,7 +805,8 @@ dependencies = [
 [[package]]
 name = "datafusion"
 version = "39.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2f92d2d7a9cba4580900b32b009848d9eb35f1028ac84cdd6ddcf97612cd0068"
 dependencies = [
  "ahash",
  "arrow",
@@ -911,7 +912,8 @@ dependencies = [
 [[package]]
 name = "datafusion-common"
 version = "39.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "effed030d2c1667eb1e11df5372d4981eaf5d11a521be32220b3985ae5ba6971"
 dependencies = [
  "ahash",
  "arrow",
@@ -931,7 +933,8 @@ dependencies = [
 [[package]]
 name = "datafusion-common-runtime"
 version = "39.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d0091318129dad1359f08e4c6c71f855163c35bba05d1dbf983196f727857894"
 dependencies = [
  "tokio",
 ]
@@ -939,7 +942,8 @@ dependencies = [
 [[package]]
 name = "datafusion-execution"
 version = "39.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8385aba84fc4a06d3ebccfbcbf9b4f985e80c762fac634b49079f7cc14933fb1"
 dependencies = [
  "arrow",
  "chrono",
@@ -959,7 +963,8 @@ dependencies = [
 [[package]]
 name = "datafusion-expr"
 version = "39.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ebb192f0055d2ce64e38ac100abc18e4e6ae9734d3c28eee522bbbd6a32108a3"
 dependencies = [
  "ahash",
  "arrow",
@@ -977,7 +982,8 @@ dependencies = [
 [[package]]
 name = "datafusion-functions"
 version = "39.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "27c081ae5b7edd712b92767fb8ed5c0e32755682f8075707666cd70835807c0b"
 dependencies = [
  "arrow",
  "base64",
@@ -1003,7 +1009,8 @@ dependencies = [
 [[package]]
 name = "datafusion-functions-aggregate"
 version = "39.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "feb28a4ea52c28a26990646986a27c4052829a2a2572386258679e19263f8b78"
 dependencies = [
  "ahash",
  "arrow",
@@ -1020,7 +1027,8 @@ dependencies = [
 [[package]]
 name = "datafusion-optimizer"
 version = "39.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "12172f2a6c9eb4992a51e62d709eeba5dedaa3b5369cce37ff6c2260e100ba76"
 dependencies = [
  "arrow",
  "async-trait",
@@ -1038,7 +1046,8 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-expr"
 version = "39.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a3fce531b623e94180f6cd33d620ef01530405751b6ddd2fd96250cdbd78e2e"
 dependencies = [
  "ahash",
  "arrow",
@@ -1068,7 +1077,8 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-expr-common"
 version = "39.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "046400b6a2cc3ed57a7c576f5ae6aecc77804ac8e0186926b278b189305b2a77"
 dependencies = [
  "arrow",
  "datafusion-common",
@@ -1079,7 +1089,8 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-plan"
 version = "39.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4aed47f5a2ad8766260befb375b201592e86a08b260256e168ae4311426a2bff"
 dependencies = [
  "ahash",
  "arrow",
@@ -1112,7 +1123,8 @@ dependencies = [
 [[package]]
 name = "datafusion-sql"
 version = "39.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?tag=39.0.0-rc1#6a4a280e3cf70fe5f1a1cfe7c2de13e4c39f89bb"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7fa92bb1fd15e46ce5fb6f1c85f3ac054592560f294429a28e392b5f9cd4255e"
 dependencies = [
  "arrow",
  "arrow-array",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 68bfbdec5d..fe74b3554f 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -71,12 +71,12 @@ itertools = "0.11.0"
 chrono = { version = "0.4", default-features = false, features = ["clock"] }
 chrono-tz = { version = "0.8" }
 paste = "1.0.14"
-datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1" }
-datafusion = { default-features = false, git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", features = ["unicode_expressions", "crypto_expressions"] }
-datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", features = ["crypto_expressions"] }
-datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", default-features = false }
-datafusion-physical-expr-common = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", default-features = false }
-datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", default-features = false }
+datafusion-common = { version = "39.0.0" }
+datafusion = { default-features = false, version = "39.0.0", features = ["unicode_expressions", "crypto_expressions"] }
+datafusion-functions = { version = "39.0.0", features = ["crypto_expressions"] }
+datafusion-expr = { version = "39.0.0", default-features = false }
+datafusion-physical-expr-common = { version = "39.0.0", default-features = false }
+datafusion-physical-expr = { version = "39.0.0", default-features = false }
{ version = "39.0.0", default-features = false } unicode-segmentation = "^1.10.1" once_cell = "1.18.0" regex = "1.9.6" diff --git a/core/src/errors.rs b/core/src/errors.rs index af4fd26973..493880c3e4 100644 --- a/core/src/errors.rs +++ b/core/src/errors.rs @@ -152,9 +152,10 @@ pub enum CometError { #[error("{msg}")] Panic { msg: String }, - #[error(transparent)] + #[error("{msg}")] DataFusion { - #[from] + msg: String, + #[source] source: DataFusionError, }, @@ -185,10 +186,19 @@ impl convert::From> for CometError { } } +impl From for CometError { + fn from(value: DataFusionError) -> Self { + CometError::DataFusion { + msg: value.message().to_string(), + source: value, + } + } +} + impl From for DataFusionError { fn from(value: CometError) -> Self { match value { - CometError::DataFusion { source } => source, + CometError::DataFusion { msg: _, source } => source, _ => DataFusionError::Execution(value.to_string()), } } diff --git a/core/src/execution/datafusion/expressions/scalar_funcs.rs b/core/src/execution/datafusion/expressions/scalar_funcs.rs index 5f98ce3f6c..c50f06649b 100644 --- a/core/src/execution/datafusion/expressions/scalar_funcs.rs +++ b/core/src/execution/datafusion/expressions/scalar_funcs.rs @@ -55,6 +55,9 @@ use unhex::spark_unhex; mod hex; use hex::spark_hex; +mod chr; +use chr::spark_chr; + macro_rules! make_comet_scalar_udf { ($name:expr, $func:ident, $data_type:ident) => {{ let scalar_func = CometScalarFunction::new( @@ -123,6 +126,10 @@ pub fn create_comet_physical_fun( let func = Arc::new(spark_xxhash64); make_comet_scalar_udf!("xxhash64", func, without data_type) } + "chr" => { + let func = Arc::new(spark_chr); + make_comet_scalar_udf!("chr", func, without data_type) + } sha if sha2_functions.contains(&sha) => { // Spark requires hex string as the result of sha2 functions, we have to wrap the // result of digest functions as hex string diff --git a/core/src/execution/datafusion/expressions/scalar_funcs/chr.rs b/core/src/execution/datafusion/expressions/scalar_funcs/chr.rs new file mode 100644 index 0000000000..3d62d324c7 --- /dev/null +++ b/core/src/execution/datafusion/expressions/scalar_funcs/chr.rs @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{any::Any, sync::Arc}; + +use arrow::{ + array::{ArrayRef, StringArray}, + datatypes::{ + DataType, + DataType::{Int64, Utf8}, + }, +}; + +use datafusion::logical_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; +use datafusion_common::{cast::as_int64_array, exec_err, DataFusionError, Result, ScalarValue}; + +/// Returns the ASCII character having the binary equivalent to the input expression. +/// E.g., chr(65) = 'A'. 
+/// Compatible with Apache Spark's Chr function
+pub fn spark_chr(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    let chr_func = ChrFunc::default();
+    chr_func.invoke(args)
+}
+
+pub fn chr(args: &[ArrayRef]) -> Result<ArrayRef> {
+    let integer_array = as_int64_array(&args[0])?;
+
+    // first map is the iterator, second is for the `Option<_>`
+    let result = integer_array
+        .iter()
+        .map(|integer: Option<i64>| {
+            integer
+                .map(|integer| match core::char::from_u32(integer as u32) {
+                    Some(integer) => Ok(integer.to_string()),
+                    None => {
+                        exec_err!("requested character too large for encoding.")
+                    }
+                })
+                .transpose()
+        })
+        .collect::<Result<StringArray>>()?;
+
+    Ok(Arc::new(result) as ArrayRef)
+}
+
+#[derive(Debug)]
+pub struct ChrFunc {
+    signature: Signature,
+}
+
+impl Default for ChrFunc {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ChrFunc {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::uniform(1, vec![Int64], Volatility::Immutable),
+        }
+    }
+}
+
+impl ScalarUDFImpl for ChrFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "chr"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(Utf8)
+    }
+
+    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        handle_chr_fn(args)
+    }
+}
+
+fn handle_chr_fn(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+    let array = args[0].clone();
+    match array {
+        ColumnarValue::Array(array) => {
+            let array = chr(&[array])?;
+            Ok(ColumnarValue::Array(array))
+        }
+        ColumnarValue::Scalar(ScalarValue::Int64(Some(value))) => {
+            match core::char::from_u32(value as u32) {
+                Some(ch) => Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(
+                    ch.to_string(),
+                )))),
+                None => exec_err!("requested character too large for encoding."),
+            }
+        }
+        ColumnarValue::Scalar(ScalarValue::Int64(None)) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)))
+        }
+        _ => exec_err!("The argument must be an Int64 array or scalar."),
+    }
+}
diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs
index fcef182a78..e51932154a 100644
--- a/core/src/execution/datafusion/planner.rs
+++ b/core/src/execution/datafusion/planner.rs
@@ -1391,7 +1391,7 @@ impl PhysicalPlanner {

 impl From<DataFusionError> for ExecutionError {
     fn from(value: DataFusionError) -> Self {
-        ExecutionError::DataFusionError(value.to_string())
+        ExecutionError::DataFusionError(value.message().to_string())
     }
 }

@@ -1563,6 +1563,7 @@ mod tests {
         spark_operator,
     };

+    use crate::execution::operators::ExecutionError;
     use spark_expression::expr::ExprStruct::*;
     use spark_operator::{operator::OpStruct, Operator};

@@ -1752,6 +1753,14 @@ mod tests {
         assert!(output.is_empty());
     }

+    #[tokio::test()]
+    async fn from_datafusion_error_to_comet() {
+        let err_msg = "exec error";
+        let err = datafusion_common::DataFusionError::Execution(err_msg.to_string());
+        let comet_err: ExecutionError = err.into();
+        assert_eq!(comet_err.to_string(), "Error from DataFusion: exec error.");
+    }
+
     // Creates a filter operator which takes an `Int32Array` and selects rows that are equal to
     // `value`.
     fn create_filter(child_op: spark_operator::Operator, value: i32) -> spark_operator::Operator {
diff --git a/core/src/execution/operators/mod.rs b/core/src/execution/operators/mod.rs
index 5d05fdb8db..13a0d96279 100644
--- a/core/src/execution/operators/mod.rs
+++ b/core/src/execution/operators/mod.rs
@@ -38,19 +38,19 @@ pub use copy::*;
 pub enum ExecutionError {
     /// Simple error
     #[allow(dead_code)]
-    #[error("General execution error with reason {0}.")]
+    #[error("General execution error with reason: {0}.")]
     GeneralError(String),

     /// Error when deserializing an operator.
-    #[error("Fail to deserialize to native operator with reason {0}.")]
+    #[error("Fail to deserialize to native operator with reason: {0}.")]
     DeserializeError(String),

     /// Error when processing Arrow array.
-    #[error("Fail to process Arrow array with reason {0}.")]
+    #[error("Fail to process Arrow array with reason: {0}.")]
     ArrowError(String),

     /// DataFusion error
-    #[error("Error from DataFusion {0}.")]
+    #[error("Error from DataFusion: {0}.")]
     DataFusionError(String),
 }
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 104f29ce80..f232dc8b8a 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -48,3 +48,4 @@ Comet provides the following configuration settings.
 | spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. By default is disabled. | false |
 | spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. By default it is 2. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 |
 | spark.comet.shuffle.preferDictionary.ratio | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. By default, this config is 10.0. Note that this config is only used when 'spark.comet.columnar.shuffle.enabled' is true. | 10.0 |
+| spark.comet.xxhash64.enabled | The xxhash64 implementation is not optimized yet and may cause performance issues. | false |
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 7d9bef48c4..5a0ad38d71 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2101,19 +2101,27 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
         scalarExprToProtoWithReturnType("murmur3_hash", IntegerType, exprs :+ seedExpr: _*)

       case XxHash64(children, seed) =>
-        val firstUnSupportedInput = children.find(c => !supportedDataType(c.dataType))
-        if (firstUnSupportedInput.isDefined) {
-          withInfo(expr, s"Unsupported datatype ${firstUnSupportedInput.get.dataType}")
-          return None
+        if (CometConf.COMET_XXHASH64_ENABLED.get()) {
+          val firstUnSupportedInput = children.find(c => !supportedDataType(c.dataType))
+          if (firstUnSupportedInput.isDefined) {
+            withInfo(expr, s"Unsupported datatype ${firstUnSupportedInput.get.dataType}")
+            return None
+          }
+          val exprs = children.map(exprToProtoInternal(_, inputs))
+          val seedBuilder = ExprOuterClass.Literal
+            .newBuilder()
+            .setDatatype(serializeDataType(LongType).get)
+            .setLongVal(seed)
+          val seedExpr = Some(ExprOuterClass.Expr.newBuilder().setLiteral(seedBuilder).build())
+          // the seed is put at the end of the arguments
+          scalarExprToProtoWithReturnType("xxhash64", LongType, exprs :+ seedExpr: _*)
+        } else {
+          withInfo(
+            expr,
+            "xxhash64 is disabled by default. " +
+              s"Set ${CometConf.COMET_XXHASH64_ENABLED.key}=true to enable it.")
+          None
         }
-        val exprs = children.map(exprToProtoInternal(_, inputs))
-        val seedBuilder = ExprOuterClass.Literal
-          .newBuilder()
-          .setDatatype(serializeDataType(LongType).get)
-          .setLongVal(seed)
-        val seedExpr = Some(ExprOuterClass.Expr.newBuilder().setLiteral(seedBuilder).build())
-        // the seed is put at the end of the arguments
-        scalarExprToProtoWithReturnType("xxhash64", LongType, exprs :+ seedExpr: _*)

       case Sha2(left, numBits) =>
         if (!numBits.foldable) {
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 63bd627be2..8da106c3d7 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -992,9 +992,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
         // both systems threw an exception so we make sure they are the same
         val sparkMessage =
           if (sparkException.getCause != null) sparkException.getCause.getMessage else null
-        // We have to workaround https://github.com/apache/datafusion-comet/issues/293 here by
-        // removing the "Execution error: " error message prefix that is added by DataFusion
-        val cometMessage = cometException.getCause.getMessage.replace("Execution error: ", "")
+        val cometMessage = cometException.getCause.getMessage
         if (CometSparkSessionExtensions.isSpark40Plus) {
           // for Spark 4 we expect to sparkException carries the message
           assert(
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index e7b0d47c91..c5ee76d309 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -980,6 +980,23 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }

+  test("Chr with null character") {
+    // test compatibility with Spark, spark supports chr(0)
+    Seq(false, true).foreach { dictionary =>
+      withSQLConf(
+        "parquet.enable.dictionary" -> dictionary.toString,
+        CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+        val table = "test0"
+        withTable(table) {
+          sql(s"create table $table(c9 int, c4 int) using parquet")
+          sql(s"insert into $table values(0, 0), (66, null), (null, 70), (null, null)")
+          val query = s"SELECT chr(c9), chr(c4) FROM $table"
+          checkSparkAnswerAndOperator(query)
+        }
+      }
+    }
+  }
+
   test("InitCap") {
     Seq(false, true).foreach { dictionary =>
       withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
@@ -1489,6 +1506,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     Seq(true, false).foreach { dictionary =>
       withSQLConf(
         "parquet.enable.dictionary" -> dictionary.toString,
+        CometConf.COMET_XXHASH64_ENABLED.key -> "true",
         CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
         val table = "test"
         withTable(table) {
@@ -1521,6 +1539,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     Seq(true, false).foreach { dictionary =>
       withSQLConf(
         "parquet.enable.dictionary" -> dictionary.toString,
+        CometConf.COMET_XXHASH64_ENABLED.key -> "true",
         CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
         val table = "test"
         withTable(table) {
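
Usage note (not part of the patch): a minimal Scala sketch of how the behavior introduced above would be exercised from a Comet-enabled Spark session. The `spark` session value and setting the flag at runtime are assumptions for illustration; with the new config left at its default of false, xxhash64 simply falls back to Spark instead of running natively.

// Hypothetical sketch; assumes `spark` is a SparkSession with the Comet extensions loaded.
spark.conf.set("spark.comet.xxhash64.enabled", "true") // opt in to the not-yet-optimized native xxhash64
spark.range(5).createOrReplaceTempView("t")
// chr is offloaded via the new spark_chr UDF; chr(0) yields the NUL character, matching Spark.
spark.sql("SELECT id, chr(65 + id) AS c, xxhash64(id) AS h FROM t").show()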