Skip to content

Commit

Permalink
chore: Enable shuffle by default (apache#881)
Browse the repository at this point in the history
* enable shuffle by default

* disable shuffle in CometTestBase

* format

* fix regressions

* fix

* fix more

* fix more

* fix regression

* fix regressions

* Revert refactor

* format

* update docs

(cherry picked from commit be10fee)
  • Branch information:
andygrove authored and huaxingao committed Aug 29, 2024
1 parent 992b93c commit c39356d
Show file tree
Showing 5 changed files with 55 additions and 32 deletions.
5 changes: 3 additions & 2 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,13 @@ object CometConf extends ShimCometConf {
val COMET_EXEC_SHUFFLE_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.enabled")
.doc(
"Whether to enable Comet native shuffle. By default, this config is false. " +
"Whether to enable Comet native shuffle. " +
"Note that this requires setting 'spark.shuffle.manager' to " +
"'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. " +
"'spark.shuffle.manager' must be set before starting the Spark application and " +
"cannot be changed during the application.")
.booleanConf
.createWithDefault(false)
.createWithDefault(true)

val COMET_SHUFFLE_MODE: ConfigEntry[String] = conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.mode")
.doc("The mode of Comet shuffle. This config is only effective if Comet shuffle " +
Expand All @@ -198,6 +198,7 @@ object CometConf extends ShimCometConf {
"'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. " +
"'auto' is for Comet to choose the best shuffle mode based on the query plan. " +
"By default, this config is 'auto'.")
.internal()
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set("native", "jvm", "auto"))
Expand Down
3 changes: 1 addition & 2 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 |
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd |
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false |
| spark.comet.exec.shuffle.mode | The mode of Comet shuffle. This config is only effective if Comet shuffle is enabled. Available modes are 'native', 'jvm', and 'auto'. 'native' is for native shuffle which has best performance in general. 'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. 'auto' is for Comet to choose the best shuffle mode based on the query plan. By default, this config is 'auto'. | auto |
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true |
| spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |
| spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true |
| spark.comet.exec.stddev.enabled | Whether to enable stddev by default. stddev is slower than Spark's implementation. | true |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1706,6 +1706,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false",
EXTENDED_EXPLAIN_PROVIDERS_KEY -> "org.apache.comet.ExtendedExplainInfo") {
val table = "test"
withTable(table) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,11 +427,13 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
(-0.0.asInstanceOf[Float], 2),
(0.0.asInstanceOf[Float], 3),
(Float.NaN, 4))
withParquetTable(data, "tbl", dictionaryEnabled) {
checkSparkAnswer("SELECT SUM(_2), MIN(_2), MAX(_2), _1 FROM tbl GROUP BY _1")
checkSparkAnswer("SELECT MIN(_1), MAX(_1), MIN(_2), MAX(_2) FROM tbl")
checkSparkAnswer("SELECT AVG(_2), _1 FROM tbl GROUP BY _1")
checkSparkAnswer("SELECT AVG(_1), AVG(_2) FROM tbl")
withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false") {
withParquetTable(data, "tbl", dictionaryEnabled) {
checkSparkAnswer("SELECT SUM(_2), MIN(_2), MAX(_2), _1 FROM tbl GROUP BY _1")
checkSparkAnswer("SELECT MIN(_1), MAX(_1), MIN(_2), MAX(_2) FROM tbl")
checkSparkAnswer("SELECT AVG(_2), _1 FROM tbl GROUP BY _1")
checkSparkAnswer("SELECT AVG(_1), AVG(_2) FROM tbl")
}
}
}
}
Expand Down Expand Up @@ -582,15 +584,23 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
withParquetTable(path.toUri.toString, "tbl") {
withView("v") {
sql("CREATE TEMP VIEW v AS SELECT _g1, _g2, _3 FROM tbl ORDER BY _3")
checkSparkAnswer("SELECT _g1, _g2, FIRST(_3) FROM v GROUP BY _g1, _g2")
checkSparkAnswer("SELECT _g1, _g2, LAST(_3) FROM v GROUP BY _g1, _g2")
checkSparkAnswer(
"SELECT _g1, _g2, FIRST(_3) FROM v GROUP BY _g1, _g2 ORDER BY _g1, _g2")
checkSparkAnswer(
"SELECT _g1, _g2, LAST(_3) FROM v GROUP BY _g1, _g2 ORDER BY _g1, _g2")
}
checkSparkAnswer("SELECT _g1, _g2, SUM(_3) FROM tbl GROUP BY _g1, _g2")
checkSparkAnswer("SELECT _g1, _g2, COUNT(_3) FROM tbl GROUP BY _g1, _g2")
checkSparkAnswer("SELECT _g1, _g2, SUM(DISTINCT _3) FROM tbl GROUP BY _g1, _g2")
checkSparkAnswer("SELECT _g1, _g2, COUNT(DISTINCT _3) FROM tbl GROUP BY _g1, _g2")
checkSparkAnswer("SELECT _g1, _g2, MIN(_3), MAX(_3) FROM tbl GROUP BY _g1, _g2")
checkSparkAnswer("SELECT _g1, _g2, AVG(_3) FROM tbl GROUP BY _g1, _g2")
checkSparkAnswer(
"SELECT _g1, _g2, SUM(_3) FROM tbl GROUP BY _g1, _g2 ORDER BY _g1, _g2")
checkSparkAnswer(
"SELECT _g1, _g2, COUNT(_3) FROM tbl GROUP BY _g1, _g2 ORDER BY _g1, _g2")
checkSparkAnswer(
"SELECT _g1, _g2, SUM(DISTINCT _3) FROM tbl GROUP BY _g1, _g2 ORDER BY _g1, _g2")
checkSparkAnswer(
"SELECT _g1, _g2, COUNT(DISTINCT _3) FROM tbl GROUP BY _g1, _g2 ORDER BY _g1, _g2")
checkSparkAnswer(
"SELECT _g1, _g2, MIN(_3), MAX(_3) FROM tbl GROUP BY _g1, _g2 ORDER BY _g1, _g2")
checkSparkAnswer(
"SELECT _g1, _g2, AVG(_3) FROM tbl GROUP BY _g1, _g2 ORDER BY _g1, _g2")
}
}
}
Expand All @@ -603,7 +613,9 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
withTable("t") {
sql("CREATE TABLE t(v VARCHAR(3), i INT) USING PARQUET")
sql("INSERT INTO t VALUES ('c', 1)")
checkSparkAnswerAndNumOfAggregates("SELECT v, sum(i) FROM t GROUP BY v ORDER BY v", 1)
withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false") {
checkSparkAnswerAndNumOfAggregates("SELECT v, sum(i) FROM t GROUP BY v ORDER BY v", 1)
}
}
}

