Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
marcospb19-cw committed Nov 28, 2024
1 parent 2a50a99 commit ae91f49
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 98 deletions.
30 changes: 24 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ cfg-if = "=1.0.0"
chrono = { version = "=0.4.38", features = ["serde"] }
const_format = "=0.2.32"
const-hex = "=1.12.0"
dashmap = { version = "=6.1.0", features = ["inline"] }
derive_more = "=0.99.17"
derive-new = "=0.6.0"
hash_hasher = "=2.0.3"
Expand All @@ -30,11 +31,13 @@ nanoid = "=0.4.0"
nonempty = { version = "=0.10.0", features = ["serialize"] }
once_cell = "=1.19.0"
oneshot = "=0.1.8"
parking_lot = "=0.12.3"
paste = "=1.0.15"
phf = "=0.11.2"
pin-project = "=1.1.5"
rand = { version = "=0.8.5", features = ["small_rng"] }
rust_decimal = "=1.36.0"
rustc-hash = "=2.0.0"
static_assertions = "=1.1.0"
strum = "=0.26.2"
sugars = "=3.0.1"
Expand Down
28 changes: 12 additions & 16 deletions src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,16 @@ const RPC_FETCHER_CHANNEL_CAPACITY: usize = 10;
///
/// We want to persist to the storage in batches, this means we don't save a
/// block right away, but the information from that block still needs to be
/// found in the storage.
/// found in the cache.
///
/// By using a channel with capacity 0 to send STORAGE_SAVER_BATCH_SIZE blocks,
/// we guarantee that accounts and slots can either be found in the permanent
/// storage, or in the temporary one.
///
/// In other words, at most (STORAGE_SAVER_BATCH_SIZE) * 2 executed blocks
/// won't be found in the permanent storage, but that's still within the
/// temporary storage capacity.
///
/// We use half because we want parallelism in execution and persisting, both
/// places need to hold blocks that aren't in the permanent storage yet, it's
/// half for each.
const STORAGE_SAVER_BATCH_SIZE: usize = 64 / 2 - 1;
/// These constants are organized to guarantee that accounts and slots can
/// still be found in the storage cache.
const CACHE_SIZE: usize = 86_400;
const MAX_BLOCKS_NOT_SAVED: usize = CACHE_SIZE - 1;

const BATCH_COUNT: usize = 12;
const SAVER_BATCH_SIZE: usize = MAX_BLOCKS_NOT_SAVED / BATCH_COUNT;
const SAVER_CHANNEL_CAPACITY: usize = BATCH_COUNT - 2; // TODO: explain why

type BlocksToExecute = Vec<ExternalBlockWithReceipts>;
type BlocksToSave = Vec<Block>;
Expand Down Expand Up @@ -95,7 +91,7 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
let (fetch_to_execute_tx, fetch_to_execute_rx) = async_mpsc::channel::<BlocksToExecute>(RPC_FETCHER_CHANNEL_CAPACITY);

// send blocks from executor task to saver task
let (execute_to_save_tx, execute_to_save_rx) = mpsc::sync_channel::<BlocksToSave>(0);
let (execute_to_save_tx, execute_to_save_rx) = mpsc::sync_channel::<BlocksToSave>(SAVER_CHANNEL_CAPACITY);

