diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index 3c5227b0c..9d8094f2e 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -127,6 +127,11 @@ pub trait ReadWrite: Sized { #[async_trait] pub trait P2p: 'static + Send + Sync + Clone + Debug { + /// Broadcast a message to all other members of the Tributary with the specified genesis. + /// + /// The Tributary will re-broadcast consensus messages on a fixed interval to ensure they aren't + /// prematurely dropped from the P2P layer. THe P2P layer SHOULD perform content-based + /// deduplication to ensure a sane amount of load. async fn broadcast(&self, genesis: [u8; 32], msg: Vec); } @@ -178,7 +183,25 @@ impl Tributary { ); let blockchain = Arc::new(RwLock::new(blockchain)); - let network = TendermintNetwork { genesis, signer, validators, blockchain, p2p }; + let to_rebroadcast = Arc::new(RwLock::new(vec![])); + // Actively rebroadcast consensus messages to ensure they aren't prematurely dropped from the + // P2P layer + tokio::spawn({ + let to_rebroadcast = to_rebroadcast.clone(); + let p2p = p2p.clone(); + async move { + loop { + let to_rebroadcast = to_rebroadcast.read().await.clone(); + for msg in to_rebroadcast { + p2p.broadcast(genesis, msg).await; + } + tokio::time::sleep(core::time::Duration::from_secs(1)).await; + } + } + }); + + let network = + TendermintNetwork { genesis, signer, validators, blockchain, to_rebroadcast, p2p }; let TendermintHandle { synced_block, synced_block_result, messages, machine } = TendermintMachine::new(network.clone(), block_number, start_time, proposal).await; diff --git a/coordinator/tributary/src/tendermint/mod.rs b/coordinator/tributary/src/tendermint/mod.rs index c078826da..fad9b3e83 100644 --- a/coordinator/tributary/src/tendermint/mod.rs +++ b/coordinator/tributary/src/tendermint/mod.rs @@ -271,6 +271,8 @@ pub struct TendermintNetwork { pub(crate) validators: Arc, pub(crate) blockchain: Arc>>, + pub(crate) to_rebroadcast: Arc>>>, + pub(crate) p2p: P, } @@ -304,8 +306,19 @@ impl Network for TendermintNetwork } async fn broadcast(&mut self, msg: SignedMessageFor) { + // Since we're broadcasting a Tendermint message, set it to be re-broadcasted every second + // until the block it's trying to build is complete + // If the P2P layer drops a message before all nodes obtained access, or a node had an + // intermittent failure, this will ensure reconcilliation + // Resolves halts caused by timing discrepancies, which technically are violations of + // Tendermint as a BFT protocol, and shouldn't occur yet have in low-powered testing + // environments + // This is atrocious if there's no content-based deduplication protocol for messages actively + // being gossiped + // LibP2p, as used by Serai, is configured to content-based deduplicate let mut to_broadcast = vec![TENDERMINT_MESSAGE]; to_broadcast.extend(msg.encode()); + self.to_rebroadcast.write().await.push(to_broadcast.clone()); self.p2p.broadcast(self.genesis, to_broadcast).await } @@ -407,6 +420,9 @@ impl Network for TendermintNetwork } } + // Since we've added a valid block, clear to_rebroadcast + *self.to_rebroadcast.write().await = vec![]; + Some(TendermintBlock( self.blockchain.write().await.build_block::(self.signature_scheme()).serialize(), ))