Skip to content

Commit

Permalink
enha: check shutdown signal in tasks (#1480)
Browse files Browse the repository at this point in the history
in miner, importer-online and subscribers

also remove warnings for channel timeouts in the notifier tasks
as they aren't necessary
  • Loading branch information
marcospb19-cw authored Jul 18, 2024
1 parent d00ce34 commit 34d35c8
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 87 deletions.
12 changes: 10 additions & 2 deletions src/bin/importer_online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::time::Duration;
use futures::try_join;
use futures::StreamExt;
use serde::Deserialize;
use stratus::channel_read;
use stratus::config::ImporterOnlineConfig;
use stratus::eth::executor::Executor;
use stratus::eth::miner::Miner;
Expand Down Expand Up @@ -152,11 +151,20 @@ async fn start_block_executor(
) -> anyhow::Result<()> {
const TASK_NAME: &str = "block-executor";

while let Some((block, receipts)) = channel_read!(backlog_rx) {
loop {
if GlobalState::is_shutdown_warn(TASK_NAME) {
return Ok(());
}

let (block, receipts) = match timeout(Duration::from_secs(2), backlog_rx.recv()).await {
Ok(Some(inner)) => inner,
Ok(None) => break, // channel closed
Err(_timed_out) => {
tracing::warn!(timeout = "2s", "timeout reading block executor channel, expected around 1 block per second");
continue;
}
};

#[cfg(feature = "metrics")]
let start = metrics::now();

Expand Down
16 changes: 13 additions & 3 deletions src/eth/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,11 +459,12 @@ pub fn block_from_propagation(block_entry: append_entry::BlockEntry, temporary_t
// -----------------------------------------------------------------------------
mod interval_miner {
use std::sync::mpsc;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::Arc;
use std::time::Duration;

use tokio::time::Instant;

use crate::channel_read_sync;
use crate::eth::miner::Miner;
use crate::ext::not;
use crate::infra::tracing::warn_task_rx_closed;
Expand All @@ -472,11 +473,20 @@ mod interval_miner {
pub fn run(miner: Arc<Miner>, ticks_rx: mpsc::Receiver<Instant>) {
const TASK_NAME: &str = "interval-miner-ticker";

while let Ok(tick) = channel_read_sync!(ticks_rx) {
loop {
if GlobalState::is_shutdown_warn(TASK_NAME) {
return;
break;
}

let tick = match ticks_rx.recv_timeout(Duration::from_secs(2)) {
Ok(tick) => tick,
Err(RecvTimeoutError::Disconnected) => break,
Err(RecvTimeoutError::Timeout) => {
tracing::warn!(timeout = "2s", "timeout reading miner channel, expected 1 block per second");
continue;
}
};

if not(GlobalState::is_miner_enabled()) {
tracing::warn!("skipping mining block because block mining is disabled");
continue;
Expand Down
31 changes: 20 additions & 11 deletions src/eth/rpc/rpc_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use serde::ser::SerializeMap;
use tokio::sync::broadcast;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tokio::time::Duration;

use crate::channel_read;
use crate::eth::primitives::Block;
use crate::eth::primitives::DateTimeNow;
use crate::eth::primitives::LogFilter;
Expand All @@ -31,7 +31,7 @@ use crate::ext::SleepReason;
use crate::if_else;
#[cfg(feature = "metrics")]
use crate::infra::metrics;
use crate::infra::tracing::warn_task_tx_closed;
use crate::infra::tracing::warn_task_rx_closed;
use crate::GlobalState;

/// Frequency of cleaning up closed subscriptions.
Expand All @@ -40,6 +40,9 @@ const CLEANING_FREQUENCY: Duration = Duration::from_secs(10);
/// Timeout used when sending notifications to subscribers.
const NOTIFICATION_TIMEOUT: Duration = Duration::from_secs(10);

/// Max wait since last checked shutdown in notifier.
const NOTIFIER_SHUTDOWN_CHECK_INTERVAL: Duration = Duration::from_secs(2);

#[cfg(feature = "metrics")]
mod label {
pub(super) const PENDING_TXS: &str = "newPendingTransactions";
Expand Down Expand Up @@ -154,15 +157,17 @@ impl RpcSubscriptions {
return Ok(());
}

let Ok(tx) = channel_read!(rx_tx_hash) else {
warn_task_tx_closed(TASK_NAME);
break;
let tx = match timeout(NOTIFIER_SHUTDOWN_CHECK_INTERVAL, rx_tx_hash.recv()).await {
Ok(Ok(tx)) => tx,
Ok(Err(_channel_closed)) => break,
Err(_timed_out) => continue,
};

let interested_subs = subs.pending_txs.read().await;
let interested_subs = interested_subs.values().collect_vec();
Self::notify(interested_subs, tx.hash().to_string());
}
warn_task_rx_closed(TASK_NAME);
Ok(())
})
}
Expand All @@ -176,15 +181,17 @@ impl RpcSubscriptions {
return Ok(());
}

let Ok(block) = channel_read!(rx_block) else {
warn_task_tx_closed(TASK_NAME);
break;
let block = match timeout(NOTIFIER_SHUTDOWN_CHECK_INTERVAL, rx_block.recv()).await {
Ok(Ok(block)) => block,
Ok(Err(_channel_closed)) => break,
Err(_timed_out) => continue,
};

let interested_subs = subs.new_heads.read().await;
let interested_subs = interested_subs.values().collect_vec();
Self::notify(interested_subs, block.header);
}
warn_task_rx_closed(TASK_NAME);
Ok(())
})
}
Expand All @@ -198,9 +205,10 @@ impl RpcSubscriptions {
return Ok(());
}

let Ok(log) = channel_read!(rx_log_mined) else {
warn_task_tx_closed(TASK_NAME);
break;
let log = match timeout(NOTIFIER_SHUTDOWN_CHECK_INTERVAL, rx_log_mined.recv()).await {
Ok(Ok(log)) => log,
Ok(Err(_channel_closed)) => break,
Err(_timed_out) => continue,
};

let interested_subs = subs.logs.read().await;
Expand All @@ -212,6 +220,7 @@ impl RpcSubscriptions {

Self::notify(interested_subs, log);
}
warn_task_rx_closed(TASK_NAME);
Ok(())
})
}
Expand Down
71 changes: 0 additions & 71 deletions src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,77 +154,6 @@ pub fn parse_duration(s: &str) -> anyhow::Result<Duration> {
Err(anyhow!("invalid duration format: {}", s))
}

// -----------------------------------------------------------------------------
// Channels
// -----------------------------------------------------------------------------

/// Reads a value from an async channel logging timeout at some predefined interval.
///
/// Expects the tokio mpsc receiver.
#[macro_export]
macro_rules! channel_read {
($rx: ident) => {
$crate::channel_read_impl!($rx, timeout_ms: 2000)
};
($rx: ident, $timeout_ms:expr) => {
$crate::channel_read_impl!($rx, timeout_ms: $timeout_ms),
};
}

#[macro_export]
#[doc(hidden)]
macro_rules! channel_read_impl {
($rx: ident, timeout_ms: $timeout: expr) => {{
const TARGET: &str = const_format::formatcp!("{}::{}", module_path!(), "rx");
const TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_millis($timeout);

loop {
match tokio::time::timeout(TIMEOUT, $rx.recv()).await {
Ok(value) => break value,
Err(_) => {
tracing::warn!(target: TARGET, channel = %stringify!($rx), timeout_ms = %TIMEOUT.as_millis(), "timeout reading channel");
continue;
}
}
}
}};
}

/// Reads a value from a sync channel logging timeout at some predefined interval.
///
/// Expects the `std` mpsc receiver.
#[macro_export]
macro_rules! channel_read_sync {
($rx: ident) => {
$crate::channel_read_sync_impl!($rx, timeout_ms: 2000)
};
($rx: ident, $timeout_ms:expr) => {
$crate::channel_read_sync_impl!($rx, timeout_ms: $timeout_ms),
};
}

#[macro_export]
#[doc(hidden)]
macro_rules! channel_read_sync_impl {
($rx: ident, timeout_ms: $timeout: expr) => {{
const TARGET: &str = const_format::formatcp!("{}::{}", module_path!(), "rx");
const TIMEOUT: std::time::Duration = std::time::Duration::from_millis($timeout);

loop {
match $rx.recv_timeout(TIMEOUT) {
Ok(value) => break Ok(value),
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
tracing::warn!(target: TARGET, channel = %stringify!($rx), timeout_ms = %TIMEOUT.as_millis(), "timeout reading channel");
continue;
}
e @ Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
break e
}
}
}
}};
}

// -----------------------------------------------------------------------------
// Tokio
// -----------------------------------------------------------------------------
Expand Down

0 comments on commit 34d35c8

Please sign in to comment.