diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index 23b4f04119c0..3ffbda24b232 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -56,6 +56,7 @@ import { Dispatcher, MergeNode, StreamNode } from "../proto/gen/stream_plan" interface DispatcherNode { [actorId: number]: Dispatcher[] + fragment: TableFragments_Fragment } // Refresh interval (ms) for back pressure stats @@ -107,10 +108,14 @@ function buildPlanNodeDependency( dispatcherName = "noDispatcher" } - const dispatcherNode = fragment.actors.reduce((obj, actor) => { + let dispatcherNode = fragment.actors.reduce((obj, actor) => { obj[actor.actorId] = actor.dispatcher return obj }, {} as DispatcherNode) + dispatcherNode.fragment = { + ...fragment, + actors: [], + } return d3.hierarchy({ name: dispatcherName, diff --git a/proto/catalog.proto b/proto/catalog.proto index 3c3ee374a9e6..2bbfa39c1035 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -417,6 +417,20 @@ message Table { // upstream table. optional string cdc_table_id = 39; + // Total vnode count of the table. + // + // Can be unset if... + // - The catalog is generated by the frontend and not yet persisted, this is + // because the vnode count of each fragment (then internal tables) is determined + // by the meta service. + // - The table is created in older versions where variable vnode count is not + // supported, in which case a default value of 256 should be used. + // Use `VnodeCountCompat::vnode_count` to access it. + // + // Please note that this field is not intended to describe the expected vnode count + // for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.expected_vnode_count`. + optional uint32 maybe_vnode_count = 40; + // Per-table catalog version, used by schema change. `None` for internal // tables and tests. Not to be confused with the global catalog version for // notification service. diff --git a/proto/meta.proto b/proto/meta.proto index 93195e985d96..e088b7fb7330 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -94,6 +94,14 @@ message TableFragments { // so we pre-generate and store it here, this member will only be initialized when creating the Fragment // and modified when creating the mv-on-mv repeated uint32 upstream_fragment_ids = 7; + + // Total vnode count of the fragment (then all internal tables). + // Duplicated from the length of the vnode bitmap in any actor of the fragment. + // + // Can be unset if the fragment is created in older versions where variable vnode count is not + // supported, in which case a default value of 256 should be used. + // Use `VnodeCountCompat::vnode_count` to access it. + optional uint32 maybe_vnode_count = 8; } uint32 table_id = 1; State state = 2; diff --git a/proto/plan_common.proto b/proto/plan_common.proto index 0f4e988e6c03..a552c9f0a5fa 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -96,6 +96,13 @@ message StorageTableDesc { repeated uint32 stream_key = 9; optional uint32 vnode_col_idx_in_pk = 10; optional uint32 retention_seconds = 11; + + // Total vnode count of the table. + // + // Can be unset if the table is created in older versions where variable vnode count is not + // supported, in which case a default value of 256 should be used. + // Use `VnodeCountCompat::vnode_count` to access it. 
+ optional uint32 maybe_vnode_count = 12; } message AsOf { diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index d6a6ae0ed67e..7fe63054c565 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -990,6 +990,10 @@ message StreamContext { string timezone = 1; } +// Representation of a graph of stream fragments. +// Generated by the fragmenter in the frontend, only used in DDL requests and never persisted. +// +// For the persisted form, see `TableFragments`. message StreamFragmentGraph { message StreamFragment { // 0-based on frontend, and will be rewritten to global id on meta. @@ -1036,4 +1040,14 @@ message StreamFragmentGraph { StreamContext ctx = 5; // If none, default parallelism will be applied. Parallelism parallelism = 6; + + // Expected vnode count for the graph. + // The scheduler on the meta service will use this as a hint to decide the vnode count + // for each fragment. + // + // Note that the actual vnode count may be different from this value. + // For example, a no-shuffle exchange between current fragment graph and an existing + // upstream fragment graph requires two fragments to be in the same distribution, + // thus the same vnode count. + uint32 expected_vnode_count = 7; } diff --git a/src/batch/src/executor/join/distributed_lookup_join.rs b/src/batch/src/executor/join/distributed_lookup_join.rs index 74d7843013e4..0a328e2f985f 100644 --- a/src/batch/src/executor/join/distributed_lookup_join.rs +++ b/src/batch/src/executor/join/distributed_lookup_join.rs @@ -19,7 +19,7 @@ use futures::pin_mut; use itertools::Itertools; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema}; -use risingwave_common::hash::{HashKey, HashKeyDispatcher, VirtualNode}; +use risingwave_common::hash::{HashKey, HashKeyDispatcher, VnodeCountCompat}; use risingwave_common::memory::MemoryContext; use risingwave_common::row::OwnedRow; use risingwave_common::types::{DataType, Datum}; @@ -195,8 +195,8 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder { .collect(); // Lookup Join always contains distribution key, so we don't need vnode bitmap - // TODO(var-vnode): use vnode count from table desc - let vnodes = Some(Bitmap::ones(VirtualNode::COUNT).into()); + let vnodes = Some(Bitmap::ones(table_desc.vnode_count()).into()); + dispatch_state_store!(source.context().state_store(), state_store, { let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc); let inner_side_builder = InnerSideExecutorBuilder::new( diff --git a/src/batch/src/executor/join/local_lookup_join.rs b/src/batch/src/executor/join/local_lookup_join.rs index 7c7a08af5d87..7f6dfb1d99a7 100644 --- a/src/batch/src/executor/join/local_lookup_join.rs +++ b/src/batch/src/executor/join/local_lookup_join.rs @@ -21,7 +21,8 @@ use risingwave_common::bitmap::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::{ColumnDesc, Field, Schema}; use risingwave_common::hash::table_distribution::TableDistribution; use risingwave_common::hash::{ - ExpandedWorkerSlotMapping, HashKey, HashKeyDispatcher, VirtualNode, WorkerSlotId, + ExpandedWorkerSlotMapping, HashKey, HashKeyDispatcher, VirtualNode, VnodeCountCompat, + WorkerSlotId, }; use risingwave_common::memory::MemoryContext; use risingwave_common::types::{DataType, Datum}; @@ -408,8 +409,8 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder { }) .collect(); - // TODO(var-vnode): use vnode count from table desc - let vnodes = 
Some(Bitmap::ones(VirtualNode::COUNT).into()); + let vnodes = Some(Bitmap::ones(table_desc.vnode_count()).into()); + let inner_side_builder = InnerSideExecutorBuilder { table_desc: table_desc.clone(), table_distribution: TableDistribution::new_from_storage_table_desc(vnodes, table_desc), diff --git a/src/batch/src/executor/log_row_seq_scan.rs b/src/batch/src/executor/log_row_seq_scan.rs index a36440f1e010..79cc21b0f02e 100644 --- a/src/batch/src/executor/log_row_seq_scan.rs +++ b/src/batch/src/executor/log_row_seq_scan.rs @@ -22,7 +22,7 @@ use prometheus::Histogram; use risingwave_common::array::{DataChunk, Op}; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{ColumnId, Field, Schema}; -use risingwave_common::hash::VirtualNode; +use risingwave_common::hash::VnodeCountCompat; use risingwave_common::row::{Row, RowExt}; use risingwave_common::types::ScalarImpl; use risingwave_pb::batch_plan::plan_node::NodeBody; @@ -107,8 +107,7 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder { Some(vnodes) => Some(Bitmap::from(vnodes).into()), // This is possible for dml. vnode_bitmap is not filled by scheduler. // Or it's single distribution, e.g., distinct agg. We scan in a single executor. - // TODO(var-vnode): use vnode count from table desc - None => Some(Bitmap::ones(VirtualNode::COUNT).into()), + None => Some(Bitmap::ones(table_desc.vnode_count()).into()), }; let chunk_size = source.context.get_config().developer.chunk_size as u32; diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs index 84e24f7e4794..b65f4bf8939b 100644 --- a/src/batch/src/executor/row_seq_scan.rs +++ b/src/batch/src/executor/row_seq_scan.rs @@ -21,7 +21,7 @@ use prometheus::Histogram; use risingwave_common::array::DataChunk; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{ColumnId, Schema}; -use risingwave_common::hash::VirtualNode; +use risingwave_common::hash::VnodeCountCompat; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{DataType, Datum}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; @@ -210,8 +210,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder { Some(vnodes) => Some(Bitmap::from(vnodes).into()), // This is possible for dml. vnode_bitmap is not filled by scheduler. // Or it's single distribution, e.g., distinct agg. We scan in a single executor. - // TODO(var-vnode): use vnode count from table desc - None => Some(Bitmap::ones(VirtualNode::COUNT).into()), + None => Some(Bitmap::ones(table_desc.vnode_count()).into()), }; let scan_ranges = { diff --git a/src/common/src/catalog/physical_table.rs b/src/common/src/catalog/physical_table.rs index 5ed66b98de5c..df1b30fd41ee 100644 --- a/src/common/src/catalog/physical_table.rs +++ b/src/common/src/catalog/physical_table.rs @@ -21,6 +21,7 @@ use risingwave_pb::plan_common::StorageTableDesc; use super::{ColumnDesc, ColumnId, TableId}; use crate::catalog::get_dist_key_in_pk_indices; +use crate::hash::VnodeCountCompat; use crate::util::sort_util::ColumnOrder; /// Includes necessary information for compute node to access data of the table. @@ -57,6 +58,9 @@ pub struct TableDesc { /// the column indices which could receive watermarks. pub watermark_columns: FixedBitSet, + /// Total vnode count of the table. + pub vnode_count: usize, + /// Whether the table is versioned. If `true`, column-aware row encoding will be used /// to be compatible with schema changes. 
/// @@ -113,6 +117,7 @@ impl TableDesc { versioned: self.versioned, stream_key: self.stream_key.iter().map(|&x| x as u32).collect(), vnode_col_idx_in_pk, + maybe_vnode_count: Some(self.vnode_count as u32), }) } @@ -126,6 +131,8 @@ impl TableDesc { } pub fn from_pb_table(table: &Table) -> Self { + let vnode_count = table.vnode_count(); + Self { table_id: TableId::new(table.id), pk: table.pk.iter().map(ColumnOrder::from_protobuf).collect(), @@ -143,6 +150,7 @@ impl TableDesc { read_prefix_len_hint: table.read_prefix_len_hint as _, watermark_columns: table.watermark_indices.iter().map(|i| *i as _).collect(), versioned: table.version.is_some(), + vnode_count, } } } diff --git a/src/common/src/hash/consistent_hash/compat.rs b/src/common/src/hash/consistent_hash/compat.rs new file mode 100644 index 000000000000..58ff07a1514f --- /dev/null +++ b/src/common/src/hash/consistent_hash/compat.rs @@ -0,0 +1,52 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::vnode::VirtualNode; + +/// A trait for accessing the vnode count field with backward compatibility. +pub trait VnodeCountCompat { + /// Returns the vnode count, or [`VirtualNode::COUNT`] if the vnode count is not set, + /// typically for backward compatibility. + /// + /// See the documentation on the field of the implementing type for more details. + fn vnode_count(&self) -> usize; +} + +/// Implement the trait for given types by delegating to the `maybe_vnode_count` field. +/// +/// The reason why there's a `maybe_` prefix is that, a getter method with the same name +/// as the field will be generated for `prost` structs. Directly naming it `vnode_count` +/// will lead to the method `vnode_count()` returning `0` when the field is unset, which +/// can be misleading sometimes. +/// +/// Instead, we name the field as `maybe_vnode_count` and provide the method `vnode_count` +/// through this trait, ensuring that backward compatibility is handled properly. +macro_rules! impl_maybe_vnode_count_compat { + ($($ty:ty),* $(,)?) => { + $( + impl VnodeCountCompat for $ty { + fn vnode_count(&self) -> usize { + self.maybe_vnode_count + .map_or(VirtualNode::COUNT, |v| v as _) + } + } + )* + }; +} + +impl_maybe_vnode_count_compat!( + risingwave_pb::plan_common::StorageTableDesc, + risingwave_pb::catalog::Table, + risingwave_pb::meta::table_fragments::Fragment, +); diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index 0ab8f9e18fd2..80d5a56941cf 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -139,10 +139,10 @@ impl VnodeMapping { } } - /// Create a vnode mapping where all vnodes are mapped to the same single item. + /// Create a vnode mapping with the single item. Should only be used for singletons. + // TODO(var-vnode): make vnode count 1, also `Distribution::vnode_count` pub fn new_single(item: T::Item) -> Self { - // TODO(var-vnode): always 1 correct? 
- Self::new_uniform(std::iter::once(item), 1) + Self::new_uniform(std::iter::once(item), VirtualNode::COUNT) } /// The length (or count) of the vnode in this mapping. diff --git a/src/common/src/hash/consistent_hash/mod.rs b/src/common/src/hash/consistent_hash/mod.rs index c6bf218c0bde..98a970af5947 100644 --- a/src/common/src/hash/consistent_hash/mod.rs +++ b/src/common/src/hash/consistent_hash/mod.rs @@ -13,5 +13,6 @@ // limitations under the License. pub mod bitmap; +pub mod compat; pub mod mapping; pub mod vnode; diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs index f71951568262..4dcfaf8d4a10 100644 --- a/src/common/src/hash/consistent_hash/vnode.rs +++ b/src/common/src/hash/consistent_hash/vnode.rs @@ -158,7 +158,7 @@ impl VirtualNode { .enumerate() .map(|(idx, serial)| { if let Some(serial) = serial { - extract_vnode_id_from_row_id(serial.as_row_id()) + extract_vnode_id_from_row_id(serial.as_row_id(), vnode_count) } else { // NOTE: here it will hash the entire row when the `_row_id` is missing, // which could result in rows from the same chunk being allocated to different chunks. @@ -188,7 +188,7 @@ impl VirtualNode { pub fn compute_row(row: impl Row, indices: &[usize], vnode_count: usize) -> VirtualNode { let project = row.project(indices); if let Ok(Some(ScalarRefImpl::Serial(s))) = project.iter().exactly_one().as_ref() { - return extract_vnode_id_from_row_id(s.as_row_id()); + return extract_vnode_id_from_row_id(s.as_row_id(), vnode_count); } project.hash(Crc32FastBuilder).to_vnode(vnode_count) diff --git a/src/common/src/hash/mod.rs b/src/common/src/hash/mod.rs index 8f2c89cec654..e54376bed2d5 100644 --- a/src/common/src/hash/mod.rs +++ b/src/common/src/hash/mod.rs @@ -19,6 +19,7 @@ mod key_v2; pub mod table_distribution; pub use consistent_hash::bitmap::*; +pub use consistent_hash::compat::*; pub use consistent_hash::mapping::*; pub use consistent_hash::vnode::*; pub use dispatcher::{calc_hash_key_kind, HashKeyDispatcher}; diff --git a/src/common/src/util/row_id.rs b/src/common/src/util/row_id.rs index 7f22c17e925e..ef41f61b5f53 100644 --- a/src/common/src/util/row_id.rs +++ b/src/common/src/util/row_id.rs @@ -52,12 +52,20 @@ pub struct RowIdGenerator { pub type RowId = i64; -// TODO(var-vnode): how should we handle this for different virtual node counts? #[inline] -pub fn extract_vnode_id_from_row_id(id: RowId) -> VirtualNode { +pub fn extract_vnode_id_from_row_id(id: RowId, vnode_count: usize) -> VirtualNode { let vnode_id = ((id >> VNODE_ID_SHIFT_BITS) & (VNODE_ID_UPPER_BOUND as i64 - 1)) as u32; assert!(vnode_id < VNODE_ID_UPPER_BOUND); - VirtualNode::from_index(vnode_id as usize) + + // Previously, the vnode count was fixed to 256 for all jobs in all clusters. As a result, the + // `vnode_id` must reside in the range of `0..256` and the following modulo operation will be + // no-op. So this will retrieve the exact same vnode as when it was generated. + // + // In newer versions, fragments can have different vnode counts. To make sure the vnode is + // within the range, we need to apply modulo operation here. Therefore, there is no guarantee + // that the vnode retrieved here is the same as when it was generated. However, the row ids + // generated under the same vnode will still yield the same result. 
+ VirtualNode::from_index(vnode_id as usize % vnode_count) } impl RowIdGenerator { diff --git a/src/common/src/util/stream_graph_visitor.rs b/src/common/src/util/stream_graph_visitor.rs index 5b990c018640..04e0e42a1a7f 100644 --- a/src/common/src/util/stream_graph_visitor.rs +++ b/src/common/src/util/stream_graph_visitor.rs @@ -286,7 +286,6 @@ where visit_stream_node_tables_inner(stream_node, true, true, f) } -#[allow(dead_code)] pub fn visit_stream_node_tables(stream_node: &mut StreamNode, f: F) where F: FnMut(&mut Table, &str), @@ -301,3 +300,13 @@ where { visit_stream_node_internal_tables(fragment.node.as_mut().unwrap(), f) } + +/// Visit the tables of a [`StreamFragment`]. +/// +/// Compared to [`visit_internal_tables`], this function also visits the table of `Materialize` node. +pub fn visit_tables(fragment: &mut StreamFragment, f: F) +where + F: FnMut(&mut Table, &str), +{ + visit_stream_node_tables(fragment.node.as_mut().unwrap(), f) +} diff --git a/src/ctl/src/cmd_impl/table/scan.rs b/src/ctl/src/cmd_impl/table/scan.rs index 3002a3585fb4..f6b5f444b16f 100644 --- a/src/ctl/src/cmd_impl/table/scan.rs +++ b/src/ctl/src/cmd_impl/table/scan.rs @@ -15,7 +15,6 @@ use anyhow::{anyhow, Result}; use futures::{pin_mut, StreamExt}; use risingwave_common::bitmap::Bitmap; -use risingwave_common::hash::VirtualNode; use risingwave_frontend::TableCatalog; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_rpc_client::MetaClient; @@ -54,13 +53,14 @@ pub fn print_table_catalog(table: &TableCatalog) { println!("{:#?}", table); } +// TODO: shall we work on `TableDesc` instead? pub async fn make_state_table(hummock: S, table: &TableCatalog) -> StateTable { StateTable::from_table_catalog( &table.to_internal_table_prost(), hummock, Some( // scan all vnodes - TableDistribution::all(table.distribution_key().to_vec(), VirtualNode::COUNT) + TableDistribution::all(table.distribution_key().to_vec(), table.vnode_count()) .vnodes() .clone(), ), @@ -68,6 +68,7 @@ pub async fn make_state_table(hummock: S, table: &TableCatalog) - .await } +// TODO: shall we work on `TableDesc` instead? pub fn make_storage_table( hummock: S, table: &TableCatalog, @@ -80,8 +81,7 @@ pub fn make_storage_table( Ok(StorageTable::new_partial( hummock, output_columns_ids, - // TODO(var-vnode): use vnode count from table desc - Some(Bitmap::ones(VirtualNode::COUNT).into()), + Some(Bitmap::ones(table.vnode_count()).into()), &table.table_desc().try_to_protobuf()?, )) } diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index a97db5646fe2..8c33e55a0b16 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -20,6 +20,7 @@ use risingwave_common::catalog::{ ColumnCatalog, ConflictBehavior, CreateType, Field, Schema, StreamJobStatus, TableDesc, TableId, TableVersionId, }; +use risingwave_common::hash::VnodeCountCompat; use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType, PbTableVersion}; @@ -170,6 +171,22 @@ pub struct TableCatalog { pub initialized_at_cluster_version: Option, pub cdc_table_id: Option, + + /// Total vnode count of the table. + /// + /// Can be unset if the catalog is generated by the frontend and not yet persisted. This is + /// because the vnode count of each fragment (then internal tables) is determined by the + /// meta service. 
See also [`StreamMaterialize::derive_table_catalog`] and + /// [`TableCatalogBuilder::build`]. + /// + /// On the contrary, if this comes from a [`PbTable`], the field must be `Some` no matter + /// whether the table is created before or after the version we introduced variable vnode + /// count support. This is because we've already handled backward compatibility during + /// conversion. + /// + /// [`StreamMaterialize::derive_table_catalog`]: crate::optimizer::plan_node::StreamMaterialize::derive_table_catalog + /// [`TableCatalogBuilder::build`]: crate::optimizer::plan_node::utils::TableCatalogBuilder::build + pub vnode_count: Option, } #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] @@ -330,6 +347,9 @@ impl TableCatalog { } /// Get a [`TableDesc`] of the table. + /// + /// Note: this must be called on existing tables, otherwise it will fail to get the vnode count + /// (which is determined by the meta service) and panic. pub fn table_desc(&self) -> TableDesc { use risingwave_common::catalog::TableOption; @@ -348,6 +368,7 @@ impl TableCatalog { watermark_columns: self.watermark_columns.clone(), versioned: self.version.is_some(), vnode_col_index: self.vnode_col_index, + vnode_count: self.vnode_count(), } } @@ -383,6 +404,14 @@ impl TableCatalog { self.version().map(|v| v.version_id) } + /// Get the total vnode count of the table. + /// + /// Panics if it's called on an incomplete (and not yet persisted) table catalog. + pub fn vnode_count(&self) -> usize { + self.vnode_count + .expect("vnode count unset, called on an incomplete table catalog?") + } + pub fn to_prost(&self, schema_id: SchemaId, database_id: DatabaseId) -> PbTable { PbTable { id: self.id.table_id, @@ -428,6 +457,7 @@ impl TableCatalog { initialized_at_cluster_version: self.initialized_at_cluster_version.clone(), retention_seconds: self.retention_seconds, cdc_table_id: self.cdc_table_id.clone(), + maybe_vnode_count: self.vnode_count.map(|v| v as _), } } @@ -533,6 +563,8 @@ impl From for TableCatalog { OptionalAssociatedSourceId::AssociatedSourceId(id) => id, }); let name = tb.name.clone(); + let vnode_count = tb.vnode_count(); + let mut col_names = HashSet::new(); let mut col_index: HashMap = HashMap::new(); @@ -602,6 +634,7 @@ impl From for TableCatalog { .map(TableId::from) .collect_vec(), cdc_table_id: tb.cdc_table_id, + vnode_count: Some(vnode_count), /* from existing (persisted) tables, vnode_count must be set */ } } } @@ -692,6 +725,7 @@ mod tests { initialized_at_cluster_version: None, version_column_index: None, cdc_table_id: None, + maybe_vnode_count: Some(233), } .into(); @@ -755,6 +789,7 @@ mod tests { dependent_relations: vec![], version_column_index: None, cdc_table_id: None, + vnode_count: Some(233), } ); assert_eq!(table, TableCatalog::from(table.to_prost(0, 0))); diff --git a/src/frontend/src/handler/alter_parallelism.rs b/src/frontend/src/handler/alter_parallelism.rs index ee3c26708908..8c3dbd6e419c 100644 --- a/src/frontend/src/handler/alter_parallelism.rs +++ b/src/frontend/src/handler/alter_parallelism.rs @@ -103,7 +103,8 @@ pub async fn handle_alter_parallelism( .filter(|w| w.is_streaming_schedulable()) .map(|w| w.parallelism) .sum::(); - // TODO(var-vnode): use vnode count from config + // TODO(var-vnode): get max parallelism from catalogs. + // Although the meta service will clamp the value for us, we should still check it here for better UI. 
let max_parallelism = VirtualNode::COUNT; let mut builder = RwPgResponse::builder(stmt_type); diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 4fd624929a17..e9ecb5713cb1 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -191,7 +191,7 @@ pub async fn get_replace_table_plan( panic!("unexpected statement type: {:?}", definition); }; - let (mut graph, mut table, source, job_type) = generate_stream_graph_for_table( + let (mut graph, table, source, job_type) = generate_stream_graph_for_table( session, table_name, original_catalog, @@ -241,7 +241,10 @@ pub async fn get_replace_table_plan( )?; } + // Set some fields ourselves so that the meta service does not need to maintain them. + let mut table = table; table.incoming_sinks = incoming_sink_ids.iter().copied().collect(); + table.maybe_vnode_count = Some(original_catalog.vnode_count() as _); Ok((source, table, graph, col_index_mapping, job_type)) } diff --git a/src/frontend/src/handler/create_index.rs b/src/frontend/src/handler/create_index.rs index a6cc1e20548f..4b32bedc01ea 100644 --- a/src/frontend/src/handler/create_index.rs +++ b/src/frontend/src/handler/create_index.rs @@ -233,7 +233,7 @@ pub(crate) fn gen_create_index_plan( }) .collect(); let index_item = build_index_item( - index_table.table_desc().into(), + index_table, table.name(), table_desc, index_columns_ordered_expr, @@ -269,7 +269,7 @@ pub(crate) fn gen_create_index_plan( } fn build_index_item( - index_table_desc: Rc, + index_table: &TableCatalog, primary_table_name: &str, primary_table_desc: Rc, index_columns: Vec<(ExprImpl, OrderType)>, @@ -288,9 +288,10 @@ fn build_index_item( .into_iter() .map(|(expr, _)| expr.to_expr_proto()) .chain( - index_table_desc + index_table .columns .iter() + .map(|c| &c.column_desc) .skip(index_columns_len) .map(|x| { let name = if x.name.starts_with(&primary_table_name_prefix) { diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 57d4454ac254..c78f07f1270e 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -283,6 +283,7 @@ impl StreamMaterialize { created_at_cluster_version: None, retention_seconds: retention_seconds.map(|i| i.into()), cdc_table_id: None, + vnode_count: None, // will be filled in by the meta service later }) } diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 6a17d2507e55..d25aee9d20c8 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -179,6 +179,7 @@ impl TableCatalogBuilder { created_at_cluster_version: None, retention_seconds: None, cdc_table_id: None, + vnode_count: None, // will be filled in by the meta service later } } diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index a285039aed0f..d169874d5b3f 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -473,7 +473,7 @@ pub(crate) mod tests { ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, StreamJobStatus, DEFAULT_SUPER_USER_ID, }; - use risingwave_common::hash::{WorkerSlotId, WorkerSlotMapping}; + use risingwave_common::hash::{VirtualNode, WorkerSlotId, WorkerSlotMapping}; use risingwave_common::types::DataType; use 
risingwave_pb::common::worker_node::Property; use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType}; @@ -538,6 +538,8 @@ pub(crate) mod tests { // let ctx = OptimizerContext::mock().await; let table_id = 0.into(); + let vnode_count = VirtualNode::COUNT_FOR_TEST; + let table_catalog: TableCatalog = TableCatalog { id: table_id, associated_source_id: None, @@ -587,6 +589,7 @@ pub(crate) mod tests { initialized_at_cluster_version: None, created_at_cluster_version: None, cdc_table_id: None, + vnode_count: Some(vnode_count), }; let batch_plan_node: PlanRef = LogicalScan::create( "".to_string(), @@ -715,15 +718,10 @@ pub(crate) mod tests { let workers = vec![worker1, worker2, worker3]; let worker_node_manager = Arc::new(WorkerNodeManager::mock(workers)); let worker_node_selector = WorkerNodeSelector::new(worker_node_manager.clone(), false); - worker_node_manager.insert_streaming_fragment_mapping( - 0, - WorkerSlotMapping::new_single(WorkerSlotId::new(0, 0)), - ); - worker_node_manager.set_serving_fragment_mapping( - vec![(0, WorkerSlotMapping::new_single(WorkerSlotId::new(0, 0)))] - .into_iter() - .collect(), - ); + let mapping = + WorkerSlotMapping::new_uniform(std::iter::once(WorkerSlotId::new(0, 0)), vnode_count); + worker_node_manager.insert_streaming_fragment_mapping(0, mapping.clone()); + worker_node_manager.set_serving_fragment_mapping(vec![(0, mapping)].into_iter().collect()); let catalog = Arc::new(parking_lot::RwLock::new(Catalog::default())); catalog.write().insert_table_id_mapping(table_id, 0); let catalog_reader = CatalogReader::new(catalog); diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 6777f9373b84..9bd1851383af 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -1229,6 +1229,8 @@ fn derive_partitions( vnode_mapping: &WorkerSlotMapping, ) -> SchedulerResult> { let vnode_count = vnode_mapping.len(); + assert_eq!(vnode_count, table_desc.vnode_count); + let mut partitions: HashMap)> = HashMap::new(); if scan_ranges.is_empty() { diff --git a/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs b/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs index 9ab491ec3a41..6f2a07a5f3d7 100644 --- a/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs +++ b/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs @@ -108,6 +108,7 @@ impl StreamFragmentGraph { dependent_table_ids: vec![], table_ids_cnt: 0, parallelism: None, + expected_vnode_count: 0, } } diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index 790f18d109a7..ce08a8f0eb44 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -14,6 +14,7 @@ mod graph; use graph::*; +use risingwave_common::hash::VirtualNode; use risingwave_common::util::recursive::{self, Recurse as _}; use risingwave_connector::WithPropertiesExt; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; @@ -134,15 +135,18 @@ pub fn build_graph(plan_node: PlanRef) -> SchedulerResult Deref for JavaBindingIterator<'a> { #[no_mangle] extern "system" fn Java_com_risingwave_java_binding_Binding_vnodeCount(_env: EnvParam<'_>) -> jint { - // TODO(var-vnode): use vnode count from config + // TODO(var-vnode): vnode count can vary for different tables. 
VirtualNode::COUNT as jint } diff --git a/src/meta/model_v2/migration/README.md b/src/meta/model_v2/migration/README.md index 2ed136b8d60f..d0253be5c05d 100644 --- a/src/meta/model_v2/migration/README.md +++ b/src/meta/model_v2/migration/README.md @@ -13,7 +13,7 @@ - Generate a new migration file, a database endpoint is required but not used. Run this command in this directory, not project root. ```sh - export DATABASE_URL=sqlite::memory:; cargo run -- generate MIGRATION_NAME + DATABASE_URL=sqlite::memory: cargo run -- generate MIGRATION_NAME ``` - Apply all pending migrations for test purposes, change `DATABASE_URL` to the actual database endpoint. ```sh @@ -53,4 +53,4 @@ ## Adding a migration -- Add a new column to some catalogs. You can checkout the migration [m20240617_070131_index_column_properties.rs](src/m20240617_070131_index_column_properties.rs) as a reference. \ No newline at end of file +- Add a new column to some catalogs. You can checkout the migration [m20240617_070131_index_column_properties.rs](src/m20240617_070131_index_column_properties.rs) as a reference. diff --git a/src/meta/model_v2/migration/src/lib.rs b/src/meta/model_v2/migration/src/lib.rs index 0b09f3c4d4e1..4b5caff8a9e6 100644 --- a/src/meta/model_v2/migration/src/lib.rs +++ b/src/meta/model_v2/migration/src/lib.rs @@ -21,6 +21,7 @@ mod m20240702_084927_unnecessary_fk; mod m20240726_063833_auto_schema_change; mod m20240806_143329_add_rate_limit_to_source_catalog; mod m20240820_081248_add_time_travel_per_table_epoch; +mod m20240911_083152_variable_vnode_count; pub struct Migrator; @@ -47,6 +48,7 @@ impl MigratorTrait for Migrator { Box::new(m20240726_063833_auto_schema_change::Migration), Box::new(m20240806_143329_add_rate_limit_to_source_catalog::Migration), Box::new(m20240820_081248_add_time_travel_per_table_epoch::Migration), + Box::new(m20240911_083152_variable_vnode_count::Migration), ] } } diff --git a/src/meta/model_v2/migration/src/m20240911_083152_variable_vnode_count.rs b/src/meta/model_v2/migration/src/m20240911_083152_variable_vnode_count.rs new file mode 100644 index 000000000000..0f93c9e3dc3d --- /dev/null +++ b/src/meta/model_v2/migration/src/m20240911_083152_variable_vnode_count.rs @@ -0,0 +1,69 @@ +use sea_orm_migration::prelude::{Table as MigrationTable, *}; + +const VNODE_COUNT: i32 = 256; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + MigrationTable::alter() + .table(Table::Table) + .add_column( + ColumnDef::new(Table::VnodeCount) + .integer() + .default(VNODE_COUNT), + ) + .to_owned(), + ) + .await?; + + manager + .alter_table( + MigrationTable::alter() + .table(Fragment::Table) + .add_column( + ColumnDef::new(Fragment::VnodeCount) + .integer() + .default(VNODE_COUNT), + ) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + MigrationTable::alter() + .table(Table::Table) + .drop_column(Table::VnodeCount) + .to_owned(), + ) + .await?; + + manager + .alter_table( + MigrationTable::alter() + .table(Fragment::Table) + .drop_column(Fragment::VnodeCount) + .to_owned(), + ) + .await + } +} + +#[derive(DeriveIden)] +enum Fragment { + Table, + VnodeCount, +} + +#[derive(DeriveIden)] +enum Table { + Table, + VnodeCount, +} diff --git a/src/meta/model_v2/src/fragment.rs b/src/meta/model_v2/src/fragment.rs index dd332f5fc76a..9a365f37377d 100644 --- 
a/src/meta/model_v2/src/fragment.rs +++ b/src/meta/model_v2/src/fragment.rs @@ -29,6 +29,7 @@ pub struct Model { pub stream_node: StreamNode, pub state_table_ids: I32Array, pub upstream_fragment_id: I32Array, + pub vnode_count: i32, } #[derive(Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] diff --git a/src/meta/model_v2/src/table.rs b/src/meta/model_v2/src/table.rs index 44cebfa6f70b..0a47208ff735 100644 --- a/src/meta/model_v2/src/table.rs +++ b/src/meta/model_v2/src/table.rs @@ -13,6 +13,7 @@ // limitations under the License. use risingwave_common::catalog::OBJECT_ID_PLACEHOLDER; +use risingwave_common::hash::VnodeCountCompat; use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType}; use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable}; use sea_orm::entity::prelude::*; @@ -133,6 +134,7 @@ pub struct Model { pub retention_seconds: Option, pub incoming_sinks: I32Array, pub cdc_table_id: Option, + pub vnode_count: i32, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -207,6 +209,7 @@ impl From for ActiveModel { fn from(pb_table: PbTable) -> Self { let table_type = pb_table.table_type(); let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior(); + let vnode_count = pb_table.vnode_count(); let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER { NotSet @@ -255,6 +258,7 @@ impl From for ActiveModel { retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)), incoming_sinks: Set(pb_table.incoming_sinks.into()), cdc_table_id: Set(pb_table.cdc_table_id), + vnode_count: Set(vnode_count as _), } } } diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 31575e72804f..b29c4cc207eb 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -19,7 +19,7 @@ use std::mem::swap; use anyhow::Context; use itertools::Itertools; use risingwave_common::bail; -use risingwave_common::hash::WorkerSlotId; +use risingwave_common::hash::{VnodeCountCompat, WorkerSlotId}; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::fragment::DistributionType; @@ -177,6 +177,7 @@ impl CatalogController { Vec, HashMap>, )> { + let vnode_count = pb_fragment.vnode_count(); let PbFragment { fragment_id: pb_fragment_id, fragment_type_mask: pb_fragment_type_mask, @@ -294,6 +295,7 @@ impl CatalogController { stream_node, state_table_ids, upstream_fragment_id, + vnode_count: vnode_count as _, }; Ok((fragment, actors, actor_dispatchers)) @@ -365,6 +367,7 @@ impl CatalogController { stream_node, state_table_ids, upstream_fragment_id, + vnode_count, } = fragment; let stream_node_template = stream_node.to_protobuf(); @@ -463,6 +466,7 @@ impl CatalogController { actors: pb_actors, state_table_ids: pb_state_table_ids, upstream_fragment_ids: pb_upstream_fragment_ids, + maybe_vnode_count: Some(vnode_count as _), }; Ok((pb_fragment, pb_actor_status, pb_actor_splits)) @@ -1541,6 +1545,7 @@ mod tests { .values() .flat_map(|m| m.keys().map(|x| *x as _)) .collect(), + maybe_vnode_count: Some(VirtualNode::COUNT_FOR_TEST as _), }; let pb_actor_status = (0..actor_count) @@ -1684,6 +1689,7 @@ mod tests { stream_node: StreamNode::from(&stream_node), state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]), upstream_fragment_id: I32Array::default(), + vnode_count: VirtualNode::COUNT_FOR_TEST as _, }; let (pb_fragment, pb_actor_status, pb_actor_splits) = 
CatalogController::compose_fragment( @@ -1793,6 +1799,7 @@ mod tests { actors: _, state_table_ids: pb_state_table_ids, upstream_fragment_ids: pb_upstream_fragment_ids, + maybe_vnode_count: _, } = pb_fragment; assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32); diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 3e903802b86e..bfad20f668bc 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -163,6 +163,7 @@ impl From> for PbTable { created_at_cluster_version: value.1.created_at_cluster_version, retention_seconds: value.0.retention_seconds.map(|id| id as u32), cdc_table_id: value.0.cdc_table_id, + maybe_vnode_count: Some(value.0.vnode_count as _), } } } diff --git a/src/meta/src/manager/streaming_job.rs b/src/meta/src/manager/streaming_job.rs index 0aca9eccf897..7f80d8f384c2 100644 --- a/src/meta/src/manager/streaming_job.rs +++ b/src/meta/src/manager/streaming_job.rs @@ -133,6 +133,10 @@ impl StreamingJob { } } +// TODO: basically we want to ensure that the `Table` persisted in the catalog is the same as the +// one in the `Materialize` node in the actor. However, they are currently handled separately +// and can be out of sync. Shall we directly copy the whole struct from the actor to the catalog +// to avoid `set`ting each field separately? impl StreamingJob { pub fn set_id(&mut self, id: u32) { match self { @@ -160,6 +164,16 @@ impl StreamingJob { } } + /// Set the vnode count of the table. + pub fn set_table_vnode_count(&mut self, vnode_count: usize) { + match self { + Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => { + table.maybe_vnode_count = Some(vnode_count as u32); + } + Self::Sink(_, _) | Self::Source(_) => {} + } + } + /// Set the fragment id where the table dml is received. pub fn set_dml_fragment_id(&mut self, id: Option) { match self { diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 7c1499552193..56e8edbf1d78 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -23,7 +23,7 @@ use itertools::Itertools; use rand::Rng; use risingwave_common::bitmap::Bitmap; use risingwave_common::config::DefaultParallelism; -use risingwave_common::hash::{ActorMapping, VirtualNode}; +use risingwave_common::hash::{ActorMapping, VnodeCountCompat}; use risingwave_common::secret::SecretEncryption; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::util::column_index_mapping::ColIndexMapping; @@ -1584,6 +1584,7 @@ impl DdlController { let specified_parallelism = fragment_graph.specified_parallelism(); let internal_tables = fragment_graph.internal_tables(); let expr_context = stream_ctx.to_expr_context(); + let max_parallelism = NonZeroUsize::new(fragment_graph.expected_vnode_count()).unwrap(); // 1. Resolve the upstream fragments, extend the fragment graph to a complete graph that // contains all information needed for building the actor graph. 
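
The hunks above persist the scheduled vnode count in the meta model (`vnode_count` columns on `Table`/`Fragment`), expose it back on the protobuf side as `maybe_vnode_count`, and let the DDL controller derive its parallelism cap from `expected_vnode_count`. Below is a minimal, self-contained sketch of the backward-compatibility pattern these changes rely on: the `Option` field plus an accessor that falls back to the historical 256. The fallback and the accessor shape mirror `VnodeCountCompat` from this diff; the standalone `PbTableLike` struct, `DEFAULT_VNODE_COUNT` constant, and `main` demo are illustrative assumptions, not RisingWave's actual prost-generated types.

```rust
/// Historical fixed vnode count, used when `maybe_vnode_count` is unset
/// (i.e. the catalog was written by a version without variable vnode count).
/// Assumption: stands in for `VirtualNode::COUNT` in the real code.
const DEFAULT_VNODE_COUNT: usize = 256;

/// Hypothetical stand-in for a prost-generated catalog message; the real
/// field lives on `PbTable`, `PbFragment` and `StorageTableDesc` in this diff.
struct PbTableLike {
    maybe_vnode_count: Option<u32>,
}

/// Mirror of the `VnodeCountCompat` trait: callers go through this accessor
/// instead of the raw field, so unset values fall back to 256.
trait VnodeCountCompat {
    fn vnode_count(&self) -> usize;
}

impl VnodeCountCompat for PbTableLike {
    fn vnode_count(&self) -> usize {
        self.maybe_vnode_count
            .map_or(DEFAULT_VNODE_COUNT, |v| v as usize)
    }
}

fn main() {
    // A table persisted by an older version: field unset, default applies.
    let legacy = PbTableLike { maybe_vnode_count: None };
    assert_eq!(legacy.vnode_count(), 256);

    // A table scheduled with a smaller vnode count by a newer meta service.
    let small = PbTableLike { maybe_vnode_count: Some(64) };
    assert_eq!(small.vnode_count(), 64);
}
```

The `maybe_` prefix matters because prost generates a getter named after the field that silently returns `0` when an optional is unset; routing every read through the trait keeps the 256 fallback in one place, which is the design choice the `impl_maybe_vnode_count_compat!` macro in `compat.rs` encodes.
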
@@ -1631,16 +1632,11 @@ impl DdlController { let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?; let parallelism = self.resolve_stream_parallelism(specified_parallelism, &cluster_info)?; - - // TODO(var-vnode): use vnode count from config - const MAX_PARALLELISM: NonZeroUsize = NonZeroUsize::new(VirtualNode::COUNT).unwrap(); - - let parallelism_limited = parallelism > MAX_PARALLELISM; + let parallelism_limited = parallelism > max_parallelism; if parallelism_limited { - tracing::warn!("Too many parallelism, use {} instead", MAX_PARALLELISM); + tracing::warn!("Too many parallelism, use {} instead", max_parallelism); } - - let parallelism = parallelism.min(MAX_PARALLELISM); + let parallelism = parallelism.min(max_parallelism); let actor_graph_builder = ActorGraphBuilder::new(id, complete_graph, cluster_info, parallelism)?; @@ -1664,7 +1660,7 @@ impl DdlController { // Otherwise, it defaults to FIXED based on deduction. let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) { (None, DefaultParallelism::Full) if parallelism_limited => { - tracing::warn!("Parallelism limited to {MAX_PARALLELISM} in ADAPTIVE mode"); + tracing::warn!("Parallelism limited to {max_parallelism} in ADAPTIVE mode"); TableParallelism::Adaptive } (None, DefaultParallelism::Full) => TableParallelism::Adaptive, @@ -1679,6 +1675,10 @@ impl DdlController { table_parallelism, ); + if let Some(mview_fragment) = table_fragments.mview_fragment() { + stream_job.set_table_vnode_count(mview_fragment.vnode_count()); + } + let replace_table_job_info = match affected_table_replace_info { Some((streaming_job, fragment_graph)) => { if snapshot_backfill_info.is_some() { @@ -2160,6 +2160,9 @@ impl DdlController { old_table_fragments.assigned_parallelism, ); + // Note: no need to set `vnode_count` as it's already set by the frontend. + // See `get_replace_table_plan`. 
+ let ctx = ReplaceTableContext { old_table_fragments, merge_updates, diff --git a/src/meta/src/serving/mod.rs b/src/meta/src/serving/mod.rs index 30f1466eae7f..3d7ab9888362 100644 --- a/src/meta/src/serving/mod.rs +++ b/src/meta/src/serving/mod.rs @@ -57,7 +57,7 @@ impl ServingVnodeMapping { } else { None }; - // TODO(var-vnode): use vnode count from config + // TODO(var-vnode): also fetch vnode count for each fragment place_vnode(old_mapping, workers, max_parallelism, VirtualNode::COUNT) }; match new_mapping { diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 3c73b947d830..c2891d81b81a 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -28,7 +28,7 @@ use num_traits::abs; use risingwave_common::bail; use risingwave_common::bitmap::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableId; -use risingwave_common::hash::{ActorMapping, VirtualNode}; +use risingwave_common::hash::{ActorMapping, VnodeCountCompat}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_meta_model_v2::{actor, fragment, ObjectId, StreamingParallelism}; use risingwave_pb::common::{PbActorLocation, WorkerNode, WorkerType}; @@ -645,6 +645,7 @@ impl ScaleController { stream_node, state_table_ids, upstream_fragment_id, + vnode_count: _, }, ) in fragments { @@ -1429,9 +1430,6 @@ impl ScaleController { FragmentDistributionType::Hash => { if !in_degree_types.contains(&DispatcherType::Hash) { None - } else if actors_after_reschedule.len() == 1 { - let actor_id = actors_after_reschedule.keys().next().cloned().unwrap(); - Some(ActorMapping::new_single(actor_id)) } else { // Changes of the bitmap must occur in the case of HashDistribution Some(ActorMapping::from_bitmaps( @@ -1900,8 +1898,7 @@ impl ScaleController { &self, policy: TableResizePolicy, ) -> MetaResult> { - // TODO(var-vnode): use vnode count from config - let max_parallelism = VirtualNode::COUNT; + type VnodeCount = usize; let TableResizePolicy { worker_ids, @@ -1940,7 +1937,7 @@ impl ScaleController { let mut no_shuffle_source_fragment_ids = HashSet::new(); let mut no_shuffle_target_fragment_ids = HashSet::new(); - // index for fragment_id -> distribution_type + // index for fragment_id -> (distribution_type, vnode_count) let mut fragment_distribution_map = HashMap::new(); // index for actor -> worker id let mut actor_location = HashMap::new(); @@ -1953,7 +1950,10 @@ impl ScaleController { fn build_index( no_shuffle_source_fragment_ids: &mut HashSet, no_shuffle_target_fragment_ids: &mut HashSet, - fragment_distribution_map: &mut HashMap, + fragment_distribution_map: &mut HashMap< + FragmentId, + (FragmentDistributionType, VnodeCount), + >, actor_location: &mut HashMap, table_fragment_id_map: &mut HashMap>, fragment_actor_id_map: &mut HashMap>, @@ -2021,7 +2021,10 @@ impl ScaleController { } } - fragment_distribution_map.insert(*fragment_id, fragment.distribution_type()); + fragment_distribution_map.insert( + *fragment_id, + (fragment.distribution_type(), fragment.vnode_count()), + ); table_fragment_id_map .entry(table_id.table_id()) @@ -2040,7 +2043,10 @@ impl ScaleController { async fn build_index_v2( no_shuffle_source_fragment_ids: &mut HashSet, no_shuffle_target_fragment_ids: &mut HashSet, - fragment_distribution_map: &mut HashMap, + fragment_distribution_map: &mut HashMap< + FragmentId, + (FragmentDistributionType, VnodeCount), + >, actor_location: &mut HashMap, table_fragment_id_map: &mut HashMap>, fragment_actor_id_map: &mut HashMap>, @@ -2073,7 +2079,10 @@ impl ScaleController { 
for (fragment_id, fragment) in fragments { fragment_distribution_map.insert( fragment_id as FragmentId, - FragmentDistributionType::from(fragment.distribution_type), + ( + FragmentDistributionType::from(fragment.distribution_type), + fragment.vnode_count as _, + ), ); table_fragment_id_map @@ -2166,7 +2175,10 @@ impl ScaleController { ); } - match fragment_distribution_map.get(&fragment_id).unwrap() { + let &(dist, vnode_count) = fragment_distribution_map.get(&fragment_id).unwrap(); + let max_parallelism = vnode_count; + + match dist { FragmentDistributionType::Unspecified => unreachable!(), FragmentDistributionType::Single => { let (single_worker_id, should_be_one) = @@ -2232,7 +2244,8 @@ impl ScaleController { } TableParallelism::Fixed(mut n) => { if n > max_parallelism { - // This should be unreachable, but we still intercept it to prevent accidental modifications. + // This should be unreachable as it was already checked and rewritten in the frontend. + // We still intercept it to prevent accidental modifications. tracing::warn!("specified parallelism {n} for table {table_id} is larger than max parallelism, force limit to {max_parallelism}"); n = max_parallelism } diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs index 22424a98f9ab..6799526b71e4 100644 --- a/src/meta/src/stream/stream_graph/actor.rs +++ b/src/meta/src/stream/stream_graph/actor.rs @@ -22,6 +22,7 @@ use risingwave_common::bail; use risingwave_common::bitmap::Bitmap; use risingwave_common::hash::{ActorId, ActorMapping, WorkerSlotId}; use risingwave_common::util::iter_util::ZipEqFast; +use risingwave_common::util::stream_graph_visitor::visit_tables; use risingwave_pb::meta::table_fragments::Fragment; use risingwave_pb::plan_common::ExprContext; use risingwave_pb::stream_plan::stream_node::NodeBody; @@ -665,6 +666,7 @@ impl ActorGraphBuilder { cluster_info: StreamingClusterInfo, default_parallelism: NonZeroUsize, ) -> MetaResult { + let expected_vnode_count = fragment_graph.expected_vnode_count(); let existing_distributions = fragment_graph.existing_distribution(); // Schedule the distribution of all building fragments. @@ -672,9 +674,19 @@ impl ActorGraphBuilder { streaming_job_id, &cluster_info.worker_nodes, default_parallelism, + expected_vnode_count, )?; let distributions = scheduler.schedule(&fragment_graph)?; + // Fill the vnode count for each internal table, based on schedule result. 
+ let mut fragment_graph = fragment_graph; + for (id, fragment) in fragment_graph.building_fragments_mut() { + let vnode_count = distributions[id].vnode_count(); + visit_tables(fragment, |table, _| { + table.maybe_vnode_count = Some(vnode_count as _); + }) + } + Ok(Self { distributions, existing_distributions, @@ -854,7 +866,10 @@ impl ActorGraphBuilder { .worker_slots() .map(|worker_slot| { let actor_id = state.next_actor_id(); - let vnode_bitmap = bitmaps.as_ref().map(|m| &m[&worker_slot]).cloned(); + let vnode_bitmap = bitmaps + .as_ref() + .map(|m: &HashMap| &m[&worker_slot]) + .cloned(); state.inner.add_actor( actor_id, diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 782156775586..b997b54a3bbe 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -14,7 +14,7 @@ use std::collections::{HashMap, HashSet}; use std::num::NonZeroUsize; -use std::ops::Deref; +use std::ops::{Deref, DerefMut}; use std::sync::LazyLock; use anyhow::{anyhow, Context}; @@ -241,6 +241,12 @@ impl Deref for BuildingFragment { } } +impl DerefMut for BuildingFragment { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + /// The ID of an edge in the fragment graph. For different types of edges, the ID will be in /// different variants. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)] @@ -319,6 +325,16 @@ pub struct StreamFragmentGraph { /// The default parallelism of the job, specified by the `STREAMING_PARALLELISM` session /// variable. If not specified, all active worker slots will be used. specified_parallelism: Option, + + /// Expected vnode count for the graph. + /// The scheduler on the meta service will use this as a hint to decide the vnode count + /// for each fragment. + /// + /// Note that the actual vnode count may be different from this value. + /// For example, a no-shuffle exchange between current fragment graph and an existing + /// upstream fragment graph requires two fragments to be in the same distribution, + /// thus the same vnode count. + expected_vnode_count: usize, } impl StreamFragmentGraph { @@ -394,12 +410,15 @@ impl StreamFragmentGraph { None }; + let expected_vnode_count = proto.expected_vnode_count as usize; + Ok(Self { fragments, downstreams, upstreams, dependent_table_ids, specified_parallelism, + expected_vnode_count, }) } @@ -499,6 +518,11 @@ impl StreamFragmentGraph { self.specified_parallelism } + /// Get the expected vnode count of the graph. See documentation of the field for more details. + pub fn expected_vnode_count(&self) -> usize { + self.expected_vnode_count + } + /// Get downstreams of a fragment. fn get_downstreams( &self, @@ -1069,6 +1093,8 @@ impl CompleteStreamFragmentGraph { } = building_fragment; let distribution_type = distribution.to_distribution_type() as i32; + let vnode_count = distribution.vnode_count(); + let materialized_fragment_id = if inner.fragment_type_mask & FragmentTypeFlag::Mview as u32 != 0 { table_id @@ -1094,6 +1120,7 @@ impl CompleteStreamFragmentGraph { actors, state_table_ids, upstream_fragment_ids, + maybe_vnode_count: Some(vnode_count as _), } } @@ -1148,4 +1175,16 @@ impl CompleteStreamFragmentGraph { pub(super) fn building_fragments(&self) -> &HashMap { &self.building_graph.fragments } + + /// Returns all building fragments in the graph, mutable. 
+ pub(super) fn building_fragments_mut( + &mut self, + ) -> &mut HashMap { + &mut self.building_graph.fragments + } + + /// Get the expected vnode count of the building graph. See documentation of the field for more details. + pub(super) fn expected_vnode_count(&self) -> usize { + self.building_graph.expected_vnode_count() + } } diff --git a/src/meta/src/stream/stream_graph/schedule.rs b/src/meta/src/stream/stream_graph/schedule.rs index d054beb0772b..f67d8547e28a 100644 --- a/src/meta/src/stream/stream_graph/schedule.rs +++ b/src/meta/src/stream/stream_graph/schedule.rs @@ -151,6 +151,16 @@ impl Distribution { } } + /// Get the vnode count of the distribution. + // TODO(var-vnode): after `ServingVnodeMapping::upsert` is made vnode-count-aware, + // we may return 1 for singleton. + pub fn vnode_count(&self) -> usize { + match self { + Distribution::Singleton(_) => VirtualNode::COUNT, + Distribution::Hash(mapping) => mapping.len(), + } + } + /// Create a distribution from a persisted protobuf `Fragment`. pub fn from_fragment( fragment: &risingwave_pb::meta::table_fragments::Fragment, @@ -214,6 +224,7 @@ impl Scheduler { streaming_job_id: u32, workers: &HashMap, default_parallelism: NonZeroUsize, + expected_vnode_count: usize, ) -> MetaResult { // Group worker slots with worker node. @@ -223,6 +234,11 @@ impl Scheduler { .collect(); let parallelism = default_parallelism.get(); + assert!( + parallelism <= expected_vnode_count, + "parallelism should be limited by vnode count in previous steps" + ); + let scheduled = schedule_units_for_slots(&slots, parallelism, streaming_job_id)?; let scheduled_worker_slots = scheduled @@ -235,9 +251,8 @@ impl Scheduler { assert_eq!(scheduled_worker_slots.len(), parallelism); // Build the default hash mapping uniformly. - // TODO(var-vnode): use vnode count from config let default_hash_mapping = - WorkerSlotMapping::build_from_ids(&scheduled_worker_slots, VirtualNode::COUNT); + WorkerSlotMapping::build_from_ids(&scheduled_worker_slots, expected_vnode_count); let single_scheduled = schedule_units_for_slots(&slots, 1, streaming_job_id)?; let default_single_worker_id = single_scheduled.keys().exactly_one().cloned().unwrap(); diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs index c313f8b4ade3..040ca0046e30 100644 --- a/src/meta/src/stream/test_fragmenter.rs +++ b/src/meta/src/stream/test_fragmenter.rs @@ -18,6 +18,7 @@ use std::vec; use itertools::Itertools; use risingwave_common::catalog::{DatabaseId, SchemaId, TableId}; +use risingwave_common::hash::VirtualNode; use risingwave_pb::catalog::PbTable; use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType, WorkerNode}; use risingwave_pb::data::data_type::TypeName; @@ -415,6 +416,7 @@ fn make_stream_graph() -> StreamFragmentGraphProto { dependent_table_ids: vec![], table_ids_cnt: 3, parallelism: None, + expected_vnode_count: VirtualNode::COUNT_FOR_TEST as _, } } diff --git a/src/storage/src/filter_key_extractor.rs b/src/storage/src/filter_key_extractor.rs index baf5c4070c74..e9326d37dcd8 100644 --- a/src/storage/src/filter_key_extractor.rs +++ b/src/storage/src/filter_key_extractor.rs @@ -554,6 +554,7 @@ mod tests { initialized_at_cluster_version: None, created_at_cluster_version: None, cdc_table_id: None, + maybe_vnode_count: None, } }
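
To illustrate how the scheduler-side changes fit together, here is a hedged, standalone sketch of sizing a uniform mapping by `expected_vnode_count` and enforcing the new invariant from `Scheduler::new` (parallelism must already be clamped to the vnode count). The `WorkerSlot` alias and `build_uniform_mapping` function are illustrative stand-ins under that assumption, not the actual `WorkerSlotMapping::build_from_ids` implementation; note also that, per the `Distribution::vnode_count` TODO above, singleton distributions still report the full 256 for now.

```rust
/// Illustrative stand-in for a worker slot ID.
type WorkerSlot = u32;

/// Build a uniform vnode -> worker-slot assignment of exactly `vnode_count`
/// entries: the vnodes are split into contiguous, near-equal ranges, one
/// range per scheduled worker slot.
fn build_uniform_mapping(slots: &[WorkerSlot], vnode_count: usize) -> Vec<WorkerSlot> {
    assert!(!slots.is_empty(), "at least one worker slot must be scheduled");
    // Same invariant as the new assertion in `Scheduler::new`: parallelism
    // should be limited by vnode count in previous steps.
    assert!(
        slots.len() <= vnode_count,
        "parallelism should be limited by vnode count in previous steps"
    );

    let mut mapping = Vec::with_capacity(vnode_count);
    for (i, &slot) in slots.iter().enumerate() {
        // Integer arithmetic spreads the remainder so range sizes differ by at most one.
        let start = i * vnode_count / slots.len();
        let end = (i + 1) * vnode_count / slots.len();
        mapping.extend(std::iter::repeat(slot).take(end - start));
    }
    mapping
}

fn main() {
    // Three worker slots sharing a 256-vnode table (the compatibility default).
    let mapping = build_uniform_mapping(&[0, 1, 2], 256);
    assert_eq!(mapping.len(), 256);

    // Each slot owns a contiguous range; range sizes differ by at most one vnode.
    let counts: Vec<usize> = (0..3u32)
        .map(|s| mapping.iter().filter(|&&x| x == s).count())
        .collect();
    assert!(counts.iter().max().unwrap() - counts.iter().min().unwrap() <= 1);
}
```

With a smaller scheduled vnode count (say `build_uniform_mapping(&[0, 1], 64)`), the same code yields a 64-entry mapping, which is the behavior the `expected_vnode_count` plumbing in `schedule.rs` and `fragment.rs` enables.
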