Skip to content

Commit

Permalink
Add more traces to election (#1008)
Browse files Browse the repository at this point in the history
* chore: add more traces to election

* lint
  • Loading branch information
renancloudwalk authored Jun 6, 2024
1 parent abd5b16 commit 9dc3359
Showing 1 changed file with 46 additions and 24 deletions.
70 changes: 46 additions & 24 deletions src/eth/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,17 @@ impl PeerAddress {
}
}

use std::fmt;

impl fmt::Display for PeerAddress {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}:{};{}", self.address, self.jsonrpc_port, self.grpc_port)
}
}

#[derive(Clone)]
struct Peer {
client: AppendEntryServiceClient<Channel>,
last_heartbeat: std::time::Instant, //TODO implement metrics for this
match_index: u64,
next_index: u64,
role: Role,
Expand All @@ -137,7 +144,6 @@ pub struct Consensus {
last_arrived_block_number: AtomicU64, //TODO use a true index for both executions and blocks, currently we use something like Bully algorithm so block number is fine
role: RwLock<Role>,
heartbeat_timeout: Duration,
election_timeout: Duration,
my_address: PeerAddress,
}

Expand All @@ -162,8 +168,7 @@ impl Consensus {
importer_config,
role: RwLock::new(Role::Follower),
heartbeat_timeout: Duration::from_millis(rand::thread_rng().gen_range(1500..1700)), // Adjust as needed
election_timeout: Duration::from_millis(rand::thread_rng().gen_range(1700..1900)), // Adjust as needed
my_address,
my_address: my_address.clone(),
};
let consensus = Arc::new(consensus);

Expand All @@ -172,6 +177,7 @@ impl Consensus {
Self::initialize_server(Arc::clone(&consensus));
Self::initialize_heartbeat_timer(Arc::clone(&consensus));

tracing::info!(my_address = %my_address, "consensus module initialized");
consensus
}

Expand All @@ -183,6 +189,17 @@ impl Consensus {
PeerAddress::new(format!("http://{}", my_ip), 3000, 3777) //FIXME TODO pick ports from config
}

/// Initializes the heartbeat timer for the consensus module.
///
/// The heartbeat timer is responsible for periodically checking if the node should start a new election.
/// This is essential in a distributed system to ensure that nodes remain in sync and a leader is always present.
///
/// # Details
///
/// - **Heartbeat Timeout (`heartbeat_timeout`)**: This is the duration after which the node checks for a leader's heartbeat.
/// If a leader's heartbeat is not received within this duration, the node considers that the leader may be down and an election may need to be started.
/// - **Election Timeout (`election_timeout`)**: This is the duration within which the node expects to either complete the election process or restart it.
/// If an election is not completed within this timeframe, it indicates potential issues, and the process may be retried.
fn initialize_heartbeat_timer(consensus: Arc<Consensus>) {
named_spawn("consensus::heartbeat_timer", async move {
loop {
Expand All @@ -192,20 +209,33 @@ impl Consensus {
{
let peers = consensus.peers.read().await;
for (_, (peer, _)) in peers.iter() {
if peer.last_heartbeat.elapsed() < consensus.election_timeout {
if peer.role == Role::Leader {
leader_found = true;
break;
}
}
}
if !leader_found {
tracing::info!("leader not found, starting election");
consensus.start_election().await;
}
}
}
});
}

/// Starts the election process for the consensus module.
///
/// This method is called when a node suspects that there is no active leader in the cluster.
/// The node increments its term and votes for itself, then sends RequestVote RPCs to all other nodes in the cluster.
/// If the node receives a majority of votes, it becomes the leader. Otherwise, it remains a follower and waits for the next election timeout.
///
/// # Details
///
/// - The method first increments the current term and votes for itself.
/// - It then sends out `RequestVote` RPCs to all known peers.
/// - If a majority of the peers grant their votes, the node transitions to the leader role.
/// - If not, it remains a follower and waits for the next election cycle.
async fn start_election(&self) {
let term = self.current_term.fetch_add(1, Ordering::SeqCst) + 1;
self.current_term.store(term, Ordering::SeqCst);
Expand Down Expand Up @@ -425,32 +455,26 @@ impl Consensus {

let mut peers_lock = consensus.peers.write().await;

for (address, new_peer) in new_peers {
for (address, peer) in new_peers {
if peers_lock.contains_key(&address) {
tracing::info!("Peer {} already exists, skipping initialization", address.address);
tracing::info!("consensus module peer {} already exists, skipping initialization", address.address);
continue;
}

//XXX why?
let peer = Peer {
receiver: Arc::new(Mutex::new(consensus.broadcast_sender.subscribe())),
..new_peer
};

let consensus_clone = Arc::clone(&consensus);
let peer_clone = peer.clone();

let handle = named_spawn("consensus::propagate", async move {
Self::handle_peer_block_propagation(peer_clone, consensus_clone).await;
});

tracing::info!("Adding new peer: {}", address.address);
tracing::info!("consensus module adding new peer: {}", address.address);
peers_lock.insert(address, (peer, handle));
}

tracing::info!(
peers = ?peers_lock.keys().collect::<Vec<&PeerAddress>>(),
"Discovered peers",
peers = peers_lock.keys().map(|p| p.to_string()).collect::<Vec<String>>().join(", "),
"consensus module discovered peers",
);
}

Expand All @@ -466,18 +490,17 @@ impl Consensus {
Ok(client) => {
let peer = Peer {
client,
last_heartbeat: std::time::Instant::now(),
match_index: 0,
next_index: 0,
role: Role::Follower, // FIXME it won't be always follower, we need to check the leader or candidates
term: 0, // Replace with actual term
receiver: Arc::new(Mutex::new(consensus.broadcast_sender.subscribe())),
};
peers.push((peer_address, peer));
tracing::info!("Peer {} is available", address);
peers.push((peer_address.clone(), peer));
tracing::info!(peer = peer_address.to_string(), "peer is available");
}
Err(_) => {
tracing::warn!("Peer {} is not available", address);
tracing::warn!(peer = peer_address.to_string(), "peer is not available");
}
}
}
Expand Down Expand Up @@ -512,7 +535,6 @@ impl Consensus {

let peer = Peer {
client,
last_heartbeat: std::time::Instant::now(),
match_index: 0,
next_index: 0,
role: Role::Follower, //FIXME it wont be always follower, we need to check the leader or candidates
Expand Down Expand Up @@ -545,10 +567,10 @@ impl Consensus {
match consensus.append_block_to_peer(&mut peer, block).await {
Ok(_) => {
block_queue.remove(0); // Remove the successfully sent block from the queue
tracing::info!("Successfully appended block to peer: {:?}", peer.client);
tracing::info!("successfully appended block to peer: {:?}", peer.client);
}
Err(e) => {
tracing::warn!("Failed to append block to peer {:?}: {:?}", peer.client, e);
tracing::warn!("failed to append block to peer {:?}: {:?}", peer.client, e);
sleep(RETRY_DELAY).await;
}
}
Expand Down Expand Up @@ -577,7 +599,7 @@ impl Consensus {
#[cfg(feature = "metrics")]
metrics::inc_append_entries(start.elapsed());

tracing::info!(last_heartbeat = ?peer.last_heartbeat, match_index = peer.match_index, next_index = peer.next_index, role = ?peer.role, term = peer.term, "current follower state"); //TODO also move this to metrics
tracing::info!(match_index = peer.match_index, next_index = peer.next_index, role = ?peer.role, term = peer.term, "current follower state"); //TODO also move this to metrics

match StatusCode::try_from(response.status) {
Ok(StatusCode::AppendSuccess) => Ok(()),
Expand Down

0 comments on commit 9dc3359

Please sign in to comment.