Skip to content

Commit

Permalink
Merge branch 'main' into improve_relayer_logging_and_tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
carneiro-cw authored Jun 10, 2024
2 parents 5b4f3bb + a12e271 commit 2c7d606
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions src/eth/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ pub struct Consensus {
direct_peers: Vec<String>,
voted_for: Mutex<Option<PeerAddress>>, //essential to ensure that a server only votes once per term
current_term: AtomicU64,
last_arrived_block_number: AtomicU64, //TODO use a true index for both executions and blocks, currently we use something like Bully algorithm so block number is fine
last_arrived_block_number: AtomicU64,
role: RwLock<Role>,
heartbeat_timeout: Duration,
my_address: PeerAddress,
Expand All @@ -174,7 +174,7 @@ impl Consensus {
let (sender, receiver) = mpsc::channel::<Block>(32);
let receiver = Arc::new(Mutex::new(receiver));
let (broadcast_sender, _) = broadcast::channel(32);
let last_arrived_block_number = AtomicU64::new(storage.read_mined_block_number().await.unwrap_or(BlockNumber::from(0)).into());
let last_arrived_block_number = AtomicU64::new(std::u64::MAX); //we use the max value to ensure that only after receiving the first appendEntry we can start the consensus
let peers = Arc::new(RwLock::new(HashMap::new()));
let my_address = Self::discover_my_address(jsonrpc_address.port(), grpc_address.port());

Expand Down Expand Up @@ -324,6 +324,8 @@ impl Consensus {
async fn become_leader(&self) {
*self.role.write().await = Role::Leader;

self.last_arrived_block_number.store(std::u64::MAX, Ordering::SeqCst); //as leader, we don't have a last block number

//TODO XXX // Initialize leader-specific tasks such as sending appendEntries
//TODO XXX self.send_append_entries().await;
}
Expand Down Expand Up @@ -448,7 +450,17 @@ impl Consensus {

//TODO for now the block number is the index, but it should be a separate index wiht the execution AND the block
pub async fn should_serve(&self) -> bool {
if self.is_leader().await {
return true;
}

let last_arrived_block_number = self.last_arrived_block_number.load(Ordering::SeqCst);

if last_arrived_block_number == std::u64::MAX {
tracing::warn!("no appendEntry has been received yet");
return false;
}

let storage_block_number: u64 = self.storage.read_mined_block_number().await.unwrap_or(BlockNumber::from(0)).into();

tracing::info!(
Expand All @@ -457,10 +469,6 @@ impl Consensus {
storage_block_number
);

if self.peers.read().await.len() == 0 {
return self.is_leader().await;
}

(last_arrived_block_number - 2) <= storage_block_number
}

Expand Down Expand Up @@ -811,7 +819,7 @@ impl AppendEntryService for AppendEntryServiceImpl {
);
return Ok(Response::new(RequestVoteResponse {
term: current_term,
vote_granted: false, //XXX check how we are dealing with vote_granted false
vote_granted: false,
}));
}

Expand All @@ -822,7 +830,6 @@ impl AppendEntryService for AppendEntryServiceImpl {
}

let mut voted_for = consensus.voted_for.lock().await;
//XXX for some reason candidate_id is going wrong
let candidate_address = PeerAddress::from_string(request.candidate_id.clone()).unwrap(); //XXX FIXME replace with rpc error
if voted_for.is_none() {
*voted_for = Some(candidate_address.clone());
Expand Down

0 comments on commit 2c7d606

Please sign in to comment.