From b4177a9fe173ae592ccf581d290e869aff1cafc4 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 30 Dec 2024 16:28:15 +0200 Subject: [PATCH] sync: Send already connected peers to new subscribers (#7011) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce `SyncEvent::InitialPeers` message sent to new subscribers to allow them correctly tracking sync peers. This resolves a race condition described in https://github.com/paritytech/polkadot-sdk/issues/6573#issuecomment-2563091343. Fixes https://github.com/paritytech/polkadot-sdk/issues/6573. --------- Co-authored-by: command-bot <> Co-authored-by: Bastian Köcher --- prdoc/pr_7011.prdoc | 16 +++++++++++++ substrate/client/network-gossip/src/bridge.rs | 10 ++++---- substrate/client/network-gossip/src/lib.rs | 13 +++++++---- substrate/client/network/statement/src/lib.rs | 23 ++++++++++++++----- substrate/client/network/sync/src/engine.rs | 6 ++++- substrate/client/network/sync/src/types.rs | 4 ++++ .../client/network/transactions/src/lib.rs | 23 ++++++++++++++----- 7 files changed, 73 insertions(+), 22 deletions(-) create mode 100644 prdoc/pr_7011.prdoc diff --git a/prdoc/pr_7011.prdoc b/prdoc/pr_7011.prdoc new file mode 100644 index 000000000000..55fe0c73ca09 --- /dev/null +++ b/prdoc/pr_7011.prdoc @@ -0,0 +1,16 @@ +title: 'sync: Send already connected peers to new subscribers' +doc: +- audience: Node Dev + description: |- + Introduce `SyncEvent::InitialPeers` message sent to new subscribers to allow them correctly tracking sync peers. This resolves a race condition described in https://github.com/paritytech/polkadot-sdk/issues/6573#issuecomment-2563091343. + + Fixes https://github.com/paritytech/polkadot-sdk/issues/6573. +crates: +- name: sc-network-gossip + bump: major +- name: sc-network-statement + bump: patch +- name: sc-network-sync + bump: major +- name: sc-network-transactions + bump: patch diff --git a/substrate/client/network-gossip/src/bridge.rs b/substrate/client/network-gossip/src/bridge.rs index 2daf1e49ee4b..bff258a9a011 100644 --- a/substrate/client/network-gossip/src/bridge.rs +++ b/substrate/client/network-gossip/src/bridge.rs @@ -254,10 +254,12 @@ impl Future for GossipEngine { match sync_event_stream { Poll::Ready(Some(event)) => match event { - SyncEvent::PeerConnected(remote) => - this.network.add_set_reserved(remote, this.protocol.clone()), - SyncEvent::PeerDisconnected(remote) => - this.network.remove_set_reserved(remote, this.protocol.clone()), + SyncEvent::InitialPeers(peer_ids) => + this.network.add_set_reserved(peer_ids, this.protocol.clone()), + SyncEvent::PeerConnected(peer_id) => + this.network.add_set_reserved(vec![peer_id], this.protocol.clone()), + SyncEvent::PeerDisconnected(peer_id) => + this.network.remove_set_reserved(peer_id, this.protocol.clone()), }, // The sync event stream closed. Do the same for [`GossipValidator`]. Poll::Ready(None) => { diff --git a/substrate/client/network-gossip/src/lib.rs b/substrate/client/network-gossip/src/lib.rs index 20d9922200c2..2ec573bf9e3e 100644 --- a/substrate/client/network-gossip/src/lib.rs +++ b/substrate/client/network-gossip/src/lib.rs @@ -82,15 +82,18 @@ mod validator; /// Abstraction over a network. pub trait Network: NetworkPeers + NetworkEventStream { - fn add_set_reserved(&self, who: PeerId, protocol: ProtocolName) { - let addr = Multiaddr::empty().with(Protocol::P2p(*who.as_ref())); - let result = self.add_peers_to_reserved_set(protocol, iter::once(addr).collect()); + fn add_set_reserved(&self, peer_ids: Vec, protocol: ProtocolName) { + let addrs = peer_ids + .into_iter() + .map(|peer_id| Multiaddr::empty().with(Protocol::P2p(peer_id.into()))) + .collect(); + let result = self.add_peers_to_reserved_set(protocol, addrs); if let Err(err) = result { log::error!(target: "gossip", "add_set_reserved failed: {}", err); } } - fn remove_set_reserved(&self, who: PeerId, protocol: ProtocolName) { - let result = self.remove_peers_from_reserved_set(protocol, iter::once(who).collect()); + fn remove_set_reserved(&self, peer_id: PeerId, protocol: ProtocolName) { + let result = self.remove_peers_from_reserved_set(protocol, iter::once(peer_id).collect()); if let Err(err) = result { log::error!(target: "gossip", "remove_set_reserved failed: {}", err); } diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index df93788696e3..586a15cadd68 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -33,7 +33,8 @@ use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered, FutureExt} use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; use sc_network::{ config::{NonReservedPeerMode, SetConfig}, - error, multiaddr, + error, + multiaddr::{Multiaddr, Protocol}, peer_store::PeerStoreProvider, service::{ traits::{NotificationEvent, NotificationService, ValidationResult}, @@ -296,9 +297,19 @@ where fn handle_sync_event(&mut self, event: SyncEvent) { match event { - SyncEvent::PeerConnected(remote) => { - let addr = iter::once(multiaddr::Protocol::P2p(remote.into())) - .collect::(); + SyncEvent::InitialPeers(peer_ids) => { + let addrs = peer_ids + .into_iter() + .map(|peer_id| Multiaddr::empty().with(Protocol::P2p(peer_id.into()))) + .collect(); + let result = + self.network.add_peers_to_reserved_set(self.protocol_name.clone(), addrs); + if let Err(err) = result { + log::error!(target: LOG_TARGET, "Add reserved peers failed: {}", err); + } + }, + SyncEvent::PeerConnected(peer_id) => { + let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into())); let result = self.network.add_peers_to_reserved_set( self.protocol_name.clone(), iter::once(addr).collect(), @@ -307,10 +318,10 @@ where log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err); } }, - SyncEvent::PeerDisconnected(remote) => { + SyncEvent::PeerDisconnected(peer_id) => { let result = self.network.remove_peers_from_reserved_set( self.protocol_name.clone(), - iter::once(remote).collect(), + iter::once(peer_id).collect(), ); if let Err(err) = result { log::error!(target: LOG_TARGET, "Failed to remove reserved peer: {err}"); diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 0c39ea0b93c0..4003361525e1 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -656,7 +656,11 @@ where ToServiceCommand::SetSyncForkRequest(peers, hash, number) => { self.strategy.set_sync_fork_request(peers, &hash, number); }, - ToServiceCommand::EventStream(tx) => self.event_streams.push(tx), + ToServiceCommand::EventStream(tx) => { + let _ = tx + .unbounded_send(SyncEvent::InitialPeers(self.peers.keys().cloned().collect())); + self.event_streams.push(tx); + }, ToServiceCommand::RequestJustification(hash, number) => self.strategy.request_justification(&hash, number), ToServiceCommand::ClearJustificationRequests => diff --git a/substrate/client/network/sync/src/types.rs b/substrate/client/network/sync/src/types.rs index 5745a34378df..a72a2f7c1ffe 100644 --- a/substrate/client/network/sync/src/types.rs +++ b/substrate/client/network/sync/src/types.rs @@ -127,6 +127,10 @@ where /// Syncing-related events that other protocols can subscribe to. pub enum SyncEvent { + /// All connected peers that the syncing implementation is tracking. + /// Always sent as the first message to the stream. + InitialPeers(Vec), + /// Peer that the syncing implementation is tracking connected. PeerConnected(PeerId), diff --git a/substrate/client/network/transactions/src/lib.rs b/substrate/client/network/transactions/src/lib.rs index 44fa702ef6d4..49f429a04ee2 100644 --- a/substrate/client/network/transactions/src/lib.rs +++ b/substrate/client/network/transactions/src/lib.rs @@ -35,7 +35,8 @@ use log::{debug, trace, warn}; use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; use sc_network::{ config::{NonReservedPeerMode, ProtocolId, SetConfig}, - error, multiaddr, + error, + multiaddr::{Multiaddr, Protocol}, peer_store::PeerStoreProvider, service::{ traits::{NotificationEvent, NotificationService, ValidationResult}, @@ -377,9 +378,19 @@ where fn handle_sync_event(&mut self, event: SyncEvent) { match event { - SyncEvent::PeerConnected(remote) => { - let addr = iter::once(multiaddr::Protocol::P2p(remote.into())) - .collect::(); + SyncEvent::InitialPeers(peer_ids) => { + let addrs = peer_ids + .into_iter() + .map(|peer_id| Multiaddr::empty().with(Protocol::P2p(peer_id.into()))) + .collect(); + let result = + self.network.add_peers_to_reserved_set(self.protocol_name.clone(), addrs); + if let Err(err) = result { + log::error!(target: LOG_TARGET, "Add reserved peers failed: {}", err); + } + }, + SyncEvent::PeerConnected(peer_id) => { + let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into())); let result = self.network.add_peers_to_reserved_set( self.protocol_name.clone(), iter::once(addr).collect(), @@ -388,10 +399,10 @@ where log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err); } }, - SyncEvent::PeerDisconnected(remote) => { + SyncEvent::PeerDisconnected(peer_id) => { let result = self.network.remove_peers_from_reserved_set( self.protocol_name.clone(), - iter::once(remote).collect(), + iter::once(peer_id).collect(), ); if let Err(err) = result { log::error!(target: LOG_TARGET, "Remove reserved peer failed: {}", err);