[SPARK-50372][CONNECT][SQL] Make all DF execution path collect observed metrics #48920
Conversation
- CollectMetrics(name, metrics.map(_.named), transformRelation(rel.getInput), planId)
+ CollectMetrics(name, metrics.map(_.named), input, planId)
This fixes a bug where the input of a `CollectMetrics` can be processed twice, once in Line 1190 and once here/below. When the input contains another `CollectMetrics`, transforming it twice causes two `Observation` objects (in the input) to be initialised and registered with the system twice. Since only one of them will be fulfilled when the query finishes, the one we end up looking at may not have any data. This issue is highlighted in the test case `Observation.get is blocked until the query is finished ...`, where we specifically execute `observedObservedDf`, a `CollectMetrics` that has another `CollectMetrics` as its input.
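For illustration, here is a minimal sketch of the shape that triggers the bug, assuming a `spark` session is in scope; the observation names and column expressions are made up, but `Dataset.observe` and `Observation` are the actual Spark APIs involved:

```scala
import org.apache.spark.sql.Observation
import org.apache.spark.sql.functions._

val inner = new Observation("inner")
val outer = new Observation("outer")

// One CollectMetrics nested inside another: the outer observe() wraps the
// plan produced by the inner observe().
val observedObservedDf = spark.range(100)
  .observe(inner, sum(col("id")).as("sum_id"))
  .observe(outer, count(lit(1)).as("rows"))

observedObservedDf.collect()

// Before the fix, transforming the nested CollectMetrics twice registered two
// Observation objects for the inner node; only one was ever fulfilled, so this
// call could block or come back without data.
inner.get
```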
Thanks for fixing! Mostly LGTM, minor ask
(Resolved review thread on connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala, now outdated.)
Looks good, but mind listing the affected APIs in the PR description?
Done!
What changes were proposed in this pull request?
This PR fixes an issue where some DataFrame execution paths would not process `ObservedMetrics`. The fix injects lazy processing logic into the result iterator (a sketch of this idea follows the list below). The following private execution APIs are affected by this issue:
SparkSession.execute(proto.Relation.Builder)
SparkSession.execute(proto.Command)
SparkSession.execute(proto.Plan)
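As a rough illustration of the approach (not the PR's actual classes; all names here are hypothetical), the wrapping idea looks like this:

```scala
// Hypothetical sketch: wrap the result iterator so a callback can inspect
// each response (e.g., to fulfil Observations from observed-metrics messages)
// lazily, as the caller consumes results.
class ObservedMetricsIterator[T](underlying: Iterator[T], onEach: T => Unit)
    extends Iterator[T] {
  override def hasNext: Boolean = underlying.hasNext
  override def next(): T = {
    val elem = underlying.next()
    onEach(elem) // process observed metrics as a side effect of consumption
    elem
  }
}
```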
The following user-facing API is affected by this issue:
DataFrame.write.format("...").mode("...").save()
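For illustration, a hedged sketch of this user-facing path; the output path and observation name are made up, but `Dataset.observe`, `Observation`, and the writer API are real Spark APIs:

```scala
import org.apache.spark.sql.Observation
import org.apache.spark.sql.functions._

val obs = new Observation("write_metrics")
val df = spark.range(1000).observe(obs, count(lit(1)).as("rows"))

// The write goes through a command execution path that previously skipped
// observed-metrics processing, so obs.get could block indefinitely.
df.write.format("parquet").mode("overwrite").save("/tmp/observed_output")

obs.get // with the fix, this returns the observed row count
```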
This PR also fixes a server-side issue where two observed metrics could be assigned the same Plan ID when they appear in the same plan (e.g., one observation is used as the input of another). The fix traverses the plan and finds all observations with their correct IDs.
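A minimal sketch of the traversal idea (not the PR's exact code), using Catalyst's tree API:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{CollectMetrics, LogicalPlan}

// Collect every CollectMetrics node in the plan, so each observation can be
// registered under its own plan ID instead of a single shared one.
def collectObservations(plan: LogicalPlan): Seq[CollectMetrics] =
  plan.collect { case cm: CollectMetrics => cm }
```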
Another bug was discovered as a byproduct of introducing a new test case; see the PR comment on SparkConnectPlanner.scala quoted in the conversation above.
Why are the changes needed?
To fix a bug.
Does this PR introduce any user-facing change?
Yes, this bug is user-facing.
How was this patch tested?
New tests.
Was this patch authored or co-authored using generative AI tooling?
No.