From f0104ab8042527715b90fee34cea53e303f6e28a Mon Sep 17 00:00:00 2001
From: William Wen
Date: Thu, 22 Aug 2024 23:12:23 +0800
Subject: [PATCH 1/4] feat(snapshot-backfill): only receive mutation from
 barrier worker for snapshot backfill

---
 src/meta/src/barrier/command.rs             |  14 +-
 src/meta/src/barrier/creating_job/mod.rs    |  11 +-
 src/meta/src/barrier/creating_job/status.rs |  12 +-
 src/meta/src/barrier/mod.rs                 |  14 +
 src/meta/src/barrier/rpc.rs                 |   4 +-
 .../executor/backfill/snapshot_backfill.rs  | 249 ++++++++++++++----
 6 files changed, 239 insertions(+), 65 deletions(-)

diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 04b8fed711dd..efbc03e215d2 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -496,16 +496,16 @@ impl CommandContext {
     }
 }
 
-impl CommandContext {
+impl Command {
     /// Generate a mutation for the given command.
-    pub fn to_mutation(&self) -> Option<Mutation> {
+    pub fn to_mutation(&self, current_paused_reason: Option<&PausedReason>) -> Option<Mutation> {
         let mutation =
-            match &self.command {
+            match self {
                 Command::Plain(mutation) => mutation.clone(),
                 Command::Pause(_) => {
                     // Only pause when the cluster is not already paused.
-                    if self.current_paused_reason.is_none() {
+                    if current_paused_reason.is_none() {
                         Some(Mutation::Pause(PauseMutation {}))
                     } else {
                         None
@@ -514,7 +514,7 @@
                 Command::Resume(reason) => {
                     // Only resume when the cluster is paused with the same reason.
-                    if self.current_paused_reason == Some(*reason) {
+                    if current_paused_reason == Some(reason) {
                         Some(Mutation::Resume(ResumeMutation {}))
                     } else {
                         None
@@ -606,7 +606,7 @@
                     added_actors,
                     actor_splits,
                     // If the cluster is already paused, the new actors should be paused too.
-                    pause: self.current_paused_reason.is_some(),
+                    pause: current_paused_reason.is_some(),
                     subscriptions_to_add,
                 }));
@@ -877,7 +877,9 @@
                 ..Default::default()
             }))
         }
+}
 
+impl CommandContext {
     /// Returns the paused reason after executing the current command.
     pub fn next_paused_reason(&self) -> Option<PausedReason> {
         match &self.command {

diff --git a/src/meta/src/barrier/creating_job/mod.rs b/src/meta/src/barrier/creating_job/mod.rs
index 0d478fcf6614..77c947196359 100644
--- a/src/meta/src/barrier/creating_job/mod.rs
+++ b/src/meta/src/barrier/creating_job/mod.rs
@@ -27,6 +27,7 @@ use risingwave_common::util::epoch::Epoch;
 use risingwave_pb::common::WorkerNode;
 use risingwave_pb::ddl_service::DdlProgress;
 use risingwave_pb::hummock::HummockVersionStats;
+use risingwave_pb::stream_plan::barrier_mutation::Mutation;
 use risingwave_pb::stream_service::BarrierCompleteResponse;
 use tracing::{debug, info};
 
@@ -64,6 +65,7 @@ impl CreatingStreamingJobControl {
         backfill_epoch: u64,
         version_stat: &HummockVersionStats,
         metrics: &MetaMetrics,
+        initial_mutation: Mutation,
     ) -> Self {
         info!(
             table_id = info.table_fragments.table_id().table_id,
@@ -103,6 +105,7 @@ impl CreatingStreamingJobControl {
                 backfill_epoch,
                 pending_non_checkpoint_barriers: vec![],
                 snapshot_backfill_actors,
+                initial_mutation: Some(initial_mutation),
             },
             upstream_lag: metrics
                 .snapshot_backfill_lag
@@ -256,10 +259,10 @@ impl CreatingStreamingJobControl {
                 .active_graph_info()
                 .expect("must exist when having barriers to inject");
             let table_id = self.info.table_fragments.table_id();
-            for (curr_epoch, prev_epoch, kind) in barriers_to_inject {
+            for (curr_epoch, prev_epoch, kind, mutation) in barriers_to_inject {
                 let node_to_collect = control_stream_manager.inject_barrier(
                     Some(table_id),
-                    None,
+                    mutation,
                     (&curr_epoch, &prev_epoch),
                     &kind,
                     graph_info,
@@ -320,7 +323,9 @@ impl CreatingStreamingJobControl {
                 Some(table_id),
                 if start_consume_upstream {
                     // erase the mutation on upstream except the last command
-                    command_ctx.to_mutation()
+                    command_ctx
+                        .command
+                        .to_mutation(command_ctx.current_paused_reason.as_ref())
                 } else {
                     None
                 },

diff --git a/src/meta/src/barrier/creating_job/status.rs b/src/meta/src/barrier/creating_job/status.rs
index f2d002229834..6593023ae894 100644
--- a/src/meta/src/barrier/creating_job/status.rs
+++ b/src/meta/src/barrier/creating_job/status.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;
 
 use risingwave_common::util::epoch::Epoch;
 use risingwave_pb::hummock::HummockVersionStats;
+use risingwave_pb::stream_plan::barrier_mutation::Mutation;
 use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
 
 use crate::barrier::command::CommandContext;
@@ -39,6 +40,8 @@ pub(super) enum CreatingStreamingJobStatus {
         /// The `prev_epoch` of pending non checkpoint barriers
         pending_non_checkpoint_barriers: Vec<u64>,
         snapshot_backfill_actors: HashMap>,
+        /// Mutation of the first barrier. Take the mutation out when injecting the first barrier.
+        initial_mutation: Option<Mutation>,
     },
     ConsumingLogStore {
         graph_info: InflightGraphInfo,
@@ -89,7 +92,7 @@ impl CreatingStreamingJobStatus {
         &mut self,
         is_checkpoint: bool,
     ) -> Option<(
-        Vec<(TracedEpoch, TracedEpoch, BarrierKind)>,
+        Vec<(TracedEpoch, TracedEpoch, BarrierKind, Option<Mutation>)>,
         Option,
     )> {
         if let CreatingStreamingJobStatus::ConsumingSnapshot {
@@ -99,10 +102,12 @@ impl CreatingStreamingJobStatus {
             graph_info,
             pending_non_checkpoint_barriers,
             ref backfill_epoch,
+            initial_mutation,
             ..
         } = self
         {
             if create_mview_tracker.has_pending_finished_jobs() {
+                assert!(initial_mutation.is_none());
                 pending_non_checkpoint_barriers.push(*backfill_epoch);
 
                 let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
@@ -110,6 +115,7 @@
                     TracedEpoch::new(Epoch(*backfill_epoch)),
                     TracedEpoch::new(prev_epoch),
                     BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
+                    None,
                 )]
                 .into_iter()
                 .chain(pending_commands.drain(..).map(|command_ctx| {
@@ -117,6 +123,7 @@
                         command_ctx.curr_epoch.clone(),
                         command_ctx.prev_epoch.clone(),
                         command_ctx.kind.clone(),
+                        None,
                     )
                 }))
                 .collect();
@@ -135,7 +142,8 @@
                 } else {
                     BarrierKind::Barrier
                 };
-                Some((vec![(curr_epoch, prev_epoch, kind)], None))
+                let mutation = initial_mutation.take();
+                Some((vec![(curr_epoch, prev_epoch, kind, mutation)], None))
             }
         } else {
             None

diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index 8ffe3b1ac078..b57559adcc72 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -969,6 +969,19 @@ impl GlobalBarrierManager {
             info,
         } = &command
         {
+            if self.state.paused_reason().is_some() {
+                warn!("cannot create streaming job with snapshot backfill when paused");
+                for notifier in notifiers {
+                    notifier.notify_start_failed(
+                        anyhow!("cannot create streaming job with snapshot backfill when paused",)
+                            .into(),
+                    );
+                }
+                return Ok(());
+            }
+            let mutation = command
+                .to_mutation(None)
+                .expect("should have some mutation in `CreateStreamingJob` command");
             self.checkpoint_control
                 .creating_streaming_job_controls
                 .insert(
@@ -979,6 +992,7 @@ impl GlobalBarrierManager {
                         prev_epoch.value().0,
                         &self.checkpoint_control.hummock_version_stats,
                         &self.context.metrics,
+                        mutation,
                     ),
                 );
         }

diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs
index 1b4ab6207db9..d588489d1750 100644
--- a/src/meta/src/barrier/rpc.rs
+++ b/src/meta/src/barrier/rpc.rs
@@ -257,7 +257,9 @@ impl ControlStreamManager {
     ) -> MetaResult> {
         self.inject_barrier(
             None,
-            command_ctx.to_mutation(),
+            command_ctx
+                .command
+                .to_mutation(command_ctx.current_paused_reason.as_ref()),
             (&command_ctx.curr_epoch, &command_ctx.prev_epoch),
             &command_ctx.kind,
             pre_applied_graph_info,

diff --git a/src/stream/src/executor/backfill/snapshot_backfill.rs b/src/stream/src/executor/backfill/snapshot_backfill.rs
index 35adc33b81c4..12b5bce4151c 100644
--- a/src/stream/src/executor/backfill/snapshot_backfill.rs
+++ b/src/stream/src/executor/backfill/snapshot_backfill.rs
@@ -32,14 +32,16 @@ use risingwave_storage::table::batch_table::storage_table::StorageTable;
 use risingwave_storage::table::ChangeLogRow;
 use risingwave_storage::StateStore;
 use tokio::select;
+use tokio::sync::mpsc;
 use tokio::sync::mpsc::UnboundedReceiver;
 
 use crate::executor::backfill::utils::{create_builder, mapping_chunk};
 use crate::executor::monitor::StreamingMetrics;
 use crate::executor::prelude::{try_stream, StreamExt};
 use crate::executor::{
-    expect_first_barrier, ActorContextRef, BackfillExecutor, Barrier, BoxedMessageStream, Execute,
-    Executor, Message, Mutation, StreamExecutorError, StreamExecutorResult,
+    expect_first_barrier, ActorContextRef, BackfillExecutor, Barrier, BoxedMessageStream,
+    DispatcherBarrier, DispatcherMessage, Execute, Executor, Message, Mutation,
+    StreamExecutorError, StreamExecutorResult,
 };
 use crate::task::CreateMviewProgress;
 
@@ -99,7 +101,8 @@ impl SnapshotBackfillExecutor {
     #[try_stream(ok = Message, error = StreamExecutorError)]
     async fn execute_inner(mut self) {
         debug!("snapshot backfill executor start");
-        let mut upstream = self.upstream.execute();
+        let upstream = erase_upstream_mutation(self.upstream.execute());
+        pin_mut!(upstream);
         let upstream_table_id = self.upstream_table.table_id();
         let first_barrier = expect_first_barrier(&mut upstream).await?;
         debug!(epoch = ?first_barrier.epoch, "get first upstream barrier");
@@ -109,7 +112,7 @@ impl SnapshotBackfillExecutor {
         {
             if should_backfill {
-                let subscriber_ids = first_barrier
+                let subscriber_ids = first_recv_barrier
                     .added_subscriber_on_mv_table(upstream_table_id)
                     .collect_vec();
                 let snapshot_backfill_table_fragment_id = match subscriber_ids.as_slice() {
@@ -183,12 +186,16 @@ impl SnapshotBackfillExecutor {
                     let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
                     assert_eq!(first_barrier.epoch, recv_barrier.epoch);
-                    yield Message::Barrier(first_barrier);
+                    yield Message::Barrier(recv_barrier);
                 }
 
+                let mut upstream_buffer = upstream_buffer
+                    .start_consuming_log_store(&mut self.barrier_rx)
+                    .await?;
+
                 let mut barrier_epoch = first_barrier_epoch;
 
-                let initial_pending_barrier = upstream_buffer.barrier.len();
+                let initial_pending_barrier = upstream_buffer.state.barriers.len();
                 info!(
                     ?barrier_epoch,
                     table_id = self.upstream_table.table_id().table_id,
@@ -207,8 +214,6 @@ impl SnapshotBackfillExecutor {
 
                 // Phase 2: consume upstream log store
                 while let Some(barrier) = upstream_buffer.take_buffered_barrier().await? {
-                    let recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
-                    assert_eq!(barrier.epoch, recv_barrier.epoch);
                     assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
                     barrier_epoch = barrier.epoch;
 
@@ -254,16 +259,20 @@ impl SnapshotBackfillExecutor {
             );
             let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
             assert_eq!(first_barrier.epoch, first_recv_barrier.epoch);
-            yield Message::Barrier(first_barrier);
+            yield Message::Barrier(first_recv_barrier);
             }
         }
 
         // Phase 3: consume upstream
         while let Some(msg) = upstream.try_next().await? {
-            if let Message::Barrier(barrier) = &msg {
-                let recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
-                assert_eq!(barrier.epoch, recv_barrier.epoch);
-            }
-            yield msg;
+            yield match msg {
+                DispatcherMessage::Chunk(chunk) => Message::Chunk(chunk),
+                DispatcherMessage::Watermark(watermark) => Message::Watermark(watermark),
+                DispatcherMessage::Barrier(barrier) => {
+                    let recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
+                    assert_eq!(barrier.epoch, recv_barrier.epoch);
+                    Message::Barrier(recv_barrier)
+                }
+            };
         }
     }
 }
@@ -324,35 +333,159 @@ async fn read_change_log(
     }
 }
 
-struct UpstreamBuffer<'a> {
-    upstream: &'a mut BoxedMessageStream,
+trait UpstreamBufferState {
+    fn is_finished(&self) -> bool;
+    async fn on_upstream_barrier(
+        &mut self,
+        upstream_barrier: DispatcherBarrier,
+    ) -> StreamExecutorResult<()>;
+}
+
+struct StateOfConsumingSnapshot {
+    pending_barriers: Vec<DispatcherBarrier>,
+}
+
+impl UpstreamBufferState for StateOfConsumingSnapshot {
+    fn is_finished(&self) -> bool {
+        // never finish when consuming snapshot
+        false
+    }
+
+    async fn on_upstream_barrier(
+        &mut self,
+        upstream_barrier: DispatcherBarrier,
+    ) -> StreamExecutorResult<()> {
+        self.pending_barriers.push(upstream_barrier);
+        Ok(())
+    }
+}
+
+struct StateOfConsumingLogStore<'a> {
+    barrier_rx: &'a mut mpsc::UnboundedReceiver<Barrier>,
     // newer barrier at the front
-    barrier: VecDeque<Barrier>,
-    consume_upstream_row_count: LabelGuardedIntCounter<3>,
+    barriers: VecDeque<Barrier>,
     is_finished: bool,
+    current_subscriber_id: u32,
+    upstream_table_id: TableId,
+}
+
+impl<'a> StateOfConsumingLogStore<'a> {
+    async fn recv_barrier(
+        &mut self,
+        upstream_barrier: DispatcherBarrier,
+    ) -> StreamExecutorResult<Barrier> {
+        assert!(!self.is_finished);
+        let barrier = receive_next_barrier(self.barrier_rx).await?;
+        assert_eq!(upstream_barrier.epoch, barrier.epoch);
+        if is_finish_barrier(&barrier, self.current_subscriber_id, self.upstream_table_id) {
+            self.is_finished = true;
+        }
+        Ok(barrier)
+    }
+}
+
+impl<'a> UpstreamBufferState for StateOfConsumingLogStore<'a> {
+    fn is_finished(&self) -> bool {
+        self.is_finished
+    }
+
+    async fn on_upstream_barrier(
+        &mut self,
+        upstream_barrier: DispatcherBarrier,
+    ) -> StreamExecutorResult<()> {
+        let barrier = self.recv_barrier(upstream_barrier).await?;
+        self.barriers.push_front(barrier);
+        Ok(())
+    }
+}
+
+mod erase_upstream_mutation {
+    use futures::TryStreamExt;
+
+    use crate::executor::prelude::Stream;
+    use crate::executor::{BoxedMessageStream, DispatcherMessageStreamItem};
+
+    pub(super) fn erase_upstream_mutation(upstream: BoxedMessageStream) -> UpstreamStream {
+        upstream.map_ok(|msg| {
+            msg.map_mutation(|mutation| {
+                if let Some(mutation) = mutation {
+                    // TODO: assert none mutation after we explicitly erase mutation
+                    warn!(
+                        ?mutation,
+                        "receive non-empty mutation from upstream. ignored"
+                    );
+                };
+            })
+        })
+    }
+
+    pub(super) type UpstreamStream = impl Stream<Item = DispatcherMessageStreamItem> + Unpin;
+}
+
+use erase_upstream_mutation::*;
+
+struct UpstreamBuffer<'a, S> {
+    upstream: &'a mut UpstreamStream,
+    state: S,
+    consume_upstream_row_count: LabelGuardedIntCounter<3>,
     upstream_table_id: TableId,
     current_subscriber_id: u32,
 }
 
-impl<'a> UpstreamBuffer<'a> {
+impl<'a> UpstreamBuffer<'a, StateOfConsumingSnapshot> {
     fn new(
-        upstream: &'a mut BoxedMessageStream,
+        upstream: &'a mut UpstreamStream,
         upstream_table_id: TableId,
         current_subscriber_id: u32,
         consume_upstream_row_count: LabelGuardedIntCounter<3>,
     ) -> Self {
         Self {
             upstream,
-            barrier: Default::default(),
+            state: StateOfConsumingSnapshot {
+                pending_barriers: vec![],
+            },
             consume_upstream_row_count,
-            is_finished: false,
             upstream_table_id,
             current_subscriber_id,
         }
     }
 
+    async fn start_consuming_log_store<'s>(
+        self,
+        barrier_rx: &'s mut UnboundedReceiver<Barrier>,
+    ) -> StreamExecutorResult<UpstreamBuffer<'a, StateOfConsumingLogStore<'s>>> {
+        let StateOfConsumingSnapshot { pending_barriers } = self.state;
+        let mut barriers = VecDeque::with_capacity(pending_barriers.len());
+        for pending_barrier in pending_barriers {
+            let barrier = receive_next_barrier(barrier_rx).await?;
+            assert!(barrier.epoch.prev <= pending_barrier.epoch.prev);
+            assert!(!is_finish_barrier(
+                &barrier,
+                self.current_subscriber_id,
+                self.upstream_table_id
+            ));
+            barriers.push_front(barrier);
+        }
+        let ret = UpstreamBuffer {
+            upstream: self.upstream,
+            state: StateOfConsumingLogStore {
+                barrier_rx,
+                barriers,
+                is_finished: false,
+                current_subscriber_id: self.current_subscriber_id,
+                upstream_table_id: self.upstream_table_id,
+            },
+            consume_upstream_row_count: self.consume_upstream_row_count,
+            upstream_table_id: self.upstream_table_id,
+            current_subscriber_id: self.current_subscriber_id,
+        };
+        Ok(ret)
+    }
+}
+
+impl<'a, S: UpstreamBufferState> UpstreamBuffer<'a, S> {
     async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
-        while !self.is_finished {
+        while !self.state.is_finished() {
             let result = self.consume_until_next_barrier().await;
             let barrier = match result {
                 Ok(barrier) => barrier,
                 Err(e) => {
                     return e;
                 }
             };
-            self.barrier.push_front(barrier);
+            if let Err(e) = self.state.on_upstream_barrier(barrier).await {
+                return e;
+            }
         }
         pending().await
     }
 
-    async fn consume_until_next_barrier(&mut self) -> StreamExecutorResult<Barrier> {
-        assert!(!self.is_finished);
+    async fn consume_until_next_barrier(&mut self) -> StreamExecutorResult<DispatcherBarrier> {
         loop {
-            let msg: Message = self
+            let msg: DispatcherMessage = self
                 .upstream
                 .try_next()
                 .await?
                 .ok_or_else(|| anyhow!("end of upstream"))?;
             match msg {
-                Message::Chunk(chunk) => {
+                DispatcherMessage::Chunk(chunk) => {
                     self.consume_upstream_row_count
                         .inc_by(chunk.cardinality() as _);
                 }
-                Message::Barrier(barrier) => {
-                    self.is_finished = self.is_finish_barrier(&barrier);
+                DispatcherMessage::Barrier(barrier) => {
                     break Ok(barrier);
                 }
-                Message::Watermark(_) => {}
+                DispatcherMessage::Watermark(_) => {}
             }
         }
     }
+}
 
+impl<'a, 's> UpstreamBuffer<'a, StateOfConsumingLogStore<'s>> {
     async fn take_buffered_barrier(&mut self) -> StreamExecutorResult<Option<Barrier>> {
-        Ok(if let Some(barrier) = self.barrier.pop_back() {
+        Ok(if let Some(barrier) = self.state.barriers.pop_back() {
             Some(barrier)
-        } else if self.is_finished {
+        } else if self.state.is_finished {
             None
         } else {
-            Some(self.consume_until_next_barrier().await?)
+            let upstream_barrier = self.consume_until_next_barrier().await?;
+            let barrier = self.state.recv_barrier(upstream_barrier).await?;
+            Some(barrier)
         })
     }
+}
 
-    fn is_finish_barrier(&self, barrier: &Barrier) -> bool {
-        if let Some(Mutation::DropSubscriptions {
-            subscriptions_to_drop,
-        }) = barrier.mutation.as_deref()
-        {
-            let is_finished = subscriptions_to_drop
-                .iter()
-                .any(|(subscriber_id, _)| *subscriber_id == self.current_subscriber_id);
-            if is_finished {
-                assert!(subscriptions_to_drop.iter().any(
-                    |(subscriber_id, subscribed_upstream_table_id)| {
-                        *subscriber_id == self.current_subscriber_id
-                            && self.upstream_table_id == *subscribed_upstream_table_id
-                    }
-                ))
-            }
-            is_finished
-        } else {
-            false
+fn is_finish_barrier(
+    barrier: &Barrier,
+    current_subscriber_id: u32,
+    upstream_table_id: TableId,
+) -> bool {
+    if let Some(Mutation::DropSubscriptions {
+        subscriptions_to_drop,
+    }) = barrier.mutation.as_deref()
+    {
+        let is_finished = subscriptions_to_drop
+            .iter()
+            .any(|(subscriber_id, _)| *subscriber_id == current_subscriber_id);
+        if is_finished {
+            assert!(subscriptions_to_drop.iter().any(
+                |(subscriber_id, subscribed_upstream_table_id)| {
+                    *subscriber_id == current_subscriber_id
+                        && upstream_table_id == *subscribed_upstream_table_id
+                }
+            ))
         }
+        is_finished
+    } else {
+        false
     }
+}
 
+impl<'a, S: UpstreamBufferState> UpstreamBuffer<'a, S> {
     /// Run a future while concurrently polling the upstream so that the upstream
     /// won't be back-pressured.
     async fn run_future<T, E: Into<StreamExecutorError>>(
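A note on the shape of the executor-side change in PATCH 1: `UpstreamBuffer<'a, S>` is a type-state design. The current backfill phase lives in the type parameter, the snapshot-phase and log-store-phase data live in separate state structs, and the transition consumes the old buffer by value, so phase-specific methods such as `take_buffered_barrier` simply do not exist outside their phase. The following sketch shows the same pattern reduced to toy types; `Buffer`, `Snapshot`, and `LogStore` are invented names for illustration, not RisingWave types, and epochs are plain `u64`s.

use std::collections::VecDeque;

// Phase-specific state: only the snapshot phase accumulates pending barriers.
struct Snapshot {
    pending: Vec<u64>,
}

// In the log-store phase the pending barriers become a replayable queue.
struct LogStore {
    buffered: VecDeque<u64>,
}

// Fields shared by all phases stay on the outer struct.
struct Buffer<S> {
    row_count: u64,
    state: S,
}

impl Buffer<Snapshot> {
    fn new() -> Self {
        Buffer {
            row_count: 0,
            state: Snapshot { pending: vec![] },
        }
    }

    // The transition consumes `self`, mirroring `start_consuming_log_store`.
    fn start_consuming_log_store(self) -> Buffer<LogStore> {
        Buffer {
            row_count: self.row_count,
            state: LogStore {
                buffered: self.state.pending.into_iter().collect(),
            },
        }
    }
}

impl Buffer<LogStore> {
    // Only callable in the log-store phase.
    fn take_buffered(&mut self) -> Option<u64> {
        self.state.buffered.pop_front()
    }
}

fn main() {
    let mut buf = Buffer::new();
    buf.state.pending.extend([1, 2]);
    let mut buf = buf.start_consuming_log_store();
    assert_eq!(buf.take_buffered(), Some(1));
    assert_eq!(buf.take_buffered(), Some(2));
    assert_eq!(buf.take_buffered(), None);
}

Because `start_consuming_log_store` takes `self` by value, the compiler rejects any use of the snapshot-phase buffer after the transition, which is exactly the guarantee the patch relies on.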
From f09903ddcb0632fa68bf0bc94d27c4f03f5d5585 Mon Sep 17 00:00:00 2001
From: William Wen
Date: Thu, 22 Aug 2024 23:16:01 +0800
Subject: [PATCH 2/4] include more

---
 src/stream/src/executor/mod.rs | 34 +++++++++++++++++++++++++++++-----
 1 file changed, 29 insertions(+), 5 deletions(-)

diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs
index ff32af10d119..42d0831b5920 100644
--- a/src/stream/src/executor/mod.rs
+++ b/src/stream/src/executor/mod.rs
@@ -164,13 +164,17 @@ pub use wrapper::WrapperExecutor;
 
 use self::barrier_align::AlignedMessageStream;
 
-pub type MessageStreamItem = StreamExecutorResult<Message>;
+pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
+pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
+pub type DispatcherMessageStreamItem = MessageStreamItemInner<()>;
 pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
 
 pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
 use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
 
-pub trait MessageStream = futures::Stream<Item = MessageStreamItem> + Send;
+pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
+pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
+pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
 
 /// Static information of an executor.
 #[derive(Debug, Default, Clone)]
@@ -864,6 +868,16 @@
             tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
         })
     }
+
+    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
+        BarrierInner {
+            epoch: self.epoch,
+            mutation: f(self.mutation),
+            kind: self.kind,
+            tracing_context: self.tracing_context,
+            passed_actors: self.passed_actors,
+        }
+    }
 }
 
 impl DispatcherBarrier {
@@ -968,6 +982,16 @@ pub enum MessageInner<M> {
     Watermark(Watermark),
 }
 
+impl<M> MessageInner<M> {
+    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
+        match self {
+            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
+            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
+            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
+        }
+    }
+}
+
 pub type Message = MessageInner<BarrierMutationType>;
 pub type DispatcherMessage = MessageInner<()>;
@@ -1053,9 +1077,9 @@
 pub type PkIndicesRef<'a> = &'a [usize];
 
 pub type PkDataTypes = SmallVec<[DataType; 1]>;
 
 /// Expect the first message of the given `stream` as a barrier.
-pub async fn expect_first_barrier(
-    stream: &mut (impl MessageStream + Unpin),
-) -> StreamExecutorResult<Barrier> {
+pub async fn expect_first_barrier<M>(
+    stream: &mut (impl MessageStreamInner<M> + Unpin),
+) -> StreamExecutorResult<BarrierInner<M>> {
     let message = stream
         .next()
         .instrument_await("expect_first_barrier")
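PATCH 2 makes the executor message types generic over their mutation payload: `MessageInner<M>`/`BarrierInner<M>` are instantiated with the full mutation type for barrier-worker-facing code and with `()` for dispatcher-facing code, with `map_mutation` as the bridge between the two. Below is a condensed sketch of that shape, with toy payload types standing in for the real `Mutation` and `BarrierMutationType` definitions (which are not shown in this patch):

#[derive(Debug, PartialEq)]
struct BarrierInner<M> {
    epoch: u64,
    mutation: M,
}

impl<M> BarrierInner<M> {
    // Rewrites only the mutation payload, preserving everything else.
    fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
        BarrierInner {
            epoch: self.epoch,
            mutation: f(self.mutation),
        }
    }
}

// Toy instantiations: the full barrier carries an optional mutation string,
// the dispatcher barrier carries no mutation at all.
type Barrier = BarrierInner<Option<String>>;
type DispatcherBarrier = BarrierInner<()>;

fn main() {
    let full: Barrier = BarrierInner {
        epoch: 42,
        mutation: Some("pause".to_string()),
    };
    // Downgrade for dispatch: keep the epoch, erase the mutation payload.
    let erased: DispatcherBarrier = full.map_mutation(|_| ());
    assert_eq!(erased, BarrierInner { epoch: 42, mutation: () });
}

This is the same trick `erase_upstream_mutation` in PATCH 1 relies on: mapping the mutation to `()` turns a full message stream into a dispatcher message stream without touching epochs, chunks, or watermarks.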
From 8bd4c6e4dd6f97acd2ad78bfc04d382edd5ae258 Mon Sep 17 00:00:00 2001
From: William Wen
Date: Fri, 23 Aug 2024 12:58:42 +0800
Subject: [PATCH 3/4] refine

---
 src/meta/src/barrier/command.rs                       | 5 +++++
 src/meta/src/barrier/creating_job/mod.rs              | 4 +---
 src/meta/src/barrier/rpc.rs                           | 4 +---
 src/stream/src/executor/backfill/snapshot_backfill.rs | 3 +--
 4 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index efbc03e215d2..23bea2fef2fb 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -880,6 +880,11 @@ impl Command {
 }
 
 impl CommandContext {
+    pub fn to_mutation(&self) -> Option<Mutation> {
+        self.command
+            .to_mutation(self.current_paused_reason.as_ref())
+    }
+
     /// Returns the paused reason after executing the current command.
     pub fn next_paused_reason(&self) -> Option<PausedReason> {
         match &self.command {

diff --git a/src/meta/src/barrier/creating_job/mod.rs b/src/meta/src/barrier/creating_job/mod.rs
index 77c947196359..0e7f2c12ba61 100644
--- a/src/meta/src/barrier/creating_job/mod.rs
+++ b/src/meta/src/barrier/creating_job/mod.rs
@@ -323,9 +323,7 @@ impl CreatingStreamingJobControl {
                 Some(table_id),
                 if start_consume_upstream {
                     // erase the mutation on upstream except the last command
-                    command_ctx
-                        .command
-                        .to_mutation(command_ctx.current_paused_reason.as_ref())
+                    command_ctx.to_mutation()
                 } else {
                     None
                 },

diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs
index d588489d1750..1b4ab6207db9 100644
--- a/src/meta/src/barrier/rpc.rs
+++ b/src/meta/src/barrier/rpc.rs
@@ -257,9 +257,7 @@ impl ControlStreamManager {
     ) -> MetaResult> {
         self.inject_barrier(
             None,
-            command_ctx
-                .command
-                .to_mutation(command_ctx.current_paused_reason.as_ref()),
+            command_ctx.to_mutation(),
             (&command_ctx.curr_epoch, &command_ctx.prev_epoch),
             &command_ctx.kind,
             pre_applied_graph_info,

diff --git a/src/stream/src/executor/backfill/snapshot_backfill.rs b/src/stream/src/executor/backfill/snapshot_backfill.rs
index 12b5bce4151c..acc3a970c801 100644
--- a/src/stream/src/executor/backfill/snapshot_backfill.rs
+++ b/src/stream/src/executor/backfill/snapshot_backfill.rs
@@ -101,8 +101,7 @@ impl SnapshotBackfillExecutor {
     #[try_stream(ok = Message, error = StreamExecutorError)]
     async fn execute_inner(mut self) {
         debug!("snapshot backfill executor start");
-        let upstream = erase_upstream_mutation(self.upstream.execute());
-        pin_mut!(upstream);
+        let mut upstream = erase_upstream_mutation(self.upstream.execute());
         let upstream_table_id = self.upstream_table.table_id();
         let first_barrier = expect_first_barrier(&mut upstream).await?;
         debug!(epoch = ?first_barrier.epoch, "get first upstream barrier");

From a7b55c42ce28133cb69e5d2e1b2f7498e079b43d Mon Sep 17 00:00:00 2001
From: William Wen
Date: Wed, 4 Sep 2024 14:55:57 +0800
Subject: [PATCH 4/4] fix cancellation safety

---
 .../executor/backfill/snapshot_backfill.rs | 122 +++++++++---------
 1 file changed, 61 insertions(+), 61 deletions(-)

diff --git a/src/stream/src/executor/backfill/snapshot_backfill.rs b/src/stream/src/executor/backfill/snapshot_backfill.rs
index acc3a970c801..ac625f53a02d 100644
--- a/src/stream/src/executor/backfill/snapshot_backfill.rs
+++ b/src/stream/src/executor/backfill/snapshot_backfill.rs
@@ -188,13 +188,12 @@ impl SnapshotBackfillExecutor {
             yield Message::Barrier(recv_barrier);
         }
 
-        let mut upstream_buffer = upstream_buffer
-            .start_consuming_log_store(&mut self.barrier_rx)
-            .await?;
+        let mut upstream_buffer =
+            upstream_buffer.start_consuming_log_store(&mut self.barrier_rx);
 
         let mut barrier_epoch = first_barrier_epoch;
 
-        let initial_pending_barrier = upstream_buffer.state.barriers.len();
+        let initial_pending_barrier = upstream_buffer.state.barrier_count();
         info!(
             ?barrier_epoch,
             table_id = self.upstream_table.table_id().table_id,
@@ -333,11 +332,9 @@ async fn read_change_log(
     }
 }
 
 trait UpstreamBufferState {
-    fn is_finished(&self) -> bool;
-    async fn on_upstream_barrier(
-        &mut self,
-        upstream_barrier: DispatcherBarrier,
-    ) -> StreamExecutorResult<()>;
+    // The future must be cancellation-safe
+    async fn is_finished(&mut self) -> StreamExecutorResult<bool>;
+    fn on_upstream_barrier(&mut self, upstream_barrier: DispatcherBarrier);
 }
 
 struct StateOfConsumingSnapshot {
     pending_barriers: Vec<DispatcherBarrier>,
 }
 
 impl UpstreamBufferState for StateOfConsumingSnapshot {
-    fn is_finished(&self) -> bool {
+    async fn is_finished(&mut self) -> StreamExecutorResult<bool> {
         // never finish when consuming snapshot
-        false
+        Ok(false)
     }
 
-    async fn on_upstream_barrier(
-        &mut self,
-        upstream_barrier: DispatcherBarrier,
-    ) -> StreamExecutorResult<()> {
-        self.pending_barriers.push(upstream_barrier);
-        Ok(())
+    fn on_upstream_barrier(&mut self, upstream_barrier: DispatcherBarrier) {
+        self.pending_barriers.push(upstream_barrier)
     }
 }
 
 struct StateOfConsumingLogStore<'a> {
     barrier_rx: &'a mut mpsc::UnboundedReceiver<Barrier>,
-    // newer barrier at the front
+    /// Barriers received from upstream for which the corresponding barrier
+    /// from the local barrier worker has not been received yet.
+    /// Newer barrier at the front.
+    upstream_pending_barriers: VecDeque<DispatcherBarrier>,
+    /// Barriers received from both upstream and the local barrier worker.
+    /// Newer barrier at the front.
     barriers: VecDeque<Barrier>,
     is_finished: bool,
     current_subscriber_id: u32,
@@ -369,13 +366,20 @@ struct StateOfConsumingLogStore<'a> {
 }
 
 impl<'a> StateOfConsumingLogStore<'a> {
-    async fn recv_barrier(
-        &mut self,
-        upstream_barrier: DispatcherBarrier,
-    ) -> StreamExecutorResult<Barrier> {
+    fn barrier_count(&self) -> usize {
+        self.upstream_pending_barriers.len() + self.barriers.len()
+    }
+
+    async fn handle_one_pending_barrier(&mut self) -> StreamExecutorResult<Barrier> {
         assert!(!self.is_finished);
         let barrier = receive_next_barrier(self.barrier_rx).await?;
-        assert_eq!(upstream_barrier.epoch, barrier.epoch);
+        assert_eq!(
+            self.upstream_pending_barriers
+                .pop_back()
+                .expect("non-empty")
+                .epoch,
+            barrier.epoch
+        );
         if is_finish_barrier(&barrier, self.current_subscriber_id, self.upstream_table_id) {
             self.is_finished = true;
         }
         Ok(barrier)
     }
 }
 
 impl<'a> UpstreamBufferState for StateOfConsumingLogStore<'a> {
-    fn is_finished(&self) -> bool {
-        self.is_finished
+    async fn is_finished(&mut self) -> StreamExecutorResult<bool> {
+        while !self.upstream_pending_barriers.is_empty() {
+            let barrier = self.handle_one_pending_barrier().await?;
+            self.barriers.push_front(barrier);
+        }
+        if self.is_finished {
+            assert!(self.upstream_pending_barriers.is_empty());
+        }
+        Ok(self.is_finished)
     }
 
-    async fn on_upstream_barrier(
-        &mut self,
-        upstream_barrier: DispatcherBarrier,
-    ) -> StreamExecutorResult<()> {
-        let barrier = self.recv_barrier(upstream_barrier).await?;
-        self.barriers.push_front(barrier);
-        Ok(())
+    fn on_upstream_barrier(&mut self, upstream_barrier: DispatcherBarrier) {
+        self.upstream_pending_barriers.push_front(upstream_barrier);
     }
 }
@@ -449,27 +455,21 @@ impl<'a> UpstreamBuffer<'a, StateOfConsumingSnapshot> {
         }
     }
 
-    async fn start_consuming_log_store<'s>(
+    fn start_consuming_log_store<'s>(
         self,
         barrier_rx: &'s mut UnboundedReceiver<Barrier>,
-    ) -> StreamExecutorResult<UpstreamBuffer<'a, StateOfConsumingLogStore<'s>>> {
+    ) -> UpstreamBuffer<'a, StateOfConsumingLogStore<'s>> {
         let StateOfConsumingSnapshot { pending_barriers } = self.state;
-        let mut barriers = VecDeque::with_capacity(pending_barriers.len());
+        let mut upstream_pending_barriers = VecDeque::with_capacity(pending_barriers.len());
         for pending_barrier in pending_barriers {
-            let barrier = receive_next_barrier(barrier_rx).await?;
-            assert!(barrier.epoch.prev <= pending_barrier.epoch.prev);
-            assert!(!is_finish_barrier(
-                &barrier,
-                self.current_subscriber_id,
-                self.upstream_table_id
-            ));
-            barriers.push_front(barrier);
+            upstream_pending_barriers.push_front(pending_barrier);
         }
-        let ret = UpstreamBuffer {
+        UpstreamBuffer {
             upstream: self.upstream,
             state: StateOfConsumingLogStore {
                 barrier_rx,
-                barriers,
+                upstream_pending_barriers,
+                barriers: Default::default(),
                 is_finished: false,
                 current_subscriber_id: self.current_subscriber_id,
                 upstream_table_id: self.upstream_table_id,
             },
             consume_upstream_row_count: self.consume_upstream_row_count,
             upstream_table_id: self.upstream_table_id,
             current_subscriber_id: self.current_subscriber_id,
-        };
-        Ok(ret)
+        }
     }
 }
 
@@ -477,29 +477,25 @@ impl<'a, S: UpstreamBufferState> UpstreamBuffer<'a, S> {
     async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
-        while !self.state.is_finished() {
-            let result = self.consume_until_next_barrier().await;
-            let barrier = match result {
-                Ok(barrier) => barrier,
-                Err(e) => {
-                    return e;
-                }
-            };
-            if let Err(e) = self.state.on_upstream_barrier(barrier).await {
-                return e;
+        if let Err(e) = try {
+            while !self.state.is_finished().await? {
+                self.consume_until_next_barrier().await?;
             }
+        } {
+            return e;
         }
         pending().await
     }
 
-    async fn consume_until_next_barrier(&mut self) -> StreamExecutorResult<DispatcherBarrier> {
+    /// Consume the upstream until seeing the next barrier.
+    /// `pending_barriers` must be non-empty after this method returns.
+    async fn consume_until_next_barrier(&mut self) -> StreamExecutorResult<()> {
         loop {
             let msg: DispatcherMessage = self
                 .upstream
                 .try_next()
                 .await?
                 .ok_or_else(|| anyhow!("end of upstream"))?;
             match msg {
                 DispatcherMessage::Chunk(chunk) => {
                     self.consume_upstream_row_count
                         .inc_by(chunk.cardinality() as _);
                 }
                 DispatcherMessage::Barrier(barrier) => {
-                    break Ok(barrier);
+                    self.state.on_upstream_barrier(barrier);
+                    break Ok(());
                 }
                 DispatcherMessage::Watermark(_) => {}
             }
         }
     }
+}
 
 impl<'a, 's> UpstreamBuffer<'a, StateOfConsumingLogStore<'s>> {
     async fn take_buffered_barrier(&mut self) -> StreamExecutorResult<Option<Barrier>> {
         Ok(if let Some(barrier) = self.state.barriers.pop_back() {
             Some(barrier)
+        } else if !self.state.upstream_pending_barriers.is_empty() {
+            let barrier = self.state.handle_one_pending_barrier().await?;
+            Some(barrier)
         } else if self.state.is_finished {
             None
         } else {
-            let upstream_barrier = self.consume_until_next_barrier().await?;
-            let barrier = self.state.recv_barrier(upstream_barrier).await?;
+            self.consume_until_next_barrier().await?;
+            let barrier = self.state.handle_one_pending_barrier().await?;
             Some(barrier)
         })
     }
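The point of PATCH 4 is that `UpstreamBufferState::is_finished` becomes an `async fn` that may be raced in a `select!` (via `run_future`) and therefore dropped before completion, so any barrier already received must be parked in a queue before the next await point rather than held in a local variable. The toy program below illustrates that two-queue discipline; `Buffer`, the `u64` epochs, and the bare tokio channel are invented stand-ins, not the executor's real types.

use std::collections::VecDeque;

use tokio::sync::mpsc;

struct Buffer {
    // Epochs seen from upstream only; newer at the front
    // (models `upstream_pending_barriers`).
    pending: VecDeque<u64>,
    // Epochs confirmed by the local worker; newer at the front
    // (models `barriers`).
    matched: VecDeque<u64>,
    worker_rx: mpsc::UnboundedReceiver<u64>,
}

impl Buffer {
    // Cancellation-safe: the only await point is `recv()`, which consumes
    // nothing when cancelled, and each confirmed epoch moves between the
    // queues before the next await. Dropping this future mid-way therefore
    // never loses a barrier; the next call resumes where it left off.
    async fn drain_pending(&mut self) -> Option<()> {
        while !self.pending.is_empty() {
            let confirmed = self.worker_rx.recv().await?;
            let epoch = self.pending.pop_back().expect("non-empty"); // oldest first
            assert_eq!(epoch, confirmed);
            self.matched.push_front(epoch);
        }
        Some(())
    }
}

#[tokio::main]
async fn main() {
    let (tx, worker_rx) = mpsc::unbounded_channel();
    let mut buf = Buffer {
        pending: VecDeque::from([2, 1]), // newer epoch at the front
        matched: VecDeque::new(),
        worker_rx,
    };
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    buf.drain_pending().await.unwrap();
    assert_eq!(buf.matched, VecDeque::from([2, 1]));
}

The design choice mirrors the patch: `on_upstream_barrier` becomes a synchronous queue push, and the single await (`recv`, which tokio documents as cancel-safe) sits between queue operations, so a cancelled call can always be retried without losing a barrier.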