Skip to content

Commit

Permalink
refactor: transfer permanent storage logs from implementations to Str…
Browse files Browse the repository at this point in the history
…atusStorage (#1212)
  • Loading branch information
dinhani-cw authored Jun 21, 2024
1 parent 37b5b69 commit 12cf614
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 55 deletions.
50 changes: 6 additions & 44 deletions src/eth/storage/inmemory/inmemory_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,10 @@ impl PermanentStorage for InMemoryPermanentStorage {
// -------------------------------------------------------------------------

fn read_mined_block_number(&self) -> anyhow::Result<BlockNumber> {
tracing::debug!("reading mined block number");
Ok(self.block_number.load(Ordering::SeqCst).into())
}

fn set_mined_block_number(&self, number: BlockNumber) -> anyhow::Result<()> {
tracing::debug!(%number, "setting mined block number");
self.block_number.store(number.as_u64(), Ordering::SeqCst);
Ok(())
}
Expand All @@ -151,61 +149,43 @@ impl PermanentStorage for InMemoryPermanentStorage {
// ------------------------------------------------------------------------

fn read_account(&self, address: &Address, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Account>> {
tracing::debug!(%address, "reading account");

let state = self.lock_read();

match state.accounts.get(address) {
Some(inmemory_account) => {
let account = inmemory_account.to_account(point_in_time);
tracing::trace!(%address, ?account, "account found");
Ok(Some(account))
}

None => {
tracing::trace!(%address, "account not found");
Ok(None)
}
None => Ok(None),
}
}

fn read_slot(&self, address: &Address, index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Slot>> {
tracing::debug!(%address, %index, ?point_in_time, "reading slot in permanent");

let state = self.lock_read();
let Some(account) = state.accounts.get(address) else {
tracing::trace!(%address, "account not found in permanent");
return Ok(Default::default());
};

match account.slots.get(index) {
Some(slot_history) => {
let slot = slot_history.get_at_point(point_in_time).unwrap_or_default();
tracing::trace!(%address, %index, ?point_in_time, %slot, "slot found in permanent");
Ok(Some(slot))
}

None => {
tracing::trace!(%address, %index, ?point_in_time, "slot not found in permanent");
Ok(None)
}
None => Ok(None),
}
}

fn read_all_slots(&self, address: &Address) -> anyhow::Result<Vec<Slot>> {
let state = self.lock_read();

let Some(account) = state.accounts.get(address) else {
tracing::trace!(%address, "account not found in permanent");
return Ok(Default::default());
};

Ok(account.slots.clone().into_values().map(|slot| slot.get_current()).collect())
}

fn read_block(&self, selection: &BlockSelection) -> anyhow::Result<Option<Block>> {
tracing::debug!(?selection, "reading block");

let state_lock = self.lock_read();
let block = match selection {
BlockSelection::Latest => state_lock.blocks_by_number.values().last().cloned(),
Expand All @@ -214,35 +194,21 @@ impl PermanentStorage for InMemoryPermanentStorage {
BlockSelection::Hash(hash) => state_lock.blocks_by_hash.get(hash).cloned(),
};
match block {
Some(block) => {
tracing::trace!(?selection, ?block, "block found");
Ok(Some((*block).clone()))
}
None => {
tracing::trace!(?selection, "block not found");
Ok(None)
}
Some(block) => Ok(Some((*block).clone())),
None => Ok(None),
}
}

fn read_mined_transaction(&self, hash: &Hash) -> anyhow::Result<Option<TransactionMined>> {
tracing::debug!(%hash, "reading transaction");
let state_lock = self.lock_read();

match state_lock.transactions.get(hash) {
Some(transaction) => {
tracing::trace!(%hash, "transaction found");
Ok(Some(transaction.clone()))
}
None => {
tracing::trace!(%hash, "transaction not found");
Ok(None)
}
Some(transaction) => Ok(Some(transaction.clone())),
None => Ok(None),
}
}

fn read_logs(&self, filter: &LogFilter) -> anyhow::Result<Vec<LogMined>> {
tracing::debug!(?filter, "reading logs");
let state_lock = self.lock_read();

let logs = state_lock
Expand All @@ -263,15 +229,13 @@ impl PermanentStorage for InMemoryPermanentStorage {
let mut state = self.lock_write();

// save block
tracing::debug!(number = %block.number(), transactions_len = %block.transactions.len(), "saving block");
let block = Arc::new(block);
let block_number = block.number();
state.blocks_by_number.insert(block_number, Arc::clone(&block));
state.blocks_by_hash.insert(block.hash(), Arc::clone(&block));

// save transactions
for transaction in block.transactions.clone() {
tracing::debug!(hash = %transaction.input.hash, "saving transaction");
state.transactions.insert(transaction.input.hash, transaction.clone());
if transaction.is_success() {
for log in transaction.logs {
Expand Down Expand Up @@ -319,8 +283,6 @@ impl PermanentStorage for InMemoryPermanentStorage {
}

fn save_accounts(&self, accounts: Vec<Account>) -> anyhow::Result<()> {
tracing::debug!(?accounts, "saving initial accounts");

let mut state = self.lock_write();
for account in accounts {
state
Expand Down
6 changes: 0 additions & 6 deletions src/eth/storage/rocks/rocks_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ impl PermanentStorage for RocksPermanentStorage {
}

fn read_slot(&self, address: &Address, index: &SlotIndex, point_in_time: &StoragePointInTime) -> anyhow::Result<Option<Slot>> {
tracing::debug!(%address, %index, ?point_in_time, "reading slot");
Ok(self.state.read_slot(address, index, point_in_time))
}

Expand All @@ -94,12 +93,10 @@ impl PermanentStorage for RocksPermanentStorage {
}

fn read_mined_transaction(&self, hash: &Hash) -> anyhow::Result<Option<TransactionMined>> {
tracing::debug!(%hash, "reading transaction");
self.state.read_transaction(hash)
}

fn read_logs(&self, filter: &LogFilter) -> anyhow::Result<Vec<LogMined>> {
tracing::debug!(?filter, "reading logs");
self.state.read_logs(filter)
}

Expand All @@ -112,14 +109,12 @@ impl PermanentStorage for RocksPermanentStorage {
}

fn save_accounts(&self, accounts: Vec<Account>) -> anyhow::Result<()> {
tracing::debug!(?accounts, "saving initial accounts");
self.state.save_accounts(accounts);
Ok(())
}

fn reset_at(&self, block_number: BlockNumber) -> anyhow::Result<()> {
let block_number_u64 = block_number.as_u64();
tracing::info!(?block_number, "resetting Rocks DB to given block number");

// reset block number
let _ = self.block_number.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
Expand All @@ -138,7 +133,6 @@ impl PermanentStorage for RocksPermanentStorage {
}

fn read_all_slots(&self, address: &Address) -> anyhow::Result<Vec<Slot>> {
tracing::info!(?address, "reading all slots");
self.state.read_all_slots(address)
}
}
28 changes: 23 additions & 5 deletions src/eth/storage/stratus_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ impl StratusStorage {

#[tracing::instrument(name = "storage::read_mined_block_number", skip_all)]
pub fn read_mined_block_number(&self) -> anyhow::Result<BlockNumber> {
tracing::debug!(storage = %label::PERM, "reading mined block number");

#[cfg(feature = "metrics")]
{
let start = metrics::now();
Expand Down Expand Up @@ -190,6 +192,7 @@ impl StratusStorage {
#[tracing::instrument(name = "storage::set_mined_block_number", skip_all, fields(number))]
pub fn set_mined_block_number(&self, number: BlockNumber) -> anyhow::Result<()> {
Span::with(|s| s.rec_str("number", &number));
tracing::debug!(storage = %label::PERM, %number, "setting mined block number");

#[cfg(feature = "metrics")]
{
Expand Down Expand Up @@ -233,6 +236,8 @@ impl StratusStorage {
}
}

tracing::debug!(storage = %label::PERM, accounts = ?missing_accounts, "saving initial accounts");

#[cfg(feature = "metrics")]
{
let start = metrics::now();
Expand Down Expand Up @@ -283,6 +288,7 @@ impl StratusStorage {
}

// always read from perm if necessary
tracing::debug!(storage = %label::PERM, %address, "reading account");
match self.perm.read_account(address, point_in_time)? {
Some(account) => {
tracing::debug!(%address, "account found in permanent storage");
Expand Down Expand Up @@ -322,6 +328,7 @@ impl StratusStorage {
}

// always read from perm if necessary
tracing::debug!(storage = %label::PERM, %address, %index, ?point_in_time, "reading slot");
match self.perm.read_slot(address, index, point_in_time)? {
Some(slot) => {
tracing::debug!(%address, %index, value = %slot.value, "slot found in permanent storage");
Expand All @@ -340,6 +347,7 @@ impl StratusStorage {

#[tracing::instrument(name = "storage::read_all_slots", skip_all)]
pub fn read_all_slots(&self, address: &Address) -> anyhow::Result<Vec<Slot>> {
tracing::info!(storage = %label::PERM, %address, "reading all slots");
self.perm.read_all_slots(address)
}

Expand Down Expand Up @@ -391,6 +399,7 @@ impl StratusStorage {
#[tracing::instrument(name = "storage::save_block", skip_all, fields(number))]
pub fn save_block(&self, block: Block) -> anyhow::Result<()> {
Span::with(|s| s.rec_str("number", &block.number()));
tracing::debug!(storage = %label::PERM, number = %block.number(), transactions_len = %block.transactions.len(), "saving block");

#[cfg(feature = "metrics")]
{
Expand All @@ -405,22 +414,25 @@ impl StratusStorage {
}

#[tracing::instrument(name = "storage::read_block", skip_all)]
pub fn read_block(&self, block_selection: &BlockSelection) -> anyhow::Result<Option<Block>> {
pub fn read_block(&self, selection: &BlockSelection) -> anyhow::Result<Option<Block>> {
tracing::debug!(storage = %label::PERM, ?selection, "reading block");

#[cfg(feature = "metrics")]
{
let start = metrics::now();
let result = self.perm.read_block(block_selection);
let result = self.perm.read_block(selection);
metrics::inc_storage_read_block(start.elapsed(), result.is_ok());
result
}

#[cfg(not(feature = "metrics"))]
self.perm.read_block(block_selection)
self.perm.read_block(selection)
}

#[tracing::instrument(name = "storage::read_transaction", skip_all, fields(hash))]
pub fn read_mined_transaction(&self, hash: &Hash) -> anyhow::Result<Option<TransactionMined>> {
Span::with(|s| s.rec_str("hash", hash));
tracing::debug!(storage = %label::PERM, %hash, "reading transaction");

#[cfg(feature = "metrics")]
{
Expand All @@ -436,6 +448,8 @@ impl StratusStorage {

#[tracing::instrument(name = "storage::read_logs", skip_all)]
pub fn read_logs(&self, filter: &LogFilter) -> anyhow::Result<Vec<LogMined>> {
tracing::debug!(storage = %label::PERM, ?filter, "reading logs");

#[cfg(feature = "metrics")]
{
let start = metrics::now();
Expand Down Expand Up @@ -474,11 +488,12 @@ impl StratusStorage {
#[cfg(feature = "metrics")]
{
let start = metrics::now();
tracing::debug!(storage = %label::TEMP, "reseting temporary storage");
tracing::debug!(storage = %label::PERM, "reseting storage");
let result = self.perm.reset_at(number);
metrics::inc_storage_reset(start.elapsed(), label::PERM, result.is_ok());

let start = metrics::now();
tracing::debug!(storage = %label::TEMP, "reseting storage");
let result = self.temp.reset();
metrics::inc_storage_reset(start.elapsed(), label::TEMP, result.is_ok());

Expand All @@ -489,9 +504,12 @@ impl StratusStorage {

#[cfg(not(feature = "metrics"))]
{
tracing::debug!(storage = %label::TEMP, "reseting temporary storage");
tracing::debug!(storage = %label::PERM, "reseting storage");
self.perm.reset_at(number)?;

tracing::debug!(storage = %label::TEMP, "reseting storage");
self.temp.reset()?;

self.set_active_block_number_as_next()?;
Ok(())
}
Expand Down

0 comments on commit 12cf614

Please sign in to comment.