diff --git a/src/config.rs b/src/config.rs index a40f1b66d..9e2caf0b0 100644 --- a/src/config.rs +++ b/src/config.rs @@ -205,7 +205,7 @@ impl ExecutorConfig { /// TODO: remove BlockMiner after migration is completed. pub async fn init(&self, storage: Arc, miner: Arc, relayer: Option>) -> Arc { let num_evms = max(self.num_evms, 1); - tracing::info!(config = ?self, "configuring executor"); + tracing::info!(config = ?self, "starting executor"); // spawn evm in background using native threads let (evm_tx, evm_rx) = crossbeam_channel::unbounded::(); @@ -255,12 +255,22 @@ impl ExecutorConfig { // Config: Miner // ----------------------------------------------------------------------------- #[derive(Parser, DebugAsJson, Clone, serde::Serialize)] -pub struct MinerConfig {} +pub struct MinerConfig { + /// Target block time. + #[arg(long = "block-time", value_parser=parse_duration, env = "BLOCK_TIME")] + pub block_time: Option, +} impl MinerConfig { pub fn init(&self, storage: Arc) -> Arc { - tracing::info!(config = ?self, "configuring block miner"); - Arc::new(BlockMiner::new(storage)) + tracing::info!(config = ?self, "starting block miner"); + let miner = Arc::new(BlockMiner::new(storage)); + + if let Some(block_time) = self.block_time { + Arc::clone(&miner).spawn_interval_miner(block_time); + } + + miner } } @@ -276,7 +286,7 @@ pub struct RelayerConfig { impl RelayerConfig { pub async fn init(&self, storage: Arc) -> anyhow::Result>> { - tracing::info!(config = ?self, "configuring transaction relayer"); + tracing::info!(config = ?self, "starting transaction relayer"); match self.forward_to { Some(ref url) => { @@ -585,7 +595,7 @@ pub enum ExternalRpcStorageKind { impl ExternalRpcStorageConfig { /// Initializes external rpc storage implementation. pub async fn init(&self) -> anyhow::Result> { - tracing::info!(config = ?self, "configuring external rpc storage"); + tracing::info!(config = ?self, "starting external rpc storage"); match self.external_rpc_storage_kind { ExternalRpcStorageKind::Postgres { ref url } => { @@ -631,7 +641,7 @@ pub enum TemporaryStorageKind { impl TemporaryStorageConfig { /// Initializes temporary storage implementation. pub async fn init(&self) -> anyhow::Result> { - tracing::info!(config = ?self, "configuring temporary storage"); + tracing::info!(config = ?self, "starting temporary storage"); match self.temp_storage_kind { TemporaryStorageKind::InMemory => Ok(Arc::new(InMemoryTemporaryStorage::default())), @@ -683,7 +693,7 @@ pub enum PermanentStorageKind { impl PermanentStorageConfig { /// Initializes permanent storage implementation. pub async fn init(&self) -> anyhow::Result> { - tracing::info!(config = ?self, "configuring permanent storage"); + tracing::info!(config = ?self, "starting permanent storage"); let perm: Arc = match self.perm_storage_kind { PermanentStorageKind::InMemory => Arc::new(InMemoryPermanentStorage::default()), diff --git a/src/eth/block_miner.rs b/src/eth/block_miner.rs index b8f6a36cc..fee1200a8 100644 --- a/src/eth/block_miner.rs +++ b/src/eth/block_miner.rs @@ -1,4 +1,6 @@ use std::sync::Arc; +use std::thread; +use std::time::Duration; use ethereum_types::BloomInput; use keccak_hasher::KeccakHasher; @@ -32,7 +34,7 @@ pub struct BlockMiner { impl BlockMiner { /// Creates a new [`BlockMiner`]. pub fn new(storage: Arc) -> Self { - tracing::info!("creating block miner"); + tracing::info!("starting block miner"); Self { storage, notifier_blocks: broadcast::channel(u16::MAX as usize).0, @@ -40,6 +42,14 @@ impl BlockMiner { } } + pub fn spawn_interval_miner(self: Arc, block_time: Duration) { + tracing::info!(block_time = %humantime::Duration::from(block_time), "spawning interval miner"); + + let t = thread::Builder::new().name("interval-miner".into()); + t.spawn(move || interval_miner(self, block_time)) + .expect("spawning interval miner should not fail"); + } + /// Mines external block and external transactions. /// /// Local transactions are not allowed to be part of the block. @@ -237,3 +247,33 @@ pub fn block_from_local(number: BlockNumber, txs: NonEmpty, block_time: Duration) { + loop { + thread::sleep(block_time); + tracing::info!("mining block"); + + // mine + let block = match futures::executor::block_on(miner.mine_local()) { + Ok(block) => block, + Err(e) => { + tracing::error!(reason = ?e, "failed to mine block"); + continue; + } + }; + + // commit + loop { + match futures::executor::block_on(miner.commit(block.clone())) { + Ok(_) => break, + Err(e) => { + tracing::error!(reason = ?e, "failed to commit block"); + continue; + } + } + } + } +} diff --git a/src/eth/evm/revm.rs b/src/eth/evm/revm.rs index a37fc6007..b8ba06297 100644 --- a/src/eth/evm/revm.rs +++ b/src/eth/evm/revm.rs @@ -63,7 +63,7 @@ impl Revm { /// Creates a new instance of the Revm ready to be used. #[allow(clippy::arc_with_non_send_sync)] pub fn new(storage: Arc, config: EvmConfig) -> Self { - tracing::info!(?config, "creating revm"); + tracing::info!(?config, "starting revm"); // configure handler let mut handler = Handler::mainnet_with_spec(SpecId::LONDON); diff --git a/src/eth/executor.rs b/src/eth/executor.rs index eaccf5555..8336be3fa 100644 --- a/src/eth/executor.rs +++ b/src/eth/executor.rs @@ -66,7 +66,7 @@ impl Executor { evm_tx: crossbeam_channel::Sender, num_evms: usize, ) -> Self { - tracing::info!(%num_evms, "creating executor"); + tracing::info!(%num_evms, "starting executor"); Self { evm_tx, num_evms, diff --git a/src/eth/rpc/rpc_server.rs b/src/eth/rpc/rpc_server.rs index e2bb22dd3..816436281 100644 --- a/src/eth/rpc/rpc_server.rs +++ b/src/eth/rpc/rpc_server.rs @@ -59,6 +59,8 @@ pub async fn serve_rpc( chain_id: ChainId, cancellation: CancellationToken, ) -> anyhow::Result<()> { + tracing::info!("starting rpc server"); + // configure subscriptions let subs = Arc::new(RpcSubscriptions::default()); let _handle_subs_cleaner = Arc::clone(&subs).spawn_subscriptions_cleaner(); diff --git a/src/eth/rpc/rpc_subscriptions.rs b/src/eth/rpc/rpc_subscriptions.rs index accfcc806..4a08954a6 100644 --- a/src/eth/rpc/rpc_subscriptions.rs +++ b/src/eth/rpc/rpc_subscriptions.rs @@ -35,6 +35,8 @@ pub struct RpcSubscriptions { impl RpcSubscriptions { /// Spawns a new thread to clean up closed subscriptions from time to time. pub fn spawn_subscriptions_cleaner(self: Arc) -> JoinHandle> { + tracing::info!("spawning rpc subscriptions cleaner"); + tokio::spawn(async move { loop { let any_new_heads_closed = self.new_heads.read().await.iter().any(|(_, sub)| sub.is_closed()); @@ -60,6 +62,8 @@ impl RpcSubscriptions { /// Spawns a new thread that notifies subscribers about new created blocks. pub fn spawn_new_heads_notifier(self: Arc, mut rx: broadcast::Receiver) -> JoinHandle> { + tracing::info!("spawning rpc new heads notifier"); + tokio::spawn(async move { loop { let Ok(block) = rx.recv().await else { @@ -79,6 +83,8 @@ impl RpcSubscriptions { /// Spawns a new thread that notifies subscribers about new transactions logs. pub fn spawn_logs_notifier(self: Arc, mut rx: broadcast::Receiver) -> JoinHandle> { + tracing::info!("spawning rpc logs notifier"); + tokio::spawn(async move { loop { let Ok(log) = rx.recv().await else { diff --git a/src/eth/storage/inmemory/inmemory_permanent.rs b/src/eth/storage/inmemory/inmemory_permanent.rs index b3dea0624..642abb259 100644 --- a/src/eth/storage/inmemory/inmemory_permanent.rs +++ b/src/eth/storage/inmemory/inmemory_permanent.rs @@ -102,7 +102,7 @@ impl InMemoryPermanentStorage { /// Creates a new InMemoryPermanentStorage from a snapshot dump. pub fn from_snapshot(state: InMemoryPermanentStorageState) -> Self { - tracing::info!("creating inmemory permanent storage from snapshot"); + tracing::info!("starting inmemory permanent storage from snapshot"); Self { state: RwLock::new(state), block_number: AtomicU64::new(0), @@ -126,7 +126,7 @@ impl InMemoryPermanentStorage { impl Default for InMemoryPermanentStorage { fn default() -> Self { - tracing::info!("creating inmemory permanent storage"); + tracing::info!("starting inmemory permanent storage"); Self { state: RwLock::new(InMemoryPermanentStorageState::default()), block_number: Default::default(), diff --git a/src/eth/storage/inmemory/inmemory_temporary.rs b/src/eth/storage/inmemory/inmemory_temporary.rs index da9f97bae..d160af58f 100644 --- a/src/eth/storage/inmemory/inmemory_temporary.rs +++ b/src/eth/storage/inmemory/inmemory_temporary.rs @@ -52,7 +52,7 @@ impl InMemoryTemporaryStorage { impl Default for InMemoryTemporaryStorage { fn default() -> Self { - tracing::info!("creating inmemory temporary storage"); + tracing::info!("starting inmemory temporary storage"); Self { states: RwLock::new(NonEmpty::new(InMemoryTemporaryStorageState::default())), } diff --git a/src/eth/storage/postgres_permanent/postgres_permanent.rs b/src/eth/storage/postgres_permanent/postgres_permanent.rs index ecd9cf1a3..29cad6b28 100644 --- a/src/eth/storage/postgres_permanent/postgres_permanent.rs +++ b/src/eth/storage/postgres_permanent/postgres_permanent.rs @@ -72,7 +72,7 @@ impl Drop for PostgresPermanentStorage { impl PostgresPermanentStorage { /// Creates a new [`PostgresPermanentStorage`]. pub async fn new(config: PostgresPermanentStorageConfig) -> anyhow::Result { - tracing::info!(?config, "creating postgres permanent storage"); + tracing::info!(?config, "starting postgres permanent storage"); let result = PgPoolOptions::new() .min_connections(config.connections) diff --git a/src/eth/storage/rocks/rocks_permanent.rs b/src/eth/storage/rocks/rocks_permanent.rs index f0107f98c..b00138e5e 100644 --- a/src/eth/storage/rocks/rocks_permanent.rs +++ b/src/eth/storage/rocks/rocks_permanent.rs @@ -46,7 +46,7 @@ pub struct RocksPermanentStorage { impl RocksPermanentStorage { pub async fn new() -> anyhow::Result { - tracing::info!("creating rocksdb storage"); + tracing::info!("starting rocksdb storage"); let state = RocksStorageState::new(); state.sync_data().await?; diff --git a/src/eth/transaction_relayer.rs b/src/eth/transaction_relayer.rs index 61bde5f31..71cdbf6a2 100644 --- a/src/eth/transaction_relayer.rs +++ b/src/eth/transaction_relayer.rs @@ -21,7 +21,7 @@ pub struct TransactionRelayer { impl TransactionRelayer { /// Creates a new [`TransactionRelayer`]. pub fn new(storage: Arc, chain: BlockchainClient) -> Self { - tracing::info!(?chain, "creating transaction relayer"); + tracing::info!(?chain, "starting transaction relayer"); Self { storage, chain } } diff --git a/src/lib.rs b/src/lib.rs index 892e2fc69..ad3a8b781 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,14 +39,17 @@ where println!("WARNING: env var PERM_STORAGE_CONNECTIONS is set to 1, if it cause connection problems, try increasing it"); } - // init services + // init metrics #[cfg(feature = "metrics")] infra::init_metrics(config.common().metrics_histogram_kind); + // init sentry let _sentry_guard = config.common().sentry_url.as_ref().map(|sentry_url| infra::init_sentry(sentry_url)); + // init tokio let runtime = config.common().init_runtime(); + // init tracing runtime.block_on(async { infra::init_tracing(config.common().tracing_url.as_ref()) }); Self {