Skip to content

Commit

Permalink
enha: implement Consensus trait for Importer (#1608)
Browse files Browse the repository at this point in the history
  • Loading branch information
carneiro-cw authored Aug 6, 2024
1 parent 8b79e7e commit f19c442
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 64 deletions.
49 changes: 46 additions & 3 deletions src/eth/consensus/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,56 @@
pub mod raft;
pub mod simple_consensus;
use std::sync::Arc;

use async_trait::async_trait;

use crate::eth::primitives::Bytes;
use crate::eth::primitives::Hash;
use crate::infra::metrics;
use crate::infra::BlockchainClient;

#[async_trait]
pub trait Consensus: Send + Sync {
async fn should_serve(&self) -> bool;
fn should_forward(&self) -> bool;
async fn forward(&self, transaction: Bytes) -> anyhow::Result<Hash>;
/// Whether this node should serve requests.
async fn should_serve(&self) -> bool {
let lag = match self.lag().await {
Ok(lag) => lag,
Err(err) => {
tracing::error!(?err, "failed to get the lag between this node and the leader");
return false;
}
};

let should_serve = lag <= 3;

if !should_serve {
tracing::info!(?lag, "validator and replica are too far appart");
}

return should_serve;
}

/// Forward a transaction
async fn forward(&self, transaction: Bytes) -> anyhow::Result<Hash> {
#[cfg(feature = "metrics")]
let start = metrics::now();

let blockchain_client = self.get_chain()?;

let result = blockchain_client.send_raw_transaction(transaction.into()).await?;

#[cfg(feature = "metrics")]
metrics::inc_consensus_forward(start.elapsed());

let tx_hash = result.tx_hash;
let validator_url = &blockchain_client.http_url;
tracing::info!(%tx_hash, ?validator_url, "forwarded eth_sendRawTransaction to leader");

Ok(result.tx_hash)
}

fn get_chain(&self) -> anyhow::Result<&Arc<BlockchainClient>>;

/// Get the lag between this node and the leader.
async fn lag(&self) -> anyhow::Result<u64>;
}
13 changes: 9 additions & 4 deletions src/eth/consensus/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,15 +616,20 @@ impl Consensus for Raft {
self.should_serve().await
}

fn should_forward(&self) -> bool {
self.should_forward()
}

async fn forward(&self, transaction: Bytes) -> anyhow::Result<Hash> {
let (tx_hash, url) = self.forward(transaction).await?;
tracing::info!(%tx_hash, %url, "forwarded eth_sendRawTransaction to leader");
Ok(tx_hash)
}

fn get_chain(&self) -> anyhow::Result<&Arc<BlockchainClient>> {
todo!();
}

/// Get the lag between this node and the leader.
async fn lag(&self) -> anyhow::Result<u64> {
todo!();
}
}

#[cfg(test)]
Expand Down
54 changes: 9 additions & 45 deletions src/eth/consensus/simple_consensus/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;

use super::Consensus;
use crate::eth::primitives::Bytes;
use crate::eth::primitives::Hash;
use crate::eth::storage::StratusStorage;
use crate::infra::metrics;
use crate::infra::BlockchainClient;
use crate::log_and_err;

