diff --git a/beacon_node/lighthouse_network/gossipsub/src/behaviour.rs b/beacon_node/lighthouse_network/gossipsub/src/behaviour.rs index 88fe48c4414..5ead0c06a0a 100644 --- a/beacon_node/lighthouse_network/gossipsub/src/behaviour.rs +++ b/beacon_node/lighthouse_network/gossipsub/src/behaviour.rs @@ -1385,7 +1385,7 @@ where "IWANT: Peer has asked for message too many times; ignoring request" ); } else if let Some(peer) = &mut self.connected_peers.get_mut(peer_id) { - if peer.dont_send.get(&id).is_some() { + if peer.dont_send_received.get(&id).is_some() { tracing::debug!(%peer_id, message=%id, "Peer already sent IDONTWANT for this message"); continue; } @@ -1817,6 +1817,15 @@ where // Calculate the message id on the transformed data. let msg_id = self.config.message_id(&message); + if let Some(metrics) = self.metrics.as_mut() { + if let Some(peer) = self.connected_peers.get_mut(propagation_source) { + // Record if we received a message that we already sent a IDONTWANT for to the peer + if peer.dont_send_sent.contains_key(&msg_id) { + metrics.register_idontwant_messages_ignored_per_topic(&raw_message.topic); + } + } + } + // Check the validity of the message // Peers get penalized if this message is invalid. We don't add it to the duplicate cache // and instead continually penalize peers that repeatedly send this message. @@ -2512,11 +2521,19 @@ where // Flush stale IDONTWANTs. for peer in self.connected_peers.values_mut() { - while let Some((_front, instant)) = peer.dont_send.front() { + while let Some((_front, instant)) = peer.dont_send_received.front() { + if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() { + break; + } else { + peer.dont_send_received.pop_front(); + } + } + // If metrics are not enabled, this queue would be empty. + while let Some((_front, instant)) = peer.dont_send_sent.front() { if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() { break; } else { - peer.dont_send.pop_front(); + peer.dont_send_sent.pop_front(); } } } @@ -2751,6 +2768,16 @@ where .entry(*peer_id) .or_default() .non_priority += 1; + return; + } + // IDONTWANT sent successfully. + if let Some(metrics) = self.metrics.as_mut() { + peer.dont_send_sent.insert(msg_id.clone(), Instant::now()); + // Don't exceed capacity. + if peer.dont_send_sent.len() > IDONTWANT_CAP { + peer.dont_send_sent.pop_front(); + } + metrics.register_idontwant_messages_sent_per_topic(&message.topic); } } } @@ -2808,7 +2835,7 @@ where if !recipient_peers.is_empty() { for peer_id in recipient_peers.iter() { if let Some(peer) = self.connected_peers.get_mut(peer_id) { - if peer.dont_send.get(msg_id).is_some() { + if peer.dont_send_received.get(msg_id).is_some() { tracing::debug!(%peer_id, message=%msg_id, "Peer doesn't want message"); continue; } @@ -3162,7 +3189,8 @@ where connections: vec![], sender: RpcSender::new(self.config.connection_handler_queue_len()), topics: Default::default(), - dont_send: LinkedHashMap::new(), + dont_send_received: LinkedHashMap::new(), + dont_send_sent: LinkedHashMap::new(), }); // Add the new connection connected_peer.connections.push(connection_id); @@ -3194,7 +3222,8 @@ where connections: vec![], sender: RpcSender::new(self.config.connection_handler_queue_len()), topics: Default::default(), - dont_send: LinkedHashMap::new(), + dont_send_received: LinkedHashMap::new(), + dont_send_sent: LinkedHashMap::new(), }); // Add the new connection connected_peer.connections.push(connection_id); @@ -3366,10 +3395,10 @@ where metrics.register_idontwant_bytes(idontwant_size); } for message_id in message_ids { - peer.dont_send.insert(message_id, Instant::now()); + peer.dont_send_received.insert(message_id, Instant::now()); // Don't exceed capacity. - if peer.dont_send.len() > IDONTWANT_CAP { - peer.dont_send.pop_front(); + if peer.dont_send_received.len() > IDONTWANT_CAP { + peer.dont_send_received.pop_front(); } } } diff --git a/beacon_node/lighthouse_network/gossipsub/src/behaviour/tests.rs b/beacon_node/lighthouse_network/gossipsub/src/behaviour/tests.rs index 62f026b568a..713fe1f2668 100644 --- a/beacon_node/lighthouse_network/gossipsub/src/behaviour/tests.rs +++ b/beacon_node/lighthouse_network/gossipsub/src/behaviour/tests.rs @@ -238,7 +238,8 @@ where kind: kind.clone().unwrap_or(PeerKind::Floodsub), connections: vec![connection_id], topics: Default::default(), - dont_send: LinkedHashMap::new(), + dont_send_received: LinkedHashMap::new(), + dont_send_sent: LinkedHashMap::new(), sender, }, ); @@ -626,7 +627,8 @@ fn test_join() { kind: PeerKind::Floodsub, connections: vec![connection_id], topics: Default::default(), - dont_send: LinkedHashMap::new(), + dont_send_received: LinkedHashMap::new(), + dont_send_sent: LinkedHashMap::new(), sender, }, ); @@ -1023,7 +1025,8 @@ fn test_get_random_peers() { connections: vec![ConnectionId::new_unchecked(0)], topics: topics.clone(), sender: RpcSender::new(gs.config.connection_handler_queue_len()), - dont_send: LinkedHashMap::new(), + dont_send_sent: LinkedHashMap::new(), + dont_send_received: LinkedHashMap::new(), }, ); } @@ -5408,7 +5411,7 @@ fn doesnt_forward_idontwant() { .unwrap(); let message_id = gs.config.message_id(&message); let peer = gs.connected_peers.get_mut(&peers[2]).unwrap(); - peer.dont_send.insert(message_id, Instant::now()); + peer.dont_send_received.insert(message_id, Instant::now()); gs.handle_received_message(raw_message.clone(), &local_id); assert_eq!( @@ -5457,7 +5460,7 @@ fn parses_idontwant() { }, ); let peer = gs.connected_peers.get_mut(&peers[1]).unwrap(); - assert!(peer.dont_send.get(&message_id).is_some()); + assert!(peer.dont_send_received.get(&message_id).is_some()); } /// Test that a node clears stale IDONTWANT messages. @@ -5473,10 +5476,10 @@ fn clear_stale_idontwant() { .create_network(); let peer = gs.connected_peers.get_mut(&peers[2]).unwrap(); - peer.dont_send + peer.dont_send_received .insert(MessageId::new(&[1, 2, 3, 4]), Instant::now()); std::thread::sleep(Duration::from_secs(3)); gs.heartbeat(); let peer = gs.connected_peers.get_mut(&peers[2]).unwrap(); - assert!(peer.dont_send.is_empty()); + assert!(peer.dont_send_received.is_empty()); } diff --git a/beacon_node/lighthouse_network/gossipsub/src/metrics.rs b/beacon_node/lighthouse_network/gossipsub/src/metrics.rs index a4ac389a748..d3ca6c299e5 100644 --- a/beacon_node/lighthouse_network/gossipsub/src/metrics.rs +++ b/beacon_node/lighthouse_network/gossipsub/src/metrics.rs @@ -188,6 +188,12 @@ pub(crate) struct Metrics { /// The number of bytes we have received in every IDONTWANT control message. idontwant_bytes: Counter, + /// Number of IDONTWANT messages sent per topic. + idontwant_messages_sent_per_topic: Family, + + /// Number of full messages we received that we previously sent a IDONTWANT for. + idontwant_messages_ignored_per_topic: Family, + /// The size of the priority queue. priority_queue_size: Histogram, /// The size of the non-priority queue. @@ -341,6 +347,18 @@ impl Metrics { metric }; + // IDONTWANT messages sent per topic + let idontwant_messages_sent_per_topic = register_family!( + "idonttwant_messages_sent_per_topic", + "Number of IDONTWANT messages sent per topic" + ); + + // IDONTWANTs which were ignored, and we still received the message per topic + let idontwant_messages_ignored_per_topic = register_family!( + "idontwant_messages_ignored_per_topic", + "IDONTWANT messages that were sent but we received the full message regardless" + ); + let idontwant_bytes = { let metric = Counter::default(); registry.register( @@ -405,6 +423,8 @@ impl Metrics { idontwant_msgs, idontwant_bytes, idontwant_msgs_ids, + idontwant_messages_sent_per_topic, + idontwant_messages_ignored_per_topic, priority_queue_size, non_priority_queue_size, } @@ -608,6 +628,20 @@ impl Metrics { self.idontwant_bytes.inc_by(bytes as u64); } + /// Register receiving an IDONTWANT control message for a given topic. + pub(crate) fn register_idontwant_messages_sent_per_topic(&mut self, topic: &TopicHash) { + self.idontwant_messages_sent_per_topic + .get_or_create(topic) + .inc(); + } + + /// Register receiving a message for an already sent IDONTWANT. + pub(crate) fn register_idontwant_messages_ignored_per_topic(&mut self, topic: &TopicHash) { + self.idontwant_messages_ignored_per_topic + .get_or_create(topic) + .inc(); + } + /// Register receiving an IDONTWANT msg for this topic. pub(crate) fn register_idontwant(&mut self, msgs: usize) { self.idontwant_msgs.inc(); diff --git a/beacon_node/lighthouse_network/gossipsub/src/types.rs b/beacon_node/lighthouse_network/gossipsub/src/types.rs index d14a9293749..f5dac380e32 100644 --- a/beacon_node/lighthouse_network/gossipsub/src/types.rs +++ b/beacon_node/lighthouse_network/gossipsub/src/types.rs @@ -123,8 +123,10 @@ pub(crate) struct PeerConnections { pub(crate) sender: RpcSender, /// Subscribed topics. pub(crate) topics: BTreeSet, - /// Don't send messages. - pub(crate) dont_send: LinkedHashMap, + /// IDONTWANT messages received from the peer. + pub(crate) dont_send_received: LinkedHashMap, + /// IDONTWANT messages we sent to the peer. + pub(crate) dont_send_sent: LinkedHashMap, } /// Describes the types of peers that can exist in the gossipsub context.