Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into recluster_dis
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Oct 16, 2023
2 parents f90cf54 + 9062634 commit 298baef
Showing 1 changed file with 29 additions and 40 deletions.
69 changes: 29 additions & 40 deletions src/meta/client/src/grpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ use common_meta_types::protobuf::ExportedChunk;
use common_meta_types::protobuf::HandshakeRequest;
use common_meta_types::protobuf::MemberListReply;
use common_meta_types::protobuf::MemberListRequest;
use common_meta_types::protobuf::RaftReply;
use common_meta_types::protobuf::RaftRequest;
use common_meta_types::protobuf::WatchRequest;
use common_meta_types::protobuf::WatchResponse;
Expand Down Expand Up @@ -689,7 +688,7 @@ impl MetaGrpcClient {
Ok(r) => Ok(r.into_inner()),
Err(s) => {
if status_is_retryable(&s) {
self.mark_as_unhealthy().await;
self.mark_as_unhealthy();
let mut client = self.make_client().await?;
let req = Request::new(MemberListRequest {
data: "".to_string(),
Expand Down Expand Up @@ -893,49 +892,38 @@ impl MetaGrpcClient {
.to_raft_request()
.map_err(MetaNetworkError::InvalidArgument)?;

let req = common_tracing::inject_span_to_tonic_request(Request::new(raft_req.clone()));
for i in 0..2 {
let req = common_tracing::inject_span_to_tonic_request(Request::new(raft_req.clone()));

let mut client = self
.make_client()
.timed_ge(threshold(), info_spent("MetaGrpcClient::make_client-1"))
.await?;
let mut client = self
.make_client()
.timed_ge(threshold(), info_spent("MetaGrpcClient::make_client"))
.await?;

let result = client
.kv_api(req)
.timed_ge(threshold(), info_spent("client::kv_api-1"))
.await;
let result = client
.kv_api(req)
.timed_ge(threshold(), info_spent("client::kv_api"))
.await;

debug!(
reply = as_debug!(&result);
"MetaGrpcClient::kv_api reply"
);
debug!(
result = as_debug!(&result);
"MetaGrpcClient::kv_api result, {}-th try", i
);

let rpc_res: Result<RaftReply, Status> = match result {
Ok(r) => Ok(r.into_inner()),
Err(s) => {
if status_is_retryable(&s) {
self.mark_as_unhealthy().await;
let mut client = self
.make_client()
.timed_ge(threshold(), info_spent("MetaGrpcClient::make_client-2"))
.await?;

let req = common_tracing::inject_span_to_tonic_request(Request::new(raft_req));

Ok(client
.kv_api(req)
.timed_ge(threshold(), info_spent("client::kv_api-2"))
.await?
.into_inner())
} else {
Err(s)
if let Err(ref e) = result {
if status_is_retryable(e) {
self.mark_as_unhealthy();
continue;
}
}
};
let raft_reply = rpc_res?;

let resp: T::Reply = reply_to_api_result(raft_reply)?;
Ok(resp)
let raft_reply = result?;

let resp: T::Reply = reply_to_api_result(raft_reply.into_inner())?;
return Ok(resp);
}

unreachable!("impossible to reach here");
}

#[minitrace::trace]
Expand All @@ -957,7 +945,7 @@ impl MetaGrpcClient {
Ok(r) => return Ok(r.into_inner()),
Err(s) => {
if status_is_retryable(&s) {
self.mark_as_unhealthy().await;
self.mark_as_unhealthy();
let mut client = self.make_client().await?;
let req: Request<TxnRequest> = Request::new(txn);
let req = common_tracing::inject_span_to_tonic_request(req);
Expand All @@ -978,7 +966,8 @@ impl MetaGrpcClient {

Ok(reply)
}
async fn mark_as_unhealthy(&self) {

fn mark_as_unhealthy(&self) {
let ca = self.current_endpoint.lock();
let mut ue = self.unhealthy_endpoints.lock();
ue.insert((*ca).as_ref().unwrap().clone(), ());
Expand Down

0 comments on commit 298baef

Please sign in to comment.