From 8c5113709d045d3fc33fd278e41d45ae428f24f5 Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Sun, 27 Oct 2024 16:41:11 +0800 Subject: [PATCH] Fix peer view inconsistency in network components (#68) * Fix peer set inconsistency in different components * Refactor `restart_sync()` * Inline `note_stalled_peer()` * Restart sync on removing current syncing peer in chain sync * Fix overflow * Rename remove_peer() to disconnect() * Rename to SyncPeers * Rename SetIdle * Nits * Nit --- .../subcoin-network/src/block_downloader.rs | 2 +- .../src/block_downloader/blocks_first.rs | 10 +- .../src/block_downloader/headers_first.rs | 8 +- crates/subcoin-network/src/peer_manager.rs | 46 +++- crates/subcoin-network/src/sync.rs | 224 ++++++++++-------- crates/subcoin-network/src/worker.rs | 30 ++- crates/subcoin-rpc/src/network.rs | 21 +- 7 files changed, 190 insertions(+), 151 deletions(-) diff --git a/crates/subcoin-network/src/block_downloader.rs b/crates/subcoin-network/src/block_downloader.rs index be19b56c..4087075d 100644 --- a/crates/subcoin-network/src/block_downloader.rs +++ b/crates/subcoin-network/src/block_downloader.rs @@ -184,7 +184,7 @@ impl BlockDownloadManager { _ => 512, }; - let queued_blocks = self.best_queued_number - best_number; + let queued_blocks = self.best_queued_number.saturating_sub(best_number); if queued_blocks > max_queued_blocks { self.queue_status = ImportQueueStatus::Overloaded; diff --git a/crates/subcoin-network/src/block_downloader/blocks_first.rs b/crates/subcoin-network/src/block_downloader/blocks_first.rs index 6e5d9dde..c3da986e 100644 --- a/crates/subcoin-network/src/block_downloader/blocks_first.rs +++ b/crates/subcoin-network/src/block_downloader/blocks_first.rs @@ -101,11 +101,7 @@ where } pub(crate) fn replaceable_sync_peer(&self) -> Option { - if self.downloaded_blocks_count > 0 { - None - } else { - Some(self.peer_id) - } + (self.downloaded_blocks_count == 0).then_some(self.peer_id) } pub(crate) fn replace_sync_peer(&mut self, 
peer_id: PeerId, target_block_number: u32) { @@ -144,7 +140,7 @@ where if best_number == self.target_block_number { self.state = State::Completed; - return SyncAction::SwitchToIdle; + return SyncAction::SetIdle; } if self.download_manager.is_stalled(self.peer_id) { @@ -299,7 +295,7 @@ where "Received block #{block_number},{block_hash} higher than the target block" ); self.state = State::Completed; - SyncAction::SwitchToIdle + SyncAction::SetIdle } else { self.state = State::Disconnecting; SyncAction::Disconnect( diff --git a/crates/subcoin-network/src/block_downloader/headers_first.rs b/crates/subcoin-network/src/block_downloader/headers_first.rs index 0bf287d3..034c2db4 100644 --- a/crates/subcoin-network/src/block_downloader/headers_first.rs +++ b/crates/subcoin-network/src/block_downloader/headers_first.rs @@ -128,8 +128,6 @@ pub struct HeadersFirstDownloader { downloaded_headers: DownloadedHeaders, downloaded_blocks_count: usize, last_locator_start: u32, - // TODO: Now it's solely used for the purpose of displaying the sync state. - // refactor it later. 
target_block_number: u32, _phantom: PhantomData, } @@ -180,11 +178,7 @@ where } pub(crate) fn replaceable_sync_peer(&self) -> Option { - if self.downloaded_blocks_count > 0 { - None - } else { - Some(self.peer_id) - } + (self.downloaded_blocks_count == 0).then_some(self.peer_id) } pub(crate) fn replace_sync_peer(&mut self, peer_id: PeerId, target_block_number: u32) { diff --git a/crates/subcoin-network/src/peer_manager.rs b/crates/subcoin-network/src/peer_manager.rs index c9f272ab..4e81ffe1 100644 --- a/crates/subcoin-network/src/peer_manager.rs +++ b/crates/subcoin-network/src/peer_manager.rs @@ -315,7 +315,7 @@ where } } - pub(crate) fn on_tick(&mut self) -> Option { + pub(crate) fn on_tick(&mut self) -> (Vec, Option) { let mut timeout_peers = vec![]; let mut should_ping_peers = vec![]; @@ -334,10 +334,6 @@ where self.send_pings(should_ping_peers); } - for peer_id in timeout_peers { - self.disconnect(peer_id, Error::PingTimeout); - } - let outbound_peers_count = self .connected_peers .values() @@ -360,6 +356,13 @@ where .set(self.address_book.available_addresses_count() as u64); } + let maybe_slow_peer = self.manage_outbound_connections(outbound_peers_count); + + (timeout_peers, maybe_slow_peer) + } + + /// Manages outbound connections by initiating new connections or evicting slow peers. + fn manage_outbound_connections(&mut self, outbound_peers_count: usize) -> Option { if outbound_peers_count < self.max_outbound_peers { if let Some(addr) = self.address_book.pop() { if !self.connections.contains_key(&addr) { @@ -442,7 +445,19 @@ where } } - /// Disconnect from a peer with given reason, do nothing if the peer is persistent. + /// Disconnects from a specified peer, unless it is designated as persistent, with a given reason. + /// + /// # Important Notes + /// + /// - **Syncing Components:** This function, as well as [`Self::evict`], should not be invoked + /// directly within the peer manager module without triggering a notification. 
For example, + /// chain sync might depend on receiving a disconnect notification to correctly update their + /// internal state, which helps maintain a consistent peer set between the peer manager and + /// other modules. + /// + /// - **Potential for Inconsistent State:** Bypassing notifications may lead to inconsistency + /// between the peer manager and modules that rely on peer status, resulting in unexpected + /// issues in the peer set or other connected components. pub(crate) fn disconnect(&mut self, peer_id: PeerId, reason: Error) { if self.config.persistent.contains(&peer_id) { return; @@ -465,6 +480,20 @@ where self.connected_peers.remove(&peer_id); } + /// Evicts a peer, disconnecting it with a specified reason and updating the eviction timestamp. + /// + /// This function internally calls [`Self::disconnect`] to carry out the disconnection + /// process and subsequently records the current time as the `last_eviction` timestamp. + /// + /// # Important Note + /// + /// Just like with `disconnect`, any call to `evict` should be accompanied by necessary + /// notifications to avoid state inconsistencies. + pub(crate) fn evict(&mut self, peer_id: PeerId, reason: Error) { + self.disconnect(peer_id, reason); + self.last_eviction = Instant::now(); + } + /// Sets the prefer addrv2 flag for a peer. pub(crate) fn set_want_addrv2(&mut self, peer_id: PeerId) { self.connected_peers.entry(peer_id).and_modify(|info| { @@ -479,11 +508,6 @@ where }); } - pub(crate) fn evict(&mut self, peer_id: PeerId, reason: Error) { - self.disconnect(peer_id, reason); - self.last_eviction = Instant::now(); - } - /// Checks if a peer is connected. 
pub(crate) fn is_connected(&self, peer_id: PeerId) -> bool { self.connected_peers.contains_key(&peer_id) diff --git a/crates/subcoin-network/src/sync.rs b/crates/subcoin-network/src/sync.rs index 3e044665..79f1163f 100644 --- a/crates/subcoin-network/src/sync.rs +++ b/crates/subcoin-network/src/sync.rs @@ -105,20 +105,26 @@ pub(crate) enum SyncRequest { pub(crate) enum SyncAction { /// Fetch headers, blocks and data. Request(SyncRequest), - /// Headers-First sync completed, use the Blocks-First sync - /// to download the recent blocks. + /// Transitions to a Blocks-First sync after Headers-First sync + /// completes, to fetch the most recent blocks. SwitchToBlocksFirstSync, /// Disconnect from the peer for the given reason. Disconnect(PeerId, Error), - /// Make this peer as deprioritized and restart the current syncing - /// process using other sync candidates if there are any. + /// Deprioritize the specified peer, restarting the current sync + /// with other candidates if available. RestartSyncWithStalledPeer(PeerId), - /// Blocks-First sync finished, switch syncing state to idle. - SwitchToIdle, + /// Blocks-First sync finished and sets the syncing state to idle. + SetIdle, /// No action needed. None, } +#[derive(Debug)] +pub(crate) enum RestartReason { + Stalled, + Disconnected, +} + // This enum encapsulates the various strategies and states a node // might be in during the sync process. enum Syncing { @@ -229,80 +235,112 @@ where .collect() } - /// Attempts to restart the sync due to the stalled peer. - /// - /// Returns `true` if the sync is restarted with a new peer. - pub(super) fn restart_sync(&mut self, stalled_peer: PeerId) -> bool { - let our_best = self.client.best_number(); + /// Removes the given peer from peers of chain sync. 
+ pub(super) fn disconnect(&mut self, peer_id: PeerId) { + if let Some(removed_peer) = self.peers.remove(&peer_id) { + // We currently support only one syncing peer, this logic needs to be + // refactored once multiple syncing peers are supported. + if matches!(removed_peer.state, PeerSyncState::DownloadingNew { .. }) { + self.restart_sync(removed_peer.peer_id, RestartReason::Disconnected); + } + } + } - // First, try to find the best available peer for syncing. - let new_available_peer = self - .peers - .values_mut() + /// Attempt to find the best available peer, falling back to a random choice if needed + fn select_next_peer_for_sync( + &mut self, + our_best: u32, + excluded_peer: PeerId, + ) -> Option { + self.peers + .values() .filter(|peer| { - peer.peer_id != stalled_peer + peer.peer_id != excluded_peer && peer.best_number > our_best && peer.state.is_available() }) - .max_by_key(|peer| peer.best_number); - - let new_peer = match new_available_peer { - Some(peer) => peer, - None => { + .max_by_key(|peer| peer.best_number) + .map(|peer| peer.peer_id) + .or_else(|| { let sync_candidates = self .peers - .values_mut() - .filter(|peer| peer.peer_id != stalled_peer && peer.best_number > our_best) + .values() + .filter(|peer| peer.peer_id != excluded_peer && peer.best_number > our_best) .collect::>(); - if sync_candidates.is_empty() { - if let Some(median_seen_block) = self.median_seen() { - let best_seen_block = self.peers.values().map(|p| p.best_number).max(); - - if median_seen_block <= our_best { - // We are synced to the median block seen by our peers, but this may - // not be the network's tip. - // - // Transition to idle unless more blocks are announced. - tracing::debug!( - best_seen_block, - median_seen_block, - our_best, - "Synced to the majority of peers, no new blocks to sync" - ); - self.syncing = Syncing::Idle; - return false; - } - } + // Pick a random peer, even if it's marked as deprioritized. 
+ self.rng.choice(sync_candidates).map(|peer| peer.peer_id) + }) + } - // No new sync candidate, keep it as is. - // TODO: handle this properly. - tracing::debug!(?stalled_peer, "⚠️ Sync stalled, but no new sync candidates"); + /// Attempts to restart the sync based on the reason provided. + /// + /// Returns nothing; if no new sync peer can be selected, the sync is either set to idle or kept as is. + pub(super) fn restart_sync(&mut self, prior_peer_id: PeerId, reason: RestartReason) { + let our_best = self.client.best_number(); - return false; + let Some(new_peer_id) = self.select_next_peer_for_sync(our_best, prior_peer_id) else { + if let Some(median_seen_block) = self.median_seen() { + if median_seen_block <= our_best { + let best_seen_block = self.peers.values().map(|p| p.best_number).max(); + + // We are synced to the median block seen by our peers, but this may + // not be the network's tip. + // + // Transition to idle unless more blocks are announced. + tracing::debug!( + best_seen_block, + median_seen_block, + our_best, + "Synced to the majority of peers, switching to Idle" + ); + self.update_syncing_state(Syncing::Idle); + return; } - - // Pick a random peer, even if it's marked as deprioritized. - self.rng - .choice(sync_candidates) - .expect("Sync candidates must be non-empty as checked; qed") } + + // No new sync candidate, keep it as is. + // TODO: handle this properly. 
+ tracing::debug!( + ?prior_peer_id, + "⚠️ Attempting to restart sync, but no new sync candidate available" + ); + + return; }; - tracing::debug!(?stalled_peer, ?new_peer, "🔄 Sync stalled, restarting"); + { + let Some(new_peer) = self.peers.get_mut(&new_peer_id) else { + tracing::error!("Corrupted state, next peer {new_peer_id} missing from peer list"); + return; + }; - new_peer.state = PeerSyncState::DownloadingNew { start: our_best }; + tracing::debug!(?reason, ?prior_peer_id, ?new_peer, "🔄 Sync restarted"); + new_peer.state = PeerSyncState::DownloadingNew { start: our_best }; - match &mut self.syncing { - Syncing::BlocksFirst(downloader) => { - downloader.restart(new_peer.peer_id, new_peer.best_number); - true + match &mut self.syncing { + Syncing::BlocksFirst(downloader) => { + downloader.restart(new_peer.peer_id, new_peer.best_number); + } + Syncing::HeadersFirst(downloader) => { + downloader.restart(new_peer.peer_id, new_peer.best_number); + } + Syncing::Idle => {} } - Syncing::HeadersFirst(downloader) => { - downloader.restart(new_peer.peer_id, new_peer.best_number); - true + } + + match reason { + RestartReason::Stalled => { + self.peers.entry(prior_peer_id).and_modify(|p| { + let current_stalled_count = p.state.stalled_count(); + p.state = PeerSyncState::Deprioritized { + stalled_count: current_stalled_count + 1, + }; + }); + } + RestartReason::Disconnected => { + // Nothing to be done, peer is already removed from the peer list. } - Syncing::Idle => false, } } @@ -323,20 +361,6 @@ where } } - pub(super) fn note_peer_stalled(&mut self, stalled_peer: PeerId) { - self.peers.entry(stalled_peer).and_modify(|p| { - let current_stalled_count = p.state.stalled_count(); - p.state = PeerSyncState::Deprioritized { - stalled_count: current_stalled_count + 1, - }; - }); - } - - pub(super) fn remove_peer(&mut self, peer_id: PeerId) { - // TODO: handle the situation that the peer is being involved in the downloader. 
- self.peers.remove(&peer_id); - } - pub(super) fn update_peer_latency(&mut self, peer_id: PeerId, avg_latency: Latency) { self.peers.entry(peer_id).and_modify(|peer| { peer.latency = avg_latency; @@ -346,9 +370,9 @@ where pub(super) fn update_sync_peer_on_lower_latency(&mut self) { let maybe_sync_peer_id = match &self.syncing { + Syncing::Idle => return, Syncing::BlocksFirst(downloader) => downloader.replaceable_sync_peer(), Syncing::HeadersFirst(downloader) => downloader.replaceable_sync_peer(), - Syncing::Idle => return, }; let Some(current_sync_peer_id) = maybe_sync_peer_id else { @@ -414,6 +438,9 @@ where self.peers.entry(current_sync_peer_id).and_modify(|peer| { peer.state = PeerSyncState::Available; }); + self.peers.entry(peer_id).and_modify(|peer| { + peer.state = PeerSyncState::DownloadingNew { start: our_best }; + }); } } } @@ -550,13 +577,12 @@ where } fn update_syncing_state(&mut self, new: Syncing) { - let is_major_syncing = new.is_major_syncing(); self.syncing = new; self.is_major_syncing - .store(is_major_syncing, Ordering::Relaxed); + .store(self.syncing.is_major_syncing(), Ordering::Relaxed); } - pub(super) fn switch_to_idle(&mut self) { + pub(super) fn set_idle(&mut self) { tracing::debug!( best_number = self.client.best_number(), "Blocks-First sync completed, switching to Syncing::Idle" @@ -605,7 +631,7 @@ where if inventories.len() == 1 { if let Inventory::Block(block_hash) = inventories[0] { if !self.inflight_announced_blocks.contains(&block_hash) { - // A new block maybe broadcasted via `inv` message. + // A new block is broadcasted via `inv` message. 
tracing::trace!( "Requesting a new block {block_hash} announced from {from:?}" ); @@ -620,22 +646,6 @@ where } } - pub(super) fn on_block(&mut self, block: BitcoinBlock, from: PeerId) -> SyncAction { - if self.inflight_announced_blocks.remove(&block.block_hash()) { - self.import_queue.import_blocks(ImportBlocks { - origin: BlockOrigin::NetworkBroadcast, - blocks: vec![block], - }); - return SyncAction::None; - } - - match &mut self.syncing { - Syncing::Idle => SyncAction::None, - Syncing::BlocksFirst(downloader) => downloader.on_block(block, from), - Syncing::HeadersFirst(downloader) => downloader.on_block(block, from), - } - } - fn announced_blocks_request( &mut self, block_hashes: impl IntoIterator, @@ -655,6 +665,22 @@ where SyncAction::Request(data_request) } + pub(super) fn on_block(&mut self, block: BitcoinBlock, from: PeerId) -> SyncAction { + match &mut self.syncing { + Syncing::Idle => { + if self.inflight_announced_blocks.remove(&block.block_hash()) { + self.import_queue.import_blocks(ImportBlocks { + origin: BlockOrigin::NetworkBroadcast, + blocks: vec![block], + }); + } + SyncAction::None + } + Syncing::BlocksFirst(downloader) => downloader.on_block(block, from), + Syncing::HeadersFirst(downloader) => downloader.on_block(block, from), + } + } + pub(super) fn on_headers(&mut self, headers: Vec, from: PeerId) -> SyncAction { match &mut self.syncing { Syncing::HeadersFirst(downloader) => downloader.on_headers(headers, from), @@ -680,10 +706,8 @@ where } } - return self.announced_blocks_request( - headers.into_iter().map(|header| header.block_hash()), - from, - ); + let new_blocks = headers.into_iter().map(|header| header.block_hash()); + return self.announced_blocks_request(new_blocks, from); } tracing::debug!( diff --git a/crates/subcoin-network/src/worker.rs b/crates/subcoin-network/src/worker.rs index 20c43d08..723f3e23 100644 --- a/crates/subcoin-network/src/worker.rs +++ b/crates/subcoin-network/src/worker.rs @@ -5,7 +5,7 @@ use crate::network::{ }; 
use crate::peer_manager::{Config, NewPeer, PeerManager, SlowPeer, PEER_LATENCY_THRESHOLD}; use crate::peer_store::PeerStore; -use crate::sync::{ChainSync, LocatorRequest, SyncAction, SyncRequest}; +use crate::sync::{ChainSync, LocatorRequest, RestartReason, SyncAction, SyncRequest}; use crate::transaction_manager::TransactionManager; use crate::{Bandwidth, Error, PeerId, SyncStrategy}; use bitcoin::p2p::message::{NetworkMessage, MAX_INV_SIZE}; @@ -182,7 +182,7 @@ where } Event::Disconnect { peer_addr, reason } => { self.peer_manager.disconnect(peer_addr, reason); - self.chain_sync.remove_peer(peer_addr); + self.chain_sync.disconnect(peer_addr); self.peer_store.remove_peer(peer_addr); } Event::PeerMessage { @@ -217,13 +217,18 @@ where for peer in self.chain_sync.unreliable_peers() { self.peer_manager.disconnect(peer, Error::UnreliablePeer); - self.chain_sync.remove_peer(peer); + self.chain_sync.disconnect(peer); self.peer_store.remove_peer(peer); } - if let Some(SlowPeer { peer_id, latency }) = self.peer_manager.on_tick() { + let (timeout_peers, maybe_slow_peer) = self.peer_manager.on_tick(); + timeout_peers.into_iter().for_each(|peer_id| { + self.peer_manager.disconnect(peer_id, Error::PingTimeout); + self.chain_sync.disconnect(peer_id); + }); + if let Some(SlowPeer { peer_id, latency }) = maybe_slow_peer { self.peer_manager.evict(peer_id, Error::SlowPeer(latency)); - self.chain_sync.remove_peer(peer_id); + self.chain_sync.disconnect(peer_id); } let connected_peers = self.peer_manager.connected_peers(); @@ -347,7 +352,7 @@ where if avg_latency > PEER_LATENCY_THRESHOLD { self.peer_manager .disconnect(from, Error::PingLatencyTooHigh(avg_latency)); - self.chain_sync.remove_peer(from); + self.chain_sync.disconnect(from); self.peer_store.remove_peer(from); } else { if self.chain_sync.peers.contains_key(&from) { @@ -367,7 +372,7 @@ where } Err(err) => { self.peer_manager.disconnect(from, err); - self.chain_sync.remove_peer(from); + self.chain_sync.disconnect(from); 
self.peer_store.remove_peer(from); } } @@ -450,17 +455,16 @@ where self.send_get_blocks_request(request); } } - SyncAction::SwitchToIdle => { - self.chain_sync.switch_to_idle(); + SyncAction::SetIdle => { + self.chain_sync.set_idle(); } SyncAction::RestartSyncWithStalledPeer(stalled_peer_id) => { - if self.chain_sync.restart_sync(stalled_peer_id) { - self.chain_sync.note_peer_stalled(stalled_peer_id); - } + self.chain_sync + .restart_sync(stalled_peer_id, RestartReason::Stalled); } SyncAction::Disconnect(peer_id, reason) => { self.peer_manager.disconnect(peer_id, reason); - self.chain_sync.remove_peer(peer_id); + self.chain_sync.disconnect(peer_id); } SyncAction::None => {} } diff --git a/crates/subcoin-rpc/src/network.rs b/crates/subcoin-rpc/src/network.rs index 3f80a63b..a16d2f26 100644 --- a/crates/subcoin-rpc/src/network.rs +++ b/crates/subcoin-rpc/src/network.rs @@ -17,11 +17,12 @@ pub enum SyncState { DownloadingNew, } +/// Overview of peers in chain sync. #[derive(Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -pub struct NetworkPeers { +pub struct SyncPeers { /// A map containing the count of peers in each sync state (e.g., syncing, idle). - sync_state_counts: BTreeMap, + peer_counts: BTreeMap, /// The highest block height known across all peers in the network. best_known_block: Option, /// Detailed synchronization information for each peer. @@ -31,8 +32,8 @@ pub struct NetworkPeers { #[rpc(client, server)] pub trait NetworkApi { /// Get the sync peers. - #[method(name = "network_peers")] - async fn network_peers(&self) -> Result; + #[method(name = "network_syncPeers")] + async fn network_sync_peers(&self) -> Result; /// Get overall network status. 
#[method(name = "network_status")] @@ -68,7 +69,7 @@ where Block: BlockT + 'static, Client: HeaderBackend + BlockBackend + AuxStore + 'static, { - async fn network_peers(&self) -> Result { + async fn network_sync_peers(&self) -> Result { let mut sync_peers = self.network_handle.sync_peers().await; let mut available = 0; @@ -91,17 +92,13 @@ where sync_peers.sort_by_key(|x| x.latency); - Ok(NetworkPeers { - sync_state_counts: BTreeMap::from([ + Ok(SyncPeers { + peer_counts: BTreeMap::from([ (SyncState::Available, available), (SyncState::Deprioritized, deprioritized), (SyncState::DownloadingNew, downloading_new), ]), - best_known_block: if best_known_block > 0 { - Some(best_known_block) - } else { - None - }, + best_known_block: (best_known_block > 0).then_some(best_known_block), peer_sync_details: sync_peers, }) }