Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
sunchao committed Mar 23, 2024
1 parent 77216a5 commit e677abb
Showing 1 changed file with 32 additions and 6 deletions.
38 changes: 32 additions & 6 deletions dev/diffs/3.4.2.diff
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ index 00000000000..4b31bea33de
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 5125708be32..e274a497996 100644
index 5125708be32..ce755313178 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
Expand All @@ -390,16 +390,22 @@ index 5125708be32..e274a497996 100644
import org.apache.spark.sql.execution.{BinaryExecNode, FilterExec, ProjectExec, SortExec, SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ShuffleExchangeLike}
@@ -1371,7 +1372,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1369,9 +1370,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
}
val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 3)
- assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 3)
+ assert(collect(plan) {
+ case _: SortMergeJoinExec => true
+ case _: CometSortMergeJoinExec => true
+ }.size === 3)
// No extra sort on left side before last sort merge join
- assert(collect(plan) { case _: SortExec => true }.size === 5)
+ assert(collect(plan) { case _: SortExec | _: CometSortExec => true }.size === 5)
}

// Test output ordering is not preserved
@@ -1382,7 +1383,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
@@ -1382,7 +1386,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 3)
// Have sort on left side before last sort merge join
Expand Down Expand Up @@ -564,6 +570,26 @@ index ac710c32296..37746bd470d 100644
val projection = Seq.tabulate(columnNum)(i => s"c$i + c$i as newC$i")
val df = spark.read.parquet(path).selectExpr(projection: _*)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 593bd7bb4ba..ccc2c7db7f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListe
import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
+import org.apache.spark.sql.comet._
import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnionExec}
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
import org.apache.spark.sql.execution.command.DataWritingCommandExec
@@ -116,6 +117,7 @@ class AdaptiveQueryExecSuite
private def findTopLevelSortMergeJoin(plan: SparkPlan): Seq[SortMergeJoinExec] = {
collect(plan) {
case j: SortMergeJoinExec => j
+ case j: CometSortMergeJoinExec => j
}
}

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
index bd9c79e5b96..ab7584e768e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
Expand Down Expand Up @@ -958,7 +984,7 @@ index d083cac48ff..3c11bcde807 100644
import testImplicits._

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 266bb343526..c21a38767ae 100644
index 266bb343526..b33bb677f0d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -24,7 +24,9 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
Expand Down Expand Up @@ -1074,7 +1100,7 @@ index 266bb343526..c21a38767ae 100644
checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
}
}
@@ -1026,15 +1041,24 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
@@ -1026,15 +1055,24 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
expectedNumShuffles: Int,
expectedCoalescedNumBuckets: Option[Int]): Unit = {
val plan = sql(query).queryExecution.executedPlan
Expand Down

0 comments on commit e677abb

Please sign in to comment.