diff --git a/src/bin/run_with_importer.rs b/src/bin/run_with_importer.rs index 6bbb5c43a..bbcd1c3f8 100644 --- a/src/bin/run_with_importer.rs +++ b/src/bin/run_with_importer.rs @@ -21,7 +21,7 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> { // init services let storage = config.storage.init().await?; - let consensus = Arc::new(Consensus::new(config.clone().leader_node)); // in development, with no leader configured, the current node ends up being the leader + let consensus = Consensus::new(config.clone().leader_node); // in development, with no leader configured, the current node ends up being the leader let (http_url, ws_url) = consensus.get_chain_url(config.clone()); let chain = Arc::new(BlockchainClient::new_http_ws(&http_url, ws_url.as_deref(), config.online.external_rpc_timeout).await?); @@ -31,7 +31,10 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> { .miner .init_external_mode(Arc::clone(&storage), Some(Arc::clone(&consensus)), external_relayer) .await?; - let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, Some(consensus)).await; + let executor = config + .executor + .init(Arc::clone(&storage), Arc::clone(&miner), relayer, Some(Arc::clone(&consensus))) + .await; let rpc_storage = Arc::clone(&storage); let rpc_executor = Arc::clone(&executor); @@ -39,7 +42,15 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> { // run rpc and importer-online in parallel let rpc_task = async move { - let res = serve_rpc(rpc_storage, rpc_executor, rpc_miner, config.address, config.executor.chain_id.into()).await; + let res = serve_rpc( + rpc_storage, + rpc_executor, + rpc_miner, + Arc::clone(&consensus), + config.address, + config.executor.chain_id.into(), + ) + .await; GlobalState::shutdown_from(TASK_NAME, "rpc server finished unexpectedly"); res }; diff --git a/src/eth/consensus.rs b/src/eth/consensus.rs index b1f736382..0d5f1259d 100644 --- a/src/eth/consensus.rs +++ b/src/eth/consensus.rs @@ -1,6 +1,9 @@ #[cfg(feature = "kubernetes")] pub mod consensus_kube { use std::env; + use std::sync::atomic::AtomicU64; + use std::sync::atomic::Ordering; + use std::sync::Arc; use std::time::Duration; use anyhow::anyhow; @@ -10,6 +13,7 @@ pub mod consensus_kube { use kube::Client; use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::{self}; + use tokio::sync::Mutex; use tokio::time::sleep; use tonic::transport::Channel; use tonic::transport::Server; @@ -49,14 +53,14 @@ pub mod consensus_kube { pub struct Consensus { pub sender: Sender, - leader_name: String, //XXX check the peers instead of using it - //XXX current_index: AtomicU64, + leader_name: String, //XXX check the peers instead of using it + last_arrived_block_number: AtomicU64, //TODO use a true index for both executions and blocks, currently we use something like Bully algorithm so block number is fine } impl Consensus { //XXX for now we pick the leader name from the environment // the correct is to have a leader election algorithm - pub fn new(leader_name: Option) -> Self { + pub fn new(leader_name: Option) -> Arc { let Some(_node_name) = Self::current_node() else { tracing::info!("No consensus module available, running in standalone mode"); return Self::new_stand_alone(); @@ -101,11 +105,19 @@ pub mod consensus_kube { } }); - Self::initialize_server(); - Self { leader_name, sender } + let last_arrived_block_number = AtomicU64::new(0); //TODO load from consensus storage + + let consensus = Self { + leader_name, + sender, + last_arrived_block_number, + }; + let consensus = Arc::new(consensus); + Self::initialize_server(Arc::clone(&consensus)); + consensus } - fn new_stand_alone() -> Self { + fn new_stand_alone() -> Arc { let (sender, mut receiver) = mpsc::channel::(32); tokio::spawn(async move { @@ -114,18 +126,23 @@ pub mod consensus_kube { } }); - Self { + let last_arrived_block_number = AtomicU64::new(0); + + Arc::new(Self { leader_name: "standalone".to_string(), sender, - } + last_arrived_block_number, + }) } - fn initialize_server() { + fn initialize_server(consensus: Arc) { tokio::spawn(async move { tracing::info!("Starting append entry service at port 3777"); let addr = "0.0.0.0:3777".parse().unwrap(); - let append_entry_service = AppendEntryServiceImpl; + let append_entry_service = AppendEntryServiceImpl { + consensus: Mutex::new(consensus), + }; Server::builder() .add_service(AppendEntryServiceServer::new(append_entry_service)) @@ -283,7 +300,9 @@ pub mod consensus_kube { } } - pub struct AppendEntryServiceImpl; + pub struct AppendEntryServiceImpl { + consensus: Mutex>, + } #[tonic::async_trait] impl AppendEntryService for AppendEntryServiceImpl { @@ -311,10 +330,21 @@ pub mod consensus_kube { tracing::info!(number = header.number, "appending new block"); + let consensus = self.consensus.lock().await; + let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst); + + consensus.last_arrived_block_number.store(header.number, Ordering::SeqCst); + + tracing::info!( + last_last_arrived_block_number = last_last_arrived_block_number, + new_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst), + "last arrived block number set", + ); + Ok(Response::new(AppendBlockCommitResponse { status: StatusCode::AppendSuccess as i32, message: "Block Commit appended successfully".into(), - last_committed_block_number: 0, + last_committed_block_number: consensus.last_arrived_block_number.load(Ordering::SeqCst), })) } } @@ -322,6 +352,8 @@ pub mod consensus_kube { #[cfg(not(feature = "kubernetes"))] pub mod consensus_mock { + use std::sync::Arc; + use tokio::sync::mpsc::Sender; use crate::config::RunWithImporterConfig; @@ -332,7 +364,7 @@ pub mod consensus_mock { } impl Consensus { - pub fn new(_leader_name: Option) -> Self { + pub fn new(_leader_name: Option) -> Arc { todo!() } diff --git a/src/eth/rpc/rpc_context.rs b/src/eth/rpc/rpc_context.rs index 080bcde9e..8b7dd4001 100644 --- a/src/eth/rpc/rpc_context.rs +++ b/src/eth/rpc/rpc_context.rs @@ -5,6 +5,7 @@ use crate::eth::primitives::ChainId; use crate::eth::rpc::rpc_subscriptions::RpcSubscriptionsConnected; use crate::eth::storage::StratusStorage; use crate::eth::BlockMiner; +use crate::eth::Consensus; use crate::eth::Executor; pub struct RpcContext { @@ -19,6 +20,7 @@ pub struct RpcContext { pub executor: Arc, pub miner: Arc, pub storage: Arc, + pub consensus: Arc, pub subs: Arc, } diff --git a/src/eth/rpc/rpc_server.rs b/src/eth/rpc/rpc_server.rs index c512ba664..1cf3c710c 100644 --- a/src/eth/rpc/rpc_server.rs +++ b/src/eth/rpc/rpc_server.rs @@ -41,6 +41,7 @@ use crate::eth::rpc::RpcMiddleware; use crate::eth::rpc::RpcSubscriptions; use crate::eth::storage::StratusStorage; use crate::eth::BlockMiner; +use crate::eth::Consensus; use crate::eth::Executor; use crate::ext::ResultExt; use crate::infra::tracing::warn_task_cancellation; @@ -56,6 +57,7 @@ pub async fn serve_rpc( storage: Arc, executor: Arc, miner: Arc, + consensus: Arc, // config address: SocketAddr, chain_id: ChainId, @@ -80,6 +82,7 @@ pub async fn serve_rpc( executor, storage, miner, + consensus, // subscriptions subs: Arc::clone(&subs.connected), @@ -139,9 +142,6 @@ fn register_methods(mut module: RpcModule) -> anyhow::Result, _: Arc) -> anyhow::Result, _: Arc) -> anyhow::Result { - Ok(json!(true)) -} // Blockchain async fn net_version(_: Params<'_>, ctx: Arc) -> String { diff --git a/src/main.rs b/src/main.rs index c2f7678fc..0e0383b53 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use stratus::config::StratusConfig; use stratus::eth::rpc::serve_rpc; +use stratus::eth::Consensus; use stratus::GlobalServices; fn main() -> anyhow::Result<()> { @@ -16,9 +17,10 @@ async fn run(config: StratusConfig) -> anyhow::Result<()> { let external_relayer = if let Some(c) = config.external_relayer { Some(c.init().await) } else { None }; let miner = config.miner.init(Arc::clone(&storage), None, external_relayer).await?; let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, None).await; + let consensus = Consensus::new(None); // for now, we force None to initiate with the current node being the leader // start rpc server - serve_rpc(storage, executor, miner, config.address, config.executor.chain_id.into()).await?; + serve_rpc(storage, executor, miner, consensus, config.address, config.executor.chain_id.into()).await?; Ok(()) }