Skip to content

Commit

Permalink
refactoring: renaming storage functions and params (#573)
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw authored Apr 12, 2024
1 parent f19ba5a commit 0332dac
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 121 deletions.
34 changes: 15 additions & 19 deletions src/eth/storage/inmemory/inmemory_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl PermanentStorage for InMemoryPermanentStorage {
// State operations
// ------------------------------------------------------------------------

async fn maybe_read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Account>> {
async fn read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Account>> {
tracing::debug!(%address, "reading account");

let state = self.lock_read().await;
Expand All @@ -212,29 +212,38 @@ impl PermanentStorage for InMemoryPermanentStorage {
}
}

async fn maybe_read_slot(&self, address: &Address, slot_index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Slot>> {
tracing::debug!(%address, %slot_index, ?point_in_time, "reading slot");
async fn read_slot(&self, address: &Address, index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Slot>> {
tracing::debug!(%address, %index, ?point_in_time, "reading slot");

let state = self.lock_read().await;
let Some(account) = state.accounts.get(address) else {
tracing::trace!(%address, "account not found");
return Ok(Default::default());
};

match account.slots.get(slot_index) {
match account.slots.get(index) {
Some(slot_history) => {
let slot = slot_history.get_at_point(point_in_time).unwrap_or_default();
tracing::trace!(%address, %slot_index, ?point_in_time, %slot, "slot found");
tracing::trace!(%address, %index, ?point_in_time, %slot, "slot found");
Ok(Some(slot))
}

None => {
tracing::trace!(%address, %slot_index, ?point_in_time, "slot not found");
tracing::trace!(%address, %index, ?point_in_time, "slot not found");
Ok(None)
}
}
}

async fn read_slots(
&self,
_address: &Address,
_indexes: &[SlotIndex],
_point_in_time: &StoragePointInTime,
) -> anyhow::Result<HashMap<SlotIndex, SlotValue>> {
todo!()
}

async fn read_block(&self, selection: &BlockSelection) -> anyhow::Result<Option<Block>> {
tracing::debug!(?selection, "reading block");

Expand Down Expand Up @@ -356,10 +365,6 @@ impl PermanentStorage for InMemoryPermanentStorage {
Ok(())
}

async fn after_commit_hook(&self) -> anyhow::Result<()> {
Ok(())
}

async fn save_accounts(&self, accounts: Vec<Account>) -> anyhow::Result<()> {
tracing::debug!(?accounts, "saving initial accounts");

Expand Down Expand Up @@ -435,15 +440,6 @@ impl PermanentStorage for InMemoryPermanentStorage {
}
}
}

async fn read_slots(
&self,
_address: &Address,
_slot_indexes: &[SlotIndex],
_point_in_time: &StoragePointInTime,
) -> anyhow::Result<HashMap<SlotIndex, SlotValue>> {
todo!()
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
Expand Down
12 changes: 6 additions & 6 deletions src/eth/storage/inmemory/inmemory_temporary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
Ok(state.active_block_number)
}

async fn maybe_read_account(&self, address: &Address) -> anyhow::Result<Option<Account>> {
async fn read_account(&self, address: &Address) -> anyhow::Result<Option<Account>> {
tracing::debug!(%address, "reading account");

let state = self.lock_read().await;
Expand All @@ -90,23 +90,23 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
}
}

async fn maybe_read_slot(&self, address: &Address, slot_index: &SlotIndex) -> anyhow::Result<Option<Slot>> {
tracing::debug!(%address, %slot_index, "reading slot");
async fn read_slot(&self, address: &Address, index: &SlotIndex) -> anyhow::Result<Option<Slot>> {
tracing::debug!(%address, %index, "reading slot");

let state = self.lock_read().await;
let Some(account) = state.accounts.get(address) else {
tracing::trace!(%address, "account not found");
return Ok(Default::default());
};

match account.slots.get(slot_index) {
match account.slots.get(index) {
Some(slot) => {
tracing::trace!(%address, %slot_index, %slot, "slot found");
tracing::trace!(%address, %index, %slot, "slot found");
Ok(Some(slot.clone()))
}

None => {
tracing::trace!(%address, %slot_index, "slot not found");
tracing::trace!(%address, %index, "slot not found");
Ok(None)
}
}
Expand Down
18 changes: 5 additions & 13 deletions src/eth/storage/permanent_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@ pub trait PermanentStorage: Send + Sync {
async fn increment_block_number(&self) -> anyhow::Result<BlockNumber>;

/// Retrieves an account from the storage. Returns Option when not found.
async fn maybe_read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Account>>;
async fn read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Account>>;

/// Retrieves an slot from the storage. Returns Option when not found.
async fn maybe_read_slot(&self, address: &Address, slot_index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Slot>>;
async fn read_slot(&self, address: &Address, index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Slot>>;

/// Retrieves several slots at once.
async fn read_slots(&self, address: &Address, indexes: &[SlotIndex], point_in_time: &StoragePointInTime) -> anyhow::Result<HashMap<SlotIndex, SlotValue>>;

/// Retrieves a block from the storage.
async fn read_block(&self, block_selection: &BlockSelection) -> anyhow::Result<Option<Block>>;
Expand All @@ -53,9 +56,6 @@ pub trait PermanentStorage: Send + Sync {
/// Persists atomically all changes from a block.
async fn save_block(&self, block: Block) -> anyhow::Result<(), StorageError>;

/// Run after block commit callbacks.
async fn after_commit_hook(&self) -> anyhow::Result<()>;

/// Persists initial accounts (test accounts or genesis accounts).
async fn save_accounts(&self, accounts: Vec<Account>) -> anyhow::Result<()>;

Expand All @@ -64,12 +64,4 @@ pub trait PermanentStorage: Send + Sync {

/// Retrieves a random sample of slots, from the provided start and end blocks.
async fn read_slots_sample(&self, start: BlockNumber, end: BlockNumber, max_samples: u64, seed: u64) -> anyhow::Result<Vec<SlotSample>>;

/// Retrieves several slots at once
async fn read_slots(
&self,
address: &Address,
slot_indexes: &[SlotIndex],
point_in_time: &StoragePointInTime,
) -> anyhow::Result<HashMap<SlotIndex, SlotValue>>;
}
65 changes: 29 additions & 36 deletions src/eth/storage/postgres_permanent/postgres_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl PermanentStorage for PostgresPermanentStorage {
Ok(())
}

async fn maybe_read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Account>> {
async fn read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Account>> {
tracing::debug!(%address, "reading account");

let mut conn = PoolOrThreadConnection::take(&self.pool).await?;
Expand Down Expand Up @@ -158,11 +158,11 @@ impl PermanentStorage for PostgresPermanentStorage {
}
}

async fn maybe_read_slot(&self, address: &Address, slot_index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Slot>> {
tracing::debug!(%address, %slot_index, "reading slot");
async fn read_slot(&self, address: &Address, index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Slot>> {
tracing::debug!(%address, %index, "reading slot");

// TODO: improve this conversion
let slot_index_u8: [u8; 32] = slot_index.clone().into();
let slot_index_u8: [u8; 32] = index.clone().into();

let mut conn = PoolOrThreadConnection::take(&self.pool).await?;
let slot_value_vec: Option<Vec<u8>> = match point_in_time {
Expand Down Expand Up @@ -193,19 +193,41 @@ impl PermanentStorage for PostgresPermanentStorage {
Some(slot_value_vec) => {
let slot_value = SlotValue::from(slot_value_vec);
let slot = Slot {
index: slot_index.clone(),
index: index.clone(),
value: slot_value,
};
tracing::trace!(?address, ?slot_index, %slot, "slot found");
tracing::trace!(?address, ?index, %slot, "slot found");
Ok(Some(slot))
}
None => {
tracing::trace!(?address, ?slot_index, ?point_in_time, "slot not found");
tracing::trace!(?address, ?index, ?point_in_time, "slot not found");
Ok(None)
}
}
}

async fn read_slots(&self, address: &Address, indexes: &[SlotIndex], point_in_time: &StoragePointInTime) -> anyhow::Result<HashMap<SlotIndex, SlotValue>> {
tracing::debug!(%address, indexes_len = %indexes.len(), "reading slots");

let slots = match point_in_time {
StoragePointInTime::Present =>
sqlx::query_file_as!(Slot, "src/eth/storage/postgres_permanent/sql/select_slots.sql", indexes as _, address as _)
.fetch_all(&self.pool)
.await?,
StoragePointInTime::Past(block_number) =>
sqlx::query_file_as!(
Slot,
"src/eth/storage/postgres_permanent/sql/select_historical_slots.sql",
indexes as _,
address as _,
block_number as _
)
.fetch_all(&self.pool)
.await?,
};
Ok(slots.into_iter().map(|slot| (slot.index, slot.value)).collect())
}

async fn read_block(&self, block: &BlockSelection) -> anyhow::Result<Option<Block>> {
tracing::debug!(block = ?block, "reading block");

Expand Down Expand Up @@ -751,10 +773,6 @@ impl PermanentStorage for PostgresPermanentStorage {
Ok(())
}

async fn after_commit_hook(&self) -> anyhow::Result<()> {
Ok(())
}

async fn read_mined_block_number(&self) -> anyhow::Result<BlockNumber> {
tracing::debug!("reading current block number");

Expand Down Expand Up @@ -861,31 +879,6 @@ impl PermanentStorage for PostgresPermanentStorage {

Ok(slots_sample_rows)
}

async fn read_slots(
&self,
address: &Address,
slot_indexes: &[SlotIndex],
point_in_time: &StoragePointInTime,
) -> anyhow::Result<HashMap<SlotIndex, SlotValue>> {
let slots = match point_in_time {
StoragePointInTime::Present =>
sqlx::query_file_as!(Slot, "src/eth/storage/postgres_permanent/sql/select_slots.sql", slot_indexes as _, address as _)
.fetch_all(&self.pool)
.await?,
StoragePointInTime::Past(block_number) =>
sqlx::query_file_as!(
Slot,
"src/eth/storage/postgres_permanent/sql/select_historical_slots.sql",
slot_indexes as _,
address as _,
block_number as _
)
.fetch_all(&self.pool)
.await?,
};
Ok(slots.into_iter().map(|slot| (slot.index, slot.value)).collect())
}
}

fn partition_logs(logs: impl IntoIterator<Item = PostgresLog>) -> HashMap<TransactionHash, Vec<PostgresLog>> {
Expand Down
30 changes: 13 additions & 17 deletions src/eth/storage/rocks/rocks_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,22 @@ impl PermanentStorage for RocksPermanentStorage {
// State operations
// ------------------------------------------------------------------------

async fn maybe_read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Account>> {
async fn read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Account>> {
Ok(self.state.read_account(address, point_in_time))
}

async fn maybe_read_slot(&self, address: &Address, slot_index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Slot>> {
tracing::debug!(%address, %slot_index, ?point_in_time, "reading slot");
Ok(self.state.read_slot(address, slot_index, point_in_time))
async fn read_slot(&self, address: &Address, index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Slot>> {
tracing::debug!(%address, %index, ?point_in_time, "reading slot");
Ok(self.state.read_slot(address, index, point_in_time))
}

async fn read_slots(
&self,
_address: &Address,
_indexes: &[SlotIndex],
_point_in_time: &StoragePointInTime,
) -> anyhow::Result<HashMap<SlotIndex, SlotValue>> {
todo!()
}

async fn read_block(&self, selection: &BlockSelection) -> anyhow::Result<Option<Block>> {
Expand Down Expand Up @@ -213,10 +222,6 @@ impl PermanentStorage for RocksPermanentStorage {
Ok(())
}

async fn after_commit_hook(&self) -> anyhow::Result<()> {
Ok(())
}

async fn save_accounts(&self, accounts: Vec<Account>) -> anyhow::Result<()> {
tracing::debug!(?accounts, "saving initial accounts");

Expand Down Expand Up @@ -262,13 +267,4 @@ impl PermanentStorage for RocksPermanentStorage {
async fn read_slots_sample(&self, _start: BlockNumber, _end: BlockNumber, _max_samples: u64, _seed: u64) -> anyhow::Result<Vec<SlotSample>> {
todo!()
}

async fn read_slots(
&self,
_address: &Address,
_slot_indexes: &[SlotIndex],
_point_in_time: &StoragePointInTime,
) -> anyhow::Result<HashMap<SlotIndex, SlotValue>> {
todo!()
}
}
14 changes: 7 additions & 7 deletions src/eth/storage/rocks/rocks_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,20 +341,20 @@ impl RocksStorageState {
.collect()
}

pub fn read_slot(&self, address: &Address, slot_index: &SlotIndex, point_in_time: &StoragePointInTime) -> Option<Slot> {
pub fn read_slot(&self, address: &Address, index: &SlotIndex, point_in_time: &StoragePointInTime) -> Option<Slot> {
match point_in_time {
StoragePointInTime::Present => self.account_slots.get(&(address.clone(), slot_index.clone())).map(|account_slot_value| Slot {
index: slot_index.clone(),
StoragePointInTime::Present => self.account_slots.get(&(address.clone(), index.clone())).map(|account_slot_value| Slot {
index: index.clone(),
value: account_slot_value.clone(),
}),
StoragePointInTime::Past(number) => {
if let Some(((addr, index, _), value)) = self
if let Some(((rocks_address, rocks_index, _), value)) = self
.account_slots_history
.iter_from((address.clone(), slot_index.clone(), *number), rocksdb::Direction::Reverse)
.iter_from((address.clone(), index.clone(), *number), rocksdb::Direction::Reverse)
.next()
{
if slot_index == &index && address == &addr {
return Some(Slot { index, value });
if index == &rocks_index && address == &rocks_address {
return Some(Slot { index: rocks_index, value });
}
}
None
Expand Down
10 changes: 5 additions & 5 deletions src/eth/storage/rocks/rocks_temporary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,28 +45,28 @@ impl TemporaryStorage for RocksTemporary {
Ok(Some(self.current_block.load(Ordering::SeqCst).into()))
}

async fn maybe_read_account(&self, address: &Address) -> anyhow::Result<Option<Account>> {
async fn read_account(&self, address: &Address) -> anyhow::Result<Option<Account>> {
tracing::debug!(%address, "reading account");

// try temporary data
let account = self.temp.maybe_read_account(address).await?;
let account = self.temp.read_account(address).await?;
if let Some(account) = account {
return Ok(Some(account));
}

Ok(self.db.read_account(address, &StoragePointInTime::Present))
}

async fn maybe_read_slot(&self, address: &Address, slot_index: &SlotIndex) -> anyhow::Result<Option<Slot>> {
async fn read_slot(&self, address: &Address, index: &SlotIndex) -> anyhow::Result<Option<Slot>> {
tracing::debug!(%address, "reading slot");

// try temporary data
let slot = self.temp.maybe_read_slot(address, slot_index).await?;
let slot = self.temp.read_slot(address, index).await?;
if let Some(slot) = slot {
return Ok(Some(slot));
}

Ok(self.db.read_slot(address, slot_index, &StoragePointInTime::Present))
Ok(self.db.read_slot(address, index, &StoragePointInTime::Present))
}

async fn save_account_changes(&self, changes: Vec<ExecutionAccountChanges>) -> anyhow::Result<()> {
Expand Down
Loading

0 comments on commit 0332dac

Please sign in to comment.