Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simplify temporary storage #838

Merged
merged 1 commit into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/bin/importer_online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub async fn run_importer_online(
executor.reexecute_external(&block, &receipts).await?;

// mine block
let mined_block = miner.mine_mixed().await?;
let mined_block = miner.mine_external_mixed().await?;
miner.commit(mined_block).await?;

#[cfg(feature = "metrics")]
Expand Down
8 changes: 0 additions & 8 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ use crate::eth::storage::PostgresPermanentStorage;
use crate::eth::storage::PostgresPermanentStorageConfig;
#[cfg(feature = "rocks")]
use crate::eth::storage::RocksPermanentStorage;
#[cfg(feature = "rocks")]
use crate::eth::storage::RocksTemporaryStorage;
use crate::eth::storage::StratusStorage;
use crate::eth::storage::TemporaryStorage;
use crate::eth::BlockMiner;
Expand Down Expand Up @@ -625,8 +623,6 @@ pub struct TemporaryStorageConfig {
#[derive(DebugAsJson, Clone, serde::Serialize)]
pub enum TemporaryStorageKind {
InMemory,
#[cfg(feature = "rocks")]
Rocks,
}

impl TemporaryStorageConfig {
Expand All @@ -636,8 +632,6 @@ impl TemporaryStorageConfig {

match self.temp_storage_kind {
TemporaryStorageKind::InMemory => Ok(Arc::new(InMemoryTemporaryStorage::default())),
#[cfg(feature = "rocks")]
TemporaryStorageKind::Rocks => Ok(Arc::new(RocksTemporaryStorage::new().await?)),
}
}
}
Expand All @@ -648,8 +642,6 @@ impl FromStr for TemporaryStorageKind {
fn from_str(s: &str) -> anyhow::Result<Self, Self::Err> {
match s {
"inmemory" => Ok(Self::InMemory),
#[cfg(feature = "rocks")]
"rocks" => Ok(Self::Rocks),
s => Err(anyhow!("unknown temporary storage: {}", s)),
}
}
Expand Down
78 changes: 27 additions & 51 deletions src/eth/block_miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::eth::primitives::Hash;
use crate::eth::primitives::Index;
use crate::eth::primitives::LocalTransactionExecution;
use crate::eth::primitives::LogMined;
use crate::eth::primitives::TransactionExecution;
use crate::eth::primitives::TransactionMined;
use crate::eth::storage::StratusStorage;
use crate::ext::not;
Expand All @@ -41,34 +40,40 @@ impl BlockMiner {
}
}

/// Mine a block from an external block.
/// Mines from external block and external transactions.
///
/// Local transactions are not allowed to be part of the block.
pub async fn mine_external(&self) -> anyhow::Result<Block> {
let _ = self.storage.temp.finish_block().await?;

// retrieve data
let (external_block, txs) = read_external_block_and_executions(&self.storage).await?;
let (local_txs, external_txs) = partition_transactions(txs);
let block = self.storage.temp.finish_block().await?;
let (local_txs, external_txs) = block.split_transactions();

// validate
let Some(external_block) = block.external_block else {
return log_and_err!("failed to mine external block because there is no external block being re-executed");
};
if not(local_txs.is_empty()) {
return log_and_err!("failed to mine external block because one of the transactions is a local transaction");
}

// mine external transactions
let mined_txs = mine_external_transactions(external_block.number(), external_txs)?;
let mined_txs = mine_external_transactions(block.number, external_txs)?;
block_from_external(external_block, mined_txs)
}

/// Mine a block from an external block and local failed transactions.
pub async fn mine_mixed(&self) -> anyhow::Result<Block> {
let _ = self.storage.temp.finish_block().await?;
/// Mines from external block and external transactions.
///
/// Local transactions are allowed to be part of the block if failed, but not succesful ones.
pub async fn mine_external_mixed(&self) -> anyhow::Result<Block> {
let block = self.storage.temp.finish_block().await?;
let (local_txs, external_txs) = block.split_transactions();

// retrieve data
let (external_block, txs) = read_external_block_and_executions(&self.storage).await?;
let (local_txs, external_txs) = partition_transactions(txs);
// validate
let Some(external_block) = block.external_block else {
return log_and_err!("failed to mine mixed block because there is no external block being re-executed");
};

// mine external transactions
let mined_txs = mine_external_transactions(external_block.number(), external_txs)?;
let mined_txs = mine_external_transactions(block.number, external_txs)?;
let mut block = block_from_external(external_block, mined_txs)?;

// mine local transactions
Expand All @@ -82,13 +87,12 @@ impl BlockMiner {
Ok(block)
}

/// Mine a block from local transactions.
/// Mines from local transactions.
///
/// External transactions are not allowed to be part of the block.
pub async fn mine_local(&self) -> anyhow::Result<Block> {
let number = self.storage.temp.finish_block().await?;

// retrieve data
let txs = self.storage.temp.read_pending_executions().await?;
let (local_txs, external_txs) = partition_transactions(txs);
let block = self.storage.temp.finish_block().await?;
let (local_txs, external_txs) = block.split_transactions();

// validate
if not(external_txs.is_empty()) {
Expand All @@ -97,8 +101,8 @@ impl BlockMiner {

// mine local transactions
match NonEmpty::from_vec(local_txs) {
Some(txs) => block_from_local(number, txs),
None => Ok(Block::new_at_now(number)),
Some(local_txs) => block_from_local(block.number, local_txs),
None => Ok(Block::new_at_now(block.number)),
}
}

Expand All @@ -125,34 +129,6 @@ impl BlockMiner {
// Helpers
// -----------------------------------------------------------------------------

async fn read_external_block_and_executions(storage: &StratusStorage) -> anyhow::Result<(ExternalBlock, Vec<TransactionExecution>)> {
let block = match storage.temp.read_pending_external_block().await {
Ok(Some(block)) => block,
Ok(None) => return log_and_err!("no active external block being re-executed"),
Err(e) => return Err(e),
};
let txs = storage.temp.read_pending_executions().await?;

Ok((block, txs))
}

fn partition_transactions(txs: Vec<TransactionExecution>) -> (Vec<LocalTransactionExecution>, Vec<ExternalTransactionExecution>) {
let mut local_txs = Vec::with_capacity(txs.len());
let mut external_txs = Vec::with_capacity(txs.len());

for tx in txs {
match tx {
TransactionExecution::Local(tx) => {
local_txs.push(tx);
}
TransactionExecution::External(tx) => {
external_txs.push(tx);
}
}
}
(local_txs, external_txs)
}

fn mine_external_transactions(block_number: BlockNumber, txs: Vec<ExternalTransactionExecution>) -> anyhow::Result<Vec<TransactionMined>> {
let mut mined_txs = Vec::with_capacity(txs.len());
for tx in txs {
Expand Down
3 changes: 3 additions & 0 deletions src/eth/primitives/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ mod log_topic;
pub mod logs_bloom;
mod miner_nonce;
mod nonce;
mod pending_block;
mod size;
mod slot;
mod slot_access;
Expand Down Expand Up @@ -166,6 +167,7 @@ pub use log_mined::LogMined;
pub use log_topic::LogTopic;
pub use miner_nonce::MinerNonce;
pub use nonce::Nonce;
pub use pending_block::PendingBlock;
pub use size::Size;
pub use slot::Slot;
pub use slot_access::SlotAccess;
Expand Down Expand Up @@ -196,6 +198,7 @@ mod tests {
type TransactionExecutionValueChangeSlot = ExecutionValueChange<Slot>;
type TransactionExecutionValueChangeWei = ExecutionValueChange<Wei>;

// TODO: add more serde tests for new primitives
gen_test_serde!(Address);
gen_test_serde!(Account);
gen_test_serde!(Block);
Expand Down
40 changes: 40 additions & 0 deletions src/eth/primitives/pending_block.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use display_json::DebugAsJson;

use crate::eth::primitives::BlockNumber;
use crate::eth::primitives::ExternalBlock;
use crate::eth::primitives::ExternalTransactionExecution;
use crate::eth::primitives::LocalTransactionExecution;
use crate::eth::primitives::TransactionExecution;

/// Block that is being mined and receiving updates.
#[derive(DebugAsJson, Clone, Default, serde::Serialize)]
pub struct PendingBlock {
pub number: BlockNumber,
pub tx_executions: Vec<TransactionExecution>,
pub external_block: Option<ExternalBlock>,
}

impl PendingBlock {
/// Creates a new [`PendingBlock`] with the specified number.
pub fn new(number: BlockNumber) -> Self {
Self { number, ..Default::default() }
}

/// Split transactions executions in local and external executions.
pub fn split_transactions(&self) -> (Vec<LocalTransactionExecution>, Vec<ExternalTransactionExecution>) {
let mut local_txs = Vec::with_capacity(self.tx_executions.len());
let mut external_txs = Vec::with_capacity(self.tx_executions.len());

for tx in self.tx_executions.clone() {
match tx {
TransactionExecution::Local(tx) => {
local_txs.push(tx);
}
TransactionExecution::External(tx) => {
external_txs.push(tx);
}
}
}
(local_txs, external_txs)
}
}
Loading
Loading