Skip to content

Commit

Permalink
chore: cleanup unused RocksDB functions
Browse files Browse the repository at this point in the history
  • Loading branch information
marcospb19-cw committed May 24, 2024
1 parent 1527439 commit 1c4adb8
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 84 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ data/**/*.db
data/**/*.json
data/**/*.txt
data/**/*.rocksdb
data/rocksdb/

# E2E (Stratus)
e2e/artifacts
Expand Down
13 changes: 4 additions & 9 deletions src/eth/storage/rocks/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use anyhow::anyhow;
use anyhow::Result;
use rocksdb::backup::BackupEngine;
use rocksdb::backup::BackupEngineOptions;
use rocksdb::backup::RestoreOptions;
#[cfg(feature = "metrics")]
use rocksdb::statistics::Histogram;
#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -57,8 +56,11 @@ pub struct RocksDb<K, V> {

impl<K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq, V: Serialize + for<'de> Deserialize<'de> + Clone> RocksDb<K, V> {
pub fn new(cf_name: &str, db: Arc<DB>, config: DbConfig) -> anyhow::Result<Arc<Self>> {
let opts = Self::get_options(config, cf_name == "accounts" || cf_name == "account_slots");
let enable_cache = cf_name == "accounts" || cf_name == "account_slots";
let opts = Self::get_options(config, enable_cache);

db.create_cf(cf_name, &opts)?;

Ok(Arc::new(Self {
db,
opts,
Expand Down Expand Up @@ -233,13 +235,6 @@ impl<K: Serialize + for<'de> Deserialize<'de> + std::hash::Hash + Eq, V: Seriali
Ok(())
}

pub fn restore(&self) -> anyhow::Result<()> {
let mut backup_engine = self.backup_engine()?;
let restore_options = RestoreOptions::default();
backup_engine.restore_from_latest_backup(self.db.path(), self.backup_path()?, &restore_options)?;
Ok(())
}

// Clears the database
pub fn clear(&self) -> Result<()> {
let mut batch = WriteBatch::default();
Expand Down
4 changes: 1 addition & 3 deletions src/eth/storage/rocks/rocks_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ pub struct RocksPermanentStorage {
impl RocksPermanentStorage {
pub async fn new() -> anyhow::Result<Self> {
tracing::info!("starting rocksdb storage");

let state = RocksStorageState::new("./data/rocks");
//state.sync_data().await?;
let state = RocksStorageState::new("./data/rocksdb");
let block_number = state.preload_block_number()?;
Ok(Self { state, block_number })
}
Expand Down
72 changes: 0 additions & 72 deletions src/eth/storage/rocks/rocks_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use rocksdb::DB;
use tokio::sync::mpsc;
use tokio::task;
use tracing::info;
use tracing::warn;

use super::rocks_db::DbConfig;
use super::rocks_db::RocksDb;
Expand Down Expand Up @@ -115,77 +114,6 @@ impl RocksStorageState {
Ok((account_block_number.to_u64().unwrap_or(0u64)).into())
}

pub async fn sync_data(&self) -> anyhow::Result<()> {
tracing::info!("starting sync_data");
tracing::info!("account_block_number {:?}", self.accounts.get_current_block_number());
tracing::info!("slots_block_number {:?}", self.account_slots.get_current_block_number());
tracing::info!("slots_history_block_number {:?}", self.account_slots_history.get_index_block_number());
tracing::info!("accounts_history_block_number {:?}", self.accounts_history.get_index_block_number());
tracing::info!("logs_block_number {:?}", self.logs.get_index_block_number());
tracing::info!("transactions_block_number {:?}", self.transactions.get_index_block_number());

if let Some((last_block_number, _)) = self.blocks_by_number.last() {
tracing::info!("last_block_number {:?}", last_block_number);
if self.accounts.get_current_block_number() != self.account_slots.get_current_block_number() {
warn!(
"block numbers are not in sync {:?} {:?} {:?} {:?} {:?} {:?}",
self.accounts.get_current_block_number(),
self.account_slots.get_current_block_number(),
self.account_slots_history.get_index_block_number(),
self.accounts_history.get_index_block_number(),
self.logs.get_index_block_number(),
self.transactions.get_index_block_number(),
);
let mut min_block_number = std::cmp::min(
std::cmp::min(
std::cmp::min(self.accounts.get_current_block_number(), self.account_slots.get_current_block_number()),
std::cmp::min(
self.account_slots_history.get_index_block_number(),
self.accounts_history.get_index_block_number(),
),
),
std::cmp::min(self.logs.get_index_block_number(), self.transactions.get_index_block_number()),
);

let last_secure_block_number = last_block_number.inner_value().as_u64() - 5000;
if last_secure_block_number > min_block_number {
self.accounts.restore().unwrap();
tracing::warn!("accounts restored");
self.accounts_history.restore().unwrap();
tracing::warn!("accounts_history restored");
self.account_slots.restore().unwrap();
tracing::warn!("account_slots restored");
self.account_slots_history.restore().unwrap();
tracing::warn!("account_slots_history restored");
self.transactions.restore().unwrap();
tracing::warn!("transactions restored");
self.blocks_by_number.restore().unwrap();
tracing::warn!("blocks_by_number restored");
self.blocks_by_hash.restore().unwrap();
tracing::warn!("blocks_by_hash restored");
self.logs.restore().unwrap();
tracing::warn!("logs restored");

min_block_number = std::cmp::min(
std::cmp::min(
std::cmp::min(self.accounts.get_current_block_number(), self.account_slots.get_current_block_number()),
std::cmp::min(
self.account_slots_history.get_index_block_number(),
self.accounts_history.get_index_block_number(),
),
),
std::cmp::min(self.logs.get_index_block_number(), self.transactions.get_index_block_number()),
);
}
self.reset_at(BlockNumber::from(min_block_number)).await?;
}
}

tracing::info!("data is in sync");

Ok(())
}

pub async fn reset_at(&self, block_number: BlockNumber) -> anyhow::Result<()> {
let tasks = vec![
{
Expand Down

0 comments on commit 1c4adb8

Please sign in to comment.