Skip to content

Commit

Permalink
Revert "pool: deprecate auto adjustment of retry seconds for relays"
Browse files Browse the repository at this point in the history
This reverts commit c4270c0

Signed-off-by: Yuki Kishimoto <[email protected]>
  • Loading branch information
yukibtc committed Sep 19, 2024
1 parent 9d8d1ef commit 4dd08af
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 11 deletions.
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
* database: update `NostrDatabase::event_by_id` fingerprint ([Yuki Kishimoto])
* relay-builder: bump `tokio-tungstenite` to `v0.24` ([Yuki Kishimoto])
* pool: bump `async-wsocket` to `v0.8` ([Yuki Kishimoto])
* pool: deprecate auto adjustment of retry seconds for relays ([Yuki Kishimoto])
* pool: avoid unnecessary `Url` and `Relay` clone in `RelayPool` methods ([Yuki Kishimoto])
* pool: avoid `Relay` clone in `RelayPool::connect_relay` method ([Yuki Kishimoto])
* pool: `RelayPool::send_event` and `RelayPool::batch_event` send only to relays with `WRITE` flag ([Yuki Kishimoto])
Expand Down
12 changes: 12 additions & 0 deletions bindings/nostr-sdk-ffi/src/relay/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,18 @@ impl RelayOptions {
self.inner.update_retry_sec(retry_sec);
}

/// Automatically adjust retry seconds based on success/attempts (default: true)
pub fn adjust_retry_sec(self: Arc<Self>, adjust_retry_sec: bool) -> Self {
let mut builder = unwrap_or_clone_arc(self);
builder.inner = builder.inner.adjust_retry_sec(adjust_retry_sec);
builder
}

/// Set adjust_retry_sec option
pub fn update_adjust_retry_sec(&self, adjust_retry_sec: bool) {
self.inner.update_adjust_retry_sec(adjust_retry_sec);
}

/// Set custom limits
pub fn limits(self: Arc<Self>, limits: &RelayLimits) -> Self {
let mut builder = unwrap_or_clone_arc(self);
Expand Down
10 changes: 10 additions & 0 deletions bindings/nostr-sdk-js/src/relay/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ impl JsRelayOptions {
self.inner.update_retry_sec(retry_sec);
}

/// Automatically adjust retry seconds based on success/attempts (default: true)
pub fn adjust_retry_sec(self, adjust_retry_sec: bool) -> Self {
self.inner.adjust_retry_sec(adjust_retry_sec).into()
}

/// Set adjust_retry_sec option
pub fn update_adjust_retry_sec(&self, adjust_retry_sec: bool) {
self.inner.update_adjust_retry_sec(adjust_retry_sec);
}

/// Set custom limits
pub fn limits(self, limits: &JsRelayLimits) -> Self {
self.inner.limits(limits.deref().clone()).into()
Expand Down
30 changes: 25 additions & 5 deletions crates/nostr-relay-pool/src/relay/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

//! Internal Relay
use std::cmp;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand All @@ -19,8 +20,7 @@ use nostr::message::MessageHandleError;
use nostr::nips::nip01::Coordinate;
#[cfg(feature = "nip11")]
use nostr::nips::nip11::RelayInformationDocument;
#[cfg(not(target_arch = "wasm32"))]
use nostr::secp256k1::rand;
use nostr::secp256k1::rand::{self, Rng};
use nostr::{
ClientMessage, Event, EventId, Filter, JsonUtil, Kind, MissingPartialEvent, PartialEvent,
RawRelayMessage, RelayMessage, SubscriptionId, Timestamp, Url,
Expand All @@ -34,8 +34,8 @@ use super::filtering::{CheckFiltering, RelayFiltering};
use super::flags::AtomicRelayServiceFlags;
use super::options::{
FilterOptions, NegentropyOptions, RelayOptions, RelaySendOptions, SubscribeAutoCloseOptions,
SubscribeOptions, NEGENTROPY_BATCH_SIZE_DOWN, NEGENTROPY_HIGH_WATER_UP,
NEGENTROPY_LOW_WATER_UP,
SubscribeOptions, MAX_ADJ_RETRY_SEC, MIN_RETRY_SEC, NEGENTROPY_BATCH_SIZE_DOWN,
NEGENTROPY_HIGH_WATER_UP, NEGENTROPY_LOW_WATER_UP,
};
use super::stats::RelayConnectionStats;
use super::{Error, Reconciliation, RelayNotification, RelayStatus};
Expand Down Expand Up @@ -513,7 +513,8 @@ impl InternalRelay {
};

// Sleep
let retry_sec: u64 = relay.opts.get_retry_sec();
let retry_sec: u64 = relay.calculate_retry_sec();
tracing::trace!("{} retry time set to {retry_sec} secs", relay.url);
thread::sleep(Duration::from_secs(retry_sec)).await;
}
});
Expand All @@ -526,6 +527,25 @@ impl InternalRelay {
}
}

