Skip to content

Commit

Permalink
improve fallback message when comet native shuffle is not enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed May 17, 2024
1 parent f8fec7f commit a1bba7b
Showing 1 changed file with 25 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.comet._
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle}
import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle, CometShuffleExchangeExec, CometShuffleManager}
import org.apache.spark.sql.comet.util.Utils
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, BroadcastQueryStageExec, ShuffleQueryStageExec}
Expand All @@ -46,7 +45,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

import org.apache.comet.CometConf._
import org.apache.comet.CometSparkSessionExtensions.{createMessage, isANSIEnabled, isCometBroadCastForceEnabled, isCometColumnarShuffleEnabled, isCometEnabled, isCometExecEnabled, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, shouldApplyRowToColumnar, withInfo, withInfos}
import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometColumnarShuffleEnabled, isCometEnabled, isCometExecEnabled, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, shouldApplyRowToColumnar, withInfo, withInfos}
import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde
Expand Down Expand Up @@ -682,7 +681,8 @@ class CometSparkSessionExtensions

case s: ShuffleExchangeExec =>
val isShuffleEnabled = isCometShuffleEnabled(conf)
val msg1 = createMessage(!isShuffleEnabled, "Native shuffle is not enabled")
val reason = getCometShuffleNotEnabledReason(conf).getOrElse("no reason available")
val msg1 = createMessage(!isShuffleEnabled, s"Native shuffle is not enabled: $reason")
val columnarShuffleEnabled = isCometColumnarShuffleEnabled(conf)
val msg2 = createMessage(
isShuffleEnabled && !columnarShuffleEnabled && !QueryPlanSerde
Expand Down Expand Up @@ -931,13 +931,31 @@ object CometSparkSessionExtensions extends Logging {
}

private[comet] def isCometShuffleEnabled(conf: SQLConf): Boolean =
COMET_EXEC_SHUFFLE_ENABLED.get(conf) &&
(conf.contains("spark.shuffle.manager") && conf.getConfString("spark.shuffle.manager") ==
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") &&
COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf) &&
// TODO: AQE coalesce partitions feature causes Comet shuffle memory leak.
// We should disable Comet shuffle when AQE coalesce partitions is enabled.
(!conf.coalesceShufflePartitionsEnabled || COMET_SHUFFLE_ENFORCE_MODE_ENABLED.get())

private[comet] def getCometShuffleNotEnabledReason(conf: SQLConf): Option[String] = {
if (!COMET_EXEC_SHUFFLE_ENABLED.get(conf)) {
Some(s"${COMET_EXEC_SHUFFLE_ENABLED.key} is not enabled")
} else if (!isCometShuffleManagerEnabled(conf)) {
Some(s"spark.shuffle.manager is not set to ${CometShuffleManager.getClass.getName}")
} else if (conf.coalesceShufflePartitionsEnabled && !COMET_SHUFFLE_ENFORCE_MODE_ENABLED
.get()) {
Some(
s"${SQLConf.COALESCE_PARTITIONS_ENABLED.key} is enabled and " +
s"${COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key} is not enabled")
} else {
None
}
}

private def isCometShuffleManagerEnabled(conf: SQLConf) = {
conf.contains("spark.shuffle.manager") && conf.getConfString("spark.shuffle.manager") ==
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
}

private[comet] def isCometScanEnabled(conf: SQLConf): Boolean = {
COMET_SCAN_ENABLED.get(conf)
}
Expand Down

0 comments on commit a1bba7b

Please sign in to comment.