feat: Add logging to explain reasons for Comet not being able to run a query stage natively #397

Merged May 14, 2024 (15 commits)

8 changes: 8 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -276,6 +276,14 @@ object CometConf {
    .booleanConf
    .createWithDefault(false)

  val COMET_EXPLAIN_FALLBACK_ENABLED: ConfigEntry[Boolean] =

Review comment (Contributor):
Do we really need to make this configurable?

Review comment (Member):
I think a config is a good idea, since the logging can be verbose at times.

    conf("spark.comet.explainFallback.enabled")
      .doc(
        "When this setting is enabled, Comet will provide logging explaining the reason(s) " +
          "why a query stage cannot be executed natively.")
      .booleanConf
      .createWithDefault(false)

  val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize")
    .doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.")
    .intConf

1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -41,6 +41,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 |
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd |
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false |
| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. | false |
| spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. Default value is 0.2. | 0.2 |
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
| spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |

13 changes: 12 additions & 1 deletion docs/source/user-guide/installation.md
@@ -67,7 +67,8 @@ $SPARK_HOME/bin/spark-shell \
  --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
  --conf spark.comet.enabled=true \
  --conf spark.comet.exec.enabled=true \
- --conf spark.comet.exec.all.enabled=true
+ --conf spark.comet.exec.all.enabled=true \
+ --conf spark.comet.explainFallback.enabled=true
```

### Verify Comet enabled for Spark SQL query
@@ -95,6 +96,16 @@ INFO src/lib.rs: Comet native library initialized
PushedFilters: [IsNotNull(a), GreaterThan(a,5)], ReadSchema: struct<a:int>
```

With the configuration `spark.comet.explainFallback.enabled=true`, Comet will log any reasons that prevent a plan from
being executed natively.

```scala
scala> spark.sql("select * from t1 where cast(cast(a as double) as decimal(10,2)) > 5").show
WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute this plan natively because:
- Comet does not guarantee correct results for cast from DoubleType to DecimalType(10,2) with timezone Some(America/Denver) and evalMode LEGACY (No overflow check). To enable all incompatible casts, set spark.comet.cast.allowIncompatible=true
- CollectLimit is not supported
```

### Enable Comet shuffle

Comet shuffle feature is disabled by default. To enable it, please add related configs:

@@ -734,6 +734,22 @@ class CometSparkSessionExtensions
} else {
  var newPlan = transform(plan)

  // if the plan cannot be run natively then explain why (when appropriate config is enabled)

Review comment (Contributor):
Given that ExecRule is going to be invoked multiple times as planning proceeds, this may get logged multiple times for the same plan. Perhaps that is why we need to keep this configurable.

Review comment (Member Author):
That is one reason, yes. The logging could get verbose.

Review comment (Member Author):
I would assume that this feature is useful in development but less so in production since we have the integration with Spark's explain for that.

  if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get() && !isCometNative(newPlan)) {

Review comment (Member):
Comet may trigger native execution for only part of a query plan. The check isCometNative(newPlan) applies only to the top operator.
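
The reviewer's point can be illustrated with a toy plan tree. This is a minimal sketch under stated assumptions: `Node`, `rootIsNative`, and `nonNativeOperators` are hypothetical names invented for illustration and are not part of Comet or Spark. It shows that a root-only check can report a plan as native even though an operator deeper in the tree is not.

```scala
object PartialPlanCheck {
  // Hypothetical stand-in for a physical plan node; not a Comet or Spark class.
  final case class Node(name: String, native: Boolean, children: Seq[Node] = Nil)

  // Root-only check: inspects just the top operator of the plan.
  def rootIsNative(plan: Node): Boolean = plan.native

  // Whole-tree check: collects the name of every operator that is not native.
  def nonNativeOperators(plan: Node): Seq[String] =
    (if (plan.native) Nil else Seq(plan.name)) ++
      plan.children.flatMap(nonNativeOperators)

  // A plan whose top two operators are native but whose leaf is not.
  val plan: Node =
    Node("Project", native = true,
      Seq(Node("Filter", native = true,
        Seq(Node("Scan", native = false)))))

  def main(args: Array[String]): Unit = {
    println(s"root is native: ${rootIsNative(plan)}")
    println(s"non-native operators: ${nonNativeOperators(plan).mkString(", ")}")
  }
}
```

In this sketch the root-only check passes, so a guard of the form `!rootIsNative(plan)` would skip logging even though `Scan` falls back.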

    new ExtendedExplainInfo().extensionInfo(newPlan) match {

Review comment (Contributor):
Unrelated, but I feel ExtendedExplainInfo could be an object.

Review comment (Member Author):
Yes, I had the same thought

Review comment (Member Author):
It is possible that it has to be a class due to the way this integrates into Spark, but I am not sure.

Review comment (Contributor):
Just seeing this. I don't think Spark prevents this from being an object. I'll take care of this (#452).

      case reasons if reasons.isEmpty =>
        logWarning(
          "Comet cannot execute this plan natively, but no reason is given. " +
            "This is likely a bug.")
      case reasons if reasons.size == 1 =>
        logWarning(s"Comet cannot execute this plan natively because ${reasons.head}")
      case reasons =>
        logWarning(
          "Comet cannot execute this plan natively because:\n\t- " +
            s"${reasons.mkString("\n\t- ")}")
    }
  }

  // Remove placeholders
  newPlan = newPlan.transform {
    case CometSinkPlaceHolder(_, _, s) => s
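
The three cases of the match in this hunk can be exercised in isolation. The following is a minimal sketch, assuming a hypothetical helper `FallbackMessage.format` that returns the message text instead of calling `logWarning`; the helper is not part of the PR, and only the message strings are taken from the diff.

```scala
object FallbackMessage {
  // Hypothetical helper (not in the PR): builds the warning text for a set of
  // fallback reasons, using the same three cases as the match in the diff.
  def format(reasons: Set[String]): String = reasons match {
    case r if r.isEmpty =>
      "Comet cannot execute this plan natively, but no reason is given. " +
        "This is likely a bug."
    case r if r.size == 1 =>
      s"Comet cannot execute this plan natively because ${r.head}"
    case r =>
      // Multiple reasons are rendered as a tab-indented bullet list.
      "Comet cannot execute this plan natively because:\n\t- " +
        r.mkString("\n\t- ")
  }

  def main(args: Array[String]): Unit = {
    println(format(Set.empty))
    println(format(Set("CollectLimit is not supported")))
    println(format(Set("reason A", "reason B")))
  }
}
```

Returning a string keeps the formatting testable without a logger; the PR itself logs directly at the call site.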

@@ -45,7 +45,7 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
    }
  }

- private def extensionInfo(node: TreeNode[_]): Set[String] = {
+ def extensionInfo(node: TreeNode[_]): Set[String] = {

Review comment (Contributor):
Perhaps private[comet] instead of public?

Review comment (Member Author):
Thanks. I made that change.

    var info = mutable.Seq[String]()
    val sorted = sortup(node)
    sorted.foreach { p =>