From 39d5ca1cd672e9ec7f7a4cef50a54f9806a1729d Mon Sep 17 00:00:00 2001 From: Vipul Vaibhaw Date: Sat, 11 May 2024 16:56:41 +0530 Subject: [PATCH] code refactor --- .../apache/comet/serde/QueryPlanSerde.scala | 130 ++++++++---------- 1 file changed, 54 insertions(+), 76 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 9aeea7aaa..6488db1ce 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -604,6 +604,52 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim def exprToProtoInternal(expr: Expression, inputs: Seq[Attribute]): Option[Expr] = { SQLConf.get + + def handleCast( + child: Expression, + inputs: Seq[Attribute], + dt: DataType, + timeZoneId: Option[String], + actualEvalModeStr: String): Option[Expr] = { + + val childExpr = exprToProtoInternal(child, inputs) + if (childExpr.isDefined) { + val castSupport = + CometCast.isSupported(child.dataType, dt, timeZoneId, actualEvalModeStr) + + def getIncompatMessage(reason: Option[String]): String = + s"Comet does not guarantee correct results for cast " + + s"from ${child.dataType} to $dt " + + s"with timezone $timeZoneId and evalMode $actualEvalModeStr" + + reason.map(str => s" ($str)").getOrElse("") + + castSupport match { + case Compatible(_) => + castToProto(timeZoneId, dt, childExpr, actualEvalModeStr) + case Incompatible(reason) => + if (CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.get()) { + logWarning(getIncompatMessage(reason)) + castToProto(timeZoneId, dt, childExpr, actualEvalModeStr) + } else { + withInfo( + expr, + s"${getIncompatMessage(reason)}. To enable all incompatible casts, set " + + s"${CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key}=true") + None + } + case Unsupported => + withInfo( + expr, + s"Unsupported cast from ${child.dataType} to $dt " + + s"with timezone $timeZoneId and evalMode $actualEvalModeStr") + None + } + } else { + withInfo(expr, child) + None + } + } + expr match { case a @ Alias(_, _) => val r = exprToProtoInternal(a.child, inputs) @@ -617,87 +663,19 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim val value = cast.eval() exprToProtoInternal(Literal(value, dataType), inputs) + // do the following only for spark 3.2 and 3.3 case TryCast(child, dt, timeZoneId) => - val childExpr = exprToProtoInternal(child, inputs) - if (childExpr.isDefined) { - val castSupport = CometCast.isSupported(child.dataType, dt, timeZoneId, "TRY") - - def getIncompatMessage(reason: Option[String]) = - "Comet does not guarantee correct results for cast " + - s"from ${child.dataType} to $dt " + - s"with timezone $timeZoneId and evalMode TRY" + - reason.map(str => s" ($str)").getOrElse("") - - castSupport match { - case Compatible(_) => - castToProto(timeZoneId, dt, childExpr, "TRY") - case Incompatible(reason) => - if (CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.get()) { - logWarning(getIncompatMessage(reason)) - castToProto(timeZoneId, dt, childExpr, "TRY") - } else { - withInfo( - expr, - s"${getIncompatMessage(reason)}. To enable all incompatible casts, set " + - s"${CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key}=true") - None - } - case Unsupported => - withInfo( - expr, - s"Unsupported cast from ${child.dataType} to $dt " + - s"with timezone $timeZoneId and evalMode TRY") - None - } - } else { - withInfo(expr, child) - None - } + handleCast(child, inputs, dt, timeZoneId, "TRY") case Cast(child, dt, timeZoneId, evalMode) => - val childExpr = exprToProtoInternal(child, inputs) - if (childExpr.isDefined) { - val evalModeStr = if (evalMode.isInstanceOf[Boolean]) { - // Spark 3.2 & 3.3 has ansiEnabled boolean - if (evalMode.asInstanceOf[Boolean]) "ANSI" else "LEGACY" - } else { - // Spark 3.4+ has EvalMode enum with values LEGACY, ANSI, and TRY - evalMode.toString - } - val castSupport = - CometCast.isSupported(child.dataType, dt, timeZoneId, evalModeStr) - - def getIncompatMessage(reason: Option[String]) = - "Comet does not guarantee correct results for cast " + - s"from ${child.dataType} to $dt " + - s"with timezone $timeZoneId and evalMode $evalModeStr" + - reason.map(str => s" ($str)").getOrElse("") - - castSupport match { - case Compatible(_) => - castToProto(timeZoneId, dt, childExpr, evalModeStr) - case Incompatible(reason) => - if (CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.get()) { - logWarning(getIncompatMessage(reason)) - castToProto(timeZoneId, dt, childExpr, evalModeStr) - } else { - withInfo( - expr, - s"${getIncompatMessage(reason)}. To enable all incompatible casts, set " + - s"${CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key}=true") - None - } - case Unsupported => - withInfo( - expr, - s"Unsupported cast from ${child.dataType} to $dt " + - s"with timezone $timeZoneId and evalMode $evalModeStr") - None - } + val evalModeStr = if (evalMode.isInstanceOf[Boolean]) { + // Spark 3.2 & 3.3 has ansiEnabled boolean + if (evalMode.asInstanceOf[Boolean]) "ANSI" else "LEGACY" } else { - withInfo(expr, child) - None + // Spark 3.4+ has EvalMode enum with values LEGACY, ANSI, and TRY + evalMode.toString } + handleCast(child, inputs, dt, timeZoneId, evalModeStr) case add @ Add(left, right, _) if supportedDataType(left.dataType) => val leftExpr = exprToProtoInternal(left, inputs)