Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce cluster limit #18383

Merged
merged 15 commits into from
Sep 6, 2024
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ user backfill_rate_limit
user background_ddl
user batch_enable_distributed_dml
user batch_parallelism
user bypass_cluster_limits
user bytea_output
user cdc_source_wait_streaming_start_timeout
user client_encoding
Expand Down
28 changes: 28 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -789,3 +789,31 @@ message RelationIdInfos {
// relation_id -> FragmentIdToActorIdMap
map<uint32, FragmentIdToActorIdMap> map = 1;
}

// Actor count and parallelism of a single worker node.
message WorkerActorCount {
// Number of actors currently scheduled on the worker.
uint64 actor_count = 1;
// Parallelism of the worker.
uint64 parallelism = 2;
}
hzxa21 marked this conversation as resolved.
Show resolved Hide resolved

// Per-worker actor counts plus the soft/hard actor-count-per-parallelism thresholds.
message ActorCountPerParallelism {
// Worker id -> actor count and parallelism of that worker.
map<uint32, WorkerActorCount> worker_id_to_actor_count = 1;
// Hard threshold: a worker whose actor count exceeds hard_limit * parallelism trips the limit.
uint64 hard_limit = 2;
// Soft threshold: a worker whose actor count exceeds soft_limit * parallelism trips the limit.
uint64 soft_limit = 3;
}

// A single active limit of the cluster.
// NOTE: the span previously contained review-thread UI text pasted into the
// message body, which broke the proto syntax; the message is restored here.
message ClusterLimit {
  oneof limit {
    ActorCountPerParallelism actor_count = 1;
    // TODO: limit DDL using compaction pending bytes
  }
}

// Request for ClusterLimitService.GetClusterLimits. Carries no fields.
message GetClusterLimitsRequest {}

// Response carrying all limits currently active in the cluster.
message GetClusterLimitsResponse {
repeated ClusterLimit active_limits = 1;
}

// Service exposing active cluster limits (e.g. to frontends deciding whether to admit DDL).
service ClusterLimitService {
rpc GetClusterLimits(GetClusterLimitsRequest) returns (GetClusterLimitsResponse);
}
22 changes: 22 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,20 @@ pub struct MetaDeveloperConfig {

#[serde(default = "default::developer::max_get_task_probe_times")]
pub max_get_task_probe_times: usize,

/// Max number of actors allowed per parallelism (default = 100).
/// CREATE MV/Table will trigger a notice to the user when the number of actors exceeds this limit.
/// This limit is effective only for paid tier clusters.
/// Free tier cluster's limit will be hardcoded to `FREE_TIER_ACTOR_CNT_SOFT_LIMIT`
#[serde(default = "default::developer::actor_cnt_per_worker_parallelism_soft_limit")]
pub actor_cnt_per_worker_parallelism_soft_limit: usize,

/// Max number of actors allowed per parallelism (default = 400).
/// CREATE MV/Table will be rejected when the number of actors exceeds this limit.
/// This limit is effective only for paid tier clusters.
/// Free tier cluster's limit will be hardcoded to `FREE_TIER_ACTOR_CNT_HARD_LIMIT`
#[serde(default = "default::developer::actor_cnt_per_worker_parallelism_hard_limit")]
pub actor_cnt_per_worker_parallelism_hard_limit: usize,
}

/// The section `[server]` in `risingwave.toml`.
Expand Down Expand Up @@ -1859,6 +1873,14 @@ pub mod default {
5
}

/// Default for `MetaDeveloperConfig::actor_cnt_per_worker_parallelism_soft_limit`:
/// max actors per unit of worker parallelism before a notice is emitted.
pub fn actor_cnt_per_worker_parallelism_soft_limit() -> usize {
100
}

/// Default for `MetaDeveloperConfig::actor_cnt_per_worker_parallelism_hard_limit`:
/// max actors per unit of worker parallelism before CREATE MV/Table is rejected.
pub fn actor_cnt_per_worker_parallelism_hard_limit() -> usize {
400
}

pub fn memory_controller_threshold_aggressive() -> f64 {
0.9
}
Expand Down
21 changes: 21 additions & 0 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ use chrono_tz::Tz;
pub use over_window::OverWindowCachePolicy;
pub use query_mode::QueryMode;
use risingwave_common_proc_macro::{ConfigDoc, SessionConfig};
use risingwave_license::LicenseManager;
pub use search_path::{SearchPath, USER_NAME_WILD_CARD};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use thiserror::Error;
use thiserror_ext::AsReport;

use self::non_zero64::ConfigNonZeroU64;
use crate::session_config::sink_decouple::SinkDecouple;
Expand Down Expand Up @@ -292,6 +294,13 @@ pub struct SessionConfig {

#[parameter(default = "hex", check_hook = check_bytea_output)]
bytea_output: String,

/// Bypass checks on cluster limits
///
/// When enabled, `CREATE MATERIALIZED VIEW` will not fail if the cluster limit is hit.
/// This session variable is only mutable in paid tier.
#[parameter(default = false, check_hook = check_bypass_cluster_limits)]
bypass_cluster_limits: bool,
}

fn check_timezone(val: &str) -> Result<(), String> {
Expand All @@ -318,6 +327,18 @@ fn check_bytea_output(val: &str) -> Result<(), String> {
}
}

fn check_bypass_cluster_limits(_val: &bool) -> Result<(), String> {
match LicenseManager::get()
.tier()
.map_err(|e| e.to_report_string())?
{
risingwave_license::Tier::Free => {
Err("Bypassing cluster limits is only allowed in paid tier".to_string())
}
risingwave_license::Tier::Paid => Ok(()),
}
hzxa21 marked this conversation as resolved.
Show resolved Hide resolved
}

impl SessionConfig {
pub fn set_force_two_phase_agg(
&mut self,
Expand Down
137 changes: 137 additions & 0 deletions src/common/src/util/cluster_limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::{self, Display, Formatter};

use risingwave_pb::meta::cluster_limit::PbLimit;
use risingwave_pb::meta::{PbActorCountPerParallelism, PbClusterLimit, PbWorkerActorCount};

pub const FREE_TIER_ACTOR_CNT_SOFT_LIMIT: usize = 25;
pub const FREE_TIER_ACTOR_CNT_HARD_LIMIT: usize = 100;

/// In-memory counterpart of `PbClusterLimit`: a single active cluster limit.
pub enum ClusterLimit {
/// Actor-count-per-parallelism limit; see [`ActorCountPerParallelism`].
ActorCount(ActorCountPerParallelism),
}

impl From<ClusterLimit> for PbClusterLimit {
    /// Converts the in-memory limit into its protobuf form.
    fn from(limit: ClusterLimit) -> Self {
        // `ClusterLimit` currently has a single variant, so this pattern is irrefutable.
        let ClusterLimit::ActorCount(inner) = limit;
        Self {
            limit: Some(PbLimit::ActorCount(inner.into())),
        }
    }
}

impl From<PbClusterLimit> for ClusterLimit {
    /// Converts from the protobuf form.
    ///
    /// # Panics
    /// Panics if the `limit` oneof is unset, which indicates a malformed
    /// message from the peer. The `expect` message makes the failure
    /// diagnosable, unlike the previous bare `unwrap()`.
    fn from(pb_limit: PbClusterLimit) -> Self {
        match pb_limit
            .limit
            .expect("`limit` oneof must be set in PbClusterLimit")
        {
            PbLimit::ActorCount(actor_count_per_parallelism) => {
                ClusterLimit::ActorCount(actor_count_per_parallelism.into())
            }
        }
    }
}

/// Actor count and parallelism of a single worker node
/// (in-memory counterpart of `PbWorkerActorCount`).
#[derive(Debug)]
pub struct WorkerActorCount {
// Number of actors currently scheduled on this worker.
pub actor_count: usize,
// Parallelism of this worker.
pub parallelism: usize,
}

impl From<WorkerActorCount> for PbWorkerActorCount {
fn from(worker_actor_count: WorkerActorCount) -> Self {
PbWorkerActorCount {
actor_count: worker_actor_count.actor_count as u64,
parallelism: worker_actor_count.parallelism as u64,
}
}
}

impl From<PbWorkerActorCount> for WorkerActorCount {
fn from(pb_worker_actor_count: PbWorkerActorCount) -> Self {
WorkerActorCount {
actor_count: pb_worker_actor_count.actor_count as usize,
parallelism: pb_worker_actor_count.parallelism as usize,
}
}
}

/// Per-worker actor counts together with the soft/hard thresholds used to
/// decide whether the cluster exceeds its actor-count limit.
// `Debug` added for consistency with the sibling `WorkerActorCount`; public
// types should be debuggable.
#[derive(Debug)]
pub struct ActorCountPerParallelism {
    /// Worker id -> actor count and parallelism on that worker.
    pub worker_id_to_actor_count: HashMap<u32, WorkerActorCount>,
    /// A worker whose actor count exceeds `hard_limit * parallelism` trips the hard limit.
    pub hard_limit: usize,
    /// A worker whose actor count exceeds `soft_limit * parallelism` trips the soft limit.
    pub soft_limit: usize,
}

impl From<ActorCountPerParallelism> for PbActorCountPerParallelism {
fn from(actor_count_per_parallelism: ActorCountPerParallelism) -> Self {
PbActorCountPerParallelism {
worker_id_to_actor_count: actor_count_per_parallelism
.worker_id_to_actor_count
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect(),
hard_limit: actor_count_per_parallelism.hard_limit as u64,
soft_limit: actor_count_per_parallelism.soft_limit as u64,
}
}
}

impl From<PbActorCountPerParallelism> for ActorCountPerParallelism {
fn from(pb_actor_count_per_parallelism: PbActorCountPerParallelism) -> Self {
ActorCountPerParallelism {
worker_id_to_actor_count: pb_actor_count_per_parallelism
.worker_id_to_actor_count
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect(),
hard_limit: pb_actor_count_per_parallelism.hard_limit as usize,
soft_limit: pb_actor_count_per_parallelism.soft_limit as usize,
}
}
}

impl ActorCountPerParallelism {
    /// True if any worker's actor count exceeds `limit_per_parallelism * parallelism`.
    /// Shared scan for the soft/hard checks below (previously duplicated).
    fn exceeds(&self, limit_per_parallelism: usize) -> bool {
        self.worker_id_to_actor_count
            .values()
            .any(|v| v.actor_count > limit_per_parallelism.saturating_mul(v.parallelism))
    }

    /// Whether the hard limit (CREATE MV/Table rejected) is exceeded on any worker.
    pub fn exceed_hard_limit(&self) -> bool {
        self.exceeds(self.hard_limit)
    }

    /// Whether the soft limit (user is notified) is exceeded on any worker.
    pub fn exceed_soft_limit(&self) -> bool {
        self.exceeds(self.soft_limit)
    }

    /// Whether either limit is exceeded.
    pub fn exceed_limit(&self) -> bool {
        self.exceed_soft_limit() || self.exceed_hard_limit()
    }
}

impl Display for ActorCountPerParallelism {
    /// Human-readable rendering used in user-facing limit notices/errors.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let per_worker: Vec<String> = self
            .worker_id_to_actor_count
            .iter()
            .map(|(worker_id, count)| format!("{} -> {:?}", worker_id, count))
            .collect();
        write!(
            f,
            "ActorCountPerParallelism {{ critical limit: {:?}, recommended limit: {:?}. worker_id_to_actor_count: {:?} }}",
            self.hard_limit, self.soft_limit, per_worker
        )
    }
}
1 change: 1 addition & 0 deletions src/common/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ pub mod tracing;
pub mod value_encoding;
pub mod worker_util;
pub use tokio_util;
pub mod cluster_limit;
2 changes: 2 additions & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ meta_enable_trivial_move = true
meta_enable_check_task_level_overlap = false
meta_max_trivial_move_task_count_per_loop = 256
meta_max_get_task_probe_times = 5
meta_actor_cnt_per_worker_parallelism_soft_limit = 100
meta_actor_cnt_per_worker_parallelism_hard_limit = 400

[batch]
enable_barrier_read = false
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,4 @@ mod rw_worker_nodes;

mod rw_actor_id_to_ddl;
mod rw_fragment_id_to_ddl;
mod rw_worker_actor_count;
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::Fields;
use risingwave_frontend_macro::system_catalog;

// System view `rw_catalog.rw_worker_actor_count`: number of actors scheduled
// on each worker node together with the worker's parallelism, computed by
// joining `rw_actors` with `rw_worker_nodes`.
#[system_catalog(
view,
"rw_catalog.rw_worker_actor_count",
"SELECT t2.id as worker_id, parallelism, count(*) as actor_count
FROM rw_actors t1, rw_worker_nodes t2
where t1.worker_id = t2.id
GROUP BY t2.id, t2.parallelism;"
)]
#[derive(Fields)]
struct RwWorkerActorCount {
// Worker node id (`rw_worker_nodes.id`).
worker_id: i32,
// Parallelism of the worker.
parallelism: i32,
// Number of actors on the worker.
actor_count: i64,
}
3 changes: 3 additions & 0 deletions src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ pub async fn handle_create_mv_bound(
) -> Result<RwPgResponse> {
let session = handler_args.session.clone();

// Check cluster limits
session.check_cluster_limits().await?;

if let Either::Right(resp) = session.check_relation_name_duplicated(
name.clone(),
StatementType::CREATE_MATERIALIZED_VIEW,
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ pub async fn handle_create_sink(
) -> Result<RwPgResponse> {
let session = handle_args.session.clone();

session.check_cluster_limits().await?;

if let Either::Right(resp) = session.check_relation_name_duplicated(
stmt.sink_name.clone(),
StatementType::CREATE_SINK,
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,8 @@ pub async fn handle_create_table(
session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
}

session.check_cluster_limits().await?;

if let Either::Right(resp) = session.check_relation_name_duplicated(
table_name.clone(),
StatementType::CREATE_TABLE,
Expand Down
7 changes: 7 additions & 0 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::HashMap;
use anyhow::Context;
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::cluster_limit::ClusterLimit;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::backup_service::MetaSnapshotMetadata;
Expand Down Expand Up @@ -136,6 +137,8 @@ pub trait FrontendMetaClient: Send + Sync {
) -> Result<Vec<u64>>;

async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;

async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
}

pub struct FrontendMetaClientImpl(pub MetaClient);
Expand Down Expand Up @@ -345,4 +348,8 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
self.0.get_cluster_recovery_status().await
}

/// Fetches the currently-active cluster limits by delegating to the inner `MetaClient`.
async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
self.0.get_cluster_limits().await
}
}
Loading
Loading