pub struct SimpleConsensus {
storage: Arc<StratusStorage>,
Expand All @@ -25,52 +20,21 @@ impl SimpleConsensus {

#[async_trait]
impl Consensus for SimpleConsensus {
fn should_forward(&self) -> bool {
self.blockchain_client.is_some()
}

async fn should_serve(&self) -> bool {
async fn lag(&self) -> anyhow::Result<u64> {
let Some(blockchain_client) = &self.blockchain_client else {
return true;
};

//gather the latest block number, check how far behind it is from current storage block
//if its greater than 3 blocks of distance, it should not be served
let Ok(validator_block_number) = blockchain_client.fetch_block_number().await else {
tracing::error!("unable to fetch latest block number");
return false;
};

let Ok(current_block_number) = self.storage.read_mined_block_number() else {
tracing::error!("unable to fetch current block number");
return false;
return Err(anyhow::anyhow!("blockchain client not set"));
};
let should_serve = (validator_block_number.as_u64() - 3) <= current_block_number.as_u64();

if !should_serve {
tracing::info!(?validator_block_number, ?current_block_number, "validator and replica are too far appart");
}
let validator_block_number = blockchain_client.fetch_block_number().await?;
let current_block_number = self.storage.read_mined_block_number()?;

return should_serve;
Ok(validator_block_number.as_u64() - current_block_number.as_u64())
}

async fn forward(&self, transaction: Bytes) -> anyhow::Result<Hash> {
#[cfg(feature = "metrics")]
let start = metrics::now();

let Some(blockchain_client) = &self.blockchain_client else {
return log_and_err!("SimpleConsensus.forward was called but no blockchain client is set");
fn get_chain(&self) -> anyhow::Result<&Arc<BlockchainClient>> {
let Some(chain) = &self.blockchain_client else {
return Err(anyhow::anyhow!("blockchain client not set"));
};

let result = blockchain_client.send_raw_transaction(transaction.into()).await?;

#[cfg(feature = "metrics")]
metrics::inc_consensus_forward(start.elapsed());

let tx_hash = result.tx_hash;
let validator_url = &blockchain_client.http_url;
tracing::info!(%tx_hash, ?validator_url, "forwarded eth_sendRawTransaction to leader");

Ok(result.tx_hash)
Ok(chain)
}
}
13 changes: 13 additions & 0 deletions src/eth/importer/importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use futures::try_join;
use futures::StreamExt;
use serde::Deserialize;
Expand All @@ -20,6 +21,7 @@ use crate::eth::primitives::ExternalReceipt;
use crate::eth::primitives::ExternalReceipts;
use crate::eth::primitives::Hash;
use crate::eth::storage::StratusStorage;
use crate::eth::Consensus;
use crate::ext::spawn_named;
use crate::ext::traced_sleep;
use crate::ext::DisplayExt;
Expand Down Expand Up @@ -435,3 +437,14 @@ async fn fetch_receipt(chain: Arc<BlockchainClient>, block_number: BlockNumber,
}
}
}

#[async_trait]
impl Consensus for Importer {
async fn lag(&self) -> anyhow::Result<u64> {
Ok(EXTERNAL_RPC_CURRENT_BLOCK.load(Ordering::SeqCst) - self.storage.read_mined_block_number()?.as_u64())
}

fn get_chain(&self) -> anyhow::Result<&Arc<BlockchainClient>> {
Ok(&self.chain)
}
}
2 changes: 1 addition & 1 deletion src/eth/rpc/rpc_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct RpcContext {
pub executor: Arc<Executor>,
pub miner: Arc<Miner>,
pub storage: Arc<StratusStorage>,
pub consensus: Arc<dyn Consensus>,
pub consensus: Option<Arc<dyn Consensus>>,
pub rpc_server: RpcServerConfig,
pub subs: Arc<RpcSubscriptionsConnected>,
}
Expand Down
13 changes: 9 additions & 4 deletions src/eth/rpc/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub async fn serve_rpc(
storage: Arc<StratusStorage>,
executor: Arc<Executor>,
miner: Arc<Miner>,
consensus: Arc<dyn Consensus>,
consensus: Option<Arc<dyn Consensus>>,

// config
app_config: impl serde::Serialize,
Expand Down Expand Up @@ -245,7 +245,12 @@ async fn stratus_health(_params: Params<'_>, context: Arc<RpcContext>, _extensio
return Err(StratusError::StratusShutdown);
}

let should_serve = context.consensus.should_serve().await;
let should_serve = if let Some(consensus) = &context.consensus {
consensus.should_serve().await
} else {
true
};

if not(should_serve) {
tracing::warn!("readiness check failed because consensus is not ready");
metrics::set_consensus_is_ready(0_u64);
Expand Down Expand Up @@ -618,9 +623,9 @@ fn eth_send_raw_transaction(params: Params<'_>, ctx: Arc<RpcContext>, ext: Exten
}

// forward transaction to the validator node
if ctx.consensus.should_forward() {
if let Some(consensus) = &ctx.consensus {
tracing::info!(%tx_hash, "forwarding eth_sendRawTransaction to leader");
return match Handle::current().block_on(ctx.consensus.forward(data)) {
return match Handle::current().block_on(consensus.forward(data)) {
Ok(hash) => Ok(hex_data(hash)),
Err(e) => {
tracing::error!(reason = ?e, %tx_hash, "failed to forward eth_sendRawTransaction to leader");
Expand Down
9 changes: 2 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::sync::Arc;

use stratus::config::StratusConfig;
use stratus::eth::consensus::simple_consensus::SimpleConsensus;
use stratus::eth::consensus::Consensus;
use stratus::eth::rpc::serve_rpc;
use stratus::infra::BlockchainClient;
Expand All @@ -27,7 +26,7 @@ async fn run(config: StratusConfig) -> anyhow::Result<()> {
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner));

// Init importer
let chain = if config.follower {
let consensus: Option<Arc<dyn Consensus>> = if config.follower {
let importer_config = config.importer.as_ref().ok_or(anyhow::anyhow!("importer config is not set"))?;
let chain = Arc::new(
BlockchainClient::new_http_ws(
Expand All @@ -37,15 +36,11 @@ async fn run(config: StratusConfig) -> anyhow::Result<()> {
)
.await?,
);
importer_config.init(Arc::clone(&executor), Arc::clone(&miner), Arc::clone(&storage), Arc::clone(&chain))?;
Some(chain)
Some(importer_config.init(Arc::clone(&executor), Arc::clone(&miner), Arc::clone(&storage), Arc::clone(&chain))?)
} else {
None
};

// Init consensus
let consensus: Arc<dyn Consensus> = Arc::new(SimpleConsensus::new(Arc::clone(&storage), chain.clone()));

// Init RPC server
serve_rpc(
// Services
Expand Down

0 comments on commit f19c442

Please sign in to comment.