Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
marcospb19-cw committed Nov 28, 2024
1 parent 2a50a99 commit f8cd0c6
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 100 deletions.
35 changes: 27 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ cfg-if = "=1.0.0"
chrono = { version = "=0.4.38", features = ["serde"] }
const_format = "=0.2.32"
const-hex = "=1.12.0"
dashmap = { version = "=6.1.0", features = ["inline"] }
derive_more = "=0.99.17"
derive-new = "=0.6.0"
hash_hasher = "=2.0.3"
Expand All @@ -30,11 +31,13 @@ nanoid = "=0.4.0"
nonempty = { version = "=0.10.0", features = ["serialize"] }
once_cell = "=1.19.0"
oneshot = "=0.1.8"
parking_lot = "=0.12.3"
paste = "=1.0.15"
phf = "=0.11.2"
pin-project = "=1.1.5"
rand = { version = "=0.8.5", features = ["small_rng"] }
rust_decimal = "=1.36.0"
rustc-hash = "=2.0.0"
static_assertions = "=1.1.0"
strum = "=0.26.2"
sugars = "=3.0.1"
Expand Down Expand Up @@ -124,6 +127,7 @@ sasl2-sys = { version = "0.1.22", features = ["vendored"] }

# Historic events processor
indicatif = "=0.17.8"
smallvec = "1.13.2"

# ------------------------------------------------------------------------------
# Platform specific dependencies
Expand Down
28 changes: 12 additions & 16 deletions src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,16 @@ const RPC_FETCHER_CHANNEL_CAPACITY: usize = 10;
///
/// We want to persist to the storage in batches, this means we don't save a
/// block right away, but the information from that block still needs to be
/// found in the storage.
/// found in the cache.
///
/// By using a channel with capacity 0 to send STORAGE_SAVER_BATCH_SIZE blocks,
/// we guarantee that accounts and slots can either be found in the permanent
/// storage, or in the temporary one.
///
/// In other words, at most (STORAGE_SAVER_BATCH_SIZE) * 2 executed blocks
/// won't be found in the permanent storage, but that's still within the
/// temporary storage capacity.
///
/// We use half because we want parallelism in execution and persisting, both
/// places need to hold blocks that aren't in the permanent storage yet, it's
/// half for each.
const STORAGE_SAVER_BATCH_SIZE: usize = 64 / 2 - 1;
/// These constants are organized to guarantee that accounts and slots can
/// still be found in the storage cache.
const CACHE_SIZE: usize = 86_400;
const MAX_BLOCKS_NOT_SAVED: usize = CACHE_SIZE - 1;

const BATCH_COUNT: usize = 12;
const SAVER_BATCH_SIZE: usize = MAX_BLOCKS_NOT_SAVED / BATCH_COUNT;
const SAVER_CHANNEL_CAPACITY: usize = BATCH_COUNT - 2; // TODO: explain why

type BlocksToExecute = Vec<ExternalBlockWithReceipts>;
type BlocksToSave = Vec<Block>;
Expand Down Expand Up @@ -95,7 +91,7 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
let (fetch_to_execute_tx, fetch_to_execute_rx) = async_mpsc::channel::<BlocksToExecute>(RPC_FETCHER_CHANNEL_CAPACITY);

// send blocks from executor task to saver task
let (execute_to_save_tx, execute_to_save_rx) = mpsc::sync_channel::<BlocksToSave>(0);
let (execute_to_save_tx, execute_to_save_rx) = mpsc::sync_channel::<BlocksToSave>(SAVER_CHANNEL_CAPACITY);

