Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enha + fix: possible storage bug and remove states vec from inmemory temporary storage #1904

Merged
merged 2 commits into from
Dec 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 85 additions & 123 deletions src/eth/storage/temporary/inmemory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@

use std::collections::HashMap;
use std::sync::RwLock;
use std::sync::RwLockReadGuard;
use std::sync::RwLockWriteGuard;

use nonempty::NonEmpty;

use crate::eth::executor::EvmInput;
use crate::eth::primitives::Account;
Expand All @@ -28,33 +24,56 @@ use crate::eth::primitives::UnixTimeNow;
use crate::eth::storage::AccountWithSlots;
use crate::eth::storage::TemporaryStorage;

/// Number of previous blocks to keep inmemory to detect conflicts between different blocks.
const MAX_BLOCKS: usize = 1;

#[derive(Debug)]
pub struct InMemoryTemporaryStorage {
/// TODO: very inneficient, it is O(N), but it should be 0(1)
pub states: RwLock<NonEmpty<InMemoryTemporaryStorageState>>,
pub pending_block: RwLock<InMemoryTemporaryStorageState>,
pub latest_block: RwLock<Option<InMemoryTemporaryStorageState>>,
}

impl InMemoryTemporaryStorage {
pub fn new(block_number: BlockNumber) -> Self {
Self {
states: RwLock::new(NonEmpty::new(InMemoryTemporaryStorageState {
pending_block: RwLock::new(InMemoryTemporaryStorageState {
block: PendingBlock::new_at_now(block_number),
accounts: HashMap::default(),
})),
}),
latest_block: RwLock::new(None),
}
}

/// Locks inner state for reading.
pub fn lock_read(&self) -> RwLockReadGuard<'_, NonEmpty<InMemoryTemporaryStorageState>> {
self.states.read().unwrap()
}
fn check_conflicts(&self, execution: &EvmExecution) -> anyhow::Result<Option<ExecutionConflicts>> {
let mut conflicts = ExecutionConflictsBuilder::default();

for (&address, change) in &execution.changes {
// check account info conflicts
if let Some(account) = self.read_account(address)? {
if let Some(expected) = change.nonce.take_original_ref() {
let original = &account.nonce;
if expected != original {
conflicts.add_nonce(address, *original, *expected);
}
}
if let Some(expected) = change.balance.take_original_ref() {
let original = &account.balance;
if expected != original {
conflicts.add_balance(address, *original, *expected);
}
}
}

/// Locks inner state for writing.
pub fn lock_write(&self) -> RwLockWriteGuard<'_, NonEmpty<InMemoryTemporaryStorageState>> {
self.states.write().unwrap()
// check slots conflicts
for (&slot_index, slot_change) in &change.slots {
if let Some(expected) = slot_change.take_original_ref() {
let Some(original) = self.read_slot(address, slot_index)? else {
continue;
};
if expected.value != original.value {
conflicts.add_slot(address, slot_index, original.value, expected.value);
}
}
}
}
Ok(conflicts.build())
}
}

