From d0cc7e3feb42013d77ff2a6ada332fa74bdc25e0 Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 5 Sep 2024 12:16:34 +0800 Subject: [PATCH 1/6] fix(storage): fix correct_commit_ssts with sst table_ids --- src/meta/src/hummock/manager/commit_epoch.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 08428e5472e23..1110d54a0f3f5 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -426,15 +426,17 @@ impl HummockManager { for (mut sst, group_table_ids) in sst_to_cg_vec { for (group_id, match_ids) in group_table_ids { let group_members_table_ids = group_members_table_ids.get(&group_id).unwrap(); - if match_ids + if sst + .sst_info + .table_ids .iter() .all(|id| group_members_table_ids.contains(&TableId::new(*id))) { commit_sstables .entry(group_id) .or_default() - .push(sst.sst_info.clone()); - continue; + .push(sst.sst_info); + break; } let origin_sst_size = sst.sst_info.sst_size; From 8c7e44f9051ad9c98d6ce7699c8e1e667d3a381a Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 5 Sep 2024 13:51:13 +0800 Subject: [PATCH 2/6] fix(storage): fix ut --- src/meta/src/hummock/manager/tests.rs | 17 ++++++++++++++++- .../hummock_test/src/hummock_storage_tests.rs | 15 ++++++++++++++- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 56b4836f585a1..dca7311f4778f 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -1327,7 +1327,22 @@ async fn test_split_compaction_group_on_commit() { sst_size: 100, ..Default::default() }, - table_stats: Default::default(), + table_stats: HashMap::from([ + ( + 100, + TableStats { + total_compressed_size: 50, + ..Default::default() + }, + ), + ( + 101, + TableStats { + total_compressed_size: 50, + ..Default::default() + }, + ), + ]), }; hummock_manager .commit_epoch_for_test(30, vec![sst_1], HashMap::from([(10, context_id)])) diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 7f3d35f16b80b..fc0fd6ae97b4f 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -31,6 +31,7 @@ use risingwave_hummock_sdk::key::{ gen_key_from_bytes, prefixed_range_with_vnode, FullKey, TableKey, UserKey, TABLE_PREFIX_LEN, }; use risingwave_hummock_sdk::sstable_info::SstableInfo; +use risingwave_hummock_sdk::table_stats::TableStats; use risingwave_hummock_sdk::table_watermark::{ TableWatermarksIndex, VnodeWatermark, WatermarkDirection, }; @@ -2510,8 +2511,20 @@ async fn test_commit_multi_epoch() { new_table_watermarks: Default::default(), sst_to_context: context_id_map(&[sst.object_id]), sstables: vec![LocalSstableInfo { + table_stats: sst + .table_ids + .iter() + .map(|&table_id| { + ( + table_id, + TableStats { + total_compressed_size: 10, + ..Default::default() + }, + ) + }) + .collect(), sst_info: sst, - table_stats: Default::default(), }], new_table_fragment_info, change_log_delta: Default::default(), From 22cb53788bb8bff00f5c1a355055de0ac7f05bdd Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 5 Sep 2024 15:27:19 +0800 Subject: [PATCH 3/6] fix(storage): fix split_sst condition --- src/meta/src/hummock/manager/commit_epoch.rs | 33 ++++--------------- .../compaction_group/hummock_version_ext.rs | 9 ++--- 2 files changed, 10 insertions(+), 32 deletions(-) diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 1110d54a0f3f5..4f7a62da41779 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::change_log::ChangeLogDelta; @@ -220,23 +220,8 @@ impl HummockManager { NewTableFragmentInfo::None => (HashMap::new(), None, None), }; - let mut group_members_table_ids: HashMap> = HashMap::new(); - { - // expand group_members_table_ids - for (table_id, group_id) in &table_compaction_group_mapping { - group_members_table_ids - .entry(*group_id) - .or_default() - .insert(*table_id); - } - } - let commit_sstables = self - .correct_commit_ssts( - sstables, - &table_compaction_group_mapping, - &group_members_table_ids, - ) + .correct_commit_ssts(sstables, &table_compaction_group_mapping) .await?; let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect(); @@ -389,7 +374,6 @@ impl HummockManager { &self, sstables: Vec, table_compaction_group_mapping: &HashMap, - group_members_table_ids: &HashMap>, ) -> Result>> { let mut new_sst_id_number = 0; let mut sst_to_cg_vec = Vec::with_capacity(sstables.len()); @@ -424,14 +408,11 @@ impl HummockManager { let mut commit_sstables: BTreeMap> = BTreeMap::new(); for (mut sst, group_table_ids) in sst_to_cg_vec { - for (group_id, match_ids) in group_table_ids { - let group_members_table_ids = group_members_table_ids.get(&group_id).unwrap(); - if sst - .sst_info - .table_ids - .iter() - .all(|id| group_members_table_ids.contains(&TableId::new(*id))) - { + let len = group_table_ids.len(); + for (index, (group_id, match_ids)) in group_table_ids.into_iter().enumerate() { + if sst.sst_info.table_ids == match_ids { + // The SST contains all the tables in the group should be last key + assert!(index == len - 1); commit_sstables .entry(group_id) .or_default() diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index ca6585f46fd51..9f8f928ecb8a4 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -1350,16 +1350,13 @@ pub fn split_sst( { // related github.com/risingwavelabs/risingwave/pull/17898/ // This is a temporary implementation that will update `table_ids`` based on the new split rule after PR 17898 - - let set1: HashSet<_> = sst_info.table_ids.iter().cloned().collect(); - let set2: HashSet<_> = new_sst_table_ids.iter().cloned().collect(); - let intersection: Vec<_> = set1.intersection(&set2).cloned().collect(); - // Update table_ids - branch_table_info.table_ids = intersection; + branch_table_info.table_ids = new_sst_table_ids; sst_info .table_ids .retain(|table_id| !branch_table_info.table_ids.contains(table_id)); + assert!(sst_info.table_ids.is_sorted()); + assert!(branch_table_info.table_ids.is_sorted()); } *new_sst_id += 1; From 6ec7218b1982ea28507d61bd1dc918b0736bc070 Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 5 Sep 2024 15:51:49 +0800 Subject: [PATCH 4/6] fix(storage): Ensure ordering of sstable_info.table_ids --- .../compaction_group/hummock_version_ext.rs | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 9f8f928ecb8a4..cd7d66d5b1b08 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -354,7 +354,7 @@ impl HummockVersion { &mut self, parent_group_id: CompactionGroupId, group_id: CompactionGroupId, - member_table_ids: HashSet, + member_table_ids: BTreeSet, new_sst_start_id: u64, ) { let mut new_sst_id = new_sst_start_id; @@ -594,7 +594,7 @@ impl HummockVersion { } else { #[expect(deprecated)] // for backward-compatibility of previous hummock version delta - HashSet::from_iter(group_construct.table_ids.clone()) + BTreeSet::from_iter(group_construct.table_ids.clone()) }; self.init_with_parent_group( @@ -614,7 +614,7 @@ impl HummockVersion { self.init_with_parent_group( group_change.origin_group_id, group_change.target_group_id, - HashSet::from_iter(group_change.table_ids.clone()), + BTreeSet::from_iter(group_change.table_ids.clone()), group_change.new_sst_start_id, ); @@ -998,7 +998,7 @@ pub fn build_initial_compaction_group_levels( } fn split_sst_info_for_level( - member_table_ids: &HashSet, + member_table_ids: &BTreeSet, level: &mut Level, new_sst_id: &mut u64, ) -> Vec { @@ -1338,7 +1338,7 @@ pub fn split_sst( new_sst_id: &mut u64, old_sst_size: u64, new_sst_size: u64, - new_sst_table_ids: Vec, + new_table_ids: Vec, ) -> SstableInfo { let mut branch_table_info = sst_info.clone(); branch_table_info.sst_id = *new_sst_id; @@ -1350,13 +1350,18 @@ pub fn split_sst( { // related github.com/risingwavelabs/risingwave/pull/17898/ // This is a temporary implementation that will update `table_ids`` based on the new split rule after PR 17898 + // sst.table_ids = vec[1, 2, 3]; + // new.table_ids = vec[2, 3, 4]; + // branch_table_info.table_ids = vec[1, 2, 3] ∩ vec[2, 3, 4] = vec[2, 3] + let set1: BTreeSet<_> = sst_info.table_ids.iter().cloned().collect(); + let set2: BTreeSet<_> = new_table_ids.into_iter().collect(); + let intersection: Vec<_> = set1.intersection(&set2).cloned().collect(); + // Update table_ids - branch_table_info.table_ids = new_sst_table_ids; + branch_table_info.table_ids = intersection; sst_info .table_ids .retain(|table_id| !branch_table_info.table_ids.contains(table_id)); - assert!(sst_info.table_ids.is_sorted()); - assert!(branch_table_info.table_ids.is_sorted()); } *new_sst_id += 1; From 2401f34eb91c6bfebe6f634db4457c795daeb672 Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 5 Sep 2024 16:06:39 +0800 Subject: [PATCH 5/6] fix(storage): add assertion --- src/storage/hummock_sdk/src/sstable_info.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/storage/hummock_sdk/src/sstable_info.rs b/src/storage/hummock_sdk/src/sstable_info.rs index 9970c60f506c8..20943e4dd101a 100644 --- a/src/storage/hummock_sdk/src/sstable_info.rs +++ b/src/storage/hummock_sdk/src/sstable_info.rs @@ -63,6 +63,7 @@ impl SstableInfo { impl From for SstableInfo { fn from(pb_sstable_info: PbSstableInfo) -> Self { + assert!(pb_sstable_info.table_ids.is_sorted()); Self { object_id: pb_sstable_info.object_id, sst_id: pb_sstable_info.sst_id, @@ -100,6 +101,7 @@ impl From for SstableInfo { impl From<&PbSstableInfo> for SstableInfo { fn from(pb_sstable_info: &PbSstableInfo) -> Self { + assert!(pb_sstable_info.table_ids.is_sorted()); Self { object_id: pb_sstable_info.object_id, sst_id: pb_sstable_info.sst_id, @@ -137,6 +139,7 @@ impl From<&PbSstableInfo> for SstableInfo { impl From for PbSstableInfo { fn from(sstable_info: SstableInfo) -> Self { assert!(sstable_info.sst_size > 0 || sstable_info.is_stripped()); + assert!(sstable_info.table_ids.is_sorted()); PbSstableInfo { object_id: sstable_info.object_id, sst_id: sstable_info.sst_id, @@ -175,6 +178,7 @@ impl From for PbSstableInfo { impl From<&SstableInfo> for PbSstableInfo { fn from(sstable_info: &SstableInfo) -> Self { assert!(sstable_info.sst_size > 0 || sstable_info.is_stripped()); + assert!(sstable_info.table_ids.is_sorted()); PbSstableInfo { object_id: sstable_info.object_id, sst_id: sstable_info.sst_id, From e807d17d83d809e2d95aec7c1f5c3c9068937fb4 Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 5 Sep 2024 17:57:53 +0800 Subject: [PATCH 6/6] typo --- .../hummock_sdk/src/compaction_group/hummock_version_ext.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index cd7d66d5b1b08..f24a125aa7f01 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -1350,8 +1350,8 @@ pub fn split_sst( { // related github.com/risingwavelabs/risingwave/pull/17898/ // This is a temporary implementation that will update `table_ids`` based on the new split rule after PR 17898 - // sst.table_ids = vec[1, 2, 3]; - // new.table_ids = vec[2, 3, 4]; + // sst_info.table_ids = vec[1, 2, 3]; + // new_table_ids = vec[2, 3, 4]; // branch_table_info.table_ids = vec[1, 2, 3] ∩ vec[2, 3, 4] = vec[2, 3] let set1: BTreeSet<_> = sst_info.table_ids.iter().cloned().collect(); let set2: BTreeSet<_> = new_table_ids.into_iter().collect();