Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed May 7, 2024
1 parent 19472ed commit 07c7784
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 14 deletions.
2 changes: 1 addition & 1 deletion common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ object CometConf {
"Set this config to VERBOSE to see logging explaining the reason(s) why a query " +
"stage cannot be executed natively with Comet.")
.stringConf
.createWithDefault("NONE")
.createWithDefault("VERBOSE")

val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize")
.doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.")
Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 |
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd |
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false |
| spark.comet.explain | Set this config to VERBOSE to see logging explaining the reason(s) why a query stage cannot be executed natively with Comet. | NONE |
| spark.comet.explain | Set this config to VERBOSE to see logging explaining the reason(s) why a query stage cannot be executed natively with Comet. | VERBOSE |
| spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. Default value is 0.2. | 0.2 |
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
| spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,10 @@ class CometSparkSessionExtensions
}

// We shouldn't transform Spark query plan if Comet is disabled.
if (!isCometEnabled(conf)) return plan
if (!isCometEnabled(conf)) {
logWarning("Comet is not enabled")
return plan
}

if (!isCometExecEnabled(conf)) {
// Comet exec is disabled, but for Spark shuffle, we still can use Comet columnar shuffle
Expand All @@ -734,6 +737,24 @@ class CometSparkSessionExtensions
} else {
var newPlan = transform(plan)

// if the plan cannot be run natively then explain why, if verbose explain is enabled
if (!isCometNative(newPlan)) {
if ("VERBOSE".equals(CometConf.COMET_EXPLAIN_ENABLED.get())) {
new ExtendedExplainInfo().extensionInfo(newPlan) match {
case reasons if reasons.isEmpty =>
logWarning(
s"Comet cannot execute this plan natively, but no reason is given. " +
s"This is likely a bug.")
case reasons if reasons.size == 1 =>
logWarning(s"Comet cannot execute this plan natively because ${reasons.head}")
case reasons =>
logWarning(
s"Comet cannot execute this plan natively because:\n\t- " +
s"${reasons.mkString("\n\t- ")}")
}
}
}

// Remove placeholders
newPlan = newPlan.transform {
case CometSinkPlaceHolder(_, _, s) => s
Expand Down Expand Up @@ -789,17 +810,6 @@ class CometSparkSessionExtensions
op
}

// if the plan cannot be run natively then explain why, if verbose explain is enabled
val explainVerbose = "VERBOSE".equalsIgnoreCase(CometConf.COMET_EXPLAIN_ENABLED.get())
if (explainVerbose && !isCometNative(finalPlan)) {
val fallbackReasons = new ExtendedExplainInfo().extensionInfo(finalPlan)
if (fallbackReasons.isEmpty) {
logInfo(s"Cannot run plan natively, but no reason is given. This is likely a bug.")
} else {
logInfo(s"Cannot run plan natively:\n\t- ${fallbackReasons.mkString("\n\t- ")}")
}
}

finalPlan
}
}
Expand Down

0 comments on commit 07c7784

Please sign in to comment.