From 960c6c8b19623b59d25eaa201e8558c7c77045d8 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw <166405807+gabriel-aranha-cw@users.noreply.github.com> Date: Mon, 12 Aug 2024 15:54:24 -0300 Subject: [PATCH] enha: add global node mode (#1626) * add global node mode state * chore: pr suggestion * chore: make miner config more explicit and encapsulated * chore: naming * chore: encapsulate importer init * chore: refactor main initialization --- src/bin/importer_offline.rs | 3 +- src/eth/follower/importer/importer_config.rs | 26 +++++----- src/eth/miner/miner_config.rs | 23 +++++---- src/globals.rs | 49 +++++++++++++++++++ src/lib.rs | 1 + src/main.rs | 23 ++------- tests/test_import_external_snapshot_common.rs | 3 +- 7 files changed, 88 insertions(+), 40 deletions(-) diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs index e5e99d3c6..f8b501a9a 100644 --- a/src/bin/importer_offline.rs +++ b/src/bin/importer_offline.rs @@ -19,6 +19,7 @@ use itertools::Itertools; use stratus::config::ImporterOfflineConfig; use stratus::eth::executor::Executor; use stratus::eth::miner::Miner; +use stratus::eth::miner::MinerMode; use stratus::eth::primitives::Block; use stratus::eth::primitives::BlockNumber; use stratus::eth::primitives::ExternalBlock; @@ -53,7 +54,7 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> { // init services let rpc_storage = config.rpc_storage.init().await?; let storage = config.storage.init()?; - let miner = config.miner.init_external_mode(Arc::clone(&storage))?; + let miner = config.miner.init_with_mode(MinerMode::External, Arc::clone(&storage))?; let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner)); // init block snapshots to export diff --git a/src/eth/follower/importer/importer_config.rs b/src/eth/follower/importer/importer_config.rs index b72229994..eaf5e458f 100644 --- a/src/eth/follower/importer/importer_config.rs +++ b/src/eth/follower/importer/importer_config.rs @@ -5,12 +5,15 @@ use clap::Parser; use display_json::DebugAsJson; use crate::eth::executor::Executor; +use crate::eth::follower::consensus::Consensus; use crate::eth::follower::importer::Importer; use crate::eth::miner::Miner; use crate::eth::storage::StratusStorage; use crate::ext::parse_duration; use crate::ext::spawn_named; use crate::infra::BlockchainClient; +use crate::GlobalState; +use crate::NodeMode; #[derive(Default, Parser, DebugAsJson, Clone, serde::Serialize)] #[group(requires_all = ["external_rpc"])] @@ -32,19 +35,20 @@ pub struct ImporterConfig { } impl ImporterConfig { - pub fn init( - &self, - executor: Arc, - miner: Arc, - storage: Arc, - chain: Arc, - ) -> anyhow::Result> { + pub async fn init(&self, executor: Arc, miner: Arc, storage: Arc) -> anyhow::Result>> { + match GlobalState::get_node_mode() { + NodeMode::Follower => self.init_follower(executor, miner, storage).await, + NodeMode::Leader => Ok(None), + } + } + + async fn init_follower(&self, executor: Arc, miner: Arc, storage: Arc) -> anyhow::Result>> { const TASK_NAME: &str = "importer::init"; - tracing::info!(config = ?self, "creating importer"); + tracing::info!(config = ?self, "creating importer for follower node"); - let config = self.clone(); + let chain = Arc::new(BlockchainClient::new_http_ws(&self.external_rpc, self.external_rpc_ws.as_deref(), self.external_rpc_timeout).await?); - let importer = Importer::new(executor, miner, Arc::clone(&storage), chain, config.sync_interval); + let importer = Importer::new(executor, Arc::clone(&miner), Arc::clone(&storage), Arc::clone(&chain), self.sync_interval); let importer = Arc::new(importer); spawn_named(TASK_NAME, { @@ -56,6 +60,6 @@ impl ImporterConfig { } }); - Ok(importer) + Ok(Some(importer)) } } diff --git a/src/eth/miner/miner_config.rs b/src/eth/miner/miner_config.rs index fbdaffc87..debdf5739 100644 --- a/src/eth/miner/miner_config.rs +++ b/src/eth/miner/miner_config.rs @@ -8,6 +8,8 @@ use display_json::DebugAsJson; use crate::eth::miner::Miner; use crate::eth::storage::StratusStorage; use crate::ext::parse_duration; +use crate::GlobalState; +use crate::NodeMode; // ----------------------------------------------------------------------------- // Config @@ -21,18 +23,21 @@ pub struct MinerConfig { } impl MinerConfig { - /// Inits [`BlockMiner`] with external mining mode, ignoring the configured value. - pub fn init_external_mode(&self, storage: Arc) -> anyhow::Result> { - self.init_with_mode(MinerMode::External, storage) - } - - /// Inits [`BlockMiner`] with the configured mining mode. + /// Inits [`Miner`] with the appropriate mining mode based on the node mode. pub fn init(&self, storage: Arc) -> anyhow::Result> { - self.init_with_mode(self.block_mode, storage) + tracing::info!(config = ?self, "creating block miner"); + + let mode = match GlobalState::get_node_mode() { + NodeMode::Follower => MinerMode::External, + NodeMode::Leader => self.block_mode, + }; + + self.init_with_mode(mode, storage) } - fn init_with_mode(&self, mode: MinerMode, storage: Arc) -> anyhow::Result> { - tracing::info!(config = ?self, "creating block miner"); + /// Inits [`Miner`] with a specific mining mode, regardless of node mode. + pub fn init_with_mode(&self, mode: MinerMode, storage: Arc) -> anyhow::Result> { + tracing::info!(config = ?self, mode = ?mode, "creating block miner with specific mode"); // create miner let miner = Miner::new(Arc::clone(&storage), mode); diff --git a/src/globals.rs b/src/globals.rs index dfa5d8d5b..797d4e773 100644 --- a/src/globals.rs +++ b/src/globals.rs @@ -8,6 +8,7 @@ use tokio::runtime::Runtime; use tokio_util::sync::CancellationToken; use crate::config; +use crate::config::StratusConfig; use crate::config::WithCommonConfig; use crate::ext::spawn_signal_handler; use crate::infra::tracing::warn_task_cancellation; @@ -70,6 +71,17 @@ where } } +// ----------------------------------------------------------------------------- +// Node mode +// ----------------------------------------------------------------------------- + +#[repr(u8)] +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub enum NodeMode { + Leader, + Follower, +} + // ----------------------------------------------------------------------------- // Global state // ----------------------------------------------------------------------------- @@ -86,6 +98,9 @@ static MINER_ENABLED: AtomicBool = AtomicBool::new(true); /// Unknown clients can interact with the application? static UNKNOWN_CLIENT_ENABLED: AtomicBool = AtomicBool::new(true); +/// Current node mode. +static IS_LEADER: AtomicBool = AtomicBool::new(false); + pub struct GlobalState; impl GlobalState { @@ -168,4 +183,38 @@ impl GlobalState { pub fn is_unknown_client_enabled() -> bool { UNKNOWN_CLIENT_ENABLED.load(Ordering::Relaxed) } + + // ------------------------------------------------------------------------- + // Node Mode + // ------------------------------------------------------------------------- + + /// Initializes the node mode based on the StratusConfig. + pub fn initialize_node_mode(config: &StratusConfig) { + let mode = if config.follower { NodeMode::Follower } else { NodeMode::Leader }; + Self::set_node_mode(mode); + } + + /// Sets the current node mode. + pub fn set_node_mode(mode: NodeMode) { + IS_LEADER.store(matches!(mode, NodeMode::Leader), Ordering::Relaxed); + } + + /// Gets the current node mode. + pub fn get_node_mode() -> NodeMode { + if IS_LEADER.load(Ordering::Relaxed) { + NodeMode::Leader + } else { + NodeMode::Follower + } + } + + /// Checks if the node is in follower mode. + pub fn is_follower() -> bool { + !IS_LEADER.load(Ordering::Relaxed) + } + + /// Checks if the node is in leader mode. + pub fn is_leader() -> bool { + IS_LEADER.load(Ordering::Relaxed) + } } diff --git a/src/lib.rs b/src/lib.rs index 1c8e1f352..7c97628bf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,3 +8,4 @@ pub mod utils; pub use globals::GlobalServices; pub use globals::GlobalState; +pub use globals::NodeMode; diff --git a/src/main.rs b/src/main.rs index 6809b1c80..4c9ca88fe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,13 @@ use std::sync::Arc; use stratus::config::StratusConfig; -use stratus::eth::follower::consensus::Consensus; use stratus::eth::rpc::serve_rpc; -use stratus::infra::BlockchainClient; use stratus::GlobalServices; +use stratus::GlobalState; fn main() -> anyhow::Result<()> { let global_services = GlobalServices::::init(); + GlobalState::initialize_node_mode(&global_services.config); global_services.runtime.block_on(run(global_services.config)) } @@ -16,27 +16,14 @@ async fn run(config: StratusConfig) -> anyhow::Result<()> { let storage = config.storage.init()?; // Init miner - let miner = if config.follower { - config.miner.init_external_mode(Arc::clone(&storage))? - } else { - config.miner.init(Arc::clone(&storage))? - }; + let miner = config.miner.init(Arc::clone(&storage))?; // Init executor let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner)); // Init importer - let consensus: Option> = if config.follower { - let importer_config = config.importer.as_ref().ok_or(anyhow::anyhow!("importer config is not set"))?; - let chain = Arc::new( - BlockchainClient::new_http_ws( - importer_config.external_rpc.as_ref(), - importer_config.external_rpc_ws.as_deref(), - importer_config.external_rpc_timeout, - ) - .await?, - ); - Some(importer_config.init(Arc::clone(&executor), Arc::clone(&miner), Arc::clone(&storage), Arc::clone(&chain))?) + let consensus = if let Some(importer_config) = &config.importer { + importer_config.init(Arc::clone(&executor), Arc::clone(&miner), Arc::clone(&storage)).await? } else { None }; diff --git a/tests/test_import_external_snapshot_common.rs b/tests/test_import_external_snapshot_common.rs index 98b17cebd..0ac64290b 100644 --- a/tests/test_import_external_snapshot_common.rs +++ b/tests/test_import_external_snapshot_common.rs @@ -9,6 +9,7 @@ use fancy_duration::AsFancyDuration; use itertools::Itertools; use stratus::alias::JsonValue; use stratus::config::IntegrationTestConfig; +use stratus::eth::miner::MinerMode; use stratus::eth::primitives::Account; use stratus::eth::primitives::Address; use stratus::eth::primitives::ExternalBlock; @@ -151,7 +152,7 @@ pub async fn execute_test( // init services let storage = Arc::new(StratusStorage::new(Box::::default(), Box::new(perm_storage))); - let miner = config.miner.init_external_mode(Arc::clone(&storage)).unwrap(); + let miner = config.miner.init_with_mode(MinerMode::External, Arc::clone(&storage)).unwrap(); let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner)); // execute and mine