Skip to content

Commit

Permalink
feat(meta): add get back pressure RPC for UI dashboard (#14790)
Browse files Browse the repository at this point in the history
  • Loading branch information
yufansong authored Jan 25, 2024
1 parent a4f0ec2 commit 57d0b0e
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 0 deletions.
15 changes: 15 additions & 0 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,20 @@ message WaitEpochCommitResponse {
common.Status status = 1;
}

// Back pressure
message GetBackPressureRequest {}

message BackPressureInfo {
uint32 actor_id = 1;
uint32 fragment_id = 2;
uint32 downstream_fragment_id = 3;
double value = 4;
}

message GetBackPressureResponse {
repeated BackPressureInfo back_pressure_infos = 1;
}

service StreamService {
rpc UpdateActors(UpdateActorsRequest) returns (UpdateActorsResponse);
rpc BuildActors(BuildActorsRequest) returns (BuildActorsResponse);
Expand All @@ -112,6 +126,7 @@ service StreamService {
rpc InjectBarrier(InjectBarrierRequest) returns (InjectBarrierResponse);
rpc BarrierComplete(BarrierCompleteRequest) returns (BarrierCompleteResponse);
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
rpc GetBackPressure(GetBackPressureRequest) returns (GetBackPressureResponse);
}

// TODO: Lifecycle management for actors.
36 changes: 36 additions & 0 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ use std::sync::Arc;

use await_tree::InstrumentAwait;
use itertools::Itertools;
use prometheus::core::Collector;
use risingwave_common::config::MetricLevel;
use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
use risingwave_hummock_sdk::LocalSstableInfo;
use risingwave_pb::stream_service::barrier_complete_response::GroupedSstableInfo;
use risingwave_pb::stream_service::stream_service_server::StreamService;
use risingwave_pb::stream_service::*;
use risingwave_storage::dispatch_state_store;
use risingwave_stream::error::StreamError;
use risingwave_stream::executor::monitor::global_streaming_metrics;
use risingwave_stream::executor::Barrier;
use risingwave_stream::task::{BarrierCompleteResult, LocalStreamManager, StreamEnvironment};
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -238,4 +241,37 @@ impl StreamService for StreamServiceImpl {

Ok(Response::new(WaitEpochCommitResponse { status: None }))
}

#[cfg_attr(coverage, coverage(off))]
async fn get_back_pressure(
&self,
_request: Request<GetBackPressureRequest>,
) -> Result<Response<GetBackPressureResponse>, Status> {
let metric_family = global_streaming_metrics(MetricLevel::Info)
.actor_output_buffer_blocking_duration_ns
.collect();
let metrics = metric_family.get(0).unwrap().get_metric();
let mut back_pressure_infos: Vec<BackPressureInfo> = Vec::new();
for label_pairs in metrics {
let mut back_pressure_info = BackPressureInfo::default();
for label_pair in label_pairs.get_label() {
if label_pair.get_name() == "actor_id" {
back_pressure_info.actor_id = label_pair.get_value().parse::<u32>().unwrap();
}
if label_pair.get_name() == "fragment_id" {
back_pressure_info.fragment_id = label_pair.get_value().parse::<u32>().unwrap();
}
if label_pair.get_name() == "downstream_fragment_id" {
back_pressure_info.downstream_fragment_id =
label_pair.get_value().parse::<u32>().unwrap();
}
}
back_pressure_info.value = label_pairs.get_counter().get_value();
back_pressure_infos.push(back_pressure_info);
}

Ok(Response::new(GetBackPressureResponse {
back_pressure_infos,
}))
}
}
27 changes: 27 additions & 0 deletions src/meta/src/controller/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty;
use risingwave_pb::meta::heartbeat_request;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::stream_service::{BackPressureInfo, GetBackPressureResponse};
use sea_orm::prelude::Expr;
use sea_orm::ActiveValue::Set;
use sea_orm::{
Expand Down Expand Up @@ -369,6 +370,32 @@ impl ClusterController {
.await
.get_worker_extra_info_by_id(worker_id)
}

pub async fn get_back_pressure(&self) -> MetaResult<GetBackPressureResponse> {
let nodes = self
.inner
.read()
.await
.list_active_serving_workers()
.await
.unwrap();
let mut back_pressure_infos: Vec<BackPressureInfo> = Vec::new();
for node in nodes {
let client = self.env.stream_client_pool().get(&node).await.unwrap();
let request = risingwave_pb::stream_service::GetBackPressureRequest {};
back_pressure_infos.extend(
client
.get_back_pressure(request)
.await
.unwrap()
.back_pressure_infos,
);
}

Ok(GetBackPressureResponse {
back_pressure_infos,
})
}
}

#[derive(Default, Clone)]
Expand Down
20 changes: 20 additions & 0 deletions src/meta/src/dashboard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use axum::routing::{get, get_service};
use axum::Router;
use hyper::Request;
use parking_lot::Mutex;
use risingwave_pb::stream_service::GetBackPressureResponse;
use risingwave_rpc_client::ComputeClientPool;
use tower::{ServiceBuilder, ServiceExt};
use tower_http::add_extension::AddExtensionLayer;
Expand Down Expand Up @@ -360,6 +361,24 @@ pub(super) mod handlers {

Ok(report)
}

pub async fn get_back_pressure(
// Path(worker_id): Path<WorkerId>,
Extension(srv): Extension<Service>,
) -> Result<Json<GetBackPressureResponse>> {
let back_pressure_infos = match &srv.metadata_manager {
MetadataManager::V1(mgr) => {
mgr.cluster_manager.get_back_pressure().await.map_err(err)?
}
MetadataManager::V2(mgr) => mgr
.cluster_controller
.get_back_pressure()
.await
.map_err(err)?,
};

Ok(back_pressure_infos.into())
}
}

