Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into spark-4.0-spark-test
Browse files Browse the repository at this point in the history
  • Loading branch information
kazuyukitanimura committed Jun 8, 2024
2 parents b980c0e + 311e13e commit 69f9cb2
Show file tree
Hide file tree
Showing 40 changed files with 829 additions and 2,944 deletions.
240 changes: 159 additions & 81 deletions core/Cargo.lock

Large diffs are not rendered by default.

25 changes: 14 additions & 11 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ include = [

[dependencies]
parquet-format = "4.0.0" # This must be kept in sync with that from parquet crate
arrow = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c", features = ["prettyprint", "ffi", "chrono-tz"] }
arrow-array = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c" }
arrow-data = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c" }
arrow-schema = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c" }
arrow-string = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c" }
parquet = { git = "https://github.com/viirya/arrow-rs.git", rev = "3f1ae0c", default-features = false, features = ["experimental"] }
half = { version = "~2.1", default-features = false }
arrow = { version = "52.0.0", features = ["prettyprint", "ffi", "chrono-tz"] }
arrow-array = { version = "52.0.0" }
arrow-buffer = { version = "52.0.0" }
arrow-data = { version = "52.0.0" }
arrow-schema = { version = "52.0.0" }
arrow-string = { version = "52.0.0" }
parquet = { version = "52.0.0", default-features = false, features = ["experimental"] }
half = { version = "2.4.1", default-features = false }
futures = "0.3.28"
mimalloc = { version = "*", default-features = false, optional = true }
tokio = { version = "1", features = ["rt-multi-thread"] }
Expand Down Expand Up @@ -66,10 +67,12 @@ itertools = "0.11.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
chrono-tz = { version = "0.8" }
paste = "1.0.14"
datafusion-common = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4" }
datafusion = { default-features = false, git = "https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4", features = ["unicode_expressions", "crypto_expressions"] }
datafusion-functions = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4", features = ["crypto_expressions"]}
datafusion-physical-expr = { git = "https://github.com/viirya/arrow-datafusion.git", rev = "57b3be4", default-features = false, features = ["unicode_expressions"] }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1" }
datafusion = { default-features = false, git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", features = ["unicode_expressions", "crypto_expressions"] }
datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", features = ["crypto_expressions"] }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", default-features = false }
datafusion-physical-expr-common = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", default-features = false }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", default-features = false }
unicode-segmentation = "^1.10.1"
once_cell = "1.18.0"
regex = "1.9.6"
Expand Down
4 changes: 2 additions & 2 deletions core/src/execution/datafusion/expressions/bitwise_not.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ impl PhysicalExpr for BitwiseNotExpr {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.arg.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.arg]
}

fn with_new_children(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ impl PhysicalExpr for BloomFilterMightContain {
})
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.bloom_filter_expr.clone(), self.value_expr.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.bloom_filter_expr, &self.value_expr]
}

fn with_new_children(
Expand Down
22 changes: 13 additions & 9 deletions core/src/execution/datafusion/expressions/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::{
any::Any,
fmt::{Debug, Display, Formatter},
hash::{Hash, Hasher},
num::Wrapping,
sync::Arc,
};

Expand Down Expand Up @@ -1291,8 +1292,8 @@ impl PhysicalExpr for Cast {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
Expand Down Expand Up @@ -1570,7 +1571,7 @@ fn date_parser(date_str: &str, eval_mode: EvalMode) -> CometResult<Option<i32>>
let mut date_segments = [1, 1, 1];
let mut sign = 1;
let mut current_segment = 0;
let mut current_segment_value = 0;
let mut current_segment_value = Wrapping(0);
let mut current_segment_digits = 0;
let bytes = date_str.as_bytes();

Expand All @@ -1597,16 +1598,16 @@ fn date_parser(date_str: &str, eval_mode: EvalMode) -> CometResult<Option<i32>>
return return_result(date_str, eval_mode);
}
//if valid update corresponding segment with the current segment value.
date_segments[current_segment as usize] = current_segment_value;
current_segment_value = 0;
date_segments[current_segment as usize] = current_segment_value.0;
current_segment_value = Wrapping(0);
current_segment_digits = 0;
current_segment += 1;
} else if !b.is_ascii_digit() {
return return_result(date_str, eval_mode);
} else {
//increment value of current segment by the next digit
let parsed_value = (b - b'0') as i32;
current_segment_value = current_segment_value * 10 + parsed_value;
let parsed_value = Wrapping((b - b'0') as i32);
current_segment_value = current_segment_value * Wrapping(10) + parsed_value;
current_segment_digits += 1;
}
j += 1;
Expand All @@ -1622,7 +1623,7 @@ fn date_parser(date_str: &str, eval_mode: EvalMode) -> CometResult<Option<i32>>
return return_result(date_str, eval_mode);
}

