diff --git a/risedev.yml b/risedev.yml
index 7be1334deb4b4..38ed00e15fc63 100644
--- a/risedev.yml
+++ b/risedev.yml
@@ -809,6 +809,28 @@ profile:
       - use: frontend
       - use: compactor
 
+  ci-3cn-1fe-with-recovery:
+    config-path: src/config/ci-recovery.toml
+    steps:
+      - use: minio
+      - use: etcd
+        unsafe-no-fsync: true
+      - use: meta-node
+      - use: compute-node
+        port: 5687
+        exporter-port: 1222
+        enable-tiered-cache: true
+      - use: compute-node
+        port: 5688
+        exporter-port: 1223
+        enable-tiered-cache: true
+      - use: compute-node
+        port: 5689
+        exporter-port: 1224
+        enable-tiered-cache: true
+      - use: frontend
+      - use: compactor
+
   ci-1cn-1fe-kafka-with-recovery:
     config-path: src/config/ci-recovery.toml
     steps:
diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index e0ace5f9678a4..c9e38bcad7d69 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -22,6 +22,7 @@ use futures::stream::FuturesUnordered;
 use futures::TryStreamExt;
 use itertools::Itertools;
 use risingwave_common::catalog::TableId;
+use risingwave_common::hash::ParallelUnitId;
 use risingwave_hummock_sdk::compaction_group::StateTableId;
 use risingwave_pb::common::ActorInfo;
 use risingwave_pb::meta::PausedReason;
@@ -34,7 +35,7 @@ use risingwave_pb::stream_service::{
 use thiserror_ext::AsReport;
 use tokio::sync::oneshot;
 use tokio_retry::strategy::{jitter, ExponentialBackoff};
-use tracing::{debug, warn, Instrument};
+use tracing::{debug, info, warn, Instrument};
 use uuid::Uuid;
 
 use super::TracedEpoch;
@@ -603,18 +604,36 @@ impl GlobalBarrierManagerContext {
         };
 
         debug!("start scaling-in offline actors.");
 
-        let expired_workers: HashSet<WorkerId> = info
-            .actor_map
+        let prev_worker_parallel_units = mgr.fragment_manager.all_worker_parallel_units().await;
+
+        let curr_worker_parallel_units: HashMap<WorkerId, HashSet<ParallelUnitId>> = info
+            .node_map
             .iter()
-            .filter(|(&worker, actors)| !actors.is_empty() && !info.node_map.contains_key(&worker))
-            .map(|(&worker, _)| worker)
+            .map(|(worker_id, worker_node)| {
+                (
+                    *worker_id,
+                    worker_node
+                        .parallel_units
+                        .iter()
+                        .map(|parallel_unit| parallel_unit.id)
+                        .collect(),
+                )
+            })
             .collect();
 
-        if expired_workers.is_empty() {
-            debug!("no expired workers, skipping.");
+        // todo: maybe we can only check the reduced workers
+        if curr_worker_parallel_units == prev_worker_parallel_units {
+            debug!("no changed workers, skipping.");
             return Ok(false);
         }
 
+        info!("parallel unit has changed, triggering a forced reschedule.");
+
+        debug!(
+            "previous worker parallel units {:?}, current worker parallel units {:?}",
+            prev_worker_parallel_units, curr_worker_parallel_units
+        );
+
         let table_parallelisms = {
             let guard = mgr.fragment_manager.get_fragment_read_guard().await;
diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs
index 8640088f30193..27a97167d3e4b 100644
--- a/src/meta/src/manager/cluster.rs
+++ b/src/meta/src/manager/cluster.rs
@@ -107,7 +107,7 @@ impl ClusterManager {
         property: AddNodeProperty,
         resource: risingwave_pb::common::worker_node::Resource,
     ) -> MetaResult<WorkerNode> {
-        let worker_node_parallelism = property.worker_node_parallelism as usize;
+        let new_worker_parallelism = property.worker_node_parallelism as usize;
         let mut property = self.parse_property(r#type, property);
         let mut core = self.core.write().await;
 
@@ -123,8 +123,8 @@
                     .unwrap_or_default();
             }
 
-            let current_parallelism = worker.worker_node.parallel_units.len();
-            if current_parallelism == worker_node_parallelism
+            let old_worker_parallelism = worker.worker_node.parallel_units.len();
+            if old_worker_parallelism == new_worker_parallelism
                 && worker.worker_node.property == property
             {
                 worker.update_expire_at(self.max_heartbeat_interval);
@@ -132,31 +132,45 @@
             }
 
             let mut new_worker = worker.clone();
-            match current_parallelism.cmp(&worker_node_parallelism) {
+            match old_worker_parallelism.cmp(&new_worker_parallelism) {
                 Ordering::Less => {
                     tracing::info!(
                         "worker {} parallelism updated from {} to {}",
                         new_worker.worker_node.id,
-                        current_parallelism,
-                        worker_node_parallelism
+                        old_worker_parallelism,
+                        new_worker_parallelism
                     );
                     let parallel_units = self
                         .generate_cn_parallel_units(
-                            worker_node_parallelism - current_parallelism,
+                            new_worker_parallelism - old_worker_parallelism,
                             new_worker.worker_id(),
                         )
                         .await?;
                     new_worker.worker_node.parallel_units.extend(parallel_units);
                 }
                 Ordering::Greater => {
-                    // Warn and keep the original parallelism if the worker registered with a
-                    // smaller parallelism.
-                    tracing::warn!(
-                        "worker {} parallelism is less than current, current is {}, but received {}",
-                        new_worker.worker_id(),
-                        current_parallelism,
-                        worker_node_parallelism
-                    );
+                    if self.env.opts.enable_scale_in_when_recovery {
+                        // Handing over to the subsequent recovery loop for a forced reschedule.
+                        tracing::info!(
+                            "worker {} parallelism reduced from {} to {}",
+                            new_worker.worker_node.id,
+                            old_worker_parallelism,
+                            new_worker_parallelism
+                        );
+                        new_worker
+                            .worker_node
+                            .parallel_units
+                            .truncate(new_worker_parallelism)
+                    } else {
+                        // Warn and keep the original parallelism if the worker registered with a
+                        // smaller parallelism, entering compatibility mode.
+                        tracing::warn!(
+                            "worker {} parallelism is less than current, current is {}, but received {}",
+                            new_worker.worker_id(),
+                            old_worker_parallelism,
+                            new_worker_parallelism,
+                        );
+                    }
                 }
                 Ordering::Equal => {}
             }
@@ -193,7 +207,7 @@
         // Generate parallel units.
         let parallel_units = if r#type == WorkerType::ComputeNode {
-            self.generate_cn_parallel_units(worker_node_parallelism, worker_id)
+            self.generate_cn_parallel_units(new_worker_parallelism, worker_id)
                 .await?
         } else {
             vec![]
         };
@@ -550,7 +564,7 @@
             None => {
                 return Err(MetaError::unavailable(
                     "no available transactional id for worker",
-                ))
+                ));
             }
             Some(id) => id,
         };
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index 2674975488e45..e0a2fb390f42d 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -2420,7 +2420,7 @@ impl GlobalStreamManager {
             .prepare_reschedule_command(reschedules, options, table_parallelism.as_mut())
             .await?;
 
-        tracing::debug!("reschedule plan: {:#?}", reschedule_fragment);
+        tracing::debug!("reschedule plan: {:?}", reschedule_fragment);
 
         let command = Command::RescheduleFragment {
             reschedules: reschedule_fragment,
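
The recovery-side change above boils down to comparing a snapshot of each worker's parallel units taken from the fragment manager against the parallel units reported by the currently registered nodes, and forcing a reschedule whenever they differ. Below is a minimal standalone sketch of that comparison; the `WorkerId`/`ParallelUnitId` aliases and the `WorkerNode` struct are simplified stand-ins for RisingWave's real types, not the actual definitions.

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical stand-ins for the real RisingWave identifier types.
type WorkerId = u32;
type ParallelUnitId = u32;

// Simplified worker node carrying only what the comparison needs.
struct WorkerNode {
    id: WorkerId,
    parallel_unit_ids: Vec<ParallelUnitId>,
}

/// Collects a per-worker snapshot of parallel unit ids, analogous to how
/// `curr_worker_parallel_units` is built from `info.node_map` in the diff.
fn snapshot(nodes: &[WorkerNode]) -> HashMap<WorkerId, HashSet<ParallelUnitId>> {
    nodes
        .iter()
        .map(|n| (n.id, n.parallel_unit_ids.iter().copied().collect()))
        .collect()
}

fn main() {
    // Previous snapshot: two workers with two parallel units each.
    let prev = snapshot(&[
        WorkerNode { id: 1, parallel_unit_ids: vec![0, 1] },
        WorkerNode { id: 2, parallel_unit_ids: vec![2, 3] },
    ]);
    // Current snapshot: worker 2 re-registered with one unit fewer.
    let curr = snapshot(&[
        WorkerNode { id: 1, parallel_unit_ids: vec![0, 1] },
        WorkerNode { id: 2, parallel_unit_ids: vec![2] },
    ]);

    // Any difference (a worker added, removed, grown, or shrunk) triggers
    // the forced reschedule; identical snapshots skip it.
    if curr == prev {
        println!("no changed workers, skipping.");
    } else {
        println!("parallel unit has changed, triggering a forced reschedule.");
    }
}
```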
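
The cluster-manager side decides, when a compute node re-registers, whether to extend or truncate its stored parallel units. The sketch below mirrors that `Ordering` match in isolation; the type alias, the ID counter, and the plain `enable_scale_in_when_recovery` boolean (a meta option in the real code) are hypothetical simplifications.

```rust
use std::cmp::Ordering;

// Hypothetical stand-in for the real parallel unit id type.
type ParallelUnitId = u32;

/// Reconciles a worker's stored parallel units with the parallelism it
/// reports on re-registration, mirroring the match in the diff above.
fn reconcile_parallel_units(
    stored: &mut Vec<ParallelUnitId>,
    new_parallelism: usize,
    enable_scale_in_when_recovery: bool,
    next_id: &mut ParallelUnitId,
) {
    match stored.len().cmp(&new_parallelism) {
        // The worker grew: allocate the missing units. (Real ids come from a
        // meta-store transaction; a plain counter stands in here.)
        Ordering::Less => {
            while stored.len() < new_parallelism {
                stored.push(*next_id);
                *next_id += 1;
            }
        }
        // The worker shrank and scale-in on recovery is enabled: drop the
        // excess units and let the recovery loop reschedule actors.
        Ordering::Greater if enable_scale_in_when_recovery => {
            stored.truncate(new_parallelism);
        }
        // Otherwise keep the original parallelism (compatibility mode).
        Ordering::Greater => {
            eprintln!(
                "parallelism is less than current, current is {}, but received {}",
                stored.len(),
                new_parallelism
            );
        }
        Ordering::Equal => {}
    }
}

fn main() {
    let mut units: Vec<ParallelUnitId> = vec![0, 1, 2, 3];
    let mut next_id: ParallelUnitId = 4;
    reconcile_parallel_units(&mut units, 2, true, &mut next_id);
    assert_eq!(units, vec![0, 1]); // truncated to the reduced parallelism
}
```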