Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: finishing blocks #834

Merged
merged 6 commits
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import '.justfile_helpers' # _lint, _outdated

# Environment variables automatically passed to executed commands.
export CARGO_PROFILE_RELEASE_DEBUG := env("CARGO_PROFILE_RELEASE_DEBUG", "1")
export RUST_BACKTRACE := "1"
export RUST_LOG := env("RUST_LOG", "stratus=info,rpc-downloader=info,importer-offline=info,importer-online=info,state-validator=info")
export RUST_BACKTRACE := "0"
export RUST_LOG := env("RUST_LOG", "stratus=info,rpc_downloader=info,importer_offline=info,importer_online=info,state_validator=info")

# Global arguments that can be passed to receipts.
feature_flags := "dev," + env("FEATURES", "")
Expand Down
14 changes: 2 additions & 12 deletions src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@ use stratus::ext::not;
#[cfg(feature = "metrics")]
use stratus::infra::metrics;
use stratus::log_and_err;
use stratus::utils::new_context_id;
use stratus::utils::signal_handler;
use stratus::GlobalServices;
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::info_span;
use tracing::Instrument;

/// Number of tasks in the backlog. Each task contains 10_000 blocks and all receipts for them.
const BACKLOG_SIZE: usize = 50;
Expand Down Expand Up @@ -134,7 +131,7 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
// -----------------------------------------------------------------------------
// Block importer
// -----------------------------------------------------------------------------
#[tracing::instrument(name = "[Importer]", skip_all)]
#[tracing::instrument(name = "block_importer", skip_all)]
async fn execute_block_importer(
// services
executor: Arc<Executor>,
Expand Down Expand Up @@ -168,8 +165,6 @@ async fn execute_block_importer(
// imports block transactions
tracing::info!(%block_start, %block_end, receipts = %receipts.len(), "importing blocks");
for (block_index, block) in blocks.into_iter().enumerate() {
let span = info_span!("re-executing block", context_id = new_context_id());

async {
#[cfg(feature = "metrics")]
let start = metrics::now();
Expand All @@ -179,7 +174,6 @@ async fn execute_block_importer(

// mine block
let mined_block = miner.mine_external().await?;
storage.temp.remove_executions_before(mined_block.transactions.len()).await?;

// export to csv OR permanent storage
match csv {
Expand All @@ -197,7 +191,6 @@ async fn execute_block_importer(

anyhow::Ok(())
}
.instrument(span)
.await?;
}
};
Expand All @@ -209,7 +202,7 @@ async fn execute_block_importer(
// -----------------------------------------------------------------------------
// Block loader
// -----------------------------------------------------------------------------
#[tracing::instrument(name = "[RPC]", skip_all, fields(start, end, block_by_fetch))]
#[tracing::instrument(name = "storage_loader", skip_all, fields(start, end, block_by_fetch))]
async fn execute_external_rpc_storage_loader(
// services
rpc_storage: Arc<dyn ExternalRpcStorage>,
Expand All @@ -228,10 +221,7 @@ async fn execute_external_rpc_storage_loader(
while start <= end {
let end = min(start + (blocks_by_fetch - 1), end);

let span = info_span!("fetching block", context_id = new_context_id());

let task = load_blocks_and_receipts(Arc::clone(&rpc_storage), cancellation.clone(), start, end);
let task = task.instrument(span);
tasks.push(task);

start += blocks_by_fetch;
Expand Down
4 changes: 4 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,11 @@ impl StratusStorageConfig {
pub async fn init(&self) -> anyhow::Result<Arc<StratusStorage>> {
let temp_storage = self.temp_storage.init().await?;
let perm_storage = self.perm_storage.init().await?;

let storage = StratusStorage::new(temp_storage, perm_storage);
storage.set_active_block_number_as_next_if_not_set().await?;

// enable genesis block
if self.enable_genesis {
let genesis = storage.read_block(&BlockSelection::Number(BlockNumber::ZERO)).await?;
if genesis.is_none() {
Expand All @@ -160,6 +163,7 @@ impl StratusStorageConfig {
}
}

// enable test accounts
#[cfg(feature = "dev")]
if self.enable_test_accounts {
let mut test_accounts_to_insert = Vec::new();
Expand Down
192 changes: 89 additions & 103 deletions src/eth/block_miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,21 @@ impl BlockMiner {

/// Mine a block with no transactions.
pub async fn mine_empty(&self) -> anyhow::Result<Block> {
let number = self.storage.increment_block_number().await?;
let number = self.storage.temp.finish_block().await?;
Ok(Block::new_at_now(number))
}

/// Mine a block from an external block.
pub async fn mine_external(&self) -> anyhow::Result<Block> {
let _ = self.storage.temp.finish_block().await?;

// retrieve data
let (external_block, txs) = read_external_block_and_executions(&self.storage).await?;
let (local_txs, external_txs) = partition_transactions(txs);

// validate
if not(local_txs.is_empty()) {
return log_and_err!("cannot mine external block because one of the transactions is a local transaction");
return log_and_err!("failed to mine external block because one of the transactions is a local transaction");
}

// mine external transactions
Expand All @@ -65,6 +67,8 @@ impl BlockMiner {

/// Mine a block from an external block and local failed transactions.
pub async fn mine_mixed(&self) -> anyhow::Result<Block> {
let _ = self.storage.temp.finish_block().await?;

// retrieve data
let (external_block, txs) = read_external_block_and_executions(&self.storage).await?;
let (local_txs, external_txs) = partition_transactions(txs);
Expand All @@ -76,7 +80,7 @@ impl BlockMiner {
// mine local transactions
for tx in local_txs {
if tx.is_success() {
return log_and_err!("cannot mine mixed block because one of the local execution is a success");
return log_and_err!("failed to mine mixed block because one of the local execution is a success");
}
block.push_execution(tx.input, tx.result);
}
Expand All @@ -86,111 +90,24 @@ impl BlockMiner {

/// Mine a block from local transactions.
pub async fn mine_local(&self) -> anyhow::Result<Block> {
let number = self.storage.temp.finish_block().await?;

// retrieve data
let txs = self.storage.temp.read_executions().await?;
let txs = self.storage.temp.read_pending_executions().await?;
let (local_txs, external_txs) = partition_transactions(txs);

// validate
if not(external_txs.is_empty()) {
return log_and_err!("cannot mine local block because one of the transactions is an external transaction");
return log_and_err!("failed to mine local block because one of the transactions is an external transaction");
}

// mine local transactions
match NonEmpty::from_vec(local_txs) {
Some(txs) => self.mine_with_many_transactions(txs).await,
None => self.mine_empty().await,
Some(txs) => block_from_local(number, txs),
None => Ok(Block::new_at_now(number)),
}
}

/// Mine one block with a single transaction.
/// Internally, it wraps the single transaction into a format suitable for `mine_with_many_transactions`,
/// enabling consistent processing for both single and multiple transaction scenarios.
///
/// TODO: remove
pub async fn mine_with_one_transaction(&self, tx: LocalTransactionExecution) -> anyhow::Result<Block> {
let txs = NonEmpty::new(tx);
self.mine_with_many_transactions(txs).await
}

/// Mines a new block from one or more transactions.
/// This is the core function for block creation, processing each transaction, generating the necessary logs,
/// and finalizing the block. It is used both directly for multiple transactions and indirectly by `mine_with_one_transaction`.
///
/// TODO: Future enhancements may include breaking down this method for improved readability and maintenance.
pub async fn mine_with_many_transactions(&self, txs: NonEmpty<LocalTransactionExecution>) -> anyhow::Result<Block> {
// init block
let number = self.storage.increment_block_number().await?;
let block_timestamp = txs
.minimum_by(|tx1, tx2| tx1.result.execution.block_timestamp.cmp(&tx2.result.execution.block_timestamp))
.result
.execution
.block_timestamp;

let mut block = Block::new(number, block_timestamp);
block.transactions.reserve(txs.len());

// mine transactions and logs
let mut log_index = Index::ZERO;
for (tx_idx, tx) in txs.into_iter().enumerate() {
let transaction_index = Index::new(tx_idx as u64);
// mine logs
let mut mined_logs: Vec<LogMined> = Vec::with_capacity(tx.result.execution.logs.len());
for mined_log in tx.result.execution.logs.clone() {
// calculate bloom
block.header.bloom.accrue(BloomInput::Raw(mined_log.address.as_ref()));
for topic in mined_log.topics().into_iter() {
block.header.bloom.accrue(BloomInput::Raw(topic.as_ref()));
}

// mine log
let mined_log = LogMined {
log: mined_log,
transaction_hash: tx.input.hash,
transaction_index,
log_index,
block_number: block.header.number,
block_hash: block.header.hash,
};
mined_logs.push(mined_log);

// increment log index
log_index = log_index + Index::ONE;
}

// mine transaction
let mined_transaction = TransactionMined {
input: tx.input,
execution: tx.result.execution,
transaction_index,
block_number: block.header.number,
block_hash: block.header.hash,
logs: mined_logs,
};

// add transaction to block
block.transactions.push(mined_transaction);
}

// calculate transactions hash
if not(block.transactions.is_empty()) {
let transactions_hashes: Vec<&Hash> = block.transactions.iter().map(|x| &x.input.hash).collect();
block.header.transactions_root = triehash::ordered_trie_root::<KeccakHasher, _>(transactions_hashes).into();
}

// calculate final block hash

// replicate calculated block hash from header to transactions and logs
for transaction in block.transactions.iter_mut() {
transaction.block_hash = block.header.hash;
for log in transaction.logs.iter_mut() {
log.block_hash = block.header.hash;
}
}

// TODO: calculate size, state_root, receipts_root, parent_hash
Ok(block)
}

/// Persists a mined block to permanent storage and prepares new block.
pub async fn commit(&self, block: Block) -> anyhow::Result<()> {
let block_number = *block.number();
Expand All @@ -199,10 +116,6 @@ impl BlockMiner {
self.storage.save_block_to_perm(block.clone()).await?;
self.storage.set_mined_block_number(block_number).await?;

// prepare new block to be mined
self.storage.set_active_block_number(block_number.next()).await?;
self.storage.reset_temp().await?;

// notify
let logs: Vec<LogMined> = block.transactions.iter().flat_map(|tx| &tx.logs).cloned().collect();
for log in logs {
Expand All @@ -219,12 +132,12 @@ impl BlockMiner {
// -----------------------------------------------------------------------------

async fn read_external_block_and_executions(storage: &StratusStorage) -> anyhow::Result<(ExternalBlock, Vec<TransactionExecution>)> {
let block = match storage.temp.read_external_block().await {
let block = match storage.temp.read_pending_external_block().await {
Ok(Some(block)) => block,
Ok(None) => return log_and_err!("no active external block being re-executed"),
Err(e) => return Err(e),
};
let txs = storage.temp.read_executions().await?;
let txs = storage.temp.read_pending_executions().await?;

Ok((block, txs))
}
Expand All @@ -250,7 +163,7 @@ fn mine_external_transactions(block_number: BlockNumber, txs: Vec<ExternalTransa
let mut mined_txs = Vec::with_capacity(txs.len());
for tx in txs {
if tx.tx.block_number() != block_number {
return log_and_err!("cannot mine external block because one of the transactions does not belong to the external block");
return log_and_err!("failed to mine external block because one of the transactions does not belong to the external block");
}
mined_txs.push(TransactionMined::from_external(tx.tx, tx.receipt, tx.result.execution)?);
}
Expand All @@ -263,3 +176,76 @@ fn block_from_external(external_block: ExternalBlock, mined_txs: Vec<Transaction
transactions: mined_txs,
})
}

/// Builds a [`Block`] at `number` from one or more local transaction executions.
///
/// The block timestamp is taken as the minimum `block_timestamp` among the given
/// executions. Logs are numbered with a block-scoped `log_index` that keeps
/// incrementing across transactions (it is NOT reset per transaction), and the
/// header bloom accrues every log address and topic.
///
/// NOTE(review): the block hash replicated into transactions/logs is whatever
/// `Block::new` put in the header — no final hash is computed here (see TODO
/// below); confirm callers do not rely on a recalculated hash.
pub fn block_from_local(number: BlockNumber, txs: NonEmpty<LocalTransactionExecution>) -> anyhow::Result<Block> {
    // init block: use the earliest execution timestamp as the block timestamp
    let block_timestamp = txs
        .minimum_by(|tx1, tx2| tx1.result.execution.block_timestamp.cmp(&tx2.result.execution.block_timestamp))
        .result
        .execution
        .block_timestamp;

    let mut block = Block::new(number, block_timestamp);
    block.transactions.reserve(txs.len());

    // mine transactions and logs
    // log_index is block-scoped: it runs over ALL logs of ALL transactions
    let mut log_index = Index::ZERO;
    for (tx_idx, tx) in txs.into_iter().enumerate() {
        let transaction_index = Index::new(tx_idx as u64);
        // mine logs
        let mut mined_logs: Vec<LogMined> = Vec::with_capacity(tx.result.execution.logs.len());
        for mined_log in tx.result.execution.logs.clone() {
            // accrue log address and every topic into the block header bloom
            block.header.bloom.accrue(BloomInput::Raw(mined_log.address.as_ref()));
            for topic in mined_log.topics().into_iter() {
                block.header.bloom.accrue(BloomInput::Raw(topic.as_ref()));
            }

            // mine log: attach block/transaction positioning metadata
            // (block_hash is provisional here; it is re-replicated below)
            let mined_log = LogMined {
                log: mined_log,
                transaction_hash: tx.input.hash,
                transaction_index,
                log_index,
                block_number: block.header.number,
                block_hash: block.header.hash,
            };
            mined_logs.push(mined_log);

            // increment log index
            log_index = log_index + Index::ONE;
        }

        // mine transaction
        let mined_transaction = TransactionMined {
            input: tx.input,
            execution: tx.result.execution,
            transaction_index,
            block_number: block.header.number,
            block_hash: block.header.hash,
            logs: mined_logs,
        };

        // add transaction to block
        block.transactions.push(mined_transaction);
    }

    // calculate transactions root from the ordered trie of transaction hashes
    // (skipped for empty blocks, leaving the default root from Block::new)
    if not(block.transactions.is_empty()) {
        let transactions_hashes: Vec<&Hash> = block.transactions.iter().map(|x| &x.input.hash).collect();
        block.header.transactions_root = triehash::ordered_trie_root::<KeccakHasher, _>(transactions_hashes).into();
    }

    // TODO: calculate final block hash — currently the header hash is whatever
    // Block::new produced; nothing recomputes it after the roots are set.

    // replicate the header's block hash into every transaction and log so they
    // all agree with the header
    for transaction in block.transactions.iter_mut() {
        transaction.block_hash = block.header.hash;
        for log in transaction.logs.iter_mut() {
            log.block_hash = block.header.hash;
        }
    }

    // TODO: calculate size, state_root, receipts_root, parent_hash
    Ok(block)
}
Loading
Loading