Skip to content

Commit

Permalink
Rework the shims a little bit for supporting calling different versions of Spark stuff, add some notes to development about formatting.
Browse files Browse the repository at this point in the history
  • Loading branch information
holdenk committed Feb 28, 2024
1 parent eac944e commit fe1f715
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 42 deletions.
4 changes: 4 additions & 0 deletions DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ make benchmark-org.apache.spark.sql.benchmark.CometReadBenchmark
To run TPC-H or TPC-DS micro benchmarks, please follow the instructions
in the respective source code, e.g., `CometTPCHQueryBenchmark`.

## Style

You can fix Scala style issues using spotless by running `make format`.

## Debugging
Comet is a multi-language project with native code written in Rust and JVM code written in Java and Scala.
It is possible to debug both native and JVM code concurrently as described in the [DEBUGGING guide](DEBUGGING.md)
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ bench:
RUSTFLAGS="-Ctarget-cpu=native" cargo bench $(filter-out $@,$(MAKECMDGOALS))
format:
./mvnw compile test-compile scalafix:scalafix -Psemanticdb $(PROFILES)
./mvnw spotless:apply $(PROFILES)
./mvnw spotless:apply $(PROFILES) -Pspark-3.5
./mvnw spotless:apply $(PROFILES) -Pspark-3.4
./mvnw spotless:apply $(PROFILES) -Pspark-3.3
./mvnw spotless:apply $(PROFILES) -Pspark-3.2


core-amd64:
rustup target add x86_64-apple-darwin
Expand Down
4 changes: 1 addition & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ under the License.
<properties>
<scala.version>2.12.17</scala.version>
<spark.version.short>3.4</spark.version.short>
<spark.version>3.4.2</spark.version>
<parquet.version>1.13.1</parquet.version>
<java.version>11</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
Expand All @@ -526,9 +527,6 @@ under the License.

<profile>
<id>spark-3.5</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<scala.version>2.12.17</scala.version>
<spark.version.short>3.5</spark.version.short>
Expand Down
10 changes: 5 additions & 5 deletions shims/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ under the License.
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>comet-spark-shims</artifactId>
<name>comet-spark-shims</name>
<artifactId>comet-spark-per-spark-shims</artifactId>
<name>comet-spark-per-spark-shims</name>
<packaging>pom</packaging>

<properties>
Expand Down Expand Up @@ -103,19 +103,19 @@ under the License.
<profile>
<id>spark-3.2</id>
<modules>
<module>spark-3.2</module>
<module>pre-spark-3.5</module>
</modules>
</profile>
<profile>
<id>spark-3.3</id>
<modules>
<module>spark-3.3</module>
<module>pre-spark-3.5</module>
</modules>
</profile>
<profile>
<id>spark-3.4</id>
<modules>
<module>spark-3.4</module>
<module>pre-spark-3.5</module>
</modules>
</profile>
<profile>
Expand Down
16 changes: 7 additions & 9 deletions shims/spark-3.4/pom.xml → shims/pre-spark-3.5/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,13 @@ under the License.
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.comet</groupId>
<artifactId>comet-spark-shims</artifactId>
<artifactId>comet-spark-per-spark-shims</artifactId>
<version>0.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>comet-spark-shims${spark.version.short}_${scala.binary.version}</artifactId>
<name>comet-spark-shims${spark.version.short}_${scala.binary.version}</name>
<packaging>pom</packaging>

<properties>
<!-- Reverse default (skip installation), and then enable only for child modules -->
<maven.deploy.skip>false</maven.deploy.skip>
</properties>
<artifactId>comet-spark-per-spark-shims${spark.version.short}_${scala.binary.version}</artifactId>
<name>comet-spark-per-spark-shims${spark.version.short}_${scala.binary.version}</name>

<dependencies>
<dependency>
Expand Down Expand Up @@ -96,6 +90,10 @@ under the License.
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,33 @@
* under the License.
*/

package org.apache.comet
package org.apache.comet.shims

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources._

import org.apache.hadoop.fs.Path

