Skip to content

Commit

Permalink
fix(storage): fix correct_commit_ssts with sst table_ids (#18414)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Sep 5, 2024
1 parent 1d220ee commit 0134191
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 36 deletions.
35 changes: 9 additions & 26 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
Expand Down Expand Up @@ -220,23 +220,8 @@ impl HummockManager {
NewTableFragmentInfo::None => (HashMap::new(), None, None),
};

let mut group_members_table_ids: HashMap<u64, BTreeSet<TableId>> = HashMap::new();
{
// expand group_members_table_ids
for (table_id, group_id) in &table_compaction_group_mapping {
group_members_table_ids
.entry(*group_id)
.or_default()
.insert(*table_id);
}
}

let commit_sstables = self
.correct_commit_ssts(
sstables,
&table_compaction_group_mapping,
&group_members_table_ids,
)
.correct_commit_ssts(sstables, &table_compaction_group_mapping)
.await?;

let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect();
Expand Down Expand Up @@ -389,7 +374,6 @@ impl HummockManager {
&self,
sstables: Vec<LocalSstableInfo>,
table_compaction_group_mapping: &HashMap<TableId, CompactionGroupId>,
group_members_table_ids: &HashMap<CompactionGroupId, BTreeSet<TableId>>,
) -> Result<BTreeMap<CompactionGroupId, Vec<SstableInfo>>> {
let mut new_sst_id_number = 0;
let mut sst_to_cg_vec = Vec::with_capacity(sstables.len());
Expand Down Expand Up @@ -424,17 +408,16 @@ impl HummockManager {
let mut commit_sstables: BTreeMap<u64, Vec<SstableInfo>> = BTreeMap::new();

for (mut sst, group_table_ids) in sst_to_cg_vec {
for (group_id, match_ids) in group_table_ids {
let group_members_table_ids = group_members_table_ids.get(&group_id).unwrap();
if match_ids
.iter()
.all(|id| group_members_table_ids.contains(&TableId::new(*id)))
{
let len = group_table_ids.len();
for (index, (group_id, match_ids)) in group_table_ids.into_iter().enumerate() {
if sst.sst_info.table_ids == match_ids {
// The SST contains all the tables in the group should be last key
assert!(index == len - 1);
commit_sstables
.entry(group_id)
.or_default()
.push(sst.sst_info.clone());
continue;
.push(sst.sst_info);
break;
}

let origin_sst_size = sst.sst_info.sst_size;
Expand Down
17 changes: 16 additions & 1 deletion src/meta/src/hummock/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1327,7 +1327,22 @@ async fn test_split_compaction_group_on_commit() {
sst_size: 100,
..Default::default()
},
table_stats: Default::default(),
table_stats: HashMap::from([
(
100,
TableStats {
total_compressed_size: 50,
..Default::default()
},
),
(
101,
TableStats {
total_compressed_size: 50,
..Default::default()
},
),
]),
};
hummock_manager
.commit_epoch_for_test(30, vec![sst_1], HashMap::from([(10, context_id)]))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ impl HummockVersion {
&mut self,
parent_group_id: CompactionGroupId,
group_id: CompactionGroupId,
member_table_ids: HashSet<StateTableId>,
member_table_ids: BTreeSet<StateTableId>,
new_sst_start_id: u64,
) {
let mut new_sst_id = new_sst_start_id;
Expand Down Expand Up @@ -594,7 +594,7 @@ impl HummockVersion {
} else {
#[expect(deprecated)]
// for backward-compatibility of previous hummock version delta
HashSet::from_iter(group_construct.table_ids.clone())
BTreeSet::from_iter(group_construct.table_ids.clone())
};

self.init_with_parent_group(
Expand All @@ -614,7 +614,7 @@ impl HummockVersion {
self.init_with_parent_group(
group_change.origin_group_id,
group_change.target_group_id,
HashSet::from_iter(group_change.table_ids.clone()),
BTreeSet::from_iter(group_change.table_ids.clone()),
group_change.new_sst_start_id,
);

Expand Down Expand Up @@ -998,7 +998,7 @@ pub fn build_initial_compaction_group_levels(
}

fn split_sst_info_for_level(
member_table_ids: &HashSet<u32>,
member_table_ids: &BTreeSet<u32>,
level: &mut Level,
new_sst_id: &mut u64,
) -> Vec<SstableInfo> {
Expand Down Expand Up @@ -1338,7 +1338,7 @@ pub fn split_sst(
new_sst_id: &mut u64,
old_sst_size: u64,
new_sst_size: u64,
new_sst_table_ids: Vec<u32>,
new_table_ids: Vec<u32>,
) -> SstableInfo {
let mut branch_table_info = sst_info.clone();
branch_table_info.sst_id = *new_sst_id;
Expand All @@ -1350,9 +1350,11 @@ pub fn split_sst(
{
// related github.com/risingwavelabs/risingwave/pull/17898/
// This is a temporary implementation that will update `table_ids`` based on the new split rule after PR 17898

let set1: HashSet<_> = sst_info.table_ids.iter().cloned().collect();
let set2: HashSet<_> = new_sst_table_ids.iter().cloned().collect();
// sst_info.table_ids = vec[1, 2, 3];
// new_table_ids = vec[2, 3, 4];
// branch_table_info.table_ids = vec[1, 2, 3] ∩ vec[2, 3, 4] = vec[2, 3]
let set1: BTreeSet<_> = sst_info.table_ids.iter().cloned().collect();
let set2: BTreeSet<_> = new_table_ids.into_iter().collect();
let intersection: Vec<_> = set1.intersection(&set2).cloned().collect();

// Update table_ids
Expand Down
4 changes: 4 additions & 0 deletions src/storage/hummock_sdk/src/sstable_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl SstableInfo {

impl From<PbSstableInfo> for SstableInfo {
fn from(pb_sstable_info: PbSstableInfo) -> Self {
assert!(pb_sstable_info.table_ids.is_sorted());
Self {
object_id: pb_sstable_info.object_id,
sst_id: pb_sstable_info.sst_id,
Expand Down Expand Up @@ -100,6 +101,7 @@ impl From<PbSstableInfo> for SstableInfo {

impl From<&PbSstableInfo> for SstableInfo {
fn from(pb_sstable_info: &PbSstableInfo) -> Self {
assert!(pb_sstable_info.table_ids.is_sorted());
Self {
object_id: pb_sstable_info.object_id,
sst_id: pb_sstable_info.sst_id,
Expand Down Expand Up @@ -137,6 +139,7 @@ impl From<&PbSstableInfo> for SstableInfo {
impl From<SstableInfo> for PbSstableInfo {
fn from(sstable_info: SstableInfo) -> Self {
assert!(sstable_info.sst_size > 0 || sstable_info.is_stripped());
assert!(sstable_info.table_ids.is_sorted());
PbSstableInfo {
object_id: sstable_info.object_id,
sst_id: sstable_info.sst_id,
Expand Down Expand Up @@ -175,6 +178,7 @@ impl From<SstableInfo> for PbSstableInfo {
impl From<&SstableInfo> for PbSstableInfo {
fn from(sstable_info: &SstableInfo) -> Self {
assert!(sstable_info.sst_size > 0 || sstable_info.is_stripped());
assert!(sstable_info.table_ids.is_sorted());
PbSstableInfo {
object_id: sstable_info.object_id,
sst_id: sstable_info.sst_id,
Expand Down
15 changes: 14 additions & 1 deletion src/storage/hummock_test/src/hummock_storage_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use risingwave_hummock_sdk::key::{
gen_key_from_bytes, prefixed_range_with_vnode, FullKey, TableKey, UserKey, TABLE_PREFIX_LEN,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::TableStats;
use risingwave_hummock_sdk::table_watermark::{
TableWatermarksIndex, VnodeWatermark, WatermarkDirection,
};
Expand Down Expand Up @@ -2510,8 +2511,20 @@ async fn test_commit_multi_epoch() {
new_table_watermarks: Default::default(),
sst_to_context: context_id_map(&[sst.object_id]),
sstables: vec![LocalSstableInfo {
table_stats: sst
.table_ids
.iter()
.map(|&table_id| {
(
table_id,
TableStats {
total_compressed_size: 10,
..Default::default()
},
)
})
.collect(),
sst_info: sst,
table_stats: Default::default(),
}],
new_table_fragment_info,
change_log_delta: Default::default(),
Expand Down

0 comments on commit 0134191

Please sign in to comment.