diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs index 64ff47adb..603b0a832 100644 --- a/src/bin/importer_online.rs +++ b/src/bin/importer_online.rs @@ -1,7 +1,11 @@ -use std::collections::HashMap; +use std::cmp::min; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; +use futures::try_join; +use futures::StreamExt; use serde::Deserialize; use stratus::config::ImporterOnlineConfig; use stratus::eth::primitives::BlockNumber; @@ -15,14 +19,34 @@ use stratus::eth::Executor; #[cfg(feature = "metrics")] use stratus::infra::metrics; use stratus::infra::BlockchainClient; -use stratus::log_and_err; use stratus::utils::signal_handler; use stratus::GlobalServices; use tokio::sync::mpsc; -use tokio::sync::RwLock; +use tokio::task::yield_now; use tokio::time::sleep; use tokio_util::sync::CancellationToken; +// ----------------------------------------------------------------------------- +// Globals +// ----------------------------------------------------------------------------- + +/// Current block number used by the number fetcher and block fetcher. +/// +/// It is a global to avoid unnecessary synchronization using a channel. +static RPC_CURRENT_BLOCK: AtomicU64 = AtomicU64::new(0); + +// ----------------------------------------------------------------------------- +// Constants +// ----------------------------------------------------------------------------- +/// Number of blocks that are downloaded in parallel. +const PARALLEL_BLOCKS: usize = 3; + +/// Number of receipts that are downloaded in parallel. +const PARALLEL_RECEIPTS: usize = 100; + +// ----------------------------------------------------------------------------- +// Execution +// ----------------------------------------------------------------------------- #[allow(dead_code)] fn main() -> anyhow::Result<()> { let global_services = GlobalServices::::init(); @@ -34,7 +58,7 @@ async fn run(config: ImporterOnlineConfig) -> anyhow::Result<()> { let relayer = config.relayer.init(Arc::clone(&storage)).await?; let miner = config.miner.init(Arc::clone(&storage)).await?; let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer).await; - let chain = BlockchainClient::new(&config.external_rpc).await?; + let chain = Arc::new(BlockchainClient::new(&config.external_rpc).await?); let cancellation: CancellationToken = signal_handler(); let result = run_importer_online(executor, miner, storage, chain, cancellation, config.sync_interval).await; @@ -48,36 +72,73 @@ pub async fn run_importer_online( executor: Arc, miner: Arc, storage: Arc, - chain: BlockchainClient, + chain: Arc, cancellation: CancellationToken, sync_interval: Duration, ) -> anyhow::Result<()> { // start from last imported block let mut number = storage.read_mined_block_number().await?; - let (data_tx, mut data_rx) = mpsc::channel(10); - if number != BlockNumber::from(0) { number = number.next(); } - let task_cancellation = cancellation.clone(); - tokio::spawn(async move { - prefetch_blocks_and_receipts(number, chain, data_tx, task_cancellation, sync_interval).await; - }); + let (backlog_tx, backlog_rx) = mpsc::unbounded_channel(); + + // spawn block executor: + // it executes and mines blocks and expects to receive them via channel in the correct order. + let executor_cancellation = cancellation.clone(); + let task_executor = tokio::spawn(start_block_executor(executor, miner, backlog_rx, executor_cancellation)); + + // spawn block number: + // it keeps track of the blockchain current block number. + let number_fetcher_chain = Arc::clone(&chain); + let number_fetcher_cancellation = cancellation.clone(); + let task_number_fetcher = tokio::spawn(start_number_fetcher(number_fetcher_chain, number_fetcher_cancellation, sync_interval)); + + // spawn block fetcher: + // it fetches blocks and receipts in parallel and sends them to the executor in the correct order. + // it uses the number fetcher current block to determine if should keep downloading more blocks or not. + let block_fetcher_chain = Arc::clone(&chain); + let block_fetcher_cancellation = cancellation.clone(); + let task_block_fetcher = tokio::spawn(start_block_fetcher(block_fetcher_chain, block_fetcher_cancellation, backlog_tx, number)); - while let Some((block, receipts)) = data_rx.recv().await { + // await all tasks + try_join!(task_executor, task_block_fetcher, task_number_fetcher)?; + Ok(()) +} + +// ----------------------------------------------------------------------------- +// Executor +// ----------------------------------------------------------------------------- + +// Executes external blocks and persist them to storage. +async fn start_block_executor( + executor: Arc, + miner: Arc, + mut backlog_rx: mpsc::UnboundedReceiver<(ExternalBlock, Vec)>, + cancellation: CancellationToken, +) { + while let Some((block, receipts)) = backlog_rx.recv().await { if cancellation.is_cancelled() { - tracing::info!("run_importer_online task cancelled, exiting"); + tracing::warn!("exiting importer-online block-executor because cancellation"); break; } #[cfg(feature = "metrics")] let start = metrics::now(); - executor.reexecute_external(&block, &receipts).await?; - - // mine block - miner.mine_external_mixed_and_commit().await?; + // execute and mine + let receipts = ExternalReceipts::from(receipts); + if let Err(e) = executor.reexecute_external(&block, &receipts).await { + tracing::error!(reason = ?e, number = %block.number(), "cancelling importer-online because failed to reexecute block"); + cancellation.cancel(); + break; + }; + if let Err(e) = miner.mine_external_mixed_and_commit().await { + tracing::error!(reason = ?e, number = %block.number(), "cancelling importer-online because failed to mine external block"); + cancellation.cancel(); + break; + }; #[cfg(feature = "metrics")] { @@ -85,95 +146,113 @@ pub async fn run_importer_online( metrics::inc_import_online_mined_block(start.elapsed()); } } - - Ok(()) + tracing::warn!("exiting importer-online block-executor because backlog channel was closed by the other side"); } -async fn prefetch_blocks_and_receipts( - mut number: BlockNumber, - chain: BlockchainClient, - data_tx: mpsc::Sender<(ExternalBlock, ExternalReceipts)>, - cancellation: CancellationToken, - sync_interval: Duration, -) { - let buffered_data = Arc::new(RwLock::new(HashMap::new())); - let chain_clone = chain.clone(); - - // This task will handle the ordered sending of blocks and receipts - { - let buffered_data = Arc::clone(&buffered_data); - tokio::spawn(async move { - let mut next_block_number = number; - loop { - if cancellation.is_cancelled() { - tracing::info!("prefetch_blocks_and_receipts task cancelled, closing channel"); - break; - } - let mut data = buffered_data.write().await; - - //if it is close to the last block, use the sync interval - match chain_clone.get_current_block_number().await { - Ok(current_block_number) => - if current_block_number < next_block_number.next() { - sleep(sync_interval).await; - }, - Err(e) => { - tracing::error!("failed to get current block number {:?}", e); - sleep(sync_interval).await; - } - } - - if let Some((block, receipts)) = data.remove(&next_block_number) { - data_tx.send((block, receipts)).await.expect("Failed to send block and receipts"); - next_block_number = next_block_number.next(); - } +// ----------------------------------------------------------------------------- +// Number fetcher +// ----------------------------------------------------------------------------- + +/// Retrieves the blockchain current block number. +async fn start_number_fetcher(chain: Arc, cancellation: CancellationToken, sync_interval: Duration) { + loop { + if cancellation.is_cancelled() { + tracing::warn!("exiting importer-online number-fetcher because cancellation"); + break; + } + + tracing::info!("fetching current block number"); + match chain.get_current_block_number().await { + Ok(number) => { + tracing::info!( + %number, + sync_interval = %humantime::Duration::from(sync_interval), + "fetched current block number. awaiting sync interval to retrieve again." + ); + RPC_CURRENT_BLOCK.store(number.as_u64(), Ordering::SeqCst); + sleep(sync_interval).await; + } + Err(e) => { + tracing::error!(reason = ?e, "failed to retrieve block number. retrying now."); } - }); + } } +} +// ----------------------------------------------------------------------------- +// Block fetcher +// ----------------------------------------------------------------------------- + +/// Retrieves blocks and receipts. +async fn start_block_fetcher( + chain: Arc, + cancellation: CancellationToken, + backlog_tx: mpsc::UnboundedSender<(ExternalBlock, Vec)>, + mut number: BlockNumber, +) { loop { - let mut handles = Vec::new(); - // Spawn tasks for concurrent fetching - for _ in 0..2 { - // Number of concurrent fetch tasks - let chain = chain.clone(); - let buffered_data = Arc::clone(&buffered_data); - handles.push(tokio::spawn(async move { - let block = fetch_block(&chain, number).await.unwrap(); - let receipts = fetch_receipts_in_parallel(&chain, &block).await; - - let mut data = buffered_data.write().await; - data.insert(number, (block, receipts.clone().into())); - })); + if cancellation.is_cancelled() { + tracing::warn!("exiting importer-online block-fetcher because cancellation"); + break; + } + + // if we are ahead of current block number, await until we are behind again + let rpc_current_number = RPC_CURRENT_BLOCK.load(Ordering::SeqCst); + if number.as_u64() > rpc_current_number { + yield_now().await; + continue; + } + // we are behind current, so we will fetch multiple blocks in parallel to catch up + let mut block_diff = rpc_current_number.saturating_sub(number.as_u64()); + block_diff = min(block_diff, 1_000); // avoid spawning millions of tasks (not parallelism), at least until we know it is safe + + let mut tasks = Vec::with_capacity(block_diff as usize); + while block_diff > 0 { + block_diff -= 1; + tasks.push(fetch_block_and_receipts(Arc::clone(&chain), number)); number = number.next(); } - futures::future::join_all(handles).await; + // keep fetching in order + let mut tasks = futures::stream::iter(tasks).buffered(PARALLEL_BLOCKS); + while let Some((block, receipts)) = tasks.next().await { + if backlog_tx.send((block, receipts)).is_err() { + tracing::error!("cancelling importer-online block-fetcher because backlog channel was closed by the other side"); + cancellation.cancel(); + break; + } + } } } -// This is an example helper function to fetch receipts in parallel -async fn fetch_receipts_in_parallel(chain: &BlockchainClient, block: &ExternalBlock) -> Vec { - let receipts_futures = block.transactions.iter().map(|tx| fetch_receipt(chain, tx.hash())); - futures::future::join_all(receipts_futures) - .await - .into_iter() - .collect::, _>>() - .unwrap() +#[tracing::instrument(skip_all)] +async fn fetch_block_and_receipts(chain: Arc, number: BlockNumber) -> (ExternalBlock, Vec) { + // fetch block + let block = fetch_block(Arc::clone(&chain), number).await; + + // fetch receipts in parallel + let mut receipts_tasks = Vec::with_capacity(block.transactions.len()); + for hash in block.transactions.iter().map(|tx| tx.hash()) { + receipts_tasks.push(fetch_receipt(Arc::clone(&chain), number, hash)); + } + let receipts = futures::stream::iter(receipts_tasks).buffer_unordered(PARALLEL_RECEIPTS).collect().await; + + (block, receipts) } #[tracing::instrument(skip_all)] -async fn fetch_block(chain: &BlockchainClient, number: BlockNumber) -> anyhow::Result { - let mut delay = 10; - let block = loop { +async fn fetch_block(chain: Arc, number: BlockNumber) -> ExternalBlock { + let mut backoff = 10; + loop { tracing::info!(%number, "fetching block"); let block = match chain.get_block_by_number(number).await { Ok(json) => json, Err(e) => { - tracing::warn!(reason = ?e, "retrying block download because error"); - sleep(Duration::from_millis(delay)).await; - delay *= 2; + backoff *= 2; + backoff = min(backoff, 1000); // no more than 1000ms of backoff + tracing::warn!(reason = ?e, %number, %backoff, "failed to retrieve block. retrying with backoff."); + sleep(Duration::from_millis(backoff)).await; continue; } }; @@ -181,7 +260,7 @@ async fn fetch_block(chain: &BlockchainClient, number: BlockNumber) -> anyhow::R if block.is_null() { #[cfg(not(feature = "perf"))] { - tracing::warn!(reason = %"null", "retrying block download because block is not mined yet"); + tracing::warn!(%number, "block not available yet because block is not mined. retrying now."); continue; } @@ -189,29 +268,24 @@ async fn fetch_block(chain: &BlockchainClient, number: BlockNumber) -> anyhow::R std::process::exit(0); } - break block; - }; - - match ExternalBlock::deserialize(&block) { - Ok(block) => Ok(block), - Err(e) => log_and_err!(reason = e, payload = block, "failed to deserialize external block"), + return ExternalBlock::deserialize(&block).expect("cannot fail to deserialize external block"); } } #[tracing::instrument(skip_all)] -async fn fetch_receipt(chain: &BlockchainClient, hash: Hash) -> anyhow::Result { - let receipt = loop { - tracing::info!(%hash, "fetching receipt"); - let receipt = chain.get_transaction_receipt(hash).await?; - - match receipt { - Some(receipt) => break receipt, - None => { - tracing::warn!(reason = %"null", "retrying receipt download because block is not mined yet"); +async fn fetch_receipt(chain: Arc, number: BlockNumber, hash: Hash) -> ExternalReceipt { + loop { + tracing::info!(%number, %hash, "fetching receipt"); + + match chain.get_transaction_receipt(hash).await { + Ok(Some(receipt)) => return receipt, + Ok(None) => { + tracing::warn!(%number, %hash, "receipt not available yet because block is not mined. retrying now."); continue; } + Err(e) => { + tracing::error!(reason = ?e, %number, %hash, "failed to fetch receipt. retrying now."); + } } - }; - - Ok(receipt) + } } diff --git a/src/bin/run_with_importer.rs b/src/bin/run_with_importer.rs index 2370779c0..2dff7e6a0 100644 --- a/src/bin/run_with_importer.rs +++ b/src/bin/run_with_importer.rs @@ -53,7 +53,7 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> { let miner = config.miner.init(Arc::clone(&storage)).await?; let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer).await; let chain_url = get_chain_url(config.clone()); - let chain = BlockchainClient::new(&chain_url).await?; + let chain = Arc::new(BlockchainClient::new(&chain_url).await?); let rpc_storage = Arc::clone(&storage); let rpc_executor = Arc::clone(&executor); let rpc_miner = Arc::clone(&miner); diff --git a/src/config.rs b/src/config.rs index 5dbea4459..4e317aa91 100644 --- a/src/config.rs +++ b/src/config.rs @@ -423,7 +423,7 @@ pub struct ImporterOnlineConfig { #[arg(short = 'r', long = "external-rpc", env = "EXTERNAL_RPC")] pub external_rpc: String, - #[arg(long = "sync-interval", value_parser=parse_duration, env = "SYNC_INTERVAL", default_value = "600ms")] + #[arg(long = "sync-interval", value_parser=parse_duration, env = "SYNC_INTERVAL", default_value = "100ms")] pub sync_interval: Duration, #[clap(flatten)]