diff --git a/src/common/hashtable/src/lib.rs b/src/common/hashtable/src/lib.rs
index 1339d8f5c9f31..9800028411e13 100644
--- a/src/common/hashtable/src/lib.rs
+++ b/src/common/hashtable/src/lib.rs
@@ -23,18 +23,18 @@ extern crate core;
 mod container;
+mod dictionary_string_hashtable;
+
 mod hashjoin_hashtable;
 mod hashjoin_string_hashtable;
 mod hashtable;
 mod keys_ref;
 mod lookup_hashtable;
-mod stack_hashtable;
-mod table0;
-
-mod dictionary_string_hashtable;
 mod partitioned_hashtable;
 mod short_string_hashtable;
+mod stack_hashtable;
 mod string_hashtable;
+mod table0;
 #[allow(dead_code)]
 mod table1;
 mod table_empty;
@@ -113,3 +113,5 @@ pub use partitioned_hashtable::hash2bucket;
 pub type HashJoinHashMap<K> = hashjoin_hashtable::HashJoinHashTable<K>;
 pub type BinaryHashJoinHashMap = hashjoin_string_hashtable::HashJoinStringHashTable;
 pub use traits::HashJoinHashtableLike;
+pub use utils::Interval;
+pub use utils::MergeIntoBlockInfoIndex;
diff --git a/src/common/hashtable/src/traits.rs b/src/common/hashtable/src/traits.rs
index f3337ccd3ae35..f49ad9472b8b1 100644
--- a/src/common/hashtable/src/traits.rs
+++ b/src/common/hashtable/src/traits.rs
@@ -13,9 +13,8 @@
 // limitations under the License.
 
 // To avoid RUSTFLAGS="-C target-feature=+sse4.2" warning.
-#![allow(unused_imports)]
+
 use std::hash::BuildHasher;
-use std::hash::Hasher;
 use std::iter::TrustedLen;
 use std::mem::MaybeUninit;
 use std::num::NonZeroU64;
@@ -508,12 +507,22 @@ pub trait HashJoinHashtableLike {
     type Key: ?Sized;
 
     // Using hashes to probe hash table and converting them in-place to pointers for memory reuse.
+    // same as `early_filtering_probe`, but without the early filtering.
    fn probe(&self, hashes: &mut [u64], bitmap: Option<Bitmap>) -> usize;
 
     // Using hashes to probe hash table and converting them in-place to pointers for memory reuse.
+    // 1. same as `early_filtering_probe_with_selection`, but we don't use a selection to preserve
+    //    the unfiltered indexes; we just set the filtered hashes to zero.
+    // 2. returns the unfiltered count.
     fn early_filtering_probe(&self, hashes: &mut [u64], bitmap: Option<Bitmap>) -> usize;
 
     // Using hashes to probe hash table and converting them in-place to pointers for memory reuse.
+    // we use `early_filtering_probe_with_selection` to do the first-round probe.
+    // 1. `hashes` holds the hash values of the probe block's rows; we use them to do the early
+    //    filtering. if we can't early-filter a row (at idx), we assign the pointer of
+    //    the bucket to hashes[idx] to reuse the memory.
+    // 2. `selection` is used to preserve the indexes which can't be early-filtered.
+    // 3. returns the count of preserved indexes in `selection`.
     fn early_filtering_probe_with_selection(
         &self,
         hashes: &mut [u64],
@@ -521,8 +530,19 @@
         selection: &mut [u32],
     ) -> usize;
 
+    // we use `next_contains` to see whether we can find a matched row in the link.
+    // the ptr is the link header.
     fn next_contains(&self, key: &Self::Key, ptr: u64) -> bool;
 
+    /// 1. `key` is the serialized build key from one row.
+    /// 2. `ptr` points to the *RawEntry of the bucket correlated to the key, so a round of
+    ///    probing must be done before calling this method. If the ptr is zero, there is no
+    ///    correlated bucket for the key.
+    /// 3. `vec_ptr` is a RowPtr array; we use it to record the matched rows in chunks.
+    /// 4. `occupied` is the used length of vec_ptr.
+    /// 5. `capacity` is the capacity of vec_ptr.
+    /// 6. returns the matched rows count and the next ptr which needs to be probed later:
+    ///    if the capacity is enough, the next ptr is zero; otherwise the next ptr is valid.
     fn next_probe(
         &self,
         key: &Self::Key,
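// --------------------------------------------------------------------------------------
// Editor's sketch (not part of this diff): one plausible way a caller drives the probe
// API documented above. `early_filtering_probe` zeroes the hash slots of rows that cannot
// match and rewrites the surviving slots in-place into bucket pointers; `next_contains`
// then walks the collision chain. The driver function and the `Key = [u8]` choice are
// assumptions made for illustration, not Databend code.
fn count_probe_hits<H>(table: &H, keys: &[Vec<u8>], hashes: &mut [u64]) -> usize
where
    H: databend_common_hashtable::HashJoinHashtableLike<Key = [u8]>,
{
    // No validity bitmap in this sketch: every probe row is considered valid.
    let unfiltered = table.early_filtering_probe(hashes, None);
    let mut matched = 0;
    for (idx, ptr) in hashes.iter().enumerate() {
        // A zero slot means the row was filtered early; a non-zero slot is now a pointer
        // to the head of the bucket's link.
        if *ptr != 0 && table.next_contains(keys[idx].as_slice(), *ptr) {
            matched += 1;
        }
    }
    debug_assert!(matched <= unfiltered);
    matched
}
// --------------------------------------------------------------------------------------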
diff --git a/src/common/hashtable/src/utils.rs b/src/common/hashtable/src/utils.rs
index 083255f31129d..92cd091d1612c 100644
--- a/src/common/hashtable/src/utils.rs
+++ b/src/common/hashtable/src/utils.rs
@@ -185,3 +185,434 @@ pub mod sse {
         }
     }
 }
+
+// This Index is only used for target-build merge into (both standalone and distributed mode).
+// Advantages:
+// 1. Reduces redundant I/O operations, enhancing performance.
+// 2. Lowers the maintenance overhead of deduplicating row_id. (But in the distributed design, we still need to provide the row_id.)
+// 3. Allows the scheduling of the subsequent mutation pipeline to be entirely allocated to the not-matched append operations.
+// Disadvantages:
+// 1. This solution is likely to be a one-time approach (especially if there are not-matched insert operations involved),
+//    potentially leading to the target table being unsuitable for use as a build table in the future.
+// 2. Requires a significant amount of memory to be efficient and currently does not support spill operations.
+// for now we just support sql like below:
+// `merge into t using source on xxx when matched then update xxx when not matched then insert xxx`.
+// for merge into:
+// we use MergeIntoBlockInfoIndex to maintain an index for the block info in chunks.
+
+pub struct MergeIntoBlockInfoIndex {
+    // the intervals will be like below:
+    // (0,10),(11,29),(30,38). it's ordered.
+    pub intervals: Vec<Interval>,
+    prefixs: Vec<u64>,
+    length: usize,
+}
+
+pub type Interval = (u32, u32);
+
+/// the segment blocks are not sequential, because we do a parallel hashtable build.
+/// the block layout in chunks could be like below:
+/// segment0_block1 |
+/// segment1_block0 | chunk0
+/// segment0_block0 |
+///
+/// segment0_block3 |
+/// segment1_block1 | chunk1
+/// segment2_block0 |
+///
+/// .........
impl MergeIntoBlockInfoIndex {
+    pub fn new_with_capacity(capacity: usize) -> Self {
+        MergeIntoBlockInfoIndex {
+            intervals: Vec::with_capacity(capacity),
+            prefixs: Vec::with_capacity(capacity),
+            length: 0,
+        }
+    }
+
+    /// 1. interval stands for the (start,end) range in chunks for one block.
+    /// 2. prefix is the segment_id_block_id composition.
+    /// the caller promises the inserts are ordered.
+    pub fn insert_block_offsets(&mut self, interval: Interval, prefix: u64) {
+        self.intervals.push(interval);
+        self.prefixs.push(prefix);
+        self.length += 1;
+    }
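+
+    // ----------------------------------------------------------------------------------
+    // Editor's sketch (not part of this diff): the `prefix` stored next to each interval
+    // is the (segment_id, block_id) pair packed into one u64. Databend's real packing
+    // lives in `compute_row_id_prefix`/`split_prefix`; the 32/32 bit split below is an
+    // assumption made only to illustrate the round trip.
+    //
+    //     fn pack_prefix(segment_idx: u64, block_idx: u64) -> u64 {
+    //         (segment_idx << 32) | block_idx
+    //     }
+    //
+    //     fn unpack_prefix(prefix: u64) -> (u64, u64) {
+    //         (prefix >> 32, prefix & u32::MAX as u64)
+    //     }
+    //
+    //     #[test]
+    //     fn sketch_prefix_round_trip() {
+    //         // segment 7, block 3 survives a pack/unpack round trip.
+    //         assert_eq!(unpack_prefix(pack_prefix(7, 3)), (7, 3));
+    //     }
+    // ----------------------------------------------------------------------------------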
+
+    /// we do a binary search to get the partially modified offsets,
+    /// and we return the Interval and prefix. For example:
+    /// intervals: (0,10),(11,22),(23,40),(41,55)
+    /// interval: (8,27)
+    /// we will give (8,10),(23,27); we don't give (11,22), because it's fully updated.
+    /// case1: |-----|------|------|
+    ///           |-----------|
+    /// case2: |-----|------|------|
+    ///           |------|
+    /// case3: |-----|------|------|
+    ///         |--|
+    /// case4: |-----|------|------|
+    ///        |--------|
+    fn get_block_info(&self, interval: Interval) -> Vec<(Interval, u64)> {
+        let mut res = Vec::<(Interval, u64)>::with_capacity(2);
+        let left_idx = self.search_idx(interval.0);
+        let right_idx = self.search_idx(interval.1);
+        let left_interval = &self.intervals[left_idx];
+        let right_interval = &self.intervals[right_idx];
+        // the interval exactly covers whole blocks, so there is no partial block.
+        if left_interval.0 == interval.0 && right_interval.1 == interval.1 {
+            return res;
+        }
+        // only one result
+        if self.prefixs[left_idx] == self.prefixs[right_idx] {
+            res.push(((interval.0, interval.1), self.prefixs[left_idx]));
+            return res;
+        }
+        if left_interval.0 < interval.0 {
+            res.push(((interval.0, left_interval.1), self.prefixs[left_idx]))
+        }
+        if right_interval.1 > interval.1 {
+            res.push(((right_interval.0, interval.1), self.prefixs[right_idx]))
+        }
+        res
+    }
+
+    /// search_idx helps us find the index of the interval which contains the offset.
+    /// there must be such an interval.
+    fn search_idx(&self, offset: u32) -> usize {
+        let mut l = 0;
+        let mut r = self.length - 1;
+        while l < r {
+            let mid = (l + r + 1) / 2;
+            if self.intervals[mid].0 <= offset {
+                l = mid;
+            } else {
+                r = mid - 1;
+            }
+        }
+        l
+    }
+
+    pub fn gather_matched_all_blocks(&self, hits: &[u8]) -> Vec<u64> {
+        let mut res = Vec::with_capacity(10);
+        let mut step = 0;
+        while step < hits.len() {
+            if hits[step] == 1 {
+                break;
+            }
+            step += 1;
+        }
+        if step == hits.len() {
+            return res;
+        }
+        let mut start = step;
+        let mut end = step;
+        while start < hits.len() {
+            while end < hits.len() && hits[end] == 1 {
+                end += 1;
+            }
+            let left = self.search_idx(start as u32);
+            let right = self.search_idx((end - 1) as u32);
+            if left == right {
+                // matched only one block.
+                if self.intervals[left].0 == (start as u32)
+                    && self.intervals[right].1 == (end - 1) as u32
+                {
+                    res.push(self.prefixs[left]);
+                }
+            } else {
+                assert!(right > left);
+                // 1. left most side.
+                if self.intervals[left].0 == start as u32 {
+                    res.push(self.prefixs[left]);
+                }
+                for idx in left + 1..right {
+                    res.push(self.prefixs[idx]);
+                }
+                // 2. right most side.
+                if self.intervals[right].1 == (end - 1) as u32 {
+                    res.push(self.prefixs[right]);
+                }
+            }
+            while end < hits.len() && hits[end] == 0 {
+                end += 1;
+            }
+            start = end;
+        }
+        res
+    }
+
+    pub fn gather_all_partial_block_offsets(&self, hits: &[u8]) -> Vec<(Interval, u64)> {
+        let mut res = Vec::with_capacity(10);
+        let mut step = 0;
+        while step < hits.len() {
+            if hits[step] == 0 {
+                break;
+            }
+            step += 1;
+        }
+        if step == hits.len() {
+            return res;
+        }
+        let mut start = step;
+        let mut end = step;
+        while start < hits.len() {
+            while end < hits.len() && hits[end] == 0 {
+                end += 1;
+            }
+            res.extend(self.get_block_info((start as u32, (end - 1) as u32)));
+            while end < hits.len() && hits[end] == 1 {
+                end += 1;
+            }
+            start = end;
+        }
+        res
+    }
+
+    /// returns [({(Interval,prefix),(Interval,prefix)}, chunk_idx)]
+    pub fn chunk_offsets(
+        &self,
+        partial_unmodified: &Vec<(Interval, u64)>,
+        chunks_offsets: &Vec<u32>,
+    ) -> Vec<(Vec<(Interval, u64)>, u64)> {
+        let mut res = Vec::with_capacity(chunks_offsets.len());
+        if chunks_offsets.is_empty() {
+            assert!(partial_unmodified.is_empty());
+        }
+        if partial_unmodified.is_empty() || chunks_offsets.is_empty() {
+            return res;
+        }
+        let mut chunk_idx = 0;
+        let mut partial_idx = 0;
+        let mut offset = 0;
+        let mut new_chunk = true;
+        while chunk_idx < chunks_offsets.len() && partial_idx < partial_unmodified.len() {
+            // '<', not '<=': chunks_offsets[chunk_idx] is the cumulative row count up to the end of chunks[chunk_idx]
+            if partial_unmodified[partial_idx].0.1 < chunks_offsets[chunk_idx] {
+                if new_chunk {
+                    res.push((Vec::new(), chunk_idx as u64));
+                    offset = res.len() - 1;
+                    new_chunk = false;
+                }
+                res[offset].0.push(partial_unmodified[partial_idx])
+            } else {
+                new_chunk = true;
+                chunk_idx += 1;
+                partial_idx -= 1;
+            }
+            partial_idx += 1;
+        }
+        // check
+        for chunk in &res {
+            assert!(!chunk.0.is_empty());
+        }
+        res
+    }
+}
+
+/// we think the build blocks count is about 1024 most of the time.
+impl Default for MergeIntoBlockInfoIndex {
+    fn default() -> Self {
+        Self {
+            intervals: Vec::with_capacity(1024),
+            prefixs: Vec::with_capacity(1024),
+            length: 0,
+        }
+    }
+}
+
+#[test]
+fn test_block_info_index() {
+    // let's build [0,10],[11,20],[21,30],[31,39], and then find [10,37];
+    // we should get [10,10],[31,37].
+    let intervals: Vec<Interval> = vec![(0, 10), (11, 20), (21, 30), (31, 39)];
+    let find_interval: Interval = (10, 37);
+    let mut block_info_index = MergeIntoBlockInfoIndex::new_with_capacity(10);
+    for (idx, interval) in intervals.iter().enumerate() {
+        block_info_index.insert_block_offsets(*interval, idx as u64)
+    }
+    let result = block_info_index.get_block_info(find_interval);
+    assert_eq!(result.len(), 2);
+    assert_eq!(result[0].0, (10, 10));
+    assert_eq!(result[0].1, 0);
+    assert_eq!(result[1].0, (31, 37));
+    assert_eq!(result[1].1, 3);
+
+    // we find [3,7], and should get [3,7]
+    let find_interval: Interval = (3, 7);
+    let result = block_info_index.get_block_info(find_interval);
+    assert_eq!(result.len(), 1);
+    assert_eq!(result[0].0, (3, 7));
+    assert_eq!(result[0].1, 0);
+
+    // we find [11,20], and should get empty
+    let find_interval: Interval = (11, 20);
+    let result = block_info_index.get_block_info(find_interval);
+    assert_eq!(result.len(), 0);
+
+    // we find [11,30], and should get empty
+    let find_interval: Interval = (11, 30);
+    let result = block_info_index.get_block_info(find_interval);
+    assert_eq!(result.len(), 0);
+
+    // we find [8,13], and should get (8,10),(11,13)
+    let find_interval: Interval = (8, 13);
+    let result = block_info_index.get_block_info(find_interval);
+    assert_eq!(result.len(), 2);
+    assert_eq!(result[0].0, (8, 10));
+    assert_eq!(result[0].1, 0);
+    assert_eq!(result[1].0, (11, 13));
+    assert_eq!(result[1].1, 1);
+
+    // we find [11,23], and should get (21,23)
+    let find_interval: Interval = (11, 23);
+    let result = block_info_index.get_block_info(find_interval);
+    assert_eq!(result.len(), 1);
+    assert_eq!(result[0].0, (21, 23));
+    assert_eq!(result[0].1, 2);
+
+    // test `gather_all_partial_block_offsets`
+    let mut hits = vec![0; 40];
+    // unmodified (zero) ranges: [0,9],[28,39]
+    for item in hits.iter_mut().take(27 + 1).skip(10) {
+        *item = 1;
+    }
+    let result = block_info_index.gather_all_partial_block_offsets(&hits);
+    assert_eq!(result.len(), 2);
+    assert_eq!(result[0].0, (0, 9));
+    assert_eq!(result[0].1, 0);
+    assert_eq!(result[1].0, (28, 30));
+    assert_eq!(result[1].1, 2);
+
+    let mut hits = vec![0; 40];
+    // unmodified (zero) ranges: [0,9],[31,39]
+    for item in hits.iter_mut().take(30 + 1).skip(10) {
+        *item = 1;
+    }
+    let result = block_info_index.gather_all_partial_block_offsets(&hits);
+    assert_eq!(result.len(), 1);
+    assert_eq!(result[0].0, (0, 9));
+    assert_eq!(result[0].1, 0);
+
+    let mut hits = vec![0; 40];
+    // unmodified (zero) ranges: [0,10],[31,39]
+    for item in hits.iter_mut().take(30 + 1).skip(11) {
+        *item = 1;
+    }
+    let result = block_info_index.gather_all_partial_block_offsets(&hits);
+    assert_eq!(result.len(), 0);
+
+    // test chunk_offsets
+    // blocks: [0,10],[11,20],[21,30],[31,39]
+    // chunks: [0,20],[21,39]
+    // chunks_offsets: [21],[40]
+    // partial_unmodified: [((8,10),0),((13,16),1),((33,36),3)]
+    let partial_unmodified = vec![((8, 10), 0), ((13, 16), 1), ((33, 36), 3)];
+    let chunks_offsets = vec![21, 40];
+    let res = block_info_index.chunk_offsets(&partial_unmodified, &chunks_offsets);
+    assert_eq!(res.len(), 2);
+
+    assert_eq!(res[0].0.len(), 2);
+    assert_eq!(res[0].1, 0); // chunk_idx
+    assert_eq!(res[0].0[0], ((8, 10), 0));
+    assert_eq!(res[0].0[1], ((13, 16), 1));
+
+    assert_eq!(res[1].0.len(), 1);
+    assert_eq!(res[1].1, 1); // chunk_idx
+    assert_eq!(res[1].0[0], ((33, 36), 3));
+
+    // test only one chunk
+    // blocks: [0,10],[11,20],[21,30],[31,39]
+    // chunks: [0,20],[21,39]
+    // chunks_offsets: [21],[40]
+    // partial_unmodified: [((13,16),1)]
+    let partial_unmodified = vec![((13, 16), 1)];
+    let chunks_offsets = vec![21, 40];
+    let res = block_info_index.chunk_offsets(&partial_unmodified, &chunks_offsets);
+    assert_eq!(res.len(), 1);
+
+    assert_eq!(res[0].0.len(), 1);
+    assert_eq!(res[0].1, 0); // chunk_idx
+    assert_eq!(res[0].0[0], ((13, 16), 1));
+
+    // test matched all blocks
+    // blocks: [0,10],[11,20],[21,30],[31,39]
+
+    // 1. empty
+    let mut hits = vec![0; 40];
+    // set [11,19]
+    for item in hits.iter_mut().take(19 + 1).skip(11) {
+        *item = 1;
+    }
+    let res = block_info_index.gather_matched_all_blocks(&hits);
+    assert!(res.is_empty());
+
+    let mut hits = vec![0; 40];
+    // set [13,28]
+    for item in hits.iter_mut().take(28 + 1).skip(13) {
+        *item = 1;
+    }
+    let res = block_info_index.gather_matched_all_blocks(&hits);
+    assert!(res.is_empty());
+
+    // 2. one
+    let mut hits = vec![0; 40];
+    // set [11,20]
+    for item in hits.iter_mut().take(20 + 1).skip(11) {
+        *item = 1;
+    }
+    let res = block_info_index.gather_matched_all_blocks(&hits);
+    assert!(res.len() == 1 && res[0] == 1);
+
+    let mut hits = vec![0; 40];
+    // set [13,33]
+    for item in hits.iter_mut().take(33 + 1).skip(13) {
+        *item = 1;
+    }
+    let res = block_info_index.gather_matched_all_blocks(&hits);
+    assert!(res.len() == 1 && res[0] == 2);
+
+    // 3. multi blocks
+    let mut hits = vec![0; 40];
+    // set [11,30]
+    for item in hits.iter_mut().take(30 + 1).skip(11) {
+        *item = 1;
+    }
+    let res = block_info_index.gather_matched_all_blocks(&hits);
+    assert!(res.len() == 2 && res[0] == 1 && res[1] == 2);
+
+    let mut hits = vec![0; 40];
+    // set [11,31]
+    for item in hits.iter_mut().take(31 + 1).skip(11) {
+        *item = 1;
+    }
+    let res = block_info_index.gather_matched_all_blocks(&hits);
+    assert!(res.len() == 2 && res[0] == 1 && res[1] == 2);
+}
+
+#[test]
+fn test_chunk_offsets_skip_chunk() {
+    // test chunk_offsets
+    // blocks: [0,10],[11,20],[21,30],[31,39],[40,50],[51,60]
+    // chunks: [0,20],[21,39],[40,60]
+    // chunks_offsets: [21],[40],[61]
+    // partial_unmodified: [((8,10),0),((40,46),4),((51,55),5)]
+    let partial_unmodified = vec![((8, 10), 0), ((40, 46), 4), ((51, 55), 5)];
+    let chunks_offsets = vec![21, 40, 61];
+    let intervals: Vec<Interval> = vec![(0, 10), (11, 20), (21, 30), (31, 39), (40, 50), (51, 60)];
+    let mut block_info_index = MergeIntoBlockInfoIndex::new_with_capacity(10);
+    for (idx, interval) in intervals.iter().enumerate() {
+        block_info_index.insert_block_offsets(*interval, idx as u64)
+    }
+    let res = block_info_index.chunk_offsets(&partial_unmodified, &chunks_offsets);
+    assert_eq!(res.len(), 2);
+    assert_eq!(res[0].0.len(), 1);
+    assert_eq!(res[0].0[0].0.0, 8);
+    assert_eq!(res[0].0[0].0.1, 10);
+
+    assert_eq!(res[1].0.len(), 2);
+    assert_eq!(res[1].0[0].0.0, 40);
+    assert_eq!(res[1].0[0].0.1, 46);
+
+    assert_eq!(res[1].0[1].0.0, 51);
+    assert_eq!(res[1].0[1].0.1, 55);
+}
diff --git a/src/query/catalog/src/lib.rs b/src/query/catalog/src/lib.rs
index 98c76eb544ae0..57536d2dcba8d 100644
--- a/src/query/catalog/src/lib.rs
+++ b/src/query/catalog/src/lib.rs
@@ -26,5 +26,6 @@ pub mod table_args;
 pub mod table_context;
 pub mod table_function;
 
+pub mod merge_into_join;
 pub mod runtime_filter_info;
 pub mod table;
diff --git a/src/query/catalog/src/merge_into_join.rs b/src/query/catalog/src/merge_into_join.rs
new file mode 100644
index 0000000000000..db48f84b66a5b
--- /dev/null
+++ b/src/query/catalog/src/merge_into_join.rs
@@ -0,0 +1,45 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with
+// the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#[derive(Clone)]
+pub enum MergeIntoJoinType {
+    Left,
+    Right,
+    Inner,
+    LeftAnti,
+    RightAnti,
+    // it means this join is not a merge into join
+    NormalJoin,
+}
+
+// for now, we just support MergeIntoJoinType::Left to use MergeIntoBlockInfoHashTable in two situations:
+// 1. distributed broadcast join with the target table as the build side.
+// 2. standalone mode with the target table as the build side.
+// we will support Inner next, so for the current implementation the merge_into_join_type is in fact always Left.
+pub struct MergeIntoJoin {
+    pub merge_into_join_type: MergeIntoJoinType,
+    pub is_distributed: bool,
+    pub target_tbl_idx: usize,
+}
+
+impl Default for MergeIntoJoin {
+    fn default() -> Self {
+        Self {
+            merge_into_join_type: MergeIntoJoinType::NormalJoin,
+            is_distributed: false,
+            // Invalid Index
+            target_tbl_idx: usize::MAX,
+        }
+    }
+}
diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs
index ff3d2f79a37ab..5fe305cc77e3a 100644
--- a/src/query/catalog/src/table_context.rs
+++ b/src/query/catalog/src/table_context.rs
@@ -52,6 +52,7 @@ use xorf::BinaryFuse16;
 
 use crate::catalog::Catalog;
 use crate::cluster_info::Cluster;
+use crate::merge_into_join::MergeIntoJoin;
 use crate::plan::DataSourcePlan;
 use crate::plan::PartInfoPtr;
 use crate::plan::Partitions;
@@ -245,6 +246,10 @@ pub trait TableContext: Send + Sync {
 
     fn set_runtime_filter(&self, filters: (usize, RuntimeFilterInfo));
 
+    fn set_merge_into_join(&self, join: MergeIntoJoin);
+
+    fn get_merge_into_join(&self) -> MergeIntoJoin;
+
     fn get_bloom_runtime_filter_with_id(&self, id: usize) -> Vec<(String, BinaryFuse16)>;
 
     fn get_inlist_runtime_filter_with_id(&self, id: usize) -> Vec<Expr<String>>;
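// --------------------------------------------------------------------------------------
// Editor's sketch (not part of this diff): one plausible way for a `TableContext`
// implementation to back the two new methods, keeping the `MergeIntoJoin` behind a shared
// lock. The `MergeIntoJoinSlot` wrapper is hypothetical; `QueryContext`'s real storage may
// differ (e.g. a `parking_lot::RwLock` inside shared session state).
use std::sync::RwLock;

use databend_common_catalog::merge_into_join::MergeIntoJoin;

struct MergeIntoJoinSlot(RwLock<MergeIntoJoin>);

impl MergeIntoJoinSlot {
    fn set_merge_into_join(&self, join: MergeIntoJoin) {
        *self.0.write().unwrap() = join;
    }

    // `MergeIntoJoin` doesn't derive Clone (only the enum inside does), so the getter
    // rebuilds the value field by field.
    fn get_merge_into_join(&self) -> MergeIntoJoin {
        let guard = self.0.read().unwrap();
        MergeIntoJoin {
            merge_into_join_type: guard.merge_into_join_type.clone(),
            is_distributed: guard.is_distributed,
            target_tbl_idx: guard.target_tbl_idx,
        }
    }
}
// --------------------------------------------------------------------------------------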
diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs
index dfae695fb898d..930a24c5cdc56 100644
--- a/src/query/service/src/interpreters/interpreter_merge_into.rs
+++ b/src/query/service/src/interpreters/interpreter_merge_into.rs
@@ -16,6 +16,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::u64::MAX;
 
+use databend_common_catalog::merge_into_join::MergeIntoJoin;
 use databend_common_catalog::table::TableExt;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
@@ -50,6 +51,8 @@ use databend_common_sql::plans::UpdatePlan;
 use databend_common_sql::IndexType;
 use databend_common_sql::ScalarExpr;
 use databend_common_sql::TypeCheck;
+use databend_common_sql::DUMMY_COLUMN_INDEX;
+use databend_common_sql::DUMMY_TABLE_INDEX;
 use databend_common_storages_factory::Table;
 use databend_common_storages_fuse::FuseTable;
 use databend_common_storages_fuse::TableContext;
@@ -138,8 +141,64 @@ impl MergeIntoInterpreter {
             merge_type,
             distributed,
             change_join_order,
+            split_idx,
+            row_id_index,
             ..
         } = &self.plan;
+        let mut columns_set = columns_set.clone();
+        let table = self.ctx.get_table(catalog, database, table_name).await?;
+        let fuse_table = table.as_any().downcast_ref::<FuseTable>().ok_or_else(|| {
+            ErrorCode::Unimplemented(format!(
+                "table {}, engine type {}, does not support MERGE INTO",
+                table.name(),
+                table.get_table_info().engine(),
+            ))
+        })?;
+
+        // attention!! for now we have these strategies:
+        // 1. target_build_optimization: enabled in standalone mode; in this case we don't need the row_id column anymore,
+        //    but we only support `merge into xx using source on xxx when matched then update xxx when not matched then insert xxx`.
+        // 2. merge into join strategies:
+        //    Left, Right, Inner, Left Anti, Right Anti
+        //    important flags:
+        //      I.  change join order: if true, the target table is the build side; if false, the source is the build side.
+        //      II. distributed: this merge into is executed with a distributed strategy.
+        //    2.1 Left: there are matched and not-matched branches, and change join order is true.
+        //    2.2 Left Anti: change join order is true, but it's insert-only.
+        //    2.3 Inner: this is the matched-only case.
+        //        2.3.1 change join order is true: the target table is the build side; it's matched-only.
+        //        2.3.2 change join order is false: the source data is the build side; it's matched-only.
+        //    2.4 Right: change join order is false, and there are matched and not-matched branches.
+        //    2.5 Right Anti: change join order is false, but it's insert-only.
+        // distributed execution strategies:
+        //    I.   change join order is true: we use the result of `optimize_distributed_query`.
+        //    II.  change join order is false, the pattern matches, and spill is not enabled: we use a right outer join
+        //         with the row_number distributed strategy.
+        //    III. otherwise, use `merge_into_join_sexpr` as standalone execution (so if change join order is false but
+        //         the pattern doesn't match, we in fact don't support distributed execution; case I can take this most
+        //         of the time, and if that's a hash shuffle, case I can take it; we think the source is always very small).
+
+        // with `target_build_optimization` we don't need to read the row_id column. for now, there are two cases where we don't read the row_id:
+        // I. InsertOnly: the MergeIntoType is InsertOnly
+        // II. target build optimization for this pr: the MergeIntoType is FullOperation
+        let mut target_build_optimization =
+            matches!(self.plan.merge_type, MergeIntoType::FullOperation)
+                && !self.plan.columns_set.contains(&self.plan.row_id_index);
+        if target_build_optimization {
+            assert!(*change_join_order && !*distributed);
+            // so if `target_build_optimization` is true, it means the optimizer enabled this rule.
+            // but we need to check whether it's parquet format or native format. for now, we just support
+            // parquet. (we will support native in the next pr).
+            if fuse_table.is_native() {
+                target_build_optimization = false;
+                // we need to add the row_id back and forbid target_build_optimization
+                columns_set.insert(*row_id_index);
+                let merge_into_join = self.ctx.get_merge_into_join();
+                self.ctx.set_merge_into_join(MergeIntoJoin {
+                    target_tbl_idx: DUMMY_TABLE_INDEX,
+                    is_distributed: merge_into_join.is_distributed,
+                    merge_into_join_type: merge_into_join.merge_into_join_type,
+                });
+            }
+        }
 
         // check mutability
         let check_table = self.ctx.get_table(catalog, database, table_name).await?;
@@ -174,7 +233,7 @@
 
         let insert_only = matches!(merge_type, MergeIntoType::InsertOnly);
 
-        let mut row_id_idx = if !insert_only {
+        let mut row_id_idx = if !insert_only && !target_build_optimization {
             match meta_data
                 .read()
                 .row_id_index_by_table_index(*target_table_idx)
@@ -200,11 +259,24 @@
             }
         }
 
+        // we use `merge_into_split_idx` to specify a column from the target table to split a block
+        // from the join into a matched part and an unmatched part.
+        let mut merge_into_split_idx = DUMMY_COLUMN_INDEX;
+        if matches!(merge_type, MergeIntoType::FullOperation) {
+            for (idx, data_field) in join_output_schema.fields().iter().enumerate() {
+                if *data_field.name() == split_idx.to_string() {
+                    merge_into_split_idx = idx;
+                    break;
+                }
+            }
+            assert!(merge_into_split_idx != DUMMY_COLUMN_INDEX);
+        }
+
         if *distributed && !*change_join_order {
             row_number_idx = Some(join_output_schema.index_of(ROW_NUMBER_COL_NAME)?);
         }
 
-        if !insert_only && !found_row_id {
+        if !target_build_optimization && !insert_only && !found_row_id {
             // we can't get row_id_idx, throw an exception
             return Err(ErrorCode::InvalidRowIdIndex(
                 "can't get internal row_id_idx when running merge into",
@@ -217,15 +289,6 @@
             ));
         }
 
-        let table = self.ctx.get_table(catalog, database, &table_name).await?;
-        let fuse_table = table.as_any().downcast_ref::<FuseTable>().ok_or_else(|| {
-            ErrorCode::Unimplemented(format!(
-                "table {}, engine type {}, does not support MERGE INTO",
-                table.name(),
-                table.get_table_info().engine(),
-            ))
-        })?;
-
         let table_info = fuse_table.get_table_info().clone();
         let catalog_ = self.ctx.get_catalog(catalog).await?;
@@ -245,12 +308,14 @@
                 input: Box::new(rollback_join_input),
                 row_id_idx: row_id_idx as u32,
                 merge_type: merge_type.clone(),
+                merge_into_split_idx: merge_into_split_idx as u32,
             })
         } else {
             PhysicalPlan::MergeIntoSource(MergeIntoSource {
                 input: Box::new(join_input),
                 row_id_idx: row_id_idx as u32,
                 merge_type: merge_type.clone(),
+                merge_into_split_idx: merge_into_split_idx as u32,
             })
         };
@@ -390,6 +455,7 @@
                 output_schema: DataSchemaRef::default(),
                 merge_type: merge_type.clone(),
                 change_join_order: *change_join_order,
+                target_build_optimization,
             }))
         } else {
             let merge_append = PhysicalPlan::MergeInto(Box::new(MergeInto {
@@ -415,6 +481,7 @@
                 },
                 merge_type: merge_type.clone(),
                 change_join_order: *change_join_order,
+                target_build_optimization: false, // we don't support distributed mode for now.
+            }));
             // if change_join_order = true, it means the target is build side,
             // in this way, we will do matched operation and not matched operation
diff --git a/src/query/service/src/pipelines/builders/builder_exchange.rs b/src/query/service/src/pipelines/builders/builder_exchange.rs
index 733debac29d7a..66d13b358d6ab 100644
--- a/src/query/service/src/pipelines/builders/builder_exchange.rs
+++ b/src/query/service/src/pipelines/builders/builder_exchange.rs
@@ -29,7 +29,7 @@ impl PipelineBuilder {
         )?;
         // add sharing data
         self.join_state = build_res.builder_data.input_join_state;
-        self.probe_data_fields = build_res.builder_data.input_probe_schema;
+        self.merge_into_probe_data_fields = build_res.builder_data.input_probe_schema;
 
         self.main_pipeline = build_res.main_pipeline;
         self.pipelines.extend(build_res.sources_pipelines);
diff --git a/src/query/service/src/pipelines/builders/builder_join.rs b/src/query/service/src/pipelines/builders/builder_join.rs
index a415b0cd9b203..3db9373560dc0 100644
--- a/src/query/service/src/pipelines/builders/builder_join.rs
+++ b/src/query/service/src/pipelines/builders/builder_join.rs
@@ -125,12 +125,27 @@ impl PipelineBuilder {
 
     pub(crate) fn build_join(&mut self, join: &HashJoin) -> Result<()> {
         let id = join.probe.get_table_index();
-        let state = self.build_join_state(join, id)?;
+        // for merge into target table as build side.
+        let (merge_into_build_table_index, merge_into_is_distributed) =
+            self.merge_into_get_optimization_flag(join);
+
+        let state = self.build_join_state(
+            join,
+            id,
+            merge_into_build_table_index,
+            merge_into_is_distributed,
+        )?;
         self.expand_build_side_pipeline(&join.build, join, state.clone())?;
         self.build_join_probe(join, state)
     }
 
-    fn build_join_state(&mut self, join: &HashJoin, id: IndexType) -> Result<Arc<HashJoinState>> {
+    fn build_join_state(
+        &mut self,
+        join: &HashJoin,
+        id: IndexType,
+        merge_into_target_table_index: IndexType,
+        merge_into_is_distributed: bool,
+    ) -> Result<Arc<HashJoinState>> {
         HashJoinState::try_create(
             self.ctx.clone(),
             join.build.output_schema()?,
@@ -138,6 +153,8 @@
             HashJoinDesc::create(join)?,
             &join.probe_to_build,
             id,
+            merge_into_target_table_index,
+            merge_into_is_distributed,
         )
     }
 
@@ -197,7 +214,7 @@
                 Ok(ProcessorPtr::create(transform))
             }
         };
-        // for merge into
+        // for distributed merge into when the source is the build side.
         if hash_join_plan.need_hold_hash_table {
             self.join_state = Some(build_state.clone())
         }
@@ -284,7 +301,7 @@
                 projected_probe_fields.push(field.clone());
             }
         }
-        self.probe_data_fields = Some(projected_probe_fields);
+        self.merge_into_probe_data_fields = Some(projected_probe_fields);
     }
 
     Ok(())
diff --git a/src/query/service/src/pipelines/builders/builder_merge_into.rs b/src/query/service/src/pipelines/builders/builder_merge_into.rs
index 44c341d398776..0b7f554f5979a 100644
--- a/src/query/service/src/pipelines/builders/builder_merge_into.rs
+++ b/src/query/service/src/pipelines/builders/builder_merge_into.rs
@@ -134,6 +134,7 @@ impl PipelineBuilder {
             .ctx
             .build_table_by_table_info(catalog_info, table_info, None)?;
         let table = FuseTable::try_from_table(tbl.as_ref())?;
+
         // case 1
         if !*change_join_order {
             if let MergeIntoType::MatechedOnly = merge_type {
@@ -141,7 +142,7 @@
                 return Ok(());
             }
             assert!(self.join_state.is_some());
-            assert!(self.probe_data_fields.is_some());
+            assert!(self.merge_into_probe_data_fields.is_some());
 
             let join_state = self.join_state.clone().unwrap();
             // split row_number and log
@@ -160,7 +161,7 @@
             let pipe_items = vec![
                 ExtractHashTableByRowNumber::create(
                     join_state,
-                    self.probe_data_fields.clone().unwrap(),
+                    self.merge_into_probe_data_fields.clone().unwrap(),
                     merge_type.clone(),
                 )?
                 .into_pipe_item(),
@@ -169,11 +170,13 @@
             self.main_pipeline.add_pipe(Pipe::create(2, 2, pipe_items));
 
             // not matched operation
+            let table_default_schema = &tbl.schema().remove_computed_fields();
             let merge_into_not_matched_processor = MergeIntoNotMatchedProcessor::create(
                 unmatched.clone(),
                 input_schema.clone(),
                 self.func_ctx.clone(),
                 self.ctx.clone(),
+                Arc::new(DataSchema::from(table_default_schema)),
             )?;
             let pipe_items = vec![
                 merge_into_not_matched_processor.into_pipe_item(),
@@ -308,6 +311,7 @@
                     block_builder,
                     io_request_semaphore,
                     segments.clone(),
+                    false, // we don't support distributed mode.
                 )?,
                 create_dummy_item(),
             ]));
@@ -318,14 +322,16 @@
         Ok(())
     }
 
+    // Optimization Todo(@JackTan25): If insert only, we can reduce the target columns after the join.
     pub(crate) fn build_merge_into_source(
         &mut self,
         merge_into_source: &MergeIntoSource,
     ) -> Result<()> {
         let MergeIntoSource {
             input,
-            row_id_idx,
             merge_type,
+            merge_into_split_idx,
+            ..
         } = merge_into_source;
 
         self.build_pipeline(input)?;
@@ -340,7 +346,7 @@
         let output_len = self.main_pipeline.output_len();
         for _ in 0..output_len {
             let merge_into_split_processor =
-                MergeIntoSplitProcessor::create(*row_id_idx, false)?;
+                MergeIntoSplitProcessor::create(*merge_into_split_idx, false)?;
             items.push(merge_into_split_processor.into_pipe_item());
         }
 
@@ -375,6 +381,7 @@
     }
 
     // build merge into pipeline.
+    // the block row count is limited by the join (65536 rows), but we don't promise the block size.
     pub(crate) fn build_merge_into(&mut self, merge_into: &MergeInto) -> Result<()> {
         let MergeInto {
             input,
@@ -467,6 +474,7 @@
                 field_index_of_input_schema.clone(),
                 input.output_schema()?,
                 Arc::new(DataSchema::from(tbl.schema())),
+                merge_into.target_build_optimization,
             )?;
             pipe_items.push(matched_split_processor.into_pipe_item());
         }
@@ -476,11 +484,13 @@
         // (distributed,change join order):(true,true) target is build side, we
         // need to support insert in local node.
         if !*distributed || *change_join_order {
+            let table_default_schema = &tbl.schema().remove_computed_fields();
             let merge_into_not_matched_processor = MergeIntoNotMatchedProcessor::create(
                 unmatched.clone(),
                 input.output_schema()?,
                 self.func_ctx.clone(),
                 self.ctx.clone(),
+                Arc::new(DataSchema::from(table_default_schema)),
             )?;
             pipe_items.push(merge_into_not_matched_processor.into_pipe_item());
         } else {
@@ -767,6 +777,7 @@
                 block_builder,
                 io_request_semaphore,
                 segments.clone(),
+                merge_into.target_build_optimization,
             )?);
         }
     }
diff --git a/src/query/service/src/pipelines/builders/builder_replace_into.rs b/src/query/service/src/pipelines/builders/builder_replace_into.rs
index 1c9c04e554935..9d75d6db565df 100644
--- a/src/query/service/src/pipelines/builders/builder_replace_into.rs
+++ b/src/query/service/src/pipelines/builders/builder_replace_into.rs
@@ -158,6 +158,7 @@ impl PipelineBuilder {
             return Ok(());
         }
 
+        // The block size and row count are promised by the DataSource, i.e. by the user.
         if segment_partition_num == 0 {
             let dummy_item = create_dummy_item();
             //      ┌──────────────────────┐            ┌──────────────────┐
diff --git a/src/query/service/src/pipelines/builders/merge_into_join_optimizations.rs b/src/query/service/src/pipelines/builders/merge_into_join_optimizations.rs
new file mode 100644
index 0000000000000..024aa5e38b6fd
--- /dev/null
+++ b/src/query/service/src/pipelines/builders/merge_into_join_optimizations.rs
@@ -0,0 +1,40 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use databend_common_sql::executor::physical_plans::HashJoin;
+use databend_common_sql::executor::PhysicalPlan;
+use databend_common_sql::IndexType;
+use databend_common_sql::DUMMY_TABLE_INDEX;
+use databend_common_storages_fuse::operations::need_reserve_block_info;
+
+use crate::pipelines::PipelineBuilder;
+
+impl PipelineBuilder {
+    pub(crate) fn merge_into_get_optimization_flag(&self, join: &HashJoin) -> (IndexType, bool) {
+        // for merge into target table as build side.
+        let (merge_into_build_table_index, merge_into_is_distributed) =
+            if matches!(&*join.build, PhysicalPlan::TableScan(_)) {
+                let (need_block_info, is_distributed) =
+                    need_reserve_block_info(self.ctx.clone(), join.build.get_table_index());
+                if need_block_info {
+                    (join.build.get_table_index(), is_distributed)
+                } else {
+                    (DUMMY_TABLE_INDEX, false)
+                }
+            } else {
+                (DUMMY_TABLE_INDEX, false)
+            };
+        (merge_into_build_table_index, merge_into_is_distributed)
+    }
+}
diff --git a/src/query/service/src/pipelines/builders/mod.rs b/src/query/service/src/pipelines/builders/mod.rs
index 0677001b9d457..cf1c1f33d2055 100644
--- a/src/query/service/src/pipelines/builders/mod.rs
+++ b/src/query/service/src/pipelines/builders/mod.rs
@@ -37,6 +37,7 @@ mod builder_udf;
 mod builder_union_all;
 mod builder_update;
 mod builder_window;
+mod merge_into_join_optimizations;
 
 pub use builder_replace_into::ValueSource;
 pub use builder_sort::SortPipelineBuilder;
diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs
index d37c8f90cb5fd..d5ca7c0af5406 100644
--- a/src/query/service/src/pipelines/pipeline_builder.rs
+++ b/src/query/service/src/pipelines/pipeline_builder.rs
@@ -43,8 +43,8 @@ pub struct PipelineBuilder {
 
     pub pipelines: Vec<Pipeline>,
 
-    // probe data_fields for merge into
-    pub probe_data_fields: Option<Vec<DataField>>,
+    // probe data_fields for distributed merge into when the source is the build side
+    pub merge_into_probe_data_fields: Option<Vec<DataField>>,
     pub join_state: Option<Arc<HashJoinBuildState>>,
 
     // Cte -> state, each cte has it's own state
@@ -74,7 +74,7 @@ impl PipelineBuilder {
             proc_profs: prof_span_set,
             exchange_injector: DefaultExchangeInjector::create(),
             cte_state: HashMap::new(),
-            probe_data_fields: None,
+            merge_into_probe_data_fields: None,
             join_state: None,
         }
     }
@@ -97,7 +97,7 @@
             exchange_injector: self.exchange_injector,
             builder_data: PipelineBuilderData {
                 input_join_state: self.join_state,
-                input_probe_schema: self.probe_data_fields,
+                input_probe_schema: self.merge_into_probe_data_fields,
             },
         })
     }
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/build_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/build_state.rs
index 28e8538795b76..bddb87c0cf2a4 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/build_state.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/build_state.rs
@@ -39,8 +39,10 @@ pub struct BuildBlockGenerationState {
     pub(crate) build_num_rows: usize,
     /// Data of the build side.
     pub(crate) chunks: Vec<DataBlock>,
+    // we convert all chunks into a ColumnVec for every column.
     pub(crate) build_columns: Vec<ColumnVec>,
     pub(crate) build_columns_data_type: Vec<DataType>,
+    // whether we still have data after projecting with build_projection.
     pub(crate) is_build_projected: bool,
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs
index 1571683d8f1fd..3c53de4d70775 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs
@@ -139,6 +139,7 @@ impl HashJoinProbeState {
 }
 
 impl HashJoinState {
+    /// if all cols in the same row are null, we mark this row as null.
     pub(crate) fn init_markers(
         &self,
         cols: &[(Column, DataType)],
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs
index 40418df4eefa3..3f7ea0c3ef1aa 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/desc.rs
@@ -35,6 +35,9 @@ pub struct HashJoinDesc {
     pub(crate) build_keys: Vec<Expr>,
     pub(crate) probe_keys: Vec<Expr>,
     pub(crate) join_type: JoinType,
+    /// when we have non-equi conditions for the hash join,
+    /// for example `a = b and c = d and e > f`, we will use `and_filters`
+    /// to wrap `e > f` as an other_predicate for the next step's check.
     pub(crate) other_predicate: Option<Expr>,
     pub(crate) marker_join_desc: MarkJoinDesc,
     /// Whether the Join are derived from correlated subquery.
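// --------------------------------------------------------------------------------------
// Editor's sketch (not part of this diff): a minimal model of the split described above
// for `other_predicate`. Equality conjuncts become hash-join key pairs, while everything
// else (`e > f`) is kept aside and checked after probing. The `Cond` type is an
// assumption; Databend's real planner works on `ScalarExpr`s.
enum Cond {
    Eq(String, String), // e.g. `a = b`, `c = d`
    Other(String),      // e.g. `e > f`
}

fn split_join_conditions(conds: Vec<Cond>) -> (Vec<(String, String)>, Vec<String>) {
    let mut equi_keys = Vec::new();
    let mut other_predicates = Vec::new();
    for cond in conds {
        match cond {
            Cond::Eq(left, right) => equi_keys.push((left, right)),
            Cond::Other(pred) => other_predicates.push(pred),
        }
    }
    // `other_predicates` would then be folded into one conjunction via `and_filters`.
    (equi_keys, other_predicates)
}
// --------------------------------------------------------------------------------------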
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs
index 82ca4110d8760..382831d941be5 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs
@@ -156,6 +156,7 @@ impl HashJoinBuildState {
             }
         }
         let chunk_size_limit = ctx.get_settings().get_max_block_size()? as usize * 16;
+
         Ok(Arc::new(Self {
             ctx: ctx.clone(),
             func_ctx,
@@ -181,14 +182,17 @@
     /// Add input `DataBlock` to `hash_join_state.row_space`.
     pub fn build(&self, input: DataBlock) -> Result<()> {
         let mut buffer = self.hash_join_state.row_space.buffer.write();
+
         let input_rows = input.num_rows();
-        buffer.push(input);
         let old_size = self
             .hash_join_state
             .row_space
             .buffer_row_size
             .fetch_add(input_rows, Ordering::Relaxed);
 
+        self.merge_into_try_build_block_info_index(input.clone(), old_size);
+        buffer.push(input);
+
         if old_size + input_rows < self.chunk_size_limit {
             return Ok(());
         }
@@ -227,8 +231,11 @@
             if self.hash_join_state.need_mark_scan() {
                 build_state.mark_scan_map.push(block_mark_scan_map);
             }
+
             build_state.generation_state.build_num_rows += data_block.num_rows();
             build_state.generation_state.chunks.push(data_block);
+
+            self.merge_into_try_add_chunk_offset(build_state);
         }
         Ok(())
     }
@@ -386,6 +393,7 @@
         };
         let hashtable = unsafe { &mut *self.hash_join_state.hash_table.get() };
         *hashtable = hashjoin_hashtable;
+        self.merge_into_try_generate_matched_memory();
     }
     Ok(())
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs
index 10738a67e6f6d..534386ad86550 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs
@@ -39,6 +39,7 @@
 use databend_common_expression::Scalar;
 use databend_common_expression::Value;
 use databend_common_functions::BUILTIN_FUNCTIONS;
 use databend_common_hashtable::HashJoinHashtableLike;
+use databend_common_hashtable::Interval;
 use databend_common_sql::ColumnSet;
 use itertools::Itertools;
 use log::info;
@@ -56,6 +57,11 @@
 use crate::pipelines::processors::HashJoinState;
 use crate::sessions::QueryContext;
 use crate::sql::planner::plans::JoinType;
 
+// ({(Interval,prefix),(Interval,prefix),...}, chunk_idx)
+// 1. The Interval is the partial unmodified interval offset in chunks.
+// 2. Prefix is segment_idx_block_id.
+// 3. chunk_idx: the index of the correlated chunk in chunks.
+pub type MergeIntoChunkPartialUnmodified = (Vec<(Interval, u64)>, u64);
 /// Define some shared states for all hash join probe threads.
 pub struct HashJoinProbeState {
     pub(crate) ctx: Arc<QueryContext>,
@@ -80,6 +86,9 @@
     /// Todo(xudong): add more detailed comments for the following fields.
     /// Final scan tasks
     pub(crate) final_scan_tasks: RwLock<VecDeque<usize>>,
+    /// for merge into target as build side.
+    pub(crate) merge_into_final_partial_unmodified_scan_tasks:
+        RwLock<VecDeque<MergeIntoChunkPartialUnmodified>>,
     pub(crate) mark_scan_map_lock: Mutex<()>,
     /// Hash method
     pub(crate) hash_method: HashMethodKind,
@@ -138,6 +147,7 @@
             probe_schema,
             probe_projections: probe_projections.clone(),
             final_scan_tasks: RwLock::new(VecDeque::new()),
+            merge_into_final_partial_unmodified_scan_tasks: RwLock::new(VecDeque::new()),
             mark_scan_map_lock: Mutex::new(()),
             hash_method: method,
             spill_partitions: Default::default(),
@@ -273,6 +283,7 @@
         } else {
            input_num_rows as u64
         };
+        // We use the information from the probed data to predict the matching state of this probe.
         let prefer_early_filtering =
             (probe_state.num_keys_hash_matched as f64) / (probe_state.num_keys as f64) < 0.8;
@@ -373,7 +384,12 @@
     pub fn probe_attach(&self) -> Result<usize> {
         let mut worker_id = 0;
-        if self.hash_join_state.need_outer_scan() || self.hash_join_state.need_mark_scan() {
+        if self.hash_join_state.need_outer_scan()
+            || self.hash_join_state.need_mark_scan()
+            || self
+                .hash_join_state
+                .merge_into_need_target_partial_modified_scan()
+        {
             worker_id = self.probe_workers.fetch_add(1, Ordering::Relaxed);
         }
         if self.hash_join_state.enable_spill {
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs
index dbbe0f44e7454..706de842b75b0 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs
@@ -39,6 +39,7 @@
 use databend_common_sql::IndexType;
 use ethnum::U256;
 use parking_lot::RwLock;
 
+use super::merge_into_hash_join_optimization::MergeIntoState;
 use crate::pipelines::processors::transforms::hash_join::build_state::BuildState;
 use crate::pipelines::processors::transforms::hash_join::row::RowSpace;
 use crate::pipelines::processors::transforms::hash_join::util::build_schema_wrap_nullable;
@@ -121,6 +122,8 @@ pub struct HashJoinState {
     /// If the join node generates runtime filters, the scan node will use them to do pruning.
     pub(crate) table_index: IndexType,
+
+    pub(crate) merge_into_state: Option<SyncUnsafeCell<MergeIntoState>>,
 }
 
 impl HashJoinState {
@@ -131,6 +134,8 @@
         hash_join_desc: HashJoinDesc,
         probe_to_build: &[(usize, (bool, bool))],
         table_index: IndexType,
+        merge_into_target_table_index: IndexType,
+        merge_into_is_distributed: bool,
     ) -> Result<Arc<HashJoinState>> {
         if matches!(
             hash_join_desc.join_type,
@@ -166,6 +171,10 @@
             partition_id: AtomicI8::new(-2),
             enable_spill,
             table_index,
+            merge_into_state: MergeIntoState::try_create_merge_into_state(
+                merge_into_target_table_index,
+                merge_into_is_distributed,
+            ),
         }))
     }
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/merge_into_hash_join_optimization.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/merge_into_hash_join_optimization.rs
new file mode 100644
index 0000000000000..08984a78b56ee
--- /dev/null
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/merge_into_hash_join_optimization.rs
@@ -0,0 +1,347 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::cell::SyncUnsafeCell;
+use std::sync::atomic::AtomicU8;
+use std::sync::atomic::Ordering;
+
+use databend_common_arrow::arrow::bitmap::Bitmap;
+use databend_common_catalog::plan::compute_row_id_prefix;
+use databend_common_catalog::plan::split_prefix;
+use databend_common_exception::ErrorCode;
+use databend_common_exception::Result;
+use databend_common_expression::BlockMetaInfoDowncast;
+use databend_common_expression::DataBlock;
+use databend_common_hashtable::MergeIntoBlockInfoIndex;
+use databend_common_hashtable::RowPtr;
+use databend_common_sql::IndexType;
+use databend_common_sql::DUMMY_TABLE_INDEX;
+use databend_common_storages_fuse::operations::BlockMetaIndex;
+use log::info;
+
+use super::build_state::BuildState;
+use super::hash_join_probe_state::MergeIntoChunkPartialUnmodified;
+use super::HashJoinBuildState;
+use super::HashJoinProbeState;
+use super::HashJoinState;
+use super::TransformHashJoinProbe;
+
+pub struct MatchedPtr(pub *mut AtomicU8);
+
+unsafe impl Send for MatchedPtr {}
+unsafe impl Sync for MatchedPtr {}
+
+pub struct MergeIntoState {
+    /// for now we don't support distributed mode; we will support it in the next pr.
+    #[allow(unused)]
+    pub(crate) merge_into_is_distributed: bool,
+
+    /// FOR MERGE INTO TARGET TABLE AS BUILD SIDE
+    /// When merge into target table as build side, we should preserve the block info index.
+    pub(crate) block_info_index: MergeIntoBlockInfoIndex,
+    /// we use `matched` to tag the matched offsets in chunks.
+    pub(crate) matched: Vec<u8>,
+    /// `matched` will be modified concurrently, so we use
+    /// an atomic pointer to point to it.
+    pub(crate) atomic_pointer: MatchedPtr,
+    /// chunk_offsets[chunk_idx] stands for the cumulative row count up to the end of the chunk_idx-th chunk in chunks.
+    pub(crate) chunk_offsets: Vec<u32>,
+}
+
+impl MergeIntoState {
+    pub(crate) fn try_create_merge_into_state(
+        merge_into_target_table_index: IndexType,
+        merge_into_is_distributed: bool,
+    ) -> Option<SyncUnsafeCell<MergeIntoState>> {
+        if merge_into_target_table_index != DUMMY_TABLE_INDEX {
+            Some(SyncUnsafeCell::new(MergeIntoState {
+                merge_into_is_distributed,
+                block_info_index: Default::default(),
+                matched: Vec::new(),
+                atomic_pointer: MatchedPtr(std::ptr::null_mut()),
+                chunk_offsets: Vec::with_capacity(100),
+            }))
+        } else {
+            None
+        }
+    }
+}
+
+impl HashJoinBuildState {
+    pub(crate) fn merge_into_try_build_block_info_index(&self, input: DataBlock, old_size: usize) {
+        // merge into target table as build side.
+        if self
+            .hash_join_state
+            .merge_into_need_target_partial_modified_scan()
+        {
+            assert!(input.get_meta().is_some());
+            let merge_into_state = unsafe {
+                &mut *self
+                    .hash_join_state
+                    .merge_into_state
+                    .as_ref()
+                    .unwrap()
+                    .get()
+            };
+            let build_state = unsafe { &*self.hash_join_state.build_state.get() };
+            let start_offset = build_state.generation_state.build_num_rows + old_size;
+            let end_offset = start_offset + input.num_rows() - 1;
+            let block_meta_index =
+                BlockMetaIndex::downcast_ref_from(input.get_meta().unwrap()).unwrap();
+            let row_prefix = compute_row_id_prefix(
+                block_meta_index.segment_idx as u64,
+                block_meta_index.block_idx as u64,
+            );
+            let block_info_index = &mut merge_into_state.block_info_index;
+            block_info_index
+                .insert_block_offsets((start_offset as u32, end_offset as u32), row_prefix);
+        }
+    }
+
+    pub(crate) fn merge_into_try_add_chunk_offset(&self, build_state: &mut BuildState) {
+        if self
+            .hash_join_state
+            .merge_into_need_target_partial_modified_scan()
+        {
+            let merge_into_state = unsafe {
+                &mut *self
+                    .hash_join_state
+                    .merge_into_state
+                    .as_ref()
+                    .unwrap()
+                    .get()
+            };
+            let chunk_offsets = &mut merge_into_state.chunk_offsets;
+            chunk_offsets.push(build_state.generation_state.build_num_rows as u32);
+        }
+    }
+
+    pub(crate) fn merge_into_try_generate_matched_memory(&self) {
+        // generate matched offsets memory.
+        if self
+            .hash_join_state
+            .merge_into_need_target_partial_modified_scan()
+        {
+            let merge_into_state = unsafe {
+                &mut *self
+                    .hash_join_state
+                    .merge_into_state
+                    .as_ref()
+                    .unwrap()
+                    .get()
+            };
+            let matched = &mut merge_into_state.matched;
+            let build_state = unsafe { &*self.hash_join_state.build_state.get() };
+            let atomic_pointer = &mut merge_into_state.atomic_pointer;
+            *matched = vec![0; build_state.generation_state.build_num_rows];
+            let pointer =
+                unsafe { std::mem::transmute::<*mut u8, *mut AtomicU8>(matched.as_mut_ptr()) };
+            *atomic_pointer = MatchedPtr(pointer);
+        }
+    }
+}
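+
+// ----------------------------------------------------------------------------------------
+// Editor's sketch (not part of this diff): the `matched` vector above is shared across
+// probe threads through raw `AtomicU8` pointers, and every build-side row may be claimed
+// at most once. This standalone model uses a safe `&[AtomicU8]` slice instead of
+// `MatchedPtr` to show the claim-once compare-exchange pattern used below.
+//
+//     use std::sync::atomic::{AtomicU8, Ordering};
+//
+//     /// Try to claim row `offset`; returns false if another probe thread already
+//     /// matched it, which corresponds to the `UnresolvableConflict` error below.
+//     fn claim_once(matched: &[AtomicU8], offset: usize) -> bool {
+//         matched[offset]
+//             .compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst)
+//             .is_ok()
+//     }
+//
+//     #[test]
+//     fn sketch_claim_once() {
+//         let matched: Vec<AtomicU8> = (0..4).map(|_| AtomicU8::new(0)).collect();
+//         assert!(claim_once(&matched, 2));
+//         assert!(!claim_once(&matched, 2)); // a second match on the same row is rejected
+//     }
+// ----------------------------------------------------------------------------------------
+
+impl HashJoinProbeState {
+    #[inline]
+    pub(crate) fn merge_into_check_and_set_matched(
+        &self,
+        build_indexes: &[RowPtr],
+        matched_idx: usize,
+        valids: &Bitmap,
+    ) -> Result<()> {
+        // merge into target table as build side.
+        if self
+            .hash_join_state
+            .merge_into_need_target_partial_modified_scan()
+        {
+            let merge_into_state = unsafe {
+                &*self
+                    .hash_join_state
+                    .merge_into_state
+                    .as_ref()
+                    .unwrap()
+                    .get()
+            };
+            let chunk_offsets = &merge_into_state.chunk_offsets;
+
+            let pointer = &merge_into_state.atomic_pointer;
+            // add matched indexes.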
+            for (idx, row_ptr) in build_indexes[0..matched_idx].iter().enumerate() {
+                unsafe {
+                    if !valids.get_bit_unchecked(idx) {
+                        continue;
+                    }
+                }
+                let offset = if row_ptr.chunk_index == 0 {
+                    row_ptr.row_index as usize
+                } else {
+                    chunk_offsets[(row_ptr.chunk_index - 1) as usize] as usize
+                        + row_ptr.row_index as usize
+                };
+
+                let mut old_matched_counts =
+                    unsafe { (*pointer.0.add(offset)).load(Ordering::Relaxed) };
+                let mut new_matched_count = old_matched_counts + 1;
+                loop {
+                    if old_matched_counts > 0 {
+                        return Err(ErrorCode::UnresolvableConflict(
+                            "multi rows from source match one and the same row in the target_table multi times in probe phase",
+                        ));
+                    }
+
+                    let res = unsafe {
+                        (*pointer.0.add(offset)).compare_exchange_weak(
+                            old_matched_counts,
+                            new_matched_count,
+                            Ordering::SeqCst,
+                            Ordering::SeqCst,
+                        )
+                    };
+
+                    match res {
+                        Ok(_) => break,
+                        Err(x) => {
+                            old_matched_counts = x;
+                            new_matched_count = old_matched_counts + 1;
+                        }
+                    };
+                }
+            }
+        }
+        Ok(())
+    }
+
+    pub(crate) fn probe_merge_into_partial_modified_done(&self) -> Result<()> {
+        let old_count = self.probe_workers.fetch_sub(1, Ordering::Relaxed);
+        if old_count == 1 {
+            // Divide the final scan phase into multiple tasks.
+            self.generate_merge_into_final_scan_task()?;
+        }
+        Ok(())
+    }
+
+    pub(crate) fn generate_merge_into_final_scan_task(&self) -> Result<()> {
+        let merge_into_state = unsafe {
+            &*self
+                .hash_join_state
+                .merge_into_state
+                .as_ref()
+                .unwrap()
+                .get()
+        };
+        let block_info_index = &merge_into_state.block_info_index;
+        let matched = &merge_into_state.matched;
+        let chunks_offsets = &merge_into_state.chunk_offsets;
+        let partial_unmodified = block_info_index.gather_all_partial_block_offsets(matched);
+        let all_matched_blocks = block_info_index.gather_matched_all_blocks(matched);
+
+        // generate chunks
+        info!("chunk len: {}", chunks_offsets.len());
+        info!("intervals len: {} ", block_info_index.intervals.len());
+        info!(
+            "partial unmodified blocks num: {}",
+            partial_unmodified.len()
+        );
+        info!(
+            "all_matched_blocks blocks num: {}",
+            all_matched_blocks.len()
+        );
+        let mut tasks = block_info_index.chunk_offsets(&partial_unmodified, chunks_offsets);
+        info!("partial unmodified chunk num: {}", tasks.len());
+        for prefix in all_matched_blocks {
+            // deleted block
+            tasks.push((Vec::new(), prefix));
+        }
+        *self.merge_into_final_partial_unmodified_scan_tasks.write() = tasks.into();
+        Ok(())
+    }
+
+    pub(crate) fn final_merge_into_partial_unmodified_scan_task(
+        &self,
+    ) -> Option<MergeIntoChunkPartialUnmodified> {
+        let mut tasks = self.merge_into_final_partial_unmodified_scan_tasks.write();
+        tasks.pop_front()
+    }
+}
+
+impl HashJoinState {
+    pub(crate) fn merge_into_need_target_partial_modified_scan(&self) -> bool {
+        self.merge_into_state.is_some()
+    }
+}
+
+impl TransformHashJoinProbe {
+    pub(crate) fn final_merge_into_partial_unmodified_scan(
+        &mut self,
+        item: MergeIntoChunkPartialUnmodified,
+    ) -> Result<()> {
+        // matched whole block, need to delete
+        if item.0.is_empty() {
+            let prefix = item.1;
+            let (segment_idx, block_idx) = split_prefix(prefix);
+            info!(
+                "matched whole block: segment_idx: {}, block_idx: {}",
+                segment_idx, block_idx
+            );
+            let data_block = DataBlock::empty_with_meta(Box::new(BlockMetaIndex {
+                segment_idx: segment_idx as usize,
+                block_idx: block_idx as usize,
+                inner: None,
+            }));
+            self.output_data_blocks.push_back(data_block);
+            return Ok(());
+        }
+        let merge_into_state = unsafe {
+            &*self
+                .join_probe_state
+                .hash_join_state
+                .merge_into_state
+                .as_ref()
+                .unwrap()
+                .get()
+        };
+        let chunks_offsets = &merge_into_state.chunk_offsets;
+        let build_state = unsafe { &*self.join_probe_state.hash_join_state.build_state.get() };
+        let chunk_block = &build_state.generation_state.chunks[item.1 as usize];
+        let chunk_start = if item.1 == 0 {
+            0
+        } else {
+            chunks_offsets[(item.1 - 1) as usize]
+        };
+        for (interval, prefix) in item.0 {
+            for start in ((interval.0 - chunk_start)..=(interval.1 - chunk_start))
+                .step_by(self.max_block_size)
+            {
+                let end = (interval.1 - chunk_start).min(start + self.max_block_size as u32 - 1);
+                let range = (start..=end).collect::<Vec<u32>>();
+                let data_block = chunk_block.take(
+                    &range,
+                    &mut self.probe_state.generation_state.string_items_buf,
+                )?;
+                assert!(!data_block.is_empty());
+                let (segment_idx, block_idx) = split_prefix(prefix);
+                info!(
+                    "matched partial block: segment_idx: {}, block_idx: {}",
+                    segment_idx, block_idx
+                );
+                let data_block = data_block.add_meta(Some(Box::new(BlockMetaIndex {
+                    segment_idx: segment_idx as usize,
+                    block_idx: block_idx as usize,
+                    inner: None,
+                })))?;
+                self.output_data_blocks.push_back(data_block);
+            }
+        }
+        Ok(())
+    }
+}
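// --------------------------------------------------------------------------------------
// Editor's sketch (not part of this diff): the slicing arithmetic used above, isolated.
// A chunk-relative inclusive interval is cut into pieces of at most `max_block_size`
// rows, mirroring the `step_by` loop in `final_merge_into_partial_unmodified_scan`.
fn slice_interval(start: u32, end: u32, max_block_size: u32) -> Vec<(u32, u32)> {
    let mut pieces = Vec::new();
    let mut lo = start;
    while lo <= end {
        // Both bounds are inclusive, hence the `- 1`.
        let hi = end.min(lo + max_block_size - 1);
        pieces.push((lo, hi));
        lo = hi + 1;
    }
    pieces
}

#[test]
fn sketch_slice_interval() {
    // A 70_000-row interval with 65536-row blocks yields two pieces.
    assert_eq!(
        slice_interval(0, 69_999, 65_536),
        vec![(0, 65_535), (65_536, 69_999)]
    );
}
// --------------------------------------------------------------------------------------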
             let (mut match_count, mut incomplete_ptr) =
                 hash_table.next_probe(key, ptr, build_indexes_ptr, matched_idx, max_block_size);
-
+            // `total_probe_matched` records the matched-row count for the current `idx` row of the probe block.
             let mut total_probe_matched = 0;
             if match_count > 0 {
                 total_probe_matched += match_count;
@@ -454,6 +454,11 @@ impl HashJoinProbeState {
                 };
             }
         }
+        self.merge_into_check_and_set_matched(
+            build_indexes,
+            matched_idx,
+            &probe_state.true_validity,
+        )?;
 
         return Ok(());
     }
@@ -480,6 +485,11 @@ impl HashJoinProbeState {
                     };
                 }
             }
+            self.merge_into_check_and_set_matched(
+                build_indexes,
+                matched_idx,
+                &probe_state.true_validity,
+            )?;
         } else if all_false {
             let mut idx = 0;
             while idx < matched_idx {
@@ -510,6 +520,7 @@ impl HashJoinProbeState {
             }
         } else {
             let mut idx = 0;
+            self.merge_into_check_and_set_matched(build_indexes, matched_idx, &validity)?;
             while idx < matched_idx {
                 unsafe {
                     let valid = validity.get_bit_unchecked(idx);
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs
index c5e00e3e7ac5a..f4215a541e8df 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/probe_state.rs
@@ -138,8 +138,17 @@ impl MutableIndexes {
 }
 
 pub struct ProbeBlockGenerationState {
+    /// In fact, this indicates whether we need to output some of the probe block's
+    /// columns; we use probe_projections to check whether we can get a non-empty
+    /// result block.
     pub(crate) is_probe_projected: bool,
+    /// For Right/Full/RightSingle joins we use `true_validity` to reduce memory, because
+    /// we need to wrap all of the probe block's column types as nullable (if they are not
+    /// already). When we wrap them this way the validity is all true, so we use this
+    /// one bitmap to share the memory.
    pub(crate) true_validity: Bitmap,
+    /// We use `string_items_buf` for Binary/String/Bitmap/Variant columns
+    /// to store the (pointer, length) pairs, so the memory can be reused across all `take` calls.
    pub(crate) string_items_buf: Option<Vec<(u64, usize)>>,
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs
index 5cd8f3ffb2bcc..cf4af55b6b0c3 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs
@@ -37,7 +37,7 @@ impl HashJoinProbeState {
     /// non-equi-condition is subquery's child expr with subquery's output column.
     /// for example: select * from t1 where t1.a = ANY (select t2.a from t2 where t2.b = t1.b); [t1: a, b], [t2: a, b]
     /// subquery's outer columns: t1.b, and it'll derive a new column: subquery_5 when subquery cross join t1;
-    /// so equi-condition is t1.b = subquery_5, and non-equi-condition is t1.a = t2.a.
+    /// so equi-condition is t2.b = subquery_5, and non-equi-condition is t1.a = t2.a.
     /// 3. Correlated Exists subquery: only have one kind of join condition, equi-condition.
     /// equi-condition is subquery's outer columns with subquery's derived columns.
     /// (see the above example in correlated ANY subquery)
     pub(crate) fn result_blocks<'a, H: HashJoinHashtableLike>(
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs
index 732a39952431c..d7f49f74ea77b 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs
@@ -54,13 +54,13 @@ pub struct TransformHashJoinProbe {
     output_port: Arc<OutputPort>,
 
     input_data: VecDeque<DataBlock>,
-    output_data_blocks: VecDeque<DataBlock>,
+    pub(crate) output_data_blocks: VecDeque<DataBlock>,
     projections: ColumnSet,
     step: HashJoinProbeStep,
     step_logs: Vec<HashJoinProbeStep>,
-    join_probe_state: Arc<HashJoinProbeState>,
-    probe_state: ProbeState,
-    max_block_size: usize,
+    pub(crate) join_probe_state: Arc<HashJoinProbeState>,
+    pub(crate) probe_state: ProbeState,
+    pub(crate) max_block_size: usize,
     outer_scan_finished: bool,
     processor_id: usize,
@@ -203,6 +203,21 @@ impl TransformHashJoinProbe {
         {
             self.join_probe_state.probe_done()?;
             Ok(Event::Async)
+        } else if self
+            .join_probe_state
+            .hash_join_state
+            .merge_into_need_target_partial_modified_scan()
+        {
+            assert!(matches!(
+                self.join_probe_state
+                    .hash_join_state
+                    .hash_join_desc
+                    .join_type,
+                JoinType::Left
+            ));
+            self.join_probe_state
+                .probe_merge_into_partial_modified_done()?;
+            Ok(Event::Async)
         } else {
             if !self.join_probe_state.spill_partitions.read().is_empty() {
                 self.join_probe_state.finish_final_probe()?;
@@ -369,11 +384,23 @@ impl Processor for TransformHashJoinProbe {
                 Ok(())
             }
             HashJoinProbeStep::FinalScan => {
-                if let Some(task) = self.join_probe_state.final_scan_task() {
+                if self
+                    .join_probe_state
+                    .hash_join_state
+                    .merge_into_need_target_partial_modified_scan()
+                {
+                    if let Some(item) = self
+                        .join_probe_state
+                        .final_merge_into_partial_unmodified_scan_task()
+                    {
+                        self.final_merge_into_partial_unmodified_scan(item)?;
+                        return Ok(());
+                    }
+                } else if let Some(task) = self.join_probe_state.final_scan_task() {
                     self.final_scan(task)?;
-                } else {
-                    self.outer_scan_finished = true;
+                    return Ok(());
                 }
+                self.outer_scan_finished = true;
                 Ok(())
             }
             HashJoinProbeStep::FastReturn
diff --git a/src/query/service/src/pipelines/processors/transforms/processor_extract_hash_table_by_row_number.rs b/src/query/service/src/pipelines/processors/transforms/processor_extract_hash_table_by_row_number.rs
index 9d1cd00e1fae2..2a98320aa8a0b 100644
--- a/src/query/service/src/pipelines/processors/transforms/processor_extract_hash_table_by_row_number.rs
+++ b/src/query/service/src/pipelines/processors/transforms/processor_extract_hash_table_by_row_number.rs
@@ -41,7 +41,7 @@ pub struct ExtractHashTableByRowNumber {
     output_port: Arc<OutputPort>,
     input_data: Option<DataBlock>,
     output_data: Vec<DataBlock>,
-    probe_data_fields: Vec<DataField>,
+    merge_into_probe_data_fields: Vec<DataField>,
     hashstate: Arc<HashJoinBuildState>,
     // if insert only, we don't need to
     // fill null BlockEntries
@@ -51,14 +51,14 @@ impl ExtractHashTableByRowNumber {
     pub fn create(
         hashstate: Arc<HashJoinBuildState>,
-        probe_data_fields: Vec<DataField>,
+        merge_into_probe_data_fields: Vec<DataField>,
         merge_type: MergeIntoType,
     ) -> Result<Self> {
         Ok(Self {
             input_port: InputPort::create(),
             output_port: OutputPort::create(),
             hashstate,
-            probe_data_fields,
+            merge_into_probe_data_fields,
             input_data: None,
             output_data: Vec::new(),
             merge_type,
@@ -151,7 +151,7 @@ impl Processor for ExtractHashTableByRowNumber {
                 } else {
                     // Create null chunk for unmatched rows in probe side
                     let mut null_block = DataBlock::new(
-                        self.probe_data_fields
+                        self.merge_into_probe_data_fields
                             .iter()
                             .map(|df| {
                                 BlockEntry::new(
diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs
index 7ce6c1a1a5750..49a1981fec0b1 100644
--- a/src/query/service/src/sessions/query_ctx.rs
+++ b/src/query/service/src/sessions/query_ctx.rs
@@ -36,6 +36,7 @@ use databend_common_base::base::tokio::task::JoinHandle;
 use databend_common_base::base::Progress;
 use databend_common_base::base::ProgressValues;
 use databend_common_base::runtime::TrySpawn;
+use databend_common_catalog::merge_into_join::MergeIntoJoin;
 use databend_common_catalog::plan::DataSourceInfo;
 use databend_common_catalog::plan::DataSourcePlan;
 use databend_common_catalog::plan::PartInfoPtr;
@@ -919,6 +920,11 @@ impl TableContext for QueryContext {
         queries_profile
     }
 
+    fn set_merge_into_join(&self, join: MergeIntoJoin) {
+        let mut merge_into_join = self.shared.merge_into_join.write();
+        *merge_into_join = join;
+    }
+
     fn set_runtime_filter(&self, filters: (IndexType, RuntimeFilterInfo)) {
         let mut runtime_filters = self.shared.runtime_filters.write();
         match runtime_filters.entry(filters.0) {
@@ -939,6 +945,15 @@ impl TableContext for QueryContext {
         }
     }
 
+    fn get_merge_into_join(&self) -> MergeIntoJoin {
+        let merge_into_join = self.shared.merge_into_join.read();
+        MergeIntoJoin {
+            merge_into_join_type: merge_into_join.merge_into_join_type.clone(),
+            is_distributed: merge_into_join.is_distributed,
+            target_tbl_idx: merge_into_join.target_tbl_idx,
+        }
+    }
+
     fn get_bloom_runtime_filter_with_id(&self, id: IndexType) -> Vec<(String, BinaryFuse16)> {
         let runtime_filters = self.shared.runtime_filters.read();
         match runtime_filters.get(&id) {
diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs
index 8e02024525ed8..ecaea7d3471d6 100644
--- a/src/query/service/src/sessions/query_ctx_shared.rs
+++ b/src/query/service/src/sessions/query_ctx_shared.rs
@@ -24,6 +24,7 @@ use dashmap::DashMap;
 use databend_common_base::base::Progress;
 use databend_common_base::runtime::Runtime;
 use databend_common_catalog::catalog::CatalogManager;
+use databend_common_catalog::merge_into_join::MergeIntoJoin;
 use databend_common_catalog::query_kind::QueryKind;
 use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
 use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics;
@@ -114,6 +115,9 @@ pub struct QueryContextShared {
     pub(in crate::sessions) query_profiles: Arc<RwLock<HashMap<Option<u32>, PlanProfile>>>,
     pub(in crate::sessions) runtime_filters: Arc<RwLock<HashMap<IndexType, RuntimeFilterInfo>>>,
+
+    pub(in crate::sessions) merge_into_join: Arc<RwLock<MergeIntoJoin>>,
+
     // Records query level data cache metrics
     pub(in crate::sessions) query_cache_metrics: DataCacheMetrics,
 }
@@ -162,6 +166,7 @@ impl QueryContextShared {
             query_cache_metrics: DataCacheMetrics::new(),
             query_profiles: Arc::new(RwLock::new(HashMap::new())),
             runtime_filters: Default::default(),
+            merge_into_join: Default::default(),
         }))
     }
diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs
index 916cf062356a3..d5abe55eb88ec 100644
--- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs
+++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs
@@ -11,7 +11,6 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-
 use std::any::Any;
 use std::collections::HashMap;
 use std::collections::HashSet;
@@ -25,6 +24,7 @@ use databend_common_base::base::ProgressValues;
 use databend_common_catalog::catalog::Catalog;
 use databend_common_catalog::cluster_info::Cluster;
 use databend_common_catalog::database::Database;
+use databend_common_catalog::merge_into_join::MergeIntoJoin;
 use databend_common_catalog::plan::DataSourcePlan;
 use databend_common_catalog::plan::PartInfoPtr;
 use databend_common_catalog::plan::Partitions;
@@ -753,6 +753,14 @@ impl TableContext for CtxDelegation {
     fn get_query_profiles(&self) -> Vec<PlanProfile> {
         todo!()
     }
+    fn set_merge_into_join(&self, _join: MergeIntoJoin) {
+        todo!()
+    }
+
+    fn get_merge_into_join(&self) -> MergeIntoJoin {
+        todo!()
+    }
+
     fn set_runtime_filter(&self, _filters: (IndexType, RuntimeFilterInfo)) {
         todo!()
     }
diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs
index 38c8de66b4515..b5db2eaa35631 100644
--- a/src/query/service/tests/it/storages/fuse/operations/commit.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs
@@ -24,6 +24,7 @@ use databend_common_base::base::ProgressValues;
 use databend_common_catalog::catalog::Catalog;
 use databend_common_catalog::cluster_info::Cluster;
 use databend_common_catalog::database::Database;
+use databend_common_catalog::merge_into_join::MergeIntoJoin;
 use databend_common_catalog::plan::DataSourcePlan;
 use databend_common_catalog::plan::PartInfoPtr;
 use databend_common_catalog::plan::Partitions;
@@ -703,6 +704,15 @@ impl TableContext for CtxDelegation {
     fn get_query_profiles(&self) -> Vec<PlanProfile> {
         todo!()
     }
+
+    fn set_merge_into_join(&self, _join: MergeIntoJoin) {
+        todo!()
+    }
+
+    fn get_merge_into_join(&self) -> MergeIntoJoin {
+        todo!()
+    }
+
     fn set_runtime_filter(&self, _filters: (IndexType, RuntimeFilterInfo)) {
         todo!()
     }
diff --git a/src/query/sql/src/executor/physical_plans/physical_merge_into.rs b/src/query/sql/src/executor/physical_plans/physical_merge_into.rs
index 7b177506e8e83..9c44a11d48533 100644
--- a/src/query/sql/src/executor/physical_plans/physical_merge_into.rs
+++ b/src/query/sql/src/executor/physical_plans/physical_merge_into.rs
@@ -32,6 +32,7 @@ pub struct MergeIntoSource {
     pub input: Box<PhysicalPlan>,
     pub row_id_idx: u32,
     pub merge_type: MergeIntoType,
+    pub merge_into_split_idx: u32,
 }
 
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
@@ -52,6 +53,7 @@ pub struct MergeInto {
     pub distributed: bool,
     pub merge_type: MergeIntoType,
     pub change_join_order: bool,
+    pub target_build_optimization: bool,
 }
 
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
diff --git a/src/query/sql/src/planner/binder/merge_into.rs b/src/query/sql/src/planner/binder/merge_into.rs
index 3ff2d3f8b0281..9bc0254f4547b 100644
--- a/src/query/sql/src/planner/binder/merge_into.rs
+++ b/src/query/sql/src/planner/binder/merge_into.rs
@@ -57,6 +57,7 @@ use crate::Metadata;
 use crate::ScalarBinder;
 use crate::ScalarExpr;
 use crate::Visibility;
+use crate::DUMMY_COLUMN_INDEX;
 
 #[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
 pub enum MergeIntoType {
@@ -73,8 +74,6 @@ pub enum MergeIntoType {
 // right outer
 // 3. matched only:
 // inner join
-// we will import optimizer for these join type in the future.
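+// (A sketch of the mapping described above, as assumed by this optimization:
+//  insert only    -> right anti join
+//  full operation -> right outer join, or a left join with the target table as
+//                    the build side when the target-build optimization applies
+//  matched only   -> inner join)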
-
 impl Binder {
     #[allow(warnings)]
     #[async_backtrace::framed]
@@ -391,6 +390,18 @@ impl Binder {
                 .await?,
             );
         }
+        let mut split_idx = DUMMY_COLUMN_INDEX;
+        // find any target table column index for merge_into_split
+        for column in self.metadata.read().columns() {
+            if column.table_index().is_some()
+                && *column.table_index().as_ref().unwrap() == table_index
+                && column.index() != column_binding.index
+            {
+                split_idx = column.index();
+                break;
+            }
+        }
+        assert!(split_idx != DUMMY_COLUMN_INDEX);
 
         Ok(MergeInto {
             catalog: catalog_name.to_string(),
@@ -409,6 +420,8 @@ impl Binder {
             merge_type,
             distributed: false,
             change_join_order: false,
+            row_id_index: column_binding.index,
+            split_idx,
         })
     }
diff --git a/src/query/sql/src/planner/format/display_plan.rs b/src/query/sql/src/planner/format/display_plan.rs
index 516ee0492de60..3113315787020 100644
--- a/src/query/sql/src/planner/format/display_plan.rs
+++ b/src/query/sql/src/planner/format/display_plan.rs
@@ -23,6 +23,7 @@ use databend_common_expression::ROW_ID_COL_NAME;
 use itertools::Itertools;
 
 use crate::binder::ColumnBindingBuilder;
+use crate::binder::MergeIntoType;
 use crate::format_scalar;
 use crate::optimizer::SExpr;
 use crate::planner::format::display_rel_operator::FormatContext;
@@ -292,6 +293,16 @@ fn format_merge_into(merge_into: &MergeInto) -> Result<String> {
         table_entry.database(),
         table_entry.name(),
     ));
+    let target_build_optimization = matches!(merge_into.merge_type, MergeIntoType::FullOperation)
+        && !merge_into.columns_set.contains(&merge_into.row_id_index);
+    let target_build_optimization_format = FormatTreeNode::new(FormatContext::Text(format!(
+        "target_build_optimization: {}",
+        target_build_optimization
+    )));
+    let distributed_format = FormatTreeNode::new(FormatContext::Text(format!(
+        "distributed: {}",
+        merge_into.distributed
+    )));
 
     // add matched clauses
     let mut matched_children = Vec::with_capacity(merge_into.matched_evaluators.len());
@@ -307,12 +318,14 @@ fn format_merge_into(merge_into: &MergeInto) -> Result<String> {
                 condition_format
             ))));
         } else {
-            let update_format = evaluator
-                .update
-                .as_ref()
-                .unwrap()
+            let map = evaluator.update.as_ref().unwrap();
+            let mut field_indexes: Vec<FieldIndex> =
+                map.iter().map(|(field_idx, _)| *field_idx).collect();
+            field_indexes.sort();
+            let update_format = field_indexes
                 .iter()
-                .map(|(field_idx, expr)| {
+                .map(|field_idx| {
+                    let expr = map.get(field_idx).unwrap();
                     format!(
                         "{} = {}",
                         taregt_schema.field(*field_idx).name(),
@@ -351,9 +364,13 @@ fn format_merge_into(merge_into: &MergeInto) -> Result<String> {
     }
     let s_expr = merge_into.input.as_ref();
     let input_format_child = s_expr.to_format_tree(&merge_into.meta_data);
-    let all_children = [matched_children, unmatched_children, vec![
-        input_format_child,
-    ]]
+    let all_children = [
+        vec![distributed_format],
+        vec![target_build_optimization_format],
+        matched_children,
+        unmatched_children,
+        vec![input_format_child],
+    ]
     .concat();
     let res = FormatTreeNode::with_children(target_table_format, all_children).format_pretty()?;
     Ok(format!("MergeInto:\n{res}"))
diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs
index 6fa2c49d8c23e..f5f02c7c3decd 100644
--- a/src/query/sql/src/planner/optimizer/optimizer.rs
+++ b/src/query/sql/src/planner/optimizer/optimizer.rs
@@ -12,9 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
+use std::collections::HashSet;
 use std::sync::Arc;
 
 use databend_common_ast::ast::ExplainKind;
+use databend_common_catalog::merge_into_join::MergeIntoJoin;
+use databend_common_catalog::merge_into_join::MergeIntoJoinType;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
@@ -24,6 +27,7 @@ use log::info;
 use super::distributed::MergeSourceOptimizer;
 use super::format::display_memo;
 use super::Memo;
+use crate::binder::MergeIntoType;
 use crate::optimizer::cascades::CascadesOptimizer;
 use crate::optimizer::decorrelate::decorrelate_subquery;
 use crate::optimizer::distributed::optimize_distributed_query;
@@ -355,58 +359,116 @@ fn optimize_merge_into(opt_ctx: OptimizerContext, plan: Box<MergeInto>) -> Resul
         false
     };
 
+    // We only support using MergeIntoBlockInfoHashTable with a left join; we
+    // don't support spill for now, and the matched clauses' count must be one,
+    // i.e. we only support `merge into t using source when matched then
+    // update xx when not matched then insert xx`.
+    let flag = plan.matched_evaluators.len() == 1
+        && plan.matched_evaluators[0].condition.is_none()
+        && plan.matched_evaluators[0].update.is_some()
+        && !opt_ctx
+            .table_ctx
+            .get_settings()
+            .get_enable_distributed_merge_into()?;
+    let mut new_columns_set = plan.columns_set.clone();
+    if change_join_order
+        && matches!(plan.merge_type, MergeIntoType::FullOperation)
+        && opt_ctx
+            .table_ctx
+            .get_settings()
+            .get_join_spilling_threshold()?
+            == 0
+        && flag
+    {
+        new_columns_set.remove(&plan.row_id_index);
+        opt_ctx.table_ctx.set_merge_into_join(MergeIntoJoin {
+            merge_into_join_type: MergeIntoJoinType::Left,
+            is_distributed: false,
+            target_tbl_idx: plan.target_table_idx,
+        })
+    }
 
     // try to optimize distributed join, only if
    //  - distributed optimization is enabled
    //  - no local table scan
    //  - distributed merge-into is enabled
-    //  - join spilling is disabled
     if opt_ctx.enable_distributed_optimization
         && !contains_local_table_scan(&join_sexpr, &opt_ctx.metadata)
         && opt_ctx
             .table_ctx
             .get_settings()
             .get_enable_distributed_merge_into()?
-        && opt_ctx
-            .table_ctx
-            .get_settings()
-            .get_join_spilling_threshold()?
-            == 0
     {
+        // distributed execution strategies:
+        // I. change_join_order is true: we use `optimize_distributed_query`'s result.
+        // II. change_join_order is false, the source pattern matches, and spill is disabled:
+        //     we use a right outer join with the row-number distributed strategy.
+        // III. otherwise, use `merge_into_join_sexpr` for standalone execution (so if
+        //     change_join_order is false but the pattern doesn't match, we don't actually
+        //     support distributed execution; case I covers this most of the time, since a
+        //     hash-shuffle plan is taken by case I, and we assume the source is always very small).
         // input is a Join_SExpr
         let mut merge_into_join_sexpr =
             optimize_distributed_query(opt_ctx.table_ctx.clone(), &join_sexpr)?;
-        // after optimize source, we need to add
         let merge_source_optimizer = MergeSourceOptimizer::create();
-        let (optimized_distributed_merge_into_join_sexpr, distributed) = if !merge_into_join_sexpr
-            .match_pattern(&merge_source_optimizer.merge_source_pattern)
-            || change_join_order
+        // II.
+        //  - join spilling is disabled
+        let (optimized_distributed_merge_into_join_sexpr, distributed) = if opt_ctx
+            .table_ctx
+            .get_settings()
+            .get_join_spilling_threshold()?
+            == 0
+            && !change_join_order
+            && merge_into_join_sexpr.match_pattern(&merge_source_optimizer.merge_source_pattern)
         {
-            // we need to judge whether it'a broadcast join to support runtime filter.
-            merge_into_join_sexpr = try_to_change_as_broadcast_join(merge_into_join_sexpr)?;
-            (merge_into_join_sexpr.clone(), false)
-        } else {
             (
                 merge_source_optimizer.optimize(&merge_into_join_sexpr)?,
                 true,
             )
+        } else if change_join_order {
+            // I.
+            // we need to judge whether it's a broadcast join to support runtime filter.
+            merge_into_join_sexpr = try_to_change_as_broadcast_join(
+                merge_into_join_sexpr,
+                change_join_order,
+                opt_ctx.table_ctx.clone(),
+                plan.as_ref(),
+                false, // we will enable this later, but for now we don't support distributed
+                new_columns_set.as_mut(),
+            )?;
+            (
+                merge_into_join_sexpr.clone(),
+                matches!(
+                    merge_into_join_sexpr.plan.as_ref(),
+                    RelOperator::Exchange(_)
+                ),
+            )
+        } else {
+            // III.
+            (merge_into_join_sexpr.clone(), false)
         };
 
         Ok(Plan::MergeInto(Box::new(MergeInto {
             input: Box::new(optimized_distributed_merge_into_join_sexpr),
             distributed,
             change_join_order,
+            columns_set: new_columns_set.clone(),
             ..*plan
         })))
     } else {
         Ok(Plan::MergeInto(Box::new(MergeInto {
             input: join_sexpr,
             change_join_order,
+            columns_set: new_columns_set,
             ..*plan
         })))
     }
 }
 
-fn try_to_change_as_broadcast_join(merge_into_join_sexpr: SExpr) -> Result<SExpr> {
+fn try_to_change_as_broadcast_join(
+    merge_into_join_sexpr: SExpr,
+    _change_join_order: bool,
+    _table_ctx: Arc<dyn TableContext>,
+    _plan: &MergeInto,
+    _only_one_matched_clause: bool,
+    _new_columns_set: &mut HashSet<usize>,
+) -> Result<SExpr> {
     if let RelOperator::Exchange(Exchange::Merge) = merge_into_join_sexpr.plan.as_ref() {
         let right_exchange = merge_into_join_sexpr.child(0)?.child(1)?;
         if let RelOperator::Exchange(Exchange::Broadcast) = right_exchange.plan.as_ref() {
@@ -415,6 +477,21 @@ fn try_to_change_as_broadcast_join(merge_into_join_sexpr: SExpr) -> Result Result {
         let part = FusePartInfo::from_part(&part)?;
+
         self.deserialize_chunks(
             &part.location,
             part.nums_rows,
diff --git a/src/query/storages/fuse/src/operations/common/mutation_log.rs b/src/query/storages/fuse/src/operations/common/mutation_log.rs
index c0abe13f993d3..a87341350bfb8 100644
--- a/src/query/storages/fuse/src/operations/common/mutation_log.rs
+++ b/src/query/storages/fuse/src/operations/common/mutation_log.rs
@@ -18,6 +18,7 @@ use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::BlockMetaInfo;
 use databend_common_expression::BlockMetaInfoDowncast;
+use databend_common_expression::BlockMetaInfoPtr;
 use databend_common_expression::DataBlock;
 use databend_common_pipeline_transforms::processors::AccumulatingTransform;
 use databend_storages_common_table_meta::meta::BlockMeta;
@@ -66,10 +67,22 @@ pub enum MutationLogEntry {
 pub struct BlockMetaIndex {
     pub segment_idx: SegmentIndex,
     pub block_idx: BlockIndex,
+    pub inner: Option<BlockMetaInfoPtr>,
     // range is unused for now.
     // pub range: Option<Range<usize>>,
 }
 
+#[typetag::serde(name = "block_meta_index")]
+impl BlockMetaInfo for BlockMetaIndex {
+    fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
+        BlockMetaIndex::downcast_ref_from(info).is_some_and(|other| self == other)
+    }
+
+    fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
+        Box::new(self.clone())
+    }
+}
+
 #[typetag::serde(name = "mutation_logs_meta")]
 impl BlockMetaInfo for MutationLogs {
     fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
diff --git a/src/query/storages/fuse/src/operations/merge.rs b/src/query/storages/fuse/src/operations/merge.rs
index 450a3b96c2863..fc967f3e1a6bb 100644
--- a/src/query/storages/fuse/src/operations/merge.rs
+++ b/src/query/storages/fuse/src/operations/merge.rs
@@ -77,6 +77,7 @@ impl FuseTable {
         block_builder: BlockBuilder,
         io_request_semaphore: Arc<Semaphore>,
         segment_locations: Vec<(SegmentIndex, Location)>,
+        target_build_optimization: bool,
     ) -> Result<PipeItem> {
         let read_settings = ReadSettings::from_ctx(&ctx)?;
         let aggregator = MatchedAggregator::create(
@@ -88,6 +89,7 @@ impl FuseTable {
             block_builder,
             io_request_semaphore,
             segment_locations,
+            target_build_optimization,
         )?;
         Ok(aggregator.into_pipe_item())
     }
diff --git a/src/query/storages/fuse/src/operations/merge_into/mutator/delete_by_expr_mutator.rs b/src/query/storages/fuse/src/operations/merge_into/mutator/delete_by_expr_mutator.rs
index e66c6c73d5085..9feecd26002cf 100644
--- a/src/query/storages/fuse/src/operations/merge_into/mutator/delete_by_expr_mutator.rs
+++ b/src/query/storages/fuse/src/operations/merge_into/mutator/delete_by_expr_mutator.rs
@@ -32,6 +32,8 @@ pub struct DeleteByExprMutator {
     row_id_idx: usize,
     func_ctx: FunctionContext,
     origin_input_columns: usize,
+    // if target_build_optimization is used, we don't need to give row ids to the `matched mutator`
+    target_build_optimization: bool,
 }
 
 impl DeleteByExprMutator {
@@ -40,12 +42,14 @@ impl DeleteByExprMutator {
     pub fn create(
         expr: Expr,
         func_ctx: FunctionContext,
         row_id_idx: usize,
         origin_input_columns: usize,
+        target_build_optimization: bool,
     ) -> Self {
         Self {
             expr,
             row_id_idx,
             func_ctx,
             origin_input_columns,
+            target_build_optimization,
         }
     }
 
@@ -72,16 +76,20 @@ impl DeleteByExprMutator {
     }
 
     pub(crate) fn get_row_id_block(&self, block: DataBlock) -> DataBlock {
-        DataBlock::new(
-            vec![block.get_by_offset(self.row_id_idx).clone()],
-            block.num_rows(),
-        )
+        if self.target_build_optimization {
+            DataBlock::empty()
+        } else {
+            DataBlock::new(
+                vec![block.get_by_offset(self.row_id_idx).clone()],
+                block.num_rows(),
+            )
+        }
     }
 
     fn get_result_block(
         &self,
         predicate: &Value<BooleanType>,
-        predicate_not: &Value<BooleanType>,
+        predicate_not: &Value<BooleanType>, // the rows that can be processed at this time
         data_block: DataBlock,
     ) -> Result<(DataBlock, DataBlock)> {
         let res_block = data_block.clone().filter_boolean_value(predicate)?;
@@ -121,6 +129,7 @@ impl DeleteByExprMutator {
             &self.func_ctx,
             data_block.num_rows(),
         )?;
+        // the rows that can be processed at this time
         let res: Value<BooleanType> = res.try_downcast().unwrap();
         let (res_not, _) = get_not(res.clone(), &self.func_ctx, data_block.num_rows())?;
diff --git a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs
index 035095a0e7d48..8b224560b5d4a 100644
--- a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs
+++ b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs
@@ -78,6 +78,8 @@ pub struct MatchedAggregator {
     segment_locations: AHashMap<SegmentIndex, Location>,
     block_mutation_row_offset: HashMap<u64, (HashSet<usize>, HashSet<usize>)>,
     aggregation_ctx: Arc<AggregationContext>,
+    target_build_optimization: bool,
+    meta_indexes: HashSet<(SegmentIndex, BlockIndex)>,
 }
 
 impl MatchedAggregator {
@@ -91,6 +93,7 @@ impl MatchedAggregator {
         block_builder: BlockBuilder,
         io_request_semaphore: Arc<Semaphore>,
         segment_locations: Vec<(SegmentIndex, Location)>,
+        target_build_optimization: bool,
     ) -> Result<Self> {
         let segment_reader =
             MetaReaders::segment_info_reader(data_accessor.clone(), target_table_schema.clone());
@@ -123,11 +126,35 @@ impl MatchedAggregator {
             block_mutation_row_offset: HashMap::new(),
             segment_locations: AHashMap::from_iter(segment_locations),
             ctx: ctx.clone(),
+            target_build_optimization,
+            meta_indexes: HashSet::new(),
         })
     }
 
     #[async_backtrace::framed]
     pub async fn accumulate(&mut self, data_block: DataBlock) -> Result<()> {
+        // An optimization: if we use the target table as the build side, deduplication
+        // is done in the hashtable probe phase. In this case we don't support delete for
+        // now, so we don't need to add MergeStatus here.
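+        // (Under target-build optimization, what arrives here is an empty DataBlock
+        // whose meta is a BlockMetaIndex { segment_idx, block_idx, .. } marking a
+        // whole target block as matched; we only record that index below.)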
+        if data_block.get_meta().is_some() && data_block.is_empty() {
+            let meta_index = BlockMetaIndex::downcast_ref_from(data_block.get_meta().unwrap());
+            if meta_index.is_some() {
+                let meta_index = meta_index.unwrap();
+                if !self
+                    .meta_indexes
+                    .insert((meta_index.segment_idx, meta_index.block_idx))
+                {
+                    // we can get duplicated partial-unmodified blocks; this is not an error
+                    // |----------------------------block----------------------------------------|
+                    // |----partial-unmodified----|-----matched------|----partial-unmodified-----|
+                    info!(
+                        "duplicated block: segment_idx: {}, block_idx: {}",
+                        meta_index.segment_idx, meta_index.block_idx
+                    );
+                }
+            }
+            return Ok(());
+        }
         if data_block.is_empty() {
             return Ok(());
         }
@@ -192,10 +219,23 @@ impl MatchedAggregator {
         let start = Instant::now();
         // 1.get modified segments
         let mut segment_infos = HashMap::<SegmentIndex, SegmentInfo>::new();
+        let segment_indexes = if self.target_build_optimization {
+            let mut vecs = Vec::with_capacity(self.meta_indexes.len());
+            for prefix in &self.meta_indexes {
+                vecs.push(prefix.0);
+            }
+            vecs
+        } else {
+            let mut vecs = Vec::with_capacity(self.block_mutation_row_offset.len());
+            for prefix in self.block_mutation_row_offset.keys() {
+                let (segment_idx, _) = split_prefix(*prefix);
+                let segment_idx = segment_idx as usize;
+                vecs.push(segment_idx);
+            }
+            vecs
+        };
 
-        for prefix in self.block_mutation_row_offset.keys() {
-            let (segment_idx, _) = split_prefix(*prefix);
-            let segment_idx = segment_idx as usize;
+        for segment_idx in segment_indexes {
             if let Entry::Vacant(e) = segment_infos.entry(segment_idx) {
                 let (path, ver) = self.segment_locations.get(&segment_idx).ok_or_else(|| {
                     ErrorCode::Internal(format!(
@@ -220,6 +260,30 @@ impl MatchedAggregator {
             }
         }
 
+        if self.target_build_optimization {
+            let mut mutation_logs = Vec::with_capacity(self.meta_indexes.len());
+            for item in &self.meta_indexes {
+                let segment_idx = item.0;
+                let block_idx = item.1;
+                let segment_info = segment_infos.get(&item.0).unwrap();
+                let block_idx = segment_info.blocks.len() - block_idx - 1;
+                info!(
+                    "target_build_optimization, merge into apply: segment_idx:{},blk_idx:{}",
+                    segment_idx, block_idx
+                );
+                mutation_logs.push(MutationLogEntry::DeletedBlock {
+                    index: BlockMetaIndex {
+                        segment_idx,
+                        block_idx,
+                        inner: None,
+                    },
+                })
+            }
+            return Ok(Some(MutationLogs {
+                entries: mutation_logs,
+            }));
+        }
+
         let io_runtime = GlobalIORuntime::instance();
         let mut mutation_log_handlers = Vec::with_capacity(self.block_mutation_row_offset.len());
@@ -229,12 +293,12 @@ impl MatchedAggregator {
             let permit = acquire_task_permit(self.io_request_semaphore.clone()).await?;
             let aggregation_ctx = self.aggregation_ctx.clone();
             let segment_info = segment_infos.get(&segment_idx).unwrap();
+            let block_idx = segment_info.blocks.len() - block_idx as usize - 1;
+            assert!(block_idx < segment_info.blocks.len());
             info!(
                 "merge into apply: segment_idx:{},blk_idx:{}",
                 segment_idx, block_idx
             );
-            let block_idx = segment_info.blocks.len() - block_idx as usize - 1;
-            assert!(block_idx < segment_info.blocks.len());
             // the row_id is generated by block_id, not block_idx; refer to fill_internal_column_meta()
             let block_meta = segment_info.blocks[block_idx].clone();
@@ -334,6 +398,7 @@ impl AggregationContext {
                 index: BlockMetaIndex {
                     segment_idx,
                     block_idx,
+                    inner: None,
                 },
             }));
         }
@@ -370,6 +435,7 @@ impl AggregationContext {
             index: BlockMetaIndex {
                 segment_idx,
                 block_idx,
+                inner: None,
             },
             block_meta: Arc::new(new_block_meta),
         };
diff --git a/src/query/storages/fuse/src/operations/merge_into/mutator/merge_into_split_mutator.rs b/src/query/storages/fuse/src/operations/merge_into/mutator/merge_into_split_mutator.rs
index 929c1d05c3302..0d303cc5115c7 100644
--- a/src/query/storages/fuse/src/operations/merge_into/mutator/merge_into_split_mutator.rs
+++ b/src/query/storages/fuse/src/operations/merge_into/mutator/merge_into_split_mutator.rs
@@ -18,29 +18,24 @@ use databend_common_arrow::arrow::bitmap::Bitmap;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::types::DataType;
-use databend_common_expression::types::NumberDataType;
 use databend_common_expression::DataBlock;
 
 pub struct MergeIntoSplitMutator {
-    pub row_id_idx: u32,
+    pub split_idx: u32,
 }
 
 impl MergeIntoSplitMutator {
-    #[allow(dead_code)]
-    pub fn try_create(row_id_idx: u32) -> Self {
-        Self { row_id_idx }
+    pub fn try_create(split_idx: u32) -> Self {
+        Self { split_idx }
     }
 
     // (matched_block, not_matched_block)
     pub fn split_data_block(&mut self, block: &DataBlock) -> Result<(DataBlock, DataBlock)> {
-        let row_id_column = &block.columns()[self.row_id_idx as usize];
-        assert_eq!(
-            row_id_column.data_type,
-            DataType::Nullable(Box::new(DataType::Number(NumberDataType::UInt64))),
-        );
+        let split_column = &block.columns()[self.split_idx as usize];
+        assert!(matches!(split_column.data_type, DataType::Nullable(_)));
 
         // get row_id, check duplicates, and get the filter
-        let filter: Bitmap = match &row_id_column.value {
+        let filter: Bitmap = match &split_column.value {
             databend_common_expression::Value::Scalar(scalar) => {
                 // fast judge
                 if scalar.is_null() {
diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs
index a7405ccd25801..66bfc5e4c840c 100644
--- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs
+++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs
@@ -43,6 +43,7 @@ use databend_common_storage::MergeStatus;
 use crate::operations::common::MutationLogs;
 use crate::operations::merge_into::mutator::DeleteByExprMutator;
 use crate::operations::merge_into::mutator::UpdateByExprMutator;
+use crate::operations::BlockMetaIndex;
 
 #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
 pub struct SourceFullMatched;
@@ -122,6 +123,7 @@ pub struct MatchedSplitProcessor {
     output_data_row_id_data: Vec<DataBlock>,
     output_data_updated_data: Option<DataBlock>,
     target_table_schema: DataSchemaRef,
+    target_build_optimization: bool,
 }
 
 impl MatchedSplitProcessor {
@@ -132,6 +134,7 @@ impl MatchedSplitProcessor {
         field_index_of_input_schema: HashMap<FieldIndex, usize>,
         input_schema: DataSchemaRef,
         target_table_schema: DataSchemaRef,
+        target_build_optimization: bool,
     ) -> Result<Self> {
         let mut ops = Vec::new();
         for item in matched.iter() {
@@ -144,6 +147,7 @@ impl MatchedSplitProcessor {
                         ctx.get_function_context()?,
                         row_id_idx,
                         input_schema.num_fields(),
+                        target_build_optimization,
                     ),
                 }))
             } else {
@@ -183,6 +187,7 @@ impl MatchedSplitProcessor {
             row_id_idx,
             update_projections,
             target_table_schema,
+            target_build_optimization,
        })
     }
 
@@ -263,6 +268,16 @@ impl Processor for MatchedSplitProcessor {
     fn process(&mut self) -> Result<()> {
         if let Some(data_block) = self.input_data.take() {
+            // we received a partial-unmodified block's data meta.
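+            // (Such a block is empty and only carries the BlockMetaIndex meta; we
+            // forward it as-is into the row-id stream so the matched aggregator can
+            // treat the whole physical block as deleted or partially rewritten.)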
+            if data_block.get_meta().is_some() && data_block.is_empty() {
+                assert!(self.target_build_optimization);
+                let meta_index = BlockMetaIndex::downcast_ref_from(data_block.get_meta().unwrap());
+                if meta_index.is_some() {
+                    self.output_data_row_id_data.push(data_block);
+                    return Ok(());
+                }
+            }
+
             if data_block.is_empty() {
                 return Ok(());
             }
@@ -314,11 +329,16 @@ impl Processor for MatchedSplitProcessor {
                 update_rows: current_block.num_rows(),
                 deleted_rows: 0,
             });
-            self.output_data_row_id_data.push(DataBlock::new_with_meta(
-                vec![current_block.get_by_offset(self.row_id_idx).clone()],
-                current_block.num_rows(),
-                Some(Box::new(RowIdKind::Update)),
-            ));
+
+            // for target build optimization, there is only one matched clause without a condition; we won't read the row id.
+            if !self.target_build_optimization {
+                self.output_data_row_id_data.push(DataBlock::new_with_meta(
+                    vec![current_block.get_by_offset(self.row_id_idx).clone()],
+                    current_block.num_rows(),
+                    Some(Box::new(RowIdKind::Update)),
+                ));
+            }
+
             let op = BlockOperator::Project {
                 projection: self.update_projections.clone(),
             };
diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_not_matched.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_not_matched.rs
index 17a8958353e58..4fa753a005b53 100644
--- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_not_matched.rs
+++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_not_matched.rs
@@ -20,6 +20,7 @@ use std::time::Instant;
 
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
+use databend_common_expression::BlockMetaInfoDowncast;
 use databend_common_expression::DataBlock;
 use databend_common_expression::DataSchemaRef;
 use databend_common_expression::FunctionContext;
@@ -37,6 +38,7 @@ use databend_common_storage::MergeStatus;
 use itertools::Itertools;
 
 use crate::operations::merge_into::mutator::SplitByExprMutator;
+use crate::operations::BlockMetaIndex;
 
 // (source_schema, condition, values_exprs)
 type UnMatchedExprs = Vec<(DataSchemaRef, Option<RemoteExpr>, Vec<RemoteExpr>)>;
@@ -56,6 +58,8 @@ pub struct MergeIntoNotMatchedProcessor {
     func_ctx: FunctionContext,
     // data_schemas[i] means the i-th op's result block's schema.
     data_schemas: HashMap<usize, DataSchemaRef>,
+    // for target table build optimization
+    target_table_schema: DataSchemaRef,
     ctx: Arc<dyn TableContext>,
 }
 
@@ -65,6 +69,7 @@ impl MergeIntoNotMatchedProcessor {
         input_schema: DataSchemaRef,
         func_ctx: FunctionContext,
         ctx: Arc<dyn TableContext>,
+        target_table_schema: DataSchemaRef,
     ) -> Result<Self> {
         let mut ops = Vec::<SplitByExprMutator>::with_capacity(unmatched.len());
         let mut data_schemas = HashMap::with_capacity(unmatched.len());
@@ -97,6 +102,7 @@ impl MergeIntoNotMatchedProcessor {
             output_data: Vec::new(),
             func_ctx,
             data_schemas,
+            target_table_schema,
             ctx,
         })
     }
@@ -153,10 +159,21 @@ impl Processor for MergeIntoNotMatchedProcessor {
     }
 
     fn process(&mut self) -> Result<()> {
-        if let Some(data_block) = self.input_data.take() {
+        if let Some(mut data_block) = self.input_data.take() {
             if data_block.is_empty() {
                 return Ok(());
             }
+            // For target build optimization we `take_meta`, not `get_meta`, because the `BlockMetaIndex`
+            // is only used to judge whether we need to update `merge_status`; we shouldn't pass it through.
+            // `no_need_add_status` means this is an origin data block from the target table, and we can push it directly.
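+            // (Sketch of the check below: downcasting the meta to BlockMetaIndex
+            // succeeds only for blocks produced by the target-build scan path, so
+            // ordinary probe output still goes through the unmatched ops.)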
+            let no_need_add_status = data_block.get_meta().is_some()
+                && BlockMetaIndex::downcast_from(data_block.take_meta().unwrap()).is_some();
+            if no_need_add_status {
+                data_block =
+                    data_block.add_meta(Some(Box::new(self.target_table_schema.clone())))?;
+                self.output_data.push(data_block);
+                return Ok(());
+            }
             let start = Instant::now();
             let mut current_block = data_block;
             for (idx, op) in self.ops.iter().enumerate() {
@@ -169,7 +186,6 @@ impl Processor for MergeIntoNotMatchedProcessor {
                     metrics_inc_merge_into_append_blocks_rows_counter(
                         satisfied_block.num_rows() as u32
                     );
-
                     self.ctx.add_merge_status(MergeStatus {
                         insert_rows: satisfied_block.num_rows(),
                         update_rows: 0,
diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split.rs
index ab3c5fa03d38a..54c7716d52600 100644
--- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split.rs
+++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;
 use std::time::Instant;
 
 use databend_common_exception::Result;
+use databend_common_expression::BlockMetaInfoDowncast;
 use databend_common_expression::DataBlock;
 use databend_common_metrics::storage::*;
 use databend_common_pipeline_core::processors::Event;
@@ -29,7 +30,13 @@ use databend_common_pipeline_core::PipeItem;
 use super::processor_merge_into_matched_and_split::SourceFullMatched;
 use crate::operations::merge_into::mutator::MergeIntoSplitMutator;
+use crate::operations::BlockMetaIndex;
 
+// There are two kinds of usage for this processor:
+// 1. we receive a probed datablock from the join and split it by row id into a matched block and an unmatched block.
+// 2. we receive an unmatched datablock; this is an optimization for using the target table as the build side. The
+//    unmatched datablock is a physical block's partial-unmodified block, and its meta is a prefix (segment_id_block_id).
+// we use the meta to distinguish case 1 from case 2.
 pub struct MergeIntoSplitProcessor {
     input_port: Arc<InputPort>,
     output_port_matched: Arc<OutputPort>,
@@ -45,8 +52,8 @@ pub struct MergeIntoSplitProcessor {
 }
 
 impl MergeIntoSplitProcessor {
-    pub fn create(row_id_idx: u32, target_table_empty: bool) -> Result<Self> {
-        let merge_into_split_mutator = MergeIntoSplitMutator::try_create(row_id_idx);
+    pub fn create(split_idx: u32, target_table_empty: bool) -> Result<Self> {
+        let merge_into_split_mutator = MergeIntoSplitMutator::try_create(split_idx);
         let input_port = InputPort::create();
         let output_port_matched = OutputPort::create();
         let output_port_not_matched = OutputPort::create();
@@ -143,9 +150,24 @@ impl Processor for MergeIntoSplitProcessor {
         }
     }
 
-    // Todo:(JackTan25) accutally, we should do insert-only optimization in the future.
     fn process(&mut self) -> Result<()> {
         if let Some(data_block) = self.input_data.take() {
+            // we received a partial-unmodified block's data; please see the details at the top of this file.
+            if data_block.get_meta().is_some() {
+                let meta_index = BlockMetaIndex::downcast_ref_from(data_block.get_meta().unwrap());
+                if meta_index.is_some() {
+                    // we keep the meta in data_block to avoid mistakenly adding an insert `merge_status` in `merge_into_not_matched`.
+                    // if the block `is_empty`, the whole block was matched and needs to be deleted.
+                    if !data_block.is_empty() {
+                        self.output_data_not_matched_data = Some(data_block.clone());
+                    }
+                    // when the downstream receives this, it should just treat it as a DeletedLog.
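+                    // (An empty block carrying only a BlockMetaIndex is the
+                    // "delete the whole block" signal used across this pipeline.)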
+                    self.output_data_matched_data = Some(DataBlock::empty_with_meta(Box::new(
+                        meta_index.unwrap().clone(),
+                    )));
+                    return Ok(());
+                }
+            }
             // for distributed execution, if one node matched all source data.
             // if we use right join, we will receive a empty block, but we must
             // give it to downstream.
diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split_row_number_and_log.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split_row_number_and_log.rs
index b458d068af21e..ac85c711b2e65 100644
--- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split_row_number_and_log.rs
+++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_split_row_number_and_log.rs
@@ -29,6 +29,7 @@ use databend_common_pipeline_core::PipeItem;
 use super::processor_merge_into_matched_and_split::SourceFullMatched;
 use crate::operations::merge_into::processors::RowIdKind;
 
+// for distributed merge into (source as the build side; it will be broadcast)
 pub struct RowNumberAndLogSplitProcessor {
     input_port: Arc<InputPort>,
     output_port_row_number: Arc<OutputPort>,
diff --git a/src/query/storages/fuse/src/operations/mod.rs b/src/query/storages/fuse/src/operations/mod.rs
index 2cb5d826fd145..fbbad4b46af1f 100644
--- a/src/query/storages/fuse/src/operations/mod.rs
+++ b/src/query/storages/fuse/src/operations/mod.rs
@@ -41,6 +41,7 @@ pub use delete::MutationBlockPruningContext;
 pub use merge_into::*;
 pub use mutation::*;
 pub use read::build_row_fetcher_pipeline;
+pub use read::need_reserve_block_info;
 pub use replace_into::*;
 pub use util::acquire_task_permit;
 pub use util::column_parquet_metas;
diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs
index 5579e2814494f..ea2760582f37a 100644
--- a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs
+++ b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs
@@ -575,6 +575,7 @@ impl CompactTaskBuilder {
                 CompactTaskInfo::create(blocks, BlockMetaIndex {
                     segment_idx,
                     block_idx,
+                    inner: None,
                 }),
             ))));
         }
diff --git a/src/query/storages/fuse/src/operations/mutation/processors/mutation_source.rs b/src/query/storages/fuse/src/operations/mutation/processors/mutation_source.rs
index e170f460ff70b..a3c2b7074cdc4 100644
--- a/src/query/storages/fuse/src/operations/mutation/processors/mutation_source.rs
+++ b/src/query/storages/fuse/src/operations/mutation/processors/mutation_source.rs
@@ -380,6 +380,7 @@ impl Processor for MutationSource {
                     self.index = BlockMetaIndex {
                         segment_idx: part.index.segment_idx,
                         block_idx: part.index.block_idx,
+                        inner: None,
                     };
                     if matches!(self.action, MutationAction::Deletion) {
                         self.stats_type =
diff --git a/src/query/storages/fuse/src/operations/read/mod.rs b/src/query/storages/fuse/src/operations/read/mod.rs
index c619d474f6579..be309849c7fb3 100644
--- a/src/query/storages/fuse/src/operations/read/mod.rs
+++ b/src/query/storages/fuse/src/operations/read/mod.rs
@@ -25,10 +25,11 @@ mod parquet_rows_fetcher;
 mod runtime_filter_prunner;
 
 mod data_source_with_meta;
-
+mod util;
 pub use fuse_rows_fetcher::build_row_fetcher_pipeline;
 pub use fuse_source::build_fuse_parquet_source_pipeline;
 pub use native_data_source_deserializer::NativeDeserializeDataTransform;
 pub use native_data_source_reader::ReadNativeDataSource;
 pub use parquet_data_source_deserializer::DeserializeDataTransform;
 pub use parquet_data_source_reader::ReadParquetDataSource;
+pub use util::need_reserve_block_info;
diff --git a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs
index e1f97d131d86a..54a4ceda24729 100644
--- a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs
+++ b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs
@@ -65,12 +65,14 @@ use xorf::BinaryFuse16;
 
 use super::fuse_source::fill_internal_column_meta;
 use super::native_data_source::NativeDataSource;
+use super::util::need_reserve_block_info;
 use crate::fuse_part::FusePartInfo;
 use crate::io::AggIndexReader;
 use crate::io::BlockReader;
 use crate::io::VirtualColumnReader;
 use crate::operations::read::data_source_with_meta::DataSourceWithMeta;
 use crate::operations::read::runtime_filter_prunner::update_bitmap_with_bloom_filter;
+use crate::operations::read::util::add_row_prefix_meta;
 use crate::DEFAULT_ROW_PER_PAGE;
 
 /// A helper struct to store the intermediate state while reading a native partition.
@@ -235,6 +237,9 @@ pub struct NativeDeserializeDataTransform {
     /// Record how many sets of pages have been skipped.
     /// It's used for metrics.
     skipped_pages: usize,
+
+    // for merge_into target build.
+    need_reserve_block_info: bool,
 }
 
 impl NativeDeserializeDataTransform {
@@ -250,7 +255,7 @@ impl NativeDeserializeDataTransform {
         virtual_reader: Arc<Option<VirtualColumnReader>>,
     ) -> Result<ProcessorPtr> {
         let scan_progress = ctx.get_scan_progress();
-
+        let (need_reserve_block_info, _) = need_reserve_block_info(ctx.clone(), plan.table_index);
         let mut src_schema: DataSchema = (block_reader.schema().as_ref()).into();
 
         let mut prewhere_columns: Vec<usize> =
@@ -381,6 +386,7 @@ impl NativeDeserializeDataTransform {
                 base_block_ids: plan.base_block_ids.clone(),
                 bloom_runtime_filter: None,
                 read_state: ReadPartState::new(),
+                need_reserve_block_info,
             },
         )))
     }
@@ -946,7 +952,9 @@ impl NativeDeserializeDataTransform {
             // All columns are default values, no need to read.
             let part = self.parts.front().unwrap();
             let fuse_part = FusePartInfo::from_part(part)?;
-            let block = self.build_default_block(fuse_part)?;
+            let mut block = self.build_default_block(fuse_part)?;
+            // for merge into target build
+            block = add_row_prefix_meta(self.need_reserve_block_info, fuse_part, block)?;
             self.add_output_block(block);
             self.finish_partition();
             return Ok(());
@@ -968,6 +976,7 @@ impl NativeDeserializeDataTransform {
         // Fill `InternalColumnMeta` as `DataBlock.meta` if query internal columns,
         // `TransformAddInternalColumns` will generate internal columns using `InternalColumnMeta` in next pipeline.
         let mut block = block.resort(&self.src_schema, &self.output_schema)?;
+        let fuse_part = FusePartInfo::from_part(&self.parts[0])?;
         if self.block_reader.query_internal_columns() {
             let offset = self.read_state.offset;
             let offsets = if let Some(count) = self.read_state.filtered_count {
@@ -980,7 +989,6 @@ impl NativeDeserializeDataTransform {
                 (offset..offset + origin_num_rows).collect()
             };
 
-            let fuse_part = FusePartInfo::from_part(&self.parts[0])?;
             block = fill_internal_column_meta(
                 block,
                 fuse_part,
@@ -989,13 +997,16 @@ impl NativeDeserializeDataTransform {
             )?;
         }
 
+        // we will do recluster for stream here.
         if self.block_reader.update_stream_columns() {
             let inner_meta = block.take_meta();
-            let fuse_part = FusePartInfo::from_part(&self.parts[0])?;
             let meta = gen_mutation_stream_meta(inner_meta, &fuse_part.location)?;
             block = block.add_meta(Some(Box::new(meta)))?;
         }
 
+        // for merge into target build
+        block = add_row_prefix_meta(self.need_reserve_block_info, fuse_part, block)?;
+
         self.read_state.offset += origin_num_rows;
 
         Ok(block)
@@ -1088,6 +1099,9 @@ impl Processor for NativeDeserializeDataTransform {
                         self.base_block_ids.clone(),
                     )?;
                 }
+                data_block =
+                    add_row_prefix_meta(self.need_reserve_block_info, fuse_part, data_block)?;
+
                 self.finish_partition();
                 self.add_output_block(data_block);
                 return Ok(());
diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs
index 4d732af5fa922..db6438159bc34 100644
--- a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs
+++ b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs
@@ -44,6 +44,8 @@ use xorf::BinaryFuse16;
 
 use super::fuse_source::fill_internal_column_meta;
 use super::parquet_data_source::ParquetDataSource;
+use super::util::add_row_prefix_meta;
+use super::util::need_reserve_block_info;
 use crate::fuse_part::FusePartInfo;
 use crate::io::AggIndexReader;
 use crate::io::BlockReader;
@@ -72,6 +74,8 @@ pub struct DeserializeDataTransform {
     base_block_ids: Option<Scalar>,
     cached_runtime_filter: Option<Vec<(String, BinaryFuse16)>>,
+    // for merge_into target build.
+    need_reserve_block_info: bool,
 }
 
 unsafe impl Send for DeserializeDataTransform {}
@@ -105,7 +109,7 @@ impl DeserializeDataTransform {
         let mut output_schema = plan.schema().as_ref().clone();
         output_schema.remove_internal_fields();
         let output_schema: DataSchema = (&output_schema).into();
-
+        let (need_reserve_block_info, _) = need_reserve_block_info(ctx.clone(), plan.table_index);
         Ok(ProcessorPtr::create(Box::new(DeserializeDataTransform {
             ctx,
             table_index: plan.table_index,
@@ -123,6 +127,7 @@ impl DeserializeDataTransform {
             virtual_reader,
             base_block_ids: plan.base_block_ids.clone(),
             cached_runtime_filter: None,
+            need_reserve_block_info,
         })))
     }
 
@@ -314,12 +319,17 @@ impl Processor for DeserializeDataTransform {
                     )?;
                 }
 
+                // we will do recluster for stream here.
                 if self.block_reader.update_stream_columns() {
                     let inner_meta = data_block.take_meta();
                     let meta = gen_mutation_stream_meta(inner_meta, &part.location)?;
                     data_block = data_block.add_meta(Some(Box::new(meta)))?;
                 }
 
+                // for merge into target build
+                data_block =
+                    add_row_prefix_meta(self.need_reserve_block_info, part, data_block)?;
+
                 self.output_data = Some(data_block);
             }
         }
diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs
index e132e8cd009c8..dbc0f4baf1b92 100644
--- a/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs
+++ b/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs
@@ -242,7 +242,6 @@ impl Processor for ReadParquetDataSource {
                     .ctx
                     .get_min_max_runtime_filter_with_id(self.table_index),
             );
-
             let mut fuse_part_infos = Vec::with_capacity(parts.len());
             for part in parts.into_iter() {
                 if runtime_filter_pruner(
diff --git a/src/query/storages/fuse/src/operations/read/util.rs b/src/query/storages/fuse/src/operations/read/util.rs
new file mode 100644
index 0000000000000..01dd9471b9b65
--- /dev/null
+++ b/src/query/storages/fuse/src/operations/read/util.rs
@@ -0,0 +1,55 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use databend_common_catalog::merge_into_join::MergeIntoJoinType;
+use databend_common_catalog::table_context::TableContext;
+use databend_common_exception::Result;
+use databend_common_expression::DataBlock;
+
+use crate::operations::BlockMetaIndex;
+use crate::FusePartInfo;
+
+pub fn need_reserve_block_info(ctx: Arc<dyn TableContext>, table_idx: usize) -> (bool, bool) {
+    let merge_into_join = ctx.get_merge_into_join();
+    (
+        matches!(
+            merge_into_join.merge_into_join_type,
+            MergeIntoJoinType::Left
+        ) && merge_into_join.target_tbl_idx == table_idx,
+        merge_into_join.is_distributed,
+    )
+}
+
+// for merge into target build; in this situation, we don't need the row id
+pub(crate) fn add_row_prefix_meta(
+    need_reserve_block_info: bool,
+    fuse_part: &FusePartInfo,
+    mut block: DataBlock,
+) -> Result<DataBlock> {
+    if need_reserve_block_info && fuse_part.block_meta_index.is_some() {
+        let block_meta_index = fuse_part.block_meta_index.as_ref().unwrap();
+        // in fact, inner_meta is None for now; for merge into target build, we don't need
+        // to get the row_id.
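+        // (Illustration only: a block read from segment 2, block 5 gets tagged with
+        // BlockMetaIndex { segment_idx: 2, block_idx: 5, inner: None }, which the
+        // merge-into probe side later uses to build its block-info index.)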
+        let inner_meta = block.take_meta();
+        block.add_meta(Some(Box::new(BlockMetaIndex {
+            segment_idx: block_meta_index.segment_idx,
+            block_idx: block_meta_index.block_id,
+            inner: inner_meta,
+        })))
+    } else {
+        Ok(block)
+    }
+}
diff --git a/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs b/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs
index 7e162b2a14a46..7ff4b623af9b7 100644
--- a/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs
+++ b/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs
@@ -458,6 +458,7 @@ impl AggregationContext {
                 index: BlockMetaIndex {
                     segment_idx: segment_index,
                     block_idx: block_index,
+                    inner: None,
                 },
             };
 
@@ -534,6 +535,7 @@ impl AggregationContext {
             index: BlockMetaIndex {
                 segment_idx: segment_index,
                 block_idx: block_index,
+                inner: None,
             },
             block_meta: Arc::new(new_block_meta),
         };
diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test
index 05bdf380266c2..9cc961bec2182 100644
--- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test
+++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test
@@ -849,6 +849,8 @@ CREATE TABLE orders CLUSTER BY (to_yyyymmddhh(created_at), user_id) AS SELECT
     date_add('day', floor(rand() * 10 % 365)::int, '2021-01-01') AS updated_at
 FROM numbers(5000);
 
+### for now, we disable target_table_optimization for native. Native will
+### split one block into multiple pages. We should fix this in the future.
 statement ok
 MERGE INTO orders USING
 (
diff --git a/tests/sqllogictests/suites/mode/standalone/explain/09_0039_target_build_merge_into_standalone.test b/tests/sqllogictests/suites/mode/standalone/explain/09_0039_target_build_merge_into_standalone.test
new file mode 100644
index 0000000000000..e9cdadf2b9a9f
--- /dev/null
+++ b/tests/sqllogictests/suites/mode/standalone/explain/09_0039_target_build_merge_into_standalone.test
@@ -0,0 +1,306 @@
+statement ok
+set enable_experimental_merge_into = 1;
+
+## Target Build Optimization Test
+statement ok
+create table target_build_optimization(a int,b string,c string);
+
+statement ok
+create table source_optimization(a int,b string,c string);
+
+### 1. add 4 blocks for target_build_optimization
+statement ok
+insert into target_build_optimization values(1,'b1','c1'),(2,'b2','c2');
+
+statement ok
+insert into target_build_optimization values(3,'b3','c3'),(4,'b4','c4');
+
+statement ok
+insert into target_build_optimization values(5,'b5','c5'),(6,'b6','c6');
+
+statement ok
+insert into target_build_optimization values(7,'b7','c7'),(8,'b8','c8');
+
+### 2. we need to make source_optimization larger than target_build_optimization
+### 2.1 test multiple matched columns
+statement ok
+insert into source_optimization values(1,'b1','c_1'),(1,'b1','c_2');
+
+statement ok
+insert into source_optimization values(3,'b3','c3'),(4,'b4','c4');
+
+statement ok
+insert into source_optimization values(5,'b5','c5'),(6,'b6','c6');
+
+statement ok
+insert into source_optimization values(7,'b7','c7'),(8,'b8','c8');
+
+statement ok
+insert into source_optimization values(7,'b7','c7'),(8,'b8','c8');
+
+statement ok
+insert into source_optimization values(5,'b5','c5'),(6,'b6','c6');
+
+statement error 4001
+merge into target_build_optimization as t1 using source_optimization as t2 on t1.a = t2.a and t1.b = t2.b when matched then update * when not matched then insert *;
+
+### 2.2 make sure the plan is expected
+query T
+explain merge into target_build_optimization as t1 using source_optimization as t2 on t1.a = t2.a and t1.b = t2.b when matched then update set t1.a = t2.a,t1.b = t2.b,t1.c = t2.c when not matched then insert *;
+----
+MergeInto:
+target_table: default.default.target_build_optimization
+├── distributed: false
+├── target_build_optimization: true
+├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)]
+├── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))]
+└── HashJoin: LEFT OUTER
+    ├── equi conditions: [and(eq(t2.a (#0), t1.a (#3)), eq(t2.b (#1), t1.b (#4)))]
+    ├── non-equi conditions: []
+    ├── LogicalGet
+    │   ├── table: default.default.source_optimization
+    │   ├── filters: []
+    │   ├── order by: []
+    │   └── limit: NONE
+    └── LogicalGet
+        ├── table: default.default.target_build_optimization
+        ├── filters: []
+        ├── order by: []
+        └── limit: NONE
+
+### 3. test with conjunct and without conjunct
+### 3. test with and without conjuncts
+statement ok
+truncate table source_optimization;
+
+## partially updated
+statement ok
+insert into source_optimization values(1,'b1','c_1')
+
+statement ok
+insert into source_optimization values(3,'b3','c_3')
+
+## fully updated
+statement ok
+insert into source_optimization values(5,'b5','c_5'),(6,'b6','c_6');
+
+statement ok
+insert into source_optimization values(7,'b7','c_7'),(8,'b8','c_8');
+
+## insert
+statement ok
+insert into source_optimization values(9,'b9','c_9'),(10,'b10','c_10');
+
+statement ok
+insert into source_optimization values(11,'b11','c_11'),(12,'b12','c_12');
+
+### test block counts
+query T
+select count(*) from fuse_block('default','source_optimization');
+----
+6
+
+query T
+select count(*) from fuse_block('default','target_build_optimization');
+----
+4
+
+### make sure the plan is expected
+query T
+explain merge into target_build_optimization as t1 using source_optimization as t2 on t1.a = t2.a and t1.b = t2.b when matched then update set t1.a = t2.a,t1.b = t2.b,t1.c = t2.c when not matched then insert *;
+----
+MergeInto:
+target_table: default.default.target_build_optimization
+├── distributed: false
+├── target_build_optimization: true
+├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)]
+├── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))]
+└── HashJoin: LEFT OUTER
+    ├── equi conditions: [and(eq(t2.a (#0), t1.a (#3)), eq(t2.b (#1), t1.b (#4)))]
+    ├── non-equi conditions: []
+    ├── LogicalGet
+    │   ├── table: default.default.source_optimization
+    │   ├── filters: []
+    │   ├── order by: []
+    │   └── limit: NONE
+    └── LogicalGet
+        ├── table: default.default.target_build_optimization
+        ├── filters: []
+        ├── order by: []
+        └── limit: NONE
+
+## test without conjunct
+query TT
+merge into target_build_optimization as t1 using source_optimization as t2 on t1.a = t2.a and t1.b = t2.b when matched then update * when not matched then insert *;
+----
+4 6
+
+query TTT
+select * from target_build_optimization order by a,b,c;
+----
+1 b1 c_1
+2 b2 c2
+3 b3 c_3
+4 b4 c4
+5 b5 c_5
+6 b6 c_6
+7 b7 c_7
+8 b8 c_8
+9 b9 c_9
+10 b10 c_10
+11 b11 c_11
+12 b12 c_12
+
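Section 3's split between partially and fully updated blocks is what drives the optimization: a target block whose rows are all matched can be replaced wholesale, while a partially matched one has to be rewritten. A minimal sketch of that classification, assuming each target block occupies a closed span of row offsets on the build side and matched offsets arrive sorted (the names and types here are illustrative, not databend's):

    /// Closed span of row offsets occupied by one target block (illustrative).
    type RowSpan = (u32, u32);

    #[derive(Debug, PartialEq)]
    enum BlockMatch {
        Untouched,
        Partial,
        Full,
    }

    /// Classify one block given the sorted offsets of all matched rows.
    fn classify(block: RowSpan, matched: &[u32]) -> BlockMatch {
        let (start, end) = block;
        // count matched offsets falling inside [start, end]
        let lo = matched.partition_point(|&m| m < start);
        let hi = matched.partition_point(|&m| m <= end);
        let hits = (hi - lo) as u32;
        match hits {
            0 => BlockMatch::Untouched,
            n if n == end - start + 1 => BlockMatch::Full,
            _ => BlockMatch::Partial,
        }
    }

    fn main() {
        // a two-row block with one matched row must be rewritten...
        assert_eq!(classify((0, 1), &[0]), BlockMatch::Partial);
        // ...while a fully matched block can simply be replaced
        assert_eq!(classify((2, 3), &[0, 2, 3]), BlockMatch::Full);
    }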
+### test with conjunct
+#### we need to control the block count and layout, so we truncate and insert again.
+statement ok
+truncate table target_build_optimization;
+
+statement ok
+insert into target_build_optimization values(1,'b1','c1'),(3,'b2','c2');
+
+statement ok
+insert into target_build_optimization values(3,'b3','c3'),(5,'b4','c4');
+
+statement ok
+insert into target_build_optimization values(7,'b5','c5'),(8,'b6','c6');
+
+statement ok
+insert into target_build_optimization values(7,'b7','c7'),(8,'b8','c8');
+
+### test block counts
+query T
+select count(*) from fuse_block('default','source_optimization');
+----
+6
+
+query T
+select count(*) from fuse_block('default','target_build_optimization');
+----
+4
+
+### make sure the plan is expected
+query T
+explain merge into target_build_optimization as t1 using source_optimization as t2 on t1.a = t2.a and t1.b > t2.b when matched then update set t1.a = t2.a,t1.b = t2.b,t1.c = t2.c when not matched then insert *;
+----
+MergeInto:
+target_table: default.default.target_build_optimization
+├── distributed: false
+├── target_build_optimization: true
+├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)]
+├── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))]
+└── HashJoin: LEFT OUTER
+    ├── equi conditions: [eq(t2.a (#0), t1.a (#3))]
+    ├── non-equi conditions: [gt(t1.b (#4), t2.b (#1))]
+    ├── LogicalGet
+    │   ├── table: default.default.source_optimization
+    │   ├── filters: []
+    │   ├── order by: []
+    │   └── limit: NONE
+    └── LogicalGet
+        ├── table: default.default.target_build_optimization
+        ├── filters: []
+        ├── order by: []
+        └── limit: NONE
+
+statement ok
+update source_optimization set a = 2,b = 'b2' where a = 3 and b = 'b3';
+
+query TTT
+select * from source_optimization order by a,b,c;
+----
+1 b1 c_1
+2 b2 c_3
+5 b5 c_5
+6 b6 c_6
+7 b7 c_7
+8 b8 c_8
+9 b9 c_9
+10 b10 c_10
+11 b11 c_11
+12 b12 c_12
+
+query TTT
+select * from target_build_optimization order by a,b,c;
+----
+1 b1 c1
+3 b2 c2
+3 b3 c3
+5 b4 c4
+7 b5 c5
+7 b7 c7
+8 b6 c6
+8 b8 c8
+
+query TT
+merge into target_build_optimization as t1 using source_optimization as t2 on t1.a > t2.a and t1.b = t2.b when matched then update * when not matched then insert *;
+----
+7 3
+
+query TTT
+select * from target_build_optimization order by a,b,c;
+----
+1 b1 c1
+1 b1 c_1
+2 b2 c_3
+3 b3 c3
+5 b4 c4
+5 b5 c_5
+6 b6 c_6
+7 b7 c7
+7 b7 c_7
+8 b8 c8
+8 b8 c_8
+9 b9 c_9
+10 b10 c_10
+11 b11 c_11
+12 b12 c_12
+
+### test when target_table is empty
+statement ok
+truncate table target_build_optimization;
+
+### make sure the plan is expected
+query T
+explain merge into target_build_optimization as t1 using source_optimization as t2 on t1.a = t2.a and t1.b > t2.b when matched then update set t1.a = t2.a,t1.b = t2.b,t1.c = t2.c when not matched then insert *;
+----
+MergeInto:
+target_table: default.default.target_build_optimization
+├── distributed: false
+├── target_build_optimization: true
+├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)]
+├── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))]
+└── HashJoin: LEFT OUTER
+    ├── equi conditions: [eq(t2.a (#0), t1.a (#3))]
+    ├── non-equi conditions: [gt(t1.b (#4), t2.b (#1))]
+    ├── LogicalGet
+    │   ├── table: default.default.source_optimization
+    │   ├── filters: []
+    │   ├── order by: []
+    │   └── limit: NONE
+    └── LogicalGet
+        ├── table: default.default.target_build_optimization
+        ├── filters: []
+        ├── order by: []
+        └── limit: NONE
+
+query TT
+merge into target_build_optimization as t1 using source_optimization as t2 on t1.a > t2.a and t1.b = t2.b when matched then update * when not matched then insert *;
+----
+10 0
+
+query TTT
+select * from target_build_optimization order by a,b,c;
+----
+1 b1 c_1
+2 b2 c_3
+5 b5 c_5
+6 b6 c_6
+7 b7 c_7
+8 b8 c_8
+9 b9 c_9
+10 b10 c_10
+11 b11 c_11
+12 b12 c_12
+
+statement ok
+set enable_experimental_merge_into = 0;
\ No newline at end of file
diff --git a/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test b/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test
index f713b4831cca3..e372a9a329e62 100644
--- a/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test
+++ b/tests/sqllogictests/suites/mode/standalone/explain/merge_into.test
@@ -23,6 +23,8 @@ explain MERGE INTO salaries2 USING (SELECT * FROM employees2) as employees2 ON s
 ----
 MergeInto:
 target_table: default.default.salaries2
+├── distributed: false
+├── target_build_optimization: false
 ├── matched update: [condition: eq(employees2.department (#2), 'HR'),update set salary = plus(salaries2.salary (#4), 1000.00)]
 ├── matched update: [condition: None,update set salary = plus(salaries2.salary (#4), 500.00)]
 ├── unmatched insert: [condition: None,insert into (employee_id,salary) values(CAST(employees2.employee_id (#0) AS Int32 NULL),CAST(55000.00 AS Decimal(10, 2) NULL))]
@@ -50,6 +52,8 @@ explain MERGE INTO salaries2 USING (SELECT * FROM employees2) as employees2 ON s
 ----
 MergeInto:
 target_table: default.default.salaries2
+├── distributed: false
+├── target_build_optimization: false
 ├── matched update: [condition: eq(employees2.department (#2), 'HR'),update set salary = plus(salaries2.salary (#4), 1000.00)]
 ├── matched update: [condition: None,update set salary = plus(salaries2.salary (#4), 500.00)]
 ├── unmatched insert: [condition: None,insert into (employee_id,salary) values(CAST(employees2.employee_id (#0) AS Int32 NULL),CAST(55000.00 AS Decimal(10, 2) NULL))]