Skip to content

Commit

Permalink
feat: Simplify configs for enabling/disabling operators (#855)
Browse files Browse the repository at this point in the history
* stop unpacking dictionaries for hash join input

* simply configs

* Revert

* simplify configs

* code cleanup and revert changes to FilterExec

* Revert

* revert

* cargo update

* add comment

* update plans

* update tests

* enable stddev

* revert plan changes

* update configs

* revert unrelated changes
  • Loading branch information
andygrove authored Aug 21, 2024
1 parent cddebc0 commit fbf389c
Show file tree
Hide file tree
Showing 29 changed files with 258 additions and 344 deletions.
66 changes: 19 additions & 47 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,25 +45,6 @@ import org.apache.comet.shims.ShimCometConf
*/
object CometConf extends ShimCometConf {

val OPERATOR_PROJECT: String = "project"
val OPERATOR_FILTER: String = "filter"
val OPERATOR_SORT: String = "sort"
val OPERATOR_AGGREGATE: String = "aggregate"
val OPERATOR_BROADCAST_EXCHANGE: String = "broadcastExchange"
val OPERATOR_COLLECT_LIMIT: String = "collectLimit"
val OPERATOR_COALESCE: String = "coalesce"
val OPERATOR_TAKE_ORDERED_AND_PROJECT: String = "takeOrderedAndProject"
val OPERATOR_HASH_JOIN: String = "hashJoin"
val OPERATOR_SORT_MERGE_JOIN: String = "sortMergeJoin"
val OPERATOR_BROADCAST_HASH_JOIN: String = "broadcastHashJoin"
val OPERATOR_EXPAND: String = "expand"
val OPERATOR_WINDOW: String = "window"
val OPERATOR_UNION: String = "union"
val OPERATOR_LOCAL_LIMIT: String = "localLimit"
val OPERATOR_GLOBAL_LIMIT: String = "globalLimit"

val EXPRESSION_STDDEV: String = "stddev"

/** List of all configs that is used for generating documentation */
val allConfs = new ListBuffer[ConfigEntry[_]]

Expand Down Expand Up @@ -123,42 +104,42 @@ object CometConf extends ShimCometConf {
.createWithDefault(false)

val COMET_EXEC_PROJECT_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig(OPERATOR_PROJECT, defaultValue = false)
createExecEnabledConfig("project", defaultValue = true)
val COMET_EXEC_FILTER_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig(OPERATOR_FILTER, defaultValue = false)
createExecEnabledConfig("filter", defaultValue = true)
val COMET_EXEC_SORT_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig(OPERATOR_SORT, defaultValue = false)
createExecEnabledConfig("sort", defaultValue = true)
val COMET_EXEC_LOCAL_LIMIT_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig(OPERATOR_LOCAL_LIMIT, defaultValue = false)
createExecEnabledConfig("localLimit", defaultValue = true)
val COMET_EXEC_GLOBAL_LIMIT_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig(OPERATOR_GLOBAL_LIMIT, defaultValue = false)
createExecEnabledConfig("globalLimit", defaultValue = true)
val COMET_EXEC_BROADCAST_HASH_JOIN_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig(OPERATOR_BROADCAST_HASH_JOIN, defaultValue = false)
createExecEnabledConfig("broadcastHashJoin", defaultValue = true)
val COMET_EXEC_BROADCAST_EXCHANGE_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig(OPERATOR_BROADCAST_EXCHANGE, defaultValue = false)
createExecEnabledConfig("broadcastExchange", defaultValue = true)
val COMET_EXEC_HASH_JOIN_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig(OPERATOR_HASH_JOIN, defaultValue = false)
createExecEnabledConfig("hashJoin", defaultValue = true)
val COMET_EXEC_SORT_MERGE_JOIN_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig(OPERATOR_SORT_MERGE_JOIN, defaultValue = false)
createExecEnabledConfig("sortMergeJoin", defaultValue = true)
val COMET_EXEC_AGGREGATE_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig(OPERATOR_AGGREGATE, defaultValue = false)
createExecEnabledConfig("aggregate", defaultValue = true)
val COMET_EXEC_COLLECT_LIMIT_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig(OPERATOR_COLLECT_LIMIT, defaultValue = false)
createExecEnabledConfig("collectLimit", defaultValue = true)
val COMET_EXEC_COALESCE_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig(OPERATOR_COALESCE, defaultValue = false)
createExecEnabledConfig("coalesce", defaultValue = true)
val COMET_EXEC_UNION_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig(OPERATOR_UNION, defaultValue = false)
createExecEnabledConfig("union", defaultValue = true)
val COMET_EXEC_EXPAND_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig(OPERATOR_EXPAND, defaultValue = false)
createExecEnabledConfig("expand", defaultValue = true)
val COMET_EXEC_WINDOW_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig(OPERATOR_WINDOW, defaultValue = false)
createExecEnabledConfig("window", defaultValue = true)
val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig(OPERATOR_TAKE_ORDERED_AND_PROJECT, defaultValue = false)
createExecEnabledConfig("takeOrderedAndProject", defaultValue = true)

val COMET_EXPR_STDDEV_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig(
EXPRESSION_STDDEV,
defaultValue = false,
"stddev",
defaultValue = true,
notes = Some("stddev is slower than Spark's implementation"))

val COMET_MEMORY_OVERHEAD: OptionalConfigEntry[Long] = conf("spark.comet.memoryOverhead")
Expand Down Expand Up @@ -190,15 +171,6 @@ object CometConf extends ShimCometConf {
"Ensure that Comet memory overhead min is a long greater than or equal to 0")
.createWithDefault(384)

val COMET_EXEC_ALL_OPERATOR_ENABLED: ConfigEntry[Boolean] = conf(
s"$COMET_EXEC_CONFIG_PREFIX.all.enabled")
.doc(
"Whether to enable all Comet operators. By default, this config is false. Note that " +
"this config precedes all separate config 'spark.comet.exec.<operator_name>.enabled'. " +
"That being said, if this config is enabled, separate configs are ignored.")
.booleanConf
.createWithDefault(false)

val COMET_EXEC_SHUFFLE_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.enabled")
.doc(
Expand Down Expand Up @@ -524,7 +496,7 @@ object CometConf extends ShimCometConf {
notes: Option[String] = None): ConfigEntry[Boolean] = {
conf(s"$COMET_EXEC_CONFIG_PREFIX.$exec.enabled")
.doc(
s"Whether to enable $exec by default. The default value is $defaultValue." + notes
s"Whether to enable $exec by default." + notes
.map(s => s" $s.")
.getOrElse(""))
.booleanConf
Expand Down
1 change: 0 additions & 1 deletion docs/source/contributor-guide/benchmarking.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ $SPARK_HOME/bin/spark-submit \
--conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
--conf spark.comet.enabled=true \
--conf spark.comet.exec.enabled=true \
--conf spark.comet.exec.all.enabled=true \
--conf spark.comet.cast.allowIncompatible=true \
--conf spark.comet.exec.shuffle.enabled=true \
--conf spark.comet.exec.shuffle.mode=auto \
Expand Down
2 changes: 1 addition & 1 deletion docs/source/contributor-guide/debugging.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ Then build the Comet as [described](https://github.com/apache/arrow-datafusion-c
Start Comet with `RUST_BACKTRACE=1`

```console
RUST_BACKTRACE=1 $SPARK_HOME/spark-shell --jars spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions --conf spark.comet.enabled=true --conf spark.comet.exec.enabled=true --conf spark.comet.exec.all.enabled=true
RUST_BACKTRACE=1 $SPARK_HOME/spark-shell --jars spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions --conf spark.comet.enabled=true --conf spark.comet.exec.enabled=true
```

Get the expanded exception details
Expand Down
35 changes: 17 additions & 18 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,28 @@ Comet provides the following configuration settings.
| spark.comet.debug.enabled | Whether to enable debug mode for Comet. By default, this config is false. When enabled, Comet will do additional checks for debugging purpose. For example, validating array when importing arrays from JVM at native side. Note that these checks may be expensive in performance and should only be enabled for debugging purpose. | false |
| spark.comet.enabled | Whether to enable Comet extension for Spark. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is the value of the env var `ENABLE_COMET` if set, or true otherwise. | true |
| spark.comet.exceptionOnDatetimeRebase | Whether to throw exception when seeing dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar. Since Spark 3, dates/timestamps were written according to the Proleptic Gregorian calendar. When this is true, Comet will throw exceptions when seeing these dates/timestamps that were written by Spark version before 3.0. If this is false, these dates/timestamps will be read as if they were written to the Proleptic Gregorian calendar and will not be rebased. | false |
| spark.comet.exec.aggregate.enabled | Whether to enable aggregate by default. The default value is false. | false |
| spark.comet.exec.all.enabled | Whether to enable all Comet operators. By default, this config is false. Note that this config precedes all separate config 'spark.comet.exec.<operator_name>.enabled'. That being said, if this config is enabled, separate configs are ignored. | false |
| spark.comet.exec.broadcastExchange.enabled | Whether to enable broadcastExchange by default. The default value is false. | false |
| spark.comet.exec.broadcastHashJoin.enabled | Whether to enable broadcastHashJoin by default. The default value is false. | false |
| spark.comet.exec.coalesce.enabled | Whether to enable coalesce by default. The default value is false. | false |
| spark.comet.exec.collectLimit.enabled | Whether to enable collectLimit by default. The default value is false. | false |
| spark.comet.exec.aggregate.enabled | Whether to enable aggregate by default. | true |
| spark.comet.exec.broadcastExchange.enabled | Whether to enable broadcastExchange by default. | true |
| spark.comet.exec.broadcastHashJoin.enabled | Whether to enable broadcastHashJoin by default. | true |
| spark.comet.exec.coalesce.enabled | Whether to enable coalesce by default. | true |
| spark.comet.exec.collectLimit.enabled | Whether to enable collectLimit by default. | true |
| spark.comet.exec.enabled | Whether to enable Comet native vectorized execution for Spark. This controls whether Spark should convert operators into their Comet counterparts and execute them in native space. Note: each operator is associated with a separate config in the format of 'spark.comet.exec.<operator_name>.enabled' at the moment, and both the config and this need to be turned on, in order for the operator to be executed in native. By default, this config is false. | false |
| spark.comet.exec.expand.enabled | Whether to enable expand by default. The default value is false. | false |
| spark.comet.exec.filter.enabled | Whether to enable filter by default. The default value is false. | false |
| spark.comet.exec.globalLimit.enabled | Whether to enable globalLimit by default. The default value is false. | false |
| spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. The default value is false. | false |
| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. The default value is false. | false |
| spark.comet.exec.expand.enabled | Whether to enable expand by default. | true |
| spark.comet.exec.filter.enabled | Whether to enable filter by default. | true |
| spark.comet.exec.globalLimit.enabled | Whether to enable globalLimit by default. | true |
| spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true |
| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true |
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 |
| spark.comet.exec.project.enabled | Whether to enable project by default. The default value is false. | false |
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd |
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false |
| spark.comet.exec.shuffle.mode | The mode of Comet shuffle. This config is only effective if Comet shuffle is enabled. Available modes are 'native', 'jvm', and 'auto'. 'native' is for native shuffle which has best performance in general. 'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. 'auto' is for Comet to choose the best shuffle mode based on the query plan. By default, this config is 'jvm'. | jvm |
| spark.comet.exec.sort.enabled | Whether to enable sort by default. The default value is false. | false |
| spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. The default value is false. | false |
| spark.comet.exec.stddev.enabled | Whether to enable stddev by default. The default value is false. stddev is slower than Spark's implementation. | false |
| spark.comet.exec.takeOrderedAndProject.enabled | Whether to enable takeOrderedAndProject by default. The default value is false. | false |
| spark.comet.exec.union.enabled | Whether to enable union by default. The default value is false. | false |
| spark.comet.exec.window.enabled | Whether to enable window by default. The default value is false. | false |
| spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |
| spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true |
| spark.comet.exec.stddev.enabled | Whether to enable stddev by default. stddev is slower than Spark's implementation. | true |
| spark.comet.exec.takeOrderedAndProject.enabled | Whether to enable takeOrderedAndProject by default. | true |
| spark.comet.exec.union.enabled | Whether to enable union by default. | true |
| spark.comet.exec.window.enabled | Whether to enable window by default. | true |
| spark.comet.explain.native.enabled | When this setting is enabled, Comet will provide a tree representation of the native query plan before execution and again after execution, with metrics. | false |
| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false |
| spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
Expand Down
1 change: 0 additions & 1 deletion docs/source/user-guide/installation.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ $SPARK_HOME/bin/spark-shell \
--conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
--conf spark.comet.enabled=true \
--conf spark.comet.exec.enabled=true \
--conf spark.comet.exec.all.enabled=true \
--conf spark.comet.explainFallback.enabled=true
```

Expand Down
Loading

0 comments on commit fbf389c

Please sign in to comment.