diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto
index de5dec126fda..572acd986314 100644
--- a/proto/stream_plan.proto
+++ b/proto/stream_plan.proto
@@ -626,6 +626,8 @@ message StreamScanNode {
   // The state table used by ArrangementBackfill to replicate upstream mview's state table.
   // Used iff `ChainType::ArrangementBackfill`.
   catalog.Table arrangement_table = 10;
+
+  optional uint64 snapshot_backfill_epoch = 11;
 }
 
 // Config options for CDC backfill
diff --git a/src/meta/src/barrier/checkpoint/control.rs b/src/meta/src/barrier/checkpoint/control.rs
index 62375e0fac1a..1a56a3ec6cdb 100644
--- a/src/meta/src/barrier/checkpoint/control.rs
+++ b/src/meta/src/barrier/checkpoint/control.rs
@@ -43,6 +43,7 @@ use crate::barrier::{
 };
 use crate::manager::ActiveStreamingWorkerNodes;
 use crate::rpc::metrics::GLOBAL_META_METRICS;
+use crate::stream::fill_snapshot_backfill_epoch;
 use crate::{MetaError, MetaResult};
 
 #[derive(Default)]
@@ -535,7 +536,9 @@ impl DatabaseCheckpointControl {
                             creating_job
                                 .snapshot_backfill_info
                                 .upstream_mv_table_ids
-                                .clone(),
+                                .keys()
+                                .cloned()
+                                .collect(),
                         ),
                     )
                 })
