Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

build: Add spark-4.0 profile and shims #407

Merged
merged 27 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a1fff9b
build: Add spark-4.0 profile
kazuyukitanimura Apr 30, 2024
465b828
build: Add spark-4.0 profile
kazuyukitanimura May 1, 2024
62b7d2f
build: Add spark-4.0 profile
kazuyukitanimura May 7, 2024
8db78cb
build: Add spark-4.0 profile
kazuyukitanimura May 7, 2024
02a970a
Merge remote-tracking branch 'upstream/main' into spark-4.0
kazuyukitanimura May 7, 2024
7251eb2
build: Add spark-4.0 profile
kazuyukitanimura May 8, 2024
17a6995
build: Add spark-4.0 profile
kazuyukitanimura May 8, 2024
57d6538
build: Add spark-4.0 profile
kazuyukitanimura May 8, 2024
d3efeb8
build: Add spark-4.0 profile
kazuyukitanimura May 8, 2024
e310eb1
Merge remote-tracking branch 'upstream/main' into spark-4.0
kazuyukitanimura May 9, 2024
69ca228
build: Add spark-4.0 profile and shims
kazuyukitanimura May 9, 2024
3aec9e6
build: Add spark-4.0 profile and shims
kazuyukitanimura May 9, 2024
d629df1
build: Add spark-4.0 profile and shims
kazuyukitanimura May 9, 2024
65628fb
build: Add spark-4.0 profile and shims
kazuyukitanimura May 10, 2024
328705f
build: Add spark-4.0 profile and shims
kazuyukitanimura May 11, 2024
b85c712
build: Add spark-4.0 profile and shims
kazuyukitanimura May 11, 2024
8dc9dba
build: Add spark-4.0 profile and shims
kazuyukitanimura May 11, 2024
9a4b605
build: Add spark-4.0 profile and shims
kazuyukitanimura May 11, 2024
396d077
address review comments
kazuyukitanimura May 15, 2024
f472ee3
Merge remote-tracking branch 'upstream/main' into spark-4.0
kazuyukitanimura May 17, 2024
5179467
Merge remote-tracking branch 'upstream/main' into spark-4.0
kazuyukitanimura May 20, 2024
85e698f
address review comments
kazuyukitanimura May 20, 2024
5463bd9
address review comments
kazuyukitanimura May 21, 2024
0b81d7f
Merge remote-tracking branch 'upstream/main' into spark-4.0
kazuyukitanimura May 21, 2024
d45ac85
address review comments
kazuyukitanimura May 22, 2024
32bc314
address review comments
kazuyukitanimura May 23, 2024
d5f822f
Merge remote-tracking branch 'upstream/main' into spark-4.0
kazuyukitanimura May 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions .github/workflows/pr_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,44 @@ jobs:
# upload test reports only for java 17
upload-test-reports: ${{ matrix.java_version == '17' }}

linux-test-with-spark4_0:
strategy:
matrix:
os: [ubuntu-latest]
java_version: [17]
test-target: [java]
spark-version: ['4.0']
is_push_event:
- ${{ github.event_name == 'push' }}
fail-fast: false
name: ${{ matrix.os }}/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}/${{ matrix.test-target }}
runs-on: ${{ matrix.os }}
container:
image: amd64/rust
steps:
- uses: actions/checkout@v4
- name: Setup Rust & Java toolchain
uses: ./.github/actions/setup-builder
with:
rust-version: ${{env.RUST_VERSION}}
jdk-version: ${{ matrix.java_version }}
- name: Clone Spark
uses: actions/checkout@v4
with:
repository: "apache/spark"
path: "apache-spark"
- name: Install Spark
shell: bash
working-directory: ./apache-spark
run: build/mvn install -Phive -Phadoop-cloud -DskipTests
- name: Java test steps
uses: ./.github/actions/java-test
with:
# TODO: remove -DskipTests after fixing tests
maven_opts: "-Pspark-${{ matrix.spark-version }} -DskipTests"
# TODO: upload test reports after enabling tests
upload-test-reports: false

