From 71d6ee44fa23b2328211b187bd34183fe1999c2b Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Thu, 28 Mar 2024 15:27:47 +0800 Subject: [PATCH] fix(meta): wait for new worker join in migrate workers (#15967) --- src/meta/src/barrier/info.rs | 12 +-- src/meta/src/barrier/mod.rs | 8 +- src/meta/src/barrier/recovery.rs | 136 ++++++++++++++++++++----------- src/meta/src/manager/metadata.rs | 19 +++++ 4 files changed, 113 insertions(+), 62 deletions(-) diff --git a/src/meta/src/barrier/info.rs b/src/meta/src/barrier/info.rs index 742dcaeac213c..2c887eae02faa 100644 --- a/src/meta/src/barrier/info.rs +++ b/src/meta/src/barrier/info.rs @@ -17,7 +17,7 @@ use std::collections::{HashMap, HashSet}; use risingwave_pb::common::PbWorkerNode; use tracing::warn; -use crate::manager::{ActorInfos, WorkerId}; +use crate::manager::{ActiveStreamingWorkerNodes, ActorInfos, WorkerId}; use crate::model::ActorId; #[derive(Debug, Clone)] @@ -52,14 +52,8 @@ pub struct InflightActorInfo { impl InflightActorInfo { /// Resolve inflight actor info from given nodes and actors that are loaded from meta store. It will be used during recovery to rebuild all streaming actors. - pub fn resolve( - all_nodes: impl IntoIterator, - actor_infos: ActorInfos, - ) -> Self { - let node_map = all_nodes - .into_iter() - .map(|node| (node.id, node)) - .collect::>(); + pub fn resolve(active_nodes: &ActiveStreamingWorkerNodes, actor_infos: ActorInfos) -> Self { + let node_map = active_nodes.current().clone(); let actor_map = actor_infos .actor_maps diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index e60f268b1ec6c..9b4cf35cee0a7 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -35,7 +35,6 @@ use risingwave_hummock_sdk::table_watermark::{ }; use risingwave_hummock_sdk::{ExtendedSstableInfo, HummockSstableObjectId}; use risingwave_pb::catalog::table::TableType; -use risingwave_pb::common::WorkerNode; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::PausedReason; @@ -600,6 +599,7 @@ impl GlobalBarrierManager { changed_worker = self.active_streaming_nodes.changed() => { #[cfg(debug_assertions)] { + use risingwave_pb::common::WorkerNode; match self .context .metadata_manager @@ -1046,18 +1046,18 @@ impl GlobalBarrierManagerContext { /// will create or drop before this barrier flow through them. async fn resolve_actor_info( &self, - all_nodes: Vec, + active_nodes: &ActiveStreamingWorkerNodes, ) -> MetaResult { let info = match &self.metadata_manager { MetadataManager::V1(mgr) => { let all_actor_infos = mgr.fragment_manager.load_all_actors().await; - InflightActorInfo::resolve(all_nodes, all_actor_infos) + InflightActorInfo::resolve(active_nodes, all_actor_infos) } MetadataManager::V2(mgr) => { let all_actor_infos = mgr.catalog_controller.load_all_actors().await?; - InflightActorInfo::resolve(all_nodes, all_actor_infos) + InflightActorInfo::resolve(active_nodes, all_actor_infos) } }; diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 3fb1fd0b77f10..32b3700755674 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -22,7 +22,7 @@ use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::config::DefaultParallelism; use risingwave_meta_model_v2::StreamingParallelism; -use risingwave_pb::common::{ActorInfo, WorkerNode}; +use risingwave_pb::common::ActorInfo; use risingwave_pb::meta::table_fragments::State; use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::BarrierKind; @@ -398,17 +398,11 @@ impl GlobalBarrierManager { .pre_apply_drop_cancel(&self.scheduled_barriers) .await?; - let active_streaming_nodes = ActiveStreamingWorkerNodes::new_snapshot( + let mut active_streaming_nodes = ActiveStreamingWorkerNodes::new_snapshot( self.context.metadata_manager.clone(), ) .await?; - let all_nodes = active_streaming_nodes - .current() - .values() - .cloned() - .collect_vec(); - let background_streaming_jobs = self .context .metadata_manager @@ -422,14 +416,14 @@ impl GlobalBarrierManager { && background_streaming_jobs.is_empty() { self.context - .scale_actors(all_nodes.clone()) + .scale_actors(&active_streaming_nodes) .await .inspect_err(|err| { warn!(error = %err.as_report(), "scale actors failed"); })?; self.context - .resolve_actor_info(all_nodes.clone()) + .resolve_actor_info(&active_streaming_nodes) .await .inspect_err(|err| { warn!(error = %err.as_report(), "resolve actor info failed"); @@ -437,7 +431,7 @@ impl GlobalBarrierManager { } else { // Migrate actors in expired CN to newly joined one. self.context - .migrate_actors(all_nodes.clone()) + .migrate_actors(&mut active_streaming_nodes) .await .inspect_err(|err| { warn!(error = %err.as_report(), "migrate actors failed"); @@ -463,7 +457,7 @@ impl GlobalBarrierManager { { info = self .context - .resolve_actor_info(all_nodes.clone()) + .resolve_actor_info(&active_streaming_nodes) .await .inspect_err(|err| { warn!(error = %err.as_report(), "resolve actor info failed"); @@ -554,14 +548,20 @@ impl GlobalBarrierManager { impl GlobalBarrierManagerContext { /// Migrate actors in expired CNs to newly joined ones, return true if any actor is migrated. - async fn migrate_actors(&self, all_nodes: Vec) -> MetaResult { + async fn migrate_actors( + &self, + active_nodes: &mut ActiveStreamingWorkerNodes, + ) -> MetaResult { match &self.metadata_manager { - MetadataManager::V1(_) => self.migrate_actors_v1(all_nodes).await, - MetadataManager::V2(_) => self.migrate_actors_v2(all_nodes).await, + MetadataManager::V1(_) => self.migrate_actors_v1(active_nodes).await, + MetadataManager::V2(_) => self.migrate_actors_v2(active_nodes).await, } } - async fn migrate_actors_v2(&self, all_nodes: Vec) -> MetaResult { + async fn migrate_actors_v2( + &self, + active_nodes: &mut ActiveStreamingWorkerNodes, + ) -> MetaResult { let mgr = self.metadata_manager.as_v2_ref(); let all_inuse_parallel_units: HashSet<_> = mgr @@ -571,8 +571,9 @@ impl GlobalBarrierManagerContext { .into_iter() .collect(); - let active_parallel_units: HashSet<_> = all_nodes - .iter() + let active_parallel_units: HashSet<_> = active_nodes + .current() + .values() .flat_map(|node| node.parallel_units.iter().map(|pu| pu.id as i32)) .collect(); @@ -582,7 +583,7 @@ impl GlobalBarrierManagerContext { .collect(); if expired_parallel_units.is_empty() { debug!("no expired parallel units, skipping."); - return self.resolve_actor_info(all_nodes.clone()).await; + return self.resolve_actor_info(active_nodes).await; } debug!("start migrate actors."); @@ -599,8 +600,9 @@ impl GlobalBarrierManagerContext { let start = Instant::now(); let mut plan = HashMap::new(); 'discovery: while !to_migrate_parallel_units.is_empty() { - let new_parallel_units = all_nodes - .iter() + let new_parallel_units = active_nodes + .current() + .values() .flat_map(|node| { node.parallel_units .iter() @@ -623,26 +625,44 @@ impl GlobalBarrierManagerContext { } } } - warn!( - "waiting for new workers to join, elapsed: {}s", - start.elapsed().as_secs() - ); + + if to_migrate_parallel_units.is_empty() { + break; + } + // wait to get newly joined CN - tokio::time::sleep(Duration::from_millis(100)).await; + let changed = active_nodes + .wait_changed(Duration::from_millis(5000), |active_nodes| { + let current_nodes = active_nodes + .current() + .values() + .map(|node| (node.id, &node.host, &node.parallel_units)) + .collect_vec(); + warn!( + current_nodes = ?current_nodes, + "waiting for new workers to join, elapsed: {}s", + start.elapsed().as_secs() + ); + }) + .await; + warn!(?changed, "get worker changed. Retry migrate"); } mgr.catalog_controller.migrate_actors(plan).await?; debug!("migrate actors succeed."); - self.resolve_actor_info(all_nodes).await + self.resolve_actor_info(active_nodes).await } /// Migrate actors in expired CNs to newly joined ones, return true if any actor is migrated. - async fn migrate_actors_v1(&self, all_nodes: Vec) -> MetaResult { + async fn migrate_actors_v1( + &self, + active_nodes: &mut ActiveStreamingWorkerNodes, + ) -> MetaResult { let mgr = self.metadata_manager.as_v1_ref(); - let info = self.resolve_actor_info(all_nodes.clone()).await?; + let info = self.resolve_actor_info(active_nodes).await?; // 1. get expired workers. let expired_workers: HashSet = info @@ -658,7 +678,7 @@ impl GlobalBarrierManagerContext { debug!("start migrate actors."); let migration_plan = self - .generate_migration_plan(expired_workers, &all_nodes) + .generate_migration_plan(expired_workers, active_nodes) .await?; // 2. start to migrate fragment one-by-one. mgr.fragment_manager @@ -668,20 +688,20 @@ impl GlobalBarrierManagerContext { migration_plan.delete(self.env.meta_store_checked()).await?; debug!("migrate actors succeed."); - self.resolve_actor_info(all_nodes).await + self.resolve_actor_info(active_nodes).await } - async fn scale_actors(&self, all_nodes: Vec) -> MetaResult<()> { + async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> { let Ok(_guard) = self.scale_controller.reschedule_lock.try_write() else { return Err(anyhow!("scale_actors failed to acquire reschedule_lock").into()); }; match &self.metadata_manager { - MetadataManager::V1(_) => self.scale_actors_v1(all_nodes).await, - MetadataManager::V2(_) => self.scale_actors_v2(all_nodes).await, + MetadataManager::V1(_) => self.scale_actors_v1(active_nodes).await, + MetadataManager::V2(_) => self.scale_actors_v2(active_nodes).await, } } - async fn scale_actors_v2(&self, workers: Vec) -> MetaResult<()> { + async fn scale_actors_v2(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> { let mgr = self.metadata_manager.as_v2_ref(); debug!("start resetting actors distribution"); @@ -705,8 +725,9 @@ impl GlobalBarrierManagerContext { .collect() }; - let schedulable_worker_ids = workers - .iter() + let schedulable_worker_ids = active_nodes + .current() + .values() .filter(|worker| { !worker .property @@ -813,8 +834,8 @@ impl GlobalBarrierManagerContext { } } - async fn scale_actors_v1(&self, workers: Vec) -> MetaResult<()> { - let info = self.resolve_actor_info(workers.clone()).await?; + async fn scale_actors_v1(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> { + let info = self.resolve_actor_info(active_nodes).await?; let mgr = self.metadata_manager.as_v1_ref(); debug!("start resetting actors distribution"); @@ -859,8 +880,9 @@ impl GlobalBarrierManagerContext { .collect() }; - let schedulable_worker_ids: BTreeSet<_> = workers - .iter() + let schedulable_worker_ids: BTreeSet<_> = active_nodes + .current() + .values() .filter(|worker| { !worker .property @@ -935,7 +957,7 @@ impl GlobalBarrierManagerContext { async fn generate_migration_plan( &self, expired_workers: HashSet, - all_nodes: &Vec, + active_nodes: &mut ActiveStreamingWorkerNodes, ) -> MetaResult { let mgr = self.metadata_manager.as_v1_ref(); @@ -985,8 +1007,9 @@ impl GlobalBarrierManagerContext { let start = Instant::now(); // if in-used expire parallel units are not empty, should wait for newly joined worker. 'discovery: while !to_migrate_parallel_units.is_empty() { - let mut new_parallel_units = all_nodes - .iter() + let mut new_parallel_units = active_nodes + .current() + .values() .flat_map(|worker| worker.parallel_units.iter().cloned()) .collect_vec(); new_parallel_units.retain(|pu| !inuse_parallel_units.contains(&pu.id)); @@ -1008,12 +1031,27 @@ impl GlobalBarrierManagerContext { } } } - warn!( - "waiting for new workers to join, elapsed: {}s", - start.elapsed().as_secs() - ); + + if to_migrate_parallel_units.is_empty() { + break; + } + // wait to get newly joined CN - tokio::time::sleep(Duration::from_millis(100)).await; + let changed = active_nodes + .wait_changed(Duration::from_millis(5000), |active_nodes| { + let current_nodes = active_nodes + .current() + .values() + .map(|node| (node.id, &node.host, &node.parallel_units)) + .collect_vec(); + warn!( + current_nodes = ?current_nodes, + "waiting for new workers to join, elapsed: {}s", + start.elapsed().as_secs() + ); + }) + .await; + warn!(?changed, "get worker changed. Retry migrate"); } // update migration plan, if there is a chain in the plan, update it. diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index cb58eef48cd60..bd2379b90c94f 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -13,7 +13,10 @@ // limitations under the License. use std::collections::{BTreeMap, HashMap, HashSet}; +use std::pin::pin; +use std::time::Duration; +use futures::future::{select, Either}; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_meta_model_v2::SourceId; use risingwave_pb::catalog::{PbSource, PbTable}; @@ -23,6 +26,7 @@ use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, PbFragment}; use risingwave_pb::stream_plan::{PbDispatchStrategy, PbStreamActor, StreamActor}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; +use tokio::time::sleep; use tracing::warn; use crate::barrier::Reschedule; @@ -91,6 +95,21 @@ impl ActiveStreamingWorkerNodes { &self.worker_nodes } + pub(crate) async fn wait_changed( + &mut self, + verbose_internal: Duration, + verbose_fn: impl Fn(&Self), + ) -> ActiveStreamingWorkerChange { + loop { + if let Either::Left((change, _)) = + select(pin!(self.changed()), pin!(sleep(verbose_internal))).await + { + break change; + } + verbose_fn(self) + } + } + pub(crate) async fn changed(&mut self) -> ActiveStreamingWorkerChange { let ret = loop { let notification = self