From 7fafdd0daf30646c9e08097b1f2056796b26c9bc Mon Sep 17 00:00:00 2001 From: Daniel Freire Date: Thu, 23 May 2024 19:54:00 -0300 Subject: [PATCH] chore: add instrumentation and metrics for consensus --- src/eth/consensus.rs | 17 +++++++++++++++++ src/infra/metrics/metrics_definitions.rs | 10 ++++++++++ src/infra/metrics/metrics_init.rs | 2 ++ 3 files changed, 29 insertions(+) diff --git a/src/eth/consensus.rs b/src/eth/consensus.rs index 702b1d0dd..6eace5ed5 100644 --- a/src/eth/consensus.rs +++ b/src/eth/consensus.rs @@ -13,6 +13,7 @@ use tokio::sync::mpsc::{self}; use tokio::time::sleep; use crate::config::RunWithImporterConfig; +use crate::infra::metrics; use crate::infra::BlockchainClient; const RETRY_ATTEMPTS: u32 = 3; @@ -127,6 +128,7 @@ impl Consensus { (config.online.external_rpc, config.online.external_rpc_ws) } + #[tracing::instrument(skip_all)] pub async fn discover_followers() -> Result, anyhow::Error> { let client = Client::try_default().await?; let pods: Api = Api::namespaced(client, &Self::current_namespace().unwrap_or("default".to_string())); @@ -148,7 +150,11 @@ impl Consensus { Ok(followers) } + #[tracing::instrument(skip_all)] async fn append_entries(follower: &str, entries: Vec) -> Result<(), anyhow::Error> { + #[cfg(feature = "metrics")] + let start = metrics::now(); + let client = BlockchainClient::new_http_ws(follower, None, Duration::from_secs(2)).await?; for attempt in 1..=RETRY_ATTEMPTS { @@ -163,10 +169,17 @@ impl Consensus { sleep(RETRY_DELAY).await; } + #[cfg(feature = "metrics")] + metrics::inc_append_entries(start.elapsed()); + Err(anyhow!("Failed to append entries to {} after {} attempts", follower, RETRY_ATTEMPTS)) } + #[tracing::instrument(skip_all)] pub async fn append_entries_to_followers(entries: Vec, followers: Vec) -> Result<(), anyhow::Error> { + #[cfg(feature = "metrics")] + let start = metrics::now(); + for entry in entries { for follower in &followers { if let Err(e) = Self::append_entries(follower, vec![entry.clone()]).await { @@ -174,6 +187,10 @@ impl Consensus { } } } + + #[cfg(feature = "metrics")] + metrics::inc_append_entries_to_followers(start.elapsed()); + Ok(()) } } diff --git a/src/infra/metrics/metrics_definitions.rs b/src/infra/metrics/metrics_definitions.rs index b32f4b11e..2da0e3221 100644 --- a/src/infra/metrics/metrics_definitions.rs +++ b/src/infra/metrics/metrics_definitions.rs @@ -173,3 +173,13 @@ metrics! { "Number of bytes read." gauge rocks_bytes_read{dbname} [] } + +metrics! { + group: consensus, + + "Time to run Consensus::apend_entries." + histogram_duration append_entries{} [], + + "Time to run Consensus::append_entries_to_followers." + histogram_duration append_entries_to_followers{} [] +} diff --git a/src/infra/metrics/metrics_init.rs b/src/infra/metrics/metrics_init.rs index e7af596aa..d4f69bd29 100644 --- a/src/infra/metrics/metrics_init.rs +++ b/src/infra/metrics/metrics_init.rs @@ -7,6 +7,7 @@ use metrics_exporter_prometheus::Matcher; use metrics_exporter_prometheus::PrometheusBuilder; use crate::config::MetricsHistogramKind; +use crate::infra::metrics::metrics_for_consensus; use crate::infra::metrics::metrics_for_evm; use crate::infra::metrics::metrics_for_executor; use crate::infra::metrics::metrics_for_importer_online; @@ -39,6 +40,7 @@ pub fn init_metrics(histogram_kind: MetricsHistogramKind) { metrics.extend(metrics_for_storage_read()); metrics.extend(metrics_for_storage_write()); metrics.extend(metrics_for_rocks()); + metrics.extend(metrics_for_consensus()); // init exporter let mut builder = PrometheusBuilder::new();