Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Update README to highlight Comet benefits #497

Merged
merged 6 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 63 additions & 35 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,58 +19,86 @@ under the License.

# Apache DataFusion Comet

Apache DataFusion Comet is an Apache Spark plugin that uses [Apache DataFusion](https://datafusion.apache.org/)
as native runtime to achieve improvement in terms of query efficiency and query runtime.
Apache DataFusion Comet is a high-performance accelerator for Apache Spark, built on top of the powerful
[Apache DataFusion](https://datafusion.apache.org) query engine. Comet is designed to significantly enhance the
performance of Apache Spark workloads while leveraging commodity hardware and seamlessly integrating with the
Spark ecosystem without requiring any code changes.

Comet runs Spark SQL queries using the native DataFusion runtime, which is
typically faster and more resource efficient than JVM based runtimes.
# Benefits of Using Comet

<a href="docs/source/_static/images/comet-overview.png"><img src="docs/source/_static/images/comet-system-diagram.png" align="center" width="500" ></a>
## Run Spark Queries at DataFusion Speeds

Comet aims to support:
Comet delivers a performance speedup for many queries, enabling faster data processing and shorter time-to-insights.

- a native Parquet implementation, including both reader and writer
- full implementation of Spark operators, including
Filter/Project/Aggregation/Join/Exchange etc.
- full implementation of Spark built-in expressions
- a UDF framework for users to migrate their existing UDF to native
The following chart shows the time it takes to run the 22 TPC-H queries against 100 GB of data in Parquet format
using a single executor with 8 cores. See the [Comet Benchmarking Guide](https://datafusion.apache.org/comet/contributor-guide/benchmarking.html)
for details of the environment used for these benchmarks.

## Architecture
When using Comet, the overall run time is reduced from 649 seconds to 440 seconds, a 1.5x speedup.

The following diagram illustrates the architecture of Comet:
Running the same queries with DataFusion standalone (without Spark) using the same number of cores results in a 3.9x
speedup compared to Spark.

<a href="docs/source/_static/images/comet-overview.png"><img src="docs/source/_static/images/comet-overview.png" align="center" height="600" width="750" ></a>
Comet is not yet achieving full DataFusion speeds in all cases, but with future work we aim to provide a 2x-4x speedup
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if pure DF gives 3.9x, is that possible Comet built on top of DF to give 4x?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was rounding up to the nearest whole percent here, but of course it will be challenging. DataFusion isn't performing shuffle operations, but I think that DataFusion performance sets a hard limit on what we can do with Comet, so 2-4x seems to be our expected range (unless there are future optimizations in DataFusion).

for many use cases.

## Current Status
![](docs/source/_static/images/tpch_allqueries.png)
kazuyukitanimura marked this conversation as resolved.
Show resolved Hide resolved

The project is currently integrated into Apache Spark 3.2, 3.3, and 3.4.
Here is a breakdown showing relative performance of Spark, Comet, and DataFusion for each TPC-H query.

## Feature Parity with Apache Spark
![](docs/source/_static/images/tpch_queries_compare.png)

The project strives to keep feature parity with Apache Spark, that is,
users should expect the same behavior (w.r.t features, configurations,
query results, etc) with Comet turned on or turned off in their Spark
jobs. In addition, Comet extension should automatically detect unsupported
features and fallback to Spark engine.
The following chart shows how much Comet currently accelerates each query from the benchmark. Performance optimization
is an ongoing task, and we welcome contributions from the community to help achieve even greater speedups in the future.

To achieve this, besides unit tests within Comet itself, we also re-use
Spark SQL tests and make sure they all pass with Comet extension
enabled.
![](docs/source/_static/images/tpch_queries_speedup.png)

## Supported Platforms
These benchmarks can be reproduced in any environment using the documentation in the
[Comet Benchmarking Guide](https://datafusion.apache.org/comet/contributor-guide/benchmarking.html). We encourage
you to run your own benchmarks.

Linux, Apple OSX (Intel and M1)
## Use Commodity Hardware

## Requirements
Comet leverages commodity hardware, eliminating the need for costly hardware upgrades or
specialized hardware accelerators, such as GPUs or FGPA. By maximizing the utilization of commodity hardware, Comet
ensures cost-effectiveness and scalability for your Spark deployments.

- Apache Spark 3.2, 3.3, or 3.4
- JDK 8, 11 and 17 (JDK 11 recommended because Spark 3.2 doesn't support 17)
- GLIBC 2.17 (Centos 7) and up
## Spark Compatibility

## Getting started
Comet aims for 100% compatibility with all supported versions of Apache Spark, allowing you to integrate Comet into
your existing Spark deployments and workflows seamlessly. With no code changes required, you can immediately harness
the benefits of Comet's acceleration capabilities without disrupting your Spark applications.

See the [DataFusion Comet User Guide](https://datafusion.apache.org/comet/user-guide/installation.html) for installation instructions.
## Tight Integration with Apache DataFusion

Comet tightly integrates with the core Apache DataFusion project, leveraging its powerful execution engine. With
seamless interoperability between Comet and DataFusion, you can achieve optimal performance and efficiency in your
Spark workloads.

## Active Community

Comet boasts a vibrant and active community of developers, contributors, and users dedicated to advancing the
capabilities of Apache DataFusion and accelerating the performance of Apache Spark.

## Getting Started

To get started with Apache DataFusion Comet, follow the
[installation instructions](https://datafusion.apache.org/comet/user-guide/installation.html). Join the
[DataFusion Slack and Discord channels](https://datafusion.apache.org/contributor-guide/communication.html) to connect
with other users, ask questions, and share your experiences with Comet.

## Contributing
See the [DataFusion Comet Contribution Guide](https://datafusion.apache.org/comet/contributor-guide/contributing.html)
for information on how to get started contributing to the project.

We welcome contributions from the community to help improve and enhance Apache DataFusion Comet. Whether it's fixing
bugs, adding new features, writing documentation, or optimizing performance, your contributions are invaluable in
shaping the future of Comet. Check out our
[contributor guide](https://datafusion.apache.org/comet/contributor-guide/contributing.html) to get started.

## License

Apache DataFusion Comet is licensed under the Apache License 2.0. See the [LICENSE.txt](LICENSE.txt) file for details.

## Acknowledgments

We would like to express our gratitude to the Apache DataFusion community for their support and contributions to
Comet. Together, we're building a faster, more efficient future for big data processing with Apache Spark.
Binary file added docs/source/_static/images/tpch_allqueries.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
{
"engine": "datafusion-comet",
"benchmark": "tpch",
"data_path": "/mnt/bigdata/tpch/sf100/",
"query_path": "../../tpch/queries",
"spark_conf": {
"spark.comet.explainFallback.enabled": "true",
"spark.jars": "file:///home/andy/git/apache/datafusion-comet/spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar",
"spark.comet.cast.allowIncompatible": "true",
"spark.executor.extraClassPath": "/home/andy/git/apache/datafusion-comet/spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar",
"spark.executor.memory": "8G",
"spark.comet.exec.shuffle.enabled": "true",
"spark.app.name": "DataFusion Comet Benchmark derived from TPC-H / TPC-DS",
"spark.driver.port": "36573",
"spark.sql.adaptive.coalescePartitions.enabled": "false",
"spark.app.startTime": "1716923498046",
"spark.comet.batchSize": "8192",
"spark.app.id": "app-20240528131138-0043",
"spark.serializer.objectStreamReset": "100",
"spark.app.initial.jar.urls": "spark://woody.lan:36573/jars/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar",
"spark.submit.deployMode": "client",
"spark.sql.autoBroadcastJoinThreshold": "-1",
"spark.comet.exec.all.enabled": "true",
"spark.eventLog.enabled": "false",
"spark.driver.host": "woody.lan",
"spark.driver.extraJavaOptions": "-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false",
"spark.sql.warehouse.dir": "file:/home/andy/git/apache/datafusion-benchmarks/runners/datafusion-comet/spark-warehouse",
"spark.shuffle.manager": "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager",
"spark.comet.exec.enabled": "true",
"spark.repl.local.jars": "file:///home/andy/git/apache/datafusion-comet/spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar",
"spark.executor.id": "driver",
"spark.master": "spark://woody:7077",
"spark.executor.instances": "8",
"spark.comet.exec.shuffle.mode": "auto",
"spark.sql.extensions": "org.apache.comet.CometSparkSessionExtensions",
"spark.driver.memory": "8G",
"spark.driver.extraClassPath": "/home/andy/git/apache/datafusion-comet/spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar",
"spark.rdd.compress": "True",
"spark.executor.extraJavaOptions": "-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false",
"spark.cores.max": "8",
"spark.comet.enabled": "true",
"spark.app.submitTime": "1716923497738",
"spark.submit.pyFiles": "",
"spark.executor.cores": "1",
"spark.comet.parquet.io.enabled": "false"
},
"1": [
32.121661901474,
27.997092485427856,
27.756758451461792,
28.55236315727234,
28.332542181015015
],
"2": [
18.269107580184937,
16.200955629348755,
16.194639682769775,
16.745808839797974,
16.59864115715027
],
"3": [
17.265466690063477,
17.069786310195923,
17.12887978553772,
19.33678102493286,
18.182055234909058
],
"4": [
8.367004156112671,
8.172023296356201,
8.023266077041626,
8.350765228271484,
8.258736610412598
],
"5": [
34.10048794746399,
32.69314408302307,
33.21383595466614,
36.391114473342896,
39.00048065185547
],
"6": [
3.1693499088287354,
3.044705390930176,
3.047694206237793,
3.2817511558532715,
3.274174928665161
],
"7": [
25.369214296340942,
24.020941257476807,
24.0787034034729,
28.47402787208557,
28.23443365097046
],
"8": [
40.06126809120178,
39.828824281692505,
45.250510454177856,
44.406742572784424,
48.98451232910156
],
"9": [
62.822797775268555,
61.26328158378601,
64.95581865310669,
69.51708793640137,
73.52380013465881
],
"10": [
20.55334782600403,
20.546096324920654,
20.57452392578125,
22.84211039543152,
23.724371671676636
],
"11": [
11.068235158920288,
10.715423822402954,
11.353424310684204,
11.37632942199707,
11.530814170837402
],
"12": [
10.264788389205933,
8.67864990234375,
8.845952033996582,
8.593009233474731,
8.540803909301758
],
"13": [
9.603406190872192,
9.648627042770386,
13.040799140930176,
10.154011249542236,
9.716034412384033
],
"14": [
6.20926308631897,
6.0385496616363525,
7.674488544464111,
10.53052043914795,
7.661675691604614
],
"15": [
11.466301918029785,
11.473632097244263,
11.279382228851318,
13.291078329086304,
12.81026816368103
],
"16": [
8.096073865890503,
7.73410701751709,
7.742897272109985,
8.477537631988525,
7.821273326873779
],
"17": [
43.69264578819275,
43.33040428161621,
46.291987657547,
54.654345989227295,
54.37124800682068
],
"18": [
27.205485105514526,
26.785916090011597,
27.331408262252808,
29.946768760681152,
28.037617444992065
],
"19": [
8.100102186203003,
7.845783472061157,
8.52329158782959,
8.907397985458374,
9.13755488395691
],
"20": [
13.09695029258728,
12.683861255645752,
15.612725019454956,
13.361177206039429,
16.614356517791748
],
"21": [
43.69623780250549,
43.26758122444153,
46.91650056838989,
47.875754833221436,
57.9763662815094
],
"22": [
4.5090577602386475,
4.420571804046631,
4.639787673950195,
5.118046998977661,
5.017346143722534
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
{
"engine": "datafusion-python",
"datafusion-version": "38.0.1",
"benchmark": "tpch",
"data_path": "/mnt/bigdata/tpch/sf100/",
"query_path": "../../tpch/queries/",
"1": [
7.410699844360352
],
"2": [
2.966364622116089
],
"3": [
3.988652467727661
],
"4": [
1.8821499347686768
],
"5": [
6.957948684692383
],
"6": [
1.779731273651123
],
"7": [
14.559604167938232
],
"8": [
7.062309265136719
],
"9": [
14.908353805541992
],
"10": [
7.73533296585083
],
"11": [
2.346423387527466
],
"12": [
2.7248904705047607
],
"13": [
6.38663387298584
],
"14": [
2.4675676822662354
],
"15": [
4.799000024795532
],
"16": [
1.9091999530792236
],
"17": [
19.230653762817383
],
"18": [
25.15683078765869
],
"19": [
4.2268781661987305
],
"20": [
8.66620659828186
],
"21": [
17.696006059646606
],
"22": [
1.3805692195892334
]
}
Loading