Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reexecute_external returns transaction and receipt instead of keeping track of transaction index #742

Merged
merged 6 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,6 @@ rocks = ["rocksdb", "bincode"]
# XXX: Enable external transaction parallel execution.
executor-parallel = []

# XXX: Enable prefetching slots during EVM execution.
evm-slot-prefetch = []

# ------------------------------------------------------------------------------
# Lints
# ------------------------------------------------------------------------------
Expand Down
7 changes: 6 additions & 1 deletion src/eth/evm/evm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ use crate::eth::primitives::Wei;
use crate::ext::OptionExt;
use crate::if_else;

pub type EvmExecutionResult = (Execution, ExecutionMetrics);
/// Evm execution result.
#[derive(Debug)]
pub struct EvmExecutionResult {
pub execution: Execution,
pub metrics: ExecutionMetrics,
}

/// EVM operations.
pub trait Evm {
Expand Down
5 changes: 4 additions & 1 deletion src/eth/evm/revm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,10 @@ impl Evm for Revm {
metrics::inc_evm_execution_slot_reads_cached(session_metrics.slot_reads_cached);
}

execution.map(|execution| (execution, session_metrics))
execution.map(|execution| EvmExecutionResult {
execution,
metrics: session_metrics,
})
}
}

Expand Down
134 changes: 72 additions & 62 deletions src/eth/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tokio::sync::broadcast;
use tokio::sync::oneshot;
use tokio::sync::Mutex;

