Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: block-miner does not block on startup and also accepts cancellation signal #896

Merged
merged 2 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ fn main() -> anyhow::Result<()> {
}

async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
// init cancellation handler
let cancellation = signal_handler();

// init services
let rpc_storage = config.rpc_storage.init().await?;
let storage = config.storage.init().await?;
let miner = config.miner.init(Arc::clone(&storage), None).await?;
let miner = config.miner.init(Arc::clone(&storage), None, cancellation.clone()).await?;
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), None, None).await;

// init block snapshots to export
Expand All @@ -69,7 +72,6 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {

// init shared data between importer and external rpc storage loader
let (backlog_tx, backlog_rx) = mpsc::channel::<BacklogTask>(BACKLOG_SIZE);
let cancellation = signal_handler();

// load genesis accounts
let initial_accounts = rpc_storage.read_initial_accounts().await?;
Expand Down
8 changes: 5 additions & 3 deletions src/bin/importer_online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,15 @@ fn main() -> anyhow::Result<()> {
}

async fn run(config: ImporterOnlineConfig) -> anyhow::Result<()> {
// init cancellation handler
let cancellation = signal_handler();

// init server
let storage = config.storage.init().await?;
let relayer = config.relayer.init(Arc::clone(&storage)).await?;

let miner = config.miner.init(Arc::clone(&storage), None).await?;
let miner = config.miner.init(Arc::clone(&storage), None, cancellation.clone()).await?;
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, None).await; //XXX TODO implement the consensus here, in case of it being a follower, it should not even enter here
let chain = Arc::new(BlockchainClient::new_http_ws(&config.base.external_rpc, config.base.external_rpc_ws.as_deref()).await?);
let cancellation: CancellationToken = signal_handler();

let result = run_importer_online(executor, miner, storage, chain, cancellation, config.base.sync_interval).await;
if let Err(ref e) = result {
Expand Down
9 changes: 7 additions & 2 deletions src/bin/run_with_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ fn main() -> anyhow::Result<()> {
}

async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> {
// init cancellation handler
let cancellation = signal_handler();

// init services
let storage = config.storage.init().await?;
let consensus = Arc::new(Consensus::new(config.clone().leader_node)); // in development, with no leader configured, the current node ends up being the leader
Expand All @@ -25,14 +28,16 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> {
let chain = Arc::new(BlockchainClient::new_http_ws(&http_url, ws_url.as_deref()).await?);

let relayer = config.relayer.init(Arc::clone(&storage)).await?;
let miner = config.miner.init(Arc::clone(&storage), Some(Arc::clone(&consensus))).await?;
let miner = config
.miner
.init(Arc::clone(&storage), Some(Arc::clone(&consensus)), cancellation.clone())
.await?;
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, Some(consensus)).await;

let rpc_storage = Arc::clone(&storage);
let rpc_executor = Arc::clone(&executor);
let rpc_miner = Arc::clone(&miner);

let cancellation = signal_handler();
let rpc_cancellation = cancellation.clone();

// run rpc and importer-online in parallel
Expand Down
10 changes: 8 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use display_json::DebugAsJson;
use tokio::runtime::Builder;
use tokio::runtime::Handle;
use tokio::runtime::Runtime;
use tokio_util::sync::CancellationToken;

use crate::bin_name;
use crate::eth::evm::revm::Revm;
Expand Down Expand Up @@ -236,7 +237,12 @@ pub struct MinerConfig {
}

impl MinerConfig {
pub async fn init(&self, storage: Arc<StratusStorage>, consensus: Option<Arc<Consensus>>) -> anyhow::Result<Arc<BlockMiner>> {
pub async fn init(
&self,
storage: Arc<StratusStorage>,
consensus: Option<Arc<Consensus>>,
cancellation: CancellationToken,
) -> anyhow::Result<Arc<BlockMiner>> {
tracing::info!(config = ?self, "starting block miner");

// create miner
Expand Down Expand Up @@ -265,7 +271,7 @@ impl MinerConfig {

// enable interval miner
if miner.is_interval_miner_mode() {
Arc::clone(&miner).spawn_interval_miner()?;
Arc::clone(&miner).spawn_interval_miner(cancellation)?;
}

Ok(miner)
Expand Down
94 changes: 41 additions & 53 deletions src/eth/block_miner.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::sync::Condvar;
use std::thread;
use std::time::Duration;

use ethereum_types::BloomInput;
use keccak_hasher::KeccakHasher;
use nonempty::NonEmpty;
use tokio::runtime::Handle;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;

use super::Consensus;
use crate::eth::primitives::Block;
Expand All @@ -24,6 +22,7 @@ use crate::eth::primitives::TransactionExecution;
use crate::eth::primitives::TransactionMined;
use crate::eth::storage::StratusStorage;
use crate::ext::not;
use crate::ext::spawn_named;
use crate::log_and_err;

pub struct BlockMiner {
Expand Down Expand Up @@ -60,42 +59,23 @@ impl BlockMiner {
}

/// Spawns a new thread that keep mining blocks in the specified interval.
pub fn spawn_interval_miner(self: Arc<Self>) -> anyhow::Result<()> {
pub fn spawn_interval_miner(self: Arc<Self>, cancellation: CancellationToken) -> anyhow::Result<()> {
// validate
let Some(block_time) = self.block_time else {
return log_and_err!("cannot spawn interval miner because it does not have a block time defined");
};
tracing::info!(block_time = %humantime::Duration::from(block_time), "spawning interval miner");

// spawn scoped threads (tokio does not support scoped tasks)
let pending_blocks = AtomicUsize::new(0);
let pending_blocks_cvar = Condvar::new();

thread::scope(|s| {
// spawn miner
let t_miner = thread::Builder::new().name("miner".into());
let t_miner_tokio = Handle::current();
let t_miner_pending_blocks = &pending_blocks;
let t_miner_pending_blocks_cvar = &pending_blocks_cvar;
t_miner
.spawn_scoped(s, move || {
let _tokio_guard = t_miner_tokio.enter();
interval_miner::start(self, t_miner_pending_blocks, t_miner_pending_blocks_cvar);
})
.expect("spawning interval miner should not fail");

// spawn ticker
let t_ticker = thread::Builder::new().name("miner-ticker".into());
let t_ticker_tokio = Handle::current();
let t_ticker_pending_blocks = &pending_blocks;
let t_ticker_pending_blocks_cvar = &pending_blocks_cvar;
t_ticker
.spawn_scoped(s, move || {
let _tokio_guard = t_ticker_tokio.enter();
interval_miner_ticker::start(block_time, t_ticker_pending_blocks, t_ticker_pending_blocks_cvar);
})
.expect("spawning interval miner ticker should not fail");
});
// spawn miner and ticker
let pending_blocks = Arc::new(AtomicUsize::new(0));
spawn_named(
"miner::miner",
interval_miner::run(Arc::clone(&self), Arc::clone(&pending_blocks), cancellation.child_token()),
);
spawn_named(
"miner::ticker",
interval_miner_ticker::run(block_time, Arc::clone(&pending_blocks), cancellation.child_token()),
);

Ok(())
}
Expand Down Expand Up @@ -339,27 +319,31 @@ mod interval_miner {
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::Mutex;

use tokio::runtime::Handle;
use tokio::task::yield_now;
use tokio_util::sync::CancellationToken;

use crate::eth::BlockMiner;
use crate::ext::warn_task_cancellation;

pub fn start(miner: Arc<BlockMiner>, pending_blocks: &AtomicUsize, pending_blocks_cvar: &Condvar) {
let tokio = Handle::current();
let cvar_mutex = Mutex::new(());

pub async fn run(miner: Arc<BlockMiner>, pending_blocks: Arc<AtomicUsize>, cancellation: CancellationToken) {
loop {
// check cancellation
if cancellation.is_cancelled() {
warn_task_cancellation("interval miner");
return;
}

// check pending blocks and mine if necessary
let pending = pending_blocks
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| Some(n.saturating_sub(1)))
.unwrap();
if pending > 0 {
tracing::info!(%pending, "interval mining block");
tokio.block_on(mine_and_commit(&miner));
mine_and_commit(&miner).await;
} else {
tracing::debug!(%pending, "waiting for block interval");
let _ = pending_blocks_cvar.wait(cvar_mutex.lock().unwrap());
yield_now().await;
}
}
}
Expand Down Expand Up @@ -392,17 +376,17 @@ mod interval_miner {
mod interval_miner_ticker {
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Condvar;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use chrono::Timelike;
use chrono::Utc;
use tokio::runtime::Handle;
use tokio_util::sync::CancellationToken;

pub fn start(block_time: Duration, pending_blocks: &AtomicUsize, pending_blocks_cvar: &Condvar) {
let tokio = Handle::current();
use crate::ext::warn_task_cancellation;

pub async fn run(block_time: Duration, pending_blocks: Arc<AtomicUsize>, cancellation: CancellationToken) {
// sync to next second
let next_second = (Utc::now() + Duration::from_secs(1)).with_nanosecond(0).unwrap();
thread::sleep((next_second - Utc::now()).to_std().unwrap());
Expand All @@ -412,13 +396,17 @@ mod interval_miner_ticker {
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Burst);

// keep ticking
tokio.block_on(async move {
ticker.tick().await;
loop {
let _ = ticker.tick().await;
let _ = pending_blocks.fetch_add(1, Ordering::SeqCst);
pending_blocks_cvar.notify_one();
ticker.tick().await;
loop {
// check cancellation
if cancellation.is_cancelled() {
warn_task_cancellation("interval miner ticker");
return;
}
});

// await next tick
let _ = ticker.tick().await;
let _ = pending_blocks.fetch_add(1, Ordering::SeqCst);
}
}
}
6 changes: 4 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ fn main() -> anyhow::Result<()> {
}

async fn run(config: StratusConfig) -> anyhow::Result<()> {
// init cancellation handler
let cancellation = signal_handler();

// init services
let storage = config.storage.init().await?;
let relayer = config.relayer.init(Arc::clone(&storage)).await?;
let miner = config.miner.init(Arc::clone(&storage), None).await?;
let miner = config.miner.init(Arc::clone(&storage), None, cancellation.clone()).await?;
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, None).await;
let cancellation = signal_handler();

// start rpc server
serve_rpc(storage, executor, miner, config.address, config.executor.chain_id.into(), cancellation).await?;
Expand Down
3 changes: 2 additions & 1 deletion tests/test_import_external_snapshot_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use stratus::eth::storage::PermanentStorage;
use stratus::eth::storage::StratusStorage;
use stratus::infra::docker::Docker;
use stratus::GlobalServices;
use tokio_util::sync::CancellationToken;
#[cfg(feature = "metrics")]
mod m {
pub use const_format::formatcp;
Expand Down Expand Up @@ -155,7 +156,7 @@ pub async fn execute_test(
// init services
let storage = Arc::new(StratusStorage::new(Arc::new(InMemoryTemporaryStorage::new()), Arc::new(perm_storage)));
let relayer = config.relayer.init(Arc::clone(&storage)).await.unwrap();
let miner = config.miner.init(Arc::clone(&storage), None).await.unwrap();
let miner = config.miner.init(Arc::clone(&storage), None, CancellationToken::new()).await.unwrap();
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, None).await;

// execute and mine
Expand Down
Loading