diff --git a/crates/nostr-sdk/examples/client-with-opts.rs b/crates/nostr-sdk/examples/client-with-opts.rs index 674704512..59c75ea02 100644 --- a/crates/nostr-sdk/examples/client-with-opts.rs +++ b/crates/nostr-sdk/examples/client-with-opts.rs @@ -22,7 +22,12 @@ async fn main() -> Result<()> { client.add_relay("wss://relay.damus.io", None).await?; client.add_relay("wss://nostr.openchain.fr", None).await?; client - .add_relay_with_opts("wss://nostr.mom", None, RelayOptions::new().write(false)) + .add_relay_with_opts( + "wss://nostr.mom", + None, + RelayRole::default(), + RelayOptions::new().write(false), + ) .await?; client .add_relay( diff --git a/crates/nostr-sdk/src/client/blocking.rs b/crates/nostr-sdk/src/client/blocking.rs index 005b67207..518d4b0b7 100644 --- a/crates/nostr-sdk/src/client/blocking.rs +++ b/crates/nostr-sdk/src/client/blocking.rs @@ -18,7 +18,7 @@ use tokio::sync::broadcast; #[cfg(feature = "nip46")] use super::signer::remote::RemoteSigner; use super::{Error, Options, TryIntoUrl}; -use crate::relay::{pool, Relay, RelayOptions, RelayPoolNotification}; +use crate::relay::{pool, Relay, RelayOptions, RelayPoolNotification, RelayRole}; use crate::RUNTIME; #[derive(Debug, Clone)] @@ -143,13 +143,18 @@ impl Client { &self, url: U, proxy: Option, + role: RelayRole, opts: RelayOptions, ) -> Result where U: TryIntoUrl, pool::Error: From<::Err>, { - RUNTIME.block_on(async { self.client.add_relay_with_opts(url, proxy, opts).await }) + RUNTIME.block_on(async { + self.client + .add_relay_with_opts(url, proxy, role, opts) + .await + }) } pub fn remove_relay(&self, url: U) -> Result<(), Error> diff --git a/crates/nostr-sdk/src/client/mod.rs b/crates/nostr-sdk/src/client/mod.rs index c244724f5..ca11bde2c 100644 --- a/crates/nostr-sdk/src/client/mod.rs +++ b/crates/nostr-sdk/src/client/mod.rs @@ -35,7 +35,9 @@ pub use self::options::Options; #[cfg(feature = "nip46")] pub use self::signer::remote::RemoteSigner; use crate::relay::pool::{self, Error as RelayPoolError, RelayPool}; -use crate::relay::{FilterOptions, Relay, RelayOptions, RelayPoolNotification, RelaySendOptions}; +use crate::relay::{ + FilterOptions, Relay, RelayOptions, RelayPoolNotification, RelayRole, RelaySendOptions, +}; use crate::util::TryIntoUrl; /// [`Client`] error @@ -321,7 +323,7 @@ impl Client { U: TryIntoUrl, pool::Error: From<::Err>, { - self.add_relay_with_opts(url, proxy, RelayOptions::default()) + self.add_relay_with_opts(url, proxy, RelayRole::default(), RelayOptions::default()) .await } @@ -351,7 +353,8 @@ impl Client { U: TryIntoUrl, pool::Error: From<::Err>, { - self.add_relay_with_opts(url, RelayOptions::default()).await + self.add_relay_with_opts(url, RelayRole::default(), RelayOptions::default()) + .await } /// Add new relay with [`RelayOptions`] @@ -370,7 +373,7 @@ impl Client { /// # let client = Client::new(&my_keys); /// let opts = RelayOptions::new().write(false).retry_sec(11); /// client - /// .add_relay_with_opts("wss://relay.nostr.info", None, opts) + /// .add_relay_with_opts("wss://relay.nostr.info", None, RelayRole::default(), opts) /// .await /// .unwrap(); /// @@ -382,13 +385,14 @@ impl Client { &self, url: U, proxy: Option, + role: RelayRole, opts: RelayOptions, ) -> Result where U: TryIntoUrl, pool::Error: From<::Err>, { - Ok(self.pool.add_relay(url, proxy, opts).await?) + Ok(self.pool.add_relay(url, proxy, role, opts).await?) } /// Add new relay with [`RelayOptions`] @@ -397,12 +401,17 @@ impl Client { /// /// Return `false` if the relay already exists. #[cfg(target_arch = "wasm32")] - pub async fn add_relay_with_opts(&self, url: U, opts: RelayOptions) -> Result + pub async fn add_relay_with_opts( + &self, + url: U, + role: RelayRole, + opts: RelayOptions, + ) -> Result where U: TryIntoUrl, pool::Error: From<::Err>, { - Ok(self.pool.add_relay(url, opts).await?) + Ok(self.pool.add_relay(url, role, opts).await?) } /// Disconnect and remove relay @@ -674,7 +683,9 @@ impl Client { } else { None }; - self.pool.send_msg(msg, wait).await?; + self.pool + .send_msg(msg, &[RelayRole::default()], wait) + .await?; Ok(()) } @@ -684,7 +695,9 @@ impl Client { msgs: Vec, wait: Option, ) -> Result<(), Error> { - self.pool.batch_msg(msgs, wait).await?; + self.pool + .batch_msg(msgs, &[RelayRole::default()], wait) + .await?; Ok(()) } @@ -711,7 +724,10 @@ impl Client { let opts = RelaySendOptions::new() .skip_disconnected(self.opts.get_skip_disconnected_relays()) .timeout(timeout); - Ok(self.pool.send_event(event, opts).await?) + Ok(self + .pool + .send_event(event, &[RelayRole::default()], opts) + .await?) } /// Send multiple [`Event`] at once @@ -720,7 +736,9 @@ impl Client { events: Vec, opts: RelaySendOptions, ) -> Result<(), Error> { - self.pool.batch_event(events, opts).await?; + self.pool + .batch_event(events, &[RelayRole::default()], opts) + .await?; Ok(()) } diff --git a/crates/nostr-sdk/src/lib.rs b/crates/nostr-sdk/src/lib.rs index 309f8884a..56ddd1ae4 100644 --- a/crates/nostr-sdk/src/lib.rs +++ b/crates/nostr-sdk/src/lib.rs @@ -33,7 +33,8 @@ pub use self::client::blocking; pub use self::client::{Client, Options}; pub use self::relay::{ ActiveSubscription, FilterOptions, InternalSubscriptionId, Relay, RelayConnectionStats, - RelayOptions, RelayPoolNotification, RelayPoolOptions, RelaySendOptions, RelayStatus, + RelayOptions, RelayPoolNotification, RelayPoolOptions, RelayRole, RelaySendOptions, + RelayStatus, }; #[cfg(feature = "blocking")] diff --git a/crates/nostr-sdk/src/relay/mod.rs b/crates/nostr-sdk/src/relay/mod.rs index 7032f59fc..9c875514b 100644 --- a/crates/nostr-sdk/src/relay/mod.rs +++ b/crates/nostr-sdk/src/relay/mod.rs @@ -33,12 +33,14 @@ use tokio::sync::{broadcast, oneshot, Mutex, RwLock}; pub mod limits; mod options; pub mod pool; +mod role; mod stats; pub use self::limits::Limits; pub use self::options::{FilterOptions, RelayOptions, RelayPoolOptions, RelaySendOptions}; use self::options::{MAX_ADJ_RETRY_SEC, MIN_RETRY_SEC}; pub use self::pool::{RelayPoolMessage, RelayPoolNotification}; +pub use self::role::RelayRole; pub use self::stats::RelayConnectionStats; #[cfg(feature = "blocking")] use crate::RUNTIME; @@ -252,6 +254,7 @@ pub struct Relay { #[cfg(not(target_arch = "wasm32"))] proxy: Option, status: Arc>, + role: Arc>, #[cfg(feature = "nip11")] document: Arc>, opts: RelayOptions, @@ -278,6 +281,7 @@ impl Relay { #[cfg(not(target_arch = "wasm32"))] pub fn new( url: Url, + role: RelayRole, pool_sender: Sender, notification_sender: broadcast::Sender, proxy: Option, @@ -290,6 +294,7 @@ impl Relay { url, proxy, status: Arc::new(RwLock::new(RelayStatus::Initialized)), + role: Arc::new(RwLock::new(role)), #[cfg(feature = "nip11")] document: Arc::new(RwLock::new(RelayInformationDocument::new())), opts, @@ -310,6 +315,7 @@ impl Relay { #[cfg(target_arch = "wasm32")] pub fn new( url: Url, + role: RelayRole, pool_sender: Sender, notification_sender: broadcast::Sender, opts: RelayOptions, @@ -320,6 +326,7 @@ impl Relay { Self { url, status: Arc::new(RwLock::new(RelayStatus::Initialized)), + role: Arc::new(RwLock::new(role)), #[cfg(feature = "nip11")] document: Arc::new(RwLock::new(RelayInformationDocument::new())), opts, @@ -373,6 +380,18 @@ impl Relay { } } + /// Get [`RelayRole`] + pub async fn role(&self) -> RelayRole { + let role = self.role.read().await; + role.clone() + } + + /// Change [`RelayRole`] + pub async fn change_role(&self, role: RelayRole) { + let mut r = self.role.write().await; + *r = role; + } + /// Check if [`Relay`] is connected pub async fn is_connected(&self) -> bool { self.status().await == RelayStatus::Connected diff --git a/crates/nostr-sdk/src/relay/pool.rs b/crates/nostr-sdk/src/relay/pool.rs index ff520578d..fd20059d4 100644 --- a/crates/nostr-sdk/src/relay/pool.rs +++ b/crates/nostr-sdk/src/relay/pool.rs @@ -23,7 +23,7 @@ use tokio::sync::{broadcast, Mutex, RwLock}; use super::options::RelayPoolOptions; use super::{ Error as RelayError, FilterOptions, InternalSubscriptionId, Limits, Relay, RelayOptions, - RelaySendOptions, RelayStatus, + RelayRole, RelaySendOptions, RelayStatus, }; use crate::util::TryIntoUrl; @@ -403,12 +403,24 @@ impl RelayPool { self.notification_sender.subscribe() } - /// Get relays + /// Get all relays pub async fn relays(&self) -> HashMap { let relays = self.relays.read().await; relays.clone() } + /// Get relays by role + pub async fn relays_by_role(&self, role: RelayRole) -> HashMap { + let relays = self.relays.read().await; + let mut map = HashMap::new(); + for (url, relay) in relays.iter() { + if relay.role().await == role { + map.insert(url.clone(), relay.clone()); + } + } + map + } + /// Get [`Relay`] pub async fn relay(&self, url: U) -> Result where @@ -437,6 +449,7 @@ impl RelayPool { &self, url: U, proxy: Option, + role: RelayRole, opts: RelayOptions, ) -> Result where @@ -448,6 +461,7 @@ impl RelayPool { if !relays.contains_key(&url) { let relay = Relay::new( url, + role, self.pool_task_sender.clone(), self.notification_sender.clone(), proxy, @@ -463,7 +477,12 @@ impl RelayPool { /// Add new relay #[cfg(target_arch = "wasm32")] - pub async fn add_relay(&self, url: U, opts: RelayOptions) -> Result + pub async fn add_relay( + &self, + url: U, + role: RelayRole, + opts: RelayOptions, + ) -> Result where U: TryIntoUrl, Error: From<::Err>, @@ -473,6 +492,7 @@ impl RelayPool { if !relays.contains_key(&url) { let relay = Relay::new( url, + role, self.pool_task_sender.clone(), self.notification_sender.clone(), opts, @@ -510,8 +530,13 @@ impl RelayPool { } /// Send client message - pub async fn send_msg(&self, msg: ClientMessage, wait: Option) -> Result<(), Error> { - let relays = self.relays().await; + pub async fn send_msg( + &self, + msg: ClientMessage, + roles: &[RelayRole], + wait: Option, + ) -> Result<(), Error> { + let relays: HashMap = self.relays().await; if relays.is_empty() { return Err(Error::NoRelays); @@ -525,18 +550,20 @@ impl RelayPool { let mut handles = Vec::new(); for (url, relay) in relays.into_iter() { - let msg = msg.clone(); - let sent = sent_to_at_least_one_relay.clone(); - let handle = thread::spawn(async move { - match relay.send_msg(msg, wait).await { - Ok(_) => { - let _ = - sent.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(true)); + if roles.contains(&relay.role().await) { + let msg: ClientMessage = msg.clone(); + let sent: Arc = sent_to_at_least_one_relay.clone(); + let handle = thread::spawn(async move { + match relay.send_msg(msg, wait).await { + Ok(_) => { + let _ = sent + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(true)); + } + Err(e) => tracing::error!("Impossible to send msg to {url}: {e}"), } - Err(e) => tracing::error!("Impossible to send msg to {url}: {e}"), - } - }); - handles.push(handle); + }); + handles.push(handle); + } } for handle in handles.into_iter().flatten() { @@ -554,9 +581,10 @@ impl RelayPool { pub async fn batch_msg( &self, msgs: Vec, + roles: &[RelayRole], wait: Option, ) -> Result<(), Error> { - let relays = self.relays().await; + let relays: HashMap = self.relays().await; if relays.is_empty() { return Err(Error::NoRelays); @@ -578,19 +606,23 @@ impl RelayPool { let mut handles = Vec::new(); for (url, relay) in relays.into_iter() { - let len = msgs.len(); - let msgs = msgs.clone(); - let sent = sent_to_at_least_one_relay.clone(); - let handle = thread::spawn(async move { - match relay.batch_msg(msgs, wait).await { - Ok(_) => { - let _ = - sent.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(true)); + if roles.contains(&relay.role().await) { + let len: usize = msgs.len(); + let msgs: Vec = msgs.clone(); + let sent: Arc = sent_to_at_least_one_relay.clone(); + let handle = thread::spawn(async move { + match relay.batch_msg(msgs, wait).await { + Ok(_) => { + let _ = sent + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(true)); + } + Err(e) => { + tracing::error!("Impossible to send {len} messages to {url}: {e}") + } } - Err(e) => tracing::error!("Impossible to send {len} messages to {url}: {e}"), - } - }); - handles.push(handle); + }); + handles.push(handle); + } } for handle in handles.into_iter().flatten() { @@ -615,24 +647,23 @@ impl RelayPool { U: TryIntoUrl, Error: From<::Err>, { - let url: Url = url.try_into_url()?; + let relay: Relay = self.relay(url).await?; if let ClientMessage::Event(event) = &msg { self.set_events_as_sent(vec![event.id]).await; } - let relays = self.relays().await; - if let Some(relay) = relays.get(&url) { - relay.send_msg(msg, wait).await?; - Ok(()) - } else { - Err(Error::RelayNotFound) - } + Ok(relay.send_msg(msg, wait).await?) } /// Send event and wait for `OK` relay msg - pub async fn send_event(&self, event: Event, opts: RelaySendOptions) -> Result { - let relays = self.relays().await; + pub async fn send_event( + &self, + event: Event, + roles: &[RelayRole], + opts: RelaySendOptions, + ) -> Result { + let relays: HashMap = self.relays().await; if relays.is_empty() { return Err(Error::NoRelays); @@ -643,21 +674,23 @@ impl RelayPool { let sent_to_at_least_one_relay: Arc = Arc::new(AtomicBool::new(false)); let mut handles = Vec::new(); - let event_id = event.id; + let event_id: EventId = event.id; for (url, relay) in relays.into_iter() { - let event = event.clone(); - let sent = sent_to_at_least_one_relay.clone(); - let handle = thread::spawn(async move { - match relay.send_event(event, opts).await { - Ok(_) => { - let _ = - sent.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(true)); + if roles.contains(&relay.role().await) { + let event: Event = event.clone(); + let sent: Arc = sent_to_at_least_one_relay.clone(); + let handle = thread::spawn(async move { + match relay.send_event(event, opts).await { + Ok(_) => { + let _ = sent + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(true)); + } + Err(e) => tracing::error!("Impossible to send event to {url}: {e}"), } - Err(e) => tracing::error!("Impossible to send event to {url}: {e}"), - } - }); - handles.push(handle); + }); + handles.push(handle); + } } for handle in handles.into_iter().flatten() { @@ -675,9 +708,10 @@ impl RelayPool { pub async fn batch_event( &self, events: Vec, + roles: &[RelayRole], opts: RelaySendOptions, ) -> Result<(), Error> { - let relays = self.relays().await; + let relays: HashMap = self.relays().await; if relays.is_empty() { return Err(Error::NoRelays); @@ -690,19 +724,21 @@ impl RelayPool { let mut handles = Vec::new(); for (url, relay) in relays.into_iter() { - let len = events.len(); - let events = events.clone(); - let sent = sent_to_at_least_one_relay.clone(); - let handle = thread::spawn(async move { - match relay.batch_event(events, opts).await { - Ok(_) => { - let _ = - sent.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(true)); + if roles.contains(&relay.role().await) { + let len: usize = events.len(); + let events: Vec = events.clone(); + let sent: Arc = sent_to_at_least_one_relay.clone(); + let handle = thread::spawn(async move { + match relay.batch_event(events, opts).await { + Ok(_) => { + let _ = sent + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(true)); + } + Err(e) => tracing::error!("Impossible to send {len} events to {url}: {e}"), } - Err(e) => tracing::error!("Impossible to send {len} events to {url}: {e}"), - } - }); - handles.push(handle); + }); + handles.push(handle); + } } for handle in handles.into_iter().flatten() { @@ -727,19 +763,14 @@ impl RelayPool { U: TryIntoUrl, Error: From<::Err>, { - let url: Url = url.try_into_url()?; + let relay: Relay = self.relay(url).await?; self.set_events_as_sent(vec![event.id]).await; - let relays = self.relays().await; - if let Some(relay) = relays.get(&url) { - Ok(relay.send_event(event, opts).await?) - } else { - Err(Error::RelayNotFound) - } + Ok(relay.send_event(event, opts).await?) } /// Subscribe to filters pub async fn subscribe(&self, filters: Vec, wait: Option) { - let relays = self.relays().await; + let relays: HashMap = self.relays().await; self.update_subscription_filters(filters.clone()).await; for relay in relays.values() { if let Err(e) = relay diff --git a/crates/nostr-sdk/src/relay/role.rs b/crates/nostr-sdk/src/relay/role.rs new file mode 100644 index 000000000..5e21d976b --- /dev/null +++ b/crates/nostr-sdk/src/relay/role.rs @@ -0,0 +1,29 @@ +// Copyright (c) 2022-2023 Yuki Kishimoto +// Distributed under the MIT software license + +use core::fmt; + +/// Relay role +#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum RelayRole { + /// Generic + #[default] + Generic, + /// Identity + Identity, + /// Gossip + Gossip, + /// Custom + Custom(String), +} + +impl fmt::Display for RelayRole { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Generic => write!(f, "generic"), + Self::Identity => write!(f, "identity"), + Self::Gossip => write!(f, "gossip"), + Self::Custom(c) => write!(f, "{c}"), + } + } +}