From 9e4129c8bb11176c2e3ef90a75cdda9cd8288e4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Marcos=20Bezerra?= Date: Tue, 14 May 2024 21:00:14 -0300 Subject: [PATCH] rocksdb: improve index position checks. Compare index block number and table content number for all indexed tables, and report it if anything irregular is seen. --- src/eth/storage/rocks/rocks_permanent.rs | 2 +- src/eth/storage/rocks/rocks_state.rs | 99 +++++++++++++++++------- 2 files changed, 74 insertions(+), 27 deletions(-) diff --git a/src/eth/storage/rocks/rocks_permanent.rs b/src/eth/storage/rocks/rocks_permanent.rs index b00138e5e..33d990095 100644 --- a/src/eth/storage/rocks/rocks_permanent.rs +++ b/src/eth/storage/rocks/rocks_permanent.rs @@ -274,7 +274,7 @@ impl PermanentStorage for RocksPermanentStorage { } }); - self.state.reset_at(block_number).await + self.state.reset_dbs_at(block_number).await } async fn read_slots_sample(&self, _start: BlockNumber, _end: BlockNumber, _max_samples: u64, _seed: u64) -> anyhow::Result> { diff --git a/src/eth/storage/rocks/rocks_state.rs b/src/eth/storage/rocks/rocks_state.rs index b205dd358..aba2f23d6 100644 --- a/src/eth/storage/rocks/rocks_state.rs +++ b/src/eth/storage/rocks/rocks_state.rs @@ -7,6 +7,8 @@ use anyhow::anyhow; use futures::future::join_all; use itertools::Itertools; use num_traits::cast::ToPrimitive; +use serde::Deserialize; +use serde::Serialize; use tokio::sync::mpsc; use tokio::task; use tokio::task::JoinHandle; @@ -119,33 +121,40 @@ impl RocksStorageState { /// This method restores the DBs from the last backup to synchronize them.
#[tracing::instrument(skip_all)] pub async fn sync_data(&self) -> anyhow::Result<()> { - tracing::info!("starting sync_data"); - tracing::info!("account_block_number {:?}", self.accounts.get_current_block_number()); - tracing::info!("slots_block_number {:?}", self.account_slots.get_current_block_number()); - tracing::info!("slots_history_block_number {:?}", self.account_slots_history.get_index_block_number()); - tracing::info!("accounts_history_block_number {:?}", self.accounts_history.get_index_block_number()); - tracing::info!("logs_block_number {:?}", self.logs.get_index_block_number()); - tracing::info!("transactions_block_number {:?}", self.transactions.get_index_block_number()); + info!("starting RocksDB sync_data"); + + // compare block_number reported by different DBs + let dbs_grouped_by_block_number = [ + (self.accounts.get_current_block_number(), "accounts"), + (self.account_slots.get_current_block_number(), "account_slots"), + (self.account_slots_history.get_current_block_number(), "account_slots_history"), + (self.accounts_history.get_current_block_number(), "accounts_history"), + (self.logs.get_current_block_number(), "logs"), + (self.transactions.get_current_block_number(), "transactions"), + ] + .into_iter() + .into_group_map(); + + if dbs_grouped_by_block_number.len() == 1 { + // if all DBs are in the same block_number, pick any one to report its value + info!("DBs are at block_number '{}'", self.accounts.get_current_block_number()); + } else { + // if there is more than one group, it means that the DBs don't agree on current block number + warn!(?dbs_grouped_by_block_number, "databases aren't consistent on what is current block number"); + } + + // for each DB with an index, check if the block number is consistent between the contents and index + check_indexed_block_number("account_slots_history", &self.account_slots_history); + check_indexed_block_number("accounts_history", &self.accounts_history); + check_indexed_block_number("logs", 
&self.logs); + check_indexed_block_number("transactions", &self.transactions); if let Some((last_block_number, _)) = self.blocks_by_number.last() { info!("last_block_number {:?}", last_block_number); - let get_min_block_number = || { - [ - self.accounts.get_current_block_number(), - self.account_slots.get_current_block_number(), - self.account_slots_history.get_current_block_number(), - self.accounts_history.get_current_block_number(), - self.logs.get_current_block_number(), - self.transactions.get_current_block_number(), - ] - .into_iter() - .min() - .expect("array in this expression is not empty") - }; - - if self.accounts.get_current_block_number() != self.account_slots.get_current_block_number() { - let mut min_block_number = get_min_block_number(); + let should_restore_from_backup = self.accounts.get_current_block_number() != self.account_slots.get_current_block_number(); + if should_restore_from_backup { + let mut min_block_number = self.min_block_number(); let last_secure_block_number = last_block_number.inner_value().as_u64() - 5000; if last_secure_block_number > min_block_number { @@ -166,9 +175,10 @@ impl RocksStorageState { self.logs.restore().expect("failed to restore 'logs'"); tracing::warn!("logs restored"); - min_block_number = get_min_block_number(); + // get the updated `min_block_number` after the restoring done above + min_block_number = self.min_block_number(); } - self.reset_at(BlockNumber::from(min_block_number)).await?; + self.reset_dbs_at(BlockNumber::from(min_block_number)).await?; } } @@ -177,7 +187,7 @@ impl RocksStorageState { Ok(()) } - pub async fn reset_at(&self, block_number: BlockNumber) -> anyhow::Result<()> { + pub async fn reset_dbs_at(&self, block_number: BlockNumber) -> anyhow::Result<()> { let tasks = vec![ { let self_blocks_by_hash_clone = Arc::clone(&self.blocks_by_hash); @@ -595,6 +605,20 @@ impl RocksStorageState { self.blocks_by_hash.export_metrics(); self.blocks_by_number.export_metrics(); } + + fn 
min_block_number(&self) -> u64 { + [ + self.accounts.get_current_block_number(), + self.account_slots.get_current_block_number(), + self.account_slots_history.get_current_block_number(), + self.accounts_history.get_current_block_number(), + self.logs.get_current_block_number(), + self.transactions.get_current_block_number(), + ] + .into_iter() + .min() + .expect("array in this expression is not empty") + } } impl fmt::Debug for RocksStorageState { @@ -602,3 +626,26 @@ impl fmt::Debug for RocksStorageState { f.debug_struct("RocksDb").field("db", &"Arc").finish() } } + +/// Checks if index of the given DB has the same block number as the DB contents. +/// +/// This function logs the mismatching `block_number` to help debug bugs where the +/// index might not be properly updated. +fn check_indexed_block_number(db_name: &str, db: impl AsRef>) +where + K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq, + V: Serialize + for<'de> Deserialize<'de> + Clone, +{ + let db = db.as_ref(); + let block_number = db.get_current_block_number(); + let index_block_number = db.get_index_block_number(); + + if block_number != index_block_number { + tracing::warn!( + db = db_name, + get_current_block_number = block_number, + get_index_block_number = index_block_number, + "db index has a different block_number" + ); + } +}