diff --git a/.sqlx/query-3aa3f853d45b3fadcc9eb1a80cc45233482a14c82ac8a6630922bdcefae3cfd8.json b/.sqlx/query-3aa3f853d45b3fadcc9eb1a80cc45233482a14c82ac8a6630922bdcefae3cfd8.json
new file mode 100644
index 000000000..9752cdc84
--- /dev/null
+++ b/.sqlx/query-3aa3f853d45b3fadcc9eb1a80cc45233482a14c82ac8a6630922bdcefae3cfd8.json
@@ -0,0 +1,32 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n SELECT DISTINCT ON (account_address, slot_index)\n account_address,\n slot_index,\n value\n FROM\n neo_account_slots\n ORDER BY\n account_address,\n slot_index,\n block_number DESC\n ",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "account_address",
+        "type_info": "Bytea"
+      },
+      {
+        "ordinal": 1,
+        "name": "slot_index",
+        "type_info": "Bytea"
+      },
+      {
+        "ordinal": 2,
+        "name": "value",
+        "type_info": "Bytea"
+      }
+    ],
+    "parameters": {
+      "Left": []
+    },
+    "nullable": [
+      false,
+      false,
+      true
+    ]
+  },
+  "hash": "3aa3f853d45b3fadcc9eb1a80cc45233482a14c82ac8a6630922bdcefae3cfd8"
+}
diff --git a/.sqlx/query-94064f50a8d7041651be00c2892585c0333dc2de6412fe5cff8594e5dc4b183f.json b/.sqlx/query-8fba38ff2edbba29a64d40f2f1386c24f91dccea326df850e48b0f53d3c6437f.json
similarity index 51%
rename from .sqlx/query-94064f50a8d7041651be00c2892585c0333dc2de6412fe5cff8594e5dc4b183f.json
rename to .sqlx/query-8fba38ff2edbba29a64d40f2f1386c24f91dccea326df850e48b0f53d3c6437f.json
index 7b0e14c6a..68f785b22 100644
--- a/.sqlx/query-94064f50a8d7041651be00c2892585c0333dc2de6412fe5cff8594e5dc4b183f.json
+++ b/.sqlx/query-8fba38ff2edbba29a64d40f2f1386c24f91dccea326df850e48b0f53d3c6437f.json
@@ -1,6 +1,6 @@
 {
   "db_name": "PostgreSQL",
-  "query": "INSERT INTO public.neo_accounts (block_number, address, bytecode, balance, nonce)\n SELECT * FROM UNNEST($1::bigint[], $2::bytea[], $3::bytea[], $4::numeric[], $5::numeric[])\n AS t(block_number, address, bytecode, balance, nonce);",
+  "query": "INSERT INTO public.neo_accounts (block_number, address, bytecode, balance, nonce)\n SELECT * FROM UNNEST($1::bigint[], $2::bytea[], $3::bytea[], $4::numeric[], $5::numeric[])\n AS t(block_number, address, bytecode, balance, nonce);",
   "describe": {
     "columns": [],
     "parameters": {
@@ -14,5 +14,5 @@
     },
     "nullable": []
   },
-  "hash": "94064f50a8d7041651be00c2892585c0333dc2de6412fe5cff8594e5dc4b183f"
+  "hash": "8fba38ff2edbba29a64d40f2f1386c24f91dccea326df850e48b0f53d3c6437f"
 }
diff --git a/.sqlx/query-edae8a7534d957fb6f727f9222299b258a4c96059ab55cd6af5477502acd2265.json b/.sqlx/query-edae8a7534d957fb6f727f9222299b258a4c96059ab55cd6af5477502acd2265.json
new file mode 100644
index 000000000..f76288568
--- /dev/null
+++ b/.sqlx/query-edae8a7534d957fb6f727f9222299b258a4c96059ab55cd6af5477502acd2265.json
@@ -0,0 +1,38 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n SELECT DISTINCT ON (address)\n address,\n nonce,\n balance,\n bytecode\n FROM\n neo_accounts\n ORDER BY\n address,\n block_number DESC\n ",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "address",
+        "type_info": "Bytea"
+      },
+      {
+        "ordinal": 1,
+        "name": "nonce",
+        "type_info": "Numeric"
+      },
+      {
+        "ordinal": 2,
+        "name": "balance",
+        "type_info": "Numeric"
+      },
+      {
+        "ordinal": 3,
+        "name": "bytecode",
+        "type_info": "Bytea"
+      }
+    ],
+    "parameters": {
+      "Left": []
+    },
+    "nullable": [
+      false,
+      true,
+      true,
+      true
+    ]
+  },
+  "hash": "edae8a7534d957fb6f727f9222299b258a4c96059ab55cd6af5477502acd2265"
+}
diff --git a/src/eth/primitives/slot.rs b/src/eth/primitives/slot.rs
index 1b2669f5f..f980d62ce 100644
--- a/src/eth/primitives/slot.rs
+++ b/src/eth/primitives/slot.rs
@@ -81,6 +81,23 @@ impl Display for SlotIndex {
 
 gen_newtype_from!(self = SlotIndex, other = u64, U256, [u8; 32]);
 
+impl From<Vec<u8>> for SlotIndex {
+    fn from(bytes: Vec<u8>) -> Self {
+        // Interpret the byte vector as a big-endian integer;
+        // inputs shorter than 32 bytes are left-padded with zeros.
+        let u256: U256 = if bytes.len() <= 32 {
+            let mut padded_bytes = [0u8; 32];
+            padded_bytes[32 - bytes.len()..].copy_from_slice(&bytes);
+            U256::from_big_endian(&padded_bytes)
+        } else {
+            // Inputs longer than 32 bytes are truncated to their first 32 bytes;
+            // a stricter implementation could return an error instead.
+            U256::from_big_endian(&bytes[0..32])
+        };
+        SlotIndex(u256)
+    }
+}
+
 impl From<RevmU256> for SlotIndex {
     fn from(value: RevmU256) -> Self {
         Self(value.to_be_bytes().into())
@@ -187,6 +204,30 @@ impl From<SlotValue> for [u8; 32] {
     }
 }
 
+impl From<SlotValue> for Vec<u8> {
+    fn from(value: SlotValue) -> Self {
+        let mut vec = vec![0u8; 32]; // 32-byte buffer, initialized to zero
+        value.0.to_big_endian(&mut vec);
+        vec
+    }
+}
+impl From<Vec<u8>> for SlotValue {
+    fn from(bytes: Vec<u8>) -> Self {
+        // Interpret the byte vector as a big-endian integer;
+        // inputs shorter than 32 bytes are left-padded with zeros.
+        let u256: U256 = if bytes.len() <= 32 {
+            let mut padded_bytes = [0u8; 32];
+            padded_bytes[32 - bytes.len()..].copy_from_slice(&bytes);
+            U256::from_big_endian(&padded_bytes)
+        } else {
+            // Inputs longer than 32 bytes are truncated to their first 32 bytes;
+            // a stricter implementation could return an error instead.
+            U256::from_big_endian(&bytes[0..32])
+        };
+        SlotValue(u256)
+    }
+}
+
 gen_newtype_from!(self = SlotValue, other = u64, U256, [u8; 32]);
 
 impl From<RevmU256> for SlotValue {
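Reviewer aside (not part of the diff): the new byte-vector conversions above treat the input as a big-endian integer, left-padding short inputs to 32 bytes and silently truncating anything longer, while the reverse conversion always yields exactly 32 big-endian bytes. A minimal sketch of that behavior, assuming SlotIndex/SlotValue derive PartialEq/Debug like the other primitives and relying on the existing gen_newtype_from conversions from u64 and [u8; 32]:

// Sketch only; hypothetical test, not present in this PR.
#[test]
fn slot_newtypes_from_byte_vectors() {
    let short: SlotIndex = vec![0x01u8, 0x00].into();
    assert_eq!(short, SlotIndex::from(256u64)); // 0x0100 big-endian, left-padded

    let long: SlotValue = vec![0xabu8; 40].into();
    assert_eq!(long, SlotValue::from([0xabu8; 32])); // only the first 32 bytes are read

    let bytes: Vec<u8> = SlotValue::from(7u64).into();
    assert_eq!(bytes.len(), 32); // always a full 32-byte big-endian word
}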
diff --git a/src/eth/storage/hybrid/hybrid_history.rs b/src/eth/storage/hybrid/hybrid_history.rs
new file mode 100644
index 000000000..96f4bf893
--- /dev/null
+++ b/src/eth/storage/hybrid/hybrid_history.rs
@@ -0,0 +1,166 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use sqlx::types::BigDecimal;
+use sqlx::FromRow;
+use sqlx::Pool;
+use sqlx::Postgres;
+
+use crate::eth::primitives::Account;
+use crate::eth::primitives::Address;
+use crate::eth::primitives::Bytes;
+use crate::eth::primitives::Nonce;
+use crate::eth::primitives::Slot;
+use crate::eth::primitives::SlotIndex;
+use crate::eth::primitives::SlotValue;
+use crate::eth::primitives::StoragePointInTime;
+use crate::eth::primitives::Wei;
+
+#[derive(Debug)]
+struct SlotInfo {
+    value: SlotValue,
+}
+
+#[derive(Debug)]
+pub struct AccountInfo {
+    balance: Wei,
+    nonce: Nonce,
+    bytecode: Option<Bytes>,
+    slots: HashMap<SlotIndex, SlotInfo>,
+}
+
+#[derive(Debug)]
+pub struct HybridHistory {
+    pub hybrid_accounts_slots: HashMap<Address, AccountInfo>,
+    pool: Arc<Pool<Postgres>>,
+}
+
+#[derive(FromRow)]
+struct AccountRow {
+    address: Vec<u8>,
+    nonce: Option<BigDecimal>,
+    balance: Option<BigDecimal>,
+    bytecode: Option<Vec<u8>>,
+}
+
+#[derive(FromRow)]
+struct SlotRow {
+    account_address: Vec<u8>,
+    slot_index: SlotIndex,
+    value: Option<Vec<u8>>,
+}
+
+impl HybridHistory {
+    pub async fn new(pool: Arc<Pool<Postgres>>) -> Result<Self, sqlx::Error> {
+        // Initialize the structure with an empty cache, then load the latest state.
+        let mut history = HybridHistory {
+            hybrid_accounts_slots: HashMap::new(),
+            pool,
+        };
+
+        history.load_latest_data().await?;
+
+        Ok(history)
+    }
+
+    //XXX TODO use a fixed block_number during load, in order to avoid sync problems,
+    // e.g. another instance moving forward and this query reading inconsistent data
+    async fn load_latest_data(&mut self) -> Result<(), sqlx::Error> {
+        let account_rows = sqlx::query_as!(
+            AccountRow,
+            "
+            SELECT DISTINCT ON (address)
+                address,
+                nonce,
+                balance,
+                bytecode
+            FROM
+                neo_accounts
+            ORDER BY
+                address,
+                block_number DESC
+            "
+        )
+        .fetch_all(&*self.pool)
+        .await?;
+
+        let mut accounts: HashMap<Address, AccountInfo> = HashMap::new();
+
+        for account_row in account_rows {
+            let addr: Address = account_row.address.try_into().unwrap_or_default(); //XXX add alert
+            accounts.insert(
+                addr,
+                AccountInfo {
+                    balance: account_row.balance.map(|b| b.try_into().unwrap_or_default()).unwrap_or_default(),
+                    nonce: account_row.nonce.map(|n| n.try_into().unwrap_or_default()).unwrap_or_default(),
+                    bytecode: account_row.bytecode.map(Bytes::from),
+                    slots: HashMap::new(),
+                },
+            );
+        }
+
+        // Load the latest value of every slot for the accounts loaded above.
+        let slot_rows = sqlx::query_as!(
+            SlotRow,
+            "
+            SELECT DISTINCT ON (account_address, slot_index)
+                account_address,
+                slot_index,
+                value
+            FROM
+                neo_account_slots
+            ORDER BY
+                account_address,
+                slot_index,
+                block_number DESC
+            "
+        )
+        .fetch_all(&*self.pool)
+        .await?;
+
+        for slot_row in slot_rows {
+            let addr = &slot_row.account_address.try_into().unwrap_or_default(); //XXX add alert
+            if let Some(account_info) = accounts.get_mut(addr) {
+                account_info.slots.insert(
+                    slot_row.slot_index,
+                    SlotInfo {
+                        value: slot_row.value.unwrap_or_default().into(),
+                    },
+                );
+            }
+        }
+
+        self.hybrid_accounts_slots = accounts;
+
+        Ok(())
+    }
+
+    pub async fn get_slot_at_point(&self, address: &Address, slot_index: &SlotIndex, point_in_time: &StoragePointInTime) -> Option<Slot> {
+        match point_in_time {
+            StoragePointInTime::Present => self.hybrid_accounts_slots.get(address).map(|account_info| {
+                let value = account_info.slots.get(slot_index).map(|slot_info| slot_info.value.clone()).unwrap_or_default();
+                Slot {
+                    index: slot_index.clone(),
+                    value,
+                }
+            }),
+            StoragePointInTime::Past(_number) => {
+                None //XXX TODO use a postgres query for historical reads
+            }
+        }
+    }
+}
+
+impl AccountInfo {
+    pub async fn to_account(&self, point_in_time: &StoragePointInTime, address: &Address) -> Account {
+        match point_in_time {
+            StoragePointInTime::Present => Account {
+                address: address.clone(),
+                nonce: self.nonce.clone(),
+                balance: self.balance.clone(),
+                bytecode: self.bytecode.clone(),
+            },
+            StoragePointInTime::Past(_number) => Account::default(),
+        }
+    }
+}
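For orientation (not part of the diff), this is roughly how the new type is driven; the real wiring happens in HybridPermanentStorage below. The function name and the way the pool is obtained here are illustrative only.

// Hypothetical caller, mirroring what HybridPermanentStorage::new / maybe_read_slot do below.
async fn demo(pool: std::sync::Arc<sqlx::Pool<sqlx::Postgres>>) -> Result<(), sqlx::Error> {
    // Loads the latest account and slot values into the in-memory cache.
    let history = HybridHistory::new(pool).await?;

    // Present reads are served from the cache; Past reads are still a TODO and return None.
    let address = Address::default();
    let index = SlotIndex::from(0u64);
    let _slot = history.get_slot_at_point(&address, &index, &StoragePointInTime::Present).await;

    Ok(())
}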
diff --git a/src/eth/storage/hybrid/mod.rs b/src/eth/storage/hybrid/mod.rs
index 375335727..2b3e1c40b 100644
--- a/src/eth/storage/hybrid/mod.rs
+++ b/src/eth/storage/hybrid/mod.rs
@@ -1,3 +1,4 @@
+mod hybrid_history;
 mod query_executor;
 
 use std::collections::HashMap;
@@ -65,6 +66,7 @@ pub struct HybridPermanentStorageState {
 pub struct HybridPermanentStorage {
     state: RwLock<HybridPermanentStorageState>,
     block_number: AtomicU64,
+    hybrid_state: hybrid_history::HybridHistory,
     task_sender: mpsc::Sender<BlockTask>,
 }
 
@@ -101,10 +103,13 @@ impl HybridPermanentStorage {
 
         let block_number = Self::preload_block_number(connection_pool.clone()).await?;
         let state = RwLock::new(HybridPermanentStorageState::default());
+        //XXX this will eventually replace `state`
+        let hybrid_state = hybrid_history::HybridHistory::new(connection_pool.clone().into()).await?;
 
         Ok(Self {
             state,
             block_number,
+            hybrid_state,
            task_sender,
         })
     }
@@ -230,18 +235,16 @@ impl PermanentStorage for HybridPermanentStorage {
     // ------------------------------------------------------------------------
 
     async fn maybe_read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Account>> {
-        tracing::debug!(%address, "reading account");
-
-        let state = self.lock_read().await;
-
-        match state.accounts.get(address) {
+        //XXX TODO deal with point_in_time first, e.g. create to_account at hybrid_accounts_slots
+        match self.hybrid_state.hybrid_accounts_slots.get(address) {
             Some(inmemory_account) => {
-                let account = inmemory_account.to_account(point_in_time);
+                let account = inmemory_account.to_account(point_in_time, address).await;
                 tracing::trace!(%address, ?account, "account found");
                 Ok(Some(account))
             }
 
             None => {
+                //XXX TODO start an in-memory account from the database, maybe by calling to_account on an empty account
                 tracing::trace!(%address, "account not found");
                 Ok(None)
             }
@@ -251,24 +254,7 @@ impl PermanentStorage for HybridPermanentStorage {
     async fn maybe_read_slot(&self, address: &Address, slot_index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Slot>> {
         tracing::debug!(%address, %slot_index, ?point_in_time, "reading slot");
 
-        let state = self.lock_read().await;
-        let Some(account) = state.accounts.get(address) else {
-            tracing::trace!(%address, "account not found");
-            return Ok(Default::default());
-        };
-
-        match account.slots.get(slot_index) {
-            Some(slot_history) => {
-                let slot = slot_history.get_at_point(point_in_time).unwrap_or_default();
-                tracing::trace!(%address, %slot_index, ?point_in_time, %slot, "slot found");
-                Ok(Some(slot))
-            }
-
-            None => {
-                tracing::trace!(%address, %slot_index, ?point_in_time, "slot not found");
-                Ok(None)
-            }
-        }
+        Ok(self.hybrid_state.get_slot_at_point(address, slot_index, point_in_time).await)
     }
 
     async fn read_block(&self, selection: &BlockSelection) -> anyhow::Result<Option<Block>> {
@@ -368,6 +354,7 @@ impl PermanentStorage for HybridPermanentStorage {
                 if let Some(slot) = slot.take_modified() {
                     match account.slots.get_mut(&slot.index) {
                         Some(slot_history) => {
+                            //XXX simply keep the latest value instead of the full history
                             slot_history.push(*number, slot);
                         }
                         None => {
@@ -519,14 +506,4 @@ impl InMemoryPermanentAccount {
     pub fn is_contract(&self) -> bool {
         self.bytecode.get_current().is_some()
     }
-
-    /// Converts itself to an account at a point-in-time.
-    pub fn to_account(&self, point_in_time: &StoragePointInTime) -> Account {
-        Account {
-            address: self.address.clone(),
-            balance: self.balance.get_at_point(point_in_time).unwrap_or_default(),
-            nonce: self.nonce.get_at_point(point_in_time).unwrap_or_default(),
-            bytecode: self.bytecode.get_at_point(point_in_time).unwrap_or_default(),
-        }
-    }
 }
diff --git a/src/eth/storage/hybrid/query_executor.rs b/src/eth/storage/hybrid/query_executor.rs
index 599dadd29..2ad5f503d 100644
--- a/src/eth/storage/hybrid/query_executor.rs
+++ b/src/eth/storage/hybrid/query_executor.rs
@@ -101,8 +101,8 @@ pub async fn commit_eventually(pool: Arc<Pool<Postgres>>, block_task: BlockTask)
     if !accounts_changes.0.is_empty() {
         sqlx::query!(
             "INSERT INTO public.neo_accounts (block_number, address, bytecode, balance, nonce)
-            SELECT * FROM UNNEST($1::bigint[], $2::bytea[], $3::bytea[], $4::numeric[], $5::numeric[])
-            AS t(block_number, address, bytecode, balance, nonce);",
+            SELECT * FROM UNNEST($1::bigint[], $2::bytea[], $3::bytea[], $4::numeric[], $5::numeric[])
+            AS t(block_number, address, bytecode, balance, nonce);",
             accounts_changes.0 as _,
             accounts_changes.1 as _,
             accounts_changes.2 as _,