feat: Add COMET_SHUFFLE_MODE config to control Comet shuffle mode #460

Merged: viirya merged 1 commit into apache:main from auto_shuffle_config on May 23, 2024

@viirya viirya commented May 22, 2024

Which issue does this PR close?

Closes #459.

Rationale for this change

What changes are included in this PR?

How are these changes tested?

@viirya viirya force-pushed the auto_shuffle_config branch 8 times, most recently from 8660c0c to 2bb0efd Compare May 23, 2024 04:20



`spark.comet.exec.shuffle.mode` to `auto` will let Comet choose the best shuffle mode based on the query plan.

👍

@viirya viirya force-pushed the auto_shuffle_config branch 2 times, most recently from 07c9ba2 to f3f46bd Compare May 23, 2024 14:43

viirya commented May 23, 2024

cc @sunchao

@sunchao sunchao left a comment

LGTM in general. How do we pick shuffle mode when it is auto? I don't seem to find the logic in this PR.

"By default, this config is 'jvm'.")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set("native", "jvm", "auto"))

👍
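
The validation in the snippet above (lowercase the value with `Locale.ROOT`, then check it against the allowed set) can be sketched in plain Scala without Comet's config builder. This is only an illustration: `ShuffleModeConf` and `parse` are hypothetical names, not part of `CometConf`, and the error message wording is an assumption.

```scala
import java.util.Locale

// Hypothetical stand-in for the config entry; not Comet's actual CometConf API.
object ShuffleModeConf {
  val Key = "spark.comet.exec.shuffle.mode"
  val Default = "jvm"
  val ValidModes: Set[String] = Set("native", "jvm", "auto")

  // Lowercase with Locale.ROOT so the result does not depend on the JVM's
  // default locale, then accept only the three known modes.
  def parse(raw: String): Either[String, String] = {
    val mode = raw.toLowerCase(Locale.ROOT)
    if (ValidModes.contains(mode)) Right(mode)
    else Left(s"$Key must be one of ${ValidModes.mkString(", ")}, but was '$raw'")
  }
}
```

With this sketch, `ShuffleModeConf.parse("AUTO")` is accepted as `auto`, while an unknown value such as `rust` is rejected.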

@@ -39,6 +38,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 |
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd |
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false |
| spark.comet.exec.shuffle.mode | The mode of Comet shuffle. This config is only effective only if Comet shuffle is enabled. Available modes are 'native', 'jvm', and 'auto'. 'native' is for native shuffle which has best performance in general.'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle.'auto' is for Comet to choose the best shuffle mode based on the query plan.By default, this config is 'jvm'. | jvm |

is only effective only -> is only effective

also spaces before jvm and auto
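
For readers who want to try the new setting, the table rows above suggest three settings that go together. The following is a rough sketch that only assembles them as `spark-submit` `--conf` arguments. The keys and the shuffle manager class name come from the table. The object name `CometShuffleConf`, the helper `toSubmitArgs`, and the choice of `auto` are illustrative assumptions, and any other settings needed to enable Comet itself are omitted.

```scala
// Illustrative only: collects the shuffle-related settings from the table above.
object CometShuffleConf {
  val settings: Seq[(String, String)] = Seq(
    // Must be set before the Spark application starts (see the table above).
    "spark.shuffle.manager" ->
      "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager",
    "spark.comet.exec.shuffle.enabled" -> "true",
    // One of 'native', 'jvm', or 'auto'; the default is 'jvm'.
    "spark.comet.exec.shuffle.mode" -> "auto"
  )

  // Render each key/value pair as a "--conf key=value" argument pair.
  def toSubmitArgs(conf: Seq[(String, String)]): Seq[String] =
    conf.flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
}
```

Calling `CometShuffleConf.toSubmitArgs(CometShuffleConf.settings)` yields the arguments in the order listed, ready to be placed on a `spark-submit` command line.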

-Seq(true, false).foreach { cometColumnShuffleEnabled =>
-withSQLConf(
-CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> cometColumnShuffleEnabled.toString) {
+Seq("native", "jvm").foreach { cometColumnShuffleEnabled =>

nit: maybe update the variable name too

Okay.

@@ -134,14 +134,14 @@ class CometExecSuite extends CometTestBase {
.toDF("c1", "c2")
.createOrReplaceTempView("v")

-Seq(true, false).foreach { columnarShuffle =>
+Seq("native", "jvm").foreach { columnarShuffle =>

nit: shuffleMode?
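
The test pattern under review (run the same body once per shuffle mode, with the config set only for the duration of the body) can be sketched without Spark's test utilities. This is a simplified stand-in: `ConfScope.withConf` is a hypothetical helper that mimics the set-and-restore behaviour of `withSQLConf` over a plain mutable map, and `ShuffleModeLoop` is an illustrative name. The loop variable uses the `shuffleMode` name suggested in the review.

```scala
import scala.collection.mutable

// Hypothetical helper: a tiny imitation of a scoped config override.
object ConfScope {
  val conf: mutable.Map[String, String] = mutable.Map.empty

  // Apply the overrides, run the body, then restore the previous values
  // even if the body throws.
  def withConf[T](pairs: (String, String)*)(body: => T): T = {
    val previous = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf(k) = v }
    try body
    finally previous.foreach {
      case (k, Some(old)) => conf(k) = old
      case (k, None) => conf.remove(k)
    }
  }
}

object ShuffleModeLoop {
  val ModeKey = "spark.comet.exec.shuffle.mode"

  // Run once per mode and record which mode the body observed.
  def observedModes(): Seq[String] =
    Seq("native", "jvm").map { shuffleMode =>
      ConfScope.withConf(ModeKey -> shuffleMode) {
        ConfScope.conf(ModeKey)
      }
    }
}
```

In the real suites the body would run a query and check the plan or result. Here it only reads the config back, to show that each iteration sees its own mode and that nothing leaks out of the scope.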


viirya commented May 23, 2024

> LGTM in general. How do we pick shuffle mode when it is auto? I don't seem to find the logic in this PR.

When the mode is auto, Comet chooses native shuffle whenever possible, since it shows better performance. When native shuffle is not available (unsupported cases), Comet uses JVM-based columnar shuffle instead.
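
The rule described in this reply can be written as a small decision function. This is a minimal sketch, not Comet's implementation: `AutoShuffleMode.choose` is a hypothetical name, and `nativeSupported` is an assumed flag standing in for whatever plan-level check decides whether native shuffle can handle a given shuffle.

```scala
// Minimal sketch of the 'auto' rule: prefer native, fall back to jvm.
object AutoShuffleMode {
  def choose(configured: String, nativeSupported: Boolean): String =
    configured match {
      // 'auto' picks native shuffle when it is supported, otherwise
      // JVM-based columnar shuffle.
      case "auto" => if (nativeSupported) "native" else "jvm"
      // Explicit modes are passed through unchanged in this sketch.
      case explicit => explicit
    }
}
```

For example, `AutoShuffleMode.choose("auto", nativeSupported = false)` resolves to `jvm`, while an explicit `native` or `jvm` setting is left as configured.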

@sunchao sunchao left a comment

LGTM


viirya commented May 23, 2024

Thank you @sunchao

@viirya viirya force-pushed the auto_shuffle_config branch from 678cd8c to 0c8f0f9 Compare May 23, 2024 18:50

viirya commented May 23, 2024

Error:  /Users/runner/work/datafusion-comet/datafusion-comet/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala:1215: value COMET_COLUMNAR_SHUFFLE_ENABLED is not a member of object org.apache.comet.CometConf

Weird. CI reports the above compilation error, but I don't see COMET_COLUMNAR_SHUFFLE_ENABLED in CometAggregateSuite locally...

@viirya viirya closed this May 23, 2024
@viirya viirya reopened this May 23, 2024

viirya commented May 23, 2024

Oh, it is from one patch just merged.

@viirya viirya force-pushed the auto_shuffle_config branch from 0c8f0f9 to 586e1a7 Compare May 23, 2024 20:03

@kazuyukitanimura kazuyukitanimura left a comment

LGTM pending CI

@viirya viirya merged commit 507e475 into apache:main May 23, 2024
40 checks passed
@viirya viirya deleted the auto_shuffle_config branch May 23, 2024 21:07

viirya commented May 23, 2024

Merged. Thanks @sunchao @kazuyukitanimura @andygrove

himadripal pushed a commit to himadripal/datafusion-comet that referenced this pull request Sep 7, 2024
Development

Successfully merging this pull request may close these issues.

Automatically pick best shuffle implementation for a query
4 participants