-
Notifications
You must be signed in to change notification settings - Fork 166
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: Make ANSI fallback more granular #509
Changes from all commits
84f6f70
5ab3ee0
e377e44
cccee7d
e0bddd6
338ba6d
54032df
97959a9
bf8ed7f
bc72ba2
e7d648d
60de09c
4d43485
992ad20
9688cc6
901a622
23f5996
9f30b4c
ba21817
fda7b30
1bb6d04
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.comet.expressions | ||
|
||
/** | ||
* We cannot reference Spark's EvalMode directly because the package is different between Spark | ||
* versions, so we copy it here. | ||
* | ||
* Expression evaluation modes. | ||
* - LEGACY: the default evaluation mode, which is compliant to Hive SQL. | ||
* - ANSI: an evaluation mode which is compliant with the ANSI SQL standard. | ||
* - TRY: an evaluation mode for `try_*` functions. It is identical to ANSI evaluation mode | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is |
||
* except for returning null result on errors. | ||
*/ | ||
object CometEvalMode extends Enumeration {
  // Mirror of Spark's EvalMode values (Spark's own enum lives in different
  // packages across versions, so it cannot be referenced directly).
  val LEGACY, ANSI, TRY = Value

  /**
   * Maps the `ansiEnabled` boolean used by Spark 3.2/3.3 `Cast` onto an eval mode.
   *
   * @param ansiEnabled whether ANSI mode is enabled
   * @return [[ANSI]] when enabled, [[LEGACY]] otherwise
   */
  def fromBoolean(ansiEnabled: Boolean): Value = if (ansiEnabled) {
    ANSI
  } else {
    LEGACY
  }

  /**
   * Parses an eval mode name (e.g. "LEGACY", "ANSI", "TRY").
   *
   * Normalizes case before lookup so that inputs such as "legacy" are accepted,
   * matching the behavior of the `stringToEvalMode` helper this replaces, which
   * upper-cased with `Locale.ROOT` before matching.
   *
   * @param str the eval mode name, in any case
   * @return the corresponding enumeration value
   * @throws NoSuchElementException if `str` is not one of the defined modes
   */
  def fromString(str: String): CometEvalMode.Value =
    CometEvalMode.withName(str.toUpperCase(java.util.Locale.ROOT))
}
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,8 +19,6 @@ | |
|
||
package org.apache.comet.serde | ||
|
||
import java.util.Locale | ||
|
||
import scala.collection.JavaConverters._ | ||
|
||
import org.apache.spark.internal.Logging | ||
|
@@ -45,7 +43,7 @@ import org.apache.spark.unsafe.types.UTF8String | |
|
||
import org.apache.comet.CometConf | ||
import org.apache.comet.CometSparkSessionExtensions.{isCometOperatorEnabled, isCometScan, isSpark32, isSpark34Plus, withInfo} | ||
import org.apache.comet.expressions.{CometCast, Compatible, Incompatible, Unsupported} | ||
import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible, Incompatible, Unsupported} | ||
import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => ProtoDataType, Expr, ScalarFunc} | ||
import org.apache.comet.serde.ExprOuterClass.DataType.{DataTypeInfo, DecimalInfo, ListInfo, MapInfo, StructInfo} | ||
import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, JoinType, Operator} | ||
|
@@ -578,6 +576,15 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim | |
} | ||
} | ||
|
||
/**
 * Translates a [[CometEvalMode]] value into the equivalent protobuf enum
 * for serialization to the native side.
 *
 * @throws IllegalStateException if the mode has no protobuf counterpart
 *         (guards against new CometEvalMode values being added without
 *         updating this mapping)
 */
def evalModeToProto(evalMode: CometEvalMode.Value): ExprOuterClass.EvalMode = {
  if (evalMode == CometEvalMode.LEGACY) {
    ExprOuterClass.EvalMode.LEGACY
  } else if (evalMode == CometEvalMode.ANSI) {
    ExprOuterClass.EvalMode.ANSI
  } else if (evalMode == CometEvalMode.TRY) {
    ExprOuterClass.EvalMode.TRY
  } else {
    throw new IllegalStateException(s"Invalid evalMode $evalMode")
  }
}
|
||
/** | ||
* Convert a Spark expression to protobuf. | ||
* | ||
|
@@ -590,18 +597,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim | |
* @return | ||
* The protobuf representation of the expression, or None if the expression is not supported | ||
*/ | ||
|
||
/**
 * Translates an eval mode name (in any case, e.g. "legacy" or "LEGACY") into
 * the protobuf enum for serialization to the native side.
 *
 * @throws IllegalArgumentException if the name is not a recognized eval mode
 */
