feat: Add extended explain info to Comet plan #255
Conversation
 * under the License.
 */

package org.apache.comet
Unrelated to this PR, but we should probably be using org.apache.datafusion.comet once the repositories get renamed?
Agree. But this won't be the only class needing that change. So perhaps better to wait until then and do one bulk update?
Yes, absolutely. The move will likely happen next week, so let's discuss after that.
val info1 = if (isSchemaSupported(requiredSchema)) {
  CometExplainInfo(s"Schema $requiredSchema is not supported")
} else {
  CometExplainInfo.none
}
We could use Scala's Option type here rather than defining our own none constant. For example:
val info1 = if (isSchemaSupported(requiredSchema)) {
  Some(CometExplainInfo(s"Schema $requiredSchema is not supported"))
} else {
  None
}
We would also need to update the call that passes the list of reasons to opWithInfo so that it uses flatten and only passes the valid reasons.
opWithInfo(scanExec, CometExplainInfo("SCAN", Seq(info1, info2, info3).flatten))
Good idea. I may have a cleaner way of doing this though, which would make this unnecessary. Give me a little time since I need to change a lot of places to try out the idea. If that doesn't work, I'll change this to use Scala's Option.
CometTakeOrderedAndProjectExec
  .isSupported(s)
  ._1 =>
// TODO: support offset for Spark 3.4
Do we need to file an issue to track this?
This got copied from some other branch so I've removed it. I'm not too sure what else we need to support for CometTakeOrderedAndProjectExec, but let me find out and log an issue.
case b: CometExec => b
case b: CometBroadcastExchangeExec => b
case b: CometShuffleExchangeExec => b
Very minor nit: we could combine these if we want to, and also avoid the extra variable b.
Suggested change:
case _: CometExec | _: CometBroadcastExchangeExec | _: CometShuffleExchangeExec => op
if (s.nonEmpty) {
  info = info :+ s
}
})
For a longer lambda expression, we can drop the parentheses (and use braces). Also, why do we have a map(t => t)?
sorted.foreach { p =>
  val s = getActualPlan(p).getTagValue(CometExplainInfo.EXTENSION_INFO).map(t => t).getOrElse("")
  if (s.nonEmpty) {
    info = info :+ s
  }
}
changed
case _: SparkPlan => traversed.enqueue(getActualPlan(c.asInstanceOf[SparkPlan]))
case _ =>
}
()
ditto, remove the unneeded parentheses.
Changed.
Looking at the CI failures; will address the comments as well.
Force-pushed from 07eb3df to 9456f6c.
@andygrove I changed the core of the implementation. Instead of setting information in a CometExplainInfo structure and bubbling it up in the plan, I now set the explain information as a Spark tag on the plan or expression. This makes the changes easier to implement as we add support for more operators and expressions.
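For readers unfamiliar with Spark's tag mechanism, here is a minimal sketch of the idea, assuming a string-valued tag; the object and helper names are illustrative, not the PR's exact code:

import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.execution.SparkPlan

object ExplainTagSketch {
  // The tag under which fallback reasons are stored on a plan node
  val EXTENSION_INFO: TreeNodeTag[String] = TreeNodeTag[String]("CometExtensionInfo")

  // Attach a reason to a plan node without restructuring the plan
  def tagWithReason(plan: SparkPlan, reason: String): SparkPlan = {
    plan.setTagValue(EXTENSION_INFO, reason)
    plan
  }

  // Read the reason back later, e.g. when rendering the extended explain output
  def reasonFor(plan: SparkPlan): Option[String] =
    plan.getTagValue(EXTENSION_INFO)
}

Because the tags ride along on the existing plan nodes, new operators and expressions can record a reason without any changes to the planning call chain.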
Did a quick review, this is neat work. Thanks.
IIUC, the required Spark feature is SPARK-47289? I think it would be helpful to put this link in the PR summary or commit message, so that users know which PR/patch to apply to their own 3.x Spark.
Query: q2 TPCH Snappy. Comet Exec: Enabled (CometSort, CometSortMergeJoin, CometProject, CometFilter)
Query: q2 TPCH Snappy: ExplainInfo:
ObjectHashAggregate is not supported
might_contain is not supported
Hmm, might_contain (org.apache.spark.sql.catalyst.expressions.BloomFilterMightContain) should already be supported since #179. Its child expression might be an XXHash64, which is not supported yet. I'm wondering if this output is a false positive? However, I cannot reproduce the might_contain case in my local 1GB TPCH runs.
Great catch! I was catching this but in FilterExec I was not propagating the correct error. Fixed.
BroadcastHashJoin is not supported
SortMergeJoin is not supported
BroadcastHashJoin and SortMergeJoin should also be supported? There should be some more specific reason why these are reported as not supported?
Another good catch! I reviewed everything and found a few more paths where the error was not being caught. Fixed.
@@ -98,6 +105,11 @@ trait CometTPCQueryListBase
    } else {
      out.println(s"Query: $name$nameSuffix. Comet Exec: Disabled")
    }
    if (supportsExtendedExplainInfo(df.queryExecution)) {
I think the if should be removed? Otherwise, running with vanilla Spark 3.x, the ExplainInfo is not printed. (P.S.: I ran the TPCH query and found no ExplainInfo with open source Spark 3.4.)
Good point. Done
var info1: Option[String] = None
if (isSchemaSupported(requiredSchema)) {
  info1 = Some(s"Schema $requiredSchema is not supported")
}
I wonder if we could add a helper method to reduce the boilerplate in each of these sections for creating the Option[String].
We could add a method like this:
def createMessage(condition: Boolean, message: => String): Option[String] = {
  if (condition) {
    Some(message)
  } else {
    None
  }
}
The call site could then be a one-liner:
val info1 = createMessage(isSchemaSupported(requiredSchema), s"Schema $requiredSchema is not supported")
What do you think?
Edit: I just updated the suggested code to use the pass-by-name message: => String syntax so that the message is evaluated lazily.
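A small sketch of why the by-name parameter matters (the expensive helper is hypothetical and only there to make the evaluation visible): the message expression is only evaluated when the condition holds, so nothing is built on the common supported path.

object LazyMessageDemo extends App {
  def createMessage(condition: Boolean, message: => String): Option[String] =
    if (condition) Some(message) else None

  // Deliberately noisy message builder, just to show when it runs
  def expensive(): String = { println("building message"); "op is not supported" }

  createMessage(condition = false, message = expensive()) // prints nothing: the message is never evaluated
  createMessage(condition = true, message = expensive())  // prints "building message" once
}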
Done.
 * @return
 *   The node with information (if any) attached
 */
def withInfo[T <: TreeNode[_]](node: T, info: String, exprs: T*): T = {
I love the idea of storing the metadata on the plan using Spark's tagging mechanism (we did the same thing in Spark RAPIDS).
Force-pushed from 59ac574 to f361846.
@advancedxy, @andygrove Addressed your comments. I was missing a few paths where the explanation was not being propagated correctly. I've fixed the ones I could find.
LGTM. Thanks @parthchandra
LGTM
Looking into the CI failures...
Force-pushed from 92ae83d to a8a228c.
@andygrove could you please trigger a re-run of the failed CI workflow? (Only committers can do that.) I cannot reproduce the failure locally and am pretty sure it has nothing to do with this PR.
Re-triggered.
Could we merge this please? @viirya @andygrove
  s"Schema $requiredSchema is not supported")
val info2 = createMessage(
  !isSchemaSupported(partitionSchema),
  s"Schema $partitionSchema is not supported")
Suggested change:
  s"Partition schema $partitionSchema is not supported")
Done
  info.distinct.mkString("\n").trim
}

private def getActualPlan(node: TreeNode[_]): TreeNode[_] = {
Actually this only works on SparkPlan; I'm wondering why we use TreeNode[_] in many places?
I used TreeNode because we can add the extended info tag to any TreeNode (SparkPlan, Expression, or AggregateExpression). This particular method only operates on SparkPlan, but I couldn't get the compiler to agree with me :( so I finally left it as a TreeNode.
  info.filter(!_.contentEquals("\n"))
}

// get all plan nodes, breadth first, leaf nodes first
breadth first and leaf nodes first seem conflicting?
Reversed breadth-first would be a more accurate description, I suppose. The traversal is breadth-first, but the order is reversed at the end.
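For concreteness, a sketch of what such a reversed breadth-first collection could look like; the names are illustrative, and the PR's implementation additionally unwraps nodes via getActualPlan:

import scala.collection.mutable
import org.apache.spark.sql.execution.SparkPlan

// Visit nodes breadth-first from the root, then reverse the visit order
// so that leaf nodes come before their parents in the returned sequence.
def nodesLeafFirst(root: SparkPlan): Seq[SparkPlan] = {
  val queue = mutable.Queue[SparkPlan](root)
  val visited = mutable.ArrayBuffer[SparkPlan]()
  while (queue.nonEmpty) {
    val node = queue.dequeue()
    visited += node
    node.children.foreach(child => queue.enqueue(child))
  }
  visited.reverse.toSeq
}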
def supportsExtendedExplainInfo(qe: QueryExecution): Boolean = {
  try {
    // Look for QueryExecution.extendedExplainInfo(scala.Function1[String, Unit], SparkPlan)
Can we add a clear reference here to when this is added to Spark, e.g., Spark xx+?
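For illustration, a reflective check along these lines (a sketch under the assumption that reflection is used to probe for the method; the PR's exact implementation may differ) is a natural place to record that reference:

import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}

// extendedExplainInfo(scala.Function1[String, Unit], SparkPlan) was added by SPARK-47289
// (Spark 4.0.0); on earlier releases the reflective lookup fails and we return false.
def supportsExtendedExplainInfo(qe: QueryExecution): Boolean = {
  try {
    qe.getClass.getMethod(
      "extendedExplainInfo",
      classOf[scala.Function1[String, Unit]],
      classOf[SparkPlan])
    true
  } catch {
    case _: NoSuchMethodException | _: SecurityException => false
  }
}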
/**
 * A trait for a session extension to implement that provides addition explain plan information.
 */
Suggested change:
/**
 * A trait for a session extension to implement that provides addition explain plan information.
 * We copy this from Spark 4.0 since this trait is not available in Spark 3.x. We can remove this
 * after dropping Spark 3.x support.
 */
Done
  "spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold" -> "1MB",
  "spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold" -> "1MB") {
Why do we need these bloomFilter configs?
This allows us to simulate the plan produced at larger loads. When we run this on a 1TB dataset, we get bloom filters enabled because the thresholds are met. However for smaller datasets, we need to lower the thresholds so that bloom filters are enabled.
Added this as a comment.
@@ -917,6 +1026,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde {
        .setStringSpace(builder)
        .build())
    } else {
      withInfo(expr, null, child)
Can we overload this method for this kind of usage, so we can just write withInfo(expr, child)? There are a lot of withInfo(expr, null, ...) calls. This can be in a follow-up PR.
Done. There are one or two cases where the second param is not null. But mostly it is null.
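For reference, the overload could be as simple as delegating to the existing withInfo(node, info, exprs*) with an empty message; this is a sketch, and the default used in the actual code may differ:

import org.apache.spark.sql.catalyst.trees.TreeNode

// Convenience overload for the common case where there is no message to attach,
// only child nodes whose info should be propagated to the parent.
def withInfo[T <: TreeNode[_]](node: T, exprs: T*): T =
  withInfo(node, "", exprs: _*)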
There is a conflict that needs to be resolved. And I have a few minor comments, like code comment changes.
BTW, you can close and re-open the PR to re-trigger the CI workflow. Or you can push an empty commit to re-trigger.
Requires Spark 4.0.0 for the explain info to be visible in Spark UI. (see: https://issues.apache.org/jira/browse/SPARK-47289)
Force-pushed from b909fa8 to d1b948b.
Thank you @parthchandra. I will merge this once CI passes.
Thanks for the review @viirya @andygrove @advancedxy
Merged. Thanks all.
* feat: Add extended explain info to Comet plan
  Requires Spark 4.0.0 for the explain info to be visible in Spark UI. (see: https://issues.apache.org/jira/browse/SPARK-47289)
* spotless apply
* Address review comments
* fix ci
* Add one more explanation
* address review comments
* fix formatting after rebase
Which issue does this PR close?
Closes #253
Rationale for this change
Adds additional planning information to a Spark plan, which allows us to track the reasons why a Spark plan was not fully converted to Comet.
What changes are included in this PR?
This PR adds a CometExplainInfo structure that is returned by every step of the planning process. Eventually the structure is attached to the Spark plan being returned by adding it as a Comet-specific tag.
How are these changes tested?
Additional unit test, and tested with all TPCH and TPCDS queries with both vanilla Spark 3.4 and Spark 3.4 with the extended plan support backported.
Also regenerated the CometTPC*QueriesList output for both TPCH and TPCDS run against a 1TB dataset.
Note: Since the extended plan support does not exist before Spark 4, the output of CometTPC*QueriesList was updated with a build of Spark 3.4 with the feature backported.