linux-test-with-old-spark:
strategy:
matrix:
Expand Down Expand Up @@ -169,6 +207,81 @@ jobs:
with:
maven_opts: -Pspark-${{ matrix.spark-version }},scala-${{ matrix.scala-version }}

macos-test-with-spark4_0:
strategy:
matrix:
os: [macos-13]
java_version: [17]
test-target: [java]
spark-version: ['4.0']
fail-fast: false
if: github.event_name == 'push'
name: ${{ matrix.os }}/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}/${{ matrix.test-target }}
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
- name: Setup Rust & Java toolchain
uses: ./.github/actions/setup-macos-builder
with:
rust-version: ${{env.RUST_VERSION}}
jdk-version: ${{ matrix.java_version }}
- name: Clone Spark
uses: actions/checkout@v4
with:
repository: "apache/spark"
path: "apache-spark"
- name: Install Spark
shell: bash
working-directory: ./apache-spark
run: build/mvn install -Phive -Phadoop-cloud -DskipTests
- name: Java test steps
uses: ./.github/actions/java-test
with:
# TODO: remove -DskipTests after fixing tests
maven_opts: "-Pspark-${{ matrix.spark-version }} -DskipTests"
# TODO: upload test reports after enabling tests
upload-test-reports: false

macos-aarch64-test-with-spark4_0:
strategy:
matrix:
java_version: [17]
test-target: [java]
spark-version: ['4.0']
is_push_event:
- ${{ github.event_name == 'push' }}
exclude: # exclude java 11 for pull_request event
- java_version: 11
is_push_event: false
fail-fast: false
name: macos-14(Silicon)/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}/${{ matrix.test-target }}
runs-on: macos-14
steps:
- uses: actions/checkout@v4
- name: Setup Rust & Java toolchain
uses: ./.github/actions/setup-macos-builder
with:
rust-version: ${{env.RUST_VERSION}}
jdk-version: ${{ matrix.java_version }}
jdk-architecture: aarch64
protoc-architecture: aarch_64
- name: Clone Spark
uses: actions/checkout@v4
with:
repository: "apache/spark"
path: "apache-spark"
- name: Install Spark
shell: bash
working-directory: ./apache-spark
run: build/mvn install -Phive -Phadoop-cloud -DskipTests
- name: Java test steps
uses: ./.github/actions/java-test
with:
# TODO: remove -DskipTests after fixing tests
maven_opts: "-Pspark-${{ matrix.spark-version }} -DskipTests"
# TODO: upload test reports after enabling tests
upload-test-reports: false

macos-aarch64-test-with-old-spark:
strategy:
matrix:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
package org.apache.comet.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.comet.shims.ShimCometParquetUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

