From 0ff5770e1cb5c77968c06d1e69ac8cdaa1006f66 Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Tue, 18 Jun 2024 16:35:14 -0400 Subject: [PATCH] pool: remove `RelayPoolNotification::Stop` * Remove all `start` and `stop` methods * Remove `RelayStatus::Stop` * Fix shutdown notification sent to external channel on `Relay::terminate` method call Signed-off-by: Yuki Kishimoto --- CHANGELOG.md | 5 + bindings/nostr-sdk-ffi/src/client/mod.rs | 8 -- bindings/nostr-sdk-ffi/src/pool/mod.rs | 15 -- bindings/nostr-sdk-ffi/src/relay/mod.rs | 5 - bindings/nostr-sdk-ffi/src/relay/status.rs | 3 - bindings/nostr-sdk-js/src/pool/mod.rs | 16 --- bindings/nostr-sdk-js/src/relay/mod.rs | 8 -- crates/nostr-relay-pool/src/pool/internal.rs | 11 -- crates/nostr-relay-pool/src/pool/mod.rs | 21 +-- crates/nostr-relay-pool/src/relay/internal.rs | 132 +++++++----------- crates/nostr-relay-pool/src/relay/mod.rs | 12 +- crates/nostr-relay-pool/src/relay/status.rs | 7 +- crates/nostr-sdk/Cargo.toml | 4 - crates/nostr-sdk/examples/client-stop.rs | 35 ----- crates/nostr-sdk/src/client/handler.rs | 2 +- crates/nostr-sdk/src/client/mod.rs | 14 -- 16 files changed, 59 insertions(+), 239 deletions(-) delete mode 100644 crates/nostr-sdk/examples/client-stop.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index daf7d56b3..75b76856e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,10 +48,15 @@ ### Fixed * nostr: fix NIP-47 `list_transactions` response deserialization ([Yuki Kishimoto] and [lnbc1QWFyb24]) +* pool: fix shutdown notification sent to external channel on `Relay::terminate` method call ([Yuki Kishimoto]) * js: fix "RuntimeError: memory access out of bounds" WASM error ([Yuki Kishimoto]) ### Removed +* pool: remove `RelayPoolNotification::Stop` ([Yuki Kishimoto]) +* pool: remove `RelayStatus::Stop` ([Yuki Kishimoto]) +* Remove all `start` and `stop` methods ([Yuki Kishimoto]) + ## [v0.32.0] ### Summary diff --git a/bindings/nostr-sdk-ffi/src/client/mod.rs b/bindings/nostr-sdk-ffi/src/client/mod.rs index cabbe6d3b..78b213a9f 100644 --- a/bindings/nostr-sdk-ffi/src/client/mod.rs +++ b/bindings/nostr-sdk-ffi/src/client/mod.rs @@ -134,14 +134,6 @@ impl Client { .await } - pub async fn start(&self) { - self.inner.start().await - } - - pub async fn stop(&self) -> Result<()> { - Ok(self.inner.stop().await?) - } - pub async fn shutdown(&self) -> Result<()> { Ok(self.inner.clone().shutdown().await?) } diff --git a/bindings/nostr-sdk-ffi/src/pool/mod.rs b/bindings/nostr-sdk-ffi/src/pool/mod.rs index a34d92f76..02cf74bbb 100644 --- a/bindings/nostr-sdk-ffi/src/pool/mod.rs +++ b/bindings/nostr-sdk-ffi/src/pool/mod.rs @@ -45,21 +45,6 @@ impl RelayPool { } } - /// Start - /// - /// Internally call `connect` without wait for connection. - #[inline] - pub async fn start(&self) { - self.inner.start().await - } - - /// Stop - /// - /// Call `connect` to re-start relays connections - pub async fn stop(&self) -> Result<()> { - Ok(self.inner.stop().await?) - } - /// Completely shutdown pool pub async fn shutdown(&self) -> Result<()> { Ok(self.inner.clone().shutdown().await?) diff --git a/bindings/nostr-sdk-ffi/src/relay/mod.rs b/bindings/nostr-sdk-ffi/src/relay/mod.rs index a0af368c6..916874ae9 100644 --- a/bindings/nostr-sdk-ffi/src/relay/mod.rs +++ b/bindings/nostr-sdk-ffi/src/relay/mod.rs @@ -154,11 +154,6 @@ impl Relay { self.inner.connect(connection_timeout).await } - /// Disconnect from relay and set status to 'Stopped' - pub async fn stop(&self) -> Result<()> { - Ok(self.inner.stop().await?) - } - /// Disconnect from relay and set status to 'Terminated' pub async fn terminate(&self) -> Result<()> { Ok(self.inner.terminate().await?) diff --git a/bindings/nostr-sdk-ffi/src/relay/status.rs b/bindings/nostr-sdk-ffi/src/relay/status.rs index 6a44e6074..906ada6da 100644 --- a/bindings/nostr-sdk-ffi/src/relay/status.rs +++ b/bindings/nostr-sdk-ffi/src/relay/status.rs @@ -16,8 +16,6 @@ pub enum RelayStatus { Connected, /// Relay disconnected, will retry to connect again Disconnected, - /// Stop - Stopped, /// Relay completely disconnected Terminated, } @@ -30,7 +28,6 @@ impl From for RelayStatus { nostr_sdk::RelayStatus::Connecting => Self::Connecting, nostr_sdk::RelayStatus::Connected => Self::Connected, nostr_sdk::RelayStatus::Disconnected => Self::Disconnected, - nostr_sdk::RelayStatus::Stopped => Self::Stopped, nostr_sdk::RelayStatus::Terminated => Self::Terminated, } } diff --git a/bindings/nostr-sdk-js/src/pool/mod.rs b/bindings/nostr-sdk-js/src/pool/mod.rs index 7feb08d13..ad9fddbb4 100644 --- a/bindings/nostr-sdk-js/src/pool/mod.rs +++ b/bindings/nostr-sdk-js/src/pool/mod.rs @@ -47,22 +47,6 @@ impl JsRelayPool { } } - /// Start - /// - /// Internally call `connect` without wait for connection. - #[wasm_bindgen] - pub async fn start(&self) { - self.inner.start().await - } - - /// Stop - /// - /// Call `connect` or `start` to re-start relays connections - #[wasm_bindgen] - pub async fn stop(&self) -> Result<()> { - self.inner.stop().await.map_err(into_err) - } - /// Completely shutdown pool #[wasm_bindgen] pub async fn shutdown(&self) -> Result<()> { diff --git a/bindings/nostr-sdk-js/src/relay/mod.rs b/bindings/nostr-sdk-js/src/relay/mod.rs index 3f93b1e6a..14316f9fc 100644 --- a/bindings/nostr-sdk-js/src/relay/mod.rs +++ b/bindings/nostr-sdk-js/src/relay/mod.rs @@ -54,8 +54,6 @@ pub enum JsRelayStatus { Connected, /// Relay disconnected, will retry to connect again Disconnected, - /// Stop - Stopped, /// Relay completely disconnected Terminated, } @@ -68,7 +66,6 @@ impl From for JsRelayStatus { RelayStatus::Connecting => Self::Connecting, RelayStatus::Connected => Self::Connected, RelayStatus::Disconnected => Self::Disconnected, - RelayStatus::Stopped => Self::Stopped, RelayStatus::Terminated => Self::Terminated, } } @@ -133,11 +130,6 @@ impl JsRelay { self.inner.connect(connection_timeout.map(|d| *d)).await } - /// Disconnect from relay and set status to 'Stopped' - pub async fn stop(&self) -> Result<()> { - self.inner.stop().await.map_err(into_err) - } - /// Disconnect from relay and set status to 'Terminated' pub async fn terminate(&self) -> Result<()> { self.inner.terminate().await.map_err(into_err) diff --git a/crates/nostr-relay-pool/src/pool/internal.rs b/crates/nostr-relay-pool/src/pool/internal.rs index 401111098..e94810542 100644 --- a/crates/nostr-relay-pool/src/pool/internal.rs +++ b/crates/nostr-relay-pool/src/pool/internal.rs @@ -65,17 +65,6 @@ impl InternalRelayPool { } } - pub async fn stop(&self) -> Result<(), Error> { - let relays = self.relays().await; - for relay in relays.values() { - relay.stop().await?; - } - if let Err(e) = self.notification_sender.send(RelayPoolNotification::Stop) { - tracing::error!("Impossible to send STOP notification: {e}"); - } - Ok(()) - } - pub async fn shutdown(&self) -> Result<(), Error> { // Disconnect all relays self.disconnect().await?; diff --git a/crates/nostr-relay-pool/src/pool/mod.rs b/crates/nostr-relay-pool/src/pool/mod.rs index ae36e94fe..0c8c5addd 100644 --- a/crates/nostr-relay-pool/src/pool/mod.rs +++ b/crates/nostr-relay-pool/src/pool/mod.rs @@ -56,8 +56,6 @@ pub enum RelayPoolNotification { /// Relay Status status: RelayStatus, }, - /// Stop - Stop, /// Shutdown Shutdown, } @@ -101,22 +99,6 @@ impl RelayPool { } } - /// Start - /// - /// Internally call `connect` without wait for connection. - #[inline] - pub async fn start(&self) { - self.inner.connect(None).await - } - - /// Stop - /// - /// Call `connect` to re-start relays connections - #[inline] - pub async fn stop(&self) -> Result<(), Error> { - self.inner.stop().await - } - /// Completely shutdown pool #[inline] pub async fn shutdown(self) -> Result<(), Error> { @@ -516,12 +498,11 @@ impl RelayPool { { let mut notifications = self.notifications(); while let Ok(notification) = notifications.recv().await { - let stop: bool = RelayPoolNotification::Stop == notification; let shutdown: bool = RelayPoolNotification::Shutdown == notification; let exit: bool = func(notification) .await .map_err(|e| Error::Handler(e.to_string()))?; - if exit || stop || shutdown { + if exit || shutdown { break; } } diff --git a/crates/nostr-relay-pool/src/relay/internal.rs b/crates/nostr-relay-pool/src/relay/internal.rs index 0464b22de..a8fc39f10 100644 --- a/crates/nostr-relay-pool/src/relay/internal.rs +++ b/crates/nostr-relay-pool/src/relay/internal.rs @@ -52,8 +52,6 @@ struct NostrMessage { enum RelayServiceEvent { /// None None, - /// Stop - Stop, /// Completely disconnect Terminate, } @@ -149,7 +147,6 @@ pub(crate) struct InternalRelay { blacklist: RelayBlacklist, database: Arc, channels: RelayChannels, - scheduled_for_stop: Arc, scheduled_for_termination: Arc, pub(super) internal_notification_sender: broadcast::Sender, external_notification_sender: Arc>>>, @@ -190,7 +187,6 @@ impl InternalRelay { blacklist, database, channels: RelayChannels::new(), - scheduled_for_stop: Arc::new(AtomicBool::new(false)), scheduled_for_termination: Arc::new(AtomicBool::new(false)), internal_notification_sender: relay_notification_sender, external_notification_sender: Arc::new(RwLock::new(None)), @@ -231,7 +227,6 @@ impl InternalRelay { RelayStatus::Connecting => tracing::debug!("Connecting to '{}'", self.url), RelayStatus::Connected => tracing::info!("Connected to '{}'", self.url), RelayStatus::Disconnected => tracing::info!("Disconnected from '{}'", self.url), - RelayStatus::Stopped => tracing::info!("Stopped '{}'", self.url), RelayStatus::Terminated => { tracing::info!("Completely disconnected from '{}'", self.url) } @@ -239,7 +234,7 @@ impl InternalRelay { } // Send notification - self.send_notification(RelayNotification::RelayStatus { status }) + self.send_notification(RelayNotification::RelayStatus { status }, true) .await; } @@ -312,16 +307,6 @@ impl InternalRelay { self.channels.nostr_queue() } - #[inline] - fn is_scheduled_for_stop(&self) -> bool { - self.scheduled_for_stop.load(Ordering::SeqCst) - } - - #[inline] - fn schedule_for_stop(&self, value: bool) { - self.scheduled_for_stop.store(value, Ordering::SeqCst); - } - #[inline] fn is_scheduled_for_termination(&self) -> bool { self.scheduled_for_termination.load(Ordering::SeqCst) @@ -341,47 +326,47 @@ impl InternalRelay { *external_notification_sender = notification_sender; } - async fn send_notification(&self, notification: RelayNotification) { + async fn send_notification(&self, notification: RelayNotification, external: bool) { // Send internal notification let _ = self.internal_notification_sender.send(notification.clone()); // Send external notification - let external_notification_sender = self.external_notification_sender.read().await; - if let Some(external_notification_sender) = external_notification_sender.as_ref() { - // Convert relay to notification to pool notification - let notification: RelayPoolNotification = match notification { - RelayNotification::Event { - subscription_id, - event, - } => RelayPoolNotification::Event { - relay_url: self.url(), - subscription_id, - event, - }, - RelayNotification::Message { message } => RelayPoolNotification::Message { - relay_url: self.url(), - message, - }, - RelayNotification::RelayStatus { status } => RelayPoolNotification::RelayStatus { - relay_url: self.url(), - status, - }, - RelayNotification::Shutdown => RelayPoolNotification::Shutdown, - RelayNotification::Stop => RelayPoolNotification::Stop, - }; + if external { + let external_notification_sender = self.external_notification_sender.read().await; + if let Some(external_notification_sender) = external_notification_sender.as_ref() { + // Convert relay to notification to pool notification + let notification: RelayPoolNotification = match notification { + RelayNotification::Event { + subscription_id, + event, + } => RelayPoolNotification::Event { + relay_url: self.url(), + subscription_id, + event, + }, + RelayNotification::Message { message } => RelayPoolNotification::Message { + relay_url: self.url(), + message, + }, + RelayNotification::RelayStatus { status } => { + RelayPoolNotification::RelayStatus { + relay_url: self.url(), + status, + } + } + RelayNotification::Shutdown => RelayPoolNotification::Shutdown, + }; - // Send notification - let _ = external_notification_sender.send(notification); + // Send notification + let _ = external_notification_sender.send(notification); + } } } pub async fn connect(&self, connection_timeout: Option) { - self.schedule_for_stop(false); // TODO: remove? self.schedule_for_termination(false); // TODO: remove? - if let RelayStatus::Initialized | RelayStatus::Stopped | RelayStatus::Terminated = - self.status().await - { + if let RelayStatus::Initialized | RelayStatus::Terminated = self.status().await { if self.opts.get_reconnect() { // If connection timeout is not null, try to connect match connection_timeout { @@ -423,15 +408,7 @@ impl InternalRelay { // Schedule relay for termination // Needed to terminate the auto reconnect loop, also if the relay is not connected yet. - if relay.is_scheduled_for_stop() { - relay.set_status(RelayStatus::Stopped, true).await; - relay.schedule_for_stop(false); - tracing::debug!( - "Auto connect loop terminated for {} [stop - schedule]", - relay.url - ); - break; - } else if relay.is_scheduled_for_termination() { + if relay.is_scheduled_for_termination() { relay.set_status(RelayStatus::Terminated, true).await; relay.schedule_for_termination(false); tracing::debug!( @@ -448,7 +425,7 @@ impl InternalRelay { | RelayStatus::Disconnected => { relay.try_connect(connection_timeout).await } - RelayStatus::Stopped | RelayStatus::Terminated => { + RelayStatus::Terminated => { tracing::debug!("Auto connect loop terminated for {}", relay.url); break; } @@ -598,14 +575,6 @@ impl InternalRelay { match service { // Do nothing RelayServiceEvent::None => {}, - // Stop - RelayServiceEvent::Stop => { - if relay.is_scheduled_for_stop() { - relay.set_status(RelayStatus::Stopped, true).await; - relay.schedule_for_stop(false); - break; - } - } // Terminate RelayServiceEvent::Terminate => { if relay.is_scheduled_for_termination() { @@ -809,7 +778,7 @@ impl InternalRelay { } // Send notification - self.send_notification(RelayNotification::Message { message }) + self.send_notification(RelayNotification::Message { message }, true) .await; } Ok(None) | Err(Error::MessageHandle(MessageHandleError::EmptyMsg)) => (), @@ -975,10 +944,13 @@ impl InternalRelay { // Check if seen if !seen { // Send notification - self.send_notification(RelayNotification::Event { - subscription_id: SubscriptionId::new(&subscription_id), - event: event.clone(), - }) + self.send_notification( + RelayNotification::Event { + subscription_id: SubscriptionId::new(&subscription_id), + event: event.clone(), + }, + true, + ) .await; } @@ -991,22 +963,14 @@ impl InternalRelay { } } - pub async fn stop(&self) -> Result<(), Error> { - self.schedule_for_stop(true); // TODO: remove? - if !self.is_disconnected().await { - self.channels.send_service_msg(RelayServiceEvent::Stop)?; - } - self.send_notification(RelayNotification::Stop).await; - Ok(()) - } - pub async fn terminate(&self) -> Result<(), Error> { self.schedule_for_termination(true); // TODO: remove? if !self.is_disconnected().await { self.channels .send_service_msg(RelayServiceEvent::Terminate)?; } - self.send_notification(RelayNotification::Shutdown).await; + self.send_notification(RelayNotification::Shutdown, false) + .await; Ok(()) } @@ -1255,7 +1219,7 @@ impl InternalRelay { return false; // No need to send CLOSE msg } } - RelayNotification::Stop | RelayNotification::Shutdown => { + RelayNotification::Shutdown => { return false; // No need to send CLOSE msg } _ => (), @@ -1271,7 +1235,7 @@ impl InternalRelay { return Ok(()); // No need to send CLOSE msg } } - RelayNotification::Stop | RelayNotification::Shutdown => { + RelayNotification::Shutdown => { return Ok(()); // No need to send CLOSE msg } _ => (), @@ -1428,7 +1392,7 @@ impl InternalRelay { return Err(Error::NotConnectedStatusChanged); } } - RelayNotification::Stop | RelayNotification::Shutdown => break, + RelayNotification::Shutdown => break, _ => (), } } @@ -1459,7 +1423,7 @@ impl InternalRelay { return Err(Error::NotConnected); } } - RelayNotification::Stop | RelayNotification::Shutdown => break, + RelayNotification::Shutdown => break, _ => (), } } @@ -1766,7 +1730,7 @@ impl InternalRelay { return Err(Error::NotConnectedStatusChanged); } } - RelayNotification::Stop | RelayNotification::Shutdown => break, + RelayNotification::Shutdown => break, _ => (), }; diff --git a/crates/nostr-relay-pool/src/relay/mod.rs b/crates/nostr-relay-pool/src/relay/mod.rs index a519e88c0..cb0195482 100644 --- a/crates/nostr-relay-pool/src/relay/mod.rs +++ b/crates/nostr-relay-pool/src/relay/mod.rs @@ -64,8 +64,6 @@ pub enum RelayNotification { /// Relay Status status: RelayStatus, }, - /// Stop - Stop, /// Shutdown Shutdown, } @@ -223,15 +221,10 @@ impl Relay { self.inner.connect(connection_timeout).await } - /// Disconnect from relay and set status to 'Stopped' - #[inline] - pub async fn stop(&self) -> Result<(), Error> { - self.inner.stop().await - } - /// Disconnect from relay and set status to 'Terminated' #[inline] pub async fn terminate(&self) -> Result<(), Error> { + // TODO: rename to disconnect self.inner.terminate().await } @@ -391,12 +384,11 @@ impl Relay { { let mut notifications = self.notifications(); while let Ok(notification) = notifications.recv().await { - let stop: bool = RelayNotification::Stop == notification; let shutdown: bool = RelayNotification::Shutdown == notification; let exit: bool = func(notification) .await .map_err(|e| Error::Handler(e.to_string()))?; - if exit || stop || shutdown { + if exit || shutdown { break; } } diff --git a/crates/nostr-relay-pool/src/relay/status.rs b/crates/nostr-relay-pool/src/relay/status.rs index 63b57b01d..9d9039259 100644 --- a/crates/nostr-relay-pool/src/relay/status.rs +++ b/crates/nostr-relay-pool/src/relay/status.rs @@ -19,8 +19,6 @@ pub enum RelayStatus { Connected, /// Relay disconnected, will retry to connect again Disconnected, - /// Stop - Stopped, /// Relay completely disconnected Terminated, } @@ -33,15 +31,14 @@ impl fmt::Display for RelayStatus { Self::Connecting => write!(f, "Connecting"), Self::Connected => write!(f, "Connected"), Self::Disconnected => write!(f, "Disconnected"), - Self::Stopped => write!(f, "Stopped"), Self::Terminated => write!(f, "Terminated"), } } } impl RelayStatus { - /// Check if is `disconnected`, `stopped` or `terminated` + /// Check if is `disconnected` or `terminated` pub(crate) fn is_disconnected(&self) -> bool { - matches!(self, Self::Disconnected | Self::Stopped | Self::Terminated) + matches!(self, Self::Disconnected | Self::Terminated) } } diff --git a/crates/nostr-sdk/Cargo.toml b/crates/nostr-sdk/Cargo.toml index 58354760e..b526d87a2 100644 --- a/crates/nostr-sdk/Cargo.toml +++ b/crates/nostr-sdk/Cargo.toml @@ -87,10 +87,6 @@ required-features = ["nip46"] name = "bot" required-features = ["all-nips"] -[[example]] -name = "client-stop" -required-features = ["all-nips"] - [[example]] name = "rocksdb" required-features = ["all-nips", "rocksdb"] diff --git a/crates/nostr-sdk/examples/client-stop.rs b/crates/nostr-sdk/examples/client-stop.rs deleted file mode 100644 index 49e592eee..000000000 --- a/crates/nostr-sdk/examples/client-stop.rs +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright (c) 2022-2023 Yuki Kishimoto -// Copyright (c) 2023-2024 Rust Nostr Developers -// Distributed under the MIT software license - -use std::time::Duration; - -use async_utility::thread; -use nostr_sdk::prelude::*; - -#[tokio::main] -async fn main() -> Result<()> { - tracing_subscriber::fmt::init(); - - let client = Client::default(); - - client.add_relay("wss://nostr.oxtr.dev").await?; - client.add_relay("wss://relay.damus.io").await?; - client.add_relay("wss://nostr.openchain.fr").await?; - - client.connect().await; - - thread::sleep(Duration::from_secs(10)).await; - - client.stop().await?; - - thread::sleep(Duration::from_secs(15)).await; - - client.start().await; - - thread::sleep(Duration::from_secs(10)).await; - - client.publish_text_note("Test", []).await?; - - Ok(()) -} diff --git a/crates/nostr-sdk/src/client/handler.rs b/crates/nostr-sdk/src/client/handler.rs index 0c75df678..1fd67fb2c 100644 --- a/crates/nostr-sdk/src/client/handler.rs +++ b/crates/nostr-sdk/src/client/handler.rs @@ -38,7 +38,7 @@ impl Client { } } } - RelayPoolNotification::Shutdown => break, // TODO: handle also 'Stop' msg? + RelayPoolNotification::Shutdown => break, _ => (), } } diff --git a/crates/nostr-sdk/src/client/mod.rs b/crates/nostr-sdk/src/client/mod.rs index 3c3b401dd..f1012340a 100644 --- a/crates/nostr-sdk/src/client/mod.rs +++ b/crates/nostr-sdk/src/client/mod.rs @@ -327,20 +327,6 @@ impl Client { // TODO: update mute list event? } - /// Start a previously stopped client - #[inline] - pub async fn start(&self) { - self.connect().await; - } - - /// Stop the client - /// - /// Disconnect all relays and set their status to `RelayStatus::Stopped`. - #[inline] - pub async fn stop(&self) -> Result<(), Error> { - Ok(self.pool.stop().await?) - } - /// Completely shutdown [`Client`] #[inline] pub async fn shutdown(self) -> Result<(), Error> {