class Shim {
class PartitionShim {

def getPartitionedFile(
file: FileStatusWithMetadata,
partitionValues: InternalRow): PartitionedFile = {
PartitonedFileUtil.getPartitionedFile(file, partitionValues)
PartitionedFileUtil.getPartitionedFile(file, f.getPath, partitionValues)
}

def splitFiles(
sparkSession: SparkSession,
file: FileStatusWithMetadata,
isSplitable: Boolean,
maxSplitBytes: Long,
partitionValues: InternalRow): Seq[PartitionedFile] = {
PartitionedFileUtil.splitFiles(
sparkSession = sparkSession,
file = file,
filePath = file.getPath,
isSplitable = isSplitable,
maxSplitBytes = maxSplitBytes,
partitionValues = partitionValues)
}
}
16 changes: 7 additions & 9 deletions shims/spark-3.5/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,13 @@ under the License.
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.comet</groupId>
<artifactId>comet-spark-shims</artifactId>
<artifactId>comet-spark-per-spark-shims</artifactId>
<version>0.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>comet-spark-shims${spark.version.short}_${scala.binary.version}</artifactId>
<name>comet-spark-shims${spark.version.short}_${scala.binary.version}</name>
<packaging>pom</packaging>

<properties>
<!-- Reverse default (skip installation), and then enable only for child modules -->
<maven.deploy.skip>false</maven.deploy.skip>
</properties>
<artifactId>comet-spark-per-spark-shims${spark.version.short}_${scala.binary.version}</artifactId>
<name>comet-spark-per-spark-shims${spark.version.short}_${scala.binary.version}</name>

<dependencies>
<dependency>
Expand Down Expand Up @@ -96,6 +90,10 @@ under the License.
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,32 @@
* under the License.
*/

package org.apache.comet
package org.apache.comet.shims

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources._

import org.apache.hadoop.fs.Path

class Shim {
object PartitionShim {

def getPartitionedFile(
file: FileStatusWithMetadata,
partitionValues: InternalRow): PartitionedFile = {
PartitonedFileUtil.getPartitionedFile(file, f.getPath, partitionValues)
PartitionedFileUtil.getPartitionedFile(file, partitionValues)
}

def splitFiles(
sparkSession: SparkSession,
file: FileStatusWithMetadata,
isSplitable: Boolean,
maxSplitBytes: Long,
partitionValues: InternalRow): Seq[PartitionedFile] = {
PartitionedFileUtil.splitFiles(
sparkSession = sparkSession,
file = file,
isSplitable = isSplitable,
maxSplitBytes = maxSplitBytes,
partitionValues = partitionValues)
}
}
2 changes: 1 addition & 1 deletion spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.comet</groupId>
<artifactId>comet-spark-shims${spark.version.short}_${scala.binary.version}</artifactId>
<artifactId>comet-spark-per-spark-shims${spark.version.short}_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import org.apache.spark.util.collection._

import org.apache.comet.{CometConf, MetricsSupport}
import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetPartitionReaderFactory}
import org.apache.comet.shims.{ShimCometScanExec, ShimFileFormat}
import org.apache.comet.shims.{PartitionShim, ShimCometScanExec, ShimFileFormat}

/**
* Comet physical scan node for DataSource V1. Most of the code here follow Spark's
Expand Down Expand Up @@ -267,7 +267,7 @@ case class CometScanExec(
selectedPartitions
.flatMap { p =>
p.files.map { f =>
PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)
PartitionShim.getPartitionedFile(f, p.values)
}
}
.groupBy { f =>
Expand Down Expand Up @@ -354,10 +354,9 @@ case class CometScanExec(
// SPARK-39634: Allow file splitting in combination with row index generation once
// the fix for PARQUET-2161 is available.
!isNeededForSchema(requiredSchema)
PartitionedFileUtil.splitFiles(
PartitionShim.splitFiles(
sparkSession = relation.sparkSession,
file = file,
filePath = filePath,
isSplitable = isSplitable,
maxSplitBytes = maxSplitBytes,
partitionValues = partition.values)
Expand Down

0 comments on commit fe1f715

Please sign in to comment.