Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enha: optimize importer offline #1877

Merged
merged 7 commits into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/importer-offline.env.local
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ EXTERNAL_RPC_STORAGE_TIMEOUT=20000
EXTERNAL_RPC_SLOW_QUERY_WARN_THRESHOLD=10s

EXECUTOR_REJECT_NOT_CONTRACT=false

ROCKS_DISABLE_SYNC_WRITE=true
ROCKS_CACHE_SIZE_MULTIPLIER=0.1
2 changes: 1 addition & 1 deletion src/bin/historic_events_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ fn process_block_events(block: BlockRocksdb) -> Vec<String> {
/// Main function that processes blockchain data and generates events
fn main() -> Result<(), anyhow::Error> {
tracing_subscriber::fmt::init();
let state = RocksStorageState::new("data/rocksdb".to_string(), TIMEOUT, Some(0.1)).context("failed to create rocksdb state")?;
let state = RocksStorageState::new("data/rocksdb".to_string(), TIMEOUT, Some(0.1), false).context("failed to create rocksdb state")?;

let (b_pb, tx_pb) = create_progress_bar(&state);

Expand Down
281 changes: 167 additions & 114 deletions src/bin/importer_offline.rs

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions src/eth/primitives/unix_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ mod offset {
return log_and_err!("timestamp can't be before the latest block");
}

let current_time = Utc::now().timestamp() as i64;
let current_time = Utc::now().timestamp();
let diff: i64 = if *new_block_timestamp == 0 {
0
} else {
Expand All @@ -188,12 +188,12 @@ mod offset {
/// 1. When a specific timestamp was set (was_evm_timestamp_set = true):
/// - If new_timestamp is 0: Returns last_block_timestamp + 1
/// - If new_timestamp > 0: Returns exactly new_timestamp
/// After this call, resets the timestamp flag and stored value
/// - After this call, resets the timestamp flag and stored value
///
/// 2. For subsequent blocks (was_evm_timestamp_set = false):
/// - If new_timestamp is set: Uses it as reference point
/// - Otherwise: Uses max(current_time + offset, last_block_timestamp)
/// In both cases, adds 1 second to ensure progression
/// - In both cases, adds 1 second to ensure progression
///
/// The function always updates LAST_BLOCK_TIMESTAMP with the returned value
/// to maintain the chain of increasing timestamps.
Expand All @@ -202,7 +202,7 @@ mod offset {
let new_timestamp_diff = NEW_TIMESTAMP_DIFF.load(Acquire);
let was_evm_timestamp_set = EVM_SET_NEXT_BLOCK_TIMESTAMP_WAS_CALLED.load(Acquire);
let last_block_timestamp = LAST_BLOCK_TIMESTAMP.load(Acquire);
let current_time = Utc::now().timestamp() as i64;
let current_time = Utc::now().timestamp();

let result = if !was_evm_timestamp_set {
let last_timestamp = if new_timestamp != 0 {
Expand Down
39 changes: 18 additions & 21 deletions src/eth/storage/inmemory/inmemory_temporary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,7 @@ impl InMemoryTemporaryStorageState {
None => log_and_err!("no pending block being mined"), // try calling set_pending_block_number_as_next_if_not_set or any other method to create a new block on temp storage
}
}
}

impl InMemoryTemporaryStorageState {
pub fn reset(&mut self) {
self.block = None;
self.accounts.clear();
Expand Down Expand Up @@ -158,8 +156,8 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
.or_insert_with(|| InMemoryTemporaryAccount::new(change.address));

// account basic info
if let Some(&nonce) = change.nonce.take_ref() {
account.info.nonce = nonce;
if let Some(nonce) = change.nonce.take_ref() {
account.info.nonce = *nonce;
}
if let Some(balance) = change.balance.take_ref() {
account.info.balance = *balance;
Expand All @@ -172,8 +170,8 @@ impl TemporaryStorage for InMemoryTemporaryStorage {

// slots
for slot in change.slots.values() {
if let Some(&slot) = slot.take_ref() {
account.slots.insert(slot.index, slot);
if let Some(slot) = slot.take_ref() {
account.slots.insert(slot.index, *slot);
}
}
}
Expand Down Expand Up @@ -284,18 +282,17 @@ fn do_read_account(states: &NonEmpty<InMemoryTemporaryStorageState>, address: Ad
}

fn do_read_slot(states: &NonEmpty<InMemoryTemporaryStorageState>, address: Address, index: SlotIndex) -> Option<Slot> {
// search all
for state in states.iter() {
let Some(account) = state.accounts.get(&address) else { continue };
let Some(&slot) = account.slots.get(&index) else { continue };
let slot = states
.iter()
.find_map(|state| state.accounts.get(&address).and_then(|account| account.slots.get(&index)));

if let Some(&slot) = slot {
tracing::trace!(%address, %index, %slot, "slot found in temporary");
return Some(slot);
Some(slot)
} else {
tracing::trace!(%address, %index, "slot not found in temporary");
None
}

// not found
tracing::trace!(%address, %index, "slot not found in temporary");
None
}

fn do_check_conflicts(states: &NonEmpty<InMemoryTemporaryStorageState>, execution: &EvmExecution) -> Option<ExecutionConflicts> {
Expand All @@ -304,16 +301,16 @@ fn do_check_conflicts(states: &NonEmpty<InMemoryTemporaryStorageState>, executio
for (&address, change) in &execution.changes {
// check account info conflicts
if let Some(account) = do_read_account(states, address) {
if let Some(&expected) = change.nonce.take_original_ref() {
let original = account.nonce;
if let Some(expected) = change.nonce.take_original_ref() {
let original = &account.nonce;
if expected != original {
conflicts.add_nonce(address, original, expected);
conflicts.add_nonce(address, *original, *expected);
}
}
if let Some(&expected) = change.balance.take_original_ref() {
let original = account.balance;
if let Some(expected) = change.balance.take_original_ref() {
let original = &account.balance;
if expected != original {
conflicts.add_balance(address, original, expected);
conflicts.add_balance(address, *original, *expected);
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/eth/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ mod external_rpc_storage;
mod inmemory;
mod permanent_storage;
mod postgres_external_rpc;
pub mod rocks;

mod redis;
pub mod rocks;
mod storage_point_in_time;
mod stratus_storage;
mod temporary_storage;
Expand Down
12 changes: 11 additions & 1 deletion src/eth/storage/permanent_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,14 @@ pub trait PermanentStorage: Send + Sync + 'static {
// Block
// -------------------------------------------------------------------------

/// Persists atomically all changes from a block.
/// Persists atomically the changes from a block.
fn save_block(&self, block: Block) -> anyhow::Result<()>;

/// Persists the changes from a batch of blocks.
///
/// Default implementation saves each block individually via [`save_block`]:
/// every block is persisted atomically on its own, but the batch as a whole
/// is not — a failure mid-batch leaves the earlier blocks already saved.
/// Implementations may override this with a truly atomic batch write.
fn save_block_batch(&self, blocks: Vec<Block>) -> anyhow::Result<()> {
blocks.into_iter().try_for_each(|block| self.save_block(block))
}

/// Retrieves a block from the storage.
fn read_block(&self, block_filter: BlockFilter) -> anyhow::Result<Option<Block>>;

Expand Down Expand Up @@ -99,6 +104,10 @@ pub struct PermanentStorageConfig {
/// Augments or decreases the size of Column Family caches based on a multiplier.
#[arg(long = "rocks-cache-size-multiplier", env = "ROCKS_CACHE_SIZE_MULTIPLIER")]
pub rocks_cache_size_multiplier: Option<f32>,

/// Disables sync (fsync) writes to RocksDB; writes then return once they reach the OS buffer cache, trading durability on system failure for speed.
#[arg(long = "rocks-disable-sync-write", env = "ROCKS_DISABLE_SYNC_WRITE")]
pub rocks_disable_sync_write: bool,
}

#[derive(DebugAsJson, Clone, serde::Serialize)]
Expand Down Expand Up @@ -132,6 +141,7 @@ impl PermanentStorageConfig {
self.rocks_path_prefix.clone(),
self.rocks_shutdown_timeout,
self.rocks_cache_size_multiplier,
!self.rocks_disable_sync_write,
)?),
};
Ok(perm)
Expand Down
3 changes: 0 additions & 3 deletions src/eth/storage/rocks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
//! RocksDB layers (top-to-bottom): permanent -> state -> rest.

/// Batch writer with capacity for flushing temporarily.
mod rocks_batch_writer;

/// Exposed API.
pub mod rocks_permanent;

Expand Down
78 changes: 0 additions & 78 deletions src/eth/storage/rocks/rocks_batch_writer.rs
Original file line number Diff line number Diff line change
@@ -1,79 +1 @@
use std::fmt::Debug;
use std::mem;

use anyhow::Context;
use rocksdb::WriteBatch;
use rocksdb::WriteOptions;
use rocksdb::DB;
use serde::Deserialize;
use serde::Serialize;

use super::rocks_cf::RocksCfRef;

/// Writes a `WriteBatch` — possibly spanning multiple column families — to the DB,
/// forcing a synchronous (durable) write.
///
/// By default RocksDB writes return once the data reaches the operating system's
/// buffer cache; enabling the sync option makes the call block until the data is
/// persisted to disk, preventing loss on system failure. Errors are logged with
/// the batch length before being returned with added context.
pub fn write_in_batch_for_multiple_cfs_impl(db: &DB, batch: WriteBatch) -> anyhow::Result<()> {
    tracing::debug!("writing batch");
    let batch_len = batch.len();

    // Request a durable (fsync) write instead of the default async behavior.
    let mut options = WriteOptions::default();
    options.set_sync(true);

    let result = db
        .write_opt(batch, &options)
        .context("failed to write in batch to (possibly) multiple column families");
    if let Err(ref e) = result {
        tracing::error!(reason = ?e, batch_len, "failed to write batch to DB");
    }
    result
}

/// A writer that automatically flushes the batch when it exhausts capacity, supports multiple CFs.
///
/// Similar to `io::BufWriter`.
#[allow(dead_code)]
pub struct BufferedBatchWriter {
// Number of entries currently buffered in `batch`.
len: usize,
// Flush threshold: once `len` reaches this value, the batch is written out.
capacity: usize,
// Pending writes, possibly targeting multiple column families.
batch: WriteBatch,
}

#[allow(dead_code)]
impl BufferedBatchWriter {
    /// Creates an empty writer that flushes once `capacity` entries are buffered.
    pub fn new(capacity: usize) -> Self {
        Self {
            batch: WriteBatch::default(),
            capacity,
            len: 0,
        }
    }

    /// Buffers one key/value insertion for `cf_ref`'s column family, flushing
    /// the whole batch to the DB when the configured capacity is reached.
    pub fn insert<K, V>(&mut self, cf_ref: &RocksCfRef<K, V>, key: K, value: V) -> anyhow::Result<()>
    where
        K: Serialize + for<'de> Deserialize<'de> + Debug + std::hash::Hash + Eq,
        V: Serialize + for<'de> Deserialize<'de> + Debug + Clone,
    {
        // Count the entry up-front so an unflushed element is reported on drop
        // even if preparing the insertion fails.
        self.len += 1;
        cf_ref.prepare_batch_insertion([(key, value)], &mut self.batch)?;

        let at_capacity = self.len >= self.capacity;
        if at_capacity {
            self.flush(cf_ref.db())?;
        }
        Ok(())
    }

    /// Writes all buffered entries to `db`; a no-op when the buffer is empty.
    pub fn flush(&mut self, db: &DB) -> anyhow::Result<()> {
        if self.len > 0 {
            // Swap the batch out so a fresh, empty one is left in place.
            let pending = mem::take(&mut self.batch);
            write_in_batch_for_multiple_cfs_impl(db, pending)?;
            self.len = 0;
        }
        Ok(())
    }
}

impl Drop for BufferedBatchWriter {
    /// Reports unflushed entries as an error: the batch cannot be written here
    /// because flushing requires a `&DB` handle, which `drop` does not receive.
    fn drop(&mut self) {
        if self.len == 0 {
            return;
        }
        tracing::error!(elements_remaining = %self.len, "BufferedBatchWriter dropped with elements not flushed");
    }
}
15 changes: 13 additions & 2 deletions src/eth/storage/rocks/rocks_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ pub struct RocksPermanentStorage {
}

impl RocksPermanentStorage {
pub fn new(db_path_prefix: Option<String>, shutdown_timeout: Duration, cache_size_multiplier: Option<f32>) -> anyhow::Result<Self> {
pub fn new(
db_path_prefix: Option<String>,
shutdown_timeout: Duration,
cache_size_multiplier: Option<f32>,
enable_sync_write: bool,
) -> anyhow::Result<Self> {
tracing::info!("setting up rocksdb storage");

let path = if let Some(prefix) = db_path_prefix {
Expand All @@ -48,7 +53,7 @@ impl RocksPermanentStorage {
"data/rocksdb".to_string()
};

let state = RocksStorageState::new(path, shutdown_timeout, cache_size_multiplier)?;
let state = RocksStorageState::new(path, shutdown_timeout, cache_size_multiplier, enable_sync_write)?;
let block_number = state.preload_block_number()?;

Ok(Self { state, block_number })
Expand Down Expand Up @@ -131,6 +136,12 @@ impl PermanentStorage for RocksPermanentStorage {
})
}

/// Delegates the batch save to the underlying RocksDB state, logging any failure.
fn save_block_batch(&self, block_batch: Vec<Block>) -> anyhow::Result<()> {
    let result = self.state.save_block_batch(block_batch);
    if let Err(ref e) = result {
        tracing::error!(reason = ?e, "failed to save block_batch in RocksPermanent");
    }
    result
}

fn save_accounts(&self, accounts: Vec<Account>) -> anyhow::Result<()> {
self.state.save_accounts(accounts).inspect_err(|e| {
tracing::error!(reason = ?e, "failed to save accounts in RocksPermanent");
Expand Down
Loading
Loading