Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
sunchao committed Mar 24, 2024
1 parent e677abb commit 04c4aed
Showing 1 changed file with 14 additions and 6 deletions.
20 changes: 14 additions & 6 deletions dev/diffs/3.4.2.diff
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ index 00000000000..4b31bea33de
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 5125708be32..ce755313178 100644
index 5125708be32..a1f1ae90796 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
Expand All @@ -405,9 +405,15 @@ index 5125708be32..ce755313178 100644
}

// Test output ordering is not preserved
@@ -1382,7 +1386,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1380,9 +1384,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0"
val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 3)
- assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 3)
+ assert(collect(plan) {
+ case _: SortMergeJoinExec => true
+ case _: CometSortMergeJoinExec => true
+ }.size === 3)
// Have sort on left side before last sort merge join
- assert(collect(plan) { case _: SortExec => true }.size === 6)
+ assert(collect(plan) { case _: SortExec | _: CometSortExec => true }.size === 6)
Expand Down Expand Up @@ -571,7 +577,7 @@ index ac710c32296..37746bd470d 100644
val df = spark.read.parquet(path).selectExpr(projection: _*)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 593bd7bb4ba..ccc2c7db7f2 100644
index 593bd7bb4ba..be1b82d0030 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListe
Expand All @@ -582,11 +588,13 @@ index 593bd7bb4ba..ccc2c7db7f2 100644
import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnionExec}
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
import org.apache.spark.sql.execution.command.DataWritingCommandExec
@@ -116,6 +117,7 @@ class AdaptiveQueryExecSuite
@@ -116,6 +117,9 @@ class AdaptiveQueryExecSuite
private def findTopLevelSortMergeJoin(plan: SparkPlan): Seq[SortMergeJoinExec] = {
collect(plan) {
case j: SortMergeJoinExec => j
+ case j: CometSortMergeJoinExec => j
+ case j: CometSortMergeJoinExec =>
+ assert(j.originalPlan.isInstanceOf[SortMergeJoinExec])
+ j.originalPlan.asInstanceOf[SortMergeJoinExec]
}
}

Expand Down

0 comments on commit 04c4aed

Please sign in to comment.