diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 61979de0a..30306df1e 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -1,13 +1,56 @@ pub mod raft; pub mod simple_consensus; +use std::sync::Arc; + use async_trait::async_trait; use crate::eth::primitives::Bytes; use crate::eth::primitives::Hash; +use crate::infra::metrics; +use crate::infra::BlockchainClient; #[async_trait] pub trait Consensus: Send + Sync { - async fn should_serve(&self) -> bool; - fn should_forward(&self) -> bool; - async fn forward(&self, transaction: Bytes) -> anyhow::Result; + /// Whether this node should serve requests. + async fn should_serve(&self) -> bool { + let lag = match self.lag().await { + Ok(lag) => lag, + Err(err) => { + tracing::error!(?err, "failed to get the lag between this node and the leader"); + return false; + } + }; + + let should_serve = lag <= 3; + + if !should_serve { + tracing::info!(?lag, "validator and replica are too far appart"); + } + + return should_serve; + } + + /// Forward a transaction + async fn forward(&self, transaction: Bytes) -> anyhow::Result { + #[cfg(feature = "metrics")] + let start = metrics::now(); + + let blockchain_client = self.get_chain()?; + + let result = blockchain_client.send_raw_transaction(transaction.into()).await?; + + #[cfg(feature = "metrics")] + metrics::inc_consensus_forward(start.elapsed()); + + let tx_hash = result.tx_hash; + let validator_url = &blockchain_client.http_url; + tracing::info!(%tx_hash, ?validator_url, "forwarded eth_sendRawTransaction to leader"); + + Ok(result.tx_hash) + } + + fn get_chain(&self) -> anyhow::Result<&Arc>; + + /// Get the lag between this node and the leader. + async fn lag(&self) -> anyhow::Result; } diff --git a/src/eth/consensus/raft/mod.rs b/src/eth/consensus/raft/mod.rs index c876b4b65..fc3f425ab 100644 --- a/src/eth/consensus/raft/mod.rs +++ b/src/eth/consensus/raft/mod.rs @@ -616,15 +616,20 @@ impl Consensus for Raft { self.should_serve().await } - fn should_forward(&self) -> bool { - self.should_forward() - } - async fn forward(&self, transaction: Bytes) -> anyhow::Result { let (tx_hash, url) = self.forward(transaction).await?; tracing::info!(%tx_hash, %url, "forwarded eth_sendRawTransaction to leader"); Ok(tx_hash) } + + fn get_chain(&self) -> anyhow::Result<&Arc> { + todo!(); + } + + /// Get the lag between this node and the leader. + async fn lag(&self) -> anyhow::Result { + todo!(); + } } #[cfg(test)] diff --git a/src/eth/consensus/simple_consensus/mod.rs b/src/eth/consensus/simple_consensus/mod.rs index 45b9aaa01..87834e57f 100644 --- a/src/eth/consensus/simple_consensus/mod.rs +++ b/src/eth/consensus/simple_consensus/mod.rs @@ -1,15 +1,10 @@ use std::sync::Arc; -use anyhow::anyhow; use async_trait::async_trait; use super::Consensus; -use crate::eth::primitives::Bytes; -use crate::eth::primitives::Hash; use crate::eth::storage::StratusStorage; -use crate::infra::metrics; use crate::infra::BlockchainClient; -use crate::log_and_err; pub struct SimpleConsensus { storage: Arc, @@ -25,52 +20,21 @@ impl SimpleConsensus { #[async_trait] impl Consensus for SimpleConsensus { - fn should_forward(&self) -> bool { - self.blockchain_client.is_some() - } - - async fn should_serve(&self) -> bool { + async fn lag(&self) -> anyhow::Result { let Some(blockchain_client) = &self.blockchain_client else { - return true; - }; - - //gather the latest block number, check how far behind it is from current storage block - //if its greater than 3 blocks of distance, it should not be served - let Ok(validator_block_number) = blockchain_client.fetch_block_number().await else { - tracing::error!("unable to fetch latest block number"); - return false; - }; - - let Ok(current_block_number) = self.storage.read_mined_block_number() else { - tracing::error!("unable to fetch current block number"); - return false; + return Err(anyhow::anyhow!("blockchain client not set")); }; - let should_serve = (validator_block_number.as_u64() - 3) <= current_block_number.as_u64(); - if !should_serve { - tracing::info!(?validator_block_number, ?current_block_number, "validator and replica are too far appart"); - } + let validator_block_number = blockchain_client.fetch_block_number().await?; + let current_block_number = self.storage.read_mined_block_number()?; - return should_serve; + Ok(validator_block_number.as_u64() - current_block_number.as_u64()) } - async fn forward(&self, transaction: Bytes) -> anyhow::Result { - #[cfg(feature = "metrics")] - let start = metrics::now(); - - let Some(blockchain_client) = &self.blockchain_client else { - return log_and_err!("SimpleConsensus.forward was called but no blockchain client is set"); + fn get_chain(&self) -> anyhow::Result<&Arc> { + let Some(chain) = &self.blockchain_client else { + return Err(anyhow::anyhow!("blockchain client not set")); }; - - let result = blockchain_client.send_raw_transaction(transaction.into()).await?; - - #[cfg(feature = "metrics")] - metrics::inc_consensus_forward(start.elapsed()); - - let tx_hash = result.tx_hash; - let validator_url = &blockchain_client.http_url; - tracing::info!(%tx_hash, ?validator_url, "forwarded eth_sendRawTransaction to leader"); - - Ok(result.tx_hash) + Ok(chain) } } diff --git a/src/eth/importer/importer.rs b/src/eth/importer/importer.rs index 30176b431..76c6bd6e0 100644 --- a/src/eth/importer/importer.rs +++ b/src/eth/importer/importer.rs @@ -4,6 +4,7 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; +use async_trait::async_trait; use futures::try_join; use futures::StreamExt; use serde::Deserialize; @@ -20,6 +21,7 @@ use crate::eth::primitives::ExternalReceipt; use crate::eth::primitives::ExternalReceipts; use crate::eth::primitives::Hash; use crate::eth::storage::StratusStorage; +use crate::eth::Consensus; use crate::ext::spawn_named; use crate::ext::traced_sleep; use crate::ext::DisplayExt; @@ -435,3 +437,14 @@ async fn fetch_receipt(chain: Arc, block_number: BlockNumber, } } } + +#[async_trait] +impl Consensus for Importer { + async fn lag(&self) -> anyhow::Result { + Ok(EXTERNAL_RPC_CURRENT_BLOCK.load(Ordering::SeqCst) - self.storage.read_mined_block_number()?.as_u64()) + } + + fn get_chain(&self) -> anyhow::Result<&Arc> { + Ok(&self.chain) + } +} diff --git a/src/eth/rpc/rpc_context.rs b/src/eth/rpc/rpc_context.rs index 1e1376991..efe31327a 100644 --- a/src/eth/rpc/rpc_context.rs +++ b/src/eth/rpc/rpc_context.rs @@ -25,7 +25,7 @@ pub struct RpcContext { pub executor: Arc, pub miner: Arc, pub storage: Arc, - pub consensus: Arc, + pub consensus: Option>, pub rpc_server: RpcServerConfig, pub subs: Arc, } diff --git a/src/eth/rpc/rpc_server.rs b/src/eth/rpc/rpc_server.rs index c30b79d7a..21f79aee3 100644 --- a/src/eth/rpc/rpc_server.rs +++ b/src/eth/rpc/rpc_server.rs @@ -69,7 +69,7 @@ pub async fn serve_rpc( storage: Arc, executor: Arc, miner: Arc, - consensus: Arc, + consensus: Option>, // config app_config: impl serde::Serialize, @@ -245,7 +245,12 @@ async fn stratus_health(_params: Params<'_>, context: Arc, _extensio return Err(StratusError::StratusShutdown); } - let should_serve = context.consensus.should_serve().await; + let should_serve = if let Some(consensus) = &context.consensus { + consensus.should_serve().await + } else { + true + }; + if not(should_serve) { tracing::warn!("readiness check failed because consensus is not ready"); metrics::set_consensus_is_ready(0_u64); @@ -618,9 +623,9 @@ fn eth_send_raw_transaction(params: Params<'_>, ctx: Arc, ext: Exten } // forward transaction to the validator node - if ctx.consensus.should_forward() { + if let Some(consensus) = &ctx.consensus { tracing::info!(%tx_hash, "forwarding eth_sendRawTransaction to leader"); - return match Handle::current().block_on(ctx.consensus.forward(data)) { + return match Handle::current().block_on(consensus.forward(data)) { Ok(hash) => Ok(hex_data(hash)), Err(e) => { tracing::error!(reason = ?e, %tx_hash, "failed to forward eth_sendRawTransaction to leader"); diff --git a/src/main.rs b/src/main.rs index 927c94770..99b37b643 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use stratus::config::StratusConfig; -use stratus::eth::consensus::simple_consensus::SimpleConsensus; use stratus::eth::consensus::Consensus; use stratus::eth::rpc::serve_rpc; use stratus::infra::BlockchainClient; @@ -27,7 +26,7 @@ async fn run(config: StratusConfig) -> anyhow::Result<()> { let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner)); // Init importer - let chain = if config.follower { + let consensus: Option> = if config.follower { let importer_config = config.importer.as_ref().ok_or(anyhow::anyhow!("importer config is not set"))?; let chain = Arc::new( BlockchainClient::new_http_ws( @@ -37,15 +36,11 @@ async fn run(config: StratusConfig) -> anyhow::Result<()> { ) .await?, ); - importer_config.init(Arc::clone(&executor), Arc::clone(&miner), Arc::clone(&storage), Arc::clone(&chain))?; - Some(chain) + Some(importer_config.init(Arc::clone(&executor), Arc::clone(&miner), Arc::clone(&storage), Arc::clone(&chain))?) } else { None }; - // Init consensus - let consensus: Arc = Arc::new(SimpleConsensus::new(Arc::clone(&storage), chain.clone())); - // Init RPC server serve_rpc( // Services