From 237b75cfca8215b0de147d32d6b724b28bfdffcc Mon Sep 17 00:00:00 2001 From: Renato Dinhani <101204870+dinhani-cw@users.noreply.github.com> Date: Fri, 26 Apr 2024 12:35:15 -0300 Subject: [PATCH] feat: initial version of parallel execution (#708) --- Cargo.lock | 4 +- Cargo.toml | 9 +- src/config.rs | 2 +- src/eth/executor.rs | 131 +++++++++++++++--- src/eth/primitives/execution.rs | 8 +- src/eth/primitives/external_transaction.rs | 7 +- src/eth/primitives/transaction_mined.rs | 2 +- tests/test_import_external_snapshot_common.rs | 10 +- .../test_import_external_snapshot_postgres.rs | 4 +- 9 files changed, 140 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2a70853e8..a4f53be89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1246,9 +1246,9 @@ dependencies = [ [[package]] name = "either" -version = "1.9.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" +checksum = "a47c1c47d2f5964e29c61246e81db715514cd532db6b5116a25ea3c03d6780a2" dependencies = [ "serde", ] diff --git a/Cargo.toml b/Cargo.toml index 2824eeaf2..de9659a83 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -144,15 +144,18 @@ dev = [] # Application is running in performance test mode. perf = [] -# Enable prefetching slots during EVM execution. -evm-slot-prefetch = [] - # Enable metrics dependencies and code for metrics collection. metrics = ["dep:metrics", "dep:metrics-exporter-prometheus"] # Enable RocksDB dependencies. rocks = ["rocksdb", "bincode", "kube", "k8s-openapi"] +# XXX: Enable external transaction parallel execution. +executor-parallel = [] + +# XXX: Enable prefetching slots during EVM execution. 
+evm-slot-prefetch = [] + # ------------------------------------------------------------------------------ # Lints # ------------------------------------------------------------------------------ diff --git a/src/config.rs b/src/config.rs index 937e11323..f05730caf 100644 --- a/src/config.rs +++ b/src/config.rs @@ -235,7 +235,7 @@ impl ExecutorConfig { .expect("spawning evm threads should not fail"); } - Arc::new(EthExecutor::new(evm_tx, Arc::clone(&storage), self.forward_to.as_ref()).await) + Arc::new(EthExecutor::new(Arc::clone(&storage), evm_tx, self.num_evms, self.forward_to.as_ref()).await) } } diff --git a/src/eth/executor.rs b/src/eth/executor.rs index 66b4c9f7f..371a2ac01 100644 --- a/src/eth/executor.rs +++ b/src/eth/executor.rs @@ -1,3 +1,7 @@ +#![allow(dead_code)] +#![allow(unused_imports)] +// TODO: Remove clippy `allow` after feature-flags are enabled. + //! EthExecutor: Ethereum Transaction Coordinator //! //! This module provides the `EthExecutor` struct, which acts as a coordinator for executing Ethereum transactions. @@ -8,6 +12,7 @@ use std::sync::Arc; use anyhow::anyhow; +use futures::StreamExt; use itertools::Itertools; use tokio::sync::broadcast; use tokio::sync::oneshot; @@ -43,26 +48,29 @@ pub type EvmTask = (EvmInput, oneshot::Sender /// It holds references to the EVM, block miner, and storage, managing the overall process of /// transaction execution, block production, and state management. pub struct EthExecutor { - // Channel to send transactions to background EVMs. + /// Channel to send transactions to background EVMs. evm_tx: crossbeam_channel::Sender, - // Mutex-wrapped miner for creating new blockchain blocks. + // Number of running EVMs. + num_evms: usize, + + /// Mutex-wrapped miner for creating new blockchain blocks. miner: Mutex, - // Provider for sending rpc calls to substrate + /// Provider for sending rpc calls to substrate relay: Option, - // Shared storage backend for persisting blockchain state. 
+ /// Shared storage backend for persisting blockchain state. storage: Arc, - // Broadcast channels for notifying subscribers about new blocks and logs. + /// Broadcast channels for notifying subscribers about new blocks and logs. block_notifier: broadcast::Sender, log_notifier: broadcast::Sender, } impl EthExecutor { /// Creates a new executor. - pub async fn new(evm_tx: crossbeam_channel::Sender, storage: Arc, forward_to: Option<&String>) -> Self { + pub async fn new(storage: Arc, evm_tx: crossbeam_channel::Sender, num_evms: usize, forward_to: Option<&String>) -> Self { let relay = match forward_to { Some(rpc_url) => Some(TransactionRelay::new(rpc_url).await.expect("failed to instantiate the relay")), None => None, @@ -70,6 +78,7 @@ impl EthExecutor { Self { evm_tx, + num_evms, miner: Mutex::new(BlockMiner::new(Arc::clone(&storage))), storage, block_notifier: broadcast::channel(NOTIFIER_CAPACITY).0, @@ -115,31 +124,100 @@ impl EthExecutor { tracing::info!(number = %block.number(), "importing external block"); // track active block number - self.storage.set_active_block_number(block.number()).await?; + let storage = &self.storage; + storage.set_active_block_number(block.number()).await?; - // re-execute transactions + // track executions let mut executions: Vec = Vec::with_capacity(block.transactions.len()); - for tx in &block.transactions { - // re-execute transaction - let receipt = receipts.try_get(&tx.hash())?; - let (execution, _execution_metrics) = self.reexecute_external(tx, receipt, &block).await?; - - // track execution changes - self.storage - .save_account_changes_to_temp(execution.changes.values().cloned().collect_vec()) - .await?; - executions.push((tx.clone(), receipt.clone(), execution)); - - // track metrics - #[cfg(feature = "metrics")] - { - block_metrics += _execution_metrics; + let mut executions_metrics: Vec = Vec::with_capacity(block.transactions.len()); + + // serial execution + #[cfg(not(feature = "executor-parallel"))] + { + for tx in 
&block.transactions { + let receipt = receipts.try_get(&tx.hash())?; + let (exec, exec_metrics) = self.reexecute_external(tx, receipt, &block).await?; + + storage.save_account_changes_to_temp(exec.changes_to_persist()).await?; + + executions.push(ExternalTransactionExecution::new(tx.clone(), receipt.clone(), exec)); + executions_metrics.push(exec_metrics); + } + } + + // parallel execution + #[cfg(feature = "executor-parallel")] + { + // prepare execution for parallelism + let mut parallel_executions = Vec::with_capacity(block.transactions.len()); + for tx in &block.transactions { + let receipt = receipts.try_get(&tx.hash())?; + parallel_executions.push(self.reexecute_external(tx, receipt, &block)); + } + + // execute in parallel + let mut tx_index: usize = 0; + let mut tx_par_executions = futures::stream::iter(parallel_executions).buffered(self.num_evms); + let mut prev_tx_execution: Option = None; + + while let Some(result) = tx_par_executions.next().await { + // check result of parallel execution + let decision = match (result, &prev_tx_execution) { + // + // execution success and no previous transaction execution + (Ok(result), None) => ExecutionDecision::Proceed(tx_index, result), + // + // execution success, but with previous transaction execution + (Ok(result), Some(prev_tx_execution)) => { + let current_execution = &result.0; + let conflicts = prev_tx_execution.check_conflicts(current_execution); + match conflicts { + None => ExecutionDecision::Proceed(tx_index, result), + Some(conflicts) => { + tracing::warn!(?conflicts, "parallel execution conflicts"); + ExecutionDecision::Reexecute(tx_index) + } + } + } + // + // execution failure + (Err(e), _) => { + tracing::warn!(reason = ?e, "parallel execution failed"); + ExecutionDecision::Reexecute(tx_index) + } + }; + + // re-execute if necessary + // TODO: execution must return tx and receipt to avoid having to retrieve them again + let (tx, receipt, exec, exec_metrics) = match decision { + 
ExecutionDecision::Proceed(tx_index, result) => { + let tx = &block.transactions[tx_index]; + let receipt = receipts.try_get(&tx.hash())?; + (tx, receipt, result.0, result.1) + } + ExecutionDecision::Reexecute(tx_index) => { + let tx = &block.transactions[tx_index]; + let receipt = receipts.try_get(&tx.hash())?; + let result = self.reexecute_external(tx, receipt, &block).await?; + (tx, receipt, result.0, result.1) + } + }; + + storage.save_account_changes_to_temp(exec.changes_to_persist()).await?; + + tx_index += 1; + prev_tx_execution = Some(exec.clone()); + executions.push(ExternalTransactionExecution::new(tx.clone(), receipt.clone(), exec)); + executions_metrics.push(exec_metrics); } } // track metrics #[cfg(feature = "metrics")] { + for execution_metrics in executions_metrics { + block_metrics += execution_metrics; + } metrics::inc_executor_external_block(start.elapsed()); metrics::inc_executor_external_block_account_reads(block_metrics.account_reads); metrics::inc_executor_external_block_slot_reads(block_metrics.slot_reads); @@ -333,3 +411,10 @@ impl EthExecutor { execution_rx.await? } } + +enum ExecutionDecision { + /// Parallel execution succeeded and can be persisted. + Proceed(usize, (Execution, ExecutionMetrics)), + /// Parallel execution failed and must be re-executed in a serial manner. + Reexecute(usize), +} diff --git a/src/eth/primitives/execution.rs b/src/eth/primitives/execution.rs index 8a8b1b2ca..51f4d0637 100644 --- a/src/eth/primitives/execution.rs +++ b/src/eth/primitives/execution.rs @@ -11,6 +11,7 @@ use std::fmt::Debug; use anyhow::anyhow; use anyhow::Ok; +use itertools::Itertools; use crate::eth::primitives::Account; use crate::eth::primitives::Address; @@ -92,7 +93,7 @@ impl Execution { matches!(self.result, ExecutionResult::Success { .. }) } - /// When the transaction is a contract deployment, returns the address of the deployed contract. + /// Returns the address of the deployed contract if the transaction is a deployment. 
pub fn contract_address(&self) -> Option<Address>
{ if let Some(contract_address) = &self.deployed_contract_address { return Some(contract_address.to_owned()); @@ -106,6 +107,11 @@ impl Execution { None } + /// Returns account changes to be persisted. + pub fn changes_to_persist(&self) -> Vec { + self.changes.values().cloned().collect_vec() + } + /// Checks conflicts between two executions. /// /// Assumes self is the present execution and next should happen after self in a serialized context. diff --git a/src/eth/primitives/external_transaction.rs b/src/eth/primitives/external_transaction.rs index ebd0a97fb..459d84936 100644 --- a/src/eth/primitives/external_transaction.rs +++ b/src/eth/primitives/external_transaction.rs @@ -5,7 +5,12 @@ use crate::eth::primitives::ExternalReceipt; use crate::eth::primitives::Hash; use crate::eth::primitives::TransactionInput; -pub type ExternalTransactionExecution = (ExternalTransaction, ExternalReceipt, Execution); +#[derive(Debug, Clone, derive_new::new)] +pub struct ExternalTransactionExecution { + pub tx: ExternalTransaction, + pub receipt: ExternalReceipt, + pub execution: Execution, +} #[derive(Debug, Clone, Default, derive_more:: Deref, serde::Deserialize, serde::Serialize)] #[serde(transparent)] diff --git a/src/eth/primitives/transaction_mined.rs b/src/eth/primitives/transaction_mined.rs index d434895c3..9c788a41a 100644 --- a/src/eth/primitives/transaction_mined.rs +++ b/src/eth/primitives/transaction_mined.rs @@ -48,7 +48,7 @@ impl TransactionMined { /// /// TODO: this kind of conversion should be infallibe. 
pub fn from_external(execution: ExternalTransactionExecution) -> anyhow::Result { - let (tx, receipt, execution) = execution; + let ExternalTransactionExecution { tx, receipt, execution } = execution; Ok(Self { input: tx.try_into()?, execution, diff --git a/tests/test_import_external_snapshot_common.rs b/tests/test_import_external_snapshot_common.rs index 6b590f818..19b0200bf 100644 --- a/tests/test_import_external_snapshot_common.rs +++ b/tests/test_import_external_snapshot_common.rs @@ -24,6 +24,7 @@ mod m { pub use const_format::formatcp; pub use stratus::infra::metrics::METRIC_EVM_EXECUTION; pub use stratus::infra::metrics::METRIC_EVM_EXECUTION_SLOT_READS_CACHED; + pub use stratus::infra::metrics::METRIC_EXECUTOR_EXTERNAL_BLOCK; pub use stratus::infra::metrics::METRIC_STORAGE_COMMIT; pub use stratus::infra::metrics::METRIC_STORAGE_READ_ACCOUNT; pub use stratus::infra::metrics::METRIC_STORAGE_READ_SLOT; @@ -31,7 +32,10 @@ mod m { } #[cfg(feature = "metrics")] -const METRIC_QUERIES: [&str; 46] = [ +const METRIC_QUERIES: [&str; 48] = [ + // Executor + "* Executor", + m::formatcp!("{}_sum", m::METRIC_EXECUTOR_EXTERNAL_BLOCK), // EVM "* EVM", m::formatcp!("{}_count", m::METRIC_EVM_EXECUTION), @@ -98,6 +102,8 @@ pub fn init_config_and_data( // init config let mut global_services = GlobalServices::::init(); global_services.config.executor.chain_id = 2009; + global_services.config.executor.num_evms = 8; + global_services.config.stratus_storage.perm_storage.perm_storage_connections = 9; // init block data let block_json = fs::read_to_string(format!("tests/fixtures/snapshots/{}/block.json", block_number)).unwrap(); @@ -145,8 +151,6 @@ pub async fn execute_test( ) { println!("Executing: {}", test_name); - // restart prometheus, so the metrics are reset - // init executor and execute let storage = StratusStorage::new(Arc::new(InMemoryTemporaryStorage::new()), Arc::new(perm_storage)); let executor = config.executor.init(Arc::new(storage)).await; diff --git 
a/tests/test_import_external_snapshot_postgres.rs b/tests/test_import_external_snapshot_postgres.rs index 35a6dc886..77f4bfe80 100644 --- a/tests/test_import_external_snapshot_postgres.rs +++ b/tests/test_import_external_snapshot_postgres.rs @@ -20,8 +20,8 @@ fn test_import_external_snapshot_with_postgres() { let pg = PostgresPermanentStorage::new(PostgresPermanentStorageConfig { url: docker.postgres_connection_url().to_string(), - connections: 5, - acquire_timeout: Duration::from_secs(10), + connections: global_services.config.stratus_storage.perm_storage.perm_storage_connections, + acquire_timeout: Duration::from_millis(global_services.config.stratus_storage.perm_storage.perm_storage_timeout_millis), }) .await .unwrap();