Skip to content

Commit

Permalink
perf(storage): simplify table watermark index (#15931)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Mar 29, 2024
1 parent 029b266 commit fb862a5
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 190 deletions.
48 changes: 23 additions & 25 deletions src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::catalog::TableId;
Expand All @@ -33,7 +34,7 @@ use super::StateTableId;
use crate::compaction_group::StaticCompactionGroupId;
use crate::key_range::KeyRangeCommon;
use crate::prost_key_range::KeyRangeExt;
use crate::table_watermark::{TableWatermarks, TableWatermarksIndex, VnodeWatermark};
use crate::table_watermark::{TableWatermarks, VnodeWatermark};
use crate::version::{HummockVersion, HummockVersionDelta};
use crate::{can_concat, CompactionGroupId, HummockSstableId, HummockSstableObjectId};

Expand Down Expand Up @@ -195,18 +196,6 @@ impl HummockVersion {
.unwrap_or(0)
}

pub fn build_table_watermarks_index(&self) -> HashMap<TableId, TableWatermarksIndex> {
self.table_watermarks
.iter()
.map(|(table_id, table_watermarks)| {
(
*table_id,
table_watermarks.build_index(self.max_committed_epoch),
)
})
.collect()
}

pub fn safe_epoch_table_watermarks(
&self,
existing_table_ids: &[u32],
Expand Down Expand Up @@ -598,25 +587,34 @@ impl HummockVersion {
for table_id in &version_delta.removed_table_ids {
let _ = self.table_watermarks.remove(table_id);
}

let mut modified_table_watermarks: HashMap<TableId, TableWatermarks> = HashMap::new();

for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
match self.table_watermarks.entry(*table_id) {
Entry::Occupied(mut entry) => {
entry.get_mut().apply_new_table_watermarks(table_watermarks);
}
Entry::Vacant(entry) => {
entry.insert(table_watermarks.clone());
}
if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) {
let mut current_table_watermarks = (**current_table_watermarks).clone();
current_table_watermarks.apply_new_table_watermarks(table_watermarks);
modified_table_watermarks.insert(*table_id, current_table_watermarks);
} else {
modified_table_watermarks.insert(*table_id, table_watermarks.clone());
}
}
if version_delta.safe_epoch != self.safe_epoch {
assert!(version_delta.safe_epoch > self.safe_epoch);
self.table_watermarks
.values_mut()
.for_each(|table_watermarks| {
table_watermarks.clear_stale_epoch_watermark(version_delta.safe_epoch)
});
for (table_id, table_watermarks) in &self.table_watermarks {
let table_watermarks = modified_table_watermarks
.entry(*table_id)
.or_insert_with(|| (**table_watermarks).clone());
table_watermarks.clear_stale_epoch_watermark(version_delta.safe_epoch);
}
self.safe_epoch = version_delta.safe_epoch;
}

for (table_id, table_watermarks) in modified_table_watermarks {
self.table_watermarks
.insert(table_id, Arc::new(table_watermarks));
}

sst_split_info
}

Expand Down
Loading

0 comments on commit fb862a5

Please sign in to comment.