diff --git a/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
similarity index 65%
rename from spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala
rename to spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
index beb6dc8609..a6146148bd 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
@@ -27,24 +27,24 @@
 import org.apache.spark.{Partitioner, SparkConf}
 import org.apache.spark.sql.{CometTestBase, Row}
 import org.apache.spark.sql.comet.execution.shuffle.{CometShuffleDependency, CometShuffleExchangeExec, CometShuffleManager}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
-
-abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPlanHelper {
+abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   protected val adaptiveExecutionEnabled: Boolean
-
   protected val fastMergeEnabled: Boolean = true
-
   protected val numElementsForceSpillThreshold: Int = 10
 
   override protected def sparkConf: SparkConf = {
     val conf = super.sparkConf
     conf
       .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveExecutionEnabled.toString)
+      .set(CometConf.COMET_EXEC_ENABLED.key, "false")
+      .set(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key, "true")
+      .set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
       .set("spark.shuffle.unsafe.fastMergeEnabled", fastMergeEnabled.toString)
   }
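The hunks that follow all make the same mechanical change: the three shuffle flags move out of each test's `withSQLConf` block and into the suite-wide `sparkConf` above, so a test only sets the knob it actually varies. A minimal sketch of the layering (the suite name here is illustrative; it assumes, from `CometTestBase`'s use of Spark's `SQLHelper`, that `withSQLConf` restores the previous values when the closure exits):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.CometTestBase
import org.apache.comet.CometConf

// Illustrative suite, not part of the patch: defaults come from sparkConf,
// and withSQLConf narrows a single key for the duration of one test body.
abstract class ExampleColumnarShuffleSuite extends CometTestBase {
  override protected def sparkConf: SparkConf =
    super.sparkConf
      .set(CometConf.COMET_EXEC_ENABLED.key, "false")
      .set(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key, "true")
      .set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")

  test("only the varying knob is set locally") {
    withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "2.0") {
      // run the shuffle under the suite defaults plus this one override
    }
  }
}
```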
@@ -66,11 +66,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
   test("columnar shuffle on nested struct including nulls") {
     Seq(10, 201).foreach { numPartitions =>
       Seq("1.0", "10.0").foreach { ratio =>
-        withSQLConf(
-          CometConf.COMET_EXEC_ENABLED.key -> "false",
-          CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
+        withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
           withParquetTable(
             (0 until 50).map(i =>
               (i, Seq((i + 1, i.toString), null, (i + 3, (i + 3).toString)), i + 1)),
@@ -91,11 +87,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
   test("columnar shuffle on struct including nulls") {
     Seq(10, 201).foreach { numPartitions =>
       Seq("1.0", "10.0").foreach { ratio =>
-        withSQLConf(
-          CometConf.COMET_EXEC_ENABLED.key -> "false",
-          CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
+        withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
           val data: Seq[(Int, (Int, String))] =
             Seq((1, (0, "1")), (2, (3, "3")), (3, null))
           withParquetTable(data, "tbl") {
@@ -118,8 +110,6 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
       Seq("1.0", "10.0").foreach { ratio =>
         withSQLConf(
           CometConf.COMET_EXEC_ENABLED.key -> execEnabled,
-          CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
           CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
           withParquetTable((0 until 50).map(i => (Map(Seq(i, i + 1) -> i), i + 1)), "tbl") {
             val df = sql("SELECT * FROM tbl")
@@ -176,8 +166,6 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
       Seq("1.0", "10.0").foreach { ratio =>
         withSQLConf(
           CometConf.COMET_EXEC_ENABLED.key -> execEnabled,
-          CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
           CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
           withParquetTable(
             (0 until 50).map(i => ((Seq(Map(1 -> i)), Map(2 -> i), Map(3 -> i)), i + 1)),
@@ -201,9 +189,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
     withSQLConf(
       // AQE has `ShuffleStage` which is a leaf node which blocks
       // collecting `CometShuffleExchangeExec` node.
-      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
-      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-      CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
       withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
         val df = sql("SELECT * FROM tbl")
         val shuffled = df
@@ -261,11 +247,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
 
     Seq(10, 201).foreach { numPartitions =>
       Seq("1.0", "10.0").foreach { ratio =>
-        withSQLConf(
-          CometConf.COMET_EXEC_ENABLED.key -> "false",
-          CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
+        withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
           // Boolean key
           withParquetTable(genTuples(50, Seq(true, false)), "tbl") {
             val df = sql("SELECT * FROM tbl")
@@ -573,11 +555,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
   test("columnar shuffle on array") {
     Seq(10, 201).foreach { numPartitions =>
       Seq("1.0", "10.0").foreach { ratio =>
-        withSQLConf(
-          CometConf.COMET_EXEC_ENABLED.key -> "false",
-          CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
+        withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
           withParquetTable(
             (0 until 50).map(i =>
               (
@@ -610,11 +588,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
     Seq("false", "true").foreach { execEnabled =>
       Seq(10, 201).foreach { numPartitions =>
         Seq("1.0", "10.0").foreach { ratio =>
-          withSQLConf(
-            CometConf.COMET_EXEC_ENABLED.key -> execEnabled,
-            CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-            CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
+          withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
             withParquetTable(
               (0 until 50).map(i => (Seq(Seq(i + 1), Seq(i + 2), Seq(i + 3)), i + 1)),
               "tbl") {
@@ -636,11 +610,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
   test("columnar shuffle on nested struct") {
     Seq(10, 201).foreach { numPartitions =>
       Seq("1.0", "10.0").foreach { ratio =>
-        withSQLConf(
-          CometConf.COMET_EXEC_ENABLED.key -> "false",
-          CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
+        withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
           withParquetTable(
             (0 until 50).map(i =>
               ((i, 2.toString, (i + 1).toLong, (3.toString, i + 1, (i + 2).toLong)), i + 1)),
@@ -658,36 +628,13 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
     }
   }
 
-  test("fix: Dictionary arrays imported from native should not be overridden") {
-    Seq(10, 201).foreach { numPartitions =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> "true",
-        CometConf.COMET_BATCH_SIZE.key -> "10",
-        CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
-        CometConf.COMET_EXEC_ALL_EXPR_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false",
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
-        withParquetTable((0 until 50).map(i => (1.toString, 2.toString, (i + 1).toLong)), "tbl") {
-          val df = sql("SELECT * FROM tbl")
-            .filter($"_1" === 1.toString)
-            .repartition(numPartitions, $"_1", $"_2")
-            .sortWithinPartitions($"_1")
-          checkSparkAnswerAndOperator(df)
-        }
-      }
-    }
-  }
-
   test("fix: closing sliced dictionary Comet vector should not close dictionary array") {
     (0 to 10).foreach { _ =>
       withSQLConf(
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
         CometConf.COMET_BATCH_SIZE.key -> "10",
-        CometConf.COMET_EXEC_ENABLED.key -> "false",
         CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1.1",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> "1000000000",
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
+        CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> "1000000000") {
         val table1 = (0 until 1000)
           .map(i => (111111.toString, 2222222.toString, 3333333.toString, i.toLong))
           .toDF("a", "b", "c", "d")
@@ -708,11 +655,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
 
   test("fix: Dictionary field should have distinct dict_id") {
     Seq(10, 201).foreach { numPartitions =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> "false",
-        CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "2.0",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
+      withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "2.0") {
         withParquetTable(
           (0 until 10000).map(i => (1.toString, 2.toString, (i + 1).toLong)),
           "tbl") {
tbl").repartition(numPartitions, $"_1").count() == sql( @@ -766,15 +701,10 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla } test("fix: inMemSorter should be reset after spilling") { - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - withParquetTable((0 until 10000).map(i => (1, (i + 1).toLong)), "tbl") { - assert( - sql("SELECT * FROM tbl").repartition(201, $"_1").count() == sql("SELECT * FROM tbl") - .count()) - } + withParquetTable((0 until 10000).map(i => (1, (i + 1).toLong)), "tbl") { + assert( + sql("SELECT * FROM tbl").repartition(201, $"_1").count() == sql("SELECT * FROM tbl") + .count()) } } @@ -801,14 +731,9 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla $"_18", $"_19", $"_20").foreach { col => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - readParquetFile(path.toString) { df => - val shuffled = df.select(col).repartition(numPartitions, col) - checkSparkAnswer(shuffled) - } + readParquetFile(path.toString) { df => + val shuffled = df.select(col).repartition(numPartitions, col) + checkSparkAnswer(shuffled) } } } @@ -817,10 +742,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("fix: StreamReader should always set useDecimal128 as true") { Seq(10, 201).foreach { numPartitions => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") { withTempPath { dir => val data = makeDecimalRDD(1000, DecimalType(12, 2), false) data.write.parquet(dir.getCanonicalPath) @@ -837,18 +759,13 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("fix: Native Unsafe decimal accessors return incorrect results") { Seq(10, 201).foreach { numPartitions => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") { withTempPath { dir => val data = makeDecimalRDD(1000, DecimalType(22, 2), false) data.write.parquet(dir.getCanonicalPath) readParquetFile(dir.getCanonicalPath) { df => - { - val shuffled = df.repartition(numPartitions, $"dec") - checkSparkAnswer(shuffled) - } + val shuffled = df.repartition(numPartitions, $"dec") + checkSparkAnswer(shuffled) } } } @@ -857,10 +774,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla test("Comet shuffle reader should respect spark.comet.batchSize") { Seq(10, 201).foreach { numPartitions => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") { withParquetTable((0 until 10000).map(i => (1, (i + 1).toLong)), "tbl") { assert( sql("SELECT * FROM tbl").repartition(numPartitions, $"_1").count() == sql( @@ -870,14 +784,11 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla } } - test("Arrow shuffle should work with BatchScan") { + test("columnar shuffle should work with BatchScan") { 
@@ -857,10 +774,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
 
   test("Comet shuffle reader should respect spark.comet.batchSize") {
     Seq(10, 201).foreach { numPartitions =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
+      withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") {
         withParquetTable((0 until 10000).map(i => (1, (i + 1).toLong)), "tbl") {
           assert(
             sql("SELECT * FROM tbl").repartition(numPartitions, $"_1").count() == sql(
@@ -870,14 +784,11 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
     }
   }
 
-  test("Arrow shuffle should work with BatchScan") {
+  test("columnar shuffle should work with BatchScan") {
     withSQLConf(
       SQLConf.USE_V1_SOURCE_LIST.key -> "", // Use DataSourceV2
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", // Disable AQE
-      CometConf.COMET_SCAN_ENABLED.key -> "false", // Disable CometScan to use Spark BatchScan
-      CometConf.COMET_EXEC_ENABLED.key -> "false",
-      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-      CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+      CometConf.COMET_SCAN_ENABLED.key -> "false") { // Disable CometScan to use Spark BatchScan
       withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
         val df = sql("SELECT * FROM tbl")
         val shuffled = df
@@ -894,23 +805,18 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
 
   test("Columnar shuffle for large shuffle partition number") {
     Seq(10, 200, 201).foreach { numPartitions =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> "false",
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
-        withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
-          val df = sql("SELECT * FROM tbl")
+      withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+        val df = sql("SELECT * FROM tbl")
 
-          val shuffled = df.repartitionByRange(numPartitions, $"_2")
+        val shuffled = df.repartitionByRange(numPartitions, $"_2")
 
-          val cometShuffleExecs = checkCometExchange(shuffled, 1, false)
-          // `CometSerializedShuffleHandle` is used for large shuffle partition number,
-          // i.e., sort-based shuffle writer
-          cometShuffleExecs(0).shuffleDependency.shuffleHandle.getClass.getName
-            .contains("CometSerializedShuffleHandle")
+        val cometShuffleExecs = checkCometExchange(shuffled, 1, false)
+        // `CometSerializedShuffleHandle` is used for large shuffle partition number,
+        // i.e., sort-based shuffle writer
+        cometShuffleExecs(0).shuffleDependency.shuffleHandle.getClass.getName
+          .contains("CometSerializedShuffleHandle")
 
-          checkSparkAnswer(shuffled)
-        }
+        checkSparkAnswer(shuffled)
       }
     }
   }
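The 10/200/201 partition counts in the test above bracket Spark's `spark.shuffle.sort.bypassMergeThreshold`, which defaults to 200; beyond it the sort-based, serialized shuffle writer is chosen, which is what the `CometSerializedShuffleHandle` assertion checks. A hedged sketch of that dispatch — `chooseHandle` and the bypass handle name are illustrative, not Comet's actual code:

```scala
// Sketch only: models the partition-count threshold, not the full
// eligibility rules in Spark's SortShuffleManager or CometShuffleManager.
def chooseHandle(numPartitions: Int, bypassMergeThreshold: Int = 200): String =
  if (numPartitions <= bypassMergeThreshold) "BypassMergeSortShuffleHandle"
  else "CometSerializedShuffleHandle"

assert(chooseHandle(200) == "BypassMergeSortShuffleHandle")
assert(chooseHandle(201) == "CometSerializedShuffleHandle") // sort-based writer
```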
@@ -925,186 +831,56 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
     }
   }
 
-  test("hash shuffle: Comet shuffle") {
-    // Disable CometExec to explicit test Comet Arrow shuffle path
-    Seq(true, false).foreach { execEnabled =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString,
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> (!execEnabled).toString) {
-        withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
-          val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
-          val shuffled1 = df.repartition(10, $"_1")
-
-          // If Comet execution is disabled, `Sort` operator is Spark operator
-          // and jvm arrow shuffle is applied.
-          checkCometExchange(shuffled1, 1, execEnabled)
-          checkSparkAnswer(shuffled1)
-
-          val shuffled2 = df.repartition(10, $"_1", $"_2")
-
-          checkCometExchange(shuffled2, 1, execEnabled)
-          checkSparkAnswer(shuffled2)
-
-          val shuffled3 = df.repartition(10, $"_2", $"_1")
-
-          checkCometExchange(shuffled3, 1, execEnabled)
-          checkSparkAnswer(shuffled3)
-        }
-      }
-    }
-  }
-
-  test("Comet shuffle: different data type") {
-    // Disable CometExec to explicit test Comet native shuffle path
-    Seq(true, false).foreach { execEnabled =>
-      Seq(true, false).foreach { dictionaryEnabled =>
-        withTempDir { dir =>
-          val path = new Path(dir.toURI.toString, "test.parquet")
-          makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
-          val all_types = if (isSpark34Plus) {
-            Seq(
-              $"_1",
-              $"_2",
-              $"_3",
-              $"_4",
-              $"_5",
-              $"_6",
-              $"_7",
-              $"_8",
-              $"_9",
-              $"_10",
-              $"_11",
-              $"_13",
-              $"_14",
-              $"_15",
-              $"_16",
-              $"_17",
-              $"_18",
-              $"_19",
-              $"_20")
-          } else {
-            Seq(
-              $"_1",
-              $"_2",
-              $"_3",
-              $"_4",
-              $"_5",
-              $"_6",
-              $"_7",
-              $"_8",
-              $"_9",
-              $"_10",
-              $"_11",
-              $"_13",
-              $"_15",
-              $"_16",
-              $"_18",
-              $"_19",
-              $"_20")
-          }
-          all_types.foreach { col =>
-            withSQLConf(
-              CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString,
-              CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-              "parquet.enable.dictionary" -> dictionaryEnabled.toString) {
-              readParquetFile(path.toString) { df =>
-                val shuffled = df
-                  .select($"_1")
-                  .repartition(10, col)
-                checkCometExchange(shuffled, 1, true)
-                if (execEnabled) {
-                  checkSparkAnswerAndOperator(shuffled)
-                } else {
-                  checkSparkAnswer(shuffled)
-                }
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-
-  test("hash shuffle: Comet columnar shuffle") {
+  test("hash-based columnar shuffle") {
     Seq(10, 200, 201).foreach { numPartitions =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> "false",
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
-        withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
-          val df = sql("SELECT * FROM tbl")
+      withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+        val df = sql("SELECT * FROM tbl")
 
-          val shuffled1 =
-            df.repartitionByRange(numPartitions, $"_2").limit(2).repartition(numPartitions, $"_1")
+        val shuffled1 =
+          df.repartitionByRange(numPartitions, $"_2").limit(2).repartition(numPartitions, $"_1")
 
-          // 3 exchanges are expected: 1) shuffle to repartition by range, 2) shuffle to global limit, 3) hash shuffle
-          checkCometExchange(shuffled1, 3, false)
-          checkSparkAnswer(shuffled1)
+        // 3 exchanges are expected: 1) shuffle to repartition by range, 2) shuffle to global limit, 3) hash shuffle
+        checkCometExchange(shuffled1, 3, false)
+        checkSparkAnswer(shuffled1)
 
-          val shuffled2 = df
-            .repartitionByRange(numPartitions, $"_2")
-            .limit(2)
-            .repartition(numPartitions, $"_1", $"_2")
+        val shuffled2 = df
+          .repartitionByRange(numPartitions, $"_2")
+          .limit(2)
+          .repartition(numPartitions, $"_1", $"_2")
 
-          checkCometExchange(shuffled2, 3, false)
-          checkSparkAnswer(shuffled2)
+        checkCometExchange(shuffled2, 3, false)
+        checkSparkAnswer(shuffled2)
 
-          val shuffled3 = df
-            .repartitionByRange(numPartitions, $"_2")
-            .limit(2)
-            .repartition(numPartitions, $"_2", $"_1")
+        val shuffled3 = df
+          .repartitionByRange(numPartitions, $"_2")
+          .limit(2)
+          .repartition(numPartitions, $"_2", $"_1")
 
-          checkCometExchange(shuffled3, 3, false)
-          checkSparkAnswer(shuffled3)
-        }
+        checkCometExchange(shuffled3, 3, false)
+        checkSparkAnswer(shuffled3)
       }
     }
   }
-  test("Comet columnar shuffle shuffle: different data type") {
-    Seq(10, 200, 201).foreach { numPartitions =>
-      Seq(true, false).foreach { dictionaryEnabled =>
-        withTempDir { dir =>
-          val path = new Path(dir.toURI.toString, "test.parquet")
-          makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
-
-          Seq(
-            $"_1",
-            $"_2",
-            $"_3",
-            $"_4",
-            $"_5",
-            $"_6",
-            $"_7",
-            $"_8",
-            $"_9",
-            $"_10",
-            $"_11",
-            $"_13",
-            $"_14",
-            $"_15",
-            $"_16",
-            $"_17",
-            $"_18",
-            $"_19",
-            $"_20").foreach { col =>
-            withSQLConf(
-              CometConf.COMET_EXEC_ENABLED.key -> "false",
-              CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-              CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
-              readParquetFile(path.toString) { df =>
-                val shuffled = df
-                  .select($"_1")
-                  .repartition(numPartitions, col)
-                val cometShuffleExecs = checkCometExchange(shuffled, 1, false)
-                if (numPartitions > 200) {
-                  // For sort-based shuffle writer
-                  cometShuffleExecs(0).shuffleDependency.shuffleHandle.getClass.getName
-                    .contains("CometSerializedShuffleHandle")
-                }
-                checkSparkAnswer(shuffled)
+  test("columnar shuffle: different data type") {
+    Seq(true, false).foreach { dictionaryEnabled =>
+      withTempDir { dir =>
+        val path = new Path(dir.toURI.toString, "test.parquet")
+        makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 1000)
+
+        Seq(10, 201).foreach { numPartitions =>
+          (1 to 20).map(i => s"_$i").foreach { c =>
+            readParquetFile(path.toString) { df =>
+              val shuffled = df
+                .select($"_1")
+                .repartition(numPartitions, col(c))
+              val cometShuffleExecs = checkCometExchange(shuffled, 1, false)
+              if (numPartitions > 200) {
+                // For sort-based shuffle writer
+                cometShuffleExecs(0).shuffleDependency.shuffleHandle.getClass.getName
+                  .contains("CometSerializedShuffleHandle")
               }
+              checkSparkAnswer(shuffled)
             }
           }
         }
@@ -1112,6 +888,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla
     }
   }
 
+  // TODO: separate this into `CometNativeShuffleSuite`?
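One detail of the rewrite above: the hand-written `$"_1"` … `$"_20"` column list becomes `(1 to 20).map(i => s"_$i")`, and since the loop variable is now a plain `String`, the repartition key is built with `org.apache.spark.sql.functions.col` — the new import at the top of the file. The `$` interpolator only works for names spelled literally in source:

```scala
import org.apache.spark.sql.functions.col

// col(...) accepts a runtime string, which is what lets the test iterate
// over generated column names instead of listing twenty $"_n" literals.
val keys = (1 to 20).map(i => col(s"_$i"))
```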
test("Comet native operator after Comet shuffle") { Seq(true, false).foreach { columnarShuffle => withSQLConf( @@ -1151,88 +928,51 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla } } - test("Comet shuffle: single partition") { - Seq(true, false).foreach { execEnabled => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString, - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> (!execEnabled).toString) { - withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { - val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) + test("columnar shuffle: single partition") { + withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) - val shuffled = df.repartition(1) + val shuffled = df.repartition(1) - checkCometExchange(shuffled, 1, execEnabled) - checkSparkAnswer(shuffled) - } - } + checkCometExchange(shuffled, 1, false) + checkSparkAnswer(shuffled) } } - test("Comet shuffle metrics") { - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { - withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { - val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) - val shuffled = df.repartition(10, $"_1") + test("sort-based columnar shuffle metrics") { + withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) + val shuffled = df.repartition(201, $"_1") - checkCometExchange(shuffled, 1, true) - checkSparkAnswer(shuffled) + checkCometExchange(shuffled, 1, false) + checkSparkAnswer(shuffled) - // Materialize the shuffled data - shuffled.collect() - val metrics = find(shuffled.queryExecution.executedPlan) { - case _: CometShuffleExchangeExec => true - case _ => false - }.map(_.metrics).get + // Materialize the shuffled data + shuffled.collect() + val metrics = find(shuffled.queryExecution.executedPlan) { + case _: CometShuffleExchangeExec => true + case _ => false + }.map(_.metrics).get - assert(metrics.contains("shuffleRecordsWritten")) - assert(metrics("shuffleRecordsWritten").value == 5L) - } - } - } + assert(metrics.contains("shuffleRecordsWritten")) + assert(metrics("shuffleRecordsWritten").value == 5L) - test("sort-based shuffle metrics") { - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "false", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") { - withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { - val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) - val shuffled = df.repartition(201, $"_1") - - checkCometExchange(shuffled, 1, false) - checkSparkAnswer(shuffled) - - // Materialize the shuffled data - shuffled.collect() - val metrics = find(shuffled.queryExecution.executedPlan) { - case _: CometShuffleExchangeExec => true - case _ => false - }.map(_.metrics).get + assert(metrics.contains("shuffleBytesWritten")) + assert(metrics("shuffleBytesWritten").value > 0) - assert(metrics.contains("shuffleRecordsWritten")) - assert(metrics("shuffleRecordsWritten").value == 5L) - - assert(metrics.contains("shuffleBytesWritten")) - assert(metrics("shuffleBytesWritten").value > 0) - - assert(metrics.contains("shuffleWriteTime")) - assert(metrics("shuffleWriteTime").value > 0) - } + 
assert(metrics.contains("shuffleWriteTime")) + assert(metrics("shuffleWriteTime").value > 0) } } } -class CometAsyncShuffleSuite extends CometShuffleSuiteBase { +class CometAsyncShuffleSuite extends CometColumnarShuffleSuite { override protected val asyncShuffleEnable: Boolean = true protected val adaptiveExecutionEnabled: Boolean = true } -class CometAsyncNonFastMergeShuffleSuite extends CometShuffleSuiteBase { +class CometAsyncNonFastMergeShuffleSuite extends CometColumnarShuffleSuite { override protected val fastMergeEnabled: Boolean = false protected val adaptiveExecutionEnabled: Boolean = true @@ -1240,7 +980,7 @@ class CometAsyncNonFastMergeShuffleSuite extends CometShuffleSuiteBase { protected val asyncShuffleEnable: Boolean = true } -class CometNonFastMergeShuffleSuite extends CometShuffleSuiteBase { +class CometNonFastMergeShuffleSuite extends CometColumnarShuffleSuite { override protected val fastMergeEnabled: Boolean = false protected val adaptiveExecutionEnabled: Boolean = true @@ -1248,37 +988,19 @@ class CometNonFastMergeShuffleSuite extends CometShuffleSuiteBase { protected val asyncShuffleEnable: Boolean = false } -class CometShuffleSuite extends CometShuffleSuiteBase { +class CometShuffleSuite extends CometColumnarShuffleSuite { override protected val asyncShuffleEnable: Boolean = false protected val adaptiveExecutionEnabled: Boolean = true - - import testImplicits._ - - // TODO: this test takes ~5mins to run, we should reduce the test time. - // Because this test takes too long, we only have it in `CometShuffleSuite`. - test("fix: Too many task completion listener of ArrowReaderIterator causes OOM") { - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_BATCH_SIZE.key -> "1", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false", - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { - withParquetTable((0 until 1000000).map(i => (1, (i + 1).toLong)), "tbl") { - assert( - sql("SELECT * FROM tbl").repartition(201, $"_1").count() == sql("SELECT * FROM tbl") - .count()) - } - } - } } -class DisableAQECometShuffleSuite extends CometShuffleSuiteBase { +class DisableAQECometShuffleSuite extends CometColumnarShuffleSuite { override protected val asyncShuffleEnable: Boolean = false protected val adaptiveExecutionEnabled: Boolean = false } -class DisableAQECometAsyncShuffleSuite extends CometShuffleSuiteBase { +class DisableAQECometAsyncShuffleSuite extends CometColumnarShuffleSuite { override protected val asyncShuffleEnable: Boolean = true protected val adaptiveExecutionEnabled: Boolean = false diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala new file mode 100644 index 0000000000..44f6f24313 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
new file mode 100644
index 0000000000..44f6f24313
--- /dev/null
+++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.exec
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.CometTestBase
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.functions.col
+
+import org.apache.comet.CometConf
+import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
+
+class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper {
+  override protected def sparkConf: SparkConf = {
+    val conf = super.sparkConf
+    conf
+      .set(CometConf.COMET_EXEC_ENABLED.key, "true")
+      .set(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key, "false")
+      .set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
+  }
+
+  import testImplicits._
+
+  // TODO: this test takes ~5mins to run, we should reduce the test time.
+  test("fix: Too many task completion listener of ArrowReaderIterator causes OOM") {
+    withSQLConf(CometConf.COMET_BATCH_SIZE.key -> "1") {
+      withParquetTable((0 until 1000000).map(i => (1, (i + 1).toLong)), "tbl") {
+        assert(
+          sql("SELECT * FROM tbl").repartition(201, $"_1").count() == sql("SELECT * FROM tbl")
+            .count())
+      }
+    }
+  }
+
+  test("native shuffle: different data type") {
+    Seq(true, false).foreach { dictionaryEnabled =>
+      withTempDir { dir =>
+        val path = new Path(dir.toURI.toString, "test.parquet")
+        makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 1000)
+        var allTypes: Seq[Int] = (1 to 20)
+        if (!isSpark34Plus) {
+          allTypes = allTypes.filterNot(Set(14, 17).contains)
+        }
+        allTypes.map(i => s"_$i").foreach { c =>
+          withSQLConf(
+            CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+            "parquet.enable.dictionary" -> dictionaryEnabled.toString) {
+            readParquetFile(path.toString) { df =>
+              val shuffled = df
+                .select($"_1")
+                .repartition(10, col(c))
+              checkCometExchange(shuffled, 1, true)
+              checkSparkAnswerAndOperator(shuffled)
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test("hash-based native shuffle") {
+    withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+      val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
+      val shuffled1 = df.repartition(10, $"_1")
+
+      checkCometExchange(shuffled1, 1, true)
+      checkSparkAnswer(shuffled1)
+
+      val shuffled2 = df.repartition(10, $"_1", $"_2")
+
+      checkCometExchange(shuffled2, 1, true)
+      checkSparkAnswer(shuffled2)
+
+      val shuffled3 = df.repartition(10, $"_2", $"_1")
+
+      checkCometExchange(shuffled3, 1, true)
+      checkSparkAnswer(shuffled3)
+    }
+  }
+
+  test("columnar shuffle: single partition") {
+    Seq(true, false).foreach { execEnabled =>
+      withSQLConf(
+        CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString,
+        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
+        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> (!execEnabled).toString) {
+        withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+          val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
+
+          val shuffled = df.repartition(1)
+
+          checkCometExchange(shuffled, 1, execEnabled)
+          checkSparkAnswer(shuffled)
+        }
+      }
+    }
+  }
+
test("native shuffle metrics") { + withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc) + val shuffled = df.repartition(10, $"_1") + + checkCometExchange(shuffled, 1, true) + checkSparkAnswer(shuffled) + + // Materialize the shuffled data + shuffled.collect() + val metrics = find(shuffled.queryExecution.executedPlan) { + case _: CometShuffleExchangeExec => true + case _ => false + }.map(_.metrics).get + + assert(metrics.contains("shuffleRecordsWritten")) + assert(metrics("shuffleRecordsWritten").value == 5L) + } + } + + test("fix: Dictionary arrays imported from native should not be overridden") { + Seq(10, 201).foreach { numPartitions => + withSQLConf( + CometConf.COMET_BATCH_SIZE.key -> "10", + CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", + CometConf.COMET_EXEC_ALL_EXPR_ENABLED.key -> "true") { + withParquetTable((0 until 50).map(i => (1.toString, 2.toString, (i + 1).toLong)), "tbl") { + val df = sql("SELECT * FROM tbl") + .filter($"_1" === 1.toString) + .repartition(numPartitions, $"_1", $"_2") + .sortWithinPartitions($"_1") + checkSparkAnswerAndOperator(df) + } + } + } + } +}