diff --git a/src/meta/client/src/grpc_client.rs b/src/meta/client/src/grpc_client.rs index aa6b86b3d647..c9b680dd665d 100644 --- a/src/meta/client/src/grpc_client.rs +++ b/src/meta/client/src/grpc_client.rs @@ -15,6 +15,8 @@ use std::fmt::Debug; use std::fmt::Display; use std::fmt::Formatter; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use std::time::Instant; @@ -161,14 +163,23 @@ impl ClientHandle { Req: Into, Result: TryFrom, as TryFrom>::Error: std::fmt::Display, - E: From, + E: From + Debug, { + static META_REQUEST_ID: AtomicU64 = AtomicU64::new(1); + let request_future = async move { let (tx, rx) = oneshot::channel(); let req = message::ClientWorkerRequest { + request_id: META_REQUEST_ID.fetch_add(1, Ordering::Relaxed), resp_tx: tx, req: req.into(), }; + + debug!( + request = as_debug!(&req); + "Meta ClientHandle send request to meta client worker" + ); + grpc_metrics::incr_meta_grpc_client_request_inflight(1); let res = self.req_tx.send(req).await.map_err(|e| { @@ -181,12 +192,22 @@ impl ClientHandle { if let Err(err) = res { grpc_metrics::incr_meta_grpc_client_request_inflight(-1); + error!( + error = as_debug!(&err); + "Meta ClientHandle send request to meta client worker failed" + ); + return Err(err); } let res = rx.await.map_err(|e| { grpc_metrics::incr_meta_grpc_client_request_inflight(-1); + error!( + error = as_debug!(&e); + "Meta ClientHandle recv response from meta client worker failed" + ); + MetaClientError::ClientRuntimeError( AnyError::new(&e).add_context(|| "when recv resp from MetaGrpcClient worker"), ) @@ -362,6 +383,7 @@ impl MetaGrpcClient { continue; } + let request_id = req.request_id; let resp_tx = req.resp_tx; let req = req.req; let req_name = req.name(); @@ -428,6 +450,7 @@ impl MetaGrpcClient { }; debug!( + request_id = as_debug!(&request_id), resp = as_debug!(&resp); "MetaGrpcClient send response to the handle" ); @@ -446,8 +469,12 @@ impl MetaGrpcClient { ); if elapsed > 1000_f64 { warn!( + request_id = as_display!(request_id); "MetaGrpcClient slow request {} to {} takes {} ms: {}", - req_name, current_endpoint, elapsed, req_str, + req_name, + current_endpoint, + elapsed, + req_str, ); } @@ -457,7 +484,10 @@ impl MetaGrpcClient { req_name, &err.to_string(), ); - error!("MetaGrpcClient error: {:?}", err); + error!( + request_id = as_display!(request_id); + "MetaGrpcClient error: {:?}", err + ); } else { grpc_metrics::incr_meta_grpc_client_request_success( ¤t_endpoint, @@ -469,6 +499,7 @@ impl MetaGrpcClient { let send_res = resp_tx.send(resp); if let Err(err) = send_res { error!( + request_id = as_display!(request_id), err = as_debug!(&err); "MetaGrpcClient failed to send response to the handle. recv-end closed" ); diff --git a/src/meta/client/src/message.rs b/src/meta/client/src/message.rs index ae668bbb12a6..f3eedda9ed08 100644 --- a/src/meta/client/src/message.rs +++ b/src/meta/client/src/message.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt; + use common_base::base::tokio::sync::oneshot::Sender; use common_meta_kvapi::kvapi::GetKVReply; use common_meta_kvapi::kvapi::GetKVReq; @@ -36,8 +38,9 @@ use tonic::transport::Channel; use crate::grpc_client::AuthInterceptor; /// A request that is sent by a meta-client handle to its worker. -#[derive(Debug)] pub struct ClientWorkerRequest { + pub(crate) request_id: u64, + /// For sending back the response to the handle. pub(crate) resp_tx: Sender, @@ -45,6 +48,15 @@ pub struct ClientWorkerRequest { pub(crate) req: Request, } +impl fmt::Debug for ClientWorkerRequest { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ClientWorkerRequest") + .field("request_id", &self.request_id) + .field("req", &self.req) + .finish() + } +} + /// Meta-client handle-to-worker request body #[derive(Debug, Clone, derive_more::From)] pub enum Request {