Expand Down Expand Up @@ -92,8 +111,7 @@ impl TemporaryStorage for InMemoryTemporaryStorage {

// Uneeded clone here, return Cow
fn read_pending_block_header(&self) -> PendingBlockHeader {
let states = self.lock_read();
states.head.block.header.clone()
self.pending_block.read().unwrap().block.header.clone()
}

// -------------------------------------------------------------------------
Expand All @@ -102,9 +120,9 @@ impl TemporaryStorage for InMemoryTemporaryStorage {

fn save_pending_execution(&self, tx: TransactionExecution, check_conflicts: bool) -> Result<(), StratusError> {
// check conflicts
let mut states = self.lock_write();
let mut pending_block = self.pending_block.write().unwrap();
if let TransactionExecution::Local(tx) = &tx {
let expected_input = EvmInput::from_eth_transaction(&tx.input, &states.head.block.header);
let expected_input = EvmInput::from_eth_transaction(&tx.input, &pending_block.block.header);

if expected_input != tx.evm_input {
return Err(StratusError::TransactionEvmInputMismatch {
Expand All @@ -115,16 +133,15 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
}

if check_conflicts {
if let Some(conflicts) = do_check_conflicts(&states, tx.execution()) {
if let Some(conflicts) = self.check_conflicts(tx.execution())? {
return Err(StratusError::TransactionConflict(conflicts.into()));
}
}

// save account changes
let changes = tx.execution().changes.values();
for change in changes {
let account = states
.head
let account = pending_block
.accounts
.entry(change.address)
.or_insert_with(|| AccountWithSlots::new(change.address));
Expand All @@ -151,23 +168,22 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
}

// save execution
states.head.block.push_transaction(tx);
pending_block.block.push_transaction(tx);

Ok(())
}

fn read_pending_executions(&self) -> Vec<TransactionExecution> {
self.lock_read().head.block.transactions.iter().map(|(_, tx)| tx.clone()).collect()
self.pending_block.read().unwrap().block.transactions.iter().map(|(_, tx)| tx.clone()).collect()
}

/// TODO: we cannot allow more than one pending block. Where to put this check?
fn finish_pending_block(&self) -> anyhow::Result<PendingBlock> {
let mut states = self.lock_write();
let mut pending_block = self.pending_block.write().unwrap();

#[cfg(feature = "dev")]
let mut finished_block = states.head.block.clone();
let mut finished_block = pending_block.block.clone();
#[cfg(not(feature = "dev"))]
let finished_block = states.head.block.clone();
let finished_block = pending_block.block.clone();

#[cfg(feature = "dev")]
{
Expand All @@ -178,20 +194,18 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
}
}

// remove last state if reached limit
if states.len() + 1 >= MAX_BLOCKS {
let _ = states.pop();
}

// create new state
states.insert(0, InMemoryTemporaryStorageState::new(finished_block.header.number.next_block_number()));
let mut latest = self.latest_block.write().unwrap();
*latest = Some(std::mem::replace(
&mut *pending_block,
InMemoryTemporaryStorageState::new(finished_block.header.number.next_block_number()),
));

Ok(finished_block)
}

fn read_pending_execution(&self, hash: Hash) -> anyhow::Result<Option<TransactionExecution>> {
let states = self.lock_read();
match states.head.block.transactions.get(&hash) {
let pending_block = self.pending_block.read().unwrap();
match pending_block.block.transactions.get(&hash) {
Some(tx) => Ok(Some(tx.clone())),
None => Ok(None),
}
Expand All @@ -202,98 +216,46 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
// -------------------------------------------------------------------------

fn read_account(&self, address: Address) -> anyhow::Result<Option<Account>> {
let states = self.lock_read();
Ok(do_read_account(&states, address))
Ok(match self.pending_block.read().unwrap().accounts.get(&address) {
Some(pending_account) => Some(pending_account.info.clone()),
None => self
.latest_block
.read()
.unwrap()
.as_ref()
.and_then(|latest| latest.accounts.get(&address))
.map(|account| account.info.clone()),
})
}

fn read_slot(&self, address: Address, index: SlotIndex) -> anyhow::Result<Option<Slot>> {
let states = self.lock_read();
Ok(do_read_slot(&states, address, index))
Ok(
match self
.pending_block
.read()
.unwrap()
.accounts
.get(&address)
.and_then(|account| account.slots.get(&index))
{
Some(pending_slot) => Some(*pending_slot),
None => self
.latest_block
.read()
.unwrap()
.as_ref()
.and_then(|latest| latest.accounts.get(&address).and_then(|account| account.slots.get(&index)))
.copied(),
},
)
}

// -------------------------------------------------------------------------
// Global state
// -------------------------------------------------------------------------
fn reset(&self) -> anyhow::Result<()> {
let mut state = self.lock_write();
state.tail.clear();
state.head.reset();
self.pending_block.write().unwrap().reset();
*self.latest_block.write().unwrap() = None;
Ok(())
}
}

// -----------------------------------------------------------------------------
// Implementations without lock
// -----------------------------------------------------------------------------
fn do_read_account(states: &NonEmpty<InMemoryTemporaryStorageState>, address: Address) -> Option<Account> {
// search all
for state in states.iter() {
let Some(account) = state.accounts.get(&address) else { continue };

let info = account.info.clone();
let account = Account {
address: info.address,
balance: info.balance,
nonce: info.nonce,
bytecode: info.bytecode,
code_hash: info.code_hash,
};

tracing::trace!(%address, ?account, "account found");
return Some(account);
}

// not found
tracing::trace!(%address, "account not found");
None
}

fn do_read_slot(states: &NonEmpty<InMemoryTemporaryStorageState>, address: Address, index: SlotIndex) -> Option<Slot> {
let slot = states
.iter()
.find_map(|state| state.accounts.get(&address).and_then(|account| account.slots.get(&index)));

if let Some(&slot) = slot {
tracing::trace!(%address, %index, %slot, "slot found in temporary");
Some(slot)
} else {
tracing::trace!(%address, %index, "slot not found in temporary");
None
}
}

fn do_check_conflicts(states: &NonEmpty<InMemoryTemporaryStorageState>, execution: &EvmExecution) -> Option<ExecutionConflicts> {
let mut conflicts = ExecutionConflictsBuilder::default();

for (&address, change) in &execution.changes {
// check account info conflicts
if let Some(account) = do_read_account(states, address) {
if let Some(expected) = change.nonce.take_original_ref() {
let original = &account.nonce;
if expected != original {
conflicts.add_nonce(address, *original, *expected);
}
}
if let Some(expected) = change.balance.take_original_ref() {
let original = &account.balance;
if expected != original {
conflicts.add_balance(address, *original, *expected);
}
}
}

// check slots conflicts
for (&slot_index, slot_change) in &change.slots {
if let Some(expected) = slot_change.take_original_ref() {
let Some(original) = do_read_slot(states, address, slot_index) else {
continue;
};
if expected.value != original.value {
conflicts.add_slot(address, slot_index, original.value, expected.value);
}
}
}
}

conflicts.build()
}
Loading