Expand All @@ -623,19 +635,22 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
withView("v") {
sql("CREATE TEMP VIEW v AS SELECT _g3, _g4, _3, _4 FROM tbl ORDER BY _3, _4")
checkSparkAnswer(
"SELECT _g3, _g4, FIRST(_3), FIRST(_4) FROM v GROUP BY _g3, _g4")
checkSparkAnswer("SELECT _g3, _g4, LAST(_3), LAST(_4) FROM v GROUP BY _g3, _g4")
"SELECT _g3, _g4, FIRST(_3), FIRST(_4) FROM v GROUP BY _g3, _g4 ORDER BY _g3, _g4")
checkSparkAnswer(
"SELECT _g3, _g4, LAST(_3), LAST(_4) FROM v GROUP BY _g3, _g4 ORDER BY _g3, _g4")
}
checkSparkAnswer("SELECT _g3, _g4, SUM(_3), SUM(_4) FROM tbl GROUP BY _g3, _g4")
checkSparkAnswer(
"SELECT _g3, _g4, SUM(DISTINCT _3), SUM(DISTINCT _4) FROM tbl GROUP BY _g3, _g4")
"SELECT _g3, _g4, SUM(_3), SUM(_4) FROM tbl GROUP BY _g3, _g4 ORDER BY _g3, _g4")
checkSparkAnswer(
"SELECT _g3, _g4, SUM(DISTINCT _3), SUM(DISTINCT _4) FROM tbl GROUP BY _g3, _g4 ORDER BY _g3, _g4")
checkSparkAnswer(
"SELECT _g3, _g4, COUNT(_3), COUNT(_4) FROM tbl GROUP BY _g3, _g4")
"SELECT _g3, _g4, COUNT(_3), COUNT(_4) FROM tbl GROUP BY _g3, _g4 ORDER BY _g3, _g4")
checkSparkAnswer(
"SELECT _g3, _g4, COUNT(DISTINCT _3), COUNT(DISTINCT _4) FROM tbl GROUP BY _g3, _g4")
"SELECT _g3, _g4, COUNT(DISTINCT _3), COUNT(DISTINCT _4) FROM tbl GROUP BY _g3, _g4 ORDER BY _g3, _g4")
checkSparkAnswer(
"SELECT _g3, _g4, MIN(_3), MAX(_3), MIN(_4), MAX(_4) FROM tbl GROUP BY _g3, _g4")
checkSparkAnswer("SELECT _g3, _g4, AVG(_3), AVG(_4) FROM tbl GROUP BY _g3, _g4")
"SELECT _g3, _g4, MIN(_3), MAX(_3), MIN(_4), MAX(_4) FROM tbl GROUP BY _g3, _g4 ORDER BY _g3, _g4")
checkSparkAnswer(
"SELECT _g3, _g4, AVG(_3), AVG(_4) FROM tbl GROUP BY _g3, _g4 ORDER BY _g3, _g4")
}
}
}
Expand All @@ -654,21 +669,24 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
makeParquetFile(path, numValues, numGroups, dictionaryEnabled)
withParquetTable(path.toUri.toString, "tbl") {
Seq(128, numValues + 100).foreach { batchSize =>
withSQLConf(CometConf.COMET_BATCH_SIZE.key -> batchSize.toString) {
withSQLConf(
CometConf.COMET_BATCH_SIZE.key -> batchSize.toString,
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false") {

// Test all combinations of different aggregation & group-by types
(1 to 14).foreach { gCol =>
withView("v") {
sql(s"CREATE TEMP VIEW v AS SELECT _g$gCol, _1, _2, _3, _4 " +
"FROM tbl ORDER BY _1, _2, _3, _4")
checkSparkAnswer(s"SELECT _g$gCol, FIRST(_1), FIRST(_2), FIRST(_3), " +
s"FIRST(_4), LAST(_1), LAST(_2), LAST(_3), LAST(_4) FROM v GROUP BY _g$gCol")
s"FIRST(_4), LAST(_1), LAST(_2), LAST(_3), LAST(_4) FROM v GROUP BY _g$gCol ORDER BY _g$gCol")
}
checkSparkAnswer(s"SELECT _g$gCol, SUM(_1), SUM(_2), COUNT(_3), COUNT(_4), " +
s"MIN(_1), MAX(_4), AVG(_2), AVG(_4) FROM tbl GROUP BY _g$gCol")
checkSparkAnswer(s"SELECT _g$gCol, SUM(DISTINCT _3) FROM tbl GROUP BY _g$gCol")
s"MIN(_1), MAX(_4), AVG(_2), AVG(_4) FROM tbl GROUP BY _g$gCol ORDER BY _g$gCol")
checkSparkAnswer(
s"SELECT _g$gCol, SUM(DISTINCT _3) FROM tbl GROUP BY _g$gCol ORDER BY _g$gCol")
checkSparkAnswer(
s"SELECT _g$gCol, COUNT(DISTINCT _1) FROM tbl GROUP BY _g$gCol")
s"SELECT _g$gCol, COUNT(DISTINCT _1) FROM tbl GROUP BY _g$gCol ORDER BY _g$gCol")
}
}
}
Expand Down Expand Up @@ -837,7 +855,9 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
(0 until 5).map(i => (i.toDouble, i.toDouble % 2)),
"tbl",
dictionaryEnabled) {
checkSparkAnswerAndNumOfAggregates("SELECT _2 , AVG(_1) FROM tbl GROUP BY _2", 1)
withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false") {
checkSparkAnswerAndNumOfAggregates("SELECT _2 , AVG(_1) FROM tbl GROUP BY _2", 1)
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ abstract class CometTestBase
conf.set(MEMORY_OFFHEAP_SIZE.key, "2g")
conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "1g")
conf.set(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key, "1g")
// TODO we should no longer be disabling COALESCE_PARTITIONS_ENABLED
conf.set(SQLConf.COALESCE_PARTITIONS_ENABLED.key, "false")
conf.set(CometConf.COMET_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g")
conf
Expand Down

0 comments on commit c39356d

Please sign in to comment.