From 5555d0f5fd16e0b994563cfec68010054056d243 Mon Sep 17 00:00:00 2001 From: Renato Dinhani Date: Wed, 22 May 2024 17:42:53 -0300 Subject: [PATCH 1/3] feat: GlobalState::shutdown instead of CancellationToken everywhere --- Cargo.lock | 1 + Cargo.toml | 4 +- src/bin/importer_offline.rs | 98 +++++++------------ src/bin/importer_online.rs | 96 ++++++++---------- src/bin/rpc_downloader.rs | 2 +- src/bin/run_with_importer.rs | 32 ++---- src/bin/state_validator.rs | 2 +- src/config.rs | 45 ++++++--- src/eth/block_miner.rs | 39 ++++---- src/eth/rpc/rpc_server.rs | 17 ++-- src/eth/rpc/rpc_subscriptions.rs | 49 +++------- src/ext.rs | 25 ----- src/globals.rs | 93 ++++++++++++++++++ src/infra/tracing.rs | 24 +++++ src/lib.rs | 70 +------------ src/main.rs | 10 +- src/utils.rs | 46 ++++----- tests/test_import_external_snapshot_common.rs | 5 +- 18 files changed, 309 insertions(+), 349 deletions(-) create mode 100644 src/globals.rs diff --git a/Cargo.lock b/Cargo.lock index 4db393f2c..493b77159 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5333,6 +5333,7 @@ dependencies = [ "nom", "nonempty", "num-traits", + "once_cell", "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", diff --git a/Cargo.toml b/Cargo.toml index af9abc287..ac4da9ea3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ humantime = "=2.1.0" indexmap = { version = "=2.2.6", features = ["serde"] } itertools = "=0.12.1" nonempty = { version = "=0.10.0", features = ["serialize"] } +once_cell = "=1.19.0" paste = "=1.0.14" phf = "=0.11.2" pin-project = "=1.1.5" @@ -61,12 +62,9 @@ ethereum-types = "=0.14.1" ethers-core = "=2.0.14" evm-disassembler = "=0.5.0" keccak-hasher = "=0.15.3" # this version must be compatible with triehash - - rlp = "=0.5.2" triehash = "=0.8.4" - # network jsonrpsee = { version = "=0.22.4", features = ["server", "client"] } reqwest = { version = "=0.12.4", features = ["json"] } diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index fac0ef2c7..d6580b738 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -21,12 +21,11 @@ use stratus::eth::storage::StratusStorage; use stratus::eth::BlockMiner; use stratus::eth::Executor; use stratus::ext::not; -use stratus::log_and_err; -use stratus::utils::signal_handler; +use stratus::infra::tracing::warn_task_cancellation; use stratus::GlobalServices; +use stratus::GlobalState; use tokio::runtime::Handle; use tokio::sync::mpsc; -use tokio_util::sync::CancellationToken; /// Number of tasks in the backlog. Each task contains 10_000 blocks and all receipts for them. const BACKLOG_SIZE: usize = 50; @@ -40,18 +39,15 @@ const CSV_CHUNKING_BLOCKS_INTERVAL: u64 = 2_000_000; type BacklogTask = (Vec, Vec); fn main() -> anyhow::Result<()> { - let global_services = GlobalServices::::init(); + let global_services = GlobalServices::::init()?; global_services.runtime.block_on(run(global_services.config)) } async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> { - // init cancellation handler - let cancellation = signal_handler(); - // init services let rpc_storage = config.rpc_storage.init().await?; let storage = config.storage.init().await?; - let miner = config.miner.init(Arc::clone(&storage), None, cancellation.clone()).await?; + let miner = config.miner.init(Arc::clone(&storage), None).await?; let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), None, None).await; // init block snapshots to export @@ -85,13 +81,11 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> { // execute thread: external rpc storage loader let storage_thread = thread::Builder::new().name("storage-loader".into()); let storage_tokio = Handle::current(); - let storage_cancellation = cancellation.clone(); let _ = storage_thread.spawn(move || { let _tokio_guard = storage_tokio.enter(); let result = storage_tokio.block_on(execute_external_rpc_storage_loader( rpc_storage, - storage_cancellation, config.blocks_by_fetch, config.paralellism, block_start, @@ -106,18 +100,9 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> { // execute thread: block importer let importer_thread = thread::Builder::new().name("block-importer".into()); let importer_tokio = Handle::current(); - let importer_cancellation = cancellation.clone(); let importer_join = importer_thread.spawn(move || { let _tokio_guard = importer_tokio.enter(); - let result = importer_tokio.block_on(execute_block_importer( - executor, - miner, - storage, - csv, - importer_cancellation, - backlog_rx, - block_snapshots, - )); + let result = importer_tokio.block_on(execute_block_importer(executor, miner, storage, csv, backlog_rx, block_snapshots)); if let Err(e) = result { tracing::error!(reason = ?e, "block-importer failed"); } @@ -138,31 +123,32 @@ async fn execute_block_importer( miner: Arc, storage: Arc, mut csv: Option, - cancellation: CancellationToken, // data mut backlog_rx: mpsc::Receiver, blocks_to_export_snapshot: Vec, ) -> anyhow::Result<()> { - tracing::info!("block importer starting"); - - // import blocks and transactions in foreground - let reason = loop { - // retrieve new tasks to execute - let Some((blocks, receipts)) = backlog_rx.recv().await else { - cancellation.cancel(); - break "block loader finished or failed"; + const TASK_NAME: &str = "external-block-executor"; + tracing::info!("starting {}", TASK_NAME); + + loop { + // check cancellation + if GlobalState::is_shutdown() { + warn_task_cancellation(TASK_NAME); + return Ok(()); }; - if cancellation.is_cancelled() { - break "exiting block importer"; + // retrieve new tasks to execute or exit + let Some((blocks, receipts)) = backlog_rx.recv().await else { + tracing::info!("{} has no more blocks to process", TASK_NAME); + return Ok(()); }; + // imports block transactions let block_start = blocks.first().unwrap().number(); let block_end = blocks.last().unwrap().number(); let block_last_index = blocks.len() - 1; let receipts = ExternalReceipts::from(receipts); - // imports block transactions tracing::info!(%block_start, %block_end, receipts = %receipts.len(), "importing blocks"); for (block_index, block) in blocks.into_iter().enumerate() { async { @@ -187,10 +173,7 @@ async fn execute_block_importer( } .await?; } - }; - - tracing::info!(%reason, "block importer finished"); - Ok(()) + } } // ----------------------------------------------------------------------------- @@ -200,7 +183,6 @@ async fn execute_block_importer( async fn execute_external_rpc_storage_loader( // services rpc_storage: Arc, - cancellation: CancellationToken, // data blocks_by_fetch: usize, paralellism: usize, @@ -208,14 +190,15 @@ async fn execute_external_rpc_storage_loader( end: BlockNumber, backlog: mpsc::Sender, ) -> anyhow::Result<()> { - tracing::info!(%start, %end, "external rpc storage loader starting"); + const TASK_NAME: &str = "external-block-loader"; + tracing::info!(%start, %end, "starting {}", TASK_NAME); // prepare loads to be executed in parallel let mut tasks = Vec::new(); while start <= end { let end = min(start + (blocks_by_fetch - 1), end); - let task = load_blocks_and_receipts(Arc::clone(&rpc_storage), cancellation.clone(), start, end); + let task = load_blocks_and_receipts(Arc::clone(&rpc_storage), start, end); tasks.push(task); start += blocks_by_fetch; @@ -223,46 +206,39 @@ async fn execute_external_rpc_storage_loader( // execute loads in parallel let mut tasks = futures::stream::iter(tasks).buffered(paralellism); - let reason = loop { + loop { + // check cancellation + if GlobalState::is_shutdown() { + warn_task_cancellation(TASK_NAME); + return Ok(()); + }; + // retrieve next batch of loaded blocks + // if finished, do not cancel, it is expected to finish let Some(result) = tasks.next().await else { - break "no more blocks to process"; + tracing::info!("{} has no more blocks to process", TASK_NAME); + return Ok(()); }; // check if executed correctly let Ok((blocks, receipts)) = result else { - cancellation.cancel(); - break "block or receipt fetch failed"; + return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "failed to fetch block or receipt"))); }; // check blocks were really loaded if blocks.is_empty() { - cancellation.cancel(); - return log_and_err!("no blocks returned when they were expected"); + return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "no blocks returned when they were expected"))); } // send to backlog if backlog.send((blocks, receipts)).await.is_err() { - tracing::error!("failed to send task to importer"); - cancellation.cancel(); - return log_and_err!("failed to send blocks and receipts to importer"); + return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "failed to send task to importer"))); }; - }; - - tracing::info!(%reason, "external rpc storage loader finished"); - Ok(()) + } } -async fn load_blocks_and_receipts( - rpc_storage: Arc, - cancellation: CancellationToken, - start: BlockNumber, - end: BlockNumber, -) -> anyhow::Result { +async fn load_blocks_and_receipts(rpc_storage: Arc, start: BlockNumber, end: BlockNumber) -> anyhow::Result { tracing::info!(%start, %end, "retrieving blocks and receipts"); - if cancellation.is_cancelled() { - return Err(anyhow!("cancelled")); - } let blocks_task = rpc_storage.read_blocks_in_range(start, end); let receipts_task = rpc_storage.read_receipts_in_range(start, end); try_join!(blocks_task, receipts_task) diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs index 6b452045c..c640b1f0d 100644 --- a/src/bin/importer_online.rs +++ b/src/bin/importer_online.rs @@ -16,19 +16,19 @@ use stratus::eth::primitives::Hash; use stratus::eth::storage::StratusStorage; use stratus::eth::BlockMiner; use stratus::eth::Executor; -use stratus::ext::warn_task_cancellation; -use stratus::ext::warn_task_tx_closed; use stratus::ext::DisplayExt; #[cfg(feature = "metrics")] use stratus::infra::metrics; +use stratus::infra::tracing::warn_task_cancellation; +use stratus::infra::tracing::warn_task_rx_closed; +use stratus::infra::tracing::warn_task_tx_closed; use stratus::infra::BlockchainClient; -use stratus::utils::signal_handler; use stratus::GlobalServices; +use stratus::GlobalState; use tokio::sync::mpsc; use tokio::task::yield_now; use tokio::time::sleep; use tokio::time::timeout; -use tokio_util::sync::CancellationToken; // ----------------------------------------------------------------------------- // Globals @@ -56,22 +56,19 @@ const TIMEOUT_NEW_HEADS: Duration = Duration::from_millis(2000); // ----------------------------------------------------------------------------- #[allow(dead_code)] fn main() -> anyhow::Result<()> { - let global_services = GlobalServices::::init(); + let global_services = GlobalServices::::init()?; global_services.runtime.block_on(run(global_services.config)) } async fn run(config: ImporterOnlineConfig) -> anyhow::Result<()> { - // init cancellation handler - let cancellation = signal_handler(); - // init server let storage = config.storage.init().await?; let relayer = config.relayer.init(Arc::clone(&storage)).await?; - let miner = config.miner.init(Arc::clone(&storage), None, cancellation.clone()).await?; + let miner = config.miner.init(Arc::clone(&storage), None).await?; let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, None).await; //XXX TODO implement the consensus here, in case of it being a follower, it should not even enter here let chain = Arc::new(BlockchainClient::new_http_ws(&config.base.external_rpc, config.base.external_rpc_ws.as_deref()).await?); - let result = run_importer_online(executor, miner, storage, chain, cancellation, config.base.sync_interval).await; + let result = run_importer_online(executor, miner, storage, chain, config.base.sync_interval).await; if let Err(ref e) = result { tracing::error!(reason = ?e, "importer-online failed"); } @@ -83,7 +80,6 @@ pub async fn run_importer_online( miner: Arc, storage: Arc, chain: Arc, - cancellation: CancellationToken, sync_interval: Duration, ) -> anyhow::Result<()> { // start from last imported block @@ -96,21 +92,18 @@ pub async fn run_importer_online( // spawn block executor: // it executes and mines blocks and expects to receive them via channel in the correct order. - let executor_cancellation = cancellation.clone(); - let task_executor = tokio::spawn(start_block_executor(executor, miner, backlog_rx, executor_cancellation)); + let task_executor = tokio::spawn(start_block_executor(executor, miner, backlog_rx)); // spawn block number: // it keeps track of the blockchain current block number. let number_fetcher_chain = Arc::clone(&chain); - let number_fetcher_cancellation = cancellation.clone(); - let task_number_fetcher = tokio::spawn(start_number_fetcher(number_fetcher_chain, number_fetcher_cancellation, sync_interval)); + let task_number_fetcher = tokio::spawn(start_number_fetcher(number_fetcher_chain, sync_interval)); // spawn block fetcher: // it fetches blocks and receipts in parallel and sends them to the executor in the correct order. // it uses the number fetcher current block to determine if should keep downloading more blocks or not. let block_fetcher_chain = Arc::clone(&chain); - let block_fetcher_cancellation = cancellation.clone(); - let task_block_fetcher = tokio::spawn(start_block_fetcher(block_fetcher_chain, block_fetcher_cancellation, backlog_tx, number)); + let task_block_fetcher = tokio::spawn(start_block_fetcher(block_fetcher_chain, backlog_tx, number)); // await all tasks try_join!(task_executor, task_block_fetcher, task_number_fetcher)?; @@ -122,16 +115,13 @@ pub async fn run_importer_online( // ----------------------------------------------------------------------------- // Executes external blocks and persist them to storage. -async fn start_block_executor( - executor: Arc, - miner: Arc, - mut backlog_rx: mpsc::UnboundedReceiver<(ExternalBlock, Vec)>, - cancellation: CancellationToken, -) { +async fn start_block_executor(executor: Arc, miner: Arc, mut backlog_rx: mpsc::UnboundedReceiver<(ExternalBlock, Vec)>) { + const TASK_NAME: &str = "block-executor"; + while let Some((block, receipts)) = backlog_rx.recv().await { - if cancellation.is_cancelled() { - warn_task_cancellation("block-executor"); - break; + if GlobalState::is_shutdown() { + warn_task_cancellation(TASK_NAME); + return; } #[cfg(feature = "metrics")] @@ -139,15 +129,13 @@ async fn start_block_executor( // execute and mine let receipts = ExternalReceipts::from(receipts); - if let Err(e) = executor.reexecute_external(&block, &receipts).await { - tracing::error!(reason = ?e, number = %block.number(), "cancelling block-executor because failed to reexecute block"); - cancellation.cancel(); - break; + if executor.reexecute_external(&block, &receipts).await.is_err() { + GlobalState::shutdown_from(TASK_NAME, "failed to re-execute block"); + return; }; - if let Err(e) = miner.mine_external_mixed_and_commit().await { - tracing::error!(reason = ?e, number = %block.number(), "cancelling block-executor because failed to mine external block"); - cancellation.cancel(); - break; + if miner.mine_external_mixed_and_commit().await.is_err() { + GlobalState::shutdown_from(TASK_NAME, "failed to mine external block"); + return; }; #[cfg(feature = "metrics")] @@ -164,31 +152,32 @@ async fn start_block_executor( // ----------------------------------------------------------------------------- /// Retrieves the blockchain current block number. -async fn start_number_fetcher(chain: Arc, cancellation: CancellationToken, sync_interval: Duration) { +async fn start_number_fetcher(chain: Arc, sync_interval: Duration) { + const TASK_NAME: &str = "external-number-fetcher"; + // subscribe to newHeads event if WS is enabled let mut sub_new_heads = match chain.is_ws_enabled() { true => { - tracing::info!("subscribing number-fetcher to newHeads event"); + tracing::info!("subscribing {} to newHeads event", TASK_NAME); match chain.subscribe_new_heads().await { Ok(sub) => Some(sub), - Err(e) => { - tracing::error!(reason = ?e, "cancelling number-fetcher because cannot subscribe to newHeads event"); - cancellation.cancel(); + Err(_) => { + GlobalState::shutdown_from(TASK_NAME, "cannot subscribe to newHeads event"); return; } } } false => { - tracing::warn!("number-fetcher blockchain client does not have websocket enabled"); + tracing::warn!("{} blockchain client does not have websocket enabled", TASK_NAME); None } }; loop { // check cancellation - if cancellation.is_cancelled() { - warn_task_cancellation("number-fetcher"); - break; + if GlobalState::is_shutdown() { + warn_task_cancellation(TASK_NAME); + return; } tracing::info!("fetching current block number"); @@ -252,16 +241,14 @@ async fn start_number_fetcher(chain: Arc, cancellation: Cancel // ----------------------------------------------------------------------------- /// Retrieves blocks and receipts. -async fn start_block_fetcher( - chain: Arc, - cancellation: CancellationToken, - backlog_tx: mpsc::UnboundedSender<(ExternalBlock, Vec)>, - mut number: BlockNumber, -) { +async fn start_block_fetcher(chain: Arc, backlog_tx: mpsc::UnboundedSender<(ExternalBlock, Vec)>, mut number: BlockNumber) { + const TASK_NAME: &str = "external-block-fetcher"; + loop { - if cancellation.is_cancelled() { - warn_task_cancellation("block-fetcher"); - break; + // check cancellation + if GlobalState::is_shutdown() { + warn_task_cancellation(TASK_NAME); + return; } // if we are ahead of current block number, await until we are behind again @@ -286,9 +273,8 @@ async fn start_block_fetcher( let mut tasks = futures::stream::iter(tasks).buffered(PARALLEL_BLOCKS); while let Some((block, receipts)) = tasks.next().await { if backlog_tx.send((block, receipts)).is_err() { - tracing::error!("cancelling block-fetcher because backlog channel was closed by the other side"); - cancellation.cancel(); - break; + warn_task_rx_closed(TASK_NAME); + return; } } } diff --git a/src/bin/rpc_downloader.rs b/src/bin/rpc_downloader.rs index 87c75adbb..94ec2f6c5 100644 --- a/src/bin/rpc_downloader.rs +++ b/src/bin/rpc_downloader.rs @@ -21,7 +21,7 @@ use stratus::GlobalServices; const BLOCKS_BY_TASK: usize = 1_000; fn main() -> anyhow::Result<()> { - let global_services = GlobalServices::::init(); + let global_services = GlobalServices::::init()?; global_services.runtime.block_on(run(global_services.config)) } diff --git a/src/bin/run_with_importer.rs b/src/bin/run_with_importer.rs index a8e231021..11a29860b 100644 --- a/src/bin/run_with_importer.rs +++ b/src/bin/run_with_importer.rs @@ -7,18 +7,17 @@ use stratus::config::RunWithImporterConfig; use stratus::eth::consensus::Consensus; use stratus::eth::rpc::serve_rpc; use stratus::infra::BlockchainClient; -use stratus::utils::signal_handler; use stratus::GlobalServices; +use stratus::GlobalState; use tokio::join; fn main() -> anyhow::Result<()> { - let global_services = GlobalServices::::init(); + let global_services = GlobalServices::::init()?; global_services.runtime.block_on(run(global_services.config)) } async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> { - // init cancellation handler - let cancellation = signal_handler(); + const TASK_NAME: &str = "run-with-importer"; // init services let storage = config.storage.init().await?; @@ -28,38 +27,23 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> { let chain = Arc::new(BlockchainClient::new_http_ws(&http_url, ws_url.as_deref()).await?); let relayer = config.relayer.init(Arc::clone(&storage)).await?; - let miner = config - .miner - .init(Arc::clone(&storage), Some(Arc::clone(&consensus)), cancellation.clone()) - .await?; + let miner = config.miner.init(Arc::clone(&storage), Some(Arc::clone(&consensus))).await?; let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, Some(consensus)).await; let rpc_storage = Arc::clone(&storage); let rpc_executor = Arc::clone(&executor); let rpc_miner = Arc::clone(&miner); - let rpc_cancellation = cancellation.clone(); - // run rpc and importer-online in parallel let rpc_task = async move { - let res = serve_rpc( - rpc_storage, - rpc_executor, - rpc_miner, - config.address, - config.executor.chain_id.into(), - rpc_cancellation.clone(), - ) - .await; - tracing::warn!("serve_rpc finished, cancelling tasks"); - rpc_cancellation.cancel(); + let res = serve_rpc(rpc_storage, rpc_executor, rpc_miner, config.address, config.executor.chain_id.into()).await; + GlobalState::shutdown_from(TASK_NAME, "rpc server finished unexpectedly"); res }; let importer_task = async move { - let res = run_importer_online(executor, miner, storage, chain, cancellation.clone(), config.online.sync_interval).await; - tracing::warn!("run_importer_online finished, cancelling tasks"); - cancellation.cancel(); + let res = run_importer_online(executor, miner, storage, chain, config.online.sync_interval).await; + GlobalState::shutdown_from(TASK_NAME, "importer online finished unexpectedly"); res }; diff --git a/src/bin/state_validator.rs b/src/bin/state_validator.rs index 15632694b..43a2bcfc3 100644 --- a/src/bin/state_validator.rs +++ b/src/bin/state_validator.rs @@ -10,7 +10,7 @@ use stratus::GlobalServices; use tokio::task::JoinSet; fn main() -> anyhow::Result<()> { - let global_services = GlobalServices::::init(); + let global_services = GlobalServices::::init()?; global_services.runtime.block_on(run(global_services.config)) } diff --git a/src/config.rs b/src/config.rs index 6257cdd38..171f8bdbe 100644 --- a/src/config.rs +++ b/src/config.rs @@ -13,9 +13,7 @@ use display_json::DebugAsJson; use tokio::runtime::Builder; use tokio::runtime::Handle; use tokio::runtime::Runtime; -use tokio_util::sync::CancellationToken; -use crate::bin_name; use crate::eth::evm::revm::Revm; use crate::eth::evm::Evm; use crate::eth::evm::EvmConfig; @@ -42,13 +40,15 @@ use crate::eth::Consensus; use crate::eth::EvmTask; use crate::eth::Executor; use crate::eth::TransactionRelayer; -use crate::ext::warn_task_tx_closed; +use crate::infra::tracing::warn_task_cancellation; +use crate::infra::tracing::warn_task_tx_closed; use crate::infra::BlockchainClient; +use crate::GlobalState; /// Loads .env files according to the binary and environment. pub fn load_dotenv() { let env = std::env::var("ENV").unwrap_or_else(|_| "local".to_string()); - let env_filename = format!("config/{}.env.{}", bin_name(), env); + let env_filename = format!("config/{}.env.{}", binary_name(), env); println!("reading env file: {}", env_filename); if let Err(e) = dotenvy::from_filename(env_filename) { @@ -170,6 +170,8 @@ impl ExecutorConfig { relayer: Option>, consensus: Option>, ) -> Arc { + const TASK_NAME: &str = "evm-thread"; + let num_evms = max(self.num_evms, 1); tracing::info!(config = ?self, "starting executor"); @@ -202,12 +204,20 @@ impl ExecutorConfig { // keep executing transactions until the channel is closed while let Ok((input, tx)) = evm_rx.recv() { + // check cancellation + if GlobalState::is_shutdown() { + warn_task_cancellation(TASK_NAME); + return; + } + + // execute let result = evm.execute(input); if let Err(e) = tx.send(result) { tracing::error!(reason = ?e, "failed to send evm execution result"); }; } - warn_task_tx_closed("evm thread"); + + warn_task_tx_closed(TASK_NAME); }) .expect("spawning evm threads should not fail"); } @@ -237,12 +247,7 @@ pub struct MinerConfig { } impl MinerConfig { - pub async fn init( - &self, - storage: Arc, - consensus: Option>, - cancellation: CancellationToken, - ) -> anyhow::Result> { + pub async fn init(&self, storage: Arc, consensus: Option>) -> anyhow::Result> { tracing::info!(config = ?self, "starting block miner"); // create miner @@ -271,7 +276,7 @@ impl MinerConfig { // enable interval miner if miner.is_interval_miner_mode() { - Arc::clone(&miner).spawn_interval_miner(cancellation)?; + Arc::clone(&miner).spawn_interval_miner()?; } Ok(miner) @@ -791,8 +796,10 @@ impl FromStr for ValidatorMethodConfig { } // ----------------------------------------------------------------------------- -// Parsers +// Helpers // ----------------------------------------------------------------------------- + +/// Parses a duration specified using human-time notation or fallback to milliseconds. fn parse_duration(s: &str) -> anyhow::Result { // try millis let millis: Result = s.parse(); @@ -808,3 +815,15 @@ fn parse_duration(s: &str) -> anyhow::Result { // error Err(anyhow!("invalid duration format: {}", s)) } + +/// Gets the current binary basename. +fn binary_name() -> String { + let binary = std::env::current_exe().unwrap(); + let binary_basename = binary.file_name().unwrap().to_str().unwrap().to_lowercase(); + + if binary_basename.starts_with("test_") { + "tests".to_string() + } else { + binary_basename + } +} diff --git a/src/eth/block_miner.rs b/src/eth/block_miner.rs index 1506a5249..f8ea77685 100644 --- a/src/eth/block_miner.rs +++ b/src/eth/block_miner.rs @@ -7,7 +7,6 @@ use nonempty::NonEmpty; use tokio::sync::broadcast; use tokio::sync::mpsc; use tokio::time::Instant; -use tokio_util::sync::CancellationToken; use super::Consensus; use crate::eth::primitives::Block; @@ -61,7 +60,7 @@ impl BlockMiner { } /// Spawns a new thread that keep mining blocks in the specified interval. - pub fn spawn_interval_miner(self: Arc, cancellation: CancellationToken) -> anyhow::Result<()> { + pub fn spawn_interval_miner(self: Arc) -> anyhow::Result<()> { // validate let Some(block_time) = self.block_time else { return log_and_err!("cannot spawn interval miner because it does not have a block time defined"); @@ -70,8 +69,8 @@ impl BlockMiner { // spawn miner and ticker let (ticks_tx, ticks_rx) = mpsc::unbounded_channel::(); - spawn_named("miner::miner", interval_miner::run(Arc::clone(&self), ticks_rx, cancellation.child_token())); - spawn_named("miner::ticker", interval_miner_ticker::run(block_time, ticks_tx, cancellation.child_token())); + spawn_named("miner::miner", interval_miner::run(Arc::clone(&self), ticks_rx)); + spawn_named("miner::ticker", interval_miner_ticker::run(block_time, ticks_tx)); Ok(()) } @@ -316,17 +315,19 @@ mod interval_miner { use tokio::sync::mpsc; use tokio::time::Instant; - use tokio_util::sync::CancellationToken; use crate::eth::BlockMiner; - use crate::ext::warn_task_cancellation; - use crate::ext::warn_task_rx_closed; + use crate::infra::tracing::warn_task_cancellation; + use crate::infra::tracing::warn_task_rx_closed; + use crate::GlobalState; + + pub async fn run(miner: Arc, mut ticks_rx: mpsc::UnboundedReceiver) { + const TASK_NAME: &str = "interval-miner-ticker"; - pub async fn run(miner: Arc, mut ticks_rx: mpsc::UnboundedReceiver, cancellation: CancellationToken) { while let Some(tick) = ticks_rx.recv().await { // check cancellation - if cancellation.is_cancelled() { - warn_task_cancellation("interval miner"); + if GlobalState::is_shutdown() { + warn_task_cancellation(TASK_NAME); return; } @@ -334,7 +335,7 @@ mod interval_miner { tracing::info!(lag_ys = %tick.elapsed().as_micros(), "interval mining block"); mine_and_commit(&miner).await; } - warn_task_rx_closed("interval miner"); + warn_task_rx_closed(TASK_NAME); } #[inline(always)] @@ -370,12 +371,14 @@ mod interval_miner_ticker { use chrono::Utc; use tokio::sync::mpsc; use tokio::time::Instant; - use tokio_util::sync::CancellationToken; - use crate::ext::warn_task_cancellation; - use crate::ext::warn_task_rx_closed; + use crate::infra::tracing::warn_task_cancellation; + use crate::infra::tracing::warn_task_rx_closed; + use crate::GlobalState; + + pub async fn run(block_time: Duration, ticks_tx: mpsc::UnboundedSender) { + const TASK_NAME: &str = "interval-miner-ticker"; - pub async fn run(block_time: Duration, ticks_tx: mpsc::UnboundedSender, cancellation: CancellationToken) { // sync to next second let next_second = (Utc::now() + Duration::from_secs(1)).with_nanosecond(0).unwrap(); thread::sleep((next_second - Utc::now()).to_std().unwrap()); @@ -388,15 +391,15 @@ mod interval_miner_ticker { ticker.tick().await; loop { // check cancellation - if cancellation.is_cancelled() { - warn_task_cancellation("interval miner ticker"); + if GlobalState::is_shutdown() { + warn_task_cancellation(TASK_NAME); return; } // await next tick let tick = ticker.tick().await; if ticks_tx.send(tick).is_err() { - warn_task_rx_closed("interval miner ticker"); + warn_task_rx_closed(TASK_NAME); break; }; } diff --git a/src/eth/rpc/rpc_server.rs b/src/eth/rpc/rpc_server.rs index c5e8f43da..a1164b582 100644 --- a/src/eth/rpc/rpc_server.rs +++ b/src/eth/rpc/rpc_server.rs @@ -17,7 +17,6 @@ use jsonrpsee::PendingSubscriptionSink; use serde_json::json; use serde_json::Value as JsonValue; use tokio::select; -use tokio_util::sync::CancellationToken; use crate::eth::primitives::Address; #[cfg(feature = "dev")] @@ -43,6 +42,8 @@ use crate::eth::rpc::RpcSubscriptions; use crate::eth::storage::StratusStorage; use crate::eth::BlockMiner; use crate::eth::Executor; +use crate::infra::tracing::warn_task_cancellation; +use crate::GlobalState; // ----------------------------------------------------------------------------- // Server @@ -57,16 +58,16 @@ pub async fn serve_rpc( // config address: SocketAddr, chain_id: ChainId, - cancellation: CancellationToken, ) -> anyhow::Result<()> { - tracing::info!("starting rpc server"); + const TASK_NAME: &str = "rpc-server"; + + tracing::info!("starting {}", TASK_NAME); // configure subscriptions let subs = RpcSubscriptions::spawn( miner.notifier_pending_txs.subscribe(), miner.notifier_blocks.subscribe(), miner.notifier_logs.subscribe(), - cancellation.child_token(), ); // configure context @@ -83,7 +84,6 @@ pub async fn serve_rpc( // subscriptions subs: Arc::clone(&subs.connected), }; - tracing::info!(%address, ?ctx, "starting rpc server"); // configure module let mut module = RpcModule::::new(ctx); @@ -110,11 +110,10 @@ pub async fn serve_rpc( // await for cancellation or jsonrpsee to stop (should not happen) select! { _ = handle_rpc_server_watch.stopped() => { - tracing::warn!("rpc server finished unexpectedly, cancelling tasks"); - cancellation.cancel(); + GlobalState::shutdown_from(TASK_NAME, "finished unexpectedly"); }, - _ = cancellation.cancelled() => { - tracing::info!("rpc server cancelled, stopping it"); + _ = GlobalState::until_shutdown() => { + warn_task_cancellation(TASK_NAME); let _ = handle_rpc_server.stop(); } } diff --git a/src/eth/rpc/rpc_subscriptions.rs b/src/eth/rpc/rpc_subscriptions.rs index 5ac3b62de..7480262c6 100644 --- a/src/eth/rpc/rpc_subscriptions.rs +++ b/src/eth/rpc/rpc_subscriptions.rs @@ -9,7 +9,6 @@ use tokio::sync::broadcast; use tokio::sync::RwLock; use tokio::task::JoinHandle; use tokio::time::Duration; -use tokio_util::sync::CancellationToken; use crate::eth::primitives::BlockHeader; use crate::eth::primitives::Hash; @@ -17,11 +16,12 @@ use crate::eth::primitives::LogFilter; use crate::eth::primitives::LogMined; use crate::ext::not; use crate::ext::spawn_named; -use crate::ext::warn_task_cancellation; -use crate::ext::warn_task_tx_closed; use crate::if_else; #[cfg(feature = "metrics")] use crate::infra::metrics; +use crate::infra::tracing::warn_task_cancellation; +use crate::infra::tracing::warn_task_tx_closed; +use crate::GlobalState; /// Frequency of cleaning up closed subscriptions. const CLEANING_FREQUENCY: Duration = Duration::from_secs(10); @@ -47,31 +47,26 @@ pub struct RpcSubscriptions { impl RpcSubscriptions { /// Creates a new subscriptin manager that automatically spawns all necessary tasks in background. - pub fn spawn( - rx_pending_txs: broadcast::Receiver, - rx_blocks: broadcast::Receiver, - rx_logs: broadcast::Receiver, - cancellation: CancellationToken, - ) -> Self { + pub fn spawn(rx_pending_txs: broadcast::Receiver, rx_blocks: broadcast::Receiver, rx_logs: broadcast::Receiver) -> Self { let connected = Arc::new(RpcSubscriptionsConnected::default()); - Self::spawn_subscriptions_cleaner(Arc::clone(&connected), cancellation.child_token()); + Self::spawn_subscriptions_cleaner(Arc::clone(&connected)); let handles = RpcSubscriptionsHandles { - new_pending_txs: Self::spawn_new_pending_txs_notifier(Arc::clone(&connected), rx_pending_txs, cancellation.child_token()), - new_heads: Self::spawn_new_heads_notifier(Arc::clone(&connected), rx_blocks, cancellation.child_token()), - logs: Self::spawn_logs_notifier(Arc::clone(&connected), rx_logs, cancellation.child_token()), + new_pending_txs: Self::spawn_new_pending_txs_notifier(Arc::clone(&connected), rx_pending_txs), + new_heads: Self::spawn_new_heads_notifier(Arc::clone(&connected), rx_blocks), + logs: Self::spawn_logs_notifier(Arc::clone(&connected), rx_logs), }; Self { connected, handles } } /// Spawns a new task to clean up closed subscriptions from time to time. - fn spawn_subscriptions_cleaner(subs: Arc, cancellation: CancellationToken) -> JoinHandle> { + fn spawn_subscriptions_cleaner(subs: Arc) -> JoinHandle> { tracing::info!("spawning rpc subscriptions cleaner"); spawn_named("rpc::sub::cleaner", async move { loop { - if cancellation.is_cancelled() { + if GlobalState::is_shutdown() { warn_task_cancellation("rpc subscription cleaner"); return Ok(()); } @@ -96,16 +91,12 @@ impl RpcSubscriptions { } /// Spawns a new task that notifies subscribers about new executed transactions. - fn spawn_new_pending_txs_notifier( - subs: Arc, - mut rx: broadcast::Receiver, - cancellation: CancellationToken, - ) -> JoinHandle> { + fn spawn_new_pending_txs_notifier(subs: Arc, mut rx: broadcast::Receiver) -> JoinHandle> { tracing::info!("spawning rpc newPendingTransactions notifier"); spawn_named("rpc::sub::newPendingTransactions", async move { loop { - if cancellation.is_cancelled() { + if GlobalState::is_shutdown() { warn_task_cancellation("rpc newPendingTransactions notifier"); return Ok(()); } @@ -123,16 +114,12 @@ impl RpcSubscriptions { } /// Spawns a new task that notifies subscribers about new created blocks. - fn spawn_new_heads_notifier( - subs: Arc, - mut rx: broadcast::Receiver, - cancellation: CancellationToken, - ) -> JoinHandle> { + fn spawn_new_heads_notifier(subs: Arc, mut rx: broadcast::Receiver) -> JoinHandle> { tracing::info!("spawning rpc newHeads notifier"); spawn_named("rpc::sub::newHeads", async move { loop { - if cancellation.is_cancelled() { + if GlobalState::is_shutdown() { warn_task_cancellation("rpc newHeads notifier"); return Ok(()); } @@ -150,16 +137,12 @@ impl RpcSubscriptions { } /// Spawns a new task that notifies subscribers about new transactions logs. - fn spawn_logs_notifier( - subs: Arc, - mut rx: broadcast::Receiver, - cancellation: CancellationToken, - ) -> JoinHandle> { + fn spawn_logs_notifier(subs: Arc, mut rx: broadcast::Receiver) -> JoinHandle> { tracing::info!("spawning rpc logs notifier"); spawn_named("rpc::sub::logs", async move { loop { - if cancellation.is_cancelled() { + if GlobalState::is_shutdown() { warn_task_cancellation("rpc logs notifier"); return Ok(()); } diff --git a/src/ext.rs b/src/ext.rs index 2866e5817..1f86c7970 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -129,31 +129,6 @@ impl DisplayExt for std::time::Duration { } } -// ----------------------------------------------------------------------------- -// Tracing -// ----------------------------------------------------------------------------- - -/// Emits an warning that a task is exiting because it received a cancenllation signal. -#[track_caller] -pub fn warn_task_cancellation(task: &str) { - let message = format!("exiting {} because it received a cancellation signal", task); - tracing::warn!(%message); -} - -/// Emits an warning that a task is exiting because the tx side was closed. -#[track_caller] -pub fn warn_task_tx_closed(task: &str) { - let message = format!("exiting {} because the tx channel on the other side was closed", task); - tracing::warn!(%message); -} - -/// Emits an warning that a task is exiting because the rx side was closed. -#[track_caller] -pub fn warn_task_rx_closed(task: &str) { - let message = format!("exiting {} because the rx channel on the other side was closed", task); - tracing::warn!(%message); -} - // ----------------------------------------------------------------------------- // Tokio // ----------------------------------------------------------------------------- diff --git a/src/globals.rs b/src/globals.rs new file mode 100644 index 000000000..e5f8d0373 --- /dev/null +++ b/src/globals.rs @@ -0,0 +1,93 @@ +use std::env; +use std::fmt::Debug; + +use once_cell::sync::Lazy; +use sentry::ClientInitGuard; +use tokio::runtime::Runtime; +use tokio_util::sync::CancellationToken; + +use crate::config::load_dotenv; +use crate::config::WithCommonConfig; +use crate::infra; +use crate::utils::spawn_signal_handler; + +// ----------------------------------------------------------------------------- +// Global services +// ----------------------------------------------------------------------------- + +pub struct GlobalServices +where + T: clap::Parser + WithCommonConfig + Debug, +{ + pub config: T, + pub runtime: Runtime, + _sentry_guard: Option, +} + +impl GlobalServices +where + T: clap::Parser + WithCommonConfig + Debug, +{ + /// Executes global services initialization. + pub fn init() -> anyhow::Result + where + T: clap::Parser + WithCommonConfig + Debug, + { + // parse configuration + load_dotenv(); + let config = T::parse(); + + if env::var_os("PERM_STORAGE_CONNECTIONS").is_some_and(|value| value == "1") { + println!("WARNING: env var PERM_STORAGE_CONNECTIONS is set to 1, if it cause connection problems, try increasing it"); + } + + // init tokio + let runtime = config.common().init_runtime(); + + // init metrics + #[cfg(feature = "metrics")] + infra::init_metrics(config.common().metrics_histogram_kind); + + // init sentry + let _sentry_guard = config.common().sentry_url.as_ref().map(|sentry_url| infra::init_sentry(sentry_url)); + + // init signal handler + runtime.block_on(spawn_signal_handler())?; + + // init tracing + runtime.block_on(infra::init_tracing(config.common().tracing_url.as_ref())); + + Ok(Self { + config, + runtime, + _sentry_guard, + }) + } +} + +// ----------------------------------------------------------------------------- +// Global state +// ----------------------------------------------------------------------------- + +static CANCELLATION: Lazy = Lazy::new(CancellationToken::new); +pub struct GlobalState; + +impl GlobalState { + #[track_caller] + /// Shutdown the application. + pub fn shutdown_from(caller: &str, reason: &str) -> String { + tracing::warn!(%caller, %reason, "application is shutting down"); + CANCELLATION.cancel(); + format!("{} {}", caller, reason) + } + + /// Checks if the application is being shutdown. + pub fn is_shutdown() -> bool { + CANCELLATION.is_cancelled() + } + + /// Awaits until a shutdown is received. + pub async fn until_shutdown() { + CANCELLATION.cancelled().await; + } +} diff --git a/src/infra/tracing.rs b/src/infra/tracing.rs index b143a60bf..e8b327277 100644 --- a/src/infra/tracing.rs +++ b/src/infra/tracing.rs @@ -78,3 +78,27 @@ pub async fn init_tracing(url: Option<&String>) { tracing::info!("started tracing"); } + +/// Emits an warning that a task is exiting because it received a cancenllation signal. +#[track_caller] +pub fn warn_task_cancellation(task: &str) -> String { + let message = format!("exiting {} because it received a cancellation signal", task); + tracing::warn!(%message); + message +} + +/// Emits an warning that a task is exiting because the tx side was closed. +#[track_caller] +pub fn warn_task_tx_closed(task: &str) -> String { + let message = format!("exiting {} because the tx channel on the other side was closed", task); + tracing::warn!(%message); + message +} + +/// Emits an warning that a task is exiting because the rx side was closed. +#[track_caller] +pub fn warn_task_rx_closed(task: &str) -> String { + let message = format!("exiting {} because the rx channel on the other side was closed", task); + tracing::warn!(%message); + message +} diff --git a/src/lib.rs b/src/lib.rs index 969a9aa3f..a4fe2d41c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,73 +1,9 @@ -use std::env; -use std::fmt::Debug; - -use sentry::ClientInitGuard; -use tokio::runtime::Runtime; - -use crate::config::load_dotenv; -use crate::config::WithCommonConfig; - pub mod config; pub mod eth; pub mod ext; +mod globals; pub mod infra; pub mod utils; -pub struct GlobalServices -where - T: clap::Parser + WithCommonConfig + Debug, -{ - pub config: T, - pub runtime: Runtime, - _sentry_guard: Option, -} - -impl GlobalServices -where - T: clap::Parser + WithCommonConfig + Debug, -{ - /// Executes global services initialization. - pub fn init() -> Self - where - T: clap::Parser + WithCommonConfig + Debug, - { - // parse configuration - load_dotenv(); - let config = T::parse(); - - if env::var_os("PERM_STORAGE_CONNECTIONS").is_some_and(|value| value == "1") { - println!("WARNING: env var PERM_STORAGE_CONNECTIONS is set to 1, if it cause connection problems, try increasing it"); - } - - // init metrics - #[cfg(feature = "metrics")] - infra::init_metrics(config.common().metrics_histogram_kind); - - // init sentry - let _sentry_guard = config.common().sentry_url.as_ref().map(|sentry_url| infra::init_sentry(sentry_url)); - - // init tokio - let runtime = config.common().init_runtime(); - - // init tracing - runtime.block_on(infra::init_tracing(config.common().tracing_url.as_ref())); - - Self { - config, - runtime, - _sentry_guard, - } - } -} - -/// Get the current binary basename. -pub fn bin_name() -> String { - let binary = std::env::current_exe().unwrap(); - let binary_basename = binary.file_name().unwrap().to_str().unwrap().to_lowercase(); - - if binary_basename.starts_with("test_") { - "tests".to_string() - } else { - binary_basename - } -} +pub use globals::GlobalServices; +pub use globals::GlobalState; diff --git a/src/main.rs b/src/main.rs index 849fa9551..3ee3b5740 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,26 +2,22 @@ use std::sync::Arc; use stratus::config::StratusConfig; use stratus::eth::rpc::serve_rpc; -use stratus::utils::signal_handler; use stratus::GlobalServices; fn main() -> anyhow::Result<()> { - let global_services = GlobalServices::::init(); + let global_services = GlobalServices::::init()?; global_services.runtime.block_on(run(global_services.config)) } async fn run(config: StratusConfig) -> anyhow::Result<()> { - // init cancellation handler - let cancellation = signal_handler(); - // init services let storage = config.storage.init().await?; let relayer = config.relayer.init(Arc::clone(&storage)).await?; - let miner = config.miner.init(Arc::clone(&storage), None, cancellation.clone()).await?; + let miner = config.miner.init(Arc::clone(&storage), None).await?; let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, None).await; // start rpc server - serve_rpc(storage, executor, miner, config.address, config.executor.chain_id.into(), cancellation).await?; + serve_rpc(storage, executor, miner, config.address, config.executor.chain_id.into()).await?; Ok(()) } diff --git a/src/utils.rs b/src/utils.rs index de5b5f717..db5c5d3dd 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,50 +1,38 @@ -use anyhow::anyhow; use tokio::select; use tokio::signal::unix::signal; -use tokio::signal::unix::Signal; use tokio::signal::unix::SignalKind; -use tokio_util::sync::CancellationToken; use uuid::Uuid; use crate::ext::spawn_named; +use crate::log_and_err; +use crate::GlobalState; pub fn new_context_id() -> String { Uuid::new_v4().to_string() } -fn signal_or_cancel(kind: SignalKind, cancellation: &CancellationToken) -> anyhow::Result { - match signal(kind) { - Ok(signal) => Ok(signal), - Err(err) => { - tracing::error!(?err, "unable to listen for SIGTERM"); - cancellation.cancel(); - Err(anyhow!("signal handler init failed")) - } - } -} - -pub fn signal_handler() -> CancellationToken { - let cancellation = CancellationToken::new(); - let task_cancellation = cancellation.clone(); - spawn_named("sys::signal_handler", async move { - let Ok(mut sigterm) = signal_or_cancel(SignalKind::terminate(), &task_cancellation) else { - return; - }; +pub async fn spawn_signal_handler() -> anyhow::Result<()> { + const TASK_NAME: &str = "signal-handler"; - let Ok(mut sigint) = signal_or_cancel(SignalKind::interrupt(), &task_cancellation) else { - return; - }; + let mut sigterm = match signal(SignalKind::terminate()) { + Ok(signal) => signal, + Err(e) => return log_and_err!(reason = e, "failed to init SIGTERM watcher"), + }; + let mut sigint = match signal(SignalKind::interrupt()) { + Ok(signal) => signal, + Err(e) => return log_and_err!(reason = e, "failed to init SIGINT watcher"), + }; + spawn_named("sys::signal_handler", async move { select! { _ = sigterm.recv() => { - tracing::info!("SIGTERM received, cancelling tasks"); + GlobalState::shutdown_from(TASK_NAME, "received SIGTERM"); } - _ = sigint.recv() => { - tracing::info!("SIGINT signal received, shutting down"); + GlobalState::shutdown_from(TASK_NAME, "received SIGINT"); } } - task_cancellation.cancel(); }); - cancellation + + Ok(()) } diff --git a/tests/test_import_external_snapshot_common.rs b/tests/test_import_external_snapshot_common.rs index 36d522e5f..b957c0f85 100644 --- a/tests/test_import_external_snapshot_common.rs +++ b/tests/test_import_external_snapshot_common.rs @@ -20,7 +20,6 @@ use stratus::eth::storage::PermanentStorage; use stratus::eth::storage::StratusStorage; use stratus::infra::docker::Docker; use stratus::GlobalServices; -use tokio_util::sync::CancellationToken; #[cfg(feature = "metrics")] mod m { pub use const_format::formatcp; @@ -102,7 +101,7 @@ pub fn init_config_and_data( InMemoryPermanentStorageState, ) { // init config - let mut global_services = GlobalServices::::init(); + let mut global_services = GlobalServices::::init().unwrap(); global_services.config.executor.chain_id = 2009; global_services.config.executor.num_evms = 8; global_services.config.storage.perm_storage.perm_storage_connections = 9; @@ -156,7 +155,7 @@ pub async fn execute_test( // init services let storage = Arc::new(StratusStorage::new(Arc::new(InMemoryTemporaryStorage::new()), Arc::new(perm_storage))); let relayer = config.relayer.init(Arc::clone(&storage)).await.unwrap(); - let miner = config.miner.init(Arc::clone(&storage), None, CancellationToken::new()).await.unwrap(); + let miner = config.miner.init(Arc::clone(&storage), None).await.unwrap(); let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, None).await; // execute and mine From d78319cf1a27281a7505c3b608eab4d009228815 Mon Sep 17 00:00:00 2001 From: Renato Dinhani Date: Wed, 22 May 2024 17:45:06 -0300 Subject: [PATCH 2/3] doc --- src/globals.rs | 2 ++ src/infra/tracing.rs | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/src/globals.rs b/src/globals.rs index e5f8d0373..90173fefb 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -75,6 +75,8 @@ pub struct GlobalState; impl GlobalState { #[track_caller] /// Shutdown the application. + /// + /// Returns the formatted reason for shutdown. pub fn shutdown_from(caller: &str, reason: &str) -> String { tracing::warn!(%caller, %reason, "application is shutting down"); CANCELLATION.cancel(); diff --git a/src/infra/tracing.rs b/src/infra/tracing.rs index e8b327277..7cfe5d320 100644 --- a/src/infra/tracing.rs +++ b/src/infra/tracing.rs @@ -80,6 +80,8 @@ pub async fn init_tracing(url: Option<&String>) { } /// Emits an warning that a task is exiting because it received a cancenllation signal. +/// +/// Returns the formatted tracing message. #[track_caller] pub fn warn_task_cancellation(task: &str) -> String { let message = format!("exiting {} because it received a cancellation signal", task); @@ -88,6 +90,8 @@ pub fn warn_task_cancellation(task: &str) -> String { } /// Emits an warning that a task is exiting because the tx side was closed. +/// +/// Returns the formatted tracing message. #[track_caller] pub fn warn_task_tx_closed(task: &str) -> String { let message = format!("exiting {} because the tx channel on the other side was closed", task); @@ -96,6 +100,8 @@ pub fn warn_task_tx_closed(task: &str) -> String { } /// Emits an warning that a task is exiting because the rx side was closed. +/// +/// Returns the formatted tracing message. #[track_caller] pub fn warn_task_rx_closed(task: &str) -> String { let message = format!("exiting {} because the rx channel on the other side was closed", task); From 0c719ad8b793d07ed400760a96a162b404798e6b Mon Sep 17 00:00:00 2001 From: Renato Dinhani Date: Wed, 22 May 2024 18:07:58 -0300 Subject: [PATCH 3/3] warn_if_shutdown --- src/bin/importer_offline.rs | 9 ++------- src/bin/importer_online.rs | 15 +++++---------- src/config.rs | 5 +---- src/eth/block_miner.rs | 11 ++--------- src/eth/rpc/rpc_subscriptions.rs | 31 +++++++++++++++---------------- src/globals.rs | 11 ++++++++++- 6 files changed, 35 insertions(+), 47 deletions(-) diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index d6580b738..d1650cf57 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -21,7 +21,6 @@ use stratus::eth::storage::StratusStorage; use stratus::eth::BlockMiner; use stratus::eth::Executor; use stratus::ext::not; -use stratus::infra::tracing::warn_task_cancellation; use stratus::GlobalServices; use stratus::GlobalState; use tokio::runtime::Handle; @@ -131,9 +130,7 @@ async fn execute_block_importer( tracing::info!("starting {}", TASK_NAME); loop { - // check cancellation - if GlobalState::is_shutdown() { - warn_task_cancellation(TASK_NAME); + if GlobalState::warn_if_shutdown(TASK_NAME) { return Ok(()); }; @@ -207,9 +204,7 @@ async fn execute_external_rpc_storage_loader( // execute loads in parallel let mut tasks = futures::stream::iter(tasks).buffered(paralellism); loop { - // check cancellation - if GlobalState::is_shutdown() { - warn_task_cancellation(TASK_NAME); + if GlobalState::warn_if_shutdown(TASK_NAME) { return Ok(()); }; diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs index c640b1f0d..a9f0038ac 100644 --- a/src/bin/importer_online.rs +++ b/src/bin/importer_online.rs @@ -19,7 +19,6 @@ use stratus::eth::Executor; use stratus::ext::DisplayExt; #[cfg(feature = "metrics")] use stratus::infra::metrics; -use stratus::infra::tracing::warn_task_cancellation; use stratus::infra::tracing::warn_task_rx_closed; use stratus::infra::tracing::warn_task_tx_closed; use stratus::infra::BlockchainClient; @@ -119,8 +118,7 @@ async fn start_block_executor(executor: Arc, miner: Arc, m const TASK_NAME: &str = "block-executor"; while let Some((block, receipts)) = backlog_rx.recv().await { - if GlobalState::is_shutdown() { - warn_task_cancellation(TASK_NAME); + if GlobalState::warn_if_shutdown(TASK_NAME) { return; } @@ -144,7 +142,8 @@ async fn start_block_executor(executor: Arc, miner: Arc, m metrics::inc_import_online_mined_block(start.elapsed()); } } - warn_task_tx_closed("block-executor"); + + warn_task_tx_closed(TASK_NAME); } // ----------------------------------------------------------------------------- @@ -174,9 +173,7 @@ async fn start_number_fetcher(chain: Arc, sync_interval: Durat }; loop { - // check cancellation - if GlobalState::is_shutdown() { - warn_task_cancellation(TASK_NAME); + if GlobalState::warn_if_shutdown(TASK_NAME) { return; } tracing::info!("fetching current block number"); @@ -245,9 +242,7 @@ async fn start_block_fetcher(chain: Arc, backlog_tx: mpsc::Unb const TASK_NAME: &str = "external-block-fetcher"; loop { - // check cancellation - if GlobalState::is_shutdown() { - warn_task_cancellation(TASK_NAME); + if GlobalState::warn_if_shutdown(TASK_NAME) { return; } diff --git a/src/config.rs b/src/config.rs index 171f8bdbe..f7d2100a9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -40,7 +40,6 @@ use crate::eth::Consensus; use crate::eth::EvmTask; use crate::eth::Executor; use crate::eth::TransactionRelayer; -use crate::infra::tracing::warn_task_cancellation; use crate::infra::tracing::warn_task_tx_closed; use crate::infra::BlockchainClient; use crate::GlobalState; @@ -204,9 +203,7 @@ impl ExecutorConfig { // keep executing transactions until the channel is closed while let Ok((input, tx)) = evm_rx.recv() { - // check cancellation - if GlobalState::is_shutdown() { - warn_task_cancellation(TASK_NAME); + if GlobalState::warn_if_shutdown(TASK_NAME) { return; } diff --git a/src/eth/block_miner.rs b/src/eth/block_miner.rs index f8ea77685..79b391c43 100644 --- a/src/eth/block_miner.rs +++ b/src/eth/block_miner.rs @@ -317,7 +317,6 @@ mod interval_miner { use tokio::time::Instant; use crate::eth::BlockMiner; - use crate::infra::tracing::warn_task_cancellation; use crate::infra::tracing::warn_task_rx_closed; use crate::GlobalState; @@ -325,9 +324,7 @@ mod interval_miner { const TASK_NAME: &str = "interval-miner-ticker"; while let Some(tick) = ticks_rx.recv().await { - // check cancellation - if GlobalState::is_shutdown() { - warn_task_cancellation(TASK_NAME); + if GlobalState::warn_if_shutdown(TASK_NAME) { return; } @@ -372,7 +369,6 @@ mod interval_miner_ticker { use tokio::sync::mpsc; use tokio::time::Instant; - use crate::infra::tracing::warn_task_cancellation; use crate::infra::tracing::warn_task_rx_closed; use crate::GlobalState; @@ -390,13 +386,10 @@ mod interval_miner_ticker { // keep ticking ticker.tick().await; loop { - // check cancellation - if GlobalState::is_shutdown() { - warn_task_cancellation(TASK_NAME); + if GlobalState::warn_if_shutdown(TASK_NAME) { return; } - // await next tick let tick = ticker.tick().await; if ticks_tx.send(tick).is_err() { warn_task_rx_closed(TASK_NAME); diff --git a/src/eth/rpc/rpc_subscriptions.rs b/src/eth/rpc/rpc_subscriptions.rs index 7480262c6..a50d4938e 100644 --- a/src/eth/rpc/rpc_subscriptions.rs +++ b/src/eth/rpc/rpc_subscriptions.rs @@ -19,7 +19,6 @@ use crate::ext::spawn_named; use crate::if_else; #[cfg(feature = "metrics")] use crate::infra::metrics; -use crate::infra::tracing::warn_task_cancellation; use crate::infra::tracing::warn_task_tx_closed; use crate::GlobalState; @@ -62,12 +61,12 @@ impl RpcSubscriptions { /// Spawns a new task to clean up closed subscriptions from time to time. fn spawn_subscriptions_cleaner(subs: Arc) -> JoinHandle> { - tracing::info!("spawning rpc subscriptions cleaner"); + const TASK_NAME: &str = "rpc-subscription-cleaner"; + tracing::info!("spawning {}", TASK_NAME); spawn_named("rpc::sub::cleaner", async move { loop { - if GlobalState::is_shutdown() { - warn_task_cancellation("rpc subscription cleaner"); + if GlobalState::warn_if_shutdown(TASK_NAME) { return Ok(()); } @@ -92,17 +91,17 @@ impl RpcSubscriptions { /// Spawns a new task that notifies subscribers about new executed transactions. fn spawn_new_pending_txs_notifier(subs: Arc, mut rx: broadcast::Receiver) -> JoinHandle> { - tracing::info!("spawning rpc newPendingTransactions notifier"); + const TASK_NAME: &str = "rpc-newPendingTransactions-notifier"; + tracing::info!("spawning {}", TASK_NAME); spawn_named("rpc::sub::newPendingTransactions", async move { loop { - if GlobalState::is_shutdown() { - warn_task_cancellation("rpc newPendingTransactions notifier"); + if GlobalState::warn_if_shutdown(TASK_NAME) { return Ok(()); } let Ok(hash) = rx.recv().await else { - warn_task_tx_closed("rpc newPendingTransactions notifier"); + warn_task_tx_closed(TASK_NAME); break; }; @@ -115,17 +114,17 @@ impl RpcSubscriptions { /// Spawns a new task that notifies subscribers about new created blocks. fn spawn_new_heads_notifier(subs: Arc, mut rx: broadcast::Receiver) -> JoinHandle> { - tracing::info!("spawning rpc newHeads notifier"); + const TASK_NAME: &str = "rpc-newHeads-notifier"; + tracing::info!("spawning {}", TASK_NAME); spawn_named("rpc::sub::newHeads", async move { loop { - if GlobalState::is_shutdown() { - warn_task_cancellation("rpc newHeads notifier"); + if GlobalState::warn_if_shutdown(TASK_NAME) { return Ok(()); } let Ok(header) = rx.recv().await else { - warn_task_tx_closed("rpc newHeads notifier"); + warn_task_tx_closed(TASK_NAME); break; }; @@ -138,17 +137,17 @@ impl RpcSubscriptions { /// Spawns a new task that notifies subscribers about new transactions logs. fn spawn_logs_notifier(subs: Arc, mut rx: broadcast::Receiver) -> JoinHandle> { - tracing::info!("spawning rpc logs notifier"); + const TASK_NAME: &str = "rpc-logs-notifier"; + tracing::info!("spawning {}", TASK_NAME); spawn_named("rpc::sub::logs", async move { loop { - if GlobalState::is_shutdown() { - warn_task_cancellation("rpc logs notifier"); + if GlobalState::warn_if_shutdown(TASK_NAME) { return Ok(()); } let Ok(log) = rx.recv().await else { - warn_task_tx_closed("rpc logs notifier"); + warn_task_tx_closed(TASK_NAME); break; }; diff --git a/src/globals.rs b/src/globals.rs index 90173fefb..ef4228d77 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -9,6 +9,7 @@ use tokio_util::sync::CancellationToken; use crate::config::load_dotenv; use crate::config::WithCommonConfig; use crate::infra; +use crate::infra::tracing::warn_task_cancellation; use crate::utils::spawn_signal_handler; // ----------------------------------------------------------------------------- @@ -73,7 +74,6 @@ static CANCELLATION: Lazy = Lazy::new(CancellationToken::new) pub struct GlobalState; impl GlobalState { - #[track_caller] /// Shutdown the application. /// /// Returns the formatted reason for shutdown. @@ -88,6 +88,15 @@ impl GlobalState { CANCELLATION.is_cancelled() } + /// Checks if the application is being shutdown. Emits an warning with the task name in case it is. + pub fn warn_if_shutdown(task_name: &str) -> bool { + let shutdown = Self::is_shutdown(); + if shutdown { + warn_task_cancellation(task_name); + } + shutdown + } + /// Awaits until a shutdown is received. pub async fn until_shutdown() { CANCELLATION.cancelled().await;