upmerge
andygrove committed Jul 15, 2024
2 parents 84035d3 + c434872 commit 80637bd
Showing 568 changed files with 26,068 additions and 8,019 deletions.
2 changes: 1 addition & 1 deletion .github/actions/java-test/action.yaml
@@ -33,7 +33,7 @@ runs:
- name: Run Cargo build
shell: bash
run: |
cd core
cd native
cargo build
- name: Cache Maven dependencies
14 changes: 10 additions & 4 deletions .github/actions/rust-test/action.yaml
@@ -22,21 +22,27 @@ runs:
- name: Check Cargo fmt
shell: bash
run: |
cd core
cd native
cargo fmt --all -- --check --color=never
- name: Check Cargo clippy
shell: bash
run: |
cd core
cd native
cargo clippy --color=never -- -D warnings
- name: Check compilation
shell: bash
run: |
cd core
cd native
cargo check --benches
- name: Check unused dependencies
shell: bash
run: |
cd native
cargo install cargo-machete && cargo machete
- name: Cache Maven dependencies
uses: actions/cache@v4
with:
@@ -56,5 +62,5 @@ runs:
- name: Run Cargo test
shell: bash
run: |
cd core
cd native
RUST_BACKTRACE=1 cargo test
6 changes: 3 additions & 3 deletions .github/workflows/benchmark-tpch.yml
@@ -76,8 +76,8 @@ jobs:
with:
name: libcomet-${{ github.run_id }}
path: |
core/target/release/libcomet.so
core/target/release/libcomet.dylib
native/target/release/libcomet.so
native/target/release/libcomet.dylib
retention-days: 1 # remove the artifact after 1 day, only valid for this workflow
overwrite: true
- name: Generate TPC-H (SF=1) table data
@@ -119,7 +119,7 @@ jobs:
uses: actions/download-artifact@v4
with:
name: libcomet-${{ github.run_id }}
path: core/target/release
path: native/target/release
- name: Run TPC-H queries
run: |
SPARK_HOME=`pwd` SPARK_TPCH_DATA=`pwd`/tpch/sf1_parquet ./mvnw -B -Prelease -Dsuites=org.apache.spark.sql.CometTPCHQuerySuite test
6 changes: 3 additions & 3 deletions .github/workflows/benchmark.yml
@@ -83,8 +83,8 @@ jobs:
with:
name: libcomet-${{ github.run_id }}
path: |
core/target/release/libcomet.so
core/target/release/libcomet.dylib
native/target/release/libcomet.so
native/target/release/libcomet.dylib
retention-days: 1 # remove the artifact after 1 day, only valid for this workflow
overwrite: true
- name: Build tpcds-kit
@@ -134,7 +134,7 @@ jobs:
uses: actions/download-artifact@v4
with:
name: libcomet-${{ github.run_id }}
path: core/target/release
path: native/target/release
- name: Run TPC-DS queries (Sort merge join)
if: matrix.join == 'sort_merge'
run: |
49 changes: 49 additions & 0 deletions .github/workflows/miri.yml
@@ -0,0 +1,49 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: Run Miri Safety Checks

on:
push:
paths-ignore:
- "doc/**"
- "docs/**"
- "**.md"
pull_request:
paths-ignore:
- "doc/**"
- "docs/**"
- "**.md"
# manual trigger
# https://docs.github.com/en/actions/managing-workflow-runs/manually-running-a-workflow
workflow_dispatch:

jobs:
miri:
name: "Miri"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Miri
run: |
rustup toolchain install nightly --component miri
rustup override set nightly
cargo miri setup
- name: Test with Miri
run: |
cd native
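# -Zmiri-disable-isolation lets the tests touch host resources (e.g. the system
# clock and environment variables) that Miri's default isolation would reject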
MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test
3 changes: 2 additions & 1 deletion .github/workflows/spark_sql_test.yml
@@ -45,7 +45,7 @@ jobs:
matrix:
os: [ubuntu-latest]
java-version: [11]
spark-version: [{short: '3.4', full: '3.4.3'}]
spark-version: [{short: '3.4', full: '3.4.3'}, {short: '3.5', full: '3.5.1'}]
module:
- {name: "catalyst", args1: "catalyst/test", args2: ""}
- {name: "sql/core-1", args1: "", args2: sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest}
@@ -75,6 +75,7 @@ jobs:
- name: Run Spark tests
run: |
cd apache-spark
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
ENABLE_COMET=true build/sbt ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
env:
LC_ALL: "C.UTF-8"
2 changes: 1 addition & 1 deletion .gitignore
@@ -8,7 +8,7 @@ derby.log
metastore_db/
spark-warehouse/
dependency-reduced-pom.xml
core/src/execution/generated
native/core/src/execution/generated
prebuild
.flattened-pom.xml
rat.txt
36 changes: 18 additions & 18 deletions Makefile
@@ -20,65 +20,65 @@
all: core jvm

core:
cd core && cargo build
cd native && cargo build
test-rust:
# We need to compile CometException so that the cargo test can pass
./mvnw compile -pl common -DskipTests $(PROFILES)
cd core && cargo build && \
cd native && cargo build && \
RUST_BACKTRACE=1 cargo test
jvm:
./mvnw clean package -DskipTests $(PROFILES)
test-jvm: core
SPARK_HOME=`pwd` COMET_CONF_DIR=$(shell pwd)/conf RUST_BACKTRACE=1 ./mvnw verify $(PROFILES)
test: test-rust test-jvm
clean:
cd core && cargo clean
cd native && cargo clean
./mvnw clean $(PROFILES)
rm -rf .dist
bench:
cd core && RUSTFLAGS="-Ctarget-cpu=native" cargo bench $(filter-out $@,$(MAKECMDGOALS))
cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo bench $(filter-out $@,$(MAKECMDGOALS))
format:
cd core && cargo fmt
cd native && cargo fmt
./mvnw compile test-compile scalafix:scalafix -Psemanticdb $(PROFILES)
./mvnw spotless:apply $(PROFILES)

core-amd64:
rustup target add x86_64-apple-darwin
cd core && RUSTFLAGS="-Ctarget-cpu=skylake -Ctarget-feature=-prefer-256-bit" CC=o64-clang CXX=o64-clang++ cargo build --target x86_64-apple-darwin --release
cd native && RUSTFLAGS="-Ctarget-cpu=skylake -Ctarget-feature=-prefer-256-bit" CC=o64-clang CXX=o64-clang++ cargo build --target x86_64-apple-darwin --release
mkdir -p common/target/classes/org/apache/comet/darwin/x86_64
cp core/target/x86_64-apple-darwin/release/libcomet.dylib common/target/classes/org/apache/comet/darwin/x86_64
cd core && RUSTFLAGS="-Ctarget-cpu=haswell -Ctarget-feature=-prefer-256-bit" cargo build --release
cp native/target/x86_64-apple-darwin/release/libcomet.dylib common/target/classes/org/apache/comet/darwin/x86_64
cd native && RUSTFLAGS="-Ctarget-cpu=haswell -Ctarget-feature=-prefer-256-bit" cargo build --release
mkdir -p common/target/classes/org/apache/comet/linux/amd64
cp core/target/release/libcomet.so common/target/classes/org/apache/comet/linux/amd64
cp native/target/release/libcomet.so common/target/classes/org/apache/comet/linux/amd64
jar -cf common/target/comet-native-x86_64.jar \
-C common/target/classes/org/apache/comet darwin \
-C common/target/classes/org/apache/comet linux
./dev/deploy-file common/target/comet-native-x86_64.jar comet-native-x86_64${COMET_CLASSIFIER} jar

core-arm64:
rustup target add aarch64-apple-darwin
cd core && RUSTFLAGS="-Ctarget-cpu=apple-m1" CC=arm64-apple-darwin21.4-clang CXX=arm64-apple-darwin21.4-clang++ CARGO_FEATURE_NEON=1 cargo build --target aarch64-apple-darwin --release
cd native && RUSTFLAGS="-Ctarget-cpu=apple-m1" CC=arm64-apple-darwin21.4-clang CXX=arm64-apple-darwin21.4-clang++ CARGO_FEATURE_NEON=1 cargo build --target aarch64-apple-darwin --release
mkdir -p common/target/classes/org/apache/comet/darwin/aarch64
cp core/target/aarch64-apple-darwin/release/libcomet.dylib common/target/classes/org/apache/comet/darwin/aarch64
cd core && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
cp native/target/aarch64-apple-darwin/release/libcomet.dylib common/target/classes/org/apache/comet/darwin/aarch64
cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
mkdir -p common/target/classes/org/apache/comet/linux/aarch64
cp core/target/release/libcomet.so common/target/classes/org/apache/comet/linux/aarch64
cp native/target/release/libcomet.so common/target/classes/org/apache/comet/linux/aarch64
jar -cf common/target/comet-native-aarch64.jar \
-C common/target/classes/org/apache/comet darwin \
-C common/target/classes/org/apache/comet linux
./dev/deploy-file common/target/comet-native-aarch64.jar comet-native-aarch64${COMET_CLASSIFIER} jar

release-linux: clean
rustup target add aarch64-apple-darwin x86_64-apple-darwin
cd core && RUSTFLAGS="-Ctarget-cpu=apple-m1" CC=arm64-apple-darwin21.4-clang CXX=arm64-apple-darwin21.4-clang++ CARGO_FEATURE_NEON=1 cargo build --target aarch64-apple-darwin --release
cd core && RUSTFLAGS="-Ctarget-cpu=skylake -Ctarget-feature=-prefer-256-bit" CC=o64-clang CXX=o64-clang++ cargo build --target x86_64-apple-darwin --release
cd core && RUSTFLAGS="-Ctarget-cpu=native -Ctarget-feature=-prefer-256-bit" cargo build --release
cd native && RUSTFLAGS="-Ctarget-cpu=apple-m1" CC=arm64-apple-darwin21.4-clang CXX=arm64-apple-darwin21.4-clang++ CARGO_FEATURE_NEON=1 cargo build --target aarch64-apple-darwin --release
cd native && RUSTFLAGS="-Ctarget-cpu=skylake -Ctarget-feature=-prefer-256-bit" CC=o64-clang CXX=o64-clang++ cargo build --target x86_64-apple-darwin --release
cd native && RUSTFLAGS="-Ctarget-cpu=native -Ctarget-feature=-prefer-256-bit" cargo build --release
./mvnw install -Prelease -DskipTests $(PROFILES)
release:
cd core && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release
./mvnw install -Prelease -DskipTests $(PROFILES)
release-nogit:
cd core && RUSTFLAGS="-Ctarget-cpu=native" cargo build --features nightly --release
cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --features nightly --release
./mvnw install -Prelease -DskipTests $(PROFILES) -Dmaven.gitcommitid.skip=true
benchmark-%: clean release
cd spark && COMET_CONF_DIR=$(shell pwd)/conf MAVEN_OPTS='-Xmx20g' ../mvnw exec:java -Dexec.mainClass="$*" -Dexec.classpathScope="test" -Dexec.cleanupDaemonThreads="false" -Dexec.args="$(filter-out $@,$(MAKECMDGOALS))" $(PROFILES)
5 changes: 3 additions & 2 deletions README.md
@@ -34,13 +34,14 @@ The following chart shows the time it takes to run the 22 TPC-H queries against
using a single executor with 8 cores. See the [Comet Benchmarking Guide](https://datafusion.apache.org/comet/contributor-guide/benchmarking.html)
for details of the environment used for these benchmarks.

When using Comet, the overall run time is reduced from 649 seconds to 440 seconds, a 1.5x speedup.
When using Comet, the overall run time is reduced from 649 seconds to 433 seconds, a 1.5x speedup, with some queries
showing a 2x-3x speedup.

Running the same queries with DataFusion standalone (without Spark) using the same number of cores results in a 3.9x
speedup compared to Spark.

Comet is not yet achieving full DataFusion speeds in all cases, but with future work we aim to provide a 2x-4x speedup
for many use cases.
for a broader set of queries.

![](docs/source/_static/images/tpch_allqueries.png)

4 changes: 2 additions & 2 deletions common/pom.xml
@@ -193,14 +193,14 @@ under the License.
<directory>${project.basedir}/src/main/resources</directory>
</resource>
<resource>
<directory>${project.basedir}/../core/target/x86_64-apple-darwin/release</directory>
<directory>${project.basedir}/../native/target/x86_64-apple-darwin/release</directory>
<includes>
<include>libcomet.dylib</include>
</includes>
<targetPath>org/apache/comet/darwin/x86_64</targetPath>
</resource>
<resource>
<directory>${project.basedir}/../core/target/aarch64-apple-darwin/release</directory>
<directory>${project.basedir}/../native/target/aarch64-apple-darwin/release</directory>
<includes>
<include>libcomet.dylib</include>
</includes>
13 changes: 11 additions & 2 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -274,13 +274,22 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_EXPLAIN_VERBOSE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.explain.verbose.enabled")
.doc(
"When this setting is enabled, Comet will provide a verbose tree representation of " +
"the extended information.")
.booleanConf
.createWithDefault(false)

val COMET_EXPLAIN_FALLBACK_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.explainFallback.enabled")
.doc(
"When this setting is enabled, Comet will provide logging explaining the reason(s) " +
"why a query stage cannot be executed natively.")
"why a query stage cannot be executed natively. Set this to false to " +
"reduce the amount of logging.")
.booleanConf
.createWithDefault(false)
.createWithDefault(true)

val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize")
.doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.")
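A minimal usage sketch for the two configuration entries above. The config keys come from the diff; the SparkSession setup, app name, and query are illustrative assumptions rather than Comet code.

    import org.apache.spark.sql.SparkSession

    object CometExplainSketch {
      def main(args: Array[String]): Unit = {
        // Any SparkSession with the Comet plugin enabled would do; this one is illustrative.
        val spark = SparkSession.builder().appName("comet-explain-sketch").getOrCreate()

        // New entry: ask Comet for a verbose tree representation in the extended explain info.
        spark.conf.set("spark.comet.explain.verbose.enabled", "true")

        // Now enabled by default; set it to false to reduce fallback logging.
        spark.conf.set("spark.comet.explainFallback.enabled", "false")

        spark.sql("SELECT 1").explain("extended")
        spark.stop()
      }
    }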
11 changes: 11 additions & 0 deletions common/src/main/scala/org/apache/comet/package.scala
@@ -21,8 +21,19 @@ package org.apache

import java.util.Properties

import org.apache.arrow.memory.RootAllocator

package object comet {

/**
* The root allocator for Comet execution. Because Arrow Java memory management is based on
* reference counting, exposed arrays increase the reference count of the underlying buffers.
* Until the reference count is zero, the memory will not be released. If the consumer side is
* finished later than the close of the allocator, the allocator will think the memory is
* leaked. To avoid this, we use a single allocator for the whole execution process.
*/
val CometArrowAllocator = new RootAllocator(Long.MaxValue)

/**
* Provides access to build information about the Comet libraries. This will be used by the
* benchmarking software to provide the source revision and repository. In addition, the build
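The comment above describes Arrow Java's reference counting; the standalone sketch below (not Comet code, with an arbitrary buffer size) shows why closing an allocator while a consumer still holds a buffer is reported as a leak, which is what the single shared allocator avoids.

    import org.apache.arrow.memory.RootAllocator

    object AllocatorRefCountSketch {
      def main(args: Array[String]): Unit = {
        val allocator = new RootAllocator(Long.MaxValue)
        val buf = allocator.buffer(64)      // reference count = 1
        buf.getReferenceManager.retain()    // a second holder, reference count = 2
        buf.close()                         // first holder done; the memory is still live
        // Calling allocator.close() here would fail: 64 bytes are still outstanding.
        buf.getReferenceManager.release()   // last holder done; the memory is released
        allocator.close()                   // now succeeds
      }
    }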
common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
@@ -22,18 +22,18 @@ package org.apache.comet.vector
import scala.collection.mutable

import org.apache.arrow.c.{ArrowArray, ArrowImporter, ArrowSchema, CDataDictionaryProvider, Data}
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.dictionary.DictionaryProvider
import org.apache.spark.SparkException
import org.apache.spark.sql.comet.util.Utils
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.comet.CometArrowAllocator

class NativeUtil {
import Utils._

private val allocator = new RootAllocator(Long.MaxValue)
.newChildAllocator(this.getClass.getSimpleName, 0, Long.MaxValue)
private val allocator = CometArrowAllocator
private val dictionaryProvider: CDataDictionaryProvider = new CDataDictionaryProvider
private val importer = new ArrowImporter(allocator)

12 changes: 5 additions & 7 deletions common/src/main/scala/org/apache/comet/vector/StreamReader.scala
@@ -21,20 +21,20 @@ package org.apache.comet.vector

import java.nio.channels.ReadableByteChannel

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.ipc.{ArrowStreamReader, ReadChannel}
import org.apache.arrow.vector.ipc.message.MessageChannelReader
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.comet.CometArrowAllocator

/**
* A reader that consumes Arrow data from an input channel, and produces Comet batches.
*/
case class StreamReader(channel: ReadableByteChannel, source: String) extends AutoCloseable {
private var allocator = new RootAllocator(Long.MaxValue)
.newChildAllocator(s"${this.getClass.getSimpleName}/$source", 0, Long.MaxValue)
private val channelReader = new MessageChannelReader(new ReadChannel(channel), allocator)
private var arrowReader = new ArrowStreamReader(channelReader, allocator)
private val channelReader =
new MessageChannelReader(new ReadChannel(channel), CometArrowAllocator)
private var arrowReader = new ArrowStreamReader(channelReader, CometArrowAllocator)
private var root = arrowReader.getVectorSchemaRoot

def nextBatch(): Option[ColumnarBatch] = {
@@ -53,11 +53,9 @@ case class StreamReader(channel: ReadableByteChannel, source: String) extends Au
if (root != null) {
arrowReader.close()
root.close()
allocator.close()

arrowReader = null
root = null
allocator = null
}
}
}
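A hypothetical usage sketch of the reader after this change. The input path and channel setup are assumptions; the point is that close() releases the Arrow reader and root while leaving the shared CometArrowAllocator open.

    import java.io.FileInputStream
    import java.nio.channels.Channels

    import org.apache.comet.vector.StreamReader

    object StreamReaderSketch {
      def main(args: Array[String]): Unit = {
        // Assumed input: an Arrow IPC stream previously written to this path.
        val channel = Channels.newChannel(new FileInputStream("/tmp/batches.arrows"))
        val reader = StreamReader(channel, "example-source")
        try {
          Iterator
            .continually(reader.nextBatch())
            .takeWhile(_.isDefined)
            .flatten
            .foreach(batch => println(s"read a batch with ${batch.numRows()} rows"))
        } finally {
          reader.close() // closes the Arrow stream reader and root, not the shared allocator
        }
      }
    }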
4 changes: 2 additions & 2 deletions dev/diffs/3.4.3.diff
@@ -2491,8 +2491,8 @@ index dd55fcfe42c..293e9dc2986 100644
val schema = df.schema
val withoutFilters = df.queryExecution.executedPlan.transform {
case FilterExec(_, child) => child
+ case CometFilterExec(_, _, _, child, _) => child
+ case CometProjectExec(_, _, _, _, CometFilterExec(_, _, _, child, _), _) => child
+ case CometFilterExec(_, _, _, _, child, _) => child
+ case CometProjectExec(_, _, _, _, CometFilterExec(_, _, _, _, child, _), _) => child
}

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
(The remaining changed files are not shown here.)
