feat: Use CometPlugin as main entrypoint (apache#853)

andygrove authored and huaxingao committed Aug 29, 2024
1 parent 15a24a5 commit 16a525f
Showing 7 changed files with 72 additions and 6 deletions.
docs/source/contributor-guide/benchmarking.md (1 addition, 1 deletion)

```diff
@@ -60,7 +60,7 @@ $SPARK_HOME/bin/spark-submit \
 --jars $COMET_JAR \
 --conf spark.driver.extraClassPath=$COMET_JAR \
 --conf spark.executor.extraClassPath=$COMET_JAR \
---conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
+--conf spark.plugins=org.apache.spark.CometPlugin \
 --conf spark.comet.enabled=true \
 --conf spark.comet.exec.enabled=true \
 --conf spark.comet.cast.allowIncompatible=true \
```
docs/source/contributor-guide/debugging.md (1 addition, 1 deletion)

````diff
@@ -130,7 +130,7 @@ Then build the Comet as [described](https://github.com/apache/arrow-datafusion-c
 Start Comet with `RUST_BACKTRACE=1`
 
 ```console
-RUST_BACKTRACE=1 $SPARK_HOME/spark-shell --jars spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions --conf spark.comet.enabled=true --conf spark.comet.exec.enabled=true
+RUST_BACKTRACE=1 $SPARK_HOME/spark-shell --jars spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar --conf spark.plugins=org.apache.spark.CometPlugin --conf spark.comet.enabled=true --conf spark.comet.exec.enabled=true
 ```
 
 Get the expanded exception details
````
docs/source/contributor-guide/plugin_overview.md (2 additions, 2 deletions)

````diff
@@ -19,10 +19,10 @@ under the License.
 
 # Comet Plugin Overview
 
-The entry point to Comet is the `org.apache.comet.CometSparkSessionExtensions` class, which can be registered with Spark by adding the following setting to the Spark configuration when launching `spark-shell` or `spark-submit`:
+The entry point to Comet is the `org.apache.spark.CometPlugin` class, which can be registered with Spark by adding the following setting to the Spark configuration when launching `spark-shell` or `spark-submit`:
 
 ```
---conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions
+--conf spark.plugins=org.apache.spark.CometPlugin
 ```
 
 On initialization, this class registers two physical plan optimization rules with Spark: `CometScanRule` and `CometExecRule`. These rules run whenever a query stage is being planned.
````
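For context on what registering via `spark.plugins` does: a plugin entry point implements Spark's `org.apache.spark.api.plugin.SparkPlugin` interface, which supplies a driver-side and an executor-side component. Below is a minimal sketch of that contract; the `Example*` names are hypothetical, while `SparkPlugin`, `DriverPlugin`, and `PluginContext` are Spark's standard plugin API.

```scala
import java.{util => ju}

import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

// Hypothetical plugin illustrating the contract behind spark.plugins:
// Spark instantiates the named class and asks it for driver- and
// executor-side components.
class ExamplePlugin extends SparkPlugin {
  // Created once in the driver JVM during SparkContext startup.
  override def driverPlugin(): DriverPlugin = new ExampleDriverPlugin

  // Returning null tells Spark there is no executor-side component.
  override def executorPlugin(): ExecutorPlugin = null
}

class ExampleDriverPlugin extends DriverPlugin {
  // The returned map is forwarded to each executor plugin's init().
  override def init(sc: SparkContext, ctx: PluginContext): ju.Map[String, String] =
    ju.Collections.emptyMap()
}
```

`CometPlugin` satisfies this same interface, presumably wiring in the `CometDriverPlugin` shown in the `Plugins.scala` diff below.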
docs/source/user-guide/installation.md (1 addition, 1 deletion)

```diff
@@ -81,7 +81,7 @@ $SPARK_HOME/bin/spark-shell \
 --jars $COMET_JAR \
 --conf spark.driver.extraClassPath=$COMET_JAR \
 --conf spark.executor.extraClassPath=$COMET_JAR \
---conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
+--conf spark.plugins=org.apache.spark.CometPlugin \
 --conf spark.comet.enabled=true \
 --conf spark.comet.exec.enabled=true \
 --conf spark.comet.explainFallback.enabled=true
```
fuzz-testing/README.md (1 addition, 1 deletion)

````diff
@@ -86,7 +86,7 @@ Note that the output filename is currently hard-coded as `queries.sql`
 ```shell
 $SPARK_HOME/bin/spark-submit \
 --master $SPARK_MASTER \
---conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
+--conf spark.plugins=org.apache.spark.CometPlugin \
 --conf spark.comet.enabled=true \
 --conf spark.comet.exec.enabled=true \
 --conf spark.comet.exec.all.enabled=true \
````
spark/src/main/scala/org/apache/spark/Plugins.scala (23 additions, 0 deletions)

```diff
@@ -26,6 +26,7 @@ import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext,
 import org.apache.spark.comet.shims.ShimCometDriverPlugin
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
+import org.apache.spark.sql.internal.StaticSQLConf
 
 import org.apache.comet.{CometConf, CometSparkSessionExtensions}
 
@@ -44,6 +45,9 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
   override def init(sc: SparkContext, pluginContext: PluginContext): ju.Map[String, String] = {
     logInfo("CometDriverPlugin init")
 
+    // register CometSparkSessionExtensions if it isn't already registered
+    CometDriverPlugin.registerCometSessionExtension(sc.conf)
+
     if (shouldOverrideMemoryConf(sc.getConf)) {
       val execMemOverhead = if (sc.getConf.contains(EXECUTOR_MEMORY_OVERHEAD.key)) {
         sc.getConf.getSizeAsMb(EXECUTOR_MEMORY_OVERHEAD.key)
@@ -94,6 +98,25 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
   }
 }
 
+object CometDriverPlugin extends Logging {
+  def registerCometSessionExtension(conf: SparkConf): Unit = {
+    val extensionKey = StaticSQLConf.SPARK_SESSION_EXTENSIONS.key
+    val extensionClass = classOf[CometSparkSessionExtensions].getName
+    val extensions = conf.get(extensionKey, "")
+    if (extensions.isEmpty) {
+      logInfo(s"Setting $extensionKey=$extensionClass")
+      conf.set(extensionKey, extensionClass)
+    } else {
+      val currentExtensions = extensions.split(",").map(_.trim)
+      if (!currentExtensions.contains(extensionClass)) {
+        val newValue = s"$extensions,$extensionClass"
+        logInfo(s"Setting $extensionKey=$newValue")
+        conf.set(extensionKey, newValue)
+      }
+    }
+  }
+}
+
 /**
  * The Comet plugin for Spark. To enable this plugin, set the config "spark.plugins" to
  * `org.apache.spark.CometPlugin`
```
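With this change, setting `spark.plugins` is sufficient: `CometDriverPlugin.init` appends `CometSparkSessionExtensions` to `spark.sql.extensions` automatically, preserving any extensions the user already configured. As a rough sketch, the programmatic equivalent of the `spark-shell` examples above might look like the following; the app name is hypothetical, and it assumes the Comet jar is already on the driver and executor classpath.

```scala
import org.apache.spark.sql.SparkSession

// Sketch: enabling Comet via spark.plugins alone. During driver startup,
// CometDriverPlugin adds org.apache.comet.CometSparkSessionExtensions to
// spark.sql.extensions unless it is already present.
val spark = SparkSession
  .builder()
  .appName("comet-plugin-example") // hypothetical name
  .config("spark.plugins", "org.apache.spark.CometPlugin")
  .config("spark.comet.enabled", "true")
  .config("spark.comet.exec.enabled", "true")
  .getOrCreate()
```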
spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala (43 additions, 0 deletions)

```diff
@@ -20,6 +20,7 @@
 package org.apache.spark
 
 import org.apache.spark.sql.CometTestBase
+import org.apache.spark.sql.internal.StaticSQLConf
 
 class CometPluginsSuite extends CometTestBase {
   override protected def sparkConf: SparkConf = {
@@ -33,6 +34,48 @@ class CometPluginsSuite extends CometTestBase {
     conf
   }
 
+  test("Register Comet extension") {
+    // test common case where no extensions are previously registered
+    {
+      val conf = new SparkConf()
+      CometDriverPlugin.registerCometSessionExtension(conf)
+      assert(
+        "org.apache.comet.CometSparkSessionExtensions" == conf.get(
+          StaticSQLConf.SPARK_SESSION_EXTENSIONS.key))
+    }
+    // test case where Comet is already registered
+    {
+      val conf = new SparkConf()
+      conf.set(
+        StaticSQLConf.SPARK_SESSION_EXTENSIONS.key,
+        "org.apache.comet.CometSparkSessionExtensions")
+      CometDriverPlugin.registerCometSessionExtension(conf)
+      assert(
+        "org.apache.comet.CometSparkSessionExtensions" == conf.get(
+          StaticSQLConf.SPARK_SESSION_EXTENSIONS.key))
+    }
+    // test case where other extensions are already registered
+    {
+      val conf = new SparkConf()
+      conf.set(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, "foo,bar")
+      CometDriverPlugin.registerCometSessionExtension(conf)
+      assert(
+        "foo,bar,org.apache.comet.CometSparkSessionExtensions" == conf.get(
+          StaticSQLConf.SPARK_SESSION_EXTENSIONS.key))
+    }
+    // test case where other extensions, including Comet, are already registered
+    {
+      val conf = new SparkConf()
+      conf.set(
+        StaticSQLConf.SPARK_SESSION_EXTENSIONS.key,
+        "foo,bar,org.apache.comet.CometSparkSessionExtensions")
+      CometDriverPlugin.registerCometSessionExtension(conf)
+      assert(
+        "foo,bar,org.apache.comet.CometSparkSessionExtensions" == conf.get(
+          StaticSQLConf.SPARK_SESSION_EXTENSIONS.key))
+    }
+  }
+
   test("Default Comet memory overhead") {
     val execMemOverhead1 = spark.conf.get("spark.executor.memoryOverhead")
     val execMemOverhead2 = spark.sessionState.conf.getConfString("spark.executor.memoryOverhead")
```
