Skip to content

Commit

Permalink
fix cancellation safety
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Sep 4, 2024
1 parent 63b1054 commit a7b55c4
Showing 1 changed file with 61 additions and 61 deletions.
122 changes: 61 additions & 61 deletions src/stream/src/executor/backfill/snapshot_backfill.rs
Original file line number Diff line number Diff line change
@@ -188,13 +188,12 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
yield Message::Barrier(recv_barrier);
}

let mut upstream_buffer = upstream_buffer
.start_consuming_log_store(&mut self.barrier_rx)
.await?;
let mut upstream_buffer =
upstream_buffer.start_consuming_log_store(&mut self.barrier_rx);

let mut barrier_epoch = first_barrier_epoch;

let initial_pending_barrier = upstream_buffer.state.barriers.len();
let initial_pending_barrier = upstream_buffer.state.barrier_count();
info!(
?barrier_epoch,
table_id = self.upstream_table.table_id().table_id,
@@ -333,49 +332,54 @@ async fn read_change_log(
}

trait UpstreamBufferState {
fn is_finished(&self) -> bool;
async fn on_upstream_barrier(
&mut self,
upstream_barrier: DispatcherBarrier,
) -> StreamExecutorResult<()>;
// The future must be cancellation-safe
async fn is_finished(&mut self) -> StreamExecutorResult<bool>;
fn on_upstream_barrier(&mut self, upstream_barrier: DispatcherBarrier);
}

struct StateOfConsumingSnapshot {
pending_barriers: Vec<DispatcherBarrier>,
}

impl UpstreamBufferState for StateOfConsumingSnapshot {
fn is_finished(&self) -> bool {
async fn is_finished(&mut self) -> StreamExecutorResult<bool> {
// never finish when consuming snapshot
false
Ok(false)
}

async fn on_upstream_barrier(
&mut self,
upstream_barrier: DispatcherBarrier,
) -> StreamExecutorResult<()> {
self.pending_barriers.push(upstream_barrier);
Ok(())
fn on_upstream_barrier(&mut self, upstream_barrier: DispatcherBarrier) {
self.pending_barriers.push(upstream_barrier)
}
}

struct StateOfConsumingLogStore<'a> {
barrier_rx: &'a mut mpsc::UnboundedReceiver<Barrier>,
// newer barrier at the front
/// Barriers received from upstream but not yet received the barrier from local barrier worker
/// newer barrier at the front
upstream_pending_barriers: VecDeque<DispatcherBarrier>,
/// Barriers received from both upstream and local barrier worker
/// newer barrier at the front
barriers: VecDeque<Barrier>,
is_finished: bool,
current_subscriber_id: u32,
upstream_table_id: TableId,
}

impl<'a> StateOfConsumingLogStore<'a> {
async fn recv_barrier(
&mut self,
upstream_barrier: DispatcherBarrier,
) -> StreamExecutorResult<Barrier> {
fn barrier_count(&self) -> usize {
self.upstream_pending_barriers.len() + self.barriers.len()
}

async fn handle_one_pending_barrier(&mut self) -> StreamExecutorResult<Barrier> {
assert!(!self.is_finished);
let barrier = receive_next_barrier(self.barrier_rx).await?;
assert_eq!(upstream_barrier.epoch, barrier.epoch);
assert_eq!(
self.upstream_pending_barriers
.pop_back()
.expect("non-empty")
.epoch,
barrier.epoch
);
if is_finish_barrier(&barrier, self.current_subscriber_id, self.upstream_table_id) {
self.is_finished = true;
}
@@ -384,17 +388,19 @@ impl<'a> StateOfConsumingLogStore<'a> {
}

impl<'a> UpstreamBufferState for StateOfConsumingLogStore<'a> {
fn is_finished(&self) -> bool {
self.is_finished
async fn is_finished(&mut self) -> StreamExecutorResult<bool> {
while !self.upstream_pending_barriers.is_empty() {
let barrier = self.handle_one_pending_barrier().await?;
self.barriers.push_front(barrier);
}
if self.is_finished {
assert!(self.upstream_pending_barriers.is_empty());
}
Ok(self.is_finished)
}

async fn on_upstream_barrier(
&mut self,
upstream_barrier: DispatcherBarrier,
) -> StreamExecutorResult<()> {
let barrier = self.recv_barrier(upstream_barrier).await?;
self.barriers.push_front(barrier);
Ok(())
fn on_upstream_barrier(&mut self, upstream_barrier: DispatcherBarrier) {
self.upstream_pending_barriers.push_front(upstream_barrier);
}
}

@@ -449,57 +455,47 @@ impl<'a> UpstreamBuffer<'a, StateOfConsumingSnapshot> {
}
}

async fn start_consuming_log_store<'s>(
fn start_consuming_log_store<'s>(
self,
barrier_rx: &'s mut UnboundedReceiver<Barrier>,
) -> StreamExecutorResult<UpstreamBuffer<'a, StateOfConsumingLogStore<'s>>> {
) -> UpstreamBuffer<'a, StateOfConsumingLogStore<'s>> {
let StateOfConsumingSnapshot { pending_barriers } = self.state;
let mut barriers = VecDeque::with_capacity(pending_barriers.len());
let mut upstream_pending_barriers = VecDeque::with_capacity(pending_barriers.len());
for pending_barrier in pending_barriers {
let barrier = receive_next_barrier(barrier_rx).await?;
assert!(barrier.epoch.prev <= pending_barrier.epoch.prev);
assert!(!is_finish_barrier(
&barrier,
self.current_subscriber_id,
self.upstream_table_id
));
barriers.push_front(barrier);
upstream_pending_barriers.push_front(pending_barrier);
}
let ret = UpstreamBuffer {
UpstreamBuffer {
upstream: self.upstream,
state: StateOfConsumingLogStore {
barrier_rx,
barriers,
upstream_pending_barriers,
barriers: Default::default(),
is_finished: false,
current_subscriber_id: self.current_subscriber_id,
upstream_table_id: self.upstream_table_id,
},
consume_upstream_row_count: self.consume_upstream_row_count,
upstream_table_id: self.upstream_table_id,
current_subscriber_id: self.current_subscriber_id,
};
Ok(ret)
}
}
}

impl<'a, S: UpstreamBufferState> UpstreamBuffer<'a, S> {
async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
while !self.state.is_finished() {
let result = self.consume_until_next_barrier().await;
let barrier = match result {
Ok(barrier) => barrier,
Err(e) => {
return e;
}
};
if let Err(e) = self.state.on_upstream_barrier(barrier).await {
return e;
if let Err(e) = try {
while !self.state.is_finished().await? {
self.consume_until_next_barrier().await?;
}
} {
return e;
}
pending().await
}

async fn consume_until_next_barrier(&mut self) -> StreamExecutorResult<DispatcherBarrier> {
/// Consume the upstream until seeing the next barrier.
/// `pending_barriers` must be non-empty after this method returns.
async fn consume_until_next_barrier(&mut self) -> StreamExecutorResult<()> {
loop {
let msg: DispatcherMessage = self
.upstream
@@ -512,7 +508,8 @@ impl<'a, S: UpstreamBufferState> UpstreamBuffer<'a, S> {
.inc_by(chunk.cardinality() as _);
}
DispatcherMessage::Barrier(barrier) => {
break Ok(barrier);
self.state.on_upstream_barrier(barrier);
break Ok(());
}
DispatcherMessage::Watermark(_) => {}
}
@@ -524,11 +521,14 @@ impl<'a, 's> UpstreamBuffer<'a, StateOfConsumingLogStore<'s>> {
async fn take_buffered_barrier(&mut self) -> StreamExecutorResult<Option<Barrier>> {
Ok(if let Some(barrier) = self.state.barriers.pop_back() {
Some(barrier)
} else if !self.state.upstream_pending_barriers.is_empty() {
let barrier = self.state.handle_one_pending_barrier().await?;
Some(barrier)
} else if self.state.is_finished {
None
} else {
let upstream_barrier = self.consume_until_next_barrier().await?;
let barrier = self.state.recv_barrier(upstream_barrier).await?;
self.consume_until_next_barrier().await?;
let barrier = self.state.handle_one_pending_barrier().await?;
Some(barrier)
})
}

0 comments on commit a7b55c4

Please sign in to comment.