Skip to content

Commit

Permalink
feat: support per-fragment vnode count (#18444)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Sep 24, 2024
1 parent fe01e22 commit 7b977a2
Show file tree
Hide file tree
Showing 45 changed files with 433 additions and 74 deletions.
7 changes: 6 additions & 1 deletion dashboard/pages/fragment_graph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import { Dispatcher, MergeNode, StreamNode } from "../proto/gen/stream_plan"

interface DispatcherNode {
[actorId: number]: Dispatcher[]
fragment: TableFragments_Fragment
}

// Refresh interval (ms) for back pressure stats
Expand Down Expand Up @@ -107,10 +108,14 @@ function buildPlanNodeDependency(
dispatcherName = "noDispatcher"
}

const dispatcherNode = fragment.actors.reduce((obj, actor) => {
let dispatcherNode = fragment.actors.reduce((obj, actor) => {
obj[actor.actorId] = actor.dispatcher
return obj
}, {} as DispatcherNode)
dispatcherNode.fragment = {
...fragment,
actors: [],
}

return d3.hierarchy({
name: dispatcherName,
Expand Down
14 changes: 14 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,20 @@ message Table {
// upstream table.
optional string cdc_table_id = 39;

// Total vnode count of the table.
//
// Can be unset if...
// - The catalog is generated by the frontend and not yet persisted. This is
// because the vnode count of each fragment (and thus of its internal tables)
// is determined by the meta service.
// - The table is created in older versions where variable vnode count is not
// supported, in which case a default value of 256 should be used.
// Use `VnodeCountCompat::vnode_count` to access it.
//
// Please note that this field is not intended to describe the expected vnode count
// for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.expected_vnode_count`.
optional uint32 maybe_vnode_count = 40;

// Per-table catalog version, used by schema change. `None` for internal
// tables and tests. Not to be confused with the global catalog version for
// notification service.
Expand Down
8 changes: 8 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ message TableFragments {
// so we pre-generate and store it here, this member will only be initialized when creating the Fragment
// and modified when creating the mv-on-mv
repeated uint32 upstream_fragment_ids = 7;

// Total vnode count of the fragment (and thus of all its internal tables).
// Duplicated from the length of the vnode bitmap in any actor of the fragment.
//
// Can be unset if the fragment is created in older versions where variable vnode count is not
// supported, in which case a default value of 256 should be used.
// Use `VnodeCountCompat::vnode_count` to access it.
optional uint32 maybe_vnode_count = 8;
}
uint32 table_id = 1;
State state = 2;
Expand Down
7 changes: 7 additions & 0 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ message StorageTableDesc {
repeated uint32 stream_key = 9;
optional uint32 vnode_col_idx_in_pk = 10;
optional uint32 retention_seconds = 11;

// Total vnode count of the table.
//
// Can be unset if the table is created in older versions where variable vnode count is not
// supported, in which case a default value of 256 should be used.
// Use `VnodeCountCompat::vnode_count` to access it.
optional uint32 maybe_vnode_count = 12;
}

message AsOf {
Expand Down
14 changes: 14 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,10 @@ message StreamContext {
string timezone = 1;
}

// Representation of a graph of stream fragments.
// Generated by the fragmenter in the frontend, only used in DDL requests and never persisted.
//
// For the persisted form, see `TableFragments`.
message StreamFragmentGraph {
message StreamFragment {
// 0-based on frontend, and will be rewritten to global id on meta.
Expand Down Expand Up @@ -1036,4 +1040,14 @@ message StreamFragmentGraph {
StreamContext ctx = 5;
// If none, default parallelism will be applied.
Parallelism parallelism = 6;

// Expected vnode count for the graph.
// The scheduler on the meta service will use this as a hint to decide the vnode count
// for each fragment.
//
// Note that the actual vnode count may be different from this value.
// For example, a no-shuffle exchange between current fragment graph and an existing
// upstream fragment graph requires two fragments to be in the same distribution,
// thus the same vnode count.
uint32 expected_vnode_count = 7;
}
6 changes: 3 additions & 3 deletions src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use futures::pin_mut;
use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
use risingwave_common::hash::{HashKey, HashKeyDispatcher, VirtualNode};
use risingwave_common::hash::{HashKey, HashKeyDispatcher, VnodeCountCompat};
use risingwave_common::memory::MemoryContext;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum};
Expand Down Expand Up @@ -195,8 +195,8 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
.collect();

// Lookup Join always contains distribution key, so we don't need vnode bitmap
// TODO(var-vnode): use vnode count from table desc
let vnodes = Some(Bitmap::ones(VirtualNode::COUNT).into());
let vnodes = Some(Bitmap::ones(table_desc.vnode_count()).into());

dispatch_state_store!(source.context().state_store(), state_store, {
let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc);
let inner_side_builder = InnerSideExecutorBuilder::new(
Expand Down
7 changes: 4 additions & 3 deletions src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::hash::table_distribution::TableDistribution;
use risingwave_common::hash::{
ExpandedWorkerSlotMapping, HashKey, HashKeyDispatcher, VirtualNode, WorkerSlotId,
ExpandedWorkerSlotMapping, HashKey, HashKeyDispatcher, VirtualNode, VnodeCountCompat,
WorkerSlotId,
};
use risingwave_common::memory::MemoryContext;
use risingwave_common::types::{DataType, Datum};
Expand Down Expand Up @@ -408,8 +409,8 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
})
.collect();

// TODO(var-vnode): use vnode count from table desc
let vnodes = Some(Bitmap::ones(VirtualNode::COUNT).into());
let vnodes = Some(Bitmap::ones(table_desc.vnode_count()).into());

let inner_side_builder = InnerSideExecutorBuilder {
table_desc: table_desc.clone(),
table_distribution: TableDistribution::new_from_storage_table_desc(vnodes, table_desc),
Expand Down
5 changes: 2 additions & 3 deletions src/batch/src/executor/log_row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use prometheus::Histogram;
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, Field, Schema};
use risingwave_common::hash::VirtualNode;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::row::{Row, RowExt};
use risingwave_common::types::ScalarImpl;
use risingwave_pb::batch_plan::plan_node::NodeBody;
Expand Down Expand Up @@ -107,8 +107,7 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
Some(vnodes) => Some(Bitmap::from(vnodes).into()),
// This is possible for dml. vnode_bitmap is not filled by scheduler.
// Or it's single distribution, e.g., distinct agg. We scan in a single executor.
// TODO(var-vnode): use vnode count from table desc
None => Some(Bitmap::ones(VirtualNode::COUNT).into()),
None => Some(Bitmap::ones(table_desc.vnode_count()).into()),
};

let chunk_size = source.context.get_config().developer.chunk_size as u32;
Expand Down
5 changes: 2 additions & 3 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use prometheus::Histogram;
use risingwave_common::array::DataChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, Schema};
use risingwave_common::hash::VirtualNode;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
Expand Down Expand Up @@ -210,8 +210,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
Some(vnodes) => Some(Bitmap::from(vnodes).into()),
// This is possible for dml. vnode_bitmap is not filled by scheduler.
// Or it's single distribution, e.g., distinct agg. We scan in a single executor.
// TODO(var-vnode): use vnode count from table desc
None => Some(Bitmap::ones(VirtualNode::COUNT).into()),
None => Some(Bitmap::ones(table_desc.vnode_count()).into()),
};

let scan_ranges = {
Expand Down
8 changes: 8 additions & 0 deletions src/common/src/catalog/physical_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use risingwave_pb::plan_common::StorageTableDesc;

use super::{ColumnDesc, ColumnId, TableId};
use crate::catalog::get_dist_key_in_pk_indices;
use crate::hash::VnodeCountCompat;
use crate::util::sort_util::ColumnOrder;

/// Includes necessary information for compute node to access data of the table.
Expand Down Expand Up @@ -57,6 +58,9 @@ pub struct TableDesc {
/// the column indices which could receive watermarks.
pub watermark_columns: FixedBitSet,

/// Total vnode count of the table.
pub vnode_count: usize,

/// Whether the table is versioned. If `true`, column-aware row encoding will be used
/// to be compatible with schema changes.
///
Expand Down Expand Up @@ -113,6 +117,7 @@ impl TableDesc {
versioned: self.versioned,
stream_key: self.stream_key.iter().map(|&x| x as u32).collect(),
vnode_col_idx_in_pk,
maybe_vnode_count: Some(self.vnode_count as u32),
})
}

Expand All @@ -126,6 +131,8 @@ impl TableDesc {
}

pub fn from_pb_table(table: &Table) -> Self {
let vnode_count = table.vnode_count();

Self {
table_id: TableId::new(table.id),
pk: table.pk.iter().map(ColumnOrder::from_protobuf).collect(),
Expand All @@ -143,6 +150,7 @@ impl TableDesc {
read_prefix_len_hint: table.read_prefix_len_hint as _,
watermark_columns: table.watermark_indices.iter().map(|i| *i as _).collect(),
versioned: table.version.is_some(),
vnode_count,
}
}
}
52 changes: 52 additions & 0 deletions src/common/src/hash/consistent_hash/compat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::vnode::VirtualNode;

/// Backward-compatible accessor for the vnode count of an object.
pub trait VnodeCountCompat {
    /// Returns the total vnode count of this object.
    ///
    /// When the count has not been set (typically on objects created before variable
    /// vnode counts were supported), implementations fall back to [`VirtualNode::COUNT`].
    ///
    /// Refer to the field documentation of the implementing type for the exact rules.
    fn vnode_count(&self) -> usize;
}

/// Implements [`VnodeCountCompat`] for each listed type by reading its
/// `maybe_vnode_count` field.
///
/// Why the field carries a `maybe_` prefix: `prost` generates a getter method named
/// after the field. If the field were called `vnode_count`, the generated
/// `vnode_count()` would return `0` for an unset field, which is easy to misread as a
/// real value.
///
/// With the field named `maybe_vnode_count`, the `vnode_count` method comes from this
/// trait instead, so an unset field is resolved to [`VirtualNode::COUNT`].
macro_rules! impl_maybe_vnode_count_compat {
    ($($target:ty),* $(,)?) => {
        $(
            impl VnodeCountCompat for $target {
                fn vnode_count(&self) -> usize {
                    match self.maybe_vnode_count {
                        // The field is a `u32` on the wire; widen it to `usize`.
                        Some(count) => count as usize,
                        // Unset: fall back to the legacy default.
                        None => VirtualNode::COUNT,
                    }
                }
            }
        )*
    };
}

// Protobuf-generated types that carry an optional `maybe_vnode_count` field.
impl_maybe_vnode_count_compat!(
risingwave_pb::plan_common::StorageTableDesc,
risingwave_pb::catalog::Table,
risingwave_pb::meta::table_fragments::Fragment,
);
6 changes: 3 additions & 3 deletions src/common/src/hash/consistent_hash/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,10 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {
}
}

/// Create a vnode mapping where all vnodes are mapped to the same single item.
/// Create a vnode mapping with the single item. Should only be used for singletons.
// TODO(var-vnode): make vnode count 1, also `Distribution::vnode_count`
pub fn new_single(item: T::Item) -> Self {
// TODO(var-vnode): always 1 correct?
Self::new_uniform(std::iter::once(item), 1)
Self::new_uniform(std::iter::once(item), VirtualNode::COUNT)
}

/// The length (or count) of the vnode in this mapping.
Expand Down
1 change: 1 addition & 0 deletions src/common/src/hash/consistent_hash/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@
// limitations under the License.

pub mod bitmap;
pub mod compat;
pub mod mapping;
pub mod vnode;
4 changes: 2 additions & 2 deletions src/common/src/hash/consistent_hash/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl VirtualNode {
.enumerate()
.map(|(idx, serial)| {
if let Some(serial) = serial {
extract_vnode_id_from_row_id(serial.as_row_id())
extract_vnode_id_from_row_id(serial.as_row_id(), vnode_count)
} else {
// NOTE: here it will hash the entire row when the `_row_id` is missing,
// which could result in rows from the same chunk being allocated to different chunks.
Expand Down Expand Up @@ -188,7 +188,7 @@ impl VirtualNode {
pub fn compute_row(row: impl Row, indices: &[usize], vnode_count: usize) -> VirtualNode {
let project = row.project(indices);
if let Ok(Some(ScalarRefImpl::Serial(s))) = project.iter().exactly_one().as_ref() {
return extract_vnode_id_from_row_id(s.as_row_id());
return extract_vnode_id_from_row_id(s.as_row_id(), vnode_count);
}

project.hash(Crc32FastBuilder).to_vnode(vnode_count)
Expand Down
1 change: 1 addition & 0 deletions src/common/src/hash/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod key_v2;
pub mod table_distribution;

pub use consistent_hash::bitmap::*;
pub use consistent_hash::compat::*;
pub use consistent_hash::mapping::*;
pub use consistent_hash::vnode::*;
pub use dispatcher::{calc_hash_key_kind, HashKeyDispatcher};
Expand Down
14 changes: 11 additions & 3 deletions src/common/src/util/row_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,20 @@ pub struct RowIdGenerator {

pub type RowId = i64;

// TODO(var-vnode): how should we handle this for different virtual node counts?
#[inline]
pub fn extract_vnode_id_from_row_id(id: RowId) -> VirtualNode {
pub fn extract_vnode_id_from_row_id(id: RowId, vnode_count: usize) -> VirtualNode {
let vnode_id = ((id >> VNODE_ID_SHIFT_BITS) & (VNODE_ID_UPPER_BOUND as i64 - 1)) as u32;
assert!(vnode_id < VNODE_ID_UPPER_BOUND);
VirtualNode::from_index(vnode_id as usize)

// Previously, the vnode count was fixed to 256 for all jobs in all clusters. As a result, the
// `vnode_id` must reside in the range of `0..256` and the following modulo operation is a
// no-op. So this will retrieve the exact same vnode as when it was generated.
//
// In newer versions, fragments can have different vnode counts. To make sure the vnode is
// within the range, we need to apply modulo operation here. Therefore, there is no guarantee
// that the vnode retrieved here is the same as when it was generated. However, the row ids
// generated under the same vnode will still yield the same result.
VirtualNode::from_index(vnode_id as usize % vnode_count)
}

impl RowIdGenerator {
Expand Down
11 changes: 10 additions & 1 deletion src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@ where
visit_stream_node_tables_inner(stream_node, true, true, f)
}

#[allow(dead_code)]
pub fn visit_stream_node_tables<F>(stream_node: &mut StreamNode, f: F)
where
F: FnMut(&mut Table, &str),
Expand All @@ -301,3 +300,13 @@ where
{
visit_stream_node_internal_tables(fragment.node.as_mut().unwrap(), f)
}

/// Runs `f` over the tables of the given [`StreamFragment`].
///
/// Unlike [`visit_internal_tables`], the table of the `Materialize` node is visited as well.
///
/// # Panics
///
/// Panics if `fragment.node` is unset.
pub fn visit_tables<F>(fragment: &mut StreamFragment, f: F)
where
    F: FnMut(&mut Table, &str),
{
    let root = fragment.node.as_mut().unwrap();
    visit_stream_node_tables(root, f)
}
Loading

0 comments on commit 7b977a2

Please sign in to comment.