Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enha: implement storage cache #1887

Merged
merged 4 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 39 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,19 @@ humantime = "=2.1.0"
indexmap = { version = "=2.2.6", features = ["serde"] }
itertools = "=0.13.0"
lazy_static = "=1.4.0"
lru = "=0.12.5"
nanoid = "=0.4.0"
nonempty = { version = "=0.10.0", features = ["serialize"] }
once_cell = "=1.19.0"
oneshot = "=0.1.8"
parking_lot = "=0.12.3"
paste = "=1.0.15"
phf = "=0.11.2"
pin-project = "=1.1.5"
rand = { version = "=0.8.5", features = ["small_rng"] }
rust_decimal = "=1.36.0"
rustc-hash = "=2.0.0"
smallvec = "=1.13.2"
static_assertions = "=1.1.0"
strum = "=0.26.2"
sugars = "=3.0.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ describe("Miner mode change integration test", function () {
updateProviderUrl("stratus");
const response = await sendAndGetFullResponse("stratus_changeMinerMode", ["automine"]);
expect(response.data.error.code).eq(-32603);
expect(response.data.error.message).eq("Miner mode change to (automine) is unsupported.");
expect(response.data.error.message.split("\n")[0]).eq(
"Unexpected error: Miner mode change to 'automine' is unsupported.",
);
});

