-
Notifications
You must be signed in to change notification settings - Fork 166
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Support Ansi mode in abs function #500
Changes from 21 commits
fce78fa
1071eee
750331d
9f89b57
e6eda86
0b37f8e
d1e2099
73e5513
cff5f29
9b3b4c8
708fffe
f7df357
76914b0
3b55ca2
aa92450
ab28bf6
0dda0b2
1fc4f48
828ab3b
fe2a003
dc3f2a8
3dff4bb
19969d6
6fb873a
bf64a24
b4df447
a72db13
809052d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
@@ -0,0 +1,87 @@ | ||||
// Licensed to the Apache Software Foundation (ASF) under one | ||||
// or more contributor license agreements. See the NOTICE file | ||||
// distributed with this work for additional information | ||||
// regarding copyright ownership. The ASF licenses this file | ||||
// to you under the Apache License, Version 2.0 (the | ||||
// "License"); you may not use this file except in compliance | ||||
// with the License. You may obtain a copy of the License at | ||||
// | ||||
// http://www.apache.org/licenses/LICENSE-2.0 | ||||
// | ||||
// Unless required by applicable law or agreed to in writing, | ||||
// software distributed under the License is distributed on an | ||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||
// KIND, either express or implied. See the License for the | ||||
// specific language governing permissions and limitations | ||||
// under the License. | ||||
|
||||
use arrow::datatypes::DataType; | ||||
use arrow_schema::ArrowError; | ||||
use datafusion::logical_expr::{ColumnarValue, ScalarUDFImpl, Signature}; | ||||
use datafusion_common::DataFusionError; | ||||
use datafusion_functions::math; | ||||
use std::{any::Any, sync::Arc}; | ||||
|
||||
use crate::errors::CometError; | ||||
|
||||
use super::EvalMode; | ||||
|
||||
fn arithmetic_overflow_error(from_type: &str) -> CometError { | ||||
CometError::ArithmeticOverflow { | ||||
from_type: from_type.to_string(), | ||||
} | ||||
} | ||||
|
||||
#[derive(Debug)] | ||||
pub struct CometAbsFunc { | ||||
inner_abs_func: Arc<dyn ScalarUDFImpl>, | ||||
eval_mode: EvalMode, | ||||
data_type_name: String, | ||||
} | ||||
|
||||
impl CometAbsFunc { | ||||
pub fn new(eval_mode: EvalMode, data_type_name: String) -> Self { | ||||
Self { | ||||
inner_abs_func: math::abs().inner(), | ||||
eval_mode, | ||||
data_type_name, | ||||
} | ||||
} | ||||
} | ||||
|
||||
impl ScalarUDFImpl for CometAbsFunc { | ||||
fn as_any(&self) -> &dyn Any { | ||||
self | ||||
} | ||||
fn name(&self) -> &str { | ||||
"abs" | ||||
} | ||||
|
||||
fn signature(&self) -> &Signature { | ||||
self.inner_abs_func.signature() | ||||
} | ||||
|
||||
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType, DataFusionError> { | ||||
self.inner_abs_func.return_type(arg_types) | ||||
} | ||||
|
||||
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> { | ||||
match self.inner_abs_func.invoke(args) { | ||||
Ok(result) => Ok(result), | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Technically there is no need to match on an
Suggested change
|
||||
Err(DataFusionError::ArrowError(ArrowError::ComputeError(msg), trace)) | ||||
if msg.contains("overflow") => | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be nice if Arrow/DataFusion threw a specific overflow error so that we didn't have to look for a string within the error message, but I guess that isn't available. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am going to file a feature request in DataFusion. I will post the link here later. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great! I understand that we have to wait for the next version of arror-rs to be released and integrate it here to be able to make the changes, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can continue with this and do a follow up once the arrow-rs change is available. Or you can wait; the arrow-rs community is very responsive. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably better to do it in a follow-up PR, I think. Thanks! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would help to log an issue to keep track There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Apparently my PR in Arrow will not be available for 3 months because it is an API change 😞 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When we upgrade to the version of the arrow with the improved overflow eror reporing then the tests in this PR will fail (because we are looking for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @parthchandra fyi ☝️ |
||||
{ | ||||
if self.eval_mode == EvalMode::Legacy { | ||||
Ok(args[0].clone()) | ||||
} else { | ||||
let msg = arithmetic_overflow_error(&self.data_type_name).to_string(); | ||||
Err(DataFusionError::ArrowError( | ||||
ArrowError::ComputeError(msg), | ||||
trace, | ||||
)) | ||||
} | ||||
} | ||||
other => other, | ||||
} | ||||
} | ||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,7 +24,6 @@ use datafusion::{ | |
arrow::{compute::SortOptions, datatypes::SchemaRef}, | ||
common::DataFusionError, | ||
execution::FunctionRegistry, | ||
functions::math, | ||
logical_expr::{ | ||
BuiltinScalarFunction, Operator as DataFusionOperator, ScalarFunctionDefinition, | ||
}, | ||
|
@@ -52,6 +51,7 @@ use datafusion_common::{ | |
tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter}, | ||
JoinType as DFJoinType, ScalarValue, | ||
}; | ||
use datafusion_physical_expr::udf::ScalarUDF; | ||
use itertools::Itertools; | ||
use jni::objects::GlobalRef; | ||
use num::{BigInt, ToPrimitive}; | ||
|
@@ -65,7 +65,7 @@ use crate::{ | |
avg_decimal::AvgDecimal, | ||
bitwise_not::BitwiseNotExpr, | ||
bloom_filter_might_contain::BloomFilterMightContain, | ||
cast::{Cast, EvalMode}, | ||
cast::Cast, | ||
checkoverflow::CheckOverflow, | ||
correlation::Correlation, | ||
covariance::Covariance, | ||
|
@@ -96,6 +96,8 @@ use crate::{ | |
}, | ||
}; | ||
|
||
use super::expressions::{abs::CometAbsFunc, EvalMode}; | ||
|
||
// For clippy error on type_complexity. | ||
type ExecResult<T> = Result<T, ExecutionError>; | ||
type PhyAggResult = Result<Vec<Arc<dyn AggregateExpr>>, ExecutionError>; | ||
|
@@ -492,7 +494,18 @@ impl PhysicalPlanner { | |
let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema.clone())?; | ||
let return_type = child.data_type(&input_schema)?; | ||
let args = vec![child]; | ||
let scalar_def = ScalarFunctionDefinition::UDF(math::abs()); | ||
let eval_mode = match spark_expression::EvalMode::try_from(expr.eval_mode)? { | ||
spark_expression::EvalMode::Legacy => EvalMode::Legacy, | ||
spark_expression::EvalMode::Ansi => EvalMode::Ansi, | ||
spark_expression::EvalMode::Try => { | ||
return Err(ExecutionError::GeneralError( | ||
"Invalid EvalMode: \"TRY\"".to_string(), | ||
)) | ||
} | ||
}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see that we have this code block duplicated for |
||
let comet_abs = | ||
ScalarUDF::new_from_impl(CometAbsFunc::new(eval_mode, return_type.to_string())); | ||
let scalar_def = ScalarFunctionDefinition::UDF(Arc::new(comet_abs)); | ||
|
||
let expr = | ||
ScalarFunctionExpr::new("abs", scalar_def, args, return_type, None, false); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -480,6 +480,7 @@ message BitwiseNot { | |
|
||
message Abs { | ||
Expr child = 1; | ||
EvalMode eval_mode = 2; | ||
} | ||
|
||
message Subquery { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1489,15 +1489,15 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim | |
None | ||
} | ||
|
||
case Abs(child, _) => | ||
case Abs(child, failOnErr) => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since we are already using failOnErr, can we simply use this boolean instead of evalmode struct? what are you thoughts? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi, thanks for your review! The intention why it is done this way is that in the Rust code the ansi mode is always treated the same way whether the expression supports the three ansi modes or only two. If not, we have to do a different treatment for both cases. |
||
val childExpr = exprToProtoInternal(child, inputs) | ||
if (childExpr.isDefined) { | ||
val abs = | ||
ExprOuterClass.Abs | ||
.newBuilder() | ||
.setChild(childExpr.get) | ||
.build() | ||
Some(Expr.newBuilder().setAbs(abs).build()) | ||
val evalModeStr = | ||
if (failOnErr) ExprOuterClass.EvalMode.ANSI else ExprOuterClass.EvalMode.LEGACY | ||
val absBuilder = ExprOuterClass.Abs.newBuilder() | ||
absBuilder.setChild(childExpr.get) | ||
absBuilder.setEvalMode(evalModeStr) | ||
Some(Expr.newBuilder().setAbs(absBuilder).build()) | ||
} else { | ||
withInfo(expr, child) | ||
None | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to duplicate the same function from
negative.rs
. Perhaps that one could be moved so that it can be reused here.