From 0332dacf3ea2ff02efc7fb7b71da0de64143a004 Mon Sep 17 00:00:00 2001 From: Renato Dinhani <101204870+dinhani-cw@users.noreply.github.com> Date: Thu, 11 Apr 2024 22:58:59 -0300 Subject: [PATCH] refactoring: renaming storage functions and params (#573) --- .../storage/inmemory/inmemory_permanent.rs | 34 +++++----- .../storage/inmemory/inmemory_temporary.rs | 12 ++-- src/eth/storage/permanent_storage.rs | 18 ++--- .../postgres_permanent/postgres_permanent.rs | 65 +++++++++---------- src/eth/storage/rocks/rocks_permanent.rs | 30 ++++----- src/eth/storage/rocks/rocks_state.rs | 14 ++-- src/eth/storage/rocks/rocks_temporary.rs | 10 +-- src/eth/storage/sled/sled_temporary.rs | 18 ++--- src/eth/storage/stratus_storage.rs | 13 ++-- src/eth/storage/temporary_storage.rs | 4 +- 10 files changed, 97 insertions(+), 121 deletions(-) diff --git a/src/eth/storage/inmemory/inmemory_permanent.rs b/src/eth/storage/inmemory/inmemory_permanent.rs index b7afa247e..17206cafc 100644 --- a/src/eth/storage/inmemory/inmemory_permanent.rs +++ b/src/eth/storage/inmemory/inmemory_permanent.rs @@ -193,7 +193,7 @@ impl PermanentStorage for InMemoryPermanentStorage { // State operations // ------------------------------------------------------------------------ - async fn maybe_read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result> { + async fn read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result> { tracing::debug!(%address, "reading account"); let state = self.lock_read().await; @@ -212,8 +212,8 @@ impl PermanentStorage for InMemoryPermanentStorage { } } - async fn maybe_read_slot(&self, address: &Address, slot_index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result> { - tracing::debug!(%address, %slot_index, ?point_in_time, "reading slot"); + async fn read_slot(&self, address: &Address, index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result> { + tracing::debug!(%address, %index, ?point_in_time, "reading slot"); let state = self.lock_read().await; let Some(account) = state.accounts.get(address) else { @@ -221,20 +221,29 @@ impl PermanentStorage for InMemoryPermanentStorage { return Ok(Default::default()); }; - match account.slots.get(slot_index) { + match account.slots.get(index) { Some(slot_history) => { let slot = slot_history.get_at_point(point_in_time).unwrap_or_default(); - tracing::trace!(%address, %slot_index, ?point_in_time, %slot, "slot found"); + tracing::trace!(%address, %index, ?point_in_time, %slot, "slot found"); Ok(Some(slot)) } None => { - tracing::trace!(%address, %slot_index, ?point_in_time, "slot not found"); + tracing::trace!(%address, %index, ?point_in_time, "slot not found"); Ok(None) } } } + async fn read_slots( + &self, + _address: &Address, + _indexes: &[SlotIndex], + _point_in_time: &StoragePointInTime, + ) -> anyhow::Result> { + todo!() + } + async fn read_block(&self, selection: &BlockSelection) -> anyhow::Result> { tracing::debug!(?selection, "reading block"); @@ -356,10 +365,6 @@ impl PermanentStorage for InMemoryPermanentStorage { Ok(()) } - async fn after_commit_hook(&self) -> anyhow::Result<()> { - Ok(()) - } - async fn save_accounts(&self, accounts: Vec) -> anyhow::Result<()> { tracing::debug!(?accounts, "saving initial accounts"); @@ -435,15 +440,6 @@ impl PermanentStorage for InMemoryPermanentStorage { } } } - - async fn read_slots( - &self, - _address: &Address, - _slot_indexes: &[SlotIndex], - _point_in_time: &StoragePointInTime, - ) -> anyhow::Result> { - todo!() - } } #[derive(Debug, serde::Serialize, serde::Deserialize)] diff --git a/src/eth/storage/inmemory/inmemory_temporary.rs b/src/eth/storage/inmemory/inmemory_temporary.rs index 178a47f76..bda41edfb 100644 --- a/src/eth/storage/inmemory/inmemory_temporary.rs +++ b/src/eth/storage/inmemory/inmemory_temporary.rs @@ -65,7 +65,7 @@ impl TemporaryStorage for InMemoryTemporaryStorage { Ok(state.active_block_number) } - async fn maybe_read_account(&self, address: &Address) -> anyhow::Result> { + async fn read_account(&self, address: &Address) -> anyhow::Result> { tracing::debug!(%address, "reading account"); let state = self.lock_read().await; @@ -90,8 +90,8 @@ impl TemporaryStorage for InMemoryTemporaryStorage { } } - async fn maybe_read_slot(&self, address: &Address, slot_index: &SlotIndex) -> anyhow::Result> { - tracing::debug!(%address, %slot_index, "reading slot"); + async fn read_slot(&self, address: &Address, index: &SlotIndex) -> anyhow::Result> { + tracing::debug!(%address, %index, "reading slot"); let state = self.lock_read().await; let Some(account) = state.accounts.get(address) else { @@ -99,14 +99,14 @@ impl TemporaryStorage for InMemoryTemporaryStorage { return Ok(Default::default()); }; - match account.slots.get(slot_index) { + match account.slots.get(index) { Some(slot) => { - tracing::trace!(%address, %slot_index, %slot, "slot found"); + tracing::trace!(%address, %index, %slot, "slot found"); Ok(Some(slot.clone())) } None => { - tracing::trace!(%address, %slot_index, "slot not found"); + tracing::trace!(%address, %index, "slot not found"); Ok(None) } } diff --git a/src/eth/storage/permanent_storage.rs b/src/eth/storage/permanent_storage.rs index 036ea591b..6877542d2 100644 --- a/src/eth/storage/permanent_storage.rs +++ b/src/eth/storage/permanent_storage.rs @@ -36,10 +36,13 @@ pub trait PermanentStorage: Send + Sync { async fn increment_block_number(&self) -> anyhow::Result; /// Retrieves an account from the storage. Returns Option when not found. - async fn maybe_read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result>; + async fn read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result>; /// Retrieves an slot from the storage. Returns Option when not found. - async fn maybe_read_slot(&self, address: &Address, slot_index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result>; + async fn read_slot(&self, address: &Address, index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result>; + + /// Retrieves several slots at once. + async fn read_slots(&self, address: &Address, indexes: &[SlotIndex], point_in_time: &StoragePointInTime) -> anyhow::Result>; /// Retrieves a block from the storage. async fn read_block(&self, block_selection: &BlockSelection) -> anyhow::Result>; @@ -53,9 +56,6 @@ pub trait PermanentStorage: Send + Sync { /// Persists atomically all changes from a block. async fn save_block(&self, block: Block) -> anyhow::Result<(), StorageError>; - /// Run after block commit callbacks. - async fn after_commit_hook(&self) -> anyhow::Result<()>; - /// Persists initial accounts (test accounts or genesis accounts). async fn save_accounts(&self, accounts: Vec) -> anyhow::Result<()>; @@ -64,12 +64,4 @@ pub trait PermanentStorage: Send + Sync { /// Retrieves a random sample of slots, from the provided start and end blocks. async fn read_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result>; - - /// Retrieves several slots at once - async fn read_slots( - &self, - address: &Address, - slot_indexes: &[SlotIndex], - point_in_time: &StoragePointInTime, - ) -> anyhow::Result>; } diff --git a/src/eth/storage/postgres_permanent/postgres_permanent.rs b/src/eth/storage/postgres_permanent/postgres_permanent.rs index 7749c99a0..573304d7a 100644 --- a/src/eth/storage/postgres_permanent/postgres_permanent.rs +++ b/src/eth/storage/postgres_permanent/postgres_permanent.rs @@ -121,7 +121,7 @@ impl PermanentStorage for PostgresPermanentStorage { Ok(()) } - async fn maybe_read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result> { + async fn read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result> { tracing::debug!(%address, "reading account"); let mut conn = PoolOrThreadConnection::take(&self.pool).await?; @@ -158,11 +158,11 @@ impl PermanentStorage for PostgresPermanentStorage { } } - async fn maybe_read_slot(&self, address: &Address, slot_index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result> { - tracing::debug!(%address, %slot_index, "reading slot"); + async fn read_slot(&self, address: &Address, index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result> { + tracing::debug!(%address, %index, "reading slot"); // TODO: improve this conversion - let slot_index_u8: [u8; 32] = slot_index.clone().into(); + let slot_index_u8: [u8; 32] = index.clone().into(); let mut conn = PoolOrThreadConnection::take(&self.pool).await?; let slot_value_vec: Option> = match point_in_time { @@ -193,19 +193,41 @@ impl PermanentStorage for PostgresPermanentStorage { Some(slot_value_vec) => { let slot_value = SlotValue::from(slot_value_vec); let slot = Slot { - index: slot_index.clone(), + index: index.clone(), value: slot_value, }; - tracing::trace!(?address, ?slot_index, %slot, "slot found"); + tracing::trace!(?address, ?index, %slot, "slot found"); Ok(Some(slot)) } None => { - tracing::trace!(?address, ?slot_index, ?point_in_time, "slot not found"); + tracing::trace!(?address, ?index, ?point_in_time, "slot not found"); Ok(None) } } } + async fn read_slots(&self, address: &Address, indexes: &[SlotIndex], point_in_time: &StoragePointInTime) -> anyhow::Result> { + tracing::debug!(%address, indexes_len = %indexes.len(), "reading slots"); + + let slots = match point_in_time { + StoragePointInTime::Present => + sqlx::query_file_as!(Slot, "src/eth/storage/postgres_permanent/sql/select_slots.sql", indexes as _, address as _) + .fetch_all(&self.pool) + .await?, + StoragePointInTime::Past(block_number) => + sqlx::query_file_as!( + Slot, + "src/eth/storage/postgres_permanent/sql/select_historical_slots.sql", + indexes as _, + address as _, + block_number as _ + ) + .fetch_all(&self.pool) + .await?, + }; + Ok(slots.into_iter().map(|slot| (slot.index, slot.value)).collect()) + } + async fn read_block(&self, block: &BlockSelection) -> anyhow::Result> { tracing::debug!(block = ?block, "reading block"); @@ -751,10 +773,6 @@ impl PermanentStorage for PostgresPermanentStorage { Ok(()) } - async fn after_commit_hook(&self) -> anyhow::Result<()> { - Ok(()) - } - async fn read_mined_block_number(&self) -> anyhow::Result { tracing::debug!("reading current block number"); @@ -861,31 +879,6 @@ impl PermanentStorage for PostgresPermanentStorage { Ok(slots_sample_rows) } - - async fn read_slots( - &self, - address: &Address, - slot_indexes: &[SlotIndex], - point_in_time: &StoragePointInTime, - ) -> anyhow::Result> { - let slots = match point_in_time { - StoragePointInTime::Present => - sqlx::query_file_as!(Slot, "src/eth/storage/postgres_permanent/sql/select_slots.sql", slot_indexes as _, address as _) - .fetch_all(&self.pool) - .await?, - StoragePointInTime::Past(block_number) => - sqlx::query_file_as!( - Slot, - "src/eth/storage/postgres_permanent/sql/select_historical_slots.sql", - slot_indexes as _, - address as _, - block_number as _ - ) - .fetch_all(&self.pool) - .await?, - }; - Ok(slots.into_iter().map(|slot| (slot.index, slot.value)).collect()) - } } fn partition_logs(logs: impl IntoIterator) -> HashMap> { diff --git a/src/eth/storage/rocks/rocks_permanent.rs b/src/eth/storage/rocks/rocks_permanent.rs index e550a1f41..7d54e1c04 100644 --- a/src/eth/storage/rocks/rocks_permanent.rs +++ b/src/eth/storage/rocks/rocks_permanent.rs @@ -129,13 +129,22 @@ impl PermanentStorage for RocksPermanentStorage { // State operations // ------------------------------------------------------------------------ - async fn maybe_read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result> { + async fn read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result> { Ok(self.state.read_account(address, point_in_time)) } - async fn maybe_read_slot(&self, address: &Address, slot_index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result> { - tracing::debug!(%address, %slot_index, ?point_in_time, "reading slot"); - Ok(self.state.read_slot(address, slot_index, point_in_time)) + async fn read_slot(&self, address: &Address, index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result> { + tracing::debug!(%address, %index, ?point_in_time, "reading slot"); + Ok(self.state.read_slot(address, index, point_in_time)) + } + + async fn read_slots( + &self, + _address: &Address, + _indexes: &[SlotIndex], + _point_in_time: &StoragePointInTime, + ) -> anyhow::Result> { + todo!() } async fn read_block(&self, selection: &BlockSelection) -> anyhow::Result> { @@ -213,10 +222,6 @@ impl PermanentStorage for RocksPermanentStorage { Ok(()) } - async fn after_commit_hook(&self) -> anyhow::Result<()> { - Ok(()) - } - async fn save_accounts(&self, accounts: Vec) -> anyhow::Result<()> { tracing::debug!(?accounts, "saving initial accounts"); @@ -262,13 +267,4 @@ impl PermanentStorage for RocksPermanentStorage { async fn read_slots_sample(&self, _start: BlockNumber, _end: BlockNumber, _max_samples: u64, _seed: u64) -> anyhow::Result> { todo!() } - - async fn read_slots( - &self, - _address: &Address, - _slot_indexes: &[SlotIndex], - _point_in_time: &StoragePointInTime, - ) -> anyhow::Result> { - todo!() - } } diff --git a/src/eth/storage/rocks/rocks_state.rs b/src/eth/storage/rocks/rocks_state.rs index 21073026f..219ac7049 100644 --- a/src/eth/storage/rocks/rocks_state.rs +++ b/src/eth/storage/rocks/rocks_state.rs @@ -341,20 +341,20 @@ impl RocksStorageState { .collect() } - pub fn read_slot(&self, address: &Address, slot_index: &SlotIndex, point_in_time: &StoragePointInTime) -> Option { + pub fn read_slot(&self, address: &Address, index: &SlotIndex, point_in_time: &StoragePointInTime) -> Option { match point_in_time { - StoragePointInTime::Present => self.account_slots.get(&(address.clone(), slot_index.clone())).map(|account_slot_value| Slot { - index: slot_index.clone(), + StoragePointInTime::Present => self.account_slots.get(&(address.clone(), index.clone())).map(|account_slot_value| Slot { + index: index.clone(), value: account_slot_value.clone(), }), StoragePointInTime::Past(number) => { - if let Some(((addr, index, _), value)) = self + if let Some(((rocks_address, rocks_index, _), value)) = self .account_slots_history - .iter_from((address.clone(), slot_index.clone(), *number), rocksdb::Direction::Reverse) + .iter_from((address.clone(), index.clone(), *number), rocksdb::Direction::Reverse) .next() { - if slot_index == &index && address == &addr { - return Some(Slot { index, value }); + if index == &rocks_index && address == &rocks_address { + return Some(Slot { index: rocks_index, value }); } } None diff --git a/src/eth/storage/rocks/rocks_temporary.rs b/src/eth/storage/rocks/rocks_temporary.rs index 36bd15222..f5a9222f7 100644 --- a/src/eth/storage/rocks/rocks_temporary.rs +++ b/src/eth/storage/rocks/rocks_temporary.rs @@ -45,11 +45,11 @@ impl TemporaryStorage for RocksTemporary { Ok(Some(self.current_block.load(Ordering::SeqCst).into())) } - async fn maybe_read_account(&self, address: &Address) -> anyhow::Result> { + async fn read_account(&self, address: &Address) -> anyhow::Result> { tracing::debug!(%address, "reading account"); // try temporary data - let account = self.temp.maybe_read_account(address).await?; + let account = self.temp.read_account(address).await?; if let Some(account) = account { return Ok(Some(account)); } @@ -57,16 +57,16 @@ impl TemporaryStorage for RocksTemporary { Ok(self.db.read_account(address, &StoragePointInTime::Present)) } - async fn maybe_read_slot(&self, address: &Address, slot_index: &SlotIndex) -> anyhow::Result> { + async fn read_slot(&self, address: &Address, index: &SlotIndex) -> anyhow::Result> { tracing::debug!(%address, "reading slot"); // try temporary data - let slot = self.temp.maybe_read_slot(address, slot_index).await?; + let slot = self.temp.read_slot(address, index).await?; if let Some(slot) = slot { return Ok(Some(slot)); } - Ok(self.db.read_slot(address, slot_index, &StoragePointInTime::Present)) + Ok(self.db.read_slot(address, index, &StoragePointInTime::Present)) } async fn save_account_changes(&self, changes: Vec) -> anyhow::Result<()> { diff --git a/src/eth/storage/sled/sled_temporary.rs b/src/eth/storage/sled/sled_temporary.rs index 690cef486..cf555436e 100644 --- a/src/eth/storage/sled/sled_temporary.rs +++ b/src/eth/storage/sled/sled_temporary.rs @@ -55,11 +55,11 @@ impl TemporaryStorage for SledTemporary { } } - async fn maybe_read_account(&self, address: &Address) -> anyhow::Result> { + async fn read_account(&self, address: &Address) -> anyhow::Result> { tracing::debug!(%address, "reading account"); // try temporary data - let account = self.temp.maybe_read_account(address).await?; + let account = self.temp.read_account(address).await?; if let Some(account) = account { return Ok(Some(account)); } @@ -75,17 +75,17 @@ impl TemporaryStorage for SledTemporary { } } - async fn maybe_read_slot(&self, address: &Address, slot_index: &SlotIndex) -> anyhow::Result> { + async fn read_slot(&self, address: &Address, index: &SlotIndex) -> anyhow::Result> { tracing::debug!(%address, "reading slot"); // try temporary data - let slot = self.temp.maybe_read_slot(address, slot_index).await?; + let slot = self.temp.read_slot(address, index).await?; if let Some(slot) = slot { return Ok(Some(slot)); } // try durable data - match self.db.get(slot_key(address, slot_index)) { + match self.db.get(slot_key(address, index)) { Ok(Some(slot)) => { let slot = serde_json::from_slice(&slot).unwrap(); Ok(Some(slot)) @@ -170,12 +170,12 @@ fn account_key(address: &Address) -> String { format!("address::{}", address) } -fn slot_key_vec(address: &Address, slot_index: &SlotIndex) -> Vec { - slot_key(address, slot_index).into_bytes().to_vec() +fn slot_key_vec(address: &Address, index: &SlotIndex) -> Vec { + slot_key(address, index).into_bytes().to_vec() } -fn slot_key(address: &Address, slot_index: &SlotIndex) -> String { - format!("slot::{}::{}", address, slot_index) +fn slot_key(address: &Address, index: &SlotIndex) -> String { + format!("slot::{}::{}", address, index) } fn block_number_key_vec() -> Vec { diff --git a/src/eth/storage/stratus_storage.rs b/src/eth/storage/stratus_storage.rs index c3b1b92f5..b4224f742 100644 --- a/src/eth/storage/stratus_storage.rs +++ b/src/eth/storage/stratus_storage.rs @@ -128,14 +128,14 @@ impl StratusStorage { #[cfg(feature = "metrics")] let start = metrics::now(); - match self.temp.maybe_read_account(address).await? { + match self.temp.read_account(address).await? { Some(account) => { tracing::debug!("account found in the temporary storage"); #[cfg(feature = "metrics")] metrics::inc_storage_read_account(start.elapsed(), STORAGE_TEMP, point_in_time, true); Ok(account) } - None => match self.perm.maybe_read_account(address, point_in_time).await? { + None => match self.perm.read_account(address, point_in_time).await? { Some(account) => { tracing::debug!("account found in the permanent storage"); #[cfg(feature = "metrics")] @@ -156,18 +156,18 @@ impl StratusStorage { } /// Retrieves an slot from the storage. Returns default value when not found. - pub async fn read_slot(&self, address: &Address, slot_index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result { + pub async fn read_slot(&self, address: &Address, index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result { #[cfg(feature = "metrics")] let start = metrics::now(); - match self.temp.maybe_read_slot(address, slot_index).await? { + match self.temp.read_slot(address, index).await? { Some(slot) => { tracing::debug!("slot found in the temporary storage"); #[cfg(feature = "metrics")] metrics::inc_storage_read_slot(start.elapsed(), STORAGE_TEMP, point_in_time, true); Ok(slot) } - None => match self.perm.maybe_read_slot(address, slot_index, point_in_time).await? { + None => match self.perm.read_slot(address, index, point_in_time).await? { Some(slot) => { tracing::debug!("slot found in the permanent storage"); #[cfg(feature = "metrics")] @@ -179,7 +179,7 @@ impl StratusStorage { #[cfg(feature = "metrics")] metrics::inc_storage_read_slot(start.elapsed(), DEFAULT_VALUE, point_in_time, true); Ok(Slot { - index: slot_index.clone(), + index: index.clone(), ..Default::default() }) } @@ -289,7 +289,6 @@ impl StratusStorage { // save block to permanent storage and clears temporary storage let next_number = block.number().next(); let result = self.perm.save_block(block).await; - self.perm.after_commit_hook().await?; self.reset_temp().await?; self.set_active_block_number(next_number).await?; diff --git a/src/eth/storage/temporary_storage.rs b/src/eth/storage/temporary_storage.rs index 352fc3cc2..826aa4a56 100644 --- a/src/eth/storage/temporary_storage.rs +++ b/src/eth/storage/temporary_storage.rs @@ -17,10 +17,10 @@ pub trait TemporaryStorage: Send + Sync { async fn read_active_block_number(&self) -> anyhow::Result>; /// Retrieves an account from the storage. Returns Option when not found. - async fn maybe_read_account(&self, address: &Address) -> anyhow::Result>; + async fn read_account(&self, address: &Address) -> anyhow::Result>; /// Retrieves an slot from the storage. Returns Option when not found. - async fn maybe_read_slot(&self, address: &Address, slot_index: &SlotIndex) -> anyhow::Result>; + async fn read_slot(&self, address: &Address, index: &SlotIndex) -> anyhow::Result>; /// Temporarily stores account changes during block production. async fn save_account_changes(&self, changes: Vec) -> anyhow::Result<()>;