Merge remote-tracking branch 'upstream/main' into spark-4.0-spark-test
kazuyukitanimura committed Jun 4, 2024
2 parents 49364c0 + 77c9a6c commit 3a36f2e
Showing 46 changed files with 6,073 additions and 814 deletions.
4 changes: 2 additions & 2 deletions .github/actions/setup-spark-builder/action.yaml
@@ -23,9 +23,9 @@ inputs:
required: true
default: '3.4'
spark-version:
-description: 'The Apache Spark version (e.g., 3.4.2) to build'
+description: 'The Apache Spark version (e.g., 3.4.3) to build'
required: true
-default: '3.4.2'
+default: '3.4.3'
comet-version:
description: 'The Comet version to use for Spark'
required: true
2 changes: 1 addition & 1 deletion .github/workflows/spark_sql_test.yml
@@ -45,7 +45,7 @@ jobs:
matrix:
os: [ubuntu-latest]
java-version: [11]
-spark-version: [{short: '3.4', full: '3.4.2'}]
+spark-version: [{short: '3.4', full: '3.4.3'}]
module:
- {name: "catalyst", args1: "catalyst/test", args2: ""}
- {name: "sql/core-1", args1: "", args2: sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest}
98 changes: 63 additions & 35 deletions README.md
@@ -19,58 +19,86 @@ under the License.

# Apache DataFusion Comet

-Apache DataFusion Comet is an Apache Spark plugin that uses [Apache DataFusion](https://datafusion.apache.org/)
-as native runtime to achieve improvement in terms of query efficiency and query runtime.
+Apache DataFusion Comet is a high-performance accelerator for Apache Spark, built on top of the powerful
+[Apache DataFusion](https://datafusion.apache.org) query engine. Comet is designed to significantly enhance the
+performance of Apache Spark workloads while leveraging commodity hardware and seamlessly integrating with the
+Spark ecosystem without requiring any code changes.

-Comet runs Spark SQL queries using the native DataFusion runtime, which is
-typically faster and more resource efficient than JVM based runtimes.
+# Benefits of Using Comet

-<a href="docs/source/_static/images/comet-overview.png"><img src="docs/source/_static/images/comet-system-diagram.png" align="center" width="500" ></a>
+## Run Spark Queries at DataFusion Speeds

-Comet aims to support:
+Comet delivers a performance speedup for many queries, enabling faster data processing and shorter time-to-insights.

-- a native Parquet implementation, including both reader and writer
-- full implementation of Spark operators, including
-  Filter/Project/Aggregation/Join/Exchange etc.
-- full implementation of Spark built-in expressions
-- a UDF framework for users to migrate their existing UDF to native
+The following chart shows the time it takes to run the 22 TPC-H queries against 100 GB of data in Parquet format
+using a single executor with 8 cores. See the [Comet Benchmarking Guide](https://datafusion.apache.org/comet/contributor-guide/benchmarking.html)
+for details of the environment used for these benchmarks.

-## Architecture
+When using Comet, the overall run time is reduced from 649 seconds to 440 seconds, a 1.5x speedup.

-The following diagram illustrates the architecture of Comet:
+Running the same queries with DataFusion standalone (without Spark) using the same number of cores results in a 3.9x
+speedup compared to Spark.

-<a href="docs/source/_static/images/comet-overview.png"><img src="docs/source/_static/images/comet-overview.png" align="center" height="600" width="750" ></a>
+Comet is not yet achieving full DataFusion speeds in all cases, but with future work we aim to provide a 2x-4x speedup
+for many use cases.

-## Current Status
+![](docs/source/_static/images/tpch_allqueries.png)

-The project is currently integrated into Apache Spark 3.2, 3.3, and 3.4.
+Here is a breakdown showing relative performance of Spark, Comet, and DataFusion for each TPC-H query.

-## Feature Parity with Apache Spark
+![](docs/source/_static/images/tpch_queries_compare.png)

-The project strives to keep feature parity with Apache Spark, that is,
-users should expect the same behavior (w.r.t features, configurations,
-query results, etc) with Comet turned on or turned off in their Spark
-jobs. In addition, Comet extension should automatically detect unsupported
-features and fallback to Spark engine.
+The following chart shows how much Comet currently accelerates each query from the benchmark. Performance optimization
+is an ongoing task, and we welcome contributions from the community to help achieve even greater speedups in the future.

-To achieve this, besides unit tests within Comet itself, we also re-use
-Spark SQL tests and make sure they all pass with Comet extension
-enabled.
+![](docs/source/_static/images/tpch_queries_speedup.png)

-## Supported Platforms
+These benchmarks can be reproduced in any environment using the documentation in the
+[Comet Benchmarking Guide](https://datafusion.apache.org/comet/contributor-guide/benchmarking.html). We encourage
+you to run your own benchmarks.

-Linux, Apple OSX (Intel and M1)
+## Use Commodity Hardware

-## Requirements
+Comet leverages commodity hardware, eliminating the need for costly hardware upgrades or
+specialized hardware accelerators, such as GPUs or FPGAs. By maximizing the utilization of commodity hardware, Comet
+ensures cost-effectiveness and scalability for your Spark deployments.

-- Apache Spark 3.2, 3.3, or 3.4
-- JDK 8, 11 and 17 (JDK 11 recommended because Spark 3.2 doesn't support 17)
-- GLIBC 2.17 (Centos 7) and up
+## Spark Compatibility

-## Getting started
+Comet aims for 100% compatibility with all supported versions of Apache Spark, allowing you to integrate Comet into
+your existing Spark deployments and workflows seamlessly. With no code changes required, you can immediately harness
+the benefits of Comet's acceleration capabilities without disrupting your Spark applications.

-See the [DataFusion Comet User Guide](https://datafusion.apache.org/comet/user-guide/installation.html) for installation instructions.
+## Tight Integration with Apache DataFusion

+Comet tightly integrates with the core Apache DataFusion project, leveraging its powerful execution engine. With
+seamless interoperability between Comet and DataFusion, you can achieve optimal performance and efficiency in your
+Spark workloads.

+## Active Community

+Comet boasts a vibrant and active community of developers, contributors, and users dedicated to advancing the
+capabilities of Apache DataFusion and accelerating the performance of Apache Spark.

+## Getting Started

+To get started with Apache DataFusion Comet, follow the
+[installation instructions](https://datafusion.apache.org/comet/user-guide/installation.html). Join the
+[DataFusion Slack and Discord channels](https://datafusion.apache.org/contributor-guide/communication.html) to connect
+with other users, ask questions, and share your experiences with Comet.

## Contributing
-See the [DataFusion Comet Contribution Guide](https://datafusion.apache.org/comet/contributor-guide/contributing.html)
-for information on how to get started contributing to the project.

+We welcome contributions from the community to help improve and enhance Apache DataFusion Comet. Whether it's fixing
+bugs, adding new features, writing documentation, or optimizing performance, your contributions are invaluable in
+shaping the future of Comet. Check out our
+[contributor guide](https://datafusion.apache.org/comet/contributor-guide/contributing.html) to get started.

## License

Apache DataFusion Comet is licensed under the Apache License 2.0. See the [LICENSE.txt](LICENSE.txt) file for details.

+## Acknowledgments

+We would like to express our gratitude to the Apache DataFusion community for their support and contributions to
+Comet. Together, we're building a faster, more efficient future for big data processing with Apache Spark.
73 changes: 73 additions & 0 deletions common/src/main/java/org/apache/arrow/c/CometSchemaImporter.java
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.arrow.c;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.types.pojo.Field;

/** This is a simple wrapper around SchemaImporter to make it accessible from Java Arrow. */
public class CometSchemaImporter {
private final BufferAllocator allocator;
private final SchemaImporter importer;
private final CDataDictionaryProvider provider = new CDataDictionaryProvider();

public CometSchemaImporter(BufferAllocator allocator) {
this.allocator = allocator;
this.importer = new SchemaImporter(allocator);
}

public BufferAllocator getAllocator() {
return allocator;
}

public CDataDictionaryProvider getProvider() {
return provider;
}

public Field importField(ArrowSchema schema) {
try {
return importer.importField(schema, provider);
} finally {
schema.release();
schema.close();
}
}

/**
* Imports data from ArrowArray/ArrowSchema into a FieldVector. This is basically the same as Java
* Arrow `Data.importVector`. `Data.importVector` initiates `SchemaImporter` internally which is
* used to fill dictionary ids for dictionary encoded vectors. Every call to `importVector` will
* begin with dictionary ids starting from 0. So, separate calls to `importVector` will overwrite
* dictionary ids. To avoid this, we need to use the same `SchemaImporter` instance for all calls
* to `importVector`.
*/
public FieldVector importVector(ArrowArray array, ArrowSchema schema) {
Field field = importField(schema);
FieldVector vector = field.createVector(allocator);
Data.importIntoVector(allocator, array, vector, provider);

return vector;
}

public void close() {
provider.close();
}
}
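To make the intended usage concrete, here is a small sketch that round-trips a vector through the Arrow C data interface with this importer. It is illustrative only: in Comet the `ArrowArray`/`ArrowSchema` pair is filled by the native reader (see the `ColumnReader` changes further down), whereas this sketch exports a Java-built vector just to have something to import, and every name outside `CometSchemaImporter` itself is made up for the example.

```java
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.CometSchemaImporter;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;

public class CometSchemaImporterSketch {
  public static void main(String[] args) {
    try (RootAllocator allocator = new RootAllocator()) {
      // Share one importer across imports so dictionary ids assigned by the
      // underlying SchemaImporter stay consistent between importVector calls.
      CometSchemaImporter importer = new CometSchemaImporter(allocator);

      ArrowArray array = ArrowArray.allocateNew(allocator);
      ArrowSchema schema = ArrowSchema.allocateNew(allocator);
      try (IntVector source = new IntVector("ints", allocator)) {
        source.allocateNew(3);
        source.setSafe(0, 1);
        source.setSafe(1, 2);
        source.setSafe(2, 3);
        source.setValueCount(3);

        // No dictionaries in this example; any DictionaryProvider works, so
        // reuse the importer's own provider.
        Data.exportVector(allocator, source, importer.getProvider(), array, schema);

        // importVector releases and takes over the C structs, so they are not
        // closed again here (matching how ColumnReader uses this class).
        FieldVector imported = importer.importVector(array, schema);
        System.out.println(imported.getValueCount()); // prints 3
        imported.close();
      }

      importer.close(); // also closes the internal CDataDictionaryProvider
    }
  }
}
```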
13 changes: 13 additions & 0 deletions common/src/main/java/org/apache/comet/parquet/BatchReader.java
@@ -37,6 +37,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import org.apache.arrow.c.CometSchemaImporter;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -88,6 +91,7 @@
*/
public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);
+protected static final BufferAllocator ALLOCATOR = new RootAllocator();

private Configuration conf;
private int capacity;
@@ -104,6 +108,7 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
private MessageType requestedSchema;
private CometVector[] vectors;
private AbstractColumnReader[] columnReaders;
+private CometSchemaImporter importer;
private ColumnarBatch currentBatch;
private Future<Option<Throwable>> prefetchTask;
private LinkedBlockingQueue<Pair<PageReadStore, Long>> prefetchQueue;
@@ -515,6 +520,10 @@ public void close() throws IOException {
fileReader.close();
fileReader = null;
}
+if (importer != null) {
+importer.close();
+importer = null;
+}
}

@SuppressWarnings("deprecation")
@@ -552,6 +561,9 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable {
numRowGroupsMetric.add(1);
}

+if (importer != null) importer.close();
+importer = new CometSchemaImporter(ALLOCATOR);

List<ColumnDescriptor> columns = requestedSchema.getColumns();
for (int i = 0; i < columns.size(); i++) {
if (missingColumns[i]) continue;
@@ -564,6 +576,7 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable {
Utils.getColumnReader(
dataType,
columns.get(i),
+importer,
capacity,
useDecimal128,
useLazyMaterialization,
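The hunks above give the importer a per-row-group lifecycle: the field is declared once, closed with the reader, and recreated whenever a new row group is loaded so each row group starts from fresh dictionary ids. A condensed, hypothetical sketch of that lifecycle (the class below is not part of Comet; it only reuses the names visible in the diff):

```java
import org.apache.arrow.c.CometSchemaImporter;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

public class ImporterLifecycleSketch implements AutoCloseable {
  private static final BufferAllocator ALLOCATOR = new RootAllocator();

  private CometSchemaImporter importer;

  // Mirrors loadNextRowGroupIfNecessary(): the previous row group's importer
  // (and the dictionaries it imported) is closed before a fresh one is created.
  void startRowGroup() {
    if (importer != null) importer.close();
    importer = new CometSchemaImporter(ALLOCATOR);
  }

  // Mirrors close(): release the last importer together with the reader.
  @Override
  public void close() {
    if (importer != null) {
      importer.close();
      importer = null;
    }
  }

  public static void main(String[] args) {
    try (ImporterLifecycleSketch reader = new ImporterLifecycleSketch()) {
      reader.startRowGroup(); // row group 1
      reader.startRowGroup(); // row group 2: row group 1's dictionaries are dropped here
    }
  }
}
```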
37 changes: 20 additions & 17 deletions common/src/main/java/org/apache/comet/parquet/ColumnReader.java
@@ -27,10 +27,7 @@

import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
-import org.apache.arrow.c.CDataDictionaryProvider;
-import org.apache.arrow.c.Data;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.c.CometSchemaImporter;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
@@ -53,7 +50,6 @@

public class ColumnReader extends AbstractColumnReader {
protected static final Logger LOG = LoggerFactory.getLogger(ColumnReader.class);
-protected static final BufferAllocator ALLOCATOR = new RootAllocator();

/**
* The current Comet vector holding all the values read by this column reader. Owned by this
@@ -89,18 +85,19 @@ public class ColumnReader extends AbstractColumnReader {
*/
boolean hadNull;

-/** Dictionary provider for this column. */
-private final CDataDictionaryProvider dictionaryProvider = new CDataDictionaryProvider();
+private final CometSchemaImporter importer;

public ColumnReader(
DataType type,
ColumnDescriptor descriptor,
+CometSchemaImporter importer,
int batchSize,
boolean useDecimal128,
boolean useLegacyDateTimestamp) {
super(type, descriptor, useDecimal128, useLegacyDateTimestamp);
assert batchSize > 0 : "Batch size must be positive, found " + batchSize;
this.batchSize = batchSize;
+this.importer = importer;
initNative();
}

@@ -164,7 +161,6 @@ public void close() {
currentVector.close();
currentVector = null;
}
-dictionaryProvider.close();
super.close();
}

@@ -209,10 +205,11 @@ public CometDecodedVector loadVector() {

try (ArrowArray array = ArrowArray.wrap(addresses[0]);
ArrowSchema schema = ArrowSchema.wrap(addresses[1])) {
-FieldVector vector = Data.importVector(ALLOCATOR, array, schema, dictionaryProvider);
+FieldVector vector = importer.importVector(array, schema);

DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();

-CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128, isUuid);
+CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);

// Update whether the current vector contains any null values. This is used in the following
// batch(s) to determine whether we can skip loading the native vector.
@@ -230,19 +227,25 @@
// return plain vector.
currentVector = cometVector;
return currentVector;
} else if (dictionary == null) {
// There is dictionary from native side but the Java side dictionary hasn't been
// initialized yet.
Dictionary arrowDictionary = dictionaryProvider.lookup(dictionaryEncoding.getId());
CometPlainVector dictionaryVector =
new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid);
}

// We should already re-initiate `CometDictionary` here because `Data.importVector` API will
// release the previous dictionary vector and create a new one.
Dictionary arrowDictionary = importer.getProvider().lookup(dictionaryEncoding.getId());
CometPlainVector dictionaryVector =
new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid);
if (dictionary != null) {
dictionary.setDictionaryVector(dictionaryVector);
} else {
dictionary = new CometDictionary(dictionaryVector);
}

currentVector =
new CometDictionaryVector(
cometVector, dictionary, dictionaryProvider, useDecimal128, false, isUuid);
cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid);

currentVector =
new CometDictionaryVector(cometVector, dictionary, importer.getProvider(), useDecimal128);
return currentVector;
}
}
common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
@@ -21,6 +21,7 @@

import java.io.IOException;

+import org.apache.arrow.c.CometSchemaImporter;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReader;
import org.apache.spark.sql.types.DataType;
@@ -45,10 +46,11 @@ public class LazyColumnReader extends ColumnReader {
public LazyColumnReader(
DataType sparkReadType,
ColumnDescriptor descriptor,
+CometSchemaImporter importer,
int batchSize,
boolean useDecimal128,
boolean useLegacyDateTimestamp) {
-super(sparkReadType, descriptor, batchSize, useDecimal128, useLegacyDateTimestamp);
+super(sparkReadType, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
this.batchSize = 0; // the batch size is set later in `readBatch`
this.vector = new CometLazyVector(sparkReadType, this, useDecimal128);
}