Skip to content

Commit

Permalink
enha and fix: possible storage bug and remove states vec from inmemor…
Browse files Browse the repository at this point in the history
…y temprary storage
  • Loading branch information
carneiro-cw committed Dec 6, 2024
1 parent 2f05cb1 commit e3ccb0e
Showing 1 changed file with 78 additions and 123 deletions.
201 changes: 78 additions & 123 deletions src/eth/storage/temporary/inmemory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@
use std::collections::HashMap;
use std::sync::RwLock;
use std::sync::RwLockReadGuard;
use std::sync::RwLockWriteGuard;

use nonempty::NonEmpty;

use crate::eth::executor::EvmInput;
use crate::eth::primitives::Account;
Expand All @@ -28,33 +24,56 @@ use crate::eth::primitives::UnixTimeNow;
use crate::eth::storage::AccountWithSlots;
use crate::eth::storage::TemporaryStorage;

/// Number of previous blocks to keep inmemory to detect conflicts between different blocks.
const MAX_BLOCKS: usize = 1;

#[derive(Debug)]
pub struct InMemoryTemporaryStorage {
/// TODO: very inneficient, it is O(N), but it should be 0(1)
pub states: RwLock<NonEmpty<InMemoryTemporaryStorageState>>,
pub pending_block: RwLock<InMemoryTemporaryStorageState>,
pub latest_block: RwLock<Option<InMemoryTemporaryStorageState>>,
}

impl InMemoryTemporaryStorage {
pub fn new(block_number: BlockNumber) -> Self {
Self {
states: RwLock::new(NonEmpty::new(InMemoryTemporaryStorageState {
pending_block: RwLock::new(InMemoryTemporaryStorageState {
block: PendingBlock::new_at_now(block_number),
accounts: HashMap::default(),
})),
}),
latest_block: RwLock::new(None),
}
}

/// Locks inner state for reading.
pub fn lock_read(&self) -> RwLockReadGuard<'_, NonEmpty<InMemoryTemporaryStorageState>> {
self.states.read().unwrap()
}
fn check_conflicts(&self, execution: &EvmExecution) -> anyhow::Result<Option<ExecutionConflicts>> {
let mut conflicts = ExecutionConflictsBuilder::default();

/// Locks inner state for writing.
pub fn lock_write(&self) -> RwLockWriteGuard<'_, NonEmpty<InMemoryTemporaryStorageState>> {
self.states.write().unwrap()
for (&address, change) in &execution.changes {
// check account info conflicts
if let Some(account) = self.read_account(address)? {
if let Some(expected) = change.nonce.take_original_ref() {
let original = &account.nonce;
if expected != original {
conflicts.add_nonce(address, *original, *expected);
}
}
if let Some(expected) = change.balance.take_original_ref() {
let original = &account.balance;
if expected != original {
conflicts.add_balance(address, *original, *expected);
}
}
}

// check slots conflicts
for (&slot_index, slot_change) in &change.slots {
if let Some(expected) = slot_change.take_original_ref() {
let Some(original) = self.read_slot(address, slot_index)? else {
continue;
};
if expected.value != original.value {
conflicts.add_slot(address, slot_index, original.value, expected.value);
}
}
}
}
Ok(conflicts.build())
}
}

