Skip to content

Commit

Permalink
chore(query): refactor new transform partition bucket for new aggrega…
Browse files Browse the repository at this point in the history
…tion hashtable (#15093)

* finish new transform partition bucket for singleton

* support for cluster

* fix new transform partition bucket

* revert

* fix cluster spill bug

* remove code

* fix logic short-circuit bug

* fix logic short-circuit bug

* refactor block_number

* fix block_number

* fix spill hang by avoid sending empty block

---------

Co-authored-by: jw <[email protected]>
Co-authored-by: sundy-li <[email protected]>
  • Loading branch information
3 people authored Mar 30, 2024
1 parent dac261a commit f8494c8
Show file tree
Hide file tree
Showing 19 changed files with 861 additions and 438 deletions.
10 changes: 8 additions & 2 deletions src/query/expression/src/aggregate/aggregate_hashtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,17 @@ impl AggregateHashTable {
config: HashTableConfig,
capacity: usize,
arena: Arc<Bump>,
need_init_entry: bool,
) -> Self {
let entries = if need_init_entry {
vec![0u64; capacity]
} else {
vec![]
};
Self {
entries: vec![],
entries,
count: 0,
direct_append: true,
direct_append: !need_init_entry,
current_radix_bits: config.initial_radix_bits,
payload: PartitionedPayload::new(
group_types,
Expand Down
1 change: 1 addition & 0 deletions src/query/expression/src/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl HashTableConfig {

pub fn cluster_with_partial(mut self, partial_agg: bool, node_nums: usize) -> Self {
self.partial_agg = partial_agg;
self.repartition_radix_bits_incr = 4;
self.max_partial_capacity = 131072 * (2 << node_nums);

self
Expand Down
10 changes: 0 additions & 10 deletions src/query/expression/src/aggregate/partitioned_payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,16 +262,6 @@ impl PartitionedPayload {
pub fn memory_size(&self) -> usize {
self.payloads.iter().map(|x| x.memory_size()).sum()
}

pub fn include_arena(&self, other: &Arc<Bump>) -> bool {
for arena in self.arenas.iter() {
if Arc::ptr_eq(arena, other) {
return true;
}
}

false
}
}

#[inline]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,14 @@ impl PipelineBuilder {
.settings
.get_enable_experimental_aggregate_hashtable()?;

let in_cluster = !self.ctx.get_cluster().is_empty();

let params = Self::build_aggregator_params(
aggregate.input.output_schema()?,
&aggregate.group_by,
&aggregate.agg_funcs,
enable_experimental_aggregate_hashtable,
in_cluster,
max_block_size as usize,
None,
)?;
Expand Down Expand Up @@ -217,12 +220,13 @@ impl PipelineBuilder {
let enable_experimental_aggregate_hashtable = self
.settings
.get_enable_experimental_aggregate_hashtable()?;

let in_cluster = !self.ctx.get_cluster().is_empty();
let params = Self::build_aggregator_params(
aggregate.before_group_by_schema.clone(),
&aggregate.group_by,
&aggregate.agg_funcs,
enable_experimental_aggregate_hashtable,
in_cluster,
max_block_size as usize,
aggregate.limit,
)?;
Expand Down Expand Up @@ -288,6 +292,7 @@ impl PipelineBuilder {
group_by: &[IndexType],
agg_funcs: &[AggregateFunctionDesc],
enable_experimental_aggregate_hashtable: bool,
in_cluster: bool,
max_block_size: usize,
limit: Option<usize>,
) -> Result<Arc<AggregatorParams>> {
Expand Down Expand Up @@ -329,6 +334,7 @@ impl PipelineBuilder {
&aggs,
&agg_args,
enable_experimental_aggregate_hashtable,
in_cluster,
max_block_size,
limit,
)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ struct AggregateExchangeSorting<Method: HashMethodBounds, V: Send + Sync + 'stat
_phantom: PhantomData<(Method, V)>,
}

/// Encodes a (partition count, bucket) pair into a single sortable block number
/// used by the exchange sorting layer: `max_partition_count * 1000 + bucket`.
///
/// Blocks with a larger partition count sort after those with a smaller one, and
/// within the same partition count they sort by bucket. The factor 1000 assumes
/// `bucket < 1000` — NOTE(review): confirm buckets can never reach 1000, and that
/// `max_partition_count * 1000` cannot overflow `isize` on 32-bit targets.
///
/// Always returns `Ok`; the `Result` return type matches the `ExchangeSorting`
/// trait signature used by the callers below.
pub fn compute_block_number(bucket: isize, max_partition_count: usize) -> Result<isize> {
    Ok(max_partition_count as isize * 1000 + bucket)
}

impl<Method: HashMethodBounds, V: Send + Sync + 'static> ExchangeSorting
for AggregateExchangeSorting<Method, V>
{
Expand All @@ -78,14 +82,17 @@ impl<Method: HashMethodBounds, V: Send + Sync + 'static> ExchangeSorting
))),
Some(meta_info) => match meta_info {
AggregateMeta::Partitioned { .. } => unreachable!(),
AggregateMeta::Serialized(v) => Ok(v.bucket),
AggregateMeta::Serialized(v) => {
compute_block_number(v.bucket, v.max_partition_count)
}
AggregateMeta::HashTable(v) => Ok(v.bucket),
AggregateMeta::AggregateHashTable(_) => unreachable!(),
AggregateMeta::AggregatePayload(v) => Ok(v.bucket),
AggregateMeta::AggregatePayload(v) => {
compute_block_number(v.bucket, v.max_partition_count)
}
AggregateMeta::AggregateSpilling(_)
| AggregateMeta::Spilled(_)
| AggregateMeta::Spilling(_)
| AggregateMeta::BucketSpilled(_) => Ok(-1),
| AggregateMeta::BucketSpilled(_)
| AggregateMeta::Spilling(_) => Ok(-1),
},
}
}
Expand Down Expand Up @@ -252,9 +259,12 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static> FlightScatter
}
AggregateMeta::AggregateSpilling(payload) => {
for p in scatter_partitioned_payload(payload, self.buckets)? {
blocks.push(DataBlock::empty_with_meta(
AggregateMeta::<Method, V>::create_agg_spilling(p),
))
blocks.push(match p.len() == 0 {
true => DataBlock::empty(),
false => DataBlock::empty_with_meta(
AggregateMeta::<Method, V>::create_agg_spilling(p),
),
});
}
}
AggregateMeta::HashTable(payload) => {
Expand All @@ -271,16 +281,18 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static> FlightScatter
});
}
}
AggregateMeta::AggregateHashTable(_) => unreachable!(),
AggregateMeta::AggregatePayload(p) => {
for payload in scatter_payload(p.payload, self.buckets)? {
blocks.push(DataBlock::empty_with_meta(
AggregateMeta::<Method, V>::create_agg_payload(
p.bucket,
payload,
p.max_partition_count,
blocks.push(match payload.len() == 0 {
true => DataBlock::empty(),
false => DataBlock::empty_with_meta(
AggregateMeta::<Method, V>::create_agg_payload(
p.bucket,
payload,
p.max_partition_count,
),
),
))
});
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,22 @@ impl SerializedPayload {
aggrs: Vec<Arc<dyn AggregateFunction>>,
radix_bits: u64,
arena: Arc<Bump>,
need_init_entry: bool,
) -> Result<AggregateHashTable> {
let rows_num = self.data_block.num_rows();
let capacity = AggregateHashTable::get_capacity_for_count(rows_num);
let config = HashTableConfig::default().with_initial_radix_bits(radix_bits);
let mut state = ProbeState::default();
let agg_len = aggrs.len();
let group_len = group_types.len();
let mut hashtable =
AggregateHashTable::new_directly(group_types, aggrs, config, rows_num, arena);
let mut hashtable = AggregateHashTable::new_directly(
group_types,
aggrs,
config,
capacity,
arena,
need_init_entry,
);

let agg_states = (0..agg_len)
.map(|i| {
Expand Down Expand Up @@ -103,7 +111,8 @@ impl SerializedPayload {
radix_bits: u64,
arena: Arc<Bump>,
) -> Result<PartitionedPayload> {
let hashtable = self.convert_to_aggregate_table(group_types, aggrs, radix_bits, arena)?;
let hashtable =
self.convert_to_aggregate_table(group_types, aggrs, radix_bits, arena, false)?;
Ok(hashtable.payload)
}
}
Expand All @@ -126,7 +135,6 @@ pub struct AggregatePayload {
pub enum AggregateMeta<Method: HashMethodBounds, V: Send + Sync + 'static> {
Serialized(SerializedPayload),
HashTable(HashTablePayload<Method, V>),
AggregateHashTable(PartitionedPayload),
AggregatePayload(AggregatePayload),
AggregateSpilling(PartitionedPayload),
BucketSpilled(BucketSpilledPayload),
Expand All @@ -144,10 +152,6 @@ impl<Method: HashMethodBounds, V: Send + Sync + 'static> AggregateMeta<Method, V
}))
}

pub fn create_agg_hashtable(payload: PartitionedPayload) -> BlockMetaInfoPtr {
Box::new(AggregateMeta::<Method, V>::AggregateHashTable(payload))
}

pub fn create_agg_payload(
bucket: isize,
payload: Payload,
Expand Down Expand Up @@ -231,9 +235,6 @@ impl<Method: HashMethodBounds, V: Send + Sync + 'static> Debug for AggregateMeta
AggregateMeta::Spilling(_) => f.debug_struct("Aggregate::Spilling").finish(),
AggregateMeta::Spilled(_) => f.debug_struct("Aggregate::Spilling").finish(),
AggregateMeta::BucketSpilled(_) => f.debug_struct("Aggregate::BucketSpilled").finish(),
AggregateMeta::AggregateHashTable(_) => {
f.debug_struct("AggregateMeta:AggHashTable").finish()
}
AggregateMeta::AggregatePayload(_) => {
f.debug_struct("AggregateMeta:AggregatePayload").finish()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub struct AggregatorParams {
pub offsets_aggregate_states: Vec<usize>,

pub enable_experimental_aggregate_hashtable: bool,
pub in_cluster: bool,
pub max_block_size: usize,
// Limit is push down to AggregatorTransform
pub limit: Option<usize>,
Expand All @@ -55,6 +56,7 @@ impl AggregatorParams {
agg_funcs: &[AggregateFunctionRef],
agg_args: &[Vec<usize>],
enable_experimental_aggregate_hashtable: bool,
in_cluster: bool,
max_block_size: usize,
limit: Option<usize>,
) -> Result<Arc<AggregatorParams>> {
Expand All @@ -74,6 +76,7 @@ impl AggregatorParams {
layout: states_layout,
offsets_aggregate_states: states_offsets,
enable_experimental_aggregate_hashtable,
in_cluster,
max_block_size,
limit,
}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod aggregate_cell;
mod aggregate_exchange_injector;
mod aggregate_meta;
mod aggregator_params;
mod new_transform_partition_bucket;
mod serde;
mod transform_aggregate_expand;
mod transform_aggregate_final;
Expand Down
Loading

0 comments on commit f8494c8

Please sign in to comment.