def stringToEvalMode(evalModeStr: String): ExprOuterClass.EvalMode = {
  val normalized = evalModeStr.toUpperCase(Locale.ROOT)
  normalized match {
    case "ANSI" => ExprOuterClass.EvalMode.ANSI
    case "TRY" => ExprOuterClass.EvalMode.TRY
    case "LEGACY" => ExprOuterClass.EvalMode.LEGACY
    case invalid =>
      throw new IllegalArgumentException(
        s"Invalid eval mode '$invalid' "
      ) // Assuming we want to catch errors strictly
  }
}
|
||
def exprToProto( | ||
expr: Expression, | ||
input: Seq[Attribute], | ||
|
@@ -610,15 +605,14 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim | |
timeZoneId: Option[String], | ||
dt: DataType, | ||
childExpr: Option[Expr], | ||
evalMode: String): Option[Expr] = { | ||
evalMode: CometEvalMode.Value): Option[Expr] = { | ||
val dataType = serializeDataType(dt) | ||
val evalModeEnum = stringToEvalMode(evalMode) // Convert string to enum | ||
|
||
if (childExpr.isDefined && dataType.isDefined) { | ||
val castBuilder = ExprOuterClass.Cast.newBuilder() | ||
castBuilder.setChild(childExpr.get) | ||
castBuilder.setDatatype(dataType.get) | ||
castBuilder.setEvalMode(evalModeEnum) // Set the enum in protobuf | ||
castBuilder.setEvalMode(evalModeToProto(evalMode)) | ||
|
||
val timeZone = timeZoneId.getOrElse("UTC") | ||
castBuilder.setTimezone(timeZone) | ||
|
@@ -646,26 +640,26 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim | |
inputs: Seq[Attribute], | ||
dt: DataType, | ||
timeZoneId: Option[String], | ||
actualEvalModeStr: String): Option[Expr] = { | ||
evalMode: CometEvalMode.Value): Option[Expr] = { | ||
|
||
val childExpr = exprToProtoInternal(child, inputs) | ||
if (childExpr.isDefined) { | ||
val castSupport = | ||
CometCast.isSupported(child.dataType, dt, timeZoneId, actualEvalModeStr) | ||
CometCast.isSupported(child.dataType, dt, timeZoneId, evalMode) | ||
|
||
def getIncompatMessage(reason: Option[String]): String = | ||
"Comet does not guarantee correct results for cast " + | ||
s"from ${child.dataType} to $dt " + | ||
s"with timezone $timeZoneId and evalMode $actualEvalModeStr" + | ||
s"with timezone $timeZoneId and evalMode $evalMode" + | ||
reason.map(str => s" ($str)").getOrElse("") | ||
|
||
castSupport match { | ||
case Compatible(_) => | ||
castToProto(timeZoneId, dt, childExpr, actualEvalModeStr) | ||
castToProto(timeZoneId, dt, childExpr, evalMode) | ||
case Incompatible(reason) => | ||
if (CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.get()) { | ||
logWarning(getIncompatMessage(reason)) | ||
castToProto(timeZoneId, dt, childExpr, actualEvalModeStr) | ||
castToProto(timeZoneId, dt, childExpr, evalMode) | ||
} else { | ||
withInfo( | ||
expr, | ||
|
@@ -677,7 +671,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim | |
withInfo( | ||
expr, | ||
s"Unsupported cast from ${child.dataType} to $dt " + | ||
s"with timezone $timeZoneId and evalMode $actualEvalModeStr") | ||
s"with timezone $timeZoneId and evalMode $evalMode") | ||
None | ||
} | ||
} else { | ||
|
@@ -701,17 +695,14 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim | |
|
||
case UnaryExpression(child) if expr.prettyName == "trycast" => | ||
val timeZoneId = SQLConf.get.sessionLocalTimeZone | ||
handleCast(child, inputs, expr.dataType, Some(timeZoneId), "TRY") | ||
handleCast(child, inputs, expr.dataType, Some(timeZoneId), CometEvalMode.TRY) | ||
|
||
case Cast(child, dt, timeZoneId, evalMode) => | ||
val evalModeStr = if (evalMode.isInstanceOf[Boolean]) { | ||
// Spark 3.2 & 3.3 has ansiEnabled boolean | ||
if (evalMode.asInstanceOf[Boolean]) "ANSI" else "LEGACY" | ||
} else { | ||
// Spark 3.4+ has EvalMode enum with values LEGACY, ANSI, and TRY | ||
evalMode.toString | ||
} | ||
handleCast(child, inputs, dt, timeZoneId, evalModeStr) | ||
case c @ Cast(child, dt, timeZoneId, _) => | ||
handleCast(child, inputs, dt, timeZoneId, evalMode(c)) | ||
|
||
case expr if isUnsupportedAnsiExpr(expr) => | ||
withInfo(expr, "ANSI mode not supported") | ||
None | ||
|
||
case add @ Add(left, right, _) if supportedDataType(left.dataType) => | ||
val leftExpr = exprToProtoInternal(left, inputs) | ||
|
@@ -2006,7 +1997,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim | |
// TODO: Remove this once we have new DataFusion release which includes | ||
// the fix: https://github.com/apache/arrow-datafusion/pull/9459 | ||
if (childExpr.isDefined) { | ||
castToProto(None, a.dataType, childExpr, "LEGACY") | ||
castToProto(None, a.dataType, childExpr, CometEvalMode.LEGACY) | ||
} else { | ||
withInfo(expr, a.children: _*) | ||
None | ||
|
@@ -2207,6 +2198,22 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim | |
exprToProtoInternal(newExpr, input) | ||
} | ||
|
||
/** These expressions are not compatible with Spark in ANSI mode yet */ | ||
// Returns true when `expr` would evaluate in ANSI mode but Comet's native
// implementation is not yet verified to match Spark's ANSI semantics; callers
// use this to fall back to Spark execution.
// NOTE(review): each expression type is listed as its own case rather than a
// single `_: Add | _: Subtract | ...` alternation — presumably because
// `evalMode` is resolved via per-type shim overloads, so the static type of
// the scrutinee matters. Confirm against the shim before collapsing.
// NOTE(review): this list must be kept in sync with the set of expressions
// whose native implementations gain ANSI support — a missing entry here means
// an ANSI expression silently runs with non-ANSI native semantics.
private def isUnsupportedAnsiExpr(expr: Expression): Boolean = expr match { ||
case expr: Add => evalMode(expr) == CometEvalMode.ANSI | ||
case expr: Subtract => evalMode(expr) == CometEvalMode.ANSI | ||
case expr: Multiply => evalMode(expr) == CometEvalMode.ANSI | ||
case expr: Divide => evalMode(expr) == CometEvalMode.ANSI | ||
case expr: IntegralDivide => evalMode(expr) == CometEvalMode.ANSI | ||
case expr: Remainder => evalMode(expr) == CometEvalMode.ANSI | ||
case expr: Pmod => evalMode(expr) == CometEvalMode.ANSI | ||
case expr: Round => evalMode(expr) == CometEvalMode.ANSI | ||
case expr: BRound => evalMode(expr) == CometEvalMode.ANSI | ||
case expr: Sum => evalMode(expr) == CometEvalMode.ANSI | ||
case expr: Average => evalMode(expr) == CometEvalMode.ANSI | ||
// default: expression is not affected by ANSI mode (or not in the list yet)
case _ => false | ||
} | ||
|
||
def scalarExprToProtoWithReturnType( | ||
funcName: String, | ||
returnType: DataType, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For Spark 3.x, I feel we still need this protection in case users enable ANSI mode as well as native execution.
If I understand correctly we have not set up CI yet with ANSI enabled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This current check will make all plans fall back to Spark if ANSI is enabled, so no native plans will run, unless
COMET_ANSI_MODE_ENABLED
is enabled.The changes in this PR mean that we still have the same check but only if the plan actually contains any expressions that would be affected by enabling ANSI support and we still fall back to Spark by default unless
COMET_ANSI_MODE_ENABLED
is enabled.The main risk with this PR is if we don't have the complete list of expressions that support ANSI mode.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, we have the Spark 4 tests where ANSI is enabled and that is catching issues for sure.
I noticed that we were explicitly disabling ANSI mode in CometTestBase and I have removed that so we use the default ANSI mode for whatever Spark version we are running against. This should mean that all of our unit tests will now run with ANSI enabled when running against Spark 4+. Let's see how many issues this finds 😰
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be good to add a CI pipeline for Spark 3.4 with ANSI enabled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am close to finishing my PR to enable the Spark tests with Spark 4.0, i.e. with ANSI enabled, and I am disabling expressions that failed those tests with ANSI enabled.
I would propose to hold this part of change for now. Perhaps after we fix all Spark 4.0 issues, we can backport the learning to Spark 3.4
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. I have moved this to draft for now.