diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index e3584300e6..0ef2982eba 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -277,6 +277,14 @@ object CometConf {
     .booleanConf
     .createWithDefault(false)
 
+  val COMET_EXPLAIN_FALLBACK_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.explainFallback.enabled")
+      .doc(
+        "When this setting is enabled, Comet will provide logging explaining the reason(s) " +
+          "why a query stage cannot be executed natively.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize")
     .doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.")
     .intConf
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index d75059a9e9..24f408a05e 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -40,6 +40,7 @@ Comet provides the following configuration settings.
 | spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 |
 | spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd |
 | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false |
+| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. | false |
 | spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. Default value is 0.2. | 0.2 |
 | spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
 | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
diff --git a/docs/source/user-guide/installation.md b/docs/source/user-guide/installation.md
index b948d50cb6..e9149019e1 100644
--- a/docs/source/user-guide/installation.md
+++ b/docs/source/user-guide/installation.md
@@ -67,7 +67,8 @@ $SPARK_HOME/bin/spark-shell \
     --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
     --conf spark.comet.enabled=true \
     --conf spark.comet.exec.enabled=true \
-    --conf spark.comet.exec.all.enabled=true
+    --conf spark.comet.exec.all.enabled=true \
+    --conf spark.comet.explainFallback.enabled=true
 ```
 
 ### Verify Comet enabled for Spark SQL query
@@ -95,6 +96,17 @@ INFO src/lib.rs: Comet native library initialized
      PushedFilters: [IsNotNull(a), GreaterThan(a,5)], ReadSchema: struct
 ```
 
+With the configuration `spark.comet.explainFallback.enabled=true`, Comet will log any reasons that prevent a plan from
+being executed natively.
+
+```scala
+scala> Seq(1,2,3,4).toDF("a").write.parquet("/tmp/test.parquet")
+WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively because:
+  - LocalTableScan is not supported
+  - WriteFiles is not supported
+  - Execute InsertIntoHadoopFsRelationCommand is not supported
+```
+
 ### Enable Comet shuffle
 
 Comet shuffle feature is disabled by default. To enable it, please add related configs:
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 73592d7858..9cb9930b1d 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -734,6 +734,23 @@ class CometSparkSessionExtensions
       } else {
         var newPlan = transform(plan)
 
+        // if the plan cannot be run fully natively then explain why (when appropriate
+        // config is enabled)
+        if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) {
+          new ExtendedExplainInfo().extensionInfo(newPlan) match {
+            case reasons if reasons.size == 1 =>
+              logWarning(
+                "Comet cannot execute some parts of this plan natively " +
+                  s"because ${reasons.head}")
+            case reasons if reasons.size > 1 =>
+              logWarning(
+                "Comet cannot execute some parts of this plan natively" +
+                  s" because:\n\t- ${reasons.mkString("\n\t- ")}")
+            case _ =>
+              // no reasons recorded
+          }
+        }
+
         // Remove placeholders
         newPlan = newPlan.transform {
           case CometSinkPlaceHolder(_, _, s) => s
diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
index 8e5aee8b6e..d7ef4e9f39 100644
--- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
+++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
@@ -45,7 +45,7 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
     }
   }
 
-  private def extensionInfo(node: TreeNode[_]): Set[String] = {
+  private[comet] def extensionInfo(node: TreeNode[_]): Set[String] = {
     var info = mutable.Seq[String]()
     val sorted = sortup(node)
    sorted.foreach { p =>
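For illustration, below is a minimal standalone sketch (not part of the patch above) showing the new flag in use end to end. The object name, the local master setting, and the output path are illustrative assumptions; it relies only on the standard SparkSession API plus the Comet jars being on the classpath.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative sketch only: enables the new spark.comet.explainFallback.enabled flag
// and runs a query that is expected to fall back to Spark, so CometExecRule should
// log a WARN message listing the unsupported operators.
object ExplainFallbackExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[1]") // assumption: run locally for the demo
      .config("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
      .config("spark.comet.enabled", "true")
      .config("spark.comet.exec.enabled", "true")
      .config("spark.comet.explainFallback.enabled", "true") // the config added in this patch
      .getOrCreate()

    import spark.implicits._

    // The LocalTableScan and the Parquet write are not executed natively by Comet,
    // so the fallback reasons should be logged when the plan is transformed.
    Seq(1, 2, 3, 4).toDF("a").write.mode("overwrite").parquet("/tmp/comet_explain_fallback_demo")

    spark.stop()
  }
}
```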