Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

build: Upgrade DataFusion (DF) to 36.0.0 and arrow-rs to 50.0.0 #66

Merged
merged 6 commits on Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 65 additions & 28 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ itertools = "0.11.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
chrono-tz = { version = "0.8" }
paste = "1.0.14"
datafusion-common = { version = "35.0.0" }
datafusion = { default-features = false, version = "35.0.0", features = ["unicode_expressions"] }
datafusion-physical-expr = { version = "35.0.0", default-features = false , features = ["unicode_expressions"] }
datafusion-common = { version = "36.0.0" }
datafusion = { default-features = false, version = "36.0.0", features = ["unicode_expressions"] }
datafusion-physical-expr = { version = "36.0.0", default-features = false , features = ["unicode_expressions"] }
unicode-segmentation = "^1.10.1"
once_cell = "1.18.0"
regex = "1.9.6"
Expand Down
2 changes: 2 additions & 0 deletions core/benches/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub fn create_int64_array(size: usize, null_density: f32, min: i64, max: i64) ->
.collect()
}

#[allow(dead_code)]
pub fn create_primitive_array<T>(size: usize, null_density: f32) -> PrimitiveArray<T>
where
T: ArrowPrimitiveType,
Expand All @@ -64,6 +65,7 @@ where

/// Creates a dictionary with random keys and values, with value type `T`.
/// Note here the keys are the dictionary indices.
#[allow(dead_code)]
pub fn create_dictionary_array<T>(
size: usize,
value_size: usize,
Expand Down
12 changes: 6 additions & 6 deletions core/src/execution/datafusion/expressions/avg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ use arrow_array::{
Array, ArrayRef, ArrowNumericType, Int64Array, PrimitiveArray,
};
use arrow_schema::{DataType, Field};
use datafusion::logical_expr::{type_coercion::aggregates::avg_return_type, Accumulator};
use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::{
expressions::format_state_name, AggregateExpr, EmitTo, GroupsAccumulator, PhysicalExpr,
use datafusion::logical_expr::{
type_coercion::aggregates::avg_return_type, Accumulator, EmitTo, GroupsAccumulator,
};
use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::{expressions::format_state_name, AggregateExpr, PhysicalExpr};
use std::{any::Any, sync::Arc};

use arrow_array::ArrowNativeTypeOp;
Expand Down Expand Up @@ -146,7 +146,7 @@ pub struct AvgAccumulator {
}

impl Accumulator for AvgAccumulator {
fn state(&self) -> Result<Vec<ScalarValue>> {
fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![
ScalarValue::Float64(self.sum),
ScalarValue::from(self.count),
Expand Down Expand Up @@ -175,7 +175,7 @@ impl Accumulator for AvgAccumulator {
Ok(())
}

fn evaluate(&self) -> Result<ScalarValue> {
fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(ScalarValue::Float64(
self.sum.map(|f| f / self.count as f64),
))
Expand Down
10 changes: 4 additions & 6 deletions core/src/execution/datafusion/expressions/avg_decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@ use arrow_array::{
Array, ArrayRef, Decimal128Array, Int64Array, PrimitiveArray,
};
use arrow_schema::{DataType, Field};
use datafusion::logical_expr::Accumulator;
use datafusion::logical_expr::{Accumulator, EmitTo, GroupsAccumulator};
use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::{
expressions::format_state_name, AggregateExpr, EmitTo, GroupsAccumulator, PhysicalExpr,
};
use datafusion_physical_expr::{expressions::format_state_name, AggregateExpr, PhysicalExpr};
use std::{any::Any, sync::Arc};

use arrow_array::ArrowNativeTypeOp;
Expand Down Expand Up @@ -214,7 +212,7 @@ impl AvgDecimalAccumulator {
}

impl Accumulator for AvgDecimalAccumulator {
fn state(&self) -> Result<Vec<ScalarValue>> {
fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![
ScalarValue::Decimal128(self.sum, self.sum_precision, self.sum_scale),
ScalarValue::from(self.count),
Expand Down Expand Up @@ -266,7 +264,7 @@ impl Accumulator for AvgDecimalAccumulator {
Ok(())
}

fn evaluate(&self) -> Result<ScalarValue> {
fn evaluate(&mut self) -> Result<ScalarValue> {
fn make_decimal128(value: Option<i128>, precision: u8, scale: i8) -> ScalarValue {
ScalarValue::Decimal128(value, precision, scale)
}
Expand Down
20 changes: 13 additions & 7 deletions core/src/execution/datafusion/expressions/scalar_funcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,11 @@ use datafusion::{
physical_plan::ColumnarValue,
};
use datafusion_common::{
cast::as_generic_string_array, internal_err, DataFusionError, Result as DataFusionResult,
ScalarValue,
cast::as_generic_string_array, exec_err, internal_err, DataFusionError,
Result as DataFusionResult, ScalarValue,
};
use datafusion_physical_expr::{
execution_props::ExecutionProps,
functions::{create_physical_fun, make_scalar_function},
math_expressions,
execution_props::ExecutionProps, functions::create_physical_fun, math_expressions,
};
use num::{BigInt, Signed, ToPrimitive};
use unicode_segmentation::UnicodeSegmentation;
Expand Down Expand Up @@ -366,7 +364,12 @@ fn spark_round(
let (precision, scale) = get_precision_scale(data_type);
make_decimal_array(array, precision, scale, &f)
}
_ => make_scalar_function(math_expressions::round)(args),
DataType::Float32 | DataType::Float64 => {
Ok(ColumnarValue::Array(math_expressions::round(&[
array.clone()
])?))
}
dt => exec_err!("Not supported datatype[{dt}] for ROUND"),
comphead marked this conversation as resolved.
Show resolved Hide resolved
comphead marked this conversation as resolved.
Show resolved Hide resolved
},
ColumnarValue::Scalar(a) => match a {
ScalarValue::Int64(a) if *point < 0 => {
Expand All @@ -386,7 +389,10 @@ fn spark_round(
let (precision, scale) = get_precision_scale(data_type);
make_decimal_scalar(a, precision, scale, &f)
}
_ => make_scalar_function(math_expressions::round)(args),
ScalarValue::Float32(_) | ScalarValue::Float64(_) => Ok(ColumnarValue::Scalar(
ScalarValue::try_from_array(&math_expressions::round(&[a.to_array()?])?, 0)?,
)),
dt => exec_err!("Not supported datatype[{dt}] for ROUND"),
comphead marked this conversation as resolved.
Show resolved Hide resolved
},
}
}
Expand Down
Loading