Skip to content

Commit

Permalink
rocksdb improve index position checks
Browse files Browse the repository at this point in the history
Compare the block number recorded in the index with the block number of the
table contents for every indexed table, and report any mismatch that is found.
  • Loading branch information
marcospb19-cw committed May 15, 2024
1 parent 3ef66d8 commit 9e4129c
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 27 deletions.
2 changes: 1 addition & 1 deletion src/eth/storage/rocks/rocks_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ impl PermanentStorage for RocksPermanentStorage {
}
});

self.state.reset_at(block_number).await
self.state.reset_dbs_at(block_number).await
}

async fn read_slots_sample(&self, _start: BlockNumber, _end: BlockNumber, _max_samples: u64, _seed: u64) -> anyhow::Result<Vec<SlotSample>> {
Expand Down
99 changes: 73 additions & 26 deletions src/eth/storage/rocks/rocks_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use anyhow::anyhow;
use futures::future::join_all;
use itertools::Itertools;
use num_traits::cast::ToPrimitive;
use serde::Deserialize;
use serde::Serialize;
use tokio::sync::mpsc;
use tokio::task;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -119,33 +121,40 @@ impl RocksStorageState {
/// This method restores the DBs from the last backup to synchronize them.
#[tracing::instrument(skip_all)]
pub async fn sync_data(&self) -> anyhow::Result<()> {
tracing::info!("starting sync_data");
tracing::info!("account_block_number {:?}", self.accounts.get_current_block_number());
tracing::info!("slots_block_number {:?}", self.account_slots.get_current_block_number());
tracing::info!("slots_history_block_number {:?}", self.account_slots_history.get_index_block_number());
tracing::info!("accounts_history_block_number {:?}", self.accounts_history.get_index_block_number());
tracing::info!("logs_block_number {:?}", self.logs.get_index_block_number());
tracing::info!("transactions_block_number {:?}", self.transactions.get_index_block_number());
info!("starting RocksDB sync_data");

// compare block_number reported by different DBs
let dbs_grouped_by_block_number = [
(self.accounts.get_current_block_number(), "accounts"),
(self.account_slots.get_current_block_number(), "account_slots"),
(self.account_slots_history.get_current_block_number(), "account_slots_history"),
(self.accounts_history.get_current_block_number(), "accounts_history"),
(self.logs.get_current_block_number(), "logs"),
(self.transactions.get_current_block_number(), "transactions"),
]
.into_iter()
.into_group_map();

if dbs_grouped_by_block_number.len() == 1 {
// if all DBs are in the same block_number, pick any one to report its value
info!("DBs are at block_number '{}'", self.accounts.get_current_block_number());
} else {
// if there is more than one group, it means that the DBs don't agree on current block number
warn!(?dbs_grouped_by_block_number, "databases aren't consistent on what is current block number");
}

// for each DB with an index, check if the block number is consistent between the contents and index
check_indexed_block_number("account_slots_history", &self.account_slots_history);
check_indexed_block_number("accounts_history", &self.accounts_history);
check_indexed_block_number("logs", &self.logs);
check_indexed_block_number("transactions", &self.transactions);

if let Some((last_block_number, _)) = self.blocks_by_number.last() {
info!("last_block_number {:?}", last_block_number);

let get_min_block_number = || {
[
self.accounts.get_current_block_number(),
self.account_slots.get_current_block_number(),
self.account_slots_history.get_current_block_number(),
self.accounts_history.get_current_block_number(),
self.logs.get_current_block_number(),
self.transactions.get_current_block_number(),
]
.into_iter()
.min()
.expect("array in this expression is not empty")
};

if self.accounts.get_current_block_number() != self.account_slots.get_current_block_number() {
let mut min_block_number = get_min_block_number();
let should_restore_from_backup = self.accounts.get_current_block_number() != self.account_slots.get_current_block_number();
if should_restore_from_backup {
let mut min_block_number = self.min_block_number();

let last_secure_block_number = last_block_number.inner_value().as_u64() - 5000;
if last_secure_block_number > min_block_number {
Expand All @@ -166,9 +175,10 @@ impl RocksStorageState {
self.logs.restore().expect("failed to restore 'logs'");
tracing::warn!("logs restored");

min_block_number = get_min_block_number();
// get the updated `min_block_number` after the restoring done above
min_block_number = self.min_block_number();
}
self.reset_at(BlockNumber::from(min_block_number)).await?;
self.reset_dbs_at(BlockNumber::from(min_block_number)).await?;
}
}

Expand All @@ -177,7 +187,7 @@ impl RocksStorageState {
Ok(())
}

pub async fn reset_at(&self, block_number: BlockNumber) -> anyhow::Result<()> {
pub async fn reset_dbs_at(&self, block_number: BlockNumber) -> anyhow::Result<()> {
let tasks = vec![
{
let self_blocks_by_hash_clone = Arc::clone(&self.blocks_by_hash);
Expand Down Expand Up @@ -595,10 +605,47 @@ impl RocksStorageState {
self.blocks_by_hash.export_metrics();
self.blocks_by_number.export_metrics();
}

/// Returns the lowest current block number among the `accounts`, `account_slots`,
/// `account_slots_history`, `accounts_history`, `logs` and `transactions` DBs.
fn min_block_number(&self) -> u64 {
    let current_block_numbers = [
        self.accounts.get_current_block_number(),
        self.account_slots.get_current_block_number(),
        self.account_slots_history.get_current_block_number(),
        self.accounts_history.get_current_block_number(),
        self.logs.get_current_block_number(),
        self.transactions.get_current_block_number(),
    ];

    // `min` only yields `None` for an empty iterator, and the array above always has six entries
    let lowest = current_block_numbers.into_iter().min();
    lowest.expect("array in this expression is not empty")
}
}

/// Debug output for the storage state.
///
/// Prints a fixed placeholder (`RocksDb { db: "Arc<DB>" }`) and does not read any
/// field of `self`.
impl fmt::Debug for RocksStorageState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut placeholder = f.debug_struct("RocksDb");
        placeholder.field("db", &"Arc<DB>");
        placeholder.finish()
    }
}

/// Warns when a DB's index and its contents report different block numbers.
///
/// Reads `get_current_block_number` and `get_index_block_number` from the given DB
/// and, if the two values differ, emits a warning carrying `db_name` and both
/// values. Nothing is logged when they agree, and nothing is returned either way;
/// the function exists purely as a debugging aid for indexes that were not updated.
fn check_indexed_block_number<K, V>(db_name: &str, db: impl AsRef<RocksDb<K, V>>)
where
    K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq,
    V: Serialize + for<'de> Deserialize<'de> + Clone,
{
    let table = db.as_ref();
    let contents_block_number = table.get_current_block_number();
    let index_block_number = table.get_index_block_number();

    // nothing to report when both values agree
    if contents_block_number == index_block_number {
        return;
    }

    tracing::warn!(
        db = db_name,
        get_current_block_number = contents_block_number,
        get_index_block_number = index_block_number,
        "db index has a different block_number"
    );
}

0 comments on commit 9e4129c

Please sign in to comment.