Skip to content

Commit

Permalink
refactor(meta): persist job-level max parallelism & check when `ALTER…
Browse files Browse the repository at this point in the history
… .. SET PARALLELISM` (#18740)

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Sep 29, 2024
1 parent 1da1815 commit 5e20f2d
Show file tree
Hide file tree
Showing 20 changed files with 250 additions and 44 deletions.
70 changes: 70 additions & 0 deletions e2e_test/ddl/max_parallelism.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
statement ok
create view table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;

#### BEGIN


statement ok
set streaming_max_parallelism to 4;

# When the parallelism is specified to a value greater than the max parallelism, return an error.
statement ok
set streaming_parallelism to 6;

statement error specified parallelism 6 should not exceed max parallelism 4
create table t;

# When the parallelism is specified to an valid value, ok.
statement ok
set streaming_parallelism to 4;

statement ok
create table t;

query T
select parallelism from table_parallelism where name = 't';
----
FIXED(4)

statement ok
drop table t;

# When no parallelism is specified, ok, and the parallelism will be adaptive.

statement ok
set streaming_parallelism to default;

statement ok
create table t;

query T
select parallelism from table_parallelism where name = 't';
----
ADAPTIVE

# Alter parallelism to a valid value, ok.
statement ok
alter table t set parallelism to 4;

query T
select parallelism from table_parallelism where name = 't';
----
FIXED(4)

# Alter parallelism to an invalid value, return an error.
statement error specified parallelism 8 should not exceed max parallelism 4
alter table t set parallelism to 8;

statement ok
drop table t;

#### END

statement ok
set streaming_max_parallelism to default;

statement ok
set streaming_parallelism to default;

statement ok
drop view table_parallelism;
2 changes: 1 addition & 1 deletion proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ message Table {
// Use `VnodeCountCompat::vnode_count` to access it.
//
// Please note that this field is not intended to describe the expected vnode count
// for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.expected_vnode_count`.
// for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.max_parallelism`.
optional uint32 maybe_vnode_count = 40;

// Per-table catalog version, used by schema change. `None` for internal
Expand Down
15 changes: 15 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,22 @@ message TableFragments {
map<uint32, source.ConnectorSplits> actor_splits = 5;

stream_plan.StreamContext ctx = 6;

TableParallelism parallelism = 7;
// The max parallelism specified when the streaming job was created, i.e., expected vnode count.
//
// The reason for persisting this value is mainly to check if a parallelism change (via `ALTER
// .. SET PARALLELISM`) is valid, so that the behavior can be consistent with the creation of
// the streaming job.
//
// Note that the actual vnode count, denoted by `vnode_count` in `fragments`, may be different
// from this value (see `StreamFragmentGraph.max_parallelism` for more details.). As a result,
// checking the parallelism change with this value can be inaccurate in some cases. However,
// when generating resizing plans, we still take the `vnode_count` of each fragment into account.
//
// Can be unset if the fragment is created in older versions where variable vnode count is not
// supported, in which case a default value of 256 should be used.
optional uint32 max_parallelism = 10;

// Actors of a materialize view, sink, or table can only be scheduled on nodes with matching node_label.
string node_label = 8;
Expand Down
5 changes: 3 additions & 2 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1041,13 +1041,14 @@ message StreamFragmentGraph {
// If none, default parallelism will be applied.
Parallelism parallelism = 6;

// Expected vnode count for the graph.
// Specified max parallelism, i.e., expected vnode count for the graph.
//
// The scheduler on the meta service will use this as a hint to decide the vnode count
// for each fragment.
//
// Note that the actual vnode count may be different from this value.
// For example, a no-shuffle exchange between current fragment graph and an existing
// upstream fragment graph requires two fragments to be in the same distribution,
// thus the same vnode count.
uint32 expected_vnode_count = 7;
uint32 max_parallelism = 7;
}
1 change: 1 addition & 0 deletions src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
create_type: Set(CreateType::Foreground),
timezone: Set(table_fragment.timezone()),
parallelism: Set(streaming_parallelism),
max_parallelism: Set(table_fragment.max_parallelism as _),
})
.exec(&meta_store_sql.conn)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/stream_fragmenter/graph/fragment_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl StreamFragmentGraph {
dependent_table_ids: vec![],
table_ids_cnt: 0,
parallelism: None,
expected_vnode_count: 0,
max_parallelism: 0,
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/stream_fragmenter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ pub fn build_graph(plan_node: PlanRef) -> SchedulerResult<StreamFragmentGraphPro
.map(|parallelism| Parallelism {
parallelism: parallelism.get(),
});
fragment_graph.expected_vnode_count = config.streaming_max_parallelism() as _;
fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
}

// Set timezone.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use sea_orm_migration::prelude::{Table as MigrationTable, *};

const VNODE_COUNT: i32 = 256;
macro_rules! col {
($name:expr) => {
ColumnDef::new($name).integer().not_null().default(256) // compat vnode count
};
}

#[derive(DeriveMigrationName)]
pub struct Migration;
Expand All @@ -12,12 +16,7 @@ impl MigrationTrait for Migration {
.alter_table(
MigrationTable::alter()
.table(Table::Table)
.add_column(
ColumnDef::new(Table::VnodeCount)
.integer()
.not_null()
.default(VNODE_COUNT),
)
.add_column(col!(Table::VnodeCount))
.to_owned(),
)
.await?;
Expand All @@ -26,12 +25,16 @@ impl MigrationTrait for Migration {
.alter_table(
MigrationTable::alter()
.table(Fragment::Table)
.add_column(
ColumnDef::new(Fragment::VnodeCount)
.integer()
.not_null()
.default(VNODE_COUNT),
)
.add_column(col!(Fragment::VnodeCount))
.to_owned(),
)
.await?;

manager
.alter_table(
MigrationTable::alter()
.table(StreamingJob::Table)
.add_column(col!(StreamingJob::MaxParallelism))
.to_owned(),
)
.await
Expand All @@ -54,6 +57,15 @@ impl MigrationTrait for Migration {
.drop_column(Fragment::VnodeCount)
.to_owned(),
)
.await?;

manager
.alter_table(
MigrationTable::alter()
.table(StreamingJob::Table)
.drop_column(StreamingJob::MaxParallelism)
.to_owned(),
)
.await
}
}
Expand All @@ -69,3 +81,9 @@ enum Table {
Table,
VnodeCount,
}

#[derive(DeriveIden)]
enum StreamingJob {
Table,
MaxParallelism,
}
1 change: 1 addition & 0 deletions src/meta/model_v2/src/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct Model {
pub create_type: CreateType,
pub timezone: Option<String>,
pub parallelism: StreamingParallelism,
pub max_parallelism: i32,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
13 changes: 13 additions & 0 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ impl CatalogController {
HashMap<ActorId, Vec<actor_dispatcher::Model>>,
)>,
parallelism: StreamingParallelism,
max_parallelism: usize,
) -> MetaResult<PbTableFragments> {
let mut pb_fragments = HashMap::new();
let mut pb_actor_splits = HashMap::new();
Expand Down Expand Up @@ -347,6 +348,7 @@ impl CatalogController {
),
node_label: "".to_string(),
backfill_done: true,
max_parallelism: Some(max_parallelism as _),
};

Ok(table_fragments)
Expand Down Expand Up @@ -669,6 +671,7 @@ impl CatalogController {
job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
fragment_info,
job_info.parallelism.clone(),
job_info.max_parallelism as _,
)
}

