Skip to content

Commit

Permalink
add new datafusion-comet-expr crate
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Jul 6, 2024
1 parent e7fff73 commit c900020
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 52 deletions.
12 changes: 12 additions & 0 deletions native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 24 additions & 1 deletion native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
# under the License.

[workspace]
members = ["core"]
members = ["core", "spark-expr"]
resolver = "2"

[workspace.package]
version = "0.1.0"
Expand All @@ -30,3 +31,25 @@ edition = "2021"

# Comet uses the same minimum Rust version as DataFusion
rust-version = "1.75"

[workspace.dependencies]
arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1", features = ["prettyprint", "ffi", "chrono-tz"] }
arrow-array = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1" }
arrow-buffer = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1" }
arrow-data = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1" }
arrow-schema = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1" }
arrow-string = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1" }
datafusion-common = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "17446b1" }
datafusion = { default-features = false, git = "https://github.com/viirya/arrow-datafusion.git", rev = "17446b1", features = ["unicode_expressions", "crypto_expressions"] }
datafusion-functions = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "17446b1", features = ["crypto_expressions"] }
datafusion-expr = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "17446b1", default-features = false }
datafusion-physical-expr-common = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "17446b1", default-features = false }
datafusion-physical-expr = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "17446b1", default-features = false }

[profile.release]
debug = true
overflow-checks = false
lto = "thin"
codegen-units = 1
strip = "debuginfo"

32 changes: 13 additions & 19 deletions native/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ include = [

[dependencies]
parquet-format = "4.0.0" # This must be kept in sync with that from parquet crate
arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1", features = ["prettyprint", "ffi", "chrono-tz"] }
arrow-array = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1" }
arrow-buffer = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1" }
arrow-data = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1" }
arrow-schema = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1" }
arrow-string = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1" }
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-data = { workspace = true }
arrow-schema = { workspace = true }
arrow-string = { workspace = true }
parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1", default-features = false, features = ["experimental"] }
half = { version = "2.4.1", default-features = false }
futures = "0.3.28"
Expand Down Expand Up @@ -71,17 +71,18 @@ itertools = "0.11.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
chrono-tz = { version = "0.8" }
paste = "1.0.14"
datafusion-common = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "17446b1" }
datafusion = { default-features = false, git = "https://github.com/viirya/arrow-datafusion.git", rev = "17446b1", features = ["unicode_expressions", "crypto_expressions"] }
datafusion-functions = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "17446b1", features = ["crypto_expressions"] }
datafusion-expr = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "17446b1", default-features = false }
datafusion-physical-expr-common = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "17446b1", default-features = false }
datafusion-physical-expr = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "17446b1", default-features = false }
datafusion-common = { workspace = true }
datafusion = { workspace = true }
datafusion-functions = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-expr = { workspace = true }
unicode-segmentation = "^1.10.1"
once_cell = "1.18.0"
regex = "1.9.6"
crc32fast = "1.3.2"
simd-adler32 = "0.3.7"
datafusion-comet-expr = { path = "../spark-expr" }

[build-dependencies]
prost-build = "0.9.0"
Expand All @@ -98,13 +99,6 @@ twox-hash = "1.6.3"
[features]
default = []

[profile.release]
debug = true
overflow-checks = false
lto = "thin"
codegen-units = 1
strip = "debuginfo"

[lib]
name = "comet"
# "rlib" is for benchmarking with criterion.
Expand Down
23 changes: 2 additions & 21 deletions native/core/src/execution/datafusion/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ pub mod if_expr;
mod normalize_nan;
pub mod scalar_funcs;
pub use normalize_nan::NormalizeNaNAndZero;
use prost::DecodeError;

use crate::{errors::CometError, execution::spark_expression};
pub mod abs;
use crate::errors::CometError;
pub mod avg;
pub mod avg_decimal;
pub mod bloom_filter_might_contain;
Expand All @@ -46,24 +44,7 @@ mod utils;
pub mod variance;
pub mod xxhash64;

#[derive(Debug, Hash, PartialEq, Clone, Copy)]
pub enum EvalMode {
Legacy,
Ansi,
Try,
}

impl TryFrom<i32> for EvalMode {
type Error = DecodeError;

fn try_from(value: i32) -> Result<Self, Self::Error> {
match spark_expression::EvalMode::try_from(value)? {
spark_expression::EvalMode::Legacy => Ok(EvalMode::Legacy),
spark_expression::EvalMode::Try => Ok(EvalMode::Try),
spark_expression::EvalMode::Ansi => Ok(EvalMode::Ansi),
}
}
}
pub use datafusion_comet_expr::EvalMode;

