diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs index 35d73fbb2..d2277da39 100644 --- a/src/bin/importer_online.rs +++ b/src/bin/importer_online.rs @@ -8,6 +8,7 @@ use stratus::eth::primitives::ExternalBlock; use stratus::eth::primitives::ExternalReceipt; use stratus::eth::primitives::ExternalReceipts; use stratus::eth::primitives::Hash; +use stratus::eth::storage::StratusStorage; #[cfg(feature = "metrics")] use stratus::infra::metrics; use stratus::infra::BlockchainClient; @@ -17,6 +18,7 @@ use stratus::log_and_err; /// Number of transactions receipts that can be fetched in parallel. const RECEIPTS_PARALELLISM: usize = 10; +#[allow(dead_code)] fn main() -> anyhow::Result<()> { let config: ImporterOnlineConfig = init_global_services(); let runtime = config.init_runtime(); @@ -24,9 +26,13 @@ fn main() -> anyhow::Result<()> { } async fn run(config: ImporterOnlineConfig) -> anyhow::Result<()> { + let storage = config.stratus_storage.init().await?; + run_importer_online(config, storage).await +} + +pub async fn run_importer_online(config: ImporterOnlineConfig, storage: Arc) -> anyhow::Result<()> { // init services let chain = BlockchainClient::new(&config.external_rpc).await?; - let storage = Arc::new(config.stratus_storage.init().await?); let executor = config.executor.init(Arc::clone(&storage)); // start from last imported block diff --git a/src/bin/run_with_importer.rs b/src/bin/run_with_importer.rs index d3f3bf6b1..d563b7a08 100644 --- a/src/bin/run_with_importer.rs +++ b/src/bin/run_with_importer.rs @@ -1,25 +1,15 @@ +mod importer_online; + use std::sync::Arc; -use futures::StreamExt; -use futures::TryStreamExt; -use stratus::config::ImporterOnlineConfig; +use importer_online::run_importer_online; use stratus::config::RunWithImporterConfig; -use stratus::eth::primitives::BlockNumber; -use stratus::eth::primitives::ExternalBlock; -use stratus::eth::primitives::ExternalReceipt; -use stratus::eth::primitives::ExternalReceipts; -use stratus::eth::primitives::Hash; use stratus::eth::rpc::serve_rpc; -use stratus::eth::storage::StratusStorage; #[cfg(feature = "metrics")] use stratus::infra::metrics; -use stratus::infra::BlockchainClient; use stratus::init_global_services; -use stratus::log_and_err; use tokio::try_join; -const RECEIPTS_PARALELLISM: usize = 10; - fn main() -> anyhow::Result<()> { let config: RunWithImporterConfig = init_global_services(); let runtime = config.init_runtime(); @@ -34,7 +24,7 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> { let executor = stratus_config.executor.init(Arc::clone(&storage)); let rpc_task = tokio::spawn(serve_rpc(executor, Arc::clone(&storage), stratus_config)); - let importer_task = tokio::spawn(run_importer(importer_config, storage)); + let importer_task = tokio::spawn(run_importer_online(importer_config, storage)); let join_result = try_join!(rpc_task, importer_task)?; join_result.0?; @@ -42,94 +32,3 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> { Ok(()) } - -// TODO: I extracted this from importer_online.rs, in the future it'd make sense to extract this function into a separate file. -async fn run_importer(config: ImporterOnlineConfig, storage: Arc) -> anyhow::Result<()> { - // init services - let chain = BlockchainClient::new(&config.external_rpc).await?; - let executor = config.executor.init(Arc::clone(&storage)); - - // start from last imported block - let mut number = storage.read_mined_block_number().await?; - - // keep importing forever - loop { - #[cfg(feature = "metrics")] - let start = metrics::now(); - - number = number.next(); - - // fetch block and receipts - let block = fetch_block(&chain, number).await?; - - // fetch receipts in parallel - let mut receipts = Vec::with_capacity(block.transactions.len()); - for tx in &block.transactions { - receipts.push(fetch_receipt(&chain, tx.hash())); - } - let receipts = futures::stream::iter(receipts).buffered(RECEIPTS_PARALELLISM).try_collect::>().await?; - - // import block - let receipts: ExternalReceipts = receipts.into(); - executor.import_external_to_perm(block, &receipts).await?; - - #[cfg(feature = "metrics")] - metrics::inc_import_online(start.elapsed()); - } -} - -async fn fetch_block(chain: &BlockchainClient, number: BlockNumber) -> anyhow::Result { - let block = loop { - tracing::info!(%number, "fetching block"); - let block = match chain.get_block_by_number(number).await { - Ok(json) => json, - Err(e) => { - tracing::warn!(reason = ?e, "retrying block download because error"); - continue; - } - }; - - if block.is_null() { - #[cfg(not(feature = "perf"))] - { - tracing::warn!(reason = %"null", "retrying block download because block is not mined yet"); - continue; - } - - #[cfg(feature = "perf")] - std::process::exit(0); - } - - break block; - }; - - match serde_json::from_value(block.clone()) { - Ok(block) => Ok(block), - Err(e) => log_and_err!(reason = e, payload = block, "failed to deserialize external block"), - } -} - -async fn fetch_receipt(chain: &BlockchainClient, hash: Hash) -> anyhow::Result { - let receipt = loop { - tracing::info!(%hash, "fetching receipt"); - let receipt = match chain.get_transaction_receipt(&hash).await { - Ok(json) => json, - Err(e) => { - tracing::warn!(reason = ?e, "retrying receipt download because error"); - continue; - } - }; - - if receipt.is_null() { - tracing::warn!(reason = %"null", "retrying receipt download because block is not mined yet"); - continue; - } - - break receipt; - }; - - match serde_json::from_value(receipt.clone()) { - Ok(receipt) => Ok(receipt), - Err(e) => log_and_err!(reason = e, payload = receipt, "failed to deserialize external receipt"), - } -}