From 621c7a239189eb622732981264d599364291abb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Marcos=20Bezerra?= Date: Wed, 19 Jun 2024 04:54:37 -0300 Subject: [PATCH] optimize `importer-offline`: batch writes + more parallelism This commit divides importer-offline's task block-importer into block-executor and block-saver, to separate EVM and storage writing, now both can run in parallel Besides that, this enabled our RocksDB storage to perform block writes in batches, as a consequence, this leaves execution time as the new bottleneck for importer-offline, which hints that we don't need to optimize the storage further to improve this binary. --- src/bin/importer_offline.rs | 128 +++++++++++++----- .../storage/inmemory/inmemory_temporary.rs | 2 +- src/eth/storage/inmemory/mod.rs | 1 + src/eth/storage/mod.rs | 1 + src/eth/storage/stratus_storage.rs | 2 +- 5 files changed, 97 insertions(+), 37 deletions(-) diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index e5e99d3c6..fd2bd87d0 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -10,6 +10,7 @@ use std::cmp::min; use std::fs; +use std::sync::mpsc; use std::sync::Arc; use anyhow::anyhow; @@ -26,6 +27,7 @@ use stratus::eth::primitives::ExternalReceipt; use stratus::eth::primitives::ExternalReceipts; use stratus::eth::storage::ExternalRpcStorage; use stratus::eth::storage::InMemoryPermanentStorage; +use stratus::eth::storage::INMEMORY_TEMPORARY_STORAGE_MAX_BLOCKS; use stratus::ext::spawn_named; use stratus::ext::spawn_thread; use stratus::ext::to_json_string_pretty; @@ -34,13 +36,26 @@ use stratus::utils::calculate_tps_and_bpm; use stratus::utils::DropTimer; use stratus::GlobalServices; use stratus::GlobalState; -use tokio::sync::mpsc; +use tokio::sync::mpsc as async_mpsc; use tokio::time::Instant; -/// Number of tasks in the backlog. Each task contains `--blocks-by-fetch` blocks and all receipts for them. -const BACKLOG_SIZE: usize = 50; +/// Number of loader tasks buffered. Each task contains `--blocks-by-fetch` blocks and all receipts for them. +const RPC_LOADER_CHANNEL_CAPACITY: usize = 25; -type BacklogTask = (Vec, Vec); +/// Size of the block batches to save after execution, and capacity of the batch channel. +/// +/// We are using half of the inmemory temporary storage and a channel with size 1, this way, we can parallize +/// execution and storage saving while staying below the inmemory storage limit. +/// +/// By setting channel capacity to 1, we use backpressure to ensure that at most +/// `INMEMORY_TEMPORARY_STORAGE_MAX_BLOCKS` are executed at a time. +/// +/// REMOVE THIS: Decrease by one to give room for the pending block. +/// TODO: why `max/2-1` doesn't work? +const STORAGE_WRITER_BATCHES_SIZE: usize = INMEMORY_TEMPORARY_STORAGE_MAX_BLOCKS / 3; +const STORAGE_WRITER_CHANNEL_CAPACITY: usize = 1; + +type LoadedBatch = (Vec, Vec); fn main() -> anyhow::Result<()> { let global_services = GlobalServices::::init(); @@ -69,29 +84,39 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> { None => block_number_to_stop(&rpc_storage).await?, }; - // init shared data between importer and external rpc storage loader - let (backlog_tx, backlog_rx) = mpsc::channel::(BACKLOG_SIZE); + // init channel that sends batches from external storage loader to block executor + let (loader_tx, loader_rx) = async_mpsc::channel::(RPC_LOADER_CHANNEL_CAPACITY); + + // init channel that sends batches from executor to the storage writer + let (writer_batch_tx, writer_batch_rx) = mpsc::sync_channel::>(STORAGE_WRITER_CHANNEL_CAPACITY); // load genesis accounts let initial_accounts = rpc_storage.read_initial_accounts().await?; storage.save_accounts(initial_accounts.clone())?; - let storage_loader = execute_external_rpc_storage_loader(rpc_storage, config.blocks_by_fetch, config.paralellism, block_start, block_end, backlog_tx); - spawn_named("storage-loader", async move { - if let Err(e) = storage_loader.await { - tracing::error!(reason = ?e, "'storage-loader' task failed"); + let storage_loader_fut = run_external_rpc_storage_loader(rpc_storage, config.blocks_by_fetch, config.paralellism, block_start, block_end, loader_tx); + let _storage_loader = spawn_named("storage-loader", async move { + if let Err(err) = storage_loader_fut.await { + tracing::error!(?err, "'storage-loader' task failed"); } }); - let block_importer = spawn_thread("block-importer", || { - if let Err(e) = execute_block_importer(executor, miner, backlog_rx, block_snapshots) { - tracing::error!(reason = ?e, "'block-importer' task failed"); + let _block_executor = spawn_thread("block-executor", { + let miner = Arc::clone(&miner); + || { + if let Err(err) = run_external_block_executor(executor, miner, loader_rx, writer_batch_tx, block_snapshots) { + tracing::error!(?err, "'block-executor' task failed"); + } } }); - block_importer - .join() - .expect("'block-importer' thread panic'ed instead of properly returning an error"); + let block_saver = spawn_thread("block-saver", || { + if let Err(err) = run_block_saver(miner, writer_batch_rx) { + tracing::error!(?err, "'block-saver' task failed"); + } + }); + + block_saver.join().expect("'block-saver' thread panic'ed instead of returning an error"); // Explicitly block the `main` thread to drop the storage. drop(storage); @@ -100,27 +125,26 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> { } // ----------------------------------------------------------------------------- -// Block importer +// Block executor // ----------------------------------------------------------------------------- -fn execute_block_importer( - // services +fn run_external_block_executor( executor: Arc, miner: Arc, - // data - mut backlog_rx: mpsc::Receiver, + mut loader_rx: async_mpsc::Receiver, + writer_batch_tx: mpsc::SyncSender>, blocks_to_export_snapshot: Vec, ) -> anyhow::Result<()> { const TASK_NAME: &str = "external-block-executor"; - let _timer = DropTimer::start("importer-offline::execute_block_importer"); + let _timer = DropTimer::start("importer-offline::run_external_block_executor"); - // receives blocks and receipts from the backlog to reexecute and import + // receives blocks and receipts from the loader to reexecute and import loop { if GlobalState::is_shutdown_warn(TASK_NAME) { return Ok(()); }; - // receive blocks to execute - let Some((blocks, receipts)) = backlog_rx.blocking_recv() else { + // receive new tasks to execute, or exit + let Some((blocks, receipts)) = loader_rx.blocking_recv() else { tracing::info!("{} has no more blocks to reexecute", TASK_NAME); return Ok(()); }; @@ -148,7 +172,8 @@ fn execute_block_importer( let mut transaction_count = 0; let instant_before_execution = Instant::now(); - for block in blocks.into_iter() { + let mut block_batch = Vec::with_capacity(STORAGE_WRITER_BATCHES_SIZE); + for block in blocks { if GlobalState::is_shutdown_warn(TASK_NAME) { return Ok(()); } @@ -162,7 +187,12 @@ fn execute_block_importer( if blocks_to_export_snapshot.contains(&mined_block.number()) { export_snapshot(&block, &receipts, &mined_block)?; } - miner.commit(mined_block.clone())?; + block_batch.push(mined_block); + + if block_batch.len() == STORAGE_WRITER_BATCHES_SIZE { + writer_batch_tx.send(block_batch).unwrap(); + block_batch = vec![]; + } } let duration = instant_before_execution.elapsed(); @@ -177,21 +207,49 @@ fn execute_block_importer( receipts = receipts.len(), "reexecuted blocks batch", ); + + // send the leftovers + if !block_batch.is_empty() { + writer_batch_tx.send(block_batch).unwrap(); + } + } +} + +// ----------------------------------------------------------------------------- +// Block saver +// ----------------------------------------------------------------------------- +fn run_block_saver(miner: Arc, writer_batch_rx: mpsc::Receiver>) -> anyhow::Result<()> { + const TASK_NAME: &str = "block-saver"; + let _timer = DropTimer::start("importer-offline::run_block_saver"); + + // receives blocks and receipts from the loader to reexecute and import + loop { + if GlobalState::is_shutdown_warn(TASK_NAME) { + return Ok(()); + }; + + // receive new tasks to execute, or exit + let Ok(blocks_batch) = writer_batch_rx.recv() else { + tracing::info!("{} has no more batches to save", TASK_NAME); + return Ok(()); + }; + + for block in blocks_batch { + miner.commit(block)?; + } } } // ----------------------------------------------------------------------------- // Block loader // ----------------------------------------------------------------------------- -async fn execute_external_rpc_storage_loader( - // services +async fn run_external_rpc_storage_loader( rpc_storage: Arc, - // data blocks_by_fetch: usize, paralellism: usize, mut start: BlockNumber, end: BlockNumber, - backlog: mpsc::Sender, + loader_tx: async_mpsc::Sender, ) -> anyhow::Result<()> { const TASK_NAME: &str = "external-block-loader"; tracing::info!(%start, %end, "creating task {}", TASK_NAME); @@ -236,14 +294,14 @@ async fn execute_external_rpc_storage_loader( return log_and_err!(message); } - // send to backlog - if backlog.send((blocks, receipts)).await.is_err() { - return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "failed to send task to importer"))); + // send to channel + if loader_tx.send((blocks, receipts)).await.is_err() { + return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "failed to send task to executor"))); }; } } -async fn load_blocks_and_receipts(rpc_storage: Arc, block_start: BlockNumber, block_end: BlockNumber) -> anyhow::Result { +async fn load_blocks_and_receipts(rpc_storage: Arc, block_start: BlockNumber, block_end: BlockNumber) -> anyhow::Result { tracing::info!(%block_start, %block_end, "loading blocks and receipts"); let blocks_task = rpc_storage.read_blocks_in_range(block_start, block_end); let receipts_task = rpc_storage.read_receipts_in_range(block_start, block_end); diff --git a/src/eth/storage/inmemory/inmemory_temporary.rs b/src/eth/storage/inmemory/inmemory_temporary.rs index d7b3a5d6b..4ccaa94d1 100644 --- a/src/eth/storage/inmemory/inmemory_temporary.rs +++ b/src/eth/storage/inmemory/inmemory_temporary.rs @@ -24,7 +24,7 @@ use crate::eth::storage::TemporaryStorage; use crate::log_and_err; /// Number of previous blocks to keep inmemory to detect conflicts between different blocks. -const MAX_BLOCKS: usize = 64; +pub const MAX_BLOCKS: usize = 64; #[derive(Debug)] pub struct InMemoryTemporaryStorage { diff --git a/src/eth/storage/inmemory/mod.rs b/src/eth/storage/inmemory/mod.rs index 5bde84360..7a470a991 100644 --- a/src/eth/storage/inmemory/mod.rs +++ b/src/eth/storage/inmemory/mod.rs @@ -6,3 +6,4 @@ pub use inmemory_history::InMemoryHistory; pub use inmemory_permanent::InMemoryPermanentStorage; pub use inmemory_permanent::InMemoryPermanentStorageState; pub use inmemory_temporary::InMemoryTemporaryStorage; +pub use inmemory_temporary::MAX_BLOCKS as INMEMORY_TEMPORARY_STORAGE_MAX_BLOCKS; diff --git a/src/eth/storage/mod.rs b/src/eth/storage/mod.rs index 260109161..d80fea293 100644 --- a/src/eth/storage/mod.rs +++ b/src/eth/storage/mod.rs @@ -17,6 +17,7 @@ pub use external_rpc_storage::ExternalRpcStorageKind; pub use inmemory::InMemoryPermanentStorage; pub use inmemory::InMemoryPermanentStorageState; pub use inmemory::InMemoryTemporaryStorage; +pub use inmemory::INMEMORY_TEMPORARY_STORAGE_MAX_BLOCKS; pub use permanent_storage::PermanentStorage; pub use permanent_storage::PermanentStorageConfig; pub use permanent_storage::PermanentStorageKind; diff --git a/src/eth/storage/stratus_storage.rs b/src/eth/storage/stratus_storage.rs index c31df3986..bbc2814f0 100644 --- a/src/eth/storage/stratus_storage.rs +++ b/src/eth/storage/stratus_storage.rs @@ -358,7 +358,7 @@ impl StratusStorage { }) .map_err(Into::into); - if let Ok(ref block) = result { + if let Ok(block) = &result { Span::with(|s| s.rec_str("block_number", &block.number)); }