Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
sunchao committed Feb 25, 2024
1 parent 4827acb commit 6f82611
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 75 deletions.
4 changes: 2 additions & 2 deletions spark/src/test/scala/org/apache/comet/CometCastSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
Range(0, len).map(_ => chars.charAt(r.nextInt(chars.length))).mkString
}

private def fuzzCastFromString(chars: String, maxLen: Int, toType: DataType) {
/**
 * Fuzz-tests casting string columns to `toType`: feeds 10,000 deterministic
 * pseudo-random strings built from the alphabet `chars` (length bounded by
 * `maxLen` — see `genString`) through `castTest`.
 */
private def fuzzCastFromString(chars: String, maxLen: Int, toType: DataType): Unit = {
  // Fixed seed keeps the fuzz inputs reproducible across runs.
  val rng = new Random(0)
  val samples = (0 until 10000).map(_ => genString(rng, chars, maxLen))
  castTest(samples.toDF("a"), toType)
}

private def castTest(input: DataFrame, toType: DataType) {
private def castTest(input: DataFrame, toType: DataType): Unit = {
withTempPath { dir =>
val df = roundtripParquet(input, dir)
.withColumn("converted", col("a").cast(toType))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
val conf = super.sparkConf
conf
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveExecutionEnabled.toString)
.set(CometConf.COMET_EXEC_ENABLED.key, "false")
.set(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key, "true")
.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
.set("spark.shuffle.unsafe.fastMergeEnabled", fastMergeEnabled.toString)
}

Expand All @@ -55,7 +52,10 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
super.test(testName, testTags: _*) {
withSQLConf(
CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> asyncShuffleEnable.toString,
CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString) {
CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString,
CometConf.COMET_EXEC_ENABLED.key -> "false",
CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
testFun
}
}
Expand Down Expand Up @@ -821,16 +821,6 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
}
}

// Grouped aggregation on top of a Comet shuffle exchange: the plan must contain
// exactly one Comet exchange, and results/operators must match Spark.
test("grouped aggregate: Comet shuffle") {
  withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
    withParquetTable(Range(0, 5).map(i => (i, i + 1)), "tbl") {
      val aggregated = sql("SELECT count(_2), sum(_2) FROM tbl GROUP BY _1")
      checkCometExchange(aggregated, 1, true)
      checkSparkAnswerAndOperator(aggregated)
    }
  }
}

test("hash-based columnar shuffle") {
Seq(10, 200, 201).foreach { numPartitions =>
withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
Expand Down Expand Up @@ -888,43 +878,29 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
}
}

