Skip to content

Commit

Permalink
[cherry-pick][1.16][consensus] fix edge case of block retrieval (#13903)
Browse files Browse the repository at this point in the history
* [consensus] fix edge case of block retrieval

* [consensus] unit test for block retrieval timeout

---------

Co-authored-by: Zekun Li <[email protected]>
  • Loading branch information
ibalajiarun and Zekun Li authored Jul 9, 2024
1 parent a9b8526 commit 85ff36f
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 12 deletions.
21 changes: 10 additions & 11 deletions consensus/src/block_storage/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
persistent_liveness_storage::{LedgerRecoveryData, PersistentLivenessStorage, RecoveryData},
pipeline::execution_client::TExecutionClient,
};
use anyhow::{bail, Context};
use anyhow::{anyhow, bail, Context};
use aptos_consensus_types::{
block::Block,
block_retrieval::{
Expand All @@ -47,7 +47,7 @@ use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use futures_channel::oneshot;
use rand::{prelude::*, Rng};
use std::{clone::Clone, cmp::min, sync::Arc, time::Duration};
use tokio::time;
use tokio::{time, time::timeout};

#[derive(Debug, PartialEq, Eq)]
/// Whether we need to do block retrieval if we want to insert a Quorum Cert.
Expand Down Expand Up @@ -568,15 +568,14 @@ impl BlockRetriever {
let author = self.network.author();
futures.push(
async move {
let response = rx
.await
.map(|block| {
BlockRetrievalResponse::new(
BlockRetrievalStatus::SucceededWithTarget,
vec![block],
)
})
.map_err(|_| anyhow::anyhow!("self retrieval failed"));
let response = match timeout(rpc_timeout, rx).await {
Ok(Ok(block)) => Ok(BlockRetrievalResponse::new(
BlockRetrievalStatus::SucceededWithTarget,
vec![block],
)),
Ok(Err(_)) => Err(anyhow!("self retrieval cancelled")),
Err(_) => Err(anyhow!("self retrieval timeout")),
};
(author, response)
}
.boxed(),
Expand Down
37 changes: 36 additions & 1 deletion consensus/src/network_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use aptos_network::{
PeerManagerRequestSender,
},
protocols::{
network::{NewNetworkEvents, SerializedRequest},
network::{NewNetworkEvents, RpcError, SerializedRequest},
rpc::InboundRpcRequest,
wire::handshake::v1::ProtocolIdSet,
},
Expand Down Expand Up @@ -75,6 +75,8 @@ pub struct NetworkPlayground {
outbound_msgs_tx: mpsc::Sender<(TwinId, PeerManagerRequest)>,
/// NetworkPlayground reads all nodes' outbound messages through this queue.
outbound_msgs_rx: mpsc::Receiver<(TwinId, PeerManagerRequest)>,
/// Allow test code to timeout RPC messages between peers.
timeout_config: Arc<RwLock<TimeoutConfig>>,
/// Allow test code to drop direct-send messages between peers.
drop_config: Arc<RwLock<DropConfig>>,
/// Allow test code to drop direct-send messages between peers per round.
Expand All @@ -96,6 +98,7 @@ impl NetworkPlayground {
node_consensus_txs: Arc::new(Mutex::new(HashMap::new())),
outbound_msgs_tx,
outbound_msgs_rx,
timeout_config: Arc::new(RwLock::new(TimeoutConfig::default())),
drop_config: Arc::new(RwLock::new(DropConfig::default())),
drop_config_round: DropConfigRound::default(),
executor,
Expand All @@ -122,6 +125,7 @@ impl NetworkPlayground {
/// Rpc messages are immediately sent to the destination for handling, so
/// they don't block.
async fn start_node_outbound_handler(
timeout_config: Arc<RwLock<TimeoutConfig>>,
drop_config: Arc<RwLock<DropConfig>>,
src_twin_id: TwinId,
mut network_reqs_rx: aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>,
Expand Down Expand Up @@ -160,6 +164,14 @@ impl NetworkPlayground {
None => continue, // drop rpc
};

if timeout_config
.read()
.is_message_timedout(&src_twin_id, dst_twin_id)
{
outbound_req.res_tx.send(Err(RpcError::TimedOut)).unwrap();
continue;
}

let node_consensus_tx =
node_consensus_txs.lock().get(dst_twin_id).unwrap().clone();

Expand Down Expand Up @@ -195,10 +207,12 @@ impl NetworkPlayground {
) {
self.node_consensus_txs.lock().insert(twin_id, consensus_tx);
self.drop_config.write().add_node(twin_id);
self.timeout_config.write().add_node(twin_id);

self.extend_author_to_twin_ids(twin_id.author, twin_id);

let fut1 = NetworkPlayground::start_node_outbound_handler(
Arc::clone(&self.timeout_config),
Arc::clone(&self.drop_config),
twin_id,
network_reqs_rx,
Expand Down Expand Up @@ -374,6 +388,10 @@ impl NetworkPlayground {
ret
}

pub fn timeout_config(&self) -> Arc<RwLock<TimeoutConfig>> {
self.timeout_config.clone()
}

pub async fn start(mut self) {
// Take the next queued message
while let Some((src_twin_id, net_req)) = self.outbound_msgs_rx.next().await {
Expand Down Expand Up @@ -453,6 +471,23 @@ impl DropConfig {
}
}

#[derive(Default)]
pub(crate) struct TimeoutConfig(HashMap<TwinId, HashSet<TwinId>>);

impl TimeoutConfig {
pub fn is_message_timedout(&self, src: &TwinId, dst: &TwinId) -> bool {
self.0.get(src).map_or(false, |set| set.contains(dst))
}

pub fn timeout_message_for(&mut self, src: &TwinId, dst: &TwinId) -> bool {
self.0.entry(*src).or_default().insert(*dst)
}

fn add_node(&mut self, src: TwinId) {
self.0.insert(src, HashSet::new());
}
}

/// Table of per round message dropping rules
#[derive(Default)]
struct DropConfigRound(HashMap<u64, DropConfig>);
Expand Down
68 changes: 68 additions & 0 deletions consensus/src/round_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1853,6 +1853,74 @@ fn block_retrieval_test() {
});
}

#[test]
fn block_retrieval_timeout_test() {
let runtime = consensus_runtime();
let mut playground = NetworkPlayground::new(runtime.handle().clone());
let mut nodes = NodeSetup::create_nodes(
&mut playground,
runtime.handle().clone(),
4,
Some(vec![0, 1]),
None,
None,
None,
None,
);
let timeout_config = playground.timeout_config();
runtime.spawn(playground.start());

for i in 0..4 {
info!("processing {}", i);
process_and_vote_on_proposal(
&runtime,
&mut nodes,
i as usize % 2,
&[3],
true,
None,
true,
i + 1,
i.saturating_sub(1),
0,
);
}

timed_block_on(&runtime, async {
let mut behind_node = nodes.pop().unwrap();

for node in nodes.iter() {
timeout_config.write().timeout_message_for(
&TwinId {
id: behind_node.id,
author: behind_node.signer.author(),
},
&TwinId {
id: node.id,
author: node.signer.author(),
},
);
}

// Drain the queue on other nodes
for node in nodes.iter_mut() {
let _ = node.next_proposal().await;
}

info!(
"Processing proposals for behind node {}",
behind_node.identity_desc()
);

let proposal_msg = behind_node.next_proposal().await;
behind_node
.round_manager
.process_proposal_msg(proposal_msg)
.await
.unwrap_err();
});
}

#[ignore] // TODO: turn this test back on once the flakes have resolved.
#[test]
pub fn forking_retrieval_test() {
Expand Down

0 comments on commit 85ff36f

Please sign in to comment.