Skip to content

Commit

Permalink
feat(storage): variable vnode count support (#18415)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Sep 20, 2024
1 parent eb2d9f4 commit 6b29ec9
Show file tree
Hide file tree
Showing 19 changed files with 193 additions and 139 deletions.
13 changes: 12 additions & 1 deletion src/common/src/hash/consistent_hash/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,16 @@ impl VirtualNode {

impl VirtualNode {
/// The maximum count of virtual nodes that fits in [`VirtualNodeInner`].
pub const MAX_COUNT: usize = 1 << VirtualNodeInner::BITS;
///
/// Note that the most significant bit is not used. This is because we use signed integers (`i16`)
/// for the scalar representation, where overflow can be confusing in terms of ordering.
// TODO(var-vnode): the only usage is in log-store, shall we update it by storing the vnode as
// bytea to enable 2^16 vnodes?
pub const MAX_COUNT: usize = 1 << (VirtualNodeInner::BITS - 1);
/// The maximum value of the virtual node that can be represented.
///
/// Note that this is **NOT** the maximum value of the virtual node, which depends on the configuration.
pub const MAX_REPRESENTABLE: VirtualNode = VirtualNode::from_index(Self::MAX_COUNT - 1);
/// The size of a virtual node in bytes, in memory or serialized representation.
pub const SIZE: usize = std::mem::size_of::<Self>();
}
Expand Down Expand Up @@ -96,6 +105,7 @@ impl VirtualNode {

/// Creates a virtual node from the given scalar representation. Used by `VNODE` expression.
pub const fn from_scalar(scalar: i16) -> Self {
debug_assert!(scalar >= 0);
Self(scalar as _)
}

Expand All @@ -115,6 +125,7 @@ impl VirtualNode {
/// Creates a virtual node from the given big-endian bytes representation.
pub const fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self {
let inner = VirtualNodeInner::from_be_bytes(bytes);
debug_assert!((inner as usize) < Self::MAX_COUNT);
Self(inner)
}

Expand Down
18 changes: 11 additions & 7 deletions src/meta/src/manager/sink_coordination/coordinator_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use futures::future::{select, Either};
use futures::pin_mut;
use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::hash::VirtualNode;
use risingwave_connector::dispatch_sink;
use risingwave_connector::sink::{build_sink, Sink, SinkCommitCoordinator, SinkParam};
use risingwave_pb::connector_service::SinkMetadata;
Expand Down Expand Up @@ -56,7 +55,7 @@ struct EpochCommitRequests {
epoch: u64,
metadatas: Vec<SinkMetadata>,
handle_ids: HashSet<usize>,
bitmap: Bitmap,
committed_bitmap: Option<Bitmap>, // lazy-initialized on first request
}

impl EpochCommitRequests {
Expand All @@ -65,7 +64,7 @@ impl EpochCommitRequests {
epoch,
metadatas: vec![],
handle_ids: Default::default(),
bitmap: Bitmap::zeros(VirtualNode::COUNT),
committed_bitmap: None,
}
}

Expand All @@ -75,24 +74,29 @@ impl EpochCommitRequests {
metadata: SinkMetadata,
vnode_bitmap: Bitmap,
) -> anyhow::Result<()> {
let committed_bitmap = self
.committed_bitmap
.get_or_insert_with(|| Bitmap::zeros(vnode_bitmap.len()));
assert_eq!(committed_bitmap.len(), vnode_bitmap.len());

self.metadatas.push(metadata);
assert!(self.handle_ids.insert(handle_id));
let check_bitmap = (&self.bitmap) & &vnode_bitmap;
let check_bitmap = (&*committed_bitmap) & &vnode_bitmap;
if check_bitmap.count_ones() > 0 {
return Err(anyhow!(
"duplicate vnode {:?} on epoch {}. request vnode: {:?}, prev vnode: {:?}",
check_bitmap.iter_ones().collect_vec(),
self.epoch,
vnode_bitmap,
self.bitmap
committed_bitmap
));
}
self.bitmap |= &vnode_bitmap;
*committed_bitmap |= &vnode_bitmap;
Ok(())
}

fn can_commit(&self) -> bool {
self.bitmap.count_ones() == VirtualNode::COUNT
self.committed_bitmap.as_ref().map_or(false, |b| b.all())
}
}

Expand Down
34 changes: 17 additions & 17 deletions src/meta/src/manager/sink_coordination/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,11 +404,11 @@ mod tests {
let epoch1 = 233;
let epoch2 = 234;

let mut all_vnode = (0..VirtualNode::COUNT).collect_vec();
let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
all_vnode.shuffle(&mut rand::thread_rng());
let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2);
let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
let build_bitmap = |indexes: &[usize]| {
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT);
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
for i in indexes {
builder.set(*i, true);
}
Expand Down Expand Up @@ -568,9 +568,9 @@ mod tests {
let epoch1 = 233;
let epoch2 = 234;

let all_vnode = (0..VirtualNode::COUNT).collect_vec();
let all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
let build_bitmap = |indexes: &[usize]| {
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT);
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
for i in indexes {
builder.set(*i, true);
}
Expand Down Expand Up @@ -686,11 +686,11 @@ mod tests {

let epoch = 233;

let mut all_vnode = (0..VirtualNode::COUNT).collect_vec();
let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
all_vnode.shuffle(&mut rand::thread_rng());
let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2);
let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
let build_bitmap = |indexes: &[usize]| {
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT);
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
for i in indexes {
builder.set(*i, true);
}
Expand Down Expand Up @@ -765,11 +765,11 @@ mod tests {

let epoch = 233;

let mut all_vnode = (0..VirtualNode::COUNT).collect_vec();
let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
all_vnode.shuffle(&mut rand::thread_rng());
let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2);
let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
let build_bitmap = |indexes: &[usize]| {
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT);
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
for i in indexes {
builder.set(*i, true);
}
Expand Down Expand Up @@ -861,11 +861,11 @@ mod tests {
let epoch3 = 235;
let epoch4 = 236;

let mut all_vnode = (0..VirtualNode::COUNT).collect_vec();
let mut all_vnode = (0..VirtualNode::COUNT_FOR_TEST).collect_vec();
all_vnode.shuffle(&mut rand::thread_rng());
let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2);
let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 2);
let build_bitmap = |indexes: &[usize]| {
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT);
let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
for i in indexes {
builder.set(*i, true);
}
Expand Down Expand Up @@ -980,8 +980,8 @@ mod tests {
}

let (vnode1, vnode2, vnode3) = {
let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 3);
let (second, third) = second.split_at(VirtualNode::COUNT / 3);
let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
let (second, third) = second.split_at(VirtualNode::COUNT_FOR_TEST / 3);
(
build_bitmap(first),
build_bitmap(second),
Expand Down Expand Up @@ -1068,7 +1068,7 @@ mod tests {
}

let (vnode2, vnode3) = {
let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 3);
let (first, second) = all_vnode.split_at(VirtualNode::COUNT_FOR_TEST / 3);
(build_bitmap(first), build_bitmap(second))
};

Expand Down
10 changes: 5 additions & 5 deletions src/storage/benches/bench_table_watermarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,18 @@ use tokio::sync::mpsc::unbounded_channel;
fn vnode_bitmaps(part_count: usize) -> impl Iterator<Item = Arc<Bitmap>> {
static BITMAP_CACHE: LazyLock<Mutex<HashMap<usize, Vec<Arc<Bitmap>>>>> =
LazyLock::new(|| Mutex::new(HashMap::new()));
assert_eq!(VirtualNode::COUNT % part_count, 0);
assert_eq!(VirtualNode::COUNT_FOR_TEST % part_count, 0);
let mut cache = BITMAP_CACHE.lock();
match cache.entry(part_count) {
Entry::Occupied(entry) => entry.get().clone().into_iter(),
Entry::Vacant(entry) => entry
.insert({
let part_size = VirtualNode::COUNT / part_count;
let part_size = VirtualNode::COUNT_FOR_TEST / part_count;
(0..part_count)
.map(move |part_idx| {
let start = part_idx * part_size;
let end = part_idx * part_size + part_size;
let mut bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT);
let mut bitmap = BitmapBuilder::zeroed(VirtualNode::COUNT_FOR_TEST);
for i in start..end {
bitmap.set(i, true);
}
Expand Down Expand Up @@ -252,15 +252,15 @@ fn bench_table_watermarks(c: &mut Criterion) {

c.bench_function("read latest watermark", |b| {
b.iter(|| {
for i in 0..VirtualNode::COUNT {
for i in 0..VirtualNode::COUNT_FOR_TEST {
let _ = table_watermarks.latest_watermark(VirtualNode::from_index(i));
}
})
});

c.bench_function("read committed watermark", |b| {
b.iter(|| {
for i in 0..VirtualNode::COUNT {
for i in 0..VirtualNode::COUNT_FOR_TEST {
let _ = table_watermarks.read_watermark(
VirtualNode::from_index(i),
test_epoch(committed_epoch_idx as u64),
Expand Down
35 changes: 29 additions & 6 deletions src/storage/hummock_sdk/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,15 @@ pub fn is_empty_key_range(key_range: &TableKeyRange) -> bool {
}
}

// returning left inclusive and right exclusive
/// Returns left inclusive and right exclusive vnode index of the given range.
///
/// # Vnode count unawareness
///
/// Note that this function is not aware of the vnode count that is actually used in this table.
/// For example, if the total vnode count is 256, `Unbounded` can be a correct end bound for vnode 255,
/// but this function will still return `Excluded(256)`.
///
/// See also [`vnode`] and [`end_bound_of_vnode`] which hold such invariant.
pub fn vnode_range(range: &TableKeyRange) -> (usize, usize) {
let (left, right) = range;
let left = match left {
Expand All @@ -73,12 +81,20 @@ pub fn vnode_range(range: &TableKeyRange) -> (usize, usize) {
vnode.to_index() + 1
}
}
Unbounded => VirtualNode::COUNT,
Unbounded => VirtualNode::MAX_REPRESENTABLE.to_index() + 1,
};
(left, right)
}

// Ensure there is only one vnode involved in table key range and return the vnode
/// Ensure there is only one vnode involved in table key range and return the vnode.
///
/// # Vnode count unawareness
///
/// Note that this function is not aware of the vnode count that is actually used in this table.
/// For example, if the total vnode count is 256, `Unbounded` can be a correct end bound for vnode 255,
/// but this function will still require `Excluded(256)`.
///
/// See also [`vnode_range`] and [`end_bound_of_vnode`] which hold such invariant.
pub fn vnode(range: &TableKeyRange) -> VirtualNode {
let (l, r_exclusive) = vnode_range(range);
assert_eq!(r_exclusive - l, 1);
Expand Down Expand Up @@ -319,8 +335,15 @@ pub fn prev_full_key(full_key: &[u8]) -> Vec<u8> {
}
}

/// [`Unbounded`] if the vnode is the maximum representable value (i.e. [`VirtualNode::MAX_REPRESENTABLE`]),
/// otherwise [`Excluded`] the next vnode.
///
/// Note that this function is not aware of the vnode count that is actually used in this table.
/// For example, if the total vnode count is 256, `Unbounded` can be a correct end bound for vnode 255,
/// but this function will still return `Excluded(256)`. See also [`vnode`] and [`vnode_range`] which
/// rely on such invariant.
pub fn end_bound_of_vnode(vnode: VirtualNode) -> Bound<Bytes> {
if vnode == VirtualNode::MAX {
if vnode == VirtualNode::MAX_REPRESENTABLE {
Unbounded
} else {
let end_bound_index = vnode.to_index() + 1;
Expand Down Expand Up @@ -1271,7 +1294,7 @@ mod tests {
Excluded(TableKey(concat(234, b"")))
)
);
let max_vnode = VirtualNode::COUNT - 1;
let max_vnode = VirtualNode::MAX_REPRESENTABLE.to_index();
assert_eq!(
prefixed_range_with_vnode(
(Bound::<Bytes>::Unbounded, Bound::<Bytes>::Unbounded),
Expand Down Expand Up @@ -1304,7 +1327,7 @@ mod tests {
Excluded(b"1".as_slice()),
Unbounded,
];
for vnode in 0..VirtualNode::COUNT {
for vnode in 0..VirtualNode::MAX_COUNT {
for left in &left_bound {
for right in &right_bound {
assert_eq!(
Expand Down
Loading

0 comments on commit 6b29ec9

Please sign in to comment.