Skip to content

Commit

Permalink
fix: block-miner does not block on startup and also accepts cancellla…
Browse files Browse the repository at this point in the history
…tion signal (#896)
  • Loading branch information
dinhani-cw authored May 22, 2024
1 parent fee3df5 commit 7e28bc7
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 65 deletions.
6 changes: 4 additions & 2 deletions src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ fn main() -> anyhow::Result<()> {
}

async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
// init cancellation handler
let cancellation = signal_handler();

// init services
let rpc_storage = config.rpc_storage.init().await?;
let storage = config.storage.init().await?;
let miner = config.miner.init(Arc::clone(&storage), None).await?;
let miner = config.miner.init(Arc::clone(&storage), None, cancellation.clone()).await?;
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), None, None).await;

// init block snapshots to export
Expand All @@ -69,7 +72,6 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {

// init shared data between importer and external rpc storage loader
let (backlog_tx, backlog_rx) = mpsc::channel::<BacklogTask>(BACKLOG_SIZE);
let cancellation = signal_handler();

// load genesis accounts
let initial_accounts = rpc_storage.read_initial_accounts().await?;
Expand Down
8 changes: 5 additions & 3 deletions src/bin/importer_online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,15 @@ fn main() -> anyhow::Result<()> {
}

async fn run(config: ImporterOnlineConfig) -> anyhow::Result<()> {
// init cancellation handler
let cancellation = signal_handler();

// init server
let storage = config.storage.init().await?;
let relayer = config.relayer.init(Arc::clone(&storage)).await?;

let miner = config.miner.init(Arc::clone(&storage), None).await?;
let miner = config.miner.init(Arc::clone(&storage), None, cancellation.clone()).await?;
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, None).await; //XXX TODO implement the consensus here, in case of it being a follower, it should not even enter here
let chain = Arc::new(BlockchainClient::new_http_ws(&config.base.external_rpc, config.base.external_rpc_ws.as_deref()).await?);
let cancellation: CancellationToken = signal_handler();

let result = run_importer_online(executor, miner, storage, chain, cancellation, config.base.sync_interval).await;
if let Err(ref e) = result {
Expand Down
9 changes: 7 additions & 2 deletions src/bin/run_with_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ fn main() -> anyhow::Result<()> {
}

async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> {
// init cancellation handler
let cancellation = signal_handler();

// init services
let storage = config.storage.init().await?;
let consensus = Arc::new(Consensus::new(config.clone().leader_node)); // in development, with no leader configured, the current node ends up being the leader
Expand All @@ -25,14 +28,16 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> {
let chain = Arc::new(BlockchainClient::new_http_ws(&http_url, ws_url.as_deref()).await?);

let relayer = config.relayer.init(Arc::clone(&storage)).await?;
let miner = config.miner.init(Arc::clone(&storage), Some(Arc::clone(&consensus))).await?;
let miner = config
.miner
.init(Arc::clone(&storage), Some(Arc::clone(&consensus)), cancellation.clone())
.await?;
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, Some(consensus)).await;

let rpc_storage = Arc::clone(&storage);
let rpc_executor = Arc::clone(&executor);
let rpc_miner = Arc::clone(&miner);

let cancellation = signal_handler();
let rpc_cancellation = cancellation.clone();

// run rpc and importer-online in parallel
Expand Down
10 changes: 8 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use display_json::DebugAsJson;
use tokio::runtime::Builder;
use tokio::runtime::Handle;
use tokio::runtime::Runtime;
use tokio_util::sync::CancellationToken;

use crate::bin_name;
use crate::eth::evm::revm::Revm;
Expand Down Expand Up @@ -236,7 +237,12 @@ pub struct MinerConfig {
}

impl MinerConfig {
pub async fn init(&self, storage: Arc<StratusStorage>, consensus: Option<Arc<Consensus>>) -> anyhow::Result<Arc<BlockMiner>> {
pub async fn init(
&self,
storage: Arc<StratusStorage>,
consensus: Option<Arc<Consensus>>,
cancellation: CancellationToken,
) -> anyhow::Result<Arc<BlockMiner>> {
tracing::info!(config = ?self, "starting block miner");

// create miner
Expand Down Expand Up @@ -265,7 +271,7 @@ impl MinerConfig {

// enable interval miner
if miner.is_interval_miner_mode() {
Arc::clone(&miner).spawn_interval_miner()?;
Arc::clone(&miner).spawn_interval_miner(cancellation)?;
}

Ok(miner)
Expand Down
94 changes: 41 additions & 53 deletions src/eth/block_miner.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::sync::Condvar;
use std::thread;
use std::time::Duration;

use ethereum_types::BloomInput;
use keccak_hasher::KeccakHasher;
use nonempty::NonEmpty;
use tokio::runtime::Handle;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;

use super::Consensus;
use crate::eth::primitives::Block;
Expand All @@ -24,6 +22,7 @@ use crate::eth::primitives::TransactionExecution;
use crate::eth::primitives::TransactionMined;
use crate::eth::storage::StratusStorage;
use crate::ext::not;
use crate::ext::spawn_named;
use crate::log_and_err;

pub struct BlockMiner {
Expand Down Expand Up @@ -60,42 +59,23 @@ impl BlockMiner {
}

/// Spawns a new thread that keep mining blocks in the specified interval.
pub fn spawn_interval_miner(self: Arc<Self>) -> anyhow::Result<()> {
pub fn spawn_interval_miner(self: Arc<Self>, cancellation: CancellationToken) -> anyhow::Result<()> {
// validate
let Some(block_time) = self.block_time else {
return log_and_err!("cannot spawn interval miner because it does not have a block time defined");
};
tracing::info!(block_time = %humantime::Duration::from(block_time), "spawning interval miner");

// spawn scoped threads (tokio does not support scoped tasks)
let pending_blocks = AtomicUsize::new(0);
let pending_blocks_cvar = Condvar::new();

thread::scope(|s| {
// spawn miner
let t_miner = thread::Builder::new().name("miner".into());
let t_miner_tokio = Handle::current();
let t_miner_pending_blocks = &pending_blocks;
let t_miner_pending_blocks_cvar = &pending_blocks_cvar;
t_miner
.spawn_scoped(s, move || {
let _tokio_guard = t_miner_tokio.enter();
interval_miner::start(self, t_miner_pending_blocks, t_miner_pending_blocks_cvar);
})
.expect("spawning interval miner should not fail");

// spawn ticker
let t_ticker = thread::Builder::new().name("miner-ticker".into());
let t_ticker_tokio = Handle::current();
let t_ticker_pending_blocks = &pending_blocks;
let t_ticker_pending_blocks_cvar = &pending_blocks_cvar;
t_ticker
.spawn_scoped(s, move || {
let _tokio_guard = t_ticker_tokio.enter();
interval_miner_ticker::start(block_time, t_ticker_pending_blocks, t_ticker_pending_blocks_cvar);
})
.expect("spawning interval miner ticker should not fail");
});
// spawn miner and ticker
let pending_blocks = Arc::new(AtomicUsize::new(0));
spawn_named(
"miner::miner",
interval_miner::run(Arc::clone(&self), Arc::clone(&pending_blocks), cancellation.child_token()),
);
spawn_named(
"miner::ticker",
interval_miner_ticker::run(block_time, Arc::clone(&pending_blocks), cancellation.child_token()),
);

Ok(())
}
Expand Down Expand Up @@ -339,27 +319,31 @@ mod interval_miner {
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::Mutex;

use tokio::runtime::Handle;
use tokio::task::yield_now;
use tokio_util::sync::CancellationToken;

use crate::eth::BlockMiner;
use crate::ext::warn_task_cancellation;

pub fn start(miner: Arc<BlockMiner>, pending_blocks: &AtomicUsize, pending_blocks_cvar: &Condvar) {
let tokio = Handle::current();
let cvar_mutex = Mutex::new(());

pub async fn run(miner: Arc<BlockMiner>, pending_blocks: Arc<AtomicUsize>, cancellation: CancellationToken) {
loop {
// check cancellation
if cancellation.is_cancelled() {
warn_task_cancellation("interval miner");
return;
}

// check pending blocks and mine if necessary
let pending = pending_blocks
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| Some(n.saturating_sub(1)))
.unwrap();
if pending > 0 {
tracing::info!(%pending, "interval mining block");
tokio.block_on(mine_and_commit(&miner));
mine_and_commit(&miner).await;
} else {
tracing::debug!(%pending, "waiting for block interval");
let _ = pending_blocks_cvar.wait(cvar_mutex.lock().unwrap());
yield_now().await;
}
}
}
Expand Down Expand Up @@ -392,17 +376,17 @@ mod interval_miner {
mod interval_miner_ticker {
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Condvar;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use chrono::Timelike;
use chrono::Utc;
use tokio::runtime::Handle;
use tokio_util::sync::CancellationToken;

pub fn start(block_time: Duration, pending_blocks: &AtomicUsize, pending_blocks_cvar: &Condvar) {
let tokio = Handle::current();
use crate::ext::warn_task_cancellation;

pub async fn run(block_time: Duration, pending_blocks: Arc<AtomicUsize>, cancellation: CancellationToken) {
// sync to next second
let next_second = (Utc::now() + Duration::from_secs(1)).with_nanosecond(0).unwrap();
thread::sleep((next_second - Utc::now()).to_std().unwrap());
Expand All @@ -412,13 +396,17 @@ mod interval_miner_ticker {
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);

// keep ticking
tokio.block_on(async move {
ticker.tick().await;
loop {
let _ = ticker.tick().await;
let _ = pending_blocks.fetch_add(1, Ordering::SeqCst);
pending_blocks_cvar.notify_one();
ticker.tick().await;
loop {
// check cancellation
if cancellation.is_cancelled() {
warn_task_cancellation("interval miner ticker");
return;
}
});

// await next tick
let _ = ticker.tick().await;
let _ = pending_blocks.fetch_add(1, Ordering::SeqCst);
}
}
}
6 changes: 4 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ fn main() -> anyhow::Result<()> {
}

async fn run(config: StratusConfig) -> anyhow::Result<()> {
// init cancellation handler
let cancellation = signal_handler();

// init services
let storage = config.storage.init().await?;
let relayer = config.relayer.init(Arc::clone(&storage)).await?;
let miner = config.miner.init(Arc::clone(&storage), None).await?;
let miner = config.miner.init(Arc::clone(&storage), None, cancellation.clone()).await?;
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, None).await;
let cancellation = signal_handler();

// start rpc server
serve_rpc(storage, executor, miner, config.address, config.executor.chain_id.into(), cancellation).await?;
Expand Down
3 changes: 2 additions & 1 deletion tests/test_import_external_snapshot_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use stratus::eth::storage::PermanentStorage;
use stratus::eth::storage::StratusStorage;
use stratus::infra::docker::Docker;
use stratus::GlobalServices;
use tokio_util::sync::CancellationToken;
#[cfg(feature = "metrics")]
mod m {
pub use const_format::formatcp;
Expand Down Expand Up @@ -155,7 +156,7 @@ pub async fn execute_test(
// init services
let storage = Arc::new(StratusStorage::new(Arc::new(InMemoryTemporaryStorage::new()), Arc::new(perm_storage)));
let relayer = config.relayer.init(Arc::clone(&storage)).await.unwrap();
let miner = config.miner.init(Arc::clone(&storage), None).await.unwrap();
let miner = config.miner.init(Arc::clone(&storage), None, CancellationToken::new()).await.unwrap();
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, None).await;

// execute and mine
Expand Down

0 comments on commit 7e28bc7

Please sign in to comment.