fix: rocksdb timing metrics to report the average since the last collection, instead of cumulative (#815)
carneiro-cw authored May 9, 2024
1 parent cc07bd8 commit 7fa4c96
Showing 1 changed file with 38 additions and 12 deletions.
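The fix in one idea: RocksDB histogram statistics are cumulative from the moment the database is opened, so an average computed from them converges toward a lifetime mean and stops tracking recent behavior. The commit instead remembers the last observed (sum, count) pair per histogram and reports (new_sum - prev_sum) / (new_count - prev_count), the average over just the last collection interval. A minimal standalone sketch of that arithmetic; the function name and the numbers are illustrative, not taken from the commit:

// Sketch of the interval-average arithmetic; not part of the diff below.
fn interval_avg(prev: (u64, u64), curr: (u64, u64)) -> u64 {
    let (prev_sum, prev_count) = prev;
    let (curr_sum, curr_count) = curr;
    // checked_div returns None when no new samples arrived (divisor 0);
    // the commit reports 0 in that case.
    (curr_sum - prev_sum).checked_div(curr_count - prev_count).unwrap_or(0)
}

fn main() {
    // Cumulative since open: 10 ops totalling 1_000 us -> lifetime average 100 us.
    let first = (1_000, 10);
    // One interval later: 12 ops totalling 1_600 us -> lifetime average ~133 us,
    // but the 2 new ops took 600 us combined, i.e. 300 us each.
    let second = (1_600, 12);
    assert_eq!(interval_avg(first, second), 300); // interval average, not lifetime
}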
src/eth/storage/rocks/rocks_db.rs (38 additions, 12 deletions)
@@ -1,4 +1,8 @@
+#[cfg(feature = "metrics")]
+use std::collections::HashMap;
 use std::marker::PhantomData;
+#[cfg(feature = "metrics")]
+use std::sync::Mutex;
 
 use anyhow::anyhow;
 use anyhow::Result;
@@ -19,6 +23,13 @@ use rocksdb::DB;
 use serde::Deserialize;
 use serde::Serialize;
 
+#[cfg(feature = "metrics")]
+type HistogramInt = u32;
+#[cfg(feature = "metrics")]
+type Sum = u64;
+#[cfg(feature = "metrics")]
+type Count = u64;
+
 #[cfg(feature = "metrics")]
 use crate::infra::metrics;
 
@@ -33,6 +44,9 @@ pub struct RocksDb<K, V> {
     pub db: DB,
     pub opts: Options,
     _marker: PhantomData<(K, V)>,
+    // Last collected stats for a histogram
+    #[cfg(feature = "metrics")]
+    pub prev_stats: Mutex<HashMap<HistogramInt, (Sum, Count)>>,
 }
 
 impl<K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq, V: Serialize + for<'de> Deserialize<'de> + Clone> RocksDb<K, V> {
@@ -168,6 +182,8 @@ impl<K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq, V: Serialize + for<'de> Deserialize<'de> + Clone> RocksDb<K, V> {
             db,
             opts,
             _marker: PhantomData,
+            #[cfg(feature = "metrics")]
+            prev_stats: Mutex::new(HashMap::new()),
         })
     }
 
@@ -370,13 +386,29 @@ impl<K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq, V: Serialize + for<'de> Deserialize<'de> + Clone> RocksDb<K, V> {
         }
     }
 
+    #[cfg(feature = "metrics")]
+    pub fn get_histogram_average_in_interval(&self, hist: Histogram) -> u64 {
+        // The stats are cumulative since opening the DB;
+        // the average over the interval is: avg = (new_sum - sum) / (new_count - count)
+
+        let mut prev_values = self.prev_stats.lock().unwrap();
+        let (prev_sum, prev_count): (Sum, Count) = *prev_values.get(&(hist as u32)).unwrap_or(&(0, 0));
+        let data = self.opts.get_histogram_data(hist);
+        let data_count = data.count();
+        let data_sum = data.sum();
+
+        let Some(avg) = (data_sum - prev_sum).checked_div(data_count - prev_count) else {
+            return 0;
+        };
+
+        prev_values.insert(hist as u32, (data_sum, data_count));
+        avg
+    }
+
     #[cfg(feature = "metrics")]
     pub fn export_metrics(&self) {
         let db_get = self.opts.get_histogram_data(Histogram::DbGet);
         let db_write = self.opts.get_histogram_data(Histogram::DbWrite);
-        let compaction_time = self.opts.get_histogram_data(Histogram::CompactionTime);
-        let compaction_cpu_time = self.opts.get_histogram_data(Histogram::CompactionCpuTime);
-        let flush_time = self.opts.get_histogram_data(Histogram::FlushTime);
 
         let block_cache_miss = self.opts.get_ticker_count(Ticker::BlockCacheMiss);
         let block_cache_hit = self.opts.get_ticker_count(Ticker::BlockCacheHit);
@@ -392,15 +424,9 @@ impl<K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq, V: Serialize + for<'de> Deserialize<'de> + Clone> RocksDb<K, V> {
         metrics::set_rocks_bytes_written(bytes_written, db_name);
         metrics::set_rocks_bytes_read(bytes_read, db_name);
 
-        metrics::set_rocks_compaction_time(
-            compaction_time.average().max(std::u64::MIN as f64).min(std::u64::MAX as f64).round() as u64,
-            db_name,
-        );
-        metrics::set_rocks_compaction_cpu_time(
-            compaction_cpu_time.average().max(std::u64::MIN as f64).min(std::u64::MAX as f64).round() as u64,
-            db_name,
-        );
-        metrics::set_rocks_flush_time(flush_time.average().max(std::u64::MIN as f64).min(std::u64::MAX as f64).round() as u64, db_name);
+        metrics::set_rocks_compaction_time(self.get_histogram_average_in_interval(Histogram::CompactionTime), db_name);
+        metrics::set_rocks_compaction_cpu_time(self.get_histogram_average_in_interval(Histogram::CompactionCpuTime), db_name);
+        metrics::set_rocks_flush_time(self.get_histogram_average_in_interval(Histogram::FlushTime), db_name);
     }
 }
 

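A note on how this is consumed: export_metrics takes &self, so prev_stats lives behind a Mutex to allow updating the stored (sum, count) pairs; each call then reports averages over the window since the previous call. A hedged sketch of a periodic collection loop follows; the driver function, key/value types, and the 10-second period are assumptions for illustration, not part of this commit:

use std::sync::Arc;
use std::time::Duration;

// Hypothetical driver loop, assuming RocksDb is Send + Sync here.
fn spawn_metrics_loop(db: Arc<RocksDb<Vec<u8>, Vec<u8>>>) {
    std::thread::spawn(move || loop {
        // Each tick reports per-interval averages, because export_metrics
        // diffs the cumulative histograms against the stored prev_stats.
        db.export_metrics();
        std::thread::sleep(Duration::from_secs(10)); // assumed collection period
    });
}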