From 9b8454b2a5d066fa303681c0267f2318ddb33160 Mon Sep 17 00:00:00 2001 From: Renato Dinhani Date: Fri, 31 May 2024 09:08:30 -0300 Subject: [PATCH 1/4] feat: use timeout instead of yield --- src/bin/importer_online.rs | 47 +++++++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs index f4abd3507..675616eae 100644 --- a/src/bin/importer_online.rs +++ b/src/bin/importer_online.rs @@ -18,6 +18,7 @@ use stratus::eth::storage::StratusStorage; use stratus::eth::BlockMiner; use stratus::eth::Executor; use stratus::ext::DisplayExt; +use stratus::if_else; #[cfg(feature = "metrics")] use stratus::infra::metrics; use stratus::infra::tracing::warn_task_rx_closed; @@ -29,7 +30,6 @@ use stratus::utils::calculate_tps; use stratus::GlobalServices; use stratus::GlobalState; use tokio::sync::mpsc; -use tokio::task::yield_now; use tokio::time::sleep; use tokio::time::timeout; @@ -37,10 +37,15 @@ use tokio::time::timeout; // Globals // ----------------------------------------------------------------------------- -/// Current block number used by the number fetcher and block fetcher. -/// -/// It is a global to avoid unnecessary synchronization using a channel. -static RPC_CURRENT_BLOCK: AtomicU64 = AtomicU64::new(0); +/// Current block number of the external RPC blockchain. +static EXTERNAL_RPC_CURRENT_BLOCK: AtomicU64 = AtomicU64::new(0); + +fn set_external_rpc_current_block(new_number: BlockNumber) { + let new_number_u64 = new_number.as_u64(); + let _ = EXTERNAL_RPC_CURRENT_BLOCK.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current_number| { + if_else!(new_number_u64 >= current_number, Some(new_number_u64), None) + }); +} // ----------------------------------------------------------------------------- // Constants @@ -51,11 +56,14 @@ const PARALLEL_BLOCKS: usize = 3; /// Number of receipts that are downloaded in parallel. const PARALLEL_RECEIPTS: usize = 100; -/// Timeout for new newHeads event before fallback to polling. +/// Timeout awaiting for newHeads event before fallback to polling. const TIMEOUT_NEW_HEADS: Duration = Duration::from_millis(2000); -/// Time to wait before we starting retrieving receipts because they are not immediatly available after the block is retrieved. -const BACKOFF_RECEIPTS: Duration = Duration::from_millis(45); +/// Interval before we check again if we are behind the external rpc current block number. +const BACKOFF_CATCH_UP: Duration = Duration::from_millis(20); + +/// Interval before we starting retrieving receipts because they are not immediately available after the block is retrieved. +const BACKOFF_RECEIPTS: Duration = Duration::from_millis(50); // ----------------------------------------------------------------------------- // Execution @@ -217,7 +225,7 @@ async fn start_number_fetcher(chain: Arc, sync_interval: Durat let resubscribe = match timeout(TIMEOUT_NEW_HEADS, sub.next()).await { Ok(Some(Ok(block))) => { tracing::info!(number = %block.number(), "newHeads event received"); - RPC_CURRENT_BLOCK.store(block.number().as_u64(), Ordering::SeqCst); + set_external_rpc_current_block(block.number()); continue; } Ok(None) => { @@ -256,7 +264,7 @@ async fn start_number_fetcher(chain: Arc, sync_interval: Durat sync_interval = %sync_interval.to_string_ext(), "fetched current block number via http. awaiting sync interval to retrieve again." ); - RPC_CURRENT_BLOCK.store(number.as_u64(), Ordering::SeqCst); + set_external_rpc_current_block(number); sleep(sync_interval).await; } Err(e) => { @@ -274,7 +282,7 @@ async fn start_number_fetcher(chain: Arc, sync_interval: Durat async fn start_block_fetcher( chain: Arc, backlog_tx: mpsc::UnboundedSender<(ExternalBlock, Vec)>, - mut number: BlockNumber, + mut importer_block_number: BlockNumber, ) -> anyhow::Result<()> { const TASK_NAME: &str = "external-block-fetcher"; @@ -284,21 +292,22 @@ async fn start_block_fetcher( } // if we are ahead of current block number, await until we are behind again - let rpc_current_number = RPC_CURRENT_BLOCK.load(Ordering::SeqCst); - if number.as_u64() > rpc_current_number { - yield_now().await; + let external_rpc_current_block = EXTERNAL_RPC_CURRENT_BLOCK.load(Ordering::Relaxed); + if importer_block_number.as_u64() > external_rpc_current_block { + sleep(BACKOFF_CATCH_UP).await; continue; } // we are behind current, so we will fetch multiple blocks in parallel to catch up - let mut blocks_to_fetch = rpc_current_number.saturating_sub(number.as_u64()) + 1; - blocks_to_fetch = min(blocks_to_fetch, 1_000); // avoid spawning millions of tasks (not parallelism), at least until we know it is safe + let blocks_behind = external_rpc_current_block.saturating_sub(importer_block_number.as_u64()) + 1; + let mut blocks_to_fetch = min(blocks_behind, 1_000); // avoid spawning millions of tasks (not parallelism), at least until we know it is safe + tracing::info!(%blocks_behind, blocks_to_fetch, "catching up with blocks"); let mut tasks = Vec::with_capacity(blocks_to_fetch as usize); while blocks_to_fetch > 0 { blocks_to_fetch -= 1; - tasks.push(fetch_block_and_receipts(Arc::clone(&chain), number)); - number = number.next(); + tasks.push(fetch_block_and_receipts(Arc::clone(&chain), importer_block_number)); + importer_block_number = importer_block_number.next(); } // keep fetching in order @@ -318,7 +327,7 @@ async fn fetch_block_and_receipts(chain: Arc, number: BlockNum let block = fetch_block(Arc::clone(&chain), number).await; // wait some time until receipts are available - let _ = tokio::time::sleep(BACKOFF_RECEIPTS).await; + let _ = sleep(BACKOFF_RECEIPTS).await; // fetch receipts in parallel let mut receipts_tasks = Vec::with_capacity(block.transactions.len()); From a837946bec6c54347649dcaa9f8a29c0a5d511c7 Mon Sep 17 00:00:00 2001 From: Renato Dinhani Date: Fri, 31 May 2024 09:10:38 -0300 Subject: [PATCH 2/4] docs --- src/bin/importer_online.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs index 675616eae..ef3fd4d08 100644 --- a/src/bin/importer_online.rs +++ b/src/bin/importer_online.rs @@ -40,6 +40,7 @@ use tokio::time::timeout; /// Current block number of the external RPC blockchain. static EXTERNAL_RPC_CURRENT_BLOCK: AtomicU64 = AtomicU64::new(0); +/// Only sets the external rpcl current block number if it is equals or greater than the current one. fn set_external_rpc_current_block(new_number: BlockNumber) { let new_number_u64 = new_number.as_u64(); let _ = EXTERNAL_RPC_CURRENT_BLOCK.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current_number| { From bebcc86a7ad404121e492a9d6f5d729c1e8271b3 Mon Sep 17 00:00:00 2001 From: Renato Dinhani Date: Fri, 31 May 2024 09:13:33 -0300 Subject: [PATCH 3/4] doc --- src/bin/importer_online.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs index ef3fd4d08..8223759f2 100644 --- a/src/bin/importer_online.rs +++ b/src/bin/importer_online.rs @@ -40,7 +40,7 @@ use tokio::time::timeout; /// Current block number of the external RPC blockchain. static EXTERNAL_RPC_CURRENT_BLOCK: AtomicU64 = AtomicU64::new(0); -/// Only sets the external rpcl current block number if it is equals or greater than the current one. +/// Only sets the external RPC current block number if it is equals or greater than the current one. fn set_external_rpc_current_block(new_number: BlockNumber) { let new_number_u64 = new_number.as_u64(); let _ = EXTERNAL_RPC_CURRENT_BLOCK.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current_number| { From 902972cefc7dcc625e1b14e21d0ced418d958bab Mon Sep 17 00:00:00 2001 From: Renato Dinhani Date: Fri, 31 May 2024 09:14:22 -0300 Subject: [PATCH 4/4] rename --- src/bin/importer_online.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs index 8223759f2..b2ef5500d 100644 --- a/src/bin/importer_online.rs +++ b/src/bin/importer_online.rs @@ -61,10 +61,10 @@ const PARALLEL_RECEIPTS: usize = 100; const TIMEOUT_NEW_HEADS: Duration = Duration::from_millis(2000); /// Interval before we check again if we are behind the external rpc current block number. -const BACKOFF_CATCH_UP: Duration = Duration::from_millis(20); +const INTERVAL_CATCH_UP: Duration = Duration::from_millis(20); /// Interval before we starting retrieving receipts because they are not immediately available after the block is retrieved. -const BACKOFF_RECEIPTS: Duration = Duration::from_millis(50); +const INTERVAL_FETCH_RECEIPTS: Duration = Duration::from_millis(50); // ----------------------------------------------------------------------------- // Execution @@ -295,7 +295,7 @@ async fn start_block_fetcher( // if we are ahead of current block number, await until we are behind again let external_rpc_current_block = EXTERNAL_RPC_CURRENT_BLOCK.load(Ordering::Relaxed); if importer_block_number.as_u64() > external_rpc_current_block { - sleep(BACKOFF_CATCH_UP).await; + sleep(INTERVAL_CATCH_UP).await; continue; } @@ -328,7 +328,7 @@ async fn fetch_block_and_receipts(chain: Arc, number: BlockNum let block = fetch_block(Arc::clone(&chain), number).await; // wait some time until receipts are available - let _ = sleep(BACKOFF_RECEIPTS).await; + let _ = sleep(INTERVAL_FETCH_RECEIPTS).await; // fetch receipts in parallel let mut receipts_tasks = Vec::with_capacity(block.transactions.len());