From 66e74e6ad8e3fbe5133e3a49fb3e3d238b434876 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 5 May 2024 10:43:22 -0600 Subject: [PATCH] refactor --- .../comet/CometSparkSessionExtensions.scala | 40 +++++++++++++++---- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 04f36aa4c..ef55f81c4 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -46,7 +46,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.comet.CometConf._ -import org.apache.comet.CometSparkSessionExtensions.{createMessage, isANSIEnabled, isCometBroadCastForceEnabled, isCometColumnarShuffleEnabled, isCometEnabled, isCometExecEnabled, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, shouldApplyRowToColumnar, withInfo} +import org.apache.comet.CometSparkSessionExtensions.{createMessage, isANSIEnabled, isCometBroadCastForceEnabled, isCometColumnarShuffleEnabled, isCometEnabled, isCometExecEnabled, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, shouldApplyRowToColumnar, withInfo, withInfos} import org.apache.comet.parquet.{CometParquetScan, SupportsComet} import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.QueryPlanSerde @@ -120,7 +120,7 @@ class CometSparkSessionExtensions val info3 = createMessage( getPushedAggregate(scanExec.scan.asInstanceOf[ParquetScan]).isDefined, "Comet does not support pushed aggregate") - withInfo(scanExec, Seq(info1, info2, info3).flatten.mkString("\n")) + withInfos(scanExec, Seq(info1, info2, info3).flatten.toSet) scanExec // Other datasource V2 scan @@ -147,7 +147,7 @@ class CometSparkSessionExtensions !isSchemaSupported(scanExec.scan.readSchema()), "Comet extension is not enabled for " + s"${scanExec.scan.getClass.getSimpleName}: Schema not supported") - withInfo(scanExec, Seq(info1, info2).flatten.mkString("\n")) + withInfos(scanExec, Seq(info1, info2).flatten.toSet) // If it is data source V2 other than Parquet or Iceberg, // attach the unsupported reason to the plan. @@ -1042,12 +1042,36 @@ object CometSparkSessionExtensions extends Logging { * The node with information (if any) attached */ def withInfo[T <: TreeNode[_]](node: T, info: String, exprs: T*): T = { - val exprInfo = exprs.flatMap(_.getTagValue(CometExplainInfo.EXTENSION_INFO)).flatten.toSet - if (info != null && info.nonEmpty) { - node.setTagValue(CometExplainInfo.EXTENSION_INFO, exprInfo + info) + // support existing approach of passing in multiple infos in a newline-delimited string + val infoSet = if (info == null || info.isEmpty) { + Set.empty[String] } else { - node.setTagValue(CometExplainInfo.EXTENSION_INFO, exprInfo) + info.split("\n").toSet } + withInfos(node, infoSet, exprs: _*) + } + + /** + * Attaches explain information to a TreeNode, rolling up the corresponding information tags + * from any child nodes. For now, we are using this to attach the reasons why certain Spark + * operators or expressions are disabled. + * + * @param node + * The node to attach the explain information to. Typically a SparkPlan + * @param info + * Information text. May contain zero or more strings. If not provided, then only information + * from child nodes will be included. + * @param exprs + * Child nodes. Information attached in these nodes will be be included in the information + * attached to @node + * @tparam T + * The type of the TreeNode. Typically SparkPlan, AggregateExpression, or Expression + * @return + * The node with information (if any) attached + */ + def withInfos[T <: TreeNode[_]](node: T, info: Set[String], exprs: T*): T = { + val exprInfo = exprs.flatMap(_.getTagValue(CometExplainInfo.EXTENSION_INFO)).flatten.toSet + node.setTagValue(CometExplainInfo.EXTENSION_INFO, exprInfo ++ info) node } @@ -1066,7 +1090,7 @@ object CometSparkSessionExtensions extends Logging { * The node with information (if any) attached */ def withInfo[T <: TreeNode[_]](node: T, exprs: T*): T = { - withInfo(node, "", exprs: _*) + withInfos(node, Set.empty, exprs: _*) } // Helper to reduce boilerplate