From 36fd25a4def08c530aa3af6af25be60f1179dab3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 19 Jun 2024 02:30:37 -0600 Subject: [PATCH] address feedback --- .../apache/comet/shims/ShimBatchReader.scala | 8 ++++---- .../apache/comet/shims/ShimBatchReader.scala | 8 ++++---- .../apache/comet/shims/ShimBatchReader.scala | 8 ++++---- pom.xml | 8 +++----- .../sql/comet/shims/ShimCometScanExec.scala | 19 ++----------------- 5 files changed, 17 insertions(+), 34 deletions(-) diff --git a/common/src/main/spark-3.3/org/apache/comet/shims/ShimBatchReader.scala b/common/src/main/spark-3.3/org/apache/comet/shims/ShimBatchReader.scala index 1b1742a3e..3cbca896f 100644 --- a/common/src/main/spark-3.3/org/apache/comet/shims/ShimBatchReader.scala +++ b/common/src/main/spark-3.3/org/apache/comet/shims/ShimBatchReader.scala @@ -28,9 +28,9 @@ object ShimBatchReader { PartitionedFile( partitionValues, file, - Long.box(-1), // -1 means we read the entire file - Long.box(-1), + -1, // -1 means we read the entire file + -1, Array.empty[String], - Long.box(0), - Long.box(0)) + 0, + 0) } diff --git a/common/src/main/spark-3.4/org/apache/comet/shims/ShimBatchReader.scala b/common/src/main/spark-3.4/org/apache/comet/shims/ShimBatchReader.scala index 352b91145..17b60e0e5 100644 --- a/common/src/main/spark-3.4/org/apache/comet/shims/ShimBatchReader.scala +++ b/common/src/main/spark-3.4/org/apache/comet/shims/ShimBatchReader.scala @@ -29,9 +29,9 @@ object ShimBatchReader { PartitionedFile( partitionValues, SparkPath.fromPathString(file), - Long.box(-1), // -1 means we read the entire file - Long.box(-1), + -1, // -1 means we read the entire file + -1, Array.empty[String], - Long.box(0), - Long.box(0)) + 0, + 0) } diff --git a/common/src/main/spark-3.5/org/apache/comet/shims/ShimBatchReader.scala b/common/src/main/spark-3.5/org/apache/comet/shims/ShimBatchReader.scala index 46bbcb21a..ec11caf89 100644 --- a/common/src/main/spark-3.5/org/apache/comet/shims/ShimBatchReader.scala +++ b/common/src/main/spark-3.5/org/apache/comet/shims/ShimBatchReader.scala @@ -29,10 +29,10 @@ object ShimBatchReader { PartitionedFile( partitionValues, SparkPath.fromPathString(file), - Long.box(-1), // -1 means we read the entire file - Long.box(-1), + -1, // -1 means we read the entire file + -1, Array.empty[String], - Long.box(0), - Long.box(0), + 0, + 0, Map.empty) } diff --git a/pom.xml b/pom.xml index de67b8571..268dea63a 100644 --- a/pom.xml +++ b/pom.xml @@ -529,7 +529,6 @@ under the License. - spark-3.3 2.12.15 @@ -538,7 +537,6 @@ under the License. 1.12.0 1.7.32 not-needed-yet - spark-pre-3.5 spark-3.3 spark-pre-3.5 @@ -560,13 +558,13 @@ under the License. spark-3.5 - 2.12.15 + 2.12.18 3.5.1 3.5 1.13.1 spark-3.5 not-needed - not needed + not-needed spark-3.5 @@ -586,7 +584,7 @@ under the License. spark-4.0 not-needed-yet not-needed - not needed + not-needed 17 ${java.version} diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala index 030a40f34..b9ff2ec87 100644 --- a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala +++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala @@ -55,24 +55,9 @@ trait ShimCometScanExec { protected def invalidBucketFile(path: String, sparkVersion: String): Throwable = new SparkException("INVALID_BUCKET_FILE", Map("path" -> path), null) - // Copied from Spark 3.4 RowIndexUtil due to PARQUET-2161 (tracked in SPARK-39634) - // TODO: remove after PARQUET-2161 becomes available in Parquet - private def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int = { - sparkSchema.fields.zipWithIndex.find { case (field: StructField, _: Int) => - field.name == ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME - } match { - case Some((field: StructField, idx: Int)) => - if (field.dataType != LongType) { - throw new RuntimeException( - s"${ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME} must be of LongType") - } - idx - case _ => -1 - } - } - protected def isNeededForSchema(sparkSchema: StructType): Boolean = { - findRowIndexColumnIndexInSchema(sparkSchema) >= 0 + // TODO: remove after PARQUET-2161 becomes available in Parquet (tracked in SPARK-39634) + ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema) >= 0 } protected def getPartitionedFile(f: FileStatusWithMetadata, p: PartitionDirectory): PartitionedFile =