diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 08428e5472e23..4f7a62da41779 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::change_log::ChangeLogDelta; @@ -220,23 +220,8 @@ impl HummockManager { NewTableFragmentInfo::None => (HashMap::new(), None, None), }; - let mut group_members_table_ids: HashMap> = HashMap::new(); - { - // expand group_members_table_ids - for (table_id, group_id) in &table_compaction_group_mapping { - group_members_table_ids - .entry(*group_id) - .or_default() - .insert(*table_id); - } - } - let commit_sstables = self - .correct_commit_ssts( - sstables, - &table_compaction_group_mapping, - &group_members_table_ids, - ) + .correct_commit_ssts(sstables, &table_compaction_group_mapping) .await?; let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect(); @@ -389,7 +374,6 @@ impl HummockManager { &self, sstables: Vec, table_compaction_group_mapping: &HashMap, - group_members_table_ids: &HashMap>, ) -> Result>> { let mut new_sst_id_number = 0; let mut sst_to_cg_vec = Vec::with_capacity(sstables.len()); @@ -424,17 +408,16 @@ impl HummockManager { let mut commit_sstables: BTreeMap> = BTreeMap::new(); for (mut sst, group_table_ids) in sst_to_cg_vec { - for (group_id, match_ids) in group_table_ids { - let group_members_table_ids = group_members_table_ids.get(&group_id).unwrap(); - if match_ids - .iter() - .all(|id| group_members_table_ids.contains(&TableId::new(*id))) - { + let len = group_table_ids.len(); + for (index, (group_id, match_ids)) in group_table_ids.into_iter().enumerate() { + if sst.sst_info.table_ids == match_ids { + // The SST contains all the tables in the group should be last key + assert!(index == len - 1); commit_sstables .entry(group_id) .or_default() - .push(sst.sst_info.clone()); - continue; + .push(sst.sst_info); + break; } let origin_sst_size = sst.sst_info.sst_size; diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 56b4836f585a1..dca7311f4778f 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -1327,7 +1327,22 @@ async fn test_split_compaction_group_on_commit() { sst_size: 100, ..Default::default() }, - table_stats: Default::default(), + table_stats: HashMap::from([ + ( + 100, + TableStats { + total_compressed_size: 50, + ..Default::default() + }, + ), + ( + 101, + TableStats { + total_compressed_size: 50, + ..Default::default() + }, + ), + ]), }; hummock_manager .commit_epoch_for_test(30, vec![sst_1], HashMap::from([(10, context_id)])) diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index ca6585f46fd51..f24a125aa7f01 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -354,7 +354,7 @@ impl HummockVersion { &mut self, parent_group_id: CompactionGroupId, group_id: CompactionGroupId, - member_table_ids: HashSet, + member_table_ids: BTreeSet, new_sst_start_id: u64, ) { let mut new_sst_id = new_sst_start_id; @@ -594,7 +594,7 @@ impl HummockVersion { } else { #[expect(deprecated)] // for backward-compatibility of previous hummock version delta - HashSet::from_iter(group_construct.table_ids.clone()) + BTreeSet::from_iter(group_construct.table_ids.clone()) }; self.init_with_parent_group( @@ -614,7 +614,7 @@ impl HummockVersion { self.init_with_parent_group( group_change.origin_group_id, group_change.target_group_id, - HashSet::from_iter(group_change.table_ids.clone()), + BTreeSet::from_iter(group_change.table_ids.clone()), group_change.new_sst_start_id, ); @@ -998,7 +998,7 @@ pub fn build_initial_compaction_group_levels( } fn split_sst_info_for_level( - member_table_ids: &HashSet, + member_table_ids: &BTreeSet, level: &mut Level, new_sst_id: &mut u64, ) -> Vec { @@ -1338,7 +1338,7 @@ pub fn split_sst( new_sst_id: &mut u64, old_sst_size: u64, new_sst_size: u64, - new_sst_table_ids: Vec, + new_table_ids: Vec, ) -> SstableInfo { let mut branch_table_info = sst_info.clone(); branch_table_info.sst_id = *new_sst_id; @@ -1350,9 +1350,11 @@ pub fn split_sst( { // related github.com/risingwavelabs/risingwave/pull/17898/ // This is a temporary implementation that will update `table_ids`` based on the new split rule after PR 17898 - - let set1: HashSet<_> = sst_info.table_ids.iter().cloned().collect(); - let set2: HashSet<_> = new_sst_table_ids.iter().cloned().collect(); + // sst_info.table_ids = vec[1, 2, 3]; + // new_table_ids = vec[2, 3, 4]; + // branch_table_info.table_ids = vec[1, 2, 3] ∩ vec[2, 3, 4] = vec[2, 3] + let set1: BTreeSet<_> = sst_info.table_ids.iter().cloned().collect(); + let set2: BTreeSet<_> = new_table_ids.into_iter().collect(); let intersection: Vec<_> = set1.intersection(&set2).cloned().collect(); // Update table_ids diff --git a/src/storage/hummock_sdk/src/sstable_info.rs b/src/storage/hummock_sdk/src/sstable_info.rs index 9970c60f506c8..20943e4dd101a 100644 --- a/src/storage/hummock_sdk/src/sstable_info.rs +++ b/src/storage/hummock_sdk/src/sstable_info.rs @@ -63,6 +63,7 @@ impl SstableInfo { impl From for SstableInfo { fn from(pb_sstable_info: PbSstableInfo) -> Self { + assert!(pb_sstable_info.table_ids.is_sorted()); Self { object_id: pb_sstable_info.object_id, sst_id: pb_sstable_info.sst_id, @@ -100,6 +101,7 @@ impl From for SstableInfo { impl From<&PbSstableInfo> for SstableInfo { fn from(pb_sstable_info: &PbSstableInfo) -> Self { + assert!(pb_sstable_info.table_ids.is_sorted()); Self { object_id: pb_sstable_info.object_id, sst_id: pb_sstable_info.sst_id, @@ -137,6 +139,7 @@ impl From<&PbSstableInfo> for SstableInfo { impl From for PbSstableInfo { fn from(sstable_info: SstableInfo) -> Self { assert!(sstable_info.sst_size > 0 || sstable_info.is_stripped()); + assert!(sstable_info.table_ids.is_sorted()); PbSstableInfo { object_id: sstable_info.object_id, sst_id: sstable_info.sst_id, @@ -175,6 +178,7 @@ impl From for PbSstableInfo { impl From<&SstableInfo> for PbSstableInfo { fn from(sstable_info: &SstableInfo) -> Self { assert!(sstable_info.sst_size > 0 || sstable_info.is_stripped()); + assert!(sstable_info.table_ids.is_sorted()); PbSstableInfo { object_id: sstable_info.object_id, sst_id: sstable_info.sst_id, diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 7f3d35f16b80b..fc0fd6ae97b4f 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -31,6 +31,7 @@ use risingwave_hummock_sdk::key::{ gen_key_from_bytes, prefixed_range_with_vnode, FullKey, TableKey, UserKey, TABLE_PREFIX_LEN, }; use risingwave_hummock_sdk::sstable_info::SstableInfo; +use risingwave_hummock_sdk::table_stats::TableStats; use risingwave_hummock_sdk::table_watermark::{ TableWatermarksIndex, VnodeWatermark, WatermarkDirection, }; @@ -2510,8 +2511,20 @@ async fn test_commit_multi_epoch() { new_table_watermarks: Default::default(), sst_to_context: context_id_map(&[sst.object_id]), sstables: vec![LocalSstableInfo { + table_stats: sst + .table_ids + .iter() + .map(|&table_id| { + ( + table_id, + TableStats { + total_compressed_size: 10, + ..Default::default() + }, + ) + }) + .collect(), sst_info: sst, - table_stats: Default::default(), }], new_table_fragment_info, change_log_delta: Default::default(),