From ae7ea9912de2f7dc612a91b57308ece6bec4d74f Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 16 Jul 2024 17:04:49 -0600
Subject: [PATCH] chore: Fix some regressions with Spark 3.5.1 (#674)

---
 dev/diffs/3.5.1.diff                          | 127 +-----------------
 .../sql/comet/shims/ShimCometScanExec.scala   |  13 +-
 2 files changed, 6 insertions(+), 134 deletions(-)

diff --git a/dev/diffs/3.5.1.diff b/dev/diffs/3.5.1.diff
index cf6754416..6892e8686 100644
--- a/dev/diffs/3.5.1.diff
+++ b/dev/diffs/3.5.1.diff
@@ -1532,107 +1532,6 @@ index 68bae34790a..ea906fd1adc 100644
        }
        assert(shuffles2.size == 4)
        val smj2 = findTopLevelSortMergeJoin(adaptive2)
-diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
-index 15055a276fa..6e60b94dc3d 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
-@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat
- 
- import org.apache.spark.TestUtils
- import org.apache.spark.paths.SparkPath
--import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Row}
-+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, IgnoreComet, QueryTest, Row}
- import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
- import org.apache.spark.sql.catalyst.trees.TreeNodeTag
- import org.apache.spark.sql.execution.FileSourceScanExec
-@@ -116,7 +116,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
-       testName: String, fileSchema: StructType)
-       (f: (DataFrame, Map[String, Any], Map[String, Any]) => Unit): Unit = {
-     Seq("json", "parquet").foreach { testFileFormat =>
--      test(s"metadata struct ($testFileFormat): " + testName) {
-+      test(s"metadata struct ($testFileFormat): " + testName,
-+        // https://github.com/apache/datafusion-comet/issues/617
-+        IgnoreComet("TODO: fix Comet for this test")) {
-         withTempDir { dir =>
-           import scala.collection.JavaConverters._
- 
-@@ -767,7 +769,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
- 
-   Seq(true, false).foreach { useVectorizedReader =>
-     val label = if (useVectorizedReader) "reading batches" else "reading rows"
--    test(s"SPARK-39806: metadata for a partitioned table ($label)") {
-+    test(s"SPARK-39806: metadata for a partitioned table ($label)",
-+      // https://github.com/apache/datafusion-comet/issues/617
-+      IgnoreComet("TODO: fix Comet for this test")) {
-       withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> useVectorizedReader.toString) {
-         withTempPath { dir =>
-           // Store dynamically partitioned data.
-@@ -789,7 +793,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
-   }
- 
-   Seq("parquet", "orc").foreach { format =>
--    test(s"SPARK-40918: Output cols around WSCG.isTooManyFields limit in $format") {
-+    test(s"SPARK-40918: Output cols around WSCG.isTooManyFields limit in $format",
-+      // https://github.com/apache/datafusion-comet/issues/617
-+      IgnoreComet("TODO: fix Comet for this test")) {
-       // The issue was that ParquetFileFormat would not count the _metadata columns towards
-       // the WholeStageCodegenExec.isTooManyFields limit, while FileSourceScanExec would,
-       // resulting in Parquet reader returning columnar output, while scan expected row.
-@@ -862,7 +868,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
-     }
-   }
- 
--  test("SPARK-41896: Filter on constant and generated metadata attributes at the same time") {
-+  test("SPARK-41896: Filter on constant and generated metadata attributes at the same time",
-+    // https://github.com/apache/datafusion-comet/issues/617
-+    IgnoreComet("TODO: fix Comet for this test")) {
-     withTempPath { dir =>
-       val idColumnName = "id"
-       val partitionColumnName = "partition"
-@@ -897,7 +905,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
-     }
-   }
- 
--  test("SPARK-41896: Filter by a function that takes the metadata struct as argument") {
-+  test("SPARK-41896: Filter by a function that takes the metadata struct as argument",
-+    // https://github.com/apache/datafusion-comet/issues/617
-+    IgnoreComet("TODO: fix Comet for this test")) {
-     withTempPath { dir =>
-       val idColumnName = "id"
-       val numFiles = 4
-@@ -984,7 +994,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
- 
- 
-   Seq("parquet", "json", "csv", "text", "orc").foreach { format =>
--    test(s"metadata file path is url encoded for format: $format") {
-+    test(s"metadata file path is url encoded for format: $format",
-+      // https://github.com/apache/datafusion-comet/issues/617
-+      IgnoreComet("TODO: fix Comet for this test")) {
-       withTempPath { f =>
-         val dirWithSpace = s"$f/with space"
-         spark.range(10)
-@@ -1002,7 +1014,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
-       }
-     }
- 
--    test(s"metadata file name is url encoded for format: $format") {
-+    test(s"metadata file name is url encoded for format: $format",
-+      // https://github.com/apache/datafusion-comet/issues/617
-+      IgnoreComet("TODO: fix Comet for this test")) {
-       val suffix = if (format == "text") ".txt" else s".$format"
-       withTempPath { f =>
-         val dirWithSpace = s"$f/with space"
-@@ -1056,7 +1070,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
-     }
-   }
- 
--  test("SPARK-43450: Filter on full _metadata column struct") {
-+  test("SPARK-43450: Filter on full _metadata column struct",
-+    // https://github.com/apache/datafusion-comet/issues/617
-+    IgnoreComet("TODO: fix Comet for this test")) {
-     withTempPath { dir =>
-       val numRows = 10
-       spark.range(end = numRows)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala
 index 05872d41131..a2c328b9742 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala
@@ -1756,30 +1655,6 @@ index 07e2849ce6f..3e73645b638 100644
     val extraOptions = Map[String, String](
       ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
     )
-diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
-index c10e1799702..f18ca092dba 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
-@@ -16,7 +16,7 @@
-  */
- package org.apache.spark.sql.execution.datasources.parquet
- 
--import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest}
-+import org.apache.spark.sql.{AnalysisException, DataFrame, IgnoreComet, QueryTest}
- import org.apache.spark.sql.execution.datasources.FileFormat
- import org.apache.spark.sql.functions.{col, lit}
- import org.apache.spark.sql.internal.SQLConf
-@@ -219,7 +219,9 @@ class ParquetFileMetadataStructRowIndexSuite extends QueryTest with SharedSparkS
-     }
-   }
- 
--  test(s"read user created ${FileFormat.METADATA_NAME}.${ROW_INDEX} column") {
-+  // https://github.com/apache/datafusion-comet/issues/617
-+  test(s"read user created ${FileFormat.METADATA_NAME}.${ROW_INDEX} column",
-+    IgnoreComet("TODO: fix Comet for this test")) {
-     withReadDataFrame("parquet", partitionCol = "pb") { df =>
-       withTempPath { dir =>
-         // The `df` has 10 input files with 10 rows each. Therefore the `_metadata.row_index` values
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 index 8e88049f51e..98d1eb07493 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -2589,7 +2464,7 @@ index abe606ad9c1..2d930b64cca 100644
     val tblTargetName = "tbl_target"
     val tblSourceQualified = s"default.$tblSourceName"
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
-index dd55fcfe42c..293e9dc2986 100644
+index dd55fcfe42c..e7fcd0a9e6a 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
 @@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
index 3c6f764cf..3c3e8c471 100644
--- a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
+++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala
@@ -19,14 +19,13 @@
 
 package org.apache.spark.sql.comet.shims
 
-import org.apache.comet.shims.ShimFileFormat
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
@@ -49,16 +48,14 @@ trait ShimCometScanExec {
       filePartitions,
       readSchema,
       fileConstantMetadataColumns,
-      Map.empty,
+      fsRelation.fileFormat.fileConstantMetadataExtractors,
       options)
 
   protected def invalidBucketFile(path: String, sparkVersion: String): Throwable =
-    new SparkException("INVALID_BUCKET_FILE", Map("path" -> path), null)
+    QueryExecutionErrors.invalidBucketFile(path)
 
-  protected def isNeededForSchema(sparkSchema: StructType): Boolean = {
-    // TODO: remove after PARQUET-2161 becomes available in Parquet (tracked in SPARK-39634)
-    ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema) >= 0
-  }
+  // see SPARK-39634
+  protected def isNeededForSchema(sparkSchema: StructType): Boolean = false
 
   protected def getPartitionedFile(f: FileStatusWithMetadata, p: PartitionDirectory): PartitionedFile =
     PartitionedFileUtil.getPartitionedFile(f, p.values)