Skip to content

Commit

Permalink
enha: make pending block header required (#1885)
Browse files Browse the repository at this point in the history
  • Loading branch information
carneiro-cw authored Nov 27, 2024
1 parent f63fc5c commit e2f0684
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 168 deletions.
5 changes: 2 additions & 3 deletions src/eth/executor/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ impl Executor {
let block_number = block.number();
let block_timestamp = block.timestamp();
let block_transactions = mem::take(&mut block.transactions);
self.storage.set_pending_block_number(block_number)?;

// determine how to execute each transaction
for tx in block_transactions {
Expand Down Expand Up @@ -483,7 +482,7 @@ impl Executor {
});

// prepare evm input
let pending_header = self.storage.read_pending_block_header()?.unwrap_or_default();
let pending_header = self.storage.read_pending_block_header();
let evm_input = EvmInput::from_eth_transaction(tx_input.clone(), pending_header);

// execute transaction in evm (retry only in case of conflict, but do not retry on other failures)
Expand Down Expand Up @@ -546,7 +545,7 @@ impl Executor {
);

// retrieve block info
let pending_header = self.storage.read_pending_block_header()?.unwrap_or_default();
let pending_header = self.storage.read_pending_block_header();
let mined_block = match point_in_time {
PointInTime::MinedPast(number) => {
let Some(block) = self.storage.read_block(BlockFilter::Number(number))? else {
Expand Down
6 changes: 0 additions & 6 deletions src/eth/primitives/execution_conflict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,4 @@ pub enum ExecutionConflict {
expected: SlotValue,
actual: SlotValue,
},

/// Number of modified accounts mismatch.
AccountModifiedCount { expected: usize, actual: usize },

/// Number of modified slots mismatch.
SlotModifiedCount { expected: usize, actual: usize },
}
2 changes: 1 addition & 1 deletion src/eth/primitives/log_filter_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl LogFilterInput {

// translate point-in-time to block according to context
let from = match from {
PointInTime::Pending => storage.read_pending_block_header()?.unwrap_or_default().number,
PointInTime::Pending => storage.read_pending_block_header().number,
PointInTime::Mined => storage.read_mined_block_number()?,
PointInTime::MinedPast(number) => number,
};
Expand Down
11 changes: 3 additions & 8 deletions src/eth/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,10 @@ pub trait Storage: Sized {

fn read_block_number_to_resume_import(&self) -> Result<BlockNumber, StratusError>;

fn read_pending_block_header(&self) -> Result<Option<PendingBlockHeader>, StratusError>;
fn read_pending_block_header(&self) -> PendingBlockHeader;

fn read_mined_block_number(&self) -> Result<BlockNumber, StratusError>;

fn set_pending_block_number(&self, block_number: BlockNumber) -> Result<(), StratusError>;

fn set_pending_block_number_as_next(&self) -> Result<(), StratusError>;

fn set_pending_block_number_as_next_if_not_set(&self) -> Result<(), StratusError>;

fn set_mined_block_number(&self, block_number: BlockNumber) -> Result<(), StratusError>;

// -------------------------------------------------------------------------
Expand Down Expand Up @@ -121,8 +115,9 @@ pub struct StorageConfig {
impl StorageConfig {
/// Initializes Stratus storage.
pub fn init(&self) -> Result<Arc<StratusStorage>, StratusError> {
let temp_storage = self.temp_storage.init()?;
let perm_storage = self.perm_storage.init()?;
let temp_storage = self.temp_storage.init(&*perm_storage)?;

let StorageKind::StratusStorage = self.storage_kind;
let storage = StratusStorage::new(temp_storage, perm_storage)?;

Expand Down
109 changes: 20 additions & 89 deletions src/eth/storage/stratus_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ impl StratusStorage {
}
}

this.set_pending_block_number_as_next_if_not_set()?;

Ok(this)
}

Expand All @@ -63,7 +61,7 @@ impl StratusStorage {
use crate::eth::storage::InMemoryPermanentStorage;

let perm = Box::new(InMemoryPermanentStorage::default());
let temp = Box::new(InMemoryTemporaryStorage::default());
let temp = Box::new(InMemoryTemporaryStorage::new(0.into()));

Self::new(temp, perm)
}
Expand All @@ -77,49 +75,17 @@ impl Storage for StratusStorage {
fn read_block_number_to_resume_import(&self) -> Result<BlockNumber, StratusError> {
#[cfg(feature = "tracing")]
let _span = tracing::info_span!("storage::read_block_number_to_resume_import").entered();

// if does not have the zero block present, should resume from zero
let zero = self.read_block(BlockFilter::Number(BlockNumber::ZERO))?;
if zero.is_none() {
tracing::info!(block_number = %0, reason = %"block ZERO does not exist", "resume from ZERO");
return Ok(BlockNumber::ZERO);
}

// try to resume from pending block number
let pending_header = self.read_pending_block_header()?;
if let Some(pending_header) = pending_header {
tracing::info!(block_number = %pending_header.number, reason = %"set in storage", "resume from PENDING");
return Ok(pending_header.number);
}

// fallback to last mined block number
let mined_number = self.read_mined_block_number()?;
let mined_block = self.read_block(BlockFilter::Number(mined_number))?;
match mined_block {
Some(_) => {
tracing::info!(block_number = %mined_number, reason = %"set in storage and block exist", "resume from MINED + 1");
Ok(mined_number.next_block_number())
}
None => {
tracing::info!(block_number = %mined_number, reason = %"set in storage but block does not exist", "resume from MINED");
Ok(mined_number)
}
}
Ok(self.read_pending_block_header().number)
}

fn read_pending_block_header(&self) -> Result<Option<PendingBlockHeader>, StratusError> {
fn read_pending_block_header(&self) -> PendingBlockHeader {
#[cfg(feature = "tracing")]
let _span = tracing::info_span!("storage::read_pending_block_number").entered();
tracing::debug!(storage = %label::TEMP, "reading pending block number");

timed(|| self.temp.read_pending_block_header())
.with(|m| {
metrics::inc_storage_read_pending_block_number(m.elapsed, label::TEMP, m.result.is_ok());
if let Err(ref e) = m.result {
tracing::error!(reason = ?e, "failed to read pending block number");
}
})
.map_err(Into::into)
timed(|| self.temp.read_pending_block_header()).with(|m| {
metrics::inc_storage_read_pending_block_number(m.elapsed, label::TEMP, true);
})
}

fn read_mined_block_number(&self) -> Result<BlockNumber, StratusError> {
Expand All @@ -137,38 +103,6 @@ impl Storage for StratusStorage {
.map_err(Into::into)
}

fn set_pending_block_number(&self, block_number: BlockNumber) -> Result<(), StratusError> {
#[cfg(feature = "tracing")]
let _span = tracing::info_span!("storage::set_pending_block_number", %block_number).entered();
tracing::debug!(storage = &label::TEMP, %block_number, "setting pending block number");

timed(|| self.temp.set_pending_block_number(block_number))
.with(|m| {
metrics::inc_storage_set_pending_block_number(m.elapsed, label::TEMP, m.result.is_ok());
if let Err(ref e) = m.result {
tracing::error!(reason = ?e, "failed to set pending block number");
}
})
.map_err(Into::into)
}

fn set_pending_block_number_as_next(&self) -> Result<(), StratusError> {
#[cfg(feature = "tracing")]
let _span = tracing::info_span!("storage::set_pending_block_number_as_next").entered();

let last_mined_block = self.read_mined_block_number()?;
self.set_pending_block_number(last_mined_block.next_block_number())?;
Ok(())
}

fn set_pending_block_number_as_next_if_not_set(&self) -> Result<(), StratusError> {
let pending_block = self.read_pending_block_header()?;
if pending_block.is_none() {
self.set_pending_block_number_as_next()?;
}
Ok(())
}

fn set_mined_block_number(&self, block_number: BlockNumber) -> Result<(), StratusError> {
#[cfg(feature = "tracing")]
let _span = tracing::info_span!("storage::set_mined_block_number", %block_number).entered();
Expand Down Expand Up @@ -353,14 +287,13 @@ impl Storage for StratusStorage {
}

// check pending number
if let Some(pending_header) = self.read_pending_block_header()? {
if block_number >= pending_header.number {
tracing::error!(%block_number, pending_number = %pending_header.number, "failed to save block because mismatch with pending block number");
return Err(StratusError::StoragePendingNumberConflict {
new: block_number,
pending: pending_header.number,
});
}
let pending_header = self.read_pending_block_header();
if block_number >= pending_header.number {
tracing::error!(%block_number, pending_number = %pending_header.number, "failed to save block because mismatch with pending block number");
return Err(StratusError::StoragePendingNumberConflict {
new: block_number,
pending: pending_header.number,
});
}

// check mined block
Expand Down Expand Up @@ -401,14 +334,13 @@ impl Storage for StratusStorage {
}

// check pending number
if let Some(pending_header) = self.read_pending_block_header()? {
if first_number >= pending_header.number {
tracing::error!(%first_number, pending_number = %pending_header.number, "failed to save block because mismatch with pending block number");
return Err(StratusError::StoragePendingNumberConflict {
new: first_number,
pending: pending_header.number,
});
}
let pending_header = self.read_pending_block_header();
if first_number >= pending_header.number {
tracing::error!(%first_number, pending_number = %pending_header.number, "failed to save block because mismatch with pending block number");
return Err(StratusError::StoragePendingNumberConflict {
new: first_number,
pending: pending_header.number,
});
}

// check number of rest of blocks
Expand Down Expand Up @@ -532,7 +464,6 @@ impl Storage for StratusStorage {

// block number
self.set_mined_block_number(BlockNumber::ZERO)?;
self.set_pending_block_number_as_next()?;

Ok(())
}
Expand Down
75 changes: 23 additions & 52 deletions src/eth/storage/temporary/inmemory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use crate::eth::primitives::UnixTime;
#[cfg(feature = "dev")]
use crate::eth::primitives::UnixTimeNow;
use crate::eth::storage::TemporaryStorage;
use crate::log_and_err;

/// Number of previous blocks to keep inmemory to detect conflicts between different blocks.
const MAX_BLOCKS: usize = 64;
Expand All @@ -36,16 +35,16 @@ pub struct InMemoryTemporaryStorage {
pub states: RwLock<NonEmpty<InMemoryTemporaryStorageState>>,
}

impl Default for InMemoryTemporaryStorage {
fn default() -> Self {
tracing::info!("creating inmemory temporary storage");
impl InMemoryTemporaryStorage {
pub fn new(block_number: BlockNumber) -> Self {
Self {
states: RwLock::new(NonEmpty::new(InMemoryTemporaryStorageState::default())),
states: RwLock::new(NonEmpty::new(InMemoryTemporaryStorageState {
block: PendingBlock::new_at_now(block_number),
accounts: HashMap::default(),
})),
}
}
}

impl InMemoryTemporaryStorage {
/// Locks inner state for reading.
pub fn lock_read(&self) -> RwLockReadGuard<'_, NonEmpty<InMemoryTemporaryStorageState>> {
self.states.read().unwrap()
Expand All @@ -61,34 +60,25 @@ impl InMemoryTemporaryStorage {
// Inner State
// -----------------------------------------------------------------------------

#[derive(Debug, Default)]
#[derive(Debug)]
pub struct InMemoryTemporaryStorageState {
/// Block that is being mined.
pub block: Option<PendingBlock>,
pub block: PendingBlock,

/// Last state of accounts and slots. Can be recreated from the executions inside the pending block.
pub accounts: HashMap<Address, InMemoryTemporaryAccount, hash_hasher::HashBuildHasher>,
}

impl InMemoryTemporaryStorageState {
/// Validates there is a pending block being mined and returns a reference to it.
fn require_pending_block(&self) -> anyhow::Result<&PendingBlock> {
match &self.block {
Some(block) => Ok(block),
None => log_and_err!("no pending block being mined"), // try calling set_pending_block_number_as_next_if_not_set or any other method to create a new block on temp storage
}
}

/// Validates there is a pending block being mined and returns a mutable reference to it.
fn require_pending_block_mut(&mut self) -> anyhow::Result<&mut PendingBlock> {
match &mut self.block {
Some(block) => Ok(block),
None => log_and_err!("no pending block being mined"), // try calling set_pending_block_number_as_next_if_not_set or any other method to create a new block on temp storage
pub fn new(block_number: BlockNumber) -> Self {
Self {
block: PendingBlock::new_at_now(block_number),
accounts: HashMap::default(),
}
}

pub fn reset(&mut self) {
self.block = None;
self.block = PendingBlock::new_at_now(1.into());
self.accounts.clear();
}
}
Expand All @@ -114,23 +104,10 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
// Block number
// -------------------------------------------------------------------------

fn set_pending_block_number(&self, number: BlockNumber) -> anyhow::Result<()> {
let mut states = self.lock_write();
match states.head.block.as_mut() {
Some(block) => block.header.number = number,
None => {
states.head.block = Some(PendingBlock::new_at_now(number));
}
}
Ok(())
}

fn read_pending_block_header(&self) -> anyhow::Result<Option<PendingBlockHeader>> {
// Uneeded clone here, return Cow
fn read_pending_block_header(&self) -> PendingBlockHeader {
let states = self.lock_read();
match &states.head.block {
Some(block) => Ok(Some(block.header.clone())),
None => Ok(None),
}
states.head.block.header.clone()
}

// -------------------------------------------------------------------------
Expand All @@ -140,6 +117,7 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
fn save_pending_execution(&self, tx: TransactionExecution, check_conflicts: bool) -> Result<(), StratusError> {
// check conflicts
let mut states = self.lock_write();

if check_conflicts {
if let Some(conflicts) = do_check_conflicts(&states, tx.execution()) {
return Err(StratusError::TransactionConflict(conflicts.into()));
Expand Down Expand Up @@ -177,28 +155,23 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
}

// save execution
states.head.require_pending_block_mut()?.push_transaction(tx);
states.head.block.push_transaction(tx);

Ok(())
}

fn read_pending_executions(&self) -> Vec<TransactionExecution> {
self.lock_read()
.head
.block
.as_ref()
.map(|pending_block| pending_block.transactions.iter().map(|(_, tx)| tx.clone()).collect())
.unwrap_or_default()
self.lock_read().head.block.transactions.iter().map(|(_, tx)| tx.clone()).collect()
}

/// TODO: we cannot allow more than one pending block. Where to put this check?
fn finish_pending_block(&self) -> anyhow::Result<PendingBlock> {
let mut states = self.lock_write();

#[cfg(feature = "dev")]
let mut finished_block = states.head.require_pending_block()?.clone();
let mut finished_block = states.head.block.clone();
#[cfg(not(feature = "dev"))]
let finished_block = states.head.require_pending_block()?.clone();
let finished_block = states.head.block.clone();

#[cfg(feature = "dev")]
{
Expand All @@ -215,16 +188,14 @@ impl TemporaryStorage for InMemoryTemporaryStorage {
}

// create new state
states.insert(0, InMemoryTemporaryStorageState::default());
states.head.block = Some(PendingBlock::new_at_now(finished_block.header.number.next_block_number()));
states.insert(0, InMemoryTemporaryStorageState::new(finished_block.header.number.next_block_number()));

Ok(finished_block)
}

fn read_pending_execution(&self, hash: Hash) -> anyhow::Result<Option<TransactionExecution>> {
let states = self.lock_read();
let Some(ref pending_block) = states.head.block else { return Ok(None) };
match pending_block.transactions.get(&hash) {
match states.head.block.transactions.get(&hash) {
Some(tx) => Ok(Some(tx.clone())),
None => Ok(None),
}
Expand Down
Loading

0 comments on commit e2f0684

Please sign in to comment.