Skip to content

Commit

Permalink
Use DataFusion repo
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed May 30, 2024
1 parent 7c96fa4 commit 61072d1
Show file tree
Hide file tree
Showing 17 changed files with 209 additions and 177 deletions.
171 changes: 112 additions & 59 deletions core/Cargo.lock

Large diffs are not rendered by default.

24 changes: 13 additions & 11 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ include = [

[dependencies]
parquet-format = "4.0.0" # This must be kept in sync with that from parquet crate
arrow = { git = "https://github.com/viirya/arrow-rs.git", rev = "d176dbb", features = ["prettyprint", "ffi", "chrono-tz"] }
arrow-array = { git = "https://github.com/viirya/arrow-rs.git", rev = "d176dbb" }
arrow-data = { git = "https://github.com/viirya/arrow-rs.git", rev = "d176dbb" }
arrow-schema = { git = "https://github.com/viirya/arrow-rs.git", rev = "d176dbb" }
arrow-string = { git = "https://github.com/viirya/arrow-rs.git", rev = "d176dbb" }
parquet = { git = "https://github.com/viirya/arrow-rs.git", rev = "d176dbb", default-features = false, features = ["experimental"] }
half = { version = "~2.1", default-features = false }
arrow = { version = "51.0.0", features = ["prettyprint", "ffi", "chrono-tz"] }
arrow-array = { version = "51.0.0" }
arrow-data = { version = "51.0.0" }
arrow-schema = { version = "51.0.0" }
arrow-string = { version = "51.0.0" }
parquet = { version = "51.0.0", default-features = false, features = ["experimental"] }
half = { version = "2.4.1", default-features = false }
futures = "0.3.28"
mimalloc = { version = "*", default-features = false, optional = true }
tokio = { version = "1", features = ["rt-multi-thread"] }
Expand Down Expand Up @@ -66,10 +66,12 @@ itertools = "0.11.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
chrono-tz = { version = "0.8" }
paste = "1.0.14"
datafusion-common = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "82b6b81" }
datafusion = { default-features = false, git = "https://github.com/viirya/arrow-datafusion.git", rev = "82b6b81", features = ["unicode_expressions", "crypto_expressions"] }
datafusion-functions = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "82b6b81", features = ["crypto_expressions"] }
datafusion-physical-expr = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "82b6b81", default-features = false, features = ["unicode_expressions"] }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "ad2b1dc" }
datafusion = { default-features = false, git = "https://github.com/apache/arrow-datafusion.git", rev = "ad2b1dc", features = ["unicode_expressions", "crypto_expressions"] }
datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", rev = "ad2b1dc", features = ["crypto_expressions"] }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "ad2b1dc", default-features = false }
datafusion-physical-expr-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "ad2b1dc", default-features = false }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "ad2b1dc", default-features = false }
unicode-segmentation = "^1.10.1"
once_cell = "1.18.0"
regex = "1.9.6"
Expand Down
4 changes: 2 additions & 2 deletions core/src/execution/datafusion/expressions/bitwise_not.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ impl PhysicalExpr for BitwiseNotExpr {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.arg.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.arg]
}

fn with_new_children(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ impl PhysicalExpr for BloomFilterMightContain {
})
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.bloom_filter_expr.clone(), self.value_expr.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.bloom_filter_expr, &self.value_expr]
}

