Skip to content

Commit

Permalink
Backup fine tunning (#629)
Browse files Browse the repository at this point in the history
* chore: use block number as prefix

* chore: add back reset for logs

* chore: implement indexed iter

* reset based on indexes

* fix: add index deletion

* chore: use indexed block numbers on historic accounts and slots

* lint

* fix: add missing index save

* chore: add a limit of backups

* do the reset based on minimal block on all databases

* lint

---------

Co-authored-by: Maycon Amaro <[email protected]>
  • Loading branch information
renancloudwalk and mayconamaroCW authored Apr 19, 2024
1 parent 06b5caa commit a86febd
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 8 deletions.
25 changes: 22 additions & 3 deletions src/eth/storage/rocks/rocks_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,30 @@ impl RocksStorageState {
pub async fn sync_data(&self) -> anyhow::Result<()> {
let account_block_number = self.accounts.get_current_block_number();
let slots_block_number = self.account_slots.get_current_block_number();
let slots_history_block_number = self.account_slots_history.get_index_block_number();
let accounts_history_block_number = self.accounts_history.get_index_block_number();
let logs_block_number = self.logs.get_index_block_number();
let transactions_block_number = self.transactions.get_index_block_number();
if let Some((last_block_number, _)) = self.blocks_by_number.last() {
if account_block_number != slots_block_number {
warn!("block numbers are not in sync {:?} {:?}", account_block_number, slots_block_number);
let min_block_number = std::cmp::min(account_block_number, slots_block_number);
let last_secure_block_number = last_block_number.as_i64() - 1000;
warn!(
"block numbers are not in sync {:?} {:?} {:?} {:?} {:?} {:?}",
account_block_number,
slots_block_number,
slots_history_block_number,
accounts_history_block_number,
logs_block_number,
transactions_block_number
);
let min_block_number = std::cmp::min(
std::cmp::min(
std::cmp::min(account_block_number, slots_block_number),
std::cmp::min(slots_history_block_number, accounts_history_block_number),
),
std::cmp::min(logs_block_number, transactions_block_number),
);

let last_secure_block_number = last_block_number.as_u64() - 5000;
if last_secure_block_number > min_block_number {
panic!("block numbers is too far away from the last secure block number, please resync the data from the last secure block number");
}
Expand Down
19 changes: 14 additions & 5 deletions src/eth/storage/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ impl<K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq, V: Seriali
pub fn restore(&self) -> anyhow::Result<()> {
let mut backup_engine = self.backup_engine()?;
let restore_options = RestoreOptions::default();
//XXX TODO panic if nothing to restore
backup_engine.restore_from_latest_backup(self.db.path(), self.backup_path()?, &restore_options)?;
Ok(())
}
Expand All @@ -159,13 +158,17 @@ impl<K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq, V: Seriali
bincode::deserialize(&value_bytes).ok()
}

pub fn get_current_block_number(&self) -> i64 {
pub fn get_current_block_number(&self) -> u64 {
let Ok(serialized_key) = bincode::serialize(&"current_block") else {
return -1;
return 0;
};
let Ok(Some(value_bytes)) = self.db.get(serialized_key) else { return -1 };
let Ok(Some(value_bytes)) = self.db.get(serialized_key) else { return 0 };

bincode::deserialize(&value_bytes).ok().unwrap_or(-1)
bincode::deserialize(&value_bytes).ok().unwrap_or(0)
}

pub fn get_index_block_number(&self) -> u64 {
self.last_index().map(|(block_number, _)| block_number).unwrap_or(0)
}

// Mimics the 'insert' functionality of a HashMap
Expand Down Expand Up @@ -230,6 +233,7 @@ impl<K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq, V: Seriali
// Deletes an entry from the database by key
pub fn delete_index(&self, key: u64) -> Result<()> {
let serialized_key = bincode::serialize(&key)?;
//XXX check if value is a vec that can be deserialized as a safety measure
self.db.delete(serialized_key)?;
Ok(())
}
Expand Down Expand Up @@ -274,6 +278,11 @@ impl<K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq, V: Seriali
RocksDBIterator::<K, V>::new(iter)
}

pub fn last_index(&self) -> Option<(u64, Vec<K>)> {
let iter = self.db.iterator(IteratorMode::End);
IndexedRocksDBIterator::<K>::new(iter).next()
}

pub fn last(&self) -> Option<(K, V)> {
let mut iter = self.db.iterator(IteratorMode::End);
if let Some(Ok((k, v))) = iter.next() {
Expand Down

0 comments on commit a86febd

Please sign in to comment.