From 12ae519e1964d0f948449ddee7c939e06593b4d1 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 1 Aug 2024 23:09:39 -0700 Subject: [PATCH] Fix --- dev/diffs/4.0.0-preview1.diff | 353 +++++++++++++++++++++++++++++++++- 1 file changed, 343 insertions(+), 10 deletions(-) diff --git a/dev/diffs/4.0.0-preview1.diff b/dev/diffs/4.0.0-preview1.diff index 663901a626..33cb4fb3dd 100644 --- a/dev/diffs/4.0.0-preview1.diff +++ b/dev/diffs/4.0.0-preview1.diff @@ -746,7 +746,7 @@ index 53e47f428c3..a55d8f0c161 100644 assert(shuffleMergeJoins.size == 1) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala -index fcb937d82ba..64a11bc0b85 100644 +index fcb937d82ba..df79db88fed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -29,7 +29,8 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -965,6 +965,16 @@ index fcb937d82ba..64a11bc0b85 100644 } dupStreamSideColTest("SHUFFLE_HASH", check) } +@@ -1773,7 +1807,8 @@ class ThreadLeakInSortMergeJoinSuite + sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20)) + } + +- test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") { ++ test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)", ++ IgnoreComet("Comet SMJ doesn't spill yet")) { + + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala index 34c6c49bc49..f5dea07a213 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala @@ -1329,6 +1339,28 @@ index 15de4c5cc5b..6a85dfb6883 100644 import testImplicits._ setupTestData() +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala +index 3608e7c9207..6a05de2b9ac 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala +@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution + import scala.collection.mutable + import scala.io.Source + +-import org.apache.spark.sql.{AnalysisException, Dataset, ExtendedExplainGenerator, FastOperator} ++import org.apache.spark.sql.{AnalysisException, Dataset, ExtendedExplainGenerator, FastOperator, IgnoreComet} + import org.apache.spark.sql.catalyst.{QueryPlanningTracker, QueryPlanningTrackerCallback} + import org.apache.spark.sql.catalyst.analysis.CurrentNamespace + import org.apache.spark.sql.catalyst.expressions.UnsafeRow +@@ -383,7 +383,7 @@ class QueryExecutionSuite extends SharedSparkSession { + } + } + +- test("SPARK-47289: extended explain info") { ++ test("SPARK-47289: extended explain info", IgnoreComet("Comet plan extended info is different")) { + val concat = new PlanStringConcat() + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala index b5bac8079c4..a3731888e12 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala @@ -1408,26 +1440,158 @@ index 47679ed7865..9ffbaecb98e 100644 assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala -index 3aaf61ffba4..93752e2a535 100644 +index 3aaf61ffba4..4130ece2283 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -22,6 +22,7 @@ import org.apache.spark.rdd.MapPartitionsWithEvaluatorRDD import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeAndComment, CodeGenerator} -+import org.apache.spark.sql.comet.CometSortMergeJoinExec ++import org.apache.spark.sql.comet.{CometHashJoinExec, CometSortExec, CometSortMergeJoinExec} import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec} import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec -@@ -237,6 +238,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession +@@ -172,6 +173,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession + val oneJoinDF = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2") + assert(oneJoinDF.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true ++ case _: CometHashJoinExec => true + }.size === 1) + checkAnswer(oneJoinDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2), Row(3, 3), Row(4, 4))) + +@@ -180,6 +182,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession + .join(df3.hint("SHUFFLE_HASH"), $"k1" === $"k3") + assert(twoJoinsDF.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true ++ case _: CometHashJoinExec => true + }.size === 2) + checkAnswer(twoJoinsDF, + Seq(Row(0, 0, 0), Row(1, 1, 1), Row(2, 2, 2), Row(3, 3, 3), Row(4, 4, 4))) +@@ -206,6 +209,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession + assert(joinUniqueDF.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true ++ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true ++ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true + }.size === 1) + checkAnswer(joinUniqueDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2), Row(3, 3), Row(4, 4), + Row(null, 5), Row(null, 6), Row(null, 7), Row(null, 8), Row(null, 9))) +@@ -216,6 +221,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession + assert(joinNonUniqueDF.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true ++ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true ++ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true + }.size === 1) + checkAnswer(joinNonUniqueDF, Seq(Row(0, 0), Row(0, 3), Row(0, 6), Row(0, 9), Row(1, 1), + Row(1, 4), Row(1, 7), Row(2, 2), Row(2, 5), Row(2, 8), Row(3, null), Row(4, null))) +@@ -226,6 +233,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession + assert(joinWithNonEquiDF.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true ++ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true ++ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true + }.size === 1) + checkAnswer(joinWithNonEquiDF, Seq(Row(0, 0), Row(0, 6), Row(0, 9), Row(1, 1), + Row(1, 7), Row(2, 2), Row(2, 8), Row(3, null), Row(4, null), Row(null, 3), Row(null, 4), +@@ -237,6 +246,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession assert(twoJoinsDF.queryExecution.executedPlan.collect { case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true ++ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true + case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true }.size === 2) checkAnswer(twoJoinsDF, Seq(Row(0, 0, 0), Row(1, 1, null), Row(2, 2, 2), Row(3, 3, null), Row(4, 4, null), -@@ -360,6 +362,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession +@@ -258,6 +269,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession + assert(rightJoinUniqueDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_: SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true ++ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true ++ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true + }.size === 1) + checkAnswer(rightJoinUniqueDf, Seq(Row(1, 1), Row(2, 2), Row(3, 3), Row(4, 4), + Row(null, 5), Row(null, 6), Row(null, 7), Row(null, 8), Row(null, 9), +@@ -269,6 +282,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession + assert(leftJoinUniqueDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_: SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true ++ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true ++ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true + }.size === 1) + checkAnswer(leftJoinUniqueDf, Seq(Row(0, null), Row(1, 1), Row(2, 2), Row(3, 3), Row(4, 4))) + assert(leftJoinUniqueDf.count() === 5) +@@ -278,6 +293,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession + assert(rightJoinNonUniqueDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_: SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true ++ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true ++ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true + }.size === 1) + checkAnswer(rightJoinNonUniqueDf, Seq(Row(0, 3), Row(0, 6), Row(0, 9), Row(1, 1), + Row(1, 4), Row(1, 7), Row(1, 10), Row(2, 2), Row(2, 5), Row(2, 8))) +@@ -287,6 +304,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession + assert(leftJoinNonUniqueDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_: SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true ++ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true ++ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true + }.size === 1) + checkAnswer(leftJoinNonUniqueDf, Seq(Row(0, 3), Row(0, 6), Row(0, 9), Row(1, 1), + Row(1, 4), Row(1, 7), Row(1, 10), Row(2, 2), Row(2, 5), Row(2, 8), Row(3, null), +@@ -298,6 +317,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession + assert(rightJoinWithNonEquiDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_: SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true ++ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true ++ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true + }.size === 1) + checkAnswer(rightJoinWithNonEquiDf, Seq(Row(0, 6), Row(0, 9), Row(1, 1), Row(1, 7), + Row(1, 10), Row(2, 2), Row(2, 8), Row(null, 3), Row(null, 4), Row(null, 5))) +@@ -308,6 +329,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession + assert(leftJoinWithNonEquiDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_: SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true ++ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true ++ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true + }.size === 1) + checkAnswer(leftJoinWithNonEquiDf, Seq(Row(0, 6), Row(0, 9), Row(1, 1), Row(1, 7), + Row(1, 10), Row(2, 2), Row(2, 8), Row(3, null), Row(4, null))) +@@ -318,6 +341,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession + assert(twoRightJoinsDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_: SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true ++ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true ++ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true + }.size === 2) + checkAnswer(twoRightJoinsDf, Seq(Row(2, 2, 2), Row(3, 3, 3), Row(4, 4, 4))) + +@@ -327,6 +352,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession + assert(twoLeftJoinsDf.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint == "SHUFFLE_HASH" => true + case WholeStageCodegenExec(_: SortMergeJoinExec) if hint == "SHUFFLE_MERGE" => true ++ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true ++ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true + }.size === 2) + checkAnswer(twoLeftJoinsDf, + Seq(Row(0, null, null), Row(1, 1, null), Row(2, 2, 2), Row(3, 3, 3), Row(4, 4, 4))) +@@ -343,6 +370,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession + val oneLeftOuterJoinDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", "left_outer") + assert(oneLeftOuterJoinDF.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_ : SortMergeJoinExec) => true ++ case _: CometSortMergeJoinExec => true + }.size === 1) + checkAnswer(oneLeftOuterJoinDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2), Row(3, 3), Row(4, null), + Row(5, null), Row(6, null), Row(7, null), Row(8, null), Row(9, null))) +@@ -351,6 +379,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession + val oneRightOuterJoinDF = df2.join(df3.hint("SHUFFLE_MERGE"), $"k2" === $"k3", "right_outer") + assert(oneRightOuterJoinDF.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(_ : SortMergeJoinExec) => true ++ case _: CometSortMergeJoinExec => true + }.size === 1) + checkAnswer(oneRightOuterJoinDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2), Row(3, 3), Row(null, 4), + Row(null, 5))) +@@ -360,6 +389,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession .join(df1.hint("SHUFFLE_MERGE"), $"k3" === $"k1", "right_outer") assert(twoJoinsDF.queryExecution.executedPlan.collect { case WholeStageCodegenExec(_ : SortMergeJoinExec) => true @@ -1435,38 +1599,57 @@ index 3aaf61ffba4..93752e2a535 100644 }.size === 2) checkAnswer(twoJoinsDF, Seq(Row(0, 0, 0), Row(1, 1, 1), Row(2, 2, 2), Row(3, 3, 3), Row(4, null, 4), Row(5, null, 5), -@@ -382,8 +385,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession +@@ -375,6 +405,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession + val oneJoinDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", "left_semi") + assert(oneJoinDF.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) => true ++ case _: CometSortMergeJoinExec => true + }.size === 1) + checkAnswer(oneJoinDF, Seq(Row(0), Row(1), Row(2), Row(3))) + +@@ -382,8 +413,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession val twoJoinsDF = df3.join(df2.hint("SHUFFLE_MERGE"), $"k3" === $"k2", "left_semi") .join(df1.hint("SHUFFLE_MERGE"), $"k3" === $"k1", "left_semi") assert(twoJoinsDF.queryExecution.executedPlan.collect { - case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) | - WholeStageCodegenExec(_ : SortMergeJoinExec) => true + case _: SortMergeJoinExec => true ++ case _: CometSortMergeJoinExec => true }.size === 2) checkAnswer(twoJoinsDF, Seq(Row(0), Row(1), Row(2), Row(3))) } -@@ -404,8 +406,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession +@@ -397,6 +428,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession + val oneJoinDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", "left_anti") + assert(oneJoinDF.queryExecution.executedPlan.collect { + case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) => true ++ case _: CometSortMergeJoinExec => true + }.size === 1) + checkAnswer(oneJoinDF, Seq(Row(4), Row(5), Row(6), Row(7), Row(8), Row(9))) + +@@ -404,8 +436,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession val twoJoinsDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", "left_anti") .join(df3.hint("SHUFFLE_MERGE"), $"k1" === $"k3", "left_anti") assert(twoJoinsDF.queryExecution.executedPlan.collect { - case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) | - WholeStageCodegenExec(_ : SortMergeJoinExec) => true + case _: SortMergeJoinExec => true ++ case _: CometSortMergeJoinExec => true }.size === 2) checkAnswer(twoJoinsDF, Seq(Row(6), Row(7), Row(8), Row(9))) } -@@ -538,7 +539,9 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession +@@ -538,7 +570,10 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession val plan = df.queryExecution.executedPlan assert(plan.exists(p => p.isInstanceOf[WholeStageCodegenExec] && - p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortExec])) + p.asInstanceOf[WholeStageCodegenExec].collect { + case _: SortExec => true ++ case _: CometSortExec => true + }.nonEmpty)) assert(df.collect() === Array(Row(1), Row(2), Row(3))) } -@@ -718,7 +721,9 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession +@@ -718,7 +753,9 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession .write.mode(SaveMode.Overwrite).parquet(path) withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "255", @@ -2293,6 +2476,35 @@ index 4bd35e0789b..6544d86dbe0 100644 ) } test(s"parquet widening conversion $fromType -> $toType") { +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala +index c800168b507..991d52a1a75 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala +@@ -22,7 +22,7 @@ import org.scalatest.Assertions + + import org.apache.spark.SparkUnsupportedOperationException + import org.apache.spark.io.CompressionCodec +-import org.apache.spark.sql.{AnalysisException, DataFrame, Encoders, Row} ++import org.apache.spark.sql.{AnalysisException, DataFrame, Encoders, IgnoreComet, Row} + import org.apache.spark.sql.catalyst.expressions.{BoundReference, GenericInternalRow} + import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning + import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil +@@ -525,11 +525,13 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass + } + } + +- test("flatMapGroupsWithState, state ver 1") { ++ test("flatMapGroupsWithState, state ver 1", ++ IgnoreComet("Ignored if Comet is enabled due to SPARK-49070.")) { + testFlatMapGroupsWithState(1) + } + +- test("flatMapGroupsWithState, state ver 2") { ++ test("flatMapGroupsWithState, state ver 2", ++ IgnoreComet("Ignored if Comet is enabled due to SPARK-49070.")) { + testFlatMapGroupsWithState(2) + } + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index b8f3ea3c6f3..bbd44221288 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala @@ -2825,8 +3037,97 @@ index e05cb4d3c35..dc65a4fe18e 100644 }) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala +index dea16e52989..55cdf47c4d5 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala +@@ -18,7 +18,7 @@ + package org.apache.spark.sql.streaming + + import org.apache.spark.SparkIllegalArgumentException +-import org.apache.spark.sql.Encoders ++import org.apache.spark.sql.{Encoders, IgnoreCometSuite} + import org.apache.spark.sql.execution.streaming.MemoryStream + import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, RocksDBStateStoreProvider} + import org.apache.spark.sql.internal.SQLConf +@@ -128,7 +128,7 @@ class ToggleSaveAndEmitProcessor + } + + class TransformWithListStateSuite extends StreamTest +- with AlsoTestWithChangelogCheckpointingEnabled { ++ with AlsoTestWithChangelogCheckpointingEnabled with IgnoreCometSuite { + import testImplicits._ + + test("test appending null value in list state throw exception") { +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateTTLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateTTLSuite.scala +index 299a3346b2e..2213f3c52a6 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateTTLSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateTTLSuite.scala +@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming + + import java.time.Duration + +-import org.apache.spark.sql.Encoders ++import org.apache.spark.sql.{Encoders, IgnoreCometSuite} + import org.apache.spark.sql.execution.streaming.{ListStateImplWithTTL, MemoryStream} + import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider + import org.apache.spark.sql.internal.SQLConf +@@ -95,7 +95,7 @@ class ListStateTTLProcessor(ttlConfig: TTLConfig) + * Test suite for testing list state with TTL. + * We use the base TTL suite with a list state processor. + */ +-class TransformWithListStateTTLSuite extends TransformWithStateTTLTest { ++class TransformWithListStateTTLSuite extends TransformWithStateTTLTest with IgnoreCometSuite { + + import testImplicits._ + +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateTTLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateTTLSuite.scala +index bf46c802fde..623c3003430 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateTTLSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateTTLSuite.scala +@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming + + import java.time.Duration + +-import org.apache.spark.sql.Encoders ++import org.apache.spark.sql.{Encoders, IgnoreCometSuite} + import org.apache.spark.sql.execution.streaming.{MapStateImplWithTTL, MemoryStream} + import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider + import org.apache.spark.sql.internal.SQLConf +@@ -174,7 +174,7 @@ class MapStateTTLProcessor(ttlConfig: TTLConfig) + } + } + +-class TransformWithMapStateTTLSuite extends TransformWithStateTTLTest { ++class TransformWithMapStateTTLSuite extends TransformWithStateTTLTest with IgnoreCometSuite { + + import testImplicits._ + override def getProcessor(ttlConfig: TTLConfig): +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateChainingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateChainingSuite.scala +index 5388d6f1fb6..8aa11c5b875 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateChainingSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateChainingSuite.scala +@@ -21,7 +21,7 @@ import java.sql.Timestamp + import java.time.{Instant, LocalDateTime, ZoneId} + + import org.apache.spark.{SparkRuntimeException, SparkThrowable} +-import org.apache.spark.sql.AnalysisException ++import org.apache.spark.sql.{AnalysisException, IgnoreCometSuite} + import org.apache.spark.sql.catalyst.ExtendedAnalysisException + import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecution} + import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider +@@ -106,7 +106,8 @@ case class AggEventRow( + window: Window, + count: Long) + +-class TransformWithStateChainingSuite extends StreamTest { ++class TransformWithStateChainingSuite extends StreamTest ++ with IgnoreCometSuite { + import testImplicits._ + + test("watermark is propagated correctly for next stateful operator" + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala -index 0057af44d3e..7741bd743ac 100644 +index 0057af44d3e..51975748309 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala @@ -22,7 +22,7 @@ import java.util.UUID @@ -2851,6 +3152,38 @@ index 0057af44d3e..7741bd743ac 100644 import testImplicits._ +@@ -786,7 +788,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest + } + } + +-class TransformWithStateValidationSuite extends StateStoreMetricsTest { ++class TransformWithStateValidationSuite extends StateStoreMetricsTest with IgnoreCometSuite { + import testImplicits._ + + test("transformWithState - streaming with hdfsStateStoreProvider should fail") { +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala +index 54004b419f7..4e5b35aa0da 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala +@@ -20,7 +20,7 @@ package org.apache.spark.sql.streaming + import java.time.Duration + + import org.apache.spark.internal.Logging +-import org.apache.spark.sql.Encoders ++import org.apache.spark.sql.{Encoders, IgnoreCometSuite} + import org.apache.spark.sql.execution.streaming.{MemoryStream, ValueStateImpl, ValueStateImplWithTTL} + import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider + import org.apache.spark.sql.internal.SQLConf +@@ -160,7 +160,8 @@ case class MultipleValueStatesTTLProcessor( + } + } + +-class TransformWithValueStateTTLSuite extends TransformWithStateTTLTest { ++class TransformWithValueStateTTLSuite extends TransformWithStateTTLTest ++ with IgnoreCometSuite{ + + import testImplicits._ + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala index af07aceaed1..ed0b5e6d9be 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala