Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Sep 5, 2024
1 parent 6869271 commit c94f85c
Show file tree
Hide file tree
Showing 8 changed files with 367 additions and 203 deletions.
69 changes: 18 additions & 51 deletions src/stream/src/executor/backfill/snapshot_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::executor::monitor::StreamingMetrics;
use crate::executor::prelude::{try_stream, StreamExt};
use crate::executor::{
expect_first_barrier, ActorContextRef, BackfillExecutor, Barrier, BoxedMessageStream,
DispatcherBarrier, DispatcherMessage, Execute, Executor, Message, Mutation,
DispatcherBarrier, DispatcherMessage, Execute, InputExecutor, Message, Mutation,
StreamExecutorError, StreamExecutorResult,
};
use crate::task::CreateMviewProgress;
Expand All @@ -50,7 +50,7 @@ pub struct SnapshotBackfillExecutor<S: StateStore> {
upstream_table: StorageTable<S>,

/// Upstream with the same schema with the upstream table.
upstream: Executor,
upstream: InputExecutor,

/// The column indices need to be forwarded to the downstream from the upstream and table scan.
output_indices: Vec<usize>,
Expand All @@ -68,9 +68,9 @@ pub struct SnapshotBackfillExecutor<S: StateStore> {

impl<S: StateStore> SnapshotBackfillExecutor<S> {
#[expect(clippy::too_many_arguments)]
pub fn new(
pub(crate) fn new(
upstream_table: StorageTable<S>,
upstream: Executor,
upstream: InputExecutor,
output_indices: Vec<usize>,
actor_ctx: ActorContextRef,
progress: CreateMviewProgress,
Expand Down Expand Up @@ -101,15 +101,14 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(mut self) {
debug!("snapshot backfill executor start");
let mut upstream = erase_upstream_mutation(self.upstream.execute());
let upstream_table_id = self.upstream_table.table_id();
let first_barrier = expect_first_barrier(&mut upstream).await?;
let first_barrier = expect_first_barrier(&mut self.upstream).await?;
debug!(epoch = ?first_barrier.epoch, "get first upstream barrier");
let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
debug!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
let should_backfill = first_barrier.epoch != first_recv_barrier.epoch;

{
let mut barrier_epoch = {
if should_backfill {
let subscriber_ids = first_recv_barrier
.added_subscriber_on_mv_table(upstream_table_id)
Expand Down Expand Up @@ -140,7 +139,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
.with_guarded_label_values(&[&table_id_str, &actor_id_str, "consume_upstream"]);

let mut upstream_buffer = UpstreamBuffer::new(
&mut upstream,
&mut self.upstream,
upstream_table_id,
snapshot_backfill_table_fragment_id,
consume_upstream_row_count,
Expand Down Expand Up @@ -250,6 +249,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
table_id = self.upstream_table.table_id().table_id,
"finish consuming log store"
);
barrier_epoch
} else {
info!(
table_id = self.upstream_table.table_id().table_id,
Expand All @@ -258,19 +258,17 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
assert_eq!(first_barrier.epoch, first_recv_barrier.epoch);
yield Message::Barrier(first_recv_barrier);
first_barrier.epoch
}
}
};
let mut upstream = self.upstream.into_executor(self.barrier_rx).execute();
// Phase 3: consume upstream
while let Some(msg) = upstream.try_next().await? {
yield match msg {
DispatcherMessage::Chunk(chunk) => Message::Chunk(chunk),
DispatcherMessage::Watermark(watermark) => Message::Watermark(watermark),
DispatcherMessage::Barrier(barrier) => {
let recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
assert_eq!(barrier.epoch, recv_barrier.epoch);
Message::Barrier(recv_barrier)
}
};
if let Message::Barrier(barrier) = &msg {
assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
barrier_epoch = barrier.epoch;
}
yield msg;
}
}
}
Expand Down Expand Up @@ -404,39 +402,8 @@ impl<'a> UpstreamBufferState for StateOfConsumingLogStore<'a> {
}
}

mod erase_upstream_mutation {
use futures::TryStreamExt;

use crate::executor::prelude::Stream;
use crate::executor::{BoxedMessageStream, DispatcherMessageStreamItem};

pub(super) fn erase_upstream_mutation(upstream: BoxedMessageStream) -> UpstreamStream {
upstream.map_ok(|msg| {
msg.map_mutation(|mutation| {
if let Some(mutation) = mutation {
if cfg!(debug_assertions) {
unreachable!(
"input of snapshot backfill should have erased mutation, but get {:?}",
mutation
);
} else {
warn!(
?mutation,
"receive non-empty mutation from upstream. ignored"
);
}
};
})
})
}

pub(super) type UpstreamStream = impl Stream<Item = DispatcherMessageStreamItem> + Unpin;
}

use erase_upstream_mutation::*;

struct UpstreamBuffer<'a, S> {
upstream: &'a mut UpstreamStream,
upstream: &'a mut InputExecutor,
state: S,
consume_upstream_row_count: LabelGuardedIntCounter<3>,
upstream_table_id: TableId,
Expand All @@ -445,7 +412,7 @@ struct UpstreamBuffer<'a, S> {

impl<'a> UpstreamBuffer<'a, StateOfConsumingSnapshot> {
fn new(
upstream: &'a mut UpstreamStream,
upstream: &'a mut InputExecutor,
upstream_table_id: TableId,
current_subscriber_id: u32,
consume_upstream_row_count: LabelGuardedIntCounter<3>,
Expand Down
Loading

0 comments on commit c94f85c

Please sign in to comment.