Skip to content

Commit

Permalink
Disable RocksDB backups
Browse files Browse the repository at this point in the history
  • Loading branch information
marcospb19-cw committed Jun 25, 2024
1 parent 2d3dedf commit 9cc0ccc
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 96 deletions.
8 changes: 1 addition & 7 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use crate::eth::BlockMiner;
use crate::eth::BlockMinerMode;
use crate::eth::Executor;
use crate::eth::TransactionRelayer;
use crate::ext::not;
use crate::ext::parse_duration;
use crate::infra::build_info;
use crate::infra::tracing::TracingLogFormat;
Expand Down Expand Up @@ -848,10 +847,6 @@ pub struct PermanentStorageConfig {
/// RocksDB storage path prefix to execute multiple local Stratus instances.
#[arg(long = "rocks-path-prefix", env = "ROCKS_PATH_PREFIX")]
pub rocks_path_prefix: Option<String>,

// Disable RocksDB backups
#[arg(long = "perm-storage-disable-backups", env = "PERM_STORAGE_DISABLE_BACKUPS")]
pub perm_storage_disable_backups: bool,
}

#[derive(DebugAsJson, Clone, serde::Serialize)]
Expand All @@ -870,9 +865,8 @@ impl PermanentStorageConfig {
PermanentStorageKind::InMemory => Box::<InMemoryPermanentStorage>::default(),
#[cfg(feature = "rocks")]
PermanentStorageKind::Rocks => {
let enable_backups = not(self.perm_storage_disable_backups);
let prefix = self.rocks_path_prefix.clone();
Box::new(RocksPermanentStorage::new(enable_backups, prefix)?)
Box::new(RocksPermanentStorage::new(prefix)?)
}
};
Ok(perm)
Expand Down
24 changes: 1 addition & 23 deletions src/eth/storage/rocks/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use rocksdb::backup::BackupEngine;
use rocksdb::backup::BackupEngineOptions;
use rocksdb::Env;
use rocksdb::Options;
use rocksdb::DB;

