Skip to content

Commit

Permalink
Add additional metrics for idontwant (#6578)
Browse files Browse the repository at this point in the history
* Add additional metrics for idontwant

* Resolve issues from review

* Fix tests

* Don't exceed capacity

* Apply suggestions from code review

Co-authored-by: João Oliveira <[email protected]>

* Return early on failure

* Add comment
  • Loading branch information
pawanjay176 authored Nov 20, 2024
1 parent b1e9f69 commit 94311c6
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 18 deletions.
47 changes: 38 additions & 9 deletions beacon_node/lighthouse_network/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1385,7 +1385,7 @@ where
"IWANT: Peer has asked for message too many times; ignoring request"
);
} else if let Some(peer) = &mut self.connected_peers.get_mut(peer_id) {
if peer.dont_send.get(&id).is_some() {
if peer.dont_send_received.get(&id).is_some() {
tracing::debug!(%peer_id, message=%id, "Peer already sent IDONTWANT for this message");
continue;
}
Expand Down Expand Up @@ -1817,6 +1817,15 @@ where
// Calculate the message id on the transformed data.
let msg_id = self.config.message_id(&message);

if let Some(metrics) = self.metrics.as_mut() {
if let Some(peer) = self.connected_peers.get_mut(propagation_source) {
// Record if we received a message that we already sent a IDONTWANT for to the peer
if peer.dont_send_sent.contains_key(&msg_id) {
metrics.register_idontwant_messages_ignored_per_topic(&raw_message.topic);
}
}
}

// Check the validity of the message
// Peers get penalized if this message is invalid. We don't add it to the duplicate cache
// and instead continually penalize peers that repeatedly send this message.
Expand Down Expand Up @@ -2512,11 +2521,19 @@ where

// Flush stale IDONTWANTs.
for peer in self.connected_peers.values_mut() {
while let Some((_front, instant)) = peer.dont_send.front() {
while let Some((_front, instant)) = peer.dont_send_received.front() {
if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() {
break;
} else {
peer.dont_send_received.pop_front();
}
}
// If metrics are not enabled, this queue would be empty.
while let Some((_front, instant)) = peer.dont_send_sent.front() {
if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() {
break;
} else {
peer.dont_send.pop_front();
peer.dont_send_sent.pop_front();
}
}
}
Expand Down Expand Up @@ -2751,6 +2768,16 @@ where
.entry(*peer_id)
.or_default()
.non_priority += 1;
return;
}
// IDONTWANT sent successfully.
if let Some(metrics) = self.metrics.as_mut() {
peer.dont_send_sent.insert(msg_id.clone(), Instant::now());
// Don't exceed capacity.
if peer.dont_send_sent.len() > IDONTWANT_CAP {
peer.dont_send_sent.pop_front();
}
metrics.register_idontwant_messages_sent_per_topic(&message.topic);
}
}
}
Expand Down Expand Up @@ -2808,7 +2835,7 @@ where
if !recipient_peers.is_empty() {
for peer_id in recipient_peers.iter() {
if let Some(peer) = self.connected_peers.get_mut(peer_id) {
if peer.dont_send.get(msg_id).is_some() {
if peer.dont_send_received.get(msg_id).is_some() {
tracing::debug!(%peer_id, message=%msg_id, "Peer doesn't want message");
continue;
}
Expand Down Expand Up @@ -3162,7 +3189,8 @@ where
connections: vec![],
sender: RpcSender::new(self.config.connection_handler_queue_len()),
topics: Default::default(),
dont_send: LinkedHashMap::new(),
dont_send_received: LinkedHashMap::new(),
dont_send_sent: LinkedHashMap::new(),
});
// Add the new connection
connected_peer.connections.push(connection_id);
Expand Down Expand Up @@ -3194,7 +3222,8 @@ where
connections: vec![],
sender: RpcSender::new(self.config.connection_handler_queue_len()),
topics: Default::default(),
dont_send: LinkedHashMap::new(),
dont_send_received: LinkedHashMap::new(),
dont_send_sent: LinkedHashMap::new(),
});
// Add the new connection
connected_peer.connections.push(connection_id);
Expand Down Expand Up @@ -3366,10 +3395,10 @@ where
metrics.register_idontwant_bytes(idontwant_size);
}
for message_id in message_ids {
peer.dont_send.insert(message_id, Instant::now());
peer.dont_send_received.insert(message_id, Instant::now());
// Don't exceed capacity.
if peer.dont_send.len() > IDONTWANT_CAP {
peer.dont_send.pop_front();
if peer.dont_send_received.len() > IDONTWANT_CAP {
peer.dont_send_received.pop_front();
}
}
}
Expand Down
17 changes: 10 additions & 7 deletions beacon_node/lighthouse_network/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ where
kind: kind.clone().unwrap_or(PeerKind::Floodsub),
connections: vec![connection_id],
topics: Default::default(),
dont_send: LinkedHashMap::new(),
dont_send_received: LinkedHashMap::new(),
dont_send_sent: LinkedHashMap::new(),
sender,
},
);
Expand Down Expand Up @@ -626,7 +627,8 @@ fn test_join() {
kind: PeerKind::Floodsub,
connections: vec![connection_id],
topics: Default::default(),
dont_send: LinkedHashMap::new(),
dont_send_received: LinkedHashMap::new(),
dont_send_sent: LinkedHashMap::new(),
sender,
},
);
Expand Down Expand Up @@ -1023,7 +1025,8 @@ fn test_get_random_peers() {
connections: vec![ConnectionId::new_unchecked(0)],
topics: topics.clone(),
sender: RpcSender::new(gs.config.connection_handler_queue_len()),
dont_send: LinkedHashMap::new(),
dont_send_sent: LinkedHashMap::new(),
dont_send_received: LinkedHashMap::new(),
},
);
}
Expand Down Expand Up @@ -5408,7 +5411,7 @@ fn doesnt_forward_idontwant() {
.unwrap();
let message_id = gs.config.message_id(&message);
let peer = gs.connected_peers.get_mut(&peers[2]).unwrap();
peer.dont_send.insert(message_id, Instant::now());
peer.dont_send_received.insert(message_id, Instant::now());

gs.handle_received_message(raw_message.clone(), &local_id);
assert_eq!(
Expand Down Expand Up @@ -5457,7 +5460,7 @@ fn parses_idontwant() {
},
);
let peer = gs.connected_peers.get_mut(&peers[1]).unwrap();
assert!(peer.dont_send.get(&message_id).is_some());
assert!(peer.dont_send_received.get(&message_id).is_some());
}

