Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sdk: fix pong not match if connect method called multiple times #177

Merged
merged 1 commit into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bindings/nostr-sdk-ffi/src/nostr_sdk.udl
Original file line number Diff line number Diff line change
Expand Up @@ -540,8 +540,9 @@ interface RelayConnectionStats {

enum RelayStatus {
"Initialized",
"Connected",
"Pending",
"Connecting",
"Connected",
"Disconnected",
"Stopped",
"Terminated",
Expand Down
173 changes: 86 additions & 87 deletions crates/nostr-sdk/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ pub enum Error {
pub enum RelayStatus {
/// Relay initialized
Initialized,
/// Pending
Pending,
/// Connecting
Connecting,
/// Relay connected
Expand All @@ -131,6 +133,7 @@ impl fmt::Display for RelayStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Initialized => write!(f, "Initialized"),
Self::Pending => write!(f, "Pending"),
Self::Connecting => write!(f, "Connecting"),
Self::Connected => write!(f, "Connected"),
Self::Disconnected => write!(f, "Disconnected"),
Expand Down Expand Up @@ -454,98 +457,85 @@ impl Relay {
self.schedule_for_stop(false);
self.schedule_for_termination(false);

if self.opts.get_reconnect() {
if wait_for_connection {
if let RelayStatus::Initialized | RelayStatus::Stopped | RelayStatus::Terminated =
self.status().await
{
if let RelayStatus::Initialized | RelayStatus::Stopped | RelayStatus::Terminated =
self.status().await
{
if self.opts.get_reconnect() {
if wait_for_connection {
self.try_connect().await
}
}

// TODO: temp disable because cause stop-start issues
/* if !self.is_auto_connect_loop_running() {
self.set_auto_connect_loop_running(true); */

tracing::debug!("Auto connect loop started for {}", self.url);

if !wait_for_connection {
self.set_status(RelayStatus::Initialized).await;
}

let relay = self.clone();
thread::abortable(async move {
loop {
let queue = relay.queue();
if queue > 0 {
tracing::info!(
"{} messages queued for {} (capacity: {})",
queue,
relay.url(),
relay.relay_sender.capacity()
);
}
tracing::debug!("Auto connect loop started for {}", self.url);

// Schedule relay for termination
// Needed to terminate the auto reconnect loop, also if the relay is not connected yet.
if relay.is_scheduled_for_stop() {
relay.set_status(RelayStatus::Stopped).await;
relay.schedule_for_stop(false);
tracing::debug!(
"Auto connect loop terminated for {} [stop - schedule]",
relay.url
);
break;
} else if relay.is_scheduled_for_termination() {
relay.set_status(RelayStatus::Terminated).await;
relay.schedule_for_termination(false);
tracing::debug!(
"Auto connect loop terminated for {} [schedule]",
relay.url
);
break;
}
if !wait_for_connection {
self.set_status(RelayStatus::Pending).await;
}

// Check status
match relay.status().await {
RelayStatus::Initialized | RelayStatus::Disconnected => {
relay.try_connect().await
let relay = self.clone();
thread::abortable(async move {
loop {
let queue = relay.queue();
if queue > 0 {
tracing::info!(
"{} messages queued for {} (capacity: {})",
queue,
relay.url(),
relay.relay_sender.capacity()
);
}
RelayStatus::Stopped | RelayStatus::Terminated => {
tracing::debug!("Auto connect loop terminated for {}", relay.url);

// Schedule relay for termination
// Needed to terminate the auto reconnect loop, also if the relay is not connected yet.
if relay.is_scheduled_for_stop() {
relay.set_status(RelayStatus::Stopped).await;
relay.schedule_for_stop(false);
tracing::debug!(
"Auto connect loop terminated for {} [stop - schedule]",
relay.url
);
break;
} else if relay.is_scheduled_for_termination() {
relay.set_status(RelayStatus::Terminated).await;
relay.schedule_for_termination(false);
tracing::debug!(
"Auto connect loop terminated for {} [schedule]",
relay.url
);
break;
}
_ => (),
};

let retry_sec: u64 = if relay.opts.get_adjust_retry_sec() {
let var: u64 =
relay.stats.attempts().saturating_sub(relay.stats.success()) as u64;
if var >= 3 {
let retry_interval: i64 =
cmp::min(MIN_RETRY_SEC * (1 + var), MAX_ADJ_RETRY_SEC) as i64;
let jitter: i64 = rand::thread_rng().gen_range(-1..=1);
retry_interval.saturating_add(jitter) as u64
// Check status
match relay.status().await {
RelayStatus::Initialized
| RelayStatus::Pending
| RelayStatus::Disconnected => relay.try_connect().await,
RelayStatus::Stopped | RelayStatus::Terminated => {
tracing::debug!("Auto connect loop terminated for {}", relay.url);
break;
}
_ => (),
};

let retry_sec: u64 = if relay.opts.get_adjust_retry_sec() {
let var: u64 =
relay.stats.attempts().saturating_sub(relay.stats.success()) as u64;
if var >= 3 {
let retry_interval: i64 =
cmp::min(MIN_RETRY_SEC * (1 + var), MAX_ADJ_RETRY_SEC) as i64;
let jitter: i64 = rand::thread_rng().gen_range(-1..=1);
retry_interval.saturating_add(jitter) as u64
} else {
relay.opts().get_retry_sec()
}
} else {
relay.opts().get_retry_sec()
}
} else {
relay.opts().get_retry_sec()
};

tracing::trace!("{} retry time set to {retry_sec} secs", relay.url);
thread::sleep(Duration::from_secs(retry_sec)).await;
}
};

//relay.set_auto_connect_loop_running(false);
});
/* } else {
tracing::warn!("Auto connect loop for {} is already running!", self.url)
} */
} else if let RelayStatus::Initialized | RelayStatus::Stopped | RelayStatus::Terminated =
self.status().await
{
if wait_for_connection {
tracing::trace!("{} retry time set to {retry_sec} secs", relay.url);
thread::sleep(Duration::from_secs(retry_sec)).await;
}
});
} else if wait_for_connection {
self.try_connect().await
} else {
let relay = self.clone();
Expand Down Expand Up @@ -610,14 +600,23 @@ impl Relay {
break;
}

let nonce: u64 = nostr::secp256k1::rand::random();
relay.stats.ping.set_last_nonce(nonce);
relay.stats.ping.set_replied(false);
if let Err(e) = relay.send_relay_event(RelayEvent::Ping { nonce }, None)
let nonce: u64 = rand::thread_rng().gen();
if relay.stats.ping.set_last_nonce(nonce)
&& relay.stats.ping.set_replied(false)
{
tracing::error!("Impossible to ping {}: {e}", relay.url);
break;
};
if let Err(e) =
relay.send_relay_event(RelayEvent::Ping { nonce }, None)
{
tracing::error!("Impossible to ping {}: {e}", relay.url);
break;
};
} else {
tracing::warn!(
"`last_nonce` or `replied` not updated for {}!",
relay.url
);
}

thread::sleep(Duration::from_secs(PING_INTERVAL)).await;
}

Expand Down
16 changes: 8 additions & 8 deletions crates/nostr-sdk/src/relay/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,16 @@ impl PingStats {
*sent_at = Instant::now();
}

pub(crate) fn set_last_nonce(&self, nonce: u64) {
let _ = self
.last_nonce
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(nonce));
pub(crate) fn set_last_nonce(&self, nonce: u64) -> bool {
self.last_nonce
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(nonce))
.is_ok()
}

pub(crate) fn set_replied(&self, replied: bool) {
let _ = self
.replied
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(replied));
pub(crate) fn set_replied(&self, replied: bool) -> bool {
self.replied
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(replied))
.is_ok()
}
}

Expand Down