Skip to content

Commit

Permalink
Remove active requests belonging to the peer that disconnected
Browse files Browse the repository at this point in the history
  • Loading branch information
ackintosh committed Nov 26, 2024
1 parent 95f8378 commit 9d2b263
Showing 1 changed file with 57 additions and 2 deletions.
59 changes: 57 additions & 2 deletions beacon_node/lighthouse_network/src/rpc/self_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
/// Informs the limiter that a peer has disconnected. This removes any pending requests and
/// returns their IDs.
pub fn peer_disconnected(&mut self, peer_id: PeerId) -> Vec<(Id, Protocol)> {
// TODO: remove the peer from active requests.
self.active_requests.remove(&peer_id);

// It's not ideal to iterate this map, but the key is (PeerId, Protocol) and this map
// should never really be large. So we iterate for simplicity
Expand Down Expand Up @@ -348,7 +348,7 @@ mod tests {
}
}

/// Test that `next_peer_request_ready` correctly maintains the queue.
/// Test that `next_peer_request_ready` correctly maintains the queue when using the self-limiter without rate limiting.
#[tokio::test]
async fn test_next_peer_request_ready_concurrent_requests() {
let log = logging::test_logger();
Expand Down Expand Up @@ -423,4 +423,59 @@ mod tests {
));
}
}

#[tokio::test]
async fn test_peer_disconnected() {
let log = logging::test_logger();
let mut limiter: SelfRateLimiter<RequestId, MainnetEthSpec> =
SelfRateLimiter::new(None, log).unwrap();
let peer1 = PeerId::random();
let peer2 = PeerId::random();

for peer in [peer1, peer2] {
for i in 1..=5u32 {
let result = limiter.allows(
peer,
RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
id: i,
})),
RequestType::Ping(Ping { data: i as u64 }),
);
}
}

assert!(limiter.active_requests.get(&peer1).is_some());
assert!(limiter
.delayed_requests
.get(&(peer1, Protocol::Ping))
.is_some());
assert!(limiter.active_requests.get(&peer2).is_some());
assert!(limiter
.delayed_requests
.get(&(peer2, Protocol::Ping))
.is_some());

let mut failed_requests = limiter.peer_disconnected(peer1);
for i in 3..=5u32 {
let (request_id, _) = failed_requests.remove(0);
assert!(matches!(
request_id,
RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
id
})) if id == i
));
}

// Check that peer1’s active and delayed requests have been removed.
assert!(limiter.active_requests.get(&peer1).is_none());
assert!(limiter
.delayed_requests
.get(&(peer1, Protocol::Ping))
.is_none());
assert!(limiter.active_requests.get(&peer2).is_some());
assert!(limiter
.delayed_requests
.get(&(peer2, Protocol::Ping))
.is_some());
}
}

0 comments on commit 9d2b263

Please sign in to comment.