diff --git a/dev/diffs/3.4.2.diff b/dev/diffs/3.4.2.diff
index b571cd2b5e..74449c7890 100644
--- a/dev/diffs/3.4.2.diff
+++ b/dev/diffs/3.4.2.diff
@@ -932,17 +932,19 @@ index d083cac48ff..43057eb251b 100644
    import testImplicits._
 
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-index 266bb343526..85ec36db996 100644
+index 266bb343526..02b8763ad92 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
-@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
+@@ -24,7 +24,8 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
  import org.apache.spark.sql.catalyst.expressions
  import org.apache.spark.sql.catalyst.expressions._
  import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+-import org.apache.spark.sql.execution.{FileSourceScanExec, SortExec, SparkPlan}
 +import org.apache.spark.sql.comet._
- import org.apache.spark.sql.execution.{FileSourceScanExec, SortExec, SparkPlan}
++import org.apache.spark.sql.execution.{ColumnarToRowExec, FileSourceScanExec, SortExec, SparkPlan}
  import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, DisableAdaptiveExecution}
  import org.apache.spark.sql.execution.datasources.BucketingUtils
+ import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 @@ -101,12 +102,20 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
       }
     }
@@ -976,7 +978,35 @@ index 266bb343526..85ec36db996 100644
 
       val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType
       val rowsWithInvalidBuckets = fileScan.execute().filter(row => {
-@@ -461,18 +471,29 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -451,28 +461,53 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+       val joinOperator = if (joined.sqlContext.conf.adaptiveExecutionEnabled) {
+         val executedPlan =
+           joined.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+-        assert(executedPlan.isInstanceOf[SortMergeJoinExec])
+-        executedPlan.asInstanceOf[SortMergeJoinExec]
++        executedPlan match {
++          case s: SortMergeJoinExec => s
++          case b: CometExec =>
++            b.originalPlan match {
++              case s: SortMergeJoinExec => s
++              case o => fail(s"expected SortMergeJoinExec, but found\n$o")
++            }
++          case o => fail(s"expected SortMergeJoinExec, but found\n$o")
++        }
+       } else {
+         val executedPlan = joined.queryExecution.executedPlan
+-        assert(executedPlan.isInstanceOf[SortMergeJoinExec])
+-        executedPlan.asInstanceOf[SortMergeJoinExec]
++        executedPlan match {
++          case s: SortMergeJoinExec => s
++          case ColumnarToRowExec(child) =>
++            child.asInstanceOf[CometExec].originalPlan match {
++              case s: SortMergeJoinExec => s
++              case o => fail(s"expected SortMergeJoinExec, but found\n$o")
++            }
++          case o => fail(s"expected SortMergeJoinExec, but found\n$o")
++        }
+       }
 
        // check existence of shuffle
        assert(
@@ -1010,7 +1040,7 @@ index 266bb343526..85ec36db996 100644
          s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
 
        // check the output partitioning
-@@ -835,11 +856,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -835,11 +870,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
         df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
 
         val scanDF = spark.table("bucketed_table").select("j")
@@ -1024,7 +1054,7 @@ index 266bb343526..85ec36db996 100644
         checkAnswer(aggDF, df1.groupBy("j").agg(max("k")))
       }
     }
-@@ -1031,10 +1052,16 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
+@@ -1031,10 +1066,16 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
 
     val scans = plan.collect {
       case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f