Skip to content

Commit

Permalink
feat: Add COMET_SHUFFLE_MODE config to control Comet shuffle mode
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed May 23, 2024
1 parent 7b0a7e0 commit f3f46bd
Show file tree
Hide file tree
Showing 13 changed files with 117 additions and 99 deletions.
21 changes: 13 additions & 8 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.comet

import java.util.Locale
import java.util.concurrent.TimeUnit

import scala.collection.mutable.ListBuffer
Expand Down Expand Up @@ -131,14 +132,18 @@ object CometConf {
.booleanConf
.createWithDefault(false)

val COMET_COLUMNAR_SHUFFLE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.columnar.shuffle.enabled")
.doc(
"Whether to enable Arrow-based columnar shuffle for Comet and Spark regular operators. " +
"If this is enabled, Comet prefers columnar shuffle than native shuffle. " +
"By default, this config is true.")
.booleanConf
.createWithDefault(true)
val COMET_SHUFFLE_MODE: ConfigEntry[String] = conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.mode")
.doc(
"The mode of Comet shuffle. This config is only effective only if Comet shuffle " +
"is enabled. Available modes are 'native', 'jvm', and 'auto'. " +
"'native' is for native shuffle which has best performance in general." +
"'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle." +
"'auto' is for Comet to choose the best shuffle mode based on the query plan." +
"By default, this config is 'jvm'.")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set("native", "jvm", "auto"))
.createWithDefault("jvm")

val COMET_SHUFFLE_ENFORCE_MODE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.shuffle.enforceMode.enabled")
Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ Comet provides the following configuration settings.
| spark.comet.columnar.shuffle.async.enabled | Whether to enable asynchronous shuffle for Arrow-based shuffle. By default, this config is false. | false |
| spark.comet.columnar.shuffle.async.max.thread.num | Maximum number of threads on an executor used for Comet async columnar shuffle. By default, this config is 100. This is the upper bound of total number of shuffle threads per executor. In other words, if the number of cores * the number of shuffle threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than this config. Comet will use this config as the number of shuffle threads per executor instead. | 100 |
| spark.comet.columnar.shuffle.async.thread.num | Number of threads used for Comet async columnar shuffle per shuffle task. By default, this config is 3. Note that more threads means more memory requirement to buffer shuffle data before flushing to disk. Also, more threads may not always improve performance, and should be set based on the number of cores available. | 3 |
| spark.comet.columnar.shuffle.enabled | Whether to enable Arrow-based columnar shuffle for Comet and Spark regular operators. If this is enabled, Comet prefers columnar shuffle than native shuffle. By default, this config is true. | true |
| spark.comet.columnar.shuffle.memory.factor | Fraction of Comet memory to be allocated per executor process for Comet shuffle. Comet memory size is specified by `spark.comet.memoryOverhead` or calculated by `spark.comet.memory.overhead.factor` * `spark.executor.memory`. By default, this config is 1.0. | 1.0 |
| spark.comet.debug.enabled | Whether to enable debug mode for Comet. By default, this config is false. When enabled, Comet will do additional checks for debugging purpose. For example, validating array when importing arrays from JVM at native side. Note that these checks may be expensive in performance and should only be enabled for debugging purpose. | false |
| spark.comet.enabled | Whether to enable Comet extension for Spark. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is the value of the env var `ENABLE_COMET` if set, or true otherwise. | true |
Expand All @@ -39,6 +38,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 |
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd |
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false |
| spark.comet.exec.shuffle.mode | The mode of Comet shuffle. This config is only effective only if Comet shuffle is enabled. Available modes are 'native', 'jvm', and 'auto'. 'native' is for native shuffle which has best performance in general.'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle.'auto' is for Comet to choose the best shuffle mode based on the query plan.By default, this config is 'jvm'. | jvm |
| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. | false |
| spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. Default value is 0.2. | 0.2 |
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
Expand Down
22 changes: 13 additions & 9 deletions docs/source/user-guide/tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,26 @@ It must be set before the Spark context is created. You can enable or disable Co
at runtime by setting `spark.comet.exec.shuffle.enabled` to `true` or `false`.
Once it is disabled, Comet will fallback to the default Spark shuffle manager.

