Skip to content

Commit

Permalink
chore(query): improve aggregate unary function (#14729)
Browse files Browse the repository at this point in the history
  • Loading branch information
sundy-li authored Feb 25, 2024
1 parent 17f7f37 commit d62b315
Show file tree
Hide file tree
Showing 11 changed files with 5 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ use simple_hll::HyperLogLog;

use super::aggregate_function::AggregateFunction;
use super::aggregate_function_factory::AggregateFunctionDescription;
use super::borsh_deserialize_state;
use super::borsh_serialize_state;
use super::AggregateUnaryFunction;
use super::FunctionData;
use super::UnaryState;
Expand Down Expand Up @@ -69,15 +67,6 @@ where
builder.push(self.count() as u64);
Ok(())
}

fn serialize(&self, writer: &mut Vec<u8>) -> Result<()> {
borsh_serialize_state(writer, &self)
}

fn deserialize(reader: &mut &[u8]) -> Result<Self>
where Self: Sized {
borsh_deserialize_state(reader)
}
}

pub fn try_create_aggregate_approx_count_distinct_function(
Expand Down
18 changes: 0 additions & 18 deletions src/query/functions/src/aggregates/aggregate_array_moving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,6 @@ where
T: Number + AsPrimitive<TSum> + BorshSerialize + BorshDeserialize,
TSum: Number + AsPrimitive<f64> + std::ops::AddAssign + std::ops::SubAssign,
{
fn serialize(&self, writer: &mut Vec<u8>) -> Result<()> {
borsh_serialize_state(writer, &self.values)
}

fn deserialize(&mut self, reader: &mut &[u8]) -> Result<()> {
self.values = borsh_deserialize_state(reader)?;
Ok(())
}

fn accumulate_row(&mut self, column: &Column, row: usize) -> Result<()> {
let buffer = match column {
Column::Null { .. } => {
Expand Down Expand Up @@ -242,15 +233,6 @@ where T: Decimal
+ std::fmt::Debug
+ std::cmp::PartialOrd
{
fn serialize(&self, writer: &mut Vec<u8>) -> Result<()> {
borsh_serialize_state(writer, &self.values)
}

fn deserialize(&mut self, reader: &mut &[u8]) -> Result<()> {
self.values = borsh_deserialize_state(reader)?;
Ok(())
}

fn accumulate_row(&mut self, column: &Column, row: usize) -> Result<()> {
let buffer = match column {
Column::Null { .. } => {
Expand Down
20 changes: 0 additions & 20 deletions src/query/functions/src/aggregates/aggregate_avg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ use databend_common_expression::Scalar;
use num_traits::AsPrimitive;

use super::aggregate_sum::DecimalSumState;
use super::borsh_deserialize_state;
use super::borsh_serialize_state;
use super::AggregateUnaryFunction;
use super::FunctionData;
use super::UnaryState;
Expand Down Expand Up @@ -94,15 +92,6 @@ where
builder.push(F64::from(value));
Ok(())
}

fn serialize(&self, writer: &mut Vec<u8>) -> Result<()> {
borsh_serialize_state(writer, self)
}

fn deserialize(reader: &mut &[u8]) -> Result<Self>
where Self: Sized {
borsh_deserialize_state(reader)
}
}

struct DecimalAvgData {
Expand Down Expand Up @@ -202,15 +191,6 @@ where
))),
}
}

fn serialize(&self, writer: &mut Vec<u8>) -> Result<()> {
borsh_serialize_state(writer, self)
}

fn deserialize(reader: &mut &[u8]) -> Result<Self>
where Self: Sized {
borsh_deserialize_state(reader)
}
}

pub fn try_create_aggregate_avg_function(
Expand Down
10 changes: 0 additions & 10 deletions src/query/functions/src/aggregates/aggregate_kurtosis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ use databend_common_expression::with_number_mapped_type;
use databend_common_expression::Scalar;
use num_traits::AsPrimitive;

use super::borsh_deserialize_state;
use super::borsh_serialize_state;
use super::AggregateUnaryFunction;
use super::FunctionData;
use super::UnaryState;
Expand Down Expand Up @@ -100,14 +98,6 @@ where
}
Ok(())
}

fn serialize(&self, writer: &mut Vec<u8>) -> Result<()> {
borsh_serialize_state(writer, self)
}

fn deserialize(reader: &mut &[u8]) -> Result<Self> {
borsh_deserialize_state::<Self>(reader)
}
}