/// Depending on attempts and success, use default or incremental retry time
fn calculate_retry_sec(&self) -> u64 {
if self.opts.get_adjust_retry_sec() {
// diff = attempts - success
let diff: u64 = self.stats.attempts().saturating_sub(self.stats.success()) as u64;

// Use incremental retry time if diff >= 3
if diff >= 3 {
let retry_interval: i64 =
cmp::min(MIN_RETRY_SEC * (1 + diff), MAX_ADJ_RETRY_SEC) as i64;
let jitter: i64 = rand::thread_rng().gen_range(-1..=1);
return retry_interval.saturating_add(jitter) as u64;
}
}

// Use default retry time
self.opts.get_retry_sec()
}

#[cfg(feature = "nip11")]
fn request_nip11_document(&self) {
let (allowed, proxy) = match self.opts.connection_mode {
Expand Down
21 changes: 16 additions & 5 deletions crates/nostr-relay-pool/src/relay/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::RelayLimits;
pub const DEFAULT_SEND_TIMEOUT: Duration = Duration::from_secs(20);
pub(super) const DEFAULT_RETRY_SEC: u64 = 10;
pub(super) const MIN_RETRY_SEC: u64 = 5;
pub(super) const MAX_ADJ_RETRY_SEC: u64 = 60;
pub(super) const NEGENTROPY_HIGH_WATER_UP: usize = 100;
pub(super) const NEGENTROPY_LOW_WATER_UP: usize = 50;
pub(super) const NEGENTROPY_BATCH_SIZE_DOWN: usize = 50;
Expand All @@ -33,6 +34,7 @@ pub struct RelayOptions {
pow: Arc<AtomicU8>,
reconnect: Arc<AtomicBool>,
retry_sec: Arc<AtomicU64>,
adjust_retry_sec: Arc<AtomicBool>,
pub(super) limits: RelayLimits,
pub(super) max_avg_latency: Option<Duration>,
pub(super) filtering_mode: RelayFilteringMode,
Expand All @@ -46,6 +48,7 @@ impl Default for RelayOptions {
pow: Arc::new(AtomicU8::new(0)),
reconnect: Arc::new(AtomicBool::new(true)),
retry_sec: Arc::new(AtomicU64::new(DEFAULT_RETRY_SEC)),
adjust_retry_sec: Arc::new(AtomicBool::new(true)),
limits: RelayLimits::default(),
max_avg_latency: None,
filtering_mode: RelayFilteringMode::default(),
Expand Down Expand Up @@ -174,14 +177,22 @@ impl RelayOptions {
}

/// Automatically adjust retry seconds based on success/attempts (default: true)
#[deprecated(since = "0.35.0")]
pub fn adjust_retry_sec(self, _adjust_retry_sec: bool) -> Self {
self
pub fn adjust_retry_sec(self, adjust_retry_sec: bool) -> Self {
Self {
adjust_retry_sec: Arc::new(AtomicBool::new(adjust_retry_sec)),
..self
}
}

pub(crate) fn get_adjust_retry_sec(&self) -> bool {
self.adjust_retry_sec.load(Ordering::SeqCst)
}

/// Set adjust_retry_sec option
#[deprecated(since = "0.35.0")]
pub fn update_adjust_retry_sec(&self, _adjust_retry_sec: bool) {}
pub fn update_adjust_retry_sec(&self, adjust_retry_sec: bool) {
self.adjust_retry_sec
.store(adjust_retry_sec, Ordering::SeqCst);
}

/// Set custom limits
pub fn limits(mut self, limits: RelayLimits) -> Self {
Expand Down

0 comments on commit 4dd08af

Please sign in to comment.