fn with_new_children(
Expand Down
4 changes: 2 additions & 2 deletions core/src/execution/datafusion/expressions/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1291,8 +1291,8 @@ impl PhysicalExpr for Cast {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
Expand Down
4 changes: 2 additions & 2 deletions core/src/execution/datafusion/expressions/checkoverflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ impl PhysicalExpr for CheckOverflow {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
Expand Down
8 changes: 2 additions & 6 deletions core/src/execution/datafusion/expressions/if_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,8 @@ impl PhysicalExpr for IfExpr {
Ok(ColumnarValue::Array(current_value))
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![
self.if_expr.clone(),
self.true_expr.clone(),
self.false_expr.clone(),
]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.if_expr, &self.true_expr, &self.false_expr]
}

fn with_new_children(
Expand Down
2 changes: 1 addition & 1 deletion core/src/execution/datafusion/expressions/normalize_nan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl PhysicalExpr for NormalizeNaNAndZero {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
self.child.children()
}

Expand Down
37 changes: 13 additions & 24 deletions core/src/execution/datafusion/expressions/scalar_funcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::{
any::Any,
cmp::min,
fmt::{Debug, Write},
str::FromStr,
sync::Arc,
};

Expand All @@ -35,17 +34,15 @@ use arrow_array::{Array, ArrowNativeTypeOp, Decimal128Array, StringArray};
use arrow_schema::DataType;
use datafusion::{
execution::FunctionRegistry,
logical_expr::{
BuiltinScalarFunction, ScalarFunctionDefinition, ScalarFunctionImplementation,
ScalarUDFImpl, Signature, Volatility,
},
functions::math::round::round,
logical_expr::{ScalarFunctionImplementation, ScalarUDFImpl, Signature, Volatility},
physical_plan::ColumnarValue,
};
use datafusion_common::{
cast::{as_binary_array, as_generic_string_array},
exec_err, internal_err, DataFusionError, Result as DataFusionResult, ScalarValue,
};
use datafusion_physical_expr::{math_expressions, udf::ScalarUDF};
use datafusion_expr::ScalarUDF;
use num::{
integer::{div_ceil, div_floor},
BigInt, Signed, ToPrimitive,
Expand All @@ -63,9 +60,7 @@ macro_rules! make_comet_scalar_udf {
$data_type.clone(),
Arc::new(move |args| $func(args, &$data_type)),
);
Ok(ScalarFunctionDefinition::UDF(Arc::new(
ScalarUDF::new_from_impl(scalar_func),
)))
Ok(Arc::new(ScalarUDF::new_from_impl(scalar_func)))
}};
($name:expr, $func:expr, without $data_type:ident) => {{
let scalar_func = CometScalarFunction::new(
Expand All @@ -74,9 +69,7 @@ macro_rules! make_comet_scalar_udf {
$data_type,
$func,
);
Ok(ScalarFunctionDefinition::UDF(Arc::new(
ScalarUDF::new_from_impl(scalar_func),
)))
Ok(Arc::new(ScalarUDF::new_from_impl(scalar_func)))
}};
}

Expand All @@ -85,7 +78,7 @@ pub fn create_comet_physical_fun(
fun_name: &str,
data_type: DataType,
registry: &dyn FunctionRegistry,
) -> Result<ScalarFunctionDefinition, DataFusionError> {
) -> Result<Arc<ScalarUDF>, DataFusionError> {
let sha2_functions = ["sha224", "sha256", "sha384", "sha512"];
match fun_name {
"ceil" => {
Expand Down Expand Up @@ -129,13 +122,11 @@ pub fn create_comet_physical_fun(
let spark_func_name = "spark".to_owned() + sha;
make_comet_scalar_udf!(spark_func_name, wrapped_func, without data_type)
}
_ => {
if let Ok(fun) = BuiltinScalarFunction::from_str(fun_name) {
Ok(ScalarFunctionDefinition::BuiltIn(fun))
} else {
Ok(ScalarFunctionDefinition::UDF(registry.udf(fun_name)?))
}
}
_ => registry.udf(fun_name).map_err(|e| {
DataFusionError::Execution(format!(
"Function {fun_name} not found in the registry: {e}",
))
}),
}
}

Expand Down Expand Up @@ -498,9 +489,7 @@ fn spark_round(
make_decimal_array(array, precision, scale, &f)
}
DataType::Float32 | DataType::Float64 => {
Ok(ColumnarValue::Array(math_expressions::round(&[
array.clone()
])?))
Ok(ColumnarValue::Array(round(&[array.clone()])?))
}
dt => exec_err!("Not supported datatype for ROUND: {dt}"),
},
Expand All @@ -523,7 +512,7 @@ fn spark_round(
make_decimal_scalar(a, precision, scale, &f)
}
ScalarValue::Float32(_) | ScalarValue::Float64(_) => Ok(ColumnarValue::Scalar(
ScalarValue::try_from_array(&math_expressions::round(&[a.to_array()?])?, 0)?,
ScalarValue::try_from_array(&round(&[a.to_array()?])?, 0)?,
)),
dt => exec_err!("Not supported datatype for ROUND: {dt}"),
},
Expand Down
12 changes: 6 additions & 6 deletions core/src/execution/datafusion/expressions/strings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ macro_rules! make_predicate_function {
Ok(ColumnarValue::Array(Arc::new(array)))
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.left.clone(), self.right.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.left, &self.right]
}

fn with_new_children(
Expand Down Expand Up @@ -221,8 +221,8 @@ impl PhysicalExpr for SubstringExec {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
Expand Down Expand Up @@ -286,8 +286,8 @@ impl PhysicalExpr for StringSpaceExec {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
Expand Down
2 changes: 1 addition & 1 deletion core/src/execution/datafusion/expressions/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl PhysicalExpr for Subquery {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![]
}

Expand Down
20 changes: 10 additions & 10 deletions core/src/execution/datafusion/expressions/temporal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ impl PhysicalExpr for HourExec {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
Expand Down Expand Up @@ -205,8 +205,8 @@ impl PhysicalExpr for MinuteExec {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
Expand Down Expand Up @@ -299,8 +299,8 @@ impl PhysicalExpr for SecondExec {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
Expand Down Expand Up @@ -386,8 +386,8 @@ impl PhysicalExpr for DateTruncExec {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
Expand Down Expand Up @@ -511,8 +511,8 @@ impl PhysicalExpr for TimestampTruncExec {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
Expand Down
4 changes: 2 additions & 2 deletions core/src/execution/datafusion/operators/expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ impl ExecutionPlan for CometExpandExec {
self.schema.clone()
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.child]
}

fn with_new_children(
Expand Down
Loading

0 comments on commit 61072d1

Please sign in to comment.