From 34d35c8fa5416cdaa3f50fae7ed66d34ca51b40b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Marcos?= <164224824+marcospb19-cw@users.noreply.github.com> Date: Thu, 18 Jul 2024 03:51:27 -0300 Subject: [PATCH] enha: check shutdown signal in tasks (#1480) in miner, importer-online and subscribers also remove warnings for channel timeouts in the notifier tasks as they aren't necessary --- src/bin/importer_online.rs | 12 +++++- src/eth/miner/miner.rs | 16 +++++-- src/eth/rpc/rpc_subscriptions.rs | 31 +++++++++----- src/ext.rs | 71 -------------------------------- 4 files changed, 43 insertions(+), 87 deletions(-) diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs index 10a3735aa..a5f0aac2d 100644 --- a/src/bin/importer_online.rs +++ b/src/bin/importer_online.rs @@ -7,7 +7,6 @@ use std::time::Duration; use futures::try_join; use futures::StreamExt; use serde::Deserialize; -use stratus::channel_read; use stratus::config::ImporterOnlineConfig; use stratus::eth::executor::Executor; use stratus::eth::miner::Miner; @@ -152,11 +151,20 @@ async fn start_block_executor( ) -> anyhow::Result<()> { const TASK_NAME: &str = "block-executor"; - while let Some((block, receipts)) = channel_read!(backlog_rx) { + loop { if GlobalState::is_shutdown_warn(TASK_NAME) { return Ok(()); } + let (block, receipts) = match timeout(Duration::from_secs(2), backlog_rx.recv()).await { + Ok(Some(inner)) => inner, + Ok(None) => break, // channel closed + Err(_timed_out) => { + tracing::warn!(timeout = "2s", "timeout reading block executor channel, expected around 1 block per second"); + continue; + } + }; + #[cfg(feature = "metrics")] let start = metrics::now(); diff --git a/src/eth/miner/miner.rs b/src/eth/miner/miner.rs index e32f33e48..e5a30cdb0 100644 --- a/src/eth/miner/miner.rs +++ b/src/eth/miner/miner.rs @@ -459,11 +459,12 @@ pub fn block_from_propagation(block_entry: append_entry::BlockEntry, temporary_t // ----------------------------------------------------------------------------- mod interval_miner { use std::sync::mpsc; + use std::sync::mpsc::RecvTimeoutError; use std::sync::Arc; + use std::time::Duration; use tokio::time::Instant; - use crate::channel_read_sync; use crate::eth::miner::Miner; use crate::ext::not; use crate::infra::tracing::warn_task_rx_closed; @@ -472,11 +473,20 @@ mod interval_miner { pub fn run(miner: Arc, ticks_rx: mpsc::Receiver) { const TASK_NAME: &str = "interval-miner-ticker"; - while let Ok(tick) = channel_read_sync!(ticks_rx) { + loop { if GlobalState::is_shutdown_warn(TASK_NAME) { - return; + break; } + let tick = match ticks_rx.recv_timeout(Duration::from_secs(2)) { + Ok(tick) => tick, + Err(RecvTimeoutError::Disconnected) => break, + Err(RecvTimeoutError::Timeout) => { + tracing::warn!(timeout = "2s", "timeout reading miner channel, expected 1 block per second"); + continue; + } + }; + if not(GlobalState::is_miner_enabled()) { tracing::warn!("skipping mining block because block mining is disabled"); continue; diff --git a/src/eth/rpc/rpc_subscriptions.rs b/src/eth/rpc/rpc_subscriptions.rs index f4a1c1894..cc82edbfd 100644 --- a/src/eth/rpc/rpc_subscriptions.rs +++ b/src/eth/rpc/rpc_subscriptions.rs @@ -12,9 +12,9 @@ use serde::ser::SerializeMap; use tokio::sync::broadcast; use tokio::sync::RwLock; use tokio::task::JoinHandle; +use tokio::time::timeout; use tokio::time::Duration; -use crate::channel_read; use crate::eth::primitives::Block; use crate::eth::primitives::DateTimeNow; use crate::eth::primitives::LogFilter; @@ -31,7 +31,7 @@ use crate::ext::SleepReason; use crate::if_else; #[cfg(feature = "metrics")] use crate::infra::metrics; -use crate::infra::tracing::warn_task_tx_closed; +use crate::infra::tracing::warn_task_rx_closed; use crate::GlobalState; /// Frequency of cleaning up closed subscriptions. @@ -40,6 +40,9 @@ const CLEANING_FREQUENCY: Duration = Duration::from_secs(10); /// Timeout used when sending notifications to subscribers. const NOTIFICATION_TIMEOUT: Duration = Duration::from_secs(10); +/// Max wait since last checked shutdown in notifier. +const NOTIFIER_SHUTDOWN_CHECK_INTERVAL: Duration = Duration::from_secs(2); + #[cfg(feature = "metrics")] mod label { pub(super) const PENDING_TXS: &str = "newPendingTransactions"; @@ -154,15 +157,17 @@ impl RpcSubscriptions { return Ok(()); } - let Ok(tx) = channel_read!(rx_tx_hash) else { - warn_task_tx_closed(TASK_NAME); - break; + let tx = match timeout(NOTIFIER_SHUTDOWN_CHECK_INTERVAL, rx_tx_hash.recv()).await { + Ok(Ok(tx)) => tx, + Ok(Err(_channel_closed)) => break, + Err(_timed_out) => continue, }; let interested_subs = subs.pending_txs.read().await; let interested_subs = interested_subs.values().collect_vec(); Self::notify(interested_subs, tx.hash().to_string()); } + warn_task_rx_closed(TASK_NAME); Ok(()) }) } @@ -176,15 +181,17 @@ impl RpcSubscriptions { return Ok(()); } - let Ok(block) = channel_read!(rx_block) else { - warn_task_tx_closed(TASK_NAME); - break; + let block = match timeout(NOTIFIER_SHUTDOWN_CHECK_INTERVAL, rx_block.recv()).await { + Ok(Ok(block)) => block, + Ok(Err(_channel_closed)) => break, + Err(_timed_out) => continue, }; let interested_subs = subs.new_heads.read().await; let interested_subs = interested_subs.values().collect_vec(); Self::notify(interested_subs, block.header); } + warn_task_rx_closed(TASK_NAME); Ok(()) }) } @@ -198,9 +205,10 @@ impl RpcSubscriptions { return Ok(()); } - let Ok(log) = channel_read!(rx_log_mined) else { - warn_task_tx_closed(TASK_NAME); - break; + let log = match timeout(NOTIFIER_SHUTDOWN_CHECK_INTERVAL, rx_log_mined.recv()).await { + Ok(Ok(log)) => log, + Ok(Err(_channel_closed)) => break, + Err(_timed_out) => continue, }; let interested_subs = subs.logs.read().await; @@ -212,6 +220,7 @@ impl RpcSubscriptions { Self::notify(interested_subs, log); } + warn_task_rx_closed(TASK_NAME); Ok(()) }) } diff --git a/src/ext.rs b/src/ext.rs index 455d27699..a5ee1935d 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -154,77 +154,6 @@ pub fn parse_duration(s: &str) -> anyhow::Result { Err(anyhow!("invalid duration format: {}", s)) } -// ----------------------------------------------------------------------------- -// Channels -// ----------------------------------------------------------------------------- - -/// Reads a value from an async channel logging timeout at some predefined interval. -/// -/// Expects the tokio mpsc receiver. -#[macro_export] -macro_rules! channel_read { - ($rx: ident) => { - $crate::channel_read_impl!($rx, timeout_ms: 2000) - }; - ($rx: ident, $timeout_ms:expr) => { - $crate::channel_read_impl!($rx, timeout_ms: $timeout_ms), - }; -} - -#[macro_export] -#[doc(hidden)] -macro_rules! channel_read_impl { - ($rx: ident, timeout_ms: $timeout: expr) => {{ - const TARGET: &str = const_format::formatcp!("{}::{}", module_path!(), "rx"); - const TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_millis($timeout); - - loop { - match tokio::time::timeout(TIMEOUT, $rx.recv()).await { - Ok(value) => break value, - Err(_) => { - tracing::warn!(target: TARGET, channel = %stringify!($rx), timeout_ms = %TIMEOUT.as_millis(), "timeout reading channel"); - continue; - } - } - } - }}; -} - -/// Reads a value from a sync channel logging timeout at some predefined interval. -/// -/// Expects the `std` mpsc receiver. -#[macro_export] -macro_rules! channel_read_sync { - ($rx: ident) => { - $crate::channel_read_sync_impl!($rx, timeout_ms: 2000) - }; - ($rx: ident, $timeout_ms:expr) => { - $crate::channel_read_sync_impl!($rx, timeout_ms: $timeout_ms), - }; -} - -#[macro_export] -#[doc(hidden)] -macro_rules! channel_read_sync_impl { - ($rx: ident, timeout_ms: $timeout: expr) => {{ - const TARGET: &str = const_format::formatcp!("{}::{}", module_path!(), "rx"); - const TIMEOUT: std::time::Duration = std::time::Duration::from_millis($timeout); - - loop { - match $rx.recv_timeout(TIMEOUT) { - Ok(value) => break Ok(value), - Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { - tracing::warn!(target: TARGET, channel = %stringify!($rx), timeout_ms = %TIMEOUT.as_millis(), "timeout reading channel"); - continue; - } - e @ Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => { - break e - } - } - } - }}; -} - // ----------------------------------------------------------------------------- // Tokio // -----------------------------------------------------------------------------