Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enha: add global node mode #1626

Merged
merged 8 commits into from
Aug 12, 2024
3 changes: 2 additions & 1 deletion src/bin/importer_offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use itertools::Itertools;
use stratus::config::ImporterOfflineConfig;
use stratus::eth::executor::Executor;
use stratus::eth::miner::Miner;
use stratus::eth::miner::MinerMode;
use stratus::eth::primitives::Block;
use stratus::eth::primitives::BlockNumber;
use stratus::eth::primitives::ExternalBlock;
Expand Down Expand Up @@ -53,7 +54,7 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
// init services
let rpc_storage = config.rpc_storage.init().await?;
let storage = config.storage.init()?;
let miner = config.miner.init_external_mode(Arc::clone(&storage))?;
let miner = config.miner.init_with_mode(MinerMode::External, Arc::clone(&storage))?;
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner));

// init block snapshots to export
Expand Down
26 changes: 15 additions & 11 deletions src/eth/follower/importer/importer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ use clap::Parser;
use display_json::DebugAsJson;

use crate::eth::executor::Executor;
use crate::eth::follower::consensus::Consensus;
use crate::eth::follower::importer::Importer;
use crate::eth::miner::Miner;
use crate::eth::storage::StratusStorage;
use crate::ext::parse_duration;
use crate::ext::spawn_named;
use crate::infra::BlockchainClient;
use crate::GlobalState;
use crate::NodeMode;

#[derive(Default, Parser, DebugAsJson, Clone, serde::Serialize)]
#[group(requires_all = ["external_rpc"])]
Expand All @@ -32,19 +35,20 @@ pub struct ImporterConfig {
}

