diff --git a/native/Cargo.lock b/native/Cargo.lock index c3aae93af..339d9ad84 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -920,7 +920,6 @@ dependencies = [ "datafusion", "datafusion-common", "datafusion-expr", - "datafusion-functions", "datafusion-physical-expr", "datafusion-physical-plan", "num", diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 03635dd7b..cd37e9380 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -56,7 +56,7 @@ use datafusion_common::{ JoinType as DFJoinType, ScalarValue, }; use datafusion_expr::expr::find_df_window_func; -use datafusion_expr::{ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits}; +use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits}; use datafusion_physical_expr::window::WindowExpr; use datafusion_physical_expr_common::aggregate::create_aggregate_expr; use itertools::Itertools; @@ -108,7 +108,7 @@ use datafusion_comet_proto::{ spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning}, }; use datafusion_comet_spark_expr::{ - Abs, Cast, DateTruncExpr, HourExpr, IfExpr, MinuteExpr, SecondExpr, TimestampTruncExpr, + Cast, DateTruncExpr, HourExpr, IfExpr, MinuteExpr, SecondExpr, TimestampTruncExpr, }; // For clippy error on type_complexity. @@ -505,18 +505,19 @@ impl PhysicalPlanner { let op = DataFusionOperator::BitwiseShiftLeft; Ok(Arc::new(BinaryExpr::new(left, op, right))) } - ExprStruct::Abs(expr) => { - let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema.clone())?; - let return_type = child.data_type(&input_schema)?; - let args = vec![child]; - let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; - let comet_abs = Arc::new(ScalarUDF::new_from_impl(Abs::new( - eval_mode, - return_type.to_string(), - )?)); - let expr = ScalarFunctionExpr::new("abs", comet_abs, args, return_type); - Ok(Arc::new(expr)) - } + // https://github.com/apache/datafusion-comet/issues/666 + // ExprStruct::Abs(expr) => { + // let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema.clone())?; + // let return_type = child.data_type(&input_schema)?; + // let args = vec![child]; + // let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; + // let comet_abs = Arc::new(ScalarUDF::new_from_impl(Abs::new( + // eval_mode, + // return_type.to_string(), + // )?)); + // let expr = ScalarFunctionExpr::new("abs", comet_abs, args, return_type); + // Ok(Arc::new(expr)) + // } ExprStruct::CaseWhen(case_when) => { let when_then_pairs = case_when .when diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml index 976a1f36f..192ed102b 100644 --- a/native/spark-expr/Cargo.toml +++ b/native/spark-expr/Cargo.toml @@ -33,7 +33,6 @@ arrow-schema = { workspace = true } chrono = { workspace = true } datafusion = { workspace = true } datafusion-common = { workspace = true } -datafusion-functions = { workspace = true } datafusion-expr = { workspace = true } datafusion-physical-expr = { workspace = true } datafusion-physical-plan = { workspace = true } diff --git a/native/spark-expr/src/abs.rs b/native/spark-expr/src/abs.rs deleted file mode 100644 index fa25a7775..000000000 --- a/native/spark-expr/src/abs.rs +++ /dev/null @@ -1,89 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Spark-compatible implementation of abs function - -use std::{any::Any, sync::Arc}; - -use arrow::datatypes::DataType; -use arrow_schema::ArrowError; - -use datafusion::logical_expr::{ColumnarValue, ScalarUDFImpl, Signature}; -use datafusion_common::DataFusionError; -use datafusion_functions::math; - -use super::{EvalMode, SparkError}; - -/// Spark-compatible ABS expression -#[derive(Debug)] -pub struct Abs { - inner_abs_func: Arc, - eval_mode: EvalMode, - data_type_name: String, -} - -impl Abs { - pub fn new(eval_mode: EvalMode, data_type_name: String) -> Result { - if let EvalMode::Legacy | EvalMode::Ansi = eval_mode { - Ok(Self { - inner_abs_func: math::abs().inner().clone(), - eval_mode, - data_type_name, - }) - } else { - Err(DataFusionError::Execution(format!( - "Invalid EvalMode: \"{:?}\"", - eval_mode - ))) - } - } -} - -impl ScalarUDFImpl for Abs { - fn as_any(&self) -> &dyn Any { - self - } - fn name(&self) -> &str { - "abs" - } - - fn signature(&self) -> &Signature { - self.inner_abs_func.signature() - } - - fn return_type(&self, arg_types: &[DataType]) -> Result { - self.inner_abs_func.return_type(arg_types) - } - - fn invoke(&self, args: &[ColumnarValue]) -> Result { - match self.inner_abs_func.invoke(args) { - Err(DataFusionError::ArrowError(ArrowError::ComputeError(msg), _)) - if msg.contains("overflow") => - { - if self.eval_mode == EvalMode::Legacy { - Ok(args[0].clone()) - } else { - Err(SparkError::ArithmeticOverflow { - from_type: self.data_type_name.clone(), - } - .into()) - } - } - other => other, - } - } -} diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs index 91d61f70a..336201f48 100644 --- a/native/spark-expr/src/lib.rs +++ b/native/spark-expr/src/lib.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -mod abs; mod cast; mod error; mod if_expr; @@ -25,7 +24,6 @@ mod temporal; pub mod timezone; pub mod utils; -pub use abs::Abs; pub use cast::Cast; pub use error::{SparkError, SparkResult}; pub use if_expr::IfExpr; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index b417239d0..41a69f7a3 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1615,19 +1615,21 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim None } - case Abs(child, failOnErr) => - val childExpr = exprToProtoInternal(child, inputs) - if (childExpr.isDefined) { - val evalModeStr = - if (failOnErr) ExprOuterClass.EvalMode.ANSI else ExprOuterClass.EvalMode.LEGACY - val absBuilder = ExprOuterClass.Abs.newBuilder() - absBuilder.setChild(childExpr.get) - absBuilder.setEvalMode(evalModeStr) - Some(Expr.newBuilder().setAbs(absBuilder).build()) - } else { - withInfo(expr, child) - None - } + // abs implementation is not correct + // https://github.com/apache/datafusion-comet/issues/666 +// case Abs(child, failOnErr) => +// val childExpr = exprToProtoInternal(child, inputs) +// if (childExpr.isDefined) { +// val evalModeStr = +// if (failOnErr) ExprOuterClass.EvalMode.ANSI else ExprOuterClass.EvalMode.LEGACY +// val absBuilder = ExprOuterClass.Abs.newBuilder() +// absBuilder.setChild(childExpr.get) +// absBuilder.setEvalMode(evalModeStr) +// Some(Expr.newBuilder().setAbs(absBuilder).build()) +// } else { +// withInfo(expr, child) +// None +// } case Acos(child) => val childExpr = exprToProtoInternal(child, inputs) @@ -1766,10 +1768,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim optExprWithInfo(optExpr, expr, r.child) } - case Signum(child) => - val childExpr = exprToProtoInternal(child, inputs) - val optExpr = scalarExprToProto("signum", childExpr) - optExprWithInfo(optExpr, expr, child) + // TODO enable once https://github.com/apache/datafusion/issues/11557 is fixed or + // when we have a Spark-compatible version implemented in Comet +// case Signum(child) => +// val childExpr = exprToProtoInternal(child, inputs) +// val optExpr = scalarExprToProto("signum", childExpr) +// optExprWithInfo(optExpr, expr, child) case Sin(child) => val childExpr = exprToProtoInternal(child, inputs) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 65217767d..c22c6b06a 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -835,7 +835,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - test("abs") { + // https://github.com/apache/datafusion-comet/issues/666 + ignore("abs") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "test.parquet") @@ -849,7 +850,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - test("abs Overflow ansi mode") { + // https://github.com/apache/datafusion-comet/issues/666 + ignore("abs Overflow ansi mode") { def testAbsAnsiOverflow[T <: Product: ClassTag: TypeTag](data: Seq[T]): Unit = { withParquetTable(data, "tbl") { @@ -882,7 +884,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - test("abs Overflow legacy mode") { + // https://github.com/apache/datafusion-comet/issues/666 + ignore("abs Overflow legacy mode") { def testAbsLegacyOverflow[T <: Product: ClassTag: TypeTag](data: Seq[T]): Unit = { withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") {