pass consensus into rpc server (#951)
* chore: display the block number correctly

* lint

* chore: pass consensus into rpc server

* lint

* fix contract

* lint
renancloudwalk authored May 29, 2024
1 parent 2ecd9e6 commit 3d9d292
Showing 5 changed files with 67 additions and 23 deletions.
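The gist of the change, as a minimal standalone sketch (`Consensus` and `serve_rpc` here are simplified stand-ins; the real `serve_rpc` also takes storage, executor, miner, address, and chain id): `Consensus::new` now returns `Arc<Self>`, so one instance can be shared between the executor and the RPC server.

```rust
use std::sync::Arc;

// Simplified stand-in for src/eth/consensus.rs; the real struct also
// carries a block sender and the last arrived block number.
struct Consensus {
    leader_name: String,
}

impl Consensus {
    // Mirrors the new constructor shape: it returns Arc<Self> so callers
    // can hand the same instance to several components.
    fn new(leader_name: Option<String>) -> Arc<Self> {
        Arc::new(Self {
            leader_name: leader_name.unwrap_or_else(|| "standalone".to_string()),
        })
    }
}

// Stand-in for the RPC server entry point, which now receives consensus.
fn serve_rpc(consensus: Arc<Consensus>) {
    println!("rpc server sees leader: {}", consensus.leader_name);
}

fn main() {
    let consensus = Consensus::new(None); // no leader configured
    let executor_handle = Arc::clone(&consensus); // executor keeps a handle too
    serve_rpc(consensus);
    assert_eq!(executor_handle.leader_name, "standalone");
}
```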
17 changes: 14 additions & 3 deletions src/bin/run_with_importer.rs
@@ -21,7 +21,7 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> {

// init services
let storage = config.storage.init().await?;
let consensus = Arc::new(Consensus::new(config.clone().leader_node)); // in development, with no leader configured, the current node ends up being the leader
let consensus = Consensus::new(config.clone().leader_node); // in development, with no leader configured, the current node ends up being the leader
let (http_url, ws_url) = consensus.get_chain_url(config.clone());
let chain = Arc::new(BlockchainClient::new_http_ws(&http_url, ws_url.as_deref(), config.online.external_rpc_timeout).await?);

@@ -31,15 +31,26 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> {
.miner
.init_external_mode(Arc::clone(&storage), Some(Arc::clone(&consensus)), external_relayer)
.await?;
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, Some(consensus)).await;
let executor = config
.executor
.init(Arc::clone(&storage), Arc::clone(&miner), relayer, Some(Arc::clone(&consensus)))
.await;

let rpc_storage = Arc::clone(&storage);
let rpc_executor = Arc::clone(&executor);
let rpc_miner = Arc::clone(&miner);

// run rpc and importer-online in parallel
let rpc_task = async move {
let res = serve_rpc(rpc_storage, rpc_executor, rpc_miner, config.address, config.executor.chain_id.into()).await;
let res = serve_rpc(
rpc_storage,
rpc_executor,
rpc_miner,
Arc::clone(&consensus),
config.address,
config.executor.chain_id.into(),
)
.await;
GlobalState::shutdown_from(TASK_NAME, "rpc server finished unexpectedly");
res
};
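The `Arc::clone` calls above follow the usual pattern for sharing one value with a spawned task; a small sketch (assuming tokio, which the crate already uses):

```rust
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let consensus = Arc::new("consensus state".to_string()); // stand-in payload

    // Cloning an Arc only bumps a reference count; the clone moves into the
    // task while the original handle stays usable in this scope.
    let for_rpc_task = Arc::clone(&consensus);
    let rpc_task = tokio::spawn(async move {
        println!("rpc task sees: {for_rpc_task}");
    });

    println!("main still sees: {consensus}");
    rpc_task.await.unwrap();
}
```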
58 changes: 45 additions & 13 deletions src/eth/consensus.rs
@@ -1,6 +1,9 @@
#[cfg(feature = "kubernetes")]
pub mod consensus_kube {
use std::env;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
@@ -10,6 +13,7 @@ pub mod consensus_kube {
use kube::Client;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::{self};
use tokio::sync::Mutex;
use tokio::time::sleep;
use tonic::transport::Channel;
use tonic::transport::Server;
@@ -49,14 +53,14 @@ pub mod consensus_kube {

pub struct Consensus {
pub sender: Sender<Block>,
leader_name: String, //XXX check the peers instead of using it
//XXX current_index: AtomicU64,
leader_name: String, //XXX check the peers instead of using it
last_arrived_block_number: AtomicU64, //TODO use a true index for both executions and blocks, currently we use something like Bully algorithm so block number is fine
}

impl Consensus {
//XXX for now we pick the leader name from the environment
// the correct approach is to have a leader election algorithm
pub fn new(leader_name: Option<String>) -> Self {
pub fn new(leader_name: Option<String>) -> Arc<Self> {
let Some(_node_name) = Self::current_node() else {
tracing::info!("No consensus module available, running in standalone mode");
return Self::new_stand_alone();
@@ -101,11 +105,19 @@ pub mod consensus_kube {
}
});

Self::initialize_server();
Self { leader_name, sender }
let last_arrived_block_number = AtomicU64::new(0); //TODO load from consensus storage

let consensus = Self {
leader_name,
sender,
last_arrived_block_number,
};
let consensus = Arc::new(consensus);
Self::initialize_server(Arc::clone(&consensus));
consensus
}

fn new_stand_alone() -> Self {
fn new_stand_alone() -> Arc<Self> {
let (sender, mut receiver) = mpsc::channel::<Block>(32);

tokio::spawn(async move {
@@ -114,18 +126,23 @@ pub mod consensus_kube {
}
});

Self {
let last_arrived_block_number = AtomicU64::new(0);

Arc::new(Self {
leader_name: "standalone".to_string(),
sender,
}
last_arrived_block_number,
})
}

fn initialize_server() {
fn initialize_server(consensus: Arc<Consensus>) {
tokio::spawn(async move {
tracing::info!("Starting append entry service at port 3777");
let addr = "0.0.0.0:3777".parse().unwrap();

let append_entry_service = AppendEntryServiceImpl;
let append_entry_service = AppendEntryServiceImpl {
consensus: Mutex::new(consensus),
};

Server::builder()
.add_service(AppendEntryServiceServer::new(append_entry_service))
@@ -283,7 +300,9 @@ pub mod consensus_kube {
}
}

pub struct AppendEntryServiceImpl;
pub struct AppendEntryServiceImpl {
consensus: Mutex<Arc<Consensus>>,
}

#[tonic::async_trait]
impl AppendEntryService for AppendEntryServiceImpl {
@@ -311,17 +330,30 @@ pub mod consensus_kube {

tracing::info!(number = header.number, "appending new block");

let consensus = self.consensus.lock().await;
let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst);

consensus.last_arrived_block_number.store(header.number, Ordering::SeqCst);

tracing::info!(
last_last_arrived_block_number = last_last_arrived_block_number,
new_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst),
"last arrived block number set",
);

Ok(Response::new(AppendBlockCommitResponse {
status: StatusCode::AppendSuccess as i32,
message: "Block Commit appended successfully".into(),
last_committed_block_number: 0,
last_committed_block_number: consensus.last_arrived_block_number.load(Ordering::SeqCst),
}))
}
}
}

#[cfg(not(feature = "kubernetes"))]
pub mod consensus_mock {
use std::sync::Arc;

use tokio::sync::mpsc::Sender;

use crate::config::RunWithImporterConfig;
@@ -332,7 +364,7 @@ pub mod consensus_mock {
}

impl Consensus {
pub fn new(_leader_name: Option<String>) -> Self {
pub fn new(_leader_name: Option<String>) -> Arc<Self> {
todo!()
}

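The new `last_arrived_block_number` field uses the standard atomic load/store pattern; a self-contained sketch of the handler's bookkeeping (simplified; in the real service the `Consensus` handle additionally sits behind a `Mutex`):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Stand-in mirroring the new field on Consensus.
struct Consensus {
    last_arrived_block_number: AtomicU64,
}

impl Consensus {
    // Same shape as the append_block_commit handler: read the previous
    // value, then publish the newly arrived block number.
    fn record_block(&self, number: u64) -> u64 {
        let previous = self.last_arrived_block_number.load(Ordering::SeqCst);
        self.last_arrived_block_number.store(number, Ordering::SeqCst);
        previous
    }
}

fn main() {
    let consensus = Consensus {
        last_arrived_block_number: AtomicU64::new(0),
    };
    assert_eq!(consensus.record_block(42), 0);
    assert_eq!(consensus.last_arrived_block_number.load(Ordering::SeqCst), 42);
}
```

Note that a separate load and store is not atomic as a pair; if two commits raced, `swap(number, Ordering::SeqCst)` would return the previous value in a single operation.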
2 changes: 2 additions & 0 deletions src/eth/rpc/rpc_context.rs
@@ -5,6 +5,7 @@ use crate::eth::primitives::ChainId;
use crate::eth::rpc::rpc_subscriptions::RpcSubscriptionsConnected;
use crate::eth::storage::StratusStorage;
use crate::eth::BlockMiner;
use crate::eth::Consensus;
use crate::eth::Executor;

pub struct RpcContext {
@@ -19,6 +20,7 @@ pub struct RpcContext {
pub executor: Arc<Executor>,
pub miner: Arc<BlockMiner>,
pub storage: Arc<StratusStorage>,
pub consensus: Arc<Consensus>,
pub subs: Arc<RpcSubscriptionsConnected>,
}

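With the field in place, any RPC handler can reach consensus state through the shared context. A hypothetical sketch (the `stratus_readiness` body below is an invented example, not part of this commit):

```rust
use std::sync::Arc;

// Stand-ins for the real types in src/eth.
struct Consensus {
    leader_name: String,
}

struct RpcContext {
    consensus: Arc<Consensus>,
}

// Hypothetical handler body: reads consensus state via the context.
async fn stratus_readiness(ctx: Arc<RpcContext>) -> bool {
    !ctx.consensus.leader_name.is_empty()
}

#[tokio::main]
async fn main() {
    let ctx = Arc::new(RpcContext {
        consensus: Arc::new(Consensus { leader_name: "standalone".into() }),
    });
    assert!(stratus_readiness(ctx).await);
}
```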
9 changes: 3 additions & 6 deletions src/eth/rpc/rpc_server.rs
@@ -41,6 +41,7 @@ use crate::eth::rpc::RpcMiddleware;
use crate::eth::rpc::RpcSubscriptions;
use crate::eth::storage::StratusStorage;
use crate::eth::BlockMiner;
use crate::eth::Consensus;
use crate::eth::Executor;
use crate::ext::ResultExt;
use crate::infra::tracing::warn_task_cancellation;
@@ -56,6 +57,7 @@ pub async fn serve_rpc(
storage: Arc<StratusStorage>,
executor: Arc<Executor>,
miner: Arc<BlockMiner>,
consensus: Arc<Consensus>,
// config
address: SocketAddr,
chain_id: ChainId,
@@ -80,6 +82,7 @@
executor,
storage,
miner,
consensus,

// subscriptions
subs: Arc::clone(&subs.connected),
@@ -139,9 +142,6 @@ fn register_methods(mut module: RpcModule<RpcContext>) -> anyhow::Result<RpcModu
module.register_async_method("stratus_readiness", stratus_readiness)?;
module.register_async_method("stratus_liveness", stratus_liveness)?;

// consensus
module.register_async_method("stratus_appendEntries", stratus_append_entries)?;

// blockchain
module.register_async_method("net_version", net_version)?;
module.register_async_method("net_listening", net_listening)?;
Expand Down Expand Up @@ -236,9 +236,6 @@ async fn stratus_liveness(_: Params<'_>, _: Arc<RpcContext>) -> anyhow::Result<J
Ok(json!(true))
}

async fn stratus_append_entries(_: Params<'_>, _: Arc<RpcContext>) -> anyhow::Result<JsonValue, RpcError> {
Ok(json!(true))
}
// Blockchain

async fn net_version(_: Params<'_>, ctx: Arc<RpcContext>) -> String {
4 changes: 3 additions & 1 deletion src/main.rs
@@ -2,6 +2,7 @@ use std::sync::Arc;

use stratus::config::StratusConfig;
use stratus::eth::rpc::serve_rpc;
use stratus::eth::Consensus;
use stratus::GlobalServices;

fn main() -> anyhow::Result<()> {
@@ -16,9 +17,10 @@ async fn run(config: StratusConfig) -> anyhow::Result<()> {
let external_relayer = if let Some(c) = config.external_relayer { Some(c.init().await) } else { None };
let miner = config.miner.init(Arc::clone(&storage), None, external_relayer).await?;
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, None).await;
let consensus = Consensus::new(None); // for now, we force None to initiate with the current node being the leader

// start rpc server
serve_rpc(storage, executor, miner, config.address, config.executor.chain_id.into()).await?;
serve_rpc(storage, executor, miner, consensus, config.address, config.executor.chain_id.into()).await?;

Ok(())
}
