Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(snapshot-backfill): only receive mutation from barrier worker for snapshot backfill #18210

Merged
merged 6 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,16 +496,16 @@ impl CommandContext {
}
}

impl CommandContext {
impl Command {
/// Generate a mutation for the given command.
pub fn to_mutation(&self) -> Option<Mutation> {
pub fn to_mutation(&self, current_paused_reason: Option<&PausedReason>) -> Option<Mutation> {
let mutation =
match &self.command {
match self {
Command::Plain(mutation) => mutation.clone(),

Command::Pause(_) => {
// Only pause when the cluster is not already paused.
if self.current_paused_reason.is_none() {
if current_paused_reason.is_none() {
Some(Mutation::Pause(PauseMutation {}))
} else {
None
Expand All @@ -514,7 +514,7 @@ impl CommandContext {

Command::Resume(reason) => {
// Only resume when the cluster is paused with the same reason.
if self.current_paused_reason == Some(*reason) {
if current_paused_reason == Some(reason) {
Some(Mutation::Resume(ResumeMutation {}))
} else {
None
Expand Down Expand Up @@ -606,7 +606,7 @@ impl CommandContext {
added_actors,
actor_splits,
// If the cluster is already paused, the new actors should be paused too.
pause: self.current_paused_reason.is_some(),
pause: current_paused_reason.is_some(),
subscriptions_to_add,
}));

Expand Down Expand Up @@ -845,7 +845,7 @@ impl CommandContext {
}

pub fn actors_to_create(&self) -> Option<HashMap<WorkerId, Vec<StreamActor>>> {
match &self.command {
match self {
Command::CreateStreamingJob { info, job_type } => {
let mut map = match job_type {
CreateStreamingJobType::Normal => HashMap::new(),
Expand Down Expand Up @@ -913,6 +913,13 @@ impl CommandContext {
..Default::default()
}))
}
}

impl CommandContext {
/// Generate the barrier mutation for this context's command.
///
/// Thin convenience wrapper: delegates to [`Command::to_mutation`], supplying
/// this context's own `current_paused_reason` as the current paused state
/// (borrowed via `as_ref`, so the context is not consumed or mutated).
/// Returns `None` when the command produces no mutation (e.g. a `Pause`
/// when the cluster is already paused — see `Command::to_mutation`).
pub fn to_mutation(&self) -> Option<Mutation> {
    self.command
        .to_mutation(self.current_paused_reason.as_ref())
}

/// Returns the paused reason after executing the current command.
pub fn next_paused_reason(&self) -> Option<PausedReason> {
Expand Down
10 changes: 7 additions & 3 deletions src/meta/src/barrier/creating_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use risingwave_common::util::epoch::Epoch;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::{BarrierCompleteResponse, BuildActorInfo};
use tracing::{debug, info};

Expand Down Expand Up @@ -67,6 +68,7 @@ impl CreatingStreamingJobControl {
backfill_epoch: u64,
version_stat: &HummockVersionStats,
metrics: &MetaMetrics,
initial_mutation: Mutation,
) -> Self {
info!(
table_id = info.table_fragments.table_id().table_id,
Expand Down Expand Up @@ -108,7 +110,7 @@ impl CreatingStreamingJobControl {
backfill_epoch,
pending_non_checkpoint_barriers: vec![],
snapshot_backfill_actors,
actors_to_create: Some(
initial_barrier_info: Some((
actors_to_create
.into_iter()
.map(|(worker_id, actors)| {
Expand All @@ -124,7 +126,8 @@ impl CreatingStreamingJobControl {
)
})
.collect(),
),
initial_mutation,
)),
},
upstream_lag: metrics
.snapshot_backfill_lag
Expand Down Expand Up @@ -283,11 +286,12 @@ impl CreatingStreamingJobControl {
prev_epoch,
kind,
new_actors,
mutation,
} in barriers_to_inject
{
let node_to_collect = control_stream_manager.inject_barrier(
Some(table_id),
None,
mutation,
(&curr_epoch, &prev_epoch),
&kind,
graph_info,
Expand Down
21 changes: 17 additions & 4 deletions src/meta/src/barrier/creating_job/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;

use risingwave_common::util::epoch::Epoch;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
use risingwave_pb::stream_service::BuildActorInfo;

Expand All @@ -40,7 +41,9 @@ pub(super) enum CreatingStreamingJobStatus {
/// The `prev_epoch` of pending non checkpoint barriers
pending_non_checkpoint_barriers: Vec<u64>,
snapshot_backfill_actors: HashMap<WorkerId, HashSet<ActorId>>,
actors_to_create: Option<HashMap<WorkerId, Vec<BuildActorInfo>>>,
/// Info of the first barrier: (`actors_to_create`, `mutation`).
/// The mutation is taken out of this field when the first barrier is injected.
initial_barrier_info: Option<(HashMap<WorkerId, Vec<BuildActorInfo>>, Mutation)>,
},
ConsumingLogStore {
graph_info: InflightGraphInfo,
Expand All @@ -60,6 +63,7 @@ pub(super) struct CreatingJobInjectBarrierInfo {
pub prev_epoch: TracedEpoch,
pub kind: BarrierKind,
pub new_actors: Option<HashMap<WorkerId, Vec<BuildActorInfo>>>,
pub mutation: Option<Mutation>,
}

impl CreatingStreamingJobStatus {
Expand Down Expand Up @@ -104,12 +108,12 @@ impl CreatingStreamingJobStatus {
graph_info,
pending_non_checkpoint_barriers,
ref backfill_epoch,
actors_to_create,
initial_barrier_info,
..
} = self
{
if create_mview_tracker.has_pending_finished_jobs() {
assert!(actors_to_create.is_none());
assert!(initial_barrier_info.is_none());
pending_non_checkpoint_barriers.push(*backfill_epoch);

let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
Expand All @@ -119,6 +123,7 @@ impl CreatingStreamingJobStatus {
prev_epoch: TracedEpoch::new(prev_epoch),
kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
new_actors: None,
mutation: None,
}]
.into_iter()
.chain(pending_commands.drain(..).map(|command_ctx| {
Expand All @@ -127,6 +132,7 @@ impl CreatingStreamingJobStatus {
prev_epoch: command_ctx.prev_epoch.clone(),
kind: command_ctx.kind.clone(),
new_actors: None,
mutation: None,
}
}))
.collect();
Expand All @@ -145,12 +151,19 @@ impl CreatingStreamingJobStatus {
} else {
BarrierKind::Barrier
};
let (new_actors, mutation) =
if let Some((new_actors, mutation)) = initial_barrier_info.take() {
(Some(new_actors), Some(mutation))
} else {
Default::default()
};
Some((
vec![CreatingJobInjectBarrierInfo {
curr_epoch,
prev_epoch,
kind,
new_actors: actors_to_create.take(),
new_actors,
mutation,
}],
None,
))
Expand Down
14 changes: 14 additions & 0 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,19 @@ impl GlobalBarrierManager {
info,
} = &command
{
if self.state.paused_reason().is_some() {
warn!("cannot create streaming job with snapshot backfill when paused");
for notifier in notifiers {
notifier.notify_start_failed(
anyhow!("cannot create streaming job with snapshot backfill when paused",)
.into(),
);
}
return Ok(());
}
let mutation = command
.to_mutation(None)
.expect("should have some mutation in `CreateStreamingJob` command");
self.checkpoint_control
.creating_streaming_job_controls
.insert(
Expand All @@ -975,6 +988,7 @@ impl GlobalBarrierManager {
prev_epoch.value().0,
&self.checkpoint_control.hummock_version_stats,
&self.context.metrics,
mutation,
),
);
}
Expand Down
69 changes: 36 additions & 33 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,39 +263,42 @@ impl ControlStreamManager {
pre_applied_graph_info,
applied_graph_info,
actor_ids_to_pre_sync_mutation,
command_ctx.actors_to_create().map(|actors_to_create| {
actors_to_create
.into_iter()
.map(|(worker_id, actors)| {
(
worker_id,
actors
.into_iter()
.map(|actor| BuildActorInfo {
actor: Some(actor),
// TODO: consider subscriber of backfilling mv
related_subscriptions: command_ctx
.subscription_info
.mv_depended_subscriptions
.iter()
.map(|(table_id, subscriptions)| {
(
table_id.table_id,
SubscriptionIds {
subscription_ids: subscriptions
.keys()
.cloned()
.collect(),
},
)
})
.collect(),
})
.collect_vec(),
)
})
.collect()
}),
command_ctx
.command
.actors_to_create()
.map(|actors_to_create| {
actors_to_create
.into_iter()
.map(|(worker_id, actors)| {
(
worker_id,
actors
.into_iter()
.map(|actor| BuildActorInfo {
actor: Some(actor),
// TODO: consider subscriber of backfilling mv
related_subscriptions: command_ctx
.subscription_info
.mv_depended_subscriptions
.iter()
.map(|(table_id, subscriptions)| {
(
table_id.table_id,
SubscriptionIds {
subscription_ids: subscriptions
.keys()
.cloned()
.collect(),
},
)
})
.collect(),
})
.collect_vec(),
)
})
.collect()
}),
)
}

Expand Down
Loading
Loading