Skip to content

Commit

Permalink
chore: Disable abs and signum because they return incorrect results (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove authored and Steve Vaughan Jr committed Jul 23, 2024
1 parent 8f47689 commit 8bdbcc1
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 127 deletions.
1 change: 0 additions & 1 deletion native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 15 additions & 14 deletions native/core/src/execution/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use datafusion_common::{
JoinType as DFJoinType, ScalarValue,
};
use datafusion_expr::expr::find_df_window_func;
use datafusion_expr::{ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits};
use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
use datafusion_physical_expr::window::WindowExpr;
use datafusion_physical_expr_common::aggregate::create_aggregate_expr;
use itertools::Itertools;
Expand Down Expand Up @@ -108,7 +108,7 @@ use datafusion_comet_proto::{
spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
};
use datafusion_comet_spark_expr::{
Abs, Cast, DateTruncExpr, HourExpr, IfExpr, MinuteExpr, SecondExpr, TimestampTruncExpr,
Cast, DateTruncExpr, HourExpr, IfExpr, MinuteExpr, SecondExpr, TimestampTruncExpr,
};

// For clippy error on type_complexity.
Expand Down Expand Up @@ -505,18 +505,19 @@ impl PhysicalPlanner {
let op = DataFusionOperator::BitwiseShiftLeft;
Ok(Arc::new(BinaryExpr::new(left, op, right)))
}
ExprStruct::Abs(expr) => {
let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema.clone())?;
let return_type = child.data_type(&input_schema)?;
let args = vec![child];
let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
let comet_abs = Arc::new(ScalarUDF::new_from_impl(Abs::new(
eval_mode,
return_type.to_string(),
)?));
let expr = ScalarFunctionExpr::new("abs", comet_abs, args, return_type);
Ok(Arc::new(expr))
}
// https://github.com/apache/datafusion-comet/issues/666
// ExprStruct::Abs(expr) => {
// let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema.clone())?;
// let return_type = child.data_type(&input_schema)?;
// let args = vec![child];
// let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
// let comet_abs = Arc::new(ScalarUDF::new_from_impl(Abs::new(
// eval_mode,
// return_type.to_string(),
// )?));
// let expr = ScalarFunctionExpr::new("abs", comet_abs, args, return_type);
// Ok(Arc::new(expr))
// }
ExprStruct::CaseWhen(case_when) => {
let when_then_pairs = case_when
.when
Expand Down
1 change: 0 additions & 1 deletion native/spark-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ arrow-schema = { workspace = true }
chrono = { workspace = true }
datafusion = { workspace = true }
datafusion-common = { workspace = true }
datafusion-functions = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
Expand Down
89 changes: 0 additions & 89 deletions native/spark-expr/src/abs.rs

This file was deleted.

2 changes: 0 additions & 2 deletions native/spark-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.

mod abs;
mod cast;
mod error;
mod if_expr;
Expand All @@ -25,7 +24,6 @@ mod temporal;
pub mod timezone;
pub mod utils;

pub use abs::Abs;
pub use cast::Cast;
pub use error::{SparkError, SparkResult};
pub use if_expr::IfExpr;
Expand Down
38 changes: 21 additions & 17 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1615,19 +1615,21 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
None
}

case Abs(child, failOnErr) =>
val childExpr = exprToProtoInternal(child, inputs)
if (childExpr.isDefined) {
val evalModeStr =
if (failOnErr) ExprOuterClass.EvalMode.ANSI else ExprOuterClass.EvalMode.LEGACY
val absBuilder = ExprOuterClass.Abs.newBuilder()
absBuilder.setChild(childExpr.get)
absBuilder.setEvalMode(evalModeStr)
Some(Expr.newBuilder().setAbs(absBuilder).build())
} else {
withInfo(expr, child)
None
}
// abs implementation is not correct
// https://github.com/apache/datafusion-comet/issues/666
// case Abs(child, failOnErr) =>
// val childExpr = exprToProtoInternal(child, inputs)
// if (childExpr.isDefined) {
// val evalModeStr =
// if (failOnErr) ExprOuterClass.EvalMode.ANSI else ExprOuterClass.EvalMode.LEGACY
// val absBuilder = ExprOuterClass.Abs.newBuilder()
// absBuilder.setChild(childExpr.get)
// absBuilder.setEvalMode(evalModeStr)
// Some(Expr.newBuilder().setAbs(absBuilder).build())
// } else {
// withInfo(expr, child)
// None
// }

case Acos(child) =>
val childExpr = exprToProtoInternal(child, inputs)
Expand Down Expand Up @@ -1766,10 +1768,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
optExprWithInfo(optExpr, expr, r.child)
}

case Signum(child) =>
val childExpr = exprToProtoInternal(child, inputs)
val optExpr = scalarExprToProto("signum", childExpr)
optExprWithInfo(optExpr, expr, child)
// TODO enable once https://github.com/apache/datafusion/issues/11557 is fixed or
// when we have a Spark-compatible version implemented in Comet
// case Signum(child) =>
// val childExpr = exprToProtoInternal(child, inputs)
// val optExpr = scalarExprToProto("signum", childExpr)
// optExprWithInfo(optExpr, expr, child)

case Sin(child) =>
val childExpr = exprToProtoInternal(child, inputs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("abs") {
// https://github.com/apache/datafusion-comet/issues/666
ignore("abs") {
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
Expand All @@ -849,7 +850,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("abs Overflow ansi mode") {
// https://github.com/apache/datafusion-comet/issues/666
ignore("abs Overflow ansi mode") {

def testAbsAnsiOverflow[T <: Product: ClassTag: TypeTag](data: Seq[T]): Unit = {
withParquetTable(data, "tbl") {
Expand Down Expand Up @@ -882,7 +884,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("abs Overflow legacy mode") {
// https://github.com/apache/datafusion-comet/issues/666
ignore("abs Overflow legacy mode") {

def testAbsLegacyOverflow[T <: Product: ClassTag: TypeTag](data: Seq[T]): Unit = {
withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") {
Expand Down

0 comments on commit 8bdbcc1

Please sign in to comment.