Skip to content

Commit

Permalink
feat(query): update
Browse files Browse the repository at this point in the history
  • Loading branch information
sundy-li committed Nov 18, 2023
1 parent 9bada69 commit 27f66f7
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 73 deletions.
66 changes: 0 additions & 66 deletions src/query/expression/src/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ mod payload_flush;
mod payload_row;
mod probe_state;

use std::hash::Hasher;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

Expand Down Expand Up @@ -92,68 +91,3 @@ impl HashTableConfig {
self
}
}

pub struct PerfectHashBuilder;

// NOTE: This is a dummy hasher that just returns the value passed to it.
// This is only used for i8-i64, u8-u64, isize and usize keys.
pub struct PerfectHash {
val: u64,
}

impl std::hash::BuildHasher for PerfectHashBuilder {
type Hasher = PerfectHash;
fn build_hasher(&self) -> PerfectHash {
PerfectHash { val: 0 }
}
}

impl Hasher for PerfectHash {
fn finish(&self) -> u64 {
self.val
}

fn write(&mut self, _bytes: &[u8]) {
unreachable!()
}

fn write_u8(&mut self, i: u8) {
self.val = i as u64;
}

fn write_u16(&mut self, i: u16) {
self.val = i as u64;
}

fn write_u32(&mut self, i: u32) {
self.val = i as u64;
}

fn write_u64(&mut self, i: u64) {
self.val = i;
}

fn write_usize(&mut self, i: usize) {
self.val = i as u64;
}

fn write_i8(&mut self, i: i8) {
self.val = i as u64;
}

fn write_i16(&mut self, i: i16) {
self.val = i as u64;
}

fn write_i32(&mut self, i: i32) {
self.val = i as u64;
}

fn write_i64(&mut self, i: i64) {
self.val = i as u64;
}

fn write_isize(&mut self, i: isize) {
self.val = i as u64;
}
}
4 changes: 2 additions & 2 deletions src/query/expression/src/aggregate/partitioned_payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct PartitionedPayload {
pub state_addr_offsets: Vec<usize>,
pub state_layout: Option<Layout>,

arenas: Vec<Arc<Bump>>,
pub arenas: Vec<Arc<Bump>>,

partition_count: u64,
mask_v: u64,
Expand Down Expand Up @@ -166,7 +166,7 @@ impl PartitionedPayload {
self.arenas.append(&mut other.arenas);
}

fn combine_single(&mut self, mut other: Payload, state: &mut PayloadFlushState) {
pub fn combine_single(&mut self, mut other: Payload, state: &mut PayloadFlushState) {
if other.len() == 0 {
return;
}
Expand Down
11 changes: 8 additions & 3 deletions src/query/expression/src/aggregate/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ impl Payload {
let mut page = self.writable_page();
for i in 0..row_count {
let index = select_vector[i];

unsafe {
std::ptr::copy_nonoverlapping(
address[index],
Expand Down Expand Up @@ -335,9 +336,13 @@ impl Drop for Payload {
for page in self.pages.iter() {
for row in 0..page.rows {
unsafe {
let state_addr = self.data_ptr(page, row).add(self.state_offset);
aggr.drop_state(StateAddr::new(state_addr as usize + *addr_offset))
};
let state_place = StateAddr::new(core::ptr::read::<u64>(
self.data_ptr(page, row).add(self.state_offset) as _,
)
as usize);

aggr.drop_state(state_place.next(*addr_offset));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,17 +443,27 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static> Processor
let aggrs = payloads[0].aggrs.clone();

let mut payload_map = (0..self.max_partition_count).map(|_| vec![]).collect_vec();
for payload in payloads.into_iter() {

// All arenas should be kept in the bucket partition payload
let mut arenas = vec![];

for mut payload in payloads.into_iter() {
for (bucket, p) in payload.payloads.into_iter().enumerate() {
payload_map[bucket].push(p);
}
arenas.append(&mut payload.arenas);
}

for (bucket, mut payloads) in payload_map.into_iter().enumerate() {
let mut partition_payload =
PartitionedPayload::new(group_types.clone(), aggrs.clone(), 1);

partition_payload.payloads.append(payloads.as_mut());
for payload in payloads.drain(0..) {
partition_payload.combine_single(payload, &mut self.flush_state);
}

partition_payload.arenas.extend_from_slice(&arenas);

self.buckets_blocks
.insert(bucket as isize, vec![DataBlock::empty_with_meta(
AggregateMeta::<Method, V>::create_agg_hashtable(partition_payload),
Expand Down

0 comments on commit 27f66f7

Please sign in to comment.