diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs index 47df3939f..1b4013938 100644 --- a/src/bin/importer_online.rs +++ b/src/bin/importer_online.rs @@ -20,6 +20,7 @@ use stratus::eth::Executor; use stratus::ext::named_spawn; use stratus::ext::traced_sleep; use stratus::ext::DisplayExt; +use stratus::ext::SleepReason; use stratus::ext::SpanExt; use stratus::if_else; #[cfg(feature = "metrics")] @@ -277,7 +278,7 @@ async fn start_number_fetcher(chain: Arc, sync_interval: Durat "fetched current block number via http. awaiting sync interval to retrieve again." ); set_external_rpc_current_block(number); - traced_sleep(sync_interval).await; + traced_sleep(sync_interval, SleepReason::SyncData).await; } Err(e) => { tracing::error!(reason = ?e, "failed to retrieve block number. retrying now."); @@ -343,7 +344,7 @@ async fn fetch_block_and_receipts(chain: Arc, number: BlockNum let block = fetch_block(Arc::clone(&chain), number).await; // wait some time until receipts are available - let _ = traced_sleep(INTERVAL_FETCH_RECEIPTS).await; + let _ = traced_sleep(INTERVAL_FETCH_RECEIPTS, SleepReason::SyncData).await; // fetch receipts in parallel let mut receipts_tasks = Vec::with_capacity(block.transactions.len()); @@ -357,29 +358,25 @@ async fn fetch_block_and_receipts(chain: Arc, number: BlockNum #[tracing::instrument(name = "importer::fetch_block", skip_all, fields(number))] async fn fetch_block(chain: Arc, number: BlockNumber) -> ExternalBlock { + const RETRY_DELAY: Duration = Duration::from_millis(10); Span::with(|s| { s.rec_str("number", &number); }); - let mut backoff_ms = 10; loop { tracing::info!(%number, "fetching block"); let block = match chain.fetch_block(number).await { Ok(json) => json, Err(e) => { - backoff_ms *= 2; - backoff_ms = min(backoff_ms, 1000); // no more than 1000ms of backoff - tracing::warn!(reason = ?e, %number, %backoff_ms, "failed to retrieve block. retrying with backoff."); - traced_sleep(Duration::from_millis(backoff_ms)).await; + tracing::warn!(reason = ?e, %number, delay_ms=%RETRY_DELAY.as_millis(), "failed to retrieve block. retrying with delay."); + traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await; continue; } }; if block.is_null() { - backoff_ms *= 2; - backoff_ms = min(backoff_ms, 1000); // no more than 1000ms of backoff - tracing::warn!(%number, "block not available yet because block is not mined. retrying with backoff."); - traced_sleep(Duration::from_millis(backoff_ms)).await; + tracing::warn!(%number, delay_ms=%RETRY_DELAY.as_millis(), "block not mined yet. retrying with delay."); + traced_sleep(RETRY_DELAY, SleepReason::SyncData).await; continue; } diff --git a/src/bin/relayer.rs b/src/bin/relayer.rs index 9fc9de359..6cb7c6351 100644 --- a/src/bin/relayer.rs +++ b/src/bin/relayer.rs @@ -2,6 +2,7 @@ mod importer_online; use stratus::config::ExternalRelayerConfig; use stratus::ext::traced_sleep; +use stratus::ext::SleepReason; #[cfg(feature = "metrics")] use stratus::infra::metrics; use stratus::utils::DropTimer; @@ -47,7 +48,7 @@ async fn run(config: ExternalRelayerConfig) -> anyhow::Result<()> { Some(block_number) => tracing::info!(number = %block_number, "relayed"), None => { tracing::info!("no pending block found"); - traced_sleep(backoff).await; + traced_sleep(backoff, SleepReason::RetryBackoff).await; } }; } diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index bfd5046f6..637aadc78 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -35,6 +35,7 @@ use crate::eth::primitives::Hash; use crate::eth::storage::StratusStorage; use crate::ext::named_spawn; use crate::ext::traced_sleep; +use crate::ext::SleepReason; use crate::infra::BlockchainClient; use crate::GlobalState; @@ -200,7 +201,7 @@ impl Consensus { loop { let timeout = consensus.heartbeat_timeout; tokio::select! { - _ = traced_sleep(timeout) => { + _ = traced_sleep(timeout, SleepReason::Interval) => { if !consensus.is_leader().await { tracing::info!("starting election due to heartbeat timeout"); Self::start_election(Arc::clone(&consensus)).await; @@ -612,7 +613,7 @@ impl Consensus { } Err(e) => { tracing::warn!("failed to append block to peer {:?}: {:?}", peer.client, e); - traced_sleep(RETRY_DELAY).await; + traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await; } } } diff --git a/src/eth/primitives/execution_result.rs b/src/eth/primitives/execution_result.rs index 9e4a22ce6..0c1612bcc 100644 --- a/src/eth/primitives/execution_result.rs +++ b/src/eth/primitives/execution_result.rs @@ -8,15 +8,15 @@ use sqlx::error::BoxDynError; #[serde(rename_all = "snake_case")] pub enum ExecutionResult { /// Finished normally (RETURN opcode). - #[strum(serialize = "success")] + #[strum(to_string = "success")] Success, /// Transaction execution finished with a reversion (REVERT opcode). - #[strum(serialize = "reverted")] + #[strum(to_string = "reverted")] Reverted, /// Transaction execution did not finish. - #[strum(serialize = "halted")] + #[strum(to_string = "halted")] Halted { reason: String }, } diff --git a/src/eth/primitives/storage_point_in_time.rs b/src/eth/primitives/storage_point_in_time.rs index afa644a4b..8df02151c 100644 --- a/src/eth/primitives/storage_point_in_time.rs +++ b/src/eth/primitives/storage_point_in_time.rs @@ -15,11 +15,11 @@ use crate::infra::metrics::MetricLabelValue; pub enum StoragePointInTime { /// The current state of the EVM storage. #[default] - #[strum(serialize = "present")] + #[strum(to_string = "present")] Present, /// The state of the EVM storage at the given block number. - #[strum(serialize = "past")] + #[strum(to_string = "past")] Past(BlockNumber), } diff --git a/src/eth/relayer/external.rs b/src/eth/relayer/external.rs index cca300d6f..362d14be9 100644 --- a/src/eth/relayer/external.rs +++ b/src/eth/relayer/external.rs @@ -21,6 +21,7 @@ use crate::eth::primitives::Hash; use crate::eth::primitives::TransactionMined; use crate::ext::traced_sleep; use crate::ext::ResultExt; +use crate::ext::SleepReason; use crate::ext::SpanExt; use crate::infra::blockchain_client::pending_transaction::PendingTransaction; #[cfg(feature = "metrics")] @@ -165,7 +166,7 @@ impl ExternalRelayer { } } substrate_receipt = PendingTransaction::new(tx_hash, &self.substrate_chain); - traced_sleep(Duration::from_millis(50)).await; + traced_sleep(Duration::from_millis(50), SleepReason::SyncData).await; } } @@ -260,7 +261,7 @@ impl ExternalRelayer { let mut tries = 0; while self.substrate_chain.fetch_transaction(tx_mined.input.hash).await.unwrap_or(None).is_none() { tracing::warn!(?tx_mined.input.hash, ?tries, "transaction not found, retrying..."); - traced_sleep(Duration::from_millis(100)).await; + traced_sleep(Duration::from_millis(100), SleepReason::SyncData).await; tries += 1; } diff --git a/src/eth/rpc/rpc_subscriptions.rs b/src/eth/rpc/rpc_subscriptions.rs index a0d235b44..c4dbaf5e9 100644 --- a/src/eth/rpc/rpc_subscriptions.rs +++ b/src/eth/rpc/rpc_subscriptions.rs @@ -18,6 +18,7 @@ use crate::eth::primitives::LogMined; use crate::ext::named_spawn; use crate::ext::not; use crate::ext::traced_sleep; +use crate::ext::SleepReason; use crate::if_else; #[cfg(feature = "metrics")] use crate::infra::metrics; @@ -84,7 +85,7 @@ impl RpcSubscriptions { } // await next iteration - traced_sleep(CLEANING_FREQUENCY).await; + traced_sleep(CLEANING_FREQUENCY, SleepReason::Interval).await; } }) } diff --git a/src/eth/storage/postgres_external_rpc/postgres_external_rpc.rs b/src/eth/storage/postgres_external_rpc/postgres_external_rpc.rs index c82cac09f..41c3d9e19 100644 --- a/src/eth/storage/postgres_external_rpc/postgres_external_rpc.rs +++ b/src/eth/storage/postgres_external_rpc/postgres_external_rpc.rs @@ -17,6 +17,7 @@ use crate::eth::primitives::Wei; use crate::eth::storage::ExternalRpcStorage; use crate::ext::traced_sleep; use crate::ext::ResultExt; +use crate::ext::SleepReason; use crate::log_and_err; const MAX_RETRIES: u64 = 50; @@ -75,7 +76,7 @@ impl ExternalRpcStorage for PostgresExternalRpcStorage { async fn read_blocks_in_range(&self, start: BlockNumber, end: BlockNumber) -> anyhow::Result> { tracing::debug!(%start, %end, "retrieving external blocks in range"); - let mut attempts: u64 = 0; + let mut attempt: u64 = 1; loop { let result = sqlx::query_file!( @@ -95,23 +96,23 @@ impl ExternalRpcStorage for PostgresExternalRpcStorage { let blocks_sorted = blocks.into_iter().sorted_by_key(|x| x.number()).collect(); return Ok(blocks_sorted); } - Err(e) => { - if attempts < MAX_RETRIES { - attempts += 1; - tracing::warn!("Attempt {} failed, retrying...: {}", attempts, e); - traced_sleep(Duration::from_millis(attempts.pow(2))).await; - // Exponential backoff + Err(e) => + if attempt <= MAX_RETRIES { + tracing::warn!(reason = ?e, %attempt, "attempt failed. retrying now."); + attempt += 1; + + let backoff = Duration::from_millis(attempt.pow(2)); + traced_sleep(backoff, SleepReason::RetryBackoff).await; } else { return log_and_err!(reason = e, "failed to retrieve external blocks"); - } - } + }, } } } async fn read_receipts_in_range(&self, start: BlockNumber, end: BlockNumber) -> anyhow::Result> { tracing::debug!(%start, %end, "retrieving external receipts in range"); - let mut attempts: u64 = 0; + let mut attempt: u64 = 1; loop { let result = sqlx::query_file!( @@ -130,16 +131,16 @@ impl ExternalRpcStorage for PostgresExternalRpcStorage { } return Ok(receipts); } - Err(e) => { - if attempts < MAX_RETRIES { - attempts += 1; - tracing::warn!("Attempt {} failed, retrying...: {}", attempts, e); - traced_sleep(Duration::from_millis(attempts.pow(2))).await; - // Exponential backoff + Err(e) => + if attempt <= MAX_RETRIES { + tracing::warn!(reason = ?e, %attempt, "attempt failed. retrying now."); + attempt += 1; + + let backoff = Duration::from_millis(attempt.pow(2)); + traced_sleep(backoff, SleepReason::RetryBackoff).await; } else { return log_and_err!(reason = e, "failed to retrieve receipts"); - } - } + }, } } } diff --git a/src/ext.rs b/src/ext.rs index bd7d5f72e..20f09cf6c 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -273,12 +273,34 @@ macro_rules! log_and_err { // Tokio // ----------------------------------------------------------------------------- +/// Indicates why a sleep is happening. +#[derive(Debug, strum::Display)] +pub enum SleepReason { + /// Task is executed at predefined intervals. + #[strum(to_string = "interval")] + Interval, + + /// Task is awaiting a backoff before retrying the operation. + #[strum(to_string = "retry-backoff")] + RetryBackoff, + + /// Task is awaiting an external system or component to produde or synchronize data. + #[strum(to_string = "sync-data")] + SyncData, +} + +/// Sleeps the current task and tracks why it is sleeping. #[inline(always)] -pub async fn traced_sleep(duration: Duration) { +pub async fn traced_sleep(duration: Duration, reason: SleepReason) { #[cfg(feature = "tracing")] { - let span = info_span!("tokio::sleep", duration_ms = %duration.as_millis()); - tokio::time::sleep(duration).instrument(span).await; + let span = info_span!("tokio::sleep", duration_ms = %duration.as_millis(), %reason); + async { + tracing::debug!(duration_ms = %duration.as_millis(), %reason, "sleeping"); + tokio::time::sleep(duration).await; + } + .instrument(span) + .await; } #[cfg(not(feature = "tracing"))] diff --git a/tests/test_import_external_snapshot_common.rs b/tests/test_import_external_snapshot_common.rs index 18f83c5f5..8c535b180 100644 --- a/tests/test_import_external_snapshot_common.rs +++ b/tests/test_import_external_snapshot_common.rs @@ -19,6 +19,7 @@ use stratus::eth::storage::InMemoryTemporaryStorage; use stratus::eth::storage::PermanentStorage; use stratus::eth::storage::StratusStorage; use stratus::ext::traced_sleep; +use stratus::ext::SleepReason; use stratus::infra::docker::Docker; use stratus::GlobalServices; #[cfg(feature = "metrics")] @@ -163,7 +164,7 @@ pub async fn execute_test( miner.mine_external_and_commit().await.unwrap(); // get metrics from prometheus (sleep to ensure prometheus collected) - traced_sleep(Duration::from_secs(5)).await; + traced_sleep(Duration::from_secs(5), SleepReason::SyncData).await; println!("{}\n{}\n{}", "=".repeat(80), test_name, "=".repeat(80)); for query in METRIC_QUERIES {