diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 65e6ba47f..0a75c16d4 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -237,6 +237,10 @@ pub(crate) async fn scan_tributaries< let reader = tributary.reader(); let mut tributary_db = tributary::TributaryDb::new(raw_db.clone()); loop { + // Obtain the next block notification now to prevent obtaining it immediately after + // the next block occurs + let next_block_notification = tributary.next_block_notification().await; + tributary::scanner::handle_new_blocks::<_, _, _, _, _, _, P>( &mut tributary_db, &key, @@ -276,10 +280,10 @@ pub(crate) async fn scan_tributaries< ) .await; - // Sleep for half the block time - // TODO: Define a notification system for when a new block occurs - sleep(Duration::from_secs((Tributary::::block_time() / 2).into())) - .await; + next_block_notification + .await + .map_err(|_| "") + .expect("tributary dropped its notifications?"); } } }); @@ -361,96 +365,100 @@ pub async fn handle_p2p( tokio::spawn({ let p2p = p2p.clone(); async move { - let mut msg: Message

= recv.recv().await.unwrap(); - match msg.kind { - P2pMessageKind::KeepAlive => {} - - P2pMessageKind::Tributary(msg_genesis) => { - assert_eq!(msg_genesis, genesis); - log::trace!("handling message for tributary {:?}", tributary.spec.set()); - if tributary.tributary.handle_message(&msg.msg).await { - P2p::broadcast(&p2p, msg.kind, msg.msg).await; + loop { + let mut msg: Message

= recv.recv().await.unwrap(); + match msg.kind { + P2pMessageKind::KeepAlive => {} + + P2pMessageKind::Tributary(msg_genesis) => { + assert_eq!(msg_genesis, genesis); + log::trace!("handling message for tributary {:?}", tributary.spec.set()); + if tributary.tributary.handle_message(&msg.msg).await { + P2p::broadcast(&p2p, msg.kind, msg.msg).await; + } } - } - // TODO2: Rate limit this per timestamp - // And/or slash on Heartbeat which justifies a response, since the node obviously was - // offline and we must now use our bandwidth to compensate for them? - // TODO: Dedicated task for heartbeats - P2pMessageKind::Heartbeat(msg_genesis) => { - assert_eq!(msg_genesis, genesis); - if msg.msg.len() != 40 { - log::error!("validator sent invalid heartbeat"); - return; - } + // TODO2: Rate limit this per timestamp + // And/or slash on Heartbeat which justifies a response, since the node obviously + // was offline and we must now use our bandwidth to compensate for them? + // TODO: Dedicated task for heartbeats + P2pMessageKind::Heartbeat(msg_genesis) => { + assert_eq!(msg_genesis, genesis); + if msg.msg.len() != 40 { + log::error!("validator sent invalid heartbeat"); + continue; + } - let tributary_read = &tributary.tributary; + let tributary_read = &tributary.tributary; - /* - // Have sqrt(n) nodes reply with the blocks - let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64; - // Try to have at least 3 responders - if responders < 3 { - responders = tributary.spec.n().min(3).into(); - } - */ - - // Have up to three nodes respond - let responders = u64::from(tributary.spec.n().min(3)); - - // Decide which nodes will respond by using the latest block's hash as a mutually - // agreed upon entropy source - // This isn't a secure source of entropy, yet it's fine for this - let entropy = - u64::from_le_bytes(tributary_read.tip().await[.. 8].try_into().unwrap()); - // If n = 10, responders = 3, we want `start` to be 0 ..= 7 - // (so the highest is 7, 8, 9) - // entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7 - let start = - usize::try_from(entropy % (u64::from(tributary.spec.n() + 1) - responders)) - .unwrap(); - let mut selected = false; - for validator in &tributary.spec.validators() - [start .. (start + usize::try_from(responders).unwrap())] - { - if our_key == validator.0 { - selected = true; - break; + /* + // Have sqrt(n) nodes reply with the blocks + let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64; + // Try to have at least 3 responders + if responders < 3 { + responders = tributary.spec.n().min(3).into(); + } + */ + + // Have up to three nodes respond + let responders = u64::from(tributary.spec.n().min(3)); + + // Decide which nodes will respond by using the latest block's hash as a mutually + // agreed upon entropy source + // This isn't a secure source of entropy, yet it's fine for this + let entropy = + u64::from_le_bytes(tributary_read.tip().await[.. 8].try_into().unwrap()); + // If n = 10, responders = 3, we want `start` to be 0 ..= 7 + // (so the highest is 7, 8, 9) + // entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7 + let start = + usize::try_from(entropy % (u64::from(tributary.spec.n() + 1) - responders)) + .unwrap(); + let mut selected = false; + for validator in &tributary.spec.validators() + [start .. (start + usize::try_from(responders).unwrap())] + { + if our_key == validator.0 { + selected = true; + break; + } + } + if !selected { + log::debug!("received heartbeat and not selected to respond"); + continue; } - } - if !selected { - log::debug!("received heartbeat and not selected to respond"); - return; - } - log::debug!("received heartbeat and selected to respond"); + log::debug!("received heartbeat and selected to respond"); - let reader = tributary_read.reader(); + let reader = tributary_read.reader(); - let mut latest = msg.msg[.. 32].try_into().unwrap(); - while let Some(next) = reader.block_after(&latest) { - let mut res = reader.block(&next).unwrap().serialize(); - res.extend(reader.commit(&next).unwrap()); - // Also include the timestamp used within the Heartbeat - res.extend(&msg.msg[32 .. 40]); - p2p.send(msg.sender, P2pMessageKind::Block(tributary.spec.genesis()), res).await; - latest = next; + let mut latest = msg.msg[.. 32].try_into().unwrap(); + while let Some(next) = reader.block_after(&latest) { + let mut res = reader.block(&next).unwrap().serialize(); + res.extend(reader.commit(&next).unwrap()); + // Also include the timestamp used within the Heartbeat + res.extend(&msg.msg[32 .. 40]); + p2p + .send(msg.sender, P2pMessageKind::Block(tributary.spec.genesis()), res) + .await; + latest = next; + } } - } - P2pMessageKind::Block(msg_genesis) => { - assert_eq!(msg_genesis, genesis); - let mut msg_ref: &[u8] = msg.msg.as_ref(); - let Ok(block) = Block::::read(&mut msg_ref) else { - log::error!("received block message with an invalidly serialized block"); - return; - }; - // Get just the commit - msg.msg.drain(.. (msg.msg.len() - msg_ref.len())); - msg.msg.drain((msg.msg.len() - 8) ..); - - let res = tributary.tributary.sync_block(block, msg.msg).await; - log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res); + P2pMessageKind::Block(msg_genesis) => { + assert_eq!(msg_genesis, genesis); + let mut msg_ref: &[u8] = msg.msg.as_ref(); + let Ok(block) = Block::::read(&mut msg_ref) else { + log::error!("received block message with an invalidly serialized block"); + continue; + }; + // Get just the commit + msg.msg.drain(.. (msg.msg.len() - msg_ref.len())); + msg.msg.drain((msg.msg.len() - 8) ..); + + let res = tributary.tributary.sync_block(block, msg.msg).await; + log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res); + } } } } diff --git a/coordinator/tributary/src/blockchain.rs b/coordinator/tributary/src/blockchain.rs index 9febf4d0d..d21928ec0 100644 --- a/coordinator/tributary/src/blockchain.rs +++ b/coordinator/tributary/src/blockchain.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{VecDeque, HashMap}; use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; @@ -13,7 +13,7 @@ use crate::{ transaction::{Signed, TransactionKind, Transaction as TransactionTrait}, }; -#[derive(Clone, PartialEq, Eq, Debug)] +#[derive(Debug)] pub(crate) struct Blockchain { db: Option, genesis: [u8; 32], @@ -24,6 +24,8 @@ pub(crate) struct Blockchain { provided: ProvidedTransactions, mempool: Mempool, + + pub(crate) next_block_notifications: VecDeque>, } impl Blockchain { @@ -76,6 +78,8 @@ impl Blockchain { provided: ProvidedTransactions::new(db.clone(), genesis), mempool: Mempool::new(db, genesis), + + next_block_notifications: VecDeque::new(), }; if let Some((block_number, tip)) = { @@ -274,6 +278,10 @@ impl Blockchain { txn.commit(); self.db = Some(db); + for tx in self.next_block_notifications.drain(..) { + let _ = tx.send(()); + } + Ok(()) } } diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index 64383a5a9..3c5227b0c 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -336,6 +336,15 @@ impl Tributary { _ => false, } } + + /// Get a Future which will resolve once the next block has been added. + pub async fn next_block_notification( + &self, + ) -> impl Send + Sync + core::future::Future> { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.network.blockchain.write().await.next_block_notifications.push_back(tx); + rx + } } #[derive(Clone)] diff --git a/coordinator/tributary/src/tests/blockchain.rs b/coordinator/tributary/src/tests/blockchain.rs index 0f58b5995..21051095c 100644 --- a/coordinator/tributary/src/tests/blockchain.rs +++ b/coordinator/tributary/src/tests/blockchain.rs @@ -104,7 +104,7 @@ fn invalid_block() { { // Add a valid transaction - let mut blockchain = blockchain.clone(); + let (_, mut blockchain) = new_blockchain(genesis, &[tx.1.signer]); assert!(blockchain.add_transaction::( true, Transaction::Application(tx.clone()), @@ -129,7 +129,7 @@ fn invalid_block() { { // Invalid signature - let mut blockchain = blockchain.clone(); + let (_, mut blockchain) = new_blockchain(genesis, &[tx.1.signer]); assert!(blockchain.add_transaction::( true, Transaction::Application(tx),