Skip to content

Commit

Permalink
For review
Browse files (browse the repository at this point in the history)
  • Loading branch information
viirya committed May 23, 2024
1 parent f3f46bd commit 678cd8c
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 22 deletions.
13 changes: 6 additions & 7 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,12 @@ object CometConf {
.createWithDefault(false)

val COMET_SHUFFLE_MODE: ConfigEntry[String] = conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.mode")
.doc(
"The mode of Comet shuffle. This config is only effective only if Comet shuffle " +
"is enabled. Available modes are 'native', 'jvm', and 'auto'. " +
"'native' is for native shuffle which has best performance in general." +
"'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle." +
"'auto' is for Comet to choose the best shuffle mode based on the query plan." +
"By default, this config is 'jvm'.")
.doc("The mode of Comet shuffle. This config is only effective if Comet shuffle " +
"is enabled. Available modes are 'native', 'jvm', and 'auto'. " +
"'native' is for native shuffle which has best performance in general. " +
"'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. " +
"'auto' is for Comet to choose the best shuffle mode based on the query plan. " +
"By default, this config is 'jvm'.")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set("native", "jvm", "auto"))
Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 |
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd |
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false |
| spark.comet.exec.shuffle.mode | The mode of Comet shuffle. This config is only effective only if Comet shuffle is enabled. Available modes are 'native', 'jvm', and 'auto'. 'native' is for native shuffle which has best performance in general.'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle.'auto' is for Comet to choose the best shuffle mode based on the query plan.By default, this config is 'jvm'. | jvm |
| spark.comet.exec.shuffle.mode | The mode of Comet shuffle. This config is only effective if Comet shuffle is enabled. Available modes are 'native', 'jvm', and 'auto'. 'native' is for native shuffle which has best performance in general. 'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. 'auto' is for Comet to choose the best shuffle mode based on the query plan. By default, this config is 'jvm'. | jvm |
| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. | false |
| spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. Default value is 0.2. | 0.2 |
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1016,8 +1016,8 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {

test("test bool_and/bool_or") {
withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
Seq("native", "jvm").foreach { cometColumnShuffleEnabled =>
withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometColumnShuffleEnabled.toString) {
Seq("native", "jvm").foreach { cometShuffleMode =>
withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometShuffleMode) {
Seq(true, false).foreach { dictionary =>
withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
val table = "test"
Expand Down Expand Up @@ -1091,8 +1091,8 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {

test("covar_pop and covar_samp") {
withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
Seq("native", "jvm").foreach { cometColumnShuffleEnabled =>
withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometColumnShuffleEnabled) {
Seq("native", "jvm").foreach { cometShuffleMode =>
withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometShuffleMode) {
Seq(true, false).foreach { dictionary =>
withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
val table = "test"
Expand Down Expand Up @@ -1129,8 +1129,8 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {

test("var_pop and var_samp") {
withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
Seq("native", "jvm").foreach { cometColumnShuffleEnabled =>
withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometColumnShuffleEnabled) {
Seq("native", "jvm").foreach { cometShuffleMode =>
withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometShuffleMode) {
Seq(true, false).foreach { dictionary =>
withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
Seq(true, false).foreach { nullOnDivideByZero =>
Expand Down Expand Up @@ -1168,8 +1168,8 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {

test("stddev_pop and stddev_samp") {
withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
Seq("native", "jvm").foreach { cometColumnShuffleEnabled =>
withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometColumnShuffleEnabled) {
Seq("native", "jvm").foreach { cometShuffleMode =>
withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometShuffleMode) {
Seq(true, false).foreach { dictionary =>
withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
Seq(true, false).foreach { nullOnDivideByZero =>
Expand Down
13 changes: 7 additions & 6 deletions spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,15 @@ class CometExecSuite extends CometTestBase {
.toDF("c1", "c2")
.createOrReplaceTempView("v")

Seq("native", "jvm").foreach { columnarShuffle =>
Seq("native", "jvm").foreach { columnarShuffleMode =>
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
CometConf.COMET_SHUFFLE_MODE.key -> columnarShuffle.toString) {
CometConf.COMET_SHUFFLE_MODE.key -> columnarShuffleMode) {
val df = sql("SELECT * FROM v where c1 = 1 order by c1, c2")
val shuffle = find(df.queryExecution.executedPlan) {
case _: CometShuffleExchangeExec if columnarShuffle.equalsIgnoreCase("jvm") => true
case _: ShuffleExchangeExec if !columnarShuffle.equalsIgnoreCase("jvm") => true
case _: CometShuffleExchangeExec if columnarShuffleMode.equalsIgnoreCase("jvm") =>
true
case _: ShuffleExchangeExec if !columnarShuffleMode.equalsIgnoreCase("jvm") => true
case _ => false
}.get
assert(shuffle.logicalLink.isEmpty)
Expand Down Expand Up @@ -764,10 +765,10 @@ class CometExecSuite extends CometTestBase {
}

test("limit") {
Seq("native", "jvm").foreach { columnarShuffle =>
Seq("native", "jvm").foreach { columnarShuffleMode =>
withSQLConf(
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_MODE.key -> columnarShuffle) {
CometConf.COMET_SHUFFLE_MODE.key -> columnarShuffleMode) {
withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_a") {
val df = sql("SELECT * FROM tbl_a")
.repartition(10, $"_1")
Expand Down

0 comments on commit 678cd8c

Please sign in to comment.