From a1bba7b183ced076c59db1e7b4ca1e6c2a540567 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 17 May 2024 13:30:16 -0600
Subject: [PATCH] improve fallback message when comet native shuffle is not enabled

---
Review notes (text between the "---" line and the diff is not part of the commit message):
- Line breaks and indentation were reconstructed from a whitespace-collapsed copy of this
  patch. If context lines fail to match, apply with `git apply --ignore-whitespace`.
- The "spark.shuffle.manager is not set to ..." reason now uses
  classOf[CometShuffleManager].getName. CometShuffleManager.getClass resolves to the
  companion object's class, whose name ends in "$", so the message named a value that
  isCometShuffleManagerEnabled would never accept.
- Scaladoc was added to the two new helpers; hunk counts and the diffstat were updated.

 .../comet/CometSparkSessionExtensions.scala   | 39 ++++++++++++++++++++-----
 1 file changed, 32 insertions(+), 7 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 7ddc950ea..f9059d0ba 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -30,8 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.comet._
-import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle}
-import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle, CometShuffleExchangeExec, CometShuffleManager}
 import org.apache.spark.sql.comet.util.Utils
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, BroadcastQueryStageExec, ShuffleQueryStageExec}
@@ -46,7 +45,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._

 import org.apache.comet.CometConf._
-import org.apache.comet.CometSparkSessionExtensions.{createMessage, isANSIEnabled, isCometBroadCastForceEnabled, isCometColumnarShuffleEnabled, isCometEnabled, isCometExecEnabled, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, shouldApplyRowToColumnar, withInfo, withInfos}
+import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometColumnarShuffleEnabled, isCometEnabled, isCometExecEnabled, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, shouldApplyRowToColumnar, withInfo, withInfos}
 import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde
@@ -682,7 +681,8 @@ class CometSparkSessionExtensions

         case s: ShuffleExchangeExec =>
           val isShuffleEnabled = isCometShuffleEnabled(conf)
-          val msg1 = createMessage(!isShuffleEnabled, "Native shuffle is not enabled")
+          val reason = getCometShuffleNotEnabledReason(conf).getOrElse("no reason available")
+          val msg1 = createMessage(!isShuffleEnabled, s"Native shuffle is not enabled: $reason")
           val columnarShuffleEnabled = isCometColumnarShuffleEnabled(conf)
           val msg2 = createMessage(
             isShuffleEnabled && !columnarShuffleEnabled && !QueryPlanSerde
@@ -931,13 +931,38 @@ object CometSparkSessionExtensions extends Logging {
   }

   private[comet] def isCometShuffleEnabled(conf: SQLConf): Boolean =
-    COMET_EXEC_SHUFFLE_ENABLED.get(conf) &&
-      (conf.contains("spark.shuffle.manager") && conf.getConfString("spark.shuffle.manager") ==
-        "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") &&
+    COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf) &&
     // TODO: AQE coalesce partitions feature causes Comet shuffle memory leak.
     // We should disable Comet shuffle when AQE coalesce partitions is enabled.
     (!conf.coalesceShufflePartitionsEnabled || COMET_SHUFFLE_ENFORCE_MODE_ENABLED.get())

+  /**
+   * Describes the first unmet condition of `isCometShuffleEnabled`, checked in the same
+   * order, or returns `None` when every condition holds.
+   */
+  private[comet] def getCometShuffleNotEnabledReason(conf: SQLConf): Option[String] = {
+    if (!COMET_EXEC_SHUFFLE_ENABLED.get(conf)) {
+      Some(s"${COMET_EXEC_SHUFFLE_ENABLED.key} is not enabled")
+    } else if (!isCometShuffleManagerEnabled(conf)) {
+      // classOf names the class itself; CometShuffleManager.getClass would name the
+      // companion object, whose class name ends in "$".
+      Some(s"spark.shuffle.manager is not set to ${classOf[CometShuffleManager].getName}")
+    } else if (conf.coalesceShufflePartitionsEnabled && !COMET_SHUFFLE_ENFORCE_MODE_ENABLED
+        .get()) {
+      Some(
+        s"${SQLConf.COALESCE_PARTITIONS_ENABLED.key} is enabled and " +
+          s"${COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key} is not enabled")
+    } else {
+      None
+    }
+  }
+
+  /** Whether `spark.shuffle.manager` is set to the Comet shuffle manager class name. */
+  private def isCometShuffleManagerEnabled(conf: SQLConf) = {
+    conf.contains("spark.shuffle.manager") && conf.getConfString("spark.shuffle.manager") ==
+      "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
+  }
+
   private[comet] def isCometScanEnabled(conf: SQLConf): Boolean = {
     COMET_SCAN_ENABLED.get(conf)
   }