Skip to content

Commit

Permalink
feat: traced_sleep
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw committed Jun 6, 2024
1 parent 9ef07d0 commit 4ba3754
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 44 deletions.
19 changes: 8 additions & 11 deletions src/bin/importer_online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use stratus::eth::Executor;
use stratus::ext::named_spawn;
use stratus::ext::traced_sleep;
use stratus::ext::DisplayExt;
use stratus::ext::SleepReason;
use stratus::ext::SpanExt;
use stratus::if_else;
#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -277,7 +278,7 @@ async fn start_number_fetcher(chain: Arc<BlockchainClient>, sync_interval: Durat
"fetched current block number via http. awaiting sync interval to retrieve again."
);
set_external_rpc_current_block(number);
traced_sleep(sync_interval).await;
traced_sleep(sync_interval, SleepReason::SyncData).await;
}
Err(e) => {
tracing::error!(reason = ?e, "failed to retrieve block number. retrying now.");
Expand Down Expand Up @@ -343,7 +344,7 @@ async fn fetch_block_and_receipts(chain: Arc<BlockchainClient>, number: BlockNum
let block = fetch_block(Arc::clone(&chain), number).await;

// wait some time until receipts are available
let _ = traced_sleep(INTERVAL_FETCH_RECEIPTS).await;
let _ = traced_sleep(INTERVAL_FETCH_RECEIPTS, SleepReason::SyncData).await;

// fetch receipts in parallel
let mut receipts_tasks = Vec::with_capacity(block.transactions.len());
Expand All @@ -357,29 +358,25 @@ async fn fetch_block_and_receipts(chain: Arc<BlockchainClient>, number: BlockNum

#[tracing::instrument(name = "importer::fetch_block", skip_all, fields(number))]
async fn fetch_block(chain: Arc<BlockchainClient>, number: BlockNumber) -> ExternalBlock {
const RETRY_DELAY: Duration = Duration::from_millis(10);
Span::with(|s| {
s.rec_str("number", &number);
});

let mut backoff_ms = 10;
loop {
tracing::info!(%number, "fetching block");
let block = match chain.fetch_block(number).await {
Ok(json) => json,
Err(e) => {
backoff_ms *= 2;
backoff_ms = min(backoff_ms, 1000); // no more than 1000ms of backoff
tracing::warn!(reason = ?e, %number, %backoff_ms, "failed to retrieve block. retrying with backoff.");
traced_sleep(Duration::from_millis(backoff_ms)).await;
tracing::warn!(reason = ?e, %number, delay_ms=%RETRY_DELAY.as_millis(), "failed to retrieve block. retrying with delay.");
traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
continue;
}
};

if block.is_null() {
backoff_ms *= 2;
backoff_ms = min(backoff_ms, 1000); // no more than 1000ms of backoff
tracing::warn!(%number, "block not available yet because block is not mined. retrying with backoff.");
traced_sleep(Duration::from_millis(backoff_ms)).await;
tracing::warn!(%number, delay_ms=%RETRY_DELAY.as_millis(), "block not mined yet. retrying with delay.");
traced_sleep(RETRY_DELAY, SleepReason::SyncData).await;
continue;
}

Expand Down
3 changes: 2 additions & 1 deletion src/bin/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod importer_online;

use stratus::config::ExternalRelayerConfig;
use stratus::ext::traced_sleep;
use stratus::ext::SleepReason;
#[cfg(feature = "metrics")]
use stratus::infra::metrics;
use stratus::utils::DropTimer;
Expand Down Expand Up @@ -47,7 +48,7 @@ async fn run(config: ExternalRelayerConfig) -> anyhow::Result<()> {
Some(block_number) => tracing::info!(number = %block_number, "relayed"),
None => {
tracing::info!("no pending block found");
traced_sleep(backoff).await;
traced_sleep(backoff, SleepReason::RetryBackoff).await;
}
};
}
Expand Down
5 changes: 3 additions & 2 deletions src/eth/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::eth::primitives::Hash;
use crate::eth::storage::StratusStorage;
use crate::ext::named_spawn;
use crate::ext::traced_sleep;
use crate::ext::SleepReason;
use crate::infra::BlockchainClient;
use crate::GlobalState;

Expand Down Expand Up @@ -200,7 +201,7 @@ impl Consensus {
loop {
let timeout = consensus.heartbeat_timeout;
tokio::select! {
_ = traced_sleep(timeout) => {
_ = traced_sleep(timeout, SleepReason::Interval) => {
if !consensus.is_leader().await {
tracing::info!("starting election due to heartbeat timeout");
Self::start_election(Arc::clone(&consensus)).await;
Expand Down Expand Up @@ -612,7 +613,7 @@ impl Consensus {
}
Err(e) => {
tracing::warn!("failed to append block to peer {:?}: {:?}", peer.client, e);
traced_sleep(RETRY_DELAY).await;
traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/eth/primitives/execution_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ use sqlx::error::BoxDynError;
#[serde(rename_all = "snake_case")]
pub enum ExecutionResult {
/// Finished normally (RETURN opcode).
#[strum(serialize = "success")]
#[strum(to_string = "success")]
Success,

/// Transaction execution finished with a reversion (REVERT opcode).
#[strum(serialize = "reverted")]
#[strum(to_string = "reverted")]
Reverted,

/// Transaction execution did not finish.
#[strum(serialize = "halted")]
#[strum(to_string = "halted")]
Halted { reason: String },
}

Expand Down
4 changes: 2 additions & 2 deletions src/eth/primitives/storage_point_in_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ use crate::infra::metrics::MetricLabelValue;
pub enum StoragePointInTime {
/// The current state of the EVM storage.
#[default]
#[strum(serialize = "present")]
#[strum(to_string = "present")]
Present,

/// The state of the EVM storage at the given block number.
#[strum(serialize = "past")]
#[strum(to_string = "past")]
Past(BlockNumber),
}

Expand Down
5 changes: 3 additions & 2 deletions src/eth/relayer/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::eth::primitives::Hash;
use crate::eth::primitives::TransactionMined;
use crate::ext::traced_sleep;
use crate::ext::ResultExt;
use crate::ext::SleepReason;
use crate::ext::SpanExt;
use crate::infra::blockchain_client::pending_transaction::PendingTransaction;
#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -165,7 +166,7 @@ impl ExternalRelayer {
}
}
substrate_receipt = PendingTransaction::new(tx_hash, &self.substrate_chain);
traced_sleep(Duration::from_millis(50)).await;
traced_sleep(Duration::from_millis(50), SleepReason::SyncData).await;
}
}

Expand Down Expand Up @@ -260,7 +261,7 @@ impl ExternalRelayer {
let mut tries = 0;
while self.substrate_chain.fetch_transaction(tx_mined.input.hash).await.unwrap_or(None).is_none() {
tracing::warn!(?tx_mined.input.hash, ?tries, "transaction not found, retrying...");
traced_sleep(Duration::from_millis(100)).await;
traced_sleep(Duration::from_millis(100), SleepReason::SyncData).await;
tries += 1;
}

Expand Down
3 changes: 2 additions & 1 deletion src/eth/rpc/rpc_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::eth::primitives::LogMined;
use crate::ext::named_spawn;
use crate::ext::not;
use crate::ext::traced_sleep;
use crate::ext::SleepReason;
use crate::if_else;
#[cfg(feature = "metrics")]
use crate::infra::metrics;
Expand Down Expand Up @@ -84,7 +85,7 @@ impl RpcSubscriptions {
}

// await next iteration
traced_sleep(CLEANING_FREQUENCY).await;
traced_sleep(CLEANING_FREQUENCY, SleepReason::Interval).await;
}
})
}
Expand Down
37 changes: 19 additions & 18 deletions src/eth/storage/postgres_external_rpc/postgres_external_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::eth::primitives::Wei;
use crate::eth::storage::ExternalRpcStorage;
use crate::ext::traced_sleep;
use crate::ext::ResultExt;
use crate::ext::SleepReason;
use crate::log_and_err;

const MAX_RETRIES: u64 = 50;
Expand Down Expand Up @@ -75,7 +76,7 @@ impl ExternalRpcStorage for PostgresExternalRpcStorage {

async fn read_blocks_in_range(&self, start: BlockNumber, end: BlockNumber) -> anyhow::Result<Vec<ExternalBlock>> {
tracing::debug!(%start, %end, "retrieving external blocks in range");
let mut attempts: u64 = 0;
let mut attempt: u64 = 1;

loop {
let result = sqlx::query_file!(
Expand All @@ -95,23 +96,23 @@ impl ExternalRpcStorage for PostgresExternalRpcStorage {
let blocks_sorted = blocks.into_iter().sorted_by_key(|x| x.number()).collect();
return Ok(blocks_sorted);
}
Err(e) => {
if attempts < MAX_RETRIES {
attempts += 1;
tracing::warn!("Attempt {} failed, retrying...: {}", attempts, e);
traced_sleep(Duration::from_millis(attempts.pow(2))).await;
// Exponential backoff
Err(e) =>
if attempt <= MAX_RETRIES {
tracing::warn!(reason = ?e, %attempt, "attempt failed. retrying now.");
attempt += 1;

let backoff = Duration::from_millis(attempt.pow(2));
traced_sleep(backoff, SleepReason::RetryBackoff).await;
} else {
return log_and_err!(reason = e, "failed to retrieve external blocks");
}
}
},
}
}
}

async fn read_receipts_in_range(&self, start: BlockNumber, end: BlockNumber) -> anyhow::Result<Vec<ExternalReceipt>> {
tracing::debug!(%start, %end, "retrieving external receipts in range");
let mut attempts: u64 = 0;
let mut attempt: u64 = 1;

loop {
let result = sqlx::query_file!(
Expand All @@ -130,16 +131,16 @@ impl ExternalRpcStorage for PostgresExternalRpcStorage {
}
return Ok(receipts);
}
Err(e) => {
if attempts < MAX_RETRIES {
attempts += 1;
tracing::warn!("Attempt {} failed, retrying...: {}", attempts, e);
traced_sleep(Duration::from_millis(attempts.pow(2))).await;
// Exponential backoff
Err(e) =>
if attempt <= MAX_RETRIES {
tracing::warn!(reason = ?e, %attempt, "attempt failed. retrying now.");
attempt += 1;

let backoff = Duration::from_millis(attempt.pow(2));
traced_sleep(backoff, SleepReason::RetryBackoff).await;
} else {
return log_and_err!(reason = e, "failed to retrieve receipts");
}
}
},
}
}
}
Expand Down
28 changes: 25 additions & 3 deletions src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,34 @@ macro_rules! log_and_err {
// Tokio
// -----------------------------------------------------------------------------

/// Indicates why a sleep is happening.
#[derive(Debug, strum::Display)]
pub enum SleepReason {
/// Task is executed at predefined intervals.
#[strum(to_string = "interval")]
Interval,

/// Task is awaiting a backoff before retrying the operation.
#[strum(to_string = "retry-backoff")]
RetryBackoff,

/// Task is awaiting an external system or component to produde or synchronize data.
#[strum(to_string = "sync-data")]
SyncData,
}

/// Sleeps the current task and tracks why it is sleeping.
#[inline(always)]
pub async fn traced_sleep(duration: Duration) {
pub async fn traced_sleep(duration: Duration, reason: SleepReason) {
#[cfg(feature = "tracing")]
{
let span = info_span!("tokio::sleep", duration_ms = %duration.as_millis());
tokio::time::sleep(duration).instrument(span).await;
let span = info_span!("tokio::sleep", duration_ms = %duration.as_millis(), %reason);
async {
tracing::debug!(duration_ms = %duration.as_millis(), %reason, "sleeping");
tokio::time::sleep(duration).await;
}
.instrument(span)
.await;
}

#[cfg(not(feature = "tracing"))]
Expand Down
3 changes: 2 additions & 1 deletion tests/test_import_external_snapshot_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use stratus::eth::storage::InMemoryTemporaryStorage;
use stratus::eth::storage::PermanentStorage;
use stratus::eth::storage::StratusStorage;
use stratus::ext::traced_sleep;
use stratus::ext::SleepReason;
use stratus::infra::docker::Docker;
use stratus::GlobalServices;
#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -163,7 +164,7 @@ pub async fn execute_test(
miner.mine_external_and_commit().await.unwrap();

// get metrics from prometheus (sleep to ensure prometheus collected)
traced_sleep(Duration::from_secs(5)).await;
traced_sleep(Duration::from_secs(5), SleepReason::SyncData).await;

println!("{}\n{}\n{}", "=".repeat(80), test_name, "=".repeat(80));
for query in METRIC_QUERIES {
Expand Down

0 comments on commit 4ba3754

Please sign in to comment.