diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs
index d7bd0208b3873..d28c4e8e5b93e 100644
--- a/src/meta/node/src/server.rs
+++ b/src/meta/node/src/server.rs
@@ -535,7 +535,7 @@ pub async fn start_service_as_election_leader(
     let (sink_manager, shutdown_handle) = SinkCoordinatorManager::start_worker();
     let mut sub_tasks = vec![shutdown_handle];
 
-    let barrier_manager = Arc::new(GlobalBarrierManager::new(
+    let barrier_manager = GlobalBarrierManager::new(
         scheduled_barriers,
         env.clone(),
         metadata_manager.clone(),
@@ -543,7 +543,7 @@ pub async fn start_service_as_election_leader(
         source_manager.clone(),
         sink_manager.clone(),
         meta_metrics.clone(),
-    ));
+    );
 
     {
         let source_manager = source_manager.clone();
@@ -611,7 +611,7 @@ pub async fn start_service_as_election_leader(
         metadata_manager.clone(),
         stream_manager.clone(),
         source_manager.clone(),
-        barrier_manager.clone(),
+        barrier_manager.context().clone(),
         sink_manager.clone(),
     )
     .await;
@@ -622,7 +622,7 @@ pub async fn start_service_as_election_leader(
         metadata_manager.clone(),
         source_manager,
         stream_manager.clone(),
-        barrier_manager.clone(),
+        barrier_manager.context().clone(),
     );
     let cluster_srv = ClusterServiceImpl::new(metadata_manager.clone());
diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 96eed8cba6846..39bc3ced0023a 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -33,19 +33,14 @@ use risingwave_pb::stream_plan::{
     UpdateMutation,
 };
 use risingwave_pb::stream_service::{DropActorsRequest, WaitEpochCommitRequest};
-use risingwave_rpc_client::StreamClientPoolRef;
 use uuid::Uuid;
 
 use super::info::BarrierActorInfo;
 use super::trace::TracedEpoch;
-use crate::barrier::CommandChanges;
-use crate::hummock::HummockManagerRef;
+use crate::barrier::{CommandChanges, GlobalBarrierManagerContext};
 use crate::manager::{DdlType, MetadataManager, WorkerId};
 use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
-use crate::stream::{
-    build_actor_connector_splits, ScaleControllerRef, SourceManagerRef, SplitAssignment,
-    ThrottleConfig,
-};
+use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig};
 use crate::MetaResult;
 
 /// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
@@ -266,12 +261,6 @@ impl Command {
 /// [`CommandContext`] is used for generating barrier and doing post stuffs according to the given
 /// [`Command`].
 pub struct CommandContext {
-    pub metadata_manager: MetadataManager,
-
-    hummock_manager: HummockManagerRef,
-
-    client_pool: StreamClientPoolRef,
-
     /// Resolved info in this barrier loop.
     // TODO: this could be stale when we are calling `post_collect`, check if it matters
     pub info: Arc<BarrierActorInfo>,
@@ -285,9 +274,7 @@ pub struct CommandContext {
 
     pub kind: BarrierKind,
 
-    source_manager: SourceManagerRef,
-
-    scale_controller: Option<ScaleControllerRef>,
+    barrier_manager_context: GlobalBarrierManagerContext,
 
     /// The tracing span of this command.
     ///
@@ -300,34 +287,30 @@ pub struct CommandContext {
 impl CommandContext {
     #[allow(clippy::too_many_arguments)]
     pub(super) fn new(
-        metadata_manager: MetadataManager,
-        hummock_manager: HummockManagerRef,
-        client_pool: StreamClientPoolRef,
         info: BarrierActorInfo,
         prev_epoch: TracedEpoch,
         curr_epoch: TracedEpoch,
         current_paused_reason: Option<PausedReason>,
         command: Command,
         kind: BarrierKind,
-        source_manager: SourceManagerRef,
-        scale_controller: Option<ScaleControllerRef>,
+        barrier_manager_context: GlobalBarrierManagerContext,
         span: tracing::Span,
     ) -> Self {
         Self {
-            metadata_manager,
-            hummock_manager,
-            client_pool,
             info: Arc::new(info),
             prev_epoch,
             curr_epoch,
             current_paused_reason,
             command,
             kind,
-            source_manager,
-            scale_controller,
+            barrier_manager_context,
             span,
         }
     }
+
+    pub fn metadata_manager(&self) -> &MetadataManager {
+        &self.barrier_manager_context.metadata_manager
+    }
 }
 
 impl CommandContext {
@@ -382,7 +365,8 @@ impl CommandContext {
             }
 
             Command::DropStreamingJobs(table_ids) => {
-                let MetadataManager::V1(mgr) = &self.metadata_manager else {
+                let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager
+                else {
                     unreachable!("only available in v1");
                 };
 
@@ -477,7 +461,8 @@ impl CommandContext {
             ),
 
             Command::RescheduleFragment { reschedules, .. } => {
-                let MetadataManager::V1(mgr) = &self.metadata_manager else {
+                let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager
+                else {
                     unimplemented!("implement scale functions in v2");
                 };
                 let mut dispatcher_update = HashMap::new();
@@ -736,7 +721,12 @@ impl CommandContext {
 
             let request_id = Uuid::new_v4().to_string();
             async move {
-                let client = self.client_pool.get(node).await?;
+                let client = self
+                    .barrier_manager_context
+                    .env
+                    .stream_client_pool()
+                    .get(node)
+                    .await?;
                 let request = DropActorsRequest {
                     request_id,
                     actor_ids: actors.to_owned(),
@@ -751,7 +741,12 @@ impl CommandContext {
 
     pub async fn wait_epoch_commit(&self, epoch: HummockEpoch) -> MetaResult<()> {
         let futures = self.info.node_map.values().map(|worker_node| async {
-            let client = self.client_pool.get(worker_node).await?;
+            let client = self
+                .barrier_manager_context
+                .env
+                .stream_client_pool()
+                .get(worker_node)
+                .await?;
             let request = WaitEpochCommitRequest { epoch };
             client.wait_epoch_commit(request).await
         });
@@ -782,19 +777,22 @@ impl CommandContext {
             Command::Resume(_) => {}
 
             Command::SourceSplitAssignment(split_assignment) => {
-                let MetadataManager::V1(mgr) = &self.metadata_manager else {
+                let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager
+                else {
                     unimplemented!("implement config change funcs in v2");
                 };
                 mgr.fragment_manager
                     .update_actor_splits_by_split_assignment(split_assignment)
                     .await?;
-                self.source_manager
+                self.barrier_manager_context
+                    .source_manager
                     .apply_source_change(None, Some(split_assignment.clone()), None)
                     .await;
             }
 
             Command::DropStreamingJobs(table_ids) => {
-                let MetadataManager::V1(mgr) = &self.metadata_manager else {
+                let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager
+                else {
                     unreachable!("only available in v1");
                 };
                 // Tell compute nodes to drop actors.
@@ -834,11 +832,12 @@ impl CommandContext {
                 let table_id = table_fragments.table_id().table_id;
                 let mut table_ids = table_fragments.internal_table_ids();
                 table_ids.push(table_id);
-                self.hummock_manager
+                self.barrier_manager_context
+                    .hummock_manager
                     .unregister_table_ids_fail_fast(&table_ids)
                     .await;
 
-                match &self.metadata_manager {
+                match &self.barrier_manager_context.metadata_manager {
                     MetadataManager::V1(mgr) => {
                         // NOTE(kwannoel): At this point, catalog manager has persisted the tables already.
                         // We need to cleanup the table state. So we can do it here.
@@ -889,7 +888,7 @@ impl CommandContext {
                 replace_table,
                 ..
             } => {
-                match &self.metadata_manager {
+                match &self.barrier_manager_context.metadata_manager {
                     MetadataManager::V1(mgr) => {
                         let mut dependent_table_actors =
                             Vec::with_capacity(upstream_mview_actors.len());
@@ -944,7 +943,8 @@ impl CommandContext {
                 // Extract the fragments that include source operators.
                 let source_fragments = table_fragments.stream_source_fragments();
 
-                self.source_manager
+                self.barrier_manager_context
+                    .source_manager
                     .apply_source_change(
                         Some(source_fragments),
                         Some(init_split_assignment.clone()),
@@ -958,6 +958,7 @@ impl CommandContext {
                 table_parallelism,
             } => {
                 let node_dropped_actors = self
+                    .barrier_manager_context
                     .scale_controller
                     .as_ref()
                     .unwrap()
@@ -973,7 +974,8 @@ impl CommandContext {
                 dispatchers,
                 init_split_assignment,
             }) => {
-                let MetadataManager::V1(mgr) = &self.metadata_manager else {
+                let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager
+                else {
                     unimplemented!("implement replace funcs in v2");
                 };
                 let table_ids = HashSet::from_iter(std::iter::once(old_table_fragments.table_id()));
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index 5f5227e8090d1..dc7e6f8d8fb01 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -27,6 +27,7 @@ use prometheus::HistogramTimer;
 use risingwave_common::bail;
 use risingwave_common::catalog::TableId;
 use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
+use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
 use risingwave_common::util::tracing::TracingContext;
 use risingwave_hummock_sdk::table_watermark::{
     merge_multiple_new_table_watermarks, TableWatermarks,
 };
@@ -162,6 +163,28 @@ pub enum CommandChanges {
     /// No changes.
     None,
 }
+
+#[derive(Clone)]
+pub struct GlobalBarrierManagerContext {
+    status: Arc<Mutex<BarrierManagerStatus>>,
+
+    tracker: Arc<Mutex<CreateMviewProgressTracker>>,
+
+    metadata_manager: MetadataManager,
+
+    hummock_manager: HummockManagerRef,
+
+    source_manager: SourceManagerRef,
+
+    scale_controller: Option<ScaleControllerRef>,
+
+    sink_manager: SinkCoordinatorManager,
+
+    metrics: Arc<MetaMetrics>,
+
+    env: MetaSrvEnv,
+}
+
 /// [`crate::barrier::GlobalBarrierManager`] sends barriers to all registered compute nodes and
 /// collect them, with monotonic increasing epoch numbers. On compute nodes, `LocalBarrierManager`
 /// in `risingwave_stream` crate will serve these requests and dispatch them to source actors.
@@ -175,29 +198,19 @@ pub struct GlobalBarrierManager {
     /// Enable recovery or not when failover.
     enable_recovery: bool,
 
-    status: Mutex<BarrierManagerStatus>,
-
     /// The queue of scheduled barriers.
     scheduled_barriers: schedule::ScheduledBarriers,
 
     /// The max barrier nums in flight
     in_flight_barrier_nums: usize,
 
-    metadata_manager: MetadataManager,
+    context: GlobalBarrierManagerContext,
 
-    hummock_manager: HummockManagerRef,
+    env: MetaSrvEnv,
 
-    source_manager: SourceManagerRef,
-
-    scale_controller: Option<ScaleControllerRef>,
-
-    sink_manager: SinkCoordinatorManager,
-
-    metrics: Arc<MetaMetrics>,
+    state: BarrierManagerState,
 
-    pub env: MetaSrvEnv,
-
-    tracker: Mutex<CreateMviewProgressTracker>,
+    checkpoint_control: CheckpointControl,
 }
 
 /// Controls the concurrent execution of commands.
@@ -566,6 +579,10 @@ impl GlobalBarrierManager {
         let enable_recovery = env.opts.enable_recovery;
         let in_flight_barrier_nums = env.opts.in_flight_barrier_nums;
 
+        let initial_invalid_state =
+            BarrierManagerState::new(TracedEpoch::new(Epoch(INVALID_EPOCH)), None);
+        let checkpoint_control = CheckpointControl::new(metrics.clone());
+
         let tracker = CreateMviewProgressTracker::new();
 
         let scale_controller = match &metadata_manager {
@@ -576,23 +593,34 @@ impl GlobalBarrierManager {
             ))),
             MetadataManager::V2(_) => None,
         };
-        Self {
-            enable_recovery,
-            status: Mutex::new(BarrierManagerStatus::Starting),
-            scheduled_barriers,
-            in_flight_barrier_nums,
+        let context = GlobalBarrierManagerContext {
+            status: Arc::new(Mutex::new(BarrierManagerStatus::Starting)),
             metadata_manager,
             hummock_manager,
             source_manager,
             scale_controller,
             sink_manager,
             metrics,
+            tracker: Arc::new(Mutex::new(tracker)),
+            env: env.clone(),
+        };
+
+        Self {
+            enable_recovery,
+            scheduled_barriers,
+            in_flight_barrier_nums,
+            context,
             env,
-            tracker: Mutex::new(tracker),
+            state: initial_invalid_state,
+            checkpoint_control,
         }
     }
 
-    pub fn start(barrier_manager: BarrierManagerRef) -> (JoinHandle<()>, Sender<()>) {
+    pub fn context(&self) -> &GlobalBarrierManagerContext {
+        &self.context
+    }
+
+    pub fn start(barrier_manager: GlobalBarrierManager) -> (JoinHandle<()>, Sender<()>) {
         let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
         let join_handle = tokio::spawn(async move {
             barrier_manager.run(shutdown_rx).await;
@@ -601,27 +629,6 @@ impl GlobalBarrierManager {
         (join_handle, shutdown_tx)
     }
 
-    /// Check the status of barrier manager, return error if it is not `Running`.
-    pub async fn check_status_running(&self) -> MetaResult<()> {
-        let status = self.status.lock().await;
-        match &*status {
-            BarrierManagerStatus::Starting
-            | BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
-                bail!("The cluster is bootstrapping")
-            }
-            BarrierManagerStatus::Recovering(RecoveryReason::Failover(e)) => {
-                Err(anyhow::anyhow!(e.clone()).context("The cluster is recovering"))?
-            }
-            BarrierManagerStatus::Running => Ok(()),
-        }
-    }
-
-    /// Set barrier manager status.
-    async fn set_status(&self, new_status: BarrierManagerStatus) {
-        let mut status = self.status.lock().await;
-        *status = new_status;
-    }
-
     /// Check whether we should pause on bootstrap from the system parameter and reset it.
     async fn take_pause_on_bootstrap(&self) -> MetaResult<bool> {
         let paused = self
@@ -651,7 +658,7 @@ impl GlobalBarrierManager {
     }
 
     /// Start an infinite loop to take scheduled barriers and send them.
-    async fn run(&self, mut shutdown_rx: Receiver<()>) {
+    async fn run(mut self, mut shutdown_rx: Receiver<()>) {
         // Initialize the barrier manager.
let interval = Duration::from_millis( self.env.system_params_reader().await.barrier_interval_ms() as u64, @@ -664,7 +671,7 @@ impl GlobalBarrierManager { ); if !self.enable_recovery { - let job_exist = match &self.metadata_manager { + let job_exist = match &self.context.metadata_manager { MetadataManager::V1(mgr) => mgr.fragment_manager.has_any_table_fragments().await, MetadataManager::V2(mgr) => mgr .catalog_controller @@ -680,8 +687,8 @@ impl GlobalBarrierManager { } } - let mut state = { - let latest_snapshot = self.hummock_manager.latest_snapshot(); + self.state = { + let latest_snapshot = self.context.hummock_manager.latest_snapshot(); assert_eq!( latest_snapshot.committed_epoch, latest_snapshot.current_epoch, "persisted snapshot must be from a checkpoint barrier" @@ -692,24 +699,25 @@ impl GlobalBarrierManager { // consistency. // Even if there's no actor to recover, we still go through the recovery process to // inject the first `Initial` barrier. - self.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap)) + self.context + .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap)) .await; let span = tracing::info_span!("bootstrap_recovery", prev_epoch = prev_epoch.value().0); let paused = self.take_pause_on_bootstrap().await.unwrap_or(false); let paused_reason = paused.then_some(PausedReason::Manual); - self.recovery(prev_epoch, paused_reason) + self.context + .recovery(prev_epoch, paused_reason, &self.scheduled_barriers) .instrument(span) .await }; - self.set_status(BarrierManagerStatus::Running).await; + self.context.set_status(BarrierManagerStatus::Running).await; let mut min_interval = tokio::time::interval(interval); min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); let (barrier_complete_tx, mut barrier_complete_rx) = tokio::sync::mpsc::unbounded_channel(); - let mut checkpoint_control = CheckpointControl::new(self.metrics.clone()); let (local_notification_tx, mut local_notification_rx) = tokio::sync::mpsc::unbounded_channel(); self.env @@ -745,34 +753,32 @@ impl GlobalBarrierManager { completion = barrier_complete_rx.recv() => { self.handle_barrier_complete( completion.unwrap(), - &mut state, - &mut checkpoint_control, ) .await; } // There's barrier scheduled. - _ = self.scheduled_barriers.wait_one(), if checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => { + _ = self.scheduled_barriers.wait_one(), if self.checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => { min_interval.reset(); // Reset the interval as we have a new barrier. - self.handle_new_barrier(&barrier_complete_tx, &mut state, &mut checkpoint_control).await; + self.handle_new_barrier(&barrier_complete_tx).await; } // Minimum interval reached. - _ = min_interval.tick(), if checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => { - self.handle_new_barrier(&barrier_complete_tx, &mut state, &mut checkpoint_control).await; + _ = min_interval.tick(), if self.checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => { + self.handle_new_barrier(&barrier_complete_tx).await; } } - checkpoint_control.update_barrier_nums_metrics(); + self.checkpoint_control.update_barrier_nums_metrics(); } } /// Handle the new barrier from the scheduled queue and inject it. 
     async fn handle_new_barrier(
-        &self,
+        &mut self,
         barrier_complete_tx: &UnboundedSender<BarrierCompletion>,
-        state: &mut BarrierManagerState,
-        checkpoint_control: &mut CheckpointControl,
     ) {
-        assert!(checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums));
+        assert!(self
+            .checkpoint_control
+            .can_inject_barrier(self.in_flight_barrier_nums));
 
         let Scheduled {
             command,
             mut notifiers,
             send_latency_timer,
             checkpoint,
             span,
         } = self.scheduled_barriers.pop_or_default().await;
-        let info = self.resolve_actor_info(checkpoint_control, &command).await;
+        self.checkpoint_control.pre_resolve(&command);
+        let info = self
+            .context
+            .resolve_actor_info(|s: ActorState, table_id: TableId, actor_id: ActorId| {
+                self.checkpoint_control
+                    .can_actor_send_or_collect(s, table_id, actor_id)
+            })
+            .await;
+        self.checkpoint_control.post_resolve(&command);
 
-        let (prev_epoch, curr_epoch) = state.next_epoch_pair();
+        let (prev_epoch, curr_epoch) = self.state.next_epoch_pair();
         let kind = if checkpoint {
             BarrierKind::Checkpoint
         } else {
@@ -795,28 +809,25 @@ impl GlobalBarrierManager {
 
         span.record("epoch", curr_epoch.value().0);
 
         let command_ctx = Arc::new(CommandContext::new(
-            self.metadata_manager.clone(),
-            self.hummock_manager.clone(),
-            self.env.stream_client_pool_ref(),
             info,
             prev_epoch.clone(),
             curr_epoch.clone(),
-            state.paused_reason(),
+            self.state.paused_reason(),
             command,
             kind,
-            self.source_manager.clone(),
-            self.scale_controller.clone(),
+            self.context.clone(),
             span.clone(),
         ));
 
         send_latency_timer.observe_duration();
-        self.inject_barrier(command_ctx.clone(), barrier_complete_tx)
+        self.context
+            .inject_barrier(command_ctx.clone(), barrier_complete_tx)
             .instrument(span)
             .await;
 
         // Notify about the injection.
-        let prev_paused_reason = state.paused_reason();
+        let prev_paused_reason = self.state.paused_reason();
         let curr_paused_reason = command_ctx.next_paused_reason();
 
         let info = BarrierInfo {
@@ -828,182 +839,22 @@ impl GlobalBarrierManager {
         notifiers.iter_mut().for_each(|n| n.notify_injected(info));
 
         // Update the paused state after the barrier is injected.
-        state.set_paused_reason(curr_paused_reason);
+        self.state.set_paused_reason(curr_paused_reason);
         // Record the in-flight barrier.
-        checkpoint_control.enqueue_command(command_ctx.clone(), notifiers);
-    }
-
-    /// Inject a barrier to all CNs and spawn a task to collect it
-    async fn inject_barrier(
-        &self,
-        command_context: Arc<CommandContext>,
-        barrier_complete_tx: &UnboundedSender<BarrierCompletion>,
-    ) {
-        let prev_epoch = command_context.prev_epoch.value().0;
-        let result = self.inject_barrier_inner(command_context.clone()).await;
-        match result {
-            Ok(node_need_collect) => {
-                // todo: the collect handler should be abort when recovery.
-                tokio::spawn(Self::collect_barrier(
-                    self.env.clone(),
-                    node_need_collect,
-                    self.env.stream_client_pool_ref(),
-                    command_context,
-                    barrier_complete_tx.clone(),
-                ));
-            }
-            Err(e) => {
-                let _ = barrier_complete_tx.send(BarrierCompletion {
-                    prev_epoch,
-                    result: Err(e),
-                });
-            }
-        }
-    }
-
-    /// Send inject-barrier-rpc to stream service and wait for its response before returns.
-    async fn inject_barrier_inner(
-        &self,
-        command_context: Arc<CommandContext>,
-    ) -> MetaResult<HashMap<WorkerId, bool>> {
-        fail_point!("inject_barrier_err", |_| bail!("inject_barrier_err"));
-        let mutation = command_context.to_mutation().await?;
-        let info = command_context.info.clone();
-        let mut node_need_collect = HashMap::new();
-        let inject_futures = info.node_map.iter().filter_map(|(node_id, node)| {
-            let actor_ids_to_send = info.actor_ids_to_send(node_id).collect_vec();
-            let actor_ids_to_collect = info.actor_ids_to_collect(node_id).collect_vec();
-            if actor_ids_to_collect.is_empty() {
-                // No need to send or collect barrier for this node.
-                assert!(actor_ids_to_send.is_empty());
-                node_need_collect.insert(*node_id, false);
-                None
-            } else {
-                node_need_collect.insert(*node_id, true);
-                let mutation = mutation.clone();
-                let request_id = Uuid::new_v4().to_string();
-                let barrier = Barrier {
-                    epoch: Some(risingwave_pb::data::Epoch {
-                        curr: command_context.curr_epoch.value().0,
-                        prev: command_context.prev_epoch.value().0,
-                    }),
-                    mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
-                    tracing_context: TracingContext::from_span(command_context.curr_epoch.span())
-                        .to_protobuf(),
-                    kind: command_context.kind as i32,
-                    passed_actors: vec![],
-                };
-                async move {
-                    let client = self.env.stream_client_pool().get(node).await?;
-
-                    let request = InjectBarrierRequest {
-                        request_id,
-                        barrier: Some(barrier),
-                        actor_ids_to_send,
-                        actor_ids_to_collect,
-                    };
-                    tracing::debug!(
-                        target: "events::meta::barrier::inject_barrier",
-                        ?request, "inject barrier request"
-                    );
-
-                    // This RPC returns only if this worker node has injected this barrier.
-                    client.inject_barrier(request).await
-                }
-                .into()
-            }
-        });
-        try_join_all(inject_futures).await.inspect_err(|e| {
-            // Record failure in event log.
-            use risingwave_pb::meta::event_log;
-            use thiserror_ext::AsReport;
-            let event = event_log::EventInjectBarrierFail {
-                prev_epoch: command_context.prev_epoch.value().0,
-                cur_epoch: command_context.curr_epoch.value().0,
-                error: e.to_report_string(),
-            };
-            self.env
-                .event_log_manager_ref()
-                .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
-        })?;
-        Ok(node_need_collect)
-    }
-
-    /// Send barrier-complete-rpc and wait for responses from all CNs
-    async fn collect_barrier(
-        env: MetaSrvEnv,
-        node_need_collect: HashMap<WorkerId, bool>,
-        client_pool_ref: StreamClientPoolRef,
-        command_context: Arc<CommandContext>,
-        barrier_complete_tx: UnboundedSender<BarrierCompletion>,
-    ) {
-        let prev_epoch = command_context.prev_epoch.value().0;
-        let tracing_context =
-            TracingContext::from_span(command_context.prev_epoch.span()).to_protobuf();
-
-        let info = command_context.info.clone();
-        let client_pool = client_pool_ref.deref();
-        let collect_futures = info.node_map.iter().filter_map(|(node_id, node)| {
-            if !*node_need_collect.get(node_id).unwrap() {
-                // No need to send or collect barrier for this node.
-                None
-            } else {
-                let request_id = Uuid::new_v4().to_string();
-                let tracing_context = tracing_context.clone();
-                async move {
-                    let client = client_pool.get(node).await?;
-                    let request = BarrierCompleteRequest {
-                        request_id,
-                        prev_epoch,
-                        tracing_context,
-                    };
-                    tracing::debug!(
-                        target: "events::meta::barrier::barrier_complete",
-                        ?request, "barrier complete"
-                    );
-
-                    // This RPC returns only if this worker node has collected this barrier.
-                    client.barrier_complete(request).await
-                }
-                .into()
-            }
-        });
-
-        let result = try_join_all(collect_futures)
-            .await
-            .inspect_err(|e| {
-                // Record failure in event log.
- use risingwave_pb::meta::event_log; - use thiserror_ext::AsReport; - let event = event_log::EventCollectBarrierFail { - prev_epoch: command_context.prev_epoch.value().0, - cur_epoch: command_context.curr_epoch.value().0, - error: e.to_report_string(), - }; - env.event_log_manager_ref() - .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]); - }) - .map_err(Into::into); - let _ = barrier_complete_tx - .send(BarrierCompletion { prev_epoch, result }) - .inspect_err(|_| tracing::warn!(prev_epoch, "failed to notify barrier completion")); + self.checkpoint_control + .enqueue_command(command_ctx.clone(), notifiers); } /// Changes the state to `Complete`, and try to commit all epoch that state is `Complete` in /// order. If commit is err, all nodes will be handled. - async fn handle_barrier_complete( - &self, - completion: BarrierCompletion, - state: &mut BarrierManagerState, - checkpoint_control: &mut CheckpointControl, - ) { + async fn handle_barrier_complete(&mut self, completion: BarrierCompletion) { let BarrierCompletion { prev_epoch, result } = completion; // Received barrier complete responses with an epoch that is not managed by checkpoint // control, which means a recovery has been triggered. We should ignore it because // trying to complete and commit the epoch is not necessary and could cause // meaningless recovery again. - if !checkpoint_control.contains_epoch(prev_epoch) { + if !self.checkpoint_control.contains_epoch(prev_epoch) { tracing::warn!( "received barrier complete response for an unknown epoch: {}", prev_epoch @@ -1015,24 +866,21 @@ impl GlobalBarrierManager { // FIXME: If it is a connector source error occurred in the init barrier, we should pass // back to frontend fail_point!("inject_barrier_err_success"); - let fail_node = checkpoint_control.barrier_failed(); + let fail_node = self.checkpoint_control.barrier_failed(); tracing::warn!("Failed to complete epoch {}: {:?}", prev_epoch, err); - self.failure_recovery(err, fail_node, state, checkpoint_control) - .await; + self.failure_recovery(err, fail_node).await; return; } // change the state to Complete - let mut complete_nodes = checkpoint_control.barrier_completed(prev_epoch, result.unwrap()); + let mut complete_nodes = self + .checkpoint_control + .barrier_completed(prev_epoch, result.unwrap()); // try commit complete nodes let (mut index, mut err_msg) = (0, None); for (i, node) in complete_nodes.iter_mut().enumerate() { assert!(matches!(node.state, Completed(_))); let span = node.command_ctx.span.clone(); - if let Err(err) = self - .complete_barrier(node, checkpoint_control) - .instrument(span) - .await - { + if let Err(err) = self.complete_barrier(node).instrument(span).await { index = i; err_msg = Some(err); break; @@ -1042,21 +890,19 @@ impl GlobalBarrierManager { if let Some(err) = err_msg { let fail_nodes = complete_nodes .drain(index..) 
-                .chain(checkpoint_control.barrier_failed().into_iter());
+                .chain(self.checkpoint_control.barrier_failed().into_iter())
+                .collect_vec();
             tracing::warn!("Failed to commit epoch {}: {:?}", prev_epoch, err);
-            self.failure_recovery(err, fail_nodes, state, checkpoint_control)
-                .await;
+            self.failure_recovery(err, fail_nodes).await;
         }
     }
 
     async fn failure_recovery(
-        &self,
+        &mut self,
         err: MetaError,
         fail_nodes: impl IntoIterator<Item = EpochNode>,
-        state: &mut BarrierManagerState,
-        checkpoint_control: &mut CheckpointControl,
     ) {
-        checkpoint_control.clear_changes();
+        self.checkpoint_control.clear_changes();
 
         for node in fail_nodes {
             if let Some(timer) = node.timer {
@@ -1071,11 +917,12 @@ impl GlobalBarrierManager {
         }
 
         if self.enable_recovery {
-            self.set_status(BarrierManagerStatus::Recovering(RecoveryReason::Failover(
-                err.clone(),
-            )))
-            .await;
-            let latest_snapshot = self.hummock_manager.latest_snapshot();
+            self.context
+                .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Failover(
+                    err.clone(),
+                )))
+                .await;
+            let latest_snapshot = self.context.hummock_manager.latest_snapshot();
             let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recovery from the committed epoch
             let span = tracing::info_span!(
                 "failure_recovery",
@@ -1085,19 +932,19 @@ impl GlobalBarrierManager {
 
             // No need to clean dirty tables for barrier recovery,
            // The foreground stream job should cleanup their own tables.
-            *state = self.recovery(prev_epoch, None).instrument(span).await;
-            self.set_status(BarrierManagerStatus::Running).await;
+            self.state = self
+                .context
+                .recovery(prev_epoch, None, &self.scheduled_barriers)
+                .instrument(span)
+                .await;
+            self.context.set_status(BarrierManagerStatus::Running).await;
         } else {
             panic!("failed to execute barrier: {:?}", err);
         }
     }
 
     /// Try to commit this node. If err, returns
-    async fn complete_barrier(
-        &self,
-        node: &mut EpochNode,
-        checkpoint_control: &mut CheckpointControl,
-    ) -> MetaResult<()> {
+    async fn complete_barrier(&mut self, node: &mut EpochNode) -> MetaResult<()> {
         let prev_epoch = node.command_ctx.prev_epoch.value().0;
         match &mut node.state {
             Completed(resps) => {
@@ -1118,12 +965,17 @@ impl GlobalBarrierManager {
                 ),
                 BarrierKind::Checkpoint => {
                     new_snapshot = self
+                        .context
                         .hummock_manager
                         .commit_epoch(node.command_ctx.prev_epoch.value().0, commit_info)
                         .await?;
                 }
                 BarrierKind::Barrier => {
-                    new_snapshot = Some(self.hummock_manager.update_current_epoch(prev_epoch));
+                    new_snapshot = Some(
+                        self.context
+                            .hummock_manager
+                            .update_current_epoch(prev_epoch),
+                    );
                     // if we collect a barrier(checkpoint = false),
                     // we need to ensure that command is Plain and the notifier's checkpoint is
                     // false
@@ -1152,7 +1004,7 @@ impl GlobalBarrierManager {
             // Save `cancelled_command` for Create MVs.
             let actors_to_cancel = node.command_ctx.actors_to_cancel();
             let cancelled_command = if !actors_to_cancel.is_empty() {
-                let mut tracker = self.tracker.lock().await;
+                let mut tracker = self.context.tracker.lock().await;
                 tracker.find_cancelled_command(actors_to_cancel)
             } else {
                 None
@@ -1161,8 +1013,8 @@ impl GlobalBarrierManager {
             // Save `finished_commands` for Create MVs.
             let finished_commands = {
                 let mut commands = vec![];
-                let version_stats = self.hummock_manager.get_version_stats().await;
-                let mut tracker = self.tracker.lock().await;
+                let version_stats = self.context.hummock_manager.get_version_stats().await;
+                let mut tracker = self.context.tracker.lock().await;
                 // Add the command to tracker.
                 if let Some(command) = tracker.add(
                     TrackingCommand {
                         context: node.command_ctx.clone(),
@@ -1188,18 +1040,21 @@ impl GlobalBarrierManager {
             };
 
             for command in finished_commands {
-                checkpoint_control.stash_command_to_finish(command);
+                self.checkpoint_control.stash_command_to_finish(command);
             }
 
             if let Some(command) = cancelled_command {
-                checkpoint_control.cancel_command(command);
+                self.checkpoint_control.cancel_command(command);
             } else if let Some(table_id) = node.command_ctx.table_to_cancel() {
                 // the cancelled command is possibly stashed in `finished_commands` and waiting
                 // for checkpoint, we should also clear it.
-                checkpoint_control.cancel_stashed_command(table_id);
+                self.checkpoint_control.cancel_stashed_command(table_id);
             }
 
-            let remaining = checkpoint_control.finish_jobs(kind.is_checkpoint()).await?;
+            let remaining = self
+                .checkpoint_control
+                .finish_jobs(kind.is_checkpoint())
+                .await?;
             // If there are remaining commands (that requires checkpoint to finish), we force
             // the next barrier to be a checkpoint.
             if remaining {
@@ -1230,37 +1085,206 @@ impl GlobalBarrierManager {
             InFlight => unreachable!(),
         }
     }
+}
+
+impl GlobalBarrierManagerContext {
+    /// Check the status of the barrier manager, returning an error if it is not `Running`.
+    pub async fn check_status_running(&self) -> MetaResult<()> {
+        let status = self.status.lock().await;
+        match &*status {
+            BarrierManagerStatus::Starting
+            | BarrierManagerStatus::Recovering(RecoveryReason::Bootstrap) => {
+                bail!("The cluster is bootstrapping")
+            }
+            BarrierManagerStatus::Recovering(RecoveryReason::Failover(e)) => {
+                Err(anyhow::anyhow!(e.clone()).context("The cluster is recovering"))?
+            }
+            BarrierManagerStatus::Running => Ok(()),
+        }
+    }
+
+    /// Set barrier manager status.
+    async fn set_status(&self, new_status: BarrierManagerStatus) {
+        let mut status = self.status.lock().await;
+        *status = new_status;
+    }
+
+    /// Inject a barrier to all CNs and spawn a task to collect it.
+    async fn inject_barrier(
+        &self,
+        command_context: Arc<CommandContext>,
+        barrier_complete_tx: &UnboundedSender<BarrierCompletion>,
+    ) {
+        let prev_epoch = command_context.prev_epoch.value().0;
+        let result = self.inject_barrier_inner(command_context.clone()).await;
+        match result {
+            Ok(node_need_collect) => {
+                // TODO: the collect handler should be aborted when recovery is triggered.
+                tokio::spawn(Self::collect_barrier(
+                    self.env.clone(),
+                    node_need_collect,
+                    self.env.stream_client_pool_ref(),
+                    command_context,
+                    barrier_complete_tx.clone(),
+                ));
+            }
+            Err(e) => {
+                let _ = barrier_complete_tx.send(BarrierCompletion {
+                    prev_epoch,
+                    result: Err(e),
+                });
+            }
+        }
+    }
+
+    /// Send an inject-barrier RPC to the stream service and wait for its response before returning.
+    async fn inject_barrier_inner(
+        &self,
+        command_context: Arc<CommandContext>,
+    ) -> MetaResult<HashMap<WorkerId, bool>> {
+        fail_point!("inject_barrier_err", |_| bail!("inject_barrier_err"));
+        let mutation = command_context.to_mutation().await?;
+        let info = command_context.info.clone();
+        let mut node_need_collect = HashMap::new();
+        let inject_futures = info.node_map.iter().filter_map(|(node_id, node)| {
+            let actor_ids_to_send = info.actor_ids_to_send(node_id).collect_vec();
+            let actor_ids_to_collect = info.actor_ids_to_collect(node_id).collect_vec();
+            if actor_ids_to_collect.is_empty() {
+                // No need to send or collect barrier for this node.
+                assert!(actor_ids_to_send.is_empty());
+                node_need_collect.insert(*node_id, false);
+                None
+            } else {
+                node_need_collect.insert(*node_id, true);
+                let mutation = mutation.clone();
+                let request_id = Uuid::new_v4().to_string();
+                let barrier = Barrier {
+                    epoch: Some(risingwave_pb::data::Epoch {
+                        curr: command_context.curr_epoch.value().0,
+                        prev: command_context.prev_epoch.value().0,
+                    }),
+                    mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
+                    tracing_context: TracingContext::from_span(command_context.curr_epoch.span())
+                        .to_protobuf(),
+                    kind: command_context.kind as i32,
+                    passed_actors: vec![],
+                };
+                async move {
+                    let client = self.env.stream_client_pool().get(node).await?;
+
+                    let request = InjectBarrierRequest {
+                        request_id,
+                        barrier: Some(barrier),
+                        actor_ids_to_send,
+                        actor_ids_to_collect,
+                    };
+                    tracing::debug!(
+                        target: "events::meta::barrier::inject_barrier",
+                        ?request, "inject barrier request"
+                    );
+
+                    // This RPC returns only if this worker node has injected this barrier.
+                    client.inject_barrier(request).await
+                }
+                .into()
+            }
+        });
+        try_join_all(inject_futures).await.inspect_err(|e| {
+            // Record failure in event log.
+            use risingwave_pb::meta::event_log;
+            use thiserror_ext::AsReport;
+            let event = event_log::EventInjectBarrierFail {
+                prev_epoch: command_context.prev_epoch.value().0,
+                cur_epoch: command_context.curr_epoch.value().0,
+                error: e.to_report_string(),
+            };
+            self.env
+                .event_log_manager_ref()
+                .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
+        })?;
+        Ok(node_need_collect)
+    }
+
+    /// Send a barrier-complete RPC and wait for responses from all CNs.
+    async fn collect_barrier(
+        env: MetaSrvEnv,
+        node_need_collect: HashMap<WorkerId, bool>,
+        client_pool_ref: StreamClientPoolRef,
+        command_context: Arc<CommandContext>,
+        barrier_complete_tx: UnboundedSender<BarrierCompletion>,
+    ) {
+        let prev_epoch = command_context.prev_epoch.value().0;
+        let tracing_context =
+            TracingContext::from_span(command_context.prev_epoch.span()).to_protobuf();
+
+        let info = command_context.info.clone();
+        let client_pool = client_pool_ref.deref();
+        let collect_futures = info.node_map.iter().filter_map(|(node_id, node)| {
+            if !*node_need_collect.get(node_id).unwrap() {
+                // No need to send or collect barrier for this node.
+                None
+            } else {
+                let request_id = Uuid::new_v4().to_string();
+                let tracing_context = tracing_context.clone();
+                async move {
+                    let client = client_pool.get(node).await?;
+                    let request = BarrierCompleteRequest {
+                        request_id,
+                        prev_epoch,
+                        tracing_context,
+                    };
+                    tracing::debug!(
+                        target: "events::meta::barrier::barrier_complete",
+                        ?request, "barrier complete"
+                    );
+
+                    // This RPC returns only if this worker node has collected this barrier.
+                    client.barrier_complete(request).await
+                }
+                .into()
+            }
+        });
+
+        let result = try_join_all(collect_futures)
+            .await
+            .inspect_err(|e| {
+                // Record failure in event log.
+                use risingwave_pb::meta::event_log;
+                use thiserror_ext::AsReport;
+                let event = event_log::EventCollectBarrierFail {
+                    prev_epoch: command_context.prev_epoch.value().0,
+                    cur_epoch: command_context.curr_epoch.value().0,
+                    error: e.to_report_string(),
+                };
+                env.event_log_manager_ref()
+                    .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
+            })
+            .map_err(Into::into);
+        let _ = barrier_complete_tx
+            .send(BarrierCompletion { prev_epoch, result })
+            .inspect_err(|_| tracing::warn!(prev_epoch, "failed to notify barrier completion"));
+    }
 
     /// Resolve actor information from cluster, fragment manager and `ChangedTableId`.
     /// We use `changed_table_id` to modify the actors to be sent or collected. Because these actor
     /// will create or drop before this barrier flow through them.
     async fn resolve_actor_info(
         &self,
-        checkpoint_control: &mut CheckpointControl,
-        command: &Command,
+        check_state: impl Fn(ActorState, TableId, ActorId) -> bool,
     ) -> BarrierActorInfo {
-        checkpoint_control.pre_resolve(command);
-
         let info = match &self.metadata_manager {
             MetadataManager::V1(mgr) => {
-                let check_state = |s: ActorState, table_id: TableId, actor_id: ActorId| {
-                    checkpoint_control.can_actor_send_or_collect(s, table_id, actor_id)
-                };
                 let all_nodes = mgr
                     .cluster_manager
                     .list_active_streaming_compute_nodes()
                     .await;
-                let all_actor_infos = mgr.fragment_manager.load_all_actors(check_state).await;
+                let all_actor_infos = mgr.fragment_manager.load_all_actors(&check_state).await;
 
                 BarrierActorInfo::resolve(all_nodes, all_actor_infos)
             }
             MetadataManager::V2(mgr) => {
                 let check_state = |s: ActorState, table_id: ObjectId, actor_id: i32| {
-                    checkpoint_control.can_actor_send_or_collect(
-                        s,
-                        TableId::new(table_id as _),
-                        actor_id as _,
-                    )
+                    check_state(s, TableId::new(table_id as _), actor_id as _)
                 };
                 let all_nodes = mgr
                     .cluster_controller
@@ -1281,8 +1305,6 @@ impl GlobalBarrierManager {
             }
         };
 
-        checkpoint_control.post_resolve(command);
-
         info
     }
 
@@ -1327,7 +1349,7 @@ impl GlobalBarrierManager {
     }
 }
 
-pub type BarrierManagerRef = Arc<GlobalBarrierManager>;
+pub type BarrierManagerRef = GlobalBarrierManagerContext;
 
 fn collect_commit_epoch_info(resps: &mut [BarrierCompleteResponse]) -> CommitEpochInfo {
     let mut sst_to_worker: HashMap<HummockSstableObjectId, WorkerId> = HashMap::new();
diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs
index ba1e11c9c6fa3..0c753a3c3f025 100644
--- a/src/meta/src/barrier/progress.rs
+++ b/src/meta/src/barrier/progress.rs
@@ -161,7 +161,7 @@ pub enum TrackingJob {
 impl TrackingJob {
     fn metadata_manager(&self) -> &MetadataManager {
         match self {
-            TrackingJob::New(command) => &command.context.metadata_manager,
+            TrackingJob::New(command) => command.context.metadata_manager(),
             TrackingJob::Recovered(recovered) => &recovered.metadata_manager,
         }
     }
diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs
index 4a5b13c03caee..be9658d57b74e 100644
--- a/src/meta/src/barrier/recovery.rs
+++ b/src/meta/src/barrier/recovery.rs
@@ -41,14 +41,15 @@
 use crate::barrier::command::CommandContext;
 use crate::barrier::info::BarrierActorInfo;
 use crate::barrier::notifier::Notifier;
 use crate::barrier::progress::CreateMviewProgressTracker;
-use crate::barrier::{CheckpointControl, Command, GlobalBarrierManager};
+use crate::barrier::schedule::ScheduledBarriers;
+use crate::barrier::{CheckpointControl, Command, GlobalBarrierManagerContext};
 use crate::controller::catalog::ReleaseContext;
 use crate::manager::{MetadataManager, WorkerId};
 use crate::model::{BarrierManagerState, MetadataModel, MigrationPlan, TableFragments};
 use crate::stream::{build_actor_connector_splits, RescheduleOptions, TableResizePolicy};
 use crate::MetaResult;
 
-impl GlobalBarrierManager {
+impl GlobalBarrierManagerContext {
     // Retry base interval in milliseconds.
     const RECOVERY_RETRY_BASE_INTERVAL: u64 = 20;
     // Retry max interval.
@@ -63,10 +64,10 @@ impl GlobalBarrierManager {
     }
 
     async fn resolve_actor_info_for_recovery(&self) -> BarrierActorInfo {
-        self.resolve_actor_info(
-            &mut CheckpointControl::new(self.metrics.clone()),
-            &Command::barrier(),
-        )
+        let default_checkpoint_control = CheckpointControl::new(self.metrics.clone());
+        self.resolve_actor_info(|s, table_id, actor_id| {
+            default_checkpoint_control.can_actor_send_or_collect(s, table_id, actor_id)
+        })
         .await
     }
 
@@ -326,9 +327,10 @@ impl GlobalBarrierManager {
         &self,
         prev_epoch: TracedEpoch,
         paused_reason: Option<PausedReason>,
+        scheduled_barriers: &ScheduledBarriers,
     ) -> BarrierManagerState {
         // Mark blocked and abort buffered schedules, they might be dirty already.
-        self.scheduled_barriers
+        scheduled_barriers
             .abort_and_mark_blocked("cluster is under recovering")
             .await;
 
@@ -358,8 +360,7 @@ impl GlobalBarrierManager {
             // some table fragments might have been cleaned as dirty, but it's fine since the drop
             // interface is idempotent.
             if let MetadataManager::V1(mgr) = &self.metadata_manager {
-                let to_drop_tables =
-                    self.scheduled_barriers.pre_apply_drop_scheduled().await;
+                let to_drop_tables = scheduled_barriers.pre_apply_drop_scheduled().await;
                 mgr.fragment_manager
                     .drop_table_fragments_vec(&to_drop_tables)
                     .await?;
@@ -412,17 +413,13 @@ impl GlobalBarrierManager {
 
                     // Inject the `Initial` barrier to initialize all executors.
                     let command_ctx = Arc::new(CommandContext::new(
-                        self.metadata_manager.clone(),
-                        self.hummock_manager.clone(),
-                        self.env.stream_client_pool_ref(),
                         info,
                         prev_epoch.clone(),
                         new_epoch.clone(),
                         paused_reason,
                         command,
                         BarrierKind::Initial,
-                        self.source_manager.clone(),
-                        self.scale_controller.clone(),
+                        self.clone(),
                         tracing::Span::current(), // recovery span
                     ));
 
@@ -470,7 +467,7 @@ impl GlobalBarrierManager {
             .expect("Retry until recovery success.");
         recovery_timer.observe_duration();
 
-        self.scheduled_barriers.mark_ready().await;
+        scheduled_barriers.mark_ready().await;
 
         tracing::info!(
             epoch = state.in_flight_prev_epoch().value().0,
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index ad3da73d1115f..f10738d3f5cab 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -996,7 +996,7 @@ mod tests {
 
         let (sink_manager, _) = SinkCoordinatorManager::start_worker();
 
-        let barrier_manager = Arc::new(GlobalBarrierManager::new(
+        let barrier_manager = GlobalBarrierManager::new(
             scheduled_barriers,
             env.clone(),
             metadata_manager.clone(),
@@ -1004,7 +1004,7 @@ mod tests {
             source_manager.clone(),
             sink_manager,
             meta_metrics.clone(),
-        ));
+        );
 
         let stream_manager = GlobalStreamManager::new(
             env.clone(),
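Taken together, the refactor splits the old `Arc<GlobalBarrierManager>` into two halves: the `GlobalBarrierManager` event loop, which now exclusively owns its mutable state (`BarrierManagerState` and `CheckpointControl`, previously threaded through every method as `&mut` arguments), and the cheaply cloneable `GlobalBarrierManagerContext`, which carries the shared handles (metadata, hummock, and source managers, status, tracker) that RPC services and the recovery path still need. A minimal sketch of the resulting wiring, with constructor arguments elided for brevity:

```rust
// Illustrative sketch only; `new` takes the managers shown in server.rs above.
let barrier_manager = GlobalBarrierManager::new(/* scheduled_barriers, env, ... */);

// Services now clone the lightweight context instead of holding an
// Arc<GlobalBarrierManager>; `BarrierManagerRef` is an alias for this type.
let barrier_ref: BarrierManagerRef = barrier_manager.context().clone();

// `start` consumes the manager and moves it into the spawned event loop, so
// `state` and `checkpoint_control` are reachable from exactly one task and
// no longer need `Mutex` wrappers.
let (join_handle, shutdown_tx) = GlobalBarrierManager::start(barrier_manager);
```

This is also why `run` changes from `&self` to `mut self`, and why `recovery` moves onto the context and borrows `ScheduledBarriers` explicitly: the loop owns the manager, while everything else shares only the context.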