Skip to content

Commit

Permalink
rename default vnode to singleton vnode
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Sep 4, 2024
1 parent 8a144d7 commit 3b38e6c
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 18 deletions.
5 changes: 2 additions & 3 deletions src/common/src/hash/table_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ use crate::hash::VirtualNode;
use crate::row::Row;
use crate::util::iter_util::ZipEqFast;

/// For tables without distribution (singleton), the `DEFAULT_VNODE` is encoded.
pub const DEFAULT_VNODE: VirtualNode = VirtualNode::ZERO;
pub use DEFAULT_VNODE as SINGLETON_VNODE;
/// For tables without distribution (singleton), the `SINGLETON_VNODE` is encoded.
pub const SINGLETON_VNODE: VirtualNode = VirtualNode::ZERO;

use super::VnodeBitmapExt;

Expand Down
13 changes: 7 additions & 6 deletions src/stream/src/common/log_store_impl/kv_log_store/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ mod tests {
use risingwave_storage::store::{
FromStreamStateStoreIter, StateStoreIterItem, StateStoreReadIter,
};
use risingwave_storage::table::DEFAULT_VNODE;
use risingwave_storage::table::SINGLETON_VNODE;
use tokio::sync::oneshot;
use tokio::sync::oneshot::Sender;

Expand Down Expand Up @@ -1023,7 +1023,7 @@ mod tests {
seq_id += 1;
}

let (key, encoded_barrier) = serde.serialize_barrier(epoch, DEFAULT_VNODE, false);
let (key, encoded_barrier) = serde.serialize_barrier(epoch, SINGLETON_VNODE, false);
let key = remove_vnode_prefix(&key.0);
match serde.deserialize(&encoded_barrier).unwrap() {
(decoded_epoch, LogStoreRowOp::Barrier { is_checkpoint }) => {
Expand Down Expand Up @@ -1061,7 +1061,8 @@ mod tests {
seq_id += 1;
}

let (key, encoded_checkpoint_barrier) = serde.serialize_barrier(epoch, DEFAULT_VNODE, true);
let (key, encoded_checkpoint_barrier) =
serde.serialize_barrier(epoch, SINGLETON_VNODE, true);
let key = remove_vnode_prefix(&key.0);
match serde.deserialize(&encoded_checkpoint_barrier).unwrap() {
(decoded_epoch, LogStoreRowOp::Barrier { is_checkpoint }) => {
Expand Down Expand Up @@ -1199,7 +1200,7 @@ mod tests {
) {
let (ops, rows) = gen_test_data(base);
let first_barrier = {
let (key, value) = serde.serialize_barrier(EPOCH0, DEFAULT_VNODE, true);
let (key, value) = serde.serialize_barrier(EPOCH0, SINGLETON_VNODE, true);
Ok((FullKey::new(TEST_TABLE_ID, key, EPOCH0), value))
};
let stream = stream::once(async move { first_barrier });
Expand All @@ -1209,15 +1210,15 @@ mod tests {
let stream = stream.chain(stream::once({
let serde = serde.clone();
async move {
let (key, value) = serde.serialize_barrier(EPOCH1, DEFAULT_VNODE, false);
let (key, value) = serde.serialize_barrier(EPOCH1, SINGLETON_VNODE, false);
Ok((FullKey::new(TEST_TABLE_ID, key, EPOCH1), value))
}
}));
let (row_stream, tx2) =
gen_row_stream(serde.clone(), ops.clone(), rows.clone(), EPOCH2, seq_id);
let stream = stream.chain(row_stream).chain(stream::once({
async move {
let (key, value) = serde.serialize_barrier(EPOCH2, DEFAULT_VNODE, true);
let (key, value) = serde.serialize_barrier(EPOCH2, SINGLETON_VNODE, true);
Ok((FullKey::new(TEST_TABLE_ID, key, EPOCH2), value))
}
}));
Expand Down
16 changes: 8 additions & 8 deletions src/stream/src/common/table/test_state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_test::test_utils::prepare_hummock_test_env;
use risingwave_storage::hummock::HummockStorage;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::DEFAULT_VNODE;
use risingwave_storage::table::SINGLETON_VNODE;

use crate::common::table::state_table::{
ReplicatedStateTable, StateTable, WatermarkCacheStateTable,
Expand Down Expand Up @@ -445,7 +445,7 @@ async fn test_state_table_iter_with_pk_range() {
std::ops::Bound::Included(OwnedRow::new(vec![Some(4_i32.into())])),
);
let iter = state_table
.iter_with_vnode(DEFAULT_VNODE, &pk_range, Default::default())
.iter_with_vnode(SINGLETON_VNODE, &pk_range, Default::default())
.await
.unwrap();
pin_mut!(iter);
Expand All @@ -470,7 +470,7 @@ async fn test_state_table_iter_with_pk_range() {
std::ops::Bound::<row::Empty>::Unbounded,
);
let iter = state_table
.iter_with_vnode(DEFAULT_VNODE, &pk_range, Default::default())
.iter_with_vnode(SINGLETON_VNODE, &pk_range, Default::default())
.await
.unwrap();
pin_mut!(iter);
Expand Down Expand Up @@ -1976,11 +1976,11 @@ async fn test_replicated_state_table_replication() {
std::ops::Bound::Included(OwnedRow::new(vec![Some(2_i32.into())])),
);
let iter = state_table
.iter_with_vnode(DEFAULT_VNODE, &range_bounds, Default::default())
.iter_with_vnode(SINGLETON_VNODE, &range_bounds, Default::default())
.await
.unwrap();
let replicated_iter = replicated_state_table
.iter_with_vnode_and_output_indices(DEFAULT_VNODE, &range_bounds, Default::default())
.iter_with_vnode_and_output_indices(SINGLETON_VNODE, &range_bounds, Default::default())
.await
.unwrap();
pin_mut!(iter);
Expand Down Expand Up @@ -2039,7 +2039,7 @@ async fn test_replicated_state_table_replication() {
);

let iter = state_table
.iter_with_vnode(DEFAULT_VNODE, &range_bounds, Default::default())
.iter_with_vnode(SINGLETON_VNODE, &range_bounds, Default::default())
.await
.unwrap();

Expand All @@ -2048,7 +2048,7 @@ async fn test_replicated_state_table_replication() {
std::ops::Bound::Unbounded,
);
let replicated_iter = replicated_state_table
.iter_with_vnode_and_output_indices(DEFAULT_VNODE, &range_bounds, Default::default())
.iter_with_vnode_and_output_indices(SINGLETON_VNODE, &range_bounds, Default::default())
.await
.unwrap();
pin_mut!(iter);
Expand Down Expand Up @@ -2079,7 +2079,7 @@ async fn test_replicated_state_table_replication() {
let range_bounds: (Bound<OwnedRow>, Bound<OwnedRow>) =
(std::ops::Bound::Unbounded, std::ops::Bound::Unbounded);
let replicated_iter = replicated_state_table
.iter_with_vnode_and_output_indices(DEFAULT_VNODE, &range_bounds, Default::default())
.iter_with_vnode_and_output_indices(SINGLETON_VNODE, &range_bounds, Default::default())
.await
.unwrap();
pin_mut!(replicated_iter);
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/from_proto/mview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl ExecutorBuilder for ArrangeExecutorBuilder {
let table = node.get_table()?;

// FIXME: Lookup is now implemented without cell-based table API and relies on all vnodes
// being `DEFAULT_VNODE`, so we need to make the Arrange a singleton.
// being `SINGLETON_VNODE`, so we need to make the Arrange a singleton.
let vnodes = params.vnode_bitmap.map(Arc::new);
let conflict_behavior =
ConflictBehavior::from_protobuf(&table.handle_pk_conflict_behavior());
Expand Down

0 comments on commit 3b38e6c

Please sign in to comment.