// load genesis accounts
let initial_accounts = rpc_storage.read_initial_accounts().await?;
Expand Down Expand Up @@ -232,8 +228,8 @@ fn run_external_block_executor(

let instant_before_execution = Instant::now();

for blocks in Itertools::chunks(blocks.into_iter(), STORAGE_SAVER_BATCH_SIZE).into_iter() {
let mut executed_batch = Vec::with_capacity(STORAGE_SAVER_BATCH_SIZE);
for blocks in Itertools::chunks(blocks.into_iter(), SAVER_BATCH_SIZE).into_iter() {
let mut executed_batch = Vec::with_capacity(SAVER_BATCH_SIZE);

for (mut block, receipts) in blocks {
if GlobalState::is_shutdown_warn(TASK_NAME) {
Expand Down
3 changes: 1 addition & 2 deletions src/eth/primitives/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ use crate::eth::primitives::LogTopic;
use crate::gen_newtype_from;

/// Address of an Ethereum account (wallet or contract).
#[derive(DebugAsJson, Clone, Copy, Default, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
#[cfg_attr(test, derive(PartialOrd, Ord))]
#[derive(DebugAsJson, Clone, Copy, Default, Eq, PartialEq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize)]
pub struct Address(pub H160);

impl Address {
Expand Down
192 changes: 192 additions & 0 deletions src/eth/storage/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
use std::collections::hash_map::Entry;
use std::collections::BTreeMap;
use std::hash::Hash;
use std::mem;
use std::ops::DerefMut;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;

use parking_lot::RwLock;
use parking_lot::RwLockWriteGuard;
use rustc_hash::FxBuildHasher;
use rustc_hash::FxHashMap;
use smallvec::SmallVec;

use super::AccountWithSlots;
use crate::eth::primitives::Account;
use crate::eth::primitives::Address;
use crate::eth::primitives::ExecutionChanges;
use crate::eth::primitives::Slot;
use crate::eth::primitives::SlotIndex;
use crate::eth::primitives::SlotValue;

type EntryId = u64;

pub struct StorageCache {
slot_cache: Cache<(Address, SlotIndex), SlotValue>,
account_cache: Cache<Address, Account>,
}

impl Default for StorageCache {
fn default() -> Self {
Self {
slot_cache: Cache::with_target_capacity(100_000),
account_cache: Cache::with_target_capacity(20_000),
}
}
}

impl StorageCache {
pub fn cache_slot(&self, address: Address, slot: Slot) {
self.slot_cache.put((address, slot.index), slot.value);
}

pub fn cache_account(&self, account: Account) {
self.account_cache.put(account.address, account);
}

pub fn cache_account_and_slots_from_execution(&self, changes: &ExecutionChanges) {
let mut slot_batch = SmallVec::<[_; 16]>::new();
let mut account_batch = SmallVec::<[_; 8]>::new();

for change in changes.values() {
// cache slots
for &slot in change.slots.values().flat_map(|slot| slot.take_ref()) {
slot_batch.push(((change.address, slot.index), slot.value));
}

// cache account
let mut account = AccountWithSlots::new(change.address);
if let Some(nonce) = change.nonce.take_ref() {
account.info.nonce = *nonce;
}
if let Some(balance) = change.balance.take_ref() {
account.info.balance = *balance;
}
if let Some(Some(bytecode)) = change.bytecode.take_ref() {
account.info.bytecode = Some(bytecode.clone());
}
account_batch.push((change.address, account.info));
}

self.slot_cache.put_batch(slot_batch);
self.account_cache.put_batch(account_batch);
}

pub fn get_slot(&self, address: Address, index: SlotIndex) -> Option<Slot> {
self.slot_cache.get(&(address, index)).map(|value| Slot { value, index })
}

pub fn get_account(&self, address: Address) -> Option<Account> {
self.account_cache.get(&address)
}
}

type Id = u64;

struct Cache<K, V> {
target_capacity: usize,
id_counter: AtomicU64,
kv: RwLock<FxHashMap<K, (Id, V)>>,
eviction_queue: RwLock<BTreeMap<Id, K>>,
}

impl<K, V> Cache<K, V>
where
K: Hash + Eq + Clone,
V: Clone,
{
pub fn with_target_capacity(target_capacity: usize) -> Self {
assert!(target_capacity > 8192);
Self {
target_capacity,
id_counter: AtomicU64::new(0),
kv: RwLock::new(FxHashMap::with_capacity_and_hasher(target_capacity + 8192 * 2, FxBuildHasher)),
eviction_queue: RwLock::new(BTreeMap::new()),
}
}

pub fn get(&self, key: &K) -> Option<V> {
self.kv.read().get(key).map(|pair| pair.1.clone())
}

pub fn put(&self, key: K, value: V) {
self.put_batch([(key, value)]);
}

pub fn put_batch<I>(&self, iter: I)
where
I: IntoIterator<Item = (K, V)>,
{
let mut should_evict = false;
let mut ids_with_keys_to_add = SmallVec::<[(Id, K); 16]>::new();
let mut ids_with_keys_to_remove = SmallVec::<[(Id, K); 16]>::new();

{
let mut kv_lock = self.kv.write();

for (key, value) in iter {
let (id, eviction_trigger) = self.next_id();
should_evict |= eviction_trigger;
ids_with_keys_to_add.push((id, key.clone()));

let previous = kv_lock.insert(key.clone(), (id, value));
if let Some((previous_id, _)) = previous {
ids_with_keys_to_remove.push((previous_id, key));
}
}
}

let mut queue_lock = self.eviction_queue.write();
for (id, key) in ids_with_keys_to_remove {
queue_lock.remove(&id);
}
for (id, key) in ids_with_keys_to_add {
queue_lock.insert(id, key);
}
if should_evict {
self.run_eviction(queue_lock);
}
}

#[inline]
/// Calculates the next entry id and tells if should run eviction.
fn next_id(&self) -> (EntryId, bool) {
let id = self.id_counter.fetch_add(1, Ordering::Relaxed);
// plan eviction every 8192 entries added to this cache
// 8192 == 2^13, which makes this an stupidly fast bitmask check
let should_evict = id % 8192 == 0;
(id, should_evict)
}

#[inline]
fn run_eviction(&self, mut queue_lock: RwLockWriteGuard<BTreeMap<Id, K>>) {
if queue_lock.len() <= self.target_capacity {
return; // nothing to do
}

// Underflow Safety:
// - queue_lock.len() > self.target_capacity
let amount_to_evict = queue_lock.len() - self.target_capacity;

// Note: linear search might be a bottleneck, but `.lower_bound()` is unstable
// Unwrap Safety:
// - amount_to_evict < queue_lock.len()
// - because self.target_capacity > 0
let (&id_to_split_at, _) = queue_lock.iter().nth(amount_to_evict).unwrap();

let right_side = queue_lock.split_off(&id_to_split_at);
let elements_to_evict = mem::replace(queue_lock.deref_mut(), right_side);

drop(queue_lock);
let mut kv_lock = self.kv.write();

for (id, key) in elements_to_evict {
if let Entry::Occupied(occupied) = kv_lock.entry(key) {
if occupied.get().0 == id {
occupied.remove();
}
}
}
}
}
Loading

0 comments on commit f8cd0c6

Please sign in to comment.