From ab7139fe983278ffd60ee7bcb6ae91888dafa7db Mon Sep 17 00:00:00 2001 From: Dousir9 <736191200@qq.com> Date: Sat, 7 Oct 2023 17:09:34 +0800 Subject: [PATCH] vectorized hash join --- .../hashtable/src/hashjoin_hashtable.rs | 77 ++++--------- .../src/hashjoin_string_hashtable.rs | 101 +++++------------- src/common/hashtable/src/traits.rs | 17 ++- src/query/expression/src/kernels/group_by.rs | 22 ++-- .../expression/src/kernels/group_by_hash.rs | 87 ++++++++++++++- .../group_by/aggregator_polymorphic_keys.rs | 7 ++ .../processors/transforms/hash_join/common.rs | 34 ------ .../hash_join/hash_join_probe_state.rs | 7 +- .../hash_join/probe_join/inner_join.rs | 24 ++--- .../hash_join/probe_join/left_join.rs | 48 ++++----- .../hash_join/probe_join/left_mark.rs | 59 +++++----- .../hash_join/probe_join/left_semi_join.rs | 36 +++---- .../hash_join/probe_join/right_anti_join.rs | 57 +++++----- .../hash_join/probe_join/right_join.rs | 28 ++--- .../hash_join/probe_join/right_mark.rs | 37 ++++--- .../hash_join/probe_join/right_semi_join.rs | 57 +++++----- .../transforms/hash_join/result_blocks.rs | 22 +++- 17 files changed, 356 insertions(+), 364 deletions(-) diff --git a/src/common/hashtable/src/hashjoin_hashtable.rs b/src/common/hashtable/src/hashjoin_hashtable.rs index 51a2efc6da9f..4993e1642ee7 100644 --- a/src/common/hashtable/src/hashjoin_hashtable.rs +++ b/src/common/hashtable/src/hashjoin_hashtable.rs @@ -52,7 +52,7 @@ pub struct RawEntry { pub struct HashJoinHashTable { pub(crate) pointers: Box<[u64], A>, pub(crate) atomic_pointers: *mut AtomicU64, - pub(crate) hash_mask: usize, + pub(crate) hash_mask: u64, pub(crate) phantom: PhantomData, } @@ -68,7 +68,7 @@ impl HashJoinHashTable { Box::new_zeroed_slice_in(capacity, Default::default()).assume_init() }, atomic_pointers: std::ptr::null_mut(), - hash_mask: capacity - 1, + hash_mask: capacity as u64 - 1, phantom: PhantomData, }; hashtable.atomic_pointers = unsafe { @@ -78,7 +78,7 @@ impl HashJoinHashTable { } pub fn insert(&mut self, key: K, raw_entry_ptr: *mut RawEntry) { - let index = key.hash() as usize & self.hash_mask; + let index = (key.hash() & self.hash_mask) as usize; // # Safety // `index` is less than the capacity of hash table. let mut head = unsafe { (*self.atomic_pointers.add(index)).load(Ordering::Relaxed) }; @@ -107,73 +107,42 @@ where { type Key = K; - fn contains(&self, key_ref: &Self::Key) -> bool { - let index = key_ref.hash() as usize & self.hash_mask; - let mut raw_entry_ptr = self.pointers[index]; - loop { - if raw_entry_ptr == 0 { - break; - } - let raw_entry = unsafe { &*(raw_entry_ptr as *mut RawEntry) }; - if key_ref == &raw_entry.key { - return true; - } - raw_entry_ptr = raw_entry.next; - } - false + // Using hashes to probe hash table and converting them in-place to pointers for memory reuse. + fn probe(&self, hashes: &mut [u64]) { + hashes + .iter_mut() + .for_each(|hash| *hash = self.pointers[(*hash & self.hash_mask) as usize]); } - fn probe_hash_table( - &self, - key_ref: &Self::Key, - vec_ptr: *mut RowPtr, - mut occupied: usize, - capacity: usize, - ) -> (usize, u64) { - let index = key_ref.hash() as usize & self.hash_mask; - let origin = occupied; - let mut raw_entry_ptr = self.pointers[index]; + fn next_contains(&self, key: &Self::Key, mut ptr: u64) -> bool { loop { - if raw_entry_ptr == 0 || occupied >= capacity { + if ptr == 0 { break; } - let raw_entry = unsafe { &*(raw_entry_ptr as *mut RawEntry) }; - if key_ref == &raw_entry.key { - // # Safety - // occupied is less than the capacity of vec_ptr. - unsafe { - std::ptr::copy_nonoverlapping( - &raw_entry.row_ptr as *const RowPtr, - vec_ptr.add(occupied), - 1, - ) - }; - occupied += 1; + let raw_entry = unsafe { &*(ptr as *mut RawEntry) }; + if key == &raw_entry.key { + return true; } - raw_entry_ptr = raw_entry.next; - } - if occupied > origin { - (occupied - origin, raw_entry_ptr) - } else { - (0, 0) + ptr = raw_entry.next; } + false } - fn next_incomplete_ptr( + fn next_probe( &self, - key_ref: &Self::Key, - mut incomplete_ptr: u64, + key: &Self::Key, + mut ptr: u64, vec_ptr: *mut RowPtr, mut occupied: usize, capacity: usize, ) -> (usize, u64) { let origin = occupied; loop { - if incomplete_ptr == 0 || occupied >= capacity { + if ptr == 0 || occupied >= capacity { break; } - let raw_entry = unsafe { &*(incomplete_ptr as *mut RawEntry) }; - if key_ref == &raw_entry.key { + let raw_entry = unsafe { &*(ptr as *mut RawEntry) }; + if key == &raw_entry.key { // # Safety // occupied is less than the capacity of vec_ptr. unsafe { @@ -185,10 +154,10 @@ where }; occupied += 1; } - incomplete_ptr = raw_entry.next; + ptr = raw_entry.next; } if occupied > origin { - (occupied - origin, incomplete_ptr) + (occupied - origin, ptr) } else { (0, 0) } diff --git a/src/common/hashtable/src/hashjoin_string_hashtable.rs b/src/common/hashtable/src/hashjoin_string_hashtable.rs index 24a77e898ee7..3dc5cfde64cb 100644 --- a/src/common/hashtable/src/hashjoin_string_hashtable.rs +++ b/src/common/hashtable/src/hashjoin_string_hashtable.rs @@ -34,7 +34,7 @@ pub struct StringRawEntry { pub struct HashJoinStringHashTable { pub(crate) pointers: Box<[u64], A>, pub(crate) atomic_pointers: *mut AtomicU64, - pub(crate) hash_mask: usize, + pub(crate) hash_mask: u64, } unsafe impl Send for HashJoinStringHashTable {} @@ -49,7 +49,7 @@ impl HashJoinStringHashTable { Box::new_zeroed_slice_in(capacity, Default::default()).assume_init() }, atomic_pointers: std::ptr::null_mut(), - hash_mask: capacity - 1, + hash_mask: capacity as u64 - 1, }; hashtable.atomic_pointers = unsafe { std::mem::transmute::<*mut u64, *mut AtomicU64>(hashtable.pointers.as_mut_ptr()) @@ -58,7 +58,7 @@ impl HashJoinStringHashTable { } pub fn insert(&mut self, key: &[u8], raw_entry_ptr: *mut StringRawEntry) { - let index = key.fast_hash() as usize & self.hash_mask; + let index = (key.fast_hash() & self.hash_mask) as usize; // # Safety // `index` is less than the capacity of hash table. let mut head = unsafe { (*self.atomic_pointers.add(index)).load(Ordering::Relaxed) }; @@ -85,23 +85,28 @@ where A: Allocator + Clone + 'static { type Key = [u8]; - fn contains(&self, key_ref: &Self::Key) -> bool { - let index = key_ref.fast_hash() as usize & self.hash_mask; - let mut raw_entry_ptr = self.pointers[index]; + // Using hashes to probe hash table and converting them in-place to pointers for memory reuse. + fn probe(&self, hashes: &mut [u64]) { + hashes + .iter_mut() + .for_each(|hash| *hash = self.pointers[(*hash & self.hash_mask) as usize]); + } + + fn next_contains(&self, key: &Self::Key, mut ptr: u64) -> bool { loop { - if raw_entry_ptr == 0 { + if ptr == 0 { break; } - let raw_entry = unsafe { &*(raw_entry_ptr as *mut StringRawEntry) }; + let raw_entry = unsafe { &*(ptr as *mut StringRawEntry) }; // Compare `early` and the length of the string, the size of `early` is 4. let min_len = std::cmp::min( STRING_EARLY_SIZE, - std::cmp::min(key_ref.len(), raw_entry.length as usize), + std::cmp::min(key.len(), raw_entry.length as usize), ); - if raw_entry.length as usize == key_ref.len() - && key_ref[0..min_len] == raw_entry.early[0..min_len] + if raw_entry.length as usize == key.len() + && key[0..min_len] == raw_entry.early[0..min_len] { - let key = unsafe { + let key_ref = unsafe { std::slice::from_raw_parts( raw_entry.key as *const u8, raw_entry.length as usize, @@ -111,79 +116,31 @@ where A: Allocator + Clone + 'static return true; } } - raw_entry_ptr = raw_entry.next; + ptr = raw_entry.next; } false } - fn probe_hash_table( - &self, - key_ref: &Self::Key, - vec_ptr: *mut RowPtr, - mut occupied: usize, - capacity: usize, - ) -> (usize, u64) { - let index = key_ref.fast_hash() as usize & self.hash_mask; - let origin = occupied; - let mut raw_entry_ptr = self.pointers[index]; - loop { - if raw_entry_ptr == 0 || occupied >= capacity { - break; - } - let raw_entry = unsafe { &*(raw_entry_ptr as *mut StringRawEntry) }; - // Compare `early` and the length of the string, the size of `early` is 4. - let min_len = std::cmp::min(STRING_EARLY_SIZE, key_ref.len()); - if raw_entry.length as usize == key_ref.len() - && key_ref[0..min_len] == raw_entry.early[0..min_len] - { - let key = unsafe { - std::slice::from_raw_parts( - raw_entry.key as *const u8, - raw_entry.length as usize, - ) - }; - if key == key_ref { - // # Safety - // occupied is less than the capacity of vec_ptr. - unsafe { - std::ptr::copy_nonoverlapping( - &raw_entry.row_ptr as *const RowPtr, - vec_ptr.add(occupied), - 1, - ) - }; - occupied += 1; - } - } - raw_entry_ptr = raw_entry.next; - } - if occupied > origin { - (occupied - origin, raw_entry_ptr) - } else { - (0, 0) - } - } - - fn next_incomplete_ptr( + fn next_probe( &self, - key_ref: &Self::Key, - mut incomplete_ptr: u64, + key: &Self::Key, + mut ptr: u64, vec_ptr: *mut RowPtr, mut occupied: usize, capacity: usize, ) -> (usize, u64) { let origin = occupied; loop { - if incomplete_ptr == 0 || occupied >= capacity { + if ptr == 0 || occupied >= capacity { break; } - let raw_entry = unsafe { &*(incomplete_ptr as *mut StringRawEntry) }; + let raw_entry = unsafe { &*(ptr as *mut StringRawEntry) }; // Compare `early` and the length of the string, the size of `early` is 4. - let min_len = std::cmp::min(STRING_EARLY_SIZE, key_ref.len()); - if raw_entry.length as usize == key_ref.len() - && key_ref[0..min_len] == raw_entry.early[0..min_len] + let min_len = std::cmp::min(STRING_EARLY_SIZE, key.len()); + if raw_entry.length as usize == key.len() + && key[0..min_len] == raw_entry.early[0..min_len] { - let key = unsafe { + let key_ref = unsafe { std::slice::from_raw_parts( raw_entry.key as *const u8, raw_entry.length as usize, @@ -202,10 +159,10 @@ where A: Allocator + Clone + 'static occupied += 1; } } - incomplete_ptr = raw_entry.next; + ptr = raw_entry.next; } if occupied > origin { - (occupied - origin, incomplete_ptr) + (occupied - origin, ptr) } else { (0, 0) } diff --git a/src/common/hashtable/src/traits.rs b/src/common/hashtable/src/traits.rs index 3dd71b945921..3f241bd53147 100644 --- a/src/common/hashtable/src/traits.rs +++ b/src/common/hashtable/src/traits.rs @@ -436,20 +436,15 @@ pub trait HashtableLike { pub trait HashJoinHashtableLike { type Key: ?Sized; - fn contains(&self, key_ref: &Self::Key) -> bool; + // Using hashes to probe hash table and converting them in-place to pointers for memory reuse. + fn probe(&self, hashes: &mut [u64]); - fn probe_hash_table( - &self, - key_ref: &Self::Key, - vec_ptr: *mut RowPtr, - occupied: usize, - capacity: usize, - ) -> (usize, u64); + fn next_contains(&self, key: &Self::Key, ptr: u64) -> bool; - fn next_incomplete_ptr( + fn next_probe( &self, - key_ref: &Self::Key, - incomplete_ptr: u64, + key: &Self::Key, + ptr: u64, vec_ptr: *mut RowPtr, occupied: usize, capacity: usize, diff --git a/src/query/expression/src/kernels/group_by.rs b/src/query/expression/src/kernels/group_by.rs index 61a9c36bb05f..9db18751585d 100644 --- a/src/query/expression/src/kernels/group_by.rs +++ b/src/query/expression/src/kernels/group_by.rs @@ -49,18 +49,20 @@ impl DataBlock { hash_key_types: &[DataType], efficiently_memory: bool, ) -> Result { - if hash_key_types.len() == 1 { - let typ = hash_key_types[0].clone(); - if matches!(typ, DataType::String | DataType::Variant) { - return Ok(HashMethodKind::SingleString( - HashMethodSingleString::default(), - )); - } + if hash_key_types.len() == 1 + && matches!( + hash_key_types[0], + DataType::String | DataType::Variant | DataType::Bitmap + ) + { + return Ok(HashMethodKind::SingleString( + HashMethodSingleString::default(), + )); } let mut group_key_len = 0; - for typ in hash_key_types { - let not_null_type = typ.remove_nullable(); + for hash_key_type in hash_key_types { + let not_null_type = hash_key_type.remove_nullable(); if not_null_type.is_numeric() || not_null_type.is_date_or_date_time() @@ -69,7 +71,7 @@ impl DataBlock { group_key_len += not_null_type.numeric_byte_size().unwrap(); // extra one byte for null flag - if typ.is_nullable() { + if hash_key_type.is_nullable() { group_key_len += 1; } } else { diff --git a/src/query/expression/src/kernels/group_by_hash.rs b/src/query/expression/src/kernels/group_by_hash.rs index 344e9366f378..34bd823bf49d 100644 --- a/src/query/expression/src/kernels/group_by_hash.rs +++ b/src/query/expression/src/kernels/group_by_hash.rs @@ -83,6 +83,11 @@ pub trait HashMethod: Clone + Sync + Send + 'static { ) -> Result; fn build_keys_iter<'a>(&self, keys_state: &'a KeysState) -> Result>; + + fn build_keys_iter_and_hashes<'a>( + &self, + keys_state: &'a KeysState, + ) -> Result<(Self::HashKeyIter<'a>, Vec)>; } pub type HashMethodKeysU8 = HashMethodFixedKeys; @@ -195,10 +200,27 @@ impl HashMethod for HashMethodSingleString { Ok(KeysState::Column(group_columns[0].0.clone())) } - fn build_keys_iter<'a>(&self, key_state: &'a KeysState) -> Result> { - match key_state { - KeysState::Column(Column::String(col)) => Ok(col.iter()), - KeysState::Column(Column::Variant(col)) => Ok(col.iter()), + fn build_keys_iter<'a>(&self, keys_state: &'a KeysState) -> Result> { + match keys_state { + KeysState::Column(Column::String(col)) + | KeysState::Column(Column::Variant(col)) + | KeysState::Column(Column::Bitmap(col)) => Ok(col.iter()), + _ => unreachable!(), + } + } + + fn build_keys_iter_and_hashes<'a>( + &self, + keys_state: &'a KeysState, + ) -> Result<(Self::HashKeyIter<'a>, Vec)> { + match keys_state { + KeysState::Column(Column::String(col)) + | KeysState::Column(Column::Variant(col)) + | KeysState::Column(Column::Bitmap(col)) => { + let mut hashes = Vec::with_capacity(col.len()); + hashes.extend(col.iter().map(|key| key.fast_hash())); + Ok((col.iter(), hashes)) + } _ => unreachable!(), } } @@ -241,6 +263,20 @@ impl HashMethod for HashMethodSerializer { _ => unreachable!(), } } + + fn build_keys_iter_and_hashes<'a>( + &self, + keys_state: &'a KeysState, + ) -> Result<(Self::HashKeyIter<'a>, Vec)> { + match keys_state { + KeysState::Column(Column::String(col)) => { + let mut hashes = Vec::with_capacity(col.len()); + hashes.extend(col.iter().map(|key| key.fast_hash())); + Ok((col.iter(), hashes)) + } + _ => unreachable!(), + } + } } #[derive(Debug, Clone, PartialEq, Eq)] @@ -315,6 +351,20 @@ impl HashMethod for HashMethodDictionarySerializer { _ => unreachable!(), } } + + fn build_keys_iter_and_hashes<'a>( + &self, + keys_state: &'a KeysState, + ) -> Result<(Self::HashKeyIter<'a>, Vec)> { + match keys_state { + KeysState::Dictionary { dictionaries, .. } => { + let mut hashes = Vec::with_capacity(dictionaries.len()); + hashes.extend(dictionaries.iter().map(|key| key.fast_hash())); + Ok((dictionaries.iter(), hashes)) + } + _ => unreachable!(), + } + } } #[derive(Clone, Debug)] @@ -532,6 +582,21 @@ macro_rules! impl_hash_method_fixed_keys { other => unreachable!("{:?} -> {}", other, NumberType::<$ty>::data_type()), } } + + fn build_keys_iter_and_hashes<'a>( + &self, + keys_state: &'a KeysState, + ) -> Result<(Self::HashKeyIter<'a>, Vec)> { + use crate::types::ArgType; + match keys_state { + KeysState::Column(Column::Number(NumberColumn::$dt(col))) => { + let mut hashes = Vec::with_capacity(col.len()); + hashes.extend(col.iter().map(|key| key.fast_hash())); + Ok((col.iter(), hashes)) + } + other => unreachable!("{:?} -> {}", other, NumberType::<$ty>::data_type()), + } + } } }; } @@ -587,6 +652,20 @@ macro_rules! impl_hash_method_fixed_large_keys { _ => unreachable!(), } } + + fn build_keys_iter_and_hashes<'a>( + &self, + keys_state: &'a KeysState, + ) -> Result<(Self::HashKeyIter<'a>, Vec)> { + match keys_state { + KeysState::$name(v) => { + let mut hashes = Vec::with_capacity(v.len()); + hashes.extend(v.iter().map(|key| key.fast_hash())); + Ok((v.iter(), hashes)) + } + _ => unreachable!(), + } + } } }; } diff --git a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_polymorphic_keys.rs b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_polymorphic_keys.rs index 2026218cf25d..19f96812298c 100644 --- a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_polymorphic_keys.rs +++ b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_polymorphic_keys.rs @@ -619,6 +619,13 @@ impl HashMethod for PartitionedHashMethod { fn build_keys_iter<'a>(&self, keys_state: &'a KeysState) -> Result> { self.method.build_keys_iter(keys_state) } + + fn build_keys_iter_and_hashes<'a>( + &self, + keys_state: &'a KeysState, + ) -> Result<(Self::HashKeyIter<'a>, Vec)> { + self.method.build_keys_iter_and_hashes(keys_state) + } } impl PolymorphicKeysHelper> for PartitionedHashMethod diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs index 19e09bf62455..46e9577f3b0b 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs @@ -30,8 +30,6 @@ use common_expression::Expr; use common_expression::Scalar; use common_expression::Value; use common_functions::BUILTIN_FUNCTIONS; -use common_hashtable::HashJoinHashtableLike; -use common_hashtable::RowPtr; use common_sql::executor::cast_expr_to_non_null_boolean; use super::desc::MARKER_KIND_FALSE; @@ -61,38 +59,6 @@ impl HashJoinProbeState { } } - #[inline] - pub(crate) fn contains<'a, H: HashJoinHashtableLike>( - &self, - hash_table: &'a H, - key: &'a H::Key, - valids: &Option, - i: usize, - ) -> bool { - if valids.as_ref().map_or(true, |v| v.get_bit(i)) { - return hash_table.contains(key); - } - false - } - - #[inline] - #[allow(clippy::too_many_arguments)] - pub(crate) fn probe_key<'a, H: HashJoinHashtableLike>( - &self, - hash_table: &'a H, - key: &'a H::Key, - valids: &Option, - i: usize, - vec_ptr: *mut RowPtr, - occupied: usize, - max_block_size: usize, - ) -> (usize, u64) { - if valids.as_ref().map_or(true, |v| v.get_bit(i)) { - return hash_table.probe_hash_table(key, vec_ptr, occupied, max_block_size); - } - (0, 0) - } - pub(crate) fn create_marker_block( &self, has_null: bool, diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs index d5d052db41d8..06786374c600 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs @@ -38,6 +38,7 @@ use common_expression::RemoteExpr; use common_expression::Scalar; use common_expression::Value; use common_functions::BUILTIN_FUNCTIONS; +use common_hashtable::HashJoinHashtableLike; use common_sql::ColumnSet; use log::info; use parking_lot::Mutex; @@ -246,11 +247,15 @@ impl HashJoinProbeState { let keys_state = table .hash_method .build_keys_state(&probe_keys, input.num_rows())?; - let keys_iter = table.hash_method.build_keys_iter(&keys_state)?; + let (keys_iter, mut hashes) = + table.hash_method.build_keys_iter_and_hashes(&keys_state)?; + // Using hashes to probe hash table and converting them in-place to pointers for memory reuse. + table.hash_table.probe(&mut hashes); self.result_blocks( &table.hash_table, probe_state, keys_iter, + &hashes, &input, is_probe_projected, ) diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs index 4a04d3879b80..b1a5ab6a58b4 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/inner_join.rs @@ -38,6 +38,7 @@ impl HashJoinProbeState { hash_table: &H, probe_state: &mut ProbeState, keys_iter: IT, + pointers: &[u64], input: &DataBlock, is_probe_projected: bool, ) -> Result> @@ -46,7 +47,7 @@ impl HashJoinProbeState { H::Key: 'a, { let max_block_size = probe_state.max_block_size; - let valids = &probe_state.valids; + let valids = probe_state.valids.as_ref(); // The inner join will return multiple data blocks of similar size. let mut matched_num = 0; let mut result_blocks = vec![]; @@ -63,22 +64,17 @@ impl HashJoinProbeState { .is_build_projected .load(Ordering::Relaxed); - for (i, key) in keys_iter.enumerate() { + for (i, (key, ptr)) in keys_iter.zip(pointers.iter()).enumerate() { // If the join is derived from correlated subquery, then null equality is safe. let (mut match_count, mut incomplete_ptr) = - if self.hash_join_state.hash_join_desc.from_correlated_subquery { - hash_table.probe_hash_table(key, build_indexes_ptr, matched_num, max_block_size) + if self.hash_join_state.hash_join_desc.from_correlated_subquery + || valids.map_or(true, |v| v.get_bit(i)) + { + hash_table.next_probe(key, *ptr, build_indexes_ptr, matched_num, max_block_size) } else { - self.probe_key( - hash_table, - key, - valids, - i, - build_indexes_ptr, - matched_num, - max_block_size, - ) + continue; }; + if match_count == 0 { continue; } @@ -144,7 +140,7 @@ impl HashJoinProbeState { if incomplete_ptr == 0 { break; } - (match_count, incomplete_ptr) = hash_table.next_incomplete_ptr( + (match_count, incomplete_ptr) = hash_table.next_probe( key, incomplete_ptr, build_indexes_ptr, diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs index 6d3df2a14c1c..b856aa54260e 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs @@ -36,6 +36,7 @@ impl HashJoinProbeState { hash_table: &H, probe_state: &mut ProbeState, keys_iter: IT, + pointers: &[u64], input: &DataBlock, is_probe_projected: bool, ) -> Result> @@ -45,7 +46,7 @@ impl HashJoinProbeState { { let input_num_rows = input.num_rows(); let max_block_size = probe_state.max_block_size; - let valids = &probe_state.valids; + let valids = probe_state.valids.as_ref(); let true_validity = &probe_state.true_validity; let probe_indexes = &mut probe_state.probe_indexes; let local_build_indexes = &mut probe_state.build_indexes; @@ -68,25 +69,20 @@ impl HashJoinProbeState { .load(Ordering::Relaxed); // Start to probe hash table. - for (i, key) in keys_iter.enumerate() { + for (i, (key, ptr)) in keys_iter.zip(pointers).enumerate() { let (mut match_count, mut incomplete_ptr) = - if self.hash_join_state.hash_join_desc.from_correlated_subquery { - hash_table.probe_hash_table( + if self.hash_join_state.hash_join_desc.from_correlated_subquery + || valids.map_or(true, |v| v.get_bit(i)) + { + hash_table.next_probe( key, + *ptr, local_build_indexes_ptr, matched_num, max_block_size, ) } else { - self.probe_key( - hash_table, - key, - valids, - i, - local_build_indexes_ptr, - matched_num, - max_block_size, - ) + (0, 0) }; let mut total_probe_matched = 0; if match_count > 0 { @@ -217,7 +213,7 @@ impl HashJoinProbeState { if incomplete_ptr == 0 { break; } - (match_count, incomplete_ptr) = hash_table.next_incomplete_ptr( + (match_count, incomplete_ptr) = hash_table.next_probe( key, incomplete_ptr, local_build_indexes_ptr, @@ -265,6 +261,7 @@ impl HashJoinProbeState { hash_table: &H, probe_state: &mut ProbeState, keys_iter: IT, + pointers: &[u64], input: &DataBlock, is_probe_projected: bool, ) -> Result> @@ -274,7 +271,7 @@ impl HashJoinProbeState { { let input_num_rows = input.num_rows(); let max_block_size = probe_state.max_block_size; - let valids = &probe_state.valids; + let valids = probe_state.valids.as_ref(); let true_validity = &probe_state.true_validity; let probe_indexes = &mut probe_state.probe_indexes; let local_build_indexes = &mut probe_state.build_indexes; @@ -304,25 +301,20 @@ impl HashJoinProbeState { .load(Ordering::Relaxed); // Start to probe hash table. - for (i, key) in keys_iter.enumerate() { + for (i, (key, ptr)) in keys_iter.zip(pointers).enumerate() { let (mut match_count, mut incomplete_ptr) = - if self.hash_join_state.hash_join_desc.from_correlated_subquery { - hash_table.probe_hash_table( + if self.hash_join_state.hash_join_desc.from_correlated_subquery + || valids.map_or(true, |v| v.get_bit(i)) + { + hash_table.next_probe( key, + *ptr, local_build_indexes_ptr, matched_num, max_block_size, ) } else { - self.probe_key( - hash_table, - key, - valids, - i, - local_build_indexes_ptr, - matched_num, - max_block_size, - ) + (0, 0) }; let mut total_probe_matched = 0; if match_count > 0 { @@ -484,7 +476,7 @@ impl HashJoinProbeState { if incomplete_ptr == 0 { break; } - (match_count, incomplete_ptr) = hash_table.next_incomplete_ptr( + (match_count, incomplete_ptr) = hash_table.next_probe( key, incomplete_ptr, local_build_indexes_ptr, diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark.rs index b5f55ed74a47..14292d08898b 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_mark.rs @@ -36,6 +36,7 @@ impl HashJoinProbeState { hash_table: &H, probe_state: &mut ProbeState, keys_iter: IT, + pointers: &[u64], input: &DataBlock, ) -> Result> where @@ -43,7 +44,7 @@ impl HashJoinProbeState { H::Key: 'a, { let mut max_block_size = probe_state.max_block_size; - let valids = &probe_state.valids; + let valids = probe_state.valids.as_ref(); // `probe_column` is the subquery result column. // For sql: select * from t1 where t1.a in (select t2.a from t2); t2.a is the `probe_column`, let probe_column = input.get_by_offset(0).value.as_column().unwrap(); @@ -64,7 +65,7 @@ impl HashJoinProbeState { // If find join partner, set the marker to true. let mark_scan_map = unsafe { &mut *self.hash_join_state.mark_scan_map.get() }; - for (i, key) in keys_iter.enumerate() { + for (i, (key, ptr)) in keys_iter.zip(pointers).enumerate() { if (i & max_block_size) == 0 { max_block_size <<= 1; @@ -75,27 +76,19 @@ impl HashJoinProbeState { } } - let (mut match_count, mut incomplete_ptr) = match self - .hash_join_state - .hash_join_desc - .from_correlated_subquery - { - true => { - hash_table.probe_hash_table(key, build_indexes_ptr, matched_num, max_block_size) - } - false => self.probe_key( - hash_table, - key, - valids, - i, - build_indexes_ptr, - matched_num, - max_block_size, - ), - }; + let (mut match_count, mut incomplete_ptr) = + if self.hash_join_state.hash_join_desc.from_correlated_subquery + || valids.map_or(true, |v| v.get_bit(i)) + { + hash_table.next_probe(key, *ptr, build_indexes_ptr, matched_num, max_block_size) + } else { + continue; + }; + if match_count == 0 { continue; } + matched_num += match_count; loop { for probed_row in &build_index[0..matched_num] { @@ -106,7 +99,7 @@ impl HashJoinProbeState { if incomplete_ptr == 0 { break; } - (match_count, incomplete_ptr) = hash_table.next_incomplete_ptr( + (match_count, incomplete_ptr) = hash_table.next_probe( key, incomplete_ptr, build_indexes_ptr, @@ -128,6 +121,7 @@ impl HashJoinProbeState { hash_table: &H, probe_state: &mut ProbeState, keys_iter: IT, + pointers: &[u64], input: &DataBlock, is_probe_projected: bool, ) -> Result> @@ -136,7 +130,7 @@ impl HashJoinProbeState { H::Key: 'a, { let max_block_size = probe_state.max_block_size; - let valids = &probe_state.valids; + let valids = probe_state.valids.as_ref(); // `probe_column` is the subquery result column. // For sql: select * from t1 where t1.a in (select t2.a from t2); t2.a is the `probe_column`, let probe_column = input.get_by_offset(0).value.as_column().unwrap(); @@ -176,21 +170,16 @@ impl HashJoinProbeState { let mark_scan_map = unsafe { &mut *self.hash_join_state.mark_scan_map.get() }; let _mark_scan_map_lock = self.mark_scan_map_lock.lock(); - for (i, key) in keys_iter.enumerate() { + for (i, (key, ptr)) in keys_iter.zip(pointers).enumerate() { let (mut match_count, mut incomplete_ptr) = - if self.hash_join_state.hash_join_desc.from_correlated_subquery { - hash_table.probe_hash_table(key, build_indexes_ptr, matched_num, max_block_size) + if self.hash_join_state.hash_join_desc.from_correlated_subquery + || valids.map_or(true, |v| v.get_bit(i)) + { + hash_table.next_probe(key, *ptr, build_indexes_ptr, matched_num, max_block_size) } else { - self.probe_key( - hash_table, - key, - valids, - i, - build_indexes_ptr, - matched_num, - max_block_size, - ) + continue; }; + if match_count == 0 { continue; } @@ -248,7 +237,7 @@ impl HashJoinProbeState { if incomplete_ptr == 0 { break; } - (match_count, incomplete_ptr) = hash_table.next_incomplete_ptr( + (match_count, incomplete_ptr) = hash_table.next_probe( key, incomplete_ptr, build_indexes_ptr, diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_semi_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_semi_join.rs index 33e4f36dff7e..fcda0177426d 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_semi_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_semi_join.rs @@ -32,6 +32,7 @@ impl HashJoinProbeState { hash_table: &H, probe_state: &mut ProbeState, keys_iter: IT, + pointers: &[u64], input: &DataBlock, ) -> Result> where @@ -41,16 +42,18 @@ impl HashJoinProbeState { // If there is no build key, the result is input // Eg: select * from onecolumn as a right semi join twocolumn as b on true order by b.x let max_block_size = probe_state.max_block_size; - let valids = &probe_state.valids; + let valids = probe_state.valids.as_ref(); let probe_indexes = &mut probe_state.probe_indexes; let mut matched_num = 0; let mut result_blocks = vec![]; - for (i, key) in keys_iter.enumerate() { - let contains = if self.hash_join_state.hash_join_desc.from_correlated_subquery { - hash_table.contains(key) + for (i, (key, ptr)) in keys_iter.zip(pointers).enumerate() { + let contains = if self.hash_join_state.hash_join_desc.from_correlated_subquery + || valids.map_or(true, |v| v.get_bit(i)) + { + hash_table.next_contains(key, *ptr) } else { - self.contains(hash_table, key, valids, i) + false }; match (contains, SEMI) { @@ -90,6 +93,7 @@ impl HashJoinProbeState { hash_table: &H, probe_state: &mut ProbeState, keys_iter: IT, + pointers: &[u64], input: &DataBlock, is_probe_projected: bool, ) -> Result> @@ -98,7 +102,7 @@ impl HashJoinProbeState { H::Key: 'a, { let max_block_size = probe_state.max_block_size; - let valids = &probe_state.valids; + let valids = probe_state.valids.as_ref(); // The semi join will return multiple data chunks of similar size. let mut matched_num = 0; let mut result_blocks = vec![]; @@ -128,20 +132,14 @@ impl HashJoinProbeState { row_index: 0, }]; - for (i, key) in keys_iter.enumerate() { + for (i, (key, ptr)) in keys_iter.zip(pointers).enumerate() { let (mut match_count, mut incomplete_ptr) = - if self.hash_join_state.hash_join_desc.from_correlated_subquery { - hash_table.probe_hash_table(key, build_indexes_ptr, matched_num, max_block_size) + if self.hash_join_state.hash_join_desc.from_correlated_subquery + || valids.map_or(true, |v| v.get_bit(i)) + { + hash_table.next_probe(key, *ptr, build_indexes_ptr, matched_num, max_block_size) } else { - self.probe_key( - hash_table, - key, - valids, - i, - build_indexes_ptr, - matched_num, - max_block_size, - ) + (0, 0) }; let true_match_count = match_count; @@ -221,7 +219,7 @@ impl HashJoinProbeState { if incomplete_ptr == 0 { break; } - (match_count, incomplete_ptr) = hash_table.next_incomplete_ptr( + (match_count, incomplete_ptr) = hash_table.next_probe( key, incomplete_ptr, build_indexes_ptr, diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_anti_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_anti_join.rs index fface00e13d9..6bcabc0beb6f 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_anti_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_anti_join.rs @@ -29,32 +29,37 @@ impl HashJoinProbeState { hash_table: &H, probe_state: &mut ProbeState, keys_iter: IT, + pointers: &[u64], ) -> Result> where IT: Iterator + TrustedLen, H::Key: 'a, { let max_block_size = probe_state.max_block_size; - let valids = &probe_state.valids; + let valids = probe_state.valids.as_ref(); let mut matched_num = 0; let local_build_indexes = &mut probe_state.build_indexes; let local_build_indexes_ptr = local_build_indexes.as_mut_ptr(); let outer_scan_map = unsafe { &mut *self.hash_join_state.outer_scan_map.get() }; - for (i, key) in keys_iter.enumerate() { - let (mut match_count, mut incomplete_ptr) = self.probe_key( - hash_table, - key, - valids, - i, - local_build_indexes_ptr, - matched_num, - max_block_size, - ); + for (i, (key, ptr)) in keys_iter.zip(pointers).enumerate() { + let (mut match_count, mut incomplete_ptr) = if valids.map_or(true, |v| v.get_bit(i)) { + hash_table.next_probe( + key, + *ptr, + local_build_indexes_ptr, + matched_num, + max_block_size, + ) + } else { + continue; + }; + if match_count == 0 { continue; } + matched_num += match_count; if matched_num >= max_block_size { loop { @@ -74,7 +79,7 @@ impl HashJoinProbeState { if incomplete_ptr == 0 { break; } - (match_count, incomplete_ptr) = hash_table.next_incomplete_ptr( + (match_count, incomplete_ptr) = hash_table.next_probe( key, incomplete_ptr, local_build_indexes_ptr, @@ -106,6 +111,7 @@ impl HashJoinProbeState { hash_table: &H, probe_state: &mut ProbeState, keys_iter: IT, + pointers: &[u64], input: &DataBlock, is_probe_projected: bool, ) -> Result> @@ -114,7 +120,7 @@ impl HashJoinProbeState { H::Key: 'a, { let max_block_size = probe_state.max_block_size; - let valids = &probe_state.valids; + let valids = probe_state.valids.as_ref(); // The right join will return multiple data blocks of similar size. let mut matched_num = 0; let local_probe_indexes = &mut probe_state.probe_indexes; @@ -131,16 +137,19 @@ impl HashJoinProbeState { .is_build_projected .load(Ordering::Relaxed); - for (i, key) in keys_iter.enumerate() { - let (mut match_count, mut incomplete_ptr) = self.probe_key( - hash_table, - key, - valids, - i, - local_build_indexes_ptr, - matched_num, - max_block_size, - ); + for (i, (key, ptr)) in keys_iter.zip(pointers).enumerate() { + let (mut match_count, mut incomplete_ptr) = if valids.map_or(true, |v| v.get_bit(i)) { + hash_table.next_probe( + key, + *ptr, + local_build_indexes_ptr, + matched_num, + max_block_size, + ) + } else { + continue; + }; + if match_count == 0 { continue; } @@ -208,7 +217,7 @@ impl HashJoinProbeState { if incomplete_ptr == 0 { break; } - (match_count, incomplete_ptr) = hash_table.next_incomplete_ptr( + (match_count, incomplete_ptr) = hash_table.next_probe( key, incomplete_ptr, local_build_indexes_ptr, diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs index 0b491a820826..fd246e583e57 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_join.rs @@ -35,6 +35,7 @@ impl HashJoinProbeState { hash_table: &H, probe_state: &mut ProbeState, keys_iter: IT, + pointers: &[u64], input: &DataBlock, is_probe_projected: bool, ) -> Result> @@ -43,7 +44,7 @@ impl HashJoinProbeState { H::Key: 'a, { let max_block_size = probe_state.max_block_size; - let valids = &probe_state.valids; + let valids = probe_state.valids.as_ref(); let true_validity = &probe_state.true_validity; let local_probe_indexes = &mut probe_state.probe_indexes; let local_build_indexes = &mut probe_state.build_indexes; @@ -73,16 +74,19 @@ impl HashJoinProbeState { .is_build_projected .load(Ordering::Relaxed); - for (i, key) in keys_iter.enumerate() { - let (mut match_count, mut incomplete_ptr) = self.probe_key( - hash_table, - key, - valids, - i, - local_build_indexes_ptr, - matched_num, - max_block_size, - ); + for (i, (key, ptr)) in keys_iter.zip(pointers).enumerate() { + let (mut match_count, mut incomplete_ptr) = if valids.map_or(true, |v| v.get_bit(i)) { + hash_table.next_probe( + key, + *ptr, + local_build_indexes_ptr, + matched_num, + max_block_size, + ) + } else { + continue; + }; + if match_count == 0 { continue; } @@ -210,7 +214,7 @@ impl HashJoinProbeState { if incomplete_ptr == 0 { break; } - (match_count, incomplete_ptr) = hash_table.next_incomplete_ptr( + (match_count, incomplete_ptr) = hash_table.next_probe( key, incomplete_ptr, local_build_indexes_ptr, diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_mark.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_mark.rs index 0ccd3297fc7a..8439d649b18e 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_mark.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_mark.rs @@ -35,6 +35,7 @@ impl HashJoinProbeState { hash_table: &H, probe_state: &mut ProbeState, keys_iter: IT, + pointers: &[u64], input: &DataBlock, is_probe_projected: bool, ) -> Result> @@ -42,7 +43,7 @@ impl HashJoinProbeState { IT: Iterator + TrustedLen, H::Key: 'a, { - let valids = &probe_state.valids; + let valids = probe_state.valids.as_ref(); let has_null = *self .hash_join_state .hash_join_desc @@ -50,10 +51,12 @@ impl HashJoinProbeState { .has_null .read(); let markers = probe_state.markers.as_mut().unwrap(); - for (i, key) in keys_iter.enumerate() { - let contains = match self.hash_join_state.hash_join_desc.from_correlated_subquery { - true => hash_table.contains(key), - false => self.contains(hash_table, key, valids, i), + for (i, (key, ptr)) in keys_iter.zip(pointers).enumerate() { + let contains = match self.hash_join_state.hash_join_desc.from_correlated_subquery + || valids.map_or(true, |v| v.get_bit(i)) + { + true => hash_table.next_contains(key, *ptr), + false => false, }; if contains { @@ -80,6 +83,7 @@ impl HashJoinProbeState { hash_table: &H, probe_state: &mut ProbeState, keys_iter: IT, + pointers: &[u64], input: &DataBlock, is_probe_projected: bool, ) -> Result> @@ -88,7 +92,7 @@ impl HashJoinProbeState { H::Key: 'a, { let max_block_size = probe_state.max_block_size; - let valids = &probe_state.valids; + let valids = probe_state.valids.as_ref(); let has_null = *self .hash_join_state .hash_join_desc @@ -125,21 +129,16 @@ impl HashJoinProbeState { .is_build_projected .load(Ordering::Relaxed); - for (i, key) in keys_iter.enumerate() { + for (i, (key, ptr)) in keys_iter.zip(pointers).enumerate() { let (mut match_count, mut incomplete_ptr) = - if self.hash_join_state.hash_join_desc.from_correlated_subquery { - hash_table.probe_hash_table(key, build_indexes_ptr, matched_num, max_block_size) + if self.hash_join_state.hash_join_desc.from_correlated_subquery + || valids.map_or(true, |v| v.get_bit(i)) + { + hash_table.next_probe(key, *ptr, build_indexes_ptr, matched_num, max_block_size) } else { - self.probe_key( - hash_table, - key, - valids, - i, - build_indexes_ptr, - matched_num, - max_block_size, - ) + continue; }; + if match_count == 0 { continue; } @@ -194,7 +193,7 @@ impl HashJoinProbeState { if incomplete_ptr == 0 { break; } - (match_count, incomplete_ptr) = hash_table.next_incomplete_ptr( + (match_count, incomplete_ptr) = hash_table.next_probe( key, incomplete_ptr, build_indexes_ptr, diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_join.rs index 418a44e0c338..1da419414037 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/right_semi_join.rs @@ -29,32 +29,37 @@ impl HashJoinProbeState { hash_table: &H, probe_state: &mut ProbeState, keys_iter: IT, + pointers: &[u64], ) -> Result> where IT: Iterator + TrustedLen, H::Key: 'a, { let max_block_size = probe_state.max_block_size; - let valids = &probe_state.valids; + let valids = probe_state.valids.as_ref(); // The right join will return multiple data blocks of similar size. let mut matched_num = 0; let local_build_indexes = &mut probe_state.build_indexes; let local_build_indexes_ptr = local_build_indexes.as_mut_ptr(); let outer_scan_map = unsafe { &mut *self.hash_join_state.outer_scan_map.get() }; - for (i, key) in keys_iter.enumerate() { - let (mut match_count, mut incomplete_ptr) = self.probe_key( - hash_table, - key, - valids, - i, - local_build_indexes_ptr, - matched_num, - max_block_size, - ); + for (i, (key, ptr)) in keys_iter.zip(pointers).enumerate() { + let (mut match_count, mut incomplete_ptr) = if valids.map_or(true, |v| v.get_bit(i)) { + hash_table.next_probe( + key, + *ptr, + local_build_indexes_ptr, + matched_num, + max_block_size, + ) + } else { + continue; + }; + if match_count == 0 { continue; } + matched_num += match_count; if matched_num >= max_block_size { loop { @@ -74,7 +79,7 @@ impl HashJoinProbeState { if incomplete_ptr == 0 { break; } - (match_count, incomplete_ptr) = hash_table.next_incomplete_ptr( + (match_count, incomplete_ptr) = hash_table.next_probe( key, incomplete_ptr, local_build_indexes_ptr, @@ -106,6 +111,7 @@ impl HashJoinProbeState { hash_table: &H, probe_state: &mut ProbeState, keys_iter: IT, + pointers: &[u64], input: &DataBlock, is_probe_projected: bool, ) -> Result> @@ -114,7 +120,7 @@ impl HashJoinProbeState { H::Key: 'a, { let max_block_size = probe_state.max_block_size; - let valids = &probe_state.valids; + let valids = probe_state.valids.as_ref(); // The right join will return multiple data blocks of similar size. let mut matched_num = 0; let local_probe_indexes = &mut probe_state.probe_indexes; @@ -131,16 +137,19 @@ impl HashJoinProbeState { .is_build_projected .load(Ordering::Relaxed); - for (i, key) in keys_iter.enumerate() { - let (mut match_count, mut incomplete_ptr) = self.probe_key( - hash_table, - key, - valids, - i, - local_build_indexes_ptr, - matched_num, - max_block_size, - ); + for (i, (key, ptr)) in keys_iter.zip(pointers).enumerate() { + let (mut match_count, mut incomplete_ptr) = if valids.map_or(true, |v| v.get_bit(i)) { + hash_table.next_probe( + key, + *ptr, + local_build_indexes_ptr, + matched_num, + max_block_size, + ) + } else { + continue; + }; + if match_count == 0 { continue; } @@ -208,7 +217,7 @@ impl HashJoinProbeState { if incomplete_ptr == 0 { break; } - (match_count, incomplete_ptr) = hash_table.next_incomplete_ptr( + (match_count, incomplete_ptr) = hash_table.next_probe( key, incomplete_ptr, local_build_indexes_ptr, diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs index 3f79d218c16f..1b0c90d7b843 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs @@ -33,6 +33,7 @@ impl HashJoinProbeState { hash_table: &H, probe_state: &mut ProbeState, keys_iter: IT, + pointers: &[u64], input: &DataBlock, is_probe_projected: bool, ) -> Result> @@ -45,6 +46,7 @@ impl HashJoinProbeState { hash_table, probe_state, keys_iter, + pointers, input, is_probe_projected, ), @@ -59,6 +61,7 @@ impl HashJoinProbeState { hash_table, probe_state, keys_iter, + pointers, input, ) } else { @@ -66,6 +69,7 @@ impl HashJoinProbeState { hash_table, probe_state, keys_iter, + pointers, input, is_probe_projected, ) @@ -82,6 +86,7 @@ impl HashJoinProbeState { hash_table, probe_state, keys_iter, + pointers, input, ) } else { @@ -89,6 +94,7 @@ impl HashJoinProbeState { hash_table, probe_state, keys_iter, + pointers, input, is_probe_projected, ) @@ -101,12 +107,13 @@ impl HashJoinProbeState { .other_predicate .is_none() { - self.probe_right_semi_join::<_, _>(hash_table, probe_state, keys_iter) + self.probe_right_semi_join::<_, _>(hash_table, probe_state, keys_iter, pointers) } else { self.probe_right_semi_join_with_conjunct::<_, _>( hash_table, probe_state, keys_iter, + pointers, input, is_probe_projected, ) @@ -119,12 +126,13 @@ impl HashJoinProbeState { .other_predicate .is_none() { - self.probe_right_anti_join::<_, _>(hash_table, probe_state, keys_iter) + self.probe_right_anti_join::<_, _>(hash_table, probe_state, keys_iter, pointers) } else { self.probe_right_anti_join_with_conjunct::<_, _>( hash_table, probe_state, keys_iter, + pointers, input, is_probe_projected, ) @@ -142,6 +150,7 @@ impl HashJoinProbeState { hash_table, probe_state, keys_iter, + pointers, input, is_probe_projected, ) @@ -150,6 +159,7 @@ impl HashJoinProbeState { hash_table, probe_state, keys_iter, + pointers, input, is_probe_projected, ) @@ -159,6 +169,7 @@ impl HashJoinProbeState { hash_table, probe_state, keys_iter, + pointers, input, is_probe_projected, ), @@ -178,11 +189,14 @@ impl HashJoinProbeState { .other_predicate .is_none() { - true => self.probe_left_mark_join(hash_table, probe_state, keys_iter, input), + true => { + self.probe_left_mark_join(hash_table, probe_state, keys_iter, pointers, input) + } false => self.probe_left_mark_join_with_conjunct( hash_table, probe_state, keys_iter, + pointers, input, is_probe_projected, ), @@ -197,6 +211,7 @@ impl HashJoinProbeState { hash_table, probe_state, keys_iter, + pointers, input, is_probe_projected, ), @@ -204,6 +219,7 @@ impl HashJoinProbeState { hash_table, probe_state, keys_iter, + pointers, input, is_probe_projected, ),