Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Disable abs and signum because they return incorrect results #695

Merged
merged 7 commits into from
Jul 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 15 additions & 14 deletions native/core/src/execution/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use datafusion_common::{
JoinType as DFJoinType, ScalarValue,
};
use datafusion_expr::expr::find_df_window_func;
use datafusion_expr::{ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits};
use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
use datafusion_physical_expr::window::WindowExpr;
use datafusion_physical_expr_common::aggregate::create_aggregate_expr;
use itertools::Itertools;
Expand Down Expand Up @@ -108,7 +108,7 @@ use datafusion_comet_proto::{
spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
};
use datafusion_comet_spark_expr::{
Abs, Cast, DateTruncExpr, HourExpr, IfExpr, MinuteExpr, SecondExpr, TimestampTruncExpr,
Cast, DateTruncExpr, HourExpr, IfExpr, MinuteExpr, SecondExpr, TimestampTruncExpr,
};

// For clippy error on type_complexity.
Expand Down Expand Up @@ -505,18 +505,19 @@ impl PhysicalPlanner {
let op = DataFusionOperator::BitwiseShiftLeft;
Ok(Arc::new(BinaryExpr::new(left, op, right)))
}
ExprStruct::Abs(expr) => {
let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema.clone())?;
let return_type = child.data_type(&input_schema)?;
let args = vec![child];
let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
let comet_abs = Arc::new(ScalarUDF::new_from_impl(Abs::new(
eval_mode,
return_type.to_string(),
)?));
let expr = ScalarFunctionExpr::new("abs", comet_abs, args, return_type);
Ok(Arc::new(expr))
}
// Abs is disabled because it returns incorrect results: https://github.com/apache/datafusion-comet/issues/666
// ExprStruct::Abs(expr) => {
// let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema.clone())?;
// let return_type = child.data_type(&input_schema)?;
// let args = vec![child];
// let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
// let comet_abs = Arc::new(ScalarUDF::new_from_impl(Abs::new(
// eval_mode,
// return_type.to_string(),
// )?));
// let expr = ScalarFunctionExpr::new("abs", comet_abs, args, return_type);
// Ok(Arc::new(expr))
// }
ExprStruct::CaseWhen(case_when) => {
let when_then_pairs = case_when
.when
Expand Down
1 change: 0 additions & 1 deletion native/spark-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ arrow-schema = { workspace = true }
chrono = { workspace = true }
datafusion = { workspace = true }
datafusion-common = { workspace = true }
datafusion-functions = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
Expand Down
89 changes: 0 additions & 89 deletions native/spark-expr/src/abs.rs

This file was deleted.

2 changes: 0 additions & 2 deletions native/spark-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.

mod abs;
mod cast;
mod error;
mod if_expr;
Expand All @@ -25,7 +24,6 @@ mod temporal;
pub mod timezone;
pub mod utils;

pub use abs::Abs;
pub use cast::Cast;
pub use error::{SparkError, SparkResult};
pub use if_expr::IfExpr;
Expand Down
38 changes: 21 additions & 17 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1615,19 +1615,21 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
None
}

case Abs(child, failOnErr) =>
val childExpr = exprToProtoInternal(child, inputs)
if (childExpr.isDefined) {
val evalModeStr =
if (failOnErr) ExprOuterClass.EvalMode.ANSI else ExprOuterClass.EvalMode.LEGACY
val absBuilder = ExprOuterClass.Abs.newBuilder()
absBuilder.setChild(childExpr.get)
absBuilder.setEvalMode(evalModeStr)
Some(Expr.newBuilder().setAbs(absBuilder).build())
} else {
withInfo(expr, child)
None
}
// abs is disabled because the current implementation returns incorrect results
// https://github.com/apache/datafusion-comet/issues/666
// case Abs(child, failOnErr) =>
// val childExpr = exprToProtoInternal(child, inputs)
// if (childExpr.isDefined) {
// val evalModeStr =
// if (failOnErr) ExprOuterClass.EvalMode.ANSI else ExprOuterClass.EvalMode.LEGACY
// val absBuilder = ExprOuterClass.Abs.newBuilder()
// absBuilder.setChild(childExpr.get)
// absBuilder.setEvalMode(evalModeStr)
// Some(Expr.newBuilder().setAbs(absBuilder).build())
// } else {
// withInfo(expr, child)
// None
// }

case Acos(child) =>
val childExpr = exprToProtoInternal(child, inputs)
Expand Down Expand Up @@ -1766,10 +1768,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
optExprWithInfo(optExpr, expr, r.child)
}

case Signum(child) =>
val childExpr = exprToProtoInternal(child, inputs)
val optExpr = scalarExprToProto("signum", childExpr)
optExprWithInfo(optExpr, expr, child)
// TODO: re-enable signum once https://github.com/apache/datafusion/issues/11557 is fixed or
// once a Spark-compatible version is implemented in Comet
// case Signum(child) =>
// val childExpr = exprToProtoInternal(child, inputs)
// val optExpr = scalarExprToProto("signum", childExpr)
// optExprWithInfo(optExpr, expr, child)

case Sin(child) =>
val childExpr = exprToProtoInternal(child, inputs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("abs") {
// https://github.com/apache/datafusion-comet/issues/666
ignore("abs") {
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
Expand All @@ -849,7 +850,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("abs Overflow ansi mode") {
// https://github.com/apache/datafusion-comet/issues/666
ignore("abs Overflow ansi mode") {

def testAbsAnsiOverflow[T <: Product: ClassTag: TypeTag](data: Seq[T]): Unit = {
withParquetTable(data, "tbl") {
Expand Down Expand Up @@ -882,7 +884,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("abs Overflow legacy mode") {
// https://github.com/apache/datafusion-comet/issues/666
ignore("abs Overflow legacy mode") {

def testAbsLegacyOverflow[T <: Product: ClassTag: TypeTag](data: Seq[T]): Unit = {
withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") {
Expand Down
Loading