@@ -816,7 +819,7 @@ impl DatabaseCheckpointControl {
         if let Some(Command::CreateStreamingJob {
             job_type: CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
             info,
-        }) = &command
+        }) = &mut command
         {
             if self.state.paused_reason().is_some() {
                 warn!("cannot create streaming job with snapshot backfill when paused");
@@ -828,18 +831,42 @@ impl DatabaseCheckpointControl {
                 }
                 return Ok(());
            }
+            // Set the snapshot epoch of the upstream tables for snapshot backfill.
+            for snapshot_backfill_epoch in snapshot_backfill_info.upstream_mv_table_ids.values_mut()
+            {
+                assert!(
+                    snapshot_backfill_epoch
+                        .replace(barrier_info.prev_epoch())
+                        .is_none(),
+                    "must not set previously"
+                );
+            }
+            for stream_actor in info
+                .stream_job_fragments
+                .fragments
+                .values_mut()
+                .flat_map(|fragment| fragment.actors.iter_mut())
+            {
+                fill_snapshot_backfill_epoch(
+                    stream_actor.nodes.as_mut().expect("should exist"),
+                    &snapshot_backfill_info.upstream_mv_table_ids,
+                )?;
+            }
+            let info = info.clone();
+            let job_id = info.stream_job_fragments.stream_job_id();
+            let snapshot_backfill_info = snapshot_backfill_info.clone();
             let mutation = command
                 .as_ref()
                 .expect("checked Some")
                 .to_mutation(None)
                 .expect("should have some mutation in `CreateStreamingJob` command");
-            let job_id = info.stream_job_fragments.stream_job_id();
+
             control_stream_manager.add_partial_graph(self.database_id, Some(job_id))?;
             self.creating_streaming_job_controls.insert(
                 job_id,
                 CreatingStreamingJobControl::new(
-                    info.clone(),
-                    snapshot_backfill_info.clone(),
+                    info,
+                    snapshot_backfill_info,
                     barrier_info.prev_epoch(),
                     hummock_version_stats,
                     mutation,
diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 3ded5c438a67..bdc1dada1157 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -227,7 +227,10 @@ impl CreateStreamingJobCommandInfo {
 
 #[derive(Debug, Clone)]
 pub struct SnapshotBackfillInfo {
-    pub upstream_mv_table_ids: HashSet<TableId>,
+    /// `table_id` -> `Some(snapshot_backfill_epoch)`
+    /// The `snapshot_backfill_epoch` should be `None` at the beginning, and will be
+    /// filled in by the global barrier worker when handling the command.
+    pub upstream_mv_table_ids: HashMap<TableId, Option<u64>>,
 }
 
 #[derive(Debug, Clone)]
@@ -698,7 +701,7 @@ impl Command {
                     {
                         snapshot_backfill_info
                             .upstream_mv_table_ids
-                            .iter()
+                            .keys()
                             .map(|table_id| SubscriptionUpstreamInfo {
                                 subscriber_id: table_fragments.stream_job_id().table_id,
                                 upstream_mv_table_id: table_id.table_id,
@@ -747,7 +750,7 @@ impl Command {
                     info: jobs_to_merge
                         .iter()
                         .flat_map(|(table_id, (backfill_info, _))| {
-                            backfill_info.upstream_mv_table_ids.iter().map(
+                            backfill_info.upstream_mv_table_ids.keys().map(
                                 move |upstream_table_id| SubscriptionUpstreamInfo {
                                     subscriber_id: table_id.table_id,
                                     upstream_mv_table_id: upstream_table_id.table_id,
diff --git a/src/meta/src/barrier/context/context_impl.rs b/src/meta/src/barrier/context/context_impl.rs
index 8fa96ba128ec..d502a5f2d7d1 100644
--- a/src/meta/src/barrier/context/context_impl.rs
+++ b/src/meta/src/barrier/context/context_impl.rs
@@ -19,6 +19,7 @@ use risingwave_common::catalog::DatabaseId;
 use risingwave_pb::common::WorkerNode;
 use risingwave_pb::hummock::HummockVersionStats;
 use risingwave_pb::meta::PausedReason;
+use risingwave_pb::stream_plan::PbFragmentTypeFlag;
 use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
 use risingwave_pb::stream_service::WaitEpochCommitRequest;
 use risingwave_rpc_client::StreamingControlHandle;
@@ -174,8 +175,57 @@ impl CommandContext {
                     .unregister_table_ids(unregistered_state_table_ids.iter().cloned())
                     .await?;
             }
-
             Command::CreateStreamingJob { info, job_type } => {
+                let mut fragment_replacements = None;
+                let mut dropped_actors = None;
+                match job_type {
+                    CreateStreamingJobType::SinkIntoTable(
+                        replace_plan @ ReplaceStreamJobPlan {
+                            new_fragments,
+                            dispatchers,
+                            init_split_assignment,
+                            ..
+                        },
+                    ) => {
+                        barrier_manager_context
+                            .metadata_manager
+                            .catalog_controller
+                            .post_collect_job_fragments(
+                                new_fragments.stream_job_id().table_id as _,
+                                new_fragments.actor_ids(),
+                                dispatchers.clone(),
+                                init_split_assignment,
+                            )
+                            .await?;
+                        fragment_replacements = Some(replace_plan.fragment_replacements());
+                        dropped_actors = Some(replace_plan.dropped_actors());
+                    }
+                    CreateStreamingJobType::Normal => {}
+                    CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
+                        barrier_manager_context
+                            .metadata_manager
+                            .catalog_controller
+                            .fill_snapshot_backfill_epoch(
+                                info.stream_job_fragments.fragments.iter().filter_map(
+                                    |(fragment_id, fragment)| {
+                                        if (fragment.fragment_type_mask
+                                            & PbFragmentTypeFlag::SnapshotBackfillStreamScan as u32)
+                                            != 0
+                                        {
+                                            Some(*fragment_id as _)
+                                        } else {
+                                            None
+                                        }
+                                    },
+                                ),
+                                &snapshot_backfill_info.upstream_mv_table_ids,
+                            )
+                            .await?
+                    }
+                }
+
+                // Do `post_collect_job_fragments` of the original streaming job in the end, so that if any previous step fails,
+                // we won't mark the job as `Creating`, and the job will later be cleaned up by the recovery triggered by the returned error.
                 let CreateStreamingJobCommandInfo {
                     stream_job_fragments,
                     dispatchers,
@@ -193,31 +243,6 @@ impl CommandContext {
                     )
                     .await?;
 
-                let mut fragment_replacements = None;
-                let mut dropped_actors = None;
-                if let CreateStreamingJobType::SinkIntoTable(
-                    replace_plan @ ReplaceStreamJobPlan {
-                        new_fragments,
-                        dispatchers,
-                        init_split_assignment,
-                        ..
-                    },
-                ) = job_type
-                {
-                    barrier_manager_context
-                        .metadata_manager
-                        .catalog_controller
-                        .post_collect_job_fragments(
-                            new_fragments.stream_job_id().table_id as _,
-                            new_fragments.actor_ids(),
-                            dispatchers.clone(),
-                            init_split_assignment,
-                        )
-                        .await?;
-                    fragment_replacements = Some(replace_plan.fragment_replacements());
-                    dropped_actors = Some(replace_plan.dropped_actors());
-                }
-
                 // Extract the fragments that include source operators.
                 let source_fragments = stream_job_fragments.stream_source_fragments();
                 let backfill_fragments = stream_job_fragments.source_backfill_fragments()?;
diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs
index a35de428130c..2938ffde051e 100644
--- a/src/meta/src/controller/fragment.rs
+++ b/src/meta/src/controller/fragment.rs
@@ -1273,6 +1273,34 @@ impl CatalogController {
         Ok(())
     }
 
+    pub async fn fill_snapshot_backfill_epoch(
+        &self,
+        fragment_ids: impl Iterator<Item = FragmentId>,
+        upstream_mv_snapshot_epoch: &HashMap<TableId, Option<u64>>,
+    ) -> MetaResult<()> {
+        let inner = self.inner.write().await;
+        let txn = inner.db.begin().await?;
+        for fragment_id in fragment_ids {
+            let fragment = Fragment::find_by_id(fragment_id)
+                .one(&txn)
+                .await?
+                .context(format!("fragment {} not found", fragment_id))?;
+            let mut node = fragment.stream_node.to_protobuf();
+            if crate::stream::fill_snapshot_backfill_epoch(&mut node, upstream_mv_snapshot_epoch)? {
+                let node = StreamNode::from(&node);
+                Fragment::update(fragment::ActiveModel {
+                    fragment_id: Set(fragment_id),
+                    stream_node: Set(node),
+                    ..Default::default()
+                })
+                .exec(&txn)
+                .await?;
+            }
+        }
+        txn.commit().await?;
+        Ok(())
+    }
+
     /// Get the actor ids of the fragment with `fragment_id` with `Running` status.
     pub async fn get_running_actors_of_fragment(
         &self,
diff --git a/src/meta/src/stream/stream_graph.rs b/src/meta/src/stream/stream_graph.rs
index 3268dcb4d16c..c1e7763e94b8 100644
--- a/src/meta/src/stream/stream_graph.rs
+++ b/src/meta/src/stream/stream_graph.rs
@@ -18,5 +18,7 @@ mod id;
 mod schedule;
 
 pub use actor::{ActorGraphBuildResult, ActorGraphBuilder};
-pub use fragment::{CompleteStreamFragmentGraph, StreamFragmentGraph};
+pub use fragment::{
+    fill_snapshot_backfill_epoch, CompleteStreamFragmentGraph, StreamFragmentGraph,
+};
 pub use schedule::Locations;
diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs
index 21103fe1460c..ef53ac1645d8 100644
--- a/src/meta/src/stream/stream_graph/fragment.rs
+++ b/src/meta/src/stream/stream_graph/fragment.rs
@@ -27,7 +27,9 @@ use risingwave_common::catalog::{
 use risingwave_common::hash::VnodeCount;
 use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_common::util::stream_graph_visitor;
-use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
+use risingwave_common::util::stream_graph_visitor::{
+    visit_stream_node_cont, visit_stream_node_cont_mut,
+};
 use risingwave_meta_model::WorkerId;
 use risingwave_pb::catalog::Table;
 use risingwave_pb::ddl_service::TableJobType;
@@ -38,7 +40,7 @@ use risingwave_pb::stream_plan::stream_fragment_graph::{
 use risingwave_pb::stream_plan::stream_node::NodeBody;
 use risingwave_pb::stream_plan::{
     DispatchStrategy, DispatcherType, FragmentTypeFlag, StreamActor,
-    StreamFragmentGraph as StreamFragmentGraphProto, StreamScanNode, StreamScanType,
+    StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanNode, StreamScanType,
 };
 
 use crate::barrier::SnapshotBackfillInfo;
@@ -603,7 +605,7 @@ impl StreamFragmentGraph {
                (Some(prev_snapshot_backfill_info), true) => {
                    prev_snapshot_backfill_info
                        .upstream_mv_table_ids
-                        .insert(TableId::new(stream_scan.table_id));
+                        .insert(TableId::new(stream_scan.table_id), None);
                    true
                }
                (None, false) => true,
@@ -617,8 +619,9 @@ impl StreamFragmentGraph {
                prev_stream_scan = Some((
                    if is_snapshot_backfill {
                        Some(SnapshotBackfillInfo {
-                            upstream_mv_table_ids: HashSet::from_iter([TableId::new(
-                                stream_scan.table_id,
+                            upstream_mv_table_ids: HashMap::from_iter([(
+                                TableId::new(stream_scan.table_id),
+                                None,
                            )]),
                        })
                    } else {
@@ -642,6 +645,44 @@
     }
 }
 
+/// Fill the snapshot epoch for the `StreamScanNode` of `SnapshotBackfill`.
+/// Returns `true` when a change has been applied.
+pub fn fill_snapshot_backfill_epoch(
+    node: &mut StreamNode,
+    upstream_mv_table_snapshot_epoch: &HashMap<TableId, Option<u64>>,
+) -> MetaResult<bool> {
+    let mut result = Ok(());
+    let mut applied = false;
+    visit_stream_node_cont_mut(node, |node| {
+        if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_mut()
+            && stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32
+        {
+            result = try {
+                let table_id = TableId::new(stream_scan.table_id);
+                let snapshot_epoch = upstream_mv_table_snapshot_epoch
+                    .get(&table_id)
+                    .ok_or_else(|| anyhow!("upstream table id not covered: {}", table_id))?
+                    .ok_or_else(|| anyhow!("upstream table id not set: {}", table_id))?;
+                if let Some(prev_snapshot_epoch) =
+                    stream_scan.snapshot_backfill_epoch.replace(snapshot_epoch)
+                {
+                    Err(anyhow!(
+                        "snapshot backfill epoch set again: {} {} {}",
+                        table_id,
+                        prev_snapshot_epoch,
+                        snapshot_epoch
+                    ))?;
+                }
+                applied = true;
+            };
+            result.is_ok()
+        } else {
+            true
+        }
+    });
+    result.map(|_| applied)
+}
+
 static EMPTY_HASHMAP: LazyLock<HashMap<GlobalFragmentId, StreamFragmentEdge>> =
     LazyLock::new(HashMap::new);
 
diff --git a/src/stream/src/executor/backfill/snapshot_backfill.rs b/src/stream/src/executor/backfill/snapshot_backfill.rs
index 7d33ac89d403..6851d81ce790 100644
--- a/src/stream/src/executor/backfill/snapshot_backfill.rs
+++ b/src/stream/src/executor/backfill/snapshot_backfill.rs
@@ -62,6 +62,8 @@ pub struct SnapshotBackfillExecutor<S: StateStore> {
 
     actor_ctx: ActorContextRef,
     metrics: Arc<StreamingMetrics>,
+
+    snapshot_epoch: Option<u64>,
 }
 
 impl<S: StateStore> SnapshotBackfillExecutor<S> {
@@ -76,6 +78,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
         rate_limit: Option<usize>,
         barrier_rx: UnboundedReceiver<Barrier>,
         metrics: Arc<StreamingMetrics>,
+        snapshot_epoch: Option<u64>,
     ) -> Self {
         assert_eq!(&upstream.info.schema, upstream_table.schema());
         if let Some(rate_limit) = rate_limit {
@@ -94,17 +97,37 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
             barrier_rx,
             actor_ctx,
             metrics,
+            snapshot_epoch,
         }
     }
 
     #[try_stream(ok = Message, error = StreamExecutorError)]
     async fn execute_inner(mut self) {
         debug!("snapshot backfill executor start");
-        let first_barrier = expect_first_barrier(&mut self.upstream).await?;
-        debug!(epoch = ?first_barrier.epoch, "get first upstream barrier");
+        let first_upstream_barrier = expect_first_barrier(&mut self.upstream).await?;
+        debug!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
         let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
         debug!(epoch = ?first_recv_barrier.epoch, "get first inject barrier");
-        let should_backfill = first_barrier.epoch != first_recv_barrier.epoch;
+        let should_backfill = if let Some(snapshot_epoch) = self.snapshot_epoch {
+            if first_upstream_barrier.epoch != first_recv_barrier.epoch {
+                assert_eq!(snapshot_epoch, first_upstream_barrier.epoch.prev);
+                true
+            } else {
+                false
+            }
+        } else {
+            // when the snapshot epoch is not set, the StreamNode must have been created previously and already finished the backfill
+            if cfg!(debug_assertions) {
+                panic!(
+                    "snapshot epoch not set. first_upstream_epoch: {:?}, first_recv_epoch: {:?}",
+                    first_upstream_barrier.epoch, first_recv_barrier.epoch
+                );
+            } else {
+                warn!(first_upstream_epoch = ?first_upstream_barrier.epoch, first_recv_epoch=?first_recv_barrier.epoch, "snapshot epoch not set");
+                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
+                false
+            }
+        };
 
         let (mut barrier_epoch, mut need_report_finish) = {
             if should_backfill {
@@ -119,7 +142,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
                 let mut upstream_buffer =
                     UpstreamBuffer::new(&mut self.upstream, consume_upstream_row_count);
 
-                let first_barrier_epoch = first_barrier.epoch;
+                let first_barrier_epoch = first_upstream_barrier.epoch;
 
                 // Phase 1: consume upstream snapshot
                 {
@@ -156,7 +179,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
                 }
 
                 let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
-                assert_eq!(first_barrier.epoch, recv_barrier.epoch);
+                assert_eq!(first_upstream_barrier.epoch, recv_barrier.epoch);
                 yield Message::Barrier(recv_barrier);
             }
 
@@ -239,9 +262,9 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
                    table_id = self.upstream_table.table_id().table_id,
                    "skip backfill"
                );
-                assert_eq!(first_barrier.epoch, first_recv_barrier.epoch);
+                assert_eq!(first_upstream_barrier.epoch, first_recv_barrier.epoch);
                 yield Message::Barrier(first_recv_barrier);
-                (first_barrier.epoch, false)
+                (first_upstream_barrier.epoch, false)
             }
         };
         let mut upstream = self.upstream.into_executor(self.barrier_rx).execute();
diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs
index f1c5812aca08..6a4bbc605fe3 100644
--- a/src/stream/src/task/stream_manager.rs
+++ b/src/stream/src/task/stream_manager.rs
@@ -408,6 +408,7 @@ impl StreamActorManager {
                     node.rate_limit.map(|x| x as _),
                     barrier_rx,
                     self.streaming_metrics.clone(),
+                    node.snapshot_backfill_epoch,
                 )
                 .boxed();