Skip to content

Commit

Permalink
refactor(meta): only retry on connection error when registering worker node (#18061)
Browse files Browse the repository at this point in the history

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Aug 19, 2024
1 parent f54c7fe commit 0986508
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 86 deletions.
3 changes: 0 additions & 3 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,6 @@ message AddWorkerNodeRequest {
}

message AddWorkerNodeResponse {
reserved 3;
reserved "system_params";
common.Status status = 1;
optional uint32 node_id = 2;
string cluster_id = 4;
}
Expand Down
3 changes: 1 addition & 2 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ pub async fn compute_node_serve(
},
&config.meta,
)
.await
.unwrap();
.await;

let state_store_url = system_params.state_store();

Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/common/meta_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ Note: the default value of `RW_META_ADDR` is 'http://127.0.0.1:5690'.";
Property::default(),
&MetaConfig::default(),
)
.await?;
.await;
let worker_id = client.worker_id();
tracing::info!("registered as RiseCtl worker, worker_id = {}", worker_id);
Ok(client)
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ impl FrontendEnv {
Default::default(),
&config.meta,
)
.await?;
.await;

let worker_id = meta_client.worker_id();
info!("Assigned worker node id {}", worker_id);
Expand Down
30 changes: 7 additions & 23 deletions src/meta/service/src/cluster_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use risingwave_pb::meta::{
ListAllNodesResponse, UpdateWorkerNodeSchedulabilityRequest,
UpdateWorkerNodeSchedulabilityResponse,
};
use thiserror_ext::AsReport;
use tonic::{Request, Response, Status};

use crate::MetaError;
Expand Down Expand Up @@ -58,31 +57,16 @@ impl ClusterService for ClusterServiceImpl {
.property
.ok_or_else(|| MetaError::invalid_parameter("worker node property is not provided"))?;
let resource = req.resource.unwrap_or_default();
let result = self
let worker_id = self
.metadata_manager
.add_worker_node(worker_type, host, property, resource)
.await;
.await?;
let cluster_id = self.metadata_manager.cluster_id().to_string();
match result {
Ok(worker_id) => Ok(Response::new(AddWorkerNodeResponse {
status: None,
node_id: Some(worker_id),
cluster_id,
})),
Err(e) => {
if e.is_invalid_worker() {
return Ok(Response::new(AddWorkerNodeResponse {
status: Some(risingwave_pb::common::Status {
code: risingwave_pb::common::status::Code::UnknownWorker as i32,
message: e.to_report_string(),
}),
node_id: None,
cluster_id,
}));
}
Err(e.into())
}
}

Ok(Response::new(AddWorkerNodeResponse {
node_id: Some(worker_id),
cluster_id,
}))
}

/// Update schedulability of a compute node. Will not affect actors which are already running on
Expand Down
20 changes: 20 additions & 0 deletions src/rpc_client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,23 @@ macro_rules! impl_from_status {
}

impl_from_status!(stream, batch, meta, compute, compactor, connector);

impl RpcError {
/// Returns `true` if the error is a connection error. Typically used to determine if
/// the error is transient and can be retried.
pub fn is_connection_error(&self) -> bool {
match self {
RpcError::TransportError(_) => true,
RpcError::GrpcStatus(status) => matches!(
status.inner().code(),
tonic::Code::Unavailable // server not started
| tonic::Code::Unknown // could be transport error
| tonic::Code::Unimplemented // meta leader service not started
),
RpcError::MetaAddressParse(_) => false,
RpcError::Internal(anyhow) => anyhow
.downcast_ref::<Self>() // this skips all contexts attached to the error
.map_or(false, Self::is_connection_error),
}
}
}
130 changes: 80 additions & 50 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::RW_VERSION;
use risingwave_error::bail;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
Expand Down Expand Up @@ -220,12 +221,33 @@ impl MetaClient {
}

/// Register the current node to the cluster and set the corresponding worker id.
///
/// Retry if there's connection issue with the meta node. Exit the process if the registration fails.
pub async fn register_new(
addr_strategy: MetaAddressStrategy,
worker_type: WorkerType,
addr: &HostAddr,
property: Property,
meta_config: &MetaConfig,
) -> (Self, SystemParamsReader) {
let ret =
Self::register_new_inner(addr_strategy, worker_type, addr, property, meta_config).await;

match ret {
Ok(ret) => ret,
Err(err) => {
tracing::error!(error = %err.as_report(), "failed to register worker, exiting...");
std::process::exit(1);
}
}
}

