Skip to content

Commit

Permalink
feat: preparing executor for parallel execution (#691)
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw authored Apr 24, 2024
1 parent 41d8a86 commit 46c07c7
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 53 deletions.
4 changes: 2 additions & 2 deletions src/eth/evm/evm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,12 @@ impl EvmInput {
/// Creates a transaction that was executed in an external blockchain and imported to Stratus.
///
/// Successful external transactions executes with max gas and zero gas price to ensure we will have the same execution result.
pub fn from_external_transaction(block: &ExternalBlock, tx: ExternalTransaction, receipt: &ExternalReceipt) -> anyhow::Result<Self> {
pub fn from_external_transaction(tx: &ExternalTransaction, receipt: &ExternalReceipt, block: &ExternalBlock) -> anyhow::Result<Self> {
Ok(Self {
from: tx.0.from.into(),
to: tx.0.to.map_into(),
value: tx.0.value.into(),
data: tx.0.input.into(),
data: tx.0.input.clone().into(),
nonce: Some(tx.0.nonce.try_into()?),
gas_limit: if_else!(receipt.is_success(), Gas::MAX, tx.0.gas.try_into()?),
gas_price: if_else!(receipt.is_success(), Wei::ZERO, tx.0.gas_price.map_into().unwrap_or(Wei::ZERO)),
Expand Down
117 changes: 67 additions & 50 deletions src/eth/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use crate::eth::primitives::CallInput;
use crate::eth::primitives::Execution;
use crate::eth::primitives::ExecutionMetrics;
use crate::eth::primitives::ExternalBlock;
use crate::eth::primitives::ExternalReceipt;
use crate::eth::primitives::ExternalReceipts;
use crate::eth::primitives::ExternalTransaction;
use crate::eth::primitives::ExternalTransactionExecution;
use crate::eth::primitives::LogMined;
use crate::eth::primitives::StoragePointInTime;
Expand Down Expand Up @@ -77,13 +79,12 @@ impl EthExecutor {
}

// -------------------------------------------------------------------------
// Transaction execution
// External transactions
// -------------------------------------------------------------------------

/// Re-executes an external block locally and imports it to the permanent storage.
pub async fn import_external_to_perm(&self, block: ExternalBlock, receipts: &ExternalReceipts) -> anyhow::Result<Block> {
// import block

let block = if let Some(relay) = &self.relay {
tracing::debug!("relay found, draining pending failed transactions");
let mut block = self.import_external_to_temp(block, receipts).await?;
Expand Down Expand Up @@ -111,64 +112,30 @@ impl EthExecutor {
/// Re-executes an external block locally and imports it to the temporary storage.
pub async fn import_external_to_temp(&self, block: ExternalBlock, receipts: &ExternalReceipts) -> anyhow::Result<Block> {
#[cfg(feature = "metrics")]
let start = metrics::now();
let (start, mut block_metrics) = (metrics::now(), ExecutionMetrics::default());

let mut block_metrics = ExecutionMetrics::default();
tracing::info!(number = %block.number(), "importing external block");

// track active block number
self.storage.set_active_block_number(block.number()).await?;

// re-execute transactions
let mut executions: Vec<ExternalTransactionExecution> = Vec::with_capacity(block.transactions.len());
for tx in block.transactions.clone() {
#[cfg(feature = "metrics")]
let tx_start = metrics::now();

// re-execute transaction or create a fake execution from the failed external transaction
for tx in &block.transactions {
// re-execute transaction
let receipt = receipts.try_get(&tx.hash())?;
let execution = if receipt.is_success() {
let evm_input = EvmInput::from_external_transaction(&block, tx.clone(), receipt)?;
self.execute_in_evm(evm_input).await
} else {
let sender = self.storage.read_account(&receipt.from.into(), &StoragePointInTime::Present).await?;
let execution = Execution::from_failed_external_transaction(&block, receipt, sender)?;
Ok((execution, ExecutionMetrics::default()))
};
let (execution, _execution_metrics) = self.reexecute_external(tx, receipt, &block).await?;

// handle execution result
match execution {
Ok((mut execution, execution_metrics)) => {
// apply execution costs that were not consided when re-executing the transaction
execution.apply_execution_costs(receipt)?;
execution.gas = receipt.gas_used.unwrap_or_default().try_into()?;

// ensure it matches receipt before saving
if let Err(e) = execution.compare_with_receipt(receipt) {
let json_tx = serde_json::to_string(&tx).unwrap();
let json_receipt = serde_json::to_string(&receipt).unwrap();
let json_execution_logs = serde_json::to_string(&execution.logs).unwrap();
tracing::error!(%json_tx, %json_receipt, %json_execution_logs, "mismatch reexecuting transaction");
return Err(e);
};

// temporarily save state to next transactions from the same block
self.storage
.save_account_changes_to_temp(execution.changes.values().cloned().collect_vec())
.await?;
executions.push((tx, receipt.clone(), execution.clone()));

// track metrics
#[cfg(feature = "metrics")]
metrics::inc_executor_external_transaction(tx_start.elapsed());
block_metrics += execution_metrics;
}
Err(e) => {
let json_tx = serde_json::to_string(&tx).unwrap();
let json_receipt = serde_json::to_string(&receipt).unwrap();
tracing::error!(reason = ?e, %json_tx, %json_receipt, "unexpected error reexecuting transaction");
return Err(e);
}
// track execution changes
self.storage
.save_account_changes_to_temp(execution.changes.values().cloned().collect_vec())
.await?;
executions.push((tx.clone(), receipt.clone(), execution));

// track metrics
#[cfg(feature = "metrics")]
{
block_metrics += _execution_metrics;
}
}

Expand All @@ -184,6 +151,56 @@ impl EthExecutor {
Block::from_external(block, executions)
}

/// Reexecutes an external transaction locally ensuring it produces the same output.
pub async fn reexecute_external(&self, tx: &ExternalTransaction, receipt: &ExternalReceipt, block: &ExternalBlock) -> anyhow::Result<EvmExecutionResult> {
#[cfg(feature = "metrics")]
let start = metrics::now();

// re-execute transaction or create a fake execution from the failed external transaction
let execution_result = if receipt.is_success() {
let evm_input = EvmInput::from_external_transaction(tx, receipt, block)?;
self.execute_in_evm(evm_input).await
} else {
let sender = self.storage.read_account(&receipt.from.into(), &StoragePointInTime::Present).await?;
let execution = Execution::from_failed_external_transaction(sender, receipt, block)?;
Ok((execution, ExecutionMetrics::default()))
};

// handle execution result
match execution_result {
Ok((mut execution, execution_metrics)) => {
// apply execution costs that were not consided when re-executing the transaction
execution.apply_execution_costs(receipt)?;
execution.gas = receipt.gas_used.unwrap_or_default().try_into()?;

// ensure it matches receipt before saving
if let Err(e) = execution.compare_with_receipt(receipt) {
let json_tx = serde_json::to_string(&tx).unwrap();
let json_receipt = serde_json::to_string(&receipt).unwrap();
let json_execution_logs = serde_json::to_string(&execution.logs).unwrap();
tracing::error!(%json_tx, %json_receipt, %json_execution_logs, "mismatch reexecuting transaction");
return Err(e);
};

// track metrics
#[cfg(feature = "metrics")]
metrics::inc_executor_external_transaction(start.elapsed());

Ok((execution, execution_metrics))
}
Err(e) => {
let json_tx = serde_json::to_string(&tx).unwrap();
let json_receipt = serde_json::to_string(&receipt).unwrap();
tracing::error!(reason = ?e, %json_tx, %json_receipt, "unexpected error reexecuting transaction");
Err(e)
}
}
}

// -------------------------------------------------------------------------
// Direct transactions
// -------------------------------------------------------------------------

/// Executes a transaction persisting state changes.
pub async fn transact(&self, transaction: TransactionInput) -> anyhow::Result<Execution> {
#[cfg(feature = "metrics")]
Expand Down
2 changes: 1 addition & 1 deletion src/eth/primitives/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub struct Execution {

impl Execution {
/// Creates an execution from an external transaction that failed.
pub fn from_failed_external_transaction(block: &ExternalBlock, receipt: &ExternalReceipt, sender: Account) -> anyhow::Result<Self> {
pub fn from_failed_external_transaction(sender: Account, receipt: &ExternalReceipt, block: &ExternalBlock) -> anyhow::Result<Self> {
if receipt.is_success() {
return log_and_err!("cannot create failed execution for successful transaction");
}
Expand Down

0 comments on commit 46c07c7

Please sign in to comment.