feat: Enable columnar shuffle by default #250
Conversation
Codecov Report

Attention: Patch coverage is

Additional details and impacted files

@@ Coverage Diff @@
##               main     #250      +/-   ##
============================================
+ Coverage     33.47%   33.50%   +0.03%
- Complexity      795      798       +3
============================================
  Files           110      110
  Lines         37533    37541       +8
  Branches       8215     8217       +2
============================================
+ Hits          12563    12579      +16
+ Misses        22322    22321       -1
+ Partials       2648     2641       -7

View full report in Codecov by Sentry.
Hmm, all tests in `TPCDSQuerySuite` pass locally with this PR. I need to look at the CI failure.
Force-pushed from be83771 to ef013c1.
Observed several Spark SQL test failures regarding aggregation: #260
@@ -1414,6 +1424,7 @@ index ed2e309fa07..4cfe0093da7 100644
+      .set("spark.shuffle.manager",
+        "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+      .set("spark.comet.exec.shuffle.enabled", "true")
+      .set("spark.comet.memoryOverhead", "10g")
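For readers reproducing this locally, the settings in the diff can be sketched as a standalone Spark session config. This is a hedged sketch, not the exact patch: it assumes the Comet jars are on the classpath, and the `local[2]` master is only for illustration; the class and config names come from the diff itself.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch of the test-harness settings from the diff above: route shuffles
// through Comet's shuffle manager, turn on columnar shuffle, and give Comet
// a larger memory overhead (the Spark SQL suites hit OOM with the default).
val conf = new SparkConf()
  .set("spark.shuffle.manager",
    "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
  .set("spark.comet.exec.shuffle.enabled", "true")
  .set("spark.comet.memoryOverhead", "10g")

val spark = SparkSession.builder().master("local[2]").config(conf).getOrCreate()
```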
Observed that Comet is unable to acquire enough memory for columnar shuffle when doing Spark SQL tests:
For example, DatasetPrimitiveSuite:
Cause: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 37.0 failed 1 times, most recent failure: Lost task 1.0 in stage 37.0 (TID 75) (e4773b5abe7e executor driver): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 67108848 bytes of memory, got 96 bytes. Available: 96
[info] at org.apache.spark.shuffle.comet.CometShuffleMemoryAllocator.allocate(CometShuffleMemoryAllocator.java:132)
[info] at org.apache.spark.shuffle.comet.CometShuffleMemoryAllocator.allocatePage(CometShuffleMemoryAllocator.java:119)
[info] at org.apache.spark.sql.comet.execution.shuffle.SpillWriter.initialCurrentPage(SpillWriter.java:158)
[info] at org.apache.spark.sql.comet.execution.shuffle.CometDiskBlockWriter.insertRow(CometDiskBlockWriter.java:284)
Increased Comet memoryOverhead to overcome it.
Force-pushed from 3e3daea to 77e5604, 77e5604 to e3d861c, 54da021 to 7465384, 7465384 to ace91fe, ace91fe to a6e2d16, a6e2d16 to b145499, dcac8f9 to 30043e4, e91bac1 to 9767acc, 9767acc to edfce1f, and 37c8186 to b37070d.
I fixed all Spark SQL test failures. Now waiting for #380 to be merged.
protected val aliasCandidateLimit: Int =
  conf.getConfString("spark.sql.optimizer.expressionProjectionCandidateLimit", "100").toInt
Some Spark tests tune this config, so we need to read the configured value instead of assuming the default.
@andygrove @sunchao Could you take a look and see if you have any comments on this? Thanks.
Co-authored-by: Andy Grove <[email protected]>
dev/diffs/3.4.2.diff (outdated)
assert(
-  collect(df.queryExecution.executedPlan) { case e: ShuffleExchangeExec => e }.size == expected)
+  collect(df.queryExecution.executedPlan) {
+    case _: ShuffleExchangeExec | _: CometShuffleExchangeExec => 1 }.size == expected)
Could we just check for ShuffleExchangeLike instead (and even push that change upstream)?
We could. For upstream, I'm not sure it would be accepted, but we can try.
I changed it to check ShuffleExchangeLike instead in the places where that is possible.
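The `ShuffleExchangeLike`-based check can be sketched as follows. This is illustrative rather than the exact patch: it assumes `ShuffleExchangeLike` is the Spark trait in `org.apache.spark.sql.execution.exchange` implemented by both `ShuffleExchangeExec` and `CometShuffleExchangeExec`, and that `collect`, `df`, and `expected` come from the surrounding test helper.

```scala
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike

// Matching the common trait covers both the vanilla and the Comet shuffle
// plan nodes with a single case, instead of enumerating concrete classes.
val shuffles = collect(df.queryExecution.executedPlan) {
  case s: ShuffleExchangeLike => s
}
assert(shuffles.size == expected)
```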
I am not familiar with all of the Spark tests in the patch, but the changes to Comet LGTM
Thank you @andygrove
Merged. Thanks @andygrove for the review.
* feat: Enable columnar shuffle by default
* Update plan stability
* Fix
* Update diff
* Add Comet memoryOverhead for Spark SQL tests
* Update plan stability
* Update diff
* Update more diff
* Update DataFusion commit
* Update diff
* Update diff
* Update diff
* Update diff
* Update diff
* Fix more tests
* Fix more
* Fix
* Fix more
* Fix more
* Fix more
* Fix more
* Fix more
* Update diff
* Fix memory leak
* Update plan stability
* Restore diff
* Update core/src/execution/datafusion/planner.rs (Co-authored-by: Andy Grove <[email protected]>)
* Update core/src/execution/datafusion/planner.rs (Co-authored-by: Andy Grove <[email protected]>)
* Fix style
* Use ShuffleExchangeLike instead

Co-authored-by: Andy Grove <[email protected]>
Which issue does this PR close?
Closes #95.
Rationale for this change
What changes are included in this PR?
How are these changes tested?