Expand Down Expand Up @@ -92,8 +111,7 @@ impl TemporaryStorage for InMemoryTemporaryStorage {

// Uneeded clone here, return Cow
fn read_pending_block_header(&self) -> PendingBlockHeader {
let states = self.lock_read();
states.head.block.header.clone()
self.pending_block.read().unwrap().block.header.clone()
}

// -------------------------------------------------------------------------
Expand All @@ -102,9 +120,9 @@ impl TemporaryStorage for InMemoryTemporaryStorage {

fn save_pending_execution(&self, tx: TransactionExecution, check_conflicts: bool) -> Result<(), StratusError> {
// check conflicts
let mut states = self.lock_write();
let mut pending_block = self.pending_block.write().unwrap();
if let TransactionExecution::Local(tx) = &tx {
let expected_input = EvmInput::from_eth_transaction(&tx.input, &states.head.block.header);
let expected_input = EvmInput::from_eth_transaction(&tx.input, &pending_block.block.header);

if expected_input != tx.evm_input {
return Err(StratusError::TransactionEvmInputMismatch {
Expand All @@ -115,16 +133,15 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
}

if check_conflicts {
if let Some(conflicts) = do_check_conflicts(&states, tx.execution()) {
if let Some(conflicts) = self.check_conflicts(tx.execution())? {
return Err(StratusError::TransactionConflict(conflicts.into()));
}
}

// save account changes
let changes = tx.execution().changes.values();
for change in changes {
let account = states
.head
let account = pending_block
.accounts
.entry(change.address)
.or_insert_with(|| AccountWithSlots::new(change.address));
Expand All @@ -151,23 +168,22 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
}

// save execution
states.head.block.push_transaction(tx);
pending_block.block.push_transaction(tx);

Ok(())
}

fn read_pending_executions(&self) -> Vec<TransactionExecution> {
self.lock_read().head.block.transactions.iter().map(|(_, tx)| tx.clone()).collect()
self.pending_block.read().unwrap().block.transactions.iter().map(|(_, tx)| tx.clone()).collect()
}

/// TODO: we cannot allow more than one pending block. Where to put this check?
fn finish_pending_block(&self) -> anyhow::Result<PendingBlock> {
let mut states = self.lock_write();
let mut pending_block = self.pending_block.write().unwrap();

#[cfg(feature = "dev")]
let mut finished_block = states.head.block.clone();
let mut finished_block = pending_block.block.clone();
#[cfg(not(feature = "dev"))]
let finished_block = states.head.block.clone();
let finished_block = pending_block.block.clone();

#[cfg(feature = "dev")]
{
Expand All @@ -178,20 +194,18 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
}
}

// remove last state if reached limit
if states.len() + 1 >= MAX_BLOCKS {
let _ = states.pop();
}

// create new state
states.insert(0, InMemoryTemporaryStorageState::new(finished_block.header.number.next_block_number()));
let mut latest = self.latest_block.write().unwrap();
*latest = Some(std::mem::replace(
&mut *pending_block,
InMemoryTemporaryStorageState::new(finished_block.header.number.next_block_number()),
));

Ok(finished_block)
}

fn read_pending_execution(&self, hash: Hash) -> anyhow::Result<Option<TransactionExecution>> {
let states = self.lock_read();
match states.head.block.transactions.get(&hash) {
let pending_block = self.pending_block.read().unwrap();
match pending_block.block.transactions.get(&hash) {
Some(tx) => Ok(Some(tx.clone())),
None => Ok(None),
}
Expand All @@ -202,98 +216,39 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
// -------------------------------------------------------------------------

fn read_account(&self, address: Address) -> anyhow::Result<Option<Account>> {
let states = self.lock_read();
Ok(do_read_account(&states, address))
Ok(self
.pending_block
.read()
.unwrap()
.accounts
.get(&address)
.or(self.latest_block.read().unwrap().as_ref().and_then(|latest| latest.accounts.get(&address)))
.map(|account| account.info.clone()))
}

fn read_slot(&self, address: Address, index: SlotIndex) -> anyhow::Result<Option<Slot>> {
let states = self.lock_read();
Ok(do_read_slot(&states, address, index))
Ok(self
.pending_block
.read()
.unwrap()
.accounts
.get(&address)
.and_then(|account| account.slots.get(&index))
.or(self
.latest_block
.read()
.unwrap()
.as_ref()
.and_then(|latest| latest.accounts.get(&address).and_then(|account| account.slots.get(&index))))
.copied())
}

// -------------------------------------------------------------------------
// Global state
// -------------------------------------------------------------------------
fn reset(&self) -> anyhow::Result<()> {
let mut state = self.lock_write();
state.tail.clear();
state.head.reset();
self.pending_block.write().unwrap().reset();
*self.latest_block.write().unwrap() = None;
Ok(())
}
}

// -----------------------------------------------------------------------------
// Implementations without lock
// -----------------------------------------------------------------------------
fn do_read_account(states: &NonEmpty<InMemoryTemporaryStorageState>, address: Address) -> Option<Account> {
// search all
for state in states.iter() {
let Some(account) = state.accounts.get(&address) else { continue };

let info = account.info.clone();
let account = Account {
address: info.address,
balance: info.balance,
nonce: info.nonce,
bytecode: info.bytecode,
code_hash: info.code_hash,
};

tracing::trace!(%address, ?account, "account found");
return Some(account);
}

// not found
tracing::trace!(%address, "account not found");
None
}

fn do_read_slot(states: &NonEmpty<InMemoryTemporaryStorageState>, address: Address, index: SlotIndex) -> Option<Slot> {
let slot = states
.iter()
.find_map(|state| state.accounts.get(&address).and_then(|account| account.slots.get(&index)));

if let Some(&slot) = slot {
tracing::trace!(%address, %index, %slot, "slot found in temporary");
Some(slot)
} else {
tracing::trace!(%address, %index, "slot not found in temporary");
None
}
}

fn do_check_conflicts(states: &NonEmpty<InMemoryTemporaryStorageState>, execution: &EvmExecution) -> Option<ExecutionConflicts> {
let mut conflicts = ExecutionConflictsBuilder::default();

for (&address, change) in &execution.changes {
// check account info conflicts
if let Some(account) = do_read_account(states, address) {
if let Some(expected) = change.nonce.take_original_ref() {
let original = &account.nonce;
if expected != original {
conflicts.add_nonce(address, *original, *expected);
}
}
if let Some(expected) = change.balance.take_original_ref() {
let original = &account.balance;
if expected != original {
conflicts.add_balance(address, *original, *expected);
}
}
}

// check slots conflicts
for (&slot_index, slot_change) in &change.slots {
if let Some(expected) = slot_change.take_original_ref() {
let Some(original) = do_read_slot(states, address, slot_index) else {
continue;
};
if expected.value != original.value {
conflicts.add_slot(address, slot_index, original.value, expected.value);
}
}
}
}

conflicts.build()
}

0 comments on commit e3ccb0e

Please sign in to comment.