Skip to content

Commit

Permalink
enha: add global node mode (#1626)
Browse files Browse the repository at this point in the history
* add global node mode state

* chore: pr suggestion

* chore: make miner config more explicit and encapsulated

* chore: naming

* chore: encapsulate importer init

* chore: refactor main initialization
  • Loading branch information
gabriel-aranha-cw authored Aug 12, 2024
1 parent d4fca19 commit 960c6c8
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 40 deletions.
3 changes: 2 additions & 1 deletion src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use itertools::Itertools;
use stratus::config::ImporterOfflineConfig;
use stratus::eth::executor::Executor;
use stratus::eth::miner::Miner;
use stratus::eth::miner::MinerMode;
use stratus::eth::primitives::Block;
use stratus::eth::primitives::BlockNumber;
use stratus::eth::primitives::ExternalBlock;
Expand Down Expand Up @@ -53,7 +54,7 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
// init services
let rpc_storage = config.rpc_storage.init().await?;
let storage = config.storage.init()?;
let miner = config.miner.init_external_mode(Arc::clone(&storage))?;
let miner = config.miner.init_with_mode(MinerMode::External, Arc::clone(&storage))?;
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner));

// init block snapshots to export
Expand Down
26 changes: 15 additions & 11 deletions src/eth/follower/importer/importer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ use clap::Parser;
use display_json::DebugAsJson;

use crate::eth::executor::Executor;
use crate::eth::follower::consensus::Consensus;
use crate::eth::follower::importer::Importer;
use crate::eth::miner::Miner;
use crate::eth::storage::StratusStorage;
use crate::ext::parse_duration;
use crate::ext::spawn_named;
use crate::infra::BlockchainClient;
use crate::GlobalState;
use crate::NodeMode;

#[derive(Default, Parser, DebugAsJson, Clone, serde::Serialize)]
#[group(requires_all = ["external_rpc"])]
Expand All @@ -32,19 +35,20 @@ pub struct ImporterConfig {
}

