Skip to content

Commit

Permalink
Merge branch 'main' into parking_lot_mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
carneiro-cw authored Dec 6, 2024
2 parents 4c7fe29 + d1c86fe commit b625978
Showing 1 changed file with 83 additions and 122 deletions.
205 changes: 83 additions & 122 deletions src/eth/storage/temporary/inmemory.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
//! In-memory storage implementations.
use std::collections::HashMap;

use nonempty::NonEmpty;
use parking_lot::RwLock;
use parking_lot::RwLockReadGuard;
use parking_lot::RwLockWriteGuard;


use crate::eth::executor::EvmInput;
use crate::eth::primitives::Account;
Expand All @@ -28,33 +26,56 @@ use crate::eth::primitives::UnixTimeNow;
use crate::eth::storage::AccountWithSlots;
use crate::eth::storage::TemporaryStorage;

/// Number of previous blocks to keep inmemory to detect conflicts between different blocks.
const MAX_BLOCKS: usize = 1;

#[derive(Debug)]
pub struct InMemoryTemporaryStorage {
/// TODO: very inneficient, it is O(N), but it should be 0(1)
pub states: RwLock<NonEmpty<InMemoryTemporaryStorageState>>,
pub pending_block: RwLock<InMemoryTemporaryStorageState>,
pub latest_block: RwLock<Option<InMemoryTemporaryStorageState>>,
}

impl InMemoryTemporaryStorage {
pub fn new(block_number: BlockNumber) -> Self {
Self {
states: RwLock::new(NonEmpty::new(InMemoryTemporaryStorageState {
pending_block: RwLock::new(InMemoryTemporaryStorageState {
block: PendingBlock::new_at_now(block_number),
accounts: HashMap::default(),
})),
}),
latest_block: RwLock::new(None),
}
}

/// Locks inner state for reading.
pub fn lock_read(&self) -> RwLockReadGuard<'_, NonEmpty<InMemoryTemporaryStorageState>> {
self.states.read()
}
fn check_conflicts(&self, execution: &EvmExecution) -> anyhow::Result<Option<ExecutionConflicts>> {
let mut conflicts = ExecutionConflictsBuilder::default();

for (&address, change) in &execution.changes {
// check account info conflicts
if let Some(account) = self.read_account(address)? {
if let Some(expected) = change.nonce.take_original_ref() {
let original = &account.nonce;
if expected != original {
conflicts.add_nonce(address, *original, *expected);
}
}
if let Some(expected) = change.balance.take_original_ref() {
let original = &account.balance;
if expected != original {
conflicts.add_balance(address, *original, *expected);
}
}
}

/// Locks inner state for writing.
pub fn lock_write(&self) -> RwLockWriteGuard<'_, NonEmpty<InMemoryTemporaryStorageState>> {
self.states.write()
// check slots conflicts
for (&slot_index, slot_change) in &change.slots {
if let Some(expected) = slot_change.take_original_ref() {
let Some(original) = self.read_slot(address, slot_index)? else {
continue;
};
if expected.value != original.value {
conflicts.add_slot(address, slot_index, original.value, expected.value);
}
}
}
}
Ok(conflicts.build())
}
}

