From feba05956e45f222e5d1798e16bfa9fbe3a3b885 Mon Sep 17 00:00:00 2001 From: Renato Dinhani <101204870+dinhani-cw@users.noreply.github.com> Date: Mon, 10 Jun 2024 19:37:44 -0300 Subject: [PATCH 1/2] refactor: move tracing and metric enum to their modules (#1060) --- src/config.rs | 74 ++---------------------------- src/infra/metrics/metrics_init.rs | 2 +- src/infra/metrics/metrics_types.rs | 41 +++++++++++++++++ src/infra/tracing.rs | 52 ++++++++++++++++++--- 4 files changed, 93 insertions(+), 76 deletions(-) diff --git a/src/config.rs b/src/config.rs index 469d79daf..1d8d84aaa 100644 --- a/src/config.rs +++ b/src/config.rs @@ -45,8 +45,11 @@ use crate::eth::Executor; use crate::eth::TransactionRelayer; use crate::ext::binary_name; use crate::ext::parse_duration; +#[cfg(feature = "metrics")] +use crate::infra::metrics::MetricsHistogramKind; use crate::infra::tracing::info_task_spawn; use crate::infra::tracing::warn_task_tx_closed; +use crate::infra::tracing::TracingLogFormat; use crate::infra::BlockchainClient; use crate::GlobalState; @@ -85,6 +88,7 @@ pub struct CommonConfig { #[arg(long = "metrics-exporter-address", env = "METRICS_EXPORTER_ADDRESS", default_value = "0.0.0.0:9000")] pub metrics_exporter_address: SocketAddr, + #[cfg(feature = "metrics")] /// Metrics histograms will be collected using summaries or histograms (buckets)? #[arg(long = "metrics-histogram-kind", env = "METRICS_HISTOGRAM_KIND", default_value = "summary")] pub metrics_histogram_kind: MetricsHistogramKind, @@ -99,7 +103,7 @@ pub struct CommonConfig { /// How tracing events will be formatted. #[arg(long = "log-format", env = "LOG_FORMAT", default_value = "normal")] - pub log_format: LogFormat, + pub log_format: TracingLogFormat, /// Sentry URL where error events will be pushed. #[arg(long = "sentry-url", env = "SENTRY_URL")] @@ -890,74 +894,6 @@ impl FromStr for PermanentStorageKind { } } -// ----------------------------------------------------------------------------- -// Enum: LogFormat -// ----------------------------------------------------------------------------- - -/// Tracing event log format. -#[derive(DebugAsJson, strum::Display, Clone, Copy, Eq, PartialEq, serde::Serialize)] -pub enum LogFormat { - /// Minimal format: Time (no date), level, and message. - #[strum(to_string = "minimal")] - Minimal, - - /// Normal format: Default `tracing` crate configuration. - #[strum(to_string = "normal")] - Normal, - - /// Verbose format: Full datetime, level, thread, target, and message. - #[strum(to_string = "verbose")] - Verbose, - - /// JSON format: Verbose information formatted as JSON. - #[strum(to_string = "json")] - Json, -} - -impl FromStr for LogFormat { - type Err = anyhow::Error; - - fn from_str(s: &str) -> anyhow::Result { - match s.to_lowercase().trim() { - "json" => Ok(Self::Json), - "minimal" => Ok(Self::Minimal), - "normal" => Ok(Self::Normal), - "verbose" | "full" => Ok(Self::Verbose), - s => Err(anyhow!("unknown log format: {}", s)), - } - } -} - -// ----------------------------------------------------------------------------- -// Enum: MetricsHistogramKind -// ----------------------------------------------------------------------------- - -/// See: -#[derive(DebugAsJson, Clone, Copy, Eq, PartialEq, serde::Serialize)] -pub enum MetricsHistogramKind { - /// Quantiles are calculated on client-side based on recent data kept in-memory. - /// - /// Client defines the quantiles to calculate. - Summary, - - /// Quantiles are calculated on server-side based on bucket counts. - /// - /// Cient defines buckets to group observations. - Histogram, -} - -impl FromStr for MetricsHistogramKind { - type Err = anyhow::Error; - - fn from_str(s: &str) -> anyhow::Result { - match s.to_lowercase().trim() { - "summary" => Ok(Self::Summary), - "histogram" => Ok(Self::Histogram), - s => Err(anyhow!("unknown metrics histogram kind: {}", s)), - } - } -} - // ----------------------------------------------------------------------------- // Enum: ValidatorMethodConfig // ----------------------------------------------------------------------------- diff --git a/src/infra/metrics/metrics_init.rs b/src/infra/metrics/metrics_init.rs index 17396dfd0..b182aa497 100644 --- a/src/infra/metrics/metrics_init.rs +++ b/src/infra/metrics/metrics_init.rs @@ -7,7 +7,6 @@ use std::stringify; use metrics_exporter_prometheus::Matcher; use metrics_exporter_prometheus::PrometheusBuilder; -use crate::config::MetricsHistogramKind; use crate::infra::metrics::metrics_for_consensus; use crate::infra::metrics::metrics_for_evm; use crate::infra::metrics::metrics_for_executor; @@ -17,6 +16,7 @@ use crate::infra::metrics::metrics_for_json_rpc; use crate::infra::metrics::metrics_for_rocks; use crate::infra::metrics::metrics_for_storage_read; use crate::infra::metrics::metrics_for_storage_write; +use crate::infra::metrics::MetricsHistogramKind; /// Default bucket for duration based metrics. const BUCKET_FOR_DURATION: [f64; 37] = [ diff --git a/src/infra/metrics/metrics_types.rs b/src/infra/metrics/metrics_types.rs index 77e5579d9..00c8408da 100644 --- a/src/infra/metrics/metrics_types.rs +++ b/src/infra/metrics/metrics_types.rs @@ -1,5 +1,8 @@ use std::borrow::Cow; +use std::str::FromStr; +use anyhow::anyhow; +use display_json::DebugAsJson; use metrics::describe_counter; use metrics::describe_gauge; use metrics::describe_histogram; @@ -7,6 +10,10 @@ use metrics::Label; use crate::ext::not; +// ----------------------------------------------------------------------------- +// Metric +// ----------------------------------------------------------------------------- + /// Metric definition. pub(super) struct Metric { pub(super) kind: &'static str, @@ -32,6 +39,10 @@ impl Metric { } } +// ----------------------------------------------------------------------------- +// MetricLabelValue +// ----------------------------------------------------------------------------- + /// Representation of a metric label value. /// /// It exists to improve two aspects `metrics` crate does not cover: @@ -91,3 +102,33 @@ pub(super) fn into_labels(labels: Vec<(&'static str, MetricLabelValue)>) -> Vec< .map(|(key, value)| Label::new(key, value)) .collect() } + +// ----------------------------------------------------------------------------- +// MetricsHistogramKind +// ----------------------------------------------------------------------------- + +/// See: +#[derive(DebugAsJson, Clone, Copy, Eq, PartialEq, serde::Serialize)] +pub enum MetricsHistogramKind { + /// Quantiles are calculated on client-side based on recent data kept in-memory. + /// + /// Client defines the quantiles to calculate. + Summary, + + /// Quantiles are calculated on server-side based on bucket counts. + /// + /// Cient defines buckets to group observations. + Histogram, +} + +impl FromStr for MetricsHistogramKind { + type Err = anyhow::Error; + + fn from_str(s: &str) -> anyhow::Result { + match s.to_lowercase().trim() { + "summary" => Ok(Self::Summary), + "histogram" => Ok(Self::Histogram), + s => Err(anyhow!("unknown metrics histogram kind: {}", s)), + } + } +} diff --git a/src/infra/tracing.rs b/src/infra/tracing.rs index ae3023512..3a82e9ea0 100644 --- a/src/infra/tracing.rs +++ b/src/infra/tracing.rs @@ -3,9 +3,12 @@ use std::io::stdout; use std::io::IsTerminal; use std::net::SocketAddr; +use std::str::FromStr; +use anyhow::anyhow; use chrono::Local; use console_subscriber::ConsoleLayer; +use display_json::DebugAsJson; use opentelemetry::KeyValue; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::runtime; @@ -18,13 +21,12 @@ use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; use tracing_subscriber::Layer; -use crate::config::LogFormat; use crate::ext::binary_name; use crate::ext::named_spawn; /// Init application tracing. pub async fn init_tracing( - log_format: LogFormat, + log_format: TracingLogFormat, opentelemetry_url: Option<&str>, sentry_url: Option<&str>, tokio_console_address: SocketAddr, @@ -36,14 +38,14 @@ pub async fn init_tracing( println!("tracing registry: enabling console logs | format={} ansi={}", log_format, enable_ansi); let stdout_layer = match log_format { - LogFormat::Json => fmt::Layer::default() + TracingLogFormat::Json => fmt::Layer::default() .json() .with_target(true) .with_thread_ids(true) .with_thread_names(true) .with_filter(EnvFilter::from_default_env()) .boxed(), - LogFormat::Minimal => fmt::Layer::default() + TracingLogFormat::Minimal => fmt::Layer::default() .with_thread_ids(false) .with_thread_names(false) .with_target(false) @@ -51,8 +53,8 @@ pub async fn init_tracing( .with_timer(MinimalTimer) .with_filter(EnvFilter::from_default_env()) .boxed(), - LogFormat::Normal => fmt::Layer::default().with_ansi(enable_ansi).with_filter(EnvFilter::from_default_env()).boxed(), - LogFormat::Verbose => fmt::Layer::default() + TracingLogFormat::Normal => fmt::Layer::default().with_ansi(enable_ansi).with_filter(EnvFilter::from_default_env()).boxed(), + TracingLogFormat::Verbose => fmt::Layer::default() .with_ansi(enable_ansi) .with_target(true) .with_thread_ids(true) @@ -127,6 +129,44 @@ pub async fn init_tracing( } } +// ----------------------------------------------------------------------------- +// Tracing types +// ----------------------------------------------------------------------------- + +/// Tracing event log format. +#[derive(DebugAsJson, strum::Display, Clone, Copy, Eq, PartialEq, serde::Serialize)] +pub enum TracingLogFormat { + /// Minimal format: Time (no date), level, and message. + #[strum(to_string = "minimal")] + Minimal, + + /// Normal format: Default `tracing` crate configuration. + #[strum(to_string = "normal")] + Normal, + + /// Verbose format: Full datetime, level, thread, target, and message. + #[strum(to_string = "verbose")] + Verbose, + + /// JSON format: Verbose information formatted as JSON. + #[strum(to_string = "json")] + Json, +} + +impl FromStr for TracingLogFormat { + type Err = anyhow::Error; + + fn from_str(s: &str) -> anyhow::Result { + match s.to_lowercase().trim() { + "json" => Ok(Self::Json), + "minimal" => Ok(Self::Minimal), + "normal" => Ok(Self::Normal), + "verbose" | "full" => Ok(Self::Verbose), + s => Err(anyhow!("unknown log format: {}", s)), + } + } +} + struct MinimalTimer; impl FormatTime for MinimalTimer { From 7a2f5a48d98ddb77b64630ce4151d901f5640877 Mon Sep 17 00:00:00 2001 From: carneiro-cw <156914855+carneiro-cw@users.noreply.github.com> Date: Mon, 10 Jun 2024 21:15:30 -0300 Subject: [PATCH 2/2] fix: timeout pending transaction (#1061) --- src/eth/relayer/external.rs | 8 +++++++- src/infra/blockchain_client/pending_transaction.rs | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index 740141163..da49277e1 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -8,6 +8,7 @@ use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; use tokio::fs::File; use tokio::io::AsyncWriteExt; +use tokio::time::timeout; use tokio::time::Instant; use tracing::Span; @@ -142,7 +143,12 @@ impl ExternalRelayer { let start = Instant::now(); let mut substrate_receipt = substrate_pending_transaction; loop { - match substrate_receipt.await { + let Ok(receipt) = timeout(Duration::from_secs(30), substrate_receipt).await else { + tracing::error!(?tx_hash, "no receipt returned by substrate for more than 30 seconds, retrying block"); + return Err(RelayError::CompareTimeout(anyhow!("no receipt returned by substrate for more than 30 seconds"))); + }; + + match receipt { Ok(Some(substrate_receipt)) => if let Err(compare_error) = substrate_receipt.compare(&stratus_receipt) { let err_string = compare_error.to_string(); diff --git a/src/infra/blockchain_client/pending_transaction.rs b/src/infra/blockchain_client/pending_transaction.rs index 4f1efaf6d..4ea2f7729 100644 --- a/src/infra/blockchain_client/pending_transaction.rs +++ b/src/infra/blockchain_client/pending_transaction.rs @@ -88,7 +88,7 @@ impl<'a> Future for PendingTransaction<'a> { fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll { let this = self.project(); - tracing::debug!(?this.state); + tracing::debug!(?this.state, ?this.retries_remaining); match this.state { PendingTxState::InitialDelay(fut) => { @@ -119,7 +119,7 @@ impl<'a> Future for PendingTransaction<'a> { let tx_opt = tx_res.unwrap(); if tx_opt.is_none() { - if *this.retries_remaining == 0 { + if *this.retries_remaining <= 0 { *this.state = PendingTxState::Completed; return Poll::Ready(Ok(None)); }