From 19472edc0368954a7c2a9640dd7143dde1381612 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 7 May 2024 10:31:18 -0600 Subject: [PATCH 01/13] Add logging to show fallback reasons --- .../main/scala/org/apache/comet/CometConf.scala | 8 ++++++++ docs/source/user-guide/configs.md | 1 + .../comet/CometSparkSessionExtensions.scala | 15 ++++++++++++++- .../org/apache/comet/ExtendedExplainInfo.scala | 2 +- 4 files changed, 24 insertions(+), 2 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index f9caee9d9..685c6d5cc 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -276,6 +276,14 @@ object CometConf { .booleanConf .createWithDefault(false) + val COMET_EXPLAIN_ENABLED: ConfigEntry[String] = + conf("spark.comet.explain") + .doc( + "Set this config to VERBOSE to see logging explaining the reason(s) why a query " + + "stage cannot be executed natively with Comet.") + .stringConf + .createWithDefault("NONE") + val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize") .doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.") .intConf diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 02ecbd693..b886bc0e4 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -41,6 +41,7 @@ Comet provides the following configuration settings. | spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 | | spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd | | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false | +| spark.comet.explain | Set this config to VERBOSE to see logging explaining the reason(s) why a query stage cannot be executed natively with Comet. | NONE | | spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. Default value is 0.2. | 0.2 | | spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b | | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. 
| false | diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 73592d785..d8813d9d1 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -776,7 +776,7 @@ class CometSparkSessionExtensions // Convert native execution block by linking consecutive native operators. var firstNativeOp = true - newPlan.transformDown { + val finalPlan = newPlan.transformDown { case op: CometNativeExec => if (firstNativeOp) { firstNativeOp = false @@ -788,6 +788,19 @@ class CometSparkSessionExtensions firstNativeOp = true op } + + // if the plan cannot be run natively then explain why, if verbose explain is enabled + val explainVerbose = "VERBOSE".equalsIgnoreCase(CometConf.COMET_EXPLAIN_ENABLED.get()) + if (explainVerbose && !isCometNative(finalPlan)) { + val fallbackReasons = new ExtendedExplainInfo().extensionInfo(finalPlan) + if (fallbackReasons.isEmpty) { + logInfo(s"Cannot run plan natively, but no reason is given. This is likely a bug.") + } else { + logInfo(s"Cannot run plan natively:\n\t- ${fallbackReasons.mkString("\n\t- ")}") + } + } + + finalPlan } } diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala index 8e5aee8b6..9ee4d4d6a 100644 --- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala +++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala @@ -45,7 +45,7 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator { } } - private def extensionInfo(node: TreeNode[_]): Set[String] = { + def extensionInfo(node: TreeNode[_]): Set[String] = { var info = mutable.Seq[String]() val sorted = sortup(node) sorted.foreach { p => From 07c77841fbbb6cf5379bc6ebaad63870cdb46ccf Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 7 May 2024 10:56:23 -0600 Subject: [PATCH 02/13] refactor --- .../scala/org/apache/comet/CometConf.scala | 2 +- docs/source/user-guide/configs.md | 2 +- .../comet/CometSparkSessionExtensions.scala | 34 ++++++++++++------- 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 685c6d5cc..76973c00a 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -282,7 +282,7 @@ object CometConf { "Set this config to VERBOSE to see logging explaining the reason(s) why a query " + "stage cannot be executed natively with Comet.") .stringConf - .createWithDefault("NONE") + .createWithDefault("VERBOSE") val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize") .doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.") diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index b886bc0e4..669c6a255 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -41,7 +41,7 @@ Comet provides the following configuration settings. | spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. 
| 0.7 | | spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd | | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false | -| spark.comet.explain | Set this config to VERBOSE to see logging explaining the reason(s) why a query stage cannot be executed natively with Comet. | NONE | +| spark.comet.explain | Set this config to VERBOSE to see logging explaining the reason(s) why a query stage cannot be executed natively with Comet. | VERBOSE | | spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. Default value is 0.2. | 0.2 | | spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b | | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false | diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index d8813d9d1..b28c91c54 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -722,7 +722,10 @@ class CometSparkSessionExtensions } // We shouldn't transform Spark query plan if Comet is disabled. - if (!isCometEnabled(conf)) return plan + if (!isCometEnabled(conf)) { + logWarning("Comet is not enabled") + return plan + } if (!isCometExecEnabled(conf)) { // Comet exec is disabled, but for Spark shuffle, we still can use Comet columnar shuffle @@ -734,6 +737,24 @@ class CometSparkSessionExtensions } else { var newPlan = transform(plan) + // if the plan cannot be run natively then explain why, if verbose explain is enabled + if (!isCometNative(newPlan)) { + if ("VERBOSE".equals(CometConf.COMET_EXPLAIN_ENABLED.get())) { + new ExtendedExplainInfo().extensionInfo(newPlan) match { + case reasons if reasons.isEmpty => + logWarning( + s"Comet cannot execute this plan natively, but no reason is given. " + + s"This is likely a bug.") + case reasons if reasons.size == 1 => + logWarning(s"Comet cannot execute this plan natively because ${reasons.head}") + case reasons => + logWarning( + s"Comet cannot execute this plan natively because:\n\t- " + + s"${reasons.mkString("\n\t- ")}") + } + } + } + // Remove placeholders newPlan = newPlan.transform { case CometSinkPlaceHolder(_, _, s) => s @@ -789,17 +810,6 @@ class CometSparkSessionExtensions op } - // if the plan cannot be run natively then explain why, if verbose explain is enabled - val explainVerbose = "VERBOSE".equalsIgnoreCase(CometConf.COMET_EXPLAIN_ENABLED.get()) - if (explainVerbose && !isCometNative(finalPlan)) { - val fallbackReasons = new ExtendedExplainInfo().extensionInfo(finalPlan) - if (fallbackReasons.isEmpty) { - logInfo(s"Cannot run plan natively, but no reason is given. 
This is likely a bug.") - } else { - logInfo(s"Cannot run plan natively:\n\t- ${fallbackReasons.mkString("\n\t- ")}") - } - } - finalPlan } } From 7becda1263f19b44c7e2f29d5c6d431c3a5b85d4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 7 May 2024 10:56:41 -0600 Subject: [PATCH 03/13] revert a change --- .../scala/org/apache/comet/CometSparkSessionExtensions.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index b28c91c54..0e2b41998 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -797,7 +797,7 @@ class CometSparkSessionExtensions // Convert native execution block by linking consecutive native operators. var firstNativeOp = true - val finalPlan = newPlan.transformDown { + newPlan.transformDown { case op: CometNativeExec => if (firstNativeOp) { firstNativeOp = false @@ -809,8 +809,6 @@ class CometSparkSessionExtensions firstNativeOp = true op } - - finalPlan } } From 26cbb1cc1d9dc3332e7a3cf64c672c59b9b2e7a2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 7 May 2024 12:29:30 -0600 Subject: [PATCH 04/13] revert a change --- .../scala/org/apache/comet/CometSparkSessionExtensions.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 0e2b41998..3ec15eb0e 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -722,10 +722,7 @@ class CometSparkSessionExtensions } // We shouldn't transform Spark query plan if Comet is disabled. - if (!isCometEnabled(conf)) { - logWarning("Comet is not enabled") - return plan - } + if (!isCometEnabled(conf)) return plan if (!isCometExecEnabled(conf)) { // Comet exec is disabled, but for Spark shuffle, we still can use Comet columnar shuffle From 4f2c26cce39aa0e31eb0fcd4e6bb3cc592b22d8e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 7 May 2024 12:50:52 -0600 Subject: [PATCH 05/13] make format --- .../org/apache/comet/CometSparkSessionExtensions.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 3ec15eb0e..5b1375111 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -740,13 +740,13 @@ class CometSparkSessionExtensions new ExtendedExplainInfo().extensionInfo(newPlan) match { case reasons if reasons.isEmpty => logWarning( - s"Comet cannot execute this plan natively, but no reason is given. " + - s"This is likely a bug.") + "Comet cannot execute this plan natively, but no reason is given. 
" + + "This is likely a bug.") case reasons if reasons.size == 1 => logWarning(s"Comet cannot execute this plan natively because ${reasons.head}") case reasons => logWarning( - s"Comet cannot execute this plan natively because:\n\t- " + + "Comet cannot execute this plan natively because:\n\t- " + s"${reasons.mkString("\n\t- ")}") } } From 14d95f1777469195e578dc383964b9c4a1ee8bbf Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 7 May 2024 12:55:21 -0600 Subject: [PATCH 06/13] rename config --- .../scala/org/apache/comet/CometConf.scala | 12 ++++---- .../comet/CometSparkSessionExtensions.scala | 28 +++++++++---------- 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 76973c00a..06112cae8 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -276,13 +276,13 @@ object CometConf { .booleanConf .createWithDefault(false) - val COMET_EXPLAIN_ENABLED: ConfigEntry[String] = - conf("spark.comet.explain") + val COMET_EXPLAIN_FALLBACK_ENABLED: ConfigEntry[Boolean] = + conf("spark.comet.explainFallback.enabled") .doc( - "Set this config to VERBOSE to see logging explaining the reason(s) why a query " + - "stage cannot be executed natively with Comet.") - .stringConf - .createWithDefault("VERBOSE") + "When this setting is enabled, Comet will provide logging explaining the reason(s) " + + "why a query stage cannot be executed natively with Comet.") + .booleanConf + .createWithDefault(false) val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize") .doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.") diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 5b1375111..c43439196 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -734,21 +734,19 @@ class CometSparkSessionExtensions } else { var newPlan = transform(plan) - // if the plan cannot be run natively then explain why, if verbose explain is enabled - if (!isCometNative(newPlan)) { - if ("VERBOSE".equals(CometConf.COMET_EXPLAIN_ENABLED.get())) { - new ExtendedExplainInfo().extensionInfo(newPlan) match { - case reasons if reasons.isEmpty => - logWarning( - "Comet cannot execute this plan natively, but no reason is given. " + - "This is likely a bug.") - case reasons if reasons.size == 1 => - logWarning(s"Comet cannot execute this plan natively because ${reasons.head}") - case reasons => - logWarning( - "Comet cannot execute this plan natively because:\n\t- " + - s"${reasons.mkString("\n\t- ")}") - } + // if the plan cannot be run natively then explain why (when appropriate config is enabled) + if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get() && !isCometNative(newPlan)) { + new ExtendedExplainInfo().extensionInfo(newPlan) match { + case reasons if reasons.isEmpty => + logWarning( + "Comet cannot execute this plan natively, but no reason is given. 
" + + "This is likely a bug.") + case reasons if reasons.size == 1 => + logWarning(s"Comet cannot execute this plan natively because ${reasons.head}") + case reasons => + logWarning( + "Comet cannot execute this plan natively because:\n\t- " + + s"${reasons.mkString("\n\t- ")}") } } From 5c21783485a2eb23f353174f55677b22044c1cc7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 7 May 2024 12:57:07 -0600 Subject: [PATCH 07/13] generate configs --- docs/source/user-guide/configs.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 669c6a255..58e759d91 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -41,7 +41,7 @@ Comet provides the following configuration settings. | spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 | | spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd | | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false | -| spark.comet.explain | Set this config to VERBOSE to see logging explaining the reason(s) why a query stage cannot be executed natively with Comet. | VERBOSE | +| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively with Comet. | false | | spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. Default value is 0.2. | 0.2 | | spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b | | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. 
| false | From eb9c21c029255b6545633145a79e40d3974b20c0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 7 May 2024 13:00:13 -0600 Subject: [PATCH 08/13] update description --- common/src/main/scala/org/apache/comet/CometConf.scala | 2 +- docs/source/user-guide/configs.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 06112cae8..f2a793b31 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -280,7 +280,7 @@ object CometConf { conf("spark.comet.explainFallback.enabled") .doc( "When this setting is enabled, Comet will provide logging explaining the reason(s) " + - "why a query stage cannot be executed natively with Comet.") + "why a query stage cannot be executed natively.") .booleanConf .createWithDefault(false) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 58e759d91..5a4c56dee 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -41,7 +41,7 @@ Comet provides the following configuration settings. | spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 | | spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd | | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false | -| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively with Comet. | false | +| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. | false | | spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. Default value is 0.2. | 0.2 | | spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b | | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. 
| false | From 22409f4088736ae3521264aa86e66f17db42a9c1 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 7 May 2024 13:54:54 -0600 Subject: [PATCH 09/13] update docs --- docs/source/user-guide/installation.md | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/docs/source/user-guide/installation.md b/docs/source/user-guide/installation.md index b948d50cb..1fd802f84 100644 --- a/docs/source/user-guide/installation.md +++ b/docs/source/user-guide/installation.md @@ -67,7 +67,8 @@ $SPARK_HOME/bin/spark-shell \ --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \ --conf spark.comet.enabled=true \ --conf spark.comet.exec.enabled=true \ - --conf spark.comet.exec.all.enabled=true + --conf spark.comet.exec.all.enabled=true \ + --conf spark.comet.explainFallback.enabled=true ``` ### Verify Comet enabled for Spark SQL query @@ -95,6 +96,16 @@ INFO src/lib.rs: Comet native library initialized PushedFilters: [IsNotNull(a), GreaterThan(a,5)], ReadSchema: struct ``` +With the configuration `spark.comet.explainFallback.enabled=true`, Comet will log any reasons that prevent a plan from +being executed natively. + +```scala +scala> spark.sql("select * from t1 where cast(cast(a as double) as decimal(10,2)) > 5").show +WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute this plan natively because: + - Comet does not guarantee correct results for cast from DoubleType to DecimalType(10,2) with timezone Some(America/Denver) and evalMode LEGACY (No overflow check). To enable all incompatible casts, set spark.comet.cast.allowIncompatible=true + - CollectLimit is not supported +``` + ### Enable Comet shuffle Comet shuffle feature is disabled by default. To enable it, please add related configs: From 8e800ec68ea093a22913780dbd5e8afb56c7ff2d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 13 May 2024 06:43:17 -0600 Subject: [PATCH 10/13] remove check for top level operator being native --- .../comet/CometSparkSessionExtensions.scala | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index c43439196..50e326367 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -734,19 +734,20 @@ class CometSparkSessionExtensions } else { var newPlan = transform(plan) - // if the plan cannot be run natively then explain why (when appropriate config is enabled) - if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get() && !isCometNative(newPlan)) { + // if the plan cannot be run fully natively then explain why (when appropriate + // config is enabled) + if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) { new ExtendedExplainInfo().extensionInfo(newPlan) match { - case reasons if reasons.isEmpty => - logWarning( - "Comet cannot execute this plan natively, but no reason is given. 
" + - "This is likely a bug.") case reasons if reasons.size == 1 => - logWarning(s"Comet cannot execute this plan natively because ${reasons.head}") - case reasons => logWarning( - "Comet cannot execute this plan natively because:\n\t- " + - s"${reasons.mkString("\n\t- ")}") + "Comet cannot execute some parts of this plan natively " + + s"because ${reasons.head}") + case reasons if reasons.size > 1 => + logWarning( + "Comet cannot execute some parts of execute this plan natively" + + s" because:\n\t- ${reasons.mkString("\n\t- ")}") + case _ => + // no reasons recorded } } From bf87ff72c4f66cbcef9552ad57e6c8f45873b683 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 13 May 2024 07:10:08 -0600 Subject: [PATCH 11/13] fix message format and update example in docs --- docs/source/user-guide/installation.md | 9 +++++---- .../org/apache/comet/CometSparkSessionExtensions.scala | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/docs/source/user-guide/installation.md b/docs/source/user-guide/installation.md index 1fd802f84..e9149019e 100644 --- a/docs/source/user-guide/installation.md +++ b/docs/source/user-guide/installation.md @@ -100,10 +100,11 @@ With the configuration `spark.comet.explainFallback.enabled=true`, Comet will lo being executed natively. ```scala -scala> spark.sql("select * from t1 where cast(cast(a as double) as decimal(10,2)) > 5").show -WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute this plan natively because: - - Comet does not guarantee correct results for cast from DoubleType to DecimalType(10,2) with timezone Some(America/Denver) and evalMode LEGACY (No overflow check). To enable all incompatible casts, set spark.comet.cast.allowIncompatible=true - - CollectLimit is not supported +scala> Seq(1,2,3,4).toDF("a").write.parquet("/tmp/test.parquet") +WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively because: + - LocalTableScan is not supported + - WriteFiles is not supported + - Execute InsertIntoHadoopFsRelationCommand is not supported ``` ### Enable Comet shuffle diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 50e326367..9cb9930b1 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -744,7 +744,7 @@ class CometSparkSessionExtensions s"because ${reasons.head}") case reasons if reasons.size > 1 => logWarning( - "Comet cannot execute some parts of execute this plan natively" + + "Comet cannot execute some parts of this plan natively" + s" because:\n\t- ${reasons.mkString("\n\t- ")}") case _ => // no reasons recorded From 78043bd5060d2bff70c46ba68301f3177670db9b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 13 May 2024 13:30:30 -0600 Subject: [PATCH 12/13] address feedback --- spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala index 9ee4d4d6a..d7ef4e9f3 100644 --- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala +++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala @@ -45,7 +45,7 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator { } } - def extensionInfo(node: TreeNode[_]): Set[String] = { + private[comet] 
def extensionInfo(node: TreeNode[_]): Set[String] = {
     var info = mutable.Seq[String]()
     val sorted = sortup(node)
     sorted.foreach { p =>

From af396400523f4383170f64a26980d0b30b10a81c Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 13 May 2024 18:59:52 -0600
Subject: [PATCH 13/13] trigger build
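
Editor's note (not part of the patch series): the sketch below is only an illustrative recap of the behaviour the series converges on in patches 10-11. It mirrors the shape of the final fallback logging in `CometExecRule` — a match on the set of reasons returned by `ExtendedExplainInfo.extensionInfo` — but it is a standalone approximation, not the committed code: the `logWarning` here is a plain `println` stand-in, and the hard-coded reason set stands in for reasons collected from a real Spark plan.

```scala
// Illustrative sketch only: approximates the fallback logging added in patches 10-11.
// `logWarning` is a println stand-in for Spark's Logging trait, and the reason set
// stands in for ExtendedExplainInfo.extensionInfo(plan).
object ExplainFallbackSketch {
  def logWarning(msg: String): Unit = println(s"WARN $msg")

  def explainFallback(reasons: Set[String]): Unit = reasons match {
    case rs if rs.size == 1 =>
      logWarning(s"Comet cannot execute some parts of this plan natively because ${rs.head}")
    case rs if rs.size > 1 =>
      logWarning(
        "Comet cannot execute some parts of this plan natively" +
          s" because:\n\t- ${rs.mkString("\n\t- ")}")
    case _ => // plan is fully native, or no reasons were recorded
  }

  def main(args: Array[String]): Unit =
    explainFallback(Set("LocalTableScan is not supported", "WriteFiles is not supported"))
}
```

In a real deployment this logging is gated by the `spark.comet.explainFallback.enabled` configuration introduced in patch 6 (for example via `--conf spark.comet.explainFallback.enabled=true`, as shown in the installation.md change in this series), rather than being invoked directly.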