Skip to content

Commit

Permalink
chore: remove lock from hybrid storage (#483)
Browse files Browse the repository at this point in the history
  • Loading branch information
carneiro-cw authored Mar 28, 2024
1 parent 148ad84 commit ab86074
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 66 deletions.
4 changes: 2 additions & 2 deletions src/eth/storage/hybrid/hybrid_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl HybridStorageState {

//XXX TODO use a fixed block_number during load, in order to avoid sync problem
// e.g other instance moving forward and this query getting incongruous data
pub async fn load_latest_data(&mut self, pool: &Pool<Postgres>) -> anyhow::Result<()> {
pub async fn load_latest_data(&self, pool: &Pool<Postgres>) -> anyhow::Result<()> {
let account_rows = sqlx::query_as!(
AccountRow,
"
Expand Down Expand Up @@ -145,7 +145,7 @@ impl HybridStorageState {
}

/// Updates the in-memory state with changes from transaction execution
pub async fn update_state_with_execution_changes(&mut self, changes: &[ExecutionAccountChanges], block_number: BlockNumber) -> Result<(), sqlx::Error> {
pub async fn update_state_with_execution_changes(&self, changes: &[ExecutionAccountChanges], block_number: BlockNumber) -> Result<(), sqlx::Error> {
// Directly capture the fields needed by each future from `self`
let accounts = &self.accounts;
let accounts_history = &self.accounts_history;
Expand Down
103 changes: 39 additions & 64 deletions src/eth/storage/hybrid/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ use sqlx::QueryBuilder;
use sqlx::Row;
use tokio::sync::mpsc;
use tokio::sync::mpsc::channel;
use tokio::sync::RwLock;
use tokio::sync::RwLockReadGuard;
use tokio::sync::RwLockWriteGuard;
use tokio::sync::Semaphore;
use tokio::sync::SemaphorePermit;
use tokio::task::JoinSet;
Expand Down Expand Up @@ -57,7 +54,7 @@ struct BlockTask {

#[derive(Debug)]
pub struct HybridPermanentStorage {
state: RwLock<HybridStorageState>, //XXX TODO remove RwLock when rocksdb is implemented everywhere
state: HybridStorageState, //XXX TODO remove RwLock when rocksdb is implemented everywhere
pool: Arc<Pool<Postgres>>,
block_number: AtomicU64,
task_sender: mpsc::Sender<BlockTask>,
Expand Down Expand Up @@ -98,8 +95,8 @@ impl HybridPermanentStorage {
});

let block_number = Self::preload_block_number(connection_pool.clone()).await?;
let state = RwLock::new(HybridStorageState::new());
state.write().await.load_latest_data(&pool).await?;
let state = HybridStorageState::new();
state.load_latest_data(&pool).await?;
Ok(Self {
state,
block_number,
Expand Down Expand Up @@ -143,36 +140,21 @@ impl HybridPermanentStorage {
}
}

// -------------------------------------------------------------------------
// Lock methods
// -------------------------------------------------------------------------

/// Locks inner state for reading.
async fn lock_read(&self) -> RwLockReadGuard<'_, HybridStorageState> {
self.state.read().await
}

/// Locks inner state for writing.
async fn lock_write(&self) -> RwLockWriteGuard<'_, HybridStorageState> {
self.state.write().await
}

// -------------------------------------------------------------------------
// State methods
// -------------------------------------------------------------------------

/// Clears in-memory state.
pub async fn clear(&self) {
let state = self.lock_write().await;
let _ = state.accounts.clear();
let _ = state.accounts_history.clear();
let _ = state.account_slots.clear();
let _ = state.account_slots_history.clear();

state.transactions.clear().unwrap();
state.blocks_by_hash.clear().unwrap();
state.blocks_by_number.clear().unwrap();
state.logs.clear().unwrap();
let _ = self.state.accounts.clear();
let _ = self.state.accounts_history.clear();
let _ = self.state.account_slots.clear();
let _ = self.state.account_slots_history.clear();

self.state.transactions.clear().unwrap();
self.state.blocks_by_hash.clear().unwrap();
self.state.blocks_by_number.clear().unwrap();
self.state.logs.clear().unwrap();
}

async fn check_conflicts(state: &HybridStorageState, account_changes: &[ExecutionAccountChanges]) -> Option<ExecutionConflicts> {
Expand Down Expand Up @@ -238,10 +220,9 @@ impl PermanentStorage for HybridPermanentStorage {

async fn maybe_read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Account>> {
//XXX TODO deal with point_in_time first, e.g create to_account at hybrid_accounts_slots
let state = self.state.read().await;
let account = match point_in_time {
StoragePointInTime::Present => {
match state.accounts.get(address) {
match self.state.accounts.get(address) {
Some(inmemory_account) => {
let account = inmemory_account.to_account(address).await;
tracing::trace!(%address, ?account, "account found");
Expand All @@ -256,7 +237,8 @@ impl PermanentStorage for HybridPermanentStorage {
}
}
StoragePointInTime::Past(block_number) => {
if let Some(((addr, _), account_info)) = state
if let Some(((addr, _), account_info)) = self
.state
.accounts_history
.iter_from((address.clone(), *block_number), rocksdb::Direction::Reverse)
.next()
Expand Down Expand Up @@ -294,19 +276,18 @@ impl PermanentStorage for HybridPermanentStorage {

async fn maybe_read_slot(&self, address: &Address, slot_index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Slot>> {
tracing::debug!(%address, %slot_index, ?point_in_time, "reading slot");
self.state.read().await.get_slot_at_point(address, slot_index, point_in_time, &self.pool).await
self.state.get_slot_at_point(address, slot_index, point_in_time, &self.pool).await
}

async fn read_block(&self, selection: &BlockSelection) -> anyhow::Result<Option<Block>> {
// TODO read from pg if not in memory
tracing::debug!(?selection, "reading block");

let state_lock = self.lock_read().await;
let block = match selection {
BlockSelection::Latest => state_lock.blocks_by_number.iter_end().next().map(|(_, block)| block),
BlockSelection::Earliest => state_lock.blocks_by_number.iter_start().next().map(|(_, block)| block),
BlockSelection::Number(number) => state_lock.blocks_by_number.get(number),
BlockSelection::Hash(hash) => state_lock.blocks_by_hash.get(hash),
BlockSelection::Latest => self.state.blocks_by_number.iter_end().next().map(|(_, block)| block),
BlockSelection::Earliest => self.state.blocks_by_number.iter_start().next().map(|(_, block)| block),
BlockSelection::Number(number) => self.state.blocks_by_number.get(number),
BlockSelection::Hash(hash) => self.state.blocks_by_hash.get(hash),
};
match block {
Some(block) => {
Expand Down Expand Up @@ -337,9 +318,8 @@ impl PermanentStorage for HybridPermanentStorage {

async fn read_mined_transaction(&self, hash: &Hash) -> anyhow::Result<Option<TransactionMined>> {
tracing::debug!(%hash, "reading transaction");
let state_lock = self.lock_read().await;

match state_lock.transactions.get(hash) {
match self.state.transactions.get(hash) {
Some(transaction) => {
tracing::trace!(%hash, "transaction found in memory");
Ok(Some(transaction.clone()))
Expand Down Expand Up @@ -373,33 +353,31 @@ impl PermanentStorage for HybridPermanentStorage {

async fn read_logs(&self, filter: &LogFilter) -> anyhow::Result<Vec<LogMined>> {
tracing::debug!(?filter, "reading logs");
self.state.read().await.read_logs(filter, &self.pool).await
self.state.read_logs(filter, &self.pool).await
}

async fn save_block(&self, block: Block) -> anyhow::Result<(), StorageError> {
let mut state = self.state.write().await;

// check conflicts before persisting any state changes
let account_changes = block.compact_account_changes();
if let Some(conflicts) = Self::check_conflicts(&state, &account_changes).await {
if let Some(conflicts) = Self::check_conflicts(&self.state, &account_changes).await {
return Err(StorageError::Conflict(conflicts));
}

for transaction in block.transactions.clone() {
state.transactions.insert(transaction.input.hash.clone(), transaction.clone());
self.state.transactions.insert(transaction.input.hash.clone(), transaction.clone());
for log in transaction.logs {
state.logs.insert((transaction.input.hash.clone(), log.log_index), log.clone());
self.state.logs.insert((transaction.input.hash.clone(), log.log_index), log.clone());
}
}

// save block
let number = block.number();
let hash = block.hash().clone();
state.blocks_by_number.insert(*number, block.clone());
state.blocks_by_hash.insert(hash.clone(), block.clone());
self.state.blocks_by_number.insert(*number, block.clone());
self.state.blocks_by_hash.insert(hash.clone(), block.clone());

//XXX deal with errors later
let _ = state.update_state_with_execution_changes(&account_changes, *number).await;
let _ = self.state.update_state_with_execution_changes(&account_changes, *number).await;

let block_task = BlockTask {
block_number: *number,
Expand All @@ -421,10 +399,9 @@ impl PermanentStorage for HybridPermanentStorage {
sleep(Duration::from_secs(2)).await;
tracing::debug!(?accounts, "saving initial accounts");

let state = self.state.write().await;
let mut accounts_changes = (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new());
for account in accounts {
state.accounts.insert(
self.state.accounts.insert(
account.address.clone(),
AccountInfo {
balance: account.balance.clone(),
Expand All @@ -434,7 +411,7 @@ impl PermanentStorage for HybridPermanentStorage {
},
);

state.accounts_history.insert(
self.state.accounts_history.insert(
(account.address.clone(), 0.into()),
AccountInfo {
balance: account.balance.clone(),
Expand Down Expand Up @@ -482,13 +459,12 @@ impl PermanentStorage for HybridPermanentStorage {
});

// remove blocks
let mut state = self.lock_write().await;
state.blocks_by_hash.clear()?;
state.blocks_by_number.clear()?;
self.state.blocks_by_hash.clear()?;
self.state.blocks_by_number.clear()?;

// remove transactions and logs
state.transactions.clear()?;
state.logs.clear()?;
self.state.transactions.clear()?;
self.state.logs.clear()?;

let _ = self.tasks_pending.acquire().await.expect("semaphore has closed");

Expand All @@ -508,13 +484,12 @@ impl PermanentStorage for HybridPermanentStorage {
.execute(&*self.pool)
.await?;

let _ = state.accounts.clear();
let _ = state.accounts_history.clear();

let _ = state.account_slots.clear();
let _ = state.account_slots_history.clear();
let _ = self.state.accounts.clear();
let _ = self.state.accounts_history.clear();
let _ = self.state.account_slots.clear();
let _ = self.state.account_slots_history.clear();

state.load_latest_data(&self.pool).await?;
self.state.load_latest_data(&self.pool).await?;

Ok(())
}
Expand Down

0 comments on commit ab86074

Please sign in to comment.