pub fn try_create_aggregate_kurtosis_function(
Expand Down
11 changes: 0 additions & 11 deletions src/query/functions/src/aggregates/aggregate_min_max_any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ use super::aggregate_scalar_state::CmpMin;
use super::aggregate_scalar_state::TYPE_ANY;
use super::aggregate_scalar_state::TYPE_MAX;
use super::aggregate_scalar_state::TYPE_MIN;
use super::borsh_deserialize_state;
use super::borsh_serialize_state;
use super::AggregateUnaryFunction;
use super::FunctionData;
use super::UnaryState;
Expand Down Expand Up @@ -110,15 +108,6 @@ where

Ok(())
}

fn serialize(&self, writer: &mut Vec<u8>) -> Result<()> {
borsh_serialize_state(writer, self)
}

fn deserialize(reader: &mut &[u8]) -> Result<Self>
where Self: Sized {
borsh_deserialize_state(reader)
}
}

pub fn try_create_aggregate_min_max_any_function<const CMP_TYPE: u8>(
Expand Down
11 changes: 0 additions & 11 deletions src/query/functions/src/aggregates/aggregate_quantile_cont.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ use databend_common_expression::ScalarRef;
use num_traits::AsPrimitive;
use ordered_float::OrderedFloat;

use super::borsh_deserialize_state;
use super::borsh_serialize_state;
use super::AggregateUnaryFunction;
use super::FunctionData;
use super::UnaryState;
Expand Down Expand Up @@ -143,15 +141,6 @@ where
}
Ok(())
}

fn serialize(&self, writer: &mut Vec<u8>) -> Result<()> {
borsh_serialize_state(writer, self)
}

fn deserialize(reader: &mut &[u8]) -> Result<Self>
where Self: Sized {
borsh_deserialize_state(reader)
}
}

pub(crate) fn get_levels(params: &Vec<Scalar>) -> Result<Vec<f64>> {
Expand Down
20 changes: 0 additions & 20 deletions src/query/functions/src/aggregates/aggregate_quantile_disc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ use databend_common_expression::with_number_mapped_type;
use databend_common_expression::Scalar;
use ethnum::i256;

use super::borsh_deserialize_state;
use super::borsh_serialize_state;
use super::get_levels;
use super::AggregateUnaryFunction;
use super::FunctionData;
Expand Down Expand Up @@ -107,15 +105,6 @@ where
}
Ok(())
}

fn serialize(&self, writer: &mut Vec<u8>) -> Result<()> {
borsh_serialize_state(writer, self)
}

fn deserialize(reader: &mut &[u8]) -> Result<Self>
where Self: Sized {
borsh_deserialize_state(reader)
}
}

impl<T> UnaryState<T, T> for QuantileState<T>
Expand Down Expand Up @@ -161,15 +150,6 @@ where

Ok(())
}

fn serialize(&self, writer: &mut Vec<u8>) -> Result<()> {
borsh_serialize_state(writer, self)
}

fn deserialize(reader: &mut &[u8]) -> Result<Self>
where Self: Sized {
borsh_deserialize_state(reader)
}
}

pub fn try_create_aggregate_quantile_disc_function(
Expand Down
10 changes: 0 additions & 10 deletions src/query/functions/src/aggregates/aggregate_skewness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ use databend_common_expression::Scalar;
use num_traits::AsPrimitive;

use super::assert_unary_arguments;
use super::borsh_deserialize_state;
use super::borsh_serialize_state;
use super::FunctionData;
use crate::aggregates::aggregate_function_factory::AggregateFunctionDescription;
use crate::aggregates::aggregate_unary::AggregateUnaryFunction;
Expand Down Expand Up @@ -95,14 +93,6 @@ where
}
Ok(())
}

fn serialize(&self, writer: &mut Vec<u8>) -> Result<()> {
borsh_serialize_state(writer, self)
}

fn deserialize(reader: &mut &[u8]) -> Result<Self> {
borsh_deserialize_state::<Self>(reader)
}
}

pub fn try_create_aggregate_skewness_function(
Expand Down
20 changes: 0 additions & 20 deletions src/query/functions/src/aggregates/aggregate_stddev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ use databend_common_expression::with_number_mapped_type;
use databend_common_expression::Scalar;
use num_traits::AsPrimitive;

use super::borsh_deserialize_state;
use super::borsh_serialize_state;
use super::AggregateUnaryFunction;
use super::FunctionData;
use super::UnaryState;
Expand Down Expand Up @@ -106,15 +104,6 @@ where

Ok(())
}

