diff --git a/.sqlx/query-3b16e7b318ad7861bcda111fdb911efcebaf53a58f850d356e248ea666389a8f.json b/.sqlx/query-3b16e7b318ad7861bcda111fdb911efcebaf53a58f850d356e248ea666389a8f.json new file mode 100644 index 000000000..775750ad3 --- /dev/null +++ b/.sqlx/query-3b16e7b318ad7861bcda111fdb911efcebaf53a58f850d356e248ea666389a8f.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO neo_blocks (block_number, block, created_at) VALUES ($1, $2, NOW())", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Numeric", + "Jsonb" + ] + }, + "nullable": [] + }, + "hash": "3b16e7b318ad7861bcda111fdb911efcebaf53a58f850d356e248ea666389a8f" +} diff --git a/src/config.rs b/src/config.rs index 67682d19e..70b850bff 100644 --- a/src/config.rs +++ b/src/config.rs @@ -23,6 +23,7 @@ use crate::eth::primitives::BlockNumber; use crate::eth::primitives::BlockSelection; #[cfg(feature = "dev")] use crate::eth::primitives::StoragePointInTime; +use crate::eth::storage::HybridPermanentStorage; use crate::eth::storage::InMemoryPermanentStorage; use crate::eth::storage::InMemoryTemporaryStorage; use crate::eth::storage::PermanentStorage; @@ -363,6 +364,9 @@ pub enum StorageKind { #[strum(serialize = "postgres")] Postgres { url: String }, + + #[strum(serialize = "hybrid")] + Hybrid { url: String }, } impl PermanentStorageConfig { @@ -379,6 +383,14 @@ impl PermanentStorageConfig { }; Arc::new(Postgres::new(config).await?) } + StorageKind::Hybrid { ref url } => { + let config = PostgresClientConfig { + url: url.to_owned(), + connections: self.perm_connections, + acquire_timeout: Duration::from_millis(self.perm_timeout_millis), + }; + Arc::new(HybridPermanentStorage::new(config).await?) + } }; Ok(Arc::new(StratusStorage::new(temp, perm))) @@ -392,6 +404,10 @@ impl FromStr for StorageKind { match s { "inmemory" => Ok(Self::InMemory), s if s.starts_with("postgres://") => Ok(Self::Postgres { url: s.to_string() }), + s if s.starts_with("hybrid://") => { + let s = s.replace("hybrid", "postgres"); //TODO there is a better way to do this + Ok(Self::Hybrid { url: s.to_string() }) + } s => Err(anyhow!("unknown storage: {}", s)), } } diff --git a/src/eth/storage/hybrid/mod.rs b/src/eth/storage/hybrid/mod.rs new file mode 100644 index 000000000..f1a0cd576 --- /dev/null +++ b/src/eth/storage/hybrid/mod.rs @@ -0,0 +1,532 @@ +//! In-memory storage implementations. + +use std::collections::HashMap; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +use async_trait::async_trait; +use indexmap::IndexMap; +use metrics::atomics::AtomicU64; +use rand::rngs::StdRng; +use rand::seq::IteratorRandom; +use rand::SeedableRng; +use serde_json::Value; +use sqlx::postgres::PgPoolOptions; +use tokio::sync::mpsc; +use tokio::sync::mpsc::channel; +use tokio::sync::RwLock; +use tokio::sync::RwLockReadGuard; +use tokio::sync::RwLockWriteGuard; + +use crate::eth::primitives::Account; +use crate::eth::primitives::Address; +use crate::eth::primitives::Block; +use crate::eth::primitives::BlockNumber; +use crate::eth::primitives::BlockSelection; +use crate::eth::primitives::Bytes; +use crate::eth::primitives::ExecutionAccountChanges; +use crate::eth::primitives::ExecutionConflicts; +use crate::eth::primitives::ExecutionConflictsBuilder; +use crate::eth::primitives::Hash; +use crate::eth::primitives::LogFilter; +use crate::eth::primitives::LogMined; +use crate::eth::primitives::Nonce; +use crate::eth::primitives::Slot; +use crate::eth::primitives::SlotIndex; +use crate::eth::primitives::SlotSample; +use crate::eth::primitives::StoragePointInTime; +use crate::eth::primitives::TransactionMined; +use crate::eth::primitives::Wei; +use crate::eth::storage::inmemory::InMemoryHistory; +use crate::eth::storage::PermanentStorage; +use crate::eth::storage::StorageError; +use crate::infra::postgres::PostgresClientConfig; + +#[derive(Debug)] +struct BlockTask { + block_number: BlockNumber, + block_data: Value, +} + +#[derive(Debug, Default, serde::Serialize, serde::Deserialize)] +pub struct HybridPermanentStorageState { + pub accounts: HashMap, + pub transactions: HashMap, + pub blocks_by_number: IndexMap>, + pub blocks_by_hash: IndexMap>, + pub logs: Vec, +} + +#[derive(Debug)] +pub struct HybridPermanentStorage { + state: RwLock, + block_number: AtomicU64, + task_sender: mpsc::Sender, +} + +impl HybridPermanentStorage { + pub async fn new(config: PostgresClientConfig) -> anyhow::Result { + tracing::info!("starting hybrid storage"); + + let connection_pool = PgPoolOptions::new() + .min_connections(config.connections) + .max_connections(config.connections) + .acquire_timeout(config.acquire_timeout) + .connect(&config.url) + .await + .map_err(|e| { + tracing::error!(reason = ?e, "failed to start postgres client"); + anyhow::anyhow!("failed to start postgres client") + })?; + + let (task_sender, task_receiver) = channel::(32); + let pool = Arc::new(connection_pool.clone()); + tokio::spawn(async move { + // Assuming you define a 'response_sender' if you plan to handle responses + let worker_pool = Arc::>::clone(&pool); + // Omitting response channel setup for simplicity + Self::worker(task_receiver, worker_pool).await; + }); + + Ok(Self { + state: RwLock::new(HybridPermanentStorageState::default()), + block_number: Default::default(), + task_sender, + }) + } + + async fn worker(mut receiver: tokio::sync::mpsc::Receiver, pool: Arc>) { + tracing::info!("Starting worker"); + while let Some(block_task) = receiver.recv().await { + let pool_clone = Arc::>::clone(&pool); + // Here we attempt to insert the block data into the database. + // Adjust the SQL query according to your table schema. + tokio::spawn(async move { + let result = sqlx::query!( + "INSERT INTO neo_blocks (block_number, block, created_at) VALUES ($1, $2, NOW())", + block_task.block_number as _, + block_task.block_data + ) + .execute(&*pool_clone) + .await; + + // Handle the result of the insert operation. + match result { + Ok(_) => tracing::info!("Block {} inserted successfully.", block_task.block_number), + Err(e) => tracing::error!("Failed to insert block {}: {}", block_task.block_number, e), + } + }); + } + } + + // ------------------------------------------------------------------------- + // Lock methods + // ------------------------------------------------------------------------- + + /// Locks inner state for reading. + async fn lock_read(&self) -> RwLockReadGuard<'_, HybridPermanentStorageState> { + self.state.read().await + } + + /// Locks inner state for writing. + async fn lock_write(&self) -> RwLockWriteGuard<'_, HybridPermanentStorageState> { + self.state.write().await + } + + // ------------------------------------------------------------------------- + // State methods + // ------------------------------------------------------------------------- + + /// Clears in-memory state. + pub async fn clear(&self) { + let mut state = self.lock_write().await; + state.accounts.clear(); + state.transactions.clear(); + state.blocks_by_hash.clear(); + state.blocks_by_number.clear(); + state.logs.clear(); + } + + async fn check_conflicts(state: &HybridPermanentStorageState, account_changes: &[ExecutionAccountChanges]) -> Option { + let mut conflicts = ExecutionConflictsBuilder::default(); + + for change in account_changes { + let address = &change.address; + + if let Some(account) = state.accounts.get(address) { + // check account info conflicts + if let Some(original_nonce) = change.nonce.take_original_ref() { + let account_nonce = account.nonce.get_current_ref(); + if original_nonce != account_nonce { + conflicts.add_nonce(address.clone(), account_nonce.clone(), original_nonce.clone()); + } + } + if let Some(original_balance) = change.balance.take_original_ref() { + let account_balance = account.balance.get_current_ref(); + if original_balance != account_balance { + conflicts.add_balance(address.clone(), account_balance.clone(), original_balance.clone()); + } + } + + // check slots conflicts + for (slot_index, slot_change) in &change.slots { + if let Some(account_slot) = account.slots.get(slot_index).map(|value| value.get_current_ref()) { + if let Some(original_slot) = slot_change.take_original_ref() { + let account_slot_value = account_slot.value.clone(); + if original_slot.value != account_slot_value { + conflicts.add_slot(address.clone(), slot_index.clone(), account_slot_value, original_slot.value.clone()); + } + } + } + } + } + } + conflicts.build() + } +} + +impl HybridPermanentStorage {} + +#[async_trait] +impl PermanentStorage for HybridPermanentStorage { + // ------------------------------------------------------------------------- + // Block number operations + // ------------------------------------------------------------------------- + + async fn read_current_block_number(&self) -> anyhow::Result { + Ok(self.block_number.load(Ordering::SeqCst).into()) + } + + async fn increment_block_number(&self) -> anyhow::Result { + let next = self.block_number.fetch_add(1, Ordering::SeqCst) + 1; + Ok(next.into()) + } + + async fn set_block_number(&self, number: BlockNumber) -> anyhow::Result<()> { + self.block_number.store(number.as_u64(), Ordering::SeqCst); + Ok(()) + } + + // ------------------------------------------------------------------------- + // State operations + // ------------------------------------------------------------------------ + + async fn maybe_read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result> { + tracing::debug!(%address, "reading account"); + + let state = self.lock_read().await; + + match state.accounts.get(address) { + Some(inmemory_account) => { + let account = inmemory_account.to_account(point_in_time); + tracing::trace!(%address, ?account, "account found"); + Ok(Some(account)) + } + + None => { + tracing::trace!(%address, "account not found"); + Ok(None) + } + } + } + + async fn maybe_read_slot(&self, address: &Address, slot_index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result> { + tracing::debug!(%address, %slot_index, ?point_in_time, "reading slot"); + + let state = self.lock_read().await; + let Some(account) = state.accounts.get(address) else { + tracing::trace!(%address, "account not found"); + return Ok(Default::default()); + }; + + match account.slots.get(slot_index) { + Some(slot_history) => { + let slot = slot_history.get_at_point(point_in_time).unwrap_or_default(); + tracing::trace!(%address, %slot_index, ?point_in_time, %slot, "slot found"); + Ok(Some(slot)) + } + + None => { + tracing::trace!(%address, %slot_index, ?point_in_time, "slot not found"); + Ok(None) + } + } + } + + async fn read_block(&self, selection: &BlockSelection) -> anyhow::Result> { + tracing::debug!(?selection, "reading block"); + + let state_lock = self.lock_read().await; + let block = match selection { + BlockSelection::Latest => state_lock.blocks_by_number.values().last().cloned(), + BlockSelection::Earliest => state_lock.blocks_by_number.values().next().cloned(), + BlockSelection::Number(number) => state_lock.blocks_by_number.get(number).cloned(), + BlockSelection::Hash(hash) => state_lock.blocks_by_hash.get(hash).cloned(), + }; + match block { + Some(block) => { + tracing::trace!(?selection, ?block, "block found"); + Ok(Some((*block).clone())) + } + None => { + tracing::trace!(?selection, "block not found"); + Ok(None) + } + } + } + + async fn read_mined_transaction(&self, hash: &Hash) -> anyhow::Result> { + tracing::debug!(%hash, "reading transaction"); + let state_lock = self.lock_read().await; + + match state_lock.transactions.get(hash) { + Some(transaction) => { + tracing::trace!(%hash, "transaction found"); + Ok(Some(transaction.clone())) + } + None => { + tracing::trace!(%hash, "transaction not found"); + Ok(None) + } + } + } + + async fn read_logs(&self, filter: &LogFilter) -> anyhow::Result> { + tracing::debug!(?filter, "reading logs"); + let state_lock = self.lock_read().await; + + let logs = state_lock + .logs + .iter() + .skip_while(|log| log.block_number < filter.from_block) + .take_while(|log| match filter.to_block { + Some(to_block) => log.block_number <= to_block, + None => true, + }) + .filter(|log| filter.matches(log)) + .cloned() + .collect(); + Ok(logs) + } + + async fn save_block(&self, block: Block) -> anyhow::Result<(), StorageError> { + let mut state = self.lock_write().await; + + // check conflicts before persisting any state changes + let account_changes = block.compact_account_changes(); + if let Some(conflicts) = Self::check_conflicts(&state, &account_changes).await { + return Err(StorageError::Conflict(conflicts)); + } + + // save block + tracing::debug!(number = %block.number(), "saving block"); + let block = Arc::new(block); + let number = block.number(); + state.blocks_by_number.insert(*number, Arc::clone(&block)); + state.blocks_by_hash.insert(block.hash().clone(), Arc::clone(&block)); + + // save transactions + for transaction in block.transactions.clone() { + tracing::debug!(hash = %transaction.input.hash, "saving transaction"); + state.transactions.insert(transaction.input.hash.clone(), transaction.clone()); + if transaction.is_success() { + for log in transaction.logs { + state.logs.push(log); + } + } + } + + // save block account changes + for changes in account_changes { + let account = state + .accounts + .entry(changes.address.clone()) + .or_insert_with(|| InMemoryPermanentAccount::new_empty(changes.address)); + + // account basic info + if let Some(nonce) = changes.nonce.take_modified() { + account.nonce.push(*number, nonce); + } + if let Some(balance) = changes.balance.take_modified() { + account.balance.push(*number, balance); + } + + // bytecode + if let Some(Some(bytecode)) = changes.bytecode.take_modified() { + account.bytecode.push(*number, Some(bytecode)); + } + + // slots + for (_, slot) in changes.slots { + if let Some(slot) = slot.take_modified() { + match account.slots.get_mut(&slot.index) { + Some(slot_history) => { + slot_history.push(*number, slot); + } + None => { + account.slots.insert(slot.index.clone(), InMemoryHistory::new(*number, slot)); + } + } + } + } + } + + Ok(()) + } + + async fn after_commit_hook(&self) -> anyhow::Result<()> { + let b = self.read_block(&BlockSelection::Latest).await?; + if let Some(bb) = b { + let s = format!("{} => {}", bb.number(), bb.transactions.len()); + dbg!(s); + let bbb = *bb.number(); + + let block_task = BlockTask { + block_number: bbb, + block_data: serde_json::to_value(bb).unwrap(), + }; + + // Send the task to be processed by a worker + dbg!("sending block task"); + self.task_sender.send(block_task).await.expect("Failed to send block task"); + } + + Ok(()) + } + + async fn save_accounts(&self, accounts: Vec) -> anyhow::Result<()> { + tracing::debug!(?accounts, "saving initial accounts"); + + let mut state = self.lock_write().await; + for account in accounts { + state.accounts.insert( + account.address.clone(), + InMemoryPermanentAccount::new_with_balance(account.address, account.balance), + ); + } + Ok(()) + } + + async fn reset_at(&self, block_number: BlockNumber) -> anyhow::Result<()> { + // reset block number + let block_number_u64: u64 = block_number.into(); + let _ = self.block_number.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| { + if block_number_u64 <= current { + Some(block_number_u64) + } else { + None + } + }); + + // remove blocks + let mut state = self.lock_write().await; + state.blocks_by_hash.retain(|_, b| *b.number() <= block_number); + state.blocks_by_number.retain(|_, b| *b.number() <= block_number); + + // remove transactions and logs + state.transactions.retain(|_, t| t.block_number <= block_number); + state.logs.retain(|l| l.block_number <= block_number); + + // remove account changes + for account in state.accounts.values_mut() { + account.reset_at(block_number); + } + + Ok(()) + } + + async fn read_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result> { + let state = self.lock_read().await; + + let samples = state + .accounts + .iter() + .filter(|(_, account_info)| account_info.is_contract()) + .flat_map(|(_, contract)| { + contract + .slots + .values() + .flat_map(|slot_history| Vec::from((*slot_history).clone())) + .filter_map(|slot| { + if slot.block_number >= start && slot.block_number < end { + Some(SlotSample { + address: contract.address.clone(), + block_number: slot.block_number, + index: slot.value.index, + value: slot.value.value, + }) + } else { + None + } + }) + }); + + match max_samples { + 0 => Ok(samples.collect()), + n => { + let mut rng = StdRng::seed_from_u64(seed); + Ok(samples.choose_multiple(&mut rng, n as usize)) + } + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct InMemoryPermanentAccount { + #[allow(dead_code)] + pub address: Address, + pub balance: InMemoryHistory, + pub nonce: InMemoryHistory, + pub bytecode: InMemoryHistory>, + pub slots: HashMap>, +} + +impl InMemoryPermanentAccount { + /// Creates a new empty permanent account. + fn new_empty(address: Address) -> Self { + Self::new_with_balance(address, Wei::ZERO) + } + + /// Creates a new permanent account with initial balance. + pub fn new_with_balance(address: Address, balance: Wei) -> Self { + Self { + address, + balance: InMemoryHistory::new_at_zero(balance), + nonce: InMemoryHistory::new_at_zero(Nonce::ZERO), + bytecode: InMemoryHistory::new_at_zero(None), + slots: Default::default(), + } + } + + /// Resets all account changes to the specified block number. + pub fn reset_at(&mut self, block_number: BlockNumber) { + // SAFETY: ok to unwrap because all historical values starts at block 0 + self.balance = self.balance.reset_at(block_number).expect("never empty"); + self.nonce = self.nonce.reset_at(block_number).expect("never empty"); + self.bytecode = self.bytecode.reset_at(block_number).expect("never empty"); + + // SAFETY: not ok to unwrap because slot value does not start at block 0 + let mut new_slots = HashMap::with_capacity(self.slots.len()); + for (slot_index, slot_history) in self.slots.iter() { + if let Some(new_slot_history) = slot_history.reset_at(block_number) { + new_slots.insert(slot_index.clone(), new_slot_history); + } + } + self.slots = new_slots; + } + + /// Checks current account is a contract. + pub fn is_contract(&self) -> bool { + self.bytecode.get_current().is_some() + } + + /// Converts itself to an account at a point-in-time. + pub fn to_account(&self, point_in_time: &StoragePointInTime) -> Account { + Account { + address: self.address.clone(), + balance: self.balance.get_at_point(point_in_time).unwrap_or_default(), + nonce: self.nonce.get_at_point(point_in_time).unwrap_or_default(), + bytecode: self.bytecode.get_at_point(point_in_time).unwrap_or_default(), + } + } +} diff --git a/src/eth/storage/inmemory/inmemory_permanent.rs b/src/eth/storage/inmemory/inmemory_permanent.rs index 1eae34ae7..2f36e9090 100644 --- a/src/eth/storage/inmemory/inmemory_permanent.rs +++ b/src/eth/storage/inmemory/inmemory_permanent.rs @@ -351,6 +351,10 @@ impl PermanentStorage for InMemoryPermanentStorage { Ok(()) } + async fn after_commit_hook(&self) -> anyhow::Result<()> { + Ok(()) + } + async fn save_accounts(&self, accounts: Vec) -> anyhow::Result<()> { tracing::debug!(?accounts, "saving initial accounts"); diff --git a/src/eth/storage/mod.rs b/src/eth/storage/mod.rs index 0a54f09fa..9fcff8958 100644 --- a/src/eth/storage/mod.rs +++ b/src/eth/storage/mod.rs @@ -1,6 +1,7 @@ //! Ethereum / EVM storage. mod csv; +mod hybrid; mod inmemory; mod permanent_storage; mod postgres; @@ -9,6 +10,7 @@ mod stratus_storage; mod temporary_storage; pub use csv::CsvExporter; +pub use hybrid::HybridPermanentStorage; pub use inmemory::InMemoryPermanentStorage; pub use inmemory::InMemoryPermanentStorageState; pub use inmemory::InMemoryTemporaryStorage; diff --git a/src/eth/storage/permanent_storage.rs b/src/eth/storage/permanent_storage.rs index 7da84b3ca..8f178f2f9 100644 --- a/src/eth/storage/permanent_storage.rs +++ b/src/eth/storage/permanent_storage.rs @@ -47,6 +47,9 @@ pub trait PermanentStorage: Send + Sync { /// Persists atomically all changes from a block. async fn save_block(&self, block: Block) -> anyhow::Result<(), StorageError>; + /// Run after block commit callbacks. + async fn after_commit_hook(&self) -> anyhow::Result<()>; + /// Persists initial accounts (test accounts or genesis accounts). async fn save_accounts(&self, accounts: Vec) -> anyhow::Result<()>; diff --git a/src/eth/storage/postgres/postgres.rs b/src/eth/storage/postgres/postgres.rs index 34ea50636..aa86d1d68 100644 --- a/src/eth/storage/postgres/postgres.rs +++ b/src/eth/storage/postgres/postgres.rs @@ -684,6 +684,10 @@ impl PermanentStorage for Postgres { Ok(()) } + async fn after_commit_hook(&self) -> anyhow::Result<()> { + Ok(()) + } + async fn read_current_block_number(&self) -> anyhow::Result { tracing::debug!("reading current block number"); diff --git a/src/eth/storage/stratus_storage.rs b/src/eth/storage/stratus_storage.rs index 8d748bb67..00c419a4b 100644 --- a/src/eth/storage/stratus_storage.rs +++ b/src/eth/storage/stratus_storage.rs @@ -178,6 +178,7 @@ impl StratusStorage { // save block to permanent storage and clears temporary storage let result = self.perm.save_block(block).await; + self.perm.after_commit_hook().await?; self.reset_temp().await?; metrics::inc_storage_commit(start.elapsed(), label_size_by_tx, label_size_by_gas, result.is_ok()); diff --git a/src/infra/postgres.rs b/src/infra/postgres.rs index 88e8d553a..0b56b1518 100644 --- a/src/infra/postgres.rs +++ b/src/infra/postgres.rs @@ -11,6 +11,7 @@ use sqlx::postgres::PgPoolOptions; use sqlx::types::BigDecimal; use sqlx::PgPool; use tokio::sync::RwLock; +use tracing::info; use crate::eth::primitives::Address; use crate::eth::primitives::BlockNumber; @@ -84,8 +85,6 @@ impl Postgres { } async fn new_sload_cache(connection_pool: PgPool) -> anyhow::Result> { - tracing::info!("loading sload cache"); - let raw_sload = sqlx::query_file_as!(SlotCache, "src/eth/storage/postgres/queries/select_slot_cache.sql", BigDecimal::from(0)) .fetch_all(&connection_pool) .await?; @@ -95,6 +94,7 @@ impl Postgres { sload_cache.insert((s.address, s.index), (s.value, s.block)); }); + info!("finished loading sload cache"); Ok(sload_cache) } diff --git a/static/schema/001-init.sql b/static/schema/001-init.sql index 2e9eb4f54..e805761e5 100644 --- a/static/schema/001-init.sql +++ b/static/schema/001-init.sql @@ -721,6 +721,12 @@ CREATE INDEX index_topics_on_block_hash_and_log_idx_and_topic_idx ON public.topi CREATE UNIQUE INDEX index_transactions_on_hash ON public.transactions USING btree (hash); +--- XXX temporary +CREATE TABLE public.neo_blocks ( + block_number numeric PRIMARY KEY, + block JSONB NOT NULL, + created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT now() +); -- -- PostgreSQL database dump complete