diff --git a/src/eth/importer/importer.rs b/src/eth/importer/importer.rs index 1078419a5..30176b431 100644 --- a/src/eth/importer/importer.rs +++ b/src/eth/importer/importer.rs @@ -108,19 +108,25 @@ impl Importer { // it executes and mines blocks and expects to receive them via channel in the correct order. let task_executor = spawn_named( "importer::executor", - start_block_executor(Arc::clone(&self.executor), Arc::clone(&self.miner), backlog_rx), + Importer::start_block_executor(Arc::clone(&self.executor), Arc::clone(&self.miner), backlog_rx), ); // spawn block number: // it keeps track of the blockchain current block number. let number_fetcher_chain = Arc::clone(&self.chain); - let task_number_fetcher = spawn_named("importer::number-fetcher", start_number_fetcher(number_fetcher_chain, self.sync_interval)); + let task_number_fetcher = spawn_named( + "importer::number-fetcher", + Importer::start_number_fetcher(number_fetcher_chain, self.sync_interval), + ); // spawn block fetcher: // it fetches blocks and receipts in parallel and sends them to the executor in the correct order. // it uses the number fetcher current block to determine if should keep downloading more blocks or not. let block_fetcher_chain = Arc::clone(&self.chain); - let task_block_fetcher = spawn_named("importer::block-fetcher", start_block_fetcher(block_fetcher_chain, backlog_tx, number)); + let task_block_fetcher = spawn_named( + "importer::block-fetcher", + Importer::start_block_fetcher(block_fetcher_chain, backlog_tx, number), + ); // await all tasks if let Err(e) = try_join!(task_executor, task_block_fetcher, task_number_fetcher) { @@ -128,231 +134,235 @@ impl Importer { } Ok(()) } -} -// ----------------------------------------------------------------------------- -// Executor -// ----------------------------------------------------------------------------- - -// Executes external blocks and persist them to storage. -async fn start_block_executor( - executor: Arc, - miner: Arc, - mut backlog_rx: mpsc::UnboundedReceiver<(ExternalBlock, Vec)>, -) -> anyhow::Result<()> { - const TASK_NAME: &str = "block-executor"; + // ----------------------------------------------------------------------------- + // Executor + // ----------------------------------------------------------------------------- - loop { - if GlobalState::is_shutdown_warn(TASK_NAME) { - return Ok(()); - } + // Executes external blocks and persist them to storage. + async fn start_block_executor( + executor: Arc, + miner: Arc, + mut backlog_rx: mpsc::UnboundedReceiver<(ExternalBlock, Vec)>, + ) -> anyhow::Result<()> { + const TASK_NAME: &str = "block-executor"; - let (block, receipts) = match timeout(Duration::from_secs(2), backlog_rx.recv()).await { - Ok(Some(inner)) => inner, - Ok(None) => break, // channel closed - Err(_timed_out) => { - tracing::warn!(timeout = "2s", "timeout reading block executor channel, expected around 1 block per second"); - continue; + loop { + if GlobalState::is_shutdown_warn(TASK_NAME) { + return Ok(()); } - }; - #[cfg(feature = "metrics")] - let start = metrics::now(); + let (block, receipts) = match timeout(Duration::from_secs(2), backlog_rx.recv()).await { + Ok(Some(inner)) => inner, + Ok(None) => break, // channel closed + Err(_timed_out) => { + tracing::warn!(timeout = "2s", "timeout reading block executor channel, expected around 1 block per second"); + continue; + } + }; - // execute and mine - let receipts = ExternalReceipts::from(receipts); - if let Err(e) = executor.execute_external_block(&block, &receipts) { - let message = GlobalState::shutdown_from(TASK_NAME, "failed to reexecute external block"); - return log_and_err!(reason = e, message); - }; + #[cfg(feature = "metrics")] + let start = metrics::now(); - // statistics - #[cfg(feature = "metrics")] - { - let duration = start.elapsed(); - let tps = calculate_tps(duration, block.transactions.len()); - - tracing::info!( - tps, - duraton = %duration.to_string_ext(), - block_number = %block.number(), - receipts = receipts.len(), - "reexecuted external block", - ); - } + // execute and mine + let receipts = ExternalReceipts::from(receipts); + if let Err(e) = executor.execute_external_block(&block, &receipts) { + let message = GlobalState::shutdown_from(TASK_NAME, "failed to reexecute external block"); + return log_and_err!(reason = e, message); + }; - if let Err(e) = miner.mine_external_mixed_and_commit() { - let message = GlobalState::shutdown_from(TASK_NAME, "failed to mine external block"); - return log_and_err!(reason = e, message); - }; + // statistics + #[cfg(feature = "metrics")] + { + let duration = start.elapsed(); + let tps = calculate_tps(duration, block.transactions.len()); - #[cfg(feature = "metrics")] - { - metrics::inc_n_importer_online_transactions_total(receipts.len() as u64); - metrics::inc_import_online_mined_block(start.elapsed()); - } - } + tracing::info!( + tps, + duraton = %duration.to_string_ext(), + block_number = %block.number(), + receipts = receipts.len(), + "reexecuted external block", + ); + } - warn_task_tx_closed(TASK_NAME); - Ok(()) -} + if let Err(e) = miner.mine_external_mixed_and_commit() { + let message = GlobalState::shutdown_from(TASK_NAME, "failed to mine external block"); + return log_and_err!(reason = e, message); + }; -// ----------------------------------------------------------------------------- -// Number fetcher -// ----------------------------------------------------------------------------- + #[cfg(feature = "metrics")] + { + metrics::inc_n_importer_online_transactions_total(receipts.len() as u64); + metrics::inc_import_online_mined_block(start.elapsed()); + } + } -/// Retrieves the blockchain current block number. -async fn start_number_fetcher(chain: Arc, sync_interval: Duration) -> anyhow::Result<()> { - const TASK_NAME: &str = "external-number-fetcher"; + warn_task_tx_closed(TASK_NAME); + Ok(()) + } - // initial newHeads subscriptions. - // abort application if cannot subscribe. - let mut sub_new_heads = if chain.supports_ws() { - tracing::info!("{} subscribing to newHeads event", TASK_NAME); + // ----------------------------------------------------------------------------- + // Number fetcher + // ----------------------------------------------------------------------------- - match chain.subscribe_new_heads().await { - Ok(sub) => { - tracing::info!("{} subscribed to newHeads events", TASK_NAME); - Some(sub) - } - Err(e) => { - let message = GlobalState::shutdown_from(TASK_NAME, "cannot subscribe to newHeads event"); - return log_and_err!(reason = e, message); - } - } - } else { - tracing::warn!("{} blockchain client does not have websocket enabled", TASK_NAME); - None - }; + /// Retrieves the blockchain current block number. + async fn start_number_fetcher(chain: Arc, sync_interval: Duration) -> anyhow::Result<()> { + const TASK_NAME: &str = "external-number-fetcher"; - // keep reading websocket subscription or polling via http. - loop { - if GlobalState::is_shutdown_warn(TASK_NAME) { - return Ok(()); - } + // initial newHeads subscriptions. + // abort application if cannot subscribe. + let mut sub_new_heads = if chain.supports_ws() { + tracing::info!("{} subscribing to newHeads event", TASK_NAME); - // if we have a subscription, try to read from subscription. - // in case of failure, re-subscribe because current subscription may have been closed in the server. - if let Some(sub) = &mut sub_new_heads { - tracing::info!("{} awaiting block number from newHeads subscription", TASK_NAME); - let resubscribe_ws = match timeout(TIMEOUT_NEW_HEADS, sub.next()).await { - Ok(Some(Ok(block))) => { - tracing::info!(block_number = %block.number(), "{} received newHeads event", TASK_NAME); - set_external_rpc_current_block(block.number()); - continue; + match chain.subscribe_new_heads().await { + Ok(sub) => { + tracing::info!("{} subscribed to newHeads events", TASK_NAME); + Some(sub) } - Ok(None) => { - if !GlobalState::is_shutdown_warn(TASK_NAME) { - tracing::error!("{} newHeads subscription closed by the other side", TASK_NAME); - } - true + Err(e) => { + let message = GlobalState::shutdown_from(TASK_NAME, "cannot subscribe to newHeads event"); + return log_and_err!(reason = e, message); } - Ok(Some(Err(e))) => { - if !GlobalState::is_shutdown_warn(TASK_NAME) { - tracing::error!(reason = ?e, "{} failed to read newHeads subscription event", TASK_NAME); - } - true - } - Err(_) => { - if !GlobalState::is_shutdown_warn(TASK_NAME) { - tracing::error!("{} timed-out waiting for newHeads subscription event", TASK_NAME); - } - true - } - }; + } + } else { + tracing::warn!("{} blockchain client does not have websocket enabled", TASK_NAME); + None + }; + // keep reading websocket subscription or polling via http. + loop { if GlobalState::is_shutdown_warn(TASK_NAME) { return Ok(()); } - // resubscribe if necessary. - // only update the existing subscription if succedeed, otherwise we will try again in the next iteration. - if chain.supports_ws() && resubscribe_ws { - tracing::info!("{} resubscribing to newHeads event", TASK_NAME); - match chain.subscribe_new_heads().await { - Ok(sub) => { - tracing::info!("{} resubscribed to newHeads event", TASK_NAME); - sub_new_heads = Some(sub); + // if we have a subscription, try to read from subscription. + // in case of failure, re-subscribe because current subscription may have been closed in the server. + if let Some(sub) = &mut sub_new_heads { + tracing::info!("{} awaiting block number from newHeads subscription", TASK_NAME); + let resubscribe_ws = match timeout(TIMEOUT_NEW_HEADS, sub.next()).await { + Ok(Some(Ok(block))) => { + tracing::info!(block_number = %block.number(), "{} received newHeads event", TASK_NAME); + set_external_rpc_current_block(block.number()); + continue; + } + Ok(None) => { + if !GlobalState::is_shutdown_warn(TASK_NAME) { + tracing::error!("{} newHeads subscription closed by the other side", TASK_NAME); + } + true + } + Ok(Some(Err(e))) => { + if !GlobalState::is_shutdown_warn(TASK_NAME) { + tracing::error!(reason = ?e, "{} failed to read newHeads subscription event", TASK_NAME); + } + true } - Err(e) => + Err(_) => { if !GlobalState::is_shutdown_warn(TASK_NAME) { - tracing::error!(reason = ?e, "{} failed to resubscribe to newHeads event", TASK_NAME); - }, + tracing::error!("{} timed-out waiting for newHeads subscription event", TASK_NAME); + } + true + } + }; + + if GlobalState::is_shutdown_warn(TASK_NAME) { + return Ok(()); + } + + // resubscribe if necessary. + // only update the existing subscription if succedeed, otherwise we will try again in the next iteration. + if chain.supports_ws() && resubscribe_ws { + tracing::info!("{} resubscribing to newHeads event", TASK_NAME); + match chain.subscribe_new_heads().await { + Ok(sub) => { + tracing::info!("{} resubscribed to newHeads event", TASK_NAME); + sub_new_heads = Some(sub); + } + Err(e) => + if !GlobalState::is_shutdown_warn(TASK_NAME) { + tracing::error!(reason = ?e, "{} failed to resubscribe to newHeads event", TASK_NAME); + }, + } } } - } - if GlobalState::is_shutdown_warn(TASK_NAME) { - return Ok(()); - } + if GlobalState::is_shutdown_warn(TASK_NAME) { + return Ok(()); + } - // fallback to polling - tracing::warn!("{} falling back to http polling because subscription failed or it is not enabled", TASK_NAME); - match chain.fetch_block_number().await { - Ok(block_number) => { - tracing::info!( - %block_number, - sync_interval = %sync_interval.to_string_ext(), - "fetched current block number via http. awaiting sync interval to retrieve again." - ); - set_external_rpc_current_block(block_number); - traced_sleep(sync_interval, SleepReason::SyncData).await; + // fallback to polling + tracing::warn!("{} falling back to http polling because subscription failed or it is not enabled", TASK_NAME); + match chain.fetch_block_number().await { + Ok(block_number) => { + tracing::info!( + %block_number, + sync_interval = %sync_interval.to_string_ext(), + "fetched current block number via http. awaiting sync interval to retrieve again." + ); + set_external_rpc_current_block(block_number); + traced_sleep(sync_interval, SleepReason::SyncData).await; + } + Err(e) => + if !GlobalState::is_shutdown_warn(TASK_NAME) { + tracing::error!(reason = ?e, "failed to retrieve block number. retrying now."); + }, } - Err(e) => - if !GlobalState::is_shutdown_warn(TASK_NAME) { - tracing::error!(reason = ?e, "failed to retrieve block number. retrying now."); - }, } } -} -// ----------------------------------------------------------------------------- -// Block fetcher -// ----------------------------------------------------------------------------- + // ----------------------------------------------------------------------------- + // Block fetcher + // ----------------------------------------------------------------------------- -/// Retrieves blocks and receipts. -async fn start_block_fetcher( - chain: Arc, - backlog_tx: mpsc::UnboundedSender<(ExternalBlock, Vec)>, - mut importer_block_number: BlockNumber, -) -> anyhow::Result<()> { - const TASK_NAME: &str = "external-block-fetcher"; + /// Retrieves blocks and receipts. + async fn start_block_fetcher( + chain: Arc, + backlog_tx: mpsc::UnboundedSender<(ExternalBlock, Vec)>, + mut importer_block_number: BlockNumber, + ) -> anyhow::Result<()> { + const TASK_NAME: &str = "external-block-fetcher"; - loop { - if GlobalState::is_shutdown_warn(TASK_NAME) { - return Ok(()); - } + loop { + if GlobalState::is_shutdown_warn(TASK_NAME) { + return Ok(()); + } - // if we are ahead of current block number, await until we are behind again - let external_rpc_current_block = EXTERNAL_RPC_CURRENT_BLOCK.load(Ordering::Relaxed); - if importer_block_number.as_u64() > external_rpc_current_block { - yield_now().await; - continue; - } + // if we are ahead of current block number, await until we are behind again + let external_rpc_current_block = EXTERNAL_RPC_CURRENT_BLOCK.load(Ordering::Relaxed); + if importer_block_number.as_u64() > external_rpc_current_block { + yield_now().await; + continue; + } - // we are behind current, so we will fetch multiple blocks in parallel to catch up - let blocks_behind = external_rpc_current_block.saturating_sub(importer_block_number.as_u64()) + 1; // TODO: use count_to from BlockNumber - let mut blocks_to_fetch = min(blocks_behind, 1_000); // avoid spawning millions of tasks (not parallelism), at least until we know it is safe - tracing::info!(%blocks_behind, blocks_to_fetch, "catching up with blocks"); + // we are behind current, so we will fetch multiple blocks in parallel to catch up + let blocks_behind = external_rpc_current_block.saturating_sub(importer_block_number.as_u64()) + 1; // TODO: use count_to from BlockNumber + let mut blocks_to_fetch = min(blocks_behind, 1_000); // avoid spawning millions of tasks (not parallelism), at least until we know it is safe + tracing::info!(%blocks_behind, blocks_to_fetch, "catching up with blocks"); - let mut tasks = Vec::with_capacity(blocks_to_fetch as usize); - while blocks_to_fetch > 0 { - blocks_to_fetch -= 1; - tasks.push(fetch_block_and_receipts(Arc::clone(&chain), importer_block_number)); - importer_block_number = importer_block_number.next(); - } + let mut tasks = Vec::with_capacity(blocks_to_fetch as usize); + while blocks_to_fetch > 0 { + blocks_to_fetch -= 1; + tasks.push(fetch_block_and_receipts(Arc::clone(&chain), importer_block_number)); + importer_block_number = importer_block_number.next(); + } - // keep fetching in order - let mut tasks = futures::stream::iter(tasks).buffered(PARALLEL_BLOCKS); - while let Some((block, receipts)) = tasks.next().await { - if backlog_tx.send((block, receipts)).is_err() { - warn_task_rx_closed(TASK_NAME); - return Ok(()); + // keep fetching in order + let mut tasks = futures::stream::iter(tasks).buffered(PARALLEL_BLOCKS); + while let Some((block, receipts)) = tasks.next().await { + if backlog_tx.send((block, receipts)).is_err() { + warn_task_rx_closed(TASK_NAME); + return Ok(()); + } } } } } +// ----------------------------------------------------------------------------- +// Helpers +// ----------------------------------------------------------------------------- + #[tracing::instrument(name = "importer::fetch_block_and_receipts", skip_all, fields(block_number))] async fn fetch_block_and_receipts(chain: Arc, block_number: BlockNumber) -> (ExternalBlock, Vec) { Span::with(|s| {