Skip to content

Commit

Permalink
importer-offline: make logs less verbose and show progress summary (#920
Browse files Browse the repository at this point in the history
)

Show TPS and BPM (blocks per minute) for each block re-execution batch.

Batch size is controlled by `--blocks-by-fetch`

* importer-offline: make INFO logs less verbose
* importer-offline: report TPS and BPM
  • Loading branch information
marcospb19-cw authored May 24, 2024
1 parent 7ba6862 commit 71283ab
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 14 deletions.
46 changes: 36 additions & 10 deletions src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use stratus::GlobalServices;
use stratus::GlobalState;
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use tokio::time::Instant;

/// Number of tasks in the backlog. Each task contains 10_000 blocks and all receipts for them.
const BACKLOG_SIZE: usize = 50;
Expand Down Expand Up @@ -127,12 +128,13 @@ async fn execute_block_importer(
const TASK_NAME: &str = "external-block-executor";
tracing::info!("starting {}", TASK_NAME);

// receives blocks and receipts from the backlog to reexecute and import
loop {
if GlobalState::warn_if_shutdown(TASK_NAME) {
return Ok(());
};

// retrieve new tasks to execute or exit
// receive new tasks to execute, or exit
let Some((blocks, receipts)) = backlog_rx.recv().await else {
tracing::info!("{} has no more blocks to process", TASK_NAME);
return Ok(());
Expand All @@ -141,32 +143,56 @@ async fn execute_block_importer(
// imports block transactions
let block_start = blocks.first().unwrap().number();
let block_end = blocks.last().unwrap().number();
let blocks_len = blocks.len();
let block_last_index = blocks.len() - 1;
let receipts = ExternalReceipts::from(receipts);

tracing::info!(%block_start, %block_end, receipts = %receipts.len(), "importing blocks");
tracing::info!(%block_start, %block_end, receipts = %receipts.len(), "reexecuting (and importing) blocks");
let mut transaction_count = 0;
let instant_before_execution = Instant::now();

for (block_index, block) in blocks.into_iter().enumerate() {
if GlobalState::warn_if_shutdown(TASK_NAME) {
return Ok(());
}

// re-execute block
// re-execute (and import) block
executor.reexecute_external(&block, &receipts).await?;
transaction_count += block.transactions.len();

// mine block
let mined_block = miner.mine_external().await?;

// export to csv OR permanent storage
match csv {
Some(ref mut csv) => import_external_to_csv(&storage, csv, mined_block.clone(), block_index, block_last_index).await?,
None => miner.commit(mined_block.clone()).await?,
};

// export snapshot for tests
if blocks_to_export_snapshot.contains(mined_block.number()) {
export_snapshot(&block, &receipts, &mined_block)?;
}

// export to csv OR permanent storage
match &mut csv {
Some(csv) => import_external_to_csv(&storage, csv, mined_block.clone(), block_index, block_last_index).await?,
None => miner.commit(mined_block.clone()).await?,
}
}

let seconds_elapsed = match instant_before_execution.elapsed().as_secs() as usize {
// avoid division by zero
0 => 1,
non_zero => non_zero,
};
let tps = transaction_count.checked_div(seconds_elapsed).unwrap_or(transaction_count);
let minutes_elapsed = seconds_elapsed as f64 / 60.0;
let blocks_per_minute = blocks_len as f64 / minutes_elapsed;
tracing::info!(
tps,
blocks_per_minute = format_args!("{blocks_per_minute:.2}"),
seconds_elapsed,
%block_start,
%block_end,
transaction_count,
receipts = receipts.len(),
"reexecuted blocks batch",
);
}
}

Expand Down Expand Up @@ -230,7 +256,7 @@ async fn execute_external_rpc_storage_loader(
}

async fn load_blocks_and_receipts(rpc_storage: Arc<dyn ExternalRpcStorage>, start: BlockNumber, end: BlockNumber) -> anyhow::Result<BacklogTask> {
tracing::info!(%start, %end, "retrieving blocks and receipts");
tracing::info!(%start, %end, "loading blocks and receipts");
let blocks_task = rpc_storage.read_blocks_in_range(start, end);
let receipts_task = rpc_storage.read_receipts_in_range(start, end);
try_join!(blocks_task, receipts_task)
Expand Down
4 changes: 3 additions & 1 deletion src/bin/importer_online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,10 @@ async fn start_block_executor(executor: Arc<Executor>, miner: Arc<BlockMiner>, m

// execute and mine
let receipts = ExternalReceipts::from(receipts);

tracing::info!(number = %block.number(), txs_len = block.transactions.len(), "reexecuting external block");
if executor.reexecute_external(&block, &receipts).await.is_err() {
GlobalState::shutdown_from(TASK_NAME, "failed to re-execute block");
GlobalState::shutdown_from(TASK_NAME, "failed to re-execute external block");
return;
};
if miner.mine_external_mixed_and_commit().await.is_err() {
Expand Down
2 changes: 1 addition & 1 deletion src/eth/evm/revm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ fn parse_revm_execution(revm_result: RevmResultAndState, input: EvmInput, execut
let (result, output, logs, gas) = parse_revm_result(revm_result.result);
let changes = parse_revm_state(revm_result.state, execution_changes);

tracing::info!(?result, %gas, output_len = %output.len(), %output, "evm executed");
tracing::debug!(?result, %gas, output_len = %output.len(), %output, "evm executed");
EvmExecution {
block_timestamp: input.block_timestamp,
receipt_applied: false,
Expand Down
2 changes: 0 additions & 2 deletions src/eth/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ impl Executor {
#[cfg(feature = "metrics")]
let (start, mut block_metrics) = (metrics::now(), ExecutionMetrics::default());

tracing::info!(number = %block.number(), "reexecuting external block");

// track active block number
let storage = &self.storage;
storage.set_active_external_block(block.clone()).await?;
Expand Down

0 comments on commit 71283ab

Please sign in to comment.