// TODO: separate this into `CometNativeShuffleSuite`?
test("Comet native operator after Comet shuffle") {
Seq(true, false).foreach { columnarShuffle =>
withSQLConf(
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> columnarShuffle.toString) {
withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
val df = sql("SELECT * FROM tbl")

val shuffled1 = df
.repartition(10, $"_2")
.select($"_1", $"_1" + 1, $"_2" + 2)
.repartition(10, $"_1")
.filter($"_1" > 1)

// 2 Comet shuffle exchanges are expected
checkCometExchange(shuffled1, 2, !columnarShuffle)
checkSparkAnswer(shuffled1)

val shuffled2 = df
.repartitionByRange(10, $"_2")
.select($"_1", $"_1" + 1, $"_2" + 2)
.repartition(10, $"_1")
.filter($"_1" > 1)

// 2 Comet shuffle exchanges are expected, if columnar shuffle is enabled
if (columnarShuffle) {
checkCometExchange(shuffled2, 2, !columnarShuffle)
} else {
// The first exchange from the bottom is a range exchange, which native shuffle
// doesn't support, so Comet exec operators stop before that exchange and thus
// there is no Comet exchange.
checkCometExchange(shuffled2, 0, true)
}
checkSparkAnswer(shuffled2)
}
}
// Verifies Comet native operators (project/filter) can run downstream of Comet
// columnar shuffle exchanges. Columnar shuffle is enabled suite-wide via the
// overridden `test` wrapper, so no per-test conf is needed here.
test("native operator after columnar shuffle") {
withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
val df = sql("SELECT * FROM tbl")

val shuffled1 = df
.repartition(10, $"_2")
.select($"_1", $"_1" + 1, $"_2" + 2)
.repartition(10, $"_1")
.filter($"_1" > 1)

// 2 Comet shuffle exchanges are expected (third arg `false` — presumably
// asserting the exchanges are columnar rather than native; confirm against
// checkCometExchange's signature)
checkCometExchange(shuffled1, 2, false)
checkSparkAnswer(shuffled1)

val shuffled2 = df
.repartitionByRange(10, $"_2")
.select($"_1", $"_1" + 1, $"_2" + 2)
.repartition(10, $"_1")
.filter($"_1" > 1)

// 2 Comet shuffle exchanges are expected: unlike native shuffle, columnar
// shuffle supports range partitioning, so both exchanges become Comet exchanges.
checkCometExchange(shuffled2, 2, false)
checkSparkAnswer(shuffled2)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package org.apache.comet.exec

import org.scalactic.source.Position
import org.scalatest.Tag

import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.sql.CometTestBase
import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
Expand All @@ -30,12 +32,16 @@ import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus

class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper {
override protected def sparkConf: SparkConf = {
val conf = super.sparkConf
conf
.set(CometConf.COMET_EXEC_ENABLED.key, "true")
.set(CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key, "false")
.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
// Wraps every test in this suite with the SQL confs that force native
// (non-columnar) Comet shuffle, so individual tests don't repeat them.
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
pos: Position): Unit = {
super.test(testName, testTags: _*) {
withSQLConf(
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
testFun
}
}
}

import testImplicits._
Expand All @@ -61,9 +67,7 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper
allTypes = allTypes.filterNot(Set(14, 17).contains)
}
allTypes.map(i => s"_$i").foreach { c =>
withSQLConf(
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
"parquet.enable.dictionary" -> dictionaryEnabled.toString) {
withSQLConf("parquet.enable.dictionary" -> dictionaryEnabled.toString) {
readParquetFile(path.toString) { df =>
val shuffled = df
.select($"_1")
Expand Down Expand Up @@ -98,20 +102,49 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper
}

test("columnar shuffle: single partition") {
Seq(true, false).foreach { execEnabled =>
withSQLConf(
CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString,
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> (!execEnabled).toString) {
withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)

val shuffled = df.repartition(1)
val shuffled = df.repartition(1)

checkCometExchange(shuffled, 1, execEnabled)
checkSparkAnswer(shuffled)
}
}
checkCometExchange(shuffled, 1, true)
checkSparkAnswer(shuffled)
}
}

// Chains Comet native operators (project/filter) after native shuffle exchanges
// and checks both plan shape and results against Spark.
test("native operator after native shuffle") {
  withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
    val baseDf = sql("SELECT * FROM tbl")

    val hashShuffled = baseDf
      .repartition(10, $"_2")
      .select($"_1", $"_1" + 1, $"_2" + 2)
      .repartition(10, $"_1")
      .filter($"_1" > 1)

    // Both hash repartitions are natively supported, so 2 Comet shuffle
    // exchanges are expected.
    checkCometExchange(hashShuffled, 2, true)
    checkSparkAnswer(hashShuffled)

    val rangeShuffled = baseDf
      .repartitionByRange(10, $"_2")
      .select($"_1", $"_1" + 1, $"_2" + 2)
      .repartition(10, $"_1")
      .filter($"_1" > 1)

    // The first exchange from the bottom is a range exchange, which native
    // shuffle doesn't support, so Comet exec operators stop before that
    // exchange and no Comet exchange is produced at all.
    checkCometExchange(rangeShuffled, 0, true)
    checkSparkAnswer(rangeShuffled)
  }
}

// Grouped aggregation over a native shuffle: expects exactly one Comet exchange
// and fully Comet-backed operators producing Spark-equal results.
test("grouped aggregate: native shuffle") {
  withParquetTable(Range(0, 5).map(i => (i, i + 1)), "tbl") {
    val aggregated = sql("SELECT count(_2), sum(_2) FROM tbl GROUP BY _1")
    checkCometExchange(aggregated, 1, true)
    checkSparkAnswerAndOperator(aggregated)
  }
}

Expand Down

0 comments on commit 6f82611

Please sign in to comment.