use crate::eth::evm;
use crate::eth::evm::EvmExecutionResult;
use crate::eth::evm::EvmInput;
use crate::eth::primitives::Block;
Expand Down Expand Up @@ -129,19 +130,16 @@ impl EthExecutor {

// track executions
let mut executions: Vec<ExternalTransactionExecution> = Vec::with_capacity(block.transactions.len());
let mut executions_metrics: Vec<ExecutionMetrics> = Vec::with_capacity(block.transactions.len());

// serial execution
#[cfg(not(feature = "executor-parallel"))]
{
for tx in &block.transactions {
let receipt = receipts.try_get(&tx.hash())?;
let (exec, exec_metrics) = self.reexecute_external(tx, receipt, &block).await?;
let evm_result = self.reexecute_external(tx, receipt, &block).await.2?;

storage.save_account_changes_to_temp(exec.changes_to_persist()).await?;

executions.push(ExternalTransactionExecution::new(tx.clone(), receipt.clone(), exec));
executions_metrics.push(exec_metrics);
storage.save_account_changes_to_temp(evm_result.execution.changes_to_persist()).await?;
executions.push(ExternalTransactionExecution::new(tx.clone(), receipt.clone(), evm_result));
}
}

Expand All @@ -156,67 +154,56 @@ impl EthExecutor {
}

// execute in parallel
let mut tx_index: usize = 0;
let mut tx_par_executions = futures::stream::iter(parallel_executions).buffered(self.num_evms);
let mut prev_tx_execution: Option<Execution> = None;
let mut parallel_executions = futures::stream::iter(parallel_executions).buffered(self.num_evms);
let mut prev_execution: Option<Execution> = None;

while let Some(result) = tx_par_executions.next().await {
while let Some(result) = parallel_executions.next().await {
// check result of parallel execution
let decision = match (result, &prev_tx_execution) {
let decision = match (result, &prev_execution) {
//
// execution success and no previous transaction execution
(Ok(result), None) => ExecutionDecision::Proceed(tx_index, result),
((tx, receipt, Ok(evm_result)), None) => ExecutionDecision::Proceed(tx, receipt, evm_result),
//
// execution success, but with previous transaction execution
(Ok(result), Some(prev_tx_execution)) => {
let current_execution = &result.0;
let conflicts = prev_tx_execution.check_conflicts(current_execution);
((tx, receipt, Ok(evm_result)), Some(prev_tx_execution)) => {
let conflicts = prev_tx_execution.check_conflicts(&evm_result.execution);
match conflicts {
None => ExecutionDecision::Proceed(tx_index, result),
None => ExecutionDecision::Proceed(tx, receipt, evm_result),
Some(conflicts) => {
tracing::warn!(?conflicts, "parallel execution conflicts");
ExecutionDecision::Reexecute(tx_index)
ExecutionDecision::Reexecute(tx, receipt)
}
}
}
//
// execution failure
(Err(e), _) => {
((tx, receipt, Err(e)), _) => {
tracing::warn!(reason = ?e, "parallel execution failed");
ExecutionDecision::Reexecute(tx_index)
ExecutionDecision::Reexecute(tx, receipt)
}
};

// re-execute if necessary
// TODO: execution must return tx and receipt to avoid having to retrieve them again
let (tx, receipt, exec, exec_metrics) = match decision {
ExecutionDecision::Proceed(tx_index, result) => {
let tx = &block.transactions[tx_index];
let receipt = receipts.try_get(&tx.hash())?;
(tx, receipt, result.0, result.1)
}
ExecutionDecision::Reexecute(tx_index) => {
let tx = &block.transactions[tx_index];
let receipt = receipts.try_get(&tx.hash())?;
let result = self.reexecute_external(tx, receipt, &block).await?;
(tx, receipt, result.0, result.1)
}
let (tx, receipt, evm_result) = match decision {
ExecutionDecision::Proceed(tx, receipt, evm_result) => (tx, receipt, evm_result),
ExecutionDecision::Reexecute(tx, receipt) => match self.reexecute_external(tx, receipt, &block).await {
(tx, receipt, Ok(evm_result)) => (tx, receipt, evm_result),
(.., Err(e)) => return Err(e),
},
};

storage.save_account_changes_to_temp(exec.changes_to_persist()).await?;
storage.save_account_changes_to_temp(evm_result.execution.changes_to_persist()).await?;

tx_index += 1;
prev_tx_execution = Some(exec.clone());
executions.push(ExternalTransactionExecution::new(tx.clone(), receipt.clone(), exec));
executions_metrics.push(exec_metrics);
prev_execution = Some(evm_result.execution.clone());
executions.push(ExternalTransactionExecution::new(tx.clone(), receipt.clone(), evm_result));
}
}

// track metrics
#[cfg(feature = "metrics")]
{
for execution_metrics in executions_metrics {
block_metrics += execution_metrics;
for execution in &executions {
block_metrics += execution.evm_result.metrics;
}
metrics::inc_executor_external_block(start.elapsed());
metrics::inc_executor_external_block_account_reads(block_metrics.account_reads);
Expand All @@ -228,47 +215,69 @@ impl EthExecutor {
}

/// Reexecutes an external transaction locally ensuring it produces the same output.
pub async fn reexecute_external(&self, tx: &ExternalTransaction, receipt: &ExternalReceipt, block: &ExternalBlock) -> anyhow::Result<EvmExecutionResult> {
pub async fn reexecute_external<'a, 'b>(
&'a self,
tx: &'b ExternalTransaction,
receipt: &'b ExternalReceipt,
block: &ExternalBlock,
) -> (&'b ExternalTransaction, &'b ExternalReceipt, anyhow::Result<EvmExecutionResult>) {
#[cfg(feature = "metrics")]
let start = metrics::now();

// re-execute transaction or create a fake execution from the failed external transaction
let execution_result = if receipt.is_success() {
let evm_input = EvmInput::from_external_transaction(tx, receipt, block)?;
let evm_result = if receipt.is_success() {
let evm_input = match EvmInput::from_external_transaction(tx, receipt, block) {
Ok(evm_input) => evm_input,
Err(e) => return (tx, receipt, Err(e)),
};
self.execute_in_evm(evm_input).await
} else {
let sender = self.storage.read_account(&receipt.from.into(), &StoragePointInTime::Present).await?;
let execution = Execution::from_failed_external_transaction(sender, receipt, block)?;
Ok((execution, ExecutionMetrics::default()))
let sender = match self.storage.read_account(&receipt.from.into(), &StoragePointInTime::Present).await {
Ok(sender) => sender,
Err(e) => return (tx, receipt, Err(e)),
};
let execution = match Execution::from_failed_external_transaction(sender, receipt, block) {
Ok(execution) => execution,
Err(e) => return (tx, receipt, Err(e)),
};
Ok(EvmExecutionResult {
execution,
metrics: ExecutionMetrics::default(),
})
};

// handle execution result
match execution_result {
Ok((mut execution, execution_metrics)) => {
match evm_result {
Ok(mut evm_result) => {
// apply execution costs that were not consided when re-executing the transaction
execution.apply_execution_costs(receipt)?;
execution.gas = receipt.gas_used.unwrap_or_default().try_into()?;
if let Err(e) = evm_result.execution.apply_execution_costs(receipt) {
return (tx, receipt, Err(e));
};
evm_result.execution.gas = match receipt.gas_used.unwrap_or_default().try_into() {
Ok(gas) => gas,
Err(e) => return (tx, receipt, Err(e)),
};

// ensure it matches receipt before saving
if let Err(e) = execution.compare_with_receipt(receipt) {
if let Err(e) = evm_result.execution.compare_with_receipt(receipt) {
let json_tx = serde_json::to_string(&tx).unwrap();
let json_receipt = serde_json::to_string(&receipt).unwrap();
let json_execution_logs = serde_json::to_string(&execution.logs).unwrap();
let json_execution_logs = serde_json::to_string(&evm_result.execution.logs).unwrap();
tracing::error!(%json_tx, %json_receipt, %json_execution_logs, "mismatch reexecuting transaction");
return Err(e);
return (tx, receipt, Err(e));
};

// track metrics
#[cfg(feature = "metrics")]
metrics::inc_executor_external_transaction(start.elapsed());

Ok((execution, execution_metrics))
(tx, receipt, Ok(evm_result))
}
Err(e) => {
let json_tx = serde_json::to_string(&tx).unwrap();
let json_receipt = serde_json::to_string(&receipt).unwrap();
tracing::error!(reason = ?e, %json_tx, %json_receipt, "unexpected error reexecuting transaction");
Err(e)
(tx, receipt, Err(e))
}
}
}
Expand Down Expand Up @@ -302,7 +311,7 @@ impl EthExecutor {

let execution = if let Some(relay) = &self.relay {
let evm_input = EvmInput::from_eth_transaction(transaction.clone());
let execution = self.execute_in_evm(evm_input).await?.0;
let execution = self.execute_in_evm(evm_input).await?.execution;
relay.forward_transaction(execution.clone(), transaction).await?; // TODO: Check if we should run this in paralel by spawning a task when running the online importer.
execution
} else {
Expand All @@ -311,7 +320,7 @@ impl EthExecutor {
let (execution, block) = loop {
// execute and check conflicts before mining block
let evm_input = EvmInput::from_eth_transaction(transaction.clone());
let execution = self.execute_in_evm(evm_input).await?.0;
let execution = self.execute_in_evm(evm_input).await?.execution;

// mine and commit block
let mut miner_lock = self.miner.lock().await;
Expand Down Expand Up @@ -364,12 +373,12 @@ impl EthExecutor {
);

let evm_input = EvmInput::from_eth_call(input, point_in_time);
let result = self.execute_in_evm(evm_input).await;
let evm_result = self.execute_in_evm(evm_input).await;

#[cfg(feature = "metrics")]
metrics::inc_executor_call(start.elapsed(), result.is_ok());
metrics::inc_executor_call(start.elapsed(), evm_result.is_ok());

result.map(|x| x.0)
evm_result.map(|x| x.execution)
}

#[cfg(feature = "dev")]
Expand Down Expand Up @@ -412,9 +421,10 @@ impl EthExecutor {
}
}

enum ExecutionDecision {
enum ExecutionDecision<'a> {
/// Parallel execution succeeded and can be persisted.
Proceed(usize, (Execution, ExecutionMetrics)),
Proceed(&'a ExternalTransaction, &'a ExternalReceipt, EvmExecutionResult),

/// Parallel execution failed and must be re-executed in a serial manner.
Reexecute(usize),
Reexecute(&'a ExternalTransaction, &'a ExternalReceipt),
}
2 changes: 1 addition & 1 deletion src/eth/primitives/execution_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#[derive(Debug, Default, derive_more::Add, derive_more::AddAssign)]
#[derive(Clone, Copy, Debug, Default, derive_more::Add, derive_more::AddAssign)]
pub struct ExecutionMetrics {
/// Number of account reads during EVM execution.
pub account_reads: usize,
Expand Down
6 changes: 3 additions & 3 deletions src/eth/primitives/external_transaction.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use ethers_core::types::Transaction as EthersTransaction;

use crate::eth::primitives::Execution;
use crate::eth::evm::EvmExecutionResult;
use crate::eth::primitives::ExternalReceipt;
use crate::eth::primitives::Hash;
use crate::eth::primitives::TransactionInput;

#[derive(Debug, Clone, derive_new::new)]
#[derive(Debug, derive_new::new)]
pub struct ExternalTransactionExecution {
pub tx: ExternalTransaction,
pub receipt: ExternalReceipt,
pub execution: Execution,
pub evm_result: EvmExecutionResult,
}

#[derive(Debug, Clone, Default, derive_more:: Deref, serde::Deserialize, serde::Serialize)]
Expand Down
10 changes: 7 additions & 3 deletions src/eth/primitives/transaction_mined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,15 @@ impl TransactionMined {
/// Creates a new mined transaction from an external mined transaction that was re-executed locally.
///
/// TODO: this kind of conversion should be infallibe.
pub fn from_external(execution: ExternalTransactionExecution) -> anyhow::Result<Self> {
let ExternalTransactionExecution { tx, receipt, execution } = execution;
pub fn from_external(evm_result: ExternalTransactionExecution) -> anyhow::Result<Self> {
let ExternalTransactionExecution {
tx,
receipt,
evm_result: execution,
} = evm_result;
Ok(Self {
input: tx.try_into()?,
execution,
execution: execution.execution,
block_number: receipt.block_number(),
block_hash: receipt.block_hash(),
transaction_index: receipt.transaction_index.into(),
Expand Down
Loading