object CometParquetUtils {
object CometParquetUtils extends ShimCometParquetUtils {
private val PARQUET_FIELD_ID_WRITE_ENABLED = "spark.sql.parquet.fieldId.write.enabled"
private val PARQUET_FIELD_ID_READ_ENABLED = "spark.sql.parquet.fieldId.read.enabled"
private val IGNORE_MISSING_PARQUET_FIELD_ID = "spark.sql.parquet.fieldId.read.ignoreMissing"
Expand All @@ -39,61 +39,4 @@ object CometParquetUtils {

def ignoreMissingIds(conf: SQLConf): Boolean =
conf.getConfString(IGNORE_MISSING_PARQUET_FIELD_ID, "false").toBoolean

// The following is copied from QueryExecutionErrors
// TODO: remove after dropping Spark 3.2.0 support and directly use
// QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError
def foundDuplicateFieldInFieldIdLookupModeError(
requiredId: Int,
matchedFields: String): Throwable = {
new RuntimeException(s"""
|Found duplicate field(s) "$requiredId": $matchedFields
|in id mapping mode
""".stripMargin.replaceAll("\n", " "))
}

// The followings are copied from org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
// TODO: remove after dropping Spark 3.2.0 support and directly use ParquetUtils
/**
* A StructField metadata key used to set the field id of a column in the Parquet schema.
*/
val FIELD_ID_METADATA_KEY = "parquet.field.id"

/**
* Whether there exists a field in the schema, whether inner or leaf, has the parquet field ID
* metadata.
*/
def hasFieldIds(schema: StructType): Boolean = {
def recursiveCheck(schema: DataType): Boolean = {
schema match {
case st: StructType =>
st.exists(field => hasFieldId(field) || recursiveCheck(field.dataType))

case at: ArrayType => recursiveCheck(at.elementType)

case mt: MapType => recursiveCheck(mt.keyType) || recursiveCheck(mt.valueType)

case _ =>
// No need to really check primitive types, just to terminate the recursion
false
}
}
if (schema.isEmpty) false else recursiveCheck(schema)
}

def hasFieldId(field: StructField): Boolean =
field.metadata.contains(FIELD_ID_METADATA_KEY)

def getFieldId(field: StructField): Int = {
require(
hasFieldId(field),
s"The key `$FIELD_ID_METADATA_KEY` doesn't exist in the metadata of " + field)
try {
Math.toIntExact(field.metadata.getLong(FIELD_ID_METADATA_KEY))
} catch {
case _: ArithmeticException | _: ClassCastException =>
throw new IllegalArgumentException(
s"The key `$FIELD_ID_METADATA_KEY` must be a 32-bit integer")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.comet.shims

import org.apache.spark.sql.types._

trait ShimCometParquetUtils {
// The following is copied from QueryExecutionErrors
// TODO: remove after dropping Spark 3.2.0 support and directly use
// QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError
def foundDuplicateFieldInFieldIdLookupModeError(
requiredId: Int,
matchedFields: String): Throwable = {
new RuntimeException(s"""
|Found duplicate field(s) "$requiredId": $matchedFields
|in id mapping mode
""".stripMargin.replaceAll("\n", " "))
}

// The followings are copied from org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
// TODO: remove after dropping Spark 3.2.0 support and directly use ParquetUtils
/**
* A StructField metadata key used to set the field id of a column in the Parquet schema.
*/
val FIELD_ID_METADATA_KEY = "parquet.field.id"

/**
* Whether there exists a field in the schema, whether inner or leaf, has the parquet field ID
* metadata.
*/
def hasFieldIds(schema: StructType): Boolean = {
def recursiveCheck(schema: DataType): Boolean = {
schema match {
case st: StructType =>
st.exists(field => hasFieldId(field) || recursiveCheck(field.dataType))

case at: ArrayType => recursiveCheck(at.elementType)

case mt: MapType => recursiveCheck(mt.keyType) || recursiveCheck(mt.valueType)

case _ =>
// No need to really check primitive types, just to terminate the recursion
false
}
}
if (schema.isEmpty) false else recursiveCheck(schema)
}

def hasFieldId(field: StructField): Boolean =
field.metadata.contains(FIELD_ID_METADATA_KEY)

def getFieldId(field: StructField): Int = {
require(
hasFieldId(field),
s"The key `$FIELD_ID_METADATA_KEY` doesn't exist in the metadata of " + field)
try {
Math.toIntExact(field.metadata.getLong(FIELD_ID_METADATA_KEY))
} catch {
case _: ArithmeticException | _: ClassCastException =>
throw new IllegalArgumentException(
s"The key `$FIELD_ID_METADATA_KEY` must be a 32-bit integer")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.shims

import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile

object ShimBatchReader {
def newPartitionedFile(partitionValues: InternalRow, file: String): PartitionedFile =
PartitionedFile(
partitionValues,
SparkPath.fromUrlString(file),
-1, // -1 means we read the entire file
-1,
kazuyukitanimura marked this conversation as resolved.
Show resolved Hide resolved
Array.empty[String],
0,
0
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.shims

import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

object ShimFileFormat {
// A name for a temporary column that holds row indexes computed by the file format reader
// until they can be placed in the _metadata struct.
val ROW_INDEX_TEMPORARY_COLUMN_NAME = ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME

val OPTION_RETURNING_BATCH = FileFormat.OPTION_RETURNING_BATCH
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.shims


import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
import org.apache.spark.sql.types.{StructField, StructType}

object ShimResolveDefaultColumns {
def getExistenceDefaultValue(field: StructField): Any =
ResolveDefaultColumns.getExistenceDefaultValues(StructType(Seq(field))).head
}
Loading
Loading