
Commit

code refactor
vaibhawvipul committed May 11, 2024
1 parent 2fdf148 commit 39d5ca1
Showing 1 changed file with 54 additions and 76 deletions.
130 changes: 54 additions & 76 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -604,6 +604,52 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim

def exprToProtoInternal(expr: Expression, inputs: Seq[Attribute]): Option[Expr] = {
SQLConf.get

def handleCast(
child: Expression,
inputs: Seq[Attribute],
dt: DataType,
timeZoneId: Option[String],
actualEvalModeStr: String): Option[Expr] = {

val childExpr = exprToProtoInternal(child, inputs)
if (childExpr.isDefined) {
val castSupport =
CometCast.isSupported(child.dataType, dt, timeZoneId, actualEvalModeStr)

def getIncompatMessage(reason: Option[String]): String =
s"Comet does not guarantee correct results for cast " +
s"from ${child.dataType} to $dt " +
s"with timezone $timeZoneId and evalMode $actualEvalModeStr" +
reason.map(str => s" ($str)").getOrElse("")

castSupport match {
case Compatible(_) =>
castToProto(timeZoneId, dt, childExpr, actualEvalModeStr)
case Incompatible(reason) =>
if (CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.get()) {
logWarning(getIncompatMessage(reason))
castToProto(timeZoneId, dt, childExpr, actualEvalModeStr)
} else {
withInfo(
expr,
s"${getIncompatMessage(reason)}. To enable all incompatible casts, set " +
s"${CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key}=true")
None
}
case Unsupported =>
withInfo(
expr,
s"Unsupported cast from ${child.dataType} to $dt " +
s"with timezone $timeZoneId and evalMode $actualEvalModeStr")
None
}
} else {
withInfo(expr, child)
None
}
}

expr match {
case a @ Alias(_, _) =>
val r = exprToProtoInternal(a.child, inputs)
@@ -617,87 +663,19 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
val value = cast.eval()
exprToProtoInternal(Literal(value, dataType), inputs)

// do the following only for spark 3.2 and 3.3
case TryCast(child, dt, timeZoneId) =>
val childExpr = exprToProtoInternal(child, inputs)
if (childExpr.isDefined) {
val castSupport = CometCast.isSupported(child.dataType, dt, timeZoneId, "TRY")

def getIncompatMessage(reason: Option[String]) =
"Comet does not guarantee correct results for cast " +
s"from ${child.dataType} to $dt " +
s"with timezone $timeZoneId and evalMode TRY" +
reason.map(str => s" ($str)").getOrElse("")

castSupport match {
case Compatible(_) =>
castToProto(timeZoneId, dt, childExpr, "TRY")
case Incompatible(reason) =>
if (CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.get()) {
logWarning(getIncompatMessage(reason))
castToProto(timeZoneId, dt, childExpr, "TRY")
} else {
withInfo(
expr,
s"${getIncompatMessage(reason)}. To enable all incompatible casts, set " +
s"${CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key}=true")
None
}
case Unsupported =>
withInfo(
expr,
s"Unsupported cast from ${child.dataType} to $dt " +
s"with timezone $timeZoneId and evalMode TRY")
None
}
} else {
withInfo(expr, child)
None
}
handleCast(child, inputs, dt, timeZoneId, "TRY")

case Cast(child, dt, timeZoneId, evalMode) =>
val childExpr = exprToProtoInternal(child, inputs)
if (childExpr.isDefined) {
val evalModeStr = if (evalMode.isInstanceOf[Boolean]) {
// Spark 3.2 & 3.3 has ansiEnabled boolean
if (evalMode.asInstanceOf[Boolean]) "ANSI" else "LEGACY"
} else {
// Spark 3.4+ has EvalMode enum with values LEGACY, ANSI, and TRY
evalMode.toString
}
val castSupport =
CometCast.isSupported(child.dataType, dt, timeZoneId, evalModeStr)

def getIncompatMessage(reason: Option[String]) =
"Comet does not guarantee correct results for cast " +
s"from ${child.dataType} to $dt " +
s"with timezone $timeZoneId and evalMode $evalModeStr" +
reason.map(str => s" ($str)").getOrElse("")

castSupport match {
case Compatible(_) =>
castToProto(timeZoneId, dt, childExpr, evalModeStr)
case Incompatible(reason) =>
if (CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.get()) {
logWarning(getIncompatMessage(reason))
castToProto(timeZoneId, dt, childExpr, evalModeStr)
} else {
withInfo(
expr,
s"${getIncompatMessage(reason)}. To enable all incompatible casts, set " +
s"${CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key}=true")
None
}
case Unsupported =>
withInfo(
expr,
s"Unsupported cast from ${child.dataType} to $dt " +
s"with timezone $timeZoneId and evalMode $evalModeStr")
None
}
} else {
withInfo(expr, child)
None
}
val evalModeStr = if (evalMode.isInstanceOf[Boolean]) {
// Spark 3.2 & 3.3 has ansiEnabled boolean
if (evalMode.asInstanceOf[Boolean]) "ANSI" else "LEGACY"
} else {
// Spark 3.4+ has EvalMode enum with values LEGACY, ANSI, and TRY
evalMode.toString
}
handleCast(child, inputs, dt, timeZoneId, evalModeStr)

case add @ Add(left, right, _) if supportedDataType(left.dataType) =>
val leftExpr = exprToProtoInternal(left, inputs)
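For context, here is a minimal, self-contained sketch of the pattern this commit applies: the cast-handling logic that was duplicated across the TryCast and Cast branches is folded into a single helper, and the Spark 3.2/3.3 `ansiEnabled` Boolean versus the Spark 3.4+ `EvalMode` enum is normalized to a string before the call. All names and types below (`CastSupport`, `isSupported`, `evalModeString`, the string-based "from"/"to" types) are illustrative stand-ins, not the actual Spark or Comet APIs.

```scala
// Minimal sketch only; the types and names are illustrative stand-ins,
// not the Spark/Comet APIs touched by this commit.
sealed trait CastSupport
case class Compatible(note: Option[String] = None) extends CastSupport
case class Incompatible(reason: Option[String] = None) extends CastSupport
case object Unsupported extends CastSupport

object CastRefactorSketch {

  // Hypothetical support check standing in for CometCast.isSupported.
  private def isSupported(from: String, to: String, evalModeStr: String): CastSupport =
    (from, to) match {
      case ("int", "long")         => Compatible()
      case ("string", "timestamp") => Incompatible(Some("timestamp parsing differs"))
      case _                       => Unsupported
    }

  // Single code path playing the role of handleCast: both the TryCast and the
  // Cast branches delegate here instead of repeating the match on CastSupport.
  def handleCast(from: String, to: String, evalModeStr: String): Option[String] =
    isSupported(from, to, evalModeStr) match {
      case Compatible(_) =>
        Some(s"cast($from -> $to, evalMode=$evalModeStr)")
      case Incompatible(reason) =>
        println(s"incompatible cast: ${reason.getOrElse("unknown reason")}")
        None
      case Unsupported =>
        println(s"unsupported cast from $from to $to with evalMode $evalModeStr")
        None
    }

  // Eval-mode normalisation mirroring the new Cast branch: Spark 3.2/3.3 carry a
  // Boolean ansiEnabled flag, Spark 3.4+ an EvalMode enum; both become a String.
  def evalModeString(evalMode: Any): String = evalMode match {
    case b: Boolean => if (b) "ANSI" else "LEGACY"
    case other      => other.toString
  }

  def main(args: Array[String]): Unit = {
    println(handleCast("int", "long", evalModeString(true)))          // Some(cast(int -> long, evalMode=ANSI))
    println(handleCast("string", "timestamp", evalModeString("TRY"))) // None, logs the incompatibility
  }
}
```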
