feat: support scaling down in recovery loop (#14825)
Signed-off-by: Shanicky Chen <[email protected]>
shanicky authored Jan 29, 2024
1 parent 5f7e557 commit e3e109a
Showing 4 changed files with 80 additions and 25 deletions.
22 changes: 22 additions & 0 deletions risedev.yml
@@ -809,6 +809,28 @@ profile:
- use: frontend
- use: compactor

ci-3cn-1fe-with-recovery:
config-path: src/config/ci-recovery.toml
steps:
- use: minio
- use: etcd
unsafe-no-fsync: true
- use: meta-node
- use: compute-node
port: 5687
exporter-port: 1222
enable-tiered-cache: true
- use: compute-node
port: 5688
exporter-port: 1223
enable-tiered-cache: true
- use: compute-node
port: 5689
exporter-port: 1224
enable-tiered-cache: true
- use: frontend
- use: compactor

ci-1cn-1fe-kafka-with-recovery:
config-path: src/config/ci-recovery.toml
steps:
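The new ci-3cn-1fe-with-recovery profile reuses src/config/ci-recovery.toml and brings up MinIO, etcd, a meta node, three compute nodes (ports 5687-5689) with tiered cache enabled, a frontend, and a compactor, presumably so CI can stop one of the compute nodes and exercise the new scale-down path during recovery. Assuming the usual risedev workflow (the subcommand below is an assumption, not part of this change), the profile would be started with something like:

    ./risedev d ci-3cn-1fe-with-recovery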
33 changes: 26 additions & 7 deletions src/meta/src/barrier/recovery.rs
@@ -22,6 +22,7 @@ use futures::stream::FuturesUnordered;
use futures::TryStreamExt;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ParallelUnitId;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_pb::common::ActorInfo;
use risingwave_pb::meta::PausedReason;
@@ -34,7 +35,7 @@ use risingwave_pb::stream_service::{
use thiserror_ext::AsReport;
use tokio::sync::oneshot;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tracing::{debug, warn, Instrument};
use tracing::{debug, info, warn, Instrument};
use uuid::Uuid;

use super::TracedEpoch;
@@ -603,18 +604,36 @@ impl GlobalBarrierManagerContext {
};
debug!("start scaling-in offline actors.");

let expired_workers: HashSet<WorkerId> = info
.actor_map
let prev_worker_parallel_units = mgr.fragment_manager.all_worker_parallel_units().await;

let curr_worker_parallel_units: HashMap<WorkerId, HashSet<ParallelUnitId>> = info
.node_map
.iter()
.filter(|(&worker, actors)| !actors.is_empty() && !info.node_map.contains_key(&worker))
.map(|(&worker, _)| worker)
.map(|(worker_id, worker_node)| {
(
*worker_id,
worker_node
.parallel_units
.iter()
.map(|parallel_unit| parallel_unit.id)
.collect(),
)
})
.collect();

if expired_workers.is_empty() {
debug!("no expired workers, skipping.");
// todo: maybe we can only check the reduced workers
if curr_worker_parallel_units == prev_worker_parallel_units {
debug!("no changed workers, skipping.");
return Ok(false);
}

info!("parallel unit has changed, triggering a forced reschedule.");

debug!(
"previous worker parallel units {:?}, current worker parallel units {:?}",
prev_worker_parallel_units, curr_worker_parallel_units
);

let table_parallelisms = {
let guard = mgr.fragment_manager.get_fragment_read_guard().await;

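The recovery-side change above replaces the old check for expired workers (workers that still own actors but are no longer registered) with a per-worker comparison of parallel units: the snapshot recorded in the fragment manager versus what the currently registered worker nodes report. When the two maps differ, the loop logs both and triggers a forced reschedule; a TODO notes the check could be narrowed to workers whose parallelism actually shrank. A minimal sketch of the comparison, assuming plain u32 aliases in place of the real WorkerId and ParallelUnitId types:

    use std::collections::{HashMap, HashSet};

    // Plain aliases standing in for the real meta-service id types.
    type WorkerId = u32;
    type ParallelUnitId = u32;

    // True when the parallel units recorded per worker differ from what the
    // registered workers currently report; this is the condition the recovery
    // loop uses to trigger a forced reschedule.
    fn parallel_units_changed(
        prev: &HashMap<WorkerId, HashSet<ParallelUnitId>>,
        curr: &HashMap<WorkerId, HashSet<ParallelUnitId>>,
    ) -> bool {
        prev != curr
    }

    fn main() {
        let prev: HashMap<WorkerId, HashSet<ParallelUnitId>> = HashMap::from([
            (1, HashSet::from([0, 1, 2, 3])),
            (2, HashSet::from([4, 5, 6, 7])),
        ]);
        // Worker 2 came back with half the parallelism: units 6 and 7 are gone.
        let curr: HashMap<WorkerId, HashSet<ParallelUnitId>> = HashMap::from([
            (1, HashSet::from([0, 1, 2, 3])),
            (2, HashSet::from([4, 5])),
        ]);
        assert!(parallel_units_changed(&prev, &curr)); // recovery forces a reschedule
    }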
48 changes: 31 additions & 17 deletions src/meta/src/manager/cluster.rs
@@ -107,7 +107,7 @@ impl ClusterManager {
property: AddNodeProperty,
resource: risingwave_pb::common::worker_node::Resource,
) -> MetaResult<WorkerNode> {
let worker_node_parallelism = property.worker_node_parallelism as usize;
let new_worker_parallelism = property.worker_node_parallelism as usize;
let mut property = self.parse_property(r#type, property);
let mut core = self.core.write().await;

@@ -123,40 +123,54 @@
.unwrap_or_default();
}

let current_parallelism = worker.worker_node.parallel_units.len();
if current_parallelism == worker_node_parallelism
let old_worker_parallelism = worker.worker_node.parallel_units.len();
if old_worker_parallelism == new_worker_parallelism
&& worker.worker_node.property == property
{
worker.update_expire_at(self.max_heartbeat_interval);
return Ok(worker.to_protobuf());
}

let mut new_worker = worker.clone();
match current_parallelism.cmp(&worker_node_parallelism) {
match old_worker_parallelism.cmp(&new_worker_parallelism) {
Ordering::Less => {
tracing::info!(
"worker {} parallelism updated from {} to {}",
new_worker.worker_node.id,
current_parallelism,
worker_node_parallelism
old_worker_parallelism,
new_worker_parallelism
);
let parallel_units = self
.generate_cn_parallel_units(
worker_node_parallelism - current_parallelism,
new_worker_parallelism - old_worker_parallelism,
new_worker.worker_id(),
)
.await?;
new_worker.worker_node.parallel_units.extend(parallel_units);
}
Ordering::Greater => {
// Warn and keep the original parallelism if the worker registered with a
// smaller parallelism.
tracing::warn!(
"worker {} parallelism is less than current, current is {}, but received {}",
new_worker.worker_id(),
current_parallelism,
worker_node_parallelism
);
if self.env.opts.enable_scale_in_when_recovery {
// Handing over to the subsequent recovery loop for a forced reschedule.
tracing::info!(
"worker {} parallelism reduced from {} to {}",
new_worker.worker_node.id,
old_worker_parallelism,
new_worker_parallelism
);
new_worker
.worker_node
.parallel_units
.truncate(new_worker_parallelism)
} else {
// Warn and keep the original parallelism if the worker registered with a
// smaller parallelism, entering compatibility mode.
tracing::warn!(
"worker {} parallelism is less than current, current is {}, but received {}",
new_worker.worker_id(),
new_worker_parallelism,
old_worker_parallelism,
);
}
}
Ordering::Equal => {}
}
@@ -193,7 +207,7 @@ impl ClusterManager {

// Generate parallel units.
let parallel_units = if r#type == WorkerType::ComputeNode {
self.generate_cn_parallel_units(worker_node_parallelism, worker_id)
self.generate_cn_parallel_units(new_worker_parallelism, worker_id)
.await?
} else {
vec![]
@@ -550,7 +564,7 @@ impl ClusterManagerCore {
None => {
return Err(MetaError::unavailable(
"no available transactional id for worker",
))
));
}
Some(id) => id,
};
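In the worker registration path above, a compute node that re-registers with fewer parallel units than before is now truncated to the reported count when enable_scale_in_when_recovery is set, leaving the actual rebalancing to the recovery loop; with the option off, the previous behaviour of warning and keeping the original parallelism is preserved. A minimal sketch of that decision, assuming a plain Vec<u32> in place of the protobuf parallel-unit list and sequential ids in place of generate_cn_parallel_units:

    use std::cmp::Ordering;

    // `units` stands in for worker_node.parallel_units, `new_parallelism` for
    // the count reported on re-registration, and `allow_scale_in` for the
    // enable_scale_in_when_recovery option.
    fn adjust_parallel_units(units: &mut Vec<u32>, new_parallelism: usize, allow_scale_in: bool) {
        match units.len().cmp(&new_parallelism) {
            // The worker grew: append freshly generated unit ids (sequential here).
            Ordering::Less => {
                let missing = (new_parallelism - units.len()) as u32;
                let next = units.iter().copied().max().map_or(0, |m| m + 1);
                units.extend(next..next + missing);
            }
            // The worker shrank and scale-in during recovery is enabled: keep only
            // the first `new_parallelism` units; the recovery loop reschedules later.
            Ordering::Greater if allow_scale_in => units.truncate(new_parallelism),
            // Shrank with scale-in disabled, or unchanged: keep the original units.
            _ => {}
        }
    }

    fn main() {
        let mut units = vec![0, 1, 2, 3];
        adjust_parallel_units(&mut units, 2, true);
        assert_eq!(units, vec![0, 1]);

        let mut units = vec![0, 1];
        adjust_parallel_units(&mut units, 4, false);
        assert_eq!(units, vec![0, 1, 2, 3]);
    }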
2 changes: 1 addition & 1 deletion src/meta/src/stream/scale.rs
@@ -2420,7 +2420,7 @@ impl GlobalStreamManager {
.prepare_reschedule_command(reschedules, options, table_parallelism.as_mut())
.await?;

tracing::debug!("reschedule plan: {:#?}", reschedule_fragment);
tracing::debug!("reschedule plan: {:?}", reschedule_fragment);

let command = Command::RescheduleFragment {
reschedules: reschedule_fragment,
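The only change in scale.rs swaps the pretty-printed Debug specifier {:#?} for the compact {:?} when logging the reschedule plan, keeping a potentially large plan on a single log line. A small, self-contained illustration of the difference (the Plan struct below is made up for the example):

    #[derive(Debug)]
    struct Plan {
        fragment_id: u32,
        added_parallel_units: Vec<u32>,
    }

    fn main() {
        let plan = Plan { fragment_id: 7, added_parallel_units: vec![1, 2] };
        // {:?}  -> one line:  Plan { fragment_id: 7, added_parallel_units: [1, 2] }
        println!("{:?}", plan);
        // {:#?} -> pretty-printed across several indented lines.
        println!("{:#?}", plan);
    }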
