Skip to content

Commit

Permalink
sync: Send already connected peers to new subscribers
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitry-markin committed Dec 27, 2024
1 parent b7afe48 commit 3cc7942
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 22 deletions.
10 changes: 6 additions & 4 deletions substrate/client/network-gossip/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,12 @@ impl<B: BlockT> Future for GossipEngine<B> {

match sync_event_stream {
Poll::Ready(Some(event)) => match event {
SyncEvent::PeerConnected(remote) =>
this.network.add_set_reserved(remote, this.protocol.clone()),
SyncEvent::PeerDisconnected(remote) =>
this.network.remove_set_reserved(remote, this.protocol.clone()),
SyncEvent::InitialPeers(peer_ids) =>
this.network.add_set_reserved(peer_ids, this.protocol.clone()),
SyncEvent::PeerConnected(peer_id) =>
this.network.add_set_reserved(vec![peer_id], this.protocol.clone()),
SyncEvent::PeerDisconnected(peer_id) =>
this.network.remove_set_reserved(peer_id, this.protocol.clone()),
},
// The sync event stream closed. Do the same for [`GossipValidator`].
Poll::Ready(None) => {
Expand Down
13 changes: 8 additions & 5 deletions substrate/client/network-gossip/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,18 @@ mod validator;

/// Abstraction over a network.
pub trait Network<B: BlockT>: NetworkPeers + NetworkEventStream {
fn add_set_reserved(&self, who: PeerId, protocol: ProtocolName) {
let addr = Multiaddr::empty().with(Protocol::P2p(*who.as_ref()));
let result = self.add_peers_to_reserved_set(protocol, iter::once(addr).collect());
fn add_set_reserved(&self, peer_ids: Vec<PeerId>, protocol: ProtocolName) {
let addrs = peer_ids
.into_iter()
.map(|peer_id| Multiaddr::empty().with(Protocol::P2p(peer_id.into())))
.collect();
let result = self.add_peers_to_reserved_set(protocol, addrs);
if let Err(err) = result {
log::error!(target: "gossip", "add_set_reserved failed: {}", err);
}
}
fn remove_set_reserved(&self, who: PeerId, protocol: ProtocolName) {
let result = self.remove_peers_from_reserved_set(protocol, iter::once(who).collect());
fn remove_set_reserved(&self, peer_id: PeerId, protocol: ProtocolName) {
let result = self.remove_peers_from_reserved_set(protocol, iter::once(peer_id).collect());
if let Err(err) = result {
log::error!(target: "gossip", "remove_set_reserved failed: {}", err);
}
Expand Down
23 changes: 17 additions & 6 deletions substrate/client/network/statement/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered, FutureExt}
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use sc_network::{
config::{NonReservedPeerMode, SetConfig},
error, multiaddr,
error,
multiaddr::{Multiaddr, Protocol},
peer_store::PeerStoreProvider,
service::{
traits::{NotificationEvent, NotificationService, ValidationResult},
Expand Down Expand Up @@ -296,9 +297,19 @@ where

fn handle_sync_event(&mut self, event: SyncEvent) {
match event {
SyncEvent::PeerConnected(remote) => {
let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
.collect::<multiaddr::Multiaddr>();
SyncEvent::InitialPeers(peer_ids) => {
let addrs = peer_ids
.into_iter()
.map(|peer_id| Multiaddr::empty().with(Protocol::P2p(peer_id.into())))
.collect();
let result =
self.network.add_peers_to_reserved_set(self.protocol_name.clone(), addrs);
if let Err(err) = result {
log::error!(target: LOG_TARGET, "Add reserved peers failed: {}", err);
}
},
SyncEvent::PeerConnected(peer_id) => {
let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into()));
let result = self.network.add_peers_to_reserved_set(
self.protocol_name.clone(),
iter::once(addr).collect(),
Expand All @@ -307,10 +318,10 @@ where
log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
}
},
SyncEvent::PeerDisconnected(remote) => {
SyncEvent::PeerDisconnected(peer_id) => {
let result = self.network.remove_peers_from_reserved_set(
self.protocol_name.clone(),
iter::once(remote).collect(),
iter::once(peer_id).collect(),
);
if let Err(err) = result {
log::error!(target: LOG_TARGET, "Failed to remove reserved peer: {err}");
Expand Down
6 changes: 5 additions & 1 deletion substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,11 @@ where
ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
self.strategy.set_sync_fork_request(peers, &hash, number);
},
ToServiceCommand::EventStream(tx) => self.event_streams.push(tx),
ToServiceCommand::EventStream(tx) => {
let _ = tx
.unbounded_send(SyncEvent::InitialPeers(self.peers.keys().cloned().collect()));
self.event_streams.push(tx);
},
ToServiceCommand::RequestJustification(hash, number) =>
self.strategy.request_justification(&hash, number),
ToServiceCommand::ClearJustificationRequests =>
Expand Down
4 changes: 4 additions & 0 deletions substrate/client/network/sync/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ where

/// Syncing-related events that other protocols can subscribe to.
pub enum SyncEvent {
/// All connected peers that the syncing implementation is tracking.
/// Always sent as the first message to the stream.
InitialPeers(Vec<PeerId>),

/// Peer that the syncing implementation is tracking connected.
PeerConnected(PeerId),

Expand Down
22 changes: 16 additions & 6 deletions substrate/client/network/transactions/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use log::{debug, trace, warn};
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use sc_network::{
config::{NonReservedPeerMode, ProtocolId, SetConfig},
error, multiaddr,
error, multiaddr::{Multiaddr, Protocol},
peer_store::PeerStoreProvider,
service::{
traits::{NotificationEvent, NotificationService, ValidationResult},
Expand Down Expand Up @@ -377,9 +377,19 @@ where

fn handle_sync_event(&mut self, event: SyncEvent) {
match event {
SyncEvent::PeerConnected(remote) => {
let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
.collect::<multiaddr::Multiaddr>();
SyncEvent::InitialPeers(peer_ids) => {
let addrs = peer_ids
.into_iter()
.map(|peer_id| Multiaddr::empty().with(Protocol::P2p(peer_id.into())))
.collect();
let result =
self.network.add_peers_to_reserved_set(self.protocol_name.clone(), addrs);
if let Err(err) = result {
log::error!(target: LOG_TARGET, "Add reserved peers failed: {}", err);
}
},
SyncEvent::PeerConnected(peer_id) => {
let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into()));
let result = self.network.add_peers_to_reserved_set(
self.protocol_name.clone(),
iter::once(addr).collect(),
Expand All @@ -388,10 +398,10 @@ where
log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
}
},
SyncEvent::PeerDisconnected(remote) => {
SyncEvent::PeerDisconnected(peer_id) => {
let result = self.network.remove_peers_from_reserved_set(
self.protocol_name.clone(),
iter::once(remote).collect(),
iter::once(peer_id).collect(),
);
if let Err(err) = result {
log::error!(target: LOG_TARGET, "Remove reserved peer failed: {}", err);
Expand Down

0 comments on commit 3cc7942

Please sign in to comment.