Skip to content

Commit

Permalink
enha: optimize importer offline (#1877)
Browse files Browse the repository at this point in the history
* enha: optimize importer-offline
* refac: importer-offline and storage
* add flag to disable rocks write sync
* update importer-offline default config
  • Loading branch information
marcospb19-cw authored Nov 24, 2024
1 parent e223958 commit 83f662e
Show file tree
Hide file tree
Showing 12 changed files with 310 additions and 240 deletions.
3 changes: 3 additions & 0 deletions config/importer-offline.env.local
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ EXTERNAL_RPC_STORAGE_TIMEOUT=20000
EXTERNAL_RPC_SLOW_QUERY_WARN_THRESHOLD=10s

EXECUTOR_REJECT_NOT_CONTRACT=false

ROCKS_DISABLE_SYNC_WRITE=true
ROCKS_CACHE_SIZE_MULTIPLIER=0.1
2 changes: 1 addition & 1 deletion src/bin/historic_events_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ fn process_block_events(block: BlockRocksdb) -> Vec<String> {
/// Main function that processes blockchain data and generates events
fn main() -> Result<(), anyhow::Error> {
tracing_subscriber::fmt::init();
let state = RocksStorageState::new("data/rocksdb".to_string(), TIMEOUT, Some(0.1)).context("failed to create rocksdb state")?;
let state = RocksStorageState::new("data/rocksdb".to_string(), TIMEOUT, Some(0.1), false).context("failed to create rocksdb state")?;

let (b_pb, tx_pb) = create_progress_bar(&state);

Expand Down
281 changes: 167 additions & 114 deletions src/bin/importer_offline.rs

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions src/eth/primitives/unix_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ mod offset {
return log_and_err!("timestamp can't be before the latest block");
}

let current_time = Utc::now().timestamp() as i64;
let current_time = Utc::now().timestamp();
let diff: i64 = if *new_block_timestamp == 0 {
0
} else {
Expand All @@ -188,12 +188,12 @@ mod offset {
/// 1. When a specific timestamp was set (was_evm_timestamp_set = true):
/// - If new_timestamp is 0: Returns last_block_timestamp + 1
/// - If new_timestamp > 0: Returns exactly new_timestamp
/// After this call, resets the timestamp flag and stored value
/// - After this call, resets the timestamp flag and stored value
///
/// 2. For subsequent blocks (was_evm_timestamp_set = false):
/// - If new_timestamp is set: Uses it as reference point
/// - Otherwise: Uses max(current_time + offset, last_block_timestamp)
/// In both cases, adds 1 second to ensure progression
/// - In both cases, adds 1 second to ensure progression
///
/// The function always updates LAST_BLOCK_TIMESTAMP with the returned value
/// to maintain the chain of increasing timestamps.
Expand All @@ -202,7 +202,7 @@ mod offset {
let new_timestamp_diff = NEW_TIMESTAMP_DIFF.load(Acquire);
let was_evm_timestamp_set = EVM_SET_NEXT_BLOCK_TIMESTAMP_WAS_CALLED.load(Acquire);
let last_block_timestamp = LAST_BLOCK_TIMESTAMP.load(Acquire);
let current_time = Utc::now().timestamp() as i64;
let current_time = Utc::now().timestamp();

let result = if !was_evm_timestamp_set {
let last_timestamp = if new_timestamp != 0 {
Expand Down
39 changes: 18 additions & 21 deletions src/eth/storage/inmemory/inmemory_temporary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,7 @@ impl InMemoryTemporaryStorageState {
None => log_and_err!("no pending block being mined"), // try calling set_pending_block_number_as_next_if_not_set or any other method to create a new block on temp storage
}
}
}

impl InMemoryTemporaryStorageState {
pub fn reset(&mut self) {
self.block = None;
self.accounts.clear();
Expand Down Expand Up @@ -158,8 +156,8 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
.or_insert_with(|| InMemoryTemporaryAccount::new(change.address));

// account basic info
if let Some(&nonce) = change.nonce.take_ref() {
account.info.nonce = nonce;
if let Some(nonce) = change.nonce.take_ref() {
account.info.nonce = *nonce;
}
if let Some(balance) = change.balance.take_ref() {
account.info.balance = *balance;
Expand All @@ -172,8 +170,8 @@ impl TemporaryStorage for InMemoryTemporaryStorage {

// slots
for slot in change.slots.values() {
if let Some(&slot) = slot.take_ref() {
account.slots.insert(slot.index, slot);
if let Some(slot) = slot.take_ref() {
account.slots.insert(slot.index, *slot);
}
}
}
Expand Down Expand Up @@ -284,18 +282,17 @@ fn do_read_account(states: &NonEmpty<InMemoryTemporaryStorageState>, address: Ad
}

fn do_read_slot(states: &NonEmpty<InMemoryTemporaryStorageState>, address: Address, index: SlotIndex) -> Option<Slot> {
// search all
for state in states.iter() {
let Some(account) = state.accounts.get(&address) else { continue };
let Some(&slot) = account.slots.get(&index) else { continue };
let slot = states
.iter()
.find_map(|state| state.accounts.get(&address).and_then(|account| account.slots.get(&index)));

if let Some(&slot) = slot {
tracing::trace!(%address, %index, %slot, "slot found in temporary");
return Some(slot);
Some(slot)
} else {
tracing::trace!(%address, %index, "slot not found in temporary");
None
}

// not found
tracing::trace!(%address, %index, "slot not found in temporary");
None
}

fn do_check_conflicts(states: &NonEmpty<InMemoryTemporaryStorageState>, execution: &EvmExecution) -> Option<ExecutionConflicts> {
Expand All @@ -304,16 +301,16 @@ fn do_check_conflicts(states: &NonEmpty<InMemoryTemporaryStorageState>, executio
for (&address, change) in &execution.changes {
// check account info conflicts
if let Some(account) = do_read_account(states, address) {
if let Some(&expected) = change.nonce.take_original_ref() {
let original = account.nonce;
if let Some(expected) = change.nonce.take_original_ref() {
let original = &account.nonce;
if expected != original {
conflicts.add_nonce(address, original, expected);
conflicts.add_nonce(address, *original, *expected);
}
}
if let Some(&expected) = change.balance.take_original_ref() {
let original = account.balance;
if let Some(expected) = change.balance.take_original_ref() {
let original = &account.balance;
if expected != original {
conflicts.add_balance(address, original, expected);
conflicts.add_balance(address, *original, *expected);
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/eth/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ mod external_rpc_storage;
mod inmemory;
mod permanent_storage;
mod postgres_external_rpc;
pub mod rocks;

mod redis;
pub mod rocks;
mod storage_point_in_time;
mod stratus_storage;
mod temporary_storage;
Expand Down
12 changes: 11 additions & 1 deletion src/eth/storage/permanent_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,14 @@ pub trait PermanentStorage: Send + Sync + 'static {
// Block
// -------------------------------------------------------------------------

/// Persists atomically all changes from a block.
/// Persists atomically all changes from a block.
fn save_block(&self, block: Block) -> anyhow::Result<()>;

/// Persists atomically all changes from a batch of blocks.
fn save_block_batch(&self, blocks: Vec<Block>) -> anyhow::Result<()> {
blocks.into_iter().try_for_each(|block| self.save_block(block))
}

/// Retrieves a block from the storage.
fn read_block(&self, block_filter: BlockFilter) -> anyhow::Result<Option<Block>>;

Expand Down Expand Up @@ -99,6 +104,10 @@ pub struct PermanentStorageConfig {
/// Augments or decreases the size of Column Family caches based on a multiplier.
#[arg(long = "rocks-cache-size-multiplier", env = "ROCKS_CACHE_SIZE_MULTIPLIER")]
pub rocks_cache_size_multiplier: Option<f32>,

/// Disables synchronous writes to RocksDB: writes return after reaching the OS buffer cache instead of waiting for disk persistence (faster, but risks data loss on system failure).
#[arg(long = "rocks-disable-sync-write", env = "ROCKS_DISABLE_SYNC_WRITE")]
pub rocks_disable_sync_write: bool,
}

#[derive(DebugAsJson, Clone, serde::Serialize)]
Expand Down Expand Up @@ -132,6 +141,7 @@ impl PermanentStorageConfig {
self.rocks_path_prefix.clone(),
self.rocks_shutdown_timeout,
self.rocks_cache_size_multiplier,
!self.rocks_disable_sync_write,
)?),
};
Ok(perm)
Expand Down
3 changes: 0 additions & 3 deletions src/eth/storage/rocks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
//! RocksDB layers (top-to-bottom): permanent -> state -> rest.
/// Batch writer with capacity for flushing temporarily.
mod rocks_batch_writer;

/// Exposed API.
pub mod rocks_permanent;

Expand Down
78 changes: 0 additions & 78 deletions src/eth/storage/rocks/rocks_batch_writer.rs
Original file line number Diff line number Diff line change
@@ -1,79 +1 @@
use std::fmt::Debug;
use std::mem;

use anyhow::Context;
use rocksdb::WriteBatch;
use rocksdb::WriteOptions;
use rocksdb::DB;
use serde::Deserialize;
use serde::Serialize;

use super::rocks_cf::RocksCfRef;

/// Writes a `WriteBatch` (which may touch multiple column families) to `db`
/// with sync enabled, logging on failure.
pub fn write_in_batch_for_multiple_cfs_impl(db: &DB, batch: WriteBatch) -> anyhow::Result<()> {
    tracing::debug!("writing batch");
    let batch_len = batch.len();

    // RocksDB writes are asynchronous by default: they return once the write
    // reaches the operating system's buffer cache. Enabling sync here ensures
    // the data is persisted to disk before returning, preventing potential
    // data loss in case of system failure.
    let mut write_options = WriteOptions::default();
    write_options.set_sync(true);

    match db
        .write_opt(batch, &write_options)
        .context("failed to write in batch to (possibly) multiple column families")
    {
        Ok(()) => Ok(()),
        Err(e) => {
            tracing::error!(reason = ?e, batch_len, "failed to write batch to DB");
            Err(e)
        }
    }
}

/// A writer that automatically flushes the batch when it exhausts capacity, supports multiple CFs.
///
/// Similar to `io::BufWriter`.
#[allow(dead_code)]
pub struct BufferedBatchWriter {
    // Number of insertions buffered since the last flush.
    len: usize,
    // Flush threshold: a flush is triggered once `len` reaches this value.
    capacity: usize,
    // Pending RocksDB write batch; may span multiple column families.
    batch: WriteBatch,
}

#[allow(dead_code)]
impl BufferedBatchWriter {
    /// Creates an empty writer that auto-flushes after `capacity` buffered insertions.
    pub fn new(capacity: usize) -> Self {
        Self {
            len: 0,
            capacity,
            batch: WriteBatch::default(),
        }
    }

    /// Buffers one key-value insertion for the column family behind `cf_ref`.
    ///
    /// Once the number of buffered insertions reaches the configured capacity,
    /// the whole batch is flushed to the underlying DB.
    pub fn insert<K, V>(&mut self, cf_ref: &RocksCfRef<K, V>, key: K, value: V) -> anyhow::Result<()>
    where
        K: Serialize + for<'de> Deserialize<'de> + Debug + std::hash::Hash + Eq,
        V: Serialize + for<'de> Deserialize<'de> + Debug + Clone,
    {
        self.len += 1;
        cf_ref.prepare_batch_insertion([(key, value)], &mut self.batch)?;

        let reached_capacity = self.len >= self.capacity;
        if reached_capacity {
            self.flush(cf_ref.db())?;
        }
        Ok(())
    }

    /// Writes all buffered insertions to `db` and resets the buffer.
    ///
    /// No-op when nothing is buffered.
    pub fn flush(&mut self, db: &DB) -> anyhow::Result<()> {
        if self.len == 0 {
            return Ok(());
        }
        // Take the batch out so a fresh (empty) one replaces it in `self`.
        write_in_batch_for_multiple_cfs_impl(db, mem::take(&mut self.batch))?;
        self.len = 0;
        Ok(())
    }
}

impl Drop for BufferedBatchWriter {
fn drop(&mut self) {
if self.len > 0 {
tracing::error!(elements_remaining = %self.len, "BufferedBatchWriter dropped with elements not flushed");
}
}
}
15 changes: 13 additions & 2 deletions src/eth/storage/rocks/rocks_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ pub struct RocksPermanentStorage {
}

impl RocksPermanentStorage {
pub fn new(db_path_prefix: Option<String>, shutdown_timeout: Duration, cache_size_multiplier: Option<f32>) -> anyhow::Result<Self> {
pub fn new(
db_path_prefix: Option<String>,
shutdown_timeout: Duration,
cache_size_multiplier: Option<f32>,
enable_sync_write: bool,
) -> anyhow::Result<Self> {
tracing::info!("setting up rocksdb storage");

let path = if let Some(prefix) = db_path_prefix {
Expand All @@ -48,7 +53,7 @@ impl RocksPermanentStorage {
"data/rocksdb".to_string()
};

let state = RocksStorageState::new(path, shutdown_timeout, cache_size_multiplier)?;
let state = RocksStorageState::new(path, shutdown_timeout, cache_size_multiplier, enable_sync_write)?;
let block_number = state.preload_block_number()?;

Ok(Self { state, block_number })
Expand Down Expand Up @@ -131,6 +136,12 @@ impl PermanentStorage for RocksPermanentStorage {
})
}

fn save_block_batch(&self, block_batch: Vec<Block>) -> anyhow::Result<()> {
self.state.save_block_batch(block_batch).inspect_err(|e| {
tracing::error!(reason = ?e, "failed to save block_batch in RocksPermanent");
})
}

fn save_accounts(&self, accounts: Vec<Account>) -> anyhow::Result<()> {
self.state.save_accounts(accounts).inspect_err(|e| {
tracing::error!(reason = ?e, "failed to save accounts in RocksPermanent");
Expand Down
Loading

0 comments on commit 83f662e

Please sign in to comment.