Skip to content

Commit

Permalink
Fix peer view inconsistency in network components (#68)
Browse files Browse the repository at this point in the history
* Fix peer set inconsistency in different components

* Refactor `restart_sync()`

* Inline `note_stalled_peer()`

* Restart sync on removing current syncing peer in chain sync

* Fix overflow

* Rename remove_peer() to disconnect()

* Rename to SyncPeers

* Rename SetIdle

* Nits

* Nit
  • Loading branch information
liuchengxu authored Oct 27, 2024
1 parent aa2dfc4 commit 8c51137
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 151 deletions.
2 changes: 1 addition & 1 deletion crates/subcoin-network/src/block_downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl BlockDownloadManager {
_ => 512,
};

let queued_blocks = self.best_queued_number - best_number;
let queued_blocks = self.best_queued_number.saturating_sub(best_number);

if queued_blocks > max_queued_blocks {
self.queue_status = ImportQueueStatus::Overloaded;
Expand Down
10 changes: 3 additions & 7 deletions crates/subcoin-network/src/block_downloader/blocks_first.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,7 @@ where
}

pub(crate) fn replaceable_sync_peer(&self) -> Option<PeerId> {
if self.downloaded_blocks_count > 0 {
None
} else {
Some(self.peer_id)
}
(self.downloaded_blocks_count == 0).then_some(self.peer_id)
}

pub(crate) fn replace_sync_peer(&mut self, peer_id: PeerId, target_block_number: u32) {
Expand Down Expand Up @@ -144,7 +140,7 @@ where

if best_number == self.target_block_number {
self.state = State::Completed;
return SyncAction::SwitchToIdle;
return SyncAction::SetIdle;
}

if self.download_manager.is_stalled(self.peer_id) {
Expand Down Expand Up @@ -299,7 +295,7 @@ where
"Received block #{block_number},{block_hash} higher than the target block"
);
self.state = State::Completed;
SyncAction::SwitchToIdle
SyncAction::SetIdle
} else {
self.state = State::Disconnecting;
SyncAction::Disconnect(
Expand Down
8 changes: 1 addition & 7 deletions crates/subcoin-network/src/block_downloader/headers_first.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ pub struct HeadersFirstDownloader<Block, Client> {
downloaded_headers: DownloadedHeaders,
downloaded_blocks_count: usize,
last_locator_start: u32,
// TODO: Now it's solely used for the purpose of displaying the sync state.
// refactor it later.
target_block_number: u32,
_phantom: PhantomData<Block>,
}
Expand Down Expand Up @@ -180,11 +178,7 @@ where
}

pub(crate) fn replaceable_sync_peer(&self) -> Option<PeerId> {
if self.downloaded_blocks_count > 0 {
None
} else {
Some(self.peer_id)
}
(self.downloaded_blocks_count == 0).then_some(self.peer_id)
}

pub(crate) fn replace_sync_peer(&mut self, peer_id: PeerId, target_block_number: u32) {
Expand Down
46 changes: 35 additions & 11 deletions crates/subcoin-network/src/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ where
}
}

pub(crate) fn on_tick(&mut self) -> Option<SlowPeer> {
pub(crate) fn on_tick(&mut self) -> (Vec<PeerId>, Option<SlowPeer>) {
let mut timeout_peers = vec![];
let mut should_ping_peers = vec![];

Expand All @@ -334,10 +334,6 @@ where
self.send_pings(should_ping_peers);
}

for peer_id in timeout_peers {
self.disconnect(peer_id, Error::PingTimeout);
}

let outbound_peers_count = self
.connected_peers
.values()
Expand All @@ -360,6 +356,13 @@ where
.set(self.address_book.available_addresses_count() as u64);
}

let maybe_slow_peer = self.manage_outbound_connections(outbound_peers_count);

(timeout_peers, maybe_slow_peer)
}

/// Manages outbound connections by initiating new connections or evicting slow peers.
fn manage_outbound_connections(&mut self, outbound_peers_count: usize) -> Option<SlowPeer> {
if outbound_peers_count < self.max_outbound_peers {
if let Some(addr) = self.address_book.pop() {
if !self.connections.contains_key(&addr) {
Expand Down Expand Up @@ -442,7 +445,19 @@ where
}
}

/// Disconnect from a peer with given reason, do nothing if the peer is persistent.
/// Disconnects from a specified peer, unless it is designated as persistent, with a given reason.
///
/// # Important Notes
///
/// - **Syncing Components:** This function, as well as [`Self::evict`], should not be invoked
/// directly within the peer manager module without triggering a notification. For example,
/// chain sync might depend on receiving a disconnect notification to correctly update their
/// internal state, which helps maintain a consistent peer set between the peer manager and
/// other modules.
///
/// - **Potential for Inconsistent State:** Bypassing notifications may lead to inconsistency
/// between the peer manager and modules that rely on peer status, resulting in unexpected
/// issues in the peer set or other connected components.
pub(crate) fn disconnect(&mut self, peer_id: PeerId, reason: Error) {
if self.config.persistent.contains(&peer_id) {
return;
Expand All @@ -465,6 +480,20 @@ where
self.connected_peers.remove(&peer_id);
}

/// Evicts a peer, disconnecting it with a specified reason and updating the eviction timestamp.
///
/// This function internally calls [`Self::disconnect`] to carry out the disconnection
/// process and subsequently records the current time as the `last_eviction` timestamp.
///
/// # Important Note
///
/// Just like with `disconnect`, any call to `evict` should be accompanied by necessary
/// notifications to avoid state inconsistencies.
pub(crate) fn evict(&mut self, peer_id: PeerId, reason: Error) {
self.disconnect(peer_id, reason);
self.last_eviction = Instant::now();
}

/// Sets the prefer addrv2 flag for a peer.
pub(crate) fn set_want_addrv2(&mut self, peer_id: PeerId) {
self.connected_peers.entry(peer_id).and_modify(|info| {
Expand All @@ -479,11 +508,6 @@ where
});
}

pub(crate) fn evict(&mut self, peer_id: PeerId, reason: Error) {
self.disconnect(peer_id, reason);
self.last_eviction = Instant::now();
}

/// Checks if a peer is connected.
pub(crate) fn is_connected(&self, peer_id: PeerId) -> bool {
self.connected_peers.contains_key(&peer_id)
Expand Down
Loading

0 comments on commit 8c51137

Please sign in to comment.