diff --git a/e2e_test/ddl/max_parallelism.slt b/e2e_test/ddl/max_parallelism.slt
new file mode 100644
index 0000000000000..45aa58185f31b
--- /dev/null
+++ b/e2e_test/ddl/max_parallelism.slt
@@ -0,0 +1,70 @@
+statement ok
+create view table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;
+
+#### BEGIN
+
+
+statement ok
+set streaming_max_parallelism to 4;
+
+# When the parallelism is specified to a value greater than the max parallelism, return an error.
+statement ok
+set streaming_parallelism to 6;
+
+statement error specified parallelism 6 should not exceed max parallelism 4
+create table t;
+
+# When the parallelism is specified to a valid value, ok.
+statement ok
+set streaming_parallelism to 4;
+
+statement ok
+create table t;
+
+query T
+select parallelism from table_parallelism where name = 't';
+----
+FIXED(4)
+
+statement ok
+drop table t;
+
+# When no parallelism is specified, ok, and the parallelism will be adaptive.
+
+statement ok
+set streaming_parallelism to default;
+
+statement ok
+create table t;
+
+query T
+select parallelism from table_parallelism where name = 't';
+----
+ADAPTIVE
+
+# Alter parallelism to a valid value, ok.
+statement ok
+alter table t set parallelism to 4;
+
+query T
+select parallelism from table_parallelism where name = 't';
+----
+FIXED(4)
+
+# Alter parallelism to an invalid value, return an error.
+statement error specified parallelism 8 should not exceed max parallelism 4
+alter table t set parallelism to 8;
+
+statement ok
+drop table t;
+
+#### END
+
+statement ok
+set streaming_max_parallelism to default;
+
+statement ok
+set streaming_parallelism to default;
+
+statement ok
+drop view table_parallelism;
diff --git a/proto/catalog.proto b/proto/catalog.proto
index 2bbfa39c10350..f792eccc0cab6 100644
--- a/proto/catalog.proto
+++ b/proto/catalog.proto
@@ -428,7 +428,7 @@ message Table {
   // Use `VnodeCountCompat::vnode_count` to access it.
   //
   // Please note that this field is not intended to describe the expected vnode count
-  // for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.expected_vnode_count`.
+  // for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.max_parallelism`.
   optional uint32 maybe_vnode_count = 40;

   // Per-table catalog version, used by schema change. `None` for internal
diff --git a/proto/meta.proto b/proto/meta.proto
index 75a9b5d5caf0b..19f8eeb025a93 100644
--- a/proto/meta.proto
+++ b/proto/meta.proto
@@ -111,7 +111,22 @@ message TableFragments {
   map<uint32, source.ConnectorSplits> actor_splits = 5;

   stream_plan.StreamContext ctx = 6;
+
   TableParallelism parallelism = 7;
+  // The max parallelism specified when the streaming job was created, i.e., expected vnode count.
+  //
+  // The reason for persisting this value is mainly to check if a parallelism change (via `ALTER
+  // .. SET PARALLELISM`) is valid, so that the behavior can be consistent with the creation of
+  // the streaming job.
+  //
+  // Note that the actual vnode count, denoted by `vnode_count` in `fragments`, may be different
+  // from this value (see `StreamFragmentGraph.max_parallelism` for more details). As a result,
+  // checking the parallelism change with this value can be inaccurate in some cases. However,
+  // when generating resizing plans, we still take the `vnode_count` of each fragment into account.
+  //
+  // Can be unset if the fragment is created in older versions where variable vnode count is not
+  // supported, in which case a default value of 256 should be used.
+  optional uint32 max_parallelism = 10;

   // Actors of a materialize view, sink, or table can only be scheduled on nodes with matching node_label.
   string node_label = 8;
diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto
index 7fe63054c565c..2323bc8b8ba5f 100644
--- a/proto/stream_plan.proto
+++ b/proto/stream_plan.proto
@@ -1041,7 +1041,8 @@ message StreamFragmentGraph {
   // If none, default parallelism will be applied.
   Parallelism parallelism = 6;

-  // Expected vnode count for the graph.
+  // Specified max parallelism, i.e., expected vnode count for the graph.
+  //
   // The scheduler on the meta service will use this as a hint to decide the vnode count
   // for each fragment.
   //
@@ -1049,5 +1050,5 @@ message StreamFragmentGraph {
   // For example, a no-shuffle exchange between current fragment graph and an existing
   // upstream fragment graph requires two fragments to be in the same distribution,
   // thus the same vnode count.
-  uint32 expected_vnode_count = 7;
+  uint32 max_parallelism = 7;
 }
diff --git a/src/ctl/src/cmd_impl/meta/migration.rs b/src/ctl/src/cmd_impl/meta/migration.rs
index fc735104dac63..58ccb4c93f254 100644
--- a/src/ctl/src/cmd_impl/meta/migration.rs
+++ b/src/ctl/src/cmd_impl/meta/migration.rs
@@ -439,6 +439,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
                 create_type: Set(CreateType::Foreground),
                 timezone: Set(table_fragment.timezone()),
                 parallelism: Set(streaming_parallelism),
+                max_parallelism: Set(table_fragment.max_parallelism as _),
             })
             .exec(&meta_store_sql.conn)
             .await?;
diff --git a/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs b/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs
index 6f2a07a5f3d72..78f79f1b13012 100644
--- a/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs
+++ b/src/frontend/src/stream_fragmenter/graph/fragment_graph.rs
@@ -108,7 +108,7 @@ impl StreamFragmentGraph {
             dependent_table_ids: vec![],
             table_ids_cnt: 0,
             parallelism: None,
-            expected_vnode_count: 0,
+            max_parallelism: 0,
         }
     }
diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs
index b671d0792e073..daa48d99969ca 100644
--- a/src/frontend/src/stream_fragmenter/mod.rs
+++ b/src/frontend/src/stream_fragmenter/mod.rs
@@ -144,7 +144,7 @@ pub fn build_graph(plan_node: PlanRef) -> SchedulerResult {
+        ColumnDef::new($name).integer().not_null().default(256) // compat vnode count
+    };
+}

 #[derive(DeriveMigrationName)]
 pub struct Migration;

@@ -12,12 +16,7 @@ impl MigrationTrait for Migration {
             .alter_table(
                 MigrationTable::alter()
                     .table(Table::Table)
-                    .add_column(
-                        ColumnDef::new(Table::VnodeCount)
-                            .integer()
-                            .not_null()
-                            .default(VNODE_COUNT),
-                    )
+                    .add_column(col!(Table::VnodeCount))
                     .to_owned(),
             )
             .await?;
@@ -26,12 +25,16 @@ impl MigrationTrait for Migration {
             .alter_table(
                 MigrationTable::alter()
                     .table(Fragment::Table)
-                    .add_column(
-                        ColumnDef::new(Fragment::VnodeCount)
-                            .integer()
-                            .not_null()
-                            .default(VNODE_COUNT),
-                    )
+                    .add_column(col!(Fragment::VnodeCount))
+                    .to_owned(),
+            )
+            .await?;
+
+        manager
+            .alter_table(
+                MigrationTable::alter()
+                    .table(StreamingJob::Table)
+                    .add_column(col!(StreamingJob::MaxParallelism))
                     .to_owned(),
             )
             .await
@@ -54,6 +57,15 @@ impl MigrationTrait for Migration {
                     .drop_column(Fragment::VnodeCount)
                     .to_owned(),
             )
+            .await?;
+
+        manager
+            .alter_table(
+                MigrationTable::alter()
+                    .table(StreamingJob::Table)
+                    .drop_column(StreamingJob::MaxParallelism)
+                    .to_owned(),
+            )
             .await
     }
 }
@@ -69,3 +81,9 @@ enum Table {
     Table,
     VnodeCount,
 }
+
+#[derive(DeriveIden)]
+enum StreamingJob {
+    Table,
+    MaxParallelism,
+}
diff --git a/src/meta/model_v2/src/streaming_job.rs b/src/meta/model_v2/src/streaming_job.rs
index 1ab9225f43cda..d3d0b2b48cc13 100644
--- a/src/meta/model_v2/src/streaming_job.rs
+++ b/src/meta/model_v2/src/streaming_job.rs
@@ -26,6 +26,7 @@ pub struct Model {
     pub create_type: CreateType,
     pub timezone: Option<String>,
     pub parallelism: StreamingParallelism,
+    pub max_parallelism: i32,
 }

 #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs
index 6d1d9ceba40e2..22cab97ce6941 100644
--- a/src/meta/src/controller/fragment.rs
+++ b/src/meta/src/controller/fragment.rs
@@ -315,6 +315,7 @@ impl CatalogController {
             HashMap>,
         )>,
         parallelism: StreamingParallelism,
+        max_parallelism: usize,
     ) -> MetaResult<PbTableFragments> {
         let mut pb_fragments = HashMap::new();
         let mut pb_actor_splits = HashMap::new();
@@ -347,6 +348,7 @@ impl CatalogController {
             ),
             node_label: "".to_string(),
             backfill_done: true,
+            max_parallelism: Some(max_parallelism as _),
         };

         Ok(table_fragments)
@@ -669,6 +671,7 @@ impl CatalogController {
            job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
            fragment_info,
            job_info.parallelism.clone(),
+           job_info.max_parallelism as _,
        )
    }
@@ -689,6 +692,15 @@ impl CatalogController {
         Ok(job_states)
     }

+    pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
+        let inner = self.inner.read().await;
+        let job = StreamingJob::find_by_id(job_id)
+            .one(&inner.db)
+            .await?
+            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
+        Ok(job.max_parallelism as usize)
+    }
+
     /// Get all actor ids in the target streaming jobs.
     pub async fn get_job_actor_mapping(
         &self,
@@ -790,6 +802,7 @@ impl CatalogController {
                     job.timezone.map(|tz| PbStreamContext { timezone: tz }),
                     fragment_info,
                     job.parallelism.clone(),
+                    job.max_parallelism as _,
                 )?,
             );
         }
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index e3f4f711bb277..6bf2606a9354b 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -83,6 +83,7 @@ impl CatalogController {
         create_type: PbCreateType,
         ctx: &StreamContext,
         streaming_parallelism: StreamingParallelism,
+        max_parallelism: usize,
     ) -> MetaResult<ObjectId> {
         let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
         let job = streaming_job::ActiveModel {
@@ -91,6 +92,7 @@ impl CatalogController {
             create_type: Set(create_type.into()),
             timezone: Set(ctx.timezone.clone()),
             parallelism: Set(streaming_parallelism),
+            max_parallelism: Set(max_parallelism as _),
         };
         job.insert(txn).await?;
@@ -102,6 +104,7 @@ impl CatalogController {
         streaming_job: &mut StreamingJob,
         ctx: &StreamContext,
         parallelism: &Option,
+        max_parallelism: usize,
     ) -> MetaResult<()> {
         let inner = self.inner.write().await;
         let txn = inner.db.begin().await?;
@@ -169,6 +172,7 @@ impl CatalogController {
                    create_type,
                    ctx,
                    streaming_parallelism,
+                   max_parallelism,
                )
                .await?;
                table.id = job_id as _;
@@ -204,6 +208,7 @@ impl CatalogController {
                    create_type,
                    ctx,
                    streaming_parallelism,
+                   max_parallelism,
                )
                .await?;
                sink.id = job_id as _;
@@ -220,6 +225,7 @@ impl CatalogController {
                    create_type,
                    ctx,
                    streaming_parallelism,
+                   max_parallelism,
                )
                .await?;
                table.id = job_id as _;
@@ -255,6 +261,7 @@ impl CatalogController {
                    create_type,
                    ctx,
                    streaming_parallelism,
+                   max_parallelism,
                )
                .await?;
                // to be compatible with old implementation.
@@ -285,6 +292,7 @@ impl CatalogController {
                    create_type,
                    ctx,
                    streaming_parallelism,
+                   max_parallelism,
                )
                .await?;
                src.id = job_id as _;
@@ -631,6 +639,7 @@ impl CatalogController {
         ctx: &StreamContext,
         version: &PbTableVersion,
         specified_parallelism: &Option,
+        max_parallelism: usize,
     ) -> MetaResult<ObjectId> {
         let id = streaming_job.id();
         let inner = self.inner.write().await;
@@ -685,6 +694,7 @@ impl CatalogController {
            PbCreateType::Foreground,
            ctx,
            parallelism,
+           max_parallelism,
        )
        .await?;
diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs
index 78df862e1d8e5..11d407ff39e1b 100644
--- a/src/meta/src/manager/metadata.rs
+++ b/src/meta/src/manager/metadata.rs
@@ -16,7 +16,7 @@ use std::collections::{BTreeMap, HashMap, HashSet};
 use std::pin::pin;
 use std::time::Duration;

-use anyhow::anyhow;
+use anyhow::{anyhow, Context};
 use futures::future::{select, Either};
 use risingwave_common::catalog::{TableId, TableOption};
 use risingwave_meta_model_v2::{ObjectId, SourceId};
@@ -892,6 +892,24 @@ impl MetadataManager {
         }
     }

+    pub async fn get_job_max_parallelism(&self, table_id: TableId) -> MetaResult<usize> {
+        match self {
+            MetadataManager::V1(mgr) => {
+                let fragments = mgr.fragment_manager.get_fragment_read_guard().await;
+                Ok(fragments
+                    .table_fragments()
+                    .get(&table_id)
+                    .map(|tf| tf.max_parallelism)
+                    .with_context(|| format!("job {table_id} not found"))?)
+            }
+            MetadataManager::V2(mgr) => {
+                mgr.catalog_controller
+                    .get_max_parallelism_by_id(table_id.table_id as _)
+                    .await
+            }
+        }
+    }
+
     pub fn cluster_id(&self) -> &ClusterId {
         match self {
             MetadataManager::V1(mgr) => mgr.cluster_manager.cluster_id(),
diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs
index aaff076688785..954ee820e6a7f 100644
--- a/src/meta/src/model/stream.rs
+++ b/src/meta/src/model/stream.rs
@@ -17,7 +17,7 @@ use std::ops::AddAssign;

 use itertools::Itertools;
 use risingwave_common::catalog::TableId;
-use risingwave_common::hash::WorkerSlotId;
+use risingwave_common::hash::{VirtualNode, WorkerSlotId};
 use risingwave_connector::source::SplitImpl;
 use risingwave_pb::common::PbActorLocation;
 use risingwave_pb::meta::table_fragments::actor_status::ActorState;
@@ -115,6 +115,18 @@ pub struct TableFragments {

     /// The parallelism assigned to this table fragments
     pub assigned_parallelism: TableParallelism,
+
+    /// The max parallelism specified when the streaming job was created, i.e., expected vnode count.
+    ///
+    /// The reason for persisting this value is mainly to check if a parallelism change (via `ALTER
+    /// .. SET PARALLELISM`) is valid, so that the behavior can be consistent with the creation of
+    /// the streaming job.
+    ///
+    /// Note that the actual vnode count, denoted by `vnode_count` in `fragments`, may be different
+    /// from this value (see `StreamFragmentGraph.max_parallelism` for more details). As a result,
+    /// checking the parallelism change with this value can be inaccurate in some cases. However,
+    /// when generating resizing plans, we still take the `vnode_count` of each fragment into account.
+    pub max_parallelism: usize,
 }

 #[derive(Debug, Clone, Default)]
@@ -167,6 +179,7 @@ impl MetadataModel for TableFragments {
             parallelism: Some(self.assigned_parallelism.into()),
             node_label: "".to_string(),
             backfill_done: true,
+            max_parallelism: Some(self.max_parallelism as _),
         }
     }
@@ -187,6 +200,9 @@ impl MetadataModel for TableFragments {
             actor_splits: build_actor_split_impls(&prost.actor_splits),
             ctx,
             assigned_parallelism: prost.parallelism.unwrap_or(default_parallelism).into(),
+            max_parallelism: prost
+                .max_parallelism
+                .map_or(VirtualNode::COUNT_FOR_COMPAT, |v| v as _),
         }
     }
@@ -204,6 +220,7 @@ impl TableFragments {
            &BTreeMap::new(),
            StreamContext::default(),
            TableParallelism::Adaptive,
+           VirtualNode::COUNT_FOR_TEST,
        )
    }
@@ -215,6 +232,7 @@ impl TableFragments {
         actor_locations: &BTreeMap,
         ctx: StreamContext,
         table_parallelism: TableParallelism,
+        max_parallelism: usize,
     ) -> Self {
         let actor_status = actor_locations
             .iter()
@@ -237,6 +255,7 @@ impl TableFragments {
             actor_splits: HashMap::default(),
             ctx,
             assigned_parallelism: table_parallelism,
+            max_parallelism,
         }
     }
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 0d8ea1c2ae38c..2b288c25a8a1a 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -1612,7 +1612,7 @@ impl DdlController {
         let specified_parallelism = fragment_graph.specified_parallelism();
         let internal_tables = fragment_graph.internal_tables();
         let expr_context = stream_ctx.to_expr_context();
-        let max_parallelism = NonZeroUsize::new(fragment_graph.expected_vnode_count()).unwrap();
+        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();

         // 1. Resolve the upstream fragments, extend the fragment graph to a complete graph that
         //    contains all information needed for building the actor graph.
@@ -1693,6 +1693,7 @@
             &building_locations.actor_locations,
             stream_ctx.clone(),
             table_parallelism,
+            max_parallelism.get(),
         );

         if let Some(mview_fragment) = table_fragments.mview_fragment() {
@@ -1727,6 +1728,7 @@
                    &stream_ctx,
                    table.get_version()?,
                    &fragment_graph.specified_parallelism(),
+                   fragment_graph.max_parallelism(),
                )
                .await? as u32
            }
@@ -2178,6 +2180,7 @@
             &building_locations.actor_locations,
             stream_ctx,
             old_table_fragments.assigned_parallelism,
+            old_table_fragments.max_parallelism,
         );

         // Note: no need to set `vnode_count` as it's already set by the frontend.
diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs
index 5e83e49b767a7..43abd22fba8dd 100644
--- a/src/meta/src/rpc/ddl_controller_v2.rs
+++ b/src/meta/src/rpc/ddl_controller_v2.rs
@@ -46,7 +46,12 @@ impl DdlController {
         let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

         mgr.catalog_controller
-            .create_job_catalog(&mut streaming_job, &ctx, &fragment_graph.parallelism)
+            .create_job_catalog(
+                &mut streaming_job,
+                &ctx,
+                &fragment_graph.parallelism,
+                fragment_graph.max_parallelism as _,
+            )
             .await?;
         let job_id = streaming_job.id();

@@ -296,6 +301,7 @@ impl DdlController {
                    &stream_ctx,
                    table.get_version()?,
                    &fragment_graph.specified_parallelism(),
+                   fragment_graph.max_parallelism(),
                )
                .await? as u32;
@@ -440,6 +446,7 @@ impl DdlController {
                &ctx,
                table.get_version()?,
                &fragment_graph.specified_parallelism(),
+               fragment_graph.max_parallelism(),
            )
            .await?;
diff --git a/src/meta/src/stream/stream_graph/actor.rs b/src/meta/src/stream/stream_graph/actor.rs
index 6799526b71e4b..b88de16a6dc20 100644
--- a/src/meta/src/stream/stream_graph/actor.rs
+++ b/src/meta/src/stream/stream_graph/actor.rs
@@ -666,7 +666,7 @@ impl ActorGraphBuilder {
         cluster_info: StreamingClusterInfo,
         default_parallelism: NonZeroUsize,
     ) -> MetaResult {
-        let expected_vnode_count = fragment_graph.expected_vnode_count();
+        let expected_vnode_count = fragment_graph.max_parallelism();
         let existing_distributions = fragment_graph.existing_distribution();

         // Schedule the distribution of all building fragments.
diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs
index b997b54a3bbe4..a28567560b4c1 100644
--- a/src/meta/src/stream/stream_graph/fragment.rs
+++ b/src/meta/src/stream/stream_graph/fragment.rs
@@ -326,7 +326,8 @@ pub struct StreamFragmentGraph {
     /// variable. If not specified, all active worker slots will be used.
     specified_parallelism: Option<NonZeroUsize>,

-    /// Expected vnode count for the graph.
+    /// Specified max parallelism, i.e., expected vnode count for the graph.
+    ///
     /// The scheduler on the meta service will use this as a hint to decide the vnode count
     /// for each fragment.
     ///
@@ -334,7 +335,7 @@ pub struct StreamFragmentGraph {
     /// For example, a no-shuffle exchange between current fragment graph and an existing
     /// upstream fragment graph requires two fragments to be in the same distribution,
     /// thus the same vnode count.
-    expected_vnode_count: usize,
+    max_parallelism: usize,
 }

 impl StreamFragmentGraph {
@@ -410,7 +411,7 @@ impl StreamFragmentGraph {
             None
         };

-        let expected_vnode_count = proto.expected_vnode_count as usize;
+        let max_parallelism = proto.max_parallelism as usize;

         Ok(Self {
             fragments,
@@ -418,7 +419,7 @@ impl StreamFragmentGraph {
             upstreams,
             dependent_table_ids,
             specified_parallelism,
-            expected_vnode_count,
+            max_parallelism,
         })
     }

@@ -519,8 +520,8 @@ impl StreamFragmentGraph {
     }

     /// Get the expected vnode count of the graph. See documentation of the field for more details.
-    pub fn expected_vnode_count(&self) -> usize {
-        self.expected_vnode_count
+    pub fn max_parallelism(&self) -> usize {
+        self.max_parallelism
     }

     /// Get downstreams of a fragment.
@@ -1184,7 +1185,7 @@ impl CompleteStreamFragmentGraph {
     }

     /// Get the expected vnode count of the building graph. See documentation of the field for more details.
-    pub(super) fn expected_vnode_count(&self) -> usize {
-        self.building_graph.expected_vnode_count()
+    pub(super) fn max_parallelism(&self) -> usize {
+        self.building_graph.max_parallelism()
     }
 }
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index f12de49cccbbe..63a6b3e228b1a 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -19,7 +19,6 @@ use futures::future::join_all;
 use itertools::Itertools;
 use risingwave_common::bail;
 use risingwave_common::catalog::TableId;
-use risingwave_common::hash::VirtualNode;
 use risingwave_meta_model_v2::ObjectId;
 use risingwave_pb::catalog::{CreateType, Subscription, Table};
 use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
@@ -665,6 +664,8 @@ impl GlobalStreamManager {
     ) -> MetaResult<()> {
         let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

+        let table_id = TableId::new(table_id);
+
         let worker_nodes = self
             .metadata_manager
             .list_active_streaming_compute_nodes()
@@ -683,8 +684,10 @@ impl GlobalStreamManager {
             .iter()
             .map(|w| w.parallelism as usize)
             .sum::<usize>();
-        // TODO(var-vnode): get correct max parallelism from catalogs.
-        let max_parallelism = VirtualNode::COUNT_FOR_COMPAT;
+        let max_parallelism = self
+            .metadata_manager
+            .get_job_max_parallelism(table_id)
+            .await?;

         match parallelism {
             TableParallelism::Adaptive => {
@@ -709,7 +712,7 @@ impl GlobalStreamManager {
             }
         }

-        let table_parallelism_assignment = HashMap::from([(TableId::new(table_id), parallelism)]);
+        let table_parallelism_assignment = HashMap::from([(table_id, parallelism)]);

         if deferred {
             tracing::debug!(
@@ -1098,6 +1101,7 @@ mod tests {
             &locations.actor_locations,
             Default::default(),
             TableParallelism::Adaptive,
+            VirtualNode::COUNT_FOR_TEST,
         );
         let ctx = CreateStreamingJobContext {
             building_locations: locations,
diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs
index 040ca0046e30f..f4bbd9ad186fa 100644
--- a/src/meta/src/stream/test_fragmenter.rs
+++ b/src/meta/src/stream/test_fragmenter.rs
@@ -416,7 +416,7 @@ fn make_stream_graph() -> StreamFragmentGraphProto {
         dependent_table_ids: vec![],
         table_ids_cnt: 3,
         parallelism: None,
-        expected_vnode_count: VirtualNode::COUNT_FOR_TEST as _,
+        max_parallelism: VirtualNode::COUNT_FOR_TEST as _,
     }
 }
diff --git a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs
index 1604de034b6da..4dc1564ead2bd 100644
--- a/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs
+++ b/src/tests/simulation/tests/integration_tests/scale/streaming_parallelism.rs
@@ -177,12 +177,20 @@ async fn test_parallelism_exceed_virtual_node_max_create() -> Result<()> {

 #[tokio::test]
 async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> {
-    let vnode_max = VirtualNode::COUNT_FOR_COMPAT;
+    const MAX_PARALLELISM: usize = 512;
+
     let mut configuration = Configuration::for_scale();
     configuration.compute_nodes = 1;
-    configuration.compute_node_cores = vnode_max + 100;
+    configuration.compute_node_cores = MAX_PARALLELISM + 100;
     let mut cluster = Cluster::start(configuration).await?;
     let mut session = cluster.start_session();
+
+    session
+        .run(format!(
+            "set streaming_max_parallelism = {}",
+            MAX_PARALLELISM
+        ))
+        .await?;
     session.run("set streaming_parallelism = 1").await?;
     session.run("create table t(v int)").await?;
     session
@@ -190,13 +198,30 @@ async fn test_parallelism_exceed_virtual_node_max_alter_fixed() -> Result<()> {
         .await?
         .assert_result_eq("FIXED(1)");

-    let result = session
-        .run(format!("alter table t set parallelism = {}", vnode_max + 1))
-        .await;
+    {
+        // Set to the max parallelism, should be accepted.
+        session
+            .run(format!(
+                "alter table t set parallelism = {}",
+                MAX_PARALLELISM
+            ))
+            .await?;
+        session
+            .run("select parallelism from rw_streaming_parallelism where name = 't'")
+            .await?
+            .assert_result_eq(format!("FIXED({})", MAX_PARALLELISM));
+    }

-    // This should be rejected.
-    // TODO(var-vnode): showing that it's rejected for different vnode counts.
-    assert!(result.is_err(), "{result:?}");
+    {
+        // Exceed the max parallelism, should be rejected.
+        let result = session
+            .run(format!(
+                "alter table t set parallelism = {}",
+                MAX_PARALLELISM + 1
+            ))
+            .await;
+        assert!(result.is_err(), "{result:?}");
+    }

     Ok(())
 }
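
Note on the `alter_table_parallelism` change in `stream_manager.rs`: the hunk above fetches the persisted `max_parallelism` via `get_job_max_parallelism` right before the `match parallelism` block, but the comparison against the requested parallelism lives inside that block, outside the diff context shown here. A minimal, self-contained sketch of that check is given below; the names are hypothetical and do not mirror RisingWave's actual types or meta-service API, only the intended behavior.

    // Hypothetical sketch: validate an `ALTER .. SET PARALLELISM` request against the
    // max parallelism persisted when the streaming job was created.
    #[derive(Debug, Clone, Copy)]
    enum TableParallelism {
        Adaptive,
        Fixed(usize),
    }

    fn check_alter_parallelism(requested: TableParallelism, max_parallelism: usize) -> Result<(), String> {
        match requested {
            // Adaptive parallelism is always allowed; the scheduler caps it at runtime.
            TableParallelism::Adaptive => Ok(()),
            TableParallelism::Fixed(n) if n > max_parallelism => Err(format!(
                "specified parallelism {n} should not exceed max parallelism {max_parallelism}"
            )),
            TableParallelism::Fixed(_) => Ok(()),
        }
    }

    fn main() {
        // Mirrors the e2e test: with max parallelism 4, FIXED(4) is accepted and FIXED(8) is rejected.
        assert!(check_alter_parallelism(TableParallelism::Fixed(4), 4).is_ok());
        assert!(check_alter_parallelism(TableParallelism::Fixed(8), 4).is_err());
    }

The error wording in the sketch follows the message asserted by the new `max_parallelism.slt` test above.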