Expand All @@ -689,6 +692,15 @@ impl CatalogController {
Ok(job_states)
}

pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
let inner = self.inner.read().await;
let job = StreamingJob::find_by_id(job_id)
.one(&inner.db)
.await?
.ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
Ok(job.max_parallelism as usize)
}

/// Get all actor ids in the target streaming jobs.
pub async fn get_job_actor_mapping(
&self,
Expand Down Expand Up @@ -790,6 +802,7 @@ impl CatalogController {
job.timezone.map(|tz| PbStreamContext { timezone: tz }),
fragment_info,
job.parallelism.clone(),
job.max_parallelism as _,
)?,
);
}
Expand Down
10 changes: 10 additions & 0 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl CatalogController {
create_type: PbCreateType,
ctx: &StreamContext,
streaming_parallelism: StreamingParallelism,
max_parallelism: usize,
) -> MetaResult<ObjectId> {
let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
let job = streaming_job::ActiveModel {
Expand All @@ -91,6 +92,7 @@ impl CatalogController {
create_type: Set(create_type.into()),
timezone: Set(ctx.timezone.clone()),
parallelism: Set(streaming_parallelism),
max_parallelism: Set(max_parallelism as _),
};
job.insert(txn).await?;

Expand All @@ -102,6 +104,7 @@ impl CatalogController {
streaming_job: &mut StreamingJob,
ctx: &StreamContext,
parallelism: &Option<Parallelism>,
max_parallelism: usize,
) -> MetaResult<()> {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
Expand Down Expand Up @@ -169,6 +172,7 @@ impl CatalogController {
create_type,
ctx,
streaming_parallelism,
max_parallelism,
)
.await?;
table.id = job_id as _;
Expand Down Expand Up @@ -204,6 +208,7 @@ impl CatalogController {
create_type,
ctx,
streaming_parallelism,
max_parallelism,
)
.await?;
sink.id = job_id as _;
Expand All @@ -220,6 +225,7 @@ impl CatalogController {
create_type,
ctx,
streaming_parallelism,
max_parallelism,
)
.await?;
table.id = job_id as _;
Expand Down Expand Up @@ -255,6 +261,7 @@ impl CatalogController {
create_type,
ctx,
streaming_parallelism,
max_parallelism,
)
.await?;
// to be compatible with old implementation.
Expand Down Expand Up @@ -285,6 +292,7 @@ impl CatalogController {
create_type,
ctx,
streaming_parallelism,
max_parallelism,
)
.await?;
src.id = job_id as _;
Expand Down Expand Up @@ -631,6 +639,7 @@ impl CatalogController {
ctx: &StreamContext,
version: &PbTableVersion,
specified_parallelism: &Option<NonZeroUsize>,
max_parallelism: usize,
) -> MetaResult<ObjectId> {
let id = streaming_job.id();
let inner = self.inner.write().await;
Expand Down Expand Up @@ -685,6 +694,7 @@ impl CatalogController {
PbCreateType::Foreground,
ctx,
parallelism,
max_parallelism,
)
.await?;

Expand Down
20 changes: 19 additions & 1 deletion src/meta/src/manager/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::{BTreeMap, HashMap, HashSet};
use std::pin::pin;
use std::time::Duration;

use anyhow::anyhow;
use anyhow::{anyhow, Context};
use futures::future::{select, Either};
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_meta_model_v2::{ObjectId, SourceId};
Expand Down Expand Up @@ -892,6 +892,24 @@ impl MetadataManager {
}
}

pub async fn get_job_max_parallelism(&self, table_id: TableId) -> MetaResult<usize> {
match self {
MetadataManager::V1(mgr) => {
let fragments = mgr.fragment_manager.get_fragment_read_guard().await;
Ok(fragments
.table_fragments()
.get(&table_id)
.map(|tf| tf.max_parallelism)
.with_context(|| format!("job {table_id} not found"))?)
}
MetadataManager::V2(mgr) => {
mgr.catalog_controller
.get_max_parallelism_by_id(table_id.table_id as _)
.await
}
}
}

pub fn cluster_id(&self) -> &ClusterId {
match self {
MetadataManager::V1(mgr) => mgr.cluster_manager.cluster_id(),
Expand Down
Loading

0 comments on commit 5e20f2d

Please sign in to comment.