From a8b065bfa4cf075644c37d69947d74e7f1f2d770 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 27 Oct 2023 10:42:18 +1000 Subject: [PATCH 01/26] Refactor return type of poll_discover() --- zebra-network/src/peer_set/set.rs | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 2fa546c9883..4ef47f83d91 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -509,17 +509,25 @@ where self.ready_services.contains_key(&addr) || self.cancel_handles.contains_key(&addr) } - /// Checks for newly inserted or removed services. + /// Processes the entire list of newly inserted or removed services. /// /// Puts inserted services in the unready list. /// Drops removed services, after cancelling any pending requests. - fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll> { - use futures::ready; + fn poll_discover(&mut self, cx: &mut Context<'_>) -> Result<(), BoxError> { loop { - match ready!(Pin::new(&mut self.discover).poll_discover(cx)) + // If the changes are finished, return. + let Poll::Ready(discovered) = Pin::new(&mut self.discover).poll_discover(cx) else { + // We've finished processing the entire list. + return Ok(()); + }; + + // If the change channel has a permanent error, return that error. + let change = discovered .ok_or("discovery stream closed")? - .map_err(Into::into)? - { + .map_err(Into::into)?; + + // Process each change. + match change { Change::Remove(key) => { trace!(?key, "got Change::Remove from Discover"); self.remove(&key); @@ -998,7 +1006,7 @@ where self.poll_background_errors(cx)?; // Update peer statuses - let _ = self.poll_discover(cx)?; + self.poll_discover(cx)?; self.disconnect_from_outdated_peers(); self.inventory_registry.poll_inventory(cx)?; self.poll_unready(cx); From 728efadc918ed99ea198151858f011693e077d42 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 27 Oct 2023 12:14:43 +1000 Subject: [PATCH 02/26] Simplify poll_ready() by removing preselected peers --- zebra-network/src/peer/error.rs | 5 + zebra-network/src/peer_set/set.rs | 163 +++++++++++------------------- 2 files changed, 64 insertions(+), 104 deletions(-) diff --git a/zebra-network/src/peer/error.rs b/zebra-network/src/peer/error.rs index 6263fb56119..c40c34b1d1d 100644 --- a/zebra-network/src/peer/error.rs +++ b/zebra-network/src/peer/error.rs @@ -82,6 +82,10 @@ pub enum PeerError { #[error("Internal services over capacity")] Overloaded, + /// There are no ready remote peers. + #[error("No ready peers available")] + NoReadyPeers, + /// This peer request's caused an internal service timeout, so the connection was dropped /// to shed load or prevent attacks. #[error("Internal services timed out")] @@ -147,6 +151,7 @@ impl PeerError { PeerError::Serialization(inner) => format!("Serialization({inner})").into(), PeerError::DuplicateHandshake => "DuplicateHandshake".into(), PeerError::Overloaded => "Overloaded".into(), + PeerError::NoReadyPeers => "NoReadyPeers".into(), PeerError::InboundTimeout => "InboundTimeout".into(), PeerError::ServiceShutdown => "ServiceShutdown".into(), PeerError::NotFoundResponse(_) => "NotFoundResponse".into(), diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 4ef47f83d91..124951f1ebe 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -191,17 +191,6 @@ where // Request Routing // - /// A preselected ready service. 
- /// - /// # Correctness - /// - /// If this is `Some(addr)`, `addr` must be a key for a peer in `ready_services`. - /// If that peer is removed from `ready_services`, we must set the preselected peer to `None`. - /// - /// This is handled by [`PeerSet::take_ready_service`] and - /// [`PeerSet::disconnect_from_outdated_peers`]. - preselected_p2c_peer: Option, - /// Stores gossiped inventory hashes from connected peers. /// /// Used to route inventory requests to peers that are likely to have it. @@ -310,7 +299,6 @@ where // Ready peers ready_services: HashMap::new(), // Request Routing - preselected_p2c_peer: None, inventory_registry: InventoryRegistry::new(inv_stream), // Busy peers @@ -417,7 +405,6 @@ where /// - channels by closing the channel fn shut_down_tasks_and_channels(&mut self) { // Drop services and cancel their background tasks. - self.preselected_p2c_peer = None; self.ready_services = HashMap::new(); for (_peer_key, handle) in self.cancel_handles.drain() { @@ -565,27 +552,15 @@ where /// Checks if the minimum peer version has changed, and disconnects from outdated peers. fn disconnect_from_outdated_peers(&mut self) { if let Some(minimum_version) = self.minimum_peer_version.changed() { - self.ready_services.retain(|address, peer| { - if peer.remote_version() >= minimum_version { - true - } else { - if self.preselected_p2c_peer == Some(*address) { - self.preselected_p2c_peer = None; - } - - false - } - }); + // It is ok to drop ready services, they don't need anything cancelled. + self.ready_services + .retain(|_address, peer| peer.remote_version() >= minimum_version); } } - /// Takes a ready service by key, invalidating `preselected_p2c_peer` if needed. + /// Takes a ready service by key. fn take_ready_service(&mut self, key: &D::Key) -> Option { if let Some(svc) = self.ready_services.remove(key) { - if Some(*key) == self.preselected_p2c_peer { - self.preselected_p2c_peer = None; - } - assert!( !self.cancel_handles.contains_key(key), "cancel handles are only used for unready service work" @@ -639,7 +614,7 @@ where } /// Performs P2C on `self.ready_services` to randomly select a less-loaded ready service. - fn preselect_p2c_peer(&self) -> Option { + fn select_ready_p2c_peer(&self) -> Option { self.select_p2c_peer_from_list(&self.ready_services.keys().copied().collect()) } @@ -655,8 +630,7 @@ where .expect("just checked there is one service"), ), len => { - // If there are only 2 peers, randomise their order. - // Otherwise, choose 2 random peers in a random order. + // Choose 2 random peers, then return the least loaded of those 2 peers. let (a, b) = { let idxs = rand::seq::index::sample(&mut rand::thread_rng(), len, 2); let a = idxs.index(0); @@ -716,19 +690,32 @@ where /// Routes a request using P2C load-balancing. 
fn route_p2c(&mut self, req: Request) -> >::Future { - let preselected_key = self - .preselected_p2c_peer - .expect("ready peer service must have a preselected peer"); + if let Some(p2c_key) = self.select_ready_p2c_peer() { + tracing::trace!(?p2c_key, "routing based on p2c"); - tracing::trace!(?preselected_key, "routing based on p2c"); + let mut svc = self + .take_ready_service(&p2c_key) + .expect("selected peer must be ready"); - let mut svc = self - .take_ready_service(&preselected_key) - .expect("ready peer set must have preselected a ready peer"); + let fut = svc.call(req); + self.push_unready(p2c_key, svc); - let fut = svc.call(req); - self.push_unready(preselected_key, svc); - fut.map_err(Into::into).boxed() + return fut.map_err(Into::into).boxed(); + } + + async move { + // Let other tasks run, so a retry request might get different ready peers. + tokio::task::yield_now().await; + + // # Security + // + // Avoid routing requests to peers that are missing inventory. + // If we kept trying doomed requests, peers that are missing our requested inventory + // could take up a large amount of our bandwidth and retry limits. + Err(SharedPeerError::from(PeerError::NoReadyPeers)) + } + .map_err(Into::into) + .boxed() } /// Tries to route a request to a ready peer that advertised that inventory, @@ -1003,78 +990,46 @@ where Pin> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // Update service and peer statuses. self.poll_background_errors(cx)?; - - // Update peer statuses self.poll_discover(cx)?; - self.disconnect_from_outdated_peers(); self.inventory_registry.poll_inventory(cx)?; self.poll_unready(cx); + // Cleanup and metrics + self.disconnect_from_outdated_peers(); self.log_peer_set_size(); self.update_metrics(); - loop { - // Re-check that the pre-selected service is ready, in case - // something has happened since (e.g., it failed, peer closed - // connection, ...) - if let Some(key) = self.preselected_p2c_peer { - trace!(preselected_key = ?key); - let mut service = self - .take_ready_service(&key) - .expect("preselected peer must be in the ready list"); - match service.poll_ready(cx) { - Poll::Ready(Ok(())) => { - trace!("preselected service is still ready, keeping it selected"); - self.preselected_p2c_peer = Some(key); - self.ready_services.insert(key, service); - return Poll::Ready(Ok(())); - } - Poll::Pending => { - trace!("preselected service is no longer ready, moving to unready list"); - self.push_unready(key, service); - } - Poll::Ready(Err(error)) => { - trace!(%error, "preselected service failed, dropping it"); - std::mem::drop(service); - } - } - } - - trace!("preselected service was not ready, preselecting another ready service"); - self.preselected_p2c_peer = self.preselect_p2c_peer(); - self.update_metrics(); + if self.ready_services.is_empty() { + // # Correctness + // + // If the channel is full, drop the demand signal rather than waiting. + // If we waited here, the crawler could deadlock sending a request to + // fetch more peers, because it also empties the channel. + trace!("no ready services, sending demand signal"); + let _ = self.demand_signal.try_send(MorePeers); - if self.preselected_p2c_peer.is_none() { - // CORRECTNESS - // - // If the channel is full, drop the demand signal rather than waiting. - // If we waited here, the crawler could deadlock sending a request to - // fetch more peers, because it also empties the channel. 
- trace!("no ready services, sending demand signal"); - let _ = self.demand_signal.try_send(MorePeers); + // # Correctness + // + // The current task must be scheduled for wakeup every time we + // return `Poll::Pending`. + // + // As long as there are unready or new peers, this task will run, + // because: + // - `poll_discover` schedules this task for wakeup when new + // peers arrive. + // - if there are unready peers, `poll_unready` schedules this + // task for wakeup when peer services become ready. + // + // To avoid peers blocking on a full background error channel: + // - if no background tasks have exited since the last poll, + // `poll_background_errors` schedules this task for wakeup when + // the next task exits. - // CORRECTNESS - // - // The current task must be scheduled for wakeup every time we - // return `Poll::Pending`. - // - // As long as there are unready or new peers, this task will run, - // because: - // - `poll_discover` schedules this task for wakeup when new - // peers arrive. - // - if there are unready peers, `poll_unready` schedules this - // task for wakeup when peer services become ready. - // - if the preselected peer is not ready, `service.poll_ready` - // schedules this task for wakeup when that service becomes - // ready. - // - // To avoid peers blocking on a full background error channel: - // - if no background tasks have exited since the last poll, - // `poll_background_errors` schedules this task for wakeup when - // the next task exits. - return Poll::Pending; - } + Poll::Pending + } else { + Poll::Ready(Ok(())) } } From dbb4e1c021f15f4b258f737f0da71193dd836d97 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 27 Oct 2023 12:20:00 +1000 Subject: [PATCH 03/26] Fix peer set readiness check --- zebra-network/src/peer_set/set/tests/vectors.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/zebra-network/src/peer_set/set/tests/vectors.rs b/zebra-network/src/peer_set/set/tests/vectors.rs index 8290469997c..d63ba719f3f 100644 --- a/zebra-network/src/peer_set/set/tests/vectors.rs +++ b/zebra-network/src/peer_set/set/tests/vectors.rs @@ -1,8 +1,7 @@ //! Fixed test vectors for the peer set. -use std::{cmp::max, iter, time::Duration}; +use std::{cmp::max, iter}; -use tokio::time::timeout; use tower::{Service, ServiceExt}; use zebra_chain::{ @@ -172,7 +171,12 @@ fn peer_set_ready_multiple_connections() { // Peer set hangs when no more connections are present let peer_ready = peer_set.ready(); - assert!(timeout(Duration::from_secs(10), peer_ready).await.is_err()); + peer_ready + .await + .expect("peer set is always ready until peers are cleared"); + + // TODO: re-enable this check when waiting is fixed? 
+ //assert!(timeout(Duration::from_secs(10), peer_ready).await.is_err()); }); } From a65b831c8f3ea52765833a5d36eaf0fad1d9db59 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 27 Oct 2023 12:44:21 +1000 Subject: [PATCH 04/26] Pass task context correctly to background tasks --- zebra-network/src/peer_set/set.rs | 76 +++++++++++++++---------------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 124951f1ebe..f7c50bae4c2 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -110,11 +110,12 @@ use futures::{ future::{FutureExt, TryFutureExt}, prelude::*, stream::FuturesUnordered, + task::noop_waker, }; use itertools::Itertools; use num_integer::div_ceil; use tokio::{ - sync::{broadcast, oneshot::error::TryRecvError, watch}, + sync::{broadcast, watch}, task::JoinHandle, }; use tower::{ @@ -254,7 +255,11 @@ where C: ChainTip, { fn drop(&mut self) { - self.shut_down_tasks_and_channels() + // We don't want to wake the current task, we just want to drop everything we can. + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + + self.shut_down_tasks_and_channels(&mut cx); } } @@ -324,15 +329,14 @@ where /// Check background task handles to make sure they're still running. /// /// If any background task exits, shuts down all other background tasks, - /// and returns an error. - fn poll_background_errors(&mut self, cx: &mut Context) -> Result<(), BoxError> { - if let Some(result) = self.receive_tasks_if_needed() { - return result; - } + /// and returns an error. Otherwise, returns `Poll::Pending`, and registers a wakeup for + /// receiving the background tasks, or the background tasks exiting. + fn poll_background_errors(&mut self, cx: &mut Context<'_>) -> Poll> { + futures::ready!(self.receive_tasks_if_needed(cx))?; match Pin::new(&mut self.guards).poll_next(cx) { // All background tasks are still running. - Poll::Pending => Ok(()), + Poll::Pending => Poll::Pending, Poll::Ready(Some(res)) => { info!( @@ -340,35 +344,36 @@ where "a peer set background task exited, shutting down other peer set tasks" ); - self.shut_down_tasks_and_channels(); + self.shut_down_tasks_and_channels(cx); // Flatten the join result and inner result, // then turn Ok() task exits into errors. - res.map_err(Into::into) - // TODO: replace with Result::flatten when it stabilises (#70142) - .and_then(convert::identity) - .and(Err("a peer set background task exited".into())) + Poll::Ready( + res.map_err(Into::into) + // TODO: replace with Result::flatten when it stabilises (#70142) + .and_then(convert::identity) + .and(Err("a peer set background task exited".into())), + ) } Poll::Ready(None) => { - self.shut_down_tasks_and_channels(); - Err("all peer set background tasks have exited".into()) + self.shut_down_tasks_and_channels(cx); + Poll::Ready(Err("all peer set background tasks have exited".into())) } } } - /// Receive background tasks, if they've been sent on the channel, - /// but not consumed yet. + /// Receive background tasks, if they've been sent on the channel, but not consumed yet. /// - /// Returns a result representing the current task state, - /// or `None` if the background tasks should be polled to check their state. - fn receive_tasks_if_needed(&mut self) -> Option> { + /// Returns a result representing the current task state, or `Poll::Pending` if the background + /// tasks should be polled again to check their state. 
+ fn receive_tasks_if_needed(&mut self, cx: &mut Context<'_>) -> Poll> { if self.guards.is_empty() { - match self.handle_rx.try_recv() { - // The tasks haven't been sent yet. - Err(TryRecvError::Empty) => Some(Ok(())), + // Return Pending if the tasks have not been sent yet. + let handles = futures::ready!(Pin::new(&mut self.handle_rx).poll(cx)); - // The tasks have been sent, but not consumed. + match handles { + // The tasks have been sent, but not consumed yet. Ok(handles) => { // Currently, the peer set treats an empty background task set as an error. // @@ -381,21 +386,16 @@ where self.guards.extend(handles); - None + Poll::Ready(Ok(())) } - // The tasks have been sent and consumed, but then they exited. - // - // Correctness: the peer set must receive at least one task. - // - // TODO: refactor `handle_rx` and `guards` into an enum - // for the background task state: Waiting/Running/Shutdown. - Err(TryRecvError::Closed) => { - Some(Err("all peer set background tasks have exited".into())) - } + // The sender was dropped without sending the tasks. + Err(_) => Poll::Ready(Err( + "sender did not send peer background tasks before it was dropped".into(), + )), } } else { - None + Poll::Ready(Ok(())) } } @@ -403,7 +403,7 @@ where /// - services by dropping the service lists /// - background tasks via their join handles or cancel handles /// - channels by closing the channel - fn shut_down_tasks_and_channels(&mut self) { + fn shut_down_tasks_and_channels(&mut self, cx: &mut Context<'_>) { // Drop services and cancel their background tasks. self.ready_services = HashMap::new(); @@ -416,9 +416,9 @@ where // so we don't add more peers to a shut down peer set. self.demand_signal.close_channel(); - // Shut down background tasks. + // Shut down background tasks, ignoring pending polls. self.handle_rx.close(); - self.receive_tasks_if_needed(); + let _ = self.receive_tasks_if_needed(cx); for guard in self.guards.iter() { guard.abort(); } From 8d22afd1bfa7e0ef7dac4883c191c5baa1fdbf62 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 27 Oct 2023 12:48:46 +1000 Subject: [PATCH 05/26] Make poll_discover() return Pending --- zebra-network/src/peer_set/set.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index f7c50bae4c2..d46de3421f5 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -500,12 +500,15 @@ where /// /// Puts inserted services in the unready list. /// Drops removed services, after cancelling any pending requests. - fn poll_discover(&mut self, cx: &mut Context<'_>) -> Result<(), BoxError> { + /// + /// If the peer connector channel is closed, returns an error. + /// Otherwise, returns `Poll::Pending`, and registers a wakeup for new peers. + fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll> { loop { // If the changes are finished, return. let Poll::Ready(discovered) = Pin::new(&mut self.discover).poll_discover(cx) else { - // We've finished processing the entire list. - return Ok(()); + // We've finished processing the entire list, but there could be new peers later. + return Poll::Pending; }; // If the change channel has a permanent error, return that error. 
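
Patches 04 through 08 converge on a single polling contract: each `poll_*`
helper either registers a wakeup with `cx` and returns `Poll::Pending`, or
returns `Poll::Ready` with a permanent result, so `poll_ready()` can simply try
each helper in turn. A minimal, self-contained sketch of that contract, where
`poll_source` and the `u32` item type are hypothetical stand-ins rather than
the real peer set types:

    use std::task::{Context, Poll};

    use futures::{channel::mpsc, StreamExt};

    type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

    /// Drains `source`, processing every queued item.
    ///
    /// Returns `Pending` once the channel is empty (after `poll_next_unpin`
    /// has registered `cx`'s waker), or a permanent error once the channel
    /// has closed.
    fn poll_source(
        source: &mut mpsc::Receiver<u32>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), BoxError>> {
        loop {
            // `ready!` returns `Pending` from this function as soon as the
            // channel has no queued items.
            match futures::ready!(source.poll_next_unpin(cx)) {
                // Process the item, then keep draining the channel.
                Some(item) => tracing::trace!(?item, "processing item"),
                // A closed channel is a permanent error.
                None => return Poll::Ready(Err("channel closed".into())),
            }
        }
    }

A caller that also returns `Poll<Result<(), BoxError>>` can then write
`let _poll_pending: Poll<()> = poll_source(&mut source, cx)?;`: the `?`
operator on `Poll<Result<_, _>>` propagates `Ready(Err(_))` as an error and
passes `Pending` and `Ready(Ok(_))` through, which is how the peer set's
`poll_ready()` composes these helpers in the later patches of this series.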
From c96cd5b20d4645baf7af2592e7f34218d239f7e0 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 27 Oct 2023 12:57:11 +1000 Subject: [PATCH 06/26] Make poll_inventory() return Pending --- zebra-network/src/peer_set/inventory_registry.rs | 16 ++++++++++------ .../src/peer_set/inventory_registry/update.rs | 7 ++++--- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/zebra-network/src/peer_set/inventory_registry.rs b/zebra-network/src/peer_set/inventory_registry.rs index b3de4ccee7e..9cbd4466c1d 100644 --- a/zebra-network/src/peer_set/inventory_registry.rs +++ b/zebra-network/src/peer_set/inventory_registry.rs @@ -283,11 +283,15 @@ impl InventoryRegistry { /// Drive periodic inventory tasks /// - /// # Details + /// Rotates the inventory HashMaps on every timer tick. + /// Drains the inv_stream channel and registers all advertised inventory. /// - /// - rotates HashMaps based on interval events - /// - drains the inv_stream channel and registers all advertised inventory - pub fn poll_inventory(&mut self, cx: &mut Context<'_>) -> Result<(), BoxError> { + /// Returns an error if the inventory channel is closed. Otherwise returns `Poll::Pending`, and + /// registers a wakeup the next time there is new inventory. + /// + /// TODO: also register a wakeup for the next timer tick. + /// (Currently the timer never wakes the task.) + pub fn poll_inventory(&mut self, cx: &mut Context<'_>) -> Poll> { // # Correctness // // Registers the current task for wakeup when the timer next becomes ready. @@ -333,11 +337,11 @@ impl InventoryRegistry { } // This indicates all senders, including the one in the handshaker, // have been dropped, which really is a permanent failure. - None => return Err(broadcast::error::RecvError::Closed.into()), + None => return Poll::Ready(Err(broadcast::error::RecvError::Closed.into())), } } - Ok(()) + Poll::Pending } /// Record the given inventory `change` for the peer `addr`. diff --git a/zebra-network/src/peer_set/inventory_registry/update.rs b/zebra-network/src/peer_set/inventory_registry/update.rs index 9ebedc55a4b..411d45fb182 100644 --- a/zebra-network/src/peer_set/inventory_registry/update.rs +++ b/zebra-network/src/peer_set/inventory_registry/update.rs @@ -28,9 +28,10 @@ impl<'a> Update<'a> { impl Future for Update<'_> { type Output = Result<(), BoxError>; + /// A future that returns when the next inventory update has been completed. + /// + /// See [`InventoryRegistry::poll_inventory()`] for details. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // TODO: should the future wait until new changes arrive? - // or for the rotation timer? - Poll::Ready(self.registry.poll_inventory(cx)) + self.registry.poll_inventory(cx) } } From 2eafe954d34a9305e65ae236c7902be0750bdef4 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 27 Oct 2023 13:03:35 +1000 Subject: [PATCH 07/26] Make poll_unready() return Poll::Pending --- zebra-network/src/peer_set/set.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index d46de3421f5..3009a5f08cb 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -428,11 +428,19 @@ where /// /// Move newly ready services to the ready list if they are for peers with supported protocol /// versions, otherwise they are dropped. Also drop failed services. - fn poll_unready(&mut self, cx: &mut Context<'_>) { + /// + /// Never returns an error. 
+    /// If there are no unready tasks, returns `Ok(())`
+    /// Otherwise, returns `Poll::Pending`, and registers a wakeup for the next task that becomes
+    /// ready.
+    fn poll_unready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
         loop {
             match Pin::new(&mut self.unready_services).poll_next(cx) {
-                // No unready service changes, or empty unready services
-                Poll::Pending | Poll::Ready(None) => return,
+                // Finished unready service changes, but there are still some unready services.
+                Poll::Pending => return Poll::Pending,
+
+                // There are no unready services, so the task won't be woken by this waker until
+                // some are added by code woken by other wakers.
+                Poll::Ready(None) => return Poll::Ready(Ok(())),
 
                 // Unready -> Ready
                 Poll::Ready(Some(Ok((key, svc)))) => {

From 248dce01d45f0a545efd028a857cb792c9cc66dd Mon Sep 17 00:00:00 2001
From: teor
Date: Fri, 27 Oct 2023 13:16:52 +1000
Subject: [PATCH 08/26] Simplify with futures::ready!() and ?

---
 .../src/peer_set/inventory_registry.rs        | 15 +++---
 zebra-network/src/peer_set/set.rs             | 50 ++++++++-----------
 2 files changed, 30 insertions(+), 35 deletions(-)

diff --git a/zebra-network/src/peer_set/inventory_registry.rs b/zebra-network/src/peer_set/inventory_registry.rs
index 9cbd4466c1d..2878168f8ce 100644
--- a/zebra-network/src/peer_set/inventory_registry.rs
+++ b/zebra-network/src/peer_set/inventory_registry.rs
@@ -324,10 +324,12 @@ impl InventoryRegistry {
         // rather than propagating it through the peer set's Service::poll_ready
         // implementation, where reporting a failure means reporting a permanent
         // failure of the peer set.
-        while let Poll::Ready(channel_result) = self.inv_stream.next().poll_unpin(cx) {
+
+        // Returns Pending if all messages are processed, but the channel could get more.
+        while let Some(channel_result) = futures::ready!(self.inv_stream.next().poll_unpin(cx)) {
             match channel_result {
-                Some(Ok(change)) => self.register(change),
-                Some(Err(BroadcastStreamRecvError::Lagged(count))) => {
+                Ok(change) => self.register(change),
+                Err(BroadcastStreamRecvError::Lagged(count)) => {
                     metrics::counter!("pool.inventory.dropped", 1);
                     metrics::counter!("pool.inventory.dropped.messages", count);
 
@@ -335,13 +337,12 @@ impl InventoryRegistry {
                     // or poll the registry or peer set in a separate task.
                     info!(count, "dropped lagged inventory advertisements");
                 }
-                // This indicates all senders, including the one in the handshaker,
-                // have been dropped, which really is a permanent failure.
-                None => return Poll::Ready(Err(broadcast::error::RecvError::Closed.into())),
             }
         }
 
-        Poll::Pending
+        // If the channel is empty and returns None, all senders, including the one in the
+        // handshaker, have been dropped, which really is a permanent failure.
+        Poll::Ready(Err(broadcast::error::RecvError::Closed.into()))
     }
 
     /// Record the given inventory `change` for the peer `addr`.
diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs
index 3009a5f08cb..52e59842580 100644
--- a/zebra-network/src/peer_set/set.rs
+++ b/zebra-network/src/peer_set/set.rs
@@ -334,11 +334,9 @@ where
     fn poll_background_errors(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
         futures::ready!(self.receive_tasks_if_needed(cx))?;
 
-        match Pin::new(&mut self.guards).poll_next(cx) {
-            // All background tasks are still running.
-            Poll::Pending => Poll::Pending,
-
-            Poll::Ready(Some(res)) => {
+        // Return Pending if all background tasks are still running.
+ match futures::ready!(Pin::new(&mut self.guards).poll_next(cx)) { + Some(res) => { info!( background_tasks = %self.guards.len(), "a peer set background task exited, shutting down other peer set tasks" @@ -346,17 +344,16 @@ where self.shut_down_tasks_and_channels(cx); - // Flatten the join result and inner result, - // then turn Ok() task exits into errors. - Poll::Ready( - res.map_err(Into::into) - // TODO: replace with Result::flatten when it stabilises (#70142) - .and_then(convert::identity) - .and(Err("a peer set background task exited".into())), - ) + // Flatten the join result and inner result, and return any errors. + res.map_err(Into::into) + // TODO: replace with Result::flatten when it stabilises (#70142) + .and_then(convert::identity)?; + + // Turn Ok() task exits into errors. + Poll::Ready(Err("a peer set background task exited".into())) } - Poll::Ready(None) => { + None => { self.shut_down_tasks_and_channels(cx); Poll::Ready(Err("all peer set background tasks have exited".into())) } @@ -434,16 +431,15 @@ where /// ready. fn poll_unready(&mut self, cx: &mut Context<'_>) -> Poll> { loop { - match Pin::new(&mut self.unready_services).poll_next(cx) { - // Finished unready service changes, but there are still some unready services. - Poll::Pending => return Poll::Pending, - + // Return Pending if we've finished processing the unready service changes, + // but there are still some unready services. + match futures::ready!(Pin::new(&mut self.unready_services).poll_next(cx)) { // There are no unready services, so the task won't be woken by this waker until // some are added by code woken by other wakers. - Poll::Ready(None) => return Poll::Ready(Ok(())), + None => return Poll::Ready(Ok(())), // Unready -> Ready - Poll::Ready(Some(Ok((key, svc)))) => { + Some(Ok((key, svc))) => { trace!(?key, "service became ready"); let cancel = self.cancel_handles.remove(&key); assert!(cancel.is_some(), "missing cancel handle"); @@ -454,7 +450,7 @@ where } // Unready -> Canceled - Poll::Ready(Some(Err((key, UnreadyError::Canceled)))) => { + Some(Err((key, UnreadyError::Canceled))) => { // A service be canceled because we've connected to the same service twice. // In that case, there is a cancel handle for the peer address, // but it belongs to the service for the newer connection. @@ -464,7 +460,7 @@ where "service was canceled, dropping service" ); } - Poll::Ready(Some(Err((key, UnreadyError::CancelHandleDropped(_))))) => { + Some(Err((key, UnreadyError::CancelHandleDropped(_)))) => { // Similarly, services with dropped cancel handes can have duplicates. trace!( ?key, @@ -474,7 +470,7 @@ where } // Unready -> Errored - Poll::Ready(Some(Err((key, UnreadyError::Inner(error))))) => { + Some(Err((key, UnreadyError::Inner(error)))) => { debug!(%error, "service failed while unready, dropping service"); let cancel = self.cancel_handles.remove(&key); @@ -513,11 +509,9 @@ where /// Otherwise, returns `Poll::Pending`, and registers a wakeup for new peers. fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll> { loop { - // If the changes are finished, return. - let Poll::Ready(discovered) = Pin::new(&mut self.discover).poll_discover(cx) else { - // We've finished processing the entire list, but there could be new peers later. - return Poll::Pending; - }; + // Return Pending if we've finished processing the entire list, + // but there could be new peers later. 
+ let discovered = futures::ready!(Pin::new(&mut self.discover).poll_discover(cx)); // If the change channel has a permanent error, return that error. let change = discovered From 533044b87ad6008649f82cce5141ed2148cdea1f Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 27 Oct 2023 14:45:37 +1000 Subject: [PATCH 09/26] When there are no peers, wake on newly ready peers, or new peers, in that order --- zebra-network/src/peer_set/set.rs | 64 +++++++++++++++++++++++-------- 1 file changed, 47 insertions(+), 17 deletions(-) diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 52e59842580..a7d68eab9b9 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -97,7 +97,6 @@ use std::{ collections::{HashMap, HashSet}, convert, fmt::Debug, - future::Future, marker::PhantomData, net::IpAddr, pin::Pin, @@ -107,7 +106,7 @@ use std::{ use futures::{ channel::{mpsc, oneshot}, - future::{FutureExt, TryFutureExt}, + future::{Future, FutureExt, TryFutureExt}, prelude::*, stream::FuturesUnordered, task::noop_waker, @@ -329,7 +328,7 @@ where /// Check background task handles to make sure they're still running. /// /// If any background task exits, shuts down all other background tasks, - /// and returns an error. Otherwise, returns `Poll::Pending`, and registers a wakeup for + /// and returns an error. Otherwise, returns `Pending`, and registers a wakeup for /// receiving the background tasks, or the background tasks exiting. fn poll_background_errors(&mut self, cx: &mut Context<'_>) -> Poll> { futures::ready!(self.receive_tasks_if_needed(cx))?; @@ -426,20 +425,23 @@ where /// Move newly ready services to the ready list if they are for peers with supported protocol /// versions, otherwise they are dropped. Also drop failed services. /// - /// Never returns an error. If there are no unready tasks, returns `Ok(())` + /// Never returns an error. If there are no unready peers, returns `Ok` immediately. /// Otherwise, returns `Poll::Pending`, and registers a wakeup for the next task that becomes /// ready. fn poll_unready(&mut self, cx: &mut Context<'_>) -> Poll> { + if self.unready_services.is_empty() { + // Don't replace the previous waker, because we have no peers to wake the task. + return Poll::Ready(Ok(())); + } + loop { // Return Pending if we've finished processing the unready service changes, // but there are still some unready services. - match futures::ready!(Pin::new(&mut self.unready_services).poll_next(cx)) { - // There are no unready services, so the task won't be woken by this waker until - // some are added by code woken by other wakers. - None => return Poll::Ready(Ok(())), - + match futures::ready!(Pin::new(&mut self.unready_services).poll_next(cx)) + .expect("already checked for an empty list") + { // Unready -> Ready - Some(Ok((key, svc))) => { + Ok((key, svc)) => { trace!(?key, "service became ready"); let cancel = self.cancel_handles.remove(&key); assert!(cancel.is_some(), "missing cancel handle"); @@ -450,7 +452,7 @@ where } // Unready -> Canceled - Some(Err((key, UnreadyError::Canceled))) => { + Err((key, UnreadyError::Canceled)) => { // A service be canceled because we've connected to the same service twice. // In that case, there is a cancel handle for the peer address, // but it belongs to the service for the newer connection. 
@@ -460,7 +462,7 @@ where "service was canceled, dropping service" ); } - Some(Err((key, UnreadyError::CancelHandleDropped(_)))) => { + Err((key, UnreadyError::CancelHandleDropped(_))) => { // Similarly, services with dropped cancel handes can have duplicates. trace!( ?key, @@ -470,7 +472,7 @@ where } // Unready -> Errored - Some(Err((key, UnreadyError::Inner(error)))) => { + Err((key, UnreadyError::Inner(error))) => { debug!(%error, "service failed while unready, dropping service"); let cancel = self.cancel_handles.remove(&key); @@ -996,10 +998,38 @@ where fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { // Update service and peer statuses. - self.poll_background_errors(cx)?; - self.poll_discover(cx)?; - self.inventory_registry.poll_inventory(cx)?; - self.poll_unready(cx); + // + // # Correctness + // + // If we poll these futures in series, then only the last polled future will wake + // the task. This can cause hangs when there are no new peers, no newly ready peers, + // or no peers in the peer set at all. + // + // As a temporary fix, we try to pick the most likely waker, and skip wakers that will + // never wake. + + // We can tolerate not being woken for these tasks, because we only return `Pending` + // when we have no ready peers. Delays in background error checking are acceptable, + // and we can't do anything useful with inventory until we have peers. (The oldest + // inventory will get automatically dropped.) + let _ = self.poll_background_errors(cx)?; + let _ = self.inventory_registry.poll_inventory(cx)?; + + // We must be woken if there are new peers, but new peers can be infrequent if our + // connection slots are full, or we're connected to all available/useful peers. + // + // TODO: wake waiting tasks if there are new peers or unready peers become ready (#7858) + let _ = self.poll_discover(cx)?; + // This waker skips itself if it is empty and will never wake. + // Otherwise, each connected peer should become ready or timeout within a few minutes. + // + // TODO: drop peers that overload us with inbound messages and never become ready (#7822) + let _ = self.poll_unready(cx)?; + + // Correctness: the waker in the context is waiting on peers. If it is replaced later in + // the function, the network could hang. We drop the reference here so it can't be used. + #[allow(dropping_references)] + std::mem::drop(cx); // Cleanup and metrics self.disconnect_from_outdated_peers(); From 9c6c54cc37b6e1204a00a13693b8a822d70b310e Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 27 Oct 2023 15:16:38 +1000 Subject: [PATCH 10/26] Preserve the original waker when there are no unready peers --- zebra-network/src/peer_set/set.rs | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index a7d68eab9b9..42823b6d8a1 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -425,21 +425,23 @@ where /// Move newly ready services to the ready list if they are for peers with supported protocol /// versions, otherwise they are dropped. Also drop failed services. /// - /// Never returns an error. If there are no unready peers, returns `Ok` immediately. - /// Otherwise, returns `Poll::Pending`, and registers a wakeup for the next task that becomes - /// ready. + /// Never returns an error. If there are no unready peers, returns `Ok`, and preserves the + /// original waker. 
+    /// Otherwise, returns `Poll::Pending`, and registers a wakeup for the next
+    /// peer that becomes ready.
     fn poll_unready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
         if self.unready_services.is_empty() {
             // Don't replace the previous waker, because we have no peers to wake the task.
             return Poll::Ready(Ok(()));
         }
 
-        loop {
-            // Return Pending if we've finished processing the unready service changes,
-            // but there are still some unready services.
-            match futures::ready!(Pin::new(&mut self.unready_services).poll_next(cx))
-                .expect("already checked for an empty list")
-            {
+        let original_waker = cx.waker();
+
+        // Return Pending if we've finished processing the unready service changes,
+        // but there are still some unready services.
+        while let Some(ready_peer) =
+            futures::ready!(Pin::new(&mut self.unready_services).poll_next(cx))
+        {
+            match ready_peer {
                 // Unready -> Ready
                 Ok((key, svc)) => {
                     trace!(?key, "service became ready");
                     let cancel = self.cancel_handles.remove(&key);
                     assert!(cancel.is_some(), "missing cancel handle");
@@ -480,6 +482,11 @@ where
             }
         }
 
+        // There are no unready peers, so the unready_peers waker will never wake pending tasks.
+        *cx = Context::from_waker(original_waker);
+
+        Poll::Ready(Ok(()))
     }
 
     /// Returns the number of peer connections Zebra already has with

From 2ca7842a2083ff330c84ca3b0efe46d079edca2a Mon Sep 17 00:00:00 2001
From: teor
Date: Mon, 6 Nov 2023 09:45:36 +1000
Subject: [PATCH 11/26] Fix polling docs and remove unnecessary code

---
 .../src/peer_set/inventory_registry.rs        |  8 +-
 zebra-network/src/peer_set/set.rs             | 82 +++++++++----------
 2 files changed, 45 insertions(+), 45 deletions(-)

diff --git a/zebra-network/src/peer_set/inventory_registry.rs b/zebra-network/src/peer_set/inventory_registry.rs
index 2878168f8ce..f01a156389c 100644
--- a/zebra-network/src/peer_set/inventory_registry.rs
+++ b/zebra-network/src/peer_set/inventory_registry.rs
@@ -287,14 +287,14 @@ impl InventoryRegistry {
     /// Drains the inv_stream channel and registers all advertised inventory.
     ///
     /// Returns an error if the inventory channel is closed. Otherwise returns `Poll::Pending`, and
-    /// registers a wakeup the next time there is new inventory.
-    ///
-    /// TODO: also register a wakeup for the next timer tick.
-    /// (Currently the timer never wakes the task.)
+    /// registers a wakeup the next time there is new inventory, and the next time the inventory
+    /// should rotate.
     pub fn poll_inventory(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
         // # Correctness
         //
         // Registers the current task for wakeup when the timer next becomes ready.
+        // (But doesn't return, because we also want to register the task for wakeup when more
+        // inventory arrives.)
         //
         // # Security
         //
diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs
index 42823b6d8a1..969197a8d59 100644
--- a/zebra-network/src/peer_set/set.rs
+++ b/zebra-network/src/peer_set/set.rs
@@ -254,7 +254,7 @@ where
     C: ChainTip,
 {
     fn drop(&mut self) {
-        // We don't want to wake the current task, we just want to drop everything we can.
+        // We don't have access to the current task (if any), so we just drop everything we can.
let waker = noop_waker(); let mut cx = Context::from_waker(&waker); @@ -425,18 +425,22 @@ where /// Move newly ready services to the ready list if they are for peers with supported protocol /// versions, otherwise they are dropped. Also drop failed services. /// - /// Never returns an error. If there are no unready peers, returns `Ok`, and preserves the - /// original waker. Otherwise, returns `Poll::Pending`, and registers a wakeup for the next - /// peer that becomes ready. + /// Never returns an error. If there are unready peers, returns `Poll::Pending`, and registers + /// a wakeup when the next peer becomes ready. If there are no unready peers, returns `Ok`, and + /// doesn't register any wakeups. (Since wakeups come from peers, there needs to be at least one + /// peer to register a wakeup.) fn poll_unready(&mut self, cx: &mut Context<'_>) -> Poll> { - if self.unready_services.is_empty() { - // Don't replace the previous waker, because we have no peers to wake the task. - return Poll::Ready(Ok(())); - } - - let original_waker = cx.waker(); - - // Return Pending if we've finished processing the unready service changes, + // # Correctness + // + // `poll_next()` must always be called, because `self.unready_services` could have been + // empty before the call to `self.poll_ready()`. + // + // > When new futures are added, `poll_next` must be called in order to begin receiving + // > wake-ups for new futures. + // + // + // + // Returns Pending if we've finished processing the unready service changes, // but there are still some unready services. while let Some(ready_peer) = futures::ready!(Pin::new(&mut self.unready_services).poll_next(cx)) @@ -483,9 +487,10 @@ where } } - // There are no unready peers, so the unready_peers waker will never wake pending tasks. - *cx = Context::from_waker(original_waker); - + // Return Ok if we've finished processing the unready service changes, and there are no + // unready services left. This means the stream is terminated. But when we add more unready + // peers and call `poll_next()`, its termination status will be reset, and it will receive + // wakeups again. Poll::Ready(Ok(())) } @@ -1008,38 +1013,35 @@ where // // # Correctness // - // If we poll these futures in series, then only the last polled future will wake - // the task. This can cause hangs when there are no new peers, no newly ready peers, - // or no peers in the peer set at all. - // - // As a temporary fix, we try to pick the most likely waker, and skip wakers that will - // never wake. + // All of the futures that receive a context from this method can wake the peer set buffer + // task. If there are no ready peers, and no new peers, network requests will pause until: + // - an unready peer becomes ready, or + // - a new peer arrives. + + // Check for new peers, and register a task wake when the next new peers arrive. New peers + // can be infrequent if our connection slots are full, or we're connected to all + // available/useful peers. + let _ = self.poll_discover(cx)?; - // We can tolerate not being woken for these tasks, because we only return `Pending` - // when we have no ready peers. Delays in background error checking are acceptable, - // and we can't do anything useful with inventory until we have peers. (The oldest - // inventory will get automatically dropped.) + // These tasks don't provide new peers or newly ready peers. 
let _ = self.poll_background_errors(cx)?; let _ = self.inventory_registry.poll_inventory(cx)?; - // We must be woken if there are new peers, but new peers can be infrequent if our - // connection slots are full, or we're connected to all available/useful peers. + // Check for newly ready peers, including newly added peers (which are added as unready). + // So it needs to run after `poll_discover()`. // - // TODO: wake waiting tasks if there are new peers or unready peers become ready (#7858) - let _ = self.poll_discover(cx)?; - // This method preserves the original waker if it has no unready peers and will never wake. - // Otherwise, each connected peer should become ready or timeout within a few minutes. + // Each connected peer should become ready within a few minutes, or timeout, close the + // connection, and release its connection slot. // // TODO: drop peers that overload us with inbound messages and never become ready (#7822) let _ = self.poll_unready(cx)?; - // Correctness: the waker in the context is waiting on peers. If it is replaced later in - // the function, the network could hang. We drop the reference here so it can't be used. - #[allow(dropping_references)] - std::mem::drop(cx); + // Cleanup and metrics. - // Cleanup and metrics + // Only checks the versions of ready peers, so it needs to run after `poll_unready()`. self.disconnect_from_outdated_peers(); + + // These metrics should run last, to report the most up-to-date information. self.log_peer_set_size(); self.update_metrics(); @@ -1064,11 +1066,9 @@ where // - if there are unready peers, `poll_unready` schedules this // task for wakeup when peer services become ready. // - // To avoid peers blocking on a full background error channel: - // - if no background tasks have exited since the last poll, - // `poll_background_errors` schedules this task for wakeup when - // the next task exits. - + // To avoid peers blocking on a full peer status/error channel: + // - `poll_background_errors` schedules this task for wakeup when + // the peer status update task exits. Poll::Pending } else { Poll::Ready(Ok(())) From 8d5c8452f2f4363d72a086a1df92e89ca8adeda1 Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 6 Nov 2023 09:49:34 +1000 Subject: [PATCH 12/26] Make sure we're ignoring Poll::Pending not Result --- zebra-network/src/peer_set/set.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 969197a8d59..4577e9f3575 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -1018,23 +1018,24 @@ where // - an unready peer becomes ready, or // - a new peer arrives. - // Check for new peers, and register a task wake when the next new peers arrive. New peers + // Check for new peers, and register a task wakeup when the next new peers arrive. New peers // can be infrequent if our connection slots are full, or we're connected to all // available/useful peers. - let _ = self.poll_discover(cx)?; + let _poll_pending: Poll<()> = self.poll_discover(cx)?; // These tasks don't provide new peers or newly ready peers. - let _ = self.poll_background_errors(cx)?; - let _ = self.inventory_registry.poll_inventory(cx)?; + let _poll_pending: Poll<()> = self.poll_background_errors(cx)?; + let _poll_pending: Poll<()> = self.inventory_registry.poll_inventory(cx)?; // Check for newly ready peers, including newly added peers (which are added as unready). - // So it needs to run after `poll_discover()`. 
+ // So it needs to run after `poll_discover()`. Registers a wakeup if there are any unready + // peers. // // Each connected peer should become ready within a few minutes, or timeout, close the // connection, and release its connection slot. // // TODO: drop peers that overload us with inbound messages and never become ready (#7822) - let _ = self.poll_unready(cx)?; + let _poll_pending_or_ready: Poll<()> = self.poll_unready(cx)?; // Cleanup and metrics. From 67a2fc66506d8041c0fa4d5621df30392b15710e Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 6 Nov 2023 11:33:14 +1000 Subject: [PATCH 13/26] Make panic checking method names clearer --- zebra-chain/src/diagnostic/task.rs | 70 +++++++++++++++++-- zebra-chain/src/diagnostic/task/future.rs | 45 +++++++----- zebra-chain/src/diagnostic/task/thread.rs | 69 ++++++++++++++---- .../finalized_state/disk_format/upgrade.rs | 2 +- 4 files changed, 150 insertions(+), 36 deletions(-) diff --git a/zebra-chain/src/diagnostic/task.rs b/zebra-chain/src/diagnostic/task.rs index 2d43f695537..d76f7265b4d 100644 --- a/zebra-chain/src/diagnostic/task.rs +++ b/zebra-chain/src/diagnostic/task.rs @@ -9,7 +9,7 @@ pub mod future; pub mod thread; /// A trait that checks a task's return value for panics. -pub trait CheckForPanics { +pub trait CheckForPanics: Sized { /// The output type, after removing panics from `Self`. type Output; @@ -20,11 +20,34 @@ pub trait CheckForPanics { /// /// If `self` contains a panic payload or an unexpected termination. #[track_caller] - fn check_for_panics(self) -> Self::Output; + fn panic_if_task_has_finished(self) -> Self::Output { + self.check_for_panics_with(true) + } + + /// Check if `self` contains a panic payload, then panic. + /// Otherwise, return the non-panic part of `self`. + /// + /// # Panics + /// + /// If `self` contains a panic payload. + #[track_caller] + fn panic_if_task_has_panicked(self) -> Self::Output { + self.check_for_panics_with(true) + } + + /// Check if `self` contains a panic payload, then panic. Also panics if + /// `panic_on_unexpected_termination` is true, and `self` is an unexpected termination. + /// Otherwise, return the non-panic part of `self`. + /// + /// # Panics + /// + /// If `self` contains a panic payload, or if we're panicking on unexpected terminations. + #[track_caller] + fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output; } /// A trait that waits for a task to finish, then handles panics and cancellations. -pub trait WaitForPanics { +pub trait WaitForPanics: Sized { /// The underlying task output, after removing panics and unwrapping termination results. type Output; @@ -43,5 +66,44 @@ pub trait WaitForPanics { /// /// If `self` contains an expected termination, and we're shutting down anyway. #[track_caller] - fn wait_for_panics(self) -> Self::Output; + fn wait_and_panic_on_unexpected_termination(self) -> Self::Output { + self.wait_for_panics_with(true) + } + + /// Waits for `self` to finish, then check if its output is: + /// - a panic payload: resume that panic, + /// - a task termination: hang waiting for shutdown. + /// + /// Otherwise, returns the task return value of `self`. + /// + /// # Panics + /// + /// If `self` contains a panic payload. + /// + /// # Hangs + /// + /// If `self` contains a task termination. 
+ #[track_caller] + fn wait_for_panics(self) -> Self::Output { + self.wait_for_panics_with(false) + } + + /// Waits for `self` to finish, then check if its output is: + /// - a panic payload: resume that panic, + /// - an unexpected termination: + /// - if `panic_on_unexpected_termination` is true, panic with that error, + /// - otherwise, hang waiting for shutdown, + /// - an expected termination: hang waiting for shutdown. + /// + /// Otherwise, returns the task return value of `self`. + /// + /// # Panics + /// + /// If `self` contains a panic payload, or if we're panicking on unexpected terminations. + /// + /// # Hangs + /// + /// If `self` contains an expected or ignored termination, and we're shutting down anyway. + #[track_caller] + fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output; } diff --git a/zebra-chain/src/diagnostic/task/future.rs b/zebra-chain/src/diagnostic/task/future.rs index 431b13ed94f..a119d7c6234 100644 --- a/zebra-chain/src/diagnostic/task/future.rs +++ b/zebra-chain/src/diagnostic/task/future.rs @@ -18,15 +18,18 @@ impl CheckForPanics for Result { type Output = Result; /// Returns the task result if the task finished normally. - /// Otherwise, resumes any panics, logs unexpected errors, and ignores any expected errors. + /// Otherwise, resumes any panics, and ignores any expected errors. + /// Handles unexpected errors based on `panic_on_unexpected_termination`. /// /// If the task finished normally, returns `Some(T)`. /// If the task was cancelled, returns `None`. #[track_caller] - fn check_for_panics(self) -> Self::Output { + fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output { match self { Ok(task_output) => Ok(task_output), - Err(join_error) => Err(join_error.check_for_panics()), + Err(join_error) => { + Err(join_error.check_for_panics_with(panic_on_unexpected_termination)) + } } } } @@ -38,22 +41,26 @@ impl CheckForPanics for JoinError { /// Resume any panics and panic on unexpected task cancellations. /// Always returns [`JoinError::Cancelled`](JoinError::is_cancelled). #[track_caller] - fn check_for_panics(self) -> Self::Output { + fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output { match self.try_into_panic() { Ok(panic_payload) => panic::resume_unwind(panic_payload), // We could ignore this error, but then we'd have to change the return type. - Err(task_cancelled) if is_shutting_down() => { - debug!( - ?task_cancelled, - "ignoring cancelled task because Zebra is shutting down" - ); + Err(task_cancelled) => { + if !panic_on_unexpected_termination { + debug!(?task_cancelled, "ignoring expected task termination"); - task_cancelled - } + task_cancelled + } else if is_shutting_down() { + debug!( + ?task_cancelled, + "ignoring task termination because Zebra is shutting down" + ); - Err(task_cancelled) => { - panic!("task cancelled during normal Zebra operation: {task_cancelled:?}"); + task_cancelled + } else { + panic!("task unexpectedly exited with: {:?}", task_cancelled) + } } } } @@ -67,7 +74,9 @@ where /// Returns a future which waits for `self` to finish, then checks if its output is: /// - a panic payload: resume that panic, - /// - an unexpected termination: panic with that error, + /// - an unexpected termination: + /// - if `panic_on_unexpected_termination` is true, panic with that error, + /// - otherwise, hang waiting for shutdown, /// - an expected termination: hang waiting for shutdown. 
     ///
     /// Otherwise, returns the task return value of `self`.
@@ -79,11 +88,15 @@ where
     /// # Hangs
     ///
     /// If `self` contains an expected termination, and we're shutting down anyway.
+    /// If we're ignoring terminations because `panic_on_unexpected_termination` is `false`.
     /// Futures hang by returning `Pending` and not setting a waker, so this uses minimal resources.
     #[track_caller]
-    fn wait_for_panics(self) -> Self::Output {
+    fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
         async move {
-            match self.await.check_for_panics() {
+            match self
+                .await
+                .check_for_panics_with(panic_on_unexpected_termination)
+            {
                 Ok(task_output) => task_output,
                 Err(_expected_cancel_error) => future::pending().await,
             }
diff --git a/zebra-chain/src/diagnostic/task/thread.rs b/zebra-chain/src/diagnostic/task/thread.rs
index 84df3fac4aa..67e8ad98e3f 100644
--- a/zebra-chain/src/diagnostic/task/thread.rs
+++ b/zebra-chain/src/diagnostic/task/thread.rs
@@ -8,19 +8,42 @@ use std::{
     thread::{self, JoinHandle},
 };
 
+use crate::shutdown::is_shutting_down;
+
 use super::{CheckForPanics, WaitForPanics};
 
-impl<T> CheckForPanics for thread::Result<T> {
+impl<T> CheckForPanics for thread::Result<T>
+where
+    T: std::fmt::Debug,
+{
     type Output = T;
 
     /// Panics if the thread panicked.
+    /// If `panic_on_unexpected_termination` is true, and Zebra is not shutting down, also panics
+    /// if the thread exits.
     ///
     /// Threads can't be cancelled except by using a panic, so there are no thread errors here.
+    /// `panic_on_unexpected_termination` is only checked when the thread exits normally.
     #[track_caller]
-    fn check_for_panics(self) -> Self::Output {
+    fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
         match self {
             // The value returned by the thread when it finished.
-            Ok(thread_output) => thread_output,
+            Ok(thread_output) => {
+                if !panic_on_unexpected_termination {
+                    debug!(?thread_output, "ignoring expected thread exit");
+
+                    thread_output
+                } else if is_shutting_down() {
+                    debug!(
+                        ?thread_output,
+                        "ignoring thread exit because Zebra is shutting down"
+                    );
+
+                    thread_output
+                } else {
+                    panic!("thread unexpectedly exited with: {:?}", thread_output)
+                }
+            }
 
             // A thread error is always a panic.
             Err(panic_payload) => panic::resume_unwind(panic_payload),
@@ -28,17 +51,24 @@ impl<T> CheckForPanics for thread::Result<T> {
     }
 }
 
-impl<T> WaitForPanics for JoinHandle<T> {
+impl<T> WaitForPanics for JoinHandle<T>
+where
+    T: std::fmt::Debug,
+{
     type Output = T;
 
     /// Waits for the thread to finish, then panics if the thread panicked.
     #[track_caller]
-    fn wait_for_panics(self) -> Self::Output {
-        self.join().check_for_panics()
+    fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
+        self.join()
+            .check_for_panics_with(panic_on_unexpected_termination)
     }
 }
 
-impl<T> WaitForPanics for Arc<JoinHandle<T>> {
+impl<T> WaitForPanics for Arc<JoinHandle<T>>
+where
+    T: std::fmt::Debug,
+{
     type Output = Option<T>;
 
     /// If this is the final `Arc`, waits for the thread to finish, then panics if the thread
@@ -46,7 +76,7 @@ impl<T> WaitForPanics for Arc<JoinHandle<T>> {
     ///
     /// If this is not the final `Arc`, drops the handle and immediately returns `None`.
     #[track_caller]
-    fn wait_for_panics(self) -> Self::Output {
+    fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
         // If we are the last Arc with a reference to this handle,
         // we can wait for it and propagate any panics.
         //
         // This is more readable as an expanded statement.
         #[allow(clippy::manual_map)]
         if let Some(handle) = Arc::into_inner(self) {
-            Some(handle.wait_for_panics())
+            Some(handle.wait_for_panics_with(panic_on_unexpected_termination))
         } else {
             None
         }
     }
 }
 
-impl<T> CheckForPanics for &mut Option<Arc<JoinHandle<T>>> {
+impl<T> CheckForPanics for &mut Option<Arc<JoinHandle<T>>>
+where
+    T: std::fmt::Debug,
+{
     type Output = Option<T>;
 
     /// If this is the final `Arc`, checks if the thread has finished, then panics if the thread
@@ -71,14 +104,14 @@
     /// If the thread has not finished, or this is not the final `Arc`, returns `None`.
     #[track_caller]
-    fn check_for_panics(self) -> Self::Output {
+    fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
         let handle = self.take()?;
 
         if handle.is_finished() {
             // This is the same as calling `self.wait_for_panics()`, but we can't do that,
             // because we've taken `self`.
             #[allow(clippy::manual_map)]
-            return handle.wait_for_panics();
+            return handle.wait_for_panics_with(panic_on_unexpected_termination);
         }
 
         *self = Some(handle);
@@ -87,7 +120,10 @@
     }
 }
 
-impl<T> WaitForPanics for &mut Option<Arc<JoinHandle<T>>> {
+impl<T> WaitForPanics for &mut Option<Arc<JoinHandle<T>>>
+where
+    T: std::fmt::Debug,
+{
     type Output = Option<T>;
 
     /// If this is the final `Arc`, waits for the thread to finish, then panics if the thread
@@ -95,10 +131,13 @@
     /// If this is not the final `Arc`, drops the handle and returns `None`.
     #[track_caller]
-    fn wait_for_panics(self) -> Self::Output {
+    fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
         // This is more readable as an expanded statement.
         #[allow(clippy::manual_map)]
-        if let Some(output) = self.take()?.wait_for_panics() {
+        if let Some(output) = self
+            .take()?
+            .wait_for_panics_with(panic_on_unexpected_termination)
+        {
             Some(output)
         } else {
             // Some other task has a reference, so we should give up ours to let them use it.
diff --git a/zebra-state/src/service/finalized_state/disk_format/upgrade.rs b/zebra-state/src/service/finalized_state/disk_format/upgrade.rs
index 86a04553c50..d2981e62a53 100644
--- a/zebra-state/src/service/finalized_state/disk_format/upgrade.rs
+++ b/zebra-state/src/service/finalized_state/disk_format/upgrade.rs
@@ -883,7 +883,7 @@ impl DbFormatChangeThreadHandle {
     ///
     /// This method should be called regularly, so that panics are detected as soon as possible.
     pub fn check_for_panics(&mut self) {
-        self.update_task.check_for_panics();
+        self.update_task.panic_if_task_has_panicked();
     }
 
     /// Wait for the spawned thread to finish. If it exited with a panic, resume that panic.
From 8181cd3a549c06057b3a8e6d39e75657394d7213 Mon Sep 17 00:00:00 2001
From: teor
Date: Mon, 6 Nov 2023 11:37:30 +1000
Subject: [PATCH 14/26] Fix connection Client task wakeups and error handling

---
 zebra-network/src/peer/client.rs | 56 +++++++++++++++++++++++---------
 1 file changed, 40 insertions(+), 16 deletions(-)

diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs
index 69940275414..03294936af1 100644
--- a/zebra-network/src/peer/client.rs
+++ b/zebra-network/src/peer/client.rs
@@ -18,6 +18,8 @@ use futures::{
 use tokio::{sync::broadcast, task::JoinHandle};
 use tower::Service;
 
+use zebra_chain::diagnostic::task::CheckForPanics;
+
 use crate::{
     peer::{
         error::{AlreadyErrored, ErrorSlot, PeerError, SharedPeerError},
@@ -421,6 +423,13 @@ impl MissingInventoryCollector {
 
 impl Client {
     /// Check if this connection's heartbeat task has exited.
+ /// + /// Returns an error if the heartbeat task exited. Otherwise, schedules the client task for + /// wakeup when the heartbeat task finishes, or the channel closes, and returns `Pending`. + /// + /// # Panics + /// + /// If the heartbeat task panicked. #[allow(clippy::unwrap_in_result)] fn check_heartbeat(&mut self, cx: &mut Context<'_>) -> Result<(), SharedPeerError> { let is_canceled = self @@ -430,17 +439,17 @@ impl Client { .poll_canceled(cx) .is_ready(); - if is_canceled { - return self.set_task_exited_error( - "heartbeat", - PeerError::HeartbeatTaskExited("Task was cancelled".to_string()), - ); - } - match self.heartbeat_task.poll_unpin(cx) { Poll::Pending => { - // Heartbeat task is still running. - Ok(()) + if is_canceled { + self.set_task_exited_error( + "heartbeat", + PeerError::HeartbeatTaskExited("Task was cancelled".to_string()), + ) + } else { + // Heartbeat task is still running. + Ok(()) + } } Poll::Ready(Ok(Ok(_))) => { // Heartbeat task stopped unexpectedly, without panic or error. @@ -459,6 +468,9 @@ impl Client { ) } Poll::Ready(Err(error)) => { + // Heartbeat task stopped with panic. + let error = error.panic_if_task_has_panicked(); + // Heartbeat task was cancelled. if error.is_cancelled() { self.set_task_exited_error( @@ -466,11 +478,7 @@ impl Client { PeerError::HeartbeatTaskExited("Task was cancelled".to_string()), ) } - // Heartbeat task stopped with panic. - else if error.is_panic() { - panic!("heartbeat task has panicked: {error}"); - } - // Heartbeat task stopped with error. + // Heartbeat task stopped with another kind of task error. else { self.set_task_exited_error( "heartbeat", @@ -493,8 +501,24 @@ impl Client { self.set_task_exited_error("connection", PeerError::ConnectionTaskExited) } Poll::Ready(Err(error)) => { - // Connection task stopped unexpectedly with a panic. - panic!("connection task has panicked: {error}"); + // Heartbeat task was cancelled. + if error.is_cancelled() { + self.set_task_exited_error( + "heartbeat", + PeerError::HeartbeatTaskExited("Task was cancelled".to_string()), + ) + } + // Heartbeat task stopped with panic. + else if error.is_panic() { + panic!("heartbeat task has panicked: {error}"); + } + // Heartbeat task stopped with error. + else { + self.set_task_exited_error( + "heartbeat", + PeerError::HeartbeatTaskExited(error.to_string()), + ) + } } } } From 5fb5faa52e42d2b8e23237737eb79ca5e8b9eb0d Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 6 Nov 2023 11:41:41 +1000 Subject: [PATCH 15/26] Cleanup connection panic handling and add wakeup docs --- zebra-network/src/peer/client.rs | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index 03294936af1..28801563d8c 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -468,7 +468,7 @@ impl Client { ) } Poll::Ready(Err(error)) => { - // Heartbeat task stopped with panic. + // Heartbeat task panicked. let error = error.panic_if_task_has_panicked(); // Heartbeat task was cancelled. @@ -489,7 +489,14 @@ impl Client { } } - /// Check if the connection's task has exited. + /// Check if the connection's request/response task has exited. + /// + /// Returns an error if the connection task exited. Otherwise, schedules the client task for + /// wakeup when the connection task finishes, and returns `Pending`. + /// + /// # Panics + /// + /// If the connection task panicked. 
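+    ///
+    /// # Example
+    ///
+    /// The underlying check, sketched with a hypothetical task handle (illustrative only):
+    ///
+    /// ```ignore
+    /// match connection_task.poll_unpin(cx) {
+    ///     Poll::Pending => { /* still running, `poll_unpin` registered a wakeup */ }
+    ///     Poll::Ready(Ok(())) => { /* task exited without panicking */ }
+    ///     Poll::Ready(Err(join_error)) => { /* task panicked or was cancelled */ }
+    /// }
+    /// ```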
     fn check_connection(&mut self, context: &mut Context<'_>) -> Result<(), SharedPeerError> {
         match self.connection_task.poll_unpin(context) {
             Poll::Pending => {
@@ -501,21 +508,20 @@ impl Client {
                 self.set_task_exited_error("connection", PeerError::ConnectionTaskExited)
             }
             Poll::Ready(Err(error)) => {
-                // Heartbeat task was cancelled.
+                // Connection task panicked.
+                let error = error.panic_if_task_has_panicked();
+
+                // Connection task was cancelled.
                 if error.is_cancelled() {
                     self.set_task_exited_error(
-                        "heartbeat",
+                        "connection",
                         PeerError::HeartbeatTaskExited("Task was cancelled".to_string()),
                     )
                 }
-                // Heartbeat task stopped with panic.
-                else if error.is_panic() {
-                    panic!("heartbeat task has panicked: {error}");
-                }
-                // Heartbeat task stopped with error.
+                // Connection task stopped with another kind of task error.
                 else {
                     self.set_task_exited_error(
-                        "heartbeat",
+                        "connection",
                         PeerError::HeartbeatTaskExited(error.to_string()),
                     )
                 }
             }
         }
     }
@@ -546,6 +552,9 @@ impl Client {
     }
 
     /// Poll for space in the shared request sender channel.
+    ///
+    /// Returns an error if the sender channel is closed. If there is no space in the channel,
+    /// returns `Pending`, and schedules the task for wakeup when there is space available.
     fn poll_request(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
         let server_result = ready!(self.server_tx.poll_ready(cx));
         if server_result.is_err() {
From 51890e9bab1789c58280b5849895058147daaa68 Mon Sep 17 00:00:00 2001
From: teor
Date: Mon, 6 Nov 2023 12:09:48 +1000
Subject: [PATCH 16/26] Fix connection client task wakeups to prevent hangs

---
 zebra-network/src/peer/client.rs | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs
index 28801563d8c..6bb8f1a04ee 100644
--- a/zebra-network/src/peer/client.rs
+++ b/zebra-network/src/peer/client.rs
@@ -605,18 +605,27 @@ impl Service<Request> for Client {
         //`ready!` returns `Poll::Pending` when `server_tx` is unready, and
         // schedules this task for wakeup.
 
-        let mut result = self
-            .check_heartbeat(cx)
-            .and_then(|()| self.check_connection(cx));
+        // Check all the tasks
+        let heartbeat_result = self.check_heartbeat(cx);
+        let connection_result = self.check_connection(cx);
+        let sender_result = self.poll_request(cx);
 
-        if result.is_ok() {
-            result = ready!(self.poll_request(cx));
-        }
+        // Requests should only wait if there is no space in the channel.
+        let is_pending = sender_result.is_pending();
+
+        // Combine the results, include the result inside the poll.
+        let result = heartbeat_result.and(connection_result).and_then(|_| {
+            let _pending: Poll<()> = sender_result?;
+            Ok(())
+        });
 
+        // Shut down the client and connection if there is an error.
         if let Err(error) = result {
             self.shutdown();
 
             Poll::Ready(Err(error))
+        } else if is_pending {
+            Poll::Pending
         } else {
             Poll::Ready(Ok(()))
         }
From ffe0eb6ae96668835440a87cd783b12454cb340e Mon Sep 17 00:00:00 2001
From: teor
Date: Mon, 6 Nov 2023 12:23:52 +1000
Subject: [PATCH 17/26] Simplify error and pending handling

---
 zebra-network/src/peer/client.rs | 77 +++++++++++++++++++-------------
 1 file changed, 46 insertions(+), 31 deletions(-)

diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs
index 6bb8f1a04ee..e5f8a95ac40 100644
--- a/zebra-network/src/peer/client.rs
+++ b/zebra-network/src/peer/client.rs
@@ -431,7 +431,7 @@ impl Client {
     /// If the heartbeat task panicked.
     #[allow(clippy::unwrap_in_result)]
-    fn check_heartbeat(&mut self, cx: &mut Context<'_>) -> Result<(), SharedPeerError> {
+    fn poll_heartbeat(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
         let is_canceled = self
             .shutdown_tx
             .as_mut()
             .expect("only taken on drop")
             .poll_canceled(cx)
             .is_ready();
 
-        match self.heartbeat_task.poll_unpin(cx) {
+        let result = match self.heartbeat_task.poll_unpin(cx) {
             Poll::Pending => {
                 if is_canceled {
                     self.set_task_exited_error(
@@ -448,7 +448,7 @@ impl Client {
                     )
                 } else {
                     // Heartbeat task is still running.
-                    Ok(())
+                    return Poll::Pending;
                 }
             }
             Poll::Ready(Ok(Ok(_))) => {
@@ -486,7 +486,9 @@ impl Client {
                     )
                 }
             }
-        }
+        };
+
+        Poll::Ready(result)
     }
 
     /// Check if the connection's request/response task has exited.
@@ -497,17 +499,14 @@ impl Client {
     /// # Panics
     ///
     /// If the connection task panicked.
-    fn check_connection(&mut self, context: &mut Context<'_>) -> Result<(), SharedPeerError> {
-        match self.connection_task.poll_unpin(context) {
-            Poll::Pending => {
-                // Connection task is still running.
-                Ok(())
-            }
-            Poll::Ready(Ok(())) => {
+    fn poll_connection(&mut self, context: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
+        // Return `Pending` if the connection task is still running.
+        let result = match ready!(self.connection_task.poll_unpin(context)) {
+            Ok(()) => {
                 // Connection task stopped unexpectedly, without panicking.
                 self.set_task_exited_error("connection", PeerError::ConnectionTaskExited)
             }
-            Poll::Ready(Err(error)) => {
+            Err(error) => {
                 // Connection task panicked.
                 let error = error.panic_if_task_has_panicked();
 
@@ -526,7 +525,9 @@ impl Client {
                     )
                 }
             }
-        }
+        };
+
+        Poll::Ready(result)
     }
 
     /// Properly update the error slot after a background task has unexpectedly stopped.
@@ -569,6 +570,33 @@ impl Client {
         }
     }
 
+    /// Poll for space in the shared request sender channel, and for errors in the connection tasks.
+    ///
+    /// Returns an error if the sender channel is closed, or the heartbeat or connection tasks have
+    /// terminated. If there is no space in the channel, returns `Pending`, and schedules the task
+    /// for wakeup when there is space available, or one of the tasks terminates.
+    fn poll_client(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
+        // # Correctness
+        //
+        // The current task must be scheduled for wakeup every time we return
+        // `Poll::Pending`.
+        //
+        // `poll_heartbeat()` and `poll_connection()` schedule the client task for wakeup
+        // if either task exits, or if the heartbeat task drops the cancel handle.
+        //
+        //`ready!` returns `Poll::Pending` when `server_tx` is unready, and
+        // schedules this task for wakeup.
+        //
+        // It's ok to exit early and skip wakeups when there is an error, because the connection
+        // and its tasks are shut down immediately on error.
+
+        let _heartbeat_pending: Poll<()> = self.poll_heartbeat(cx)?;
+        let _connection_pending: Poll<()> = self.poll_connection(cx)?;
+
+        // We're only pending if the sender channel is full.
+        self.poll_request(cx)
+    }
+
     /// Shut down the resources held by the client half of this peer connection.
     ///
     /// Stops further requests to the remote peer, and stops the heartbeat task.
@@ -599,33 +627,20 @@ impl Service<Request> for Client {
         // The current task must be scheduled for wakeup every time we return
         // `Poll::Pending`.
         //
-        // `check_heartbeat` and `check_connection` schedule the client task for wakeup
-        // if either task exits, or if the heartbeat task drops the cancel handle.
+ // `poll_client()` schedules the client task for wakeup if the sender channel has space, + // either connection task exits, or if the heartbeat task drops the cancel handle. + + // Check all the tasks and channels. // //`ready!` returns `Poll::Pending` when `server_tx` is unready, and // schedules this task for wakeup. - - // Check all the tasks - let heartbeat_result = self.check_heartbeat(cx); - let connection_result = self.check_connection(cx); - let sender_result = self.poll_request(cx); - - // Requests should only wait if there is no space in the channel. - let is_pending = sender_result.is_pending(); - - // Combine the results, include the result inside the poll. - let result = heartbeat_result.and(connection_result).and_then(|_| { - let _pending: Poll<()> = sender_result?; - Ok(()) - }); + let result = ready!(self.poll_client(cx)); // Shut down the client and connection if there is an error. if let Err(error) = result { self.shutdown(); Poll::Ready(Err(error)) - } else if is_pending { - Poll::Pending } else { Poll::Ready(Ok(())) } From a14fa12d88b31e8252f5f8d99176f147bde429dd Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 14 Nov 2023 12:14:34 +1000 Subject: [PATCH 18/26] Clarify inventory set behaviour --- zebra-network/src/peer_set/inventory_registry.rs | 4 +++- zebra-network/src/peer_set/inventory_registry/update.rs | 9 ++++++--- zebra-network/src/peer_set/set.rs | 2 ++ 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/zebra-network/src/peer_set/inventory_registry.rs b/zebra-network/src/peer_set/inventory_registry.rs index f01a156389c..cc689fd7f3e 100644 --- a/zebra-network/src/peer_set/inventory_registry.rs +++ b/zebra-network/src/peer_set/inventory_registry.rs @@ -281,7 +281,7 @@ impl InventoryRegistry { Update::new(self) } - /// Drive periodic inventory tasks + /// Drive periodic inventory tasks. /// /// Rotates the inventory HashMaps on every timer tick. /// Drains the inv_stream channel and registers all advertised inventory. @@ -330,6 +330,8 @@ impl InventoryRegistry { match channel_result { Ok(change) => self.register(change), Err(BroadcastStreamRecvError::Lagged(count)) => { + // This isn't a fatal inventory error, it's expected behaviour when Zebra is + // under load from peers. metrics::counter!("pool.inventory.dropped", 1); metrics::counter!("pool.inventory.dropped.messages", count); diff --git a/zebra-network/src/peer_set/inventory_registry/update.rs b/zebra-network/src/peer_set/inventory_registry/update.rs index 411d45fb182..5089d64d3bc 100644 --- a/zebra-network/src/peer_set/inventory_registry/update.rs +++ b/zebra-network/src/peer_set/inventory_registry/update.rs @@ -19,8 +19,11 @@ pub struct Update<'a> { impl Unpin for Update<'_> {} impl<'a> Update<'a> { - #[allow(dead_code)] - pub(super) fn new(registry: &'a mut InventoryRegistry) -> Self { + /// Returns a new future that returns when the next inventory update or rotation has been + /// completed by `registry`. + /// + /// See [`InventoryRegistry::poll_inventory()`] for details. + pub fn new(registry: &'a mut InventoryRegistry) -> Self { Self { registry } } } @@ -28,7 +31,7 @@ impl<'a> Update<'a> { impl Future for Update<'_> { type Output = Result<(), BoxError>; - /// A future that returns when the next inventory update has been completed. + /// A future that returns when the next inventory update or rotation has been completed. /// /// See [`InventoryRegistry::poll_inventory()`] for details. 
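+    ///
+    /// Illustrative use, assuming a mutable `registry` binding (a sketch, not a tested example):
+    ///
+    /// ```ignore
+    /// registry.update().await?;
+    /// ```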
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs
index 4577e9f3575..681d9207bce 100644
--- a/zebra-network/src/peer_set/set.rs
+++ b/zebra-network/src/peer_set/set.rs
@@ -327,6 +327,8 @@ where
     /// Check background task handles to make sure they're still running.
     ///
+    /// Never returns `Ok`.
+    ///
     /// If any background task exits, shuts down all other background tasks,
     /// and returns an error. Otherwise, returns `Pending`, and registers a wakeup for
     /// receiving the background tasks, or the background tasks exiting.
From 6bdf6c14678eb408e4679e3958e33dbde2c8e0d8 Mon Sep 17 00:00:00 2001
From: teor
Date: Tue, 14 Nov 2023 12:38:09 +1000
Subject: [PATCH 19/26] Define peer set poll_* methods so they return Ok if they do something

---
 .../src/peer_set/inventory_registry.rs        | 40 ++++++---
 zebra-network/src/peer_set/set.rs             | 81 +++++++++++++------
 2 files changed, 85 insertions(+), 36 deletions(-)

diff --git a/zebra-network/src/peer_set/inventory_registry.rs b/zebra-network/src/peer_set/inventory_registry.rs
index cc689fd7f3e..b43915822e9 100644
--- a/zebra-network/src/peer_set/inventory_registry.rs
+++ b/zebra-network/src/peer_set/inventory_registry.rs
@@ -275,7 +275,7 @@ impl InventoryRegistry {
         self.current.iter().chain(self.prev.iter())
     }
 
-    /// Returns a future that polls once for new registry updates.
+    /// Returns a future that waits for new registry updates.
     #[allow(dead_code)]
     pub fn update(&mut self) -> Update {
         Update::new(self)
@@ -286,10 +286,14 @@ impl InventoryRegistry {
     /// Rotates the inventory HashMaps on every timer tick.
     /// Drains the inv_stream channel and registers all advertised inventory.
     ///
-    /// Returns an error if the inventory channel is closed. Otherwise returns `Poll::Pending`, and
-    /// registers a wakeup the next time there is new inventory, and the next time the inventory
-    /// should rotate.
+    /// Returns an error if the inventory channel is closed.
+    ///
+    /// Otherwise, returns `Ok` if it performed at least one update or rotation, or `Poll::Pending`
+    /// if there was no inventory change. Always registers a wakeup for the next inventory update
+    /// or rotation, even when it returns `Ok`.
     pub fn poll_inventory(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
+        let mut result = Poll::Pending;
+
         // # Correctness
         //
         // Registers the current task for wakeup when the timer next becomes ready.
@@ -306,6 +310,7 @@ impl InventoryRegistry {
         // two interval ticks are delayed.
         if Pin::new(&mut self.interval).poll_next(cx).is_ready() {
             self.rotate();
+            result = Poll::Ready(Ok(()));
         }
 
         // This module uses a broadcast channel instead of an mpsc channel, even
@@ -326,25 +331,37 @@ impl InventoryRegistry {
         // failure of the peer set.
 
         // Returns Pending if all messages are processed, but the channel could get more.
-        while let Some(channel_result) = futures::ready!(self.inv_stream.next().poll_unpin(cx)) {
+        loop {
+            let channel_result = self.inv_stream.next().poll_unpin(cx);
+
             match channel_result {
-                Ok(change) => self.register(change),
-                Err(BroadcastStreamRecvError::Lagged(count)) => {
+                Poll::Ready(Some(Ok(change))) => {
+                    self.register(change);
+                    result = Poll::Ready(Ok(()));
+                }
+                Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(count)))) => {
                     // This isn't a fatal inventory error, it's expected behaviour when Zebra is
                     // under load from peers.
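+                    // (A `Lagged(count)` error means this receiver fell behind, and the
+                    // broadcast channel discarded `count` messages for it.)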
                     metrics::counter!("pool.inventory.dropped", 1);
                     metrics::counter!("pool.inventory.dropped.messages", count);
 
-                    // If this message happens a lot, we should improve inventory registry performance,
-                    // or poll the registry or peer set in a separate task.
+                    // If this message happens a lot, we should improve inventory registry
+                    // performance, or poll the registry or peer set in a separate task.
                     info!(count, "dropped lagged inventory advertisements");
                 }
+                Poll::Ready(None) => {
+                    // If the channel is empty and returns None, all senders, including the one in
+                    // the handshaker, have been dropped, which really is a permanent failure.
+                    result = Poll::Ready(Err(broadcast::error::RecvError::Closed.into()));
+                    break;
+                }
+                Poll::Pending => {
+                    break;
+                }
             }
         }
 
-        // If the channel is empty and returns None, all senders, including the one in the
-        // handshaker, have been dropped, which really is a permanent failure.
-        Poll::Ready(Err(broadcast::error::RecvError::Closed.into()))
+        result
     }
 
     /// Record the given inventory `change` for the peer `addr`.
diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs
index 681d9207bce..6661978673f 100644
--- a/zebra-network/src/peer_set/set.rs
+++ b/zebra-network/src/peer_set/set.rs
@@ -427,11 +427,17 @@ where
     /// Move newly ready services to the ready list if they are for peers with supported protocol
     /// versions, otherwise they are dropped. Also drop failed services.
     ///
-    /// Never returns an error. If there are unready peers, returns `Poll::Pending`, and registers
-    /// a wakeup when the next peer becomes ready. If there are no unready peers, returns `Ok`, and
-    /// doesn't register any wakeups. (Since wakeups come from peers, there needs to be at least one
-    /// peer to register a wakeup.)
-    fn poll_unready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
+    /// Never returns an error.
+    ///
+    /// Returns `Ok(Some(()))` if at least one peer became ready, `Poll::Pending` if there are
+    /// unready peers but none became ready, and `Ok(None)` if there were no unready peers.
+    ///
+    /// If there are any remaining unready peers, registers a wakeup for the next time one becomes
+    /// ready. If there are no unready peers, doesn't register any wakeups. (Since wakeups come
+    /// from peers, there needs to be at least one peer to register a wakeup.)
+    fn poll_unready(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<()>, BoxError>> {
+        let mut result = Poll::Pending;
+
        // # Correctness
         //
         // `poll_next()` must always be called, because `self.unready_services` could have been
@@ -444,12 +450,29 @@ where
         //
         // Returns Pending if we've finished processing the unready service changes,
         // but there are still some unready services.
-        while let Some(ready_peer) =
-            futures::ready!(Pin::new(&mut self.unready_services).poll_next(cx))
-        {
+        loop {
+            // No more unready peers became ready, but some are still running.
+            let Poll::Ready(ready_peer) = Pin::new(&mut self.unready_services).poll_next(cx) else {
+                break;
+            };
+
             match ready_peer {
+                // No unready peers in the list.
+                None => {
+                    // If we've finished processing the unready service changes, and there are no
+                    // unready services left, it doesn't make sense to return Pending, because
+                    // their stream is terminated. But when we add more unready peers and call
+                    // `poll_next()`, its termination status will be reset, and it will receive
+                    // wakeups again.
+                    if result.is_pending() {
+                        result = Poll::Ready(Ok(None));
+                    }
+
+                    break;
+                }
+
                 // Unready -> Ready
-                Ok((key, svc)) => {
+                Some(Ok((key, svc))) => {
                     trace!(?key, "service became ready");
                     let cancel = self.cancel_handles.remove(&key);
                     assert!(cancel.is_some(), "missing cancel handle");
 
                     if svc.remote_version() >= self.minimum_peer_version.current() {
                         self.ready_services.insert(key, svc);
                     }
+
+                    // Return Ok if at least one peer became ready.
+                    result = Poll::Ready(Ok(Some(())));
                 }
 
                 // Unready -> Canceled
-                Err((key, UnreadyError::Canceled)) => {
+                Some(Err((key, UnreadyError::Canceled))) => {
                     // A service can be canceled because we've connected to the same service twice.
                     // In that case, there is a cancel handle for the peer address,
                     // but it belongs to the service for the newer connection.
                     trace!(
                         ?key,
                         "service was canceled, dropping service"
                     );
                 }
-                Err((key, UnreadyError::CancelHandleDropped(_))) => {
+                Some(Err((key, UnreadyError::CancelHandleDropped(_)))) => {
                     // Similarly, services with dropped cancel handles can have duplicates.
                     trace!(
                         ?key,
                         "cancel handle was dropped, dropping service"
                     );
                 }
 
                 // Unready -> Errored
-                Err((key, UnreadyError::Inner(error))) => {
+                Some(Err((key, UnreadyError::Inner(error)))) => {
                     debug!(%error, "service failed while unready, dropping service");
 
                     let cancel = self.cancel_handles.remove(&key);
                     assert!(cancel.is_some(), "missing cancel handle");
                 }
             }
         }
 
-        // Return Ok if we've finished processing the unready service changes, and there are no
-        // unready services left. This means the stream is terminated. But when we add more unready
-        // peers and call `poll_next()`, its termination status will be reset, and it will receive
-        // wakeups again.
-        Poll::Ready(Ok(()))
+        result
     }
 
     /// Returns the number of peer connections Zebra already has with
@@ -522,18 +544,27 @@ where
     /// Drops removed services, after cancelling any pending requests.
     ///
     /// If the peer connector channel is closed, returns an error.
-    /// Otherwise, returns `Poll::Pending`, and registers a wakeup for new peers.
+    ///
+    /// Otherwise, returns `Ok` if it discovered at least one peer, or `Poll::Pending` if it didn't
+    /// discover any peers. Always registers a wakeup for new peers, even when it returns `Ok`.
     fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
+        // Return pending if there are no peers in the list.
+        let mut result = Poll::Pending;
+
         loop {
-            // Return Pending if we've finished processing the entire list,
-            // but there could be new peers later.
-            let discovered = futures::ready!(Pin::new(&mut self.discover).poll_discover(cx));
+            // If we've emptied the list, finish looping, otherwise process the new peer.
+            let Poll::Ready(discovered) = Pin::new(&mut self.discover).poll_discover(cx) else {
+                break;
+            };
 
             // If the change channel has a permanent error, return that error.
             let change = discovered
                 .ok_or("discovery stream closed")?
                 .map_err(Into::into)?;
 
+            // Otherwise we have successfully processed a peer.
+            result = Poll::Ready(Ok(()));
+
             // Process each change.
             match change {
                 Change::Remove(key) => {
@@ -568,6 +599,8 @@ where
                 }
             }
         }
+
+        result
     }
 
     /// Checks if the minimum peer version has changed, and disconnects from outdated peers.
@@ -1023,11 +1056,11 @@ where
         // Check for new peers, and register a task wakeup when the next new peers arrive. New peers
         // can be infrequent if our connection slots are full, or we're connected to all
         // available/useful peers.
-        let _poll_pending: Poll<()> = self.poll_discover(cx)?;
+        let _poll_pending_or_ready: Poll<()> = self.poll_discover(cx)?;
 
         // These tasks don't provide new peers or newly ready peers.
         let _poll_pending: Poll<()> = self.poll_background_errors(cx)?;
-        let _poll_pending: Poll<()> = self.inventory_registry.poll_inventory(cx)?;
+        let _poll_pending_or_ready: Poll<()> = self.inventory_registry.poll_inventory(cx)?;
 
         // Check for newly ready peers, including newly added peers (which are added as unready).
         // So it needs to run after `poll_discover()`. Registers a wakeup if there are any unready
         // peers.
        //
         // Each connection error/disconnection can only be reported in one place: ready or unready
         // connection, and release its connection slot.
         //
         // TODO: drop peers that overload us with inbound messages and never become ready (#7822)
-        let _poll_pending_or_ready: Poll<()> = self.poll_unready(cx)?;
+        let _poll_pending_or_ready: Poll<Option<()>> = self.poll_unready(cx)?;
 
         // Cleanup and metrics.
From 21b5c39ca21a98ca2ba7878d5b0bd0f5bd04fe83 Mon Sep 17 00:00:00 2001
From: teor
Date: Tue, 7 Nov 2023 12:42:20 +1000
Subject: [PATCH 20/26] Clarify documentation

Co-authored-by: Arya

---
 zebra-chain/src/diagnostic/task/thread.rs | 8 +++++---
 zebra-network/src/peer/client.rs          | 2 ++
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/zebra-chain/src/diagnostic/task/thread.rs b/zebra-chain/src/diagnostic/task/thread.rs
index 67e8ad98e3f..517aef01411 100644
--- a/zebra-chain/src/diagnostic/task/thread.rs
+++ b/zebra-chain/src/diagnostic/task/thread.rs
@@ -18,9 +18,11 @@ where
 {
     type Output = T;
 
-    /// Panics if the thread panicked.
-    /// If `panic_on_unexpected_termination` is true, and Zebra is not shutting down, also panics
-    /// if the thread exits.
+    /// # Panics
+    ///
+    /// - if the thread panicked.
+    /// - if the thread exited, `panic_on_unexpected_termination` is true, and
+    ///   Zebra is not shutting down.
     ///
     /// Threads can't be cancelled except by using a panic, so there are no thread errors here.
     /// `panic_on_unexpected_termination` only applies to normal thread exits.
diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs
index e5f8a95ac40..8dd81ed3239 100644
--- a/zebra-network/src/peer/client.rs
+++ b/zebra-network/src/peer/client.rs
@@ -441,6 +441,8 @@ impl Client {
 
         let result = match self.heartbeat_task.poll_unpin(cx) {
             Poll::Pending => {
+                // The heartbeat task returns `Pending` while it continues to run.
+                // But if it has dropped its receiver, it is shutting down, and we should also shut down.
                 if is_canceled {
                     self.set_task_exited_error(
                         "heartbeat",
From a7a4a375b650864793fb2bc7f7c78086e9fb3c31 Mon Sep 17 00:00:00 2001
From: teor
Date: Tue, 14 Nov 2023 12:46:31 +1000
Subject: [PATCH 21/26] Fix test that depended on preselected peers

---
 zebra-network/src/peer_set/set/tests/vectors.rs | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/zebra-network/src/peer_set/set/tests/vectors.rs b/zebra-network/src/peer_set/set/tests/vectors.rs
index d63ba719f3f..02ddc25d85e 100644
--- a/zebra-network/src/peer_set/set/tests/vectors.rs
+++ b/zebra-network/src/peer_set/set/tests/vectors.rs
@@ -1,7 +1,8 @@
 //! Fixed test vectors for the peer set.
-use std::{cmp::max, iter};
+use std::{cmp::max, iter, time::Duration};
 
+use tokio::time::timeout;
 use tower::{Service, ServiceExt};
 
 use zebra_chain::{
@@ -9,7 +10,6 @@ use zebra_chain::{
     parameters::{Network, NetworkUpgrade},
 };
 
-use super::{PeerSetBuilder, PeerVersions};
 use crate::{
     constants::DEFAULT_MAX_CONNS_PER_IP,
     peer::{ClientRequest, MinimumPeerVersion},
@@ -18,6 +18,8 @@ use crate::{
     Request, SharedPeerError,
 };
 
+use super::{PeerSetBuilder, PeerVersions};
+
 #[test]
 fn peer_set_ready_single_connection() {
     // We are going to use just one peer version in this test
@@ -171,12 +173,7 @@ fn peer_set_ready_multiple_connections() {
 
         // Peer set hangs when no more connections are present
         let peer_ready = peer_set.ready();
-        peer_ready
-            .await
-            .expect("peer set is always ready until peers are cleared");
-
-        // TODO: re-enable this check when waiting is fixed?
-        //assert!(timeout(Duration::from_secs(10), peer_ready).await.is_err());
+        assert!(timeout(Duration::from_secs(10), peer_ready).await.is_err());
     });
 }
From 5db063520bff55ba7169f2a51eadcf939ebd178f Mon Sep 17 00:00:00 2001
From: teor
Date: Tue, 14 Nov 2023 13:27:49 +1000
Subject: [PATCH 22/26] Check ready peers for errors before sending requests to them

---
 zebra-network/src/peer/load_tracked_client.rs |  1 +
 zebra-network/src/peer_set/set.rs             | 96 +++++++++++++++----
 2 files changed, 79 insertions(+), 18 deletions(-)

diff --git a/zebra-network/src/peer/load_tracked_client.rs b/zebra-network/src/peer/load_tracked_client.rs
index 49d1430720a..6211e6cd52b 100644
--- a/zebra-network/src/peer/load_tracked_client.rs
+++ b/zebra-network/src/peer/load_tracked_client.rs
@@ -20,6 +20,7 @@ use crate::{
 /// A client service wrapper that keeps track of its load.
 ///
 /// It also keeps track of the peer's reported protocol version.
+#[derive(Debug)]
 pub struct LoadTrackedClient {
     /// A service representing a connected peer, wrapped in a load tracker.
     service: PeakEwma<Client>,
diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs
index 6661978673f..80e65ce5b84 100644
--- a/zebra-network/src/peer_set/set.rs
+++ b/zebra-network/src/peer_set/set.rs
@@ -474,12 +474,8 @@ where
                 // Unready -> Ready
                 Some(Ok((key, svc))) => {
                     trace!(?key, "service became ready");
-                    let cancel = self.cancel_handles.remove(&key);
-                    assert!(cancel.is_some(), "missing cancel handle");
 
-                    if svc.remote_version() >= self.minimum_peer_version.current() {
-                        self.ready_services.insert(key, svc);
-                    }
+                    self.push_ready(key, svc);
 
                     // Return Ok if at least one peer became ready.
                     result = Poll::Ready(Ok(Some(())));
@@ -518,6 +514,55 @@ where
         result
     }
 
+    /// Checks previously ready peer services for errors.
+    ///
+    /// The only way these peer `Client`s can become unready is when we send them a request,
+    /// because the peer set has exclusive access to send requests to each peer. (If an inbound
+    /// request is in progress, it will be handled, then our request will be sent by the connection
+    /// task.)
+    ///
+    /// Returns `Poll::Ready` if there are some ready peers, and `Poll::Pending` if there are no
+    /// ready peers. Never registers any task wakeups.
+    ///
+    /// # Panics
+    ///
+    /// If any peers somehow became unready. This indicates a bug in the peer set, where requests
+    /// are sent to peers without putting them in `unready_peers`.
+    fn poll_ready_peer_errors(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+        let mut previous = HashMap::new();
+        std::mem::swap(&mut previous, &mut self.ready_services);
+
+        // TODO: consider only checking some peers each poll (for performance reasons),
+        // but make sure we eventually check all of them.
+        for (key, mut svc) in previous.drain() {
+            let Poll::Ready(peer_readiness) = Pin::new(&mut svc).poll_ready(cx) else {
+                unreachable!(
+                    "unexpected unready peer: peers must be put into the unready_peers list \
+                     after sending them a request"
+                );
+            };
+
+            match peer_readiness {
+                // Still ready, add it back to the list.
+                Ok(()) => self.push_ready(key, svc),
+
+                // Ready -> Errored
+                Err(error) => {
+                    debug!(%error, "service failed while ready, dropping service");
+
+                    // Ready services can just be dropped, they don't need any cleanup.
+                    std::mem::drop(svc);
+                }
+            }
+        }
+
+        if self.ready_services.is_empty() {
+            Poll::Pending
+        } else {
+            Poll::Ready(())
+        }
+    }
+
     /// Returns the number of peer connections Zebra already has with
     /// the provided IP address
     ///
@@ -642,6 +687,20 @@ where
         }
     }
 
+    /// Adds a ready service to the ready list if it's for a peer with a supported version.
+    ///
+    /// If the service is for a connection to an outdated peer, the service is dropped.
+    fn push_ready(&mut self, key: D::Key, svc: D::Service) {
+        let cancel = self.cancel_handles.remove(&key);
+        assert!(cancel.is_some(), "missing cancel handle");
+
+        if svc.remote_version() >= self.minimum_peer_version.current() {
+            self.ready_services.insert(key, svc);
+        } else {
+            std::mem::drop(svc);
+        }
+    }
+
     /// Adds a busy service to the unready list if it's for a peer with a supported version,
     /// and adds a cancel handle for the service's current request.
     ///
@@ -1081,30 +1140,31 @@ where
         self.log_peer_set_size();
         self.update_metrics();
 
-        if self.ready_services.is_empty() {
+        // Check for failures in ready peers, removing newly errored or disconnected peers.
+        // So it needs to run after `poll_unready()`.
+        let ready_peers: Poll<()> = self.poll_ready_peer_errors(cx);
+
+        if ready_peers.is_pending() {
             // # Correctness
             //
-            // If the channel is full, drop the demand signal rather than waiting.
-            // If we waited here, the crawler could deadlock sending a request to
-            // fetch more peers, because it also empties the channel.
+            // If the channel is full, drop the demand signal rather than waiting. If we waited
+            // here, the crawler could deadlock sending a request to fetch more peers, because it
+            // also empties the channel.
             trace!("no ready services, sending demand signal");
             let _ = self.demand_signal.try_send(MorePeers);
 
             // # Correctness
             //
-            // The current task must be scheduled for wakeup every time we
-            // return `Poll::Pending`.
+            // The current task must be scheduled for wakeup every time we return `Poll::Pending`.
             //
-            // As long as there are unready or new peers, this task will run,
-            // because:
-            // - `poll_discover` schedules this task for wakeup when new
-            //   peers arrive.
-            // - if there are unready peers, `poll_unready` schedules this
+            // As long as there are unready or new peers, this task will run, because:
+            // - `poll_discover` schedules this task for wakeup when new peers arrive.
+            // - if there are unready peers, `poll_unready` or `poll_ready_peer_errors` schedule this
             //   task for wakeup when peer services become ready.
             //
            // To avoid peers blocking on a full peer status/error channel:
+ // - `poll_background_errors` schedules this task for wakeup when the peer status + // update task exits. Poll::Pending } else { Poll::Ready(Ok(())) From 8e20387f2425736eab62aa1ab8661641ff670e60 Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 14 Nov 2023 14:47:43 +1000 Subject: [PATCH 23/26] Fix a hanging test by not waiting for irrelevant actions --- .../peer_set/inventory_registry/tests/prop.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/zebra-network/src/peer_set/inventory_registry/tests/prop.rs b/zebra-network/src/peer_set/inventory_registry/tests/prop.rs index 959d7b9182f..6bed4fbd729 100644 --- a/zebra-network/src/peer_set/inventory_registry/tests/prop.rs +++ b/zebra-network/src/peer_set/inventory_registry/tests/prop.rs @@ -1,7 +1,11 @@ //! Randomised property tests for the inventory registry. -use std::collections::HashSet; +use std::{ + collections::HashSet, + task::{Context, Poll}, +}; +use futures::task::noop_waker; use proptest::prelude::*; use crate::{ @@ -81,10 +85,12 @@ async fn inv_registry_inbound_wrapper_with( forwarded_msg.expect("unexpected forwarded error result"), ); - inv_registry - .update() - .await - .expect("unexpected dropped registry sender channel"); + // We don't actually care if the registry takes any action here. + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + let _poll_pending_or_ok: Poll<()> = inv_registry + .poll_inventory(&mut cx) + .map(|result| result.expect("unexpected error polling inventory")); let test_peer = test_peer .get_transient_addr() From 598a9efabc2623f84dc7191914490f57db7e236c Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 14 Nov 2023 15:12:00 +1000 Subject: [PATCH 24/26] Only remove cancel handles when they are required --- zebra-network/src/peer_set/set.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 80e65ce5b84..60536fd21d0 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -475,7 +475,7 @@ where Some(Ok((key, svc))) => { trace!(?key, "service became ready"); - self.push_ready(key, svc); + self.push_ready(true, key, svc); // Return Ok if at least one peer became ready. result = Poll::Ready(Ok(Some(()))); @@ -544,7 +544,7 @@ where match peer_readiness { // Still ready, add it back to the list. - Ok(()) => self.push_ready(key, svc), + Ok(()) => self.push_ready(false, key, svc), // Ready -> Errored Err(error) => { @@ -688,11 +688,16 @@ where } /// Adds a ready service to the ready list if it's for a peer with a supported version. + /// If `was_unready` is true, also removes the peer's cancel handle. /// /// If the service is for a connection to an outdated peer, the service is dropped. 
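+    ///
+    /// For example: `poll_unready()` pushes newly ready peers with `was_unready = true`, while
+    /// `poll_ready_peer_errors()` re-checks already-ready peers with `was_unready = false`.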
- fn push_ready(&mut self, key: D::Key, svc: D::Service) { + fn push_ready(&mut self, was_unready: bool, key: D::Key, svc: D::Service) { let cancel = self.cancel_handles.remove(&key); - assert!(cancel.is_some(), "missing cancel handle"); + assert_eq!( + cancel.is_some(), + was_unready, + "missing or unexpected cancel handle" + ); if svc.remote_version() >= self.minimum_peer_version.current() { self.ready_services.insert(key, svc); From 378ba7c6ce2f81e7d872ba56580bded7df17be3d Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 14 Nov 2023 15:42:04 +1000 Subject: [PATCH 25/26] fix incorrect panic on termination setting --- zebra-chain/src/diagnostic/task.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zebra-chain/src/diagnostic/task.rs b/zebra-chain/src/diagnostic/task.rs index d76f7265b4d..bbdb72fecfa 100644 --- a/zebra-chain/src/diagnostic/task.rs +++ b/zebra-chain/src/diagnostic/task.rs @@ -32,7 +32,7 @@ pub trait CheckForPanics: Sized { /// If `self` contains a panic payload. #[track_caller] fn panic_if_task_has_panicked(self) -> Self::Output { - self.check_for_panics_with(true) + self.check_for_panics_with(false) } /// Check if `self` contains a panic payload, then panic. Also panics if From 798b1ca4ca223cc003dbf6c8b45851baacd95904 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 16 Nov 2023 12:24:14 +1000 Subject: [PATCH 26/26] Clarify method comments Co-authored-by: Arya --- zebra-network/src/peer_set/set.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 60536fd21d0..fdec72fded8 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -522,11 +522,11 @@ where /// task.) /// /// Returns `Poll::Ready` if there are some ready peers, and `Poll::Pending` if there are no - /// ready peers. Never registers any task wakeups. + /// ready peers. Registers a wakeup if any peer has failed due to a disconnection, hang, or protocol error. /// /// # Panics /// - /// If any peers somehow became unready. This indicates a bug in the peer set, where requests + /// If any peers somehow became unready without being sent a request. This indicates a bug in the peer set, where requests /// are sent to peers without putting them in `unready_peers`. fn poll_ready_peer_errors(&mut self, cx: &mut Context<'_>) -> Poll<()> { let mut previous = HashMap::new();
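
The patch series converges on one polling convention for these peer set methods: drain the
underlying source completely, return `Ok` only when at least one item was actually processed,
stay `Pending` otherwise, and always leave a wakeup registered before returning. A condensed,
hypothetical sketch of that convention (illustrative Rust only, not code from this series;
`poll_drain`, `poll_source`, and `process` are stand-in names):

    use std::task::{Context, Poll};

    /// Drains a pollable source, reporting `Ready(Ok(()))` only when at least one
    /// item was processed, and `Pending` otherwise. The final `poll_source` call is
    /// the one that registers the wakeup for future items.
    fn poll_drain<T>(
        cx: &mut Context<'_>,
        mut poll_source: impl FnMut(&mut Context<'_>) -> Poll<Option<T>>,
        mut process: impl FnMut(T),
    ) -> Poll<Result<(), &'static str>> {
        let mut result = Poll::Pending;

        loop {
            match poll_source(cx) {
                // Process the item and record that we made progress.
                Poll::Ready(Some(item)) => {
                    process(item);
                    result = Poll::Ready(Ok(()));
                }
                // A closed source is a permanent error.
                Poll::Ready(None) => return Poll::Ready(Err("source closed")),
                // Nothing ready: `poll_source` has already registered a wakeup.
                Poll::Pending => break,
            }
        }

        result
    }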