fn serialize(&self, writer: &mut Vec<u8>) -> Result<()> {
borsh_serialize_state(writer, self)
}

fn deserialize(reader: &mut &[u8]) -> Result<Self>
where Self: Sized {
borsh_deserialize_state(reader)
}
}

struct DecimalFuncData {
Expand Down Expand Up @@ -339,15 +328,6 @@ where

Ok(())
}

fn serialize(&self, writer: &mut Vec<u8>) -> Result<()> {
borsh_serialize_state(writer, self)
}

fn deserialize(reader: &mut &[u8]) -> Result<Self>
where Self: Sized {
borsh_deserialize_state(reader)
}
}

pub fn try_create_aggregate_stddev_pop_function<const TYPE: u8>(
Expand Down
24 changes: 1 addition & 23 deletions src/query/functions/src/aggregates/aggregate_sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ use databend_common_expression::StateAddr;
use num_traits::AsPrimitive;

use super::assert_unary_arguments;
use super::borsh_deserialize_state;
use super::borsh_serialize_state;
use super::FunctionData;
use crate::aggregates::aggregate_function_factory::AggregateFunctionDescription;
use crate::aggregates::aggregate_unary::UnaryState;
Expand All @@ -42,8 +40,7 @@ pub trait SumState: BorshSerialize + BorshDeserialize + Send + Sync + Default +
fn mem_size() -> Option<usize> {
None
}
fn serialize(&self, writer: &mut Vec<u8>) -> Result<()>;
fn deserialize(&mut self, reader: &mut &[u8]) -> Result<()>;

fn accumulate(&mut self, column: &Column, validity: Option<&Bitmap>) -> Result<()>;

fn accumulate_row(&mut self, column: &Column, row: usize) -> Result<()>;
Expand Down Expand Up @@ -109,15 +106,6 @@ where
N::push_item(builder, N::to_scalar_ref(&self.value));
Ok(())
}

fn serialize(&self, writer: &mut Vec<u8>) -> Result<()> {
borsh_serialize_state(writer, &self.value)
}

fn deserialize(reader: &mut &[u8]) -> Result<Self> {
let value = borsh_deserialize_state(reader)?;
Ok(Self { value })
}
}

#[derive(BorshDeserialize, BorshSerialize)]
Expand Down Expand Up @@ -171,16 +159,6 @@ where
T::push_item(builder, T::to_scalar_ref(&self.value));
Ok(())
}

fn serialize(&self, writer: &mut Vec<u8>) -> Result<()> {
borsh_serialize_state(writer, &self.value)
}

fn deserialize(reader: &mut &[u8]) -> Result<Self>
where Self: Sized {
let value = borsh_deserialize_state(reader)?;
Ok(Self { value })
}
}

pub fn try_create_aggregate_sum_function(
Expand Down
12 changes: 4 additions & 8 deletions src/query/functions/src/aggregates/aggregate_unary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ use databend_common_expression::ColumnBuilder;
use databend_common_expression::Scalar;
use databend_common_expression::StateAddr;

pub trait UnaryState<T, R>: Send + Sync + Default
pub trait UnaryState<T, R>:
Send + Sync + Default + borsh::BorshSerialize + borsh::BorshDeserialize
where
T: ValueType,
R: ValueType,
Expand All @@ -49,11 +50,6 @@ where
builder: &mut R::ColumnBuilder,
function_data: Option<&dyn FunctionData>,
) -> Result<()>;

fn serialize(&self, writer: &mut Vec<u8>) -> Result<()>;

fn deserialize(reader: &mut &[u8]) -> Result<Self>
where Self: Sized;
}

pub trait FunctionData: Send + Sync {
Expand Down Expand Up @@ -248,12 +244,12 @@ where

fn serialize(&self, place: StateAddr, writer: &mut Vec<u8>) -> Result<()> {
let state: &mut S = place.get::<S>();
state.serialize(writer)
Ok(borsh::to_writer(writer, state)?)
}

fn merge(&self, place: StateAddr, reader: &mut &[u8]) -> Result<()> {
let state: &mut S = place.get::<S>();
let rhs = S::deserialize(reader)?;
let rhs = S::deserialize_reader(reader)?;
state.merge(&rhs)
}

Expand Down

0 comments on commit d62b315

Please sign in to comment.