Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: executor #829

Merged
merged 2 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions src/eth/block_miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use crate::eth::primitives::LocalTransactionExecution;
use crate::eth::primitives::LogMined;
use crate::eth::primitives::TransactionExecution;
use crate::eth::primitives::TransactionInput;
use crate::eth::primitives::TransactionKind;
use crate::eth::primitives::TransactionMined;
use crate::eth::storage::StratusStorage;
use crate::ext::not;
Expand Down Expand Up @@ -77,11 +76,11 @@ impl BlockMiner {
let mut block = block_from_external(external_block, mined_txs)?;

// mine local transactions
for (tx, execution) in local_txs {
if execution.is_success() {
for tx in local_txs {
if tx.result.is_success() {
return log_and_err!("cannot mine mixed block because one of the local execution is a success");
}
block.push_execution(tx, execution);
block.push_execution(tx.input, tx.result);
}

Ok(block)
Expand Down Expand Up @@ -219,10 +218,12 @@ fn partition_transactions(txs: Vec<TransactionExecution>) -> (Vec<LocalTransacti
let mut external_txs = Vec::with_capacity(txs.len());

for tx in txs {
match tx.kind {
TransactionKind::Local(tx_input) => local_txs.push((tx_input, tx.result)),
TransactionKind::External(external_tx, external_receipt) => {
external_txs.push((external_tx, external_receipt, tx.result));
match tx {
TransactionExecution::Local(tx) => {
local_txs.push(tx);
}
TransactionExecution::External(tx) => {
external_txs.push(tx);
}
}
}
Expand All @@ -231,11 +232,11 @@ fn partition_transactions(txs: Vec<TransactionExecution>) -> (Vec<LocalTransacti

fn mine_external_transactions(block_number: BlockNumber, txs: Vec<ExternalTransactionExecution>) -> anyhow::Result<Vec<TransactionMined>> {
let mut mined_txs = Vec::with_capacity(txs.len());
for (tx, receipt, evm_result) in txs {
if tx.block_number() != block_number {
for tx in txs {
if tx.tx.block_number() != block_number {
return log_and_err!("cannot mine external block because one of the transactions does not belong to the external block");
}
mined_txs.push(TransactionMined::from_external(tx, receipt, evm_result.execution)?);
mined_txs.push(TransactionMined::from_external(tx.tx, tx.receipt, tx.result.execution)?);
}
Ok(mined_txs)
}
Expand Down
73 changes: 36 additions & 37 deletions src/eth/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ use crate::eth::primitives::ExternalBlock;
use crate::eth::primitives::ExternalReceipt;
use crate::eth::primitives::ExternalReceipts;
use crate::eth::primitives::ExternalTransaction;
use crate::eth::primitives::ExternalTransactionExecution;
use crate::eth::primitives::LogMined;
use crate::eth::primitives::StoragePointInTime;
use crate::eth::primitives::TransactionExecution;
use crate::eth::primitives::TransactionInput;
use crate::eth::primitives::TransactionKind;
use crate::eth::storage::StorageError;
use crate::eth::storage::StratusStorage;
use crate::eth::BlockMiner;
Expand Down Expand Up @@ -106,31 +106,28 @@ impl Executor {

// execute serial transactions joining them with parallel
for tx_route in tx_routes {
let tx_execution = match tx_route {
let tx = match tx_route {
// serial: execute now
ParallelExecutionRoute::Serial(tx, receipt) => {
let (tx, receipt, evm_result) = self.reexecute_external_tx(tx, receipt, block).await;
TransactionExecution::from_external(tx.clone(), receipt.clone(), evm_result?)
}
ParallelExecutionRoute::Serial(tx, receipt) => self.reexecute_external_tx(tx, receipt, block).await.map_err(|(_, _, e)| e)?,

// parallel: get parallel execution result and reexecute if failed because of error
ParallelExecutionRoute::Parallel(..) => {
match parallel_executions.next().await.unwrap() {
// success, check conflicts
(tx, receipt, Ok(evm_result)) => match storage.temp.check_conflicts(&evm_result.execution).await? {
None => TransactionExecution::from_external(tx.clone(), receipt.clone(), evm_result),
// success: check conflicts
Ok(tx) => match storage.temp.check_conflicts(&tx.result.execution).await? {
// no conflict: proceeed
None => tx,
// conflict: reexecute
Some(conflicts) => {
tracing::warn!(?conflicts, "reexecuting serially because parallel execution conflicted");
let (tx, receipt, evm_result) = self.reexecute_external_tx(tx, receipt, block).await;
TransactionExecution::from_external(tx.clone(), receipt.clone(), evm_result?)
self.reexecute_external_tx(&tx.tx, &tx.receipt, block).await.map_err(|(_, _, e)| e)?
}
},

// failure, reexecute
(tx, receipt, Err(e)) => {
// failure: reexecute
Err((tx, receipt, e)) => {
tracing::warn!(reason = ?e, "reexecuting serially because parallel execution errored");
let (tx, receipt, evm_result) = self.reexecute_external_tx(tx, receipt, block).await;
TransactionExecution::from_external(tx.clone(), receipt.clone(), evm_result?)
self.reexecute_external_tx(tx, receipt, block).await.map_err(|(_, _, e)| e)?
}
}
}
Expand All @@ -139,11 +136,11 @@ impl Executor {
// track transaction metrics
#[cfg(feature = "metrics")]
{
block_metrics += tx_execution.result.metrics;
block_metrics += tx.result.metrics;
}

// persist state
storage.save_execution_to_temp(tx_execution).await?;
storage.save_execution_to_temp(TransactionExecution::External(tx)).await?;
}

// track block metrics
Expand All @@ -164,42 +161,44 @@ impl Executor {
tx: &'b ExternalTransaction,
receipt: &'b ExternalReceipt,
block: &ExternalBlock,
) -> (&'b ExternalTransaction, &'b ExternalReceipt, anyhow::Result<EvmExecutionResult>) {
) -> Result<ExternalTransactionExecution, (&'b ExternalTransaction, &'b ExternalReceipt, anyhow::Error)> {
#[cfg(feature = "metrics")]
let start = metrics::now();

// reexecute transaction or create a fake execution from the failed external transaction
let evm_result = if receipt.is_success() {
let evm_input = match EvmInput::from_external(tx, receipt, block) {
Ok(evm_input) => evm_input,
Err(e) => return (tx, receipt, Err(e)),
};
self.execute_in_evm(evm_input).await
} else {
// when transaction externally failed, create fake transaction instead of reexecuting
if receipt.is_failure() {
let sender = match self.storage.read_account(&receipt.from.into(), &StoragePointInTime::Present).await {
Ok(sender) => sender,
Err(e) => return (tx, receipt, Err(e)),
Err(e) => return Err((tx, receipt, e)),
};
let execution = match EvmExecution::from_failed_external_transaction(sender, receipt, block) {
Ok(execution) => execution,
Err(e) => return (tx, receipt, Err(e)),
Err(e) => return Err((tx, receipt, e)),
};
Ok(EvmExecutionResult {
let evm_result = EvmExecutionResult {
execution,
metrics: ExecutionMetrics::default(),
})
};
return Ok(ExternalTransactionExecution::new(tx.clone(), receipt.clone(), evm_result));
}

// reexecute transaction
let evm_input = match EvmInput::from_external(tx, receipt, block) {
Ok(evm_input) => evm_input,
Err(e) => return Err((tx, receipt, e)),
};
let evm_result = self.execute_in_evm(evm_input).await;

// handle execution result
// handle reexecution result
match evm_result {
Ok(mut evm_result) => {
// apply execution costs that were not consided when reexecuting the transaction
if let Err(e) = evm_result.execution.apply_execution_costs(receipt) {
return (tx, receipt, Err(e));
return Err((tx, receipt, e));
};
evm_result.execution.gas = match receipt.gas_used.unwrap_or_default().try_into() {
Ok(gas) => gas,
Err(e) => return (tx, receipt, Err(e)),
Err(e) => return Err((tx, receipt, e)),
};

// ensure it matches receipt before saving
Expand All @@ -208,20 +207,20 @@ impl Executor {
let json_receipt = serde_json::to_string(&receipt).unwrap();
let json_execution_logs = serde_json::to_string(&evm_result.execution.logs).unwrap();
tracing::error!(%json_tx, %json_receipt, %json_execution_logs, "mismatch reexecuting transaction");
return (tx, receipt, Err(e));
return Err((tx, receipt, e));
};

// track metrics
#[cfg(feature = "metrics")]
metrics::inc_executor_external_transaction(start.elapsed());

(tx, receipt, Ok(evm_result))
Ok(ExternalTransactionExecution::new(tx.clone(), receipt.clone(), evm_result))
}
Err(e) => {
let json_tx = serde_json::to_string(&tx).unwrap();
let json_receipt = serde_json::to_string(&receipt).unwrap();
tracing::error!(reason = ?e, %json_tx, %json_receipt, "unexpected error reexecuting transaction");
(tx, receipt, Err(e))
Err((tx, receipt, e))
}
}
}
Expand Down Expand Up @@ -332,7 +331,7 @@ impl Executor {
}

// -------------------------------------------------------------------------
// Private
// Helpers
// -------------------------------------------------------------------------

/// Submits a transaction to the EVM and awaits for its execution.
Expand Down
6 changes: 2 additions & 4 deletions src/eth/primitives/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ mod slot_value;
mod storage_point_in_time;
mod transaction_execution;
mod transaction_input;
mod transaction_kind;
mod transaction_mined;
mod unix_time;
mod wei;
Expand Down Expand Up @@ -175,11 +174,10 @@ pub use slot_indexes::SlotIndexes;
pub use slot_sample::SlotSample;
pub use slot_value::SlotValue;
pub use storage_point_in_time::StoragePointInTime;
pub use transaction_execution::ExternalTransactionExecution;
pub use transaction_execution::LocalTransactionExecution;
pub use transaction_execution::TransactionExecution;
pub use transaction_input::TransactionInput;
pub use transaction_kind::ExternalTransactionExecution;
pub use transaction_kind::LocalTransactionExecution;
pub use transaction_kind::TransactionKind;
pub use transaction_mined::TransactionMined;
pub use unix_time::UnixTime;
pub use wei::Wei;
Expand Down
51 changes: 32 additions & 19 deletions src/eth/primitives/transaction_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,49 @@ use crate::eth::primitives::ExternalReceipt;
use crate::eth::primitives::ExternalTransaction;
use crate::eth::primitives::Hash;
use crate::eth::primitives::TransactionInput;
use crate::eth::primitives::TransactionKind;

#[allow(clippy::large_enum_variant)]
#[derive(DebugAsJson, Clone, serde::Serialize)]
pub struct TransactionExecution {
pub kind: TransactionKind,
pub result: EvmExecutionResult,
pub enum TransactionExecution {
/// Transaction that was sent directly to Stratus.
Local(LocalTransactionExecution),

/// Transaction that imported from external source.
External(ExternalTransactionExecution),
}

impl TransactionExecution {
/// Creates a new transaction execution from a local transaction.
pub fn from_local(tx: TransactionInput, result: EvmExecutionResult) -> Self {
Self {
kind: TransactionKind::new_local(tx),
result,
}
pub fn new_local(tx: TransactionInput, result: EvmExecutionResult) -> Self {
Self::Local(LocalTransactionExecution { input: tx, result })
}

/// Creates a new transaction execution from an external transaction and its receipt.
pub fn from_external(tx: ExternalTransaction, receipt: ExternalReceipt, execution: EvmExecutionResult) -> Self {
Self {
kind: TransactionKind::new_external(tx, receipt),
result: execution,
/// Returns the transaction hash.
pub fn hash(&self) -> Hash {
match self {
Self::Local(LocalTransactionExecution { input, .. }) => input.hash,
Self::External(ExternalTransactionExecution { tx, .. }) => tx.hash(),
}
}

/// Returns the transaction hash.
pub fn hash(&self) -> Hash {
match self.kind {
TransactionKind::Local(ref tx) => tx.hash,
TransactionKind::External(ref tx, _) => tx.hash(),
/// Returns the execution result.
pub fn result(&self) -> &EvmExecutionResult {
match self {
Self::Local(LocalTransactionExecution { result, .. }) => result,
Self::External(ExternalTransactionExecution { result, .. }) => result,
}
}
}

#[derive(DebugAsJson, Clone, derive_new::new, serde::Serialize)]
pub struct LocalTransactionExecution {
pub input: TransactionInput,
pub result: EvmExecutionResult,
}

#[derive(DebugAsJson, Clone, derive_new::new, serde::Serialize)]
pub struct ExternalTransactionExecution {
pub tx: ExternalTransaction,
pub receipt: ExternalReceipt,
pub result: EvmExecutionResult,
}
20 changes: 0 additions & 20 deletions src/eth/primitives/transaction_kind.rs

This file was deleted.

5 changes: 3 additions & 2 deletions src/eth/storage/inmemory/inmemory_temporary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,13 @@ impl TemporaryStorageExecutionOps for InMemoryTemporaryStorage {
tracing::debug!(hash = %tx.hash(), tx_executions_len = %state.tx_executions.len(), "saving execution");

// check conflicts
if let Some(conflicts) = state.check_conflicts(&tx.result.execution) {
let execution = &tx.result().execution;
if let Some(conflicts) = state.check_conflicts(execution) {
return Err(StorageError::Conflict(conflicts)).context("execution conflicts with current state");
}

// save account changes
let changes = tx.result.execution.changes.values();
let changes = execution.changes.values();
for change in changes {
let account = state
.accounts
Expand Down
2 changes: 1 addition & 1 deletion src/eth/transaction_relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl TransactionRelayer {
// handle local failure
if evm_result.is_failure() {
tracing::debug!("transaction failed in local execution");
let tx_execution = TransactionExecution::from_local(tx_input, evm_result);
let tx_execution = TransactionExecution::new_local(tx_input, evm_result);
self.storage.save_execution_to_temp(tx_execution).await?;
return Ok(());
}
Expand Down
Loading