Commit 5543837

Merge remote-tracking branch 'origin/main' into support_collect_limit_exec

advancedxy committed Feb 28, 2024
2 parents e15852f + 4d103b8
Showing 14 changed files with 202 additions and 87 deletions.
46 changes: 46 additions & 0 deletions README.md
@@ -58,3 +58,49 @@ Linux, Apple OSX (Intel and M1)
- Apache Spark 3.2, 3.3, or 3.4
- JDK 8 and up
- GLIBC 2.17 (Centos 7) and up

## Getting started

Make sure the requirements above are met and the software is installed on your machine.

### Clone repo
```commandline
git clone https://github.com/apache/arrow-datafusion-comet.git
```

### Specify the Spark version and build Comet
Spark 3.4 is used in this example.
```
cd arrow-datafusion-comet
make release PROFILES="-Pspark-3.4"
```

### Run Spark with Comet enabled
Make sure `SPARK_HOME` points to the same Spark version that Comet was built for.

```
$SPARK_HOME/bin/spark-shell --jars spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar \
--conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
--conf spark.comet.enabled=true \
--conf spark.comet.exec.enabled=true \
--conf spark.comet.exec.all.enabled=true
```

### Verify Comet is enabled for a Spark SQL query

Create a test Parquet source:
```scala
scala> (0 until 10).toDF("a").write.mode("overwrite").parquet("/tmp/test")
```

Query the data from the test source and check:
- The INFO message shows that the native Comet library has been initialized.
- The query plan shows Comet operators in place of the regular Spark ones.
```scala
scala> spark.read.parquet("/tmp/test").createOrReplaceTempView("t1"); spark.sql("select * from t1 where a > 5").explain
INFO src/lib.rs: Comet native library initialized
== Physical Plan ==
*(1) ColumnarToRow
+- CometFilter [a#14], (isnotnull(a#14) AND (a#14 > 5))
+- CometScan parquet [a#14] Batched: true, DataFilters: [isnotnull(a#14), (a#14 > 5)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/test], PartitionFilters: [], PushedFilters: [IsNotNull(a), GreaterThan(a,5)], ReadSchema: struct<a:int>
```
93 changes: 65 additions & 28 deletions core/Cargo.lock

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions core/Cargo.toml
@@ -66,9 +66,9 @@ itertools = "0.11.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
chrono-tz = { version = "0.8" }
paste = "1.0.14"
datafusion-common = { version = "35.0.0" }
datafusion = { default-features = false, version = "35.0.0", features = ["unicode_expressions"] }
datafusion-physical-expr = { version = "35.0.0", default-features = false , features = ["unicode_expressions"] }
datafusion-common = { version = "36.0.0" }
datafusion = { default-features = false, version = "36.0.0", features = ["unicode_expressions"] }
datafusion-physical-expr = { version = "36.0.0", default-features = false , features = ["unicode_expressions"] }
unicode-segmentation = "^1.10.1"
once_cell = "1.18.0"
regex = "1.9.6"
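The jump from DataFusion 35 to 36 above is a breaking upgrade, which is why the expression files below change as well: `EmitTo` and `GroupsAccumulator` move from `datafusion_physical_expr` into `datafusion::logical_expr`, and the `Accumulator` methods switch from `&self` to `&mut self`. A minimal sketch of the new method shapes, reconstructed from the diffs in this commit rather than from DataFusion itself (the `ScalarValue` and `Result` types here are stand-ins so the sketch compiles on its own):

```rust
// Stand-ins for datafusion_common::{ScalarValue, Result}; not the real types.
#[derive(Debug)]
pub enum ScalarValue {
    Float64(Option<f64>),
}
pub type Result<T> = std::result::Result<T, String>;

// DataFusion 36 shape of the two methods this commit touches: both now take
// `&mut self` (they took `&self` in DataFusion 35), so an accumulator may
// drain or reuse its internal state when emitting results.
pub trait Accumulator {
    fn state(&mut self) -> Result<Vec<ScalarValue>>;
    fn evaluate(&mut self) -> Result<ScalarValue>;
}
```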
2 changes: 2 additions & 0 deletions core/benches/common.rs
@@ -45,6 +45,7 @@ pub fn create_int64_array(size: usize, null_density: f32, min: i64, max: i64) ->
.collect()
}

#[allow(dead_code)]
pub fn create_primitive_array<T>(size: usize, null_density: f32) -> PrimitiveArray<T>
where
T: ArrowPrimitiveType,
@@ -64,6 +65,7 @@ where

/// Creates a dictionary with random keys and values, with value type `T`.
/// Note here the keys are the dictionary indices.
#[allow(dead_code)]
pub fn create_dictionary_array<T>(
size: usize,
value_size: usize,
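The `#[allow(dead_code)]` additions above are presumably needed because each benchmark target compiles the shared helper module independently, so a helper that a given target never calls would otherwise trigger a dead-code warning there. A small self-contained illustration of the pattern (the helper names here are hypothetical, not the ones in `common.rs`):

```rust
// A shared helper that this particular target never calls; without the
// attribute, rustc would warn that the function is never used.
#[allow(dead_code)]
fn create_string_values(size: usize) -> Vec<String> {
    (0..size).map(|i| i.to_string()).collect()
}

// A helper this target does use compiles cleanly without any attribute.
fn create_int64_values(size: usize) -> Vec<i64> {
    (0..size as i64).collect()
}

fn main() {
    println!("{} values", create_int64_values(4).len());
}
```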
12 changes: 6 additions & 6 deletions core/src/execution/datafusion/expressions/avg.rs
@@ -24,11 +24,11 @@ use arrow_array::{
Array, ArrayRef, ArrowNumericType, Int64Array, PrimitiveArray,
};
use arrow_schema::{DataType, Field};
use datafusion::logical_expr::{type_coercion::aggregates::avg_return_type, Accumulator};
use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::{
expressions::format_state_name, AggregateExpr, EmitTo, GroupsAccumulator, PhysicalExpr,
use datafusion::logical_expr::{
type_coercion::aggregates::avg_return_type, Accumulator, EmitTo, GroupsAccumulator,
};
use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::{expressions::format_state_name, AggregateExpr, PhysicalExpr};
use std::{any::Any, sync::Arc};

use arrow_array::ArrowNativeTypeOp;
@@ -146,7 +146,7 @@ pub struct AvgAccumulator {
}

impl Accumulator for AvgAccumulator {
fn state(&self) -> Result<Vec<ScalarValue>> {
fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![
ScalarValue::Float64(self.sum),
ScalarValue::from(self.count),
@@ -175,7 +175,7 @@ impl Accumulator for AvgAccumulator {
Ok(())
}

fn evaluate(&self) -> Result<ScalarValue> {
fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(ScalarValue::Float64(
self.sum.map(|f| f / self.count as f64),
))
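Taken together, the new `&mut self` receivers above leave the accumulator logic unchanged: `state` still exposes the partial `(sum, count)` pair and `evaluate` still divides the sum by the count. A self-contained sketch of that pattern under simplified stand-in types (the real code uses `datafusion_common::ScalarValue` and stores the count as an `Int64` scalar rather than a float):

```rust
#[derive(Debug, PartialEq)]
enum ScalarValue {
    Float64(Option<f64>),
}
type Result<T> = std::result::Result<T, String>;

trait Accumulator {
    fn update(&mut self, v: f64);
    fn state(&mut self) -> Result<Vec<ScalarValue>>;
    fn evaluate(&mut self) -> Result<ScalarValue>;
}

#[derive(Default)]
struct AvgAccumulator {
    sum: Option<f64>, // None until the first value arrives
    count: i64,
}

impl Accumulator for AvgAccumulator {
    fn update(&mut self, v: f64) {
        self.sum = Some(self.sum.unwrap_or(0.0) + v);
        self.count += 1;
    }

    // Partial aggregate shipped between execution stages: (sum, count).
    fn state(&mut self) -> Result<Vec<ScalarValue>> {
        Ok(vec![
            ScalarValue::Float64(self.sum),
            ScalarValue::Float64(Some(self.count as f64)),
        ])
    }

    // Final result: sum / count, or NULL (None) if no rows were seen.
    fn evaluate(&mut self) -> Result<ScalarValue> {
        Ok(ScalarValue::Float64(
            self.sum.map(|s| s / self.count as f64),
        ))
    }
}

fn main() {
    let mut acc = AvgAccumulator::default();
    for v in [1.0, 2.0, 6.0] {
        acc.update(v);
    }
    println!("partial state: {:?}", acc.state());
    assert_eq!(acc.evaluate(), Ok(ScalarValue::Float64(Some(3.0))));
}
```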
10 changes: 4 additions & 6 deletions core/src/execution/datafusion/expressions/avg_decimal.rs
@@ -24,11 +24,9 @@ use arrow_array::{
Array, ArrayRef, Decimal128Array, Int64Array, PrimitiveArray,
};
use arrow_schema::{DataType, Field};
use datafusion::logical_expr::Accumulator;
use datafusion::logical_expr::{Accumulator, EmitTo, GroupsAccumulator};
use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::{
expressions::format_state_name, AggregateExpr, EmitTo, GroupsAccumulator, PhysicalExpr,
};
use datafusion_physical_expr::{expressions::format_state_name, AggregateExpr, PhysicalExpr};
use std::{any::Any, sync::Arc};

use arrow_array::ArrowNativeTypeOp;
@@ -214,7 +212,7 @@ impl AvgDecimalAccumulator {
}

impl Accumulator for AvgDecimalAccumulator {
fn state(&self) -> Result<Vec<ScalarValue>> {
fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![
ScalarValue::Decimal128(self.sum, self.sum_precision, self.sum_scale),
ScalarValue::from(self.count),
@@ -266,7 +264,7 @@ impl Accumulator for AvgDecimalAccumulator {
Ok(())
}

fn evaluate(&self) -> Result<ScalarValue> {
fn evaluate(&mut self) -> Result<ScalarValue> {
fn make_decimal128(value: Option<i128>, precision: u8, scale: i8) -> ScalarValue {
ScalarValue::Decimal128(value, precision, scale)
}
(Diffs for the remaining 8 changed files are not shown.)