impl ImporterConfig {
pub fn init(
&self,
executor: Arc<Executor>,
miner: Arc<Miner>,
storage: Arc<StratusStorage>,
chain: Arc<BlockchainClient>,
) -> anyhow::Result<Arc<Importer>> {
pub async fn init(&self, executor: Arc<Executor>, miner: Arc<Miner>, storage: Arc<StratusStorage>) -> anyhow::Result<Option<Arc<dyn Consensus>>> {
match GlobalState::get_node_mode() {
NodeMode::Follower => self.init_follower(executor, miner, storage).await,
NodeMode::Leader => Ok(None),
}
}

async fn init_follower(&self, executor: Arc<Executor>, miner: Arc<Miner>, storage: Arc<StratusStorage>) -> anyhow::Result<Option<Arc<dyn Consensus>>> {
const TASK_NAME: &str = "importer::init";
tracing::info!(config = ?self, "creating importer");
tracing::info!(config = ?self, "creating importer for follower node");

let config = self.clone();
let chain = Arc::new(BlockchainClient::new_http_ws(&self.external_rpc, self.external_rpc_ws.as_deref(), self.external_rpc_timeout).await?);

let importer = Importer::new(executor, miner, Arc::clone(&storage), chain, config.sync_interval);
let importer = Importer::new(executor, Arc::clone(&miner), Arc::clone(&storage), Arc::clone(&chain), self.sync_interval);
let importer = Arc::new(importer);

spawn_named(TASK_NAME, {
Expand All @@ -56,6 +60,6 @@ impl ImporterConfig {
}
});

Ok(importer)
Ok(Some(importer))
}
}
23 changes: 14 additions & 9 deletions src/eth/miner/miner_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use display_json::DebugAsJson;
use crate::eth::miner::Miner;
use crate::eth::storage::StratusStorage;
use crate::ext::parse_duration;
use crate::GlobalState;
use crate::NodeMode;

// -----------------------------------------------------------------------------
// Config
Expand All @@ -21,18 +23,21 @@ pub struct MinerConfig {
}

impl MinerConfig {
/// Inits [`BlockMiner`] with external mining mode, ignoring the configured value.
pub fn init_external_mode(&self, storage: Arc<StratusStorage>) -> anyhow::Result<Arc<Miner>> {
self.init_with_mode(MinerMode::External, storage)
}

/// Inits [`BlockMiner`] with the configured mining mode.
/// Inits [`Miner`] with the appropriate mining mode based on the node mode.
pub fn init(&self, storage: Arc<StratusStorage>) -> anyhow::Result<Arc<Miner>> {
self.init_with_mode(self.block_mode, storage)
tracing::info!(config = ?self, "creating block miner");

let mode = match GlobalState::get_node_mode() {
NodeMode::Follower => MinerMode::External,
NodeMode::Leader => self.block_mode,
};

self.init_with_mode(mode, storage)
}

fn init_with_mode(&self, mode: MinerMode, storage: Arc<StratusStorage>) -> anyhow::Result<Arc<Miner>> {
tracing::info!(config = ?self, "creating block miner");
/// Inits [`Miner`] with a specific mining mode, regardless of node mode.
pub fn init_with_mode(&self, mode: MinerMode, storage: Arc<StratusStorage>) -> anyhow::Result<Arc<Miner>> {
tracing::info!(config = ?self, mode = ?mode, "creating block miner with specific mode");

// create miner
let miner = Miner::new(Arc::clone(&storage), mode);
Expand Down
49 changes: 49 additions & 0 deletions src/globals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tokio::runtime::Runtime;
use tokio_util::sync::CancellationToken;

use crate::config;
use crate::config::StratusConfig;
use crate::config::WithCommonConfig;
use crate::ext::spawn_signal_handler;
use crate::infra::tracing::warn_task_cancellation;
Expand Down Expand Up @@ -70,6 +71,17 @@ where
}
}

// -----------------------------------------------------------------------------
// Node mode
// -----------------------------------------------------------------------------

#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum NodeMode {
Leader,
Follower,
}

// -----------------------------------------------------------------------------
// Global state
// -----------------------------------------------------------------------------
Expand All @@ -86,6 +98,9 @@ static MINER_ENABLED: AtomicBool = AtomicBool::new(true);
/// Unknown clients can interact with the application?
static UNKNOWN_CLIENT_ENABLED: AtomicBool = AtomicBool::new(true);

/// Current node mode.
static IS_LEADER: AtomicBool = AtomicBool::new(false);

pub struct GlobalState;

impl GlobalState {
Expand Down Expand Up @@ -168,4 +183,38 @@ impl GlobalState {
pub fn is_unknown_client_enabled() -> bool {
UNKNOWN_CLIENT_ENABLED.load(Ordering::Relaxed)
}

// -------------------------------------------------------------------------
// Node Mode
// -------------------------------------------------------------------------

/// Initializes the node mode based on the StratusConfig.
pub fn initialize_node_mode(config: &StratusConfig) {
let mode = if config.follower { NodeMode::Follower } else { NodeMode::Leader };
Self::set_node_mode(mode);
}

/// Sets the current node mode.
pub fn set_node_mode(mode: NodeMode) {
IS_LEADER.store(matches!(mode, NodeMode::Leader), Ordering::Relaxed);
}

/// Gets the current node mode.
pub fn get_node_mode() -> NodeMode {
if IS_LEADER.load(Ordering::Relaxed) {
NodeMode::Leader
} else {
NodeMode::Follower
}
}

/// Checks if the node is in follower mode.
pub fn is_follower() -> bool {
!IS_LEADER.load(Ordering::Relaxed)
}

/// Checks if the node is in leader mode.
pub fn is_leader() -> bool {
IS_LEADER.load(Ordering::Relaxed)
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ pub mod utils;

pub use globals::GlobalServices;
pub use globals::GlobalState;
pub use globals::NodeMode;
23 changes: 5 additions & 18 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::sync::Arc;

use stratus::config::StratusConfig;
use stratus::eth::follower::consensus::Consensus;
use stratus::eth::rpc::serve_rpc;
use stratus::infra::BlockchainClient;
use stratus::GlobalServices;
use stratus::GlobalState;

fn main() -> anyhow::Result<()> {
let global_services = GlobalServices::<StratusConfig>::init();
GlobalState::initialize_node_mode(&global_services.config);
global_services.runtime.block_on(run(global_services.config))
}

Expand All @@ -16,27 +16,14 @@ async fn run(config: StratusConfig) -> anyhow::Result<()> {
let storage = config.storage.init()?;

// Init miner
let miner = if config.follower {
config.miner.init_external_mode(Arc::clone(&storage))?
} else {
config.miner.init(Arc::clone(&storage))?
};
let miner = config.miner.init(Arc::clone(&storage))?;

// Init executor
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner));

// Init importer
let consensus: Option<Arc<dyn Consensus>> = if config.follower {
let importer_config = config.importer.as_ref().ok_or(anyhow::anyhow!("importer config is not set"))?;
let chain = Arc::new(
BlockchainClient::new_http_ws(
importer_config.external_rpc.as_ref(),
importer_config.external_rpc_ws.as_deref(),
importer_config.external_rpc_timeout,
)
.await?,
);
Some(importer_config.init(Arc::clone(&executor), Arc::clone(&miner), Arc::clone(&storage), Arc::clone(&chain))?)
let consensus = if let Some(importer_config) = &config.importer {
importer_config.init(Arc::clone(&executor), Arc::clone(&miner), Arc::clone(&storage)).await?
} else {
None
};
Expand Down
3 changes: 2 additions & 1 deletion tests/test_import_external_snapshot_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use fancy_duration::AsFancyDuration;
use itertools::Itertools;
use stratus::alias::JsonValue;
use stratus::config::IntegrationTestConfig;
use stratus::eth::miner::MinerMode;
use stratus::eth::primitives::Account;
use stratus::eth::primitives::Address;
use stratus::eth::primitives::ExternalBlock;
Expand Down Expand Up @@ -151,7 +152,7 @@ pub async fn execute_test(

// init services
let storage = Arc::new(StratusStorage::new(Box::<InMemoryTemporaryStorage>::default(), Box::new(perm_storage)));
let miner = config.miner.init_external_mode(Arc::clone(&storage)).unwrap();
let miner = config.miner.init_with_mode(MinerMode::External, Arc::clone(&storage)).unwrap();
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner));

// execute and mine
Expand Down