chore: Fix some regressions with Spark 3.5.1 (#674)
andygrove authored Jul 16, 2024
1 parent 8e1c7e1 commit ae7ea99
Showing 2 changed files with 6 additions and 134 deletions.
127 changes: 1 addition & 126 deletions dev/diffs/3.5.1.diff
@@ -1532,107 +1532,6 @@ index 68bae34790a..ea906fd1adc 100644
}
assert(shuffles2.size == 4)
val smj2 = findTopLevelSortMergeJoin(adaptive2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index 15055a276fa..6e60b94dc3d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat

import org.apache.spark.TestUtils
import org.apache.spark.paths.SparkPath
-import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, IgnoreComet, QueryTest, Row}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.execution.FileSourceScanExec
@@ -116,7 +116,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
testName: String, fileSchema: StructType)
(f: (DataFrame, Map[String, Any], Map[String, Any]) => Unit): Unit = {
Seq("json", "parquet").foreach { testFileFormat =>
- test(s"metadata struct ($testFileFormat): " + testName) {
+ test(s"metadata struct ($testFileFormat): " + testName,
+ // https://github.com/apache/datafusion-comet/issues/617
+ IgnoreComet("TODO: fix Comet for this test")) {
withTempDir { dir =>
import scala.collection.JavaConverters._

@@ -767,7 +769,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {

Seq(true, false).foreach { useVectorizedReader =>
val label = if (useVectorizedReader) "reading batches" else "reading rows"
- test(s"SPARK-39806: metadata for a partitioned table ($label)") {
+ test(s"SPARK-39806: metadata for a partitioned table ($label)",
+ // https://github.com/apache/datafusion-comet/issues/617
+ IgnoreComet("TODO: fix Comet for this test")) {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> useVectorizedReader.toString) {
withTempPath { dir =>
// Store dynamically partitioned data.
@@ -789,7 +793,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
}

Seq("parquet", "orc").foreach { format =>
- test(s"SPARK-40918: Output cols around WSCG.isTooManyFields limit in $format") {
+ test(s"SPARK-40918: Output cols around WSCG.isTooManyFields limit in $format",
+ // https://github.com/apache/datafusion-comet/issues/617
+ IgnoreComet("TODO: fix Comet for this test")) {
// The issue was that ParquetFileFormat would not count the _metadata columns towards
// the WholeStageCodegenExec.isTooManyFields limit, while FileSourceScanExec would,
// resulting in Parquet reader returning columnar output, while scan expected row.
@@ -862,7 +868,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
}
}

- test("SPARK-41896: Filter on constant and generated metadata attributes at the same time") {
+ test("SPARK-41896: Filter on constant and generated metadata attributes at the same time",
+ // https://github.com/apache/datafusion-comet/issues/617
+ IgnoreComet("TODO: fix Comet for this test")) {
withTempPath { dir =>
val idColumnName = "id"
val partitionColumnName = "partition"
@@ -897,7 +905,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
}
}

- test("SPARK-41896: Filter by a function that takes the metadata struct as argument") {
+ test("SPARK-41896: Filter by a function that takes the metadata struct as argument",
+ // https://github.com/apache/datafusion-comet/issues/617
+ IgnoreComet("TODO: fix Comet for this test")) {
withTempPath { dir =>
val idColumnName = "id"
val numFiles = 4
@@ -984,7 +994,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {


Seq("parquet", "json", "csv", "text", "orc").foreach { format =>
- test(s"metadata file path is url encoded for format: $format") {
+ test(s"metadata file path is url encoded for format: $format",
+ // https://github.com/apache/datafusion-comet/issues/617
+ IgnoreComet("TODO: fix Comet for this test")) {
withTempPath { f =>
val dirWithSpace = s"$f/with space"
spark.range(10)
@@ -1002,7 +1014,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
}
}

- test(s"metadata file name is url encoded for format: $format") {
+ test(s"metadata file name is url encoded for format: $format",
+ // https://github.com/apache/datafusion-comet/issues/617
+ IgnoreComet("TODO: fix Comet for this test")) {
val suffix = if (format == "text") ".txt" else s".$format"
withTempPath { f =>
val dirWithSpace = s"$f/with space"
@@ -1056,7 +1070,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
}
}

- test("SPARK-43450: Filter on full _metadata column struct") {
+ test("SPARK-43450: Filter on full _metadata column struct",
+ // https://github.com/apache/datafusion-comet/issues/617
+ IgnoreComet("TODO: fix Comet for this test")) {
withTempPath { dir =>
val numRows = 10
spark.range(end = numRows)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala
index 05872d41131..a2c328b9742 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCustomMetadataStructSuite.scala
@@ -1756,30 +1655,6 @@ index 07e2849ce6f..3e73645b638 100644
val extraOptions = Map[String, String](
ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
index c10e1799702..f18ca092dba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMetadataStructRowIndexSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution.datasources.parquet

-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest}
+import org.apache.spark.sql.{AnalysisException, DataFrame, IgnoreComet, QueryTest}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.internal.SQLConf
@@ -219,7 +219,9 @@ class ParquetFileMetadataStructRowIndexSuite extends QueryTest with SharedSparkS
}
}

- test(s"read user created ${FileFormat.METADATA_NAME}.${ROW_INDEX} column") {
+ // https://github.com/apache/datafusion-comet/issues/617
+ test(s"read user created ${FileFormat.METADATA_NAME}.${ROW_INDEX} column",
+ IgnoreComet("TODO: fix Comet for this test")) {
withReadDataFrame("parquet", partitionCol = "pb") { df =>
withTempPath { dir =>
// The `df` has 10 input files with 10 rows each. Therefore the `_metadata.row_index` values
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 8e88049f51e..98d1eb07493 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -2589,7 +2464,7 @@ index abe606ad9c1..2d930b64cca 100644
val tblTargetName = "tbl_target"
val tblSourceQualified = s"default.$tblSourceName"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index dd55fcfe42c..293e9dc2986 100644
index dd55fcfe42c..e7fcd0a9e6a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
13 changes: 5 additions & 8 deletions ShimCometScanExec.scala
@@ -19,14 +19,13 @@
 
 package org.apache.spark.sql.comet.shims
 
-import org.apache.comet.shims.ShimFileFormat
-
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
@@ -49,16 +48,14 @@ trait ShimCometScanExec {
       filePartitions,
       readSchema,
       fileConstantMetadataColumns,
-      Map.empty,
+      fsRelation.fileFormat.fileConstantMetadataExtractors,
       options)
 
   protected def invalidBucketFile(path: String, sparkVersion: String): Throwable =
-    new SparkException("INVALID_BUCKET_FILE", Map("path" -> path), null)
+    QueryExecutionErrors.invalidBucketFile(path)
 
-  protected def isNeededForSchema(sparkSchema: StructType): Boolean = {
-    // TODO: remove after PARQUET-2161 becomes available in Parquet (tracked in SPARK-39634)
-    ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema) >= 0
-  }
+  // see SPARK-39634
+  protected def isNeededForSchema(sparkSchema: StructType): Boolean = false
 
   protected def getPartitionedFile(f: FileStatusWithMetadata, p: PartitionDirectory): PartitionedFile =
     PartitionedFileUtil.getPartitionedFile(f, p.values)
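Read together with the test-diff cleanup above, the key fix is passing the file format's
constant-metadata extractors through to FileScanRDD instead of Map.empty, which is what the
_metadata struct tests exercise. A quick sanity check in spark-shell (a hypothetical snippet,
not part of the commit):

// Hypothetical smoke test for the regression fixed here: with the
// extractors passed through, constant _metadata fields resolve again.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()
val dir = java.nio.file.Files.createTempDirectory("meta-check").toString
spark.range(10).write.mode("overwrite").parquet(dir)
spark.read.parquet(dir)
  .select("id", "_metadata.file_path", "_metadata.file_name")
  .show(truncate = false)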
