Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed May 5, 2024
1 parent 7aba5e7 commit 66e74e6
Showing 1 changed file with 32 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

import org.apache.comet.CometConf._
import org.apache.comet.CometSparkSessionExtensions.{createMessage, isANSIEnabled, isCometBroadCastForceEnabled, isCometColumnarShuffleEnabled, isCometEnabled, isCometExecEnabled, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, shouldApplyRowToColumnar, withInfo}
import org.apache.comet.CometSparkSessionExtensions.{createMessage, isANSIEnabled, isCometBroadCastForceEnabled, isCometColumnarShuffleEnabled, isCometEnabled, isCometExecEnabled, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, shouldApplyRowToColumnar, withInfo, withInfos}
import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde
Expand Down Expand Up @@ -120,7 +120,7 @@ class CometSparkSessionExtensions
val info3 = createMessage(
getPushedAggregate(scanExec.scan.asInstanceOf[ParquetScan]).isDefined,
"Comet does not support pushed aggregate")
withInfo(scanExec, Seq(info1, info2, info3).flatten.mkString("\n"))
withInfos(scanExec, Seq(info1, info2, info3).flatten.toSet)
scanExec

// Other datasource V2 scan
Expand All @@ -147,7 +147,7 @@ class CometSparkSessionExtensions
!isSchemaSupported(scanExec.scan.readSchema()),
"Comet extension is not enabled for " +
s"${scanExec.scan.getClass.getSimpleName}: Schema not supported")
withInfo(scanExec, Seq(info1, info2).flatten.mkString("\n"))
withInfos(scanExec, Seq(info1, info2).flatten.toSet)

// If it is data source V2 other than Parquet or Iceberg,
// attach the unsupported reason to the plan.
Expand Down Expand Up @@ -1042,12 +1042,36 @@ object CometSparkSessionExtensions extends Logging {
* The node with information (if any) attached
*/
def withInfo[T <: TreeNode[_]](node: T, info: String, exprs: T*): T = {
  // Support the existing approach of passing multiple infos in one newline-delimited
  // string. Option(...) maps a null `info` to None, and empty fragments (for example
  // from a leading or doubled newline) are dropped so that blank messages are never
  // attached to the node.
  val infoSet: Set[String] = Option(info)
    .map(_.split("\n").filter(_.nonEmpty).toSet)
    .getOrElse(Set.empty[String])
  // Delegate to withInfos, which also rolls up the information tagged on `exprs`
  withInfos(node, infoSet, exprs: _*)
}

/**
 * Attaches explain information to a TreeNode, rolling up the corresponding information tags
 * from any child nodes. For now, we are using this to attach the reasons why certain Spark
 * operators or expressions are disabled.
 *
 * Information that is already attached to `node` is preserved, so calling this method more
 * than once on the same node accumulates messages instead of replacing them.
 *
 * @param node
 *   The node to attach the explain information to. Typically a SparkPlan
 * @param info
 *   Information text. May contain zero or more strings. If empty, then only information
 *   from child nodes (and any information already on the node) will be included.
 * @param exprs
 *   Child nodes. Information attached in these nodes will be included in the information
 *   attached to `node`
 * @tparam T
 *   The type of the TreeNode. Typically SparkPlan, AggregateExpression, or Expression
 * @return
 *   The node with information (if any) attached
 */
def withInfos[T <: TreeNode[_]](node: T, info: Set[String], exprs: T*): T = {
  // Keep whatever was tagged on this node by an earlier call; setTagValue below would
  // otherwise overwrite it.
  val existingInfo = node
    .getTagValue(CometExplainInfo.EXTENSION_INFO)
    .getOrElse(Set.empty[String])
  // Roll up the information tagged on each child node
  val exprInfo = exprs.flatMap(_.getTagValue(CometExplainInfo.EXTENSION_INFO)).flatten.toSet
  node.setTagValue(CometExplainInfo.EXTENSION_INFO, existingInfo ++ exprInfo ++ info)
  node
}

Expand All @@ -1066,7 +1090,7 @@ object CometSparkSessionExtensions extends Logging {
* The node with information (if any) attached
*/
// Convenience overload with no message of its own: attaches to `node` only the
// information rolled up from `exprs`, by delegating to withInfos with an empty set.
def withInfo[T <: TreeNode[_]](node: T, exprs: T*): T = {
  withInfos(node, Set.empty[String], exprs: _*)
}

// Helper to reduce boilerplate
Expand Down

0 comments on commit 66e74e6

Please sign in to comment.