From ab860746217698ca0108448ebd2d26f57046db06 Mon Sep 17 00:00:00 2001 From: carneiro-cw <156914855+carneiro-cw@users.noreply.github.com> Date: Thu, 28 Mar 2024 17:34:58 -0300 Subject: [PATCH] chore: remove lock from hybrid storage (#483) --- src/eth/storage/hybrid/hybrid_state.rs | 4 +- src/eth/storage/hybrid/mod.rs | 103 ++++++++++--------------- 2 files changed, 41 insertions(+), 66 deletions(-) diff --git a/src/eth/storage/hybrid/hybrid_state.rs b/src/eth/storage/hybrid/hybrid_state.rs index 90f9c89e9..4285a5f82 100644 --- a/src/eth/storage/hybrid/hybrid_state.rs +++ b/src/eth/storage/hybrid/hybrid_state.rs @@ -83,7 +83,7 @@ impl HybridStorageState { //XXX TODO use a fixed block_number during load, in order to avoid sync problem // e.g other instance moving forward and this query getting incongruous data - pub async fn load_latest_data(&mut self, pool: &Pool) -> anyhow::Result<()> { + pub async fn load_latest_data(&self, pool: &Pool) -> anyhow::Result<()> { let account_rows = sqlx::query_as!( AccountRow, " @@ -145,7 +145,7 @@ impl HybridStorageState { } /// Updates the in-memory state with changes from transaction execution - pub async fn update_state_with_execution_changes(&mut self, changes: &[ExecutionAccountChanges], block_number: BlockNumber) -> Result<(), sqlx::Error> { + pub async fn update_state_with_execution_changes(&self, changes: &[ExecutionAccountChanges], block_number: BlockNumber) -> Result<(), sqlx::Error> { // Directly capture the fields needed by each future from `self` let accounts = &self.accounts; let accounts_history = &self.accounts_history; diff --git a/src/eth/storage/hybrid/mod.rs b/src/eth/storage/hybrid/mod.rs index e94da4ce8..e55888d02 100644 --- a/src/eth/storage/hybrid/mod.rs +++ b/src/eth/storage/hybrid/mod.rs @@ -18,9 +18,6 @@ use sqlx::QueryBuilder; use sqlx::Row; use tokio::sync::mpsc; use tokio::sync::mpsc::channel; -use tokio::sync::RwLock; -use tokio::sync::RwLockReadGuard; -use tokio::sync::RwLockWriteGuard; use tokio::sync::Semaphore; use tokio::sync::SemaphorePermit; use tokio::task::JoinSet; @@ -57,7 +54,7 @@ struct BlockTask { #[derive(Debug)] pub struct HybridPermanentStorage { - state: RwLock, //XXX TODO remove RwLock when rocksdb is implemented everywhere + state: HybridStorageState, //XXX TODO remove RwLock when rocksdb is implemented everywhere pool: Arc>, block_number: AtomicU64, task_sender: mpsc::Sender, @@ -98,8 +95,8 @@ impl HybridPermanentStorage { }); let block_number = Self::preload_block_number(connection_pool.clone()).await?; - let state = RwLock::new(HybridStorageState::new()); - state.write().await.load_latest_data(&pool).await?; + let state = HybridStorageState::new(); + state.load_latest_data(&pool).await?; Ok(Self { state, block_number, @@ -143,36 +140,21 @@ impl HybridPermanentStorage { } } - // ------------------------------------------------------------------------- - // Lock methods - // ------------------------------------------------------------------------- - - /// Locks inner state for reading. - async fn lock_read(&self) -> RwLockReadGuard<'_, HybridStorageState> { - self.state.read().await - } - - /// Locks inner state for writing. - async fn lock_write(&self) -> RwLockWriteGuard<'_, HybridStorageState> { - self.state.write().await - } - // ------------------------------------------------------------------------- // State methods // ------------------------------------------------------------------------- /// Clears in-memory state. pub async fn clear(&self) { - let state = self.lock_write().await; - let _ = state.accounts.clear(); - let _ = state.accounts_history.clear(); - let _ = state.account_slots.clear(); - let _ = state.account_slots_history.clear(); - - state.transactions.clear().unwrap(); - state.blocks_by_hash.clear().unwrap(); - state.blocks_by_number.clear().unwrap(); - state.logs.clear().unwrap(); + let _ = self.state.accounts.clear(); + let _ = self.state.accounts_history.clear(); + let _ = self.state.account_slots.clear(); + let _ = self.state.account_slots_history.clear(); + + self.state.transactions.clear().unwrap(); + self.state.blocks_by_hash.clear().unwrap(); + self.state.blocks_by_number.clear().unwrap(); + self.state.logs.clear().unwrap(); } async fn check_conflicts(state: &HybridStorageState, account_changes: &[ExecutionAccountChanges]) -> Option { @@ -238,10 +220,9 @@ impl PermanentStorage for HybridPermanentStorage { async fn maybe_read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result> { //XXX TODO deal with point_in_time first, e.g create to_account at hybrid_accounts_slots - let state = self.state.read().await; let account = match point_in_time { StoragePointInTime::Present => { - match state.accounts.get(address) { + match self.state.accounts.get(address) { Some(inmemory_account) => { let account = inmemory_account.to_account(address).await; tracing::trace!(%address, ?account, "account found"); @@ -256,7 +237,8 @@ impl PermanentStorage for HybridPermanentStorage { } } StoragePointInTime::Past(block_number) => { - if let Some(((addr, _), account_info)) = state + if let Some(((addr, _), account_info)) = self + .state .accounts_history .iter_from((address.clone(), *block_number), rocksdb::Direction::Reverse) .next() @@ -294,19 +276,18 @@ impl PermanentStorage for HybridPermanentStorage { async fn maybe_read_slot(&self, address: &Address, slot_index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result> { tracing::debug!(%address, %slot_index, ?point_in_time, "reading slot"); - self.state.read().await.get_slot_at_point(address, slot_index, point_in_time, &self.pool).await + self.state.get_slot_at_point(address, slot_index, point_in_time, &self.pool).await } async fn read_block(&self, selection: &BlockSelection) -> anyhow::Result> { // TODO read from pg if not in memory tracing::debug!(?selection, "reading block"); - let state_lock = self.lock_read().await; let block = match selection { - BlockSelection::Latest => state_lock.blocks_by_number.iter_end().next().map(|(_, block)| block), - BlockSelection::Earliest => state_lock.blocks_by_number.iter_start().next().map(|(_, block)| block), - BlockSelection::Number(number) => state_lock.blocks_by_number.get(number), - BlockSelection::Hash(hash) => state_lock.blocks_by_hash.get(hash), + BlockSelection::Latest => self.state.blocks_by_number.iter_end().next().map(|(_, block)| block), + BlockSelection::Earliest => self.state.blocks_by_number.iter_start().next().map(|(_, block)| block), + BlockSelection::Number(number) => self.state.blocks_by_number.get(number), + BlockSelection::Hash(hash) => self.state.blocks_by_hash.get(hash), }; match block { Some(block) => { @@ -337,9 +318,8 @@ impl PermanentStorage for HybridPermanentStorage { async fn read_mined_transaction(&self, hash: &Hash) -> anyhow::Result> { tracing::debug!(%hash, "reading transaction"); - let state_lock = self.lock_read().await; - match state_lock.transactions.get(hash) { + match self.state.transactions.get(hash) { Some(transaction) => { tracing::trace!(%hash, "transaction found in memory"); Ok(Some(transaction.clone())) @@ -373,33 +353,31 @@ impl PermanentStorage for HybridPermanentStorage { async fn read_logs(&self, filter: &LogFilter) -> anyhow::Result> { tracing::debug!(?filter, "reading logs"); - self.state.read().await.read_logs(filter, &self.pool).await + self.state.read_logs(filter, &self.pool).await } async fn save_block(&self, block: Block) -> anyhow::Result<(), StorageError> { - let mut state = self.state.write().await; - // check conflicts before persisting any state changes let account_changes = block.compact_account_changes(); - if let Some(conflicts) = Self::check_conflicts(&state, &account_changes).await { + if let Some(conflicts) = Self::check_conflicts(&self.state, &account_changes).await { return Err(StorageError::Conflict(conflicts)); } for transaction in block.transactions.clone() { - state.transactions.insert(transaction.input.hash.clone(), transaction.clone()); + self.state.transactions.insert(transaction.input.hash.clone(), transaction.clone()); for log in transaction.logs { - state.logs.insert((transaction.input.hash.clone(), log.log_index), log.clone()); + self.state.logs.insert((transaction.input.hash.clone(), log.log_index), log.clone()); } } // save block let number = block.number(); let hash = block.hash().clone(); - state.blocks_by_number.insert(*number, block.clone()); - state.blocks_by_hash.insert(hash.clone(), block.clone()); + self.state.blocks_by_number.insert(*number, block.clone()); + self.state.blocks_by_hash.insert(hash.clone(), block.clone()); //XXX deal with errors later - let _ = state.update_state_with_execution_changes(&account_changes, *number).await; + let _ = self.state.update_state_with_execution_changes(&account_changes, *number).await; let block_task = BlockTask { block_number: *number, @@ -421,10 +399,9 @@ impl PermanentStorage for HybridPermanentStorage { sleep(Duration::from_secs(2)).await; tracing::debug!(?accounts, "saving initial accounts"); - let state = self.state.write().await; let mut accounts_changes = (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()); for account in accounts { - state.accounts.insert( + self.state.accounts.insert( account.address.clone(), AccountInfo { balance: account.balance.clone(), @@ -434,7 +411,7 @@ impl PermanentStorage for HybridPermanentStorage { }, ); - state.accounts_history.insert( + self.state.accounts_history.insert( (account.address.clone(), 0.into()), AccountInfo { balance: account.balance.clone(), @@ -482,13 +459,12 @@ impl PermanentStorage for HybridPermanentStorage { }); // remove blocks - let mut state = self.lock_write().await; - state.blocks_by_hash.clear()?; - state.blocks_by_number.clear()?; + self.state.blocks_by_hash.clear()?; + self.state.blocks_by_number.clear()?; // remove transactions and logs - state.transactions.clear()?; - state.logs.clear()?; + self.state.transactions.clear()?; + self.state.logs.clear()?; let _ = self.tasks_pending.acquire().await.expect("semaphore has closed"); @@ -508,13 +484,12 @@ impl PermanentStorage for HybridPermanentStorage { .execute(&*self.pool) .await?; - let _ = state.accounts.clear(); - let _ = state.accounts_history.clear(); - - let _ = state.account_slots.clear(); - let _ = state.account_slots_history.clear(); + let _ = self.state.accounts.clear(); + let _ = self.state.accounts_history.clear(); + let _ = self.state.account_slots.clear(); + let _ = self.state.account_slots_history.clear(); - state.load_latest_data(&self.pool).await?; + self.state.load_latest_data(&self.pool).await?; Ok(()) }