diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index cce96d868..8fdb2e81d 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -644,6 +644,7 @@ pub async fn handle_p2p_task( tokio::spawn({ let p2p = p2p.clone(); async move { + let mut last_replied_to_heartbeat = 0; loop { let Some(mut msg) = recv.recv().await else { // Channel closure happens when the tributary retires @@ -666,6 +667,12 @@ pub async fn handle_p2p_task( // them? P2pMessageKind::Heartbeat(msg_genesis) => { assert_eq!(msg_genesis, genesis); + + let current_time_unit = heartbeat_time_unit::(); + if current_time_unit.saturating_sub(last_replied_to_heartbeat) < 10 { + continue; + } + if msg.msg.len() != 40 { log::error!("validator sent invalid heartbeat"); continue; @@ -674,50 +681,54 @@ pub async fn handle_p2p_task( let msg_time_unit = u64::from_le_bytes(msg.msg[32 .. 40].try_into().expect( "length-checked heartbeat message didn't have 8 bytes for the u64", )); - if heartbeat_time_unit::().saturating_sub(msg_time_unit) > 1 { + if current_time_unit.saturating_sub(msg_time_unit) > 1 { continue; } - let p2p = p2p.clone(); - let spec = tributary.spec.clone(); + // This is the network's last replied to, not ours specifically + last_replied_to_heartbeat = current_time_unit; + let reader = tributary.tributary.reader(); - // Spawn a dedicated task as this may require loading large amounts of data - // from disk and take a notable amount of time - tokio::spawn(async move { - // Have sqrt(n) nodes reply with the blocks - #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] - let mut responders = f32::from(spec.n(&[])).sqrt().floor() as u64; - // Try to have at least 3 responders - if responders < 3 { - responders = spec.n(&[]).min(3).into(); - } - // Decide which nodes will respond by using the latest block's hash as a - // mutually agreed upon entropy source - // This isn't a secure source of entropy, yet it's fine for this - let entropy = u64::from_le_bytes(reader.tip()[.. 8].try_into().unwrap()); - // If n = 10, responders = 3, we want `start` to be 0 ..= 7 - // (so the highest is 7, 8, 9) - // entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7 - let start = - usize::try_from(entropy % (u64::from(spec.n(&[]) + 1) - responders)) - .unwrap(); - let mut selected = false; - for validator in &spec.validators() - [start .. (start + usize::try_from(responders).unwrap())] - { - if our_key == validator.0 { - selected = true; - break; - } - } - if !selected { - log::debug!("received heartbeat and not selected to respond"); - return; + // Have sqrt(n) nodes reply with the blocks + #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] + let mut responders = f32::from(tributary.spec.n(&[])).sqrt().floor() as u64; + // Try to have at least 3 responders + if responders < 3 { + responders = tributary.spec.n(&[]).min(3).into(); + } + + // Decide which nodes will respond by using the latest block's hash as a + // mutually agreed upon entropy source + // This isn't a secure source of entropy, yet it's fine for this + let entropy = u64::from_le_bytes(reader.tip()[.. 8].try_into().unwrap()); + // If n = 10, responders = 3, we want `start` to be 0 ..= 7 + // (so the highest is 7, 8, 9) + // entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7 + let start = usize::try_from( + entropy % (u64::from(tributary.spec.n(&[]) + 1) - responders), + ) + .unwrap(); + let mut selected = false; + for validator in &tributary.spec.validators() + [start .. (start + usize::try_from(responders).unwrap())] + { + if our_key == validator.0 { + selected = true; + break; } + } + if !selected { + log::debug!("received heartbeat and not selected to respond"); + return; + } - log::debug!("received heartbeat and selected to respond"); + log::debug!("received heartbeat and selected to respond"); + let p2p = p2p.clone(); + // Spawn a dedicated task as this may require loading large amounts of data + // from disk and take a notable amount of time + tokio::spawn(async move { // Have the selected nodes respond // TODO: Spawn a dedicated topic for this heartbeat response? let mut latest = msg.msg[.. 32].try_into().unwrap(); @@ -732,7 +743,7 @@ pub async fn handle_p2p_task( res.extend(reader.commit(&next).unwrap()); // Also include the timestamp used within the Heartbeat res.extend(&msg.msg[32 .. 40]); - p2p.send(msg.sender, P2pMessageKind::Block(spec.genesis()), res).await; + p2p.send(msg.sender, P2pMessageKind::Block(genesis), res).await; } } });