Skip to content

Commit

Permalink
fix(storage): fix split_sst condition
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed Sep 5, 2024
1 parent 8c7e44f commit 22cb537
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 32 deletions.
33 changes: 7 additions & 26 deletions src/meta/src/hummock/manager/commit_epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::change_log::ChangeLogDelta;
Expand Down Expand Up @@ -220,23 +220,8 @@ impl HummockManager {
NewTableFragmentInfo::None => (HashMap::new(), None, None),
};

let mut group_members_table_ids: HashMap<u64, BTreeSet<TableId>> = HashMap::new();
{
// expand group_members_table_ids
for (table_id, group_id) in &table_compaction_group_mapping {
group_members_table_ids
.entry(*group_id)
.or_default()
.insert(*table_id);
}
}

let commit_sstables = self
.correct_commit_ssts(
sstables,
&table_compaction_group_mapping,
&group_members_table_ids,
)
.correct_commit_ssts(sstables, &table_compaction_group_mapping)
.await?;

let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect();
Expand Down Expand Up @@ -389,7 +374,6 @@ impl HummockManager {
&self,
sstables: Vec<LocalSstableInfo>,
table_compaction_group_mapping: &HashMap<TableId, CompactionGroupId>,
group_members_table_ids: &HashMap<CompactionGroupId, BTreeSet<TableId>>,
) -> Result<BTreeMap<CompactionGroupId, Vec<SstableInfo>>> {
let mut new_sst_id_number = 0;
let mut sst_to_cg_vec = Vec::with_capacity(sstables.len());
Expand Down Expand Up @@ -424,14 +408,11 @@ impl HummockManager {
let mut commit_sstables: BTreeMap<u64, Vec<SstableInfo>> = BTreeMap::new();

for (mut sst, group_table_ids) in sst_to_cg_vec {
for (group_id, match_ids) in group_table_ids {
let group_members_table_ids = group_members_table_ids.get(&group_id).unwrap();
if sst
.sst_info
.table_ids
.iter()
.all(|id| group_members_table_ids.contains(&TableId::new(*id)))
{
let len = group_table_ids.len();
for (index, (group_id, match_ids)) in group_table_ids.into_iter().enumerate() {
if sst.sst_info.table_ids == match_ids {
// The SST contains all the tables in the group should be last key
assert!(index == len - 1);
commit_sstables
.entry(group_id)
.or_default()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1350,16 +1350,13 @@ pub fn split_sst(
{
// related github.com/risingwavelabs/risingwave/pull/17898/
// This is a temporary implementation that will update `table_ids`` based on the new split rule after PR 17898

let set1: HashSet<_> = sst_info.table_ids.iter().cloned().collect();
let set2: HashSet<_> = new_sst_table_ids.iter().cloned().collect();
let intersection: Vec<_> = set1.intersection(&set2).cloned().collect();

// Update table_ids
branch_table_info.table_ids = intersection;
branch_table_info.table_ids = new_sst_table_ids;
sst_info
.table_ids
.retain(|table_id| !branch_table_info.table_ids.contains(table_id));
assert!(sst_info.table_ids.is_sorted());
assert!(branch_table_info.table_ids.is_sorted());
}

*new_sst_id += 1;
Expand Down

0 comments on commit 22cb537

Please sign in to comment.