Candidate peers (#986)
* chore: add periodic peer discovery

* chore: add candidate peers to facilitate other connections

* fix: merge
renancloudwalk authored Jun 4, 2024
1 parent f3a9beb commit d13a1c2
Showing 5 changed files with 74 additions and 31 deletions.
6 changes: 3 additions & 3 deletions config/production.yaml
@@ -68,9 +68,9 @@ kubernetes:
     httpGet:
       path: /readiness
       port: 3000
-    initialDelaySeconds: 5
-    periodSeconds: 5
-    timeoutSeconds: 5
+    initialDelaySeconds: 1
+    periodSeconds: 1
+    timeoutSeconds: 1
   startupProbe:
     httpGet:
       path: /startup
2 changes: 1 addition & 1 deletion src/bin/run_with_importer.rs
@@ -22,7 +22,7 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> {

     // init services
     let storage = config.storage.init().await?;
-    let consensus = Consensus::new(Arc::clone(&storage), Some(config.clone())).await; // in development, with no leader configured, the current node ends up being the leader
+    let consensus = Consensus::new(Arc::clone(&storage), config.candidate_peers.clone(), Some(config.clone())).await; // in development, with no leader configured, the current node ends up being the leader
     let Some((http_url, ws_url)) = consensus.get_chain_url() else {
         return Err(anyhow!("No chain url found"));
     };
4 changes: 4 additions & 0 deletions src/config.rs
@@ -88,6 +88,10 @@ pub struct CommonConfig {
     #[arg(long = "nocapture")]
     pub nocapture: bool,

+    /// Direct access to peers via IP address, which will be included in data propagation and leader election.
+    #[arg(long = "candidate-peers", env = "CANDIDATE_PEERS", value_delimiter = ',')]
+    pub candidate_peers: Vec<String>,
+
     /// Url to the sentry project
     #[arg(long = "sentry-url", env = "SENTRY_URL")]
     pub sentry_url: Option<String>,
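Because of value_delimiter = ',', a single CANDIDATE_PEERS value fans out into a Vec<String>. A minimal sketch of that parsing behavior, assuming clap 4 with the derive and env features enabled; the PeersConfig struct and the example addresses are illustrative stand-ins, not the project's actual CommonConfig:

use clap::Parser;

/// Illustrative stand-in for the relevant slice of CommonConfig.
#[derive(Parser, Debug)]
struct PeersConfig {
    /// Peers reachable directly by address, split on commas.
    #[arg(long = "candidate-peers", env = "CANDIDATE_PEERS", value_delimiter = ',')]
    candidate_peers: Vec<String>,
}

fn main() {
    // e.g. CANDIDATE_PEERS=http://10.0.0.1:3000,http://10.0.0.2:3000 cargo run
    let config = PeersConfig::parse();
    // Each comma-separated entry becomes one element of the vector.
    for peer in &config.candidate_peers {
        println!("candidate peer: {peer}");
    }
}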
85 changes: 60 additions & 25 deletions src/eth/consensus/mod.rs
@@ -88,14 +88,15 @@ pub struct Consensus {
     importer_config: Option<RunWithImporterConfig>, //HACK this is used with sync online only
     storage: Arc<StratusStorage>,
     peers: Arc<RwLock<HashMap<PeerAddress, Peer>>>,
+    candidate_peers: Vec<String>,
     leader_name: String, //XXX check the peers instead of using it
     last_arrived_block_number: AtomicU64, //TODO use a true index for both executions and blocks, currently we use something like Bully algorithm so block number is fine
 }

 impl Consensus {
     //XXX for now we pick the leader name from the environment
     // the correct approach is to have a leader election algorithm
-    pub async fn new(storage: Arc<StratusStorage>, importer_config: Option<RunWithImporterConfig>) -> Arc<Self> {
+    pub async fn new(storage: Arc<StratusStorage>, candidate_peers: Vec<String>, importer_config: Option<RunWithImporterConfig>) -> Arc<Self> {
         if Self::is_stand_alone() {
             tracing::info!("No consensus module available, running in standalone mode");
             return Self::new_stand_alone(storage, importer_config);
@@ -118,6 +119,7 @@ impl Consensus {
             sender,
             storage,
             peers,
+            candidate_peers,
             leader_name,
             importer_config,
             last_arrived_block_number,
@@ -149,6 +151,7 @@ impl Consensus {
             storage,
             sender,
             peers,
+            candidate_peers: vec![],
             last_arrived_block_number,
             importer_config,
         })
@@ -293,33 +296,65 @@ impl Consensus {

     #[tracing::instrument(skip_all)]
     pub async fn discover_peers(consensus: Arc<Consensus>) {
+        let mut new_peers: Vec<(PeerAddress, Peer)> = Vec::new();
+
         #[cfg(feature = "kubernetes")]
-        if let Ok(new_peers) = Self::discover_peers_kubernetes().await {
-            let mut peers_lock = consensus.peers.write().await;
-            for (address, new_peer) in new_peers {
-                tracing::info!("Processing peer: {}", address.0);
-                peers_lock
-                    .entry(address.clone())
-                    .and_modify(|existing_peer| {
-                        tracing::info!("Updating existing peer: {}", address.0);
-                        existing_peer.client = new_peer.client.clone();
-                        existing_peer.last_heartbeat = new_peer.last_heartbeat;
-                        existing_peer.role = new_peer.role.clone();
-                        existing_peer.term = new_peer.term;
-                        // Preserve match_index and next_index of existing peers
-                    })
-                    .or_insert_with(|| {
-                        tracing::info!("Adding new peer: {}", address.0);
-                        new_peer
-                    });
-            }
-            tracing::info!(
-                peers = ?peers_lock.keys().collect::<Vec<&PeerAddress>>(),
-                "Discovered peers",
-            );
-        } else {
-            tracing::error!("Failed to discover peers");
+        if let Ok(k8s_peers) = Self::discover_peers_kubernetes().await {
+            new_peers.extend(k8s_peers);
         }
+
+        if let Ok(env_peers) = Self::discover_peers_env(&consensus.candidate_peers).await {
+            new_peers.extend(env_peers);
+        }
+
+        let mut peers_lock = consensus.peers.write().await;
+        for (address, new_peer) in new_peers {
+            tracing::info!("Processing peer: {}", address.0);
+            peers_lock
+                .entry(address.clone())
+                .and_modify(|existing_peer| {
+                    tracing::info!("Updating existing peer: {}", address.0);
+                    existing_peer.client = new_peer.client.clone();
+                    existing_peer.last_heartbeat = new_peer.last_heartbeat;
+                    existing_peer.role = new_peer.role.clone();
+                    existing_peer.term = new_peer.term;
+                    // Preserve match_index and next_index of existing peers
+                })
+                .or_insert_with(|| {
+                    tracing::info!("Adding new peer: {}", address.0);
+                    new_peer
+                });
+        }
+        tracing::info!(
+            peers = ?peers_lock.keys().collect::<Vec<&PeerAddress>>(),
+            "Discovered peers",
+        );
+    }
+
+    async fn discover_peers_env(addresses: &[String]) -> Result<Vec<(PeerAddress, Peer)>, anyhow::Error> {
+        let mut peers: Vec<(PeerAddress, Peer)> = Vec::new();
+
+        for address in addresses {
+            match AppendEntryServiceClient::connect(address.clone()).await {
+                Ok(client) => {
+                    let peer = Peer {
+                        client,
+                        last_heartbeat: std::time::Instant::now(),
+                        match_index: 0,
+                        next_index: 0,
+                        role: Role::Follower,
+                        term: 0, // Replace with actual term
+                    };
+                    peers.push((PeerAddress(address.clone()), peer));
+                    tracing::info!("Peer {} is available", address);
+                }
+                Err(_) => {
+                    tracing::warn!("Peer {} is not available", address);
+                }
+            }
+        }
+
+        Ok(peers)
     }

     #[cfg(feature = "kubernetes")]
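The commit message mentions periodic peer discovery, but the hunk above shows only a single discovery pass. A minimal sketch of how a periodic driver could invoke it with tokio; the start_periodic_discovery helper, the 30-second cadence, and the stub Consensus type are assumptions for illustration, not code from this commit:

use std::sync::Arc;
use std::time::Duration;

// Hypothetical stand-in for the crate's Consensus type; only the pieces
// needed to show the periodic loop are included.
struct Consensus;

impl Consensus {
    async fn discover_peers(_consensus: Arc<Consensus>) {
        // In the real module this refreshes the peer map from Kubernetes
        // and from the candidate-peers list.
    }

    // Spawn a background task that re-runs discovery on a fixed cadence.
    fn start_periodic_discovery(consensus: Arc<Consensus>) {
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(30));
            loop {
                interval.tick().await;
                Self::discover_peers(Arc::clone(&consensus)).await;
            }
        });
    }
}

#[tokio::main]
async fn main() {
    let consensus = Arc::new(Consensus);
    Consensus::start_periodic_discovery(Arc::clone(&consensus));
    // Keep the process alive long enough to observe a few discovery ticks.
    tokio::time::sleep(Duration::from_secs(65)).await;
}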
8 changes: 6 additions & 2 deletions src/main.rs
@@ -13,10 +13,14 @@ fn main() -> anyhow::Result<()> {
 async fn run(config: StratusConfig) -> anyhow::Result<()> {
     // init services
     let storage = config.storage.init().await?;
-    let external_relayer = if let Some(c) = config.external_relayer { Some(c.init().await) } else { None };
+    let external_relayer = if let Some(c) = config.clone().external_relayer {
+        Some(c.init().await)
+    } else {
+        None
+    };
     let miner = config.miner.init(Arc::clone(&storage), None, external_relayer).await?;
     let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner)).await;
-    let consensus = Consensus::new(Arc::clone(&storage), None).await; // for now, we force None to initiate with the current node being the leader
+    let consensus = Consensus::new(Arc::clone(&storage), config.candidate_peers.clone(), None).await; // for now, we force None to initiate with the current node being the leader

     // start rpc server
     serve_rpc(storage, executor, miner, consensus, config.address, config.executor.chain_id.into()).await?;
