diff --git a/Cargo.lock b/Cargo.lock index 0e4632143..ec0c1e4ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5282,6 +5282,7 @@ dependencies = [ "tracing-subscriber", "triehash", "url", + "uuid", ] [[package]] @@ -6053,6 +6054,8 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" dependencies = [ + "getrandom", + "rand", "serde", ] diff --git a/Cargo.toml b/Cargo.toml index 2cc7c628e..b224df990 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ default-run = "stratus" [dependencies] -# stdx +# general anyhow = "=1.0.82" async-trait = "=0.1.80" byte-unit = "=5.1.4" @@ -32,6 +32,7 @@ rand = "=0.8.5" strum = "=0.26.2" thiserror = "=1.0.59" url = "=2.5.0" +uuid = { version = "=1.8.0", features = ["v4", "fast-rng" ] } # async tokio = { version = "=1.37.0", features = ["rt-multi-thread", "macros", "signal"] } diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index 9cd6897a0..30609e24c 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -24,10 +24,13 @@ use stratus::ext::not; #[cfg(feature = "metrics")] use stratus::infra::metrics; use stratus::log_and_err; +use stratus::utils::new_context_id; use stratus::GlobalServices; use tokio::runtime::Handle; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; +use tracing::info_span; +use tracing::Instrument; /// Number of tasks in the backlog. Each task contains 10_000 blocks and all receipts for them. const BACKLOG_SIZE: usize = 50; @@ -143,6 +146,7 @@ fn signal_handler(cancellation: CancellationToken) { // ----------------------------------------------------------------------------- // Block importer // ----------------------------------------------------------------------------- +#[tracing::instrument(name = "[Importer]", skip_all)] async fn execute_block_importer( // services executor: Arc, @@ -176,29 +180,37 @@ async fn execute_block_importer( // imports block transactions tracing::info!(%block_start, %block_end, receipts = %receipts.len(), "importing blocks"); for (block_index, block) in blocks.into_iter().enumerate() { - #[cfg(feature = "metrics")] - let start = metrics::now(); + let span = info_span!("re-executing block", context_id = new_context_id()); - // re-execute block - executor.reexecute_external(&block, &receipts).await?; + async { + #[cfg(feature = "metrics")] + let start = metrics::now(); - // mine block - let mined_block = miner.mine_external().await?; - storage.temp.remove_executions_before(mined_block.transactions.len()).await?; + // re-execute block + executor.reexecute_external(&block, &receipts).await?; - // export to csv OR permanent storage - match csv { - Some(ref mut csv) => import_external_to_csv(&storage, csv, mined_block.clone(), block_index, block_last_index).await?, - None => miner.commit(mined_block.clone()).await?, - }; + // mine block + let mined_block = miner.mine_external().await?; + storage.temp.remove_executions_before(mined_block.transactions.len()).await?; - // export snapshot for tests - if blocks_to_export_snapshot.contains(mined_block.number()) { - export_snapshot(&block, &receipts, &mined_block)?; - } + // export to csv OR permanent storage + match csv { + Some(ref mut csv) => import_external_to_csv(&storage, csv, mined_block.clone(), block_index, block_last_index).await?, + None => miner.commit(mined_block.clone()).await?, + }; + + // export snapshot for tests + if blocks_to_export_snapshot.contains(mined_block.number()) { + export_snapshot(&block, &receipts, &mined_block)?; + } - #[cfg(feature = "metrics")] - metrics::inc_import_offline(start.elapsed()); + #[cfg(feature = "metrics")] + metrics::inc_import_offline(start.elapsed()); + + anyhow::Ok(()) + } + .instrument(span) + .await?; } }; @@ -209,6 +221,7 @@ async fn execute_block_importer( // ----------------------------------------------------------------------------- // Block loader // ----------------------------------------------------------------------------- +#[tracing::instrument(name = "[RPC]", skip_all, fields(start, end, block_by_fetch))] async fn execute_external_rpc_storage_loader( // services rpc_storage: Arc, @@ -226,7 +239,13 @@ async fn execute_external_rpc_storage_loader( let mut tasks = Vec::new(); while start <= end { let end = min(start + (blocks_by_fetch - 1), end); - tasks.push(load_blocks_and_receipts(Arc::clone(&rpc_storage), cancellation.clone(), start, end)); + + let span = info_span!("fetching block", context_id = new_context_id()); + + let task = load_blocks_and_receipts(Arc::clone(&rpc_storage), cancellation.clone(), start, end); + let task = task.instrument(span); + tasks.push(task); + start += blocks_by_fetch; } diff --git a/src/infra/tracing.rs b/src/infra/tracing.rs index f9cb7ad23..73d772873 100644 --- a/src/infra/tracing.rs +++ b/src/infra/tracing.rs @@ -10,7 +10,6 @@ use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; use tracing_subscriber::Layer; -//use tracing_subscriber::EnvFilter; /// Init application global tracing. pub fn init_tracing(url: Option<&String>) { diff --git a/src/lib.rs b/src/lib.rs index c18dce53b..892e2fc69 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,7 @@ pub mod config; pub mod eth; pub mod ext; pub mod infra; +pub mod utils; pub struct GlobalServices where diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 000000000..5fefabfbf --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,5 @@ +use uuid::Uuid; + +pub fn new_context_id() -> String { + Uuid::new_v4().to_string() +}