From 548042502b23f251faa4c6f1be57664ef0370c43 Mon Sep 17 00:00:00 2001 From: Renato Dinhani <101204870+dinhani-cw@users.noreply.github.com> Date: Mon, 27 May 2024 23:06:04 -0300 Subject: [PATCH] feat: more logs (#938) --- src/bin/importer_offline.rs | 6 ++++- src/config.rs | 3 +++ src/eth/rpc/rpc_server.rs | 3 +-- src/eth/rpc/rpc_subscriptions.rs | 24 +++++++------------ .../storage/inmemory/inmemory_temporary.rs | 2 +- src/eth/storage/rocks/rocks_permanent.rs | 2 +- src/ext.rs | 3 +++ .../blockchain_client/blockchain_client.rs | 2 +- src/infra/tracing.rs | 8 ++++++- 9 files changed, 30 insertions(+), 23 deletions(-) diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index 200cd7dcd..313735901 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -20,6 +20,7 @@ use stratus::eth::storage::StratusStorage; use stratus::eth::BlockMiner; use stratus::eth::Executor; use stratus::ext::ResultExt; +use stratus::infra::tracing::info_task_spawn; use stratus::log_and_err; use stratus::utils::calculate_tps_and_bpm; use stratus::GlobalServices; @@ -82,6 +83,8 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> { // execute thread: external rpc storage loader let storage_thread = thread::Builder::new().name("storage-loader".into()); let storage_tokio = Handle::current(); + + info_task_spawn("storage-loader"); let storage_loader_thread = storage_thread .spawn(move || { let _tokio_guard = storage_tokio.enter(); @@ -103,6 +106,8 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> { // execute thread: block importer let importer_thread = thread::Builder::new().name("block-importer".into()); let importer_tokio = Handle::current(); + + info_task_spawn("block-importer"); let block_importer_thread = importer_thread .spawn(move || { let _tokio_guard = importer_tokio.enter(); @@ -139,7 +144,6 @@ async fn execute_block_importer( blocks_to_export_snapshot: Vec, ) -> anyhow::Result<()> { const TASK_NAME: &str = "external-block-executor"; - tracing::info!("creating task {}", TASK_NAME); // receives blocks and receipts from the backlog to reexecute and import loop { diff --git a/src/config.rs b/src/config.rs index f6817dc90..e74b8f030 100644 --- a/src/config.rs +++ b/src/config.rs @@ -42,6 +42,7 @@ use crate::eth::EvmTask; use crate::eth::Executor; use crate::eth::TransactionRelayer; use crate::ext::parse_duration; +use crate::infra::tracing::info_task_spawn; use crate::infra::tracing::warn_task_tx_closed; use crate::infra::BlockchainClient; use crate::GlobalState; @@ -191,6 +192,8 @@ impl ExecutorConfig { // spawn thread that will run evm // todo: needs a way to signal error like a cancellation token in case it fails to initialize let t = thread::Builder::new().name("evm".into()); + + info_task_spawn(TASK_NAME); t.spawn(move || { // init tokio let _tokio_guard = evm_tokio.enter(); diff --git a/src/eth/rpc/rpc_server.rs b/src/eth/rpc/rpc_server.rs index 45bdceddd..4f1b7b025 100644 --- a/src/eth/rpc/rpc_server.rs +++ b/src/eth/rpc/rpc_server.rs @@ -61,8 +61,7 @@ pub async fn serve_rpc( chain_id: ChainId, ) -> anyhow::Result<()> { const TASK_NAME: &str = "rpc-server"; - - tracing::info!("creating task {}", TASK_NAME); + tracing::info!("creating {}", TASK_NAME); // configure subscriptions let subs = RpcSubscriptions::spawn( diff --git a/src/eth/rpc/rpc_subscriptions.rs b/src/eth/rpc/rpc_subscriptions.rs index a50d4938e..2e32ff904 100644 --- a/src/eth/rpc/rpc_subscriptions.rs +++ b/src/eth/rpc/rpc_subscriptions.rs @@ -61,10 +61,8 @@ impl RpcSubscriptions { /// Spawns a new task to clean up closed subscriptions from time to time. fn spawn_subscriptions_cleaner(subs: Arc) -> JoinHandle> { - const TASK_NAME: &str = "rpc-subscription-cleaner"; - tracing::info!("spawning {}", TASK_NAME); - - spawn_named("rpc::sub::cleaner", async move { + const TASK_NAME: &str = "rpc::sub::cleaner"; + spawn_named(TASK_NAME, async move { loop { if GlobalState::warn_if_shutdown(TASK_NAME) { return Ok(()); @@ -91,10 +89,8 @@ impl RpcSubscriptions { /// Spawns a new task that notifies subscribers about new executed transactions. fn spawn_new_pending_txs_notifier(subs: Arc, mut rx: broadcast::Receiver) -> JoinHandle> { - const TASK_NAME: &str = "rpc-newPendingTransactions-notifier"; - tracing::info!("spawning {}", TASK_NAME); - - spawn_named("rpc::sub::newPendingTransactions", async move { + const TASK_NAME: &str = "rpc::sub::newPendingTransactions"; + spawn_named(TASK_NAME, async move { loop { if GlobalState::warn_if_shutdown(TASK_NAME) { return Ok(()); @@ -114,10 +110,8 @@ impl RpcSubscriptions { /// Spawns a new task that notifies subscribers about new created blocks. fn spawn_new_heads_notifier(subs: Arc, mut rx: broadcast::Receiver) -> JoinHandle> { - const TASK_NAME: &str = "rpc-newHeads-notifier"; - tracing::info!("spawning {}", TASK_NAME); - - spawn_named("rpc::sub::newHeads", async move { + const TASK_NAME: &str = "rpc::sub::newHeads"; + spawn_named(TASK_NAME, async move { loop { if GlobalState::warn_if_shutdown(TASK_NAME) { return Ok(()); @@ -137,10 +131,8 @@ impl RpcSubscriptions { /// Spawns a new task that notifies subscribers about new transactions logs. fn spawn_logs_notifier(subs: Arc, mut rx: broadcast::Receiver) -> JoinHandle> { - const TASK_NAME: &str = "rpc-logs-notifier"; - tracing::info!("spawning {}", TASK_NAME); - - spawn_named("rpc::sub::logs", async move { + const TASK_NAME: &str = "rpc::sub::logs"; + spawn_named(TASK_NAME, async move { loop { if GlobalState::warn_if_shutdown(TASK_NAME) { return Ok(()); diff --git a/src/eth/storage/inmemory/inmemory_temporary.rs b/src/eth/storage/inmemory/inmemory_temporary.rs index 76b89c819..9c4dd72c3 100644 --- a/src/eth/storage/inmemory/inmemory_temporary.rs +++ b/src/eth/storage/inmemory/inmemory_temporary.rs @@ -52,7 +52,7 @@ impl InMemoryTemporaryStorage { impl Default for InMemoryTemporaryStorage { fn default() -> Self { - tracing::info!("starting inmemory temporary storage"); + tracing::info!("creating inmemory temporary storage"); Self { states: RwLock::new(NonEmpty::new(InMemoryTemporaryStorageState::default())), } diff --git a/src/eth/storage/rocks/rocks_permanent.rs b/src/eth/storage/rocks/rocks_permanent.rs index c4722cdbb..64c82af7f 100644 --- a/src/eth/storage/rocks/rocks_permanent.rs +++ b/src/eth/storage/rocks/rocks_permanent.rs @@ -40,7 +40,7 @@ pub struct RocksPermanentStorage { impl RocksPermanentStorage { pub async fn new() -> anyhow::Result { - tracing::info!("starting rocksdb storage"); + tracing::info!("creating rocksdb storage"); let state = RocksStorageState::new(); state.sync_data().await?; diff --git a/src/ext.rs b/src/ext.rs index c1af69ef6..9c954d86a 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -4,6 +4,8 @@ use std::time::Duration; use anyhow::anyhow; +use crate::infra::tracing::info_task_spawn; + // ----------------------------------------------------------------------------- // Macros // ----------------------------------------------------------------------------- @@ -185,6 +187,7 @@ pub fn spawn_named(name: &str, task: impl std::future::Future + S where T: Send + 'static, { + info_task_spawn(name); tokio::task::Builder::new().name(name).spawn(task).expect("spawning named task should not fail") } diff --git a/src/infra/blockchain_client/blockchain_client.rs b/src/infra/blockchain_client/blockchain_client.rs index 270e73b38..edebd3223 100644 --- a/src/infra/blockchain_client/blockchain_client.rs +++ b/src/infra/blockchain_client/blockchain_client.rs @@ -124,7 +124,7 @@ impl BlockchainClient { } } - /// Retrieves the current block number. + /// Fetches the current block number. pub async fn fetch_block_number(&self) -> anyhow::Result { tracing::debug!("fetching block number"); diff --git a/src/infra/tracing.rs b/src/infra/tracing.rs index 38535a653..90219bbd0 100644 --- a/src/infra/tracing.rs +++ b/src/infra/tracing.rs @@ -33,7 +33,7 @@ pub async fn init_tracing(url: Option<&String>) { .with_filter(EnvFilter::from_default_env()) .boxed() } else { - println!("tracing registry enabling text logs"); + println!("tracing registry enabling text logs"); fmt::Layer::default() .with_target(false) .with_thread_ids(true) @@ -77,6 +77,12 @@ pub async fn init_tracing(url: Option<&String>) { }); } +/// Emits an info message that a task was spawned to backgroud. +#[track_caller] +pub fn info_task_spawn(name: &str) { + tracing::info!(%name, "spawning task"); +} + /// Emits an warning that a task is exiting because it received a cancenllation signal. /// /// Returns the formatted tracing message.