diff --git a/src/eth/storage/hybrid/hybrid_history.rs b/src/eth/storage/hybrid/hybrid_state.rs similarity index 82% rename from src/eth/storage/hybrid/hybrid_history.rs rename to src/eth/storage/hybrid/hybrid_state.rs index efe5d0b53..782d5fcfe 100644 --- a/src/eth/storage/hybrid/hybrid_history.rs +++ b/src/eth/storage/hybrid/hybrid_state.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use anyhow::Context; +use indexmap::IndexMap; use sqlx::types::BigDecimal; use sqlx::FromRow; use sqlx::Pool; @@ -9,21 +10,26 @@ use sqlx::Postgres; use crate::eth::primitives::Account; use crate::eth::primitives::Address; +use crate::eth::primitives::Block; +use crate::eth::primitives::BlockNumber; use crate::eth::primitives::Bytes; use crate::eth::primitives::ExecutionAccountChanges; +use crate::eth::primitives::Hash; +use crate::eth::primitives::LogMined; use crate::eth::primitives::Nonce; use crate::eth::primitives::Slot; use crate::eth::primitives::SlotIndex; use crate::eth::primitives::SlotValue; use crate::eth::primitives::StoragePointInTime; +use crate::eth::primitives::TransactionMined; use crate::eth::primitives::Wei; -#[derive(Debug)] +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] pub struct SlotInfo { pub value: SlotValue, } -#[derive(Debug)] +#[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct AccountInfo { pub balance: Wei, pub nonce: Nonce, @@ -31,12 +37,6 @@ pub struct AccountInfo { pub slots: HashMap, } -#[derive(Debug)] -pub struct HybridHistory { - pub hybrid_accounts_slots: HashMap, - pub pool: Arc>, -} - #[derive(FromRow)] struct AccountRow { address: Vec, @@ -52,22 +52,19 @@ struct SlotRow { value: Option>, } -impl HybridHistory { - pub async fn new(pool: Arc>) -> Result { - // Initialize the structure - let mut history = HybridHistory { - hybrid_accounts_slots: HashMap::new(), - pool, - }; - - history.load_latest_data().await?; - - Ok(history) - } +#[derive(Debug, Default, serde::Serialize, serde::Deserialize)] +pub struct HybridStorageState { + pub accounts: HashMap, + pub transactions: HashMap, + pub blocks_by_number: IndexMap>, + pub blocks_by_hash: IndexMap>, + pub logs: Vec, +} +impl HybridStorageState { //XXX TODO use a fixed block_number during load, in order to avoid sync problem // e.g other instance moving forward and this query getting incongruous data - async fn load_latest_data(&mut self) -> Result<(), sqlx::Error> { + pub async fn load_latest_data(&mut self, pool: &Pool) -> Result<(), sqlx::Error> { let account_rows = sqlx::query_as!( AccountRow, " @@ -83,7 +80,7 @@ impl HybridHistory { block_number DESC " ) - .fetch_all(&*self.pool) + .fetch_all(pool) .await?; let mut accounts: HashMap = HashMap::new(); @@ -117,7 +114,7 @@ impl HybridHistory { block_number DESC " ) - .fetch_all(&*self.pool) + .fetch_all(pool) .await?; for slot_row in slot_rows { @@ -132,7 +129,7 @@ impl HybridHistory { } } - self.hybrid_accounts_slots = accounts; + self.accounts = accounts; Ok(()) } @@ -142,7 +139,7 @@ impl HybridHistory { for change in changes { let address = change.address.clone(); // Assuming Address is cloneable and the correct type. - let account_info_entry = self.hybrid_accounts_slots.entry(address).or_insert_with(|| AccountInfo { + let account_info_entry = self.accounts.entry(address).or_insert_with(|| AccountInfo { balance: Wei::ZERO, // Initialize with default values. nonce: Nonce::ZERO, bytecode: None, @@ -171,9 +168,15 @@ impl HybridHistory { Ok(()) } - pub async fn get_slot_at_point(&self, address: &Address, slot_index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result> { + pub async fn get_slot_at_point( + &self, + address: &Address, + slot_index: &SlotIndex, + point_in_time: &StoragePointInTime, + pool: &Pool, + ) -> anyhow::Result> { let slot = match point_in_time { - StoragePointInTime::Present => self.hybrid_accounts_slots.get(address).map(|account_info| { + StoragePointInTime::Present => self.accounts.get(address).map(|account_info| { let value = account_info.slots.get(slot_index).map(|slot_info| slot_info.value.clone()).unwrap_or_default(); Slot { index: slot_index.clone(), @@ -199,7 +202,7 @@ impl HybridHistory { slot_index as _, number as _ ) - .fetch_optional(&*self.pool) + .fetch_optional(pool) .await .context("failed to select slot")?, }; diff --git a/src/eth/storage/hybrid/mod.rs b/src/eth/storage/hybrid/mod.rs index 16baba22d..8740eb13b 100644 --- a/src/eth/storage/hybrid/mod.rs +++ b/src/eth/storage/hybrid/mod.rs @@ -1,4 +1,4 @@ -mod hybrid_history; +mod hybrid_state; mod query_executor; use std::collections::HashMap; @@ -8,14 +8,11 @@ use std::time::Duration; use anyhow::Context; use async_trait::async_trait; -use indexmap::IndexMap; use metrics::atomics::AtomicU64; use num_traits::cast::ToPrimitive; -use rand::rngs::StdRng; -use rand::seq::IteratorRandom; -use rand::SeedableRng; use sqlx::postgres::PgPoolOptions; use sqlx::Pool; +use sqlx::Postgres; use tokio::sync::mpsc; use tokio::sync::mpsc::channel; use tokio::sync::RwLock; @@ -23,28 +20,24 @@ use tokio::sync::RwLockReadGuard; use tokio::sync::RwLockWriteGuard; use tokio::sync::Semaphore; +use self::hybrid_state::HybridStorageState; use crate::eth::primitives::Account; use crate::eth::primitives::Address; use crate::eth::primitives::Block; use crate::eth::primitives::BlockNumber; use crate::eth::primitives::BlockSelection; -use crate::eth::primitives::Bytes; use crate::eth::primitives::ExecutionAccountChanges; use crate::eth::primitives::ExecutionConflicts; use crate::eth::primitives::ExecutionConflictsBuilder; use crate::eth::primitives::Hash; use crate::eth::primitives::LogFilter; use crate::eth::primitives::LogMined; -use crate::eth::primitives::Nonce; use crate::eth::primitives::Slot; use crate::eth::primitives::SlotIndex; use crate::eth::primitives::SlotSample; use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::TransactionMined; -use crate::eth::primitives::Wei; -use crate::eth::storage::hybrid::hybrid_history::AccountInfo; -use crate::eth::storage::hybrid::hybrid_history::HybridHistory; -use crate::eth::storage::inmemory::InMemoryHistory; +use crate::eth::storage::hybrid::hybrid_state::AccountInfo; use crate::eth::storage::PermanentStorage; use crate::eth::storage::StorageError; @@ -56,20 +49,11 @@ struct BlockTask { account_changes: Vec, } -#[derive(Debug, Default, serde::Serialize, serde::Deserialize)] -pub struct HybridPermanentStorageState { - pub accounts: HashMap, - pub transactions: HashMap, - pub blocks_by_number: IndexMap>, - pub blocks_by_hash: IndexMap>, - pub logs: Vec, -} - #[derive(Debug)] pub struct HybridPermanentStorage { - state: RwLock, + state: RwLock, + pool: Arc>, block_number: AtomicU64, - hybrid_state: RwLock, task_sender: mpsc::Sender, } @@ -96,24 +80,23 @@ impl HybridPermanentStorage { })?; let (task_sender, task_receiver) = channel::(32); - let pool = Arc::new(connection_pool.clone()); + let worker_pool = Arc::new(connection_pool.clone()); + let pool = Arc::clone(&worker_pool); tokio::spawn(async move { // Assuming you define a 'response_sender' if you plan to handle responses - let worker_pool = Arc::>::clone(&pool); + let worker_pool = Arc::>::clone(&worker_pool); // Omitting response channel setup for simplicity Self::worker(task_receiver, worker_pool, config.connections).await; }); let block_number = Self::preload_block_number(connection_pool.clone()).await?; - let state = RwLock::new(HybridPermanentStorageState::default()); - //XXX this will eventually replace state - let hybrid_state = RwLock::new(HybridHistory::new(connection_pool.clone().into()).await?); - + let state = RwLock::new(HybridStorageState::default()); + state.write().await.load_latest_data(&pool).await?; Ok(Self { state, block_number, - hybrid_state, task_sender, + pool, }) } @@ -150,12 +133,12 @@ impl HybridPermanentStorage { // ------------------------------------------------------------------------- /// Locks inner state for reading. - async fn lock_read(&self) -> RwLockReadGuard<'_, HybridPermanentStorageState> { + async fn lock_read(&self) -> RwLockReadGuard<'_, HybridStorageState> { self.state.read().await } /// Locks inner state for writing. - async fn lock_write(&self) -> RwLockWriteGuard<'_, HybridPermanentStorageState> { + async fn lock_write(&self) -> RwLockWriteGuard<'_, HybridStorageState> { self.state.write().await } @@ -173,13 +156,13 @@ impl HybridPermanentStorage { state.logs.clear(); } - async fn check_conflicts(hybrid_state: &HybridHistory, account_changes: &[ExecutionAccountChanges]) -> Option { + async fn check_conflicts(state: &HybridStorageState, account_changes: &[ExecutionAccountChanges]) -> Option { let mut conflicts = ExecutionConflictsBuilder::default(); for change in account_changes { let address = &change.address; - if let Some(account) = hybrid_state.hybrid_accounts_slots.get(address) { + if let Some(account) = state.accounts.get(address) { // check account info conflicts if let Some(original_nonce) = change.nonce.take_original_ref() { let account_nonce = &account.nonce; @@ -210,8 +193,6 @@ impl HybridPermanentStorage { } } -impl HybridPermanentStorage {} - #[async_trait] impl PermanentStorage for HybridPermanentStorage { // ------------------------------------------------------------------------- @@ -238,11 +219,11 @@ impl PermanentStorage for HybridPermanentStorage { async fn maybe_read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result> { //XXX TODO deal with point_in_time first, e.g create to_account at hybrid_accounts_slots - let hybrid_state = self.hybrid_state.read().await; - + let state = self.state.read().await; + tracing::debug!(?state.accounts); let account = match point_in_time { StoragePointInTime::Present => { - match hybrid_state.hybrid_accounts_slots.get(address) { + match state.accounts.get(address) { Some(inmemory_account) => { let account = inmemory_account.to_account(point_in_time, address).await; tracing::trace!(%address, ?account, "account found"); @@ -272,7 +253,7 @@ impl PermanentStorage for HybridPermanentStorage { address as _, block_number as _ ) - .fetch_optional(&*hybrid_state.pool) + .fetch_optional(&*self.pool) .await .context("failed to select account")?, }; @@ -281,7 +262,7 @@ impl PermanentStorage for HybridPermanentStorage { async fn maybe_read_slot(&self, address: &Address, slot_index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result> { tracing::debug!(%address, %slot_index, ?point_in_time, "reading slot"); - self.hybrid_state.read().await.get_slot_at_point(address, slot_index, point_in_time).await + self.state.read().await.get_slot_at_point(address, slot_index, point_in_time, &self.pool).await } async fn read_block(&self, selection: &BlockSelection) -> anyhow::Result> { @@ -341,24 +322,22 @@ impl PermanentStorage for HybridPermanentStorage { } async fn save_block(&self, block: Block) -> anyhow::Result<(), StorageError> { - let mut state = self.lock_write().await; - let mut hybrid_state = self.hybrid_state.write().await; + let mut state = self.state.write().await; // check conflicts before persisting any state changes let account_changes = block.compact_account_changes(); - if let Some(conflicts) = Self::check_conflicts(&hybrid_state, &account_changes).await { + if let Some(conflicts) = Self::check_conflicts(&state, &account_changes).await { return Err(StorageError::Conflict(conflicts)); } // save block - tracing::debug!(number = %block.number(), "saving block"); let block = Arc::new(block); let number = block.number(); state.blocks_by_hash.truncate(600); state.blocks_by_number.insert(*number, Arc::clone(&block)); //XXX deal with errors later - let _ = hybrid_state.update_state_with_execution_changes(&account_changes).await; + let _ = state.update_state_with_execution_changes(&account_changes).await; let block_task = BlockTask { block_number: *block.number(), @@ -379,10 +358,10 @@ impl PermanentStorage for HybridPermanentStorage { async fn save_accounts(&self, accounts: Vec) -> anyhow::Result<()> { tracing::debug!(?accounts, "saving initial accounts"); - let mut state = self.hybrid_state.write().await; + let mut state = self.state.write().await; let mut accounts_changes = (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()); for account in accounts { - state.hybrid_accounts_slots.insert( + state.accounts.insert( account.address.clone(), AccountInfo { balance: account.balance.clone(), @@ -408,7 +387,7 @@ impl PermanentStorage for HybridPermanentStorage { accounts_changes.3 as _, accounts_changes.4 as _, ) - .execute(&*state.pool) + .execute(&*self.pool) .await?; Ok(()) } @@ -433,91 +412,13 @@ impl PermanentStorage for HybridPermanentStorage { state.transactions.retain(|_, t| t.block_number <= block_number); state.logs.retain(|l| l.block_number <= block_number); - // remove account changes - for account in state.accounts.values_mut() { - account.reset_at(block_number); - } + // XXX Reset in postgres and load latest account data + // state.accounts.clear(); Ok(()) } - async fn read_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result> { - let state = self.lock_read().await; - - let samples = state - .accounts - .iter() - .filter(|(_, account_info)| account_info.is_contract()) - .flat_map(|(_, contract)| { - contract - .slots - .values() - .flat_map(|slot_history| Vec::from((*slot_history).clone())) - .filter_map(|slot| { - if slot.block_number >= start && slot.block_number < end { - Some(SlotSample { - address: contract.address.clone(), - block_number: slot.block_number, - index: slot.value.index, - value: slot.value.value, - }) - } else { - None - } - }) - }); - - match max_samples { - 0 => Ok(samples.collect()), - n => { - let mut rng = StdRng::seed_from_u64(seed); - Ok(samples.choose_multiple(&mut rng, n as usize)) - } - } - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct InMemoryPermanentAccount { - #[allow(dead_code)] - pub address: Address, - pub balance: InMemoryHistory, - pub nonce: InMemoryHistory, - pub bytecode: InMemoryHistory>, - pub slots: HashMap>, -} - -impl InMemoryPermanentAccount { - /// Creates a new permanent account with initial balance. - pub fn _new_with_balance(address: Address, balance: Wei) -> Self { - Self { - address, - balance: InMemoryHistory::new_at_zero(balance), - nonce: InMemoryHistory::new_at_zero(Nonce::ZERO), - bytecode: InMemoryHistory::new_at_zero(None), - slots: Default::default(), - } - } - - /// Resets all account changes to the specified block number. - pub fn reset_at(&mut self, block_number: BlockNumber) { - // SAFETY: ok to unwrap because all historical values starts at block 0 - self.balance = self.balance.reset_at(block_number).expect("never empty"); - self.nonce = self.nonce.reset_at(block_number).expect("never empty"); - self.bytecode = self.bytecode.reset_at(block_number).expect("never empty"); - - // SAFETY: not ok to unwrap because slot value does not start at block 0 - let mut new_slots = HashMap::with_capacity(self.slots.len()); - for (slot_index, slot_history) in self.slots.iter() { - if let Some(new_slot_history) = slot_history.reset_at(block_number) { - new_slots.insert(slot_index.clone(), new_slot_history); - } - } - self.slots = new_slots; - } - - /// Checks current account is a contract. - pub fn is_contract(&self) -> bool { - self.bytecode.get_current().is_some() + async fn read_slots_sample(&self, _start: BlockNumber, _end: BlockNumber, _max_samples: u64, _seed: u64) -> anyhow::Result> { + todo!() } }