### Columnar Shuffle
### Shuffle Mode

By default, once `spark.comet.exec.shuffle.enabled` is enabled, Comet uses columnar shuffle
Comet provides three shuffle modes: Columnar Shuffle, Native Shuffle and Auto Mode.

#### Columnar Shuffle

By default, once `spark.comet.exec.shuffle.enabled` is enabled, Comet uses JVM-based columnar shuffle
to improve the performance of shuffle operations. Columnar shuffle supports HashPartitioning,
RoundRobinPartitioning, RangePartitioning and SinglePartitioning.
RoundRobinPartitioning, RangePartitioning and SinglePartitioning. This mode has the highest
query coverage.

Columnar shuffle can be disabled by setting `spark.comet.columnar.shuffle.enabled` to `false`.
Columnar shuffle can be enabled by setting `spark.comet.exec.shuffle.mode` to `jvm`.

### Native Shuffle
#### Native Shuffle

Comet also provides a fully native shuffle implementation that can be used to improve the performance.
To enable native shuffle, just disable `spark.comet.columnar.shuffle.enabled`.
To enable native shuffle, just set `spark.comet.exec.shuffle.mode` to `native`

Native shuffle only supports HashPartitioning and SinglePartitioning.

### Auto Mode




`spark.comet.exec.shuffle.mode` to `auto` will let Comet choose the best shuffle mode based on the query plan.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

