Skip to content

Commit

Permalink
Merge branch 'main' into fix/agghashtable_test
Browse files Browse the repository at this point in the history
  • Loading branch information
Freejww authored Feb 2, 2024
2 parents cd350e5 + e79eab5 commit d0a392b
Show file tree
Hide file tree
Showing 8 changed files with 555 additions and 263 deletions.
66 changes: 45 additions & 21 deletions src/common/hashtable/src/hashjoin_hashtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,56 +209,80 @@ where
count
}

// Using hashes to probe hash table and converting them in-place to pointers for memory reuse.
fn early_filtering_probe(&self, hashes: &mut [u64], bitmap: Option<Bitmap>) -> usize {
// Perform early filtering probe, store matched indexes in `matched_selection` and store unmatched indexes
// in `unmatched_selection`, return the number of matched and unmatched indexes.
fn early_filtering_probe(
&self,
hashes: &mut [u64],
bitmap: Option<Bitmap>,
matched_selection: &mut [u32],
unmatched_selection: &mut [u32],
) -> (usize, usize) {
let mut valids = None;
if let Some(bitmap) = bitmap {
if bitmap.unset_bits() == bitmap.len() {
hashes.iter_mut().for_each(|hash| {
*hash = 0;
});
return 0;
unmatched_selection
.iter_mut()
.enumerate()
.for_each(|(idx, val)| {
*val = idx as u32;
});
return (0, hashes.len());
} else if bitmap.unset_bits() > 0 {
valids = Some(bitmap);
}
}
let mut count = 0;
let mut matched_idx = 0;
let mut unmatched_idx = 0;
match valids {
Some(valids) => {
valids
.iter()
.zip(hashes.iter_mut())
.for_each(|(valid, hash)| {
valids.iter().zip(hashes.iter_mut().enumerate()).for_each(
|(valid, (idx, hash))| {
if valid {
let header = self.pointers[(*hash >> self.hash_shift) as usize];
if header != 0 && early_filtering(header, *hash) {
*hash = remove_header_tag(header);
count += 1;
unsafe {
*matched_selection.get_unchecked_mut(matched_idx) = idx as u32
};
matched_idx += 1;
} else {
*hash = 0;
unsafe {
*unmatched_selection.get_unchecked_mut(unmatched_idx) =
idx as u32
};
unmatched_idx += 1;
}
} else {
*hash = 0;
unsafe {
*unmatched_selection.get_unchecked_mut(unmatched_idx) = idx as u32
};
unmatched_idx += 1;
}
});
},
);
}
None => {
hashes.iter_mut().for_each(|hash| {
hashes.iter_mut().enumerate().for_each(|(idx, hash)| {
let header = self.pointers[(*hash >> self.hash_shift) as usize];
if header != 0 && early_filtering(header, *hash) {
*hash = remove_header_tag(header);
count += 1;
unsafe { *matched_selection.get_unchecked_mut(matched_idx) = idx as u32 };
matched_idx += 1;
} else {
*hash = 0;
unsafe {
*unmatched_selection.get_unchecked_mut(unmatched_idx) = idx as u32
};
unmatched_idx += 1;
}
});
}
}
count
(matched_idx, unmatched_idx)
}

// Using hashes to probe hash table and converting them in-place to pointers for memory reuse.
fn early_filtering_probe_with_selection(
// Perform early filtering probe and store matched indexes in `selection`, return the number of matched indexes.
fn early_filtering_matched_probe(
&self,
hashes: &mut [u64],
bitmap: Option<Bitmap>,
Expand Down
58 changes: 41 additions & 17 deletions src/common/hashtable/src/hashjoin_string_hashtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,53 +138,77 @@ where A: Allocator + Clone + 'static
count
}

// Using hashes to probe hash table and converting them in-place to pointers for memory reuse.
fn early_filtering_probe(&self, hashes: &mut [u64], bitmap: Option<Bitmap>) -> usize {
// Perform early filtering probe, store matched indexes in `matched_selection` and store unmatched indexes
// in `unmatched_selection`, return the number of matched and unmatched indexes.
fn early_filtering_probe(
&self,
hashes: &mut [u64],
bitmap: Option<Bitmap>,
matched_selection: &mut [u32],
unmatched_selection: &mut [u32],
) -> (usize, usize) {
let mut valids = None;
if let Some(bitmap) = bitmap {
if bitmap.unset_bits() == bitmap.len() {
hashes.iter_mut().for_each(|hash| {
*hash = 0;
});
return 0;
unmatched_selection
.iter_mut()
.enumerate()
.for_each(|(idx, val)| {
*val = idx as u32;
});
return (0, hashes.len());
} else if bitmap.unset_bits() > 0 {
valids = Some(bitmap);
}
}
let mut count = 0;
let mut matched_idx = 0;
let mut unmatched_idx = 0;
match valids {
Some(valids) => {
hashes.iter_mut().enumerate().for_each(|(idx, hash)| {
if unsafe { valids.get_bit_unchecked(idx) } {
let header = self.pointers[(*hash >> self.hash_shift) as usize];
if header != 0 && early_filtering(header, *hash) {
*hash = remove_header_tag(header);
count += 1;
unsafe {
*matched_selection.get_unchecked_mut(matched_idx) = idx as u32
};
matched_idx += 1;
} else {
*hash = 0;
unsafe {
*unmatched_selection.get_unchecked_mut(unmatched_idx) = idx as u32
};
unmatched_idx += 1;
}
} else {
*hash = 0;
};
unsafe {
*unmatched_selection.get_unchecked_mut(unmatched_idx) = idx as u32
};
unmatched_idx += 1;
}
});
}
None => {
hashes.iter_mut().for_each(|hash| {
hashes.iter_mut().enumerate().for_each(|(idx, hash)| {
let header = self.pointers[(*hash >> self.hash_shift) as usize];
if header != 0 && early_filtering(header, *hash) {
*hash = remove_header_tag(header);
count += 1;
unsafe { *matched_selection.get_unchecked_mut(matched_idx) = idx as u32 };
matched_idx += 1;
} else {
*hash = 0;
unsafe {
*unmatched_selection.get_unchecked_mut(unmatched_idx) = idx as u32
};
unmatched_idx += 1;
}
});
}
}
count
(matched_idx, unmatched_idx)
}

// Using hashes to probe hash table and converting them in-place to pointers for memory reuse.
fn early_filtering_probe_with_selection(
// Perform early filtering probe and store matched indexes in `selection`, return the number of matched indexes.
fn early_filtering_matched_probe(
&self,
hashes: &mut [u64],
bitmap: Option<Bitmap>,
Expand Down
29 changes: 13 additions & 16 deletions src/common/hashtable/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,24 +513,21 @@ pub trait HashtableLike {
pub trait HashJoinHashtableLike {
type Key: ?Sized;

// Using hashes to probe hash table and converting them in-place to pointers for memory reuse.
// same with `early_filtering_probe`, but we don't use early_filter
// Probe hash table, use `hashes` to probe hash table and convert it in-place to pointers for memory reuse.
fn probe(&self, hashes: &mut [u64], bitmap: Option<Bitmap>) -> usize;

// Using hashes to probe hash table and converting them in-place to pointers for memory reuse.
// 1. same with `early_filtering_probe_with_selection`, but we don't use selection to preserve the
// unfiltered indexes, we just set the filtered hashes as zero.
// 2. return the unfiltered counts.
fn early_filtering_probe(&self, hashes: &mut [u64], bitmap: Option<Bitmap>) -> usize;

// Using hashes to probe hash table and converting them in-place to pointers for memory reuse.
// we use `early_filtering_probe_with_selection` to do the first round probe.
// 1. `hashes` is the hash value of probe block's rows. we will use this one to
// do early filtering. if we can't early filter one row(at idx), we will assign pointer in
// the bucket to hashes[idx] to reuse the memory.
// 2. `selection` is used to preserved the indexes which can't be early_filtered.
// 3. return the count of preserved the indexes in `selection`
fn early_filtering_probe_with_selection(
// Perform early filtering probe, store matched indexes in `matched_selection` and store unmatched indexes
// in `unmatched_selection`, return the number of matched and unmatched indexes.
fn early_filtering_probe(
&self,
hashes: &mut [u64],
valids: Option<Bitmap>,
matched_selection: &mut [u32],
unmatched_selection: &mut [u32],
) -> (usize, usize);

// Perform early filtering probe and store matched indexes in `selection`, return the number of matched indexes.
fn early_filtering_matched_probe(
&self,
hashes: &mut [u64],
valids: Option<Bitmap>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ impl HashJoinProbeState {

// Probe:
// (1) INNER / RIGHT / RIGHT SINGLE / RIGHT SEMI / RIGHT ANTI / RIGHT MARK / LEFT SEMI / LEFT MARK
// prefer_early_filtering is true => early_filtering_probe_with_selection
// prefer_early_filtering is true => early_filtering_matched_probe
// prefer_early_filtering is false => probe
// (2) LEFT / LEFT SINGLE / LEFT ANTI / FULL
// prefer_early_filtering is true => early_filtering_probe
Expand All @@ -308,36 +308,42 @@ impl HashJoinProbeState {
.build_keys_accessor_and_hashes(keys_state, &mut probe_state.hashes)?;

// Perform a round of hash table probe.
if Self::check_for_selection(&self.hash_join_state.hash_join_desc.join_type) {
probe_state.selection_count = if prefer_early_filtering {
probe_state.probe_with_selection = prefer_early_filtering;
probe_state.selection_count = if !Self::need_unmatched_selection(
&self.hash_join_state.hash_join_desc.join_type,
probe_state.with_conjunction,
) {
if prefer_early_filtering {
// Early filtering, use selection to get better performance.
probe_state.probe_with_selection = true;

table.hash_table.early_filtering_probe_with_selection(
table.hash_table.early_filtering_matched_probe(
&mut probe_state.hashes,
valids,
&mut probe_state.selection,
)
} else {
// If don't do early filtering, don't use selection.
probe_state.probe_with_selection = false;

table.hash_table.probe(&mut probe_state.hashes, valids)
};
probe_state.num_keys_hash_matched += probe_state.selection_count as u64;
}
} else {
// For left join, left single join, full join and left anti join, don't use selection.
probe_state.probe_with_selection = false;

let count = if prefer_early_filtering {
table
.hash_table
.early_filtering_probe(&mut probe_state.hashes, valids)
if prefer_early_filtering {
// Early filtering, use matched selection and unmatched selection to get better performance.
let unmatched_selection =
probe_state.probe_unmatched_indexes.as_mut().unwrap();
let (matched_count, unmatched_count) =
table.hash_table.early_filtering_probe(
&mut probe_state.hashes,
valids,
&mut probe_state.selection,
unmatched_selection,
);
probe_state.probe_unmatched_indexes_count = unmatched_count;
matched_count
} else {
// If don't do early filtering, don't use selection.
table.hash_table.probe(&mut probe_state.hashes, valids)
};
probe_state.num_keys_hash_matched += count as u64;
}
}
};
probe_state.num_keys_hash_matched += probe_state.selection_count as u64;

// Continue to probe hash table and process data blocks.
self.result_blocks(&input, keys, &table.hash_table, probe_state)
Expand Down Expand Up @@ -369,19 +375,12 @@ impl HashJoinProbeState {
)
}

/// Checks if a join type can use selection.
pub fn check_for_selection(join_type: &JoinType) -> bool {
/// Checks if the join type need to use unmatched selection.
pub fn need_unmatched_selection(join_type: &JoinType, with_conjunction: bool) -> bool {
matches!(
join_type,
JoinType::Inner
| JoinType::Right
| JoinType::RightSingle
| JoinType::RightSemi
| JoinType::RightAnti
| JoinType::RightMark
| JoinType::LeftSemi
| JoinType::LeftMark
)
JoinType::Left | JoinType::LeftSingle | JoinType::Full | JoinType::LeftAnti
) && !with_conjunction
}

pub fn probe_attach(&self) -> Result<usize> {
Expand Down
Loading

0 comments on commit d0a392b

Please sign in to comment.