diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index 3865a8120..384fc83a0 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -10,6 +10,7 @@ use std::cmp::min; use std::fs; +use std::sync::mpsc; use std::sync::Arc; use anyhow::anyhow; @@ -24,6 +25,7 @@ use stratus::eth::primitives::ExternalReceipt; use stratus::eth::primitives::ExternalReceipts; use stratus::eth::storage::ExternalRpcStorage; use stratus::eth::storage::InMemoryPermanentStorage; +use stratus::eth::storage::INMEMORY_TEMPORARY_STORAGE_MAX_BLOCKS; use stratus::eth::BlockMiner; use stratus::eth::Executor; use stratus::ext::spawn_named; @@ -34,13 +36,26 @@ use stratus::utils::calculate_tps_and_bpm; use stratus::utils::DropTimer; use stratus::GlobalServices; use stratus::GlobalState; -use tokio::sync::mpsc; +use tokio::sync::mpsc as async_mpsc; use tokio::time::Instant; -/// Number of tasks in the backlog. Each task contains `--blocks-by-fetch` blocks and all receipts for them. -const BACKLOG_SIZE: usize = 50; +/// Number of loader tasks buffered. Each task contains `--blocks-by-fetch` blocks and all receipts for them. +const RPC_LOADER_CHANNEL_CAPACITY: usize = 25; -type BacklogTask = (Vec, Vec); +/// Size of the block batches to save after execution, and capacity of the batch channel. +/// +/// We are using half of the inmemory temporary storage and a channel with size 1, this way, we can parallize +/// execution and storage saving while staying below the inmemory storage limit. +/// +/// By setting channel capacity to 1, we use backpressure to ensure that at most +/// `INMEMORY_TEMPORARY_STORAGE_MAX_BLOCKS` are executed at a time. +/// +/// REMOVE THIS: Decrease by one to give room for the pending block. +/// TODO: why `max/2-1` doesn't work? +const STORAGE_WRITER_BATCHES_SIZE: usize = INMEMORY_TEMPORARY_STORAGE_MAX_BLOCKS / 3; +const STORAGE_WRITER_CHANNEL_CAPACITY: usize = 1; + +type LoadedBatch = (Vec, Vec); fn main() -> anyhow::Result<()> { let global_services = GlobalServices::::init(); @@ -69,55 +84,64 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> { None => block_number_to_stop(&rpc_storage).await?, }; - // init shared data between importer and external rpc storage loader - let (backlog_tx, backlog_rx) = mpsc::channel::(BACKLOG_SIZE); + // init channel that sends batches from external storage loader to block executor + let (loader_tx, loader_rx) = async_mpsc::channel::(RPC_LOADER_CHANNEL_CAPACITY); + + // init channel that sends batches from executor to the storage writer + let (writer_batch_tx, writer_batch_rx) = mpsc::sync_channel::>(STORAGE_WRITER_CHANNEL_CAPACITY); // load genesis accounts let initial_accounts = rpc_storage.read_initial_accounts().await?; storage.save_accounts(initial_accounts.clone())?; - let storage_loader = execute_external_rpc_storage_loader(rpc_storage, config.blocks_by_fetch, config.paralellism, block_start, block_end, backlog_tx); - spawn_named("storage-loader", async move { - if let Err(e) = storage_loader.await { - tracing::error!(reason = ?e, "'storage-loader' task failed"); + let storage_loader_fut = run_external_rpc_storage_loader(rpc_storage, config.blocks_by_fetch, config.paralellism, block_start, block_end, loader_tx); + let _storage_loader = spawn_named("storage-loader", async move { + if let Err(err) = storage_loader_fut.await { + tracing::error!(?err, "'storage-loader' task failed"); } }); - let block_importer = spawn_thread("block-importer", || { - if let Err(e) = execute_block_importer(executor, miner, backlog_rx, block_snapshots) { - tracing::error!(reason = ?e, "'block-importer' task failed"); + let _block_executor = spawn_thread("block-executor", { + let miner = Arc::clone(&miner); + || { + if let Err(err) = run_external_block_executor(executor, miner, loader_rx, writer_batch_tx, block_snapshots) { + tracing::error!(?err, "'block-executor' task failed"); + } } }); - block_importer - .join() - .expect("'block-importer' thread panic'ed instead of properly returning an error"); + let block_saver = spawn_thread("block-saver", || { + if let Err(err) = run_block_saver(miner, writer_batch_rx) { + tracing::error!(?err, "'block-saver' task failed"); + } + }); + + block_saver.join().expect("'block-saver' thread panic'ed instead of returning an error"); Ok(()) } // ----------------------------------------------------------------------------- -// Block importer +// Block executor // ----------------------------------------------------------------------------- -fn execute_block_importer( - // services +fn run_external_block_executor( executor: Arc, miner: Arc, - // data - mut backlog_rx: mpsc::Receiver, + mut loader_rx: async_mpsc::Receiver, + writer_batch_tx: mpsc::SyncSender>, blocks_to_export_snapshot: Vec, ) -> anyhow::Result<()> { const TASK_NAME: &str = "external-block-executor"; - let _timer = DropTimer::start("importer-offline::execute_block_importer"); + let _timer = DropTimer::start("importer-offline::run_external_block_executor"); - // receives blocks and receipts from the backlog to reexecute and import + // receives blocks and receipts from the loader to reexecute and import loop { if GlobalState::warn_if_shutdown(TASK_NAME) { return Ok(()); }; // receive new tasks to execute, or exit - let Some((blocks, receipts)) = backlog_rx.blocking_recv() else { + let Some((blocks, receipts)) = loader_rx.blocking_recv() else { tracing::info!("{} has no more blocks to process", TASK_NAME); return Ok(()); }; @@ -132,7 +156,8 @@ fn execute_block_importer( let mut transaction_count = 0; let instant_before_execution = Instant::now(); - for block in blocks.into_iter() { + let mut block_batch = Vec::with_capacity(STORAGE_WRITER_BATCHES_SIZE); + for block in blocks { if GlobalState::warn_if_shutdown(TASK_NAME) { return Ok(()); } @@ -146,7 +171,12 @@ fn execute_block_importer( if blocks_to_export_snapshot.contains(&mined_block.number()) { export_snapshot(&block, &receipts, &mined_block)?; } - miner.commit(mined_block.clone())?; + block_batch.push(mined_block); + + if block_batch.len() == STORAGE_WRITER_BATCHES_SIZE { + writer_batch_tx.send(block_batch).unwrap(); + block_batch = vec![]; + } } let duration = instant_before_execution.elapsed(); @@ -161,21 +191,49 @@ fn execute_block_importer( receipts = receipts.len(), "reexecuted blocks batch", ); + + // send the leftovers + if !block_batch.is_empty() { + writer_batch_tx.send(block_batch).unwrap(); + } + } +} + +// ----------------------------------------------------------------------------- +// Block saver +// ----------------------------------------------------------------------------- +fn run_block_saver(miner: Arc, writer_batch_rx: mpsc::Receiver>) -> anyhow::Result<()> { + const TASK_NAME: &str = "block-saver"; + let _timer = DropTimer::start("importer-offline::run_block_saver"); + + // receives blocks and receipts from the loader to reexecute and import + loop { + if GlobalState::warn_if_shutdown(TASK_NAME) { + return Ok(()); + }; + + // receive new tasks to execute, or exit + let Ok(blocks_batch) = writer_batch_rx.recv() else { + tracing::info!("{} has no more batches to save", TASK_NAME); + return Ok(()); + }; + + for block in blocks_batch { + miner.commit(block)?; + } } } // ----------------------------------------------------------------------------- // Block loader // ----------------------------------------------------------------------------- -async fn execute_external_rpc_storage_loader( - // services +async fn run_external_rpc_storage_loader( rpc_storage: Arc, - // data blocks_by_fetch: usize, paralellism: usize, mut start: BlockNumber, end: BlockNumber, - backlog: mpsc::Sender, + loader_tx: async_mpsc::Sender, ) -> anyhow::Result<()> { const TASK_NAME: &str = "external-block-loader"; tracing::info!(%start, %end, "creating task {}", TASK_NAME); @@ -220,14 +278,14 @@ async fn execute_external_rpc_storage_loader( return log_and_err!(message); } - // send to backlog - if backlog.send((blocks, receipts)).await.is_err() { - return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "failed to send task to importer"))); + // send to channel + if loader_tx.send((blocks, receipts)).await.is_err() { + return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "failed to send task to executor"))); }; } } -async fn load_blocks_and_receipts(rpc_storage: Arc, start: BlockNumber, end: BlockNumber) -> anyhow::Result { +async fn load_blocks_and_receipts(rpc_storage: Arc, start: BlockNumber, end: BlockNumber) -> anyhow::Result { tracing::info!(%start, %end, "loading blocks and receipts"); let blocks_task = rpc_storage.read_blocks_in_range(start, end); let receipts_task = rpc_storage.read_receipts_in_range(start, end); diff --git a/src/eth/storage/inmemory/inmemory_temporary.rs b/src/eth/storage/inmemory/inmemory_temporary.rs index aee4669f9..b335feccd 100644 --- a/src/eth/storage/inmemory/inmemory_temporary.rs +++ b/src/eth/storage/inmemory/inmemory_temporary.rs @@ -26,7 +26,7 @@ use crate::eth::storage::TemporaryStorage; use crate::log_and_err; /// Number of previous blocks to keep inmemory to detect conflicts between different blocks. -const MAX_BLOCKS: usize = 64; +pub const MAX_BLOCKS: usize = 64; #[derive(Debug)] pub struct InMemoryTemporaryStorage { @@ -74,7 +74,7 @@ pub struct InMemoryTemporaryStorageState { impl InMemoryTemporaryStorageState { /// Validates there is an active pending block being mined and returns a reference to it. - fn require_active_block(&mut self) -> anyhow::Result<&PendingBlock> { + fn require_active_block(&self) -> anyhow::Result<&PendingBlock> { match &self.block { Some(block) => Ok(block), None => log_and_err!("no pending block being mined"), // try calling set_active_block_number_as_next_if_not_set or any other method to create a new block on temp storage diff --git a/src/eth/storage/inmemory/mod.rs b/src/eth/storage/inmemory/mod.rs index 5bde84360..7a470a991 100644 --- a/src/eth/storage/inmemory/mod.rs +++ b/src/eth/storage/inmemory/mod.rs @@ -6,3 +6,4 @@ pub use inmemory_history::InMemoryHistory; pub use inmemory_permanent::InMemoryPermanentStorage; pub use inmemory_permanent::InMemoryPermanentStorageState; pub use inmemory_temporary::InMemoryTemporaryStorage; +pub use inmemory_temporary::MAX_BLOCKS as INMEMORY_TEMPORARY_STORAGE_MAX_BLOCKS; diff --git a/src/eth/storage/mod.rs b/src/eth/storage/mod.rs index 2ed64ec67..3717107c8 100644 --- a/src/eth/storage/mod.rs +++ b/src/eth/storage/mod.rs @@ -15,6 +15,7 @@ pub use external_rpc_storage::ExternalRpcStorage; pub use inmemory::InMemoryPermanentStorage; pub use inmemory::InMemoryPermanentStorageState; pub use inmemory::InMemoryTemporaryStorage; +pub use inmemory::INMEMORY_TEMPORARY_STORAGE_MAX_BLOCKS; pub use permanent_storage::PermanentStorage; pub use postgres_external_rpc::PostgresExternalRpcStorage; pub use postgres_external_rpc::PostgresExternalRpcStorageConfig; diff --git a/src/eth/storage/stratus_storage.rs b/src/eth/storage/stratus_storage.rs index 7d8313fb0..b5f876ea5 100644 --- a/src/eth/storage/stratus_storage.rs +++ b/src/eth/storage/stratus_storage.rs @@ -330,7 +330,7 @@ impl StratusStorage { metrics::inc_storage_finish_block(m.elapsed, label::TEMP, m.result.is_ok()); }); - if let Ok(ref block) = result { + if let Ok(block) = &result { Span::with(|s| s.rec_str("block_number", &block.number)); } diff --git a/src/ext.rs b/src/ext.rs index da9bf3823..e9459ad40 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -161,7 +161,7 @@ macro_rules! channel_read { $crate::channel_read_impl!($rx, timeout_ms: 2000) }; ($rx: ident, $timeout_ms:expr) => { - $crate::channel_read_impl!($rx, timeout_ms: $timeout_ms), + $crate::channel_read_impl!($rx, timeout_ms: $timeout_ms) }; }