Skip to content

Commit

Permalink
feat: miner read all values from storage (#808)
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw authored May 8, 2024
1 parent 6a82ac4 commit da19374
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 53 deletions.
6 changes: 4 additions & 2 deletions src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,11 @@ async fn execute_block_importer(
#[cfg(feature = "metrics")]
let start = metrics::now();

// re-execute and mine
// re-execute block
executor.reexecute_external(&block, &receipts).await?;
let mined_block = miner.mine_external(&block).await?;

// mine block
let mined_block = miner.mine_external().await?;
storage.temp.remove_executions_before(mined_block.transactions.len()).await?;

// export to csv OR permanent storage
Expand Down
6 changes: 4 additions & 2 deletions src/bin/importer_online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@ async fn import(executor: &Executor, miner: &BlockMiner, chain: &BlockchainClien
let receipts = futures::stream::iter(receipts).buffered(RECEIPTS_PARALELLISM).try_collect::<Vec<_>>().await?;
let receipts: ExternalReceipts = receipts.into();

// import block
// re-execute block
executor.reexecute_external(&block, &receipts).await?;
let mined_block = miner.mine_mixed(&block).await?;

// mine block
let mined_block = miner.mine_mixed().await?;
miner.commit(mined_block).await?;

// track metrics
Expand Down
122 changes: 74 additions & 48 deletions src/eth/block_miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ use tokio::sync::broadcast;

use crate::eth::primitives::Block;
use crate::eth::primitives::BlockHeader;
use crate::eth::primitives::BlockNumber;
use crate::eth::primitives::EvmExecution;
use crate::eth::primitives::ExternalBlock;
use crate::eth::primitives::ExternalTransactionExecution;
use crate::eth::primitives::Hash;
use crate::eth::primitives::Index;
use crate::eth::primitives::LocalTransactionExecution;
use crate::eth::primitives::LogMined;
use crate::eth::primitives::TransactionExecution;
use crate::eth::primitives::TransactionInput;
use crate::eth::primitives::TransactionKind;
use crate::eth::primitives::TransactionMined;
Expand Down Expand Up @@ -47,63 +51,37 @@ impl BlockMiner {
}

/// Mine a block from an external block.
///
/// TODO: external_block must come from storage.
/// TODO: validate if transactions really belong to the specified block.
pub async fn mine_external(&self, external_block: &ExternalBlock) -> anyhow::Result<Block> {
let txs = self.storage.temp.read_executions().await?;

// mine external transactions
// fails if finds a transaction that is not external
let mut mined_txs = Vec::with_capacity(txs.len());
for tx in txs {
let TransactionKind::External(external_tx, external_receipt) = tx.kind else {
return log_and_err!("cannot mine external block because one of the transactions is not an external transaction");
};
let mined_tx = TransactionMined::from_external(external_tx, external_receipt, tx.execution)?;
mined_txs.push(mined_tx);
pub async fn mine_external(&self) -> anyhow::Result<Block> {
// retrieve data
let (external_block, txs) = read_external_block_and_executions(&self.storage).await?;
let (local_txs, external_txs) = partition_transactions(txs);

// validate
if not(local_txs.is_empty()) {
return log_and_err!("cannot mine external block because one of the transactions is not an external transaction");
}

Ok(Block {
header: BlockHeader::try_from(external_block)?,
transactions: mined_txs,
})
// mine external transactions
let mined_txs = mine_external_transactions(external_block.number(), external_txs)?;
block_from_external(external_block, mined_txs)
}

/// Mine a block from an external block and local failed transactions.
///
/// TODO: external_block must come from storage.
/// TODO: validate if transactions really belong to the specified block.
pub async fn mine_mixed(&self, external_block: &ExternalBlock) -> anyhow::Result<Block> {
let txs = self.storage.temp.read_executions().await?;
pub async fn mine_mixed(&self) -> anyhow::Result<Block> {
// retrieve data
let (external_block, txs) = read_external_block_and_executions(&self.storage).await?;
let (local_txs, external_txs) = partition_transactions(txs);

// mine external transactions
let mut mined_txs = Vec::with_capacity(txs.len());
let mut failed_txs = Vec::new();
for tx in txs {
match tx.kind {
TransactionKind::External(external_tx, external_receipt) => {
let mined_tx = TransactionMined::from_external(external_tx, external_receipt, tx.execution)?;
mined_txs.push(mined_tx);
}
TransactionKind::Local(tx_input) => {
failed_txs.push((tx_input, tx.execution));
}
}
}

let mut block = Block {
header: BlockHeader::try_from(external_block)?,
transactions: mined_txs,
};
let mined_txs = mine_external_transactions(external_block.number(), external_txs)?;
let mut block = block_from_external(external_block, mined_txs)?;

// mine failed transactions
// fails if finds a local transaction that is not a failure
for (failed_tx_input, failed_tx_execution) in failed_txs {
if failed_tx_execution.is_success() {
return log_and_err!("cannot mine mixed block because one of the local execution is not a failure");
// mine local transactions
for (tx, execution) in local_txs {
if execution.is_success() {
return log_and_err!("cannot mine mixed block because one of the local execution is a success");
}
block.push_execution(failed_tx_input, failed_tx_execution);
block.push_execution(tx, execution);
}

Ok(block)
Expand Down Expand Up @@ -219,3 +197,51 @@ impl BlockMiner {
Ok(())
}
}

// -----------------------------------------------------------------------------
// Helpers
// -----------------------------------------------------------------------------

async fn read_external_block_and_executions(storage: &StratusStorage) -> anyhow::Result<(ExternalBlock, Vec<TransactionExecution>)> {
let block = match storage.temp.read_external_block().await {
Ok(Some(block)) => block,
Ok(None) => return log_and_err!("no active external block being re-executed"),
Err(e) => return Err(e),
};
let txs = storage.temp.read_executions().await?;

Ok((block, txs))
}

fn partition_transactions(txs: Vec<TransactionExecution>) -> (Vec<LocalTransactionExecution>, Vec<ExternalTransactionExecution>) {
let mut local_txs = Vec::with_capacity(txs.len());
let mut external_txs = Vec::with_capacity(txs.len());

for tx in txs {
match tx.kind {
TransactionKind::Local(tx_input) => local_txs.push((tx_input, tx.execution)),
TransactionKind::External(external_tx, external_receipt) => {
external_txs.push((external_tx, external_receipt, tx.execution));
}
}
}
(local_txs, external_txs)
}

fn mine_external_transactions(block_number: BlockNumber, txs: Vec<ExternalTransactionExecution>) -> anyhow::Result<Vec<TransactionMined>> {
let mut mined_txs = Vec::with_capacity(txs.len());
for (tx, receipt, execution) in txs {
if tx.block_number() != block_number {
return log_and_err!("cannot mine external block because one of the transactions does not belong to the external block");
}
mined_txs.push(TransactionMined::from_external(tx, receipt, execution)?);
}
Ok(mined_txs)
}

fn block_from_external(external_block: ExternalBlock, mined_txs: Vec<TransactionMined>) -> anyhow::Result<Block> {
Ok(Block {
header: BlockHeader::try_from(&external_block)?,
transactions: mined_txs,
})
}
1 change: 1 addition & 0 deletions src/eth/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ impl Executor {
let storage = &self.storage;

// track active block number
storage.temp.set_external_block(block.clone()).await?;
storage.set_active_block_number(block.number()).await?;

// execute mixing serial and parallel approaches
Expand Down
6 changes: 6 additions & 0 deletions src/eth/primitives/external_transaction.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use ethers_core::types::Transaction as EthersTransaction;

use crate::eth::primitives::BlockNumber;
use crate::eth::primitives::Hash;

#[derive(Debug, Clone, Default, derive_more:: Deref, serde::Deserialize, serde::Serialize)]
#[serde(transparent)]
pub struct ExternalTransaction(#[deref] pub EthersTransaction);

impl ExternalTransaction {
/// Returns the block number where the transaction was mined.
pub fn block_number(&self) -> BlockNumber {
self.0.block_number.unwrap().into()
}

/// Returns the transaction hash.
pub fn hash(&self) -> Hash {
self.0.hash.into()
Expand Down
2 changes: 2 additions & 0 deletions src/eth/primitives/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ pub use slot_value::SlotValue;
pub use storage_point_in_time::StoragePointInTime;
pub use transaction_execution::TransactionExecution;
pub use transaction_input::TransactionInput;
pub use transaction_kind::ExternalTransactionExecution;
pub use transaction_kind::LocalTransactionExecution;
pub use transaction_kind::TransactionKind;
pub use transaction_mined::TransactionMined;
pub use unix_time::UnixTime;
Expand Down
4 changes: 4 additions & 0 deletions src/eth/primitives/transaction_kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use display_json::DebugAsJson;

use crate::eth::primitives::EvmExecution;
use crate::eth::primitives::ExternalReceipt;
use crate::eth::primitives::ExternalTransaction;
use crate::eth::primitives::TransactionInput;
Expand All @@ -14,3 +15,6 @@ pub enum TransactionKind {
/// Transaction that imported from external source.
External(ExternalTransaction, ExternalReceipt),
}

pub type LocalTransactionExecution = (TransactionInput, EvmExecution);
pub type ExternalTransactionExecution = (ExternalTransaction, ExternalReceipt, EvmExecution);
21 changes: 21 additions & 0 deletions src/eth/storage/inmemory/inmemory_temporary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tokio::sync::RwLockWriteGuard;
use crate::eth::primitives::Account;
use crate::eth::primitives::Address;
use crate::eth::primitives::BlockNumber;
use crate::eth::primitives::ExternalBlock;
use crate::eth::primitives::Slot;
use crate::eth::primitives::SlotIndex;
use crate::eth::primitives::TransactionExecution;
Expand All @@ -18,13 +19,22 @@ use crate::eth::storage::TemporaryStorage;

#[derive(Debug, Default)]
pub struct InMemoryTemporaryStorageState {
/// External block being re-executed.
pub external_block: Option<ExternalBlock>,

/// Pending transactions executions during block execution.
pub tx_executions: Vec<TransactionExecution>,

/// Pending accounts modified during block execution.
pub accounts: HashMap<Address, InMemoryTemporaryAccount>,

/// Pending slots modified during block execution.
pub active_block_number: Option<BlockNumber>,
}

impl InMemoryTemporaryStorageState {
pub fn reset(&mut self) {
self.external_block = None;
self.tx_executions.clear();
self.accounts.clear();
self.active_block_number = None;
Expand Down Expand Up @@ -63,6 +73,17 @@ impl InMemoryTemporaryStorage {

#[async_trait]
impl TemporaryStorageExecutionOps for InMemoryTemporaryStorage {
async fn set_external_block(&self, block: ExternalBlock) -> anyhow::Result<()> {
let mut state = self.lock_write().await;
state.external_block = Some(block);
Ok(())
}

async fn read_external_block(&self) -> anyhow::Result<Option<ExternalBlock>> {
let state = self.lock_read().await;
Ok(state.external_block.clone())
}

async fn save_execution(&self, tx: TransactionExecution) -> anyhow::Result<()> {
let mut state = self.lock_write().await;
tracing::debug!(hash = %tx.hash(), tx_executions_len = %state.tx_executions.len(), "saving execution");
Expand Down
9 changes: 9 additions & 0 deletions src/eth/storage/rocks/rocks_temporary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use super::rocks_state::RocksStorageState;
use crate::eth::primitives::Account;
use crate::eth::primitives::Address;
use crate::eth::primitives::BlockNumber;
use crate::eth::primitives::ExternalBlock;
use crate::eth::primitives::Slot;
use crate::eth::primitives::SlotIndex;
use crate::eth::primitives::StoragePointInTime;
Expand Down Expand Up @@ -40,6 +41,14 @@ impl RocksTemporaryStorage {

#[async_trait]
impl TemporaryStorageExecutionOps for RocksTemporaryStorage {
async fn set_external_block(&self, block: ExternalBlock) -> anyhow::Result<()> {
self.temp.set_external_block(block).await
}

async fn read_external_block(&self) -> anyhow::Result<Option<ExternalBlock>> {
self.temp.read_external_block().await
}

async fn save_execution(&self, tx: TransactionExecution) -> anyhow::Result<()> {
self.temp.save_execution(tx).await?;
Ok(())
Expand Down
7 changes: 7 additions & 0 deletions src/eth/storage/temporary_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use async_trait::async_trait;
use crate::eth::primitives::Account;
use crate::eth::primitives::Address;
use crate::eth::primitives::BlockNumber;
use crate::eth::primitives::ExternalBlock;
use crate::eth::primitives::Slot;
use crate::eth::primitives::SlotIndex;
use crate::eth::primitives::TransactionExecution;
Expand Down Expand Up @@ -47,6 +48,12 @@ pub trait TemporaryStorage: Send + Sync + TemporaryStorageExecutionOps {

#[async_trait]
pub trait TemporaryStorageExecutionOps {
/// Sets the external block being re-executed.
async fn set_external_block(&self, block: ExternalBlock) -> anyhow::Result<()>;

/// Reads an external block being re-executed.
async fn read_external_block(&self) -> anyhow::Result<Option<ExternalBlock>>;

/// Saves an executed transaction.
async fn save_execution(&self, tx: TransactionExecution) -> anyhow::Result<()>;

Expand Down
2 changes: 1 addition & 1 deletion tests/test_import_external_snapshot_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ pub async fn execute_test(

// execute and mine
executor.reexecute_external(&block, &receipts).await.unwrap();
let mined_block = miner.mine_external(&block).await.unwrap();
let mined_block = miner.mine_external().await.unwrap();
miner.commit(mined_block).await.unwrap();

// get metrics from prometheus (sleep to ensure prometheus collected)
Expand Down

0 comments on commit da19374

Please sign in to comment.