import org.apache.comet.CometConf._
import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometColumnarShuffleEnabled, isCometEnabled, isCometExecEnabled, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, isSpark34Plus, shouldApplyRowToColumnar, withInfo, withInfos}
import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, isSpark34Plus, shouldApplyRowToColumnar, withInfo, withInfos}
import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde
Expand Down Expand Up @@ -197,7 +197,7 @@ class CometSparkSessionExtensions
private def applyCometShuffle(plan: SparkPlan): SparkPlan = {
plan.transformUp {
case s: ShuffleExchangeExec
if isCometPlan(s.child) && !isCometColumnarShuffleEnabled(conf) &&
if isCometPlan(s.child) && isCometNativeShuffleMode(conf) &&
QueryPlanSerde.supportPartitioning(s.child.output, s.outputPartitioning)._1 =>
logInfo("Comet extension enabled for Native Shuffle")

Expand All @@ -209,8 +209,8 @@ class CometSparkSessionExtensions
// Columnar shuffle for regular Spark operators (not Comet) and Comet operators
// (if configured)
case s: ShuffleExchangeExec
if (!s.child.supportsColumnar || isCometPlan(
s.child)) && isCometColumnarShuffleEnabled(conf) &&
if (!s.child.supportsColumnar || isCometPlan(s.child)) && isCometJVMShuffleMode(
conf) &&
QueryPlanSerde.supportPartitioningTypes(s.child.output)._1 &&
!isShuffleOperator(s.child) =>
logInfo("Comet extension enabled for JVM Columnar Shuffle")
Expand Down Expand Up @@ -641,7 +641,7 @@ class CometSparkSessionExtensions
// Native shuffle for Comet operators
case s: ShuffleExchangeExec
if isCometShuffleEnabled(conf) &&
!isCometColumnarShuffleEnabled(conf) &&
isCometNativeShuffleMode(conf) &&
QueryPlanSerde.supportPartitioning(s.child.output, s.outputPartitioning)._1 =>
logInfo("Comet extension enabled for Native Shuffle")

Expand All @@ -662,7 +662,7 @@ class CometSparkSessionExtensions
// If the child of ShuffleExchangeExec is also a ShuffleExchangeExec, we should not
// convert it to CometColumnarShuffle,
case s: ShuffleExchangeExec
if isCometShuffleEnabled(conf) && isCometColumnarShuffleEnabled(conf) &&
if isCometShuffleEnabled(conf) && isCometJVMShuffleMode(conf) &&
QueryPlanSerde.supportPartitioningTypes(s.child.output)._1 &&
!isShuffleOperator(s.child) =>
logInfo("Comet extension enabled for JVM Columnar Shuffle")
Expand All @@ -684,19 +684,19 @@ class CometSparkSessionExtensions
case s: ShuffleExchangeExec =>
val isShuffleEnabled = isCometShuffleEnabled(conf)
val reason = getCometShuffleNotEnabledReason(conf).getOrElse("no reason available")
val msg1 = createMessage(!isShuffleEnabled, s"Native shuffle is not enabled: $reason")
val columnarShuffleEnabled = isCometColumnarShuffleEnabled(conf)
val msg1 = createMessage(!isShuffleEnabled, s"Comet shuffle is not enabled: $reason")
val columnarShuffleEnabled = isCometJVMShuffleMode(conf)
val msg2 = createMessage(
isShuffleEnabled && !columnarShuffleEnabled && !QueryPlanSerde
.supportPartitioning(s.child.output, s.outputPartitioning)
._1,
"Shuffle: " +
"Native shuffle: " +
s"${QueryPlanSerde.supportPartitioning(s.child.output, s.outputPartitioning)._2}")
val msg3 = createMessage(
isShuffleEnabled && columnarShuffleEnabled && !QueryPlanSerde
.supportPartitioningTypes(s.child.output)
._1,
s"Columnar shuffle: ${QueryPlanSerde.supportPartitioningTypes(s.child.output)._2}")
s"JVM shuffle: ${QueryPlanSerde.supportPartitioningTypes(s.child.output)._2}")
withInfo(s, Seq(msg1, msg2, msg3).flatten.mkString(","))
s

Expand Down Expand Up @@ -966,8 +966,20 @@ object CometSparkSessionExtensions extends Logging {
COMET_EXEC_ENABLED.get(conf)
}

private[comet] def isCometColumnarShuffleEnabled(conf: SQLConf): Boolean = {
COMET_COLUMNAR_SHUFFLE_ENABLED.get(conf)
private[comet] def isCometNativeShuffleMode(conf: SQLConf): Boolean = {
COMET_SHUFFLE_MODE.get(conf) match {
case "native" => true
case "auto" => true
case _ => false
}
}

private[comet] def isCometJVMShuffleMode(conf: SQLConf): Boolean = {
COMET_SHUFFLE_MODE.get(conf) match {
case "jvm" => true
case "auto" => true
case _ => false
}
}

private[comet] def isCometAllOperatorEnabled(conf: SQLConf): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1421,15 +1421,15 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
(
s"SELECT sum(c0), sum(c2) from $table group by c1",
Set(
"Native shuffle is not enabled: spark.comet.exec.shuffle.enabled is not enabled",
"Comet shuffle is not enabled: spark.comet.exec.shuffle.enabled is not enabled",
"AQEShuffleRead is not supported")),
(
"SELECT A.c1, A.sum_c0, A.sum_c2, B.casted from "
+ s"(SELECT c1, sum(c0) as sum_c0, sum(c2) as sum_c2 from $table group by c1) as A, "
+ s"(SELECT c1, cast(make_interval(c0, c1, c0, c1, c0, c0, c2) as string) as casted from $table) as B "
+ "where A.c1 = B.c1 ",
Set(
"Native shuffle is not enabled: spark.comet.exec.shuffle.enabled is not enabled",
"Comet shuffle is not enabled: spark.comet.exec.shuffle.enabled is not enabled",
"AQEShuffleRead is not supported",
"make_interval is not supported",
"BroadcastExchange is not supported",
Expand Down
Loading

0 comments on commit f3f46bd

Please sign in to comment.