impl ImporterConfig {
pub fn init(
&self,
executor: Arc<Executor>,
miner: Arc<Miner>,
storage: Arc<StratusStorage>,
chain: Arc<BlockchainClient>,
) -> anyhow::Result<Arc<Importer>> {
pub async fn init(&self, executor: Arc<Executor>, miner: Arc<Miner>, storage: Arc<StratusStorage>) -> anyhow::Result<Option<Arc<dyn Consensus>>> {
match GlobalState::get_node_mode() {
NodeMode::Follower => self.init_follower(executor, miner, storage).await,
NodeMode::Leader => Ok(None),
}
}

async fn init_follower(&self, executor: Arc<Executor>, miner: Arc<Miner>, storage: Arc<StratusStorage>) -> anyhow::Result<Option<Arc<dyn Consensus>>> {
const TASK_NAME: &str = "importer::init";
tracing::info!(config = ?self, "creating importer");
tracing::info!(config = ?self, "creating importer for follower node");

let config = self.clone();
let chain = Arc::new(BlockchainClient::new_http_ws(&self.external_rpc, self.external_rpc_ws.as_deref(), self.external_rpc_timeout).await?);

let importer = Importer::new(executor, miner, Arc::clone(&storage), chain, config.sync_interval);
let importer = Importer::new(executor, Arc::clone(&miner), Arc::clone(&storage), Arc::clone(&chain), self.sync_interval);
let importer = Arc::new(importer);

spawn_named(TASK_NAME, {
Expand All @@ -56,6 +60,6 @@ impl ImporterConfig {
}
});

Ok(importer)
Ok(Some(importer))
}
}
23 changes: 14 additions & 9 deletions src/eth/miner/miner_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use display_json::DebugAsJson;
use crate::eth::miner::Miner;
use crate::eth::storage::StratusStorage;
use crate::ext::parse_duration;
use crate::GlobalState;
use crate::NodeMode;

// -----------------------------------------------------------------------------
// Config
Expand All @@ -21,18 +23,21 @@ pub struct MinerConfig {
}

impl MinerConfig {
/// Inits [`BlockMiner`] with external mining mode, ignoring the configured value.
pub fn init_external_mode(&self, storage: Arc<StratusStorage>) -> anyhow::Result<Arc<Miner>> {
self.init_with_mode(MinerMode::External, storage)
}

/// Inits [`BlockMiner`] with the configured mining mode.
/// Inits [`Miner`] with the appropriate mining mode based on the node mode.
pub fn init(&self, storage: Arc<StratusStorage>) -> anyhow::Result<Arc<Miner>> {
self.init_with_mode(self.block_mode, storage)
tracing::info!(config = ?self, "creating block miner");

let mode = match GlobalState::get_node_mode() {
NodeMode::Follower => MinerMode::External,
NodeMode::Leader => self.block_mode,
};

self.init_with_mode(mode, storage)
}

fn init_with_mode(&self, mode: MinerMode, storage: Arc<StratusStorage>) -> anyhow::Result<Arc<Miner>> {
tracing::info!(config = ?self, "creating block miner");
/// Inits [`Miner`] with a specific mining mode, regardless of node mode.
pub fn init_with_mode(&self, mode: MinerMode, storage: Arc<StratusStorage>) -> anyhow::Result<Arc<Miner>> {
tracing::info!(config = ?self, mode = ?mode, "creating block miner with specific mode");

// create miner
let miner = Miner::new(Arc::clone(&storage), mode);
Expand Down
49 changes: 49 additions & 0 deletions src/globals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tokio::runtime::Runtime;
use tokio_util::sync::CancellationToken;

use crate::config;
use crate::config::StratusConfig;
use crate::config::WithCommonConfig;
use crate::ext::spawn_signal_handler;
use crate::infra::tracing::warn_task_cancellation;
Expand Down Expand Up @@ -70,6 +71,17 @@ where
}
}

// -----------------------------------------------------------------------------
// Node mode
// -----------------------------------------------------------------------------

#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum NodeMode {
Leader,
Follower,
}

// -----------------------------------------------------------------------------
// Global state
// -----------------------------------------------------------------------------
Expand All @@ -86,6 +98,9 @@ static MINER_ENABLED: AtomicBool = AtomicBool::new(true);
/// Unknown clients can interact with the application?
static UNKNOWN_CLIENT_ENABLED: AtomicBool = AtomicBool::new(true);

/// Current node mode.
static IS_LEADER: AtomicBool = AtomicBool::new(false);

pub struct GlobalState;

impl GlobalState {
Expand Down Expand Up @@ -168,4 +183,38 @@ impl GlobalState {
pub fn is_unknown_client_enabled() -> bool {
UNKNOWN_CLIENT_ENABLED.load(Ordering::Relaxed)
}

// -------------------------------------------------------------------------
// Node Mode
// -------------------------------------------------------------------------

/// Initializes the node mode based on the StratusConfig.
pub fn initialize_node_mode(config: &StratusConfig) {
let mode = if config.follower { NodeMode::Follower } else { NodeMode::Leader };
Self::set_node_mode(mode);
}

/// Sets the current node mode.
pub fn set_node_mode(mode: NodeMode) {
IS_LEADER.store(matches!(mode, NodeMode::Leader), Ordering::Relaxed);
}

/// Gets the current node mode.
pub fn get_node_mode() -> NodeMode {
if IS_LEADER.load(Ordering::Relaxed) {
NodeMode::Leader
} else {
NodeMode::Follower
}
}

/// Checks if the node is in follower mode.
pub fn is_follower() -> bool {
!IS_LEADER.load(Ordering::Relaxed)
}

/// Checks if the node is in leader mode.
pub fn is_leader() -> bool {
IS_LEADER.load(Ordering::Relaxed)
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ pub mod utils;

pub use globals::GlobalServices;
pub use globals::GlobalState;
pub use globals::NodeMode;
23 changes: 5 additions & 18 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::sync::Arc;

use stratus::config::StratusConfig;
use stratus::eth::follower::consensus::Consensus;
use stratus::eth::rpc::serve_rpc;
use stratus::infra::BlockchainClient;
use stratus::GlobalServices;
use stratus::GlobalState;

fn main() -> anyhow::Result<()> {
let global_services = GlobalServices::<StratusConfig>::init();
GlobalState::initialize_node_mode(&global_services.config);
global_services.runtime.block_on(run(global_services.config))
}

Expand All @@ -16,27 +16,14 @@ async fn run(config: StratusConfig) -> anyhow::Result<()> {
let storage = config.storage.init()?;

// Init miner
let miner = if config.follower {
config.miner.init_external_mode(Arc::clone(&storage))?
} else {
config.miner.init(Arc::clone(&storage))?
};
let miner = config.miner.init(Arc::clone(&storage))?;

// Init executor
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner));

// Init importer
let consensus: Option<Arc<dyn Consensus>> = if config.follower {
let importer_config = config.importer.as_ref().ok_or(anyhow::anyhow!("importer config is not set"))?;
let chain = Arc::new(
BlockchainClient::new_http_ws(
importer_config.external_rpc.as_ref(),
importer_config.external_rpc_ws.as_deref(),
importer_config.external_rpc_timeout,
)
.await?,
);
Some(importer_config.init(Arc::clone(&executor), Arc::clone(&miner), Arc::clone(&storage), Arc::clone(&chain))?)
let consensus = if let Some(importer_config) = &config.importer {
importer_config.init(Arc::clone(&executor), Arc::clone(&miner), Arc::clone(&storage)).await?
} else {
None
};
Expand Down
3 changes: 2 additions & 1 deletion tests/test_import_external_snapshot_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use fancy_duration::AsFancyDuration;
use itertools::Itertools;
use stratus::alias::JsonValue;
use stratus::config::IntegrationTestConfig;
use stratus::eth::miner::MinerMode;
use stratus::eth::primitives::Account;
use stratus::eth::primitives::Address;
use stratus::eth::primitives::ExternalBlock;
Expand Down Expand Up @@ -151,7 +152,7 @@ pub async fn execute_test(

// init services
let storage = Arc::new(StratusStorage::new(Box::<InMemoryTemporaryStorage>::default(), Box::new(perm_storage)));
let miner = config.miner.init_external_mode(Arc::clone(&storage)).unwrap();
let miner = config.miner.init_with_mode(MinerMode::External, Arc::clone(&storage)).unwrap();
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner));

// execute and mine
Expand Down

0 comments on commit 960c6c8

Please sign in to comment.