Skip to content

Commit

Permalink
Support Decimal256 in Min/Max aggregate expressions
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Oct 20, 2023
1 parent 1dd887c commit ee7b99f
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 0 deletions.
34 changes: 34 additions & 0 deletions datafusion/physical-expr/src/aggregate/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ use crate::aggregate::utils::down_cast_any_ref;
use crate::expressions::format_state_name;
use arrow::array::Array;
use arrow::array::Decimal128Array;
use arrow::array::Decimal256Array;
use arrow::datatypes::i256;
use arrow::datatypes::Decimal256Type;

use super::moving_min_max;

Expand Down Expand Up @@ -183,6 +186,7 @@ impl AggregateExpr for Max {
| Float32
| Float64
| Decimal128(_, _)
| Decimal256(_, _)
| Date32
| Date64
| Time32(_)
Expand Down Expand Up @@ -239,6 +243,9 @@ impl AggregateExpr for Max {
Decimal128(_, _) => {
instantiate_max_accumulator!(self, i128, Decimal128Type)
}
Decimal256(_, _) => {
instantiate_max_accumulator!(self, i256, Decimal256Type)
}

// It would be nice to have a fast implementation for Strings as well
// https://github.com/apache/arrow-datafusion/issues/6906
Expand Down Expand Up @@ -318,6 +325,16 @@ macro_rules! min_max_batch {
scale
)
}
DataType::Decimal256(precision, scale) => {
typed_min_max_batch!(
$VALUES,
Decimal256Array,
Decimal256,
$OP,
precision,
scale
)
}
// all types that have a natural order
DataType::Float64 => {
typed_min_max_batch!($VALUES, Float64Array, Float64, $OP)
Expand Down Expand Up @@ -522,6 +539,19 @@ macro_rules! min_max {
);
}
}
(
lhs @ ScalarValue::Decimal256(lhsv, lhsp, lhss),
rhs @ ScalarValue::Decimal256(rhsv, rhsp, rhss)
) => {
if lhsp.eq(rhsp) && lhss.eq(rhss) {
typed_min_max!(lhsv, rhsv, Decimal256, $OP, lhsp, lhss)
} else {
return internal_err!(
"MIN/MAX is not expected to receive scalars of incompatible types {:?}",
(lhs, rhs)
);
}
}
(ScalarValue::Boolean(lhs), ScalarValue::Boolean(rhs)) => {
typed_min_max!(lhs, rhs, Boolean, $OP)
}
Expand Down Expand Up @@ -880,6 +910,7 @@ impl AggregateExpr for Min {
| Float32
| Float64
| Decimal128(_, _)
| Decimal256(_, _)
| Date32
| Date64
| Time32(_)
Expand Down Expand Up @@ -935,6 +966,9 @@ impl AggregateExpr for Min {
Decimal128(_, _) => {
instantiate_min_accumulator!(self, i128, Decimal128Type)
}
Decimal256(_, _) => {
instantiate_min_accumulator!(self, i256, Decimal256Type)
}
// This is only reached if groups_accumulator_supported is out of sync
_ => internal_err!(
"GroupsAccumulator not supported for min({})",
Expand Down
10 changes: 10 additions & 0 deletions datafusion/sqllogictest/test_files/decimal.slt
Original file line number Diff line number Diff line change
Expand Up @@ -691,5 +691,15 @@ select arrow_typeof(avg(c1)), avg(c1) from decimal256_simple;
----
Decimal256(54, 10) 0.0000366666

query TR
select arrow_typeof(min(c1)), min(c1) from decimal256_simple where c4=false;
----
Decimal256(50, 6) 0.00002

query TR
select arrow_typeof(max(c1)), max(c1) from decimal256_simple where c4=false;
----
Decimal256(50, 6) 0.00005

statement ok
drop table decimal256_simple;

0 comments on commit ee7b99f

Please sign in to comment.