From da193749f125f29ed2bca3adf54079979da023dc Mon Sep 17 00:00:00 2001 From: Renato Dinhani <101204870+dinhani-cw@users.noreply.github.com> Date: Wed, 8 May 2024 14:46:07 -0300 Subject: [PATCH] feat: miner read all values from storage (#808) --- src/bin/importer_offline.rs | 6 +- src/bin/importer_online.rs | 6 +- src/eth/block_miner.rs | 122 +++++++++++------- src/eth/executor.rs | 1 + src/eth/primitives/external_transaction.rs | 6 + src/eth/primitives/mod.rs | 2 + src/eth/primitives/transaction_kind.rs | 4 + .../storage/inmemory/inmemory_temporary.rs | 21 +++ src/eth/storage/rocks/rocks_temporary.rs | 9 ++ src/eth/storage/temporary_storage.rs | 7 + tests/test_import_external_snapshot_common.rs | 2 +- 11 files changed, 133 insertions(+), 53 deletions(-) diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index e6a84e0b5..9cd6897a0 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -179,9 +179,11 @@ async fn execute_block_importer( #[cfg(feature = "metrics")] let start = metrics::now(); - // re-execute and mine + // re-execute block executor.reexecute_external(&block, &receipts).await?; - let mined_block = miner.mine_external(&block).await?; + + // mine block + let mined_block = miner.mine_external().await?; storage.temp.remove_executions_before(mined_block.transactions.len()).await?; // export to csv OR permanent storage diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs index a9608e063..91a971cf3 100644 --- a/src/bin/importer_online.rs +++ b/src/bin/importer_online.rs @@ -74,9 +74,11 @@ async fn import(executor: &Executor, miner: &BlockMiner, chain: &BlockchainClien let receipts = futures::stream::iter(receipts).buffered(RECEIPTS_PARALELLISM).try_collect::>().await?; let receipts: ExternalReceipts = receipts.into(); - // import block + // re-execute block executor.reexecute_external(&block, &receipts).await?; - let mined_block = miner.mine_mixed(&block).await?; + + // mine block + let mined_block = miner.mine_mixed().await?; miner.commit(mined_block).await?; // track metrics diff --git a/src/eth/block_miner.rs b/src/eth/block_miner.rs index 3f513e2bb..67039db5f 100644 --- a/src/eth/block_miner.rs +++ b/src/eth/block_miner.rs @@ -7,11 +7,15 @@ use tokio::sync::broadcast; use crate::eth::primitives::Block; use crate::eth::primitives::BlockHeader; +use crate::eth::primitives::BlockNumber; use crate::eth::primitives::EvmExecution; use crate::eth::primitives::ExternalBlock; +use crate::eth::primitives::ExternalTransactionExecution; use crate::eth::primitives::Hash; use crate::eth::primitives::Index; +use crate::eth::primitives::LocalTransactionExecution; use crate::eth::primitives::LogMined; +use crate::eth::primitives::TransactionExecution; use crate::eth::primitives::TransactionInput; use crate::eth::primitives::TransactionKind; use crate::eth::primitives::TransactionMined; @@ -47,63 +51,37 @@ impl BlockMiner { } /// Mine a block from an external block. - /// - /// TODO: external_block must come from storage. - /// TODO: validate if transactions really belong to the specified block. - pub async fn mine_external(&self, external_block: &ExternalBlock) -> anyhow::Result { - let txs = self.storage.temp.read_executions().await?; - - // mine external transactions - // fails if finds a transaction that is not external - let mut mined_txs = Vec::with_capacity(txs.len()); - for tx in txs { - let TransactionKind::External(external_tx, external_receipt) = tx.kind else { - return log_and_err!("cannot mine external block because one of the transactions is not an external transaction"); - }; - let mined_tx = TransactionMined::from_external(external_tx, external_receipt, tx.execution)?; - mined_txs.push(mined_tx); + pub async fn mine_external(&self) -> anyhow::Result { + // retrieve data + let (external_block, txs) = read_external_block_and_executions(&self.storage).await?; + let (local_txs, external_txs) = partition_transactions(txs); + + // validate + if not(local_txs.is_empty()) { + return log_and_err!("cannot mine external block because one of the transactions is not an external transaction"); } - Ok(Block { - header: BlockHeader::try_from(external_block)?, - transactions: mined_txs, - }) + // mine external transactions + let mined_txs = mine_external_transactions(external_block.number(), external_txs)?; + block_from_external(external_block, mined_txs) } /// Mine a block from an external block and local failed transactions. - /// - /// TODO: external_block must come from storage. - /// TODO: validate if transactions really belong to the specified block. - pub async fn mine_mixed(&self, external_block: &ExternalBlock) -> anyhow::Result { - let txs = self.storage.temp.read_executions().await?; + pub async fn mine_mixed(&self) -> anyhow::Result { + // retrieve data + let (external_block, txs) = read_external_block_and_executions(&self.storage).await?; + let (local_txs, external_txs) = partition_transactions(txs); // mine external transactions - let mut mined_txs = Vec::with_capacity(txs.len()); - let mut failed_txs = Vec::new(); - for tx in txs { - match tx.kind { - TransactionKind::External(external_tx, external_receipt) => { - let mined_tx = TransactionMined::from_external(external_tx, external_receipt, tx.execution)?; - mined_txs.push(mined_tx); - } - TransactionKind::Local(tx_input) => { - failed_txs.push((tx_input, tx.execution)); - } - } - } - - let mut block = Block { - header: BlockHeader::try_from(external_block)?, - transactions: mined_txs, - }; + let mined_txs = mine_external_transactions(external_block.number(), external_txs)?; + let mut block = block_from_external(external_block, mined_txs)?; - // mine failed transactions - // fails if finds a local transaction that is not a failure - for (failed_tx_input, failed_tx_execution) in failed_txs { - if failed_tx_execution.is_success() { - return log_and_err!("cannot mine mixed block because one of the local execution is not a failure"); + // mine local transactions + for (tx, execution) in local_txs { + if execution.is_success() { + return log_and_err!("cannot mine mixed block because one of the local execution is a success"); } - block.push_execution(failed_tx_input, failed_tx_execution); + block.push_execution(tx, execution); } Ok(block) @@ -219,3 +197,51 @@ impl BlockMiner { Ok(()) } } + +// ----------------------------------------------------------------------------- +// Helpers +// ----------------------------------------------------------------------------- + +async fn read_external_block_and_executions(storage: &StratusStorage) -> anyhow::Result<(ExternalBlock, Vec)> { + let block = match storage.temp.read_external_block().await { + Ok(Some(block)) => block, + Ok(None) => return log_and_err!("no active external block being re-executed"), + Err(e) => return Err(e), + }; + let txs = storage.temp.read_executions().await?; + + Ok((block, txs)) +} + +fn partition_transactions(txs: Vec) -> (Vec, Vec) { + let mut local_txs = Vec::with_capacity(txs.len()); + let mut external_txs = Vec::with_capacity(txs.len()); + + for tx in txs { + match tx.kind { + TransactionKind::Local(tx_input) => local_txs.push((tx_input, tx.execution)), + TransactionKind::External(external_tx, external_receipt) => { + external_txs.push((external_tx, external_receipt, tx.execution)); + } + } + } + (local_txs, external_txs) +} + +fn mine_external_transactions(block_number: BlockNumber, txs: Vec) -> anyhow::Result> { + let mut mined_txs = Vec::with_capacity(txs.len()); + for (tx, receipt, execution) in txs { + if tx.block_number() != block_number { + return log_and_err!("cannot mine external block because one of the transactions does not belong to the external block"); + } + mined_txs.push(TransactionMined::from_external(tx, receipt, execution)?); + } + Ok(mined_txs) +} + +fn block_from_external(external_block: ExternalBlock, mined_txs: Vec) -> anyhow::Result { + Ok(Block { + header: BlockHeader::try_from(&external_block)?, + transactions: mined_txs, + }) +} diff --git a/src/eth/executor.rs b/src/eth/executor.rs index 7befe09a4..56df2884b 100644 --- a/src/eth/executor.rs +++ b/src/eth/executor.rs @@ -89,6 +89,7 @@ impl Executor { let storage = &self.storage; // track active block number + storage.temp.set_external_block(block.clone()).await?; storage.set_active_block_number(block.number()).await?; // execute mixing serial and parallel approaches diff --git a/src/eth/primitives/external_transaction.rs b/src/eth/primitives/external_transaction.rs index 4b1e5855f..f8a2ae6cb 100644 --- a/src/eth/primitives/external_transaction.rs +++ b/src/eth/primitives/external_transaction.rs @@ -1,5 +1,6 @@ use ethers_core::types::Transaction as EthersTransaction; +use crate::eth::primitives::BlockNumber; use crate::eth::primitives::Hash; #[derive(Debug, Clone, Default, derive_more:: Deref, serde::Deserialize, serde::Serialize)] @@ -7,6 +8,11 @@ use crate::eth::primitives::Hash; pub struct ExternalTransaction(#[deref] pub EthersTransaction); impl ExternalTransaction { + /// Returns the block number where the transaction was mined. + pub fn block_number(&self) -> BlockNumber { + self.0.block_number.unwrap().into() + } + /// Returns the transaction hash. pub fn hash(&self) -> Hash { self.0.hash.into() diff --git a/src/eth/primitives/mod.rs b/src/eth/primitives/mod.rs index ecf77f2fe..616f32314 100644 --- a/src/eth/primitives/mod.rs +++ b/src/eth/primitives/mod.rs @@ -177,6 +177,8 @@ pub use slot_value::SlotValue; pub use storage_point_in_time::StoragePointInTime; pub use transaction_execution::TransactionExecution; pub use transaction_input::TransactionInput; +pub use transaction_kind::ExternalTransactionExecution; +pub use transaction_kind::LocalTransactionExecution; pub use transaction_kind::TransactionKind; pub use transaction_mined::TransactionMined; pub use unix_time::UnixTime; diff --git a/src/eth/primitives/transaction_kind.rs b/src/eth/primitives/transaction_kind.rs index 1e8e5e34c..c53a36fa7 100644 --- a/src/eth/primitives/transaction_kind.rs +++ b/src/eth/primitives/transaction_kind.rs @@ -2,6 +2,7 @@ use display_json::DebugAsJson; +use crate::eth::primitives::EvmExecution; use crate::eth::primitives::ExternalReceipt; use crate::eth::primitives::ExternalTransaction; use crate::eth::primitives::TransactionInput; @@ -14,3 +15,6 @@ pub enum TransactionKind { /// Transaction that imported from external source. External(ExternalTransaction, ExternalReceipt), } + +pub type LocalTransactionExecution = (TransactionInput, EvmExecution); +pub type ExternalTransactionExecution = (ExternalTransaction, ExternalReceipt, EvmExecution); diff --git a/src/eth/storage/inmemory/inmemory_temporary.rs b/src/eth/storage/inmemory/inmemory_temporary.rs index b9f1b3a74..094fb91d0 100644 --- a/src/eth/storage/inmemory/inmemory_temporary.rs +++ b/src/eth/storage/inmemory/inmemory_temporary.rs @@ -10,6 +10,7 @@ use tokio::sync::RwLockWriteGuard; use crate::eth::primitives::Account; use crate::eth::primitives::Address; use crate::eth::primitives::BlockNumber; +use crate::eth::primitives::ExternalBlock; use crate::eth::primitives::Slot; use crate::eth::primitives::SlotIndex; use crate::eth::primitives::TransactionExecution; @@ -18,13 +19,22 @@ use crate::eth::storage::TemporaryStorage; #[derive(Debug, Default)] pub struct InMemoryTemporaryStorageState { + /// External block being re-executed. + pub external_block: Option, + + /// Pending transactions executions during block execution. pub tx_executions: Vec, + + /// Pending accounts modified during block execution. pub accounts: HashMap, + + /// Pending slots modified during block execution. pub active_block_number: Option, } impl InMemoryTemporaryStorageState { pub fn reset(&mut self) { + self.external_block = None; self.tx_executions.clear(); self.accounts.clear(); self.active_block_number = None; @@ -63,6 +73,17 @@ impl InMemoryTemporaryStorage { #[async_trait] impl TemporaryStorageExecutionOps for InMemoryTemporaryStorage { + async fn set_external_block(&self, block: ExternalBlock) -> anyhow::Result<()> { + let mut state = self.lock_write().await; + state.external_block = Some(block); + Ok(()) + } + + async fn read_external_block(&self) -> anyhow::Result> { + let state = self.lock_read().await; + Ok(state.external_block.clone()) + } + async fn save_execution(&self, tx: TransactionExecution) -> anyhow::Result<()> { let mut state = self.lock_write().await; tracing::debug!(hash = %tx.hash(), tx_executions_len = %state.tx_executions.len(), "saving execution"); diff --git a/src/eth/storage/rocks/rocks_temporary.rs b/src/eth/storage/rocks/rocks_temporary.rs index 300f7540a..30337a316 100644 --- a/src/eth/storage/rocks/rocks_temporary.rs +++ b/src/eth/storage/rocks/rocks_temporary.rs @@ -8,6 +8,7 @@ use super::rocks_state::RocksStorageState; use crate::eth::primitives::Account; use crate::eth::primitives::Address; use crate::eth::primitives::BlockNumber; +use crate::eth::primitives::ExternalBlock; use crate::eth::primitives::Slot; use crate::eth::primitives::SlotIndex; use crate::eth::primitives::StoragePointInTime; @@ -40,6 +41,14 @@ impl RocksTemporaryStorage { #[async_trait] impl TemporaryStorageExecutionOps for RocksTemporaryStorage { + async fn set_external_block(&self, block: ExternalBlock) -> anyhow::Result<()> { + self.temp.set_external_block(block).await + } + + async fn read_external_block(&self) -> anyhow::Result> { + self.temp.read_external_block().await + } + async fn save_execution(&self, tx: TransactionExecution) -> anyhow::Result<()> { self.temp.save_execution(tx).await?; Ok(()) diff --git a/src/eth/storage/temporary_storage.rs b/src/eth/storage/temporary_storage.rs index 2a2167f58..bff3f5df9 100644 --- a/src/eth/storage/temporary_storage.rs +++ b/src/eth/storage/temporary_storage.rs @@ -3,6 +3,7 @@ use async_trait::async_trait; use crate::eth::primitives::Account; use crate::eth::primitives::Address; use crate::eth::primitives::BlockNumber; +use crate::eth::primitives::ExternalBlock; use crate::eth::primitives::Slot; use crate::eth::primitives::SlotIndex; use crate::eth::primitives::TransactionExecution; @@ -47,6 +48,12 @@ pub trait TemporaryStorage: Send + Sync + TemporaryStorageExecutionOps { #[async_trait] pub trait TemporaryStorageExecutionOps { + /// Sets the external block being re-executed. + async fn set_external_block(&self, block: ExternalBlock) -> anyhow::Result<()>; + + /// Reads an external block being re-executed. + async fn read_external_block(&self) -> anyhow::Result>; + /// Saves an executed transaction. async fn save_execution(&self, tx: TransactionExecution) -> anyhow::Result<()>; diff --git a/tests/test_import_external_snapshot_common.rs b/tests/test_import_external_snapshot_common.rs index 6f4e80737..dffd78009 100644 --- a/tests/test_import_external_snapshot_common.rs +++ b/tests/test_import_external_snapshot_common.rs @@ -159,7 +159,7 @@ pub async fn execute_test( // execute and mine executor.reexecute_external(&block, &receipts).await.unwrap(); - let mined_block = miner.mine_external(&block).await.unwrap(); + let mined_block = miner.mine_external().await.unwrap(); miner.commit(mined_block).await.unwrap(); // get metrics from prometheus (sleep to ensure prometheus collected)