Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Jun 19, 2024
1 parent d7f43fe commit 36fd25a
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ object ShimBatchReader {
PartitionedFile(
partitionValues,
file,
Long.box(-1), // -1 means we read the entire file
Long.box(-1),
-1, // -1 means we read the entire file
-1,
Array.empty[String],
Long.box(0),
Long.box(0))
0,
0)
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ object ShimBatchReader {
PartitionedFile(
partitionValues,
SparkPath.fromPathString(file),
Long.box(-1), // -1 means we read the entire file
Long.box(-1),
-1, // -1 means we read the entire file
-1,
Array.empty[String],
Long.box(0),
Long.box(0))
0,
0)
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ object ShimBatchReader {
PartitionedFile(
partitionValues,
SparkPath.fromPathString(file),
Long.box(-1), // -1 means we read the entire file
Long.box(-1),
-1, // -1 means we read the entire file
-1,
Array.empty[String],
Long.box(0),
Long.box(0),
0,
0,
Map.empty)
}
8 changes: 3 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,6 @@ under the License.
</profile>

<profile>

<id>spark-3.3</id>
<properties>
<scala.version>2.12.15</scala.version>
Expand All @@ -538,7 +537,6 @@ under the License.
<parquet.version>1.12.0</parquet.version>
<slf4j.version>1.7.32</slf4j.version>
<additional.3_4.test.source>not-needed-yet</additional.3_4.test.source>
<additional.pre35.test.source>spark-pre-3.5</additional.pre35.test.source>
<shims.minorVerSrc>spark-3.3</shims.minorVerSrc>
<shims.pre35Src>spark-pre-3.5</shims.pre35Src>
</properties>
Expand All @@ -560,13 +558,13 @@ under the License.
<!-- FIXME: this is WIP. Tests may fail -->
<id>spark-3.5</id>
<properties>
<scala.version>2.12.15</scala.version>
<scala.version>2.12.18</scala.version>
<spark.version>3.5.1</spark.version>
<spark.version.short>3.5</spark.version.short>
<parquet.version>1.13.1</parquet.version>
<shims.minorVerSrc>spark-3.5</shims.minorVerSrc>
<shims.pre35Src>not-needed</shims.pre35Src>
<additional.pre35.test.source>not needed</additional.pre35.test.source>
<additional.pre35.test.source>not-needed</additional.pre35.test.source>
<additional.3_5.test.source>spark-3.5</additional.3_5.test.source>
</properties>
</profile>
Expand All @@ -586,7 +584,7 @@ under the License.
<shims.majorVerSrc>spark-4.0</shims.majorVerSrc>
<shims.minorVerSrc>not-needed-yet</shims.minorVerSrc>
<shims.pre35Src>not-needed</shims.pre35Src>
<additional.pre35.test.source>not needed</additional.pre35.test.source>
<additional.pre35.test.source>not-needed</additional.pre35.test.source>
<!-- Use jdk17 by default -->
<java.version>17</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,9 @@ trait ShimCometScanExec {
/**
 * Builds the error reported for an invalid bucket file.
 *
 * @param path         value stored under the "path" key of the message parameters
 * @param sparkVersion not referenced by this implementation
 * @return a `SparkException` with error class "INVALID_BUCKET_FILE" and a null cause
 */
protected def invalidBucketFile(path: String, sparkVersion: String): Throwable = {
  val messageParameters: Map[String, String] = Map("path" -> path)
  new SparkException("INVALID_BUCKET_FILE", messageParameters, null)
}

/**
 * Locates the temporary row-index column in `sparkSchema`.
 *
 * Copied from Spark 3.4 RowIndexUtil due to PARQUET-2161 (tracked in SPARK-39634).
 * TODO: remove after PARQUET-2161 becomes available in Parquet
 *
 * @param sparkSchema schema whose fields are searched by name
 * @return the position of the first field named
 *         `ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME`, or -1 when no field has that name
 * @throws RuntimeException if that field exists but its data type is not `LongType`
 */
private def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int = {
  val position = sparkSchema.fields.indexWhere { field =>
    field.name == ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
  }
  // Only a field that was actually found needs its type validated.
  if (position >= 0 && sparkSchema.fields(position).dataType != LongType) {
    throw new RuntimeException(
      s"${ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME} must be of LongType")
  }
  position
}

/**
 * Returns true when `ShimFileFormat.findRowIndexColumnIndexInSchema` yields a non-negative
 * index for `sparkSchema`.
 */
protected def isNeededForSchema(sparkSchema: StructType): Boolean = {
// NOTE(review): this comparison is not the last expression of the block, so its value is
// discarded; it looks like the pre-change body shown beside its replacement - confirm.
findRowIndexColumnIndexInSchema(sparkSchema) >= 0
// TODO: remove after PARQUET-2161 becomes available in Parquet (tracked in SPARK-39634)
ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema) >= 0
}

protected def getPartitionedFile(f: FileStatusWithMetadata, p: PartitionDirectory): PartitionedFile =
Expand Down

0 comments on commit 36fd25a

Please sign in to comment.