Skip to content

Commit

Permalink
Update diff
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Apr 29, 2024
1 parent 891fa23 commit 3c1704a
Showing 1 changed file with 112 additions and 4 deletions.
116 changes: 112 additions & 4 deletions dev/diffs/3.4.2.diff
Original file line number Diff line number Diff line change
Expand Up @@ -453,18 +453,116 @@ index 00000000000..4b31bea33de
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 5125708be32..a1f1ae90796 100644
index 5125708be32..123f58c522a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
@@ -29,6 +29,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.{Ascending, GenericRow, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.Filter
+import org.apache.spark.sql.comet._
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
import org.apache.spark.sql.execution.{BinaryExecNode, FilterExec, ProjectExec, SortExec, SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ShuffleExchangeLike}
@@ -1369,9 +1370,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -739,7 +741,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
}
}

- test("test SortMergeJoin (with spill)") {
+ test("test SortMergeJoin (with spill)",
+ IgnoreComet("TODO: Comet SMJ doesn't support spill yet")) {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1",
SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD.key -> "0",
SQLConf.SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD.key -> "1") {
@@ -868,14 +871,14 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
}
val executed = df.queryExecution.executedPlan
val executedJoins = collect(executed) {
- case j: SortMergeJoinExec => j
+ case j: CometSortMergeJoinExec => j
}
// This only applies to the above tested queries, in which a child SortMergeJoin always
// contains the SortOrder required by its parent SortMergeJoin. Thus, SortExec should never
// appear as parent of SortMergeJoin.
executed.foreach {
case s: SortExec => s.foreach {
- case j: SortMergeJoinExec => fail(
+ case j: CometSortMergeJoinExec => fail(
s"No extra sort should be added since $j already satisfies the required ordering"
)
case _ =>
@@ -1114,9 +1117,11 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
val plan = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2", joinType)
.groupBy($"k1").count()
.queryExecution.executedPlan
- assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size === 1)
+ assert(collect(plan) {
+ case _: ShuffledHashJoinExec | _: CometHashJoinExec => true }.size === 1)
// No extra shuffle before aggregate
- assert(collect(plan) { case _: ShuffleExchangeExec => true }.size === 2)
+ assert(collect(plan) {
+ case _: ShuffleExchangeExec | _: CometShuffleExchangeExec => true }.size === 2)
})
}

@@ -1133,10 +1138,11 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
.join(df4.hint("SHUFFLE_MERGE"), $"k1" === $"k4", joinType)
.queryExecution
.executedPlan
- assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 2)
+ assert(collect(plan) {
+ case _: SortMergeJoinExec | _: CometSortMergeJoinExec => true }.size === 2)
assert(collect(plan) { case _: BroadcastHashJoinExec => true }.size === 1)
// No extra sort before last sort merge join
- assert(collect(plan) { case _: SortExec => true }.size === 3)
+ assert(collect(plan) { case _: SortExec | _: CometSortExec => true }.size === 3)
})

// Test shuffled hash join
@@ -1146,10 +1152,13 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
.join(df4.hint("SHUFFLE_MERGE"), $"k1" === $"k4", joinType)
.queryExecution
.executedPlan
- assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 2)
- assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size === 1)
+ assert(collect(plan) {
+ case _: SortMergeJoinExec | _: CometSortMergeJoinExec => true }.size === 2)
+ assert(collect(plan) {
+ case _: ShuffledHashJoinExec | _: CometHashJoinExec => true }.size === 1)
// No extra sort before last sort merge join
- assert(collect(plan) { case _: SortExec => true }.size === 3)
+ assert(collect(plan) {
+ case _: SortExec | _: CometSortExec => true }.size === 3)
})
}

@@ -1240,12 +1249,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
inputDFs.foreach { case (df1, df2, joinExprs) =>
val smjDF = df1.join(df2.hint("SHUFFLE_MERGE"), joinExprs, "full")
assert(collect(smjDF.queryExecution.executedPlan) {
- case _: SortMergeJoinExec => true }.size === 1)
+ case _: SortMergeJoinExec | _: CometSortMergeJoinExec => true }.size === 1)
val smjResult = smjDF.collect()

val shjDF = df1.join(df2.hint("SHUFFLE_HASH"), joinExprs, "full")
assert(collect(shjDF.queryExecution.executedPlan) {
- case _: ShuffledHashJoinExec => true }.size === 1)
+ case _: ShuffledHashJoinExec | _: CometHashJoinExec => true }.size === 1)
// Same result between shuffled hash join and sort merge join
checkAnswer(shjDF, smjResult)
}
@@ -1340,7 +1349,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
// Have shuffle before aggregation
- assert(collect(plan) { case _: ShuffleExchangeExec => true }.size === 1)
+ assert(collect(plan) {
+ case _: ShuffleExchangeExec | _: CometShuffleExchangeExec => true }.size === 1)
}

def getJoinQuery(selectExpr: String, joinType: String): String = {
@@ -1369,9 +1379,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
}
val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
Expand All @@ -479,7 +577,7 @@ index 5125708be32..a1f1ae90796 100644
}

// Test output ordering is not preserved
@@ -1380,9 +1384,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1380,9 +1393,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0"
val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
Expand All @@ -494,6 +592,16 @@ index 5125708be32..a1f1ae90796 100644
}

// Test singe partition
@@ -1392,7 +1408,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
|FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2
|""".stripMargin)
val plan = fullJoinDF.queryExecution.executedPlan
- assert(collect(plan) { case _: ShuffleExchangeExec => true}.size == 1)
+ assert(collect(plan) {
+ case _: ShuffleExchangeExec | _: CometShuffleExchangeExec => true}.size == 1)
checkAnswer(fullJoinDF, Row(100))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
index b5b34922694..a72403780c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
Expand Down

0 comments on commit 3c1704a

Please sign in to comment.