Delayed RPC Send Using Tokens #5923

Open

ackintosh wants to merge 66 commits into unstable from delayed-rpc-response

Changes from 32 commits

Commits (66)
0154359  trickle responses (ackintosh, Jun 7, 2024)
d5fe64e  pruning (ackintosh, Jun 9, 2024)
aab59f5  cargo fmt (ackintosh, Jun 10, 2024)
e00e679  Test that the receiver delays the responses (ackintosh, Jun 10, 2024)
670ec96  Add doc comments (ackintosh, Jun 11, 2024)
c0ae632  Fix typo (ackintosh, Jun 13, 2024)
7e0c630  Add inbound request size limiter (ackintosh, Jun 14, 2024)
6322210  Merge branch 'refs/heads/unstable' into delayed-rpc-response (ackintosh, Jun 20, 2024)
3947bf6  Fix compile error (ackintosh, Jun 20, 2024)
933dc00  Add doc comment and rename (ackintosh, Jun 20, 2024)
b62537f  Extract a function that calculates tau and t from the quota (ackintosh, Jun 20, 2024)
86cf8fb  unwrap (ackintosh, Jun 22, 2024)
8fd37c5  Remove unused limiter (ackintosh, Jun 22, 2024)
6c1015e  Restrict more than two requests from running simultaneously on the sa… (ackintosh, Jun 26, 2024)
817ce97  Rename from self_limiter to outbound_request_limiter (ackintosh, Jun 29, 2024)
7e42568  Fix clippy errors (ackintosh, Jun 29, 2024)
94c2493  Merge branch 'refs/heads/unstable' into delayed-rpc-response (ackintosh, Jul 1, 2024)
9ad4eb7  Fix import (ackintosh, Jul 1, 2024)
de9d943  Fix clippy errors (ackintosh, Jul 6, 2024)
7adb142  Merge branch 'refs/heads/unstable' into delayed-rpc-response (ackintosh, Jul 7, 2024)
627fd33  Merge branch 'refs/heads/unstable' into delayed-rpc-response (ackintosh, Jul 11, 2024)
b55ffca  Update request_id with AppRequestId (ackintosh, Jul 11, 2024)
73e9879  Merge branch 'unstable' into delayed-rpc-response (ackintosh, Jul 13, 2024)
cdef58d  Update beacon_node/lighthouse_network/src/rpc/active_requests_limiter.rs (ackintosh, Jul 23, 2024)
3190d9a  Update beacon_node/lighthouse_network/src/rpc/mod.rs (ackintosh, Jul 23, 2024)
19fe6b0  Merge branch 'unstable' into delayed-rpc-response (ackintosh, Jul 25, 2024)
5a9237f  Remove the RequestSizeLimiter and check if the count of requested blo… (ackintosh, Jul 29, 2024)
4609624  Revert extracting `tau_and_t()` because no longer need to do that (ackintosh, Jul 29, 2024)
a325438  Remove Instant from the requests field (ackintosh, Sep 9, 2024)
3b6edab  Remove unused field (ackintosh, Sep 9, 2024)
2ab853c  Merge branch 'unstable' into delayed-rpc-response (ackintosh, Sep 9, 2024)
2621ce8  Add DataColumnsBy*** (ackintosh, Sep 9, 2024)
51247e3  Merge branch 'unstable' into delayed-rpc-response (ackintosh, Sep 26, 2024)
5ed47b7  Fix the mistakes made during the merge (ackintosh, Sep 27, 2024)
cbfb2ea  cargo fmt (ackintosh, Sep 27, 2024)
9f6177d  Update beacon_node/lighthouse_network/src/rpc/active_requests_limiter.rs (ackintosh, Sep 27, 2024)
9008d3e  Merge branch 'unstable' into delayed-rpc-response (ackintosh, Oct 1, 2024)
bd9f13c  merge unstable (ackintosh, Oct 1, 2024)
5dbac58  Move the response limiter logic from handler to behaviour (ackintosh, Oct 5, 2024)
ae67804  Remove Mutex from response_limiter (ackintosh, Oct 5, 2024)
4852b20  Fix clippy error (ackintosh, Oct 7, 2024)
156565c  Add the request back to active requests if the response is not a stre… (ackintosh, Oct 7, 2024)
0e1e58b  Add ResponseLimiter to make RPC cleaner (ackintosh, Oct 10, 2024)
cb87af0  Remove pending responses on disconnect (ackintosh, Oct 11, 2024)
023c542  Add ConnectionId to Request (ackintosh, Oct 18, 2024)
14ffeec  Add a comment (ackintosh, Oct 20, 2024)
5c9e063  Return early if the request is too large (ackintosh, Oct 20, 2024)
3c058b3  Remove ActiveRequestsLimiter (ackintosh, Oct 21, 2024)
a9a675a  Merge branch 'unstable' into delayed-rpc-response (ackintosh, Oct 22, 2024)
5d70573  Merge branch 'unstable' into delayed-rpc-response (ackintosh, Oct 23, 2024)
4e872a0  merge unstable (ackintosh, Oct 23, 2024)
450326c  Tweak for readability (ackintosh, Oct 29, 2024)
14fb84c  Merge branch 'unstable' into delayed-rpc-response (ackintosh, Nov 19, 2024)
636224c  Limit concurrent requests on self-limiter (ackintosh, Nov 23, 2024)
f6fd85b  Make the self-limiter mandatory, and make the rate-limiter optional w… (ackintosh, Nov 23, 2024)
95f8378  Inform the limiter that a response has been received (ackintosh, Nov 25, 2024)
9d2b263  Remove active requests belonging to the peer that disconnected (ackintosh, Nov 26, 2024)
2d7a679  Fix unused variable error (ackintosh, Nov 27, 2024)
b73a336  Fix clippy errors (ackintosh, Nov 29, 2024)
dfd092d  Merge branch 'unstable' into delayed-rpc-response (ackintosh, Dec 1, 2024)
810c5de  Fix clippy errors (ackintosh, Dec 1, 2024)
d46cbe8  Update test (ackintosh, Dec 1, 2024)
540436c  Adding a slight margin to the elapsed time check to account for poten… (ackintosh, Dec 1, 2024)
3d39f2c  Merge branch 'unstable' into delayed-rpc-response (ackintosh, Dec 4, 2024)
60c9900  Remove an active request when it ends with an error (ackintosh, Dec 8, 2024)
eec6b4a  Merge branch 'unstable' into delayed-rpc-response (ackintosh, Dec 10, 2024)
121 changes: 121 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/active_requests_limiter.rs
@@ -0,0 +1,121 @@
use crate::rpc::{Protocol, SubstreamId};
use libp2p::swarm::ConnectionId;
use libp2p::PeerId;
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Restricts more than two inbound requests from running simultaneously on the same protocol per peer.
Review comment (Member):

This seems to be restricting more than one inbound request, not two.

I'm not completely sure about the intuition for allowing 2 concurrent streams; allowing a single stream per protocol makes more sense to me. I have asked in the spec PR ethereum/consensus-specs#3767 (comment).

pub(super) struct ActiveRequestsLimiter {
requests: HashMap<PeerId, Vec<(Protocol, ConnectionId, SubstreamId)>>,
}

impl ActiveRequestsLimiter {
pub(super) fn new() -> Self {
Self {
requests: HashMap::new(),
}
}

/// Allows the request if there is no active request on the same protocol.
pub(super) fn allows(
&mut self,
peer_id: PeerId,
protocol: Protocol,
connection_id: &ConnectionId,
substream_id: &SubstreamId,
) -> bool {
match self.requests.entry(peer_id) {
Entry::Occupied(mut entry) => {
for (p, _cid, _sid) in entry.get().iter() {
// Check if there is a request on the same protocol.
if p == &protocol {
return false;
}
}

// Request on the same protocol was not found.
entry
.get_mut()
.push((protocol, *connection_id, *substream_id));
true
}
Entry::Vacant(entry) => {
// No active requests for the peer.
entry.insert(vec![(protocol, *connection_id, *substream_id)]);
true
}
}
}
Review comment (Member):

Since this function is now just this, wdyt of having this function at the Behaviour level, removing ActiveRequestsLimiter and therefore eliminating the need for a duplicated HashMap with requests?

Reply (Member Author): That makes sense. 💡 Thanks!

Reply (Member Author, Oct 23, 2024): Removed ActiveRequestsLimiter in 3c058b3
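Note: as a rough illustration of the suggestion above, here is a hypothetical sketch of what an inline, behaviour-level check could look like. The type and field names are invented for illustration; see 3c058b3 for what actually landed.

```rust
// Hypothetical sketch only: a per-protocol concurrency check kept directly
// on the behaviour, so no separate limiter type (and no second HashMap) is
// needed. Names are illustrative, not Lighthouse's actual code.
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct PeerId(u64); // stand-in for libp2p::PeerId

#[derive(Clone, Copy, PartialEq, Eq)]
enum Protocol {
    Status,
    BlocksByRange,
}

#[derive(Default)]
struct Behaviour {
    // Active inbound requests, tracked once, at the behaviour level.
    active_inbound: HashMap<PeerId, Vec<Protocol>>,
}

impl Behaviour {
    /// Returns false if `peer` already has an active request on `protocol`;
    /// otherwise records the request and returns true.
    fn allows(&mut self, peer: PeerId, protocol: Protocol) -> bool {
        let entries = self.active_inbound.entry(peer).or_default();
        if entries.contains(&protocol) {
            return false;
        }
        entries.push(protocol);
        true
    }
}
```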


/// Removes the request with the given SubstreamId.
pub(super) fn remove_request(
&mut self,
peer_id: PeerId,
connection_id: &ConnectionId,
substream_id: &SubstreamId,
) {
if let Some(requests) = self.requests.get_mut(&peer_id) {
// Keep every request except the one matching both the connection and substream ids.
requests.retain(|(_protocol, cid, sid)| !(cid == connection_id && sid == substream_id));
}
}

/// Removes the requests with the given PeerId.
pub(super) fn remove_peer(&mut self, peer_id: &PeerId) {
self.requests.remove(peer_id);
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_limiter() {
let mut limiter = ActiveRequestsLimiter::new();
let peer_id = PeerId::random();
let connection_id = ConnectionId::new_unchecked(1);
let substream_id = SubstreamId::new(1);

assert!(limiter.allows(peer_id, Protocol::Status, &connection_id, &substream_id));
// Not allowed since a request for the same protocol is in progress.
assert!(!limiter.allows(peer_id, Protocol::Status, &connection_id, &substream_id));
// Allowed since there is no BlocksByRange request in the active requests.
assert!(limiter.allows(
peer_id,
Protocol::BlocksByRange,
&connection_id,
&SubstreamId::new(2)
));
// Allowed since there is no request from the peer in the active requests.
assert!(limiter.allows(
PeerId::random(),
Protocol::Status,
&connection_id,
&substream_id
));

// Remove the Status request.
limiter.remove_request(peer_id, &connection_id, &substream_id);
assert!(limiter.allows(
peer_id,
Protocol::Status,
&connection_id,
&SubstreamId::new(3)
));

// Remove the peer.
limiter.remove_peer(&peer_id);
assert!(limiter.allows(
peer_id,
Protocol::Status,
&connection_id,
&SubstreamId::new(4)
));
assert!(limiter.allows(
peer_id,
Protocol::BlocksByRange,
&connection_id,
&SubstreamId::new(5)
));
}
}
172 changes: 167 additions & 5 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
@@ -7,6 +7,7 @@ use super::protocol::{InboundOutput, InboundRequest, Protocol, RPCError, RPCProt
use super::{RPCReceived, RPCSend, ReqId};
use crate::rpc::outbound::{OutboundFramed, OutboundRequest};
use crate::rpc::protocol::InboundFramed;
use crate::rpc::rate_limiter::{RPCRateLimiter, RateLimitedErr};
use fnv::FnvHashMap;
use futures::prelude::*;
use futures::SinkExt;
@@ -15,6 +16,8 @@ use libp2p::swarm::handler::{
FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
};
use libp2p::swarm::Stream;
use libp2p::PeerId;
use parking_lot::Mutex;
use slog::{crit, debug, trace};
use smallvec::SmallVec;
use std::{
@@ -137,8 +140,18 @@ where
/// Logger for handling RPC streams
log: slog::Logger,

/// Timeout that will me used for inbound and outbound responses.
/// Timeout that will be used for inbound and outbound responses.
resp_timeout: Duration,

/// Rate limiter for our responses and the PeerId that this handler interacts with.
/// The PeerId is necessary since the rate limiter manages rate limiting per peer.
response_limiter: Option<(PeerId, Arc<Mutex<RPCRateLimiter>>)>,

/// Responses queued for sending. These responses are stored when the response limiter rejects them.
delayed_responses: FnvHashMap<Protocol, VecDeque<QueuedResponse<E>>>,

/// Per-protocol delays that must elapse before the next queued response can be sent.
next_response: DelayQueue<Protocol>,
}
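Note: `delayed_responses` plus `next_response` form a small per-protocol retry scheduler. Below is a minimal, self-contained sketch of that same pattern, assuming `tokio_util`'s `DelayQueue` and an illustrative `String` payload; it is a sketch of the idea, not the handler code itself.

```rust
// Minimal sketch of the per-protocol delay pattern used by the handler:
// rejected responses queue up per protocol, and a DelayQueue entry wakes the
// task when that protocol may try again. Payload and names are illustrative.
use std::collections::{HashMap, VecDeque};
use std::time::Duration;

use futures::StreamExt;
use tokio_util::time::DelayQueue;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum Protocol {
    Status,
    BlocksByRange,
}

#[derive(Default)]
struct DelayedResponses {
    queued: HashMap<Protocol, VecDeque<String>>,
    retries: DelayQueue<Protocol>,
}

impl DelayedResponses {
    fn delay(&mut self, protocol: Protocol, response: String, wait: Duration) {
        // Arm the timer only for the first queued response of a protocol;
        // later responses join the back of the queue to preserve ordering.
        if !self.queued.contains_key(&protocol) {
            self.retries.insert(protocol, wait);
        }
        self.queued.entry(protocol).or_default().push_back(response);
    }

    async fn run(&mut self) {
        // A real handler polls this inside poll(); this loop simply ends once
        // the queue drains. The real code also re-checks the rate limiter
        // here and re-arms the timer if sending is still too soon.
        while let Some(expired) = self.retries.next().await {
            let protocol = expired.into_inner();
            if let Some(queue) = self.queued.get_mut(&protocol) {
                if let Some(response) = queue.pop_front() {
                    println!("sending delayed {response} on {protocol:?}");
                }
                if queue.is_empty() {
                    self.queued.remove(&protocol);
                }
            }
        }
    }
}
```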

enum HandlerState {
@@ -213,6 +226,12 @@ pub enum OutboundSubstreamState<E: EthSpec> {
Poisoned,
}

struct QueuedResponse<E: EthSpec> {
response: RPCCodedResponse<E>,
protocol: Protocol,
inbound_id: SubstreamId,
}

impl<Id, E> RPCHandler<Id, E>
where
E: EthSpec,
@@ -222,6 +241,7 @@ where
fork_context: Arc<ForkContext>,
log: &slog::Logger,
resp_timeout: Duration,
response_limiter: Option<(PeerId, Arc<Mutex<RPCRateLimiter>>)>,
) -> Self {
RPCHandler {
listen_protocol,
Expand All @@ -241,6 +261,9 @@ where
waker: None,
log: log.clone(),
resp_timeout,
response_limiter,
delayed_responses: FnvHashMap::default(),
next_response: DelayQueue::default(),
}
}

@@ -288,6 +311,36 @@
}
}

/// Checks if the response limiter allows the response. If the response should be delayed, the
/// duration to wait is returned.
fn try_response_limiter(
limiter: &mut Arc<Mutex<RPCRateLimiter>>,
peer_id: &PeerId,
protocol: Protocol,
response: RPCCodedResponse<E>,
log: &slog::Logger,
) -> Result<(), Duration> {
match limiter.lock().allows(peer_id, &(response, protocol)) {
Ok(()) => Ok(()),
Err(e) => match e {
RateLimitedErr::TooLarge => {
// This should never happen with default parameters. Let's just send the response.
// Log a crit since this is a config issue.
crit!(
log,
"Response rate limiting error for a batch that will never fit. Sending response anyway. Check configuration parameters.";
"protocol" => %protocol
);
Ok(())
}
RateLimitedErr::TooSoon(wait_time) => {
debug!(log, "Response rate limiting"; "protocol" => %protocol, "wait_time_ms" => wait_time.as_millis(), "peer_id" => %peer_id);
Err(wait_time)
}
},
}
}
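Note: the limiter consulted above is token based (the PR title and the tau/t commits point at a token/GCRA-style quota). The following is a minimal token-bucket sketch of where a `TooSoon(wait_time)`-shaped result comes from; it is illustrative only, not the actual `RPCRateLimiter` implementation.

```rust
// Illustrative token bucket, not Lighthouse's RPCRateLimiter. A request
// spends `cost` tokens; when too few tokens remain, the caller is told how
// long to wait, mirroring RateLimitedErr::TooSoon above.
use std::time::{Duration, Instant};

enum Limit {
    TooLarge,
    TooSoon(Duration),
}

struct TokenBucket {
    capacity: f64,       // maximum tokens (burst size)
    tokens: f64,         // tokens currently available
    refill_per_sec: f64, // replenish rate
    last_refill: Instant,
}

impl TokenBucket {
    fn allows(&mut self, cost: f64) -> Result<(), Limit> {
        if cost > self.capacity {
            // Can never fit, mirroring RateLimitedErr::TooLarge above.
            return Err(Limit::TooLarge);
        }
        // Replenish tokens for the time elapsed since the last check.
        let elapsed = self.last_refill.elapsed().as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        self.last_refill = Instant::now();

        if self.tokens >= cost {
            self.tokens -= cost;
            Ok(())
        } else {
            // Time until enough tokens have accumulated.
            let missing = cost - self.tokens;
            Err(Limit::TooSoon(Duration::from_secs_f64(
                missing / self.refill_per_sec,
            )))
        }
    }
}
```

For example, with `capacity` 2.0 and `refill_per_sec` 2.0, a third back-to-back request of cost 1.0 would receive `TooSoon` of roughly half a second.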

/// Sends a response to a peer's request.
// NOTE: If the substream has closed due to inactivity, or the substream is in the
// wrong state a response will fail silently.
@@ -301,21 +354,84 @@
}
return;
};

if let Some((peer_id, limiter)) = self.response_limiter.as_mut() {
// First check that there are not already other responses waiting to be sent.
if let Some(queued_responses) = self.delayed_responses.get_mut(&inbound_info.protocol) {
queued_responses.push_back(QueuedResponse {
response,
protocol: inbound_info.protocol,
inbound_id,
});
return;
}

match Self::try_response_limiter(
limiter,
peer_id,
inbound_info.protocol,
response.clone(),
&self.log,
) {
Ok(()) => {
Self::send_response_inner(
inbound_id,
inbound_info,
response,
&mut self.events_out,
&self.state,
&self.log,
);
}
Err(wait_time) => {
self.next_response.insert(inbound_info.protocol, wait_time);
self.delayed_responses
.entry(inbound_info.protocol)
.or_default()
.push_back(QueuedResponse {
response,
protocol: inbound_info.protocol,
inbound_id,
});
}
}
} else {
Self::send_response_inner(
inbound_id,
inbound_info,
response,
&mut self.events_out,
&self.state,
&self.log,
);
}
}

/// Sends a response to a peer's request.
fn send_response_inner(
inbound_id: SubstreamId,
inbound_info: &mut InboundInfo<E>,
response: RPCCodedResponse<E>,
events_out: &mut SmallVec<[HandlerEvent<Id, E>; 4]>,
handler_state: &HandlerState,
log: &slog::Logger,
) {
// If the response we are sending is an error, report back for handling
if let RPCCodedResponse::Error(ref code, ref reason) = response {
self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
error: RPCError::ErrorResponse(*code, reason.to_string()),
proto: inbound_info.protocol,
id: inbound_id,
}));
}

if matches!(self.state, HandlerState::Deactivated) {
if matches!(handler_state, HandlerState::Deactivated) {
// we no longer send responses after the handler is deactivated
debug!(self.log, "Response not sent. Deactivated handler";
"response" => %response, "id" => inbound_id);
debug!(log, "Response not sent. Deactivated handler";
"response" => %response, "id" => inbound_id);
return;
}

inbound_info.pending_items.push_back(response);
}
}
@@ -388,6 +504,52 @@
};
}

if let Some((peer_id, limiter)) = self.response_limiter.as_mut() {
// Process delayed responses that are ready to be sent.
if let Poll::Ready(Some(expired)) = self.next_response.poll_expired(cx) {
let protocol = expired.into_inner();
if let Entry::Occupied(mut entry) = self.delayed_responses.entry(protocol) {
let queued_responses = entry.get_mut();
while let Some(res) = queued_responses.pop_front() {
let Some(inbound_info) = self.inbound_substreams.get_mut(&res.inbound_id)
else {
debug!(self.log, "The inbound stream has expired. The delayed response was not sent."; "protocol" => %protocol, "peer_id" => %peer_id, "inbound_id" => res.inbound_id);
continue;
};
match Self::try_response_limiter(
limiter,
peer_id,
res.protocol,
res.response.clone(),
&self.log,
) {
Ok(()) => {
debug!(self.log, "The waiting time for response rate-limiting is over. Sending the response."; "protocol" => %protocol, "peer_id" => %peer_id, "inbound_id" => res.inbound_id);
Self::send_response_inner(
res.inbound_id,
inbound_info,
res.response,
&mut self.events_out,
&self.state,
&self.log,
);
}
Err(wait_time) => {
self.next_response.insert(protocol, wait_time);
queued_responses.push_front(res);
// If one fails just wait for the next window that allows sending responses.
break;
}
}
}

if queued_responses.is_empty() {
entry.remove();
}
}
}
}

// purge expired inbound substreams and send an error

while let Poll::Ready(Some(inbound_id)) = self.inbound_substreams_delay.poll_expired(cx) {