date_segments[current_segment as usize] = current_segment_value;
date_segments[current_segment as usize] = current_segment_value.0;

match NaiveDate::from_ymd_opt(
sign * date_segments[0],
Expand Down Expand Up @@ -1836,6 +1837,8 @@ mod tests {
Some(" 202 "),
Some("\n 2020-\r8 "),
Some("2020-01-01T"),
// Overflows i32
Some("-4607172990231812908"),
]));

for eval_mode in &[EvalMode::Legacy, EvalMode::Try] {
Expand All @@ -1857,7 +1860,8 @@ mod tests {
None,
None,
None,
Some(18262)
Some(18262),
None
]
);
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/execution/datafusion/expressions/checkoverflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ impl PhysicalExpr for CheckOverflow {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
Expand Down
12 changes: 4 additions & 8 deletions core/src/execution/datafusion/expressions/if_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,8 @@ impl PhysicalExpr for IfExpr {
Ok(ColumnarValue::Array(current_value))
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![
self.if_expr.clone(),
self.true_expr.clone(),
self.false_expr.clone(),
]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.if_expr, &self.true_expr, &self.false_expr]
}

fn with_new_children(
Expand Down Expand Up @@ -225,8 +221,8 @@ mod tests {
let true_expr = lit(123i32);
let false_expr = lit(999i32);

let expr = if_fn(if_expr, true_expr, false_expr);
let children = expr.unwrap().children();
let expr = if_fn(if_expr, true_expr, false_expr).unwrap();
let children = expr.children();
assert_eq!(children.len(), 3);
assert_eq!(children[0].to_string(), "true");
assert_eq!(children[1].to_string(), "123");
Expand Down
1 change: 1 addition & 0 deletions core/src/execution/datafusion/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,6 @@ pub mod strings;
pub mod subquery;
pub mod sum_decimal;
pub mod temporal;
pub mod unbound;
mod utils;
pub mod variance;
19 changes: 10 additions & 9 deletions core/src/execution/datafusion/expressions/negative.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
use crate::errors::CometError;
use arrow::{compute::kernels::numeric::neg_wrapping, datatypes::IntervalDayTimeType};
use arrow_array::RecordBatch;
use arrow_buffer::IntervalDayTime;
use arrow_schema::{DataType, Schema};
use datafusion::{
logical_expr::{interval_arithmetic::Interval, ColumnarValue},
physical_expr::PhysicalExpr,
};
use datafusion_common::{Result, ScalarValue};
use datafusion_physical_expr::{
aggregate::utils::down_cast_any_ref, sort_properties::SortProperties,
};
use datafusion_expr::sort_properties::ExprProperties;
use datafusion_physical_expr::aggregate::utils::down_cast_any_ref;
use std::{
any::Any,
hash::{Hash, Hasher},
Expand Down Expand Up @@ -63,7 +63,7 @@ macro_rules! check_overflow {
for i in 0..typed_array.len() {
if typed_array.value(i) == $min_val {
if $type_name == "byte" || $type_name == "short" {
let value = typed_array.value(i).to_string() + " caused";
let value = format!("{:?} caused", typed_array.value(i));
return Err(arithmetic_overflow_error(value.as_str()).into());
}
return Err(arithmetic_overflow_error($type_name).into());
Expand Down Expand Up @@ -135,7 +135,7 @@ impl PhysicalExpr for NegativeExpr {
arrow::datatypes::IntervalUnit::DayTime => check_overflow!(
array,
arrow::array::IntervalDayTimeArray,
i64::MIN,
IntervalDayTime::MIN,
"interval"
),
arrow::datatypes::IntervalUnit::MonthDayNano => {
Expand Down Expand Up @@ -195,8 +195,8 @@ impl PhysicalExpr for NegativeExpr {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.arg.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.arg]
}

fn with_new_children(
Expand Down Expand Up @@ -255,8 +255,9 @@ impl PhysicalExpr for NegativeExpr {
}

/// The ordering of a [`NegativeExpr`] is simply the reverse of its child.
fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
-children[0]
fn get_properties(&self, children: &[ExprProperties]) -> Result<ExprProperties> {
let properties = children[0].clone().with_order(children[0].sort_properties);
Ok(properties)
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/execution/datafusion/expressions/normalize_nan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl PhysicalExpr for NormalizeNaNAndZero {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
self.child.children()
}

Expand Down
37 changes: 13 additions & 24 deletions core/src/execution/datafusion/expressions/scalar_funcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::{
any::Any,
cmp::min,
fmt::{Debug, Write},
str::FromStr,
sync::Arc,
};

Expand All @@ -35,17 +34,15 @@ use arrow_array::{Array, ArrowNativeTypeOp, Decimal128Array, StringArray};
use arrow_schema::DataType;
use datafusion::{
execution::FunctionRegistry,
logical_expr::{
BuiltinScalarFunction, ScalarFunctionDefinition, ScalarFunctionImplementation,
ScalarUDFImpl, Signature, Volatility,
},
functions::math::round::round,
logical_expr::{ScalarFunctionImplementation, ScalarUDFImpl, Signature, Volatility},
physical_plan::ColumnarValue,
};
use datafusion_common::{
cast::{as_binary_array, as_generic_string_array},
exec_err, internal_err, DataFusionError, Result as DataFusionResult, ScalarValue,
};
use datafusion_physical_expr::{math_expressions, udf::ScalarUDF};
use datafusion_expr::ScalarUDF;
use num::{
integer::{div_ceil, div_floor},
BigInt, Signed, ToPrimitive,
Expand All @@ -66,9 +63,7 @@ macro_rules! make_comet_scalar_udf {
$data_type.clone(),
Arc::new(move |args| $func(args, &$data_type)),
);
Ok(ScalarFunctionDefinition::UDF(Arc::new(
ScalarUDF::new_from_impl(scalar_func),
)))
Ok(Arc::new(ScalarUDF::new_from_impl(scalar_func)))
}};
($name:expr, $func:expr, without $data_type:ident) => {{
let scalar_func = CometScalarFunction::new(
Expand All @@ -77,9 +72,7 @@ macro_rules! make_comet_scalar_udf {
$data_type,
$func,
);
Ok(ScalarFunctionDefinition::UDF(Arc::new(
ScalarUDF::new_from_impl(scalar_func),
)))
Ok(Arc::new(ScalarUDF::new_from_impl(scalar_func)))
}};
}

Expand All @@ -88,7 +81,7 @@ pub fn create_comet_physical_fun(
fun_name: &str,
data_type: DataType,
registry: &dyn FunctionRegistry,
) -> Result<ScalarFunctionDefinition, DataFusionError> {
) -> Result<Arc<ScalarUDF>, DataFusionError> {
let sha2_functions = ["sha224", "sha256", "sha384", "sha512"];
match fun_name {
"ceil" => {
Expand Down Expand Up @@ -140,13 +133,11 @@ pub fn create_comet_physical_fun(
let spark_func_name = "spark".to_owned() + sha;
make_comet_scalar_udf!(spark_func_name, wrapped_func, without data_type)
}
_ => {
if let Ok(fun) = BuiltinScalarFunction::from_str(fun_name) {
Ok(ScalarFunctionDefinition::BuiltIn(fun))
} else {
Ok(ScalarFunctionDefinition::UDF(registry.udf(fun_name)?))
}
}
_ => registry.udf(fun_name).map_err(|e| {
DataFusionError::Execution(format!(
"Function {fun_name} not found in the registry: {e}",
))
}),
}
}

Expand Down Expand Up @@ -509,9 +500,7 @@ fn spark_round(
make_decimal_array(array, precision, scale, &f)
}
DataType::Float32 | DataType::Float64 => {
Ok(ColumnarValue::Array(math_expressions::round(&[
array.clone()
])?))
Ok(ColumnarValue::Array(round(&[array.clone()])?))
}
dt => exec_err!("Not supported datatype for ROUND: {dt}"),
},
Expand All @@ -534,7 +523,7 @@ fn spark_round(
make_decimal_scalar(a, precision, scale, &f)
}
ScalarValue::Float32(_) | ScalarValue::Float64(_) => Ok(ColumnarValue::Scalar(
ScalarValue::try_from_array(&math_expressions::round(&[a.to_array()?])?, 0)?,
ScalarValue::try_from_array(&round(&[a.to_array()?])?, 0)?,
)),
dt => exec_err!("Not supported datatype for ROUND: {dt}"),
},
Expand Down
12 changes: 6 additions & 6 deletions core/src/execution/datafusion/expressions/strings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ macro_rules! make_predicate_function {
Ok(ColumnarValue::Array(Arc::new(array)))
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.left.clone(), self.right.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.left, &self.right]
}

fn with_new_children(
Expand Down Expand Up @@ -221,8 +221,8 @@ impl PhysicalExpr for SubstringExec {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
Expand Down Expand Up @@ -286,8 +286,8 @@ impl PhysicalExpr for StringSpaceExec {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![self.child.clone()]
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
Expand Down
2 changes: 1 addition & 1 deletion core/src/execution/datafusion/expressions/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl PhysicalExpr for Subquery {
}
}

fn children(&self) -> Vec<Arc<dyn PhysicalExpr>> {
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![]
}

Expand Down
Loading

0 comments on commit 69f9cb2

Please sign in to comment.