Comet is an Apache Spark plugin that uses Apache Arrow DataFusion as its native runtime to improve query efficiency and query runtime.
Comet runs Spark SQL queries using the native DataFusion runtime, which is typically faster and more resource-efficient than JVM-based runtimes.
Comet aims to support:
- a native Parquet implementation, including both reader and writer
- full implementation of Spark operators, including Filter/Project/Aggregation/Join/Exchange etc.
- full implementation of Spark built-in expressions
- a UDF framework for users to migrate their existing UDFs to native code
The following diagram illustrates the architecture of Comet:
The project is currently integrated into Apache Spark 3.2, 3.3, and 3.4.
The project strives to keep feature parity with Apache Spark: users should expect the same behavior (with respect to features, configurations, query results, etc.) whether Comet is turned on or off in their Spark jobs. In addition, the Comet extension should automatically detect unsupported features and fall back to the Spark engine.
To achieve this, besides the unit tests within Comet itself, we also reuse the Spark SQL tests and make sure they all pass with the Comet extension enabled.
Linux, Apple OSX (Intel and M1)
- Apache Spark 3.2, 3.3, or 3.4
- JDK 8 and up
- GLIBC 2.17 (CentOS 7) and up
Make sure the requirements above are met and the required software is installed on your machine.
git clone https://github.com/apache/arrow-datafusion-comet.git
Spark 3.4 is used for the example.
cd arrow-datafusion-comet
make release PROFILES="-Pspark-3.4"
Make sure SPARK_HOME points to the same Spark version that Comet was built for.
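The version match described above can be checked before launching the shell. The following is a minimal sketch, not part of Comet: the helper names `spark_minor_version` and `check_comet_spark_match` are hypothetical, and it assumes you already have the Spark version string at hand (for example, from the banner printed by `$SPARK_HOME/bin/spark-submit --version`).

```shell
# Hypothetical helpers for illustration; they are not shipped with Comet.

# Extracts "major.minor" from a full Spark version string such as "3.4.1".
spark_minor_version() {
  printf '%s\n' "$1" | cut -d. -f1,2
}

# Succeeds only if the Spark version matches the version used in the
# build profile, e.g. "3.4" for PROFILES="-Pspark-3.4".
check_comet_spark_match() {
  spark_version="$1"
  profile_version="$2"
  [ "$(spark_minor_version "$spark_version")" = "$profile_version" ]
}
```

For instance, `check_comet_spark_match 3.4.1 3.4 || echo "SPARK_HOME does not match the Comet build"` would stay silent, while passing `3.3.2` would print the warning.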
$SPARK_HOME/bin/spark-shell --jars spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar \
--conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
--conf spark.comet.enabled=true \
--conf spark.comet.exec.enabled=true \
--conf spark.comet.exec.all.enabled=true
Create a test Parquet source
scala> (0 until 10).toDF("a").write.mode("overwrite").parquet("/tmp/test")
Query the data from the test source and check:
- INFO message shows the native Comet library has been initialized.
- The query plan reflects Comet operators being used for this query instead of Spark ones
scala> spark.read.parquet("/tmp/test").createOrReplaceTempView("t1")
scala> spark.sql("select * from t1 where a > 5").explain
INFO src/lib.rs: Comet native library initialized
== Physical Plan ==
*(1) ColumnarToRow
+- CometFilter [a#14], (isnotnull(a#14) AND (a#14 > 5))
+- CometScan parquet [a#14] Batched: true, DataFilters: [isnotnull(a#14), (a#14 > 5)],
Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/test], PartitionFilters: [],
PushedFilters: [IsNotNull(a), GreaterThan(a,5)], ReadSchema: struct<a:int>
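If you want to script the second check above rather than read the plan by eye, something like the following may help. It is only a sketch: `plan_uses_comet` is a hypothetical helper, and it assumes that Comet operators appear at the start of a plan line (after the usual tree markers) with a name beginning with `Comet`, as in the example output.

```shell
# Hypothetical helper for illustration; not part of Comet or Spark.
# Reads a physical plan on stdin and succeeds if at least one plan node
# (optionally prefixed by tree markers such as "+- " or "*(1) ")
# has an operator name that starts with "Comet".
plan_uses_comet() {
  grep -Eq '^[[:space:]:|]*(\+- )?(\*\([0-9]+\) )?Comet[A-Z]'
}
```

Saving the `explain` output to a file and running `plan_uses_comet < plan.txt && echo "Comet operators found"` would confirm that the query did not silently fall back to Spark operators. Matching only on the leading operator name avoids counting attributes such as `Format: CometParquet`.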
The Comet shuffle feature is disabled by default. To enable it, add the following configs:
--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
--conf spark.comet.exec.shuffle.enabled=true
The configs above enable Comet native shuffle, which supports only hash partitioning and single partitioning. Comet native shuffle does not support complex types yet.
To enable columnar shuffle, which supports all partitioning schemes and basic complex types, one more config is required:
--conf spark.comet.columnar.shuffle.enabled=true
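To keep the two shuffle modes straight, the flags can be gathered in one place. This is a minimal sketch using only the configs listed in this section; the function name `comet_shuffle_flags` and its `native`/`columnar` argument are hypothetical conveniences, not Comet options.

```shell
# Hypothetical helper for illustration; not part of Comet.
# Prints the shuffle-related spark-shell flags, one token per line.
#   native   : Comet native shuffle (hash and single partitioning only)
#   columnar : additionally enables columnar shuffle
comet_shuffle_flags() {
  mode="${1:-native}"
  printf '%s\n' \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.comet.exec.shuffle.enabled=true
  if [ "$mode" = "columnar" ]; then
    printf '%s\n' --conf spark.comet.columnar.shuffle.enabled=true
  fi
}
```

Because none of the tokens contain spaces, the output can be appended unquoted to the earlier command, for example `$SPARK_HOME/bin/spark-shell --jars spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions --conf spark.comet.enabled=true --conf spark.comet.exec.enabled=true $(comet_shuffle_flags columnar)`.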