From 8660c0cba24367da7e391d8740342b959dab3a92 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 22 May 2024 15:08:51 -0700 Subject: [PATCH] feat: Add COMET_SHUFFLE_MODE config to control Comet shuffle mode --- .../scala/org/apache/comet/CometConf.scala | 21 +- docs/source/user-guide/configs.md | 2 +- docs/source/user-guide/tuning.md | 22 +- .../comet/CometSparkSessionExtensions.scala | 201 ++++++++---------- .../comet/exec/CometAggregateSuite.scala | 70 +++--- .../exec/CometColumnarShuffleSuite.scala | 10 +- .../apache/comet/exec/CometExecSuite.scala | 22 +- .../comet/exec/CometNativeShuffleSuite.scala | 2 +- .../spark/sql/CometTPCHQuerySuite.scala | 2 +- .../sql/benchmark/CometExecBenchmark.scala | 2 +- .../sql/benchmark/CometShuffleBenchmark.scala | 19 +- .../apache/comet/exec/CometExec3_4Suite.scala | 4 +- 12 files changed, 180 insertions(+), 197 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 463de90c25..daa6213b95 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -19,6 +19,7 @@ package org.apache.comet +import java.util.Locale import java.util.concurrent.TimeUnit import scala.collection.mutable.ListBuffer @@ -131,14 +132,18 @@ object CometConf { .booleanConf .createWithDefault(false) - val COMET_COLUMNAR_SHUFFLE_ENABLED: ConfigEntry[Boolean] = - conf("spark.comet.columnar.shuffle.enabled") - .doc( - "Whether to enable Arrow-based columnar shuffle for Comet and Spark regular operators. " + - "If this is enabled, Comet prefers columnar shuffle than native shuffle. " + - "By default, this config is true.") - .booleanConf - .createWithDefault(true) + val COMET_SHUFFLE_MODE: ConfigEntry[String] = conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.mode") + .doc( + "The mode of Comet shuffle. This config is only effective only if Comet shuffle " + + "is enabled. Available modes are 'native', 'jvm', and 'auto'. " + + "'native' is for native shuffle which has best performance in general." + + "'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle." + + "'auto' is for Comet to choose the best shuffle mode based on the query plan." + + "By default, this config is 'jvm'.") + .stringConf + .transform(_.toLowerCase(Locale.ROOT)) + .checkValues(Set("native", "jvm", "auto")) + .createWithDefault("jvm") val COMET_SHUFFLE_ENFORCE_MODE_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.shuffle.enforceMode.enabled") diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 0204b0c540..38499c82a6 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -29,7 +29,6 @@ Comet provides the following configuration settings. | spark.comet.columnar.shuffle.async.enabled | Whether to enable asynchronous shuffle for Arrow-based shuffle. By default, this config is false. | false | | spark.comet.columnar.shuffle.async.max.thread.num | Maximum number of threads on an executor used for Comet async columnar shuffle. By default, this config is 100. This is the upper bound of total number of shuffle threads per executor. In other words, if the number of cores * the number of shuffle threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than this config. Comet will use this config as the number of shuffle threads per executor instead. | 100 | | spark.comet.columnar.shuffle.async.thread.num | Number of threads used for Comet async columnar shuffle per shuffle task. By default, this config is 3. Note that more threads means more memory requirement to buffer shuffle data before flushing to disk. Also, more threads may not always improve performance, and should be set based on the number of cores available. | 3 | -| spark.comet.columnar.shuffle.enabled | Whether to enable Arrow-based columnar shuffle for Comet and Spark regular operators. If this is enabled, Comet prefers columnar shuffle than native shuffle. By default, this config is true. | true | | spark.comet.columnar.shuffle.memory.factor | Fraction of Comet memory to be allocated per executor process for Comet shuffle. Comet memory size is specified by `spark.comet.memoryOverhead` or calculated by `spark.comet.memory.overhead.factor` * `spark.executor.memory`. By default, this config is 1.0. | 1.0 | | spark.comet.debug.enabled | Whether to enable debug mode for Comet. By default, this config is false. When enabled, Comet will do additional checks for debugging purpose. For example, validating array when importing arrays from JVM at native side. Note that these checks may be expensive in performance and should only be enabled for debugging purpose. | false | | spark.comet.enabled | Whether to enable Comet extension for Spark. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is the value of the env var `ENABLE_COMET` if set, or true otherwise. | true | @@ -39,6 +38,7 @@ Comet provides the following configuration settings. | spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 | | spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd | | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false | +| spark.comet.exec.shuffle.mode | The mode of Comet shuffle. This config is only effective only if Comet shuffle is enabled. Available modes are 'native', 'jvm', and 'auto'. 'native' is for native shuffle which has best performance in general.'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle.'auto' is for Comet to choose the best shuffle mode based on the query plan.By default, this config is 'jvm'. | jvm | | spark.comet.explainFallback.enabled | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. | false | | spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. Default value is 0.2. | 0.2 | | spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b | diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index 01fa7bdbef..5a3100bd05 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -39,22 +39,26 @@ It must be set before the Spark context is created. You can enable or disable Co at runtime by setting `spark.comet.exec.shuffle.enabled` to `true` or `false`. Once it is disabled, Comet will fallback to the default Spark shuffle manager. -### Columnar Shuffle +### Shuffle Mode -By default, once `spark.comet.exec.shuffle.enabled` is enabled, Comet uses columnar shuffle +Comet provides three shuffle modes: Columnar Shuffle, Native Shuffle and Auto Mode. + +#### Columnar Shuffle + +By default, once `spark.comet.exec.shuffle.enabled` is enabled, Comet uses JVM-based columnar shuffle to improve the performance of shuffle operations. Columnar shuffle supports HashPartitioning, -RoundRobinPartitioning, RangePartitioning and SinglePartitioning. +RoundRobinPartitioning, RangePartitioning and SinglePartitioning. This mode has the highest +query coverage. -Columnar shuffle can be disabled by setting `spark.comet.columnar.shuffle.enabled` to `false`. +Columnar shuffle can be enabled by setting `spark.comet.exec.shuffle.mode` to `jvm`. -### Native Shuffle +#### Native Shuffle Comet also provides a fully native shuffle implementation that can be used to improve the performance. -To enable native shuffle, just disable `spark.comet.columnar.shuffle.enabled`. +To enable native shuffle, just set `spark.comet.exec.shuffle.mode` to `native` Native shuffle only supports HashPartitioning and SinglePartitioning. +### Auto Mode - - - +`spark.comet.exec.shuffle.mode` to `auto` will let Comet choose the best shuffle mode based on the query plan. diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 85a19f55ca..5b6bd5870e 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -45,7 +45,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.comet.CometConf._ -import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometColumnarShuffleEnabled, isCometEnabled, isCometExecEnabled, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, isSpark34Plus, shouldApplyRowToColumnar, withInfo, withInfos} +import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, isSpark34Plus, shouldApplyRowToColumnar, withInfo, withInfos} import org.apache.comet.parquet.{CometParquetScan, SupportsComet} import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.QueryPlanSerde @@ -194,32 +194,6 @@ class CometSparkSessionExtensions } case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { - private def applyCometShuffle(plan: SparkPlan): SparkPlan = { - plan.transformUp { - case s: ShuffleExchangeExec - if isCometPlan(s.child) && !isCometColumnarShuffleEnabled(conf) && - QueryPlanSerde.supportPartitioning(s.child.output, s.outputPartitioning)._1 => - logInfo("Comet extension enabled for Native Shuffle") - - // Switch to use Decimal128 regardless of precision, since Arrow native execution - // doesn't support Decimal32 and Decimal64 yet. - conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true") - CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle) - - // Columnar shuffle for regular Spark operators (not Comet) and Comet operators - // (if configured) - case s: ShuffleExchangeExec - if (!s.child.supportsColumnar || isCometPlan( - s.child)) && isCometColumnarShuffleEnabled(conf) && - QueryPlanSerde.supportPartitioningTypes(s.child.output)._1 && - !isShuffleOperator(s.child) => - logInfo("Comet extension enabled for JVM Columnar Shuffle") - CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle) - } - } - - private def isCometPlan(op: SparkPlan): Boolean = op.isInstanceOf[CometPlan] - private def isCometNative(op: SparkPlan): Boolean = op.isInstanceOf[CometNativeExec] // spotless:off @@ -641,7 +615,7 @@ class CometSparkSessionExtensions // Native shuffle for Comet operators case s: ShuffleExchangeExec if isCometShuffleEnabled(conf) && - !isCometColumnarShuffleEnabled(conf) && + isCometNativeShuffleMode(conf) && QueryPlanSerde.supportPartitioning(s.child.output, s.outputPartitioning)._1 => logInfo("Comet extension enabled for Native Shuffle") @@ -662,7 +636,7 @@ class CometSparkSessionExtensions // If the child of ShuffleExchangeExec is also a ShuffleExchangeExec, we should not // convert it to CometColumnarShuffle, case s: ShuffleExchangeExec - if isCometShuffleEnabled(conf) && isCometColumnarShuffleEnabled(conf) && + if isCometShuffleEnabled(conf) && isCometJVMShuffleMode(conf) && QueryPlanSerde.supportPartitioningTypes(s.child.output)._1 && !isShuffleOperator(s.child) => logInfo("Comet extension enabled for JVM Columnar Shuffle") @@ -684,19 +658,19 @@ class CometSparkSessionExtensions case s: ShuffleExchangeExec => val isShuffleEnabled = isCometShuffleEnabled(conf) val reason = getCometShuffleNotEnabledReason(conf).getOrElse("no reason available") - val msg1 = createMessage(!isShuffleEnabled, s"Native shuffle is not enabled: $reason") - val columnarShuffleEnabled = isCometColumnarShuffleEnabled(conf) + val msg1 = createMessage(!isShuffleEnabled, s"Comet shuffle is not enabled: $reason") + val columnarShuffleEnabled = isCometJVMShuffleMode(conf) val msg2 = createMessage( isShuffleEnabled && !columnarShuffleEnabled && !QueryPlanSerde .supportPartitioning(s.child.output, s.outputPartitioning) ._1, - "Shuffle: " + + "Native shuffle: " + s"${QueryPlanSerde.supportPartitioning(s.child.output, s.outputPartitioning)._2}") val msg3 = createMessage( isShuffleEnabled && columnarShuffleEnabled && !QueryPlanSerde .supportPartitioningTypes(s.child.output) ._1, - s"Columnar shuffle: ${QueryPlanSerde.supportPartitioningTypes(s.child.output)._2}") + s"JVM shuffle: ${QueryPlanSerde.supportPartitioningTypes(s.child.output)._2}") withInfo(s, Seq(msg1, msg2, msg3).flatten.mkString(",")) s @@ -724,89 +698,80 @@ class CometSparkSessionExtensions } // We shouldn't transform Spark query plan if Comet is disabled. - if (!isCometEnabled(conf)) return plan - - if (!isCometExecEnabled(conf)) { - // Comet exec is disabled, but for Spark shuffle, we still can use Comet columnar shuffle - if (isCometShuffleEnabled(conf)) { - applyCometShuffle(plan) - } else { - plan - } - } else { - var newPlan = transform(plan) - - // if the plan cannot be run fully natively then explain why (when appropriate - // config is enabled) - if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) { - new ExtendedExplainInfo().extensionInfo(newPlan) match { - case reasons if reasons.size == 1 => - logWarning( - "Comet cannot execute some parts of this plan natively " + - s"because ${reasons.head}") - case reasons if reasons.size > 1 => - logWarning( - "Comet cannot execute some parts of this plan natively" + - s" because:\n\t- ${reasons.mkString("\n\t- ")}") - case _ => - // no reasons recorded - } + if (!isCometEnabled(conf) || !isCometExecEnabled(conf)) return plan + + var newPlan = transform(plan) + + // if the plan cannot be run fully natively then explain why (when appropriate + // config is enabled) + if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) { + new ExtendedExplainInfo().extensionInfo(newPlan) match { + case reasons if reasons.size == 1 => + logWarning( + "Comet cannot execute some parts of this plan natively " + + s"because ${reasons.head}") + case reasons if reasons.size > 1 => + logWarning( + "Comet cannot execute some parts of this plan natively" + + s" because:\n\t- ${reasons.mkString("\n\t- ")}") + case _ => + // no reasons recorded } + } - // Remove placeholders - newPlan = newPlan.transform { - case CometSinkPlaceHolder(_, _, s) => s - case CometScanWrapper(_, s) => s - } + // Remove placeholders + newPlan = newPlan.transform { + case CometSinkPlaceHolder(_, _, s) => s + case CometScanWrapper(_, s) => s + } - // Set up logical links - newPlan = newPlan.transform { - case op: CometExec => - if (op.originalPlan.logicalLink.isEmpty) { - op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG) - op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG) - } else { - op.originalPlan.logicalLink.foreach(op.setLogicalLink) - } - op - case op: CometShuffleExchangeExec => - // Original Spark shuffle exchange operator might have empty logical link. - // But the `setLogicalLink` call above on downstream operator of - // `CometShuffleExchangeExec` will set its logical link to the downstream - // operators which cause AQE behavior to be incorrect. So we need to unset - // the logical link here. - if (op.originalPlan.logicalLink.isEmpty) { - op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG) - op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG) - } else { - op.originalPlan.logicalLink.foreach(op.setLogicalLink) - } - op + // Set up logical links + newPlan = newPlan.transform { + case op: CometExec => + if (op.originalPlan.logicalLink.isEmpty) { + op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG) + op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG) + } else { + op.originalPlan.logicalLink.foreach(op.setLogicalLink) + } + op + case op: CometShuffleExchangeExec => + // Original Spark shuffle exchange operator might have empty logical link. + // But the `setLogicalLink` call above on downstream operator of + // `CometShuffleExchangeExec` will set its logical link to the downstream + // operators which cause AQE behavior to be incorrect. So we need to unset + // the logical link here. + if (op.originalPlan.logicalLink.isEmpty) { + op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG) + op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG) + } else { + op.originalPlan.logicalLink.foreach(op.setLogicalLink) + } + op - case op: CometBroadcastExchangeExec => - if (op.originalPlan.logicalLink.isEmpty) { - op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG) - op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG) - } else { - op.originalPlan.logicalLink.foreach(op.setLogicalLink) - } - op - } + case op: CometBroadcastExchangeExec => + if (op.originalPlan.logicalLink.isEmpty) { + op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG) + op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG) + } else { + op.originalPlan.logicalLink.foreach(op.setLogicalLink) + } + op + } - // Convert native execution block by linking consecutive native operators. - var firstNativeOp = true - newPlan.transformDown { - case op: CometNativeExec => - if (firstNativeOp) { - firstNativeOp = false - op.convertBlock() - } else { - op - } - case op => - firstNativeOp = true + // Convert native execution block by linking consecutive native operators. + var firstNativeOp = true + newPlan.transformDown { + case op: CometNativeExec => + if (firstNativeOp) { + firstNativeOp = false + op.convertBlock() + } else { op - } + } + case op => + firstNativeOp = true + op } } @@ -966,8 +931,20 @@ object CometSparkSessionExtensions extends Logging { COMET_EXEC_ENABLED.get(conf) } - private[comet] def isCometColumnarShuffleEnabled(conf: SQLConf): Boolean = { - COMET_COLUMNAR_SHUFFLE_ENABLED.get(conf) + private[comet] def isCometNativeShuffleMode(conf: SQLConf): Boolean = { + COMET_SHUFFLE_MODE.get(conf) match { + case "native" => true + case "auto" => true + case _ => false + } + } + + private[comet] def isCometJVMShuffleMode(conf: SQLConf): Boolean = { + COMET_SHUFFLE_MODE.get(conf) match { + case "jvm" => true + case "auto" => true + case _ => false + } } private[comet] def isCometAllOperatorEnabled(conf: SQLConf): Boolean = { diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index 310a24ee3b..52e092a0ae 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -44,7 +44,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { val df1 = sql("SELECT count(DISTINCT 2), count(DISTINCT 2,3)") checkSparkAnswer(df1) @@ -57,7 +57,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { checkSparkAnswer(sql(""" |SELECT | lag(123, 100, 321) OVER (ORDER BY id) as lag, @@ -78,7 +78,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { val df1 = Seq( ("a", "b", "c"), ("a", "b", "c"), @@ -99,7 +99,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { val df = sql("SELECT LAST(n) FROM lowerCaseData") checkSparkAnswer(df) } @@ -114,7 +114,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { val df = sql("select sum(a), avg(a) from allNulls") checkSparkAnswer(df) } @@ -125,7 +125,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { withTempDir { dir => val path = new Path(dir.toURI.toString, "test") makeParquetFile(path, 10000, 10, false) @@ -141,7 +141,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "test") @@ -160,7 +160,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { sql( "CREATE TABLE lineitem(l_extendedprice DOUBLE, l_quantity DOUBLE, l_partkey STRING) USING PARQUET") @@ -197,7 +197,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> EliminateSorts.ruleName, CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "test") @@ -216,7 +216,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + CometConf.COMET_SHUFFLE_MODE.key -> "native") { withTable(table) { sql(s"CREATE TABLE $table(col DECIMAL(5, 2)) USING PARQUET") sql(s"INSERT INTO TABLE $table VALUES (CAST(12345.01 AS DECIMAL(5, 2)))") @@ -316,7 +316,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq(true, false).foreach { dictionaryEnabled => withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> nativeShuffleEnabled.toString, - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + CometConf.COMET_SHUFFLE_MODE.key -> "native") { withParquetTable( (0 until 100).map(i => (i, (i % 10).toString)), "tbl", @@ -497,7 +497,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq(true, false).foreach { nativeShuffleEnabled => withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> nativeShuffleEnabled.toString, - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + CometConf.COMET_SHUFFLE_MODE.key -> "native") { withTempDir { dir => val path = new Path(dir.toURI.toString, "test") makeParquetFile(path, 1000, 20, dictionaryEnabled) @@ -686,7 +686,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("test final count") { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + CometConf.COMET_SHUFFLE_MODE.key -> "native") { Seq(false, true).foreach { dictionaryEnabled => withParquetTable((0 until 5).map(i => (i, i % 2)), "tbl", dictionaryEnabled) { checkSparkAnswerAndNumOfAggregates("SELECT _2, COUNT(_1) FROM tbl GROUP BY _2", 2) @@ -703,7 +703,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("test final min/max") { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + CometConf.COMET_SHUFFLE_MODE.key -> "native") { Seq(true, false).foreach { dictionaryEnabled => withParquetTable((0 until 5).map(i => (i, i % 2)), "tbl", dictionaryEnabled) { checkSparkAnswerAndNumOfAggregates( @@ -724,7 +724,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("test final min/max/count with result expressions") { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + CometConf.COMET_SHUFFLE_MODE.key -> "native") { Seq(true, false).foreach { dictionaryEnabled => withParquetTable((0 until 5).map(i => (i, i % 2)), "tbl", dictionaryEnabled) { checkSparkAnswerAndNumOfAggregates( @@ -759,7 +759,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("test final sum") { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + CometConf.COMET_SHUFFLE_MODE.key -> "native") { Seq(false, true).foreach { dictionaryEnabled => withParquetTable((0L until 5L).map(i => (i, i % 2)), "tbl", dictionaryEnabled) { checkSparkAnswerAndNumOfAggregates( @@ -780,7 +780,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("test final avg") { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + CometConf.COMET_SHUFFLE_MODE.key -> "native") { Seq(true, false).foreach { dictionaryEnabled => withParquetTable( (0 until 5).map(i => (i.toDouble, i.toDouble % 2)), @@ -805,7 +805,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + CometConf.COMET_SHUFFLE_MODE.key -> "native") { Seq(true, false).foreach { dictionaryEnabled => withSQLConf("parquet.enable.dictionary" -> dictionaryEnabled.toString) { val table = "t1" @@ -850,7 +850,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("avg null handling") { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + CometConf.COMET_SHUFFLE_MODE.key -> "native") { val table = "t1" withTable(table) { sql(s"create table $table(a double, b double) using parquet") @@ -872,7 +872,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq(true, false).foreach { nativeShuffleEnabled => withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> nativeShuffleEnabled.toString, - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false", + CometConf.COMET_SHUFFLE_MODE.key -> "native", CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { withTempDir { dir => val path = new Path(dir.toURI.toString, "test") @@ -912,11 +912,11 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("distinct") { withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - Seq(true, false).foreach { cometColumnShuffleEnabled => - withSQLConf( - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> cometColumnShuffleEnabled.toString) { + Seq("native", "jvm").foreach { cometShuffleMode => + withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometShuffleMode) { Seq(true, false).foreach { dictionary => withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { + val cometColumnShuffleEnabled = cometShuffleMode == "jvm" val table = "test" withTable(table) { sql(s"create table $table(col1 int, col2 int, col3 int) using parquet") @@ -970,7 +970,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { Seq(true, false).foreach { dictionary => withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { val table = "test" @@ -1016,9 +1016,8 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("test bool_and/bool_or") { withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - Seq(true, false).foreach { cometColumnShuffleEnabled => - withSQLConf( - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> cometColumnShuffleEnabled.toString) { + Seq("native", "jvm").foreach { cometColumnShuffleEnabled => + withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometColumnShuffleEnabled.toString) { Seq(true, false).foreach { dictionary => withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { val table = "test" @@ -1043,7 +1042,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("bitwise aggregate") { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { Seq(true, false).foreach { dictionary => withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { val table = "test" @@ -1092,9 +1091,8 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("covar_pop and covar_samp") { withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - Seq(true, false).foreach { cometColumnShuffleEnabled => - withSQLConf( - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> cometColumnShuffleEnabled.toString) { + Seq("native", "jvm").foreach { cometColumnShuffleEnabled => + withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometColumnShuffleEnabled) { Seq(true, false).foreach { dictionary => withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { val table = "test" @@ -1131,9 +1129,8 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("var_pop and var_samp") { withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - Seq(true, false).foreach { cometColumnShuffleEnabled => - withSQLConf( - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> cometColumnShuffleEnabled.toString) { + Seq("native", "jvm").foreach { cometColumnShuffleEnabled => + withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometColumnShuffleEnabled) { Seq(true, false).foreach { dictionary => withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { Seq(true, false).foreach { nullOnDivideByZero => @@ -1171,9 +1168,8 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("stddev_pop and stddev_samp") { withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - Seq(true, false).foreach { cometColumnShuffleEnabled => - withSQLConf( - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> cometColumnShuffleEnabled.toString) { + Seq("native", "jvm").foreach { cometColumnShuffleEnabled => + withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> cometColumnShuffleEnabled) { Seq(true, false).foreach { dictionary => withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { Seq(true, false).foreach { nullOnDivideByZero => diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala index 600f9c44f0..c38be7c4a0 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala @@ -54,7 +54,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> asyncShuffleEnable.toString, CometConf.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString, CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.key -> "1536m") { testFun @@ -963,7 +963,7 @@ class CometShuffleSuite extends CometColumnarShuffleSuite { SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") { val df = sql("SELECT * FROM tbl_a") val shuffled = df @@ -983,7 +983,7 @@ class CometShuffleSuite extends CometColumnarShuffleSuite { SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") { withParquetTable((0 until 10).map(i => (i % 10, i + 2)), "tbl_b") { val df = sql("SELECT * FROM tbl_a") @@ -1016,7 +1016,7 @@ class DisableAQECometShuffleSuite extends CometColumnarShuffleSuite { SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") { withParquetTable((0 until 10).map(i => (i % 10, i + 2)), "tbl_b") { val df = sql("SELECT * FROM tbl_a") @@ -1061,7 +1061,7 @@ class CometShuffleEncryptionSuite extends CometTestBase { withSQLConf( CometConf.COMET_EXEC_ENABLED.key -> "false", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> asyncEnabled.toString) { readParquetFile(path.toString) { df => val shuffled = df diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index c5fef022cf..68c200e223 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -134,14 +134,14 @@ class CometExecSuite extends CometTestBase { .toDF("c1", "c2") .createOrReplaceTempView("v") - Seq(true, false).foreach { columnarShuffle => + Seq("native", "jvm").foreach { columnarShuffle => withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> columnarShuffle.toString) { + CometConf.COMET_SHUFFLE_MODE.key -> columnarShuffle.toString) { val df = sql("SELECT * FROM v where c1 = 1 order by c1, c2") val shuffle = find(df.queryExecution.executedPlan) { - case _: CometShuffleExchangeExec if columnarShuffle => true - case _: ShuffleExchangeExec if !columnarShuffle => true + case _: CometShuffleExchangeExec if columnarShuffle.equalsIgnoreCase("jvm") => true + case _: ShuffleExchangeExec if !columnarShuffle.equalsIgnoreCase("jvm") => true case _ => false }.get assert(shuffle.logicalLink.isEmpty) @@ -179,7 +179,7 @@ class CometExecSuite extends CometTestBase { SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled, // `REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION` is a new config in Spark 3.3+. "spark.sql.requireAllClusterKeysForDistribution" -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { val df = Seq(("a", 1, 1), ("a", 2, 2), ("b", 1, 3), ("b", 1, 4)).toDF("key1", "key2", "value") val windowSpec = Window.partitionBy("key1", "key2").orderBy("value") @@ -318,7 +318,7 @@ class CometExecSuite extends CometTestBase { dataTypes.map { subqueryType => withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") { var column1 = s"CAST(max(_1) AS $subqueryType)" @@ -499,7 +499,7 @@ class CometExecSuite extends CometTestBase { SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { withTable(tableName, dim) { sql( @@ -716,7 +716,7 @@ class CometExecSuite extends CometTestBase { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") { val df = sql("SELECT * FROM tbl").sort($"_1".desc) checkSparkAnswerAndOperator(df) @@ -764,10 +764,10 @@ class CometExecSuite extends CometTestBase { } test("limit") { - Seq("true", "false").foreach { columnarShuffle => + Seq("native", "jvm").foreach { columnarShuffle => withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> columnarShuffle) { + CometConf.COMET_SHUFFLE_MODE.key -> columnarShuffle) { withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_a") { val df = sql("SELECT * FROM tbl_a") .repartition(10, $"_1") @@ -1411,7 +1411,7 @@ class CometExecSuite extends CometTestBase { Seq("true", "false").foreach(aqe => { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqe, - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", SQLConf.CACHE_VECTORIZED_READER_ENABLED.key -> "false") { spark .range(1000) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index d48ba18392..d17e4abf4a 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -37,7 +37,7 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper super.test(testName, testTags: _*) { withSQLConf( CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false", + CometConf.COMET_SHUFFLE_MODE.key -> "native", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { testFun } diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala index e8aac26195..1abe5faebe 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala @@ -90,7 +90,7 @@ class CometTPCHQuerySuite extends QueryTest with CometTPCBase with SQLQueryTestH conf.set(CometConf.COMET_EXEC_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") - conf.set(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key, "true") + conf.set(CometConf.COMET_SHUFFLE_MODE.key, "jvm") conf.set(MEMORY_OFFHEAP_ENABLED.key, "true") conf.set(MEMORY_OFFHEAP_SIZE.key, "2g") } diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala index d6020ac691..bf4bfdbee3 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala @@ -131,7 +131,7 @@ object CometExecBenchmark extends CometBenchmarkBase { CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { spark.sql( "SELECT (SELECT max(col1) AS parquetV1Table FROM parquetV1Table) AS a, " + "col2, col3 FROM parquetV1Table") diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala index 8655728119..30a2823cf9 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala @@ -106,7 +106,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1.0", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") { spark .sql( @@ -165,7 +165,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1.0", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") { spark .sql( @@ -222,7 +222,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1.0", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") { spark .sql("select c1 from parquetV1Table") @@ -238,7 +238,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "2.0", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") { spark .sql("select c1 from parquetV1Table") @@ -254,7 +254,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1000000000.0", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") { spark .sql("select c1 from parquetV1Table") @@ -321,7 +321,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") { spark .sql("select c1 from parquetV1Table") @@ -336,7 +336,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "true") { spark .sql("select c1 from parquetV1Table") @@ -409,7 +409,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { spark .sql(s"select $columns from parquetV1Table") .repartition(partitionNum, Column("c1")) @@ -422,7 +422,8 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "native") { spark .sql(s"select $columns from parquetV1Table") .repartition(partitionNum, Column("c1")) diff --git a/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala b/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala index 32b76d9b00..019b4f030b 100644 --- a/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala +++ b/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala @@ -43,7 +43,7 @@ class CometExec3_4Suite extends CometTestBase { // The syntax is only supported by Spark 3.4+. test("subquery limit: limit with offset should return correct results") { - withSQLConf(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { withTable("t1", "t2") { val table1 = """create temporary view t1 as select * from values @@ -95,7 +95,7 @@ class CometExec3_4Suite extends CometTestBase { // Dataset.offset API is not available before Spark 3.4 test("offset") { - withSQLConf(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { checkSparkAnswer(testData.offset(90)) checkSparkAnswer(arrayData.toDF().offset(99)) checkSparkAnswer(mapData.toDF().offset(99))