Expand All @@ -15,7 +11,7 @@ use crate::eth::storage::rocks::rocks_config::DbConfig;
/// Create or open the Database with the configs applied to all column families
///
/// The returned `Options` need to be stored to refer to the DB metrics
#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, fields(path = ?path.as_ref()))]
pub fn create_or_open_db(path: impl AsRef<Path>, cf_configs: &HashMap<&'static str, Options>) -> anyhow::Result<(Arc<DB>, Options)> {
let path = path.as_ref();

Expand All @@ -41,21 +37,3 @@ pub fn create_or_open_db(path: impl AsRef<Path>, cf_configs: &HashMap<&'static s
tracing::info!("Opened RocksDB at {:?}", path);
Ok((Arc::new(db), db_opts))
}

#[tracing::instrument(skip_all)]
pub fn create_new_backup(db: &DB) -> anyhow::Result<()> {
tracing::info!("Creating new DB backup");
let mut backup_engine = backup_engine(db)?;
backup_engine.create_new_backup(db)?;
backup_engine.purge_old_backups(2)?;
Ok(())
}

fn backup_engine(db: &DB) -> anyhow::Result<BackupEngine> {
let db_path = db.path().to_str().ok_or(anyhow!("Invalid path"))?;
let backup_path = format!("{db_path}backup");
let backup_opts = BackupEngineOptions::new(backup_path)?;

let backup_env = Env::new()?;
Ok(BackupEngine::open(&backup_opts, &backup_env)?)
}
11 changes: 3 additions & 8 deletions src/eth/storage/rocks/rocks_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ use crate::eth::storage::PermanentStorage;
pub struct RocksPermanentStorage {
pub state: RocksStorageState,
block_number: AtomicU64,
enable_backups: bool,
}

impl RocksPermanentStorage {
pub fn new(enable_backups: bool, rocks_path_prefix: Option<String>) -> anyhow::Result<Self> {
pub fn new(rocks_path_prefix: Option<String>) -> anyhow::Result<Self> {
tracing::info!("setting up rocksdb storage");
let path = if let Some(prefix) = rocks_path_prefix {
// run some checks on the given prefix
Expand All @@ -45,11 +44,7 @@ impl RocksPermanentStorage {

let state = RocksStorageState::new(path);
let block_number = state.preload_block_number()?;
Ok(Self {
state,
block_number,
enable_backups,
})
Ok(Self { state, block_number })
}

// -------------------------------------------------------------------------
Expand Down Expand Up @@ -105,7 +100,7 @@ impl PermanentStorage for RocksPermanentStorage {
{
self.state.export_metrics();
}
self.state.save_block(block, self.enable_backups)
self.state.save_block(block)
}

fn save_accounts(&self, accounts: Vec<Account>) -> anyhow::Result<()> {
Expand Down
57 changes: 2 additions & 55 deletions src/eth/storage/rocks/rocks_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::mpsc;
use std::sync::Arc;

use anyhow::anyhow;
Expand All @@ -23,7 +20,6 @@ use sugars::hmap;
use super::rocks_cf::RocksCf;
use super::rocks_config::CacheSetting;
use super::rocks_config::DbConfig;
use super::rocks_db::create_new_backup;
use super::rocks_db::create_or_open_db;
use crate::eth::primitives::Account;
use crate::eth::primitives::Address;
Expand All @@ -46,7 +42,6 @@ use crate::eth::storage::rocks::types::HashRocksdb;
use crate::eth::storage::rocks::types::IndexRocksdb;
use crate::eth::storage::rocks::types::SlotIndexRocksdb;
use crate::eth::storage::rocks::types::SlotValueRocksdb;
use crate::ext::spawn_blocking_named;
use crate::ext::OptionExt;
use crate::log_and_err;
use crate::utils::GIGABYTE;
Expand All @@ -62,8 +57,6 @@ cfg_if::cfg_if! {
}
}

const BACKUP_TRANSACTION_COUNT_THRESHOLD: usize = 120_000;

lazy_static! {
/// Map setting presets for each Column Family
static ref CF_OPTIONS_MAP: HashMap<&'static str, Options> = hmap! {
Expand All @@ -80,8 +73,7 @@ lazy_static! {

/// State handler for our RocksDB storage, separating "tables" by column families.
///
/// With data separated by column families, writing and reading should be done via the `RocksCf` fields,
/// while operations that include the whole database (e.g. backup) should refer to the inner `DB` directly.
/// With data separated by column families, writing and reading should be done via the `RocksCf` fields.
pub struct RocksStorageState {
db: Arc<DB>,
db_path: PathBuf,
Expand All @@ -93,9 +85,6 @@ pub struct RocksStorageState {
blocks_by_number: RocksCf<BlockNumberRocksdb, BlockRocksdb>,
blocks_by_hash: RocksCf<HashRocksdb, BlockNumberRocksdb>,
logs: RocksCf<(HashRocksdb, IndexRocksdb), BlockNumberRocksdb>,
backup_trigger: mpsc::SyncSender<()>,
/// Used to trigger backup after threshold is hit
transactions_processed: AtomicUsize,
/// Last collected stats for a histogram
#[cfg(feature = "metrics")]
prev_stats: Mutex<HashMap<HistogramInt, (Sum, Count)>>,
Expand All @@ -110,7 +99,6 @@ pub struct RocksStorageState {
impl RocksStorageState {
pub fn new(path: impl AsRef<Path>) -> Self {
let db_path = path.as_ref().to_path_buf();
let (backup_trigger_tx, backup_trigger_rx) = mpsc::sync_channel::<()>(1);
tracing::debug!("initializing RocksStorageState");

#[cfg_attr(not(feature = "metrics"), allow(unused_variables))]
Expand All @@ -127,37 +115,17 @@ impl RocksStorageState {
blocks_by_number: new_cf(&db, "blocks_by_number"),
blocks_by_hash: new_cf(&db, "blocks_by_hash"), //XXX this is not needed we can afford to have blocks_by_hash pointing into blocks_by_number
logs: new_cf(&db, "logs"),
backup_trigger: backup_trigger_tx,
transactions_processed: AtomicUsize::new(0),
#[cfg(feature = "metrics")]
prev_stats: Default::default(),
#[cfg(feature = "metrics")]
db_options,
db,
};

tracing::debug!("initializing backup trigger");
state.listen_for_backup_trigger(backup_trigger_rx).unwrap();

tracing::debug!("returning RocksStorageState");
state
}

fn listen_for_backup_trigger(&self, rx: mpsc::Receiver<()>) -> anyhow::Result<()> {
tracing::info!("starting rocksdb backup trigger listener");

let db = Arc::clone(&self.db);
spawn_blocking_named("storage::listen_backup_trigger", move || {
while rx.recv().is_ok() {
if let Err(err) = create_new_backup(&db) {
tracing::error!(?err, "failed to backup DB");
}
}
});

Ok(())
}

pub fn preload_block_number(&self) -> anyhow::Result<AtomicU64> {
let block_number = self.blocks_by_number.last_key().unwrap_or_default();
tracing::info!(number = %block_number, "preloaded block_number");
Expand Down Expand Up @@ -427,7 +395,7 @@ impl RocksStorageState {
}
}

pub fn save_block(&self, block: Block, enable_backups: bool) -> anyhow::Result<()> {
pub fn save_block(&self, block: Block) -> anyhow::Result<()> {
let account_changes = block.compact_account_changes();

let mut txs_batch = vec![];
Expand All @@ -445,7 +413,6 @@ impl RocksStorageState {

let number = block.number();
let block_hash = block.hash();
let txs_len = block.transactions.len();

// this is an optimization, instead of saving the entire block into the database,
// remove all discardable account changes
Expand All @@ -467,30 +434,10 @@ impl RocksStorageState {

self.prepare_batch_state_update_with_execution_changes(&account_changes, number, &mut batch);

if enable_backups {
self.check_backup_threshold_trigger(txs_len);
}

self.write_batch(batch).unwrap();
Ok(())
}

fn check_backup_threshold_trigger(&self, transactions_just_processed: usize) {
let previous = self.transactions_processed.fetch_add(transactions_just_processed, Ordering::Relaxed);
let current = previous + transactions_just_processed;

// threshold hit, trigger backup and reset value
if current > BACKUP_TRANSACTION_COUNT_THRESHOLD {
if let Err(err) = self.backup_trigger.try_send(()) {
tracing::error!(
reason = ?err,
"Failed to trigger backup signal, either listener panicked or signal was triggered while another backup was in progress"
);
}
self.transactions_processed.store(0, Ordering::Relaxed);
}
}

/// Writes accounts to state (does not write to account history)
#[allow(dead_code)]
fn write_accounts(&self, accounts: Vec<Account>) {
Expand Down
3 changes: 1 addition & 2 deletions src/eth/storage/stratus_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ impl StratusStorage {
let temp_dir = tempfile::TempDir::new().expect("Failed to create temp dir");
let temp_path = temp_dir.path().to_str().expect("Failed to get temp path").to_string();

let rocks_permanent_storage =
crate::eth::storage::RocksPermanentStorage::new(false, Some(temp_path.clone())).expect("Failed to create RocksPermanentStorage");
let rocks_permanent_storage = crate::eth::storage::RocksPermanentStorage::new(Some(temp_path.clone())).expect("Failed to create RocksPermanentStorage");

(
Self {
Expand Down
2 changes: 1 addition & 1 deletion tests/test_import_external_snapshot_rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub mod rocks_test {

let (accounts, slots) = common::filter_accounts_and_slots(snapshot);

let rocks = RocksPermanentStorage::new(false, Some("test_import_external_snapshot_with_rocksdb".to_string())).unwrap();
let rocks = RocksPermanentStorage::new(Some("test_import_external_snapshot_with_rocksdb".to_string())).unwrap();
rocks.save_accounts(accounts).unwrap();
rocks.state.write_slots(slots);

Expand Down

0 comments on commit 9cc0ccc

Please sign in to comment.