diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index 2819a2249..1f2e83109 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -136,34 +136,35 @@ fn execute_block_importer( } // imports block transactions - let receipts = ExternalReceipts::from(receipts); - let mut transaction_count = 0; - let instant_before_execution = Instant::now(); + let mut receipts = ExternalReceipts::from(receipts); + let receipts_len = receipts.len(); + let mut tx_len = 0; + let before_blocks_execution = Instant::now(); for block in blocks.into_iter() { if GlobalState::is_shutdown_warn(TASK_NAME) { return Ok(()); } // re-execute (and import) block - executor.execute_external_block(&block, &receipts)?; - transaction_count += block.transactions.len(); + tx_len += block.transactions.len(); + receipts = executor.execute_external_block(block, receipts)?; // mine and save block let mined_block = miner.mine_external()?; miner.commit(mined_block.clone())?; } + let block_execution_duration = before_blocks_execution.elapsed(); - let duration = instant_before_execution.elapsed(); - let (tps, bpm) = calculate_tps_and_bpm(duration, transaction_count, blocks_len); + let (tps, bpm) = calculate_tps_and_bpm(block_execution_duration, tx_len, blocks_len); tracing::info!( tps, blocks_per_minute = format_args!("{bpm:.2}"), - ?duration, + ?block_execution_duration, %block_start, %block_end, - receipts = receipts.len(), + %receipts_len, "reexecuted blocks batch", ); } diff --git a/src/eth/executor/executor.rs b/src/eth/executor/executor.rs index f65f16d23..1f5e80bfc 100644 --- a/src/eth/executor/executor.rs +++ b/src/eth/executor/executor.rs @@ -1,4 +1,5 @@ use std::cmp::max; +use std::mem; use std::str::FromStr; use std::sync::Arc; use std::sync::Mutex; @@ -225,10 +226,13 @@ impl Executor { // ------------------------------------------------------------------------- /// Reexecutes an external block locally and imports it to the temporary storage. - pub fn execute_external_block(&self, block: &ExternalBlock, receipts: &ExternalReceipts) -> anyhow::Result<()> { + /// + /// Returns the remaining receipts that were not consumed by the execution. + pub fn execute_external_block(&self, mut block: ExternalBlock, mut receipts: ExternalReceipts) -> anyhow::Result { // track #[cfg(feature = "metrics")] let (start, mut block_metrics) = (metrics::now(), EvmExecutionMetrics::default()); + #[cfg(feature = "tracing")] let _span = info_span!("executor::external_block", block_number = %block.number()).entered(); tracing::info!(block_number = %block.number(), "reexecuting external block"); @@ -236,12 +240,13 @@ impl Executor { // track pending block let block_number = block.number(); let block_timestamp = block.timestamp(); - self.storage.set_pending_external_block(block.clone())?; + let block_transactions = mem::take(&mut block.transactions); + self.storage.set_pending_external_block(block)?; self.storage.set_pending_block_number(block_number)?; // determine how to execute each transaction - for tx in &block.transactions { - let receipt = receipts.try_get(&tx.hash())?; + for tx in block_transactions { + let receipt = receipts.try_take(&tx.hash())?; self.execute_external_transaction( tx, receipt, @@ -260,7 +265,7 @@ impl Executor { metrics::inc_executor_external_block_slot_reads(block_metrics.slot_reads); } - Ok(()) + Ok(receipts) } /// Reexecutes an external transaction locally ensuring it produces the same output. @@ -269,15 +274,16 @@ impl Executor { /// to facilitate re-execution of parallel transactions that failed fn execute_external_transaction( &self, - tx: &ExternalTransaction, - receipt: &ExternalReceipt, + tx: ExternalTransaction, + receipt: ExternalReceipt, block_number: BlockNumber, block_timestamp: UnixTime, #[cfg(feature = "metrics")] block_metrics: &mut EvmExecutionMetrics, ) -> anyhow::Result<()> { // track #[cfg(feature = "metrics")] - let start = metrics::now(); + let (start, tx_function) = (metrics::now(), codegen::function_sig_for_o11y(&tx.0.input)); + #[cfg(feature = "tracing")] let _span = info_span!("executor::external_transaction", tx_hash = %tx.hash).entered(); tracing::info!(%block_number, tx_hash = %tx.hash(), "reexecuting external transaction"); @@ -288,8 +294,8 @@ impl Executor { // successful external transaction, re-execute locally true => { // re-execute transaction - let evm_input = EvmInput::from_external(tx, receipt, block_number, block_timestamp)?; - let evm_execution = self.evms.execute(evm_input.clone(), EvmRoute::External); + let evm_input = EvmInput::from_external(&tx, &receipt, block_number, block_timestamp)?; + let evm_execution = self.evms.execute(evm_input, EvmRoute::External); // handle re-execution result let mut evm_execution = match evm_execution { @@ -303,10 +309,10 @@ impl Executor { }; // update execution with receipt - evm_execution.execution.apply_receipt(receipt)?; + evm_execution.execution.apply_receipt(&receipt)?; // ensure it matches receipt before saving - if let Err(e) = evm_execution.execution.compare_with_receipt(receipt) { + if let Err(e) = evm_execution.execution.compare_with_receipt(&receipt) { let json_tx = to_json_string(&tx); let json_receipt = to_json_string(&receipt); let json_execution_logs = to_json_string(&evm_execution.execution.logs); @@ -314,18 +320,18 @@ impl Executor { return Err(e); }; - ExternalTransactionExecution::new(tx.clone(), receipt.clone(), evm_execution) + ExternalTransactionExecution::new(tx, receipt, evm_execution) } // - // failed external transaction, re-cretea from receipt without re-executing + // failed external transaction, re-create from receipt without re-executing false => { let sender = self.storage.read_account(&receipt.from.into(), &StoragePointInTime::Pending)?; - let execution = EvmExecution::from_failed_external_transaction(sender, receipt, block_timestamp)?; + let execution = EvmExecution::from_failed_external_transaction(sender, &receipt, block_timestamp)?; let evm_result = EvmExecutionResult { execution, metrics: EvmExecutionMetrics::default(), }; - ExternalTransactionExecution::new(tx.clone(), receipt.clone(), evm_result) + ExternalTransactionExecution::new(tx, receipt, evm_result) } }; @@ -346,11 +352,10 @@ impl Executor { { *block_metrics += tx_metrics; - let function = codegen::function_sig_for_o11y(&tx.0.input); - metrics::inc_executor_external_transaction(start.elapsed(), function); - metrics::inc_executor_external_transaction_account_reads(tx_metrics.account_reads, function); - metrics::inc_executor_external_transaction_slot_reads(tx_metrics.slot_reads, function); - metrics::inc_executor_external_transaction_gas(tx_gas.as_u64() as usize, function); + metrics::inc_executor_external_transaction(start.elapsed(), tx_function); + metrics::inc_executor_external_transaction_account_reads(tx_metrics.account_reads, tx_function); + metrics::inc_executor_external_transaction_slot_reads(tx_metrics.slot_reads, tx_function); + metrics::inc_executor_external_transaction_gas(tx_gas.as_u64() as usize, tx_function); } Ok(()) diff --git a/src/eth/follower/importer/importer.rs b/src/eth/follower/importer/importer.rs index 71866d159..4622f9c8e 100644 --- a/src/eth/follower/importer/importer.rs +++ b/src/eth/follower/importer/importer.rs @@ -164,11 +164,12 @@ impl Importer { }; #[cfg(feature = "metrics")] - let start = metrics::now(); + let (start, block_number, block_tx_len) = (metrics::now(), block.number(), block.transactions.len()); // execute and mine let receipts = ExternalReceipts::from(receipts); - if let Err(e) = executor.execute_external_block(&block, &receipts) { + let receipts_len = receipts.len(); + if let Err(e) = executor.execute_external_block(block, receipts) { let message = GlobalState::shutdown_from(TASK_NAME, "failed to reexecute external block"); return log_and_err!(reason = e, message); }; @@ -177,13 +178,13 @@ impl Importer { #[cfg(feature = "metrics")] { let duration = start.elapsed(); - let tps = calculate_tps(duration, block.transactions.len()); + let tps = calculate_tps(duration, block_tx_len); tracing::info!( tps, - duraton = %duration.to_string_ext(), - block_number = %block.number(), - receipts = receipts.len(), + %block_number, + duration = %duration.to_string_ext(), + %receipts_len, "reexecuted external block", ); } @@ -195,7 +196,7 @@ impl Importer { #[cfg(feature = "metrics")] { - metrics::inc_n_importer_online_transactions_total(receipts.len() as u64); + metrics::inc_n_importer_online_transactions_total(receipts_len as u64); metrics::inc_import_online_mined_block(start.elapsed()); } } diff --git a/src/eth/primitives/external_block.rs b/src/eth/primitives/external_block.rs index 995fc76a3..ab2bcfdec 100644 --- a/src/eth/primitives/external_block.rs +++ b/src/eth/primitives/external_block.rs @@ -12,7 +12,7 @@ use crate::eth::primitives::Hash; use crate::eth::primitives::UnixTime; use crate::log_and_err; -#[derive(Debug, Clone, derive_more:: Deref, serde::Deserialize, serde::Serialize)] +#[derive(Debug, Clone, derive_more:: Deref, derive_more::DerefMut, serde::Deserialize, serde::Serialize)] #[serde(transparent)] pub struct ExternalBlock(#[deref] pub EthersBlockExternalTransaction); diff --git a/src/eth/primitives/external_receipts.rs b/src/eth/primitives/external_receipts.rs index 97d98e382..2b9c8d49f 100644 --- a/src/eth/primitives/external_receipts.rs +++ b/src/eth/primitives/external_receipts.rs @@ -19,8 +19,8 @@ impl ExternalReceipts { } /// Tries to take a receipt by its hash. - pub fn try_get(&self, tx_hash: &Hash) -> anyhow::Result<&ExternalReceipt> { - match self.get(tx_hash) { + pub fn try_take(&mut self, tx_hash: &Hash) -> anyhow::Result { + match self.take(tx_hash) { Some(receipt) => Ok(receipt), None => { tracing::error!(%tx_hash, "receipt is missing for hash"); @@ -30,8 +30,8 @@ impl ExternalReceipts { } /// Takes a receipt by its hash. - pub fn get(&self, hash: &Hash) -> Option<&ExternalReceipt> { - self.0.get(hash) + pub fn take(&mut self, hash: &Hash) -> Option { + self.0.remove(hash) } /// Returns the number of receipts.