Expand Down Expand Up @@ -92,8 +113,7 @@ impl TemporaryStorage for InMemoryTemporaryStorage {

// Uneeded clone here, return Cow
fn read_pending_block_header(&self) -> PendingBlockHeader {
let states = self.lock_read();
states.head.block.header.clone()
self.pending_block.read().block.header.clone()
}

// -------------------------------------------------------------------------
Expand All @@ -102,9 +122,9 @@ impl TemporaryStorage for InMemoryTemporaryStorage {

fn save_pending_execution(&self, tx: TransactionExecution, check_conflicts: bool) -> Result<(), StratusError> {
// check conflicts
let mut states = self.lock_write();
let mut pending_block = self.pending_block.write();
if let TransactionExecution::Local(tx) = &tx {
let expected_input = EvmInput::from_eth_transaction(&tx.input, &states.head.block.header);
let expected_input = EvmInput::from_eth_transaction(&tx.input, &pending_block.block.header);

if expected_input != tx.evm_input {
return Err(StratusError::TransactionEvmInputMismatch {
Expand All @@ -115,16 +135,15 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
}

if check_conflicts {
if let Some(conflicts) = do_check_conflicts(&states, tx.execution()) {
if let Some(conflicts) = self.check_conflicts(tx.execution())? {
return Err(StratusError::TransactionConflict(conflicts.into()));
}
}

// save account changes
let changes = tx.execution().changes.values();
for change in changes {
let account = states
.head
let account = pending_block
.accounts
.entry(change.address)
.or_insert_with(|| AccountWithSlots::new(change.address));
Expand All @@ -151,23 +170,22 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
}

// save execution
states.head.block.push_transaction(tx);
pending_block.block.push_transaction(tx);

Ok(())
}

fn read_pending_executions(&self) -> Vec<TransactionExecution> {
self.lock_read().head.block.transactions.iter().map(|(_, tx)| tx.clone()).collect()
self.pending_block.read().block.transactions.iter().map(|(_, tx)| tx.clone()).collect()
}

/// TODO: we cannot allow more than one pending block. Where to put this check?
fn finish_pending_block(&self) -> anyhow::Result<PendingBlock> {
let mut states = self.lock_write();
let mut pending_block = self.pending_block.write();

#[cfg(feature = "dev")]
let mut finished_block = states.head.block.clone();
let mut finished_block = pending_block.block.clone();
#[cfg(not(feature = "dev"))]
let finished_block = states.head.block.clone();
let finished_block = pending_block.block.clone();

#[cfg(feature = "dev")]
{
Expand All @@ -178,20 +196,18 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
}
}

// remove last state if reached limit
if states.len() + 1 >= MAX_BLOCKS {
let _ = states.pop();
}

// create new state
states.insert(0, InMemoryTemporaryStorageState::new(finished_block.header.number.next_block_number()));
let mut latest = self.latest_block.write();
*latest = Some(std::mem::replace(
&mut *pending_block,
InMemoryTemporaryStorageState::new(finished_block.header.number.next_block_number()),
));

Ok(finished_block)
}

fn read_pending_execution(&self, hash: Hash) -> anyhow::Result<Option<TransactionExecution>> {
let states = self.lock_read();
match states.head.block.transactions.get(&hash) {
let pending_block = self.pending_block.read();
match pending_block.block.transactions.get(&hash) {
Some(tx) => Ok(Some(tx.clone())),
None => Ok(None),
}
Expand All @@ -202,98 +218,43 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
// -------------------------------------------------------------------------

fn read_account(&self, address: Address) -> anyhow::Result<Option<Account>> {
let states = self.lock_read();
Ok(do_read_account(&states, address))
Ok(match self.pending_block.read().accounts.get(&address) {
Some(pending_account) => Some(pending_account.info.clone()),
None => self
.latest_block
.read()
.as_ref()
.and_then(|latest| latest.accounts.get(&address))
.map(|account| account.info.clone()),
})
}

fn read_slot(&self, address: Address, index: SlotIndex) -> anyhow::Result<Option<Slot>> {
let states = self.lock_read();
Ok(do_read_slot(&states, address, index))
Ok(
match self
.pending_block
.read()
.accounts
.get(&address)
.and_then(|account| account.slots.get(&index))
{
Some(pending_slot) => Some(*pending_slot),
None => self
.latest_block
.read()
.as_ref()
.and_then(|latest| latest.accounts.get(&address).and_then(|account| account.slots.get(&index)))
.copied(),
},
)
}

// -------------------------------------------------------------------------
// Global state
// -------------------------------------------------------------------------
fn reset(&self) -> anyhow::Result<()> {
let mut state = self.lock_write();
state.tail.clear();
state.head.reset();
self.pending_block.write().reset();
*self.latest_block.write() = None;
Ok(())
}
}

// -----------------------------------------------------------------------------
// Implementations without lock
// -----------------------------------------------------------------------------
fn do_read_account(states: &NonEmpty<InMemoryTemporaryStorageState>, address: Address) -> Option<Account> {
// search all
for state in states.iter() {
let Some(account) = state.accounts.get(&address) else { continue };

let info = account.info.clone();
let account = Account {
address: info.address,
balance: info.balance,
nonce: info.nonce,
bytecode: info.bytecode,
code_hash: info.code_hash,
};

tracing::trace!(%address, ?account, "account found");
return Some(account);
}

// not found
tracing::trace!(%address, "account not found");
None
}

fn do_read_slot(states: &NonEmpty<InMemoryTemporaryStorageState>, address: Address, index: SlotIndex) -> Option<Slot> {
let slot = states
.iter()
.find_map(|state| state.accounts.get(&address).and_then(|account| account.slots.get(&index)));

if let Some(&slot) = slot {
tracing::trace!(%address, %index, %slot, "slot found in temporary");
Some(slot)
} else {
tracing::trace!(%address, %index, "slot not found in temporary");
None
}
}

fn do_check_conflicts(states: &NonEmpty<InMemoryTemporaryStorageState>, execution: &EvmExecution) -> Option<ExecutionConflicts> {
let mut conflicts = ExecutionConflictsBuilder::default();

for (&address, change) in &execution.changes {
// check account info conflicts
if let Some(account) = do_read_account(states, address) {
if let Some(expected) = change.nonce.take_original_ref() {
let original = &account.nonce;
if expected != original {
conflicts.add_nonce(address, *original, *expected);
}
}
if let Some(expected) = change.balance.take_original_ref() {
let original = &account.balance;
if expected != original {
conflicts.add_balance(address, *original, *expected);
}
}
}

// check slots conflicts
for (&slot_index, slot_change) in &change.slots {
if let Some(expected) = slot_change.take_original_ref() {
let Some(original) = do_read_slot(states, address, slot_index) else {
continue;
};
if expected.value != original.value {
conflicts.add_slot(address, slot_index, original.value, expected.value);
}
}
}
}

conflicts.build()
}

0 comments on commit b625978

Please sign in to comment.