enha: implement slot and account cache
Replaces the previous temporary-storage linear search with an optimized LRU cache lookup.
This improves TPS and importer-offline speed by around 10%.
marcospb19-cw committed Nov 29, 2024
1 parent 9d23b3c commit 2b950ea
Showing 8 changed files with 238 additions and 82 deletions.
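
For context, the core idea of the change: the previous temporary storage answered account/slot reads for not-yet-saved blocks with a linear search, while an LRU cache keyed by address (and slot index) answers the same reads with a single hash lookup and bounded memory. The following is a minimal standalone sketch of that difference using the lru crate pulled in by the new cache module; the key/value types are illustrative stand-ins, not the project's actual primitives.

use std::num::NonZeroUsize;
use lru::LruCache;

// Illustrative stand-ins for (Address, SlotIndex) -> SlotValue.
type Key = (u64, u64);
type Value = u64;

// Before: scan the pending (not yet saved) changes linearly, newest first.
fn find_linear(pending: &[(Key, Value)], key: Key) -> Option<Value> {
    pending.iter().rev().find(|(k, _)| *k == key).map(|(_, v)| *v)
}

// After: O(1) lookup in a bounded LRU cache.
fn find_cached(cache: &mut LruCache<Key, Value>, key: Key) -> Option<Value> {
    cache.get(&key).copied()
}

fn main() {
    let mut cache = LruCache::new(NonZeroUsize::new(100_000).unwrap());
    cache.put((1, 7), 42);
    assert_eq!(find_cached(&mut cache, (1, 7)), Some(42));
    assert_eq!(find_linear(&[((1, 7), 42)], (1, 7)), Some(42));
}
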
47 changes: 39 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -26,15 +26,19 @@ humantime = "=2.1.0"
indexmap = { version = "=2.2.6", features = ["serde"] }
itertools = "=0.13.0"
lazy_static = "=1.4.0"
lru = "=0.12.5"
nanoid = "=0.4.0"
nonempty = { version = "=0.10.0", features = ["serialize"] }
once_cell = "=1.19.0"
oneshot = "=0.1.8"
parking_lot = "=0.12.3"
paste = "=1.0.15"
phf = "=0.11.2"
pin-project = "=1.1.5"
rand = { version = "=0.8.5", features = ["small_rng"] }
rust_decimal = "=1.36.0"
rustc-hash = "=2.0.0"
smallvec = "=1.13.2"
static_assertions = "=1.1.0"
strum = "=0.26.2"
sugars = "=3.0.1"
31 changes: 15 additions & 16 deletions src/bin/importer_offline.rs
@@ -49,20 +49,19 @@ const RPC_FETCHER_CHANNEL_CAPACITY: usize = 10;
///
/// We want to persist to the storage in batches, this means we don't save a
/// block right away, but the information from that block still needs to be
/// found in the storage.
/// found in the cache.
///
/// By using a channel with capacity 0 to send STORAGE_SAVER_BATCH_SIZE blocks,
/// we guarantee that accounts and slots can either be found in the permanent
/// storage, or in the temporary one.
///
/// In other words, at most (STORAGE_SAVER_BATCH_SIZE) * 2 executed blocks
/// won't be found in the permanent storage, but that's still within the
/// temporary storage capacity.
///
/// We use half because we want parallelism in execution and persisting, both
/// places need to hold blocks that aren't in the permanent storage yet, it's
/// half for each.
const STORAGE_SAVER_BATCH_SIZE: usize = 64 / 2 - 1;
/// These constants are organized to guarantee that accounts and slots can
/// still be found in the storage cache.
const CACHE_SIZE: usize = 10_000;
const MAX_BLOCKS_NOT_SAVED: usize = CACHE_SIZE - 1;

const BATCH_COUNT: usize = 10;
const SAVER_BATCH_SIZE: usize = MAX_BLOCKS_NOT_SAVED / BATCH_COUNT;
// The fetcher and saver tasks hold each, at most, SAVER_BATCH_SIZE blocks,
// so we need to subtract 2 from the buffer capacity to ensure we only have
// `CACHE_SIZE` executed blocks at a time.
const SAVER_CHANNEL_CAPACITY: usize = BATCH_COUNT - 2;

type BlocksToExecute = Vec<ExternalBlockWithReceipts>;
type BlocksToSave = Vec<Block>;
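
A quick check of the bound these new constants are meant to enforce (a standalone sketch of the arithmetic implied by the doc comment above, not code from the commit): one batch can sit on each side of the channel while SAVER_CHANNEL_CAPACITY more are buffered in it, so the number of executed-but-unsaved blocks stays below CACHE_SIZE.

const CACHE_SIZE: usize = 10_000;
const MAX_BLOCKS_NOT_SAVED: usize = CACHE_SIZE - 1; // 9_999
const BATCH_COUNT: usize = 10;
const SAVER_BATCH_SIZE: usize = MAX_BLOCKS_NOT_SAVED / BATCH_COUNT; // 999
const SAVER_CHANNEL_CAPACITY: usize = BATCH_COUNT - 2; // 8

fn main() {
    // One batch held on each side of the channel, plus the channel buffer itself.
    let max_unsaved_blocks = (1 + 1 + SAVER_CHANNEL_CAPACITY) * SAVER_BATCH_SIZE;
    assert_eq!(max_unsaved_blocks, 9_990);
    assert!(max_unsaved_blocks <= MAX_BLOCKS_NOT_SAVED);
}
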
@@ -95,7 +94,7 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
let (fetch_to_execute_tx, fetch_to_execute_rx) = async_mpsc::channel::<BlocksToExecute>(RPC_FETCHER_CHANNEL_CAPACITY);

// send blocks from executor task to saver task
let (execute_to_save_tx, execute_to_save_rx) = mpsc::sync_channel::<BlocksToSave>(0);
let (execute_to_save_tx, execute_to_save_rx) = mpsc::sync_channel::<BlocksToSave>(SAVER_CHANNEL_CAPACITY);

// load genesis accounts
let initial_accounts = rpc_storage.read_initial_accounts().await?;
@@ -232,8 +231,8 @@ fn run_external_block_executor(

let instant_before_execution = Instant::now();

for blocks in Itertools::chunks(blocks.into_iter(), STORAGE_SAVER_BATCH_SIZE).into_iter() {
let mut executed_batch = Vec::with_capacity(STORAGE_SAVER_BATCH_SIZE);
for blocks in Itertools::chunks(blocks.into_iter(), SAVER_BATCH_SIZE).into_iter() {
let mut executed_batch = Vec::with_capacity(SAVER_BATCH_SIZE);

for (mut block, receipts) in blocks {
if GlobalState::is_shutdown_warn(TASK_NAME) {
3 changes: 1 addition & 2 deletions src/eth/primitives/address.rs
@@ -21,8 +21,7 @@ use crate::eth::primitives::LogTopic;
use crate::gen_newtype_from;

/// Address of an Ethereum account (wallet or contract).
#[derive(DebugAsJson, Clone, Copy, Default, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
#[cfg_attr(test, derive(PartialOrd, Ord))]
#[derive(DebugAsJson, Clone, Copy, Default, Eq, PartialEq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize)]
pub struct Address(pub H160);

impl Address {
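
Deriving PartialOrd and Ord unconditionally (rather than only under cfg(test)) means Address can now be ordered in non-test code as well, for example as a key in ordered collections. A trivial illustration with a stand-in type (the real Address wraps an H160):

use std::collections::BTreeMap;

// Illustrative stand-in for the real Address(H160) newtype.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Address([u8; 20]);

fn main() {
    let mut balances = BTreeMap::new();
    balances.insert(Address([2; 20]), 10u64);
    balances.insert(Address([1; 20]), 20u64);
    // Keys iterate in ascending address order because Ord is derived.
    assert_eq!(balances.keys().next().copied(), Some(Address([1; 20])));
}
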
89 changes: 89 additions & 0 deletions src/eth/storage/cache.rs
@@ -0,0 +1,89 @@
use std::num::NonZeroUsize;

use lru::LruCache;
use parking_lot::Mutex;
use rustc_hash::FxBuildHasher;
use smallvec::SmallVec;

use super::AccountWithSlots;
use crate::eth::primitives::Account;
use crate::eth::primitives::Address;
use crate::eth::primitives::ExecutionChanges;
use crate::eth::primitives::Slot;
use crate::eth::primitives::SlotIndex;
use crate::eth::primitives::SlotValue;

pub struct StorageCache {
slot_cache: Mutex<LruCache<(Address, SlotIndex), SlotValue, FxBuildHasher>>,
account_cache: Mutex<LruCache<Address, Account, FxBuildHasher>>,
}

impl Default for StorageCache {
fn default() -> Self {
Self {
slot_cache: Mutex::new(LruCache::with_hasher(NonZeroUsize::new(100_000).unwrap(), FxBuildHasher)),
account_cache: Mutex::new(LruCache::with_hasher(NonZeroUsize::new(20_000).unwrap(), FxBuildHasher)),
}
}
}

impl StorageCache {
pub fn clear(&self) {
self.slot_cache.lock().clear();
self.account_cache.lock().clear();
}

pub fn cache_slot(&self, address: Address, slot: Slot) {
self.slot_cache.lock().put((address, slot.index), slot.value);
}

pub fn cache_account(&self, account: Account) {
self.account_cache.lock().put(account.address, account);
}

pub fn cache_account_and_slots_from_changes(&self, changes: ExecutionChanges) {
let mut slot_batch = SmallVec::<[_; 16]>::new();
let mut account_batch = SmallVec::<[_; 8]>::new();

for change in changes.into_values() {
// cache slots
for slot in change.slots.into_values().flat_map(|slot| slot.take()) {
slot_batch.push(((change.address, slot.index), slot.value));
}

// cache account
let mut account = AccountWithSlots::new(change.address);
if let Some(nonce) = change.nonce.take_ref() {
account.info.nonce = *nonce;
}
if let Some(balance) = change.balance.take_ref() {
account.info.balance = *balance;
}
if let Some(Some(bytecode)) = change.bytecode.take_ref() {
account.info.bytecode = Some(bytecode.clone());
}
account_batch.push((change.address, account.info));
}

{
let mut slot_lock = self.slot_cache.lock();
for (key, value) in slot_batch {
slot_lock.push(key, value);
}
}
{
let mut account_lock = self.account_cache.lock();
for (key, value) in account_batch {
account_lock.push(key, value);
}
}
}

pub fn get_slot(&self, address: Address, index: SlotIndex) -> Option<Slot> {
self.slot_cache.lock().get(&(address, index)).map(|&value| Slot { value, index })
}

pub fn get_account(&self, address: Address) -> Option<Account> {
self.account_cache.lock().get(&address).cloned()
}
}
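
Wiring this cache into the storage read path happens in file diffs that are not rendered on this page, but the natural pattern is cache-first with a fallback to the slower storage tiers, caching on a miss. A hedged sketch against the StorageCache API shown above; the fallback closure stands in for the existing temporary/permanent lookup, and Slot is assumed to implement Clone.

// Sketch only: a cache-aware slot read built on the StorageCache API above.
fn read_slot_cached(
    cache: &StorageCache,
    address: Address,
    index: SlotIndex,
    fallback: impl FnOnce(Address, SlotIndex) -> anyhow::Result<Slot>,
) -> anyhow::Result<Slot> {
    // Fast path: LRU hit, no storage access needed.
    if let Some(slot) = cache.get_slot(address, index) {
        return Ok(slot);
    }

    // Slow path: fall back to the storage tiers, then populate the cache so
    // the next read for this (address, index) pair is a single hash lookup.
    let slot = fallback(address, index)?;
    cache.cache_slot(address, slot.clone());
    Ok(slot)
}
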
2 changes: 2 additions & 0 deletions src/eth/storage/mod.rs
@@ -1,5 +1,6 @@
//! Ethereum / EVM storage.
pub use cache::StorageCache;
pub use permanent::InMemoryPermanentStorage;
pub use permanent::PermanentStorage;
pub use permanent::PermanentStorageConfig;
@@ -11,6 +12,7 @@ pub use temporary::TemporaryStorage;
pub use temporary::TemporaryStorageConfig;
pub use temporary::TemporaryStorageKind;

mod cache;
pub mod permanent;
mod stratus_storage;
mod temporary;
(2 remaining file diffs not rendered)
