diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index bf7c67532..fac0ef2c7 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -45,10 +45,13 @@ fn main() -> anyhow::Result<()> { } async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> { + // init cancellation handler + let cancellation = signal_handler(); + // init services let rpc_storage = config.rpc_storage.init().await?; let storage = config.storage.init().await?; - let miner = config.miner.init(Arc::clone(&storage), None).await?; + let miner = config.miner.init(Arc::clone(&storage), None, cancellation.clone()).await?; let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), None, None).await; // init block snapshots to export @@ -69,7 +72,6 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> { // init shared data between importer and external rpc storage loader let (backlog_tx, backlog_rx) = mpsc::channel::(BACKLOG_SIZE); - let cancellation = signal_handler(); // load genesis accounts let initial_accounts = rpc_storage.read_initial_accounts().await?; diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs index 2bdfbc1ce..6fd5a9eaa 100644 --- a/src/bin/importer_online.rs +++ b/src/bin/importer_online.rs @@ -60,13 +60,15 @@ fn main() -> anyhow::Result<()> { } async fn run(config: ImporterOnlineConfig) -> anyhow::Result<()> { + // init cancellation handler + let cancellation = signal_handler(); + + // init server let storage = config.storage.init().await?; let relayer = config.relayer.init(Arc::clone(&storage)).await?; - - let miner = config.miner.init(Arc::clone(&storage), None).await?; + let miner = config.miner.init(Arc::clone(&storage), None, cancellation.clone()).await?; let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, None).await; //XXX TODO implement the consensus here, in case of it being a follower, it should not even enter here let chain = Arc::new(BlockchainClient::new_http_ws(&config.base.external_rpc, config.base.external_rpc_ws.as_deref()).await?); - let cancellation: CancellationToken = signal_handler(); let result = run_importer_online(executor, miner, storage, chain, cancellation, config.base.sync_interval).await; if let Err(ref e) = result { diff --git a/src/bin/run_with_importer.rs b/src/bin/run_with_importer.rs index 48c188d59..a8e231021 100644 --- a/src/bin/run_with_importer.rs +++ b/src/bin/run_with_importer.rs @@ -17,6 +17,9 @@ fn main() -> anyhow::Result<()> { } async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> { + // init cancellation handler + let cancellation = signal_handler(); + // init services let storage = config.storage.init().await?; let consensus = Arc::new(Consensus::new(config.clone().leader_node)); // in development, with no leader configured, the current node ends up being the leader @@ -25,14 +28,16 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> { let chain = Arc::new(BlockchainClient::new_http_ws(&http_url, ws_url.as_deref()).await?); let relayer = config.relayer.init(Arc::clone(&storage)).await?; - let miner = config.miner.init(Arc::clone(&storage), Some(Arc::clone(&consensus))).await?; + let miner = config + .miner + .init(Arc::clone(&storage), Some(Arc::clone(&consensus)), cancellation.clone()) + .await?; let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, Some(consensus)).await; let rpc_storage = Arc::clone(&storage); let rpc_executor = Arc::clone(&executor); let rpc_miner = Arc::clone(&miner); - let cancellation = signal_handler(); let rpc_cancellation = cancellation.clone(); // run rpc and importer-online in parallel diff --git a/src/config.rs b/src/config.rs index 0d9d93d47..6257cdd38 100644 --- a/src/config.rs +++ b/src/config.rs @@ -13,6 +13,7 @@ use display_json::DebugAsJson; use tokio::runtime::Builder; use tokio::runtime::Handle; use tokio::runtime::Runtime; +use tokio_util::sync::CancellationToken; use crate::bin_name; use crate::eth::evm::revm::Revm; @@ -236,7 +237,12 @@ pub struct MinerConfig { } impl MinerConfig { - pub async fn init(&self, storage: Arc, consensus: Option>) -> anyhow::Result> { + pub async fn init( + &self, + storage: Arc, + consensus: Option>, + cancellation: CancellationToken, + ) -> anyhow::Result> { tracing::info!(config = ?self, "starting block miner"); // create miner @@ -265,7 +271,7 @@ impl MinerConfig { // enable interval miner if miner.is_interval_miner_mode() { - Arc::clone(&miner).spawn_interval_miner()?; + Arc::clone(&miner).spawn_interval_miner(cancellation)?; } Ok(miner) diff --git a/src/eth/block_miner.rs b/src/eth/block_miner.rs index d1d5d2d89..be8631ee8 100644 --- a/src/eth/block_miner.rs +++ b/src/eth/block_miner.rs @@ -1,14 +1,12 @@ use std::sync::atomic::AtomicUsize; use std::sync::Arc; -use std::sync::Condvar; -use std::thread; use std::time::Duration; use ethereum_types::BloomInput; use keccak_hasher::KeccakHasher; use nonempty::NonEmpty; -use tokio::runtime::Handle; use tokio::sync::broadcast; +use tokio_util::sync::CancellationToken; use super::Consensus; use crate::eth::primitives::Block; @@ -24,6 +22,7 @@ use crate::eth::primitives::TransactionExecution; use crate::eth::primitives::TransactionMined; use crate::eth::storage::StratusStorage; use crate::ext::not; +use crate::ext::spawn_named; use crate::log_and_err; pub struct BlockMiner { @@ -60,42 +59,23 @@ impl BlockMiner { } /// Spawns a new thread that keep mining blocks in the specified interval. - pub fn spawn_interval_miner(self: Arc) -> anyhow::Result<()> { + pub fn spawn_interval_miner(self: Arc, cancellation: CancellationToken) -> anyhow::Result<()> { // validate let Some(block_time) = self.block_time else { return log_and_err!("cannot spawn interval miner because it does not have a block time defined"); }; tracing::info!(block_time = %humantime::Duration::from(block_time), "spawning interval miner"); - // spawn scoped threads (tokio does not support scoped tasks) - let pending_blocks = AtomicUsize::new(0); - let pending_blocks_cvar = Condvar::new(); - - thread::scope(|s| { - // spawn miner - let t_miner = thread::Builder::new().name("miner".into()); - let t_miner_tokio = Handle::current(); - let t_miner_pending_blocks = &pending_blocks; - let t_miner_pending_blocks_cvar = &pending_blocks_cvar; - t_miner - .spawn_scoped(s, move || { - let _tokio_guard = t_miner_tokio.enter(); - interval_miner::start(self, t_miner_pending_blocks, t_miner_pending_blocks_cvar); - }) - .expect("spawning interval miner should not fail"); - - // spawn ticker - let t_ticker = thread::Builder::new().name("miner-ticker".into()); - let t_ticker_tokio = Handle::current(); - let t_ticker_pending_blocks = &pending_blocks; - let t_ticker_pending_blocks_cvar = &pending_blocks_cvar; - t_ticker - .spawn_scoped(s, move || { - let _tokio_guard = t_ticker_tokio.enter(); - interval_miner_ticker::start(block_time, t_ticker_pending_blocks, t_ticker_pending_blocks_cvar); - }) - .expect("spawning interval miner ticker should not fail"); - }); + // spawn miner and ticker + let pending_blocks = Arc::new(AtomicUsize::new(0)); + spawn_named( + "miner::miner", + interval_miner::run(Arc::clone(&self), Arc::clone(&pending_blocks), cancellation.child_token()), + ); + spawn_named( + "miner::ticker", + interval_miner_ticker::run(block_time, Arc::clone(&pending_blocks), cancellation.child_token()), + ); Ok(()) } @@ -339,27 +319,31 @@ mod interval_miner { use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; - use std::sync::Condvar; - use std::sync::Mutex; - use tokio::runtime::Handle; + use tokio::task::yield_now; + use tokio_util::sync::CancellationToken; use crate::eth::BlockMiner; + use crate::ext::warn_task_cancellation; - pub fn start(miner: Arc, pending_blocks: &AtomicUsize, pending_blocks_cvar: &Condvar) { - let tokio = Handle::current(); - let cvar_mutex = Mutex::new(()); - + pub async fn run(miner: Arc, pending_blocks: Arc, cancellation: CancellationToken) { loop { + // check cancellation + if cancellation.is_cancelled() { + warn_task_cancellation("interval miner"); + return; + } + + // check pending blocks and mine if necessary let pending = pending_blocks .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| Some(n.saturating_sub(1))) .unwrap(); if pending > 0 { tracing::info!(%pending, "interval mining block"); - tokio.block_on(mine_and_commit(&miner)); + mine_and_commit(&miner).await; } else { tracing::debug!(%pending, "waiting for block interval"); - let _ = pending_blocks_cvar.wait(cvar_mutex.lock().unwrap()); + yield_now().await; } } } @@ -392,17 +376,17 @@ mod interval_miner { mod interval_miner_ticker { use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; - use std::sync::Condvar; + use std::sync::Arc; use std::thread; use std::time::Duration; use chrono::Timelike; use chrono::Utc; - use tokio::runtime::Handle; + use tokio_util::sync::CancellationToken; - pub fn start(block_time: Duration, pending_blocks: &AtomicUsize, pending_blocks_cvar: &Condvar) { - let tokio = Handle::current(); + use crate::ext::warn_task_cancellation; + pub async fn run(block_time: Duration, pending_blocks: Arc, cancellation: CancellationToken) { // sync to next second let next_second = (Utc::now() + Duration::from_secs(1)).with_nanosecond(0).unwrap(); thread::sleep((next_second - Utc::now()).to_std().unwrap()); @@ -412,13 +396,17 @@ mod interval_miner_ticker { ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst); // keep ticking - tokio.block_on(async move { - ticker.tick().await; - loop { - let _ = ticker.tick().await; - let _ = pending_blocks.fetch_add(1, Ordering::SeqCst); - pending_blocks_cvar.notify_one(); + ticker.tick().await; + loop { + // check cancellation + if cancellation.is_cancelled() { + warn_task_cancellation("interval miner ticker"); + return; } - }); + + // await next tick + let _ = ticker.tick().await; + let _ = pending_blocks.fetch_add(1, Ordering::SeqCst); + } } } diff --git a/src/main.rs b/src/main.rs index 42e1c2a2a..849fa9551 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,12 +11,14 @@ fn main() -> anyhow::Result<()> { } async fn run(config: StratusConfig) -> anyhow::Result<()> { + // init cancellation handler + let cancellation = signal_handler(); + // init services let storage = config.storage.init().await?; let relayer = config.relayer.init(Arc::clone(&storage)).await?; - let miner = config.miner.init(Arc::clone(&storage), None).await?; + let miner = config.miner.init(Arc::clone(&storage), None, cancellation.clone()).await?; let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, None).await; - let cancellation = signal_handler(); // start rpc server serve_rpc(storage, executor, miner, config.address, config.executor.chain_id.into(), cancellation).await?; diff --git a/tests/test_import_external_snapshot_common.rs b/tests/test_import_external_snapshot_common.rs index 67bab94e4..36d522e5f 100644 --- a/tests/test_import_external_snapshot_common.rs +++ b/tests/test_import_external_snapshot_common.rs @@ -20,6 +20,7 @@ use stratus::eth::storage::PermanentStorage; use stratus::eth::storage::StratusStorage; use stratus::infra::docker::Docker; use stratus::GlobalServices; +use tokio_util::sync::CancellationToken; #[cfg(feature = "metrics")] mod m { pub use const_format::formatcp; @@ -155,7 +156,7 @@ pub async fn execute_test( // init services let storage = Arc::new(StratusStorage::new(Arc::new(InMemoryTemporaryStorage::new()), Arc::new(perm_storage))); let relayer = config.relayer.init(Arc::clone(&storage)).await.unwrap(); - let miner = config.miner.init(Arc::clone(&storage), None).await.unwrap(); + let miner = config.miner.init(Arc::clone(&storage), None, CancellationToken::new()).await.unwrap(); let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, None).await; // execute and mine