// load genesis accounts
let initial_accounts = rpc_storage.read_initial_accounts().await?;
Expand Down Expand Up @@ -232,8 +228,8 @@ fn run_external_block_executor(

let instant_before_execution = Instant::now();

for blocks in Itertools::chunks(blocks.into_iter(), STORAGE_SAVER_BATCH_SIZE).into_iter() {
let mut executed_batch = Vec::with_capacity(STORAGE_SAVER_BATCH_SIZE);
for blocks in Itertools::chunks(blocks.into_iter(), SAVER_BATCH_SIZE).into_iter() {
let mut executed_batch = Vec::with_capacity(SAVER_BATCH_SIZE);

for (mut block, receipts) in blocks {
if GlobalState::is_shutdown_warn(TASK_NAME) {
Expand Down
3 changes: 1 addition & 2 deletions src/eth/primitives/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ use crate::eth::primitives::LogTopic;
use crate::gen_newtype_from;

/// Address of an Ethereum account (wallet or contract).
#[derive(DebugAsJson, Clone, Copy, Default, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
#[cfg_attr(test, derive(PartialOrd, Ord))]
#[derive(DebugAsJson, Clone, Copy, Default, Eq, PartialEq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize)]
pub struct Address(pub H160);

impl Address {
Expand Down
113 changes: 113 additions & 0 deletions src/eth/storage/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use std::collections::BTreeSet;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;

use dashmap::DashMap;
use parking_lot::RwLock;
use rustc_hash::FxBuildHasher;

use super::AccountWithSlots;
use crate::eth::primitives::Account;
use crate::eth::primitives::Address;
use crate::eth::primitives::ExecutionChanges;
use crate::eth::primitives::Slot;
use crate::eth::primitives::SlotIndex;
use crate::eth::primitives::SlotValue;

type EntryId = u64;

const SLOT_CACHE_LIMIT: usize = 10_000;

pub struct StorageCache {
entry_id: AtomicU64,
should_check_eviction: AtomicBool,
cached_slots: DashMap<(SlotIndex, Address), (SlotValue, EntryId), FxBuildHasher>,
slot_eviction_queue: RwLock<BTreeSet<(EntryId, SlotIndex, Address)>>,
cached_accounts: DashMap<Address, (Account, EntryId), FxBuildHasher>,
}

impl StorageCache {
pub fn new() -> StorageCache {
StorageCache {
entry_id: AtomicU64::new(0),
should_check_eviction: AtomicBool::new(false),
cached_slots: DashMap::default(),
cached_accounts: DashMap::default(),
slot_eviction_queue: RwLock::new(BTreeSet::new()),
}
}

pub fn cache_slot(&self, address: Address, slot: Slot) {
self.cached_slots.insert((slot.index, address), (slot.value, self.next_id()));
}

pub fn cache_account(&self, account: Account) {
self.cached_accounts.insert(account.address, (account, self.next_id()));
}

pub fn cache_account_and_slots_from_execution(&self, changes: &ExecutionChanges) {
for change in changes.values() {
// cache slots
let mut slots = vec![];
for &slot in change.slots.values().flat_map(|slot| slot.take_ref()) {
slots.push(slot);
}

for slot in slots {
self.cached_slots.insert((slot.index, change.address), (slot.value, self.next_id()));
// self.slot_eviction_queue
}

// cache account
let mut account = AccountWithSlots::new(change.address);
if let Some(nonce) = change.nonce.take_ref() {
account.info.nonce = *nonce;
}
if let Some(balance) = change.balance.take_ref() {
account.info.balance = *balance;
}
if let Some(Some(bytecode)) = change.bytecode.take_ref() {
account.info.bytecode = Some(bytecode.clone());
}

self.cached_accounts.insert(account.info.address, (account.info, self.next_id()));
}

self.evict_if_necessary();
}

pub fn get_slot(&self, address: Address, index: SlotIndex) -> Option<Slot> {
self.cached_slots.get(&(index, address)).map(|pair| Slot { value: pair.value().0, index })
}

pub fn get_account(&self, address: Address) -> Option<Account> {
self.cached_accounts.get(&address).map(|pair| pair.value().0.clone())
}

/// Clears space if capacity was surpassed.
fn evict_if_necessary(&self) {
// if !self.should_check_eviction.fetch_and(false, Ordering::Relaxed) {
// return;
// }

// let mut slot_count = self.cached_slots.len();
// while slot_count > SLOT_CACHE_LIMIT {
// // Unwrap safety:
// // eviction queue should have same size as cached_slots
// let (_id, index, value) = self.slot_eviction_queue.write().pop_first().unwrap();

// self.cached_slots.remove(&(index, value));

// slot_count -= 1;
// }
}

fn next_id(&self) -> EntryId {
let id = self.entry_id.fetch_add(1, Ordering::Relaxed);
if id % 8192 == 0 {
self.should_check_eviction.store(true, Ordering::Relaxed);
}
id
}
}
19 changes: 19 additions & 0 deletions src/eth/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Ethereum / EVM storage.
pub use cache::StorageCache;
pub use permanent::InMemoryPermanentStorage;
pub use permanent::PermanentStorage;
pub use permanent::PermanentStorageConfig;
Expand All @@ -11,10 +12,12 @@ pub use temporary::TemporaryStorage;
pub use temporary::TemporaryStorageConfig;
pub use temporary::TemporaryStorageKind;

mod cache;
pub mod permanent;
mod stratus_storage;
mod temporary;

use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;

Expand Down Expand Up @@ -95,6 +98,22 @@ pub trait Storage: Send + Sync + 'static {
fn translate_to_point_in_time(&self, block_filter: BlockFilter) -> Result<PointInTime, StratusError>;
}

#[derive(Debug, Clone)]
pub struct AccountWithSlots {
pub info: Account,
pub slots: HashMap<SlotIndex, Slot, hash_hasher::HashBuildHasher>,
}

impl AccountWithSlots {
/// Creates a new temporary account.
fn new(address: Address) -> Self {
Self {
info: Account::new_empty(address),
slots: HashMap::default(),
}
}
}

// -----------------------------------------------------------------------------
// Config
// -----------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit ae91f49

Please sign in to comment.