From 83f662ef3fcf63f81d949f519777583a399301c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Marcos?= <164224824+marcospb19-cw@users.noreply.github.com> Date: Sat, 23 Nov 2024 21:13:55 -0300 Subject: [PATCH] enha: optimize importer offline (#1877) * enha: optimize importer-offline * refac: importer-offline and storage * add flag to disable rocks write sync * update importer-offline default config --- config/importer-offline.env.local | 3 + src/bin/historic_events_processor.rs | 2 +- src/bin/importer_offline.rs | 281 +++++++++++------- src/eth/primitives/unix_time.rs | 8 +- .../storage/inmemory/inmemory_temporary.rs | 39 ++- src/eth/storage/mod.rs | 3 +- src/eth/storage/permanent_storage.rs | 12 +- src/eth/storage/rocks/mod.rs | 3 - src/eth/storage/rocks/rocks_batch_writer.rs | 78 ----- src/eth/storage/rocks/rocks_permanent.rs | 15 +- src/eth/storage/rocks/rocks_state.rs | 57 +++- src/eth/storage/stratus_storage.rs | 49 +++ 12 files changed, 310 insertions(+), 240 deletions(-) diff --git a/config/importer-offline.env.local b/config/importer-offline.env.local index 6095f2c27..4f3956d30 100644 --- a/config/importer-offline.env.local +++ b/config/importer-offline.env.local @@ -12,3 +12,6 @@ EXTERNAL_RPC_STORAGE_TIMEOUT=20000 EXTERNAL_RPC_SLOW_QUERY_WARN_THRESHOLD=10s EXECUTOR_REJECT_NOT_CONTRACT=false + +ROCKS_DISABLE_SYNC_WRITE=true +ROCKS_CACHE_SIZE_MULTIPLIER=0.1 diff --git a/src/bin/historic_events_processor.rs b/src/bin/historic_events_processor.rs index 66d498a35..1c0f4247d 100644 --- a/src/bin/historic_events_processor.rs +++ b/src/bin/historic_events_processor.rs @@ -80,7 +80,7 @@ fn process_block_events(block: BlockRocksdb) -> Vec { /// Main function that processes blockchain data and generates events fn main() -> Result<(), anyhow::Error> { tracing_subscriber::fmt::init(); - let state = RocksStorageState::new("data/rocksdb".to_string(), TIMEOUT, Some(0.1)).context("failed to create rocksdb state")?; + let state = RocksStorageState::new("data/rocksdb".to_string(), TIMEOUT, Some(0.1), false).context("failed to create rocksdb state")?; let (b_pb, tx_pb) = create_progress_bar(&state); diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index 9398deadd..ee18ac90f 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -1,6 +1,6 @@ //! Importer-Offline binary. //! -//! It loads blocks (and receipts) from an external RPC server, or from a PostgreSQL DB +//! It fetches blocks (and receipts) from an external RPC server, or from a PostgreSQL DB //! that was prepared with the `rpc-downloader` binary. //! //! This importer will check on startup what is the `block_end` value at the external @@ -9,14 +9,17 @@ //! arrive. 
 use std::cmp::min;
+use std::sync::mpsc;
 use std::sync::Arc;
 
 use anyhow::anyhow;
 use futures::StreamExt;
+use itertools::Itertools;
 use stratus::config::ImporterOfflineConfig;
 use stratus::eth::executor::Executor;
 use stratus::eth::miner::Miner;
 use stratus::eth::miner::MinerMode;
+use stratus::eth::primitives::Block;
 use stratus::eth::primitives::BlockNumber;
 use stratus::eth::primitives::ExternalReceipts;
 use stratus::eth::primitives::ExternalTransaction;
@@ -31,16 +34,37 @@ use stratus::GlobalServices;
 use stratus::GlobalState;
 #[cfg(all(not(target_env = "msvc"), any(feature = "jemalloc", feature = "jeprof")))]
 use tikv_jemallocator::Jemalloc;
-use tokio::sync::mpsc;
+use tokio::sync::mpsc as async_mpsc;
 use tokio::time::Instant;
 
 #[cfg(all(not(target_env = "msvc"), any(feature = "jemalloc", feature = "jeprof")))]
 #[global_allocator]
 static GLOBAL: Jemalloc = Jemalloc;
 
-/// Number of tasks in the backlog. Each task contains `--blocks-by-fetch` blocks and all receipts for them.
-const BACKLOG_SIZE: usize = 10;
-type BacklogTask = Vec<(ExternalBlock, Vec<ExternalReceipt>)>;
+/// Number of fetcher tasks buffered. Each task contains `--blocks-by-fetch` blocks and all receipts for them.
+const RPC_FETCHER_CHANNEL_CAPACITY: usize = 10;
+
+/// Size of the executed block batches to be saved.
+///
+/// We want to persist to the storage in batches: this means we don't save a
+/// block right away, but the information from that block still needs to be
+/// found in the storage.
+///
+/// By using a channel with capacity 0 to send STORAGE_SAVER_BATCH_SIZE blocks,
+/// we guarantee that accounts and slots can be found either in the permanent
+/// storage or in the temporary one.
+///
+/// In other words, at most STORAGE_SAVER_BATCH_SIZE * 2 executed blocks
+/// won't be found in the permanent storage, but that's still within the
+/// temporary storage capacity.
+///
+/// We use half because we want parallelism between execution and persisting:
+/// both sides need to hold blocks that aren't in the permanent storage yet,
+/// so each gets half.
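+///
+/// For example (illustrative arithmetic, assuming the temporary storage keeps
+/// 64 states): the constant below evaluates to 64 / 2 - 1 = 31, so the
+/// executor and the saver each hold at most 31 executed-but-unsaved blocks,
+/// and 31 * 2 = 62 stays within that capacity.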
+const STORAGE_SAVER_BATCH_SIZE: usize = 64 / 2 - 1;
+
+type BlocksToExecute = Vec<(ExternalBlock, Vec<ExternalReceipt>)>;
+type BlocksToSave = Vec<Block>;
 
 fn main() -> anyhow::Result<()> {
     let global_services = GlobalServices::<ImporterOfflineConfig>::init();
@@ -66,29 +90,46 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
         None => block_number_to_stop(&rpc_storage).await?,
     };
 
-    // init shared data between importer and external rpc storage loader
-    let (backlog_tx, backlog_rx) = mpsc::channel::<BacklogTask>(BACKLOG_SIZE);
+    // send blocks from fetcher task to executor task
+    let (fetch_to_execute_tx, fetch_to_execute_rx) = async_mpsc::channel::<BlocksToExecute>(RPC_FETCHER_CHANNEL_CAPACITY);
+
+    // send blocks from executor task to saver task
+    let (execute_to_save_tx, execute_to_save_rx) = mpsc::sync_channel::<BlocksToSave>(0);
 
     // load genesis accounts
     let initial_accounts = rpc_storage.read_initial_accounts().await?;
     storage.save_accounts(initial_accounts.clone())?;
 
-    let storage_loader = execute_external_rpc_storage_loader(rpc_storage, config.blocks_by_fetch, config.paralellism, block_start, block_end, backlog_tx);
-    spawn_named("storage-loader", async move {
-        if let Err(e) = storage_loader.await {
-            tracing::error!(parent: None, reason = ?e, "'storage-loader' task failed");
+    let block_fetcher_fut = run_rpc_block_fetcher(
+        rpc_storage,
+        config.blocks_by_fetch,
+        config.paralellism,
+        block_start,
+        block_end,
+        fetch_to_execute_tx,
+    );
+    spawn_named("block-fetcher", async {
+        if let Err(e) = block_fetcher_fut.await {
+            tracing::error!(reason = ?e, "'block-fetcher' task failed");
         }
     });
 
-    let block_importer = spawn_thread("block-importer", || {
-        if let Err(e) = execute_block_importer(executor, miner, backlog_rx) {
-            tracing::error!(parent: None, reason = ?e, "'block-importer' task failed");
+    let miner_clone = Arc::clone(&miner);
+    spawn_thread("block-executor", || {
+        if let Err(e) = run_external_block_executor(executor, miner_clone, fetch_to_execute_rx, execute_to_save_tx) {
+            tracing::error!(reason = ?e, "'block-executor' task failed");
         }
     });
 
-    block_importer
-        .join()
-        .expect("'block-importer' thread panic'ed instead of properly returning an error");
+    let block_saver_handle = spawn_thread("block-saver", || {
+        if let Err(e) = run_block_saver(miner, execute_to_save_rx) {
+            tracing::error!(reason = ?e, "'block-saver' task failed");
        }
+    });
+
+    if let Err(e) = block_saver_handle.join() {
+        tracing::error!(reason = ?e, "'block-saver' thread panicked");
+    }
 
     // Explicitly block the `main` thread while waiting for the storage to drop.
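+    // (`storage` is assumed to own the RocksDB handle here: dropping it before
+    // `main` returns gives the database a chance to shut down cleanly within
+    // its configured shutdown timeout.)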
     drop(storage);
 
     Ok(())
 }
 
-// -----------------------------------------------------------------------------
-// Block importer
-// -----------------------------------------------------------------------------
-fn execute_block_importer(
-    // services
+async fn run_rpc_block_fetcher(
+    rpc_storage: Arc<dyn ExternalRpcStorage>,
+    blocks_by_fetch: usize,
+    paralellism: usize,
+    mut start: BlockNumber,
+    end: BlockNumber,
+    to_execute_tx: async_mpsc::Sender<BlocksToExecute>,
+) -> anyhow::Result<()> {
+    const TASK_NAME: &str = "block-fetcher";
+
+    let mut fetch_stream = {
+        // prepare fetches to be executed in parallel
+        let mut tasks = Vec::new();
+        while start <= end {
+            let end = min(start + (blocks_by_fetch - 1), end);
+
+            let task = fetch_blocks_and_receipts(Arc::clone(&rpc_storage), start, end);
+            tasks.push(task);
+
+            start += blocks_by_fetch;
+        }
+
+        futures::stream::iter(tasks).buffered(paralellism)
+    };
+
+    loop {
+        if GlobalState::is_shutdown_warn(TASK_NAME) {
+            return Ok(());
+        };
+
+        // retrieve next batch of fetched blocks or finish task
+        let Some(result) = fetch_stream.next().await else {
+            tracing::info!(parent: None, "{} has no more blocks to fetch", TASK_NAME);
+            return Ok(());
+        };
+
+        let blocks = match result {
+            Ok(blocks) => blocks,
+            Err(e) => {
+                return log_and_err!(reason = e, GlobalState::shutdown_from(TASK_NAME, "failed to fetch block or receipt"));
+            }
+        };
+
+        if blocks.is_empty() {
+            return log_and_err!(GlobalState::shutdown_from(TASK_NAME, "no blocks returned when they were expected"));
+        }
+
+        if to_execute_tx.send(blocks).await.is_err() {
+            return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "failed to send task to importer")));
+        };
+    }
+}
+
+fn run_external_block_executor(
     executor: Arc<Executor>,
     miner: Arc<Miner>,
-    // data
-    mut backlog_rx: mpsc::Receiver<BacklogTask>,
+    mut from_fetcher_rx: async_mpsc::Receiver<BlocksToExecute>,
+    to_saver_tx: mpsc::SyncSender<BlocksToSave>,
 ) -> anyhow::Result<()> {
-    const TASK_NAME: &str = "external-block-executor";
-    let _timer = DropTimer::start("importer-offline::execute_block_importer");
+    const TASK_NAME: &str = "block-executor";
+    let _timer = DropTimer::start("importer-offline::run_external_block_executor");
 
-    // receives blocks and receipts from the backlog to reexecute and import
     loop {
         if GlobalState::is_shutdown_warn(TASK_NAME) {
             return Ok(());
         };
 
-        // receive blocks to execute
-        let Some(blocks) = backlog_rx.blocking_recv() else {
-            tracing::info!(parent: None, "{} has no more blocks to reexecute", TASK_NAME);
+        let Some(blocks) = from_fetcher_rx.blocking_recv() else {
+            tracing::info!(parent: None, "{} has no more blocks to execute", TASK_NAME);
             return Ok(());
         };
 
-        // ensure range is not empty
         let (Some((block_start, _)), Some((block_end, _))) = (blocks.first(), blocks.last()) else {
-            let message = GlobalState::shutdown_from(TASK_NAME, "received empty block range to reexecute");
-            return log_and_err!(message);
+            return log_and_err!(GlobalState::shutdown_from(TASK_NAME, "received empty block range to reexecute"));
        };
 
-        // track operation
         let block_start = block_start.number();
         let block_end = block_end.number();
-        let batch_blocks_len = blocks.len();
-        let receipts_len = blocks.iter().map(|(_, receipts)| receipts.len()).sum::<usize>();
-        tracing::info!(parent: None, %block_start, %block_end, %receipts_len, "reexecuting blocks");
-
-        // ensure block range have no gaps
-        if block_start.count_to(block_end) != batch_blocks_len as u64 {
-            let message = GlobalState::shutdown_from(TASK_NAME, "received block range with gaps to reexecute");
-            return log_and_err!(message);
+        let receipts_count = blocks.iter().map(|(_, receipts)| receipts.len()).sum::<usize>();
+        let tx_count = blocks.iter().map(|(block, _)| block.transactions.len()).sum();
+        let blocks_count = blocks.len();
+
+        tracing::info!(parent: None, %block_start, %block_end, %tx_count, "executing blocks");
+
+        if receipts_count != tx_count {
+            return log_and_err!(GlobalState::shutdown_from(TASK_NAME, "receipt count doesn't match transaction count"));
        }
 
-        // imports block transactions
-        let mut batch_tx_len = 0;
+        if block_start.count_to(block_end) != blocks_count as u64 {
+            return log_and_err!(GlobalState::shutdown_from(TASK_NAME, "received block range with gaps to execute"));
+        }
 
-        let before_block = Instant::now();
-        for (mut block, receipts) in blocks.into_iter() {
-            if GlobalState::is_shutdown_warn(TASK_NAME) {
-                return Ok(());
-            }
+        let instant_before_execution = Instant::now();
 
-            // fill missing transaction_type with `v`
-            block.transactions.iter_mut().for_each(ExternalTransaction::fill_missing_transaction_type);
+        for blocks in Itertools::chunks(blocks.into_iter(), STORAGE_SAVER_BATCH_SIZE).into_iter() {
+            let mut executed_batch = Vec::with_capacity(STORAGE_SAVER_BATCH_SIZE);
 
-            // re-execute (and import) block
-            batch_tx_len += block.transactions.len();
-            executor.execute_external_block(block.clone(), ExternalReceipts::from(receipts))?;
+            for (mut block, receipts) in blocks {
+                if GlobalState::is_shutdown_warn(TASK_NAME) {
+                    return Ok(());
+                }
 
-            // mine and save block
-            miner.mine_external_and_commit(block)?;
+                // fill missing transaction_type with `v`
+                block.transactions.iter_mut().for_each(ExternalTransaction::fill_missing_transaction_type);
+
+                // TODO: remove clone
+                executor.execute_external_block(block.clone(), ExternalReceipts::from(receipts))?;
+                let mined_block = miner.mine_external(block)?;
+                executed_batch.push(mined_block);
+            }
+
+            if to_saver_tx.send(executed_batch).is_err() {
+                return log_and_err!(GlobalState::shutdown_from(TASK_NAME, "failed to send executed batch to be saved on storage"));
+            }
         }
 
-        let batch_duration = before_block.elapsed();
-        let (tps, bpm) = calculate_tps_and_bpm(batch_duration, batch_tx_len, batch_blocks_len);
+        let execution_duration = instant_before_execution.elapsed();
+        let (tps, bpm) = calculate_tps_and_bpm(execution_duration, tx_count, blocks_count);
 
         tracing::info!(
             parent: None,
             tps,
             blocks_per_minute = format_args!("{bpm:.2}"),
-            ?batch_duration,
+            ?execution_duration,
             %block_start,
             %block_end,
-            %receipts_len,
-            "reexecuted blocks batch",
+            %receipts_count,
+            "executed blocks batch",
         );
     }
 }
 
-// -----------------------------------------------------------------------------
-// Block loader
-// -----------------------------------------------------------------------------
-async fn execute_external_rpc_storage_loader(
-    // services
-    rpc_storage: Arc<dyn ExternalRpcStorage>,
-    // data
-    blocks_by_fetch: usize,
-    paralellism: usize,
-    mut start: BlockNumber,
-    end: BlockNumber,
-    backlog: mpsc::Sender<BacklogTask>,
-) -> anyhow::Result<()> {
-    const TASK_NAME: &str = "external-block-loader";
-    tracing::info!(parent: None, %start, %end, "creating task {}", TASK_NAME);
-
-    // prepare loads to be executed in parallel
-    let mut tasks = Vec::new();
-    while start <= end {
-        let end = min(start + (blocks_by_fetch - 1), end);
+fn run_block_saver(miner: Arc<Miner>, from_executor_rx: mpsc::Receiver<BlocksToSave>) -> anyhow::Result<()> {
+    const TASK_NAME: &str = "block-saver";
+    let _timer = DropTimer::start("importer-offline::run_block_saver");
 
-        let task = load_blocks_and_receipts(Arc::clone(&rpc_storage), start, end);
-        tasks.push(task);
-
-        start += blocks_by_fetch;
-    }
-
-    // execute loads in parallel
-    let mut tasks = futures::stream::iter(tasks).buffered(paralellism);
     loop {
         if GlobalState::is_shutdown_warn(TASK_NAME) {
             return Ok(());
         };
 
-        // retrieve next batch of loaded blocks
-        // if finished, do not cancel, it is expected to finish
-        let Some(result) = tasks.next().await else {
-            tracing::info!(parent: None, "{} has no more blocks to process", TASK_NAME);
+        let Ok(blocks_batch) = from_executor_rx.recv() else {
+            tracing::info!("{} has no more batches to save", TASK_NAME);
             return Ok(());
         };
 
-        // check if executed correctly
-        let blocks = match result {
-            Ok(blocks) => blocks,
-            Err(e) => {
-                let message = GlobalState::shutdown_from(TASK_NAME, "failed to fetch block or receipt");
-                return log_and_err!(reason = e, message);
-            }
-        };
-
-        // check blocks were really loaded
-        if blocks.is_empty() {
-            let message = GlobalState::shutdown_from(TASK_NAME, "no blocks returned when they were expected");
-            return log_and_err!(message);
+        for block in blocks_batch {
+            miner.commit(block)?;
         }
-
-        // send to backlog
-        if backlog.send(blocks).await.is_err() {
-            return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "failed to send task to importer")));
-        };
     }
 }
 
-async fn load_blocks_and_receipts(rpc_storage: Arc<dyn ExternalRpcStorage>, block_start: BlockNumber, block_end: BlockNumber) -> anyhow::Result<BacklogTask> {
-    tracing::info!(parent: None, %block_start, %block_end, "loading blocks and receipts");
+async fn fetch_blocks_and_receipts(
+    rpc_storage: Arc<dyn ExternalRpcStorage>,
+    block_start: BlockNumber,
+    block_end: BlockNumber,
+) -> anyhow::Result<BlocksToExecute> {
+    tracing::info!(parent: None, %block_start, %block_end, "fetching blocks and receipts");
     rpc_storage.read_block_and_receipts_in_range(block_start, block_end).await
 }
 
-// Finds the block number to stop the import job.
 async fn block_number_to_stop(rpc_storage: &Arc<dyn ExternalRpcStorage>) -> anyhow::Result<BlockNumber> {
     match rpc_storage.read_max_block_number_in_range(BlockNumber::ZERO, BlockNumber::MAX).await {
         Ok(Some(number)) => Ok(number),
diff --git a/src/eth/primitives/unix_time.rs b/src/eth/primitives/unix_time.rs
index e32835e90..1b4589779 100644
--- a/src/eth/primitives/unix_time.rs
+++ b/src/eth/primitives/unix_time.rs
@@ -165,7 +165,7 @@ mod offset {
             return log_and_err!("timestamp can't be before the latest block");
         }
 
-        let current_time = Utc::now().timestamp() as i64;
+        let current_time = Utc::now().timestamp();
         let diff: i64 = if *new_block_timestamp == 0 {
             0
         } else {
@@ -188,12 +188,12 @@ mod offset {
     /// 1. When a specific timestamp was set (was_evm_timestamp_set = true):
     ///    - If new_timestamp is 0: Returns last_block_timestamp + 1
     ///    - If new_timestamp > 0: Returns exactly new_timestamp
-    ///    After this call, resets the timestamp flag and stored value
+    ///    - After this call, resets the timestamp flag and stored value
     ///
     /// 2. For subsequent blocks (was_evm_timestamp_set = false):
     ///    - If new_timestamp is set: Uses it as reference point
     ///    - Otherwise: Uses max(current_time + offset, last_block_timestamp)
-    ///    In both cases, adds 1 second to ensure progression
+    ///    - In both cases, adds 1 second to ensure progression
     ///
     /// The function always updates LAST_BLOCK_TIMESTAMP with the returned value
     /// to maintain the chain of increasing timestamps.
@@ -202,7 +202,7 @@ mod offset {
         let new_timestamp_diff = NEW_TIMESTAMP_DIFF.load(Acquire);
         let was_evm_timestamp_set = EVM_SET_NEXT_BLOCK_TIMESTAMP_WAS_CALLED.load(Acquire);
         let last_block_timestamp = LAST_BLOCK_TIMESTAMP.load(Acquire);
-        let current_time = Utc::now().timestamp() as i64;
+        let current_time = Utc::now().timestamp();
 
         let result = if !was_evm_timestamp_set {
             let last_timestamp = if new_timestamp != 0 {
diff --git a/src/eth/storage/inmemory/inmemory_temporary.rs b/src/eth/storage/inmemory/inmemory_temporary.rs
index 8c826a045..e94f9ad16 100644
--- a/src/eth/storage/inmemory/inmemory_temporary.rs
+++ b/src/eth/storage/inmemory/inmemory_temporary.rs
@@ -86,9 +86,7 @@ impl InMemoryTemporaryStorageState {
             None => log_and_err!("no pending block being mined"), // try calling set_pending_block_number_as_next_if_not_set or any other method to create a new block on temp storage
         }
     }
-}
 
-impl InMemoryTemporaryStorageState {
     pub fn reset(&mut self) {
         self.block = None;
         self.accounts.clear();
@@ -158,8 +156,8 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
             .or_insert_with(|| InMemoryTemporaryAccount::new(change.address));
 
         // account basic info
-        if let Some(&nonce) = change.nonce.take_ref() {
-            account.info.nonce = nonce;
+        if let Some(nonce) = change.nonce.take_ref() {
+            account.info.nonce = *nonce;
         }
         if let Some(balance) = change.balance.take_ref() {
             account.info.balance = *balance;
@@ -172,8 +170,8 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
 
         // slots
         for slot in change.slots.values() {
-            if let Some(&slot) = slot.take_ref() {
-                account.slots.insert(slot.index, slot);
+            if let Some(slot) = slot.take_ref() {
+                account.slots.insert(slot.index, *slot);
             }
         }
     }
@@ -284,18 +282,17 @@ fn do_read_account(states: &NonEmpty<InMemoryTemporaryStorageState>, address: Ad
 }
 
 fn do_read_slot(states: &NonEmpty<InMemoryTemporaryStorageState>, address: Address, index: SlotIndex) -> Option<Slot> {
-    // search all
-    for state in states.iter() {
-        let Some(account) = state.accounts.get(&address) else { continue };
-        let Some(&slot) = account.slots.get(&index) else { continue };
+    let slot = states
+        .iter()
+        .find_map(|state| state.accounts.get(&address).and_then(|account| account.slots.get(&index)));
 
+    if let Some(&slot) = slot {
         tracing::trace!(%address, %index, %slot, "slot found in temporary");
-        return Some(slot);
+        Some(slot)
+    } else {
+        tracing::trace!(%address, %index, "slot not found in temporary");
+        None
     }
-
-    // not found
-    tracing::trace!(%address, %index, "slot not found in temporary");
-    None
 }
 
@@ -304,16 +301,16 @@ fn do_check_conflicts(states: &NonEmpty<InMemoryTemporaryStorageState>, executio
 
     for (&address, change) in &execution.changes {
         // check account info conflicts
         if let Some(account) = do_read_account(states, address) {
-            if let Some(&expected) = change.nonce.take_original_ref() {
-                let original = account.nonce;
+            if let Some(expected) = change.nonce.take_original_ref() {
+                let original = &account.nonce;
                 if expected != original {
-                    conflicts.add_nonce(address, original, expected);
+                    conflicts.add_nonce(address, *original, *expected);
                 }
             }
-            if let Some(&expected) = change.balance.take_original_ref() {
-                let original = account.balance;
+            if let Some(expected) = change.balance.take_original_ref() {
+                let original = &account.balance;
                 if expected != original {
-                    conflicts.add_balance(address, original, expected);
+                    conflicts.add_balance(address, *original, *expected);
                 }
             }
         }
diff --git a/src/eth/storage/mod.rs b/src/eth/storage/mod.rs
index 453b858b7..2d4247dcd 100644
--- a/src/eth/storage/mod.rs
+++ b/src/eth/storage/mod.rs
@@ -4,9 +4,8 @@ mod external_rpc_storage;
 mod inmemory;
 mod permanent_storage;
 mod postgres_external_rpc;
-pub mod rocks;
-
 mod redis;
+pub mod rocks;
 mod storage_point_in_time;
 mod stratus_storage;
 mod temporary_storage;
diff --git a/src/eth/storage/permanent_storage.rs b/src/eth/storage/permanent_storage.rs
index b810f9d90..81d3a0e56 100644
--- a/src/eth/storage/permanent_storage.rs
+++ b/src/eth/storage/permanent_storage.rs
@@ -39,9 +39,14 @@ pub trait PermanentStorage: Send + Sync + 'static {
     // Block
     // -------------------------------------------------------------------------
 
-    /// Persists atomically all changes from a block.
+    /// Atomically persists all changes from a block.
     fn save_block(&self, block: Block) -> anyhow::Result<()>;
 
+    /// Atomically persists all changes from a batch of blocks.
+    fn save_block_batch(&self, blocks: Vec<Block>) -> anyhow::Result<()> {
+        blocks.into_iter().try_for_each(|block| self.save_block(block))
+    }
+
     /// Retrieves a block from the storage.
     fn read_block(&self, block_filter: BlockFilter) -> anyhow::Result<Option<Block>>;
 
@@ -99,6 +104,10 @@ pub struct PermanentStorageConfig {
     /// Augments or decreases the size of Column Family caches based on a multiplier.
     #[arg(long = "rocks-cache-size-multiplier", env = "ROCKS_CACHE_SIZE_MULTIPLIER")]
     pub rocks_cache_size_multiplier: Option<f32>,
+
+    /// Disables sync writes to RocksDB: writes return once they reach the OS buffer cache instead of waiting for disk.
+    #[arg(long = "rocks-disable-sync-write", env = "ROCKS_DISABLE_SYNC_WRITE")]
+    pub rocks_disable_sync_write: bool,
 }
 
 #[derive(DebugAsJson, Clone, serde::Serialize)]
@@ -132,6 +141,7 @@ impl PermanentStorageConfig {
                 self.rocks_path_prefix.clone(),
                 self.rocks_shutdown_timeout,
                 self.rocks_cache_size_multiplier,
+                !self.rocks_disable_sync_write,
             )?),
         };
         Ok(perm)
diff --git a/src/eth/storage/rocks/mod.rs b/src/eth/storage/rocks/mod.rs
index 56b688040..371e69825 100644
--- a/src/eth/storage/rocks/mod.rs
+++ b/src/eth/storage/rocks/mod.rs
@@ -1,8 +1,5 @@
 //! RocksDB layers (top-to-bottom): permanent -> state -> rest.
 
-/// Batch writer with capacity for flushing temporarily.
-mod rocks_batch_writer;
-
 /// Exposed API.
 pub mod rocks_permanent;
diff --git a/src/eth/storage/rocks/rocks_batch_writer.rs b/src/eth/storage/rocks/rocks_batch_writer.rs
index 05e784736..8b1378917 100644
--- a/src/eth/storage/rocks/rocks_batch_writer.rs
+++ b/src/eth/storage/rocks/rocks_batch_writer.rs
@@ -1,79 +1 @@
-use std::fmt::Debug;
-use std::mem;
-
-use anyhow::Context;
-use rocksdb::WriteBatch;
-use rocksdb::WriteOptions;
-use rocksdb::DB;
-use serde::Deserialize;
-use serde::Serialize;
-
-use super::rocks_cf::RocksCfRef;
-
-pub fn write_in_batch_for_multiple_cfs_impl(db: &DB, batch: WriteBatch) -> anyhow::Result<()> {
-    tracing::debug!("writing batch");
-    let batch_len = batch.len();
-    let mut options = WriteOptions::default();
-    // By default, each write to rocksdb is asynchronous: it returns after pushing
-    // the write from the process into the operating system (buffer cache).
-    // This option enables sync write to ensure data is persisted to disk before
-    // returning, preventing potential data loss in case of system failure.
-    options.set_sync(true);
-    db.write_opt(batch, &options)
-        .context("failed to write in batch to (possibly) multiple column families")
-        .inspect_err(|e| {
-            tracing::error!(reason = ?e, batch_len, "failed to write batch to DB");
-        })
-}
-
-/// A writer that automatically flushes the batch when it exhausts capacity, supports multiple CFs.
-///
-/// Similar to `io::BufWriter`.
-#[allow(dead_code)]
-pub struct BufferedBatchWriter {
-    len: usize,
-    capacity: usize,
-    batch: WriteBatch,
-}
-
-#[allow(dead_code)]
-impl BufferedBatchWriter {
-    pub fn new(capacity: usize) -> Self {
-        Self {
-            len: 0,
-            capacity,
-            batch: WriteBatch::default(),
-        }
-    }
-
-    pub fn insert<K, V>(&mut self, cf_ref: &RocksCfRef<K, V>, key: K, value: V) -> anyhow::Result<()>
-    where
-        K: Serialize + for<'de> Deserialize<'de> + Debug + std::hash::Hash + Eq,
-        V: Serialize + for<'de> Deserialize<'de> + Debug + Clone,
-    {
-        self.len += 1;
-        cf_ref.prepare_batch_insertion([(key, value)], &mut self.batch)?;
-        if self.len >= self.capacity {
-            self.flush(cf_ref.db())?;
-        }
-        Ok(())
-    }
-
-    pub fn flush(&mut self, db: &DB) -> anyhow::Result<()> {
-        if self.len == 0 {
-            return Ok(());
-        }
-        let batch = mem::take(&mut self.batch);
-        write_in_batch_for_multiple_cfs_impl(db, batch)?;
-        self.len = 0;
-        Ok(())
-    }
-}
-
-impl Drop for BufferedBatchWriter {
-    fn drop(&mut self) {
-        if self.len > 0 {
-            tracing::error!(elements_remaining = %self.len, "BufferedBatchWriter dropped with elements not flushed");
-        }
-    }
-}
diff --git a/src/eth/storage/rocks/rocks_permanent.rs b/src/eth/storage/rocks/rocks_permanent.rs
index 11c7aa87d..21b272384 100644
--- a/src/eth/storage/rocks/rocks_permanent.rs
+++ b/src/eth/storage/rocks/rocks_permanent.rs
@@ -27,7 +27,12 @@ pub struct RocksPermanentStorage {
 }
 
 impl RocksPermanentStorage {
-    pub fn new(db_path_prefix: Option<String>, shutdown_timeout: Duration, cache_size_multiplier: Option<f32>) -> anyhow::Result<Self> {
+    pub fn new(
+        db_path_prefix: Option<String>,
+        shutdown_timeout: Duration,
+        cache_size_multiplier: Option<f32>,
+        enable_sync_write: bool,
+    ) -> anyhow::Result<Self> {
         tracing::info!("setting up rocksdb storage");
 
         let path = if let Some(prefix) = db_path_prefix {
@@ -48,7 +53,7 @@ impl RocksPermanentStorage {
             "data/rocksdb".to_string()
         };
 
-        let state = RocksStorageState::new(path, shutdown_timeout, cache_size_multiplier)?;
+        let state = RocksStorageState::new(path, shutdown_timeout, cache_size_multiplier, enable_sync_write)?;
         let block_number = state.preload_block_number()?;
 
         Ok(Self { state, block_number })
@@ -131,6 +136,12 @@ impl PermanentStorage for RocksPermanentStorage {
         })
     }
 
+    fn save_block_batch(&self, block_batch: Vec<Block>) -> anyhow::Result<()> {
+        self.state.save_block_batch(block_batch).inspect_err(|e| {
+            tracing::error!(reason = ?e, "failed to save block_batch in RocksPermanent");
+        })
+    }
+
     fn save_accounts(&self, accounts: Vec<Account>) -> anyhow::Result<()> {
         self.state.save_accounts(accounts).inspect_err(|e| {
             tracing::error!(reason = ?e, "failed to save accounts in RocksPermanent");
diff --git a/src/eth/storage/rocks/rocks_state.rs b/src/eth/storage/rocks/rocks_state.rs
index 822e0b2da..b4aba5f93 100644
--- a/src/eth/storage/rocks/rocks_state.rs
+++ b/src/eth/storage/rocks/rocks_state.rs
@@ -13,6 +13,7 @@ use rocksdb::Direction;
 use rocksdb::Options;
 use rocksdb::WaitForCompactOptions;
 use rocksdb::WriteBatch;
+use rocksdb::WriteOptions;
 use rocksdb::DB;
 use serde::Deserialize;
 use serde::Serialize;
@@ -26,7 +27,6 @@ use super::cf_versions::CfBlocksByHashValue;
 use super::cf_versions::CfBlocksByNumberValue;
 use super::cf_versions::CfLogsValue;
 use super::cf_versions::CfTransactionsValue;
-use super::rocks_batch_writer::write_in_batch_for_multiple_cfs_impl;
 use super::rocks_cf::RocksCfRef;
 use super::rocks_config::CacheSetting;
 use super::rocks_config::DbConfig;
@@ -130,10 +130,11 @@ pub struct RocksStorageState {
     #[cfg(feature = "metrics")]
     db_options: Options,
     shutdown_timeout: Duration,
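+    /// Whether batch writes should wait for the OS to sync data to disk before
+    /// returning (see `write_in_batch_for_multiple_cfs`).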
+    enable_sync_write: bool,
 }
 
 impl RocksStorageState {
-    pub fn new(path: String, shutdown_timeout: Duration, cache_multiplier: Option<f32>) -> Result<Self> {
+    pub fn new(path: String, shutdown_timeout: Duration, cache_multiplier: Option<f32>, enable_sync_write: bool) -> Result<Self> {
         tracing::debug!("creating (or opening an existing) database with the specified column families");
 
         let cf_options_map = generate_cf_options_map(cache_multiplier);
@@ -164,6 +165,7 @@ impl RocksStorageState {
             db_options,
             db,
             shutdown_timeout,
+            enable_sync_write,
         };
 
         tracing::debug!("opened database successfully");
@@ -175,7 +177,7 @@ impl RocksStorageState {
     pub fn new_in_testdir() -> anyhow::Result<(Self, tempfile::TempDir)> {
         let test_dir = tempfile::tempdir()?;
         let path = test_dir.as_ref().display().to_string();
-        let state = Self::new(path, Duration::ZERO, None)?;
+        let state = Self::new(path, Duration::ZERO, None, true)?;
         Ok((state, test_dir))
     }
 
@@ -410,11 +412,24 @@ impl RocksStorageState {
             &mut write_batch,
         )?;
 
-        write_in_batch_for_multiple_cfs_impl(&self.db, write_batch)?;
-        Ok(())
+        self.write_in_batch_for_multiple_cfs(write_batch)
+    }
+
+    pub fn save_block_batch(&self, block_batch: Vec<Block>) -> Result<()> {
+        let mut batch = WriteBatch::default();
+        for block in block_batch {
+            self.prepare_block_insertion(block, &mut batch)?;
+        }
+        self.write_in_batch_for_multiple_cfs(batch)
     }
 
     pub fn save_block(&self, block: Block) -> Result<()> {
+        let mut batch = WriteBatch::default();
+        self.prepare_block_insertion(block, &mut batch)?;
+        self.write_in_batch_for_multiple_cfs(batch)
+    }
+
+    pub fn prepare_block_insertion(&self, block: Block, batch: &mut WriteBatch) -> Result<()> {
         let account_changes = block.compact_account_changes();
 
         let mut txs_batch = vec![];
@@ -425,10 +440,9 @@ impl RocksStorageState {
                 logs_batch.push(((transaction.input.hash.into(), log.log_index.into()), transaction.block_number.into()));
             }
         }
-        let mut batch = WriteBatch::default();
 
-        self.transactions.prepare_batch_insertion(txs_batch, &mut batch)?;
-        self.logs.prepare_batch_insertion(logs_batch, &mut batch)?;
+        self.transactions.prepare_batch_insertion(txs_batch, batch)?;
+        self.logs.prepare_batch_insertion(logs_batch, batch)?;
 
         let number = block.number();
         let block_hash = block.hash();
@@ -446,20 +460,35 @@ impl RocksStorageState {
         };
 
         let block_by_number = (number.into(), block_without_changes.into());
-        self.blocks_by_number.prepare_batch_insertion([block_by_number], &mut batch)?;
+        self.blocks_by_number.prepare_batch_insertion([block_by_number], batch)?;
 
         let block_by_hash = (block_hash.into(), number.into());
-        self.blocks_by_hash.prepare_batch_insertion([block_by_hash], &mut batch)?;
-
-        self.prepare_batch_with_execution_changes(account_changes, number, &mut batch)?;
+        self.blocks_by_hash.prepare_batch_insertion([block_by_hash], batch)?;
 
-        self.write_in_batch_for_multiple_cfs(batch)?;
+        self.prepare_batch_with_execution_changes(account_changes, number, batch)?;
         Ok(())
     }
 
     /// Write to DB in a batch
     pub fn write_in_batch_for_multiple_cfs(&self, batch: WriteBatch) -> Result<()> {
-        write_in_batch_for_multiple_cfs_impl(&self.db, batch)
+        tracing::debug!("writing batch");
+        let batch_len = batch.len();
+
+        let mut options = WriteOptions::default();
+        // By default, each write to rocksdb is asynchronous: it returns after pushing
+        // the write from the process into the operating system (buffer cache).
+        //
+        // This option offers the trade-off of waiting after each write to
+        // ensure data is persisted to disk before returning, preventing
+        // potential data loss in case of system failure.
+        options.set_sync(self.enable_sync_write);
+
+        self.db
+            .write_opt(batch, &options)
+            .context("failed to write in batch to (possibly) multiple column families")
+            .inspect_err(|e| {
+                tracing::error!(reason = ?e, batch_len, "failed to write batch to DB");
+            })
     }
 
     /// Writes slots to state (does not write to slot history)
diff --git a/src/eth/storage/stratus_storage.rs b/src/eth/storage/stratus_storage.rs
index 113cf0fea..6bce855d3 100644
--- a/src/eth/storage/stratus_storage.rs
+++ b/src/eth/storage/stratus_storage.rs
@@ -1,5 +1,6 @@
 use std::sync::Arc;
 
+use anyhow::anyhow;
 use clap::Parser;
 use display_json::DebugAsJson;
 use tracing::Span;
@@ -377,6 +378,54 @@ impl StratusStorage {
             .map_err(Into::into)
     }
 
+    pub fn save_block_batch(&self, blocks: Vec<Block>) -> Result<(), StratusError> {
+        let Some(first) = blocks.first() else {
+            tracing::error!("save_block_batch called with no blocks, ignoring");
+            return Ok(());
+        };
+
+        let first_number = first.number();
+
+        // check mined number
+        let mined_number = self.read_mined_block_number()?;
+        if not(first_number.is_zero()) && first_number != mined_number.next_block_number() {
+            tracing::error!(%first_number, %mined_number, "failed to save block batch because of a mismatch with the mined block number");
+            return Err(StratusError::StorageMinedNumberConflict {
+                new: first_number,
+                mined: mined_number,
+            });
+        }
+
+        // check pending number
+        if let Some(pending_header) = self.read_pending_block_header()? {
+            if first_number >= pending_header.number {
+                tracing::error!(%first_number, pending_number = %pending_header.number, "failed to save block batch because of a mismatch with the pending block number");
+                return Err(StratusError::StoragePendingNumberConflict {
+                    new: first_number,
+                    pending: pending_header.number,
+                });
+            }
+        }
+
+        // check that the remaining block numbers are consecutive (e.g. [10, 11, 12] is valid, [10, 12] is not)
+        for window in blocks.windows(2) {
+            let (previous, next) = (window[0].number(), window[1].number());
+            if previous.next_block_number() != next {
+                tracing::error!(%previous, %next, "previous block number doesn't match the next one");
+                return Err(anyhow!("consecutive blocks in batch aren't adjacent").into());
+            }
+        }
+
+        // check for an existing block with the same number
+        let existing_block = self.read_block(BlockFilter::Number(first_number))?;
+        if existing_block.is_some() {
+            tracing::error!(%first_number, %mined_number, "failed to save block batch because a block with the same number already exists in the permanent storage");
+            return Err(StratusError::StorageBlockConflict { number: first_number });
+        }
+
+        self.perm.save_block_batch(blocks).map_err(Into::into)
+    }
+
     pub fn read_block(&self, filter: BlockFilter) -> Result<Option<Block>, StratusError> {
         #[cfg(feature = "tracing")]
         let _span = tracing::info_span!("storage::read_block", %filter).entered();