diff --git a/src/common/base/src/runtime/mod.rs b/src/common/base/src/runtime/mod.rs index bc9b3d549f98..f5f1b613618a 100644 --- a/src/common/base/src/runtime/mod.rs +++ b/src/common/base/src/runtime/mod.rs @@ -49,6 +49,7 @@ pub use runtime::TrySpawn; pub use runtime::GLOBAL_TASK; pub use runtime_tracker::LimitMemGuard; pub use runtime_tracker::ThreadTracker; +pub use runtime_tracker::TrackingGuard; pub use runtime_tracker::TrackingPayload; pub use runtime_tracker::UnlimitedFuture; pub use thread::Thread; diff --git a/src/meta/client/src/grpc_client.rs b/src/meta/client/src/grpc_client.rs index 316d97d3745f..5ca7e66f7936 100644 --- a/src/meta/client/src/grpc_client.rs +++ b/src/meta/client/src/grpc_client.rs @@ -15,6 +15,7 @@ use std::fmt::Debug; use std::fmt::Display; use std::fmt::Formatter; +use std::str::FromStr; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -34,6 +35,8 @@ use databend_common_base::containers::ItemManager; use databend_common_base::containers::Pool; use databend_common_base::future::TimingFutureExt; use databend_common_base::runtime::Runtime; +use databend_common_base::runtime::ThreadTracker; +use databend_common_base::runtime::TrackingPayload; use databend_common_base::runtime::TrySpawn; use databend_common_base::runtime::UnlimitedFuture; use databend_common_base::GLOBAL_TASK; @@ -230,7 +233,7 @@ impl ItemManager for MetaChannelManager { /// The worker will be actually running in a dedicated runtime: `MetaGrpcClient.rt`. pub struct ClientHandle { /// For sending request to meta-client worker. - pub(crate) req_tx: Sender, + pub(crate) req_tx: Sender<(TrackingPayload, message::ClientWorkerRequest)>, /// Notify auto sync to stop. /// `oneshot::Receiver` impl `Drop` by sending a closed notification to the `Sender` half. #[allow(dead_code)] @@ -268,12 +271,18 @@ impl ClientHandle { grpc_metrics::incr_meta_grpc_client_request_inflight(1); - let res = self.req_tx.send(req).await.map_err(|e| { - let cli_err = MetaClientError::ClientRuntimeError( - AnyError::new(&e).add_context(|| "when sending req to MetaGrpcClient worker"), - ); - cli_err.into() - }); + let tracking_payload = ThreadTracker::new_tracking_payload(); + let res = self + .req_tx + .send((tracking_payload, req)) + .await + .map_err(|e| { + let cli_err = MetaClientError::ClientRuntimeError( + AnyError::new(&e) + .add_context(|| "when sending req to MetaGrpcClient worker"), + ); + cli_err.into() + }); if let Err(err) = res { grpc_metrics::incr_meta_grpc_client_request_inflight(-1); @@ -445,12 +454,15 @@ impl MetaGrpcClient { /// A worker runs a receiving-loop to accept user-request to metasrv and deals with request in the dedicated runtime. #[minitrace::trace] - async fn worker_loop(self: Arc, mut req_rx: Receiver) { + async fn worker_loop( + self: Arc, + mut req_rx: Receiver<(TrackingPayload, message::ClientWorkerRequest)>, + ) { info!("MetaGrpcClient::worker spawned"); loop { let recv_res = req_rx.recv().await; - let worker_request = match recv_res { + let (tracking_payload, worker_request) = match recv_res { None => { warn!("MetaGrpcClient handle closed. worker quit"); return; @@ -458,6 +470,8 @@ impl MetaGrpcClient { Some(x) => x, }; + let _guard = ThreadTracker::tracking(tracking_payload); + debug!(worker_request :? =(&worker_request); "MetaGrpcClient worker handle request"); let span = Span::enter_with_parent(full_name!(), &worker_request.span); @@ -1083,8 +1097,7 @@ impl MetaGrpcClient { "MetaGrpcClient::transaction request" ); - let req: Request = Request::new(txn.clone()); - let req = databend_common_tracing::inject_span_to_tonic_request(req); + let req = traced_req(txn.clone()); let mut client = self.make_established_client().await?; let result = client.transaction(req).await; @@ -1095,8 +1108,7 @@ impl MetaGrpcClient { if status_is_retryable(&s) { self.choose_next_endpoint(); let mut client = self.make_established_client().await?; - let req: Request = Request::new(txn); - let req = databend_common_tracing::inject_span_to_tonic_request(req); + let req = traced_req(txn); let ret = client.transaction(req).await?.into_inner(); return Ok(ret); } else { @@ -1129,7 +1141,18 @@ impl MetaGrpcClient { /// Inject span into a tonic request, so that on the remote peer the tracing context can be restored. fn traced_req(t: T) -> Request { let req = Request::new(t); - databend_common_tracing::inject_span_to_tonic_request(req) + let mut req = databend_common_tracing::inject_span_to_tonic_request(req); + + if let Some(query_id) = ThreadTracker::query_id() { + let key = tonic::metadata::AsciiMetadataKey::from_str("QueryID"); + let value = tonic::metadata::AsciiMetadataValue::from_str(query_id); + + if let Some((key, value)) = key.ok().zip(value.ok()) { + req.metadata_mut().insert(key, value); + } + } + + req } fn status_is_retryable(status: &Status) -> bool { diff --git a/src/meta/service/src/api/grpc/grpc_service.rs b/src/meta/service/src/api/grpc/grpc_service.rs index 3c8e1eb3c1e9..64bc498cd30d 100644 --- a/src/meta/service/src/api/grpc/grpc_service.rs +++ b/src/meta/service/src/api/grpc/grpc_service.rs @@ -19,6 +19,8 @@ use std::sync::Arc; use databend_common_arrow::arrow_format::flight::data::BasicAuth; use databend_common_base::base::tokio::sync::mpsc; use databend_common_base::future::TimingFutureExt; +use databend_common_base::runtime::ThreadTracker; +use databend_common_base::runtime::TrackingGuard; use databend_common_grpc::GrpcClaim; use databend_common_grpc::GrpcToken; use databend_common_meta_client::MetaGrpcReadReq; @@ -281,15 +283,20 @@ impl MetaService for MetaServiceImpl { async fn kv_api(&self, request: Request) -> Result, Status> { self.check_token(request.metadata())?; - network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64); - let _guard = RequestInFlight::guard(); + let _guard = thread_tracking_guard(&request); + ThreadTracker::tracking_future(async move { + network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64); + let _guard = RequestInFlight::guard(); - let root = databend_common_tracing::start_trace_for_remote_request(full_name!(), &request); - let reply = self.handle_kv_api(request).in_span(root).await?; + let root = + databend_common_tracing::start_trace_for_remote_request(full_name!(), &request); + let reply = self.handle_kv_api(request).in_span(root).await?; - network_metrics::incr_sent_bytes(reply.encoded_len() as u64); + network_metrics::incr_sent_bytes(reply.encoded_len() as u64); - Ok(Response::new(reply)) + Ok(Response::new(reply)) + }) + .await } type KvReadV1Stream = BoxStream; @@ -300,15 +307,20 @@ impl MetaService for MetaServiceImpl { ) -> Result, Status> { self.check_token(request.metadata())?; - network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64); - let root = databend_common_tracing::start_trace_for_remote_request(full_name!(), &request); + let _guard = thread_tracking_guard(&request); + ThreadTracker::tracking_future(async move { + network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64); + let root = + databend_common_tracing::start_trace_for_remote_request(full_name!(), &request); - let (endpoint, strm) = self.handle_kv_read_v1(request).in_span(root).await?; + let (endpoint, strm) = self.handle_kv_read_v1(request).in_span(root).await?; - let mut resp = Response::new(strm); - GrpcHelper::add_response_meta_leader(&mut resp, endpoint.as_ref()); + let mut resp = Response::new(strm); + GrpcHelper::add_response_meta_leader(&mut resp, endpoint.as_ref()); - Ok(resp) + Ok(resp) + }) + .await } async fn transaction( @@ -317,18 +329,24 @@ impl MetaService for MetaServiceImpl { ) -> Result, Status> { self.check_token(request.metadata())?; - network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64); - let _guard = RequestInFlight::guard(); + let _guard = thread_tracking_guard(&request); - let root = databend_common_tracing::start_trace_for_remote_request(full_name!(), &request); - let (endpoint, reply) = self.handle_txn(request).in_span(root).await?; + ThreadTracker::tracking_future(async move { + network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64); + let _guard = RequestInFlight::guard(); - network_metrics::incr_sent_bytes(reply.encoded_len() as u64); + let root = + databend_common_tracing::start_trace_for_remote_request(full_name!(), &request); + let (endpoint, reply) = self.handle_txn(request).in_span(root).await?; - let mut resp = Response::new(reply); - GrpcHelper::add_response_meta_leader(&mut resp, endpoint.as_ref()); + network_metrics::incr_sent_bytes(reply.encoded_len() as u64); - Ok(resp) + let mut resp = Response::new(reply); + GrpcHelper::add_response_meta_leader(&mut resp, endpoint.as_ref()); + + Ok(resp) + }) + .await } type ExportStream = Pin> + Send + 'static>>; @@ -483,3 +501,15 @@ impl MetaService for MetaServiceImpl { Err(Status::unavailable("can not get client ip address")) } } + +fn thread_tracking_guard(req: &tonic::Request) -> Option { + if let Some(value) = req.metadata().get("QueryID") { + if let Ok(value) = value.to_str() { + let mut tracking_payload = ThreadTracker::new_tracking_payload(); + tracking_payload.query_id = Some(value.to_string()); + return Some(ThreadTracker::tracking(tracking_payload)); + } + } + + None +}