feat: Support Ansi mode in abs function #500
@@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::datatypes::DataType;
use arrow_schema::ArrowError;
use datafusion::logical_expr::{ColumnarValue, ScalarUDFImpl, Signature};
use datafusion_common::DataFusionError;
use datafusion_functions::math;
use std::{any::Any, sync::Arc};

use crate::errors::CometError;

use super::EvalMode;

fn arithmetic_overflow_error(from_type: &str) -> CometError {
    CometError::ArithmeticOverflow {
        from_type: from_type.to_string(),
    }
}

#[derive(Debug)]
pub struct CometAbsFunc {
    inner_abs_func: Arc<dyn ScalarUDFImpl>,
    eval_mode: EvalMode,
    data_type_name: String,
}

impl CometAbsFunc {
    pub fn new(eval_mode: EvalMode, data_type_name: String) -> Self {
        Self {
            inner_abs_func: math::abs().inner(),
            eval_mode,
            data_type_name,
        }
    }
}

impl ScalarUDFImpl for CometAbsFunc {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "abs"
    }

    fn signature(&self) -> &Signature {
        self.inner_abs_func.signature()
    }

    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType, DataFusionError> {
        self.inner_abs_func.return_type(arg_types)
    }

    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
        match self.inner_abs_func.invoke(args) {
            Ok(result) => Ok(result),

Review comment: Technically there is no need to match on an Ok result here.

            Err(DataFusionError::ArrowError(ArrowError::ComputeError(msg), trace))
                if msg.contains("overflow") =>

Review thread on the overflow check:
- It would be nice if Arrow/DataFusion threw a specific overflow error so that we didn't have to look for a string within the error message, but I guess that isn't available.
- I am going to file a feature request in DataFusion. I will post the link here later.
- Great! I understand that we have to wait for the next version of arrow-rs to be released and integrate it here to be able to make the changes, right?
- We can continue with this and do a follow-up once the arrow-rs change is available. Or you can wait; the arrow-rs community is very responsive.
- Probably better to do it in a follow-up PR, I think. Thanks!
- It would help to log an issue to keep track.
- Apparently my PR in Arrow will not be available for 3 months because it is an API change 😞
- When we upgrade to the version of arrow with the improved overflow error reporting, the tests in this PR will fail (because we are looking for a specific string in the error message).
- @parthchandra fyi ☝️

            {
                if self.eval_mode == EvalMode::Legacy {
                    Ok(args[0].clone())
                } else {
                    let msg = arithmetic_overflow_error(&self.data_type_name).to_string();
                    Err(DataFusionError::ArrowError(
                        ArrowError::ComputeError(msg),
                        trace,
                    ))
                }
            }
            other => other,
        }
    }
}
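
To illustrate the review suggestion above, here is a sketch (not code from this PR) of how invoke could look without the explicit Ok arm, sitting in the same impl ScalarUDFImpl block; behaviour is otherwise identical to the diff above, since the catch-all arm already passes successful results through unchanged.

    // Sketch of the simplified match: only the overflow error needs special
    // handling; Ok values and all other errors fall through the catch-all arm.
    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
        match self.inner_abs_func.invoke(args) {
            Err(DataFusionError::ArrowError(ArrowError::ComputeError(msg), trace))
                if msg.contains("overflow") =>
            {
                if self.eval_mode == EvalMode::Legacy {
                    // Legacy mode mirrors Spark's non-ANSI behaviour: return the
                    // input unchanged instead of raising an error.
                    Ok(args[0].clone())
                } else {
                    let msg = arithmetic_overflow_error(&self.data_type_name).to_string();
                    Err(DataFusionError::ArrowError(ArrowError::ComputeError(msg), trace))
                }
            }
            // Ok results and all other errors are returned as-is.
            other => other,
        }
    }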
@@ -473,6 +473,7 @@ message BitwiseNot {

message Abs {
  Expr child = 1;
  string eval_mode = 2;
}

message Subquery {
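
The eval_mode field is serialized as a plain string ("ANSI", "LEGACY", or "TRY"). A hypothetical sketch of how the native side could map that string back onto the EvalMode enum; the helper name and error handling here are illustrative, not taken from the PR, and it assumes EvalMode has Legacy, Ansi, and Try variants as the Spark-side strings suggest.

    // Illustrative helper: turn the protobuf eval_mode string into the native enum.
    fn eval_mode_from_proto(s: &str) -> Result<EvalMode, DataFusionError> {
        match s {
            "ANSI" => Ok(EvalMode::Ansi),
            "TRY" => Ok(EvalMode::Try),
            "LEGACY" => Ok(EvalMode::Legacy),
            other => Err(DataFusionError::Internal(format!(
                "unsupported eval mode: {other}"
            ))),
        }
    }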
@@ -54,6 +54,13 @@ import org.apache.comet.shims.ShimQueryPlanSerde
 * An utility object for query plan and expression serialization.
 */
object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim {

  object ExecutionMode {
    val ANSI = "ANSI"
    val LEGACY = "LEGACY"
    val TRY = "TRY"
  }

  def emitWarning(reason: String): Unit = {
    logWarning(s"Comet native execution is disabled due to: $reason")
  }
@@ -691,7 +698,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
    case Cast(child, dt, timeZoneId, evalMode) =>
      val evalModeStr = if (evalMode.isInstanceOf[Boolean]) {
        // Spark 3.2 & 3.3 has ansiEnabled boolean
-       if (evalMode.asInstanceOf[Boolean]) "ANSI" else "LEGACY"
+       if (evalMode.asInstanceOf[Boolean]) ExecutionMode.ANSI else ExecutionMode.LEGACY
      } else {
        // Spark 3.4+ has EvalMode enum with values LEGACY, ANSI, and TRY
        evalMode.toString
@@ -1474,15 +1481,14 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
        None
      }

-   case Abs(child, _) =>
+   case Abs(child, failOnErr) =>
      val childExpr = exprToProtoInternal(child, inputs)
      if (childExpr.isDefined) {
-       val abs =
-         ExprOuterClass.Abs
-           .newBuilder()
-           .setChild(childExpr.get)
-           .build()
-       Some(Expr.newBuilder().setAbs(abs).build())
+       val evalModeStr = if (failOnErr) ExecutionMode.ANSI else ExecutionMode.LEGACY
+       val absBuilder = ExprOuterClass.Abs.newBuilder()
+       absBuilder.setChild(childExpr.get)
+       absBuilder.setEvalMode(evalModeStr)
+       Some(Expr.newBuilder().setAbs(absBuilder).build())
      } else {
        withInfo(expr, child)
        None

Review thread on the change to case Abs(child, failOnErr):
- Since we are already using failOnErr, can we simply use this boolean instead of the evalMode struct? What are your thoughts?
- Hi, thanks for your review! The intention behind doing it this way is that in the Rust code ANSI mode is always treated the same way, whether the expression supports all three eval modes or only two. Otherwise we would have to handle the two cases differently.
@@ -850,6 +850,34 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
    }
  }

  test("abs Overflow ansi mode") {
    val data: Seq[(Int, Int)] = Seq((Int.MaxValue, Int.MinValue))

Review thread on the test data:
- Can we have tests for all numerical values?
- Done!

    withSQLConf(
      SQLConf.ANSI_ENABLED.key -> "true",
      CometConf.COMET_ANSI_MODE_ENABLED.key -> "true") {
      withParquetTable(data, "tbl") {
        checkSparkMaybeThrows(sql("select abs(_1), abs(_2) from tbl")) match {
          case (Some(sparkExc), Some(cometExc)) =>
            val cometErrorPattern =
              """.+[ARITHMETIC_OVERFLOW].+overflow. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error.""".r
            val sparkErrorPattern = ".*integer overflow.*".r
            assert(cometErrorPattern.findFirstIn(cometExc.getMessage).isDefined)
            assert(sparkErrorPattern.findFirstIn(sparkExc.getMessage).isDefined)
          case _ => fail("Exception should be thrown")
        }
      }
    }
  }

  test("abs Overflow legacy mode") {
    val data: Seq[(Int, Int)] = Seq((Int.MaxValue, Int.MinValue), (1, -1))
    withSQLConf(SQLConf.ANSI_ENABLED.key -> "false") {
      withParquetTable(data, "tbl") {
        checkSparkAnswerAndOperator("select abs(_1), abs(_2) from tbl")
      }
    }
  }

  test("ceil and floor") {
    Seq("true", "false").foreach { dictionary =>
      withSQLConf(
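
As background for these test inputs, a small standalone Rust illustration (added here for context, not part of the PR) of why Int.MinValue is the value that overflows abs, and why legacy mode can simply hand back the unchanged input:

    // |i32::MIN| = 2147483648 does not fit in a 32-bit signed integer,
    // so abs overflows on exactly this value.
    fn main() {
        assert_eq!(i32::MIN.checked_abs(), None);      // overflow is detected
        assert_eq!(i32::MIN.wrapping_abs(), i32::MIN); // wrapping leaves the value unchanged
        assert_eq!((-1i32).checked_abs(), Some(1));    // ordinary values are unaffected
    }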

Review comment: This seems to duplicate the same function from negative.rs. Perhaps that one could be moved so that it can be reused here.