it("Miner change on Leader to External should fail if there are pending transactions", async function () {
Expand Down
31 changes: 15 additions & 16 deletions src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,19 @@ const RPC_FETCHER_CHANNEL_CAPACITY: usize = 10;
///
/// We want to persist to the storage in batches, this means we don't save a
/// block right away, but the information from that block still needs to be
/// found in the storage.
/// found in the cache.
///
/// By using a channel with capacity 0 to send STORAGE_SAVER_BATCH_SIZE blocks,
/// we guarantee that accounts and slots can either be found in the permanent
/// storage, or in the temporary one.
///
/// In other words, at most (STORAGE_SAVER_BATCH_SIZE) * 2 executed blocks
/// won't be found in the permanent storage, but that's still within the
/// temporary storage capacity.
///
/// We use half because we want parallelism in execution and persisting, both
/// places need to hold blocks that aren't in the permanent storage yet, it's
/// half for each.
const STORAGE_SAVER_BATCH_SIZE: usize = 64 / 2 - 1;
/// These constants are organized to guarantee that accounts and slots can
/// still be found in the storage cache.
const CACHE_SIZE: usize = 10_000;
const MAX_BLOCKS_NOT_SAVED: usize = CACHE_SIZE - 1;

const BATCH_COUNT: usize = 10;
const SAVER_BATCH_SIZE: usize = MAX_BLOCKS_NOT_SAVED / BATCH_COUNT;
// The fetcher and saver tasks hold each, at most, SAVER_BATCH_SIZE blocks,
// so we need to subtract 2 from the buffer capacity to ensure we only have
// `CACHE_SIZE` executed blocks at a time.
const SAVER_CHANNEL_CAPACITY: usize = BATCH_COUNT - 2;

type BlocksToExecute = Vec<ExternalBlockWithReceipts>;
type BlocksToSave = Vec<Block>;
Expand Down Expand Up @@ -95,7 +94,7 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
let (fetch_to_execute_tx, fetch_to_execute_rx) = async_mpsc::channel::<BlocksToExecute>(RPC_FETCHER_CHANNEL_CAPACITY);

// send blocks from executor task to saver task
let (execute_to_save_tx, execute_to_save_rx) = mpsc::sync_channel::<BlocksToSave>(0);
let (execute_to_save_tx, execute_to_save_rx) = mpsc::sync_channel::<BlocksToSave>(SAVER_CHANNEL_CAPACITY);

// load genesis accounts
let initial_accounts = rpc_storage.read_initial_accounts().await?;
Expand Down Expand Up @@ -232,8 +231,8 @@ fn run_external_block_executor(

let instant_before_execution = Instant::now();

for blocks in Itertools::chunks(blocks.into_iter(), STORAGE_SAVER_BATCH_SIZE).into_iter() {
let mut executed_batch = Vec::with_capacity(STORAGE_SAVER_BATCH_SIZE);
for blocks in Itertools::chunks(blocks.into_iter(), SAVER_BATCH_SIZE).into_iter() {
let mut executed_batch = Vec::with_capacity(SAVER_BATCH_SIZE);

for (mut block, receipts) in blocks {
if GlobalState::is_shutdown_warn(TASK_NAME) {
Expand Down
3 changes: 1 addition & 2 deletions src/eth/primitives/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ use crate::eth::primitives::LogTopic;
use crate::gen_newtype_from;

/// Address of an Ethereum account (wallet or contract).
#[derive(DebugAsJson, Clone, Copy, Default, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
#[cfg_attr(test, derive(PartialOrd, Ord))]
#[derive(DebugAsJson, Clone, Copy, Default, Eq, PartialEq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize)]
pub struct Address(pub H160);

impl Address {
Expand Down
89 changes: 89 additions & 0 deletions src/eth/storage/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use std::num::NonZeroUsize;

use lru::LruCache;
use parking_lot::Mutex;
use rustc_hash::FxBuildHasher;
use smallvec::SmallVec;

use super::AccountWithSlots;
use crate::eth::primitives::Account;
use crate::eth::primitives::Address;
use crate::eth::primitives::ExecutionChanges;
use crate::eth::primitives::Slot;
use crate::eth::primitives::SlotIndex;
use crate::eth::primitives::SlotValue;

pub struct StorageCache {
slot_cache: Mutex<LruCache<(Address, SlotIndex), SlotValue, FxBuildHasher>>,
account_cache: Mutex<LruCache<Address, Account, FxBuildHasher>>,
}

impl Default for StorageCache {
fn default() -> Self {
Self {
slot_cache: Mutex::new(LruCache::with_hasher(NonZeroUsize::new(100_000).unwrap(), FxBuildHasher)),
account_cache: Mutex::new(LruCache::with_hasher(NonZeroUsize::new(20_000).unwrap(), FxBuildHasher)),
}
}
}

impl StorageCache {
pub fn clear(&self) {
self.slot_cache.lock().clear();
self.account_cache.lock().clear();
}

pub fn cache_slot(&self, address: Address, slot: Slot) {
self.slot_cache.lock().put((address, slot.index), slot.value);
}

pub fn cache_account(&self, account: Account) {
self.account_cache.lock().put(account.address, account);
}

pub fn cache_account_and_slots_from_changes(&self, changes: ExecutionChanges) {
let mut slot_batch = SmallVec::<[_; 16]>::new();
let mut account_batch = SmallVec::<[_; 8]>::new();

for change in changes.into_values() {
// cache slots
for slot in change.slots.into_values().flat_map(|slot| slot.take()) {
slot_batch.push(((change.address, slot.index), slot.value));
}

// cache account
let mut account = AccountWithSlots::new(change.address);
if let Some(nonce) = change.nonce.take_ref() {
account.info.nonce = *nonce;
}
if let Some(balance) = change.balance.take_ref() {
account.info.balance = *balance;
}
if let Some(Some(bytecode)) = change.bytecode.take_ref() {
account.info.bytecode = Some(bytecode.clone());
}
account_batch.push((change.address, account.info));
}

{
let mut slot_lock = self.slot_cache.lock();
for (key, value) in slot_batch {
slot_lock.push(key, value);
}
}
{
let mut account_lock = self.account_cache.lock();
for (key, value) in account_batch {
account_lock.push(key, value);
}
}
}

pub fn get_slot(&self, address: Address, index: SlotIndex) -> Option<Slot> {
self.slot_cache.lock().get(&(address, index)).map(|&value| Slot { value, index })
}

pub fn get_account(&self, address: Address) -> Option<Account> {
self.account_cache.lock().get(&address).cloned()
}
}
21 changes: 20 additions & 1 deletion src/eth/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Ethereum / EVM storage.

pub use cache::StorageCache;
pub use permanent::InMemoryPermanentStorage;
pub use permanent::PermanentStorage;
pub use permanent::PermanentStorageConfig;
Expand All @@ -11,10 +12,12 @@ pub use temporary::TemporaryStorage;
pub use temporary::TemporaryStorageConfig;
pub use temporary::TemporaryStorageKind;

mod cache;
pub mod permanent;
mod stratus_storage;
mod temporary;

use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;

Expand All @@ -39,7 +42,7 @@ use crate::eth::primitives::StratusError;
use crate::eth::primitives::TransactionExecution;
use crate::eth::primitives::TransactionStage;

pub trait Storage: Sized {
pub trait Storage: Send + Sync + 'static {
// -------------------------------------------------------------------------
// Block number
// -------------------------------------------------------------------------
Expand Down Expand Up @@ -95,6 +98,22 @@ pub trait Storage: Sized {
fn translate_to_point_in_time(&self, block_filter: BlockFilter) -> Result<PointInTime, StratusError>;
}

#[derive(Debug, Clone)]
pub struct AccountWithSlots {
pub info: Account,
pub slots: HashMap<SlotIndex, Slot, hash_hasher::HashBuildHasher>,
}

impl AccountWithSlots {
/// Creates a new temporary account.
fn new(address: Address) -> Self {
Self {
info: Account::new_empty(address),
slots: HashMap::default(),
}
}
}

// -----------------------------------------------------------------------------
// Config
// -----------------------------------------------------------------------------
Expand Down
Loading
Loading