Skip to content

Commit

Permalink
Regularly rebroadcast consensus messages to ensure presence even if d…
Browse files Browse the repository at this point in the history
…ropped from the P2P layer

Attempts to fix #342, #382.
  • Loading branch information
kayabaNerve committed Oct 13, 2023
1 parent 15edea1 commit b0fcdd3
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 1 deletion.
25 changes: 24 additions & 1 deletion coordinator/tributary/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ pub trait ReadWrite: Sized {

#[async_trait]
pub trait P2p: 'static + Send + Sync + Clone + Debug {
/// Broadcast a message to all other members of the Tributary with the specified genesis.
///
/// The Tributary will re-broadcast consensus messages on a fixed interval to ensure they aren't
/// prematurely dropped from the P2P layer. THe P2P layer SHOULD perform content-based
/// deduplication to ensure a sane amount of load.
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>);
}

Expand Down Expand Up @@ -178,7 +183,25 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
);
let blockchain = Arc::new(RwLock::new(blockchain));

let network = TendermintNetwork { genesis, signer, validators, blockchain, p2p };
let to_rebroadcast = Arc::new(RwLock::new(vec![]));
// Actively rebroadcast consensus messages to ensure they aren't prematurely dropped from the
// P2P layer
tokio::spawn({
let to_rebroadcast = to_rebroadcast.clone();
let p2p = p2p.clone();
async move {
loop {
let to_rebroadcast = to_rebroadcast.read().await.clone();
for msg in to_rebroadcast {
p2p.broadcast(genesis, msg).await;
}
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
}
}
});

let network =
TendermintNetwork { genesis, signer, validators, blockchain, to_rebroadcast, p2p };

let TendermintHandle { synced_block, synced_block_result, messages, machine } =
TendermintMachine::new(network.clone(), block_number, start_time, proposal).await;
Expand Down
16 changes: 16 additions & 0 deletions coordinator/tributary/src/tendermint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ pub struct TendermintNetwork<D: Db, T: TransactionTrait, P: P2p> {
pub(crate) validators: Arc<Validators>,
pub(crate) blockchain: Arc<RwLock<Blockchain<D, T>>>,

pub(crate) to_rebroadcast: Arc<RwLock<Vec<Vec<u8>>>>,

pub(crate) p2p: P,
}

Expand Down Expand Up @@ -304,8 +306,19 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
}

async fn broadcast(&mut self, msg: SignedMessageFor<Self>) {
// Since we're broadcasting a Tendermint message, set it to be re-broadcasted every second
// until the block it's trying to build is complete
// If the P2P layer drops a message before all nodes obtained access, or a node had an
// intermittent failure, this will ensure reconcilliation
// Resolves halts caused by timing discrepancies, which technically are violations of
// Tendermint as a BFT protocol, and shouldn't occur yet have in low-powered testing
// environments
// This is atrocious if there's no content-based deduplication protocol for messages actively
// being gossiped
// LibP2p, as used by Serai, is configured to content-based deduplicate
let mut to_broadcast = vec![TENDERMINT_MESSAGE];
to_broadcast.extend(msg.encode());
self.to_rebroadcast.write().await.push(to_broadcast.clone());
self.p2p.broadcast(self.genesis, to_broadcast).await
}

Expand Down Expand Up @@ -407,6 +420,9 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
}
}

// Since we've added a valid block, clear to_rebroadcast
*self.to_rebroadcast.write().await = vec![];

Some(TendermintBlock(
self.blockchain.write().await.build_block::<Self>(self.signature_scheme()).serialize(),
))
Expand Down

0 comments on commit b0fcdd3

Please sign in to comment.