From 71283ab8ab0a5fdf68f6b66d51770ba9be99bc3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Marcos?= <164224824+marcospb19-cw@users.noreply.github.com> Date: Fri, 24 May 2024 13:50:52 -0300 Subject: [PATCH] importer-offline: make logs less verbose and show progress summary (#920) Show TPS and BPM (blocks per minute) for each block re-execution batch. Batch size is controlled by `--blocks-by-fetch` * importer-offline: make INFO logs less verbose * importer-offline: report TPS and BPM --- src/bin/importer_offline.rs | 46 +++++++++++++++++++++++++++++-------- src/bin/importer_online.rs | 4 +++- src/eth/evm/revm.rs | 2 +- src/eth/executor.rs | 2 -- 4 files changed, 40 insertions(+), 14 deletions(-) diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index 7aa0065da..6ead9c00b 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -23,6 +23,7 @@ use stratus::GlobalServices; use stratus::GlobalState; use tokio::runtime::Handle; use tokio::sync::mpsc; +use tokio::time::Instant; /// Number of tasks in the backlog. Each task contains 10_000 blocks and all receipts for them. const BACKLOG_SIZE: usize = 50; @@ -127,12 +128,13 @@ async fn execute_block_importer( const TASK_NAME: &str = "external-block-executor"; tracing::info!("starting {}", TASK_NAME); + // receives blocks and receipts from the backlog to reexecute and import loop { if GlobalState::warn_if_shutdown(TASK_NAME) { return Ok(()); }; - // retrieve new tasks to execute or exit + // receive new tasks to execute, or exit let Some((blocks, receipts)) = backlog_rx.recv().await else { tracing::info!("{} has no more blocks to process", TASK_NAME); return Ok(()); @@ -141,32 +143,56 @@ async fn execute_block_importer( // imports block transactions let block_start = blocks.first().unwrap().number(); let block_end = blocks.last().unwrap().number(); + let blocks_len = blocks.len(); let block_last_index = blocks.len() - 1; let receipts = ExternalReceipts::from(receipts); - tracing::info!(%block_start, %block_end, receipts = %receipts.len(), "importing blocks"); + tracing::info!(%block_start, %block_end, receipts = %receipts.len(), "reexecuting (and importing) blocks"); + let mut transaction_count = 0; + let instant_before_execution = Instant::now(); + for (block_index, block) in blocks.into_iter().enumerate() { if GlobalState::warn_if_shutdown(TASK_NAME) { return Ok(()); } - // re-execute block + // re-execute (and import) block executor.reexecute_external(&block, &receipts).await?; + transaction_count += block.transactions.len(); // mine block let mined_block = miner.mine_external().await?; - // export to csv OR permanent storage - match csv { - Some(ref mut csv) => import_external_to_csv(&storage, csv, mined_block.clone(), block_index, block_last_index).await?, - None => miner.commit(mined_block.clone()).await?, - }; - // export snapshot for tests if blocks_to_export_snapshot.contains(mined_block.number()) { export_snapshot(&block, &receipts, &mined_block)?; } + + // export to csv OR permanent storage + match &mut csv { + Some(csv) => import_external_to_csv(&storage, csv, mined_block.clone(), block_index, block_last_index).await?, + None => miner.commit(mined_block.clone()).await?, + } } + + let seconds_elapsed = match instant_before_execution.elapsed().as_secs() as usize { + // avoid division by zero + 0 => 1, + non_zero => non_zero, + }; + let tps = transaction_count.checked_div(seconds_elapsed).unwrap_or(transaction_count); + let minutes_elapsed = seconds_elapsed as f64 / 60.0; + let blocks_per_minute = blocks_len as f64 / minutes_elapsed; + tracing::info!( + tps, + blocks_per_minute = format_args!("{blocks_per_minute:.2}"), + seconds_elapsed, + %block_start, + %block_end, + transaction_count, + receipts = receipts.len(), + "reexecuted blocks batch", + ); } } @@ -230,7 +256,7 @@ async fn execute_external_rpc_storage_loader( } async fn load_blocks_and_receipts(rpc_storage: Arc, start: BlockNumber, end: BlockNumber) -> anyhow::Result { - tracing::info!(%start, %end, "retrieving blocks and receipts"); + tracing::info!(%start, %end, "loading blocks and receipts"); let blocks_task = rpc_storage.read_blocks_in_range(start, end); let receipts_task = rpc_storage.read_receipts_in_range(start, end); try_join!(blocks_task, receipts_task) diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs index bcfe2497f..c9c71b6c9 100644 --- a/src/bin/importer_online.rs +++ b/src/bin/importer_online.rs @@ -130,8 +130,10 @@ async fn start_block_executor(executor: Arc, miner: Arc, m // execute and mine let receipts = ExternalReceipts::from(receipts); + + tracing::info!(number = %block.number(), txs_len = block.transactions.len(), "reexecuting external block"); if executor.reexecute_external(&block, &receipts).await.is_err() { - GlobalState::shutdown_from(TASK_NAME, "failed to re-execute block"); + GlobalState::shutdown_from(TASK_NAME, "failed to re-execute external block"); return; }; if miner.mine_external_mixed_and_commit().await.is_err() { diff --git a/src/eth/evm/revm.rs b/src/eth/evm/revm.rs index cef02a7f6..f02fc550a 100644 --- a/src/eth/evm/revm.rs +++ b/src/eth/evm/revm.rs @@ -318,7 +318,7 @@ fn parse_revm_execution(revm_result: RevmResultAndState, input: EvmInput, execut let (result, output, logs, gas) = parse_revm_result(revm_result.result); let changes = parse_revm_state(revm_result.state, execution_changes); - tracing::info!(?result, %gas, output_len = %output.len(), %output, "evm executed"); + tracing::debug!(?result, %gas, output_len = %output.len(), %output, "evm executed"); EvmExecution { block_timestamp: input.block_timestamp, receipt_applied: false, diff --git a/src/eth/executor.rs b/src/eth/executor.rs index 4c92c20c0..60a41b94f 100644 --- a/src/eth/executor.rs +++ b/src/eth/executor.rs @@ -94,8 +94,6 @@ impl Executor { #[cfg(feature = "metrics")] let (start, mut block_metrics) = (metrics::now(), ExecutionMetrics::default()); - tracing::info!(number = %block.number(), "reexecuting external block"); - // track active block number let storage = &self.storage; storage.set_active_external_block(block.clone()).await?;