From 9dc3359b3607f0f4acddee964c4cc88a6ffebccb Mon Sep 17 00:00:00 2001 From: renancloudwalk <53792026+renancloudwalk@users.noreply.github.com> Date: Wed, 5 Jun 2024 21:43:27 -0300 Subject: [PATCH] Add more traces to election (#1008) * chore: add more traces to election * lint --- src/eth/consensus/mod.rs | 70 ++++++++++++++++++++++++++-------------- 1 file changed, 46 insertions(+), 24 deletions(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 14050ca54..197e8d297 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -112,10 +112,17 @@ impl PeerAddress { } } +use std::fmt; + +impl fmt::Display for PeerAddress { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}:{};{}", self.address, self.jsonrpc_port, self.grpc_port) + } +} + #[derive(Clone)] struct Peer { client: AppendEntryServiceClient, - last_heartbeat: std::time::Instant, //TODO implement metrics for this match_index: u64, next_index: u64, role: Role, @@ -137,7 +144,6 @@ pub struct Consensus { last_arrived_block_number: AtomicU64, //TODO use a true index for both executions and blocks, currently we use something like Bully algorithm so block number is fine role: RwLock, heartbeat_timeout: Duration, - election_timeout: Duration, my_address: PeerAddress, } @@ -162,8 +168,7 @@ impl Consensus { importer_config, role: RwLock::new(Role::Follower), heartbeat_timeout: Duration::from_millis(rand::thread_rng().gen_range(1500..1700)), // Adjust as needed - election_timeout: Duration::from_millis(rand::thread_rng().gen_range(1700..1900)), // Adjust as needed - my_address, + my_address: my_address.clone(), }; let consensus = Arc::new(consensus); @@ -172,6 +177,7 @@ impl Consensus { Self::initialize_server(Arc::clone(&consensus)); Self::initialize_heartbeat_timer(Arc::clone(&consensus)); + tracing::info!(my_address = %my_address, "consensus module initialized"); consensus } @@ -183,6 +189,17 @@ impl Consensus { PeerAddress::new(format!("http://{}", my_ip), 3000, 3777) //FIXME TODO pick ports from config } + /// Initializes the heartbeat timer for the consensus module. + /// + /// The heartbeat timer is responsible for periodically checking if the node should start a new election. + /// This is essential in a distributed system to ensure that nodes remain in sync and a leader is always present. + /// + /// # Details + /// + /// - **Heartbeat Timeout (`heartbeat_timeout`)**: This is the duration after which the node checks for a leader's heartbeat. + /// If a leader's heartbeat is not received within this duration, the node considers that the leader may be down and an election may need to be started. + /// - **Election Timeout (`election_timeout`)**: This is the duration within which the node expects to either complete the election process or restart it. + /// If an election is not completed within this timeframe, it indicates potential issues, and the process may be retried. fn initialize_heartbeat_timer(consensus: Arc) { named_spawn("consensus::heartbeat_timer", async move { loop { @@ -192,13 +209,14 @@ impl Consensus { { let peers = consensus.peers.read().await; for (_, (peer, _)) in peers.iter() { - if peer.last_heartbeat.elapsed() < consensus.election_timeout { + if peer.role == Role::Leader { leader_found = true; break; } } } if !leader_found { + tracing::info!("leader not found, starting election"); consensus.start_election().await; } } @@ -206,6 +224,18 @@ impl Consensus { }); } + /// Starts the election process for the consensus module. + /// + /// This method is called when a node suspects that there is no active leader in the cluster. + /// The node increments its term and votes for itself, then sends RequestVote RPCs to all other nodes in the cluster. + /// If the node receives a majority of votes, it becomes the leader. Otherwise, it remains a follower and waits for the next election timeout. + /// + /// # Details + /// + /// - The method first increments the current term and votes for itself. + /// - It then sends out `RequestVote` RPCs to all known peers. + /// - If a majority of the peers grant their votes, the node transitions to the leader role. + /// - If not, it remains a follower and waits for the next election cycle. async fn start_election(&self) { let term = self.current_term.fetch_add(1, Ordering::SeqCst) + 1; self.current_term.store(term, Ordering::SeqCst); @@ -425,18 +455,12 @@ impl Consensus { let mut peers_lock = consensus.peers.write().await; - for (address, new_peer) in new_peers { + for (address, peer) in new_peers { if peers_lock.contains_key(&address) { - tracing::info!("Peer {} already exists, skipping initialization", address.address); + tracing::info!("consensus module peer {} already exists, skipping initialization", address.address); continue; } - //XXX why? - let peer = Peer { - receiver: Arc::new(Mutex::new(consensus.broadcast_sender.subscribe())), - ..new_peer - }; - let consensus_clone = Arc::clone(&consensus); let peer_clone = peer.clone(); @@ -444,13 +468,13 @@ impl Consensus { Self::handle_peer_block_propagation(peer_clone, consensus_clone).await; }); - tracing::info!("Adding new peer: {}", address.address); + tracing::info!("consensus module adding new peer: {}", address.address); peers_lock.insert(address, (peer, handle)); } tracing::info!( - peers = ?peers_lock.keys().collect::>(), - "Discovered peers", + peers = peers_lock.keys().map(|p| p.to_string()).collect::>().join(", "), + "consensus module discovered peers", ); } @@ -466,18 +490,17 @@ impl Consensus { Ok(client) => { let peer = Peer { client, - last_heartbeat: std::time::Instant::now(), match_index: 0, next_index: 0, role: Role::Follower, // FIXME it won't be always follower, we need to check the leader or candidates term: 0, // Replace with actual term receiver: Arc::new(Mutex::new(consensus.broadcast_sender.subscribe())), }; - peers.push((peer_address, peer)); - tracing::info!("Peer {} is available", address); + peers.push((peer_address.clone(), peer)); + tracing::info!(peer = peer_address.to_string(), "peer is available"); } Err(_) => { - tracing::warn!("Peer {} is not available", address); + tracing::warn!(peer = peer_address.to_string(), "peer is not available"); } } } @@ -512,7 +535,6 @@ impl Consensus { let peer = Peer { client, - last_heartbeat: std::time::Instant::now(), match_index: 0, next_index: 0, role: Role::Follower, //FIXME it wont be always follower, we need to check the leader or candidates @@ -545,10 +567,10 @@ impl Consensus { match consensus.append_block_to_peer(&mut peer, block).await { Ok(_) => { block_queue.remove(0); // Remove the successfully sent block from the queue - tracing::info!("Successfully appended block to peer: {:?}", peer.client); + tracing::info!("successfully appended block to peer: {:?}", peer.client); } Err(e) => { - tracing::warn!("Failed to append block to peer {:?}: {:?}", peer.client, e); + tracing::warn!("failed to append block to peer {:?}: {:?}", peer.client, e); sleep(RETRY_DELAY).await; } } @@ -577,7 +599,7 @@ impl Consensus { #[cfg(feature = "metrics")] metrics::inc_append_entries(start.elapsed()); - tracing::info!(last_heartbeat = ?peer.last_heartbeat, match_index = peer.match_index, next_index = peer.next_index, role = ?peer.role, term = peer.term, "current follower state"); //TODO also move this to metrics + tracing::info!(match_index = peer.match_index, next_index = peer.next_index, role = ?peer.role, term = peer.term, "current follower state"); //TODO also move this to metrics match StatusCode::try_from(response.status) { Ok(StatusCode::AppendSuccess) => Ok(()),