From e34f6b14a770c76c4ce0c16748598bdd6a8fc960 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Mon, 12 Aug 2024 11:20:40 -0300 Subject: [PATCH 1/6] add global node mode state --- src/globals.rs | 41 +++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + src/main.rs | 41 ++++++++++++++++++++++++----------------- 3 files changed, 66 insertions(+), 17 deletions(-) diff --git a/src/globals.rs b/src/globals.rs index dfa5d8d5b..125b6cd97 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -70,6 +70,16 @@ where } } +// ----------------------------------------------------------------------------- +// Node mode +// ----------------------------------------------------------------------------- + +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub enum NodeMode { + Leader, + Follower, +} + // ----------------------------------------------------------------------------- // Global state // ----------------------------------------------------------------------------- @@ -86,6 +96,9 @@ static MINER_ENABLED: AtomicBool = AtomicBool::new(true); /// Unknown clients can interact with the application? static UNKNOWN_CLIENT_ENABLED: AtomicBool = AtomicBool::new(true); +/// Current node mode. +static IS_LEADER: AtomicBool = AtomicBool::new(false); + pub struct GlobalState; impl GlobalState { @@ -168,4 +181,32 @@ impl GlobalState { pub fn is_unknown_client_enabled() -> bool { UNKNOWN_CLIENT_ENABLED.load(Ordering::Relaxed) } + + // ------------------------------------------------------------------------- + // Node Mode + // ------------------------------------------------------------------------- + + /// Sets the current node mode. + pub fn set_node_mode(mode: NodeMode) { + IS_LEADER.store(matches!(mode, NodeMode::Leader), Ordering::Relaxed); + } + + /// Gets the current node mode. + pub fn get_node_mode() -> NodeMode { + if IS_LEADER.load(Ordering::Relaxed) { + NodeMode::Leader + } else { + NodeMode::Follower + } + } + + /// Checks if the node is in follower mode. + pub fn is_follower() -> bool { + !IS_LEADER.load(Ordering::Relaxed) + } + + /// Checks if the node is in leader mode. + pub fn is_leader() -> bool { + IS_LEADER.load(Ordering::Relaxed) + } } diff --git a/src/lib.rs b/src/lib.rs index 1c8e1f352..7c97628bf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,3 +8,4 @@ pub mod utils; pub use globals::GlobalServices; pub use globals::GlobalState; +pub use globals::NodeMode; diff --git a/src/main.rs b/src/main.rs index 6809b1c80..acdf1ca71 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,9 +5,16 @@ use stratus::eth::follower::consensus::Consensus; use stratus::eth::rpc::serve_rpc; use stratus::infra::BlockchainClient; use stratus::GlobalServices; +use stratus::GlobalState; +use stratus::NodeMode; fn main() -> anyhow::Result<()> { let global_services = GlobalServices::::init(); + GlobalState::set_node_mode(if global_services.config.follower { + NodeMode::Follower + } else { + NodeMode::Leader + }); global_services.runtime.block_on(run(global_services.config)) } @@ -16,29 +23,29 @@ async fn run(config: StratusConfig) -> anyhow::Result<()> { let storage = config.storage.init()?; // Init miner - let miner = if config.follower { - config.miner.init_external_mode(Arc::clone(&storage))? - } else { - config.miner.init(Arc::clone(&storage))? + let miner = match GlobalState::get_node_mode() { + NodeMode::Follower => config.miner.init_external_mode(Arc::clone(&storage))?, + NodeMode::Leader => config.miner.init(Arc::clone(&storage))?, }; // Init executor let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner)); // Init importer - let consensus: Option> = if config.follower { - let importer_config = config.importer.as_ref().ok_or(anyhow::anyhow!("importer config is not set"))?; - let chain = Arc::new( - BlockchainClient::new_http_ws( - importer_config.external_rpc.as_ref(), - importer_config.external_rpc_ws.as_deref(), - importer_config.external_rpc_timeout, - ) - .await?, - ); - Some(importer_config.init(Arc::clone(&executor), Arc::clone(&miner), Arc::clone(&storage), Arc::clone(&chain))?) - } else { - None + let consensus: Option> = match GlobalState::get_node_mode() { + NodeMode::Follower => { + let importer_config = config.importer.as_ref().ok_or(anyhow::anyhow!("importer config is not set"))?; + let chain = Arc::new( + BlockchainClient::new_http_ws( + importer_config.external_rpc.as_ref(), + importer_config.external_rpc_ws.as_deref(), + importer_config.external_rpc_timeout, + ) + .await?, + ); + Some(importer_config.init(Arc::clone(&executor), Arc::clone(&miner), Arc::clone(&storage), Arc::clone(&chain))?) + } + NodeMode::Leader => None, }; // Init RPC server From 545b11dfb9fce94a98394aacf3f3ae4c37e9db47 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Mon, 12 Aug 2024 11:28:22 -0300 Subject: [PATCH 2/6] chore: pr suggestion --- src/globals.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/globals.rs b/src/globals.rs index 125b6cd97..c747d1f23 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -74,6 +74,7 @@ where // Node mode // ----------------------------------------------------------------------------- +#[repr(u8)] #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub enum NodeMode { Leader, From ae176ed5831d387fc4225af251900767a903bd14 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Mon, 12 Aug 2024 13:48:46 -0300 Subject: [PATCH 3/6] chore: make miner config more explicit and encapsulated --- src/bin/importer_offline.rs | 3 ++- src/eth/miner/miner_config.rs | 23 +++++++++++-------- src/main.rs | 5 +--- tests/test_import_external_snapshot_common.rs | 3 ++- 4 files changed, 19 insertions(+), 15 deletions(-) diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index e5e99d3c6..f8b501a9a 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -19,6 +19,7 @@ use itertools::Itertools; use stratus::config::ImporterOfflineConfig; use stratus::eth::executor::Executor; use stratus::eth::miner::Miner; +use stratus::eth::miner::MinerMode; use stratus::eth::primitives::Block; use stratus::eth::primitives::BlockNumber; use stratus::eth::primitives::ExternalBlock; @@ -53,7 +54,7 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> { // init services let rpc_storage = config.rpc_storage.init().await?; let storage = config.storage.init()?; - let miner = config.miner.init_external_mode(Arc::clone(&storage))?; + let miner = config.miner.init_with_mode(MinerMode::External, Arc::clone(&storage))?; let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner)); // init block snapshots to export diff --git a/src/eth/miner/miner_config.rs b/src/eth/miner/miner_config.rs index fbdaffc87..43655539d 100644 --- a/src/eth/miner/miner_config.rs +++ b/src/eth/miner/miner_config.rs @@ -8,6 +8,8 @@ use display_json::DebugAsJson; use crate::eth::miner::Miner; use crate::eth::storage::StratusStorage; use crate::ext::parse_duration; +use crate::GlobalState; +use crate::NodeMode; // ----------------------------------------------------------------------------- // Config @@ -21,18 +23,21 @@ pub struct MinerConfig { } impl MinerConfig { - /// Inits [`BlockMiner`] with external mining mode, ignoring the configured value. - pub fn init_external_mode(&self, storage: Arc) -> anyhow::Result> { - self.init_with_mode(MinerMode::External, storage) - } - - /// Inits [`BlockMiner`] with the configured mining mode. + /// Inits [`Miner`] with the appropriate miner mode based on the node mode. pub fn init(&self, storage: Arc) -> anyhow::Result> { - self.init_with_mode(self.block_mode, storage) + tracing::info!(config = ?self, "creating block miner"); + + let mode = match GlobalState::get_node_mode() { + NodeMode::Follower => MinerMode::External, + NodeMode::Leader => self.block_mode, + }; + + self.init_with_mode(mode, storage) } - fn init_with_mode(&self, mode: MinerMode, storage: Arc) -> anyhow::Result> { - tracing::info!(config = ?self, "creating block miner"); + /// Inits [`Miner`] with a specific miner mode, regardless of node mode. + pub fn init_with_mode(&self, mode: MinerMode, storage: Arc) -> anyhow::Result> { + tracing::info!(config = ?self, mode = ?mode, "creating block miner with specific mode"); // create miner let miner = Miner::new(Arc::clone(&storage), mode); diff --git a/src/main.rs b/src/main.rs index acdf1ca71..619d094b3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,10 +23,7 @@ async fn run(config: StratusConfig) -> anyhow::Result<()> { let storage = config.storage.init()?; // Init miner - let miner = match GlobalState::get_node_mode() { - NodeMode::Follower => config.miner.init_external_mode(Arc::clone(&storage))?, - NodeMode::Leader => config.miner.init(Arc::clone(&storage))?, - }; + let miner = config.miner.init(Arc::clone(&storage))?; // Init executor let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner)); diff --git a/tests/test_import_external_snapshot_common.rs b/tests/test_import_external_snapshot_common.rs index 98b17cebd..0ac64290b 100644 --- a/tests/test_import_external_snapshot_common.rs +++ b/tests/test_import_external_snapshot_common.rs @@ -9,6 +9,7 @@ use fancy_duration::AsFancyDuration; use itertools::Itertools; use stratus::alias::JsonValue; use stratus::config::IntegrationTestConfig; +use stratus::eth::miner::MinerMode; use stratus::eth::primitives::Account; use stratus::eth::primitives::Address; use stratus::eth::primitives::ExternalBlock; @@ -151,7 +152,7 @@ pub async fn execute_test( // init services let storage = Arc::new(StratusStorage::new(Box::::default(), Box::new(perm_storage))); - let miner = config.miner.init_external_mode(Arc::clone(&storage)).unwrap(); + let miner = config.miner.init_with_mode(MinerMode::External, Arc::clone(&storage)).unwrap(); let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner)); // execute and mine From 70d8833c748ab9fbfd40ddbd1a73bc021d704527 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Mon, 12 Aug 2024 13:49:50 -0300 Subject: [PATCH 4/6] chore: naming --- src/eth/miner/miner_config.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/eth/miner/miner_config.rs b/src/eth/miner/miner_config.rs index 43655539d..debdf5739 100644 --- a/src/eth/miner/miner_config.rs +++ b/src/eth/miner/miner_config.rs @@ -23,7 +23,7 @@ pub struct MinerConfig { } impl MinerConfig { - /// Inits [`Miner`] with the appropriate miner mode based on the node mode. + /// Inits [`Miner`] with the appropriate mining mode based on the node mode. pub fn init(&self, storage: Arc) -> anyhow::Result> { tracing::info!(config = ?self, "creating block miner"); @@ -35,7 +35,7 @@ impl MinerConfig { self.init_with_mode(mode, storage) } - /// Inits [`Miner`] with a specific miner mode, regardless of node mode. + /// Inits [`Miner`] with a specific mining mode, regardless of node mode. pub fn init_with_mode(&self, mode: MinerMode, storage: Arc) -> anyhow::Result> { tracing::info!(config = ?self, mode = ?mode, "creating block miner with specific mode"); From 2c8febf3c786dde25c171a68f6a07b5a1ee69bf2 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Mon, 12 Aug 2024 14:41:54 -0300 Subject: [PATCH 5/6] chore: encapsulate importer init --- src/eth/follower/importer/importer_config.rs | 26 +++++++++++--------- src/main.rs | 20 +++------------ 2 files changed, 19 insertions(+), 27 deletions(-) diff --git a/src/eth/follower/importer/importer_config.rs b/src/eth/follower/importer/importer_config.rs index b72229994..eaf5e458f 100644 --- a/src/eth/follower/importer/importer_config.rs +++ b/src/eth/follower/importer/importer_config.rs @@ -5,12 +5,15 @@ use clap::Parser; use display_json::DebugAsJson; use crate::eth::executor::Executor; +use crate::eth::follower::consensus::Consensus; use crate::eth::follower::importer::Importer; use crate::eth::miner::Miner; use crate::eth::storage::StratusStorage; use crate::ext::parse_duration; use crate::ext::spawn_named; use crate::infra::BlockchainClient; +use crate::GlobalState; +use crate::NodeMode; #[derive(Default, Parser, DebugAsJson, Clone, serde::Serialize)] #[group(requires_all = ["external_rpc"])] @@ -32,19 +35,20 @@ pub struct ImporterConfig { } impl ImporterConfig { - pub fn init( - &self, - executor: Arc, - miner: Arc, - storage: Arc, - chain: Arc, - ) -> anyhow::Result> { + pub async fn init(&self, executor: Arc, miner: Arc, storage: Arc) -> anyhow::Result>> { + match GlobalState::get_node_mode() { + NodeMode::Follower => self.init_follower(executor, miner, storage).await, + NodeMode::Leader => Ok(None), + } + } + + async fn init_follower(&self, executor: Arc, miner: Arc, storage: Arc) -> anyhow::Result>> { const TASK_NAME: &str = "importer::init"; - tracing::info!(config = ?self, "creating importer"); + tracing::info!(config = ?self, "creating importer for follower node"); - let config = self.clone(); + let chain = Arc::new(BlockchainClient::new_http_ws(&self.external_rpc, self.external_rpc_ws.as_deref(), self.external_rpc_timeout).await?); - let importer = Importer::new(executor, miner, Arc::clone(&storage), chain, config.sync_interval); + let importer = Importer::new(executor, Arc::clone(&miner), Arc::clone(&storage), Arc::clone(&chain), self.sync_interval); let importer = Arc::new(importer); spawn_named(TASK_NAME, { @@ -56,6 +60,6 @@ impl ImporterConfig { } }); - Ok(importer) + Ok(Some(importer)) } } diff --git a/src/main.rs b/src/main.rs index 619d094b3..c11f80421 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,7 @@ use std::sync::Arc; use stratus::config::StratusConfig; -use stratus::eth::follower::consensus::Consensus; use stratus::eth::rpc::serve_rpc; -use stratus::infra::BlockchainClient; use stratus::GlobalServices; use stratus::GlobalState; use stratus::NodeMode; @@ -29,20 +27,10 @@ async fn run(config: StratusConfig) -> anyhow::Result<()> { let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner)); // Init importer - let consensus: Option> = match GlobalState::get_node_mode() { - NodeMode::Follower => { - let importer_config = config.importer.as_ref().ok_or(anyhow::anyhow!("importer config is not set"))?; - let chain = Arc::new( - BlockchainClient::new_http_ws( - importer_config.external_rpc.as_ref(), - importer_config.external_rpc_ws.as_deref(), - importer_config.external_rpc_timeout, - ) - .await?, - ); - Some(importer_config.init(Arc::clone(&executor), Arc::clone(&miner), Arc::clone(&storage), Arc::clone(&chain))?) - } - NodeMode::Leader => None, + let consensus = if let Some(importer_config) = &config.importer { + importer_config.init(Arc::clone(&executor), Arc::clone(&miner), Arc::clone(&storage)).await? + } else { + None }; // Init RPC server From f2d7c5052e0715e128c5649086d48def41d1ecb3 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Mon, 12 Aug 2024 15:44:35 -0300 Subject: [PATCH 6/6] chore: refactor main initialization --- src/globals.rs | 7 +++++++ src/main.rs | 7 +------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/globals.rs b/src/globals.rs index c747d1f23..797d4e773 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -8,6 +8,7 @@ use tokio::runtime::Runtime; use tokio_util::sync::CancellationToken; use crate::config; +use crate::config::StratusConfig; use crate::config::WithCommonConfig; use crate::ext::spawn_signal_handler; use crate::infra::tracing::warn_task_cancellation; @@ -187,6 +188,12 @@ impl GlobalState { // Node Mode // ------------------------------------------------------------------------- + /// Initializes the node mode based on the StratusConfig. + pub fn initialize_node_mode(config: &StratusConfig) { + let mode = if config.follower { NodeMode::Follower } else { NodeMode::Leader }; + Self::set_node_mode(mode); + } + /// Sets the current node mode. pub fn set_node_mode(mode: NodeMode) { IS_LEADER.store(matches!(mode, NodeMode::Leader), Ordering::Relaxed); diff --git a/src/main.rs b/src/main.rs index c11f80421..4c9ca88fe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,15 +4,10 @@ use stratus::config::StratusConfig; use stratus::eth::rpc::serve_rpc; use stratus::GlobalServices; use stratus::GlobalState; -use stratus::NodeMode; fn main() -> anyhow::Result<()> { let global_services = GlobalServices::::init(); - GlobalState::set_node_mode(if global_services.config.follower { - NodeMode::Follower - } else { - NodeMode::Leader - }); + GlobalState::initialize_node_mode(&global_services.config); global_services.runtime.block_on(run(global_services.config)) }