impl DashboardService {
Expand Down Expand Up @@ -388,6 +407,7 @@ impl DashboardService {
"/metrics/actor/back_pressures",
get(prometheus::list_prometheus_actor_back_pressure),
)
.route("/metrics/back_pressures", get(get_back_pressure))
.route("/monitor/await_tree/:worker_id", get(dump_await_tree))
.route("/monitor/await_tree/", get(dump_await_tree_all))
.route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
Expand Down
28 changes: 28 additions & 0 deletions src/meta/src/manager/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty;
use risingwave_pb::meta::heartbeat_request;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use risingwave_pb::stream_service::{BackPressureInfo, GetBackPressureResponse};
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
use tokio::sync::{RwLock, RwLockReadGuard};
Expand Down Expand Up @@ -492,6 +493,33 @@ impl ClusterManager {
pub async fn get_worker_by_id(&self, worker_id: WorkerId) -> Option<Worker> {
self.core.read().await.get_worker_by_id(worker_id)
}

pub async fn get_back_pressure(&self) -> MetaResult<GetBackPressureResponse> {
let mut core = self.core.write().await;
let mut back_pressure_infos: Vec<BackPressureInfo> = Vec::new();
for worker in core.workers.values_mut() {
if worker.worker_type() != WorkerType::ComputeNode {
continue;
}
let client = self
.env
.stream_client_pool()
.get(&worker.worker_node)
.await
.unwrap();
let request = risingwave_pb::stream_service::GetBackPressureRequest {};
back_pressure_infos.extend(
client
.get_back_pressure(request)
.await
.unwrap()
.back_pressure_infos,
);
}
Ok(GetBackPressureResponse {
back_pressure_infos,
})
}
}

/// The cluster info used for scheduling a streaming job.
Expand Down
7 changes: 7 additions & 0 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,13 @@ mod tests {
) -> std::result::Result<Response<WaitEpochCommitResponse>, Status> {
Ok(Response::new(WaitEpochCommitResponse::default()))
}

async fn get_back_pressure(
&self,
_request: Request<GetBackPressureRequest>,
) -> std::result::Result<Response<GetBackPressureResponse>, Status> {
Ok(Response::new(GetBackPressureResponse::default()))
}
}

struct MockServices {
Expand Down
1 change: 1 addition & 0 deletions src/rpc_client/src/stream_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ macro_rules! for_all_stream_rpc {
,{ 0, inject_barrier, InjectBarrierRequest, InjectBarrierResponse }
,{ 0, barrier_complete, BarrierCompleteRequest, BarrierCompleteResponse }
,{ 0, wait_epoch_commit, WaitEpochCommitRequest, WaitEpochCommitResponse }
,{ 0, get_back_pressure, GetBackPressureRequest, GetBackPressureResponse }
}
};
}
Expand Down

0 comments on commit 57d0b0e

Please sign in to comment.