From c43ef49bd1feb8c244ab732b27796bf6c600d83f Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Mon, 11 Mar 2024 16:09:26 -0700 Subject: [PATCH] update diff to support shuffle --- dev/diffs/3.4.2.diff | 234 +++++++++++++++++++++++-------------------- 1 file changed, 125 insertions(+), 109 deletions(-) diff --git a/dev/diffs/3.4.2.diff b/dev/diffs/3.4.2.diff index b571cd2b5..ffd8b7294 100644 --- a/dev/diffs/3.4.2.diff +++ b/dev/diffs/3.4.2.diff @@ -109,6 +109,19 @@ index c595b50950b..483508dc076 100644 try { val extensionConfClass = Utils.classForName(extensionConfClassName) val extensionConf = extensionConfClass.getConstructor().newInstance() +diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +index 362615770a3..d014b8ea745 100644 +--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala ++++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +@@ -69,6 +69,8 @@ class QueryExecution( + if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) { + UnsupportedOperationChecker.checkForBatch(analyzed) + } ++ ++ this.toRdd + } + + lazy val analyzed: LogicalPlan = executePhase(QueryPlanningTracker.ANALYSIS) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index db587dd9868..aac7295a53d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -234,51 +247,6 @@ index 56e9520fdab..917932336df 100644 spark.range(50).write.saveAsTable(s"$dbName.$table1Name") spark.range(100).write.saveAsTable(s"$dbName.$table2Name") -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DisableComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/DisableComet.scala -new file mode 100644 -index 00000000000..07687f6685a ---- /dev/null -+++ b/sql/core/src/test/scala/org/apache/spark/sql/DisableComet.scala -@@ -0,0 +1,39 @@ -+/* -+ * Licensed to the Apache Software Foundation (ASF) under one or more -+ * contributor license agreements. See the NOTICE file distributed with -+ * this work for additional information regarding copyright ownership. -+ * The ASF licenses this file to You under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -+ * See the License for the specific language governing permissions and -+ * limitations under the License. -+ */ -+ -+package org.apache.spark.sql -+ -+import org.scalactic.source.Position -+import org.scalatest.Tag -+ -+import org.apache.spark.sql.test.SQLTestUtils -+ -+case class DisableComet(reason: String) extends Tag("DisableComet") -+ -+/** -+ * Helper trait that disables Comet for all tests regardless of default config values. -+ */ -+trait DisableCometSuite extends SQLTestUtils { -+ override protected def test(testName: String, testTags: Tag*)(testFun: => Any) -+ (implicit pos: Position): Unit = { -+ if (isCometEnabled) { -+ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) -+ } else { -+ super.test(testName, testTags: _*)(testFun) -+ } -+ } -+} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala index f33432ddb6f..fe9f74ff8f1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -311,7 +279,7 @@ index f33432ddb6f..fe9f74ff8f1 100644 } assert(scanOption.isDefined) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala -index a6b295578d6..d5e25564bb9 100644 +index a6b295578d6..a5cb616945a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -463,7 +463,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite @@ -320,7 +288,7 @@ index a6b295578d6..d5e25564bb9 100644 - test("Explain formatted output for scan operator for datasource V2") { + test("Explain formatted output for scan operator for datasource V2", -+ DisableComet("Comet explain output is different")) { ++ IgnoreComet("Comet explain output is different")) { withTempDir { dir => Seq("parquet", "orc", "csv", "json").foreach { fmt => val basePath = dir.getCanonicalPath + "/" + fmt @@ -361,6 +329,54 @@ index 2796b1cf154..94591f83c84 100644 }.flatten assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, Literal(5L)))) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala +new file mode 100644 +index 00000000000..4b31bea33de +--- /dev/null ++++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala +@@ -0,0 +1,42 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.spark.sql ++ ++import org.scalactic.source.Position ++import org.scalatest.Tag ++ ++import org.apache.spark.sql.test.SQLTestUtils ++ ++/** ++ * Tests with this tag will be ignored when Comet is enabled (e.g., via `ENABLE_COMET`). ++ */ ++case class IgnoreComet(reason: String) extends Tag("DisableComet") ++ ++/** ++ * Helper trait that disables Comet for all tests regardless of default config values. ++ */ ++trait IgnoreCometSuite extends SQLTestUtils { ++ override protected def test(testName: String, testTags: Tag*)(testFun: => Any) ++ (implicit pos: Position): Unit = { ++ if (isCometEnabled) { ++ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) ++ } else { ++ super.test(testName, testTags: _*)(testFun) ++ } ++ } ++} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 5125708be32..e274a497996 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -392,7 +408,7 @@ index 5125708be32..e274a497996 100644 // Test singe partition diff --git a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala -index b5b34922694..5fa734d30e1 100644 +index b5b34922694..a72403780c4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala @@ -69,7 +69,7 @@ import org.apache.spark.tags.ExtendedSQLTest @@ -400,7 +416,7 @@ index b5b34922694..5fa734d30e1 100644 */ // scalastyle:on line.size.limit -trait PlanStabilitySuite extends DisableAdaptiveExecutionSuite { -+trait PlanStabilitySuite extends DisableAdaptiveExecutionSuite with DisableCometSuite { ++trait PlanStabilitySuite extends DisableAdaptiveExecutionSuite with IgnoreCometSuite { protected val baseResourcePath = { // use the same way as `SQLQueryTestSuite` to get the resource path @@ -455,7 +471,7 @@ index cfc8b2cc845..c6fcfd7bd08 100644 } finally { spark.listenerManager.unregister(listener) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala -index c0ec8a58bd5..5f880751e21 100644 +index c0ec8a58bd5..4e8bc6ed3c5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.fs.permission.FsPermission @@ -463,7 +479,7 @@ index c0ec8a58bd5..5f880751e21 100644 import org.apache.spark._ -import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, QueryTest, Row, SaveMode} -+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, DisableComet, QueryTest, Row, SaveMode} ++import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, IgnoreComet, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._ import org.apache.spark.sql.catalyst.util.BadRecordException import org.apache.spark.sql.execution.datasources.jdbc.{DriverRegistry, JDBCOptions} @@ -473,12 +489,12 @@ index c0ec8a58bd5..5f880751e21 100644 test("INCONSISTENT_BEHAVIOR_CROSS_VERSION: " + - "compatibility with Spark 2.4/3.2 in reading/writing dates") { + "compatibility with Spark 2.4/3.2 in reading/writing dates", -+ DisableComet("Comet doesn't completely support datetime rebase mode yet")) { ++ IgnoreComet("Comet doesn't completely support datetime rebase mode yet")) { // Fail to read ancient datetime values. withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_READ.key -> EXCEPTION.toString) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala -index 418ca3430bb..d5fc207601c 100644 +index 418ca3430bb..eb8267192f8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala @@ -23,7 +23,7 @@ import scala.util.Random @@ -486,7 +502,7 @@ index 418ca3430bb..d5fc207601c 100644 import org.apache.spark.SparkConf -import org.apache.spark.sql.{DataFrame, QueryTest} -+import org.apache.spark.sql.{DataFrame, DisableComet, QueryTest} ++import org.apache.spark.sql.{DataFrame, IgnoreComet, QueryTest} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan import org.apache.spark.sql.internal.SQLConf @@ -495,7 +511,7 @@ index 418ca3430bb..d5fc207601c 100644 } - test("FileScan description") { -+ test("FileScan description", DisableComet("Comet doesn't use BatchScan")) { ++ test("FileScan description", IgnoreComet("Comet doesn't use BatchScan")) { Seq("json", "orc", "parquet").foreach { format => withTempPath { path => val dir = path.getCanonicalPath @@ -559,7 +575,7 @@ index bd9c79e5b96..ab7584e768e 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala -index 1d2e467c94c..77a119505b9 100644 +index 1d2e467c94c..3ea82cd1a3f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path} @@ -567,7 +583,7 @@ index 1d2e467c94c..77a119505b9 100644 import org.apache.spark.SparkException -import org.apache.spark.sql.{DataFrame, QueryTest, Row} -+import org.apache.spark.sql.{DataFrame, DisableCometSuite, QueryTest, Row} ++import org.apache.spark.sql.{DataFrame, IgnoreCometSuite, QueryTest, Row} import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.functions.col @@ -578,12 +594,12 @@ index 1d2e467c94c..77a119505b9 100644 -class BinaryFileFormatSuite extends QueryTest with SharedSparkSession { +// For some reason this suite is flaky w/ or w/o Comet when running in Github workflow. +// Since it isn't related to Comet, we disable it for now. -+class BinaryFileFormatSuite extends QueryTest with SharedSparkSession with DisableCometSuite { ++class BinaryFileFormatSuite extends QueryTest with SharedSparkSession with IgnoreCometSuite { import BinaryFileFormat._ private var testDir: String = _ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala -index 07e2849ce6f..264fb61db16 100644 +index 07e2849ce6f..3e73645b638 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala @@ -28,7 +28,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat @@ -591,7 +607,7 @@ index 07e2849ce6f..264fb61db16 100644 import org.apache.spark.TestUtils import org.apache.spark.memory.MemoryMode -import org.apache.spark.sql.Row -+import org.apache.spark.sql.{DisableComet, Row} ++import org.apache.spark.sql.{IgnoreComet, Row} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -601,12 +617,12 @@ index 07e2849ce6f..264fb61db16 100644 - test("parquet v2 pages - rle encoding for boolean value columns") { + test("parquet v2 pages - rle encoding for boolean value columns", -+ DisableComet("Comet doesn't support RLE encoding yet")) { ++ IgnoreComet("Comet doesn't support RLE encoding yet")) { val extraOptions = Map[String, String]( ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -index 9adcb43c838..84c4db4a727 100644 +index 9adcb43c838..c6872c7b24b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -1025,7 +1025,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared @@ -654,7 +670,7 @@ index 9adcb43c838..84c4db4a727 100644 - test("SPARK-17091: Convert IN predicate to Parquet filter push-down") { + test("SPARK-17091: Convert IN predicate to Parquet filter push-down", -+ DisableComet("IN predicate is not yet supported in Comet, see issue #36")) { ++ IgnoreComet("IN predicate is not yet supported in Comet, see issue #36")) { val schema = StructType(Seq( StructField("a", IntegerType, nullable = false) )) @@ -664,7 +680,7 @@ index 9adcb43c838..84c4db4a727 100644 - test("Support Parquet column index") { + test("Support Parquet column index", -+ DisableComet("Comet doesn't support Parquet column index yet")) { ++ IgnoreComet("Comet doesn't support Parquet column index yet")) { // block 1: // null count min max // page-0 0 0 99 @@ -695,7 +711,7 @@ index 9adcb43c838..84c4db4a727 100644 case _ => throw new AnalysisException("Can not match ParquetTable in the query.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 8670d95c65e..4a16d9f6ff4 100644 +index 8670d95c65e..b624c3811dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -1335,7 +1335,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession @@ -704,7 +720,7 @@ index 8670d95c65e..4a16d9f6ff4 100644 - test("SPARK-40128 read DELTA_LENGTH_BYTE_ARRAY encoded strings") { + test("SPARK-40128 read DELTA_LENGTH_BYTE_ARRAY encoded strings", -+ DisableComet("Comet doesn't support DELTA encoding yet")) { ++ IgnoreComet("Comet doesn't support DELTA encoding yet")) { withAllParquetReaders { checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. @@ -733,7 +749,7 @@ index 2e7b26126d2..f7368eb026e 100644 checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala -index 240bb4e6dcb..c37b92d3691 100644 +index 240bb4e6dcb..8287ffa03ca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala @@ -21,7 +21,7 @@ import java.nio.file.{Files, Paths, StandardCopyOption} @@ -741,7 +757,7 @@ index 240bb4e6dcb..c37b92d3691 100644 import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, SparkUpgradeException} -import org.apache.spark.sql.{QueryTest, Row, SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY} -+import org.apache.spark.sql.{DisableCometSuite, QueryTest, Row, SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY} ++import org.apache.spark.sql.{IgnoreCometSuite, QueryTest, Row, SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY} import org.apache.spark.sql.catalyst.util.DateTimeTestUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.{LegacyBehaviorPolicy, ParquetOutputTimestampType} @@ -753,7 +769,7 @@ index 240bb4e6dcb..c37b92d3691 100644 abstract class ParquetRebaseDatetimeSuite extends QueryTest with ParquetTest -+ with DisableCometSuite ++ with IgnoreCometSuite with SharedSparkSession { import testImplicits._ @@ -803,14 +819,14 @@ index 5c0b7def039..151184bc98c 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -index bf5c51b89bb..2c7f9701eeb 100644 +index bf5c51b89bb..ca22370ca3b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.Type._ import org.apache.spark.SparkException -+import org.apache.spark.sql.DisableComet ++import org.apache.spark.sql.IgnoreComet import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException import org.apache.spark.sql.functions.desc @@ -820,19 +836,19 @@ index bf5c51b89bb..2c7f9701eeb 100644 - test("schema mismatch failure error message for parquet reader") { + test("schema mismatch failure error message for parquet reader", -+ DisableComet("Comet doesn't work with vectorizedReaderEnabled = false")) { ++ IgnoreComet("Comet doesn't work with vectorizedReaderEnabled = false")) { withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false) val expectedMessage = "Encountered error while reading file" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala -index 3a0bd35cb70..ef351a56ec8 100644 +index 3a0bd35cb70..b28f06a757f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.debug import java.io.ByteArrayOutputStream import org.apache.spark.rdd.RDD -+import org.apache.spark.sql.DisableComet ++import org.apache.spark.sql.IgnoreComet import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext @@ -842,12 +858,12 @@ index 3a0bd35cb70..ef351a56ec8 100644 - test("SPARK-28537: DebugExec cannot debug columnar related queries") { + test("SPARK-28537: DebugExec cannot debug columnar related queries", -+ DisableComet("Comet does not use FileScan")) { ++ IgnoreComet("Comet does not use FileScan")) { withTempPath { workDir => val workDirPath = workDir.getAbsolutePath val input = spark.range(5).toDF("id") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala -index 26e61c6b58d..2a7c96d164a 100644 +index 26e61c6b58d..cde10983c68 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -737,7 +737,8 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils @@ -856,7 +872,7 @@ index 26e61c6b58d..2a7c96d164a 100644 - test("SPARK-26327: FileSourceScanExec metrics") { + test("SPARK-26327: FileSourceScanExec metrics", -+ DisableComet("Spark uses row-based Parquet reader while Comet is vectorized")) { ++ IgnoreComet("Spark uses row-based Parquet reader while Comet is vectorized")) { withTable("testDataForScan") { spark.range(10).selectExpr("id", "id % 3 as p") .write.partitionBy("p").saveAsTable("testDataForScan") @@ -916,7 +932,7 @@ index 0ab8691801d..d9125f658ad 100644 assert(scanNodes.length == 1) // $"a" is not null and $"a" > 1 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala -index d083cac48ff..43057eb251b 100644 +index d083cac48ff..3c11bcde807 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala @@ -37,8 +37,10 @@ import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException, @@ -927,23 +943,24 @@ index d083cac48ff..43057eb251b 100644 +// Since it isn't related to Comet, we disable it for now. class AsyncProgressTrackingMicroBatchExecutionSuite - extends StreamTest with BeforeAndAfter with Matchers { -+ extends StreamTest with BeforeAndAfter with Matchers with DisableCometSuite { ++ extends StreamTest with BeforeAndAfter with Matchers with IgnoreCometSuite { import testImplicits._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala -index 266bb343526..85ec36db996 100644 +index 266bb343526..cb90d15fed7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala -@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec +@@ -24,6 +24,8 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning +import org.apache.spark.sql.comet._ ++import org.apache.spark.sql.comet.execution.shuffle._ import org.apache.spark.sql.execution.{FileSourceScanExec, SortExec, SparkPlan} import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, DisableAdaptiveExecution} import org.apache.spark.sql.execution.datasources.BucketingUtils -@@ -101,12 +102,20 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti +@@ -101,12 +103,20 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti } } @@ -966,7 +983,7 @@ index 266bb343526..85ec36db996 100644 // To verify if the bucket pruning works, this function checks two conditions: // 1) Check if the pruned buckets (before filtering) are empty. // 2) Verify the final result is the same as the expected one -@@ -155,7 +164,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti +@@ -155,7 +165,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti val planWithoutBucketedScan = bucketedDataFrame.filter(filterCondition) .queryExecution.executedPlan val fileScan = getFileScan(planWithoutBucketedScan) @@ -976,25 +993,18 @@ index 266bb343526..85ec36db996 100644 val bucketColumnType = bucketedDataFrame.schema.apply(bucketColumnIndex).dataType val rowsWithInvalidBuckets = fileScan.execute().filter(row => { -@@ -461,18 +471,29 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti +@@ -461,18 +472,22 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti // check existence of shuffle assert( - joinOperator.left.exists(_.isInstanceOf[ShuffleExchangeExec]) == shuffleLeft, -+ joinOperator.left.find { op => -+ op.isInstanceOf[SortExec] || -+ (op.isInstanceOf[CometExec] && -+ op.asInstanceOf[CometExec].originalPlan.find(_.isInstanceOf[SortExec]).isDefined) -+ }.isDefined == sortLeft, -+ ++ joinOperator.left.exists(op => op.isInstanceOf[ShuffleExchangeExec] || ++ op.isInstanceOf[CometShuffleExchangeExec]) == shuffleLeft, s"expected shuffle in plan to be $shuffleLeft but found\n${joinOperator.left}") assert( - joinOperator.right.exists(_.isInstanceOf[ShuffleExchangeExec]) == shuffleRight, -+ joinOperator.right.find { op => -+ op.isInstanceOf[SortExec] || -+ (op.isInstanceOf[CometExec] && -+ op.asInstanceOf[CometExec].originalPlan.find(_.isInstanceOf[SortExec]).isDefined) -+ }.isDefined == sortRight, ++ joinOperator.right.exists(op => op.isInstanceOf[ShuffleExchangeExec] || ++ op.isInstanceOf[CometShuffleExchangeExec]) == shuffleRight, s"expected shuffle in plan to be $shuffleRight but found\n${joinOperator.right}") // check existence of sort @@ -1010,7 +1020,7 @@ index 266bb343526..85ec36db996 100644 s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}") // check the output partitioning -@@ -835,11 +856,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti +@@ -835,11 +850,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table") val scanDF = spark.table("bucketed_table").select("j") @@ -1024,7 +1034,7 @@ index 266bb343526..85ec36db996 100644 checkAnswer(aggDF, df1.groupBy("j").agg(max("k"))) } } -@@ -1031,10 +1052,16 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti +@@ -1031,10 +1046,16 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti val scans = plan.collect { case f: FileSourceScanExec if f.optionalNumCoalescedBuckets.isDefined => f @@ -1043,7 +1053,7 @@ index 266bb343526..85ec36db996 100644 assert(scans.isEmpty) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala -index b5f6d2f9f68..8e84ec3f070 100644 +index b5f6d2f9f68..277784a92af 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.sources @@ -1051,7 +1061,7 @@ index b5f6d2f9f68..8e84ec3f070 100644 import org.apache.spark.SparkException -import org.apache.spark.sql.AnalysisException -+import org.apache.spark.sql.{AnalysisException, DisableCometSuite} ++import org.apache.spark.sql.{AnalysisException, IgnoreCometSuite} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTableType} import org.apache.spark.sql.catalyst.parser.ParseException @@ -1063,7 +1073,7 @@ index b5f6d2f9f68..8e84ec3f070 100644 +// For some reason this suite is flaky w/ or w/o Comet when running in Github workflow. +// Since it isn't related to Comet, we disable it for now. +class CreateTableAsSelectSuite extends DataSourceTest with SharedSparkSession -+ with DisableCometSuite { ++ with IgnoreCometSuite { import testImplicits._ protected override lazy val sql = spark.sql _ @@ -1113,7 +1123,7 @@ index 75f440caefc..36b1146bc3a 100644 fail(s"No FileScan in query\n${df.queryExecution}") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala -index abe606ad9c1..438e7494473 100644 +index abe606ad9c1..2d930b64cca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala @@ -22,7 +22,7 @@ import java.util @@ -1121,7 +1131,7 @@ index abe606ad9c1..438e7494473 100644 import org.scalatest.BeforeAndAfter -import org.apache.spark.sql.{AnalysisException, Row, SaveMode} -+import org.apache.spark.sql.{AnalysisException, DisableComet, Row, SaveMode} ++import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} @@ -1131,12 +1141,12 @@ index abe606ad9c1..438e7494473 100644 - test("explain with table on DSv1 data source") { + test("explain with table on DSv1 data source", -+ DisableComet("Comet explain output is different")) { ++ IgnoreComet("Comet explain output is different")) { val tblSourceName = "tbl_src" val tblTargetName = "tbl_target" val tblSourceQualified = s"default.$tblSourceName" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala -index dd55fcfe42c..e83348242d3 100644 +index dd55fcfe42c..cc18147d17a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest @@ -1152,7 +1162,7 @@ index dd55fcfe42c..e83348242d3 100644 } } else { - super.test(testName, testTags: _*)(testFun) -+ if (isCometEnabled && testTags.exists(_.isInstanceOf[DisableComet])) { ++ if (isCometEnabled && testTags.exists(_.isInstanceOf[IgnoreComet])) { + ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) + } else { + super.test(testName, testTags: _*)(testFun) @@ -1194,10 +1204,10 @@ index dd55fcfe42c..e83348242d3 100644 spark.internalCreateDataFrame(withoutFilters.execute(), schema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala -index ed2e309fa07..3767d4e7ca4 100644 +index ed2e309fa07..4cfe0093da7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala -@@ -74,6 +74,18 @@ trait SharedSparkSessionBase +@@ -74,6 +74,21 @@ trait SharedSparkSessionBase // this rule may potentially block testing of other optimization rules such as // ConstantPropagation etc. .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) @@ -1211,6 +1221,9 @@ index ed2e309fa07..3767d4e7ca4 100644 + conf + .set("spark.comet.exec.enabled", "true") + .set("spark.comet.exec.all.enabled", "true") ++ .set("spark.shuffle.manager", ++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") ++ .set("spark.comet.exec.shuffle.enabled", "true") + } + } conf.set( @@ -1239,10 +1252,10 @@ index 52abd248f3a..7a199931a08 100644 case d: DynamicPruningExpression => d.child } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala -index 07361cfdce9..545b3184c23 100644 +index 07361cfdce9..bff67d5ecb4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala -@@ -55,25 +55,43 @@ object TestHive +@@ -55,25 +55,46 @@ object TestHive new SparkContext( System.getProperty("spark.sql.test.master", "local[1]"), "TestSQLContext", @@ -1289,6 +1302,9 @@ index 07361cfdce9..545b3184c23 100644 + conf + .set("spark.sql.extensions", "org.apache.spark.CometSparkSessionExtensions") + .set("spark.comet.enabled", "true") ++ .set("spark.shuffle.manager", ++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") ++ .set("spark.comet.exec.shuffle.enabled", "true") + + val v = System.getenv("ENABLE_COMET_SCAN_ONLY") + if (v == null || !v.toBoolean) {