feat(snapshot-backfill): only receive mutation from barrier worker for snapshot backfill (#18210)
wenym1 authored Sep 4, 2024
1 parent 9923c3a commit 0dd06ff
Showing 7 changed files with 321 additions and 114 deletions.
21 changes: 14 additions & 7 deletions src/meta/src/barrier/command.rs
@@ -497,16 +497,16 @@ impl CommandContext {
}
}

-impl CommandContext {
+impl Command {
/// Generate a mutation for the given command.
-pub fn to_mutation(&self) -> Option<Mutation> {
+pub fn to_mutation(&self, current_paused_reason: Option<&PausedReason>) -> Option<Mutation> {
let mutation =
-match &self.command {
+match self {
Command::Plain(mutation) => mutation.clone(),

Command::Pause(_) => {
// Only pause when the cluster is not already paused.
-if self.current_paused_reason.is_none() {
+if current_paused_reason.is_none() {
Some(Mutation::Pause(PauseMutation {}))
} else {
None
@@ -515,7 +515,7 @@ impl CommandContext {

Command::Resume(reason) => {
// Only resume when the cluster is paused with the same reason.
-if self.current_paused_reason == Some(*reason) {
+if current_paused_reason == Some(reason) {
Some(Mutation::Resume(ResumeMutation {}))
} else {
None
@@ -607,7 +607,7 @@ impl CommandContext {
added_actors,
actor_splits,
// If the cluster is already paused, the new actors should be paused too.
-pause: self.current_paused_reason.is_some(),
+pause: current_paused_reason.is_some(),
subscriptions_to_add,
}));

@@ -846,7 +846,7 @@ impl CommandContext {
}

pub fn actors_to_create(&self) -> Option<HashMap<WorkerId, Vec<StreamActor>>> {
-match &self.command {
+match self {
Command::CreateStreamingJob { info, job_type } => {
let mut map = match job_type {
CreateStreamingJobType::Normal => HashMap::new(),
@@ -914,6 +914,13 @@ impl CommandContext {
..Default::default()
}))
}
+}
+
+impl CommandContext {
+pub fn to_mutation(&self) -> Option<Mutation> {
+self.command
+.to_mutation(self.current_paused_reason.as_ref())
+}

/// Returns the paused reason after executing the current command.
pub fn next_paused_reason(&self) -> Option<PausedReason> {
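
The point of this hunk is that mutation generation no longer requires a full `CommandContext`: it now lives on `Command` itself, with the paused reason passed in explicitly, so the snapshot-backfill path below can build the initial mutation from a bare `Command`. A minimal sketch of the resulting shape, with simplified stand-in types (`PausedReason`, `Mutation`, and the command variants are placeholders, not the real definitions):

    #[derive(PartialEq)]
    enum PausedReason { Manual }

    enum Mutation { Pause, Resume }

    enum Command {
        Pause(PausedReason),
        Resume(PausedReason),
    }

    impl Command {
        // The paused reason is an explicit parameter, so no `CommandContext`
        // is needed to turn a command into a mutation.
        fn to_mutation(&self, current_paused_reason: Option<&PausedReason>) -> Option<Mutation> {
            match self {
                // Only pause when the cluster is not already paused.
                Command::Pause(_) => current_paused_reason.is_none().then_some(Mutation::Pause),
                // Only resume when the cluster is paused with the same reason.
                Command::Resume(reason) => {
                    (current_paused_reason == Some(reason)).then_some(Mutation::Resume)
                }
            }
        }
    }

    struct CommandContext {
        command: Command,
        current_paused_reason: Option<PausedReason>,
    }

    impl CommandContext {
        // The old entry point survives as a thin delegating wrapper.
        fn to_mutation(&self) -> Option<Mutation> {
            self.command.to_mutation(self.current_paused_reason.as_ref())
        }
    }

    fn main() {
        let ctx = CommandContext {
            command: Command::Pause(PausedReason::Manual),
            current_paused_reason: None,
        };
        // Pausing an un-paused cluster yields a mutation; pausing twice would not.
        assert!(matches!(ctx.to_mutation(), Some(Mutation::Pause)));
    }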
10 changes: 7 additions & 3 deletions src/meta/src/barrier/creating_job/mod.rs
@@ -28,6 +28,7 @@ use risingwave_common::util::epoch::Epoch;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
+use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::{BarrierCompleteResponse, BuildActorInfo};
use tracing::{debug, info};

@@ -67,6 +68,7 @@ impl CreatingStreamingJobControl {
backfill_epoch: u64,
version_stat: &HummockVersionStats,
metrics: &MetaMetrics,
+initial_mutation: Mutation,
) -> Self {
info!(
table_id = info.table_fragments.table_id().table_id,
@@ -108,7 +110,7 @@ impl CreatingStreamingJobControl {
backfill_epoch,
pending_non_checkpoint_barriers: vec![],
snapshot_backfill_actors,
-actors_to_create: Some(
+initial_barrier_info: Some((
actors_to_create
.into_iter()
.map(|(worker_id, actors)| {
@@ -124,7 +126,8 @@ impl CreatingStreamingJobControl {
)
})
.collect(),
-),
+initial_mutation,
+)),
},
upstream_lag: metrics
.snapshot_backfill_lag
@@ -283,11 +286,12 @@ impl CreatingStreamingJobControl {
prev_epoch,
kind,
new_actors,
+mutation,
} in barriers_to_inject
{
let node_to_collect = control_stream_manager.inject_barrier(
Some(table_id),
-None,
+mutation,
(&curr_epoch, &prev_epoch),
&kind,
graph_info,
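
With this change the new actors and the mutation for the job's first barrier are stored and later injected as one unit (`initial_barrier_info`), instead of the mutation being hard-coded to `None` at the injection site. A sketch of the new shape, assuming placeholder types for the protobuf messages:

    use std::collections::HashMap;

    type WorkerId = u32;
    struct BuildActorInfo; // placeholder for the real protobuf message
    struct Mutation; // placeholder for barrier_mutation::Mutation

    struct CreatingStreamingJobControl {
        // Actors and mutation for the first barrier travel together and are
        // taken out together when that barrier is injected.
        initial_barrier_info: Option<(HashMap<WorkerId, Vec<BuildActorInfo>>, Mutation)>,
    }

    impl CreatingStreamingJobControl {
        fn new(
            actors_to_create: HashMap<WorkerId, Vec<BuildActorInfo>>,
            initial_mutation: Mutation,
        ) -> Self {
            Self {
                initial_barrier_info: Some((actors_to_create, initial_mutation)),
            }
        }
    }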
21 changes: 17 additions & 4 deletions src/meta/src/barrier/creating_job/status.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;

use risingwave_common::util::epoch::Epoch;
use risingwave_pb::hummock::HummockVersionStats;
+use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
use risingwave_pb::stream_service::BuildActorInfo;

@@ -40,7 +41,9 @@ pub(super) enum CreatingStreamingJobStatus {
/// The `prev_epoch` of pending non checkpoint barriers
pending_non_checkpoint_barriers: Vec<u64>,
snapshot_backfill_actors: HashMap<WorkerId, HashSet<ActorId>>,
-actors_to_create: Option<HashMap<WorkerId, Vec<BuildActorInfo>>>,
+/// Info of the first barrier: (`actors_to_create`, `mutation`)
+/// Take the mutation out when injecting the first barrier
+initial_barrier_info: Option<(HashMap<WorkerId, Vec<BuildActorInfo>>, Mutation)>,
},
ConsumingLogStore {
graph_info: InflightGraphInfo,
@@ -60,6 +63,7 @@ pub(super) struct CreatingJobInjectBarrierInfo {
pub prev_epoch: TracedEpoch,
pub kind: BarrierKind,
pub new_actors: Option<HashMap<WorkerId, Vec<BuildActorInfo>>>,
+pub mutation: Option<Mutation>,
}

impl CreatingStreamingJobStatus {
@@ -104,12 +108,12 @@ impl CreatingStreamingJobStatus {
graph_info,
pending_non_checkpoint_barriers,
ref backfill_epoch,
-actors_to_create,
+initial_barrier_info,
..
} = self
{
if create_mview_tracker.has_pending_finished_jobs() {
-assert!(actors_to_create.is_none());
+assert!(initial_barrier_info.is_none());
pending_non_checkpoint_barriers.push(*backfill_epoch);

let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
@@ -119,6 +123,7 @@ impl CreatingStreamingJobStatus {
prev_epoch: TracedEpoch::new(prev_epoch),
kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
new_actors: None,
+mutation: None,
}]
.into_iter()
.chain(pending_commands.drain(..).map(|command_ctx| {
@@ -127,6 +132,7 @@ impl CreatingStreamingJobStatus {
prev_epoch: command_ctx.prev_epoch.clone(),
kind: command_ctx.kind.clone(),
new_actors: None,
+mutation: None,
}
}))
.collect();
@@ -145,12 +151,19 @@ impl CreatingStreamingJobStatus {
} else {
BarrierKind::Barrier
};
+let (new_actors, mutation) =
+if let Some((new_actors, mutation)) = initial_barrier_info.take() {
+(Some(new_actors), Some(mutation))
+} else {
+Default::default()
+};
Some((
vec![CreatingJobInjectBarrierInfo {
curr_epoch,
prev_epoch,
kind,
-new_actors: actors_to_create.take(),
+new_actors,
+mutation,
}],
None,
))
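
The injection path takes the pair out of the `Option` exactly once, so only the first barrier carries the new actors and the mutation; every later barrier sees `None` for both. A self-contained sketch of that one-shot hand-off (placeholder types again):

    use std::collections::HashMap;

    type WorkerId = u32;
    struct BuildActorInfo;
    struct Mutation;

    struct Status {
        initial_barrier_info: Option<(HashMap<WorkerId, Vec<BuildActorInfo>>, Mutation)>,
    }

    impl Status {
        fn next_barrier_fields(
            &mut self,
        ) -> (Option<HashMap<WorkerId, Vec<BuildActorInfo>>>, Option<Mutation>) {
            // `Option::take` moves the info out and leaves `None` behind.
            if let Some((new_actors, mutation)) = self.initial_barrier_info.take() {
                (Some(new_actors), Some(mutation))
            } else {
                Default::default()
            }
        }
    }

    fn main() {
        let mut status = Status {
            initial_barrier_info: Some((HashMap::new(), Mutation)),
        };
        let (actors, mutation) = status.next_barrier_fields();
        assert!(actors.is_some() && mutation.is_some()); // first barrier
        let (actors, mutation) = status.next_barrier_fields();
        assert!(actors.is_none() && mutation.is_none()); // all later barriers
    }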
14 changes: 14 additions & 0 deletions src/meta/src/barrier/mod.rs
@@ -965,6 +965,19 @@ impl GlobalBarrierManager {
info,
} = &command
{
+if self.state.paused_reason().is_some() {
+warn!("cannot create streaming job with snapshot backfill when paused");
+for notifier in notifiers {
+notifier.notify_start_failed(
+anyhow!("cannot create streaming job with snapshot backfill when paused",)
+.into(),
+);
+}
+return Ok(());
+}
+let mutation = command
+.to_mutation(None)
+.expect("should have some mutation in `CreateStreamingJob` command");
self.checkpoint_control
.creating_streaming_job_controls
.insert(
@@ -975,6 +988,7 @@ impl GlobalBarrierManager {
prev_epoch.value().0,
&self.checkpoint_control.hummock_version_stats,
&self.context.metrics,
+mutation,
),
);
}
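
Two things happen in this hunk: creating a snapshot-backfill job is rejected up front while the cluster is paused, and the initial mutation is computed via `Command::to_mutation(None)`. Passing `None` as the paused reason is sound only because the guard above has already ruled out a paused cluster. A hedged sketch of the control flow, with simplified signatures standing in for the real `GlobalBarrierManager` API:

    struct Notifier;

    impl Notifier {
        fn notify_start_failed(&self, err: String) {
            eprintln!("start failed: {err}");
        }
    }

    fn handle_create_snapshot_backfill(paused: bool, notifiers: &[Notifier]) -> Result<(), String> {
        if paused {
            for notifier in notifiers {
                notifier.notify_start_failed(
                    "cannot create streaming job with snapshot backfill when paused".into(),
                );
            }
            // The command is dropped and its waiters are notified; this is
            // not an internal barrier-manager error, so the handler returns Ok.
            return Ok(());
        }
        // Otherwise compute the initial mutation, e.g.
        // `command.to_mutation(None).expect(...)`, and register the job.
        Ok(())
    }

    fn main() {
        handle_create_snapshot_backfill(true, &[Notifier]).unwrap();
    }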
69 changes: 36 additions & 33 deletions src/meta/src/barrier/rpc.rs
@@ -263,39 +263,42 @@ impl ControlStreamManager {
pre_applied_graph_info,
applied_graph_info,
actor_ids_to_pre_sync_mutation,
-command_ctx.actors_to_create().map(|actors_to_create| {
-actors_to_create
-.into_iter()
-.map(|(worker_id, actors)| {
-(
-worker_id,
-actors
-.into_iter()
-.map(|actor| BuildActorInfo {
-actor: Some(actor),
-// TODO: consider subscriber of backfilling mv
-related_subscriptions: command_ctx
-.subscription_info
-.mv_depended_subscriptions
-.iter()
-.map(|(table_id, subscriptions)| {
-(
-table_id.table_id,
-SubscriptionIds {
-subscription_ids: subscriptions
-.keys()
-.cloned()
-.collect(),
-},
-)
-})
-.collect(),
-})
-.collect_vec(),
-)
-})
-.collect()
-}),
+command_ctx
+.command
+.actors_to_create()
+.map(|actors_to_create| {
+actors_to_create
+.into_iter()
+.map(|(worker_id, actors)| {
+(
+worker_id,
+actors
+.into_iter()
+.map(|actor| BuildActorInfo {
+actor: Some(actor),
+// TODO: consider subscriber of backfilling mv
+related_subscriptions: command_ctx
+.subscription_info
+.mv_depended_subscriptions
+.iter()
+.map(|(table_id, subscriptions)| {
+(
+table_id.table_id,
+SubscriptionIds {
+subscription_ids: subscriptions
+.keys()
+.cloned()
+.collect(),
+},
+)
+})
+.collect(),
+})
+.collect_vec(),
+)
+})
+.collect()
+}),
)
}

266 changes: 204 additions & 62 deletions src/stream/src/executor/backfill/snapshot_backfill.rs
@@ -32,14 +32,16 @@ use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::ChangeLogRow;
use risingwave_storage::StateStore;
use tokio::select;
+use tokio::sync::mpsc;
use tokio::sync::mpsc::UnboundedReceiver;

use crate::executor::backfill::utils::{create_builder, mapping_chunk};
use crate::executor::monitor::StreamingMetrics;
use crate::executor::prelude::{try_stream, StreamExt};
use crate::executor::{
-expect_first_barrier, ActorContextRef, BackfillExecutor, Barrier, BoxedMessageStream, Execute,
-Executor, Message, Mutation, StreamExecutorError, StreamExecutorResult,
+expect_first_barrier, ActorContextRef, BackfillExecutor, Barrier, BoxedMessageStream,
+DispatcherBarrier, DispatcherMessage, Execute, Executor, Message, Mutation,
+StreamExecutorError, StreamExecutorResult,
};
use crate::task::CreateMviewProgress;

@@ -99,7 +101,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(mut self) {
debug!("snapshot backfill executor start");
-let mut upstream = self.upstream.execute();
+let mut upstream = erase_upstream_mutation(self.upstream.execute());
let upstream_table_id = self.upstream_table.table_id();
let first_barrier = expect_first_barrier(&mut upstream).await?;
debug!(epoch = ?first_barrier.epoch, "get first upstream barrier");
@@ -109,7 +111,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {

{
if should_backfill {
-let subscriber_ids = first_barrier
+let subscriber_ids = first_recv_barrier
.added_subscriber_on_mv_table(upstream_table_id)
.collect_vec();
let snapshot_backfill_table_fragment_id = match subscriber_ids.as_slice() {
@@ -183,12 +185,15 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {

let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
assert_eq!(first_barrier.epoch, recv_barrier.epoch);
-yield Message::Barrier(first_barrier);
+yield Message::Barrier(recv_barrier);
}

+let mut upstream_buffer =
+upstream_buffer.start_consuming_log_store(&mut self.barrier_rx);
+
let mut barrier_epoch = first_barrier_epoch;

-let initial_pending_barrier = upstream_buffer.barrier.len();
+let initial_pending_barrier = upstream_buffer.state.barrier_count();
info!(
?barrier_epoch,
table_id = self.upstream_table.table_id().table_id,
@@ -207,8 +212,6 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {

// Phase 2: consume upstream log store
while let Some(barrier) = upstream_buffer.take_buffered_barrier().await? {
-let recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
-assert_eq!(barrier.epoch, recv_barrier.epoch);
assert_eq!(barrier_epoch.curr, barrier.epoch.prev);
barrier_epoch = barrier.epoch;

@@ -254,16 +257,20 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
);
let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
assert_eq!(first_barrier.epoch, first_recv_barrier.epoch);
-yield Message::Barrier(first_barrier);
+yield Message::Barrier(first_recv_barrier);
}
}
// Phase 3: consume upstream
while let Some(msg) = upstream.try_next().await? {
-if let Message::Barrier(barrier) = &msg {
-let recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
-assert_eq!(barrier.epoch, recv_barrier.epoch);
-}
-yield msg;
+yield match msg {
+DispatcherMessage::Chunk(chunk) => Message::Chunk(chunk),
+DispatcherMessage::Watermark(watermark) => Message::Watermark(watermark),
+DispatcherMessage::Barrier(barrier) => {
+let recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?;
+assert_eq!(barrier.epoch, recv_barrier.epoch);
+Message::Barrier(recv_barrier)
+}
+};
}
}
}
@@ -324,101 +331,236 @@ async fn read_change_log(
}
}

-struct UpstreamBuffer<'a> {
-upstream: &'a mut BoxedMessageStream,
-// newer barrier at the front
-barrier: VecDeque<Barrier>,
-consume_upstream_row_count: LabelGuardedIntCounter<3>,
+trait UpstreamBufferState {
+// The future must be cancellation-safe
+async fn is_finished(&mut self) -> StreamExecutorResult<bool>;
+fn on_upstream_barrier(&mut self, upstream_barrier: DispatcherBarrier);
+}
+
+struct StateOfConsumingSnapshot {
+pending_barriers: Vec<DispatcherBarrier>,
+}
+
+impl UpstreamBufferState for StateOfConsumingSnapshot {
+async fn is_finished(&mut self) -> StreamExecutorResult<bool> {
+// never finish when consuming snapshot
+Ok(false)
+}
+
+fn on_upstream_barrier(&mut self, upstream_barrier: DispatcherBarrier) {
+self.pending_barriers.push(upstream_barrier)
+}
+}
+
+struct StateOfConsumingLogStore<'a> {
+barrier_rx: &'a mut mpsc::UnboundedReceiver<Barrier>,
+/// Barriers received from upstream but not yet received the barrier from local barrier worker
+/// newer barrier at the front
+upstream_pending_barriers: VecDeque<DispatcherBarrier>,
+/// Barriers received from both upstream and local barrier worker
+/// newer barrier at the front
+barriers: VecDeque<Barrier>,
+is_finished: bool,
+current_subscriber_id: u32,
+upstream_table_id: TableId,
+}
+
+impl<'a> StateOfConsumingLogStore<'a> {
+fn barrier_count(&self) -> usize {
+self.upstream_pending_barriers.len() + self.barriers.len()
+}
+
+async fn handle_one_pending_barrier(&mut self) -> StreamExecutorResult<Barrier> {
+assert!(!self.is_finished);
+let barrier = receive_next_barrier(self.barrier_rx).await?;
+assert_eq!(
+self.upstream_pending_barriers
+.pop_back()
+.expect("non-empty")
+.epoch,
+barrier.epoch
+);
+if is_finish_barrier(&barrier, self.current_subscriber_id, self.upstream_table_id) {
+self.is_finished = true;
+}
+Ok(barrier)
+}
+}
+
+impl<'a> UpstreamBufferState for StateOfConsumingLogStore<'a> {
+async fn is_finished(&mut self) -> StreamExecutorResult<bool> {
+while !self.upstream_pending_barriers.is_empty() {
+let barrier = self.handle_one_pending_barrier().await?;
+self.barriers.push_front(barrier);
+}
+if self.is_finished {
+assert!(self.upstream_pending_barriers.is_empty());
+}
+Ok(self.is_finished)
+}
+
+fn on_upstream_barrier(&mut self, upstream_barrier: DispatcherBarrier) {
+self.upstream_pending_barriers.push_front(upstream_barrier);
+}
+}
+
+mod erase_upstream_mutation {
+use futures::TryStreamExt;
+
+use crate::executor::prelude::Stream;
+use crate::executor::{BoxedMessageStream, DispatcherMessageStreamItem};
+
+pub(super) fn erase_upstream_mutation(upstream: BoxedMessageStream) -> UpstreamStream {
+upstream.map_ok(|msg| {
+msg.map_mutation(|mutation| {
+if let Some(mutation) = mutation {
+// TODO: assert none mutation after we explicitly erase mutation
+warn!(
+?mutation,
+"receive non-empty mutation from upstream. ignored"
+);
+};
+})
+})
+}
+
+pub(super) type UpstreamStream = impl Stream<Item = DispatcherMessageStreamItem> + Unpin;
+}
+
+use erase_upstream_mutation::*;
+
+struct UpstreamBuffer<'a, S> {
+upstream: &'a mut UpstreamStream,
+state: S,
+consume_upstream_row_count: LabelGuardedIntCounter<3>,
upstream_table_id: TableId,
current_subscriber_id: u32,
}

-impl<'a> UpstreamBuffer<'a> {
+impl<'a> UpstreamBuffer<'a, StateOfConsumingSnapshot> {
fn new(
-upstream: &'a mut BoxedMessageStream,
+upstream: &'a mut UpstreamStream,
upstream_table_id: TableId,
current_subscriber_id: u32,
consume_upstream_row_count: LabelGuardedIntCounter<3>,
) -> Self {
Self {
upstream,
-barrier: Default::default(),
+state: StateOfConsumingSnapshot {
+pending_barriers: vec![],
+},
consume_upstream_row_count,
-is_finished: false,
upstream_table_id,
current_subscriber_id,
}
}

+fn start_consuming_log_store<'s>(
+self,
+barrier_rx: &'s mut UnboundedReceiver<Barrier>,
+) -> UpstreamBuffer<'a, StateOfConsumingLogStore<'s>> {
+let StateOfConsumingSnapshot { pending_barriers } = self.state;
+let mut upstream_pending_barriers = VecDeque::with_capacity(pending_barriers.len());
+for pending_barrier in pending_barriers {
+upstream_pending_barriers.push_front(pending_barrier);
+}
+UpstreamBuffer {
+upstream: self.upstream,
+state: StateOfConsumingLogStore {
+barrier_rx,
+upstream_pending_barriers,
+barriers: Default::default(),
+is_finished: false,
+current_subscriber_id: self.current_subscriber_id,
+upstream_table_id: self.upstream_table_id,
+},
+consume_upstream_row_count: self.consume_upstream_row_count,
+upstream_table_id: self.upstream_table_id,
+current_subscriber_id: self.current_subscriber_id,
+}
+}
+}
+
+impl<'a, S: UpstreamBufferState> UpstreamBuffer<'a, S> {
async fn concurrently_consume_upstream(&mut self) -> StreamExecutorError {
-while !self.is_finished {
-let result = self.consume_until_next_barrier().await;
-let barrier = match result {
-Ok(barrier) => barrier,
-Err(e) => {
-return e;
-}
-};
-self.barrier.push_front(barrier);
+if let Err(e) = try {
+while !self.state.is_finished().await? {
+self.consume_until_next_barrier().await?;
+}
+} {
+return e;
+}
pending().await
}

-async fn consume_until_next_barrier(&mut self) -> StreamExecutorResult<Barrier> {
-assert!(!self.is_finished);
+/// Consume the upstream until seeing the next barrier.
+/// `pending_barriers` must be non-empty after this method returns.
+async fn consume_until_next_barrier(&mut self) -> StreamExecutorResult<()> {
loop {
-let msg: Message = self
+let msg: DispatcherMessage = self
.upstream
.try_next()
.await?
.ok_or_else(|| anyhow!("end of upstream"))?;
match msg {
-Message::Chunk(chunk) => {
+DispatcherMessage::Chunk(chunk) => {
self.consume_upstream_row_count
.inc_by(chunk.cardinality() as _);
}
-Message::Barrier(barrier) => {
-self.is_finished = self.is_finish_barrier(&barrier);
-break Ok(barrier);
+DispatcherMessage::Barrier(barrier) => {
+self.state.on_upstream_barrier(barrier);
+break Ok(());
}
-Message::Watermark(_) => {}
+DispatcherMessage::Watermark(_) => {}
}
}
}
}

+impl<'a, 's> UpstreamBuffer<'a, StateOfConsumingLogStore<'s>> {
async fn take_buffered_barrier(&mut self) -> StreamExecutorResult<Option<Barrier>> {
-Ok(if let Some(barrier) = self.barrier.pop_back() {
+Ok(if let Some(barrier) = self.state.barriers.pop_back() {
Some(barrier)
-} else if self.is_finished {
+} else if !self.state.upstream_pending_barriers.is_empty() {
+let barrier = self.state.handle_one_pending_barrier().await?;
+Some(barrier)
+} else if self.state.is_finished {
None
} else {
-Some(self.consume_until_next_barrier().await?)
+self.consume_until_next_barrier().await?;
+let barrier = self.state.handle_one_pending_barrier().await?;
+Some(barrier)
})
}
+}

-fn is_finish_barrier(&self, barrier: &Barrier) -> bool {
-if let Some(Mutation::DropSubscriptions {
-subscriptions_to_drop,
-}) = barrier.mutation.as_deref()
-{
-let is_finished = subscriptions_to_drop
-.iter()
-.any(|(subscriber_id, _)| *subscriber_id == self.current_subscriber_id);
-if is_finished {
-assert!(subscriptions_to_drop.iter().any(
-|(subscriber_id, subscribed_upstream_table_id)| {
-*subscriber_id == self.current_subscriber_id
-&& self.upstream_table_id == *subscribed_upstream_table_id
-}
-))
-}
-is_finished
-} else {
-false
+fn is_finish_barrier(
+barrier: &Barrier,
+current_subscriber_id: u32,
+upstream_table_id: TableId,
+) -> bool {
+if let Some(Mutation::DropSubscriptions {
+subscriptions_to_drop,
+}) = barrier.mutation.as_deref()
+{
+let is_finished = subscriptions_to_drop
+.iter()
+.any(|(subscriber_id, _)| *subscriber_id == current_subscriber_id);
+if is_finished {
+assert!(subscriptions_to_drop.iter().any(
+|(subscriber_id, subscribed_upstream_table_id)| {
+*subscriber_id == current_subscriber_id
+&& upstream_table_id == *subscribed_upstream_table_id
+}
+))
+}
+is_finished
+} else {
+false
+}
+}

+impl<'a, S: UpstreamBufferState> UpstreamBuffer<'a, S> {
/// Run a future while concurrently polling the upstream so that the upstream
/// won't be back-pressured.
async fn run_future<T, E: Into<StreamExecutorError>>(
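
The buffer rewrite is a typestate-style state machine: `UpstreamBuffer<S>` is generic over a state implementing `UpstreamBufferState`, so snapshot consumption (which only stockpiles upstream barriers) and log-store consumption (which additionally pairs each upstream barrier with one from the local barrier worker) cannot be confused at compile time, and the transition consumes the buffer by value so it can never fall back. A condensed sketch of the pattern, assuming tokio and reducing barriers to bare epochs (not the real RisingWave types):

    use std::collections::VecDeque;
    use tokio::sync::mpsc;

    type Epoch = u64;
    struct DispatcherBarrier { epoch: Epoch } // upstream barrier, mutation erased
    struct Barrier { epoch: Epoch } // barrier from the local barrier worker

    trait UpstreamBufferState {
        fn on_upstream_barrier(&mut self, b: DispatcherBarrier);
    }

    struct ConsumingSnapshot {
        pending_barriers: Vec<DispatcherBarrier>,
    }

    impl UpstreamBufferState for ConsumingSnapshot {
        fn on_upstream_barrier(&mut self, b: DispatcherBarrier) {
            self.pending_barriers.push(b);
        }
    }

    struct ConsumingLogStore<'a> {
        barrier_rx: &'a mut mpsc::UnboundedReceiver<Barrier>,
        // received from upstream, not yet matched with a local barrier
        upstream_pending_barriers: VecDeque<DispatcherBarrier>,
    }

    impl<'a> UpstreamBufferState for ConsumingLogStore<'a> {
        fn on_upstream_barrier(&mut self, b: DispatcherBarrier) {
            self.upstream_pending_barriers.push_front(b);
        }
    }

    struct UpstreamBuffer<S> {
        state: S,
    }

    impl UpstreamBuffer<ConsumingSnapshot> {
        // The only way out of snapshot mode: hand every stockpiled upstream
        // barrier over to the log-store state.
        fn start_consuming_log_store(
            self,
            barrier_rx: &mut mpsc::UnboundedReceiver<Barrier>,
        ) -> UpstreamBuffer<ConsumingLogStore<'_>> {
            let mut upstream_pending_barriers = VecDeque::new();
            for b in self.state.pending_barriers {
                upstream_pending_barriers.push_front(b);
            }
            UpstreamBuffer {
                state: ConsumingLogStore { barrier_rx, upstream_pending_barriers },
            }
        }
    }

    impl<'a> UpstreamBuffer<ConsumingLogStore<'a>> {
        // Pair the oldest pending upstream barrier with the next barrier from
        // the local barrier worker; their epochs must line up.
        async fn handle_one_pending_barrier(&mut self) -> Option<Barrier> {
            let upstream = self.state.upstream_pending_barriers.pop_back()?;
            let barrier = self.state.barrier_rx.recv().await?;
            assert_eq!(upstream.epoch, barrier.epoch);
            Some(barrier)
        }
    }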
34 changes: 29 additions & 5 deletions src/stream/src/executor/mod.rs
@@ -164,13 +164,17 @@ pub use wrapper::WrapperExecutor;

use self::barrier_align::AlignedMessageStream;

-pub type MessageStreamItem = StreamExecutorResult<Message>;
+pub type MessageStreamItemInner<M> = StreamExecutorResult<MessageInner<M>>;
+pub type MessageStreamItem = MessageStreamItemInner<BarrierMutationType>;
+pub type DispatcherMessageStreamItem = MessageStreamItemInner<()>;
pub type BoxedMessageStream = BoxStream<'static, MessageStreamItem>;

pub use risingwave_common::util::epoch::task_local::{curr_epoch, epoch, prev_epoch};
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;

-pub trait MessageStream = futures::Stream<Item = MessageStreamItem> + Send;
+pub trait MessageStreamInner<M> = Stream<Item = MessageStreamItemInner<M>> + Send;
+pub trait MessageStream = Stream<Item = MessageStreamItem> + Send;
+pub trait DispatcherMessageStream = Stream<Item = DispatcherMessageStreamItem> + Send;

/// Static information of an executor.
#[derive(Debug, Default, Clone)]
@@ -913,6 +917,16 @@ impl<M> BarrierInner<M> {
tracing_context: TracingContext::from_protobuf(&prost.tracing_context),
})
}

+pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> BarrierInner<M2> {
+BarrierInner {
+epoch: self.epoch,
+mutation: f(self.mutation),
+kind: self.kind,
+tracing_context: self.tracing_context,
+passed_actors: self.passed_actors,
+}
+}
}

impl DispatcherBarrier {
@@ -1017,6 +1031,16 @@ pub enum MessageInner<M> {
Watermark(Watermark),
}

+impl<M> MessageInner<M> {
+pub fn map_mutation<M2>(self, f: impl FnOnce(M) -> M2) -> MessageInner<M2> {
+match self {
+MessageInner::Chunk(chunk) => MessageInner::Chunk(chunk),
+MessageInner::Barrier(barrier) => MessageInner::Barrier(barrier.map_mutation(f)),
+MessageInner::Watermark(watermark) => MessageInner::Watermark(watermark),
+}
+}
+}

pub type Message = MessageInner<BarrierMutationType>;
pub type DispatcherMessage = MessageInner<()>;

@@ -1102,9 +1126,9 @@ pub type PkIndicesRef<'a> = &'a [usize];
pub type PkDataTypes = SmallVec<[DataType; 1]>;

/// Expect the first message of the given `stream` as a barrier.
-pub async fn expect_first_barrier(
-stream: &mut (impl MessageStream + Unpin),
-) -> StreamExecutorResult<Barrier> {
+pub async fn expect_first_barrier<M: Debug>(
+stream: &mut (impl MessageStreamInner<M> + Unpin),
+) -> StreamExecutorResult<BarrierInner<M>> {
let message = stream
.next()
.instrument_await("expect_first_barrier")
