Skip to content

Commit

Permalink
Deprecate futures ticker (#6630)
Browse files Browse the repository at this point in the history
* deprecate futures-ticker

* Merge branch 'unstable' of github.com:sigp/lighthouse into deprecate-futures-timer

* Merge branch 'unstable' into deprecate-futures-timer

* making the linter happy

* remove unrequired #[allow(unused_imports)]

* fixing minor issues

* merge commit

* minor fix

* clippy changes
  • Loading branch information
hopinheimer authored Dec 3, 2024
1 parent 1fd86f8 commit fec502d
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 29 deletions.
36 changes: 23 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions beacon_node/lighthouse_network/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[features]
wasm-bindgen = ["getrandom/js"]
wasm-bindgen = ["getrandom/js", "futures-timer/wasm-bindgen"]
rsa = []

[dependencies]
Expand All @@ -22,7 +22,6 @@ bytes = "1.5"
either = "1.9"
fnv = "1.0.7"
futures = "0.3.30"
futures-ticker = "0.0.3"
futures-timer = "3.0.2"
getrandom = "0.2.12"
hashlink.workspace = true
Expand Down
25 changes: 12 additions & 13 deletions beacon_node/lighthouse_network/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ use std::{
time::Duration,
};

use futures::StreamExt;
use futures_ticker::Ticker;
use futures::FutureExt;
use hashlink::LinkedHashMap;
use prometheus_client::registry::Registry;
use rand::{seq::SliceRandom, thread_rng};
Expand Down Expand Up @@ -74,6 +73,7 @@ use super::{
types::RpcOut,
};
use super::{PublishError, SubscriptionError, TopicScoreParams, ValidationError};
use futures_timer::Delay;
use quick_protobuf::{MessageWrite, Writer};
use std::{cmp::Ordering::Equal, fmt::Debug};

Expand Down Expand Up @@ -301,7 +301,7 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
mcache: MessageCache,

/// Heartbeat interval stream.
heartbeat: Ticker,
heartbeat: Delay,

/// Number of heartbeats since the beginning of time; this allows us to amortize some resource
/// clean up -- eg backoff clean up.
Expand All @@ -318,7 +318,7 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
outbound_peers: HashSet<PeerId>,

/// Stores optional peer score data together with thresholds and decay interval.
peer_score: Option<(PeerScore, PeerScoreThresholds, Ticker)>,
peer_score: Option<(PeerScore, PeerScoreThresholds, Delay)>,

/// Counts the number of `IHAVE` received from each peer since the last heartbeat.
count_received_ihave: HashMap<PeerId, usize>,
Expand Down Expand Up @@ -466,10 +466,7 @@ where
config.backoff_slack(),
),
mcache: MessageCache::new(config.history_gossip(), config.history_length()),
heartbeat: Ticker::new_with_next(
config.heartbeat_interval(),
config.heartbeat_initial_delay(),
),
heartbeat: Delay::new(config.heartbeat_interval() + config.heartbeat_initial_delay()),
heartbeat_ticks: 0,
px_peers: HashSet::new(),
outbound_peers: HashSet::new(),
Expand Down Expand Up @@ -938,7 +935,7 @@ where
return Err("Peer score set twice".into());
}

let interval = Ticker::new(params.decay_interval);
let interval = Delay::new(params.decay_interval);
let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback);
self.peer_score = Some((peer_score, threshold, interval));
Ok(())
Expand Down Expand Up @@ -1208,7 +1205,7 @@ where
}

fn score_below_threshold_from_scores(
peer_score: &Option<(PeerScore, PeerScoreThresholds, Ticker)>,
peer_score: &Option<(PeerScore, PeerScoreThresholds, Delay)>,
peer_id: &PeerId,
threshold: impl Fn(&PeerScoreThresholds) -> f64,
) -> (bool, f64) {
Expand Down Expand Up @@ -3427,14 +3424,16 @@ where
}

// update scores
if let Some((peer_score, _, interval)) = &mut self.peer_score {
while let Poll::Ready(Some(_)) = interval.poll_next_unpin(cx) {
if let Some((peer_score, _, delay)) = &mut self.peer_score {
if delay.poll_unpin(cx).is_ready() {
peer_score.refresh_scores();
delay.reset(peer_score.params.decay_interval);
}
}

while let Poll::Ready(Some(_)) = self.heartbeat.poll_next_unpin(cx) {
if self.heartbeat.poll_unpin(cx).is_ready() {
self.heartbeat();
self.heartbeat.reset(self.config.heartbeat_interval());
}

Poll::Pending
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::subscription_filter::WhitelistSubscriptionFilter;
use crate::types::RpcReceiver;
use crate::{config::ConfigBuilder, types::Rpc, IdentTopic as Topic};
use byteorder::{BigEndian, ByteOrder};
use futures::StreamExt;
use libp2p::core::ConnectedPoint;
use rand::Rng;
use std::net::Ipv4Addr;
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/gossipsub/src/peer_score.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ mod tests;
const TIME_CACHE_DURATION: u64 = 120;

pub(crate) struct PeerScore {
params: PeerScoreParams,
pub(crate) params: PeerScoreParams,
/// The score parameters.
peer_stats: HashMap<PeerId, PeerStats>,
/// Tracking peers per IP.
Expand Down

0 comments on commit fec502d

Please sign in to comment.