From 6b29ec919341cccd6ce41dcf1280b900a68485bf Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 20 Sep 2024 17:41:47 +0800 Subject: [PATCH] feat(storage): variable vnode count support (#18415) Signed-off-by: Bugen Zhao --- src/common/src/hash/consistent_hash/vnode.rs | 13 ++- .../sink_coordination/coordinator_worker.rs | 18 ++-- .../src/manager/sink_coordination/manager.rs | 34 ++++---- src/storage/benches/bench_table_watermarks.rs | 10 +-- src/storage/hummock_sdk/src/key.rs | 35 ++++++-- .../hummock_sdk/src/table_watermark.rs | 85 +++++++++++++------ .../hummock_test/src/compactor_tests.rs | 12 +-- .../src/hummock_read_version_tests.rs | 4 +- .../hummock_test/src/hummock_storage_tests.rs | 4 +- .../hummock_test/src/state_store_tests.rs | 2 +- src/storage/hummock_trace/src/opts.rs | 2 +- .../event_handler/hummock_event_handler.rs | 4 +- .../src/hummock/event_handler/uploader/mod.rs | 13 ++- .../shared_buffer/shared_buffer_batch.rs | 32 ------- .../src/hummock/sstable/multi_builder.rs | 34 +++++--- src/storage/src/store.rs | 2 +- .../common/log_store_impl/kv_log_store/mod.rs | 8 +- .../log_store_impl/kv_log_store/serde.rs | 16 ++-- .../log_store_impl/kv_log_store/writer.rs | 4 +- 19 files changed, 193 insertions(+), 139 deletions(-) diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs index 685f99d6cf4f4..f719515682625 100644 --- a/src/common/src/hash/consistent_hash/vnode.rs +++ b/src/common/src/hash/consistent_hash/vnode.rs @@ -68,7 +68,16 @@ impl VirtualNode { impl VirtualNode { /// The maximum count of virtual nodes that fits in [`VirtualNodeInner`]. - pub const MAX_COUNT: usize = 1 << VirtualNodeInner::BITS; + /// + /// Note that the most significant bit is not used. This is because we use signed integers (`i16`) + /// for the scalar representation, where overflow can be confusing in terms of ordering. + // TODO(var-vnode): the only usage is in log-store, shall we update it by storing the vnode as + // bytea to enable 2^16 vnodes? + pub const MAX_COUNT: usize = 1 << (VirtualNodeInner::BITS - 1); + /// The maximum value of the virtual node that can be represented. + /// + /// Note that this is **NOT** the maximum value of the virtual node, which depends on the configuration. + pub const MAX_REPRESENTABLE: VirtualNode = VirtualNode::from_index(Self::MAX_COUNT - 1); /// The size of a virtual node in bytes, in memory or serialized representation. pub const SIZE: usize = std::mem::size_of::(); } @@ -96,6 +105,7 @@ impl VirtualNode { /// Creates a virtual node from the given scalar representation. Used by `VNODE` expression. pub const fn from_scalar(scalar: i16) -> Self { + debug_assert!(scalar >= 0); Self(scalar as _) } @@ -115,6 +125,7 @@ impl VirtualNode { /// Creates a virtual node from the given big-endian bytes representation. pub const fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self { let inner = VirtualNodeInner::from_be_bytes(bytes); + debug_assert!((inner as usize) < Self::MAX_COUNT); Self(inner) } diff --git a/src/meta/src/manager/sink_coordination/coordinator_worker.rs b/src/meta/src/manager/sink_coordination/coordinator_worker.rs index 8ed063e5325c0..0d20e62f8c529 100644 --- a/src/meta/src/manager/sink_coordination/coordinator_worker.rs +++ b/src/meta/src/manager/sink_coordination/coordinator_worker.rs @@ -23,7 +23,6 @@ use futures::future::{select, Either}; use futures::pin_mut; use itertools::Itertools; use risingwave_common::bitmap::Bitmap; -use risingwave_common::hash::VirtualNode; use risingwave_connector::dispatch_sink; use risingwave_connector::sink::{build_sink, Sink, SinkCommitCoordinator, SinkParam}; use risingwave_pb::connector_service::SinkMetadata; @@ -56,7 +55,7 @@ struct EpochCommitRequests { epoch: u64, metadatas: Vec, handle_ids: HashSet, - bitmap: Bitmap, + committed_bitmap: Option, // lazy-initialized on first request } impl EpochCommitRequests { @@ -65,7 +64,7 @@ impl EpochCommitRequests { epoch, metadatas: vec![], handle_ids: Default::default(), - bitmap: Bitmap::zeros(VirtualNode::COUNT), + committed_bitmap: None, } } @@ -75,24 +74,29 @@ impl EpochCommitRequests { metadata: SinkMetadata, vnode_bitmap: Bitmap, ) -> anyhow::Result<()> { + let committed_bitmap = self + .committed_bitmap + .get_or_insert_with(|| Bitmap::zeros(vnode_bitmap.len())); + assert_eq!(committed_bitmap.len(), vnode_bitmap.len()); + self.metadatas.push(metadata); assert!(self.handle_ids.insert(handle_id)); - let check_bitmap = (&self.bitmap) & &vnode_bitmap; + let check_bitmap = (&*committed_bitmap) & &vnode_bitmap; if check_bitmap.count_ones() > 0 { return Err(anyhow!( "duplicate vnode {:?} on epoch {}. request vnode: {:?}, prev vnode: {:?}", check_bitmap.iter_ones().collect_vec(), self.epoch, vnode_bitmap, - self.bitmap + committed_bitmap )); } - self.bitmap |= &vnode_bitmap; + *committed_bitmap |= &vnode_bitmap; Ok(()) } fn can_commit(&self) -> bool { - self.bitmap.count_ones() == VirtualNode::COUNT + self.committed_bitmap.as_ref().map_or(false, |b| b.all()) } } diff --git a/src/meta/src/manager/sink_coordination/manager.rs b/src/meta/src/manager/sink_coordination/manager.rs index 2fe2e8bfb3b8c..b603f0f629ac4 100644 --- a/src/meta/src/manager/sink_coordination/manager.rs +++ b/src/meta/src/manager/sink_coordination/manager.rs @@ -404,11 +404,11 @@ mod tests { let epoch1 = 233; let epoch2 = 234; - let mut all_vnode = (0..VirtualNode::COUNT).collect_vec(); + let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec(); all_vnode.shuffle(&mut rand::thread_rng()); - let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2); + let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2); let build_bitmap = |indexes: &[usize]| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); for i in indexes { builder.set(*i, true); } @@ -568,9 +568,9 @@ mod tests { let epoch1 = 233; let epoch2 = 234; - let all_vnode = (0..VirtualNode::COUNT).collect_vec(); + let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec(); let build_bitmap = |indexes: &[usize]| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); for i in indexes { builder.set(*i, true); } @@ -686,11 +686,11 @@ mod tests { let epoch = 233; - let mut all_vnode = (0..VirtualNode::COUNT).collect_vec(); + let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec(); all_vnode.shuffle(&mut rand::thread_rng()); - let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2); + let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2); let build_bitmap = |indexes: &[usize]| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); for i in indexes { builder.set(*i, true); } @@ -765,11 +765,11 @@ mod tests { let epoch = 233; - let mut all_vnode = (0..VirtualNode::COUNT).collect_vec(); + let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec(); all_vnode.shuffle(&mut rand::thread_rng()); - let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2); + let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2); let build_bitmap = |indexes: &[usize]| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); for i in indexes { builder.set(*i, true); } @@ -861,11 +861,11 @@ mod tests { let epoch3 = 235; let epoch4 = 236; - let mut all_vnode = (0..VirtualNode::COUNT).collect_vec(); + let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec(); all_vnode.shuffle(&mut rand::thread_rng()); - let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2); + let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2); let build_bitmap = |indexes: &[usize]| { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); for i in indexes { builder.set(*i, true); } @@ -980,8 +980,8 @@ mod tests { } let (vnode1, vnode2, vnode3) = { - let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 3); - let (second, third) = second.split_at(VirtualNode::COUNT / 3); + let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3); + let (second, third) = second.split_at(VirtualNode::COUNT_FOR_TEST / 3); ( build_bitmap(first), build_bitmap(second), @@ -1068,7 +1068,7 @@ mod tests { } let (vnode2, vnode3) = { - let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 3); + let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3); (build_bitmap(first), build_bitmap(second)) }; diff --git a/src/storage/benches/bench_table_watermarks.rs b/src/storage/benches/bench_table_watermarks.rs index fa4983f019951..b314d665c313d 100644 --- a/src/storage/benches/bench_table_watermarks.rs +++ b/src/storage/benches/bench_table_watermarks.rs @@ -37,18 +37,18 @@ use tokio::sync::mpsc::unbounded_channel; fn vnode_bitmaps(part_count: usize) -> impl Iterator> { static BITMAP_CACHE: LazyLock>>>> = LazyLock::new(|| Mutex::new(HashMap::new())); - assert_eq!(VirtualNode::COUNT % part_count, 0); + assert_eq!(VirtualNode::COUNT_FOR_TEST % part_count, 0); let mut cache = BITMAP_CACHE.lock(); match cache.entry(part_count) { Entry::Occupied(entry) => entry.get().clone().into_iter(), Entry::Vacant(entry) => entry .insert({ - let part_size = VirtualNode::COUNT / part_count; + let part_size = VirtualNode::COUNT_FOR_TEST / part_count; (0..part_count) .map(move |part_idx| { let start = part_idx * part_size; let end = part_idx * part_size + part_size; - let mut bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); for i in start..end { bitmap.set(i, true); } @@ -252,7 +252,7 @@ fn bench_table_watermarks(c: &mut Criterion) { c.bench_function("read latest watermark", |b| { b.iter(|| { - for i in 0..VirtualNode::COUNT { + for i in 0..VirtualNode::COUNT_FOR_TEST { let _ = table_watermarks.latest_watermark(VirtualNode::from_index(i)); } }) @@ -260,7 +260,7 @@ fn bench_table_watermarks(c: &mut Criterion) { c.bench_function("read committed watermark", |b| { b.iter(|| { - for i in 0..VirtualNode::COUNT { + for i in 0..VirtualNode::COUNT_FOR_TEST { let _ = table_watermarks.read_watermark( VirtualNode::from_index(i), test_epoch(committed_epoch_idx as u64), diff --git a/src/storage/hummock_sdk/src/key.rs b/src/storage/hummock_sdk/src/key.rs index 0f04440ec5489..9685b0cff3ecc 100644 --- a/src/storage/hummock_sdk/src/key.rs +++ b/src/storage/hummock_sdk/src/key.rs @@ -54,7 +54,15 @@ pub fn is_empty_key_range(key_range: &TableKeyRange) -> bool { } } -// returning left inclusive and right exclusive +/// Returns left inclusive and right exclusive vnode index of the given range. +/// +/// # Vnode count unawareness +/// +/// Note that this function is not aware of the vnode count that is actually used in this table. +/// For example, if the total vnode count is 256, `Unbounded` can be a correct end bound for vnode 255, +/// but this function will still return `Excluded(256)`. +/// +/// See also [`vnode`] and [`end_bound_of_vnode`] which hold such invariant. pub fn vnode_range(range: &TableKeyRange) -> (usize, usize) { let (left, right) = range; let left = match left { @@ -73,12 +81,20 @@ pub fn vnode_range(range: &TableKeyRange) -> (usize, usize) { vnode.to_index() + 1 } } - Unbounded => VirtualNode::COUNT, + Unbounded => VirtualNode::MAX_REPRESENTABLE.to_index() + 1, }; (left, right) } -// Ensure there is only one vnode involved in table key range and return the vnode +/// Ensure there is only one vnode involved in table key range and return the vnode. +/// +/// # Vnode count unawareness +/// +/// Note that this function is not aware of the vnode count that is actually used in this table. +/// For example, if the total vnode count is 256, `Unbounded` can be a correct end bound for vnode 255, +/// but this function will still require `Excluded(256)`. +/// +/// See also [`vnode_range`] and [`end_bound_of_vnode`] which hold such invariant. pub fn vnode(range: &TableKeyRange) -> VirtualNode { let (l, r_exclusive) = vnode_range(range); assert_eq!(r_exclusive - l, 1); @@ -319,8 +335,15 @@ pub fn prev_full_key(full_key: &[u8]) -> Vec { } } +/// [`Unbounded`] if the vnode is the maximum representable value (i.e. [`VirtualNode::MAX_REPRESENTABLE`]), +/// otherwise [`Excluded`] the next vnode. +/// +/// Note that this function is not aware of the vnode count that is actually used in this table. +/// For example, if the total vnode count is 256, `Unbounded` can be a correct end bound for vnode 255, +/// but this function will still return `Excluded(256)`. See also [`vnode`] and [`vnode_range`] which +/// rely on such invariant. pub fn end_bound_of_vnode(vnode: VirtualNode) -> Bound { - if vnode == VirtualNode::MAX { + if vnode == VirtualNode::MAX_REPRESENTABLE { Unbounded } else { let end_bound_index = vnode.to_index() + 1; @@ -1271,7 +1294,7 @@ mod tests { Excluded(TableKey(concat(234, b""))) ) ); - let max_vnode = VirtualNode::COUNT - 1; + let max_vnode = VirtualNode::MAX_REPRESENTABLE.to_index(); assert_eq!( prefixed_range_with_vnode( (Bound::::Unbounded, Bound::::Unbounded), @@ -1304,7 +1327,7 @@ mod tests { Excluded(b"1".as_slice()), Unbounded, ]; - for vnode in 0..VirtualNode::COUNT { + for vnode in 0..VirtualNode::MAX_COUNT { for left in &left_bound { for right in &right_bound { assert_eq!( diff --git a/src/storage/hummock_sdk/src/table_watermark.rs b/src/storage/hummock_sdk/src/table_watermark.rs index 250e9014a1d36..bddf876240bc8 100644 --- a/src/storage/hummock_sdk/src/table_watermark.rs +++ b/src/storage/hummock_sdk/src/table_watermark.rs @@ -160,6 +160,8 @@ impl TableWatermarksIndex { pub fn filter_regress_watermarks(&self, watermarks: &mut Vec) { let mut ret = Vec::with_capacity(watermarks.len()); for watermark in watermarks.drain(..) { + let vnode_count = watermark.vnode_count(); + let mut regress_vnodes = None; for vnode in watermark.vnode_bitmap.iter_vnodes() { if let Some(prev_watermark) = self.latest_watermark(vnode) { @@ -176,7 +178,7 @@ impl TableWatermarksIndex { prev_watermark ); regress_vnodes - .get_or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::COUNT)) + .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count)) .set(vnode.to_index(), true); } } @@ -187,7 +189,7 @@ impl TableWatermarksIndex { let vnode_index = vnode.to_index(); if !regress_vnodes.is_set(vnode_index) { bitmap_builder - .get_or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::COUNT)) + .get_or_insert_with(|| BitmapBuilder::zeroed(vnode_count)) .set(vnode_index, true); } } @@ -219,8 +221,9 @@ impl TableWatermarksIndex { assert_eq!(self.watermark_direction, direction); self.latest_epoch = epoch; #[cfg(debug_assertions)] - { - let mut vnode_is_set = BitmapBuilder::zeroed(VirtualNode::COUNT); + if !vnode_watermark_list.is_empty() { + let vnode_count = vnode_watermark_list[0].vnode_count(); + let mut vnode_is_set = BitmapBuilder::zeroed(vnode_count); for vnode_watermark in vnode_watermark_list.as_ref() { for vnode in vnode_watermark.vnode_bitmap.iter_ones() { assert!(!vnode_is_set.is_set(vnode)); @@ -324,6 +327,11 @@ impl VnodeWatermark { &self.vnode_bitmap } + /// Vnode count derived from the bitmap. + pub fn vnode_count(&self) -> usize { + self.vnode_bitmap.len() + } + pub fn watermark(&self) -> &Bytes { &self.watermark } @@ -382,10 +390,12 @@ impl TableWatermarks { watermarks: Vec, direction: WatermarkDirection, ) -> Self { - Self { + let mut this = Self { direction, - watermarks: vec![(epoch, Arc::from(watermarks))], - } + watermarks: Vec::new(), + }; + this.add_new_epoch_watermarks(epoch, watermarks.into(), direction); + this } pub fn add_new_epoch_watermarks( @@ -398,9 +408,27 @@ impl TableWatermarks { if let Some((prev_epoch, _)) = self.watermarks.last() { assert!(*prev_epoch < epoch); } + if !watermarks.is_empty() { + let vnode_count = watermarks[0].vnode_count(); + for watermark in &*watermarks { + assert_eq!(watermark.vnode_count(), vnode_count); + } + if let Some(existing_vnode_count) = self.vnode_count() { + assert_eq!(existing_vnode_count, vnode_count); + } + } self.watermarks.push((epoch, watermarks)); } + /// Vnode count derived from existing watermarks. Returns `None` if there is no watermark. + fn vnode_count(&self) -> Option { + self.watermarks + .iter() + .flat_map(|(_, watermarks)| watermarks.as_ref()) + .next() + .map(|w| w.vnode_count()) + } + pub fn from_protobuf(pb: &PbTableWatermarks) -> Self { Self { watermarks: pb @@ -507,15 +535,15 @@ impl TableWatermarks { } debug!("clear stale table watermark below epoch {}", safe_epoch); let mut result_epoch_watermark = Vec::with_capacity(self.watermarks.len()); - let mut unset_vnode: HashSet = (0..VirtualNode::COUNT) - .map(VirtualNode::from_index) - .collect(); + let mut set_vnode: HashSet = HashSet::new(); + let mut vnode_count: Option = None; // lazy initialized on first occurrence of vnode watermark while let Some((epoch, _)) = self.watermarks.last() { if *epoch >= safe_epoch { let (epoch, watermarks) = self.watermarks.pop().expect("have check Some"); for watermark in watermarks.as_ref() { + vnode_count.get_or_insert_with(|| watermark.vnode_count()); for vnode in watermark.vnode_bitmap.iter_vnodes() { - unset_vnode.remove(&vnode); + set_vnode.insert(vnode); } } result_epoch_watermark.push((epoch, watermarks)); @@ -523,20 +551,21 @@ impl TableWatermarks { break; } } - while !unset_vnode.is_empty() + while vnode_count.map_or(true, |vnode_count| set_vnode.len() != vnode_count) && let Some((_, watermarks)) = self.watermarks.pop() { let mut new_vnode_watermarks = Vec::new(); for vnode_watermark in watermarks.as_ref() { - let mut set_vnode = Vec::new(); + let mut new_set_vnode = Vec::new(); + vnode_count.get_or_insert_with(|| vnode_watermark.vnode_count()); for vnode in vnode_watermark.vnode_bitmap.iter_vnodes() { - if unset_vnode.remove(&vnode) { - set_vnode.push(vnode); + if set_vnode.insert(vnode) { + new_set_vnode.push(vnode); } } - if !set_vnode.is_empty() { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); - for vnode in set_vnode { + if !new_set_vnode.is_empty() { + let mut builder = BitmapBuilder::zeroed(vnode_watermark.vnode_count()); + for vnode in new_set_vnode { builder.set(vnode.to_index(), true); } let bitmap = Arc::new(builder.finish()); @@ -706,7 +735,7 @@ mod tests { use crate::version::HummockVersion; fn build_bitmap(vnodes: impl IntoIterator) -> Arc { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); for vnode in vnodes { builder.set(vnode, true); } @@ -746,7 +775,7 @@ mod tests { let mut second_table_watermark = TableWatermarks::single_epoch( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::COUNT), + build_bitmap(0..VirtualNode::COUNT_FOR_TEST), watermark3.clone(), )], direction, @@ -754,7 +783,7 @@ mod tests { table_watermarks.add_new_epoch_watermarks( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::COUNT), + build_bitmap(0..VirtualNode::COUNT_FOR_TEST), watermark3.clone(), )] .into(), @@ -815,7 +844,7 @@ mod tests { table_watermarks.add_new_epoch_watermarks( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::COUNT), + build_bitmap(0..VirtualNode::COUNT_FOR_TEST), watermark3.clone(), )] .into(), @@ -853,7 +882,7 @@ mod tests { ( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::COUNT), + build_bitmap(0..VirtualNode::COUNT_FOR_TEST), watermark3.clone(), )] .into() @@ -879,7 +908,7 @@ mod tests { ( epoch3, vec![VnodeWatermark::new( - build_bitmap(0..VirtualNode::COUNT), + build_bitmap(0..VirtualNode::COUNT_FOR_TEST), watermark3.clone(), )] .into() @@ -905,7 +934,7 @@ mod tests { ( epoch4, vec![VnodeWatermark::new( - build_bitmap((1..3).chain(5..VirtualNode::COUNT)), + build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)), watermark3.clone() )] .into() @@ -932,7 +961,7 @@ mod tests { vec![ VnodeWatermark::new(build_bitmap(vec![0, 3, 4]), watermark4.clone()), VnodeWatermark::new( - build_bitmap((1..3).chain(5..VirtualNode::COUNT)), + build_bitmap((1..3).chain(5..VirtualNode::COUNT_FOR_TEST)), watermark3.clone() ) ] @@ -1164,7 +1193,7 @@ mod tests { EPOCH1, vec![VnodeWatermark { watermark: watermark1.clone(), - vnode_bitmap: build_bitmap(0..VirtualNode::COUNT), + vnode_bitmap: build_bitmap(0..VirtualNode::COUNT_FOR_TEST), }] .into(), )], @@ -1182,7 +1211,7 @@ mod tests { ); assert_eq!(EPOCH1, index.committed_epoch.unwrap()); assert_eq!(EPOCH2, index.latest_epoch); - for vnode in 0..VirtualNode::COUNT { + for vnode in 0..VirtualNode::COUNT_FOR_TEST { let vnode = VirtualNode::from_index(vnode); if (1..5).contains(&vnode.to_index()) { assert_eq!(watermark1, index.read_watermark(vnode, EPOCH1).unwrap()); diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 2b8247b1ec312..d0afa17b2a369 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -1860,8 +1860,8 @@ pub(crate) mod tests { Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)), None, ); - let key_count = KEY_COUNT / VirtualNode::COUNT * 2; - for vnode_id in 0..VirtualNode::COUNT / 2 { + let key_count = KEY_COUNT / VirtualNode::COUNT_FOR_TEST * 2; + for vnode_id in 0..VirtualNode::COUNT_FOR_TEST / 2 { let mut last_k: u64 = 1; let init_epoch = test_epoch(100 * object_id); let mut last_epoch = init_epoch; @@ -1904,9 +1904,9 @@ pub(crate) mod tests { let target_file_size = max_sst_file_size / 4; let mut table_watermarks = BTreeMap::default(); - let key_count = KEY_COUNT / VirtualNode::COUNT * 2; - let mut vnode_builder = BitmapBuilder::zeroed(VirtualNode::COUNT); - for i in 0..VirtualNode::COUNT / 2 { + let key_count = KEY_COUNT / VirtualNode::COUNT_FOR_TEST * 2; + let mut vnode_builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); + for i in 0..VirtualNode::COUNT_FOR_TEST / 2 { if i % 2 == 0 { vnode_builder.set(i, true); } else { @@ -1970,7 +1970,7 @@ pub(crate) mod tests { direction: WatermarkDirection::Ascending, vnode_watermarks: BTreeMap::default(), }; - for i in 0..VirtualNode::COUNT { + for i in 0..VirtualNode::COUNT_FOR_TEST { if i % 2 == 0 { watermark .vnode_watermarks diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs index e9721dd8a3197..9fae89b520bc2 100644 --- a/src/storage/hummock_test/src/hummock_read_version_tests.rs +++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs @@ -50,7 +50,7 @@ async fn test_read_version_basic() { let mut epoch = test_epoch(1); let table_id = 0; - let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT)); + let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)); let mut read_version = HummockReadVersion::new( TableId::from(table_id), TEST_LOCAL_INSTANCE_ID, @@ -278,7 +278,7 @@ async fn test_read_filter_basic() { let epoch = test_epoch(1); let table_id = 0; - let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT)); + let vnodes = Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)); let read_version = Arc::new(RwLock::new(HummockReadVersion::new( TableId::from(table_id), TEST_LOCAL_INSTANCE_ID, diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 0565b7f10204f..c59d33130d586 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -2100,13 +2100,13 @@ async fn test_table_watermark() { let vnode1 = VirtualNode::from_index(1); let vnode_bitmap1 = Arc::new({ - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); builder.set(1, true); builder.finish() }); let vnode2 = VirtualNode::from_index(2); let vnode_bitmap2 = Arc::new({ - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); builder.set(2, true); builder.finish() }); diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index d4aa43e6579f5..deb0493250f41 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -1425,7 +1425,7 @@ async fn test_replicated_local_hummock_storage() { TableOption { retention_seconds: None, }, - Arc::new(Bitmap::ones(VirtualNode::COUNT)), + Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)), )) .await; diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index 562e989051395..fe4280c264f4d 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -182,7 +182,7 @@ impl TracedNewLocalOptions { retention_seconds: None, }, is_replicated: false, - vnodes: TracedBitmap::from(Bitmap::ones(VirtualNode::COUNT)), + vnodes: TracedBitmap::from(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)), } } } diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 1c8abc78ddffc..61ee49c0b5340 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -939,7 +939,7 @@ mod tests { use futures::FutureExt; use parking_lot::Mutex; - use risingwave_common::bitmap::BitmapBuilder; + use risingwave_common::bitmap::Bitmap; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{test_epoch, EpochExt}; use risingwave_hummock_sdk::version::HummockVersion; @@ -1160,7 +1160,7 @@ mod tests { table_id: TEST_TABLE_ID, new_read_version_sender: tx, is_replicated: false, - vnodes: Arc::new(BitmapBuilder::filled(VirtualNode::COUNT).finish()), + vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)), }); rx.await.unwrap() }; diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs index 90e6a9306930a..0cf066f212056 100644 --- a/src/storage/src/hummock/event_handler/uploader/mod.rs +++ b/src/storage/src/hummock/event_handler/uploader/mod.rs @@ -32,7 +32,6 @@ use prometheus::core::{AtomicU64, GenericGauge}; use prometheus::{HistogramTimer, IntGauge}; use risingwave_common::bitmap::BitmapBuilder; use risingwave_common::catalog::TableId; -use risingwave_common::hash::VirtualNode; use risingwave_common::must_match; use risingwave_hummock_sdk::table_watermark::{ TableWatermarks, VnodeWatermark, WatermarkDirection, @@ -339,6 +338,14 @@ impl TableUnsyncData { table_watermarks: Vec, direction: WatermarkDirection, ) { + if table_watermarks.is_empty() { + return; + } + let vnode_count = table_watermarks[0].vnode_count(); + for watermark in &table_watermarks { + assert_eq!(vnode_count, watermark.vnode_count()); + } + fn apply_new_vnodes( vnode_bitmap: &mut BitmapBuilder, vnode_watermarks: &Vec, @@ -368,14 +375,14 @@ impl TableUnsyncData { prev_watermarks.extend(table_watermarks); } Entry::Vacant(entry) => { - let mut vnode_bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count); apply_new_vnodes(&mut vnode_bitmap, &table_watermarks); entry.insert((table_watermarks, vnode_bitmap)); } } } None => { - let mut vnode_bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut vnode_bitmap = BitmapBuilder::zeroed(vnode_count); apply_new_vnodes(&mut vnode_bitmap, &table_watermarks); self.table_watermarks = Some(( direction, diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index 53fccc922b2bc..c2c55559c6064 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -25,7 +25,6 @@ use std::sync::{Arc, LazyLock}; use bytes::Bytes; use prometheus::IntGauge; use risingwave_common::catalog::TableId; -use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange, UserKey}; use risingwave_hummock_sdk::EpochWithGap; @@ -530,37 +529,6 @@ impl SharedBufferBatch { } } - pub fn collect_vnodes(&self) -> Vec { - let mut vnodes = Vec::with_capacity(VirtualNode::COUNT); - let mut next_vnode_id = 0; - while next_vnode_id < VirtualNode::COUNT { - let seek_key = TableKey( - VirtualNode::from_index(next_vnode_id) - .to_be_bytes() - .to_vec(), - ); - let idx = match self - .inner - .entries - .binary_search_by(|m| (m.key.as_ref()).cmp(seek_key.as_slice())) - { - Ok(idx) => idx, - Err(idx) => idx, - }; - if idx >= self.inner.entries.len() { - break; - } - let item = &self.inner.entries[idx]; - if item.key.len() <= VirtualNode::SIZE { - break; - } - let current_vnode_id = item.key.vnode_part().to_index(); - vnodes.push(current_vnode_id); - next_vnode_id = current_vnode_id + 1; - } - vnodes - } - #[cfg(any(test, feature = "test"))] pub fn build_shared_buffer_batch_for_test( epoch: HummockEpoch, diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index 97b448faec8d7..25bdd54df720f 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -67,7 +67,9 @@ where task_progress: Option>, last_table_id: u32, - table_partition_vnode: BTreeMap, + + vnode_count: usize, + table_vnode_partition: BTreeMap, split_weight_by_vnode: u32, /// When vnode of the coming key is greater than `largest_vnode_in_current_partition`, we will /// switch SST. @@ -88,9 +90,12 @@ where builder_factory: F, compactor_metrics: Arc, task_progress: Option>, - table_partition_vnode: BTreeMap, + table_vnode_partition: BTreeMap, concurrent_uploading_sst_count: Option, ) -> Self { + // TODO(var-vnode): should use value from caller + let vnode_count = VirtualNode::COUNT; + Self { builder_factory, sst_outputs: Vec::new(), @@ -98,9 +103,10 @@ where compactor_metrics, task_progress, last_table_id: 0, - table_partition_vnode, + table_vnode_partition, + vnode_count, split_weight_by_vnode: 0, - largest_vnode_in_current_partition: VirtualNode::MAX.to_index(), + largest_vnode_in_current_partition: vnode_count - 1, concurrent_upload_join_handle: FuturesUnordered::new(), concurrent_uploading_sst_count, } @@ -114,9 +120,10 @@ where compactor_metrics: Arc::new(CompactorMetrics::unused()), task_progress: None, last_table_id: 0, - table_partition_vnode: BTreeMap::default(), + table_vnode_partition: BTreeMap::default(), + vnode_count: VirtualNode::COUNT_FOR_TEST, split_weight_by_vnode: 0, - largest_vnode_in_current_partition: VirtualNode::MAX.to_index(), + largest_vnode_in_current_partition: VirtualNode::MAX_FOR_TEST.to_index(), concurrent_upload_join_handle: FuturesUnordered::new(), concurrent_uploading_sst_count: None, } @@ -213,10 +220,10 @@ where let mut switch_builder = false; if user_key.table_id.table_id != self.last_table_id { let new_vnode_partition_count = - self.table_partition_vnode.get(&user_key.table_id.table_id); + self.table_vnode_partition.get(&user_key.table_id.table_id); if new_vnode_partition_count.is_some() - || self.table_partition_vnode.contains_key(&self.last_table_id) + || self.table_vnode_partition.contains_key(&self.last_table_id) { if new_vnode_partition_count.is_some() { self.split_weight_by_vnode = *new_vnode_partition_count.unwrap(); @@ -229,22 +236,23 @@ where switch_builder = true; if self.split_weight_by_vnode > 1 { self.largest_vnode_in_current_partition = - VirtualNode::COUNT / (self.split_weight_by_vnode as usize) - 1; + self.vnode_count / (self.split_weight_by_vnode as usize) - 1; } else { // default - self.largest_vnode_in_current_partition = VirtualNode::MAX.to_index(); + self.largest_vnode_in_current_partition = self.vnode_count - 1; } } } - if self.largest_vnode_in_current_partition != VirtualNode::MAX.to_index() { + if self.largest_vnode_in_current_partition != self.vnode_count - 1 { let key_vnode = user_key.get_vnode_id(); if key_vnode > self.largest_vnode_in_current_partition { // vnode partition change switch_builder = true; // SAFETY: `self.split_weight_by_vnode > 1` here. - let (basic, remainder) = - VirtualNode::COUNT.div_rem(&(self.split_weight_by_vnode as usize)); + let (basic, remainder) = self + .vnode_count + .div_rem(&(self.split_weight_by_vnode as usize)); let small_segments_area = basic * (self.split_weight_by_vnode as usize - remainder); self.largest_vnode_in_current_partition = (if key_vnode < small_segments_area { (key_vnode / basic + 1) * basic diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index db21faa78c6cf..3f87d88617ca4 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -724,7 +724,7 @@ impl NewLocalOptions { retention_seconds: None, }, is_replicated: false, - vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT)), + vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)), } } } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 440c7188d2fa1..3a9249ae26259 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -1001,15 +1001,15 @@ mod tests { test_env.register_table(table.clone()).await; fn build_bitmap(indexes: impl Iterator) -> Arc { - let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST); for i in indexes { builder.set(i, true); } Arc::new(builder.finish()) } - let vnodes1 = build_bitmap((0..VirtualNode::COUNT).filter(|i| i % 2 == 0)); - let vnodes2 = build_bitmap((0..VirtualNode::COUNT).filter(|i| i % 2 == 1)); + let vnodes1 = build_bitmap((0..VirtualNode::COUNT_FOR_TEST).filter(|i| i % 2 == 0)); + let vnodes2 = build_bitmap((0..VirtualNode::COUNT_FOR_TEST).filter(|i| i % 2 == 1)); let factory1 = KvLogStoreFactory::new( test_env.storage.clone(), @@ -1150,7 +1150,7 @@ mod tests { .clear_shared_buffer(test_env.manager.get_current_version().await.id) .await; - let vnodes = build_bitmap(0..VirtualNode::COUNT); + let vnodes = build_bitmap(0..VirtualNode::COUNT_FOR_TEST); let factory = KvLogStoreFactory::new( test_env.storage.clone(), table.clone(), diff --git a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs index 17ab103d758b4..ec7cc62f2d49c 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs @@ -335,7 +335,11 @@ impl LogStoreRowSerde { ) -> Bytes { let (epoch, seq_id) = offset; Bytes::from(next_key(&serialize_pk( - (self.pk_info.compute_pk)(VirtualNode::MAX, Self::encode_epoch(epoch), seq_id), + (self.pk_info.compute_pk)( + VirtualNode::MAX_REPRESENTABLE, + Self::encode_epoch(epoch), + seq_id, + ), &self.pk_serde, ))) } @@ -980,7 +984,7 @@ mod tests { let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST))), pk_info, ); @@ -1124,7 +1128,7 @@ mod tests { let table = gen_test_log_store_table(pk_info); let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST))), pk_info, ); let (ops, rows) = gen_test_data(0); @@ -1283,7 +1287,7 @@ mod tests { let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST))), pk_info, ); @@ -1428,7 +1432,7 @@ mod tests { let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST))), pk_info, ); @@ -1538,7 +1542,7 @@ mod tests { let serde = LogStoreRowSerde::new( &table, - Some(Arc::new(Bitmap::ones(VirtualNode::COUNT))), + Some(Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST))), pk_info, ); diff --git a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs index 120a49e8e7ee3..93e4c1211fe61 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/writer.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/writer.rs @@ -19,7 +19,7 @@ use itertools::Itertools; use risingwave_common::array::StreamChunk; use risingwave_common::bitmap::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableId; -use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; +use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::util::epoch::EpochPair; use risingwave_common_estimate_size::EstimateSize; use risingwave_connector::sink::log_store::{LogStoreResult, LogWriter}; @@ -110,7 +110,7 @@ impl LogWriter for KvLogStoreWriter { { // When enter this branch, the chunk cannot be added directly, and should be add to // state store and flush - let mut vnode_bitmap_builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + let mut vnode_bitmap_builder = BitmapBuilder::zeroed(self.serde.vnodes().len()); let mut flush_info = FlushInfo::new(); for (i, (op, row)) in chunk.rows().enumerate() { let seq_id = start_seq_id + (i as SeqIdType);