From bfecac7b9cc5c4c60834fe71ec05dd56a6d31931 Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Fri, 25 Aug 2023 17:27:29 +0200 Subject: [PATCH] sdk: add `latency` to `RelayConnectionStats` --- crates/nostr-sdk/src/relay/mod.rs | 195 +++++++++++++++++++++++++++++- 1 file changed, 190 insertions(+), 5 deletions(-) diff --git a/crates/nostr-sdk/src/relay/mod.rs b/crates/nostr-sdk/src/relay/mod.rs index 5f3bbda6c..2e82cb2f6 100644 --- a/crates/nostr-sdk/src/relay/mod.rs +++ b/crates/nostr-sdk/src/relay/mod.rs @@ -3,6 +3,8 @@ //! Relay +#[cfg(not(target_arch = "wasm32"))] +use std::collections::VecDeque; use std::collections::{HashMap, HashSet}; use std::fmt; #[cfg(not(target_arch = "wasm32"))] @@ -10,7 +12,11 @@ use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; +#[cfg(not(target_arch = "wasm32"))] +use std::time::Instant; +#[cfg(not(target_arch = "wasm32"))] +use async_utility::futures_util::stream::AbortHandle; use async_utility::{futures_util, thread, time}; use nostr::message::MessageHandleError; #[cfg(feature = "nip11")] @@ -120,7 +126,12 @@ pub enum RelayEvent { SendMsg(Box), /// Send multiple messages at once Batch(Vec), - // Ping, + /// Ping + #[cfg(not(target_arch = "wasm32"))] + Ping { + /// Nonce + nonce: u64, + }, /// Close Close, /// Stop @@ -129,6 +140,66 @@ pub enum RelayEvent { Terminate, } +/// Ping Stats +#[cfg(not(target_arch = "wasm32"))] +#[derive(Debug, Clone)] +pub struct PingStats { + sent_at: Arc>, + last_nonce: Arc, + replied: Arc, +} + +#[cfg(not(target_arch = "wasm32"))] +impl Default for PingStats { + fn default() -> Self { + Self::new() + } +} + +#[cfg(not(target_arch = "wasm32"))] +impl PingStats { + /// New default ping stats + pub fn new() -> Self { + Self { + sent_at: Arc::new(Mutex::new(Instant::now())), + last_nonce: Arc::new(AtomicU64::new(0)), + replied: Arc::new(AtomicBool::new(false)), + } + } + + /// Get sent at + pub async fn sent_at(&self) -> Instant { + *self.sent_at.lock().await + } + + /// Last nonce + pub fn last_nonce(&self) -> u64 { + self.last_nonce.load(Ordering::SeqCst) + } + + /// Replied + pub fn replied(&self) -> bool { + self.replied.load(Ordering::SeqCst) + } + + pub(crate) async fn just_sent(&self) { + let mut sent_at = self.sent_at.lock().await; + *sent_at = Instant::now(); + } + + pub(crate) fn set_last_nonce(&self, nonce: u64) { + let _ = self + .last_nonce + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(nonce)); + } + + pub(crate) fn set_replied(&self, replied: bool) { + let _ = self + .replied + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(replied)); + } +} + /// [`Relay`] connection stats #[derive(Debug, Clone)] pub struct RelayConnectionStats { @@ -137,6 +208,10 @@ pub struct RelayConnectionStats { bytes_sent: Arc, bytes_received: Arc, connected_at: Arc, + #[cfg(not(target_arch = "wasm32"))] + latencies: Arc>>, + #[cfg(not(target_arch = "wasm32"))] + ping: PingStats, } impl Default for RelayConnectionStats { @@ -154,6 +229,10 @@ impl RelayConnectionStats { bytes_sent: Arc::new(AtomicUsize::new(0)), bytes_received: Arc::new(AtomicUsize::new(0)), connected_at: Arc::new(AtomicU64::new(0)), + #[cfg(not(target_arch = "wasm32"))] + latencies: Arc::new(Mutex::new(VecDeque::new())), + #[cfg(not(target_arch = "wasm32"))] + ping: PingStats::default(), } } @@ -182,6 +261,20 @@ impl RelayConnectionStats { Timestamp::from(self.connected_at.load(Ordering::SeqCst)) } + /// Calculate latency + #[cfg(not(target_arch = "wasm32"))] + pub async fn latency(&self) -> Option { + let latencies = self.latencies.lock().await; + let sum: Duration = latencies.iter().sum(); + sum.checked_div(latencies.len() as u32) + } + + /// Calculate latency + #[cfg(all(not(target_arch = "wasm32"), feature = "blocking"))] + pub fn latency_blocking(&self) -> Option { + RUNTIME.block_on(async { self.latency().await }) + } + pub(crate) fn new_attempt(&self) { self.attempts.fetch_add(1, Ordering::SeqCst); } @@ -202,6 +295,15 @@ impl RelayConnectionStats { pub(crate) fn add_bytes_received(&self, size: usize) { self.bytes_received.fetch_add(size, Ordering::SeqCst); } + + #[cfg(not(target_arch = "wasm32"))] + pub(crate) async fn save_latency(&self, latency: Duration) { + let mut latencies = self.latencies.lock().await; + if latencies.len() >= 5 { + latencies.pop_back(); + } + latencies.push_front(latency) + } } /// Internal Subscription ID @@ -499,6 +601,11 @@ impl Relay { relay.relay_sender.capacity() ); + #[cfg(not(target_arch = "wasm32"))] + if let Some(latency) = relay.stats.latency().await { + tracing::info!("{} latency: {} ms", relay.url, latency.as_millis()); + } + // Schedule relay for termination // Needed to terminate the auto reconnect loop, also if the relay is not connected yet. if relay.is_scheduled_for_stop() { @@ -578,6 +685,37 @@ impl Relay { self.stats.new_success(); + #[cfg(not(target_arch = "wasm32"))] + let ping_abort_handle: AbortHandle = { + let relay = self.clone(); + thread::abortable(async move { + tracing::debug!("Relay Ping Thread Started"); + + loop { + if relay.stats.ping.last_nonce() != 0 && !relay.stats.ping.replied() { + tracing::warn!("{} not replied to ping", relay.url); + break; + } + + let nonce: u64 = nostr::secp256k1::rand::random(); + relay.stats.ping.set_last_nonce(nonce); + relay.stats.ping.set_replied(false); + if let Err(e) = relay.send_relay_event(RelayEvent::Ping { nonce }, None) + { + tracing::error!("Impossible to ping {}: {e}", relay.url); + break; + }; + thread::sleep(Duration::from_secs(60)).await; + } + + tracing::debug!("Exited from Ping Thread of {}", relay.url); + + if let Err(err) = relay.disconnect().await { + tracing::error!("Impossible to disconnect {}: {}", relay.url, err); + } + }) + }; + let relay = self.clone(); thread::spawn(async move { tracing::debug!("Relay Event Thread Started"); @@ -663,6 +801,25 @@ impl Relay { } } } + #[cfg(not(target_arch = "wasm32"))] + RelayEvent::Ping { nonce } => { + match ws_tx + .send(WsMessage::Ping(nonce.to_string().as_bytes().to_vec())) + .await + { + Ok(_) => { + relay.stats.ping.just_sent().await; + tracing::debug!("Ping {} (nonce {})", relay.url, nonce); + } + Err(e) => { + tracing::error!( + "Impossible to ping {}: {}", + relay.url(), + e.to_string() + ); + } + } + } RelayEvent::Close => { let _ = ws_tx.close().await; relay.set_status(RelayStatus::Disconnected).await; @@ -689,7 +846,11 @@ impl Relay { } } } + tracing::debug!("Exited from Relay Event Thread"); + + #[cfg(not(target_arch = "wasm32"))] + ping_abort_handle.abort(); }); let relay = self.clone(); @@ -733,10 +894,34 @@ impl Relay { #[cfg(not(target_arch = "wasm32"))] while let Some(msg_res) = ws_rx.next().await { if let Ok(msg) = msg_res { - let data: Vec = msg.into_data(); - let exit: bool = func(&relay, data).await; - if exit { - break; + match msg { + WsMessage::Pong(bytes) => match String::from_utf8(bytes) { + Ok(nonce) => match nonce.parse::() { + Ok(nonce) => { + if relay.stats.ping.last_nonce() == nonce { + tracing::debug!( + "Pong from {} match nonce: {}", + relay.url, + nonce + ); + relay.stats.ping.set_replied(true); + let sent_at = relay.stats.ping.sent_at().await; + relay.stats.save_latency(sent_at.elapsed()).await; + } else { + tracing::error!("Pong nonce not match: received={nonce}, expected={}", relay.stats.ping.last_nonce()); + } + } + Err(e) => tracing::error!("{e}"), + }, + Err(e) => tracing::error!("{e}"), + }, + _ => { + let data: Vec = msg.into_data(); + let exit: bool = func(&relay, data).await; + if exit { + break; + } + } } } }