fn arithmetic_overflow_error(from_type: &str) -> CometError {
CometError::ArithmeticOverflow {
Expand Down
15 changes: 12 additions & 3 deletions native/core/src/execution/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ use crate::{
},
};

use super::expressions::{abs::CometAbsFunc, create_named_struct::CreateNamedStruct, EvalMode};
use super::expressions::{create_named_struct::CreateNamedStruct, EvalMode};
use datafusion_comet_expr::abs::CometAbsFunc;

// For clippy error on type_complexity.
type ExecResult<T> = Result<T, ExecutionError>;
Expand Down Expand Up @@ -365,7 +366,7 @@ impl PhysicalPlanner {
let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?;
let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap());
let timezone = expr.timezone.clone();
let eval_mode = expr.eval_mode.try_into()?;
let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;

Ok(Arc::new(Cast::new(child, datatype, eval_mode, timezone)))
}
Expand Down Expand Up @@ -504,7 +505,7 @@ impl PhysicalPlanner {
let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema.clone())?;
let return_type = child.data_type(&input_schema)?;
let args = vec![child];
let eval_mode = expr.eval_mode.try_into()?;
let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?;
let comet_abs = Arc::new(ScalarUDF::new_from_impl(CometAbsFunc::new(
eval_mode,
return_type.to_string(),
Expand Down Expand Up @@ -1743,6 +1744,14 @@ fn rewrite_physical_expr(
Ok(expr.rewrite(&mut rewriter).data()?)
}

fn from_protobuf_eval_mode(value: i32) -> Result<EvalMode, prost::DecodeError> {
match spark_expression::EvalMode::try_from(value)? {
spark_expression::EvalMode::Legacy => Ok(EvalMode::Legacy),
spark_expression::EvalMode::Try => Ok(EvalMode::Try),
spark_expression::EvalMode::Ansi => Ok(EvalMode::Ansi),
}
}

#[cfg(test)]
mod tests {
use std::{sync::Arc, task::Poll};
Expand Down
38 changes: 38 additions & 0 deletions native/spark-expr/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "datafusion-comet-expr"
version = { workspace = true }
homepage = { workspace = true }
repository = { workspace = true }
authors = { workspace = true }
description = "Apache DataFusion Comet: High performance accelerator for Apache Spark"
readme = { workspace = true }
license = { workspace = true }
edition = { workspace = true }

[dependencies]
arrow = { workspace = true }
arrow-schema = { workspace = true }
datafusion = { workspace = true }
datafusion-common = { workspace = true }
datafusion-functions = { workspace = true }

[lib]
name = "datafusion_comet_expr"
path = "src/lib.rs"
22 changes: 22 additions & 0 deletions native/spark-expr/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Apache DataFusion Comet: Spark-compatible Expressions

This crate provides Spark-compatible expressions for use with DataFusion.
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
// specific language governing permissions and limitations
// under the License.

use std::{any::Any, sync::Arc};

use arrow::datatypes::DataType;
use arrow_schema::ArrowError;

use datafusion::logical_expr::{ColumnarValue, ScalarUDFImpl, Signature};
use datafusion_common::DataFusionError;
use datafusion_functions::math;
use std::{any::Any, sync::Arc};

use crate::execution::operators::ExecutionError;

use super::{arithmetic_overflow_error, EvalMode};
use super::EvalMode;

#[derive(Debug)]
pub struct CometAbsFunc {
Expand All @@ -34,15 +34,15 @@ pub struct CometAbsFunc {
}

impl CometAbsFunc {
pub fn new(eval_mode: EvalMode, data_type_name: String) -> Result<Self, ExecutionError> {
pub fn new(eval_mode: EvalMode, data_type_name: String) -> Result<Self, DataFusionError> {
if let EvalMode::Legacy | EvalMode::Ansi = eval_mode {
Ok(Self {
inner_abs_func: math::abs().inner(),
eval_mode,
data_type_name,
})
} else {
Err(ExecutionError::GeneralError(format!(
Err(DataFusionError::Execution(format!(
"Invalid EvalMode: \"{:?}\"",
eval_mode
)))
Expand Down Expand Up @@ -74,9 +74,8 @@ impl ScalarUDFImpl for CometAbsFunc {
if self.eval_mode == EvalMode::Legacy {
Ok(args[0].clone())
} else {
let msg = arithmetic_overflow_error(&self.data_type_name).to_string();
Err(DataFusionError::ArrowError(
ArrowError::ComputeError(msg),
ArrowError::ComputeError(format!("overflow from {}", self.data_type_name)),
trace,
))
}
Expand Down
24 changes: 24 additions & 0 deletions native/spark-expr/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

pub mod abs;
#[derive(Debug, Hash, PartialEq, Clone, Copy)]
pub enum EvalMode {
Legacy,
Ansi,
Try,
}

0 comments on commit c900020

Please sign in to comment.