async fn register_new_inner(
addr_strategy: MetaAddressStrategy,
worker_type: WorkerType,
addr: &HostAddr,
property: Property,
meta_config: &MetaConfig,
) -> Result<(Self, SystemParamsReader)> {
tracing::info!("register meta client using strategy: {}", addr_strategy);

Expand All @@ -238,34 +260,35 @@ impl MetaClient {
if property.is_unschedulable {
tracing::warn!("worker {:?} registered as unschedulable", addr.clone());
}
let init_result: Result<_> = tokio_retry::Retry::spawn(retry_strategy, || async {
let grpc_meta_client = GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

let add_worker_resp = grpc_meta_client
.add_worker_node(AddWorkerNodeRequest {
worker_type: worker_type as i32,
host: Some(addr.to_protobuf()),
property: Some(property),
resource: Some(risingwave_pb::common::worker_node::Resource {
rw_version: RW_VERSION.to_string(),
total_memory_bytes: system_memory_available_bytes() as _,
total_cpu_cores: total_cpu_available() as _,
}),
})
.await?;
if let Some(status) = &add_worker_resp.status
&& status.code() == risingwave_pb::common::status::Code::UnknownWorker
{
tracing::error!("invalid worker: {}", status.message);
std::process::exit(1);
}
let init_result: Result<_> = tokio_retry::RetryIf::spawn(
retry_strategy,
|| async {
let grpc_meta_client =
GrpcMetaClient::new(&addr_strategy, meta_config.clone()).await?;

let add_worker_resp = grpc_meta_client
.add_worker_node(AddWorkerNodeRequest {
worker_type: worker_type as i32,
host: Some(addr.to_protobuf()),
property: Some(property),
resource: Some(risingwave_pb::common::worker_node::Resource {
rw_version: RW_VERSION.to_string(),
total_memory_bytes: system_memory_available_bytes() as _,
total_cpu_cores: total_cpu_available() as _,
}),
})
.await
.context("failed to add worker node")?;

let system_params_resp = grpc_meta_client
.get_system_params(GetSystemParamsRequest {})
.await?;
let system_params_resp = grpc_meta_client
.get_system_params(GetSystemParamsRequest {})
.await
.context("failed to get initial system params")?;

Ok((add_worker_resp, system_params_resp, grpc_meta_client))
})
Ok((add_worker_resp, system_params_resp, grpc_meta_client))
},
RpcError::is_connection_error,
)
.await;

let (add_worker_resp, system_params_resp, grpc_meta_client) = init_result?;
Expand Down Expand Up @@ -1708,38 +1731,40 @@ impl MetaMemberManagement {
let mut fetched_members = None;

for (addr, client) in &mut member_group.members {
let client: Result<MetaMemberClient> = try {
match client {
let members: Result<_> = try {
let mut client = match client {
Some(cached_client) => cached_client.to_owned(),
None => {
let endpoint = GrpcMetaClient::addr_to_endpoint(addr.clone());
let channel = GrpcMetaClient::connect_to_endpoint(endpoint).await?;
let channel = GrpcMetaClient::connect_to_endpoint(endpoint)
.await
.context("failed to create client")?;
let new_client: MetaMemberClient =
MetaMemberServiceClient::new(channel);
*client = Some(new_client.clone());

new_client
}
}
};

let resp = client
.members(MembersRequest {})
.await
.context("failed to fetch members")?;

resp.into_inner().members
};
if let Err(err) = client {
tracing::warn!(%addr, error = %err.as_report(), "failed to create client");
continue;
}
match client.unwrap().members(MembersRequest {}).await {
Err(err) => {
tracing::warn!(%addr, error = %err.as_report(), "failed to fetch members");
continue;
}
Ok(resp) => {
fetched_members = Some(resp.into_inner().members);
break;
}

let fetched = members.is_ok();
fetched_members = Some(members);
if fetched {
break;
}
}

let members =
fetched_members.ok_or_else(|| anyhow!("could not refresh members"))?;
let members = fetched_members
.context("no member available in the list")?
.context("could not refresh members")?;

// find new leader
let mut leader = None;
Expand Down Expand Up @@ -1916,7 +1941,7 @@ impl GrpcMetaClient {
.map(|addr| (Self::addr_to_endpoint(addr.clone()), addr))
.collect();

let endpoints = endpoints.clone();
let mut last_error = None;

for (endpoint, addr) in endpoints {
match Self::connect_to_endpoint(endpoint).await {
Expand All @@ -1929,14 +1954,19 @@ impl GrpcMetaClient {
error = %e.as_report(),
"Failed to connect to meta server {}, trying again",
addr,
)
);
last_error = Some(e);
}
}
}

Err(RpcError::Internal(anyhow!(
"Failed to connect to meta server"
)))
if let Some(last_error) = last_error {
Err(anyhow::anyhow!(last_error)
.context("failed to connect to all meta servers")
.into())
} else {
bail!("no meta server address provided")
}
}

async fn connect_to_endpoint(endpoint: Endpoint) -> Result<Channel> {
Expand Down
3 changes: 1 addition & 2 deletions src/storage/compactor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,7 @@ pub async fn compactor_serve(
Default::default(),
&config.meta,
)
.await
.unwrap();
.await;

info!("Assigned compactor id {}", meta_client.worker_id());

Expand Down
8 changes: 4 additions & 4 deletions src/tests/compaction_test/src/compaction_test_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ async fn init_metadata_for_replay(
std::process::exit(0);
},
ret = MetaClient::register_new(cluster_meta_endpoint.parse()?, WorkerType::RiseCtl, advertise_addr, Default::default(), &meta_config) => {
(meta_client, _) = ret.unwrap();
(meta_client, _) = ret;
},
}
let worker_id = meta_client.worker_id();
Expand All @@ -254,7 +254,7 @@ async fn init_metadata_for_replay(
Default::default(),
&meta_config,
)
.await?;
.await;
new_meta_client.activate(advertise_addr).await.unwrap();
if ci_mode {
let table_to_check = tables.iter().find(|t| t.name == "nexmark_q7").unwrap();
Expand Down Expand Up @@ -286,7 +286,7 @@ async fn pull_version_deltas(
Default::default(),
&MetaConfig::default(),
)
.await?;
.await;
let worker_id = meta_client.worker_id();
tracing::info!("Assigned pull worker id {}", worker_id);
meta_client.activate(advertise_addr).await.unwrap();
Expand Down Expand Up @@ -335,7 +335,7 @@ async fn start_replay(
Default::default(),
&config.meta,
)
.await?;
.await;
let worker_id = meta_client.worker_id();
tracing::info!("Assigned replay worker id {}", worker_id);
meta_client.activate(&advertise_addr).await.unwrap();
Expand Down

0 comments on commit 0986508

Please sign in to comment.