From e79eab5fa547701312e1733918be8cc51ad8c205 Mon Sep 17 00:00:00 2001 From: Jk Xu <54522439+Dousir9@users.noreply.github.com> Date: Fri, 2 Feb 2024 18:18:38 +0800 Subject: [PATCH] feat(query): support early filtering for more join types (#14525) * support early filtering for more join types * chore: fix need_unmatched_selection * chore: refine code * test: add more sqllogictest --- .../hashtable/src/hashjoin_hashtable.rs | 66 ++-- .../src/hashjoin_string_hashtable.rs | 58 ++- src/common/hashtable/src/traits.rs | 29 +- .../hash_join/hash_join_probe_state.rs | 61 ++-- .../hash_join/probe_join/left_anti_join.rs | 195 +++++++---- .../hash_join/probe_join/left_join.rs | 331 +++++++++++++----- .../transforms/hash_join/probe_state.rs | 52 +-- tests/sqllogictests/suites/tpch/join.test | 26 ++ 8 files changed, 555 insertions(+), 263 deletions(-) diff --git a/src/common/hashtable/src/hashjoin_hashtable.rs b/src/common/hashtable/src/hashjoin_hashtable.rs index 7c126159fa14..235200e83f7f 100644 --- a/src/common/hashtable/src/hashjoin_hashtable.rs +++ b/src/common/hashtable/src/hashjoin_hashtable.rs @@ -209,56 +209,80 @@ where count } - // Using hashes to probe hash table and converting them in-place to pointers for memory reuse. - fn early_filtering_probe(&self, hashes: &mut [u64], bitmap: Option) -> usize { + // Perform early filtering probe, store matched indexes in `matched_selection` and store unmatched indexes + // in `unmatched_selection`, return the number of matched and unmatched indexes. + fn early_filtering_probe( + &self, + hashes: &mut [u64], + bitmap: Option, + matched_selection: &mut [u32], + unmatched_selection: &mut [u32], + ) -> (usize, usize) { let mut valids = None; if let Some(bitmap) = bitmap { if bitmap.unset_bits() == bitmap.len() { - hashes.iter_mut().for_each(|hash| { - *hash = 0; - }); - return 0; + unmatched_selection + .iter_mut() + .enumerate() + .for_each(|(idx, val)| { + *val = idx as u32; + }); + return (0, hashes.len()); } else if bitmap.unset_bits() > 0 { valids = Some(bitmap); } } - let mut count = 0; + let mut matched_idx = 0; + let mut unmatched_idx = 0; match valids { Some(valids) => { - valids - .iter() - .zip(hashes.iter_mut()) - .for_each(|(valid, hash)| { + valids.iter().zip(hashes.iter_mut().enumerate()).for_each( + |(valid, (idx, hash))| { if valid { let header = self.pointers[(*hash >> self.hash_shift) as usize]; if header != 0 && early_filtering(header, *hash) { *hash = remove_header_tag(header); - count += 1; + unsafe { + *matched_selection.get_unchecked_mut(matched_idx) = idx as u32 + }; + matched_idx += 1; } else { - *hash = 0; + unsafe { + *unmatched_selection.get_unchecked_mut(unmatched_idx) = + idx as u32 + }; + unmatched_idx += 1; } } else { - *hash = 0; + unsafe { + *unmatched_selection.get_unchecked_mut(unmatched_idx) = idx as u32 + }; + unmatched_idx += 1; } - }); + }, + ); } None => { - hashes.iter_mut().for_each(|hash| { + hashes.iter_mut().enumerate().for_each(|(idx, hash)| { let header = self.pointers[(*hash >> self.hash_shift) as usize]; if header != 0 && early_filtering(header, *hash) { *hash = remove_header_tag(header); - count += 1; + unsafe { *matched_selection.get_unchecked_mut(matched_idx) = idx as u32 }; + matched_idx += 1; } else { - *hash = 0; + unsafe { + *unmatched_selection.get_unchecked_mut(unmatched_idx) = idx as u32 + }; + unmatched_idx += 1; } }); } } - count + (matched_idx, unmatched_idx) } - // Using hashes to probe hash table and converting them in-place to pointers for memory reuse. 
- fn early_filtering_probe_with_selection( + // Perform early filtering probe and store matched indexes in `selection`, return the number of matched indexes. + fn early_filtering_matched_probe( &self, hashes: &mut [u64], bitmap: Option, diff --git a/src/common/hashtable/src/hashjoin_string_hashtable.rs b/src/common/hashtable/src/hashjoin_string_hashtable.rs index c7ca141e7f03..7112d2ee68fb 100644 --- a/src/common/hashtable/src/hashjoin_string_hashtable.rs +++ b/src/common/hashtable/src/hashjoin_string_hashtable.rs @@ -138,20 +138,31 @@ where A: Allocator + Clone + 'static count } - // Using hashes to probe hash table and converting them in-place to pointers for memory reuse. - fn early_filtering_probe(&self, hashes: &mut [u64], bitmap: Option) -> usize { + // Perform early filtering probe, store matched indexes in `matched_selection` and store unmatched indexes + // in `unmatched_selection`, return the number of matched and unmatched indexes. + fn early_filtering_probe( + &self, + hashes: &mut [u64], + bitmap: Option, + matched_selection: &mut [u32], + unmatched_selection: &mut [u32], + ) -> (usize, usize) { let mut valids = None; if let Some(bitmap) = bitmap { if bitmap.unset_bits() == bitmap.len() { - hashes.iter_mut().for_each(|hash| { - *hash = 0; - }); - return 0; + unmatched_selection + .iter_mut() + .enumerate() + .for_each(|(idx, val)| { + *val = idx as u32; + }); + return (0, hashes.len()); } else if bitmap.unset_bits() > 0 { valids = Some(bitmap); } } - let mut count = 0; + let mut matched_idx = 0; + let mut unmatched_idx = 0; match valids { Some(valids) => { hashes.iter_mut().enumerate().for_each(|(idx, hash)| { @@ -159,32 +170,45 @@ where A: Allocator + Clone + 'static let header = self.pointers[(*hash >> self.hash_shift) as usize]; if header != 0 && early_filtering(header, *hash) { *hash = remove_header_tag(header); - count += 1; + unsafe { + *matched_selection.get_unchecked_mut(matched_idx) = idx as u32 + }; + matched_idx += 1; } else { - *hash = 0; + unsafe { + *unmatched_selection.get_unchecked_mut(unmatched_idx) = idx as u32 + }; + unmatched_idx += 1; } } else { - *hash = 0; - }; + unsafe { + *unmatched_selection.get_unchecked_mut(unmatched_idx) = idx as u32 + }; + unmatched_idx += 1; + } }); } None => { - hashes.iter_mut().for_each(|hash| { + hashes.iter_mut().enumerate().for_each(|(idx, hash)| { let header = self.pointers[(*hash >> self.hash_shift) as usize]; if header != 0 && early_filtering(header, *hash) { *hash = remove_header_tag(header); - count += 1; + unsafe { *matched_selection.get_unchecked_mut(matched_idx) = idx as u32 }; + matched_idx += 1; } else { - *hash = 0; + unsafe { + *unmatched_selection.get_unchecked_mut(unmatched_idx) = idx as u32 + }; + unmatched_idx += 1; } }); } } - count + (matched_idx, unmatched_idx) } - // Using hashes to probe hash table and converting them in-place to pointers for memory reuse. - fn early_filtering_probe_with_selection( + // Perform early filtering probe and store matched indexes in `selection`, return the number of matched indexes. + fn early_filtering_matched_probe( &self, hashes: &mut [u64], bitmap: Option, diff --git a/src/common/hashtable/src/traits.rs b/src/common/hashtable/src/traits.rs index 2fdc9bf2d089..14ff77790d80 100644 --- a/src/common/hashtable/src/traits.rs +++ b/src/common/hashtable/src/traits.rs @@ -513,24 +513,21 @@ pub trait HashtableLike { pub trait HashJoinHashtableLike { type Key: ?Sized; - // Using hashes to probe hash table and converting them in-place to pointers for memory reuse. 
- // same with `early_filtering_probe`, but we don't use early_filter + // Probe hash table, use `hashes` to probe hash table and convert it in-place to pointers for memory reuse. fn probe(&self, hashes: &mut [u64], bitmap: Option) -> usize; - // Using hashes to probe hash table and converting them in-place to pointers for memory reuse. - // 1. same with `early_filtering_probe_with_selection`, but we don't use selection to preserve the - // unfiltered indexes, we just set the filtered hashes as zero. - // 2. return the unfiltered counts. - fn early_filtering_probe(&self, hashes: &mut [u64], bitmap: Option) -> usize; - - // Using hashes to probe hash table and converting them in-place to pointers for memory reuse. - // we use `early_filtering_probe_with_selection` to do the first round probe. - // 1. `hashes` is the hash value of probe block's rows. we will use this one to - // do early filtering. if we can't early filter one row(at idx), we will assign pointer in - // the bucket to hashes[idx] to reuse the memory. - // 2. `selection` is used to preserved the indexes which can't be early_filtered. - // 3. return the count of preserved the indexes in `selection` - fn early_filtering_probe_with_selection( + // Perform early filtering probe, store matched indexes in `matched_selection` and store unmatched indexes + // in `unmatched_selection`, return the number of matched and unmatched indexes. + fn early_filtering_probe( + &self, + hashes: &mut [u64], + valids: Option, + matched_selection: &mut [u32], + unmatched_selection: &mut [u32], + ) -> (usize, usize); + + // Perform early filtering probe and store matched indexes in `selection`, return the number of matched indexes. + fn early_filtering_matched_probe( &self, hashes: &mut [u64], valids: Option, diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs index 8460fa1e8f86..bfb74cf90240 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs @@ -291,7 +291,7 @@ impl HashJoinProbeState { // Probe: // (1) INNER / RIGHT / RIGHT SINGLE / RIGHT SEMI / RIGHT ANTI / RIGHT MARK / LEFT SEMI / LEFT MARK - // prefer_early_filtering is true => early_filtering_probe_with_selection + // prefer_early_filtering is true => early_filtering_matched_probe // prefer_early_filtering is false => probe // (2) LEFT / LEFT SINGLE / LEFT ANTI / FULL // prefer_early_filtering is true => early_filtering_probe @@ -308,36 +308,42 @@ impl HashJoinProbeState { .build_keys_accessor_and_hashes(keys_state, &mut probe_state.hashes)?; // Perform a round of hash table probe. - if Self::check_for_selection(&self.hash_join_state.hash_join_desc.join_type) { - probe_state.selection_count = if prefer_early_filtering { + probe_state.probe_with_selection = prefer_early_filtering; + probe_state.selection_count = if !Self::need_unmatched_selection( + &self.hash_join_state.hash_join_desc.join_type, + probe_state.with_conjunction, + ) { + if prefer_early_filtering { // Early filtering, use selection to get better performance. - probe_state.probe_with_selection = true; - - table.hash_table.early_filtering_probe_with_selection( + table.hash_table.early_filtering_matched_probe( &mut probe_state.hashes, valids, &mut probe_state.selection, ) } else { // If don't do early filtering, don't use selection. 
- probe_state.probe_with_selection = false; - table.hash_table.probe(&mut probe_state.hashes, valids) - }; - probe_state.num_keys_hash_matched += probe_state.selection_count as u64; + } } else { - // For left join, left single join, full join and left anti join, don't use selection. - probe_state.probe_with_selection = false; - - let count = if prefer_early_filtering { - table - .hash_table - .early_filtering_probe(&mut probe_state.hashes, valids) + if prefer_early_filtering { + // Early filtering, use matched selection and unmatched selection to get better performance. + let unmatched_selection = + probe_state.probe_unmatched_indexes.as_mut().unwrap(); + let (matched_count, unmatched_count) = + table.hash_table.early_filtering_probe( + &mut probe_state.hashes, + valids, + &mut probe_state.selection, + unmatched_selection, + ); + probe_state.probe_unmatched_indexes_count = unmatched_count; + matched_count } else { + // If don't do early filtering, don't use selection. table.hash_table.probe(&mut probe_state.hashes, valids) - }; - probe_state.num_keys_hash_matched += count as u64; - } + } + }; + probe_state.num_keys_hash_matched += probe_state.selection_count as u64; // Continue to probe hash table and process data blocks. self.result_blocks(&input, keys, &table.hash_table, probe_state) @@ -369,19 +375,12 @@ impl HashJoinProbeState { ) } - /// Checks if a join type can use selection. - pub fn check_for_selection(join_type: &JoinType) -> bool { + /// Checks if the join type need to use unmatched selection. + pub fn need_unmatched_selection(join_type: &JoinType, with_conjunction: bool) -> bool { matches!( join_type, - JoinType::Inner - | JoinType::Right - | JoinType::RightSingle - | JoinType::RightSemi - | JoinType::RightAnti - | JoinType::RightMark - | JoinType::LeftSemi - | JoinType::LeftMark - ) + JoinType::Left | JoinType::LeftSingle | JoinType::Full | JoinType::LeftAnti + ) && !with_conjunction } pub fn probe_attach(&self) -> Result { diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_anti_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_anti_join.rs index caea33882c97..c41f6d7f3dab 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_anti_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_anti_join.rs @@ -39,39 +39,65 @@ impl HashJoinProbeState { H::Key: 'a, { // Probe states. - let mutable_indexes = &mut probe_state.mutable_indexes; - let probe_indexes = &mut mutable_indexes.probe_indexes; let pointers = probe_state.hashes.as_slice(); - // Results. - let mut matched_idx = 0; - let mut result_blocks = vec![]; - // Probe hash table and generate data blocks. - for idx in 0..input.num_rows() { - let key = unsafe { keys.key_unchecked(idx) }; - let ptr = unsafe { *pointers.get_unchecked(idx) }; - if !hash_table.next_contains(key, ptr) { - unsafe { *probe_indexes.get_unchecked_mut(matched_idx) = idx as u32 }; - matched_idx += 1; + let result_block = if probe_state.probe_with_selection { + // Safe to unwrap. 
+ let probe_unmatched_indexes = probe_state.probe_unmatched_indexes.as_mut().unwrap(); + let mut unmatched_idx = probe_state.probe_unmatched_indexes_count; + let selection = &probe_state.selection.as_slice()[0..probe_state.selection_count]; + for idx in selection.iter() { + let key = unsafe { keys.key_unchecked(*idx as usize) }; + let ptr = unsafe { *pointers.get_unchecked(*idx as usize) }; + if !hash_table.next_contains(key, ptr) { + unsafe { *probe_unmatched_indexes.get_unchecked_mut(unmatched_idx) = *idx }; + unmatched_idx += 1; + } } - } - if self.hash_join_state.interrupt.load(Ordering::Relaxed) { - return Err(ErrorCode::AbortedQuery( - "Aborted query, because the server is shutting down or the query was killed.", - )); - } + if self.hash_join_state.interrupt.load(Ordering::Relaxed) { + return Err(ErrorCode::AbortedQuery( + "Aborted query, because the server is shutting down or the query was killed.", + )); + } - if matched_idx > 0 { - result_blocks.push(DataBlock::take( + DataBlock::take( input, - &probe_indexes[0..matched_idx], + &probe_unmatched_indexes[0..unmatched_idx], &mut probe_state.generation_state.string_items_buf, - )?); - } + )? + } else { + let mutable_indexes = &mut probe_state.mutable_indexes; + let probe_indexes = &mut mutable_indexes.probe_indexes; + let mut unmatched_idx = 0; + for idx in 0..input.num_rows() { + let key = unsafe { keys.key_unchecked(idx) }; + let ptr = unsafe { *pointers.get_unchecked(idx) }; + if !hash_table.next_contains(key, ptr) { + unsafe { *probe_indexes.get_unchecked_mut(unmatched_idx) = idx as u32 }; + unmatched_idx += 1; + } + } - Ok(result_blocks) + if self.hash_join_state.interrupt.load(Ordering::Relaxed) { + return Err(ErrorCode::AbortedQuery( + "Aborted query, because the server is shutting down or the query was killed.", + )); + } + + DataBlock::take( + input, + &probe_indexes[0..unmatched_idx], + &mut probe_state.generation_state.string_items_buf, + )? + }; + + if result_block.is_empty() { + Ok(vec![]) + } else { + Ok(vec![result_block]) + } } pub(crate) fn left_anti_join_with_conjunct<'a, H: HashJoinHashtableLike>( @@ -109,47 +135,94 @@ impl HashJoinProbeState { let mut result_blocks = vec![]; // Probe hash table and generate data blocks. - for idx in 0..input.num_rows() { - let key = unsafe { keys.key_unchecked(idx) }; - let ptr = unsafe { *pointers.get_unchecked(idx) }; + if probe_state.probe_with_selection { + let selection = &probe_state.selection.as_slice()[0..probe_state.selection_count]; + for idx in selection.iter() { + let key = unsafe { keys.key_unchecked(*idx as usize) }; + let ptr = unsafe { *pointers.get_unchecked(*idx as usize) }; - // Probe hash table and fill `build_indexes`. - let (mut match_count, mut incomplete_ptr) = - hash_table.next_probe(key, ptr, build_indexes_ptr, matched_idx, max_block_size); + // Probe hash table and fill `build_indexes`. + let (mut match_count, mut incomplete_ptr) = + hash_table.next_probe(key, ptr, build_indexes_ptr, matched_idx, max_block_size); - if match_count == 0 { - continue; - } + if match_count == 0 { + continue; + } - // Fill `probe_indexes`. - for _ in 0..match_count { - unsafe { *probe_indexes.get_unchecked_mut(matched_idx) = idx as u32 }; - matched_idx += 1; + // Fill `probe_indexes`. 
+ for _ in 0..match_count { + unsafe { *probe_indexes.get_unchecked_mut(matched_idx) = *idx }; + matched_idx += 1; + } + + while matched_idx == max_block_size { + self.process_left_anti_join_block( + matched_idx, + input, + probe_indexes, + build_indexes, + &mut probe_state.generation_state, + &build_state.generation_state, + other_predicate, + &mut row_state, + )?; + matched_idx = 0; + (match_count, incomplete_ptr) = hash_table.next_probe( + key, + incomplete_ptr, + build_indexes_ptr, + matched_idx, + max_block_size, + ); + for _ in 0..match_count { + unsafe { *probe_indexes.get_unchecked_mut(matched_idx) = *idx }; + matched_idx += 1; + } + } } + } else { + for idx in 0..input.num_rows() { + let key = unsafe { keys.key_unchecked(idx) }; + let ptr = unsafe { *pointers.get_unchecked(idx) }; + + // Probe hash table and fill `build_indexes`. + let (mut match_count, mut incomplete_ptr) = + hash_table.next_probe(key, ptr, build_indexes_ptr, matched_idx, max_block_size); - while matched_idx == max_block_size { - self.process_left_anti_join_block( - matched_idx, - input, - probe_indexes, - build_indexes, - &mut probe_state.generation_state, - &build_state.generation_state, - other_predicate, - &mut row_state, - )?; - matched_idx = 0; - (match_count, incomplete_ptr) = hash_table.next_probe( - key, - incomplete_ptr, - build_indexes_ptr, - matched_idx, - max_block_size, - ); + if match_count == 0 { + continue; + } + + // Fill `probe_indexes`. for _ in 0..match_count { unsafe { *probe_indexes.get_unchecked_mut(matched_idx) = idx as u32 }; matched_idx += 1; } + + while matched_idx == max_block_size { + self.process_left_anti_join_block( + matched_idx, + input, + probe_indexes, + build_indexes, + &mut probe_state.generation_state, + &build_state.generation_state, + other_predicate, + &mut row_state, + )?; + matched_idx = 0; + (match_count, incomplete_ptr) = hash_table.next_probe( + key, + incomplete_ptr, + build_indexes_ptr, + matched_idx, + max_block_size, + ); + for _ in 0..match_count { + unsafe { *probe_indexes.get_unchecked_mut(matched_idx) = idx as u32 }; + matched_idx += 1; + } + } } } @@ -167,17 +240,17 @@ impl HashJoinProbeState { } // Find all unmatched indexes and generate the result `DataBlock`. - matched_idx = 0; + let mut unmatched_idx = 0; for (i, state) in row_state.iter().enumerate() { if !*state { - unsafe { *probe_indexes.get_unchecked_mut(matched_idx) = i as u32 }; - matched_idx += 1; + unsafe { *probe_indexes.get_unchecked_mut(unmatched_idx) = i as u32 }; + unmatched_idx += 1; } } - if matched_idx > 0 { + if unmatched_idx > 0 { result_blocks.push(DataBlock::take( input, - &probe_indexes[0..matched_idx], + &probe_indexes[0..unmatched_idx], &mut probe_state.generation_state.string_items_buf, )?); } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs index bdf26a1dc984..cbcbe54d80b1 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_join/left_join.rs @@ -63,56 +63,84 @@ impl HashJoinProbeState { let mut unmatched_idx = 0; let mut result_blocks = vec![]; - // Probe hash table and generate data blocks. - for idx in 0..input_rows { - let key = unsafe { keys.key_unchecked(idx) }; - let ptr = unsafe { *pointers.get_unchecked(idx) }; - - // Probe hash table and fill `build_indexes`. 
- let (mut match_count, mut incomplete_ptr) = - hash_table.next_probe(key, ptr, build_indexes_ptr, matched_idx, max_block_size); - - let mut total_probe_matched = 0; - if match_count > 0 { - total_probe_matched += match_count; - if self.hash_join_state.hash_join_desc.join_type == JoinType::LeftSingle - && total_probe_matched > 1 - { - return Err(ErrorCode::Internal( - "Scalar subquery can't return more than one row", - )); + if probe_state.probe_with_selection { + unmatched_idx = probe_state.probe_unmatched_indexes_count; + let selection = &probe_state.selection.as_slice()[0..probe_state.selection_count]; + for idx in selection.iter() { + let key = unsafe { keys.key_unchecked(*idx as usize) }; + let ptr = unsafe { *pointers.get_unchecked(*idx as usize) }; + // Probe hash table and fill `build_indexes`. + let (mut match_count, mut incomplete_ptr) = + hash_table.next_probe(key, ptr, build_indexes_ptr, matched_idx, max_block_size); + + let mut total_probe_matched = 0; + if match_count > 0 { + total_probe_matched += match_count; + if self.hash_join_state.hash_join_desc.join_type == JoinType::LeftSingle + && total_probe_matched > 1 + { + return Err(ErrorCode::Internal( + "Scalar subquery can't return more than one row", + )); + } + for _ in 0..match_count { + unsafe { *probe_indexes.get_unchecked_mut(matched_idx) = *idx }; + matched_idx += 1; + } + } else { + unsafe { *probe_unmatched_indexes.get_unchecked_mut(unmatched_idx) = *idx }; + unmatched_idx += 1; } - for _ in 0..match_count { - unsafe { *probe_indexes.get_unchecked_mut(matched_idx) = idx as u32 }; - matched_idx += 1; + + while matched_idx == max_block_size { + self.process_left_or_full_join_block( + matched_idx, + input, + probe_indexes, + build_indexes, + &mut probe_state.generation_state, + &build_state.generation_state, + outer_scan_map, + &mut result_blocks, + None, + None, + None, + )?; + matched_idx = 0; + (match_count, incomplete_ptr) = hash_table.next_probe( + key, + incomplete_ptr, + build_indexes_ptr, + matched_idx, + max_block_size, + ); + if match_count > 0 { + total_probe_matched += match_count; + if self.hash_join_state.hash_join_desc.join_type == JoinType::LeftSingle + && total_probe_matched > 1 + { + return Err(ErrorCode::Internal( + "Scalar subquery can't return more than one row", + )); + } + for _ in 0..match_count { + unsafe { *probe_indexes.get_unchecked_mut(matched_idx) = *idx }; + matched_idx += 1; + } + } } - } else { - unsafe { *probe_unmatched_indexes.get_unchecked_mut(unmatched_idx) = idx as u32 }; - unmatched_idx += 1; } + } else { + // Probe hash table and generate data blocks. + for idx in 0..input_rows { + let key = unsafe { keys.key_unchecked(idx) }; + let ptr = unsafe { *pointers.get_unchecked(idx) }; - while matched_idx == max_block_size { - self.process_left_or_full_join_block( - matched_idx, - input, - probe_indexes, - build_indexes, - &mut probe_state.generation_state, - &build_state.generation_state, - outer_scan_map, - &mut result_blocks, - None, - None, - None, - )?; - matched_idx = 0; - (match_count, incomplete_ptr) = hash_table.next_probe( - key, - incomplete_ptr, - build_indexes_ptr, - matched_idx, - max_block_size, - ); + // Probe hash table and fill `build_indexes`. 
+ let (mut match_count, mut incomplete_ptr) = + hash_table.next_probe(key, ptr, build_indexes_ptr, matched_idx, max_block_size); + + let mut total_probe_matched = 0; if match_count > 0 { total_probe_matched += match_count; if self.hash_join_state.hash_join_desc.join_type == JoinType::LeftSingle @@ -126,6 +154,49 @@ impl HashJoinProbeState { unsafe { *probe_indexes.get_unchecked_mut(matched_idx) = idx as u32 }; matched_idx += 1; } + } else { + unsafe { + *probe_unmatched_indexes.get_unchecked_mut(unmatched_idx) = idx as u32 + }; + unmatched_idx += 1; + } + + while matched_idx == max_block_size { + self.process_left_or_full_join_block( + matched_idx, + input, + probe_indexes, + build_indexes, + &mut probe_state.generation_state, + &build_state.generation_state, + outer_scan_map, + &mut result_blocks, + None, + None, + None, + )?; + matched_idx = 0; + (match_count, incomplete_ptr) = hash_table.next_probe( + key, + incomplete_ptr, + build_indexes_ptr, + matched_idx, + max_block_size, + ); + if match_count > 0 { + total_probe_matched += match_count; + if self.hash_join_state.hash_join_desc.join_type == JoinType::LeftSingle + && total_probe_matched > 1 + { + return Err(ErrorCode::Internal( + "Scalar subquery can't return more than one row", + )); + } + for _ in 0..match_count { + unsafe { *probe_indexes.get_unchecked_mut(matched_idx) = idx as u32 }; + matched_idx += 1; + } + } } } } @@ -200,57 +271,90 @@ impl HashJoinProbeState { let mut result_blocks = vec![]; // Probe hash table and generate data blocks. - for idx in 0..input_rows { - let key = unsafe { keys.key_unchecked(idx) }; - let ptr = unsafe { *pointers.get_unchecked(idx) }; - - // Probe hash table and fill `build_indexes`. - let (mut match_count, mut incomplete_ptr) = - hash_table.next_probe(key, ptr, build_indexes_ptr, matched_idx, max_block_size); - // `total_probe_matched` is used to record the matched rows count for current `idx` row from probe_block - let mut total_probe_matched = 0; - if match_count > 0 { - total_probe_matched += match_count; - if self.hash_join_state.hash_join_desc.join_type == JoinType::LeftSingle - && total_probe_matched > 1 - { - return Err(ErrorCode::Internal( - "Scalar subquery can't return more than one row", - )); + if probe_state.probe_with_selection { + let selection = &probe_state.selection.as_slice()[0..probe_state.selection_count]; + for idx in selection.iter() { + let key = unsafe { keys.key_unchecked(*idx as usize) }; + let ptr = unsafe { *pointers.get_unchecked(*idx as usize) }; + + // Probe hash table and fill `build_indexes`. 
+ let (mut match_count, mut incomplete_ptr) = + hash_table.next_probe(key, ptr, build_indexes_ptr, matched_idx, max_block_size); + // `total_probe_matched` is used to record the matched rows count for current `idx` row from probe_block + let mut total_probe_matched = 0; + if match_count > 0 { + total_probe_matched += match_count; + if self.hash_join_state.hash_join_desc.join_type == JoinType::LeftSingle + && total_probe_matched > 1 + { + return Err(ErrorCode::Internal( + "Scalar subquery can't return more than one row", + )); + } + + unsafe { + *row_state.get_unchecked_mut(*idx as usize) += match_count; + for _ in 0..match_count { + *row_state_indexes.get_unchecked_mut(matched_idx) = *idx as usize; + *probe_indexes.get_unchecked_mut(matched_idx) = *idx; + matched_idx += 1; + } + } } - unsafe { - *row_state.get_unchecked_mut(idx) += match_count; - for _ in 0..match_count { - *row_state_indexes.get_unchecked_mut(matched_idx) = idx; - *probe_indexes.get_unchecked_mut(matched_idx) = idx as u32; - matched_idx += 1; + while matched_idx == max_block_size { + self.process_left_or_full_join_block( + matched_idx, + input, + probe_indexes, + build_indexes, + &mut probe_state.generation_state, + &build_state.generation_state, + outer_scan_map, + &mut result_blocks, + Some(other_predicate), + Some(row_state), + Some(row_state_indexes), + )?; + matched_idx = 0; + (match_count, incomplete_ptr) = hash_table.next_probe( + key, + incomplete_ptr, + build_indexes_ptr, + matched_idx, + max_block_size, + ); + if match_count > 0 { + total_probe_matched += match_count; + if self.hash_join_state.hash_join_desc.join_type == JoinType::LeftSingle + && total_probe_matched > 1 + { + return Err(ErrorCode::Internal( + "Scalar subquery can't return more than one row", + )); + } + + unsafe { + *row_state.get_unchecked_mut(*idx as usize) += match_count; + for _ in 0..match_count { + *row_state_indexes.get_unchecked_mut(matched_idx) = *idx as usize; + *probe_indexes.get_unchecked_mut(matched_idx) = *idx; + matched_idx += 1; + } + } } } } - - while matched_idx == max_block_size { - self.process_left_or_full_join_block( - matched_idx, - input, - probe_indexes, - build_indexes, - &mut probe_state.generation_state, - &build_state.generation_state, - outer_scan_map, - &mut result_blocks, - Some(other_predicate), - Some(row_state), - Some(row_state_indexes), - )?; - matched_idx = 0; - (match_count, incomplete_ptr) = hash_table.next_probe( - key, - incomplete_ptr, - build_indexes_ptr, - matched_idx, - max_block_size, - ); + } else { + for idx in 0..input_rows { + let key = unsafe { keys.key_unchecked(idx) }; + let ptr = unsafe { *pointers.get_unchecked(idx) }; + + // Probe hash table and fill `build_indexes`. 
+ let (mut match_count, mut incomplete_ptr) = + hash_table.next_probe(key, ptr, build_indexes_ptr, matched_idx, max_block_size); + // `total_probe_matched` is used to record the matched rows count for current `idx` row from probe_block + let mut total_probe_matched = 0; if match_count > 0 { total_probe_matched += match_count; if self.hash_join_state.hash_join_desc.join_type == JoinType::LeftSingle @@ -270,6 +374,49 @@ impl HashJoinProbeState { } } } + + while matched_idx == max_block_size { + self.process_left_or_full_join_block( + matched_idx, + input, + probe_indexes, + build_indexes, + &mut probe_state.generation_state, + &build_state.generation_state, + outer_scan_map, + &mut result_blocks, + Some(other_predicate), + Some(row_state), + Some(row_state_indexes), + )?; + matched_idx = 0; + (match_count, incomplete_ptr) = hash_table.next_probe( + key, + incomplete_ptr, + build_indexes_ptr, + matched_idx, + max_block_size, + ); + if match_count > 0 { + total_probe_matched += match_count; + if self.hash_join_state.hash_join_desc.join_type == JoinType::LeftSingle + && total_probe_matched > 1 + { + return Err(ErrorCode::Internal( + "Scalar subquery can't return more than one row", + )); + } + + unsafe { + *row_state.get_unchecked_mut(idx) += match_count; + for _ in 0..match_count { + *row_state_indexes.get_unchecked_mut(matched_idx) = idx; + *probe_indexes.get_unchecked_mut(matched_idx) = idx as u32; + matched_idx += 1; + } + } + } + } } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs index f4215a541e8d..8be9a294a40d 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs @@ -41,18 +41,23 @@ pub struct ProbeState { pub(crate) probe_unmatched_indexes: Option>, // The `markers` is used for right mark join. pub(crate) markers: Option>, + // If hash join other condition is not empty. + pub(crate) with_conjunction: bool, // Early filtering. // 1.The `selection` is used to store the indexes of input which matched by hash. pub(crate) selection: Vec, // 2.The indexes of [0, selection_count) in `selection` are valid. pub(crate) selection_count: usize, - // 3.Statistics for **adaptive** early filtering, the `num_keys` indicates the number of valid keys in probe side, - // the `num_keys_hash_matched` indicates the number of keys which matched by hash. + // 3.Statistics for **adaptive** early filtering, the `num_keys` indicates the number of valid keys + // in probe side, the `num_keys_hash_matched` indicates the number of keys which matched by hash. pub(crate) num_keys: u64, pub(crate) num_keys_hash_matched: u64, // 4.Whether to probe with selection. pub(crate) probe_with_selection: bool, + // 5.If join type is LEFT / LEFT SINGLE / LEFT ANTI / FULL, we use it to store unmatched indexes + // count during early filtering. 
+ pub(crate) probe_unmatched_indexes_count: usize, } impl ProbeState { @@ -64,27 +69,28 @@ impl ProbeState { pub fn create( max_block_size: usize, join_type: &JoinType, - with_conjunct: bool, + with_conjunction: bool, has_string_column: bool, func_ctx: FunctionContext, ) -> Self { - let (row_state, row_state_indexes, probe_unmatched_indexes) = match &join_type { + let (row_state, row_state_indexes) = match &join_type { JoinType::Left | JoinType::LeftSingle | JoinType::Full => { - if with_conjunct { - ( - Some(vec![0; max_block_size]), - Some(vec![0; max_block_size]), - None, - ) + if with_conjunction { + (Some(vec![0; max_block_size]), Some(vec![0; max_block_size])) } else { - ( - Some(vec![0; max_block_size]), - None, - Some(vec![0; max_block_size]), - ) + (Some(vec![0; max_block_size]), None) } } - _ => (None, None, None), + _ => (None, None), + }; + let probe_unmatched_indexes = if matches!( + &join_type, + JoinType::Left | JoinType::LeftSingle | JoinType::Full | JoinType::LeftAnti + ) && !with_conjunction + { + Some(vec![0; max_block_size]) + } else { + None }; let markers = if matches!(&join_type, JoinType::RightMark) { Some(vec![MARKER_KIND_FALSE; max_block_size]) @@ -106,6 +112,8 @@ impl ProbeState { row_state_indexes, probe_unmatched_indexes, markers, + probe_unmatched_indexes_count: 0, + with_conjunction, } } @@ -138,17 +146,11 @@ impl MutableIndexes { } pub struct ProbeBlockGenerationState { - /// in fact, it means whether we need to output some probe blocks's columns, - /// we use probe_projections to check whether we can get a non-empty result - /// block. + // The is_probe_projected means whether we need to output probe columns. pub(crate) is_probe_projected: bool, - /// for Right/Full/RightSingle we use true_validity to reduce memory, because - /// we need to wrap probe block's all column type as nullable(if they are not). - /// But when we need to wrap this way, the validity is all true, so we use this - /// one to share the memory. + // When we need a bitmap that is all true, we can directly slice it to reduce memory usage. pub(crate) true_validity: Bitmap, - /// we use `string_items_buf` for Binary/String/Bitmap/Variant Column - /// to store the (pointer,length). So we can reuse the memory for all take. + // The string_items_buf is used as a buffer to reduce memory allocation when taking [u8] Columns. pub(crate) string_items_buf: Option>, } diff --git a/tests/sqllogictests/suites/tpch/join.test b/tests/sqllogictests/suites/tpch/join.test index 60f5edda0902..9d392679c85d 100644 --- a/tests/sqllogictests/suites/tpch/join.test +++ b/tests/sqllogictests/suites/tpch/join.test @@ -332,5 +332,31 @@ select l_orderkey from (select * from lineitem order by l_orderkey limit 5000) a 3 3 +# LEFT OUTER / LEFT SINGEL / FULL +query I +select l_orderkey, o_orderdate, o_shippriority from lineitem left join orders on l_orderkey = o_orderkey and o_orderdate < to_date('1995-03-15') order by o_orderdate limit 5; +---- +571586 1992-01-01 0 +190656 1992-01-01 0 +359170 1992-01-01 0 +414725 1992-01-01 0 +45697 1992-01-01 0 + +# LEFT ANTI +query I +select o_custkey from orders where not exists (select * from customer where substring(c_phone from 1 for 2) in ('13', '31', '23', '29', '30', '18', '17') and o_custkey = c_custkey) order by o_custkey limit 10; +---- +1 +1 +1 +1 +1 +1 +1 +1 +1 +4 + + statement ok set enable_runtime_filter = 0;
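
A minimal sketch of the matched/unmatched selection contract this patch introduces with `early_filtering_probe`: instead of zeroing filtered hashes in place, the probe now writes matched row indexes into one selection and unmatched row indexes into another, returning both counts, so LEFT / LEFT SINGLE / LEFT ANTI / FULL probes can read matched rows from `selection[..selection_count]` and emit unmatched rows directly from `probe_unmatched_indexes[..probe_unmatched_indexes_count]`. The standalone snippet below only illustrates that partitioning pattern; the helper name `partition_by_match` and its `is_match` closure are hypothetical stand-ins for the header-tag check (`early_filtering`) performed inside the real hash table.

// Hypothetical, simplified illustration of the matched/unmatched selection
// contract used by `early_filtering_probe` in this patch. It is not the real
// hash-table code: `is_match` stands in for the pointer-header tag check.
fn partition_by_match<F: Fn(usize) -> bool>(
    num_rows: usize,
    is_match: F,
    matched_selection: &mut [u32],
    unmatched_selection: &mut [u32],
) -> (usize, usize) {
    let mut matched_idx = 0;
    let mut unmatched_idx = 0;
    for idx in 0..num_rows {
        if is_match(idx) {
            matched_selection[matched_idx] = idx as u32;
            matched_idx += 1;
        } else {
            unmatched_selection[unmatched_idx] = idx as u32;
            unmatched_idx += 1;
        }
    }
    // Matched indexes feed the normal next_probe path; unmatched indexes can be
    // emitted without probing for LEFT / LEFT SINGLE / LEFT ANTI / FULL joins.
    (matched_idx, unmatched_idx)
}

fn main() {
    let mut matched = vec![0u32; 4];
    let mut unmatched = vec![0u32; 4];
    // Pretend rows 0 and 2 pass the hash table's early-filter tag check.
    let (m, u) = partition_by_match(4, |i| i % 2 == 0, &mut matched, &mut unmatched);
    assert_eq!((&matched[..m], &unmatched[..u]), (&[0u32, 2][..], &[1u32, 3][..]));
}

This mirrors how the updated left and left-anti probe paths iterate `probe_state.selection` for rows that still need `next_probe`, while rows in the unmatched selection go straight into the unmatched output block.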