From 7a15e4a224e131c3b7d5647d094fd0319f5b6f0c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 22 May 2024 15:08:51 -0700 Subject: [PATCH] feat: Add COMET_SHUFFLE_MODE config to control Comet shuffle mode --- .../scala/org/apache/comet/CometConf.scala | 21 +- docs/source/user-guide/tuning.md | 22 +- .../comet/CometSparkSessionExtensions.scala | 197 ++++++++---------- .../comet/exec/CometAggregateSuite.scala | 64 +++--- .../exec/CometColumnarShuffleSuite.scala | 10 +- .../apache/comet/exec/CometExecSuite.scala | 24 ++- .../comet/exec/CometNativeShuffleSuite.scala | 2 +- .../spark/sql/CometTPCHQuerySuite.scala | 2 +- .../sql/benchmark/CometExecBenchmark.scala | 2 +- .../sql/benchmark/CometShuffleBenchmark.scala | 19 +- .../apache/comet/exec/CometExec3_4Suite.scala | 4 +- 11 files changed, 178 insertions(+), 189 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 463de90c25..daa6213b95 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -19,6 +19,7 @@ package org.apache.comet +import java.util.Locale import java.util.concurrent.TimeUnit import scala.collection.mutable.ListBuffer @@ -131,14 +132,18 @@ object CometConf { .booleanConf .createWithDefault(false) - val COMET_COLUMNAR_SHUFFLE_ENABLED: ConfigEntry[Boolean] = - conf("spark.comet.columnar.shuffle.enabled") - .doc( - "Whether to enable Arrow-based columnar shuffle for Comet and Spark regular operators. " + - "If this is enabled, Comet prefers columnar shuffle than native shuffle. " + - "By default, this config is true.") - .booleanConf - .createWithDefault(true) + val COMET_SHUFFLE_MODE: ConfigEntry[String] = conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.mode") + .doc( + "The mode of Comet shuffle. This config is effective only if Comet shuffle " + "is enabled. Available modes are 'native', 'jvm', and 'auto'. 
" + "'native' is for native shuffle which has best performance in general. " + "'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. " + "'auto' is for Comet to choose the best shuffle mode based on the query plan. " + "By default, this config is 'jvm'.") + .stringConf + .transform(_.toLowerCase(Locale.ROOT)) + .checkValues(Set("native", "jvm", "auto")) + .createWithDefault("jvm") val COMET_SHUFFLE_ENFORCE_MODE_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.shuffle.enforceMode.enabled") diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index 01fa7bdbef..5a3100bd05 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -39,22 +39,26 @@ It must be set before the Spark context is created. You can enable or disable Co at runtime by setting `spark.comet.exec.shuffle.enabled` to `true` or `false`. Once it is disabled, Comet will fallback to the default Spark shuffle manager. -### Columnar Shuffle +### Shuffle Mode -By default, once `spark.comet.exec.shuffle.enabled` is enabled, Comet uses columnar shuffle +Comet provides three shuffle modes: Columnar Shuffle, Native Shuffle and Auto Mode. + +#### Columnar Shuffle + +By default, once `spark.comet.exec.shuffle.enabled` is enabled, Comet uses JVM-based columnar shuffle to improve the performance of shuffle operations. Columnar shuffle supports HashPartitioning, -RoundRobinPartitioning, RangePartitioning and SinglePartitioning. +RoundRobinPartitioning, RangePartitioning and SinglePartitioning. This mode has the highest +query coverage. -Columnar shuffle can be disabled by setting `spark.comet.columnar.shuffle.enabled` to `false`. +Columnar shuffle can be enabled by setting `spark.comet.exec.shuffle.mode` to `jvm`. -### Native Shuffle +#### Native Shuffle Comet also provides a fully native shuffle implementation that can be used to improve the performance. 
-To enable native shuffle, just disable `spark.comet.columnar.shuffle.enabled`. +To enable native shuffle, just set `spark.comet.exec.shuffle.mode` to `native`. Native shuffle only supports HashPartitioning and SinglePartitioning. +#### Auto Mode - - - +Setting `spark.comet.exec.shuffle.mode` to `auto` will let Comet choose the best shuffle mode based on the query plan. diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index 85a19f55ca..c5394cc930 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -45,7 +45,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.comet.CometConf._ -import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometColumnarShuffleEnabled, isCometEnabled, isCometExecEnabled, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, isSpark34Plus, shouldApplyRowToColumnar, withInfo, withInfos} +import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometShuffleMode, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometOperatorEnabled, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSchemaSupported, isSpark34Plus, shouldApplyRowToColumnar, withInfo, withInfos} import org.apache.comet.parquet.{CometParquetScan, SupportsComet} import org.apache.comet.serde.OperatorOuterClass.Operator import org.apache.comet.serde.QueryPlanSerde @@ -194,30 +194,6 @@ class CometSparkSessionExtensions } case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { - private def applyCometShuffle(plan: SparkPlan): SparkPlan = { - plan.transformUp { - case s: 
ShuffleExchangeExec - if isCometPlan(s.child) && !isCometColumnarShuffleEnabled(conf) && - QueryPlanSerde.supportPartitioning(s.child.output, s.outputPartitioning)._1 => - logInfo("Comet extension enabled for Native Shuffle") - - // Switch to use Decimal128 regardless of precision, since Arrow native execution - // doesn't support Decimal32 and Decimal64 yet. - conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true") - CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle) - - // Columnar shuffle for regular Spark operators (not Comet) and Comet operators - // (if configured) - case s: ShuffleExchangeExec - if (!s.child.supportsColumnar || isCometPlan( - s.child)) && isCometColumnarShuffleEnabled(conf) && - QueryPlanSerde.supportPartitioningTypes(s.child.output)._1 && - !isShuffleOperator(s.child) => - logInfo("Comet extension enabled for JVM Columnar Shuffle") - CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle) - } - } - private def isCometPlan(op: SparkPlan): Boolean = op.isInstanceOf[CometPlan] private def isCometNative(op: SparkPlan): Boolean = op.isInstanceOf[CometNativeExec] @@ -641,7 +617,7 @@ class CometSparkSessionExtensions // Native shuffle for Comet operators case s: ShuffleExchangeExec if isCometShuffleEnabled(conf) && - !isCometColumnarShuffleEnabled(conf) && + getCometShuffleMode(conf) != JVMShuffle && QueryPlanSerde.supportPartitioning(s.child.output, s.outputPartitioning)._1 => logInfo("Comet extension enabled for Native Shuffle") @@ -662,7 +638,7 @@ class CometSparkSessionExtensions // If the child of ShuffleExchangeExec is also a ShuffleExchangeExec, we should not // convert it to CometColumnarShuffle, case s: ShuffleExchangeExec - if isCometShuffleEnabled(conf) && isCometColumnarShuffleEnabled(conf) && + if isCometShuffleEnabled(conf) && getCometShuffleMode(conf) != NativeShuffle && QueryPlanSerde.supportPartitioningTypes(s.child.output)._1 && !isShuffleOperator(s.child) => logInfo("Comet extension enabled for JVM 
Columnar Shuffle") @@ -684,19 +660,19 @@ class CometSparkSessionExtensions case s: ShuffleExchangeExec => val isShuffleEnabled = isCometShuffleEnabled(conf) val reason = getCometShuffleNotEnabledReason(conf).getOrElse("no reason available") - val msg1 = createMessage(!isShuffleEnabled, s"Native shuffle is not enabled: $reason") - val columnarShuffleEnabled = isCometColumnarShuffleEnabled(conf) + val msg1 = createMessage(!isShuffleEnabled, s"Comet shuffle is not enabled: $reason") + val columnarShuffleEnabled = getCometShuffleMode(conf) == JVMShuffle val msg2 = createMessage( isShuffleEnabled && !columnarShuffleEnabled && !QueryPlanSerde .supportPartitioning(s.child.output, s.outputPartitioning) ._1, - "Shuffle: " + + "Native shuffle: " + s"${QueryPlanSerde.supportPartitioning(s.child.output, s.outputPartitioning)._2}") val msg3 = createMessage( isShuffleEnabled && columnarShuffleEnabled && !QueryPlanSerde .supportPartitioningTypes(s.child.output) ._1, - s"Columnar shuffle: ${QueryPlanSerde.supportPartitioningTypes(s.child.output)._2}") + s"JVM shuffle: ${QueryPlanSerde.supportPartitioningTypes(s.child.output)._2}") withInfo(s, Seq(msg1, msg2, msg3).flatten.mkString(",")) s @@ -724,89 +700,80 @@ class CometSparkSessionExtensions } // We shouldn't transform Spark query plan if Comet is disabled. 
- if (!isCometEnabled(conf)) return plan - - if (!isCometExecEnabled(conf)) { - // Comet exec is disabled, but for Spark shuffle, we still can use Comet columnar shuffle - if (isCometShuffleEnabled(conf)) { - applyCometShuffle(plan) - } else { - plan - } - } else { - var newPlan = transform(plan) - - // if the plan cannot be run fully natively then explain why (when appropriate - // config is enabled) - if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) { - new ExtendedExplainInfo().extensionInfo(newPlan) match { - case reasons if reasons.size == 1 => - logWarning( - "Comet cannot execute some parts of this plan natively " + - s"because ${reasons.head}") - case reasons if reasons.size > 1 => - logWarning( - "Comet cannot execute some parts of this plan natively" + - s" because:\n\t- ${reasons.mkString("\n\t- ")}") - case _ => - // no reasons recorded - } + if (!isCometEnabled(conf) || !isCometExecEnabled(conf)) return plan + + var newPlan = transform(plan) + + // if the plan cannot be run fully natively then explain why (when appropriate + // config is enabled) + if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) { + new ExtendedExplainInfo().extensionInfo(newPlan) match { + case reasons if reasons.size == 1 => + logWarning( + "Comet cannot execute some parts of this plan natively " + + s"because ${reasons.head}") + case reasons if reasons.size > 1 => + logWarning( + "Comet cannot execute some parts of this plan natively" + + s" because:\n\t- ${reasons.mkString("\n\t- ")}") + case _ => + // no reasons recorded } + } - // Remove placeholders - newPlan = newPlan.transform { - case CometSinkPlaceHolder(_, _, s) => s - case CometScanWrapper(_, s) => s - } + // Remove placeholders + newPlan = newPlan.transform { + case CometSinkPlaceHolder(_, _, s) => s + case CometScanWrapper(_, s) => s + } - // Set up logical links - newPlan = newPlan.transform { - case op: CometExec => - if (op.originalPlan.logicalLink.isEmpty) { - op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG) - 
op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG) - } else { - op.originalPlan.logicalLink.foreach(op.setLogicalLink) - } - op - case op: CometShuffleExchangeExec => - // Original Spark shuffle exchange operator might have empty logical link. - // But the `setLogicalLink` call above on downstream operator of - // `CometShuffleExchangeExec` will set its logical link to the downstream - // operators which cause AQE behavior to be incorrect. So we need to unset - // the logical link here. - if (op.originalPlan.logicalLink.isEmpty) { - op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG) - op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG) - } else { - op.originalPlan.logicalLink.foreach(op.setLogicalLink) - } - op + // Set up logical links + newPlan = newPlan.transform { + case op: CometExec => + if (op.originalPlan.logicalLink.isEmpty) { + op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG) + op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG) + } else { + op.originalPlan.logicalLink.foreach(op.setLogicalLink) + } + op + case op: CometShuffleExchangeExec => + // Original Spark shuffle exchange operator might have empty logical link. + // But the `setLogicalLink` call above on downstream operator of + // `CometShuffleExchangeExec` will set its logical link to the downstream + // operators which cause AQE behavior to be incorrect. So we need to unset + // the logical link here. 
+ if (op.originalPlan.logicalLink.isEmpty) { + op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG) + op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG) + } else { + op.originalPlan.logicalLink.foreach(op.setLogicalLink) + } + op - case op: CometBroadcastExchangeExec => - if (op.originalPlan.logicalLink.isEmpty) { - op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG) - op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG) - } else { - op.originalPlan.logicalLink.foreach(op.setLogicalLink) - } - op - } + case op: CometBroadcastExchangeExec => + if (op.originalPlan.logicalLink.isEmpty) { + op.unsetTagValue(SparkPlan.LOGICAL_PLAN_TAG) + op.unsetTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG) + } else { + op.originalPlan.logicalLink.foreach(op.setLogicalLink) + } + op + } - // Convert native execution block by linking consecutive native operators. - var firstNativeOp = true - newPlan.transformDown { - case op: CometNativeExec => - if (firstNativeOp) { - firstNativeOp = false - op.convertBlock() - } else { - op - } - case op => - firstNativeOp = true + // Convert native execution block by linking consecutive native operators. 
+ var firstNativeOp = true + newPlan.transformDown { + case op: CometNativeExec => + if (firstNativeOp) { + firstNativeOp = false + op.convertBlock() + } else { op - } + } + case op => + firstNativeOp = true + op } } @@ -966,8 +933,12 @@ object CometSparkSessionExtensions extends Logging { COMET_EXEC_ENABLED.get(conf) } - private[comet] def isCometColumnarShuffleEnabled(conf: SQLConf): Boolean = { - COMET_COLUMNAR_SHUFFLE_ENABLED.get(conf) + private[comet] def getCometShuffleMode(conf: SQLConf): CometShuffleType = { + COMET_SHUFFLE_MODE.get(conf) match { + case "jvm" => JVMShuffle + case "native" => NativeShuffle + case _ => AutoShuffle + } } private[comet] def isCometAllOperatorEnabled(conf: SQLConf): Boolean = { @@ -1138,3 +1109,9 @@ object CometSparkSessionExtensions extends Logging { } } } + +sealed abstract class CometShuffleType + +case object AutoShuffle extends CometShuffleType +case object JVMShuffle extends CometShuffleType +case object NativeShuffle extends CometShuffleType \ No newline at end of file diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index 310a24ee3b..36a52eea1d 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -44,7 +44,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { val df1 = sql("SELECT count(DISTINCT 2), count(DISTINCT 2,3)") checkSparkAnswer(df1) @@ -57,7 +57,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - 
CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { checkSparkAnswer(sql(""" |SELECT | lag(123, 100, 321) OVER (ORDER BY id) as lag, @@ -78,7 +78,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { val df1 = Seq( ("a", "b", "c"), ("a", "b", "c"), @@ -99,7 +99,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { val df = sql("SELECT LAST(n) FROM lowerCaseData") checkSparkAnswer(df) } @@ -114,7 +114,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { val df = sql("select sum(a), avg(a) from allNulls") checkSparkAnswer(df) } @@ -125,7 +125,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { withTempDir { dir => val path = new Path(dir.toURI.toString, "test") makeParquetFile(path, 10000, 10, false) @@ -141,7 +141,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + 
CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "test") @@ -160,7 +160,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { sql( "CREATE TABLE lineitem(l_extendedprice DOUBLE, l_quantity DOUBLE, l_partkey STRING) USING PARQUET") @@ -197,7 +197,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> EliminateSorts.ruleName, CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "test") @@ -216,7 +216,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + CometConf.COMET_SHUFFLE_MODE.key -> "native") { withTable(table) { sql(s"CREATE TABLE $table(col DECIMAL(5, 2)) USING PARQUET") sql(s"INSERT INTO TABLE $table VALUES (CAST(12345.01 AS DECIMAL(5, 2)))") @@ -316,7 +316,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq(true, false).foreach { dictionaryEnabled => withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> nativeShuffleEnabled.toString, - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + CometConf.COMET_SHUFFLE_MODE.key -> "native") { withParquetTable( (0 until 100).map(i => (i, (i % 10).toString)), "tbl", @@ -497,7 +497,7 @@ class CometAggregateSuite extends 
CometTestBase with AdaptiveSparkPlanHelper { Seq(true, false).foreach { nativeShuffleEnabled => withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> nativeShuffleEnabled.toString, - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + CometConf.COMET_SHUFFLE_MODE.key -> "native") { withTempDir { dir => val path = new Path(dir.toURI.toString, "test") makeParquetFile(path, 1000, 20, dictionaryEnabled) @@ -686,7 +686,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("test final count") { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + CometConf.COMET_SHUFFLE_MODE.key -> "native") { Seq(false, true).foreach { dictionaryEnabled => withParquetTable((0 until 5).map(i => (i, i % 2)), "tbl", dictionaryEnabled) { checkSparkAnswerAndNumOfAggregates("SELECT _2, COUNT(_1) FROM tbl GROUP BY _2", 2) @@ -703,7 +703,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("test final min/max") { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + CometConf.COMET_SHUFFLE_MODE.key -> "native") { Seq(true, false).foreach { dictionaryEnabled => withParquetTable((0 until 5).map(i => (i, i % 2)), "tbl", dictionaryEnabled) { checkSparkAnswerAndNumOfAggregates( @@ -724,7 +724,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("test final min/max/count with result expressions") { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + CometConf.COMET_SHUFFLE_MODE.key -> "native") { Seq(true, false).foreach { dictionaryEnabled => withParquetTable((0 until 5).map(i => (i, i % 2)), "tbl", dictionaryEnabled) { checkSparkAnswerAndNumOfAggregates( @@ -759,7 +759,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { 
test("test final sum") { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + CometConf.COMET_SHUFFLE_MODE.key -> "native") { Seq(false, true).foreach { dictionaryEnabled => withParquetTable((0L until 5L).map(i => (i, i % 2)), "tbl", dictionaryEnabled) { checkSparkAnswerAndNumOfAggregates( @@ -780,7 +780,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("test final avg") { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + CometConf.COMET_SHUFFLE_MODE.key -> "native") { Seq(true, false).foreach { dictionaryEnabled => withParquetTable( (0 until 5).map(i => (i.toDouble, i.toDouble % 2)), @@ -805,7 +805,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + CometConf.COMET_SHUFFLE_MODE.key -> "native") { Seq(true, false).foreach { dictionaryEnabled => withSQLConf("parquet.enable.dictionary" -> dictionaryEnabled.toString) { val table = "t1" @@ -850,7 +850,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("avg null handling") { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { + CometConf.COMET_SHUFFLE_MODE.key -> "native") { val table = "t1" withTable(table) { sql(s"create table $table(a double, b double) using parquet") @@ -872,7 +872,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq(true, false).foreach { nativeShuffleEnabled => withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> nativeShuffleEnabled.toString, - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false", + CometConf.COMET_SHUFFLE_MODE.key -> "native", CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { 
withTempDir { dir => val path = new Path(dir.toURI.toString, "test") @@ -912,9 +912,9 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("distinct") { withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - Seq(true, false).foreach { cometColumnShuffleEnabled => + Seq("native", "jvm").foreach { cometColumnShuffleEnabled => withSQLConf( - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> cometColumnShuffleEnabled.toString) { + CometConf.COMET_SHUFFLE_MODE.key -> cometColumnShuffleEnabled) { Seq(true, false).foreach { dictionary => withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { val table = "test" @@ -970,7 +970,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { Seq(true, false).foreach { dictionary => withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { val table = "test" @@ -1016,9 +1016,9 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("test bool_and/bool_or") { withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - Seq(true, false).foreach { cometColumnShuffleEnabled => + Seq("native", "jvm").foreach { cometColumnShuffleEnabled => withSQLConf( - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> cometColumnShuffleEnabled.toString) { + CometConf.COMET_SHUFFLE_MODE.key -> cometColumnShuffleEnabled.toString) { Seq(true, false).foreach { dictionary => withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { val table = "test" @@ -1043,7 +1043,7 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("bitwise aggregate") { withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - 
CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { Seq(true, false).foreach { dictionary => withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { val table = "test" @@ -1092,9 +1092,9 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("covar_pop and covar_samp") { withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - Seq(true, false).foreach { cometColumnShuffleEnabled => + Seq("native", "jvm").foreach { cometColumnShuffleEnabled => withSQLConf( - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> cometColumnShuffleEnabled.toString) { + CometConf.COMET_SHUFFLE_MODE.key -> cometColumnShuffleEnabled) { Seq(true, false).foreach { dictionary => withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { val table = "test" @@ -1131,9 +1131,9 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("var_pop and var_samp") { withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - Seq(true, false).foreach { cometColumnShuffleEnabled => + Seq("native", "jvm").foreach { cometColumnShuffleEnabled => withSQLConf( - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> cometColumnShuffleEnabled.toString) { + CometConf.COMET_SHUFFLE_MODE.key -> cometColumnShuffleEnabled) { Seq(true, false).foreach { dictionary => withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { Seq(true, false).foreach { nullOnDivideByZero => @@ -1171,9 +1171,9 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("stddev_pop and stddev_samp") { withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - Seq(true, false).foreach { cometColumnShuffleEnabled => + Seq("native", "jvm").foreach { cometColumnShuffleEnabled => withSQLConf( - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> cometColumnShuffleEnabled.toString) { + CometConf.COMET_SHUFFLE_MODE.key -> cometColumnShuffleEnabled) { Seq(true, 
false).foreach { dictionary => withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { Seq(true, false).foreach { nullOnDivideByZero => diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala index 600f9c44f0..c38be7c4a0 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala @@ -54,7 +54,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> asyncShuffleEnable.toString, CometConf.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString, CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.key -> "1536m") { testFun @@ -963,7 +963,7 @@ class CometShuffleSuite extends CometColumnarShuffleSuite { SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") { val df = sql("SELECT * FROM tbl_a") val shuffled = df @@ -983,7 +983,7 @@ class CometShuffleSuite extends CometColumnarShuffleSuite { SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") { withParquetTable((0 until 10).map(i => (i % 10, i + 2)), "tbl_b") { val df = sql("SELECT * 
FROM tbl_a") @@ -1016,7 +1016,7 @@ class DisableAQECometShuffleSuite extends CometColumnarShuffleSuite { SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") { withParquetTable((0 until 10).map(i => (i % 10, i + 2)), "tbl_b") { val df = sql("SELECT * FROM tbl_a") @@ -1061,7 +1061,7 @@ class CometShuffleEncryptionSuite extends CometTestBase { withSQLConf( CometConf.COMET_EXEC_ENABLED.key -> "false", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> asyncEnabled.toString) { readParquetFile(path.toString) { df => val shuffled = df diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index c5fef022cf..668202d96e 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -134,14 +134,16 @@ class CometExecSuite extends CometTestBase { .toDF("c1", "c2") .createOrReplaceTempView("v") - Seq(true, false).foreach { columnarShuffle => + Seq("native", "jvm").foreach { columnarShuffle => withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> columnarShuffle.toString) { + CometConf.COMET_SHUFFLE_MODE.key -> columnarShuffle.toString) { val df = sql("SELECT * FROM v where c1 = 1 order by c1, c2") val shuffle = find(df.queryExecution.executedPlan) { - case _: CometShuffleExchangeExec if columnarShuffle => true - case _: ShuffleExchangeExec if !columnarShuffle => true + case _: CometShuffleExchangeExec + if 
columnarShuffle.equalsIgnoreCase("jvm") => true + case _: ShuffleExchangeExec + if !columnarShuffle.equalsIgnoreCase("jvm") => true case _ => false }.get assert(shuffle.logicalLink.isEmpty) @@ -179,7 +181,7 @@ class CometExecSuite extends CometTestBase { SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled, // `REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION` is a new config in Spark 3.3+. "spark.sql.requireAllClusterKeysForDistribution" -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { val df = Seq(("a", 1, 1), ("a", 2, 2), ("b", 1, 3), ("b", 1, 4)).toDF("key1", "key2", "value") val windowSpec = Window.partitionBy("key1", "key2").orderBy("value") @@ -318,7 +320,7 @@ class CometExecSuite extends CometTestBase { dataTypes.map { subqueryType => withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") { var column1 = s"CAST(max(_1) AS $subqueryType)" @@ -499,7 +501,7 @@ class CometExecSuite extends CometTestBase { SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { withTable(tableName, dim) { sql( @@ -716,7 +718,7 @@ class CometExecSuite extends CometTestBase { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") { val df = sql("SELECT * FROM tbl").sort($"_1".desc) checkSparkAnswerAndOperator(df) @@ -764,10 +766,10 @@ class CometExecSuite extends 
CometTestBase { } test("limit") { - Seq("true", "false").foreach { columnarShuffle => + Seq("native", "jvm").foreach { columnarShuffle => withSQLConf( CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> columnarShuffle) { + CometConf.COMET_SHUFFLE_MODE.key -> columnarShuffle) { withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl_a") { val df = sql("SELECT * FROM tbl_a") .repartition(10, $"_1") @@ -1411,7 +1413,7 @@ class CometExecSuite extends CometTestBase { Seq("true", "false").foreach(aqe => { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqe, - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", SQLConf.CACHE_VECTORIZED_READER_ENABLED.key -> "false") { spark .range(1000) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index d48ba18392..d17e4abf4a 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -37,7 +37,7 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper super.test(testName, testTags: _*) { withSQLConf( CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false", + CometConf.COMET_SHUFFLE_MODE.key -> "native", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { testFun } diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala index e8aac26195..1abe5faebe 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala @@ -90,7 +90,7 @@ class CometTPCHQuerySuite extends QueryTest with CometTPCBase with SQLQueryTestH conf.set(CometConf.COMET_EXEC_ENABLED.key, "true") 
conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") - conf.set(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key, "true") + conf.set(CometConf.COMET_SHUFFLE_MODE.key, "jvm") conf.set(MEMORY_OFFHEAP_ENABLED.key, "true") conf.set(MEMORY_OFFHEAP_SIZE.key, "2g") } diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala index d6020ac691..bf4bfdbee3 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala @@ -131,7 +131,7 @@ object CometExecBenchmark extends CometBenchmarkBase { CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { spark.sql( "SELECT (SELECT max(col1) AS parquetV1Table FROM parquetV1Table) AS a, " + "col2, col3 FROM parquetV1Table") diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala index 8655728119..30a2823cf9 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala @@ -106,7 +106,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1.0", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") { spark .sql( @@ -165,7 +165,7 @@ object 
CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1.0", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") { spark .sql( @@ -222,7 +222,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1.0", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") { spark .sql("select c1 from parquetV1Table") @@ -238,7 +238,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "2.0", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") { spark .sql("select c1 from parquetV1Table") @@ -254,7 +254,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1000000000.0", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") { spark .sql("select c1 from parquetV1Table") @@ -321,7 +321,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", 
CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "false") { spark .sql("select c1 from parquetV1Table") @@ -336,7 +336,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm", CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> "true") { spark .sql("select c1 from parquetV1Table") @@ -409,7 +409,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { spark .sql(s"select $columns from parquetV1Table") .repartition(partitionNum, Column("c1")) @@ -422,7 +422,8 @@ object CometShuffleBenchmark extends CometBenchmarkBase { CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "native") { spark .sql(s"select $columns from parquetV1Table") .repartition(partitionNum, Column("c1")) diff --git a/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala b/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala index 32b76d9b00..019b4f030b 100644 --- a/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala +++ b/spark/src/test/spark-3.4/org/apache/comet/exec/CometExec3_4Suite.scala @@ -43,7 +43,7 @@ class CometExec3_4Suite 
extends CometTestBase { // The syntax is only supported by Spark 3.4+. test("subquery limit: limit with offset should return correct results") { - withSQLConf(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { withTable("t1", "t2") { val table1 = """create temporary view t1 as select * from values @@ -95,7 +95,7 @@ class CometExec3_4Suite extends CometTestBase { // Dataset.offset API is not available before Spark 3.4 test("offset") { - withSQLConf(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { checkSparkAnswer(testData.offset(90)) checkSparkAnswer(arrayData.toDF().offset(99)) checkSparkAnswer(mapData.toDF().offset(99))