diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index cf6f251b359c2..6e4ebe40b93b0 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -497,16 +497,16 @@ impl CommandContext {
     }
 }
 
-impl CommandContext {
+impl Command {
     /// Generate a mutation for the given command.
-    pub fn to_mutation(&self) -> Option<Mutation> {
+    pub fn to_mutation(&self, current_paused_reason: Option<&PausedReason>) -> Option<Mutation> {
         let mutation =
-            match &self.command {
+            match self {
                 Command::Plain(mutation) => mutation.clone(),
 
                 Command::Pause(_) => {
                     // Only pause when the cluster is not already paused.
-                    if self.current_paused_reason.is_none() {
+                    if current_paused_reason.is_none() {
                         Some(Mutation::Pause(PauseMutation {}))
                     } else {
                         None
@@ -515,7 +515,7 @@ impl CommandContext {
 
                 Command::Resume(reason) => {
                     // Only resume when the cluster is paused with the same reason.
-                    if self.current_paused_reason == Some(*reason) {
+                    if current_paused_reason == Some(reason) {
                         Some(Mutation::Resume(ResumeMutation {}))
                     } else {
                         None
@@ -607,7 +607,7 @@ impl CommandContext {
                     added_actors,
                     actor_splits,
                     // If the cluster is already paused, the new actors should be paused too.
-                    pause: self.current_paused_reason.is_some(),
+                    pause: current_paused_reason.is_some(),
                     subscriptions_to_add,
                 }));
 
@@ -846,7 +846,7 @@ impl CommandContext {
     }
 
     pub fn actors_to_create(&self) -> Option<HashMap<WorkerId, Vec<StreamActor>>> {
-        match &self.command {
+        match self {
             Command::CreateStreamingJob { info, job_type } => {
                 let mut map = match job_type {
                     CreateStreamingJobType::Normal => HashMap::new(),
@@ -914,6 +914,13 @@ impl CommandContext {
             ..Default::default()
         }))
     }
+}
+
+impl CommandContext {
+    pub fn to_mutation(&self) -> Option<Mutation> {
+        self.command
+            .to_mutation(self.current_paused_reason.as_ref())
+    }
 
     /// Returns the paused reason after executing the current command.
     pub fn next_paused_reason(&self) -> Option<PausedReason> {
diff --git a/src/meta/src/barrier/creating_job/mod.rs b/src/meta/src/barrier/creating_job/mod.rs
index c5a52437e2b7d..9e4e52b0e36b8 100644
--- a/src/meta/src/barrier/creating_job/mod.rs
+++ b/src/meta/src/barrier/creating_job/mod.rs
@@ -28,6 +28,7 @@ use risingwave_common::util::epoch::Epoch;
 use risingwave_pb::common::WorkerNode;
 use risingwave_pb::ddl_service::DdlProgress;
 use risingwave_pb::hummock::HummockVersionStats;
+use risingwave_pb::stream_plan::barrier_mutation::Mutation;
 use risingwave_pb::stream_service::{BarrierCompleteResponse, BuildActorInfo};
 use tracing::{debug, info};
 
@@ -67,6 +68,7 @@ impl CreatingStreamingJobControl {
         backfill_epoch: u64,
         version_stat: &HummockVersionStats,
         metrics: &MetaMetrics,
+        initial_mutation: Mutation,
     ) -> Self {
         info!(
             table_id = info.table_fragments.table_id().table_id,
@@ -108,7 +110,7 @@ impl CreatingStreamingJobControl {
                 backfill_epoch,
                 pending_non_checkpoint_barriers: vec![],
                 snapshot_backfill_actors,
-                actors_to_create: Some(
+                initial_barrier_info: Some((
                     actors_to_create
                         .into_iter()
                         .map(|(worker_id, actors)| {
@@ -124,7 +126,8 @@ impl CreatingStreamingJobControl {
                             )
                         })
                         .collect(),
-                ),
+                    initial_mutation,
+                )),
             },
             upstream_lag: metrics
                 .snapshot_backfill_lag
@@ -283,11 +286,12 @@ impl CreatingStreamingJobControl {
             prev_epoch,
             kind,
             new_actors,
+            mutation,
         } in barriers_to_inject
         {
             let node_to_collect = control_stream_manager.inject_barrier(
                 Some(table_id),
-                None,
+                mutation,
                 (&curr_epoch, &prev_epoch),
                 &kind,
                 graph_info,
diff --git a/src/meta/src/barrier/creating_job/status.rs b/src/meta/src/barrier/creating_job/status.rs
index 0569752b1056b..f5d4c37d247a6 100644
--- a/src/meta/src/barrier/creating_job/status.rs
+++ b/src/meta/src/barrier/creating_job/status.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;
 
 use risingwave_common::util::epoch::Epoch;
 use risingwave_pb::hummock::HummockVersionStats;
+use risingwave_pb::stream_plan::barrier_mutation::Mutation;
 use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
 use risingwave_pb::stream_service::BuildActorInfo;
 
@@ -40,7 +41,9 @@ pub(super) enum CreatingStreamingJobStatus {
         /// The `prev_epoch` of pending non checkpoint barriers
         pending_non_checkpoint_barriers: Vec<u64>,
         snapshot_backfill_actors: HashMap<WorkerId, Vec<ActorId>>,
-        actors_to_create: Option<HashMap<WorkerId, Vec<BuildActorInfo>>>,
+        /// Info of the first barrier: (`actors_to_create`, `mutation`)
+        /// Take the mutation out when injecting the first barrier
+        initial_barrier_info: Option<(HashMap<WorkerId, Vec<BuildActorInfo>>, Mutation)>,
     },
     ConsumingLogStore {
         graph_info: InflightGraphInfo,
@@ -60,6 +63,7 @@ pub(super) struct CreatingJobInjectBarrierInfo {
     pub prev_epoch: TracedEpoch,
     pub kind: BarrierKind,
     pub new_actors: Option<HashMap<WorkerId, Vec<BuildActorInfo>>>,
+    pub mutation: Option<Mutation>,
 }
 
 impl CreatingStreamingJobStatus {
@@ -104,12 +108,12 @@ impl CreatingStreamingJobStatus {
             graph_info,
             pending_non_checkpoint_barriers,
             ref backfill_epoch,
-            actors_to_create,
+            initial_barrier_info,
             ..
         } = self
         {
             if create_mview_tracker.has_pending_finished_jobs() {
-                assert!(actors_to_create.is_none());
+                assert!(initial_barrier_info.is_none());
                 pending_non_checkpoint_barriers.push(*backfill_epoch);
 
                 let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
@@ -119,6 +123,7 @@ impl CreatingStreamingJobStatus {
                     prev_epoch: TracedEpoch::new(prev_epoch),
                     kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
                     new_actors: None,
+                    mutation: None,
                 }]
                 .into_iter()
                 .chain(pending_commands.drain(..).map(|command_ctx| {
@@ -127,6 +132,7 @@ impl CreatingStreamingJobStatus {
                         prev_epoch: command_ctx.prev_epoch.clone(),
                         kind: command_ctx.kind.clone(),
                         new_actors: None,
+                        mutation: None,
                     }
                 }))
                 .collect();
@@ -145,12 +151,19 @@ impl CreatingStreamingJobStatus {
                 } else {
                     BarrierKind::Barrier
                 };
+                let (new_actors, mutation) =
+                    if let Some((new_actors, mutation)) = initial_barrier_info.take() {
+                        (Some(new_actors), Some(mutation))
+                    } else {
+                        Default::default()
+                    };
                 Some((
                     vec![CreatingJobInjectBarrierInfo {
                         curr_epoch,
                         prev_epoch,
                         kind,
-                        new_actors: actors_to_create.take(),
+                        new_actors,
+                        mutation,
                     }],
                     None,
                 ))
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index 5fc9dc5112a65..daa82306bff6d 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -965,6 +965,19 @@ impl GlobalBarrierManager {
                 info,
             } = &command
             {
+                if self.state.paused_reason().is_some() {
+                    warn!("cannot create streaming job with snapshot backfill when paused");
+                    for notifier in notifiers {
+                        notifier.notify_start_failed(
+                            anyhow!("cannot create streaming job with snapshot backfill when paused",)
+                                .into(),
+                        );
+                    }
+                    return Ok(());
+                }
+                let mutation = command
+                    .to_mutation(None)
+                    .expect("should have some mutation in `CreateStreamingJob` command");
                 self.checkpoint_control
                     .creating_streaming_job_controls
                     .insert(
@@ -975,6 +988,7 @@ impl GlobalBarrierManager {
                             prev_epoch.value().0,
                             &self.checkpoint_control.hummock_version_stats,
                             &self.context.metrics,
+                            mutation,
                         ),
                     );
             }
diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs
index 7ad468b04aa4c..14ee8b0c15f7b 100644
--- a/src/meta/src/barrier/rpc.rs
+++ b/src/meta/src/barrier/rpc.rs
@@ -263,39 +263,42 @@ impl ControlStreamManager {
             pre_applied_graph_info,
             applied_graph_info,
             actor_ids_to_pre_sync_mutation,
-            command_ctx.actors_to_create().map(|actors_to_create| {
-                actors_to_create
-                    .into_iter()
-                    .map(|(worker_id, actors)| {
-                        (
-                            worker_id,
-                            actors
-                                .into_iter()
-                                .map(|actor| BuildActorInfo {
-                                    actor: Some(actor),
-                                    // TODO: consider subscriber of backfilling mv
-                                    related_subscriptions: command_ctx
-                                        .subscription_info
-                                        .mv_depended_subscriptions
-                                        .iter()
-                                        .map(|(table_id, subscriptions)| {
-                                            (
-                                                table_id.table_id,
-                                                SubscriptionIds {
-                                                    subscription_ids: subscriptions
-                                                        .keys()
-                                                        .cloned()
-                                                        .collect(),
-                                                },
-                                            )
-                                        })
-                                        .collect(),
-                                })
-                                .collect_vec(),
-                        )
-                    })
-                    .collect()
-            }),
+            command_ctx
+                .command
+                .actors_to_create()
+                .map(|actors_to_create| {
+                    actors_to_create
+                        .into_iter()
+                        .map(|(worker_id, actors)| {
+                            (
+                                worker_id,
+                                actors
+                                    .into_iter()
+                                    .map(|actor| BuildActorInfo {
+                                        actor: Some(actor),
+                                        // TODO: consider subscriber of backfilling mv
+                                        related_subscriptions: command_ctx
+                                            .subscription_info
+                                            .mv_depended_subscriptions
+                                            .iter()
+                                            .map(|(table_id, subscriptions)| {
+                                                (
+                                                    table_id.table_id,
+                                                    SubscriptionIds {
+                                                        subscription_ids: subscriptions
+                                                            .keys()
+                                                            .cloned()
+                                                            .collect(),
+                                                    },
+                                                )
+                                            })
+                                            .collect(),
+                                    })
+                                    .collect_vec(),
+                            )
+                        })
+                        .collect()
+                }),
         )
     }
diff --git a/src/stream/src/executor/backfill/snapshot_backfill.rs b/src/stream/src/executor/backfill/snapshot_backfill.rs
index 35adc33b81c4f..ac625f53a02dd 100644
--- a/src/stream/src/executor/backfill/snapshot_backfill.rs
+++ b/src/stream/src/executor/backfill/snapshot_backfill.rs
@@ -32,14 +32,16 @@ use risingwave_storage::table::batch_table::storage_table::StorageTable;
 use risingwave_storage::table::ChangeLogRow;
 use risingwave_storage::StateStore;
 use tokio::select;
+use tokio::sync::mpsc;
 use tokio::sync::mpsc::UnboundedReceiver;
 
 use crate::executor::backfill::utils::{create_builder, mapping_chunk};
 use crate::executor::monitor::StreamingMetrics;
 use crate::executor::prelude::{try_stream, StreamExt};
 use crate::executor::{
-    expect_first_barrier, ActorContextRef, BackfillExecutor, Barrier, BoxedMessageStream, Execute,
-    Executor, Message, Mutation, StreamExecutorError, StreamExecutorResult,
+    expect_first_barrier, ActorContextRef, BackfillExecutor, Barrier, BoxedMessageStream,
+    DispatcherBarrier, DispatcherMessage, Execute, Executor, Message, Mutation,
+    StreamExecutorError, StreamExecutorResult,
 };
 use crate::task::CreateMviewProgress;
 
@@ -99,7 +101,7 @@ impl SnapshotBackfillExecutor {
     #[try_stream(ok = Message, error = StreamExecutorError)]
     async fn execute_inner(mut self) {
         debug!("snapshot backfill executor start");
-        let mut upstream = self.upstream.execute();
+        let mut upstream = erase_upstream_mutation(self.upstream.execute());
         let upstream_table_id = self.upstream_table.table_id();
         let first_barrier = expect_first_barrier(&mut upstream).await?;
         debug!(epoch = ?first_barrier.epoch, "get first upstream barrier");
@@ -109,7 +111,7 @@ impl SnapshotBackfillExecutor {
         {
             if should_backfill {
-                let subscriber_ids = first_barrier
+                let subscriber_ids = first_recv_barrier
                     .added_subscriber_on_mv_table(upstream_table_id)
                     .collect_vec();
                 let snapshot_backfill_table_fragment_id = match subscriber_ids.as_slice() {
@@ -183,12 +185,15 @@ impl SnapshotBackfillExecutor {
                     let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
                     assert_eq!(first_barrier.epoch, recv_barrier.epoch);
-                    yield Message::Barrier(first_barrier);
+                    yield Message::Barrier(recv_barrier);
                 }
 
+                let mut upstream_buffer =
+                    upstream_buffer.start_consuming_log_store(&mut self.barrier_rx);
+
                 let mut barrier_epoch = first_barrier_epoch;
 
-                let initial_pending_barrier = upstream_buffer.barrier.len();
+                let initial_pending_barrier = upstream_buffer.state.barrier_count();
                 info!(
                     ?barrier_epoch,
                     table_id = self.upstream_table.table_id().table_id,
@@ -207,8 +212,6 @@ impl SnapshotBackfillExecutor {
 
                 // Phase 2: consume upstream log store
                 while let Some(barrier) = upstream_buffer.take_buffered_barrier().await? {
-                    let recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
-                    assert_eq!(barrier.epoch, recv_barrier.epoch);
                     assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
                     barrier_epoch = barrier.epoch;
@@ -254,16 +257,20 @@ impl SnapshotBackfillExecutor {
                 );
                 let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
                 assert_eq!(first_barrier.epoch, first_recv_barrier.epoch);
-                yield Message::Barrier(first_barrier);
+                yield Message::Barrier(first_recv_barrier);
             }
         }
 
         // Phase 3: consume upstream
         while let Some(msg) = upstream.try_next().await? {
-            if let Message::Barrier(barrier) = &msg {
-                let recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
-                assert_eq!(barrier.epoch, recv_barrier.epoch);
-            }
-            yield msg;
+            yield match msg {
+                DispatcherMessage::Chunk(chunk) => Message::Chunk(chunk),
+                DispatcherMessage::Watermark(watermark) => Message::Watermark(watermark),
+                DispatcherMessage::Barrier(barrier) => {
+                    let recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
+                    assert_eq!(barrier.epoch, recv_barrier.epoch);
+                    Message::Barrier(recv_barrier)
+                }
+            };
         }
     }
 }
@@ -324,101 +331,236 @@ async fn read_change_log(
     }
 }
 
-struct UpstreamBuffer<'a> {
-    upstream: &'a mut BoxedMessageStream,
-    // newer barrier at the front
-    barrier: VecDeque<Barrier>,
-    consume_upstream_row_count: LabelGuardedIntCounter<3>,
+trait UpstreamBufferState {
+    // The future must be cancellation-safe
+    async fn is_finished(&mut self) -> StreamExecutorResult<bool>;
+    fn on_upstream_barrier(&mut self, upstream_barrier: DispatcherBarrier);
+}
+
+struct StateOfConsumingSnapshot {
+    pending_barriers: Vec<DispatcherBarrier>,
+}
+
+impl UpstreamBufferState for StateOfConsumingSnapshot {
+    async fn is_finished(&mut self) -> StreamExecutorResult<bool> {
+        // never finish when consuming snapshot
+        Ok(false)
+    }
+
+    fn on_upstream_barrier(&mut self, upstream_barrier: DispatcherBarrier) {
+        self.pending_barriers.push(upstream_barrier)
+    }
+}
+
+struct StateOfConsumingLogStore<'a> {
+    barrier_rx: &'a mut mpsc::UnboundedReceiver<Barrier>,
+    /// Barriers received from upstream but not yet received the barrier from local barrier worker
+    /// newer barrier at the front
+    upstream_pending_barriers: VecDeque<DispatcherBarrier>,
+    /// Barriers received from both upstream and local barrier worker
+    /// newer barrier at the front
+    barriers: VecDeque<Barrier>,
     is_finished: bool,
+    current_subscriber_id: u32,
+    upstream_table_id: TableId,
+}
+
+impl<'a> StateOfConsumingLogStore<'a> {
+    fn barrier_count(&self) -> usize {
+        self.upstream_pending_barriers.len() + self.barriers.len()
+    }
+
+    async fn handle_one_pending_barrier(&mut self) -> StreamExecutorResult<Barrier> {
+        assert!(!self.is_finished);
+        let barrier = receive_next_barrier(self.barrier_rx).await?;
+        assert_eq!(
+            self.upstream_pending_barriers
+                .pop_back()
+                .expect("non-empty")
+                .epoch,
+            barrier.epoch
+        );
+        if is_finish_barrier(&barrier, self.current_subscriber_id, self.upstream_table_id) {
+            self.is_finished = true;
+        }
+        Ok(barrier)
+    }
+}
+
+impl<'a> UpstreamBufferState for StateOfConsumingLogStore<'a> {
+    async fn is_finished(&mut self) -> StreamExecutorResult<bool> {
+        while !self.upstream_pending_barriers.is_empty() {
+            let barrier = self.handle_one_pending_barrier().await?;
+            self.barriers.push_front(barrier);
+        }
+        if self.is_finished {
+            assert!(self.upstream_pending_barriers.is_empty());
+        }
+        Ok(self.is_finished)
+    }
+
+    fn on_upstream_barrier(&mut self, upstream_barrier: DispatcherBarrier) {
+        self.upstream_pending_barriers.push_front(upstream_barrier);
+    }
+}
+
+mod erase_upstream_mutation {
+    use futures::TryStreamExt;
+
+    use crate::executor::prelude::Stream;
+    use crate::executor::{BoxedMessageStream, DispatcherMessageStreamItem};
+
+    pub(super) fn erase_upstream_mutation(upstream: BoxedMessageStream) -> UpstreamStream {
+        upstream.map_ok(|msg| {
+            msg.map_mutation(|mutation| {
+                if let Some(mutation) = mutation {
+                    // TODO: assert none mutation after we explicitly erase mutation
+                    warn!(
+                        ?mutation,
ignored" + ); + }; + }) + }) + } + + pub(super) type UpstreamStream = impl Stream + Unpin; +} + +use erase_upstream_mutation::*; + +struct UpstreamBuffer<'a, S> { + upstream: &'a mut UpstreamStream, + state: S, + consume_upstream_row_count: LabelGuardedIntCounter<3>, upstream_table_id: TableId, current_subscriber_id: u32, } -impl<'a> UpstreamBuffer<'a> { +impl<'a> UpstreamBuffer<'a, StateOfConsumingSnapshot> { fn new( - upstream: &'a mut BoxedMessageStream, + upstream: &'a mut UpstreamStream, upstream_table_id: TableId, current_subscriber_id: u32, consume_upstream_row_count: LabelGuardedIntCounter<3>, ) -> Self { Self { upstream, - barrier: Default::default(), + state: StateOfConsumingSnapshot { + pending_barriers: vec![], + }, consume_upstream_row_count, - is_finished: false, upstream_table_id, current_subscriber_id, } } + fn start_consuming_log_store<'s>( + self, + barrier_rx: &'s mut UnboundedReceiver, + ) -> UpstreamBuffer<'a, StateOfConsumingLogStore<'s>> { + let StateOfConsumingSnapshot { pending_barriers } = self.state; + let mut upstream_pending_barriers = VecDeque::with_capacity(pending_barriers.len()); + for pending_barrier in pending_barriers { + upstream_pending_barriers.push_front(pending_barrier); + } + UpstreamBuffer { + upstream: self.upstream, + state: StateOfConsumingLogStore { + barrier_rx, + upstream_pending_barriers, + barriers: Default::default(), + is_finished: false, + current_subscriber_id: self.current_subscriber_id, + upstream_table_id: self.upstream_table_id, + }, + consume_upstream_row_count: self.consume_upstream_row_count, + upstream_table_id: self.upstream_table_id, + current_subscriber_id: self.current_subscriber_id, + } + } +} + +impl<'a, S: UpstreamBufferState> UpstreamBuffer<'a, S> { async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError { - while !self.is_finished { - let result = self.consume_until_next_barrier().await; - let barrier = match result { - Ok(barrier) => barrier, - Err(e) => { - return e; - } - }; - self.barrier.push_front(barrier); + if let Err(e) = try { + while !self.state.is_finished().await? { + self.consume_until_next_barrier().await?; + } + } { + return e; } pending().await } - async fn consume_until_next_barrier(&mut self) -> StreamExecutorResult { - assert!(!self.is_finished); + /// Consume the upstream until seeing the next barrier. + /// `pending_barriers` must be non-empty after this method returns. + async fn consume_until_next_barrier(&mut self) -> StreamExecutorResult<()> { loop { - let msg: Message = self + let msg: DispatcherMessage = self .upstream .try_next() .await? 
                 .ok_or_else(|| anyhow!("end of upstream"))?;
             match msg {
-                Message::Chunk(chunk) => {
+                DispatcherMessage::Chunk(chunk) => {
                     self.consume_upstream_row_count
                         .inc_by(chunk.cardinality() as _);
                 }
-                Message::Barrier(barrier) => {
-                    self.is_finished = self.is_finish_barrier(&barrier);
-                    break Ok(barrier);
+                DispatcherMessage::Barrier(barrier) => {
+                    self.state.on_upstream_barrier(barrier);
+                    break Ok(());
                 }
-                Message::Watermark(_) => {}
+                DispatcherMessage::Watermark(_) => {}
             }
         }
     }
+}
 
+impl<'a, 's> UpstreamBuffer<'a, StateOfConsumingLogStore<'s>> {
     async fn take_buffered_barrier(&mut self) -> StreamExecutorResult<Option<Barrier>> {
-        Ok(if let Some(barrier) = self.barrier.pop_back() {
+        Ok(if let Some(barrier) = self.state.barriers.pop_back() {
             Some(barrier)
-        } else if self.is_finished {
+        } else if !self.state.upstream_pending_barriers.is_empty() {
+            let barrier = self.state.handle_one_pending_barrier().await?;
+            Some(barrier)
+        } else if self.state.is_finished {
             None
         } else {
-            Some(self.consume_until_next_barrier().await?)
+            self.consume_until_next_barrier().await?;
+            let barrier = self.state.handle_one_pending_barrier().await?;
+            Some(barrier)
         })
     }
+}
 
-    fn is_finish_barrier(&self, barrier: &Barrier) -> bool {
-        if let Some(Mutation::DropSubscriptions {
-            subscriptions_to_drop,
-        }) = barrier.mutation.as_deref()
-        {
-            let is_finished = subscriptions_to_drop
-                .iter()
-                .any(|(subscriber_id, _)| *subscriber_id == self.current_subscriber_id);
-            if is_finished {
-                assert!(subscriptions_to_drop.iter().any(
-                    |(subscriber_id, subscribed_upstream_table_id)| {
-                        *subscriber_id == self.current_subscriber_id
-                            && self.upstream_table_id == *subscribed_upstream_table_id
-                    }
-                ))
-            }
-            is_finished
-        } else {
-            false
+fn is_finish_barrier(
+    barrier: &Barrier,
+    current_subscriber_id: u32,
+    upstream_table_id: TableId,
+) -> bool {
+    if let Some(Mutation::DropSubscriptions {
+        subscriptions_to_drop,
+    }) = barrier.mutation.as_deref()
+    {
+        let is_finished = subscriptions_to_drop
+            .iter()
+            .any(|(subscriber_id, _)| *subscriber_id == current_subscriber_id);
+        if is_finished {
+            assert!(subscriptions_to_drop.iter().any(
+                |(subscriber_id, subscribed_upstream_table_id)| {
+                    *subscriber_id == current_subscriber_id
+                        && upstream_table_id == *subscribed_upstream_table_id
+                }
+            ))
         }
+        is_finished
+    } else {
+        false
     }
+}
 
+impl<'a, S: UpstreamBufferState> UpstreamBuffer<'a, S> {
     /// Run a future while concurrently polling the upstream so that the upstream
     /// won't be back-pressured.
     async fn run_future<T, E: Into<StreamExecutorError>>(
diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs
index 7b22a48a25ab6..8b9f7b3f2242b 100644
--- a/src/stream/src/executor/mod.rs
+++ b/src/stream/src/executor/mod.rs
@@ -164,13 +164,17 @@ pub use wrapper::WrapperExecutor;
 
 use self::barrier_align::AlignedMessageStream;
 
-pub type MessageStreamItem = StreamExecutorResult<Message>;
+pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
+pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
+pub type DispatcherMessageStreamItem = MessageStreamItemInner<()>;
 pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;
 
 pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
 use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
 
-pub trait MessageStream = futures::Stream<Item = MessageStreamItem> + Send;
+pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
+pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
+pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;
 
 /// Static information of an executor.
 #[derive(Debug, Default, Clone)]
@@ -913,6 +917,16 @@ impl<M> BarrierInner<M> {
             tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
         })
     }
+
+    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
+        BarrierInner {
+            epoch: self.epoch,
+            mutation: f(self.mutation),
+            kind: self.kind,
+            tracing_context: self.tracing_context,
+            passed_actors: self.passed_actors,
+        }
+    }
 }
 
 impl DispatcherBarrier {
@@ -1017,6 +1031,16 @@ pub enum MessageInner<M> {
     Watermark(Watermark),
 }
 
+impl<M> MessageInner<M> {
+    pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
+        match self {
+            MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
+            MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
+            MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
+        }
+    }
+}
+
 pub type Message = MessageInner<BarrierMutationType>;
 pub type DispatcherMessage = MessageInner<()>;
 
@@ -1102,9 +1126,9 @@ pub type PkIndicesRef<'a> = &'a [usize];
 pub type PkDataTypes = SmallVec<[DataType; 1]>;
 
 /// Expect the first message of the given `stream` as a barrier.
-pub async fn expect_first_barrier(
-    stream: &mut (impl MessageStream + Unpin),
-) -> StreamExecutorResult<Barrier> {
+pub async fn expect_first_barrier<M>(
+    stream: &mut (impl MessageStreamInner<M> + Unpin),
+) -> StreamExecutorResult<BarrierInner<M>> {
     let message = stream
         .next()
         .instrument_await("expect_first_barrier")