Skip to content

Commit

Permalink
refactor: maintain subscriptions in local barrier worker (#18516)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Sep 18, 2024
1 parent 3ee3b2c commit 06d5cde
Show file tree
Hide file tree
Showing 16 changed files with 219 additions and 230 deletions.
13 changes: 4 additions & 9 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,6 @@ import "stream_plan.proto";
option java_package = "com.risingwave.proto";
option optimize_for = SPEED;

message BuildActorInfo {
stream_plan.StreamActor actor = 1;
message SubscriptionIds {
repeated uint32 subscription_ids = 1;
}
map<uint32, SubscriptionIds> related_subscriptions = 2;
}

message InjectBarrierRequest {
string request_id = 1;
stream_plan.Barrier barrier = 2;
Expand All @@ -35,7 +27,9 @@ message InjectBarrierRequest {
repeated uint32 actor_ids_to_pre_sync_barrier_mutation = 7;

repeated common.ActorInfo broadcast_info = 8;
repeated BuildActorInfo actors_to_build = 9;
repeated stream_plan.StreamActor actors_to_build = 9;
repeated stream_plan.SubscriptionUpstreamInfo subscriptions_to_add = 10;
repeated stream_plan.SubscriptionUpstreamInfo subscriptions_to_remove = 11;
}

message BarrierCompleteResponse {
Expand Down Expand Up @@ -74,6 +68,7 @@ message WaitEpochCommitResponse {
message StreamingControlStreamRequest {
message InitRequest {
uint64 version_id = 1;
repeated stream_plan.SubscriptionUpstreamInfo subscriptions = 2;
}

message RemovePartialGraphRequest {
Expand Down
28 changes: 8 additions & 20 deletions src/meta/src/barrier/creating_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ use std::mem::take;
use std::sync::Arc;
use std::time::Duration;

use itertools::Itertools;
use prometheus::HistogramTimer;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::{BarrierCompleteResponse, BuildActorInfo};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::{debug, info};

use crate::barrier::command::CommandContext;
Expand Down Expand Up @@ -110,24 +109,7 @@ impl CreatingStreamingJobControl {
backfill_epoch,
pending_non_checkpoint_barriers: vec![],
snapshot_backfill_actors,
initial_barrier_info: Some((
actors_to_create
.into_iter()
.map(|(worker_id, actors)| {
(
worker_id,
actors
.into_iter()
.map(|actor| BuildActorInfo {
actor: Some(actor),
related_subscriptions: Default::default(),
})
.collect_vec(),
)
})
.collect(),
initial_mutation,
)),
initial_barrier_info: Some((actors_to_create, initial_mutation)),
},
upstream_lag: metrics
.snapshot_backfill_lag
Expand Down Expand Up @@ -298,6 +280,8 @@ impl CreatingStreamingJobControl {
Some(graph_info),
HashMap::new(),
new_actors,
vec![],
vec![],
)?;
self.barrier_control.enqueue_epoch(
prev_epoch.value().0,
Expand Down Expand Up @@ -363,6 +347,8 @@ impl CreatingStreamingJobControl {
Some(graph_info),
HashMap::new(),
None,
vec![],
vec![],
)?;
self.barrier_control.enqueue_epoch(
command_ctx.prev_epoch.value().0,
Expand Down Expand Up @@ -410,6 +396,8 @@ impl CreatingStreamingJobControl {
},
HashMap::new(),
None,
vec![],
vec![],
)?;
let prev_epoch = command_ctx.prev_epoch.value().0;
self.barrier_control.enqueue_epoch(
Expand Down
6 changes: 3 additions & 3 deletions src/meta/src/barrier/creating_job/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use std::sync::Arc;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::StreamActor;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
use risingwave_pb::stream_service::BuildActorInfo;

use crate::barrier::command::CommandContext;
use crate::barrier::info::InflightGraphInfo;
Expand All @@ -43,7 +43,7 @@ pub(super) enum CreatingStreamingJobStatus {
snapshot_backfill_actors: HashMap<WorkerId, HashSet<ActorId>>,
/// Info of the first barrier: (`actors_to_create`, `mutation`)
/// Take the mutation out when injecting the first barrier
initial_barrier_info: Option<(HashMap<WorkerId, Vec<BuildActorInfo>>, Mutation)>,
initial_barrier_info: Option<(HashMap<WorkerId, Vec<StreamActor>>, Mutation)>,
},
ConsumingLogStore {
graph_info: InflightGraphInfo,
Expand All @@ -62,7 +62,7 @@ pub(super) struct CreatingJobInjectBarrierInfo {
pub curr_epoch: TracedEpoch,
pub prev_epoch: TracedEpoch,
pub kind: BarrierKind,
pub new_actors: Option<HashMap<WorkerId, Vec<BuildActorInfo>>>,
pub new_actors: Option<HashMap<WorkerId, Vec<StreamActor>>>,
pub mutation: Option<Mutation>,
}

Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ impl GlobalBarrierManager {
.on_new_worker_node_map(self.active_streaming_nodes.current());
self.checkpoint_control.creating_streaming_job_controls.values().for_each(|job| job.on_new_worker_node_map(self.active_streaming_nodes.current()));
if let ActiveStreamingWorkerChange::Add(node) | ActiveStreamingWorkerChange::Update(node) = changed_worker {
self.control_stream_manager.add_worker(node).await;
self.control_stream_manager.add_worker(node, &self.state.inflight_subscription_info).await;
}
}

Expand Down
37 changes: 19 additions & 18 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::State;
use risingwave_pb::meta::{PausedReason, Recovery};
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::AddMutation;
use risingwave_pb::stream_service::BuildActorInfo;
use risingwave_pb::stream_plan::{AddMutation, StreamActor};
use thiserror_ext::AsReport;
use tokio::time::Instant;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
Expand Down Expand Up @@ -345,9 +344,21 @@ impl GlobalBarrierManager {
let mut control_stream_manager =
ControlStreamManager::new(self.context.clone());

let subscription_info = InflightSubscriptionInfo {
mv_depended_subscriptions: self
.context
.metadata_manager
.get_mv_depended_subscriptions()
.await?,
};

let reset_start_time = Instant::now();
control_stream_manager
.reset(version_id, active_streaming_nodes.current())
.reset(
version_id,
&subscription_info,
active_streaming_nodes.current(),
)
.await
.inspect_err(|err| {
warn!(error = %err.as_report(), "reset compute nodes failed");
Expand All @@ -356,18 +367,10 @@ impl GlobalBarrierManager {

self.context.sink_manager.reset().await;

let subscription_info = InflightSubscriptionInfo {
mv_depended_subscriptions: self
.context
.metadata_manager
.get_mv_depended_subscriptions()
.await?,
};

// update and build all actors.
let node_actors = self
.context
.load_all_actors(&info, &subscription_info, &active_streaming_nodes)
.load_all_actors(&info, &active_streaming_nodes)
.await
.inspect_err(|err| {
warn!(error = %err.as_report(), "update actors failed");
Expand Down Expand Up @@ -397,6 +400,8 @@ impl GlobalBarrierManager {
Some(&info),
HashMap::new(),
Some(node_actors),
vec![],
vec![],
)?;
debug!(?node_to_collect, "inject initial barrier");
while !node_to_collect.is_empty() {
Expand Down Expand Up @@ -1094,18 +1099,14 @@ impl GlobalBarrierManagerContext {
async fn load_all_actors(
&self,
info: &InflightGraphInfo,
subscription_info: &InflightSubscriptionInfo,
active_nodes: &ActiveStreamingWorkerNodes,
) -> MetaResult<HashMap<WorkerId, Vec<BuildActorInfo>>> {
) -> MetaResult<HashMap<WorkerId, Vec<StreamActor>>> {
if info.actor_map.is_empty() {
tracing::debug!("no actor to update, skipping.");
return Ok(HashMap::new());
}

let all_node_actors = self
.metadata_manager
.all_node_actors(false, &subscription_info.mv_depended_subscriptions)
.await?;
let all_node_actors = self.metadata_manager.all_node_actors(false).await?;

// Check if any actors were dropped after info resolved.
if all_node_actors.iter().any(|(node_id, node_actors)| {
Expand Down
Loading

0 comments on commit 06d5cde

Please sign in to comment.