From bd965041d0239a2207ee2d2ce8576970345722a8 Mon Sep 17 00:00:00 2001
From: Chao Sun
Date: Sun, 25 Feb 2024 00:22:45 -0800
Subject: [PATCH] fix: set Comet shuffle configs per test and split the
 shuffle suites

- CometCastSuite: add explicit `: Unit =` result types to two test helpers
  (Scala procedure syntax is deprecated).
- CometColumnarShuffleSuite: set the Comet exec/shuffle configs per test in
  the overridden `test` method instead of the suite-wide sparkConf, and move
  the native-shuffle variants of its tests into CometNativeShuffleSuite.
- CometNativeShuffleSuite: apply the same per-test config pattern and add
  the tests moved over from CometColumnarShuffleSuite.
---
 .../org/apache/comet/CometCastSuite.scala     |  4 +-
 .../exec/CometColumnarShuffleSuite.scala      | 78 +++++++------------
 .../comet/exec/CometNativeShuffleSuite.scala  | 77 ++++++++++++------
 3 files changed, 84 insertions(+), 75 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 565d2264b7..317371fb90 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -90,13 +90,13 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     Range(0, len).map(_ => chars.charAt(r.nextInt(chars.length))).mkString
   }
 
-  private def fuzzCastFromString(chars: String, maxLen: Int, toType: DataType) {
+  private def fuzzCastFromString(chars: String, maxLen: Int, toType: DataType): Unit = {
     val r = new Random(0)
     val inputs = Range(0, 10000).map(_ => genString(r, chars, maxLen))
     castTest(inputs.toDF("a"), toType)
   }
 
-  private def castTest(input: DataFrame, toType: DataType) {
+  private def castTest(input: DataFrame, toType: DataType): Unit = {
     withTempPath { dir =>
       val df = roundtripParquet(input, dir)
         .withColumn("converted", col("a").cast(toType))
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
index a6146148bd..289ed07678 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
@@ -42,9 +42,6 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
     val conf = super.sparkConf
     conf
       .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveExecutionEnabled.toString)
-      .set(CometConf.COMET_EXEC_ENABLED.key, "false")
-      .set(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key, "true")
-      .set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
       .set("spark.shuffle.unsafe.fastMergeEnabled", fastMergeEnabled.toString)
   }
 
@@ -55,7 +52,10 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
     super.test(testName, testTags: _*) {
       withSQLConf(
         CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> asyncShuffleEnable.toString,
-        CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString) {
+        CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString,
+        CometConf.COMET_EXEC_ENABLED.key -> "false",
+        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
         testFun
       }
     }
@@ -821,16 +821,6 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
     }
   }
 
-  test("grouped aggregate: Comet shuffle") {
-    withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
-      withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") {
-        val df = sql("SELECT count(_2), sum(_2) FROM tbl GROUP BY _1")
-        checkCometExchange(df, 1, true)
-        checkSparkAnswerAndOperator(df)
-      }
-    }
-  }
-
   test("hash-based columnar shuffle") {
     Seq(10, 200, 201).foreach { numPartitions =>
       withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
@@ -888,43 +878,29 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
     }
   }
 
-  // TODO: separate this into `CometNativeShuffleSuite`?
- test("Comet native operator after Comet shuffle") { - Seq(true, false).foreach { columnarShuffle => - withSQLConf( - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> columnarShuffle.toString) { - withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { - val df = sql("SELECT * FROM tbl") - - val shuffled1 = df - .repartition(10, $"_2") - .select($"_1", $"_1" + 1, $"_2" + 2) - .repartition(10, $"_1") - .filter($"_1" > 1) - - // 2 Comet shuffle exchanges are expected - checkCometExchange(shuffled1, 2, !columnarShuffle) - checkSparkAnswer(shuffled1) - - val shuffled2 = df - .repartitionByRange(10, $"_2") - .select($"_1", $"_1" + 1, $"_2" + 2) - .repartition(10, $"_1") - .filter($"_1" > 1) - - // 2 Comet shuffle exchanges are expected, if columnar shuffle is enabled - if (columnarShuffle) { - checkCometExchange(shuffled2, 2, !columnarShuffle) - } else { - // Because the first exchange from the bottom is range exchange which native shuffle - // doesn't support. So Comet exec operators stop before the first exchange and thus - // there is no Comet exchange. - checkCometExchange(shuffled2, 0, true) - } - checkSparkAnswer(shuffled2) - } - } + test("native operator after columnar shuffle") { + withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl") + + val shuffled1 = df + .repartition(10, $"_2") + .select($"_1", $"_1" + 1, $"_2" + 2) + .repartition(10, $"_1") + .filter($"_1" > 1) + + // 2 Comet shuffle exchanges are expected + checkCometExchange(shuffled1, 2, false) + checkSparkAnswer(shuffled1) + + val shuffled2 = df + .repartitionByRange(10, $"_2") + .select($"_1", $"_1" + 1, $"_2" + 2) + .repartition(10, $"_1") + .filter($"_1" > 1) + + // 2 Comet shuffle exchanges are expected, if columnar shuffle is enabled + checkCometExchange(shuffled2, 2, false) + checkSparkAnswer(shuffled2) } } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala index f047f07246..0b027a68ae 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -19,8 +19,10 @@ package org.apache.comet.exec +import org.scalactic.source.Position +import org.scalatest.Tag + import org.apache.hadoop.fs.Path -import org.apache.spark.SparkConf import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -30,12 +32,16 @@ import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper { - override protected def sparkConf: SparkConf = { - val conf = super.sparkConf - conf - .set(CometConf.COMET_EXEC_ENABLED.key, "true") - .set(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key, "false") - .set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") + override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit + pos: Position): Unit = { + super.test(testName, testTags: _*) { + withSQLConf( + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false", + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + testFun + } + } } import testImplicits._ @@ -61,9 +67,7 @@ class CometNativeShuffleSuite extends CometTestBase 
       allTypes = allTypes.filterNot(Set(14, 17).contains)
     }
     allTypes.map(i => s"_$i").foreach { c =>
-      withSQLConf(
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-        "parquet.enable.dictionary" -> dictionaryEnabled.toString) {
+      withSQLConf("parquet.enable.dictionary" -> dictionaryEnabled.toString) {
         readParquetFile(path.toString) { df =>
           val shuffled = df
             .select($"_1")
@@ -98,20 +102,49 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper
     }
   }
 
   test("columnar shuffle: single partition") {
-    Seq(true, false).foreach { execEnabled =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString,
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> (!execEnabled).toString) {
-        withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
-          val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
+    withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+      val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
 
-          val shuffled = df.repartition(1)
+      val shuffled = df.repartition(1)
 
-          checkCometExchange(shuffled, 1, execEnabled)
-          checkSparkAnswer(shuffled)
-        }
-      }
+      checkCometExchange(shuffled, 1, true)
+      checkSparkAnswer(shuffled)
     }
   }
+
+  test("native operator after native shuffle") {
+    withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+      val df = sql("SELECT * FROM tbl")
+
+      val shuffled1 = df
+        .repartition(10, $"_2")
+        .select($"_1", $"_1" + 1, $"_2" + 2)
+        .repartition(10, $"_1")
+        .filter($"_1" > 1)
+
+      // 2 Comet shuffle exchanges are expected
+      checkCometExchange(shuffled1, 2, true)
+      checkSparkAnswer(shuffled1)
+
+      val shuffled2 = df
+        .repartitionByRange(10, $"_2")
+        .select($"_1", $"_1" + 1, $"_2" + 2)
+        .repartition(10, $"_1")
+        .filter($"_1" > 1)
+
+      // The first exchange from the bottom is a range exchange, which native
+      // shuffle doesn't support, so Comet exec operators stop before it and
+      // no Comet exchange appears in the plan.
+      checkCometExchange(shuffled2, 0, true)
+      checkSparkAnswer(shuffled2)
+    }
+  }
+
+  test("grouped aggregate: native shuffle") {
+    withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") {
+      val df = sql("SELECT count(_2), sum(_2) FROM tbl GROUP BY _1")
+      checkCometExchange(df, 1, true)
+      checkSparkAnswerAndOperator(df)
+    }
+  }
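
The mechanism both suites now rely on is a per-test override of ScalaTest's
`test` hook that wraps every test body in a scoped config block, instead of
baking the settings into the suite-wide SparkConf. Below is a minimal,
self-contained Scala sketch of that pattern. It deliberately avoids Spark and
ScalaTest: `withConf`, `BaseSuite`, and the key names are illustrative
stand-ins, not real Comet or Spark APIs.

object ConfigScopingSketch {
  // Hypothetical stand-in for Spark's SQLConf: a mutable key -> value map.
  private val conf = scala.collection.mutable.Map.empty[String, String]

  // Set `pairs` for the duration of `body`, then restore the previous
  // values -- the same contract as Spark's withSQLConf helper.
  def withConf[T](pairs: (String, String)*)(body: => T): T = {
    val saved = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf(k) = v }
    try body
    finally saved.foreach {
      case (k, Some(old)) => conf(k) = old
      case (k, None) => conf.remove(k)
    }
  }

  // Stand-in for the ScalaTest base class: `test` just runs the body.
  abstract class BaseSuite {
    def test(name: String)(body: => Unit): Unit = body
  }

  // Analog of CometNativeShuffleSuite after this patch: the override wraps
  // every test in the suite's config scope, so individual tests no longer
  // set (or accidentally forget) these keys themselves.
  class NativeShuffleLikeSuite extends BaseSuite {
    override def test(name: String)(body: => Unit): Unit =
      super.test(name) {
        withConf(
          "comet.exec.enabled" -> "true",
          "comet.columnar.shuffle.enabled" -> "false",
          "comet.exec.shuffle.enabled" -> "true")(body)
      }
  }

  def main(args: Array[String]): Unit = {
    new NativeShuffleLikeSuite().test("example") {
      assert(conf("comet.exec.enabled") == "true")
    }
    // Restored after the test body finishes: the key is gone again.
    assert(!conf.contains("comet.exec.enabled"))
  }
}

The try/finally restore is the point of the pattern: a failing test cannot
leak its configuration into the tests that run after it.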