/// Test that a node clears stale IDONTWANT messages.
Expand All @@ -5473,10 +5476,10 @@ fn clear_stale_idontwant() {
.create_network();

let peer = gs.connected_peers.get_mut(&peers[2]).unwrap();
peer.dont_send
peer.dont_send_received
.insert(MessageId::new(&[1, 2, 3, 4]), Instant::now());
std::thread::sleep(Duration::from_secs(3));
gs.heartbeat();
let peer = gs.connected_peers.get_mut(&peers[2]).unwrap();
assert!(peer.dont_send.is_empty());
assert!(peer.dont_send_received.is_empty());
}
34 changes: 34 additions & 0 deletions beacon_node/lighthouse_network/gossipsub/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ pub(crate) struct Metrics {
/// The number of bytes we have received in every IDONTWANT control message.
idontwant_bytes: Counter,

/// Number of IDONTWANT messages sent per topic.
idontwant_messages_sent_per_topic: Family<TopicHash, Counter>,

/// Number of full messages we received that we previously sent a IDONTWANT for.
idontwant_messages_ignored_per_topic: Family<TopicHash, Counter>,

/// The size of the priority queue.
priority_queue_size: Histogram,
/// The size of the non-priority queue.
Expand Down Expand Up @@ -341,6 +347,18 @@ impl Metrics {
metric
};

// IDONTWANT messages sent per topic
let idontwant_messages_sent_per_topic = register_family!(
"idonttwant_messages_sent_per_topic",
"Number of IDONTWANT messages sent per topic"
);

// IDONTWANTs which were ignored, and we still received the message per topic
let idontwant_messages_ignored_per_topic = register_family!(
"idontwant_messages_ignored_per_topic",
"IDONTWANT messages that were sent but we received the full message regardless"
);

let idontwant_bytes = {
let metric = Counter::default();
registry.register(
Expand Down Expand Up @@ -405,6 +423,8 @@ impl Metrics {
idontwant_msgs,
idontwant_bytes,
idontwant_msgs_ids,
idontwant_messages_sent_per_topic,
idontwant_messages_ignored_per_topic,
priority_queue_size,
non_priority_queue_size,
}
Expand Down Expand Up @@ -608,6 +628,20 @@ impl Metrics {
self.idontwant_bytes.inc_by(bytes as u64);
}

/// Register receiving an IDONTWANT control message for a given topic.
pub(crate) fn register_idontwant_messages_sent_per_topic(&mut self, topic: &TopicHash) {
self.idontwant_messages_sent_per_topic
.get_or_create(topic)
.inc();
}

/// Register receiving a message for an already sent IDONTWANT.
pub(crate) fn register_idontwant_messages_ignored_per_topic(&mut self, topic: &TopicHash) {
self.idontwant_messages_ignored_per_topic
.get_or_create(topic)
.inc();
}

/// Register receiving an IDONTWANT msg for this topic.
pub(crate) fn register_idontwant(&mut self, msgs: usize) {
self.idontwant_msgs.inc();
Expand Down
6 changes: 4 additions & 2 deletions beacon_node/lighthouse_network/gossipsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,10 @@ pub(crate) struct PeerConnections {
pub(crate) sender: RpcSender,
/// Subscribed topics.
pub(crate) topics: BTreeSet<TopicHash>,
/// Don't send messages.
pub(crate) dont_send: LinkedHashMap<MessageId, Instant>,
/// IDONTWANT messages received from the peer.
pub(crate) dont_send_received: LinkedHashMap<MessageId, Instant>,
/// IDONTWANT messages we sent to the peer.
pub(crate) dont_send_sent: LinkedHashMap<MessageId, Instant>,
}

/// Describes the types of peers that can exist in the gossipsub context.
Expand Down

0 comments on commit 94311c6

Please sign in to comment.