Skip to content

Commit

Permalink
feat: initial version of parallel execution (#708)
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw authored Apr 26, 2024
1 parent dc0d510 commit 237b75c
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 37 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,18 @@ dev = []
# Application is running in performance test mode.
perf = []

# Enable prefetching slots during EVM execution.
evm-slot-prefetch = []

# Enable metrics dependencies and code for metrics collection.
metrics = ["dep:metrics", "dep:metrics-exporter-prometheus"]

# Enable RocksDB dependencies.
rocks = ["rocksdb", "bincode", "kube", "k8s-openapi"]

# XXX: Enable external transaction parallel execution.
executor-parallel = []

# XXX: Enable prefetching slots during EVM execution.
evm-slot-prefetch = []

# ------------------------------------------------------------------------------
# Lints
# ------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ impl ExecutorConfig {
.expect("spawning evm threads should not fail");
}

Arc::new(EthExecutor::new(evm_tx, Arc::clone(&storage), self.forward_to.as_ref()).await)
Arc::new(EthExecutor::new(Arc::clone(&storage), evm_tx, self.num_evms, self.forward_to.as_ref()).await)
}
}

Expand Down
131 changes: 108 additions & 23 deletions src/eth/executor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
#![allow(dead_code)]
#![allow(unused_imports)]
// TODO: Remove clippy `allow` after feature-flags are enabled.

//! EthExecutor: Ethereum Transaction Coordinator
//!
//! This module provides the `EthExecutor` struct, which acts as a coordinator for executing Ethereum transactions.
Expand All @@ -8,6 +12,7 @@
use std::sync::Arc;

use anyhow::anyhow;
use futures::StreamExt;
use itertools::Itertools;
use tokio::sync::broadcast;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -43,33 +48,37 @@ pub type EvmTask = (EvmInput, oneshot::Sender<anyhow::Result<EvmExecutionResult>
/// It holds references to the EVM, block miner, and storage, managing the overall process of
/// transaction execution, block production, and state management.
pub struct EthExecutor {
// Channel to send transactions to background EVMs.
/// Channel to send transactions to background EVMs.
evm_tx: crossbeam_channel::Sender<EvmTask>,

// Mutex-wrapped miner for creating new blockchain blocks.
// Number of running EVMs.
num_evms: usize,

/// Mutex-wrapped miner for creating new blockchain blocks.
miner: Mutex<BlockMiner>,

// Provider for sending rpc calls to substrate
/// Provider for sending rpc calls to substrate
relay: Option<TransactionRelay>,

// Shared storage backend for persisting blockchain state.
/// Shared storage backend for persisting blockchain state.
storage: Arc<StratusStorage>,

// Broadcast channels for notifying subscribers about new blocks and logs.
/// Broadcast channels for notifying subscribers about new blocks and logs.
block_notifier: broadcast::Sender<Block>,
log_notifier: broadcast::Sender<LogMined>,
}

impl EthExecutor {
/// Creates a new executor.
pub async fn new(evm_tx: crossbeam_channel::Sender<EvmTask>, storage: Arc<StratusStorage>, forward_to: Option<&String>) -> Self {
pub async fn new(storage: Arc<StratusStorage>, evm_tx: crossbeam_channel::Sender<EvmTask>, num_evms: usize, forward_to: Option<&String>) -> Self {
let relay = match forward_to {
Some(rpc_url) => Some(TransactionRelay::new(rpc_url).await.expect("failed to instantiate the relay")),
None => None,
};

Self {
evm_tx,
num_evms,
miner: Mutex::new(BlockMiner::new(Arc::clone(&storage))),
storage,
block_notifier: broadcast::channel(NOTIFIER_CAPACITY).0,
Expand Down Expand Up @@ -115,31 +124,100 @@ impl EthExecutor {
tracing::info!(number = %block.number(), "importing external block");

// track active block number
self.storage.set_active_block_number(block.number()).await?;
let storage = &self.storage;
storage.set_active_block_number(block.number()).await?;

// re-execute transactions
// track executions
let mut executions: Vec<ExternalTransactionExecution> = Vec::with_capacity(block.transactions.len());
for tx in &block.transactions {
// re-execute transaction
let receipt = receipts.try_get(&tx.hash())?;
let (execution, _execution_metrics) = self.reexecute_external(tx, receipt, &block).await?;

// track execution changes
self.storage
.save_account_changes_to_temp(execution.changes.values().cloned().collect_vec())
.await?;
executions.push((tx.clone(), receipt.clone(), execution));

// track metrics
#[cfg(feature = "metrics")]
{
block_metrics += _execution_metrics;
let mut executions_metrics: Vec<ExecutionMetrics> = Vec::with_capacity(block.transactions.len());

// serial execution
#[cfg(not(feature = "executor-parallel"))]
{
for tx in &block.transactions {
let receipt = receipts.try_get(&tx.hash())?;
let (exec, exec_metrics) = self.reexecute_external(tx, receipt, &block).await?;

storage.save_account_changes_to_temp(exec.changes_to_persist()).await?;

executions.push(ExternalTransactionExecution::new(tx.clone(), receipt.clone(), exec));
executions_metrics.push(exec_metrics);
}
}

// parallel execution
#[cfg(feature = "executor-parallel")]
{
// prepare execution for parallelism
let mut parallel_executions = Vec::with_capacity(block.transactions.len());
for tx in &block.transactions {
let receipt = receipts.try_get(&tx.hash())?;
parallel_executions.push(self.reexecute_external(tx, receipt, &block));
}

// execute in parallel
let mut tx_index: usize = 0;
let mut tx_par_executions = futures::stream::iter(parallel_executions).buffered(self.num_evms);
let mut prev_tx_execution: Option<Execution> = None;

while let Some(result) = tx_par_executions.next().await {
// check result of parallel execution
let decision = match (result, &prev_tx_execution) {
//
// execution success and no previous transaction execution
(Ok(result), None) => ExecutionDecision::Proceed(tx_index, result),
//
// execution success, but with previous transaction execution
(Ok(result), Some(prev_tx_execution)) => {
let current_execution = &result.0;
let conflicts = prev_tx_execution.check_conflicts(current_execution);
match conflicts {
None => ExecutionDecision::Proceed(tx_index, result),
Some(conflicts) => {
tracing::warn!(?conflicts, "parallel execution conflicts");
ExecutionDecision::Reexecute(tx_index)
}
}
}
//
// execution failure
(Err(e), _) => {
tracing::warn!(reason = ?e, "parallel execution failed");
ExecutionDecision::Reexecute(tx_index)
}
};

// re-execute if necessary
// TODO: execution must return tx and receipt to avoid having to retrieve them again
let (tx, receipt, exec, exec_metrics) = match decision {
ExecutionDecision::Proceed(tx_index, result) => {
let tx = &block.transactions[tx_index];
let receipt = receipts.try_get(&tx.hash())?;
(tx, receipt, result.0, result.1)
}
ExecutionDecision::Reexecute(tx_index) => {
let tx = &block.transactions[tx_index];
let receipt = receipts.try_get(&tx.hash())?;
let result = self.reexecute_external(tx, receipt, &block).await?;
(tx, receipt, result.0, result.1)
}
};

storage.save_account_changes_to_temp(exec.changes_to_persist()).await?;

tx_index += 1;
prev_tx_execution = Some(exec.clone());
executions.push(ExternalTransactionExecution::new(tx.clone(), receipt.clone(), exec));
executions_metrics.push(exec_metrics);
}
}

// track metrics
#[cfg(feature = "metrics")]
{
for execution_metrics in executions_metrics {
block_metrics += execution_metrics;
}
metrics::inc_executor_external_block(start.elapsed());
metrics::inc_executor_external_block_account_reads(block_metrics.account_reads);
metrics::inc_executor_external_block_slot_reads(block_metrics.slot_reads);
Expand Down Expand Up @@ -333,3 +411,10 @@ impl EthExecutor {
execution_rx.await?
}
}

enum ExecutionDecision {
/// Parallel execution succeeded and can be persisted.
Proceed(usize, (Execution, ExecutionMetrics)),
/// Parallel execution failed and must be re-executed in a serial manner.
Reexecute(usize),
}
8 changes: 7 additions & 1 deletion src/eth/primitives/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::fmt::Debug;

use anyhow::anyhow;
use anyhow::Ok;
use itertools::Itertools;

use crate::eth::primitives::Account;
use crate::eth::primitives::Address;
Expand Down Expand Up @@ -92,7 +93,7 @@ impl Execution {
matches!(self.result, ExecutionResult::Success { .. })
}

/// When the transaction is a contract deployment, returns the address of the deployed contract.
/// Returns the address of the deployed contract if the transaction is a deployment.
pub fn contract_address(&self) -> Option<Address> {
if let Some(contract_address) = &self.deployed_contract_address {
return Some(contract_address.to_owned());
Expand All @@ -106,6 +107,11 @@ impl Execution {
None
}

/// Returns account changes to be persisted.
pub fn changes_to_persist(&self) -> Vec<ExecutionAccountChanges> {
self.changes.values().cloned().collect_vec()
}

/// Checks conflicts between two executions.
///
/// Assumes self is the present execution and next should happen after self in a serialized context.
Expand Down
7 changes: 6 additions & 1 deletion src/eth/primitives/external_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ use crate::eth::primitives::ExternalReceipt;
use crate::eth::primitives::Hash;
use crate::eth::primitives::TransactionInput;

pub type ExternalTransactionExecution = (ExternalTransaction, ExternalReceipt, Execution);
#[derive(Debug, Clone, derive_new::new)]
pub struct ExternalTransactionExecution {
pub tx: ExternalTransaction,
pub receipt: ExternalReceipt,
pub execution: Execution,
}

#[derive(Debug, Clone, Default, derive_more:: Deref, serde::Deserialize, serde::Serialize)]
#[serde(transparent)]
Expand Down
2 changes: 1 addition & 1 deletion src/eth/primitives/transaction_mined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl TransactionMined {
///
/// TODO: this kind of conversion should be infallibe.
pub fn from_external(execution: ExternalTransactionExecution) -> anyhow::Result<Self> {
let (tx, receipt, execution) = execution;
let ExternalTransactionExecution { tx, receipt, execution } = execution;
Ok(Self {
input: tx.try_into()?,
execution,
Expand Down
10 changes: 7 additions & 3 deletions tests/test_import_external_snapshot_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@ mod m {
pub use const_format::formatcp;
pub use stratus::infra::metrics::METRIC_EVM_EXECUTION;
pub use stratus::infra::metrics::METRIC_EVM_EXECUTION_SLOT_READS_CACHED;
pub use stratus::infra::metrics::METRIC_EXECUTOR_EXTERNAL_BLOCK;
pub use stratus::infra::metrics::METRIC_STORAGE_COMMIT;
pub use stratus::infra::metrics::METRIC_STORAGE_READ_ACCOUNT;
pub use stratus::infra::metrics::METRIC_STORAGE_READ_SLOT;
pub use stratus::infra::metrics::METRIC_STORAGE_READ_SLOTS;
}

#[cfg(feature = "metrics")]
const METRIC_QUERIES: [&str; 46] = [
const METRIC_QUERIES: [&str; 48] = [
// Executor
"* Executor",
m::formatcp!("{}_sum", m::METRIC_EXECUTOR_EXTERNAL_BLOCK),
// EVM
"* EVM",
m::formatcp!("{}_count", m::METRIC_EVM_EXECUTION),
Expand Down Expand Up @@ -98,6 +102,8 @@ pub fn init_config_and_data(
// init config
let mut global_services = GlobalServices::<IntegrationTestConfig>::init();
global_services.config.executor.chain_id = 2009;
global_services.config.executor.num_evms = 8;
global_services.config.stratus_storage.perm_storage.perm_storage_connections = 9;

// init block data
let block_json = fs::read_to_string(format!("tests/fixtures/snapshots/{}/block.json", block_number)).unwrap();
Expand Down Expand Up @@ -145,8 +151,6 @@ pub async fn execute_test(
) {
println!("Executing: {}", test_name);

// restart prometheus, so the metrics are reset

// init executor and execute
let storage = StratusStorage::new(Arc::new(InMemoryTemporaryStorage::new()), Arc::new(perm_storage));
let executor = config.executor.init(Arc::new(storage)).await;
Expand Down
4 changes: 2 additions & 2 deletions tests/test_import_external_snapshot_postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ fn test_import_external_snapshot_with_postgres() {

let pg = PostgresPermanentStorage::new(PostgresPermanentStorageConfig {
url: docker.postgres_connection_url().to_string(),
connections: 5,
acquire_timeout: Duration::from_secs(10),
connections: global_services.config.stratus_storage.perm_storage.perm_storage_connections,
acquire_timeout: Duration::from_millis(global_services.config.stratus_storage.perm_storage.perm_storage_timeout_millis),
})
.await
.unwrap();
Expand Down

0 comments on commit 237b75c

Please sign in to comment.