diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 9e07598d07920..2a1e924d99144 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -15,6 +15,7 @@ use std::cmp::Ordering; use std::collections::hash_map::Entry; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::sync::Arc; use itertools::Itertools; use risingwave_common::catalog::TableId; @@ -33,7 +34,7 @@ use super::StateTableId; use crate::compaction_group::StaticCompactionGroupId; use crate::key_range::KeyRangeCommon; use crate::prost_key_range::KeyRangeExt; -use crate::table_watermark::{TableWatermarks, TableWatermarksIndex, VnodeWatermark}; +use crate::table_watermark::{TableWatermarks, VnodeWatermark}; use crate::version::{HummockVersion, HummockVersionDelta}; use crate::{can_concat, CompactionGroupId, HummockSstableId, HummockSstableObjectId}; @@ -195,18 +196,6 @@ impl HummockVersion { .unwrap_or(0) } - pub fn build_table_watermarks_index(&self) -> HashMap { - self.table_watermarks - .iter() - .map(|(table_id, table_watermarks)| { - ( - *table_id, - table_watermarks.build_index(self.max_committed_epoch), - ) - }) - .collect() - } - pub fn safe_epoch_table_watermarks( &self, existing_table_ids: &[u32], @@ -598,25 +587,34 @@ impl HummockVersion { for table_id in &version_delta.removed_table_ids { let _ = self.table_watermarks.remove(table_id); } + + let mut modified_table_watermarks: HashMap = HashMap::new(); + for (table_id, table_watermarks) in &version_delta.new_table_watermarks { - match self.table_watermarks.entry(*table_id) { - Entry::Occupied(mut entry) => { - entry.get_mut().apply_new_table_watermarks(table_watermarks); - } - Entry::Vacant(entry) => { - entry.insert(table_watermarks.clone()); - } + if let Some(current_table_watermarks) = self.table_watermarks.get(table_id) { + let mut current_table_watermarks = (**current_table_watermarks).clone(); + current_table_watermarks.apply_new_table_watermarks(table_watermarks); + modified_table_watermarks.insert(*table_id, current_table_watermarks); + } else { + modified_table_watermarks.insert(*table_id, table_watermarks.clone()); } } if version_delta.safe_epoch != self.safe_epoch { assert!(version_delta.safe_epoch > self.safe_epoch); - self.table_watermarks - .values_mut() - .for_each(|table_watermarks| { - table_watermarks.clear_stale_epoch_watermark(version_delta.safe_epoch) - }); + for (table_id, table_watermarks) in &self.table_watermarks { + let table_watermarks = modified_table_watermarks + .entry(*table_id) + .or_insert_with(|| (**table_watermarks).clone()); + table_watermarks.clear_stale_epoch_watermark(version_delta.safe_epoch); + } self.safe_epoch = version_delta.safe_epoch; } + + for (table_id, table_watermarks) in modified_table_watermarks { + self.table_watermarks + .insert(table_id, Arc::new(table_watermarks)); + } + sst_split_info } diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index 59dbf17701eb4..c9e46fd99cb85 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -14,18 +14,19 @@ use std::cmp::Ordering; use std::collections::hash_map::Entry; -use std::collections::{btree_map, BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use std::mem::size_of; use std::ops::Bound::{Excluded, Included, Unbounded}; use std::sync::Arc; use bytes::Bytes; +use itertools::Itertools; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableId; use risingwave_common::estimate_size::EstimateSize; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks; -use risingwave_pb::hummock::{PbTableWatermarks, PbVnodeWatermark}; +use risingwave_pb::hummock::{PbVnodeWatermark, TableWatermarks as PbTableWatermarks}; use tracing::{debug, warn}; use crate::key::{prefix_slice_with_vnode, vnode, TableKey, TableKeyRange}; @@ -37,40 +38,12 @@ pub struct ReadTableWatermark { pub vnode_watermarks: BTreeMap, } -impl ReadTableWatermark { - pub fn merge_multiple(mut watermarks: Vec) -> Option { - fn merge_other(this: &mut ReadTableWatermark, other: ReadTableWatermark) { - assert_eq!(this.direction, other.direction); - for (vnode, watermark) in other.vnode_watermarks { - match this.vnode_watermarks.entry(vnode) { - btree_map::Entry::Vacant(entry) => { - entry.insert(watermark); - } - btree_map::Entry::Occupied(mut entry) => { - let prev_watermark = entry.get(); - let overwrite = match this.direction { - WatermarkDirection::Ascending => watermark > prev_watermark, - WatermarkDirection::Descending => watermark < prev_watermark, - }; - if overwrite { - entry.insert(watermark); - } - } - } - } - } - let mut ret = watermarks.pop()?; - while let Some(watermark) = watermarks.pop() { - merge_other(&mut ret, watermark); - } - Some(ret) - } -} - #[derive(Clone)] pub struct TableWatermarksIndex { - watermark_direction: WatermarkDirection, - index: HashMap>, + pub watermark_direction: WatermarkDirection, + // later epoch at the back + pub staging_watermarks: VecDeque<(HummockEpoch, Arc<[VnodeWatermark]>)>, + pub committed_watermarks: Option>, latest_epoch: HummockEpoch, committed_epoch: HummockEpoch, } @@ -79,23 +52,43 @@ impl TableWatermarksIndex { pub fn new(watermark_direction: WatermarkDirection, committed_epoch: HummockEpoch) -> Self { Self { watermark_direction, - index: Default::default(), + staging_watermarks: VecDeque::new(), + committed_watermarks: None, latest_epoch: committed_epoch, committed_epoch, } } - pub fn index(&self) -> &HashMap> { - &self.index + pub fn new_committed( + committed_watermarks: Arc, + committed_epoch: HummockEpoch, + ) -> Self { + Self { + watermark_direction: committed_watermarks.direction, + staging_watermarks: VecDeque::new(), + committed_epoch, + latest_epoch: committed_epoch, + committed_watermarks: Some(committed_watermarks), + } } pub fn read_watermark(&self, vnode: VirtualNode, epoch: HummockEpoch) -> Option { - self.index.get(&vnode).and_then(|epoch_watermarks| { - epoch_watermarks - .upper_bound(Included(&epoch)) - .value() - .cloned() - }) + // iterate from new epoch to old epoch + for (watermark_epoch, vnode_watermark_list) in self.staging_watermarks.iter().rev().chain( + self.committed_watermarks + .iter() + .flat_map(|watermarks| watermarks.watermarks.iter().rev()), + ) { + if *watermark_epoch > epoch { + continue; + } + for vnode_watermark in vnode_watermark_list.as_ref() { + if vnode_watermark.vnode_bitmap.is_set(vnode.to_index()) { + return Some(vnode_watermark.watermark.clone()); + } + } + } + None } pub fn latest_watermark(&self, vnode: VirtualNode) -> Option { @@ -208,45 +201,54 @@ impl TableWatermarksIndex { pub fn add_epoch_watermark( &mut self, epoch: HummockEpoch, - vnode_watermark_list: &Vec, + vnode_watermark_list: Arc<[VnodeWatermark]>, direction: WatermarkDirection, ) { assert!(epoch > self.latest_epoch); assert_eq!(self.watermark_direction, direction); self.latest_epoch = epoch; - for vnode_watermark in vnode_watermark_list { - for vnode in vnode_watermark.vnode_bitmap.iter_vnodes() { - let epoch_watermarks = self.index.entry(vnode).or_default(); - if let Some((prev_epoch, prev_watermark)) = epoch_watermarks.last_key_value() { - assert!(*prev_epoch < epoch); - match self.watermark_direction { - WatermarkDirection::Ascending => { - assert!(vnode_watermark.watermark >= prev_watermark); - } - WatermarkDirection::Descending => { - assert!(vnode_watermark.watermark <= prev_watermark); - } - }; - }; - assert!(self - .index - .entry(vnode) - .or_default() - .insert(epoch, vnode_watermark.watermark.clone()) - .is_none()); + #[cfg(debug_assertions)] + { + let mut vnode_is_set = BitmapBuilder::zeroed(VirtualNode::COUNT); + for vnode_watermark in vnode_watermark_list.as_ref() { + for vnode in vnode_watermark.vnode_bitmap.iter_ones() { + assert!(!vnode_is_set.is_set(vnode)); + vnode_is_set.set(vnode, true); + let vnode = VirtualNode::from_index(vnode); + if let Some(prev_watermark) = self.latest_watermark(vnode) { + match self.watermark_direction { + WatermarkDirection::Ascending => { + assert!(vnode_watermark.watermark >= prev_watermark); + } + WatermarkDirection::Descending => { + assert!(vnode_watermark.watermark <= prev_watermark); + } + }; + } + } } } + self.staging_watermarks + .push_back((epoch, vnode_watermark_list)); } - pub fn apply_committed_watermarks(&mut self, committed_index: &TableWatermarksIndex) { - self.committed_epoch = committed_index.committed_epoch; - for (vnode, committed_epoch_watermark) in &committed_index.index { - let epoch_watermark = self.index.entry(*vnode).or_default(); - // keep only watermark higher than committed epoch - *epoch_watermark = epoch_watermark.split_off(&committed_index.committed_epoch); - for (epoch, watermark) in committed_epoch_watermark { - epoch_watermark.insert(*epoch, watermark.clone()); - } + pub fn apply_committed_watermarks( + &mut self, + committed_watermark: Arc, + committed_epoch: HummockEpoch, + ) { + assert_eq!(self.watermark_direction, committed_watermark.direction); + assert!(self.committed_epoch <= committed_epoch); + if self.committed_epoch == committed_epoch { + return; + } + self.committed_epoch = committed_epoch; + self.committed_watermarks = Some(committed_watermark); + // keep only watermark higher than committed epoch + while let Some((old_epoch, _)) = self.staging_watermarks.front() + && *old_epoch <= committed_epoch + { + let _ = self.staging_watermarks.pop_front(); } } } @@ -315,8 +317,8 @@ impl VnodeWatermark { #[derive(Clone, Debug, PartialEq)] pub struct TableWatermarks { // later epoch at the back - pub(crate) watermarks: Vec<(HummockEpoch, Vec)>, - pub(crate) direction: WatermarkDirection, + pub watermarks: Vec<(HummockEpoch, Arc<[VnodeWatermark]>)>, + pub direction: WatermarkDirection, } impl TableWatermarks { @@ -327,7 +329,7 @@ impl TableWatermarks { ) -> Self { Self { direction, - watermarks: vec![(epoch, watermarks)], + watermarks: vec![(epoch, Arc::from(watermarks))], } } @@ -366,7 +368,7 @@ impl TableWatermarks { pub fn add_new_epoch_watermarks( &mut self, epoch: HummockEpoch, - watermarks: Vec, + watermarks: Arc<[VnodeWatermark]>, direction: WatermarkDirection, ) { assert_eq!(self.direction, direction); @@ -387,8 +389,8 @@ impl TableWatermarks { .watermarks .iter() .map(VnodeWatermark::from_protobuf) - .collect(); - (epoch, watermarks) + .collect_vec(); + (epoch, Arc::from(watermarks)) }) .collect(), direction: if pb.is_ascending { @@ -398,20 +400,6 @@ impl TableWatermarks { }, } } - - pub fn build_index(&self, committed_epoch: HummockEpoch) -> TableWatermarksIndex { - let mut ret = TableWatermarksIndex { - index: HashMap::new(), - watermark_direction: self.direction, - latest_epoch: HummockEpoch::MIN, - committed_epoch: HummockEpoch::MIN, - }; - for (epoch, vnode_watermark_list) in &self.watermarks { - ret.add_epoch_watermark(*epoch, vnode_watermark_list, self.direction); - } - ret.committed_epoch = committed_epoch; - ret - } } pub fn merge_multiple_new_table_watermarks( @@ -437,7 +425,7 @@ pub fn merge_multiple_new_table_watermarks( epoch_watermarks .entry(new_epoch) .or_insert_with(Vec::new) - .extend(new_epoch_watermarks); + .extend(new_epoch_watermarks.iter().cloned()); } } } @@ -448,7 +436,10 @@ pub fn merge_multiple_new_table_watermarks( TableWatermarks { direction, // ordered from earlier epoch to later epoch - watermarks: epoch_watermarks.into_iter().collect(), + watermarks: epoch_watermarks + .into_iter() + .map(|(epoch, watermarks)| (epoch, Arc::from(watermarks))) + .collect(), }, ) }) @@ -499,7 +490,7 @@ impl TableWatermarks { while let Some((epoch, _)) = self.watermarks.last() { if *epoch >= safe_epoch { let (epoch, watermarks) = self.watermarks.pop().expect("have check Some"); - for watermark in &watermarks { + for watermark in watermarks.as_ref() { for vnode in watermark.vnode_bitmap.iter_vnodes() { unset_vnode.remove(&vnode); } @@ -513,7 +504,7 @@ impl TableWatermarks { && let Some((_, watermarks)) = self.watermarks.pop() { let mut new_vnode_watermarks = Vec::new(); - for vnode_watermark in watermarks { + for vnode_watermark in watermarks.as_ref() { let mut set_vnode = Vec::new(); for vnode in vnode_watermark.vnode_bitmap.iter_vnodes() { if unset_vnode.remove(&vnode) { @@ -528,7 +519,7 @@ impl TableWatermarks { let bitmap = Arc::new(builder.finish()); new_vnode_watermarks.push(VnodeWatermark { vnode_bitmap: bitmap, - watermark: vnode_watermark.watermark, + watermark: vnode_watermark.watermark.clone(), }) } } @@ -536,9 +527,15 @@ impl TableWatermarks { if let Some((last_epoch, last_watermarks)) = result_epoch_watermark.last_mut() && *last_epoch == safe_epoch { - last_watermarks.extend(new_vnode_watermarks); + *last_watermarks = Arc::from( + last_watermarks + .iter() + .cloned() + .chain(new_vnode_watermarks.into_iter()) + .collect_vec(), + ); } else { - result_epoch_watermark.push((safe_epoch, new_vnode_watermarks)); + result_epoch_watermark.push((safe_epoch, Arc::from(new_vnode_watermarks))); } } } @@ -568,6 +565,7 @@ mod tests { use std::vec; use bytes::Bytes; + use itertools::Itertools; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; @@ -609,7 +607,8 @@ mod tests { vec![VnodeWatermark::new( build_bitmap(vec![0, 1, 2, 3]), watermark2.clone(), - )], + )] + .into(), direction, ); @@ -629,7 +628,8 @@ mod tests { vec![VnodeWatermark::new( build_bitmap(0..VirtualNode::COUNT), watermark3.clone(), - )], + )] + .into(), direction, ); let epoch4 = epoch3 + 1; @@ -639,7 +639,8 @@ mod tests { vec![VnodeWatermark::new( build_bitmap(vec![0, 3, 4]), watermark4.clone(), - )], + )] + .into(), direction, ); second_table_watermark.add_new_epoch_watermarks( @@ -647,7 +648,8 @@ mod tests { vec![VnodeWatermark::new( build_bitmap(vec![0, 3, 4]), watermark4.clone(), - )], + )] + .into(), direction, ); @@ -677,7 +679,8 @@ mod tests { vec![VnodeWatermark::new( build_bitmap(vec![0, 1, 2, 3]), watermark2.clone(), - )], + )] + .into(), direction, ); let epoch3 = epoch2 + 1; @@ -686,7 +689,8 @@ mod tests { vec![VnodeWatermark::new( build_bitmap(0..VirtualNode::COUNT), watermark3.clone(), - )], + )] + .into(), direction, ); let epoch4 = epoch3 + 1; @@ -696,7 +700,8 @@ mod tests { vec![VnodeWatermark::new( build_bitmap(vec![0, 3, 4]), watermark4.clone(), - )], + )] + .into(), direction, ); @@ -715,6 +720,7 @@ mod tests { build_bitmap(vec![0, 1, 2, 3]), watermark2.clone(), )] + .into() ), ( epoch3, @@ -722,6 +728,7 @@ mod tests { build_bitmap(0..VirtualNode::COUNT), watermark3.clone(), )] + .into() ), ( epoch5, @@ -729,6 +736,7 @@ mod tests { build_bitmap(vec![0, 3, 4]), watermark4.clone(), )] + .into() ) ], direction, @@ -746,6 +754,7 @@ mod tests { build_bitmap(0..VirtualNode::COUNT), watermark3.clone(), )] + .into() ), ( epoch5, @@ -753,6 +762,7 @@ mod tests { build_bitmap(vec![0, 3, 4]), watermark4.clone(), )] + .into() ) ], direction, @@ -770,6 +780,7 @@ mod tests { build_bitmap((1..3).chain(5..VirtualNode::COUNT)), watermark3.clone() )] + .into() ), ( epoch5, @@ -777,6 +788,7 @@ mod tests { build_bitmap(vec![0, 3, 4]), watermark4.clone(), )] + .into() ) ], direction, @@ -796,6 +808,7 @@ mod tests { watermark3.clone() ) ] + .into() )], direction, } @@ -804,7 +817,7 @@ mod tests { #[test] fn test_merge_multiple_new_table_watermarks() { - fn epoch_new_watermark(epoch: u64, bitmaps: Vec<&Bitmap>) -> (u64, Vec) { + fn epoch_new_watermark(epoch: u64, bitmaps: Vec<&Bitmap>) -> (u64, Arc<[VnodeWatermark]>) { ( epoch, bitmaps @@ -813,7 +826,8 @@ mod tests { watermark: Bytes::from(vec![1, 2, epoch as _]), vnode_bitmap: Arc::new(bitmap.clone()), }) - .collect(), + .collect_vec() + .into(), ) } fn build_table_watermark( @@ -886,12 +900,12 @@ mod tests { let mut index = TableWatermarksIndex::new(direction, COMMITTED_EPOCH); index.add_epoch_watermark( EPOCH1, - &vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())], + vec![VnodeWatermark::new(build_bitmap(0..4), watermark1.clone())].into(), direction, ); index.add_epoch_watermark( EPOCH2, - &vec![VnodeWatermark::new(build_bitmap(1..5), watermark2.clone())], + vec![VnodeWatermark::new(build_bitmap(1..5), watermark2.clone())].into(), direction, ); @@ -1026,28 +1040,30 @@ mod tests { vec![VnodeWatermark { watermark: watermark1.clone(), vnode_bitmap: build_bitmap(0..VirtualNode::COUNT), - }], + }] + .into(), )], direction: WatermarkDirection::Ascending, - }, + } + .into(), + ); + index.apply_committed_watermarks( + version + .table_watermarks + .get(&test_table_id) + .unwrap() + .clone(), + EPOCH1, ); - let committed_index = version - .build_table_watermarks_index() - .remove(&test_table_id) - .unwrap(); - index.apply_committed_watermarks(&committed_index); assert_eq!(EPOCH1, index.committed_epoch); assert_eq!(EPOCH2, index.latest_epoch); for vnode in 0..VirtualNode::COUNT { let vnode = VirtualNode::from_index(vnode); - let epoch_watermark = index.index.get(&vnode).unwrap(); if (1..5).contains(&vnode.to_index()) { - assert_eq!(2, epoch_watermark.len()); - assert_eq!(&watermark1, epoch_watermark.get(&EPOCH1).unwrap()); - assert_eq!(&watermark2, epoch_watermark.get(&EPOCH2).unwrap()); + assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap()); + assert_eq!(watermark2, index.read_watermark(vnode, EPOCH2).unwrap()); } else { - assert_eq!(1, epoch_watermark.len()); - assert_eq!(&watermark1, epoch_watermark.get(&EPOCH1).unwrap()); + assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap()); } } } diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 0c6680d7cee86..6f227b9889363 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::mem::size_of; +use std::sync::Arc; use prost::Message; use risingwave_common::catalog::TableId; @@ -30,7 +31,7 @@ pub struct HummockVersion { pub levels: HashMap, pub max_committed_epoch: u64, pub safe_epoch: u64, - pub table_watermarks: HashMap, + pub table_watermarks: HashMap>, } impl Default for HummockVersion { @@ -68,7 +69,7 @@ impl HummockVersion { .map(|(table_id, table_watermark)| { ( TableId::new(*table_id), - TableWatermarks::from_protobuf(table_watermark), + Arc::new(TableWatermarks::from_protobuf(table_watermark)), ) }) .collect(), diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 3762cdce95517..7e7b98d63ddfd 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -26,13 +26,13 @@ pub(crate) mod tests { use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::constants::hummock::CompactionFilterFlag; + use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::Epoch; use risingwave_common_service::observer_manager::NotificationClient; use risingwave_hummock_sdk::can_concat; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::{ - next_key, prefix_slice_with_vnode, prefixed_range_with_vnode, FullKey, TableKey, - TABLE_PREFIX_LEN, + next_key, prefixed_range_with_vnode, FullKey, TableKey, TABLE_PREFIX_LEN, }; use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 04ff5c456557e..5037d550ca2b5 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -26,7 +26,9 @@ use risingwave_common::range::RangeBoundsExt; use risingwave_hummock_sdk::key::{ gen_key_from_bytes, prefixed_range_with_vnode, FullKey, TableKey, UserKey, TABLE_PREFIX_LEN, }; -use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection}; +use risingwave_hummock_sdk::table_watermark::{ + TableWatermarksIndex, VnodeWatermark, WatermarkDirection, +}; use risingwave_rpc_client::HummockMetaClient; use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion; use risingwave_storage::hummock::store::version::read_filter_for_version; @@ -2116,21 +2118,23 @@ async fn test_table_watermark() { let (local1, local2) = test_after_epoch2(local1, local2).await; let check_version_table_watermark = |version: PinnedVersion| { - let table_watermarks = version.table_watermark_index().get(&TEST_TABLE_ID).unwrap(); + let table_watermarks = TableWatermarksIndex::new_committed( + version + .version() + .table_watermarks + .get(&TEST_TABLE_ID) + .unwrap() + .clone(), + version.max_committed_epoch(), + ); assert_eq!(WatermarkDirection::Ascending, table_watermarks.direction()); - let index = table_watermarks.index(); - assert_eq!(2, index.len()); - let vnode1_watermark = index.get(&vnode1).unwrap(); - assert_eq!(1, vnode1_watermark.len()); assert_eq!( - &gen_inner_key(watermark1), - vnode1_watermark.get(&epoch1).unwrap() + gen_inner_key(watermark1), + table_watermarks.read_watermark(vnode1, epoch1).unwrap() ); - let vnode2_watermark = index.get(&vnode2).unwrap(); - assert_eq!(1, vnode2_watermark.len()); assert_eq!( - &gen_inner_key(watermark1), - vnode2_watermark.get(&epoch1).unwrap() + gen_inner_key(watermark1), + table_watermarks.read_watermark(vnode2, epoch1).unwrap() ); }; diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 2b096a448614f..4e3aaeec57b4b 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -506,9 +506,11 @@ impl SealedData { for (table_id, (direction, watermarks, _)) in unseal_epoch_data.table_watermarks { match self.table_watermarks.entry(table_id) { Entry::Occupied(mut entry) => { - entry - .get_mut() - .add_new_epoch_watermarks(epoch, watermarks, direction); + entry.get_mut().add_new_epoch_watermarks( + epoch, + Arc::from(watermarks), + direction, + ); } Entry::Vacant(entry) => { entry.insert(TableWatermarks::single_epoch(epoch, watermarks, direction)); diff --git a/src/storage/src/hummock/local_version/pinned_version.rs b/src/storage/src/hummock/local_version/pinned_version.rs index 46ef8edc442b3..da9569e6bb83c 100644 --- a/src/storage/src/hummock/local_version/pinned_version.rs +++ b/src/storage/src/hummock/local_version/pinned_version.rs @@ -19,7 +19,6 @@ use std::time::{Duration, Instant}; use auto_enums::auto_enum; use risingwave_common::catalog::TableId; -use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID}; use risingwave_pb::hummock::hummock_version::Levels; @@ -77,7 +76,6 @@ impl Drop for PinnedVersionGuard { pub struct PinnedVersion { version: Arc, compaction_group_index: Arc>, - table_watermark_index: Arc>, guard: Arc, } @@ -88,12 +86,10 @@ impl PinnedVersion { ) -> Self { let version_id = version.id; let compaction_group_index = version.build_compaction_group_info(); - let table_watermark_index = version.build_table_watermarks_index(); PinnedVersion { version: Arc::new(version), compaction_group_index: Arc::new(compaction_group_index), - table_watermark_index: Arc::new(table_watermark_index), guard: Arc::new(PinnedVersionGuard::new( version_id, pinned_version_manager_tx, @@ -105,11 +101,7 @@ impl PinnedVersion { self.compaction_group_index.clone() } - pub fn table_watermark_index(&self) -> &Arc> { - &self.table_watermark_index - } - - pub(crate) fn new_pin_version(&self, version: HummockVersion) -> Self { + pub fn new_pin_version(&self, version: HummockVersion) -> Self { assert!( version.id >= self.version.id, "pinning a older version {}. Current is {}", @@ -118,12 +110,10 @@ impl PinnedVersion { ); let version_id = version.id; let compaction_group_index = version.build_compaction_group_info(); - let table_watermark_index = version.build_table_watermarks_index(); PinnedVersion { version: Arc::new(version), compaction_group_index: Arc::new(compaction_group_index), - table_watermark_index: Arc::new(table_watermark_index), guard: Arc::new(PinnedVersionGuard::new( version_id, self.guard.pinned_version_manager_tx.clone(), diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 3ddd2280752d4..99f5b30767883 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -27,6 +27,7 @@ use risingwave_common_service::observer_manager::{NotificationClient, ObserverMa use risingwave_hummock_sdk::key::{ is_empty_key_range, vnode, vnode_range, TableKey, TableKeyRange, }; +use risingwave_hummock_sdk::table_watermark::TableWatermarksIndex; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_pb::hummock::SstableInfo; use risingwave_rpc_client::HummockMetaClient; @@ -118,8 +119,9 @@ pub fn get_committed_read_version_tuple( mut key_range: TableKeyRange, epoch: HummockEpoch, ) -> (TableKeyRange, ReadVersionTuple) { - if let Some(index) = version.table_watermark_index().get(&table_id) { - index.rewrite_range_with_table_watermark(epoch, &mut key_range) + if let Some(table_watermarks) = version.version().table_watermarks.get(&table_id) { + TableWatermarksIndex::new_committed(table_watermarks.clone(), version.max_committed_epoch()) + .rewrite_range_with_table_watermark(epoch, &mut key_range) } (key_range, (vec![], vec![], version)) } diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 45148376838ed..1f60538596f50 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -236,9 +236,15 @@ impl HummockReadVersion { Self { table_id, table_watermarks: committed_version - .table_watermark_index() + .version() + .table_watermarks .get(&table_id) - .cloned(), + .map(|table_watermarks| { + TableWatermarksIndex::new_committed( + table_watermarks.clone(), + committed_version.max_committed_epoch(), + ) + }), staging: StagingVersion { imm: VecDeque::default(), sst: VecDeque::default(), @@ -379,13 +385,22 @@ impl HummockReadVersion { })); } - if let Some(committed_watermarks) = - self.committed.table_watermark_index().get(&self.table_id) + if let Some(committed_watermarks) = self + .committed + .version() + .table_watermarks + .get(&self.table_id) { if let Some(watermark_index) = &mut self.table_watermarks { - watermark_index.apply_committed_watermarks(committed_watermarks); + watermark_index.apply_committed_watermarks( + committed_watermarks.clone(), + self.committed.max_committed_epoch(), + ); } else { - self.table_watermarks = Some(committed_watermarks.clone()); + self.table_watermarks = Some(TableWatermarksIndex::new_committed( + committed_watermarks.clone(), + self.committed.max_committed_epoch(), + )); } } } @@ -398,7 +413,7 @@ impl HummockReadVersion { .get_or_insert_with(|| { TableWatermarksIndex::new(direction, self.committed.max_committed_epoch()) }) - .add_epoch_watermark(epoch, &vnode_watermarks, direction), + .add_epoch_watermark(epoch, Arc::from(vnode_watermarks), direction), } } @@ -425,9 +440,15 @@ impl HummockReadVersion { self.staging.sst.clear(); self.table_watermarks = self .committed - .table_watermark_index() + .version() + .table_watermarks .get(&self.table_id) - .cloned() + .map(|table_watermark| { + TableWatermarksIndex::new_committed( + table_watermark.clone(), + self.committed.max_committed_epoch(), + ) + }); } pub fn add_merged_imm(&mut self, merged_imm: ImmutableMemtable) {