From a7b55c42ce28133cb69e5d2e1b2f7498e079b43d Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 4 Sep 2024 14:55:57 +0800 Subject: [PATCH] fix cancellation safety --- .../executor/backfill/snapshot_backfill.rs | 122 +++++++++--------- 1 file changed, 61 insertions(+), 61 deletions(-) diff --git a/src/stream/src/executor/backfill/snapshot_backfill.rs b/src/stream/src/executor/backfill/snapshot_backfill.rs index acc3a970c8019..ac625f53a02dd 100644 --- a/src/stream/src/executor/backfill/snapshot_backfill.rs +++ b/src/stream/src/executor/backfill/snapshot_backfill.rs @@ -188,13 +188,12 @@ impl SnapshotBackfillExecutor { yield Message::Barrier(recv_barrier); } - let mut upstream_buffer = upstream_buffer - .start_consuming_log_store(&mut self.barrier_rx) - .await?; + let mut upstream_buffer = + upstream_buffer.start_consuming_log_store(&mut self.barrier_rx); let mut barrier_epoch = first_barrier_epoch; - let initial_pending_barrier = upstream_buffer.state.barriers.len(); + let initial_pending_barrier = upstream_buffer.state.barrier_count(); info!( ?barrier_epoch, table_id = self.upstream_table.table_id().table_id, @@ -333,11 +332,9 @@ async fn read_change_log( } trait UpstreamBufferState { - fn is_finished(&self) -> bool; - async fn on_upstream_barrier( - &mut self, - upstream_barrier: DispatcherBarrier, - ) -> StreamExecutorResult<()>; + // The future must be cancellation-safe + async fn is_finished(&mut self) -> StreamExecutorResult; + fn on_upstream_barrier(&mut self, upstream_barrier: DispatcherBarrier); } struct StateOfConsumingSnapshot { @@ -345,23 +342,23 @@ struct StateOfConsumingSnapshot { } impl UpstreamBufferState for StateOfConsumingSnapshot { - fn is_finished(&self) -> bool { + async fn is_finished(&mut self) -> StreamExecutorResult { // never finish when consuming snapshot - false + Ok(false) } - async fn on_upstream_barrier( - &mut self, - upstream_barrier: DispatcherBarrier, - ) -> StreamExecutorResult<()> { - self.pending_barriers.push(upstream_barrier); - Ok(()) + fn on_upstream_barrier(&mut self, upstream_barrier: DispatcherBarrier) { + self.pending_barriers.push(upstream_barrier) } } struct StateOfConsumingLogStore<'a> { barrier_rx: &'a mut mpsc::UnboundedReceiver, - // newer barrier at the front + /// Barriers received from upstream but not yet received the barrier from local barrier worker + /// newer barrier at the front + upstream_pending_barriers: VecDeque, + /// Barriers received from both upstream and local barrier worker + /// newer barrier at the front barriers: VecDeque, is_finished: bool, current_subscriber_id: u32, @@ -369,13 +366,20 @@ struct StateOfConsumingLogStore<'a> { } impl<'a> StateOfConsumingLogStore<'a> { - async fn recv_barrier( - &mut self, - upstream_barrier: DispatcherBarrier, - ) -> StreamExecutorResult { + fn barrier_count(&self) -> usize { + self.upstream_pending_barriers.len() + self.barriers.len() + } + + async fn handle_one_pending_barrier(&mut self) -> StreamExecutorResult { assert!(!self.is_finished); let barrier = receive_next_barrier(self.barrier_rx).await?; - assert_eq!(upstream_barrier.epoch, barrier.epoch); + assert_eq!( + self.upstream_pending_barriers + .pop_back() + .expect("non-empty") + .epoch, + barrier.epoch + ); if is_finish_barrier(&barrier, self.current_subscriber_id, self.upstream_table_id) { self.is_finished = true; } @@ -384,17 +388,19 @@ impl<'a> StateOfConsumingLogStore<'a> { } impl<'a> UpstreamBufferState for StateOfConsumingLogStore<'a> { - fn is_finished(&self) -> bool { - self.is_finished + async fn is_finished(&mut self) -> StreamExecutorResult { + while !self.upstream_pending_barriers.is_empty() { + let barrier = self.handle_one_pending_barrier().await?; + self.barriers.push_front(barrier); + } + if self.is_finished { + assert!(self.upstream_pending_barriers.is_empty()); + } + Ok(self.is_finished) } - async fn on_upstream_barrier( - &mut self, - upstream_barrier: DispatcherBarrier, - ) -> StreamExecutorResult<()> { - let barrier = self.recv_barrier(upstream_barrier).await?; - self.barriers.push_front(barrier); - Ok(()) + fn on_upstream_barrier(&mut self, upstream_barrier: DispatcherBarrier) { + self.upstream_pending_barriers.push_front(upstream_barrier); } } @@ -449,27 +455,21 @@ impl<'a> UpstreamBuffer<'a, StateOfConsumingSnapshot> { } } - async fn start_consuming_log_store<'s>( + fn start_consuming_log_store<'s>( self, barrier_rx: &'s mut UnboundedReceiver, - ) -> StreamExecutorResult>> { + ) -> UpstreamBuffer<'a, StateOfConsumingLogStore<'s>> { let StateOfConsumingSnapshot { pending_barriers } = self.state; - let mut barriers = VecDeque::with_capacity(pending_barriers.len()); + let mut upstream_pending_barriers = VecDeque::with_capacity(pending_barriers.len()); for pending_barrier in pending_barriers { - let barrier = receive_next_barrier(barrier_rx).await?; - assert!(barrier.epoch.prev <= pending_barrier.epoch.prev); - assert!(!is_finish_barrier( - &barrier, - self.current_subscriber_id, - self.upstream_table_id - )); - barriers.push_front(barrier); + upstream_pending_barriers.push_front(pending_barrier); } - let ret = UpstreamBuffer { + UpstreamBuffer { upstream: self.upstream, state: StateOfConsumingLogStore { barrier_rx, - barriers, + upstream_pending_barriers, + barriers: Default::default(), is_finished: false, current_subscriber_id: self.current_subscriber_id, upstream_table_id: self.upstream_table_id, @@ -477,29 +477,25 @@ impl<'a> UpstreamBuffer<'a, StateOfConsumingSnapshot> { consume_upstream_row_count: self.consume_upstream_row_count, upstream_table_id: self.upstream_table_id, current_subscriber_id: self.current_subscriber_id, - }; - Ok(ret) + } } } impl<'a, S: UpstreamBufferState> UpstreamBuffer<'a, S> { async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError { - while !self.state.is_finished() { - let result = self.consume_until_next_barrier().await; - let barrier = match result { - Ok(barrier) => barrier, - Err(e) => { - return e; - } - }; - if let Err(e) = self.state.on_upstream_barrier(barrier).await { - return e; + if let Err(e) = try { + while !self.state.is_finished().await? { + self.consume_until_next_barrier().await?; } + } { + return e; } pending().await } - async fn consume_until_next_barrier(&mut self) -> StreamExecutorResult { + /// Consume the upstream until seeing the next barrier. + /// `pending_barriers` must be non-empty after this method returns. + async fn consume_until_next_barrier(&mut self) -> StreamExecutorResult<()> { loop { let msg: DispatcherMessage = self .upstream @@ -512,7 +508,8 @@ impl<'a, S: UpstreamBufferState> UpstreamBuffer<'a, S> { .inc_by(chunk.cardinality() as _); } DispatcherMessage::Barrier(barrier) => { - break Ok(barrier); + self.state.on_upstream_barrier(barrier); + break Ok(()); } DispatcherMessage::Watermark(_) => {} } @@ -524,11 +521,14 @@ impl<'a, 's> UpstreamBuffer<'a, StateOfConsumingLogStore<'s>> { async fn take_buffered_barrier(&mut self) -> StreamExecutorResult> { Ok(if let Some(barrier) = self.state.barriers.pop_back() { Some(barrier) + } else if !self.state.upstream_pending_barriers.is_empty() { + let barrier = self.state.handle_one_pending_barrier().await?; + Some(barrier) } else if self.state.is_finished { None } else { - let upstream_barrier = self.consume_until_next_barrier().await?; - let barrier = self.state.recv_barrier(upstream_barrier).await?; + self.consume_until_next_barrier().await?; + let barrier = self.state.handle_one_pending_barrier().await?; Some(barrier) }) }