Skip to content

Commit

Permalink
refactor: add local request-id to grpc-request (#13087)
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer authored Oct 6, 2023
1 parent 5863480 commit c89d812
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 4 deletions.
37 changes: 34 additions & 3 deletions src/meta/client/src/grpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
use std::fmt::Debug;
use std::fmt::Display;
use std::fmt::Formatter;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
Expand Down Expand Up @@ -161,14 +163,23 @@ impl ClientHandle {
Req: Into<message::Request>,
Result<Resp, E>: TryFrom<message::Response>,
<Result<Resp, E> as TryFrom<message::Response>>::Error: std::fmt::Display,
E: From<MetaClientError>,
E: From<MetaClientError> + Debug,
{
static META_REQUEST_ID: AtomicU64 = AtomicU64::new(1);

let request_future = async move {
let (tx, rx) = oneshot::channel();
let req = message::ClientWorkerRequest {
request_id: META_REQUEST_ID.fetch_add(1, Ordering::Relaxed),
resp_tx: tx,
req: req.into(),
};

debug!(
request = as_debug!(&req);
"Meta ClientHandle send request to meta client worker"
);

grpc_metrics::incr_meta_grpc_client_request_inflight(1);

let res = self.req_tx.send(req).await.map_err(|e| {
Expand All @@ -181,12 +192,22 @@ impl ClientHandle {
if let Err(err) = res {
grpc_metrics::incr_meta_grpc_client_request_inflight(-1);

error!(
error = as_debug!(&err);
"Meta ClientHandle send request to meta client worker failed"
);

return Err(err);
}

let res = rx.await.map_err(|e| {
grpc_metrics::incr_meta_grpc_client_request_inflight(-1);

error!(
error = as_debug!(&e);
"Meta ClientHandle recv response from meta client worker failed"
);

MetaClientError::ClientRuntimeError(
AnyError::new(&e).add_context(|| "when recv resp from MetaGrpcClient worker"),
)
Expand Down Expand Up @@ -362,6 +383,7 @@ impl MetaGrpcClient {
continue;
}

let request_id = req.request_id;
let resp_tx = req.resp_tx;
let req = req.req;
let req_name = req.name();
Expand Down Expand Up @@ -428,6 +450,7 @@ impl MetaGrpcClient {
};

debug!(
request_id = as_debug!(&request_id),
resp = as_debug!(&resp);
"MetaGrpcClient send response to the handle"
);
Expand All @@ -446,8 +469,12 @@ impl MetaGrpcClient {
);
if elapsed > 1000_f64 {
warn!(
request_id = as_display!(request_id);
"MetaGrpcClient slow request {} to {} takes {} ms: {}",
req_name, current_endpoint, elapsed, req_str,
req_name,
current_endpoint,
elapsed,
req_str,
);
}

Expand All @@ -457,7 +484,10 @@ impl MetaGrpcClient {
req_name,
&err.to_string(),
);
error!("MetaGrpcClient error: {:?}", err);
error!(
request_id = as_display!(request_id);
"MetaGrpcClient error: {:?}", err
);
} else {
grpc_metrics::incr_meta_grpc_client_request_success(
&current_endpoint,
Expand All @@ -469,6 +499,7 @@ impl MetaGrpcClient {
let send_res = resp_tx.send(resp);
if let Err(err) = send_res {
error!(
request_id = as_display!(request_id),
err = as_debug!(&err);
"MetaGrpcClient failed to send response to the handle. recv-end closed"
);
Expand Down
14 changes: 13 additions & 1 deletion src/meta/client/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;

use common_base::base::tokio::sync::oneshot::Sender;
use common_meta_kvapi::kvapi::GetKVReply;
use common_meta_kvapi::kvapi::GetKVReq;
Expand All @@ -36,15 +38,25 @@ use tonic::transport::Channel;
use crate::grpc_client::AuthInterceptor;

/// A request that is sent by a meta-client handle to its worker.
#[derive(Debug)]
pub struct ClientWorkerRequest {
pub(crate) request_id: u64,

/// For sending back the response to the handle.
pub(crate) resp_tx: Sender<Response>,

/// Request body
pub(crate) req: Request,
}

impl fmt::Debug for ClientWorkerRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ClientWorkerRequest")
.field("request_id", &self.request_id)
.field("req", &self.req)
.finish()
}
}

/// Meta-client handle-to-worker request body
#[derive(Debug, Clone, derive_more::From)]
pub enum Request {
Expand Down

1 comment on commit c89d812

@vercel
Copy link

@vercel vercel bot commented on c89d812 Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.