diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 9fa66e8c4..9f34709aa 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -45,25 +45,6 @@ import org.apache.comet.shims.ShimCometConf */ object CometConf extends ShimCometConf { - val OPERATOR_PROJECT: String = "project" - val OPERATOR_FILTER: String = "filter" - val OPERATOR_SORT: String = "sort" - val OPERATOR_AGGREGATE: String = "aggregate" - val OPERATOR_BROADCAST_EXCHANGE: String = "broadcastExchange" - val OPERATOR_COLLECT_LIMIT: String = "collectLimit" - val OPERATOR_COALESCE: String = "coalesce" - val OPERATOR_TAKE_ORDERED_AND_PROJECT: String = "takeOrderedAndProject" - val OPERATOR_HASH_JOIN: String = "hashJoin" - val OPERATOR_SORT_MERGE_JOIN: String = "sortMergeJoin" - val OPERATOR_BROADCAST_HASH_JOIN: String = "broadcastHashJoin" - val OPERATOR_EXPAND: String = "expand" - val OPERATOR_WINDOW: String = "window" - val OPERATOR_UNION: String = "union" - val OPERATOR_LOCAL_LIMIT: String = "localLimit" - val OPERATOR_GLOBAL_LIMIT: String = "globalLimit" - - val EXPRESSION_STDDEV: String = "stddev" - /** List of all configs that is used for generating documentation */ val allConfs = new ListBuffer[ConfigEntry[_]] @@ -123,42 +104,42 @@ object CometConf extends ShimCometConf { .createWithDefault(false) val COMET_EXEC_PROJECT_ENABLED: ConfigEntry[Boolean] = - createExecEnabledConfig(OPERATOR_PROJECT, defaultValue = false) + createExecEnabledConfig("project", defaultValue = true) val COMET_EXEC_FILTER_ENABLED: ConfigEntry[Boolean] = - createExecEnabledConfig(OPERATOR_FILTER, defaultValue = false) + createExecEnabledConfig("filter", defaultValue = true) val COMET_EXEC_SORT_ENABLED: ConfigEntry[Boolean] = - createExecEnabledConfig(OPERATOR_SORT, defaultValue = false) + createExecEnabledConfig("sort", defaultValue = true) val COMET_EXEC_LOCAL_LIMIT_ENABLED: 
ConfigEntry[Boolean] = - createExecEnabledConfig(OPERATOR_LOCAL_LIMIT, defaultValue = false) + createExecEnabledConfig("localLimit", defaultValue = true) val COMET_EXEC_GLOBAL_LIMIT_ENABLED: ConfigEntry[Boolean] = - createExecEnabledConfig(OPERATOR_GLOBAL_LIMIT, defaultValue = false) + createExecEnabledConfig("globalLimit", defaultValue = true) val COMET_EXEC_BROADCAST_HASH_JOIN_ENABLED: ConfigEntry[Boolean] = - createExecEnabledConfig(OPERATOR_BROADCAST_HASH_JOIN, defaultValue = false) + createExecEnabledConfig("broadcastHashJoin", defaultValue = true) val COMET_EXEC_BROADCAST_EXCHANGE_ENABLED: ConfigEntry[Boolean] = - createExecEnabledConfig(OPERATOR_BROADCAST_EXCHANGE, defaultValue = false) + createExecEnabledConfig("broadcastExchange", defaultValue = true) val COMET_EXEC_HASH_JOIN_ENABLED: ConfigEntry[Boolean] = - createExecEnabledConfig(OPERATOR_HASH_JOIN, defaultValue = false) + createExecEnabledConfig("hashJoin", defaultValue = true) val COMET_EXEC_SORT_MERGE_JOIN_ENABLED: ConfigEntry[Boolean] = - createExecEnabledConfig(OPERATOR_SORT_MERGE_JOIN, defaultValue = false) + createExecEnabledConfig("sortMergeJoin", defaultValue = true) val COMET_EXEC_AGGREGATE_ENABLED: ConfigEntry[Boolean] = - createExecEnabledConfig(OPERATOR_AGGREGATE, defaultValue = false) + createExecEnabledConfig("aggregate", defaultValue = true) val COMET_EXEC_COLLECT_LIMIT_ENABLED: ConfigEntry[Boolean] = - createExecEnabledConfig(OPERATOR_COLLECT_LIMIT, defaultValue = false) + createExecEnabledConfig("collectLimit", defaultValue = true) val COMET_EXEC_COALESCE_ENABLED: ConfigEntry[Boolean] = - createExecEnabledConfig(OPERATOR_COALESCE, defaultValue = false) + createExecEnabledConfig("coalesce", defaultValue = true) val COMET_EXEC_UNION_ENABLED: ConfigEntry[Boolean] = - createExecEnabledConfig(OPERATOR_UNION, defaultValue = false) + createExecEnabledConfig("union", defaultValue = true) val COMET_EXEC_EXPAND_ENABLED: ConfigEntry[Boolean] = - createExecEnabledConfig(OPERATOR_EXPAND, 
defaultValue = false) + createExecEnabledConfig("expand", defaultValue = true) val COMET_EXEC_WINDOW_ENABLED: ConfigEntry[Boolean] = - createExecEnabledConfig(OPERATOR_WINDOW, defaultValue = false) + createExecEnabledConfig("window", defaultValue = true) val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] = - createExecEnabledConfig(OPERATOR_TAKE_ORDERED_AND_PROJECT, defaultValue = false) + createExecEnabledConfig("takeOrderedAndProject", defaultValue = true) val COMET_EXPR_STDDEV_ENABLED: ConfigEntry[Boolean] = createExecEnabledConfig( - EXPRESSION_STDDEV, - defaultValue = false, + "stddev", + defaultValue = true, notes = Some("stddev is slower than Spark's implementation")) val COMET_MEMORY_OVERHEAD: OptionalConfigEntry[Long] = conf("spark.comet.memoryOverhead") @@ -190,15 +171,6 @@ object CometConf extends ShimCometConf { "Ensure that Comet memory overhead min is a long greater than or equal to 0") .createWithDefault(384) - val COMET_EXEC_ALL_OPERATOR_ENABLED: ConfigEntry[Boolean] = conf( - s"$COMET_EXEC_CONFIG_PREFIX.all.enabled") - .doc( - "Whether to enable all Comet operators. By default, this config is false. Note that " + - "this config precedes all separate config 'spark.comet.exec..enabled'. " + - "That being said, if this config is enabled, separate configs are ignored.") - .booleanConf - .createWithDefault(false) - val COMET_EXEC_SHUFFLE_ENABLED: ConfigEntry[Boolean] = conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.enabled") .doc( @@ -524,7 +496,7 @@ object CometConf extends ShimCometConf { notes: Option[String] = None): ConfigEntry[Boolean] = { conf(s"$COMET_EXEC_CONFIG_PREFIX.$exec.enabled") .doc( - s"Whether to enable $exec by default. The default value is $defaultValue." + notes + s"Whether to enable $exec by default." 
+ notes .map(s => s" $s.") .getOrElse("")) .booleanConf diff --git a/docs/source/contributor-guide/benchmarking.md b/docs/source/contributor-guide/benchmarking.md index 5f4f10912..b6e9ce1d1 100644 --- a/docs/source/contributor-guide/benchmarking.md +++ b/docs/source/contributor-guide/benchmarking.md @@ -63,7 +63,6 @@ $SPARK_HOME/bin/spark-submit \ --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \ --conf spark.comet.enabled=true \ --conf spark.comet.exec.enabled=true \ - --conf spark.comet.exec.all.enabled=true \ --conf spark.comet.cast.allowIncompatible=true \ --conf spark.comet.exec.shuffle.enabled=true \ --conf spark.comet.exec.shuffle.mode=auto \ diff --git a/docs/source/contributor-guide/debugging.md b/docs/source/contributor-guide/debugging.md index d1f62a5db..31faa4274 100644 --- a/docs/source/contributor-guide/debugging.md +++ b/docs/source/contributor-guide/debugging.md @@ -130,7 +130,7 @@ Then build the Comet as [described](https://github.com/apache/arrow-datafusion-c Start Comet with `RUST_BACKTRACE=1` ```console -RUST_BACKTRACE=1 $SPARK_HOME/spark-shell --jars spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions --conf spark.comet.enabled=true --conf spark.comet.exec.enabled=true --conf spark.comet.exec.all.enabled=true +RUST_BACKTRACE=1 $SPARK_HOME/spark-shell --jars spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions --conf spark.comet.enabled=true --conf spark.comet.exec.enabled=true ``` Get the expanded exception details diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 8465baa7c..5c29b709b 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -35,29 +35,28 @@ Comet provides the following configuration settings. | spark.comet.debug.enabled | Whether to enable debug mode for Comet. 
By default, this config is false. When enabled, Comet will do additional checks for debugging purpose. For example, validating array when importing arrays from JVM at native side. Note that these checks may be expensive in performance and should only be enabled for debugging purpose. | false | | spark.comet.enabled | Whether to enable Comet extension for Spark. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is the value of the env var `ENABLE_COMET` if set, or true otherwise. | true | | spark.comet.exceptionOnDatetimeRebase | Whether to throw exception when seeing dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar. Since Spark 3, dates/timestamps were written according to the Proleptic Gregorian calendar. When this is true, Comet will throw exceptions when seeing these dates/timestamps that were written by Spark version before 3.0. If this is false, these dates/timestamps will be read as if they were written to the Proleptic Gregorian calendar and will not be rebased. | false | -| spark.comet.exec.aggregate.enabled | Whether to enable aggregate by default. The default value is false. | false | -| spark.comet.exec.all.enabled | Whether to enable all Comet operators. By default, this config is false. Note that this config precedes all separate config 'spark.comet.exec..enabled'. That being said, if this config is enabled, separate configs are ignored. | false | -| spark.comet.exec.broadcastExchange.enabled | Whether to enable broadcastExchange by default. The default value is false. | false | -| spark.comet.exec.broadcastHashJoin.enabled | Whether to enable broadcastHashJoin by default. The default value is false. | false | -| spark.comet.exec.coalesce.enabled | Whether to enable coalesce by default. The default value is false. 
| false | -| spark.comet.exec.collectLimit.enabled | Whether to enable collectLimit by default. The default value is false. | false | +| spark.comet.exec.aggregate.enabled | Whether to enable aggregate by default. | true | +| spark.comet.exec.broadcastExchange.enabled | Whether to enable broadcastExchange by default. | true | +| spark.comet.exec.broadcastHashJoin.enabled | Whether to enable broadcastHashJoin by default. | true | +| spark.comet.exec.coalesce.enabled | Whether to enable coalesce by default. | true | +| spark.comet.exec.collectLimit.enabled | Whether to enable collectLimit by default. | true | | spark.comet.exec.enabled | Whether to enable Comet native vectorized execution for Spark. This controls whether Spark should convert operators into their Comet counterparts and execute them in native space. Note: each operator is associated with a separate config in the format of 'spark.comet.exec..enabled' at the moment, and both the config and this need to be turned on, in order for the operator to be executed in native. By default, this config is false. | false | -| spark.comet.exec.expand.enabled | Whether to enable expand by default. The default value is false. | false | -| spark.comet.exec.filter.enabled | Whether to enable filter by default. The default value is false. | false | -| spark.comet.exec.globalLimit.enabled | Whether to enable globalLimit by default. The default value is false. | false | -| spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. The default value is false. | false | -| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. The default value is false. | false | +| spark.comet.exec.expand.enabled | Whether to enable expand by default. | true | +| spark.comet.exec.filter.enabled | Whether to enable filter by default. | true | +| spark.comet.exec.globalLimit.enabled | Whether to enable globalLimit by default. 
| true | +| spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true | +| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true | | spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 | -| spark.comet.exec.project.enabled | Whether to enable project by default. The default value is false. | false | +| spark.comet.exec.project.enabled | Whether to enable project by default. | true | | spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd | | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false | | spark.comet.exec.shuffle.mode | The mode of Comet shuffle. This config is only effective if Comet shuffle is enabled. Available modes are 'native', 'jvm', and 'auto'. 'native' is for native shuffle which has best performance in general. 'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. 'auto' is for Comet to choose the best shuffle mode based on the query plan. By default, this config is 'jvm'. | jvm | -| spark.comet.exec.sort.enabled | Whether to enable sort by default. The default value is false. | false | -| spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. The default value is false. | false | -| spark.comet.exec.stddev.enabled | Whether to enable stddev by default. The default value is false. 
stddev is slower than Spark's implementation. | false | -| spark.comet.exec.takeOrderedAndProject.enabled | Whether to enable takeOrderedAndProject by default. The default value is false. | false | -| spark.comet.exec.union.enabled | Whether to enable union by default. The default value is false. | false | -| spark.comet.exec.window.enabled | Whether to enable window by default. The default value is false. | false | +| spark.comet.exec.sort.enabled | Whether to enable sort by default. | true | +| spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true | +| spark.comet.exec.stddev.enabled | Whether to enable stddev by default. stddev is slower than Spark's implementation. | true | +| spark.comet.exec.takeOrderedAndProject.enabled | Whether to enable takeOrderedAndProject by default. | true | +| spark.comet.exec.union.enabled | Whether to enable union by default. | true | +| spark.comet.exec.window.enabled | Whether to enable window by default. | true | | spark.comet.explain.native.enabled | When this setting is enabled, Comet will provide a tree representation of the native query plan before execution and again after execution, with metrics. | false | | spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false | | spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. 
| false | diff --git a/docs/source/user-guide/installation.md b/docs/source/user-guide/installation.md index cb7f032d6..a87e41e7a 100644 --- a/docs/source/user-guide/installation.md +++ b/docs/source/user-guide/installation.md @@ -88,7 +88,6 @@ $SPARK_HOME/bin/spark-shell \ --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \ --conf spark.comet.enabled=true \ --conf spark.comet.exec.enabled=true \ - --conf spark.comet.exec.all.enabled=true \ --conf spark.comet.explainFallback.enabled=true ``` diff --git a/native/Cargo.lock b/native/Cargo.lock index 27bc3828c..b97740ee5 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -108,9 +108,9 @@ checksum = "9d151e35f61089500b617991b791fc8bfd237ae50cd5950803758a179b41e67a" [[package]] name = "arrayvec" -version = "0.7.4" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" @@ -346,7 +346,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] @@ -457,9 +457,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "bytemuck" -version = "1.16.3" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "102087e286b4677862ea56cf8fc58bb2cdfa8725c40ffb80fe3a008eb7f2fc83" +checksum = "6fd4c6dcc3b0aea2f5c0b4b82c2b15fe39ddbc76041a310848f4706edf76bb31" [[package]] name = "byteorder" @@ -481,12 +481,13 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.1.8" +version = "1.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "504bdec147f2cc13c8b57ed9401fd8a147cc66b67ad5cb241394244f2c947549" +checksum = 
"72db2f7947ecee9b03b510377e8bb9077afa27176fdbff55c51027e976fdcc48" dependencies = [ "jobserver", "libc", + "shlex", ] [[package]] @@ -586,18 +587,18 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.13" +version = "4.5.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fbb260a053428790f3de475e304ff84cdbc4face759ea7a3e64c1edd938a7fc" +checksum = "ed6719fffa43d0d87e5fd8caeab59be1554fb028cd30edc88fc4369b17971019" dependencies = [ "clap_builder", ] [[package]] name = "clap_builder" -version = "4.5.13" +version = "4.5.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64b17d7ea74e9f833c7dbf2cbe4fb12ff26783eda4782a8975b72f895c9b4d99" +checksum = "216aec2b177652e3846684cbfe25c9964d18ec45234f0f5da5157b207ed1aab6" dependencies = [ "anstyle", "clap_lex", @@ -658,9 +659,9 @@ checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2" [[package]] name = "core-foundation-sys" -version = "0.8.6" +version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] name = "cpp_demangle" @@ -673,9 +674,9 @@ dependencies = [ [[package]] name = "cpufeatures" -version = "0.2.12" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53fe5e26ff1b7aef8bca9c6080520cfb8d9333c7568e1829cef191a9723e5504" +checksum = "51e852e6dc9a5bed1fae92dd2375037bf2b768725bf3be87811edee3249d09ad" dependencies = [ "libc", ] @@ -804,7 +805,8 @@ dependencies = [ [[package]] name = "datafusion" version = "41.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=41.0.0-rc1#b10b820acb6ad92b5d69810e3d4de0ef6f2d6a87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4fd4a99fc70d40ef7e52b243b4a399c3f8d353a40d5ecb200deee05e49c61bb" dependencies = [ "ahash", 
"arrow", @@ -851,7 +853,8 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "41.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=41.0.0-rc1#b10b820acb6ad92b5d69810e3d4de0ef6f2d6a87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b3cfbd84c6003594ae1972314e3df303a27ce8ce755fcea3240c90f4c0529" dependencies = [ "arrow-schema", "async-trait", @@ -949,7 +952,8 @@ dependencies = [ [[package]] name = "datafusion-common" version = "41.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=41.0.0-rc1#b10b820acb6ad92b5d69810e3d4de0ef6f2d6a87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44fdbc877e3e40dcf88cc8f283d9f5c8851f0a3aa07fee657b1b75ac1ad49b9c" dependencies = [ "ahash", "arrow", @@ -969,7 +973,8 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "41.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=41.0.0-rc1#b10b820acb6ad92b5d69810e3d4de0ef6f2d6a87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a7496d1f664179f6ce3a5cbef6566056ccaf3ea4aa72cc455f80e62c1dd86b1" dependencies = [ "tokio", ] @@ -977,7 +982,8 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "41.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=41.0.0-rc1#b10b820acb6ad92b5d69810e3d4de0ef6f2d6a87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "799e70968c815b611116951e3dd876aef04bf217da31b72eec01ee6a959336a1" dependencies = [ "arrow", "chrono", @@ -997,7 +1003,8 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "41.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=41.0.0-rc1#b10b820acb6ad92b5d69810e3d4de0ef6f2d6a87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c1841c409d9518c17971d15c9bae62e629eb937e6fb6c68cd32e9186f8b30d2" dependencies = [ "ahash", "arrow", @@ -1015,7 +1022,8 @@ 
dependencies = [ [[package]] name = "datafusion-functions" version = "41.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=41.0.0-rc1#b10b820acb6ad92b5d69810e3d4de0ef6f2d6a87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8e481cf34d2a444bd8fa09b65945f0ce83dc92df8665b761505b3d9f351bebb" dependencies = [ "arrow", "arrow-buffer", @@ -1041,7 +1049,8 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "41.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=41.0.0-rc1#b10b820acb6ad92b5d69810e3d4de0ef6f2d6a87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b4ece19f73c02727e5e8654d79cd5652de371352c1df3c4ac3e419ecd6943fb" dependencies = [ "ahash", "arrow", @@ -1058,7 +1067,8 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" version = "41.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=41.0.0-rc1#b10b820acb6ad92b5d69810e3d4de0ef6f2d6a87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1474552cc824e8c9c88177d454db5781d4b66757d4aca75719306b8343a5e8d" dependencies = [ "arrow", "arrow-array", @@ -1079,7 +1089,8 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "41.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=41.0.0-rc1#b10b820acb6ad92b5d69810e3d4de0ef6f2d6a87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "791ff56f55608bc542d1ea7a68a64bdc86a9413f5a381d06a39fd49c2a3ab906" dependencies = [ "arrow", "async-trait", @@ -1098,7 +1109,8 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "41.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=41.0.0-rc1#b10b820acb6ad92b5d69810e3d4de0ef6f2d6a87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a223962b3041304a3e20ed07a21d5de3d88d7e4e71ca192135db6d24e3365a4" dependencies = [ "ahash", "arrow", @@ -1127,7 
+1139,8 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "41.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=41.0.0-rc1#b10b820acb6ad92b5d69810e3d4de0ef6f2d6a87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db5e7d8532a1601cd916881db87a70b0a599900d23f3db2897d389032da53bc6" dependencies = [ "ahash", "arrow", @@ -1140,7 +1153,8 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" version = "41.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=41.0.0-rc1#b10b820acb6ad92b5d69810e3d4de0ef6f2d6a87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdb9c78f308e050f5004671039786a925c3fee83b90004e9fcfd328d7febdcc0" dependencies = [ "datafusion-common", "datafusion-execution", @@ -1151,7 +1165,8 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "41.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=41.0.0-rc1#b10b820acb6ad92b5d69810e3d4de0ef6f2d6a87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d1116949432eb2d30f6362707e2846d942e491052a206f2ddcb42d08aea1ffe" dependencies = [ "ahash", "arrow", @@ -1184,7 +1199,8 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "41.0.0" -source = "git+https://github.com/apache/datafusion.git?rev=41.0.0-rc1#b10b820acb6ad92b5d69810e3d4de0ef6f2d6a87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b45d0180711165fe94015d7c4123eb3e1cf5fb60b1506453200b8d1ce666bef0" dependencies = [ "arrow", "arrow-array", @@ -1377,7 +1393,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] @@ -1491,6 +1507,12 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] 
+name = "hermit-abi" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" + [[package]] name = "hex" version = "0.4.3" @@ -1547,9 +1569,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.3.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de3fc2e30ba82dd1b3911c8de1ffc143c74a914a14e99514d7637e3099df5ea0" +checksum = "93ead53efc7ea8ed3cfb0c79fc8023fbb782a5432b52830b6518941cebe6505c" dependencies = [ "equivalent", "hashbrown", @@ -1593,11 +1615,11 @@ checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" [[package]] name = "is-terminal" -version = "0.4.12" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b" +checksum = "261f68e344040fbd0edea105bef17c66edf46f984ddb1115b775ce31be948f4b" dependencies = [ - "hermit-abi", + "hermit-abi 0.4.0", "libc", "windows-sys 0.52.0", ] @@ -1689,9 +1711,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.69" +version = "0.3.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d" +checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a" dependencies = [ "wasm-bindgen", ] @@ -1768,9 +1790,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.155" +version = "0.2.157" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +checksum = "374af5f94e54fa97cf75e945cce8a6b201e88a1a07e688b47dfd2a59c66dbd86" [[package]] name = "libloading" @@ -2036,7 +2058,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" 
dependencies = [ - "hermit-abi", + "hermit-abi 0.3.9", "libc", ] @@ -2346,10 +2368,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.12.1", "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] @@ -2546,9 +2568,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.205" +version = "1.0.208" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e33aedb1a7135da52b7c21791455563facbbcc43d0f0f66165b42c21b3dfb150" +checksum = "cff085d2cb684faa248efb494c39b68e522822ac0de72ccf08109abde717cfb2" dependencies = [ "serde_derive", ] @@ -2565,20 +2587,20 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.205" +version = "1.0.208" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "692d6f5ac90220161d6774db30c662202721e64aed9058d2c394f451261420c1" +checksum = "24008e81ff7613ed8e5ba0cfaf24e2c2f1e5b8a0495711e44fcd4882fca62bcf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] name = "serde_json" -version = "1.0.122" +version = "1.0.125" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784b6203951c57ff748476b126ccb5e8e2959a5c19e5c617ab1956be3dbc68da" +checksum = "83c8e735a073ccf5be70aa8066aa984eaf2fa000db6c8d0100ae605b366d31ed" dependencies = [ "itoa", "memchr", @@ -2610,6 +2632,12 @@ dependencies = [ "digest", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "simd-adler32" version = "0.3.7" @@ -2683,7 +2711,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" dependencies = [ "proc-macro2", "quote", - "syn 
2.0.72", + "syn 2.0.75", ] [[package]] @@ -2723,7 +2751,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] @@ -2768,9 +2796,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.72" +version = "2.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc4b9b9bf2add8093d3f2c0204471e951b2285580335de42f9d2534f3ae7a8af" +checksum = "f6af063034fc1935ede7be0122941bafa9bacb949334d090b77ca98b5817c7d9" dependencies = [ "proc-macro2", "quote", @@ -2807,7 +2835,7 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] @@ -2867,9 +2895,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.39.2" +version = "1.39.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1" +checksum = "9babc99b9923bfa4804bd74722ff02c0381021eafa4db9949217e3be8e84fff5" dependencies = [ "backtrace", "bytes", @@ -2885,7 +2913,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] @@ -2907,7 +2935,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] @@ -3037,34 +3065,35 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" +checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5" dependencies = [ "cfg-if", + "once_cell", "wasm-bindgen-macro", ] [[package]] 
name = "wasm-bindgen-backend" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da" +checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b" dependencies = [ "bumpalo", "log", "once_cell", "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-macro" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" +checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3072,28 +3101,28 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" +checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" +checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" [[package]] name = "web-sys" -version = "0.3.69" +version = "0.3.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77afa9a11836342370f4817622a2f0f418b134426d91a82dfb48f532d2ec13ef" +checksum = "26fdeaafd9bd129f65e7c031593c24d62186301e0c72c8978fa1678be7d532c0" dependencies = [ "js-sys", "wasm-bindgen", @@ -3317,7 +3346,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" 
dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 6e3663e54..2b2c0ebe9 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.comet.CometConf._ import org.apache.comet.CometExplainInfo.getActualPlan -import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark34Plus, isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos} +import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark34Plus, isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos} import org.apache.comet.parquet.{CometParquetScan, SupportsComet} import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.QueryPlanSerde @@ -394,9 +394,7 @@ class CometSparkSessionExtensions } case op: CollectLimitExec - if isCometNative(op.child) && isCometOperatorEnabled( - conf, - CometConf.OPERATOR_COLLECT_LIMIT) + if isCometNative(op.child) && CometConf.COMET_EXEC_COLLECT_LIMIT_ENABLED.get(conf) && isCometShuffleEnabled(conf) && getOffset(op) == 0 => QueryPlanSerde.operator2Proto(op) match { @@ -472,7 +470,7 @@ class 
CometSparkSessionExtensions } case op: ShuffledHashJoinExec - if isCometOperatorEnabled(conf, CometConf.OPERATOR_HASH_JOIN) && + if CometConf.COMET_EXEC_HASH_JOIN_ENABLED.get(conf) && op.children.forall(isCometNative(_)) => val newOp = transform1(op) newOp match { @@ -494,8 +492,7 @@ class CometSparkSessionExtensions op } - case op: ShuffledHashJoinExec - if !isCometOperatorEnabled(conf, CometConf.OPERATOR_HASH_JOIN) => + case op: ShuffledHashJoinExec if !CometConf.COMET_EXEC_HASH_JOIN_ENABLED.get(conf) => withInfo(op, "ShuffleHashJoin is not enabled") op @@ -507,7 +504,7 @@ class CometSparkSessionExtensions op case op: BroadcastHashJoinExec - if isCometOperatorEnabled(conf, CometConf.OPERATOR_BROADCAST_HASH_JOIN) && + if CometConf.COMET_EXEC_BROADCAST_HASH_JOIN_ENABLED.get(conf) && op.children.forall(isCometNative(_)) => val newOp = transform1(op) newOp match { @@ -530,7 +527,7 @@ class CometSparkSessionExtensions } case op: SortMergeJoinExec - if isCometOperatorEnabled(conf, CometConf.OPERATOR_SORT_MERGE_JOIN) && + if CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.get(conf) && op.children.forall(isCometNative(_)) => val newOp = transform1(op) newOp match { @@ -552,7 +549,7 @@ class CometSparkSessionExtensions } case op: SortMergeJoinExec - if isCometOperatorEnabled(conf, CometConf.OPERATOR_SORT_MERGE_JOIN) && + if CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.get(conf) && !op.children.forall(isCometNative(_)) => withInfo( op, @@ -560,8 +557,7 @@ class CometSparkSessionExtensions s"${explainChildNotNative(op)}") op - case op: SortMergeJoinExec - if !isCometOperatorEnabled(conf, CometConf.OPERATOR_SORT_MERGE_JOIN) => + case op: SortMergeJoinExec if !CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.get(conf) => withInfo(op, "SortMergeJoin is not enabled") op @@ -573,7 +569,7 @@ class CometSparkSessionExtensions op case c @ CoalesceExec(numPartitions, child) - if isCometOperatorEnabled(conf, CometConf.OPERATOR_COALESCE) + if CometConf.COMET_EXEC_COALESCE_ENABLED.get(conf) 
&& isCometNative(child) => QueryPlanSerde.operator2Proto(c) match { case Some(nativeOp) => @@ -583,8 +579,7 @@ class CometSparkSessionExtensions c } - case c @ CoalesceExec(_, _) - if !isCometOperatorEnabled(conf, CometConf.OPERATOR_COALESCE) => + case c @ CoalesceExec(_, _) if !CometConf.COMET_EXEC_COALESCE_ENABLED.get(conf) => withInfo(c, "Coalesce is not enabled") c @@ -596,9 +591,8 @@ class CometSparkSessionExtensions op case s: TakeOrderedAndProjectExec - if isCometNative(s.child) && isCometOperatorEnabled( - conf, - CometConf.OPERATOR_TAKE_ORDERED_AND_PROJECT) + if isCometNative(s.child) && CometConf.COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED + .get(conf) && isCometShuffleEnabled(conf) && CometTakeOrderedAndProjectExec.isSupported(s) => QueryPlanSerde.operator2Proto(s) match { @@ -618,7 +612,7 @@ class CometSparkSessionExtensions case s: TakeOrderedAndProjectExec => val info1 = createMessage( - !isCometOperatorEnabled(conf, CometConf.OPERATOR_TAKE_ORDERED_AND_PROJECT), + !CometConf.COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED.get(conf), "TakeOrderedAndProject is not enabled") val info2 = createMessage( !isCometShuffleEnabled(conf), @@ -644,7 +638,7 @@ class CometSparkSessionExtensions } case u: UnionExec - if isCometOperatorEnabled(conf, CometConf.OPERATOR_UNION) && + if CometConf.COMET_EXEC_UNION_ENABLED.get(conf) && u.children.forall(isCometNative) => QueryPlanSerde.operator2Proto(u) match { case Some(nativeOp) => @@ -654,7 +648,7 @@ class CometSparkSessionExtensions u } - case u: UnionExec if !isCometOperatorEnabled(conf, CometConf.OPERATOR_UNION) => + case u: UnionExec if !CometConf.COMET_EXEC_UNION_ENABLED.get(conf) => withInfo(u, "Union is not enabled") u @@ -684,7 +678,7 @@ class CometSparkSessionExtensions val newChildren = plan.children.map { case b: BroadcastExchangeExec if isCometNative(b.child) && - isCometOperatorEnabled(conf, CometConf.OPERATOR_BROADCAST_EXCHANGE) && + CometConf.COMET_EXEC_BROADCAST_EXCHANGE_ENABLED.get(conf) && isSpark34Plus => 
// Spark 3.4+ only QueryPlanSerde.operator2Proto(b) match { case Some(nativeOp) => @@ -725,7 +719,7 @@ class CometSparkSessionExtensions op case op: BroadcastHashJoinExec - if !isCometOperatorEnabled(conf, CometConf.OPERATOR_BROADCAST_HASH_JOIN) => + if !CometConf.COMET_EXEC_BROADCAST_HASH_JOIN_ENABLED.get(conf) => withInfo(op, "BroadcastHashJoin is not enabled") op @@ -1044,13 +1038,6 @@ object CometSparkSessionExtensions extends Logging { } } - private[comet] def isCometOperatorEnabled(conf: SQLConf, operator: String): Boolean = { - val operatorFlag = s"$COMET_EXEC_CONFIG_PREFIX.$operator.enabled" - val operatorDisabledFlag = s"$COMET_EXEC_CONFIG_PREFIX.$operator.disabled" - conf.getConfString(operatorFlag, "false").toBoolean || isCometAllOperatorEnabled(conf) && - !conf.getConfString(operatorDisabledFlag, "false").toBoolean - } - private[comet] def isCometBroadCastForceEnabled(conf: SQLConf): Boolean = { COMET_EXEC_BROADCAST_FORCE_ENABLED.get(conf) } @@ -1110,10 +1097,6 @@ object CometSparkSessionExtensions extends Logging { } } - private[comet] def isCometAllOperatorEnabled(conf: SQLConf): Boolean = { - COMET_EXEC_ALL_OPERATOR_ENABLED.get(conf) - } - def isCometScan(op: SparkPlan): Boolean = { op.isInstanceOf[CometBatchScanExec] || op.isInstanceOf[CometScanExec] } diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 5ef924f6a..cfb847644 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -43,7 +43,7 @@ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.comet.CometConf -import org.apache.comet.CometSparkSessionExtensions.{isCometOperatorEnabled, isCometScan, isSpark34Plus, withInfo} +import org.apache.comet.CometSparkSessionExtensions.{isCometScan, isSpark34Plus, withInfo} import 
org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible, Incompatible, RegExp, Unsupported} import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => ProtoDataType, Expr, ScalarFunc} import org.apache.comet.serde.ExprOuterClass.DataType.{DataTypeInfo, DecimalInfo, ListInfo, MapInfo, StructInfo} @@ -666,63 +666,66 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim None } - case StddevSamp(child, _) if !isCometOperatorEnabled(conf, CometConf.EXPRESSION_STDDEV) => - withInfo( - aggExpr, - "stddev disabled by default because it can be slower than Spark. " + - s"Set ${CometConf.EXPRESSION_STDDEV}.enabled=true to enable it.", - child) - None - case std @ StddevSamp(child, nullOnDivideByZero) => - val childExpr = exprToProto(child, inputs, binding) - val dataType = serializeDataType(std.dataType) + if (CometConf.COMET_EXPR_STDDEV_ENABLED.get(conf)) { + val childExpr = exprToProto(child, inputs, binding) + val dataType = serializeDataType(std.dataType) - if (childExpr.isDefined && dataType.isDefined) { - val stdBuilder = ExprOuterClass.Stddev.newBuilder() - stdBuilder.setChild(childExpr.get) - stdBuilder.setNullOnDivideByZero(nullOnDivideByZero) - stdBuilder.setDatatype(dataType.get) - stdBuilder.setStatsTypeValue(0) + if (childExpr.isDefined && dataType.isDefined) { + val stdBuilder = ExprOuterClass.Stddev.newBuilder() + stdBuilder.setChild(childExpr.get) + stdBuilder.setNullOnDivideByZero(nullOnDivideByZero) + stdBuilder.setDatatype(dataType.get) + stdBuilder.setStatsTypeValue(0) - Some( - ExprOuterClass.AggExpr - .newBuilder() - .setStddev(stdBuilder) - .build()) + Some( + ExprOuterClass.AggExpr + .newBuilder() + .setStddev(stdBuilder) + .build()) + } else { + withInfo(aggExpr, child) + None + } } else { - withInfo(aggExpr, child) + withInfo( + aggExpr, + "stddev disabled by default because it can be slower than Spark. 
" + + s"Set ${CometConf.COMET_EXPR_STDDEV_ENABLED.key}=true to enable it.", + child) None } - case StddevPop(child, _) if !isCometOperatorEnabled(conf, CometConf.EXPRESSION_STDDEV) => - withInfo( - aggExpr, - "stddev disabled by default because it can be slower than Spark. " + - s"Set ${CometConf.EXPRESSION_STDDEV}.enabled=true to enable it.", - child) - None - case std @ StddevPop(child, nullOnDivideByZero) => - val childExpr = exprToProto(child, inputs, binding) - val dataType = serializeDataType(std.dataType) + if (CometConf.COMET_EXPR_STDDEV_ENABLED.get(conf)) { + val childExpr = exprToProto(child, inputs, binding) + val dataType = serializeDataType(std.dataType) - if (childExpr.isDefined && dataType.isDefined) { - val stdBuilder = ExprOuterClass.Stddev.newBuilder() - stdBuilder.setChild(childExpr.get) - stdBuilder.setNullOnDivideByZero(nullOnDivideByZero) - stdBuilder.setDatatype(dataType.get) - stdBuilder.setStatsTypeValue(1) + if (childExpr.isDefined && dataType.isDefined) { + val stdBuilder = ExprOuterClass.Stddev.newBuilder() + stdBuilder.setChild(childExpr.get) + stdBuilder.setNullOnDivideByZero(nullOnDivideByZero) + stdBuilder.setDatatype(dataType.get) + stdBuilder.setStatsTypeValue(1) - Some( - ExprOuterClass.AggExpr - .newBuilder() - .setStddev(stdBuilder) - .build()) + Some( + ExprOuterClass.AggExpr + .newBuilder() + .setStddev(stdBuilder) + .build()) + } else { + withInfo(aggExpr, child) + None + } } else { - withInfo(aggExpr, child) + withInfo( + aggExpr, + "stddev disabled by default because it can be slower than Spark. " + + s"Set ${CometConf.COMET_EXPR_STDDEV_ENABLED.key}=true to enable it.", + child) None } + case corr @ Corr(child1, child2, nullOnDivideByZero) => val child1Expr = exprToProto(child1, inputs, binding) val child2Expr = exprToProto(child2, inputs, binding) @@ -2526,12 +2529,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim * converted to a native operator. 
*/ def operator2Proto(op: SparkPlan, childOp: Operator*): Option[Operator] = { + val conf = op.conf val result = OperatorOuterClass.Operator.newBuilder() childOp.foreach(result.addChildren) op match { - case ProjectExec(projectList, child) - if isCometOperatorEnabled(op.conf, CometConf.OPERATOR_PROJECT) => + case ProjectExec(projectList, child) if CometConf.COMET_EXEC_PROJECT_ENABLED.get(conf) => val exprs = projectList.map(exprToProto(_, child.output)) if (exprs.forall(_.isDefined) && childOp.nonEmpty) { @@ -2544,8 +2547,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim None } - case FilterExec(condition, child) - if isCometOperatorEnabled(op.conf, CometConf.OPERATOR_FILTER) => + case FilterExec(condition, child) if CometConf.COMET_EXEC_FILTER_ENABLED.get(conf) => val cond = exprToProto(condition, child.output) if (cond.isDefined && childOp.nonEmpty) { @@ -2556,8 +2558,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim None } - case SortExec(sortOrder, _, child, _) - if isCometOperatorEnabled(op.conf, CometConf.OPERATOR_SORT) => + case SortExec(sortOrder, _, child, _) if CometConf.COMET_EXEC_SORT_ENABLED.get(conf) => if (!supportedSortType(op, sortOrder)) { return None } @@ -2574,8 +2575,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim None } - case LocalLimitExec(limit, _) - if isCometOperatorEnabled(op.conf, CometConf.OPERATOR_LOCAL_LIMIT) => + case LocalLimitExec(limit, _) if CometConf.COMET_EXEC_LOCAL_LIMIT_ENABLED.get(conf) => if (childOp.nonEmpty) { // LocalLimit doesn't use offset, but it shares same operator serde class. // Just set it to zero. 
@@ -2590,7 +2590,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case globalLimitExec: GlobalLimitExec - if isCometOperatorEnabled(op.conf, CometConf.OPERATOR_GLOBAL_LIMIT) => + if CometConf.COMET_EXEC_GLOBAL_LIMIT_ENABLED.get(conf) => // TODO: We don't support negative limit for now. if (childOp.nonEmpty && globalLimitExec.limit >= 0) { val limitBuilder = OperatorOuterClass.Limit.newBuilder() @@ -2605,8 +2605,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim None } - case ExpandExec(projections, _, child) - if isCometOperatorEnabled(op.conf, CometConf.OPERATOR_EXPAND) => + case ExpandExec(projections, _, child) if CometConf.COMET_EXEC_EXPAND_ENABLED.get(conf) => var allProjExprs: Seq[Expression] = Seq() val projExprs = projections.flatMap(_.map(e => { allProjExprs = allProjExprs :+ e @@ -2625,7 +2624,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case WindowExec(windowExpression, partitionSpec, orderSpec, child) - if isCometOperatorEnabled(op.conf, CometConf.OPERATOR_WINDOW) => + if CometConf.COMET_EXEC_WINDOW_ENABLED.get(conf) => val output = child.output val winExprs: Array[WindowExpression] = windowExpression.flatMap { expr => @@ -2666,7 +2665,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim case aggregate: BaseAggregateExec if (aggregate.isInstanceOf[HashAggregateExec] || aggregate.isInstanceOf[ObjectHashAggregateExec]) && - isCometOperatorEnabled(op.conf, CometConf.OPERATOR_AGGREGATE) => + CometConf.COMET_EXEC_AGGREGATE_ENABLED.get(conf) => val groupingExpressions = aggregate.groupingExpressions val aggregateExpressions = aggregate.aggregateExpressions val aggregateAttributes = aggregate.aggregateAttributes @@ -2770,9 +2769,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim case join: HashJoin => // `HashJoin` has only two implementations in Spark, but we check the type of 
the join to // make sure we are handling the correct join type. - if (!(isCometOperatorEnabled(op.conf, CometConf.OPERATOR_HASH_JOIN) && + if (!(CometConf.COMET_EXEC_HASH_JOIN_ENABLED.get(conf) && join.isInstanceOf[ShuffledHashJoinExec]) && - !(isCometOperatorEnabled(op.conf, CometConf.OPERATOR_BROADCAST_HASH_JOIN) && + !(CometConf.COMET_EXEC_BROADCAST_HASH_JOIN_ENABLED.get(conf) && join.isInstanceOf[BroadcastHashJoinExec])) { withInfo(join, s"Invalid hash join type ${join.nodeName}") return None @@ -2826,8 +2825,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim None } - case join: SortMergeJoinExec - if isCometOperatorEnabled(op.conf, CometConf.OPERATOR_SORT_MERGE_JOIN) => + case join: SortMergeJoinExec if CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.get(conf) => // `requiredOrders` and `getKeyOrdering` are copied from Spark's SortMergeJoinExec. def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] = { keys.map(SortOrder(_, Ascending)) @@ -2903,8 +2901,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim None } - case join: SortMergeJoinExec - if !isCometOperatorEnabled(op.conf, CometConf.OPERATOR_SORT_MERGE_JOIN) => + case join: SortMergeJoinExec if !CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.get(conf) => withInfo(join, "SortMergeJoin is not enabled") None diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 09d7ca979..c50823b58 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1706,7 +1706,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", 
EXTENDED_EXPLAIN_PROVIDERS_KEY -> "org.apache.comet.ExtendedExplainInfo") { val table = "test" withTable(table) { diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index f2da95876..f1c96eba8 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -1222,7 +1222,9 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("stddev_pop and stddev_samp") { - withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + withSQLConf( + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_EXPR_STDDEV_ENABLED.key -> "true") { Seq("native", "jvm").foreach { cometShuffleMode => withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometShuffleMode) { Seq(true, false).foreach { dictionary => diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 455fd76f7..b7839abd5 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -377,9 +377,7 @@ class CometExecSuite extends CometTestBase { test("CometExec.executeColumnarCollectIterator can collect ColumnarBatch results") { assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+") - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") { withParquetTable((0 until 50).map(i => (i, i + 1)), "tbl") { val df = sql("SELECT _1 + 1, _2 + 2 FROM tbl WHERE _1 > 3") @@ -469,9 +467,7 @@ class CometExecSuite extends CometTestBase { } test("Comet native metrics: project and filter") { - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - 
CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") { withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") { val df = sql("SELECT _1 + 1, _2 + 2 FROM tbl WHERE _1 > 3") df.collect() @@ -498,7 +494,6 @@ class CometExecSuite extends CometTestBase { test("Comet native metrics: SortMergeJoin") { withSQLConf( CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", "spark.sql.adaptive.autoBroadcastJoinThreshold" -> "-1", "spark.sql.autoBroadcastJoinThreshold" -> "-1", "spark.sql.join.preferSortMergeJoin" -> "true") { @@ -1240,7 +1235,7 @@ class CometExecSuite extends CometTestBase { .saveAsTable("bucketed_table2") withSQLConf( - "spark.comet.exec.sortMergeJoin.disabled" -> "true", + CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.key -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { val t1 = spark.table("bucketed_table1") @@ -1348,9 +1343,8 @@ class CometExecSuite extends CometTestBase { test("disabled/unsupported exec with multiple children should not disappear") { withSQLConf( - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "false", - CometConf.COMET_EXEC_CONFIG_PREFIX + ".project.enabled" -> "true", - CometConf.COMET_EXEC_CONFIG_PREFIX + ".union.enabled" -> "false") { + CometConf.COMET_EXEC_PROJECT_ENABLED.key -> "true", + CometConf.COMET_EXEC_UNION_ENABLED.key -> "false") { withParquetDataFrame((0 until 5).map(Tuple1(_))) { df => val projected = df.selectExpr("_1 as x") val unioned = projected.union(df) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index d17e4abf4..aec122bee 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -174,9 +174,7 @@ class 
CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper test("fix: Dictionary arrays imported from native should not be overridden") { Seq(10, 201).foreach { numPartitions => - withSQLConf( - CometConf.COMET_BATCH_SIZE.key -> "10", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_BATCH_SIZE.key -> "10") { withParquetTable((0 until 50).map(i => (1.toString, 2.toString, (i + 1).toLong)), "tbl") { val df = sql("SELECT * FROM tbl") .filter($"_1" === 1.toString) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 3c5ed16ca..376139b51 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -659,8 +659,7 @@ abstract class ParquetReadSuite extends CometTestBase { Seq(false, true).foreach { enableDictionary => withSQLConf( CometConf.COMET_BATCH_SIZE.key -> 7.toString, - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { // Make sure this works with Comet native execution too val data = (1 to 100) .map(_ % 5) // trigger dictionary encoding @@ -747,7 +746,6 @@ abstract class ParquetReadSuite extends CometTestBase { withSQLConf( CometConf.COMET_BATCH_SIZE.key -> 7.toString, CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_ENABLED.key -> "true") { Seq("a", null).foreach { partValue => withTempPath { dir => @@ -1286,7 +1284,6 @@ abstract class ParquetReadSuite extends CometTestBase { withSQLConf( CometConf.COMET_ENABLED.key -> cometEnabled, CometConf.COMET_EXEC_ENABLED.key -> cometEnabled, - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> cometEnabled, SQLConf.USE_V1_SOURCE_LIST.key -> v1.getOrElse("")) { withParquetTable(Seq((Long.MaxValue, 
1), (Long.MaxValue, 2)), "tbl") { val df = spark.sql("select * from tbl") diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala index 0c43fc8dd..ee59e7897 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala @@ -185,7 +185,6 @@ class CometTPCDSQuerySuite "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") conf.set(CometConf.COMET_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_ENABLED.key, "true") - conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "15g") conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true") diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala index 2638a0e6a..c81fba3ef 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala @@ -89,7 +89,6 @@ class CometTPCHQuerySuite extends QueryTest with TPCBase with ShimCometTPCHQuery "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") conf.set(CometConf.COMET_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_ENABLED.key, "true") - conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") conf.set(CometConf.COMET_SHUFFLE_MODE.key, "jvm") conf.set(MEMORY_OFFHEAP_ENABLED.key, "true") diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryListBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryListBase.scala index 15b1745c7..8cabb3a5b 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryListBase.scala +++ 
b/spark/src/test/scala/org/apache/spark/sql/CometTPCQueryListBase.scala @@ -87,7 +87,6 @@ trait CometTPCQueryListBase CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_EXPLAIN_VERBOSE_ENABLED.key -> "true", // Lower bloom filter thresholds to allows us to simulate the plan produced at larger scale. "spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold" -> "1MB", diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index c445aae61..694b977e3 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -78,7 +78,6 @@ abstract class CometTestBase conf.set(SQLConf.COALESCE_PARTITIONS_ENABLED.key, "false") conf.set(CometConf.COMET_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_ENABLED.key, "true") - conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true") conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true") conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g") conf diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala index 21d6d4572..86b59050e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala @@ -73,8 +73,7 @@ object CometAggregateBenchmark extends CometBenchmarkBase { benchmark.addCase(s"SQL Parquet - Comet (Scan, Exec) ($aggregateFunction)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { 
spark.sql(query).noop() } } @@ -121,8 +120,7 @@ object CometAggregateBenchmark extends CometBenchmarkBase { benchmark.addCase(s"SQL Parquet - Comet (Scan, Exec) ($aggregateFunction)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql(query).noop() } } @@ -167,7 +165,6 @@ object CometAggregateBenchmark extends CometBenchmarkBase { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_MEMORY_OVERHEAD.key -> "1G") { spark.sql(query).noop() } @@ -210,8 +207,7 @@ object CometAggregateBenchmark extends CometBenchmarkBase { benchmark.addCase(s"SQL Parquet - Comet (Scan, Exec) ($aggregateFunction)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql(query).noop() } } diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala index cd3fcadbd..c6fe55b56 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala @@ -61,8 +61,7 @@ object CometArithmeticBenchmark extends CometBenchmarkBase { benchmark.addCase(s"$op ($dataType) - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop() } } @@ -101,8 +100,7 @@ object CometArithmeticBenchmark extends 
CometBenchmarkBase { benchmark.addCase(s"$op ($dataType) - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql(s"SELECT c1 ${op.sig} c2 FROM $table").noop() } } diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala index f37040020..6e6c62491 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala @@ -97,7 +97,6 @@ trait CometBenchmarkBase extends SqlBasedBenchmark { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", SQLConf.ANSI_ENABLED.key -> "false") { f } diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala index 032b9469a..0dddfb36a 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala @@ -54,8 +54,7 @@ object CometConditionalExpressionBenchmark extends CometBenchmarkBase { benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql(query).noop() } } @@ -86,8 +85,7 @@ object CometConditionalExpressionBenchmark extends CometBenchmarkBase { benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( 
CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql(query).noop() } } diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala index bf4bfdbee..400d9b829 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala @@ -91,8 +91,7 @@ object CometExecBenchmark extends CometBenchmarkBase { benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql("select c2 + 1, c1 + 2 from parquetV1Table where c1 + 1 > 0").noop() } } @@ -129,7 +128,6 @@ object CometExecBenchmark extends CometBenchmarkBase { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { spark.sql( @@ -163,8 +161,7 @@ object CometExecBenchmark extends CometBenchmarkBase { benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql("select * from parquetV1Table").sortWithinPartitions("value").noop() } } @@ -202,8 +199,7 @@ object CometExecBenchmark extends CometBenchmarkBase { benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - 
CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark .sql("SELECT col1, col2, SUM(col3) FROM parquetV1Table " + "GROUP BY col1, col2 GROUPING SETS ((col1), (col2))") diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala index 7f1f4b44e..2ca924821 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala @@ -56,8 +56,7 @@ object CometPredicateExpressionBenchmark extends CometBenchmarkBase { benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql(query).noop() } } diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala index 30a2823cf..46af7115c 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala @@ -89,7 +89,6 @@ object CometShuffleBenchmark extends CometBenchmarkBase { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false") { spark .sql( @@ -104,7 +103,6 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - 
CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1.0", CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") { @@ -146,7 +144,6 @@ object CometShuffleBenchmark extends CometBenchmarkBase { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false") { spark .sql( @@ -163,7 +160,6 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1.0", CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") { @@ -206,7 +202,6 @@ object CometShuffleBenchmark extends CometBenchmarkBase { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false") { spark .sql("select c1 from parquetV1Table") @@ -220,7 +215,6 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1.0", CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") { @@ -236,7 +230,6 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - 
CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "2.0", CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") { @@ -252,7 +245,6 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1000000000.0", CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") { @@ -306,7 +298,6 @@ object CometShuffleBenchmark extends CometBenchmarkBase { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false") { spark .sql("select c1 from parquetV1Table") @@ -320,7 +311,6 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") { spark @@ -335,7 +325,6 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "true") { spark @@ -349,7 +338,6 @@ object CometShuffleBenchmark extends CometBenchmarkBase { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", - 
CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { spark .sql("select c1 from parquetV1Table") @@ -394,7 +382,6 @@ object CometShuffleBenchmark extends CometBenchmarkBase { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false") { spark .sql(s"select $columns from parquetV1Table") @@ -408,7 +395,6 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { spark .sql(s"select $columns from parquetV1Table") @@ -421,7 +407,6 @@ object CometShuffleBenchmark extends CometBenchmarkBase { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_MODE.key -> "native") { spark diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala index c8bfa7741..0546c9173 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala @@ -51,8 +51,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql("select 
substring(c1, 1, 100) from parquetV1Table").noop() } } @@ -82,8 +81,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql("select space(c1) from parquetV1Table").noop() } } @@ -113,8 +111,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql("select ascii(c1) from parquetV1Table").noop() } } @@ -144,8 +141,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql("select bit_length(c1) from parquetV1Table").noop() } } @@ -175,8 +171,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql("select octet_length(c1) from parquetV1Table").noop() } } @@ -207,7 +202,6 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", 
CometConf.COMET_CASE_CONVERSION_ENABLED.key -> "true") { spark.sql("select upper(c1) from parquetV1Table").noop() } @@ -238,8 +232,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql("select lower(c1) from parquetV1Table").noop() } } @@ -269,8 +262,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql("select chr(c1) from parquetV1Table").noop() } } @@ -300,8 +292,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql("select initCap(c1) from parquetV1Table").noop() } } @@ -331,8 +322,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql("select trim(c1) from parquetV1Table").noop() } } @@ -362,8 +352,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - 
CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql("select concat_ws(' ', c1, c1) from parquetV1Table").noop() } } @@ -393,8 +382,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql("select length(c1) from parquetV1Table").noop() } } @@ -424,8 +412,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql("select repeat(c1, 3) from parquetV1Table").noop() } } @@ -455,8 +442,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql("select reverse(c1) from parquetV1Table").noop() } } @@ -486,8 +472,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql("select instr(c1, '123') from parquetV1Table").noop() } } @@ -517,8 +502,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { 
benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql("select replace(c1, '123', 'abc') from parquetV1Table").noop() } } @@ -548,8 +532,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ => withSQLConf( CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + CometConf.COMET_EXEC_ENABLED.key -> "true") { spark.sql("select translate(c1, '123456', 'aBcDeF') from parquetV1Table").noop() } } diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala index 08b8de2b2..016c2a371 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala @@ -118,9 +118,9 @@ object CometTPCDSMicroBenchmark extends CometTPCQueryBenchmarkBase { CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_MODE.key -> "auto", CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true", - CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") { + // enabling COMET_EXPLAIN_NATIVE_ENABLED may add overhead but is useful for debugging + CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key -> "false", + CometConf.COMET_EXEC_ENABLED.key -> "true") { cometSpark.sql(queryString).noop() } } diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCQueryBenchmarkBase.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCQueryBenchmarkBase.scala 
index 1700b7ed8..2361346b8 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCQueryBenchmarkBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCQueryBenchmarkBase.scala @@ -74,7 +74,6 @@ trait CometTPCQueryBenchmarkBase extends SqlBasedBenchmark with CometTPCQueryBas withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_MODE.key -> "auto") { cometSpark.sql(queryString).noop() diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala index 62c9d0224..759833e6d 100644 --- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala @@ -261,7 +261,6 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", // needed for v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64 "spark.sql.readSideCharPadding" -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") { @@ -288,7 +287,6 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa conf.set(CometConf.COMET_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_ENABLED.key, "true") conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "1g") - conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") new TestSparkSession(new SparkContext("local[1]", this.getClass.getCanonicalName, conf)) diff --git 
a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetEncryptionITCase.scala b/spark/src/test/scala/org/apache/spark/sql/comet/ParquetEncryptionITCase.scala index 0d1a105ff..76df1e96c 100644 --- a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetEncryptionITCase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/comet/ParquetEncryptionITCase.scala @@ -140,7 +140,6 @@ class ParquetEncryptionITCase extends QueryTest with SQLTestUtils { withSQLConf( CometConf.COMET_ENABLED.key -> cometEnabled, CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", SQLConf.ANSI_ENABLED.key -> "true") { testFun }