diff --git a/grpc-sys/build.rs b/grpc-sys/build.rs index 832a3a38..dd99fecf 100644 --- a/grpc-sys/build.rs +++ b/grpc-sys/build.rs @@ -300,7 +300,7 @@ fn build_grpc(cc: &mut cc::Build, library: &str) { fn figure_systemd_path(build_dir: &str) { let path = format!("{build_dir}/CMakeCache.txt"); - let f = BufReader::new(std::fs::File::open(&path).unwrap()); + let f = BufReader::new(std::fs::File::open(path).unwrap()); let mut libdir: Option = None; let mut libname: Option = None; for l in f.lines() { diff --git a/src/env.rs b/src/env.rs index 2c3f6c31..6a16aa04 100644 --- a/src/env.rs +++ b/src/env.rs @@ -5,6 +5,9 @@ use std::sync::mpsc; use std::sync::Arc; use std::thread::{Builder as ThreadBuilder, JoinHandle}; +use prometheus::local::LocalHistogram; +use prometheus::IntCounter; + use crate::cq::{CompletionQueue, CompletionQueueHandle, EventType, WorkQueue}; use crate::grpc_sys; use crate::task::CallTag; @@ -16,27 +19,30 @@ use { GRPC_TASK_WAIT_DURATION, }, crate::task::resolve, - prometheus::{ - core::{AtomicU64, Counter}, - Histogram, - }, std::time::Instant, }; +const METRICS_FLUSH_INTERVAL: u64 = 10_000; // 10s + #[cfg(feature = "prometheus")] pub struct GRPCRunner { - cq_next_duration_his: Histogram, - execute_duration_his: Histogram, - wait_duration_his: Histogram, - event_counter: [Counter; 6], + cq_next_duration_his: LocalHistogram, + execute_duration_his: LocalHistogram, + wait_duration_his: LocalHistogram, + event_counter: [IntCounter; 6], + last_flush_time: Instant, } #[cfg(feature = "prometheus")] impl GRPCRunner { pub fn new(name: &String) -> GRPCRunner { - let cq_next_duration_his = GRPC_POOL_CQ_NEXT_DURATION.with_label_values(&[name]); - let execute_duration_his = GRPC_POOL_EXECUTE_DURATION.with_label_values(&[name]); - let wait_duration_his = GRPC_TASK_WAIT_DURATION.with_label_values(&[name]); + let cq_next_duration_his = GRPC_POOL_CQ_NEXT_DURATION + .with_label_values(&[name]) + .local(); + let execute_duration_his = GRPC_POOL_EXECUTE_DURATION + .with_label_values(&[name]) + .local(); + let wait_duration_his = GRPC_TASK_WAIT_DURATION.with_label_values(&[name]).local(); let event_counter = ["batch", "request", "unary", "abort", "action", "spawn"] .map(|event| GRPC_POOL_EVENT_COUNT_VEC.with_label_values(&[name, event])); GRPCRunner { @@ -44,11 +50,12 @@ impl GRPCRunner { execute_duration_his, wait_duration_his, event_counter, + last_flush_time: Instant::now(), } } // event loop - pub fn run(&self, tx: mpsc::Sender) { + pub fn run(&mut self, tx: mpsc::Sender) { let cq = Arc::new(CompletionQueueHandle::new()); let worker_info = Arc::new(WorkQueue::new()); let cq = CompletionQueue::new(cq, worker_info); @@ -73,7 +80,21 @@ impl GRPCRunner { } self.execute_duration_his .observe(now.elapsed().as_secs_f64()); + self.maybe_flush(); + } + } + + fn maybe_flush(&mut self) { + let now = Instant::now(); + if now.saturating_duration_since(self.last_flush_time) + < std::time::Duration::from_millis(METRICS_FLUSH_INTERVAL) + { + return; } + self.last_flush_time = now; + self.cq_next_duration_his.flush(); + self.execute_duration_his.flush(); + self.wait_duration_his.flush(); } fn resolve(&self, tag: Box, cq: &CompletionQueue, success: bool) { @@ -193,7 +214,7 @@ impl EnvBuilder { .as_ref() .map_or(format!("grpc-pool-{i}"), |prefix| format!("{prefix}-{i}")); #[cfg(feature = "prometheus")] - let runner = GRPCRunner::new(&name); + let mut runner = GRPCRunner::new(&name); builder = builder.name(name); let after_start = self.after_start.clone(); let before_stop = self.before_stop.clone(); diff --git a/src/metrics.rs b/src/metrics.rs index 690d0f64..8e1e9bed 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -29,7 +29,7 @@ lazy_static! { "grpc_pool_execute_duration", "Bucketed histogram of grpc pool execute duration for every time", &["name"], - exponential_buckets(1e-7, 2.0, 20).unwrap() // 100ns ~ 100ms + exponential_buckets(1e-7, 2.0, 30).unwrap() // 100ns ~ 100s ) .unwrap();