Skip to content

Commit

Permalink
feat: execute auto-scaling in batches (#15420)
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Mar 8, 2024
1 parent 353efe2 commit e186c4a
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 61 deletions.
24 changes: 24 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,18 @@ pub struct MetaConfig {
#[serde(default)]
pub disable_automatic_parallelism_control: bool,

/// The number of streaming jobs per scaling operation.
#[serde(default = "default::meta::parallelism_control_batch_size")]
pub parallelism_control_batch_size: usize,

/// The period of parallelism control trigger.
#[serde(default = "default::meta::parallelism_control_trigger_period_sec")]
pub parallelism_control_trigger_period_sec: u64,

/// The first delay of parallelism control.
#[serde(default = "default::meta::parallelism_control_trigger_first_delay_sec")]
pub parallelism_control_trigger_first_delay_sec: u64,

#[serde(default = "default::meta::meta_leader_lease_secs")]
pub meta_leader_lease_secs: u64,

Expand Down Expand Up @@ -1121,6 +1133,18 @@ pub mod default {
pub fn event_log_channel_max_size() -> u32 {
10
}

pub fn parallelism_control_batch_size() -> usize {
10
}

pub fn parallelism_control_trigger_period_sec() -> u64 {
10
}

pub fn parallelism_control_trigger_first_delay_sec() -> u64 {
30
}
}

pub mod server {
Expand Down
3 changes: 3 additions & 0 deletions src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ This page is automatically generated by `./risedev generate-example-config`
| min_table_split_write_throughput | If the size of one table is smaller than `min_table_split_write_throughput`, we would not split it to an single group. | 4194304 |
| move_table_size_limit | | 10737418240 |
| node_num_monitor_interval_sec | | 10 |
| parallelism_control_batch_size | The number of streaming jobs per scaling operation. | 10 |
| parallelism_control_trigger_first_delay_sec | The first delay of parallelism control. | 30 |
| parallelism_control_trigger_period_sec | The period of parallelism control trigger. | 10 |
| partition_vnode_count | | 16 |
| periodic_compaction_interval_sec | Schedule compaction for all compaction groups with this interval. | 60 |
| periodic_space_reclaim_compaction_interval_sec | Schedule space_reclaim compaction for all compaction groups with this interval. | 3600 |
Expand Down
3 changes: 3 additions & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ min_delta_log_num_for_hummock_version_checkpoint = 10
max_heartbeat_interval_secs = 300
disable_recovery = false
disable_automatic_parallelism_control = false
parallelism_control_batch_size = 10
parallelism_control_trigger_period_sec = 10
parallelism_control_trigger_first_delay_sec = 30
meta_leader_lease_secs = 30
default_parallelism = "Full"
enable_compaction_deterministic = false
Expand Down
7 changes: 7 additions & 0 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,13 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
disable_automatic_parallelism_control: config
.meta
.disable_automatic_parallelism_control,
parallelism_control_batch_size: config.meta.parallelism_control_batch_size,
parallelism_control_trigger_period_sec: config
.meta
.parallelism_control_trigger_period_sec,
parallelism_control_trigger_first_delay_sec: config
.meta
.parallelism_control_trigger_first_delay_sec,
in_flight_barrier_nums,
max_idle_ms,
compaction_deterministic_test: config.meta.enable_compaction_deterministic,
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ impl CommandContext {
actor_splits,
actor_new_dispatchers,
});
tracing::debug!("update mutation: {mutation:#?}");
tracing::debug!("update mutation: {mutation:?}");
Some(mutation)
}
};
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ impl GlobalBarrierManagerContext {
return Err(e);
}

debug!("scaling-in actors succeed.");
debug!("scaling actors succeed.");
Ok(())
}

Expand Down Expand Up @@ -854,7 +854,7 @@ impl GlobalBarrierManagerContext {
return Err(e);
}

debug!("scaling-in actors succeed.");
debug!("scaling actors succeed.");
Ok(())
}

Expand Down
9 changes: 9 additions & 0 deletions src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ pub struct MetaOpts {
pub enable_recovery: bool,
/// Whether to disable the auto-scaling feature.
pub disable_automatic_parallelism_control: bool,
/// The number of streaming jobs per scaling operation.
pub parallelism_control_batch_size: usize,
/// The period of parallelism control trigger.
pub parallelism_control_trigger_period_sec: u64,
/// The first delay of parallelism control.
pub parallelism_control_trigger_first_delay_sec: u64,
/// The maximum number of barriers in-flight in the compute nodes.
pub in_flight_barrier_nums: usize,
/// After specified seconds of idle (no mview or flush), the process will be exited.
Expand Down Expand Up @@ -221,6 +227,9 @@ impl MetaOpts {
Self {
enable_recovery,
disable_automatic_parallelism_control: false,
parallelism_control_batch_size: 1,
parallelism_control_trigger_period_sec: 10,
parallelism_control_trigger_first_delay_sec: 30,
in_flight_barrier_nums: 40,
max_idle_ms: 0,
compaction_deterministic_test: false,
Expand Down
166 changes: 108 additions & 58 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ use risingwave_common::catalog::TableId;
use risingwave_common::hash::{ActorMapping, ParallelUnitId, VirtualNode};
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_meta_model_v2::StreamingParallelism;
use risingwave_pb::common::{ActorInfo, Buffer, ParallelUnit, ParallelUnitMapping, WorkerNode};
use risingwave_pb::common::{
ActorInfo, Buffer, ParallelUnit, ParallelUnitMapping, WorkerNode, WorkerType,
};
use risingwave_pb::meta::get_reschedule_plan_request::{Policy, StableResizePolicy};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
Expand All @@ -48,7 +50,7 @@ use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;
use tokio::sync::{oneshot, RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::task::JoinHandle;
use tokio::time::MissedTickBehavior;
use tokio::time::{Instant, MissedTickBehavior};

use crate::barrier::{Command, Reschedule, StreamRpcManager};
use crate::manager::{IdCategory, LocalNotification, MetaSrvEnv, MetadataManager, WorkerId};
Expand Down Expand Up @@ -2676,12 +2678,14 @@ impl GlobalStreamManager {
Ok(())
}

async fn trigger_parallelism_control(&self) -> MetaResult<()> {
async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
tracing::info!("trigger parallelism control");

let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

match &self.metadata_manager {
let (schedulable_worker_ids, table_parallelisms) = match &self.metadata_manager {
MetadataManager::V1(mgr) => {
let table_parallelisms = {
let table_parallelisms: HashMap<u32, TableParallelism> = {
let guard = mgr.fragment_manager.get_fragment_read_guard().await;

guard
Expand All @@ -2697,7 +2701,7 @@ impl GlobalStreamManager {
.list_active_streaming_compute_nodes()
.await;

let schedulable_worker_ids = workers
let schedulable_worker_ids: BTreeSet<_> = workers
.iter()
.filter(|worker| {
!worker
Expand All @@ -2709,26 +2713,7 @@ impl GlobalStreamManager {
.map(|worker| worker.id)
.collect();

let reschedules = self
.scale_controller
.generate_table_resize_plan(TableResizePolicy {
worker_ids: schedulable_worker_ids,
table_parallelisms,
})
.await?;

if reschedules.is_empty() {
return Ok(());
}

self.reschedule_actors(
reschedules,
RescheduleOptions {
resolve_no_shuffle_upstream: true,
},
None,
)
.await?;
(schedulable_worker_ids, table_parallelisms)
}
MetadataManager::V2(mgr) => {
let table_parallelisms: HashMap<_, _> = {
Expand Down Expand Up @@ -2768,33 +2753,90 @@ impl GlobalStreamManager {
.map(|worker| worker.id)
.collect();

let reschedules = self
.scale_controller
.generate_table_resize_plan(TableResizePolicy {
worker_ids: schedulable_worker_ids,
table_parallelisms: table_parallelisms.clone(),
})
.await?;
(schedulable_worker_ids, table_parallelisms)
}
};

if reschedules.is_empty() {
return Ok(());
}
if table_parallelisms.is_empty() {
tracing::info!("no streaming jobs for scaling, maybe an empty cluster");
return Ok(false);
}

self.reschedule_actors(
reschedules,
RescheduleOptions {
resolve_no_shuffle_upstream: true,
},
None,
)
let batch_size = match self.env.opts.parallelism_control_batch_size {
0 => table_parallelisms.len(),
n => n,
};

tracing::info!(
"total {} streaming jobs, batch size {}, schedulable worker ids: {:?}",
table_parallelisms.len(),
batch_size,
schedulable_worker_ids
);

let batches: Vec<_> = table_parallelisms
.into_iter()
.chunks(batch_size)
.into_iter()
.map(|chunk| chunk.collect_vec())
.collect();

let mut reschedules = None;

for batch in batches {
let parallelisms: HashMap<_, _> = batch.into_iter().collect();

let plan = self
.scale_controller
.generate_table_resize_plan(TableResizePolicy {
worker_ids: schedulable_worker_ids.clone(),
table_parallelisms: parallelisms.clone(),
})
.await?;

if !plan.is_empty() {
tracing::info!(
"reschedule plan generated for streaming jobs {:?}",
parallelisms
);
reschedules = Some(plan);
break;
}
}

Ok(())
let Some(reschedules) = reschedules else {
tracing::info!("no reschedule plan generated");
return Ok(false);
};

self.reschedule_actors(
reschedules,
RescheduleOptions {
resolve_no_shuffle_upstream: false,
},
None,
)
.await?;

Ok(true)
}

async fn run(&self, mut shutdown_rx: Receiver<()>) {
tracing::info!("starting automatic parallelism control monitor");

let check_period =
Duration::from_secs(self.env.opts.parallelism_control_trigger_period_sec);

let mut ticker = tokio::time::interval_at(
Instant::now()
+ Duration::from_secs(self.env.opts.parallelism_control_trigger_first_delay_sec),
check_period,
);
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

// waiting for first tick
ticker.tick().await;

let (local_notification_tx, mut local_notification_rx) =
tokio::sync::mpsc::unbounded_channel();

Expand All @@ -2803,11 +2845,6 @@ impl GlobalStreamManager {
.insert_local_sender(local_notification_tx)
.await;

let check_period = Duration::from_secs(10);
let mut ticker = tokio::time::interval(check_period);
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
ticker.reset();

let worker_nodes = self
.metadata_manager
.list_active_streaming_compute_nodes()
Expand All @@ -2819,7 +2856,7 @@ impl GlobalStreamManager {
.map(|worker| (worker.id, worker))
.collect();

let mut changed = true;
let mut should_trigger = false;

loop {
tokio::select! {
Expand All @@ -2830,18 +2867,18 @@ impl GlobalStreamManager {
break;
}

_ = ticker.tick(), if changed => {
_ = ticker.tick(), if should_trigger => {
let include_workers = worker_cache.keys().copied().collect_vec();

if include_workers.is_empty() {
tracing::debug!("no available worker nodes");
changed = false;
should_trigger = false;
continue;
}

match self.trigger_parallelism_control().await {
Ok(_) => {
changed = false;
Ok(cont) => {
should_trigger = cont;
}
Err(e) => {
tracing::warn!(error = %e.as_report(), "Failed to trigger scale out, waiting for next tick to retry after {}s", ticker.period().as_secs());
Expand All @@ -2855,13 +2892,26 @@ impl GlobalStreamManager {

match notification {
LocalNotification::WorkerNodeActivated(worker) => {
match (worker.get_type(), worker.property.as_ref()) {
(Ok(WorkerType::ComputeNode), Some(prop)) if prop.is_streaming => {
tracing::info!("worker {} activated notification received", worker.id);
}
_ => continue
}

let prev_worker = worker_cache.insert(worker.id, worker.clone());

if let Some(prev_worker) = prev_worker && prev_worker.parallel_units != worker.parallel_units {
tracing::info!(worker = worker.id, "worker parallelism changed");
match prev_worker {
Some(prev_worker) if prev_worker.parallel_units != worker.parallel_units => {
tracing::info!(worker = worker.id, "worker parallelism changed");
should_trigger = true;
}
None => {
tracing::info!(worker = worker.id, "new worker joined");
should_trigger = true;
}
_ => {}
}

changed = true;
}

// Since our logic for handling passive scale-in is within the barrier manager,
Expand Down
3 changes: 3 additions & 0 deletions src/tests/simulation/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ impl Configuration {
r#"[meta]
max_heartbeat_interval_secs = {max_heartbeat_interval_secs}
disable_automatic_parallelism_control = {disable_automatic_parallelism_control}
parallelism_control_trigger_first_delay_sec = 0
parallelism_control_batch_size = 0
parallelism_control_trigger_period_sec = 10
[system]
barrier_interval_ms = 250
Expand Down

0 comments on commit e186c4a

Please sign in to comment.