Skip to content

Commit

Permalink
chore(meta): tracking query id for meta log (#15497)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 authored May 13, 2024
1 parent 6f6df79 commit 1ff87bb
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 34 deletions.
1 change: 1 addition & 0 deletions src/common/base/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub use runtime::TrySpawn;
pub use runtime::GLOBAL_TASK;
pub use runtime_tracker::LimitMemGuard;
pub use runtime_tracker::ThreadTracker;
pub use runtime_tracker::TrackingGuard;
pub use runtime_tracker::TrackingPayload;
pub use runtime_tracker::UnlimitedFuture;
pub use thread::Thread;
Expand Down
51 changes: 37 additions & 14 deletions src/meta/client/src/grpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::fmt::Debug;
use std::fmt::Display;
use std::fmt::Formatter;
use std::str::FromStr;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
Expand All @@ -34,6 +35,8 @@ use databend_common_base::containers::ItemManager;
use databend_common_base::containers::Pool;
use databend_common_base::future::TimingFutureExt;
use databend_common_base::runtime::Runtime;
use databend_common_base::runtime::ThreadTracker;
use databend_common_base::runtime::TrackingPayload;
use databend_common_base::runtime::TrySpawn;
use databend_common_base::runtime::UnlimitedFuture;
use databend_common_base::GLOBAL_TASK;
Expand Down Expand Up @@ -230,7 +233,7 @@ impl ItemManager for MetaChannelManager {
/// The worker will be actually running in a dedicated runtime: `MetaGrpcClient.rt`.
pub struct ClientHandle {
/// For sending request to meta-client worker.
pub(crate) req_tx: Sender<message::ClientWorkerRequest>,
pub(crate) req_tx: Sender<(TrackingPayload, message::ClientWorkerRequest)>,
/// Notify auto sync to stop.
/// `oneshot::Receiver` impl `Drop` by sending a closed notification to the `Sender` half.
#[allow(dead_code)]
Expand Down Expand Up @@ -268,12 +271,18 @@ impl ClientHandle {

grpc_metrics::incr_meta_grpc_client_request_inflight(1);

let res = self.req_tx.send(req).await.map_err(|e| {
let cli_err = MetaClientError::ClientRuntimeError(
AnyError::new(&e).add_context(|| "when sending req to MetaGrpcClient worker"),
);
cli_err.into()
});
let tracking_payload = ThreadTracker::new_tracking_payload();
let res = self
.req_tx
.send((tracking_payload, req))
.await
.map_err(|e| {
let cli_err = MetaClientError::ClientRuntimeError(
AnyError::new(&e)
.add_context(|| "when sending req to MetaGrpcClient worker"),
);
cli_err.into()
});

if let Err(err) = res {
grpc_metrics::incr_meta_grpc_client_request_inflight(-1);
Expand Down Expand Up @@ -445,19 +454,24 @@ impl MetaGrpcClient {

/// A worker runs a receive loop that accepts user requests to metasrv and handles each request in the dedicated runtime.
#[minitrace::trace]
async fn worker_loop(self: Arc<Self>, mut req_rx: Receiver<message::ClientWorkerRequest>) {
async fn worker_loop(
self: Arc<Self>,
mut req_rx: Receiver<(TrackingPayload, message::ClientWorkerRequest)>,
) {
info!("MetaGrpcClient::worker spawned");

loop {
let recv_res = req_rx.recv().await;
let worker_request = match recv_res {
let (tracking_payload, worker_request) = match recv_res {
None => {
warn!("MetaGrpcClient handle closed. worker quit");
return;
}
Some(x) => x,
};

let _guard = ThreadTracker::tracking(tracking_payload);

debug!(worker_request :? =(&worker_request); "MetaGrpcClient worker handle request");

let span = Span::enter_with_parent(full_name!(), &worker_request.span);
Expand Down Expand Up @@ -1083,8 +1097,7 @@ impl MetaGrpcClient {
"MetaGrpcClient::transaction request"
);

let req: Request<TxnRequest> = Request::new(txn.clone());
let req = databend_common_tracing::inject_span_to_tonic_request(req);
let req = traced_req(txn.clone());

let mut client = self.make_established_client().await?;
let result = client.transaction(req).await;
Expand All @@ -1095,8 +1108,7 @@ impl MetaGrpcClient {
if status_is_retryable(&s) {
self.choose_next_endpoint();
let mut client = self.make_established_client().await?;
let req: Request<TxnRequest> = Request::new(txn);
let req = databend_common_tracing::inject_span_to_tonic_request(req);
let req = traced_req(txn);
let ret = client.transaction(req).await?.into_inner();
return Ok(ret);
} else {
Expand Down Expand Up @@ -1129,7 +1141,18 @@ impl MetaGrpcClient {
/// Wrap `t` in a tonic request that carries the tracing span and, when present, the query id.
///
/// The span is injected so that the tracing context can be restored on the remote peer.
/// When `ThreadTracker::query_id()` returns a value, it is attached to the request
/// metadata under the `QueryID` key.
fn traced_req<T>(t: T) -> Request<T> {
    let req = Request::new(t);
    let mut req = databend_common_tracing::inject_span_to_tonic_request(req);

    if let Some(query_id) = ThreadTracker::query_id() {
        let key = tonic::metadata::AsciiMetadataKey::from_str("QueryID");
        let value = tonic::metadata::AsciiMetadataValue::from_str(query_id);

        // Best effort: if either the key or the value fails to parse, the request is
        // sent without the query id rather than failing the call.
        if let Some((key, value)) = key.ok().zip(value.ok()) {
            req.metadata_mut().insert(key, value);
        }
    }

    req
}

fn status_is_retryable(status: &Status) -> bool {
Expand Down
70 changes: 50 additions & 20 deletions src/meta/service/src/api/grpc/grpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use std::sync::Arc;
use databend_common_arrow::arrow_format::flight::data::BasicAuth;
use databend_common_base::base::tokio::sync::mpsc;
use databend_common_base::future::TimingFutureExt;
use databend_common_base::runtime::ThreadTracker;
use databend_common_base::runtime::TrackingGuard;
use databend_common_grpc::GrpcClaim;
use databend_common_grpc::GrpcToken;
use databend_common_meta_client::MetaGrpcReadReq;
Expand Down Expand Up @@ -281,15 +283,20 @@ impl MetaService for MetaServiceImpl {
/// Handle a `kv_api` request.
///
/// Checks the token carried in the request metadata, then runs the handler inside
/// `ThreadTracker::tracking_future`, recording received/sent byte metrics and an
/// in-flight guard for the duration of the handler.
///
/// # Errors
///
/// Returns the `Status` produced by `check_token` or by `handle_kv_api`.
async fn kv_api(&self, request: Request<RaftRequest>) -> Result<Response<RaftReply>, Status> {
    self.check_token(request.metadata())?;

    // Set up tracking from the caller's `QueryID` metadata (if any) before the tracked
    // future is created.
    // NOTE(review): presumably `tracking_future` captures the payload installed by this
    // guard — confirm against `ThreadTracker`.
    let _guard = thread_tracking_guard(&request);
    ThreadTracker::tracking_future(async move {
        network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64);
        let _guard = RequestInFlight::guard();

        let root =
            databend_common_tracing::start_trace_for_remote_request(full_name!(), &request);
        let reply = self.handle_kv_api(request).in_span(root).await?;

        network_metrics::incr_sent_bytes(reply.encoded_len() as u64);

        Ok(Response::new(reply))
    })
    .await
}

type KvReadV1Stream = BoxStream<StreamItem>;
Expand All @@ -300,15 +307,20 @@ impl MetaService for MetaServiceImpl {
) -> Result<Response<Self::KvReadV1Stream>, Status> {
self.check_token(request.metadata())?;

network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64);
let root = databend_common_tracing::start_trace_for_remote_request(full_name!(), &request);
let _guard = thread_tracking_guard(&request);
ThreadTracker::tracking_future(async move {
network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64);
let root =
databend_common_tracing::start_trace_for_remote_request(full_name!(), &request);

let (endpoint, strm) = self.handle_kv_read_v1(request).in_span(root).await?;
let (endpoint, strm) = self.handle_kv_read_v1(request).in_span(root).await?;

let mut resp = Response::new(strm);
GrpcHelper::add_response_meta_leader(&mut resp, endpoint.as_ref());
let mut resp = Response::new(strm);
GrpcHelper::add_response_meta_leader(&mut resp, endpoint.as_ref());

Ok(resp)
Ok(resp)
})
.await
}

async fn transaction(
Expand All @@ -317,18 +329,24 @@ impl MetaService for MetaServiceImpl {
) -> Result<Response<TxnReply>, Status> {
self.check_token(request.metadata())?;

network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64);
let _guard = RequestInFlight::guard();
let _guard = thread_tracking_guard(&request);

let root = databend_common_tracing::start_trace_for_remote_request(full_name!(), &request);
let (endpoint, reply) = self.handle_txn(request).in_span(root).await?;
ThreadTracker::tracking_future(async move {
network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64);
let _guard = RequestInFlight::guard();

network_metrics::incr_sent_bytes(reply.encoded_len() as u64);
let root =
databend_common_tracing::start_trace_for_remote_request(full_name!(), &request);
let (endpoint, reply) = self.handle_txn(request).in_span(root).await?;

let mut resp = Response::new(reply);
GrpcHelper::add_response_meta_leader(&mut resp, endpoint.as_ref());
network_metrics::incr_sent_bytes(reply.encoded_len() as u64);

Ok(resp)
let mut resp = Response::new(reply);
GrpcHelper::add_response_meta_leader(&mut resp, endpoint.as_ref());

Ok(resp)
})
.await
}

type ExportStream = Pin<Box<dyn Stream<Item = Result<ExportedChunk, Status>> + Send + 'static>>;
Expand Down Expand Up @@ -483,3 +501,15 @@ impl MetaService for MetaServiceImpl {
Err(Status::unavailable("can not get client ip address"))
}
}

/// Build a tracking guard from the `QueryID` metadata entry of an incoming request.
///
/// Returns `None` when the request carries no `QueryID` entry or when the entry's value
/// cannot be read as a string. Otherwise a fresh tracking payload is created, its
/// `query_id` is set to the received value, and the guard returned by
/// `ThreadTracker::tracking` is handed back to the caller.
fn thread_tracking_guard<T>(req: &tonic::Request<T>) -> Option<TrackingGuard> {
    // Both lookups short-circuit to `None`, so no payload is created for requests
    // without a usable query id.
    let query_id = req.metadata().get("QueryID")?.to_str().ok()?;

    let mut payload = ThreadTracker::new_tracking_payload();
    payload.query_id = Some(query_id.to_string());

    Some(ThreadTracker::tracking(payload))
}

0 comments on commit 1ff87bb

Please sign in to comment.