From fe1f715c2e9fdddb6325f72d3d4fae157897bdcd Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 27 Feb 2024 16:52:42 -0800 Subject: [PATCH] Rework the shims a little bit for supporting calling different versions of Spark stuff, add some notes to development about formatting. --- DEVELOPMENT.md | 4 +++ Makefile | 6 ++++- pom.xml | 4 +-- shims/pom.xml | 10 ++++---- shims/{spark-3.4 => pre-spark-3.5}/pom.xml | 16 ++++++------ .../apache/comet/shims/PartitionShim.scala} | 25 +++++++++++++++---- shims/spark-3.5/pom.xml | 16 ++++++------ .../apache/comet/shims/PartitionShim.scala} | 24 ++++++++++++++---- spark/pom.xml | 2 +- .../spark/sql/comet/CometScanExec.scala | 7 +++--- 10 files changed, 72 insertions(+), 42 deletions(-) rename shims/{spark-3.4 => pre-spark-3.5}/pom.xml (90%) rename shims/{spark-3.5/src/main/scala/org/apache/comet/Shim.scala => pre-spark-3.5/src/main/scala/org/apache/comet/shims/PartitionShim.scala} (58%) rename shims/{spark-3.4/src/main/scala/org/apache/comet/Shim.scala => spark-3.5/src/main/scala/org/apache/comet/shims/PartitionShim.scala} (59%) diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index 1793bb9e2..38d5c01c1 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -60,6 +60,10 @@ make benchmark-org.apache.spark.sql.benchmark.CometReadBenchmark To run TPC-H or TPC-DS micro benchmarks, please follow the instructions in the respective source code, e.g., `CometTPCHQueryBenchmark`. +## Style + +You can fix Scala style issues using spotless by running `make format`. + ## Debugging Comet is a multi-language project with native code written in Rust and JVM code written in Java and Scala. 
It is possible to debug both native and JVM code concurrently as described in the [DEBUGGING guide](DEBUGGING.md) diff --git a/Makefile b/Makefile index fe13fbd9b..51e8e09d6 100644 --- a/Makefile +++ b/Makefile @@ -42,7 +42,11 @@ bench: RUSTFLAGS="-Ctarget-cpu=native" cargo bench $(filter-out $@,$(MAKECMDGOALS)) format: ./mvnw compile test-compile scalafix:scalafix -Psemanticdb $(PROFILES) - ./mvnw spotless:apply $(PROFILES) + ./mvnw spotless:apply $(PROFILES) -Pspark-3.5 + ./mvnw spotless:apply $(PROFILES) -Pspark-3.4 + ./mvnw spotless:apply $(PROFILES) -Pspark-3.3 + ./mvnw spotless:apply $(PROFILES) -Pspark-3.2 + core-amd64: rustup target add x86_64-apple-darwin diff --git a/pom.xml b/pom.xml index 854106b8a..b7b2387d7 100644 --- a/pom.xml +++ b/pom.xml @@ -517,6 +517,7 @@ under the License. 2.12.17 3.4 + 3.4.2 1.13.1 11 ${java.version} @@ -526,9 +527,6 @@ under the License. spark-3.5 - - true - 2.12.17 3.5 diff --git a/shims/pom.xml b/shims/pom.xml index 3ad6635e7..c859eefd1 100644 --- a/shims/pom.xml +++ b/shims/pom.xml @@ -30,8 +30,8 @@ under the License. ../pom.xml - comet-spark-shims - comet-spark-shims + comet-spark-per-spark-shims + comet-spark-per-spark-shims pom @@ -103,19 +103,19 @@ under the License. spark-3.2 - spark-3.2 + pre-spark-3.5 spark-3.3 - spark-3.3 + pre-spark-3.5 spark-3.4 - spark-3.4 + pre-spark-3.5 diff --git a/shims/spark-3.4/pom.xml b/shims/pre-spark-3.5/pom.xml similarity index 90% rename from shims/spark-3.4/pom.xml rename to shims/pre-spark-3.5/pom.xml index d7df10846..3bd925bdf 100644 --- a/shims/spark-3.4/pom.xml +++ b/shims/pre-spark-3.5/pom.xml @@ -25,19 +25,13 @@ under the License. 
4.0.0 org.apache.comet - comet-spark-shims + comet-spark-per-spark-shims 0.1.0-SNAPSHOT ../pom.xml - comet-spark-shims${spark.version.short}_${scala.binary.version} - comet-spark-shims${spark.version.short}_${scala.binary.version} - pom - - - - false - + comet-spark-per-spark-shims${spark.version.short}_${scala.binary.version} + comet-spark-per-spark-shims${spark.version.short}_${scala.binary.version} @@ -96,6 +90,10 @@ under the License. + + net.alchim31.maven + scala-maven-plugin + diff --git a/shims/spark-3.5/src/main/scala/org/apache/comet/Shim.scala b/shims/pre-spark-3.5/src/main/scala/org/apache/comet/shims/PartitionShim.scala similarity index 58% rename from shims/spark-3.5/src/main/scala/org/apache/comet/Shim.scala rename to shims/pre-spark-3.5/src/main/scala/org/apache/comet/shims/PartitionShim.scala index 1ca1513a6..523c4aa92 100644 --- a/shims/spark-3.5/src/main/scala/org/apache/comet/Shim.scala +++ b/shims/pre-spark-3.5/src/main/scala/org/apache/comet/shims/PartitionShim.scala @@ -17,18 +17,33 @@ * under the License. 
*/ -package org.apache.comet +package org.apache.comet.shims +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.PartitionedFileUtil +import org.apache.spark.sql.execution.datasources._ -import org.apache.hadoop.fs.Path - -class Shim { +object PartitionShim { def getPartitionedFile( file: FileStatusWithMetadata, partitionValues: InternalRow): PartitionedFile = { - PartitonedFileUtil.getPartitionedFile(file, partitionValues) + PartitionedFileUtil.getPartitionedFile(file, file.getPath, partitionValues) } + def splitFiles( + sparkSession: SparkSession, + file: FileStatusWithMetadata, + isSplitable: Boolean, + maxSplitBytes: Long, + partitionValues: InternalRow): Seq[PartitionedFile] = { + PartitionedFileUtil.splitFiles( + sparkSession = sparkSession, + file = file, + filePath = file.getPath, + isSplitable = isSplitable, + maxSplitBytes = maxSplitBytes, + partitionValues = partitionValues) + } } diff --git a/shims/spark-3.5/pom.xml b/shims/spark-3.5/pom.xml index d7df10846..3bd925bdf 100644 --- a/shims/spark-3.5/pom.xml +++ b/shims/spark-3.5/pom.xml @@ -25,19 +25,13 @@ under the License. 4.0.0 org.apache.comet - comet-spark-shims + comet-spark-per-spark-shims 0.1.0-SNAPSHOT ../pom.xml - comet-spark-shims${spark.version.short}_${scala.binary.version} - comet-spark-shims${spark.version.short}_${scala.binary.version} - pom - - - - false - + comet-spark-per-spark-shims${spark.version.short}_${scala.binary.version} + comet-spark-per-spark-shims${spark.version.short}_${scala.binary.version} @@ -96,6 +90,10 @@ under the License. 
+ + net.alchim31.maven + scala-maven-plugin + diff --git a/shims/spark-3.4/src/main/scala/org/apache/comet/Shim.scala b/shims/spark-3.5/src/main/scala/org/apache/comet/shims/PartitionShim.scala similarity index 59% rename from shims/spark-3.4/src/main/scala/org/apache/comet/Shim.scala rename to shims/spark-3.5/src/main/scala/org/apache/comet/shims/PartitionShim.scala index 0ff522566..ed09bccfe 100644 --- a/shims/spark-3.4/src/main/scala/org/apache/comet/Shim.scala +++ b/shims/spark-3.5/src/main/scala/org/apache/comet/shims/PartitionShim.scala @@ -17,18 +17,32 @@ * under the License. */ -package org.apache.comet +package org.apache.comet.shims +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.PartitionedFileUtil +import org.apache.spark.sql.execution.datasources._ -import org.apache.hadoop.fs.Path - -class Shim { +object PartitionShim { def getPartitionedFile( file: FileStatusWithMetadata, partitionValues: InternalRow): PartitionedFile = { - PartitonedFileUtil.getPartitionedFile(file, f.getPath, partitionValues) + PartitionedFileUtil.getPartitionedFile(file, partitionValues) } + def splitFiles( + sparkSession: SparkSession, + file: FileStatusWithMetadata, + isSplitable: Boolean, + maxSplitBytes: Long, + partitionValues: InternalRow): Seq[PartitionedFile] = { + PartitionedFileUtil.splitFiles( + sparkSession = sparkSession, + file = file, + isSplitable = isSplitable, + maxSplitBytes = maxSplitBytes, + partitionValues = partitionValues) + } } diff --git a/spark/pom.xml b/spark/pom.xml index 63b2c1b27..c5c0c948a 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -52,7 +52,7 @@ under the License. 
org.apache.comet - comet-spark-shims${spark.version.short}_${scala.binary.version} + comet-spark-per-spark-shims${spark.version.short}_${scala.binary.version} ${project.version} diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala index 4bf01f0f4..75e0ec321 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala @@ -42,7 +42,7 @@ import org.apache.spark.util.collection._ import org.apache.comet.{CometConf, MetricsSupport} import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetPartitionReaderFactory} -import org.apache.comet.shims.{ShimCometScanExec, ShimFileFormat} +import org.apache.comet.shims.{PartitionShim, ShimCometScanExec, ShimFileFormat} /** * Comet physical scan node for DataSource V1. Most of the code here follow Spark's @@ -267,7 +267,7 @@ case class CometScanExec( selectedPartitions .flatMap { p => p.files.map { f => - PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values) + PartitionShim.getPartitionedFile(f, p.values) } } .groupBy { f => @@ -354,10 +354,9 @@ case class CometScanExec( // SPARK-39634: Allow file splitting in combination with row index generation once // the fix for PARQUET-2161 is available. !isNeededForSchema(requiredSchema) - PartitionedFileUtil.splitFiles( + PartitionShim.splitFiles( sparkSession = relation.sparkSession, file = file, - filePath = filePath, isSplitable = isSplitable, maxSplitBytes = maxSplitBytes, partitionValues = partition.values)