Skip to content

Commit

Permalink
pool: add some gossip methods to InternalRelayPool
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Kishimoto <[email protected]>
  • Loading branch information
yukibtc committed Jun 26, 2024
1 parent 8e6eae1 commit 15c8bde
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 4 deletions.
4 changes: 4 additions & 0 deletions crates/nostr-relay-pool/src/pool/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use async_utility::thread;
use nostr::message::MessageHandleError;
use nostr::types::url;
use nostr::PublicKey;
use nostr_database::DatabaseError;
use thiserror::Error;

Expand Down Expand Up @@ -52,6 +53,9 @@ pub enum Error {
/// Relay not found
#[error("relay not found")]
RelayNotFound,
/// Relay not found
#[error("relay metadata not found for `{0}` public key")]
RelayMetadataNotFound(PublicKey),
/// Notification Handler error
#[error("notification handler error: {0}")]
Handler(String),
Expand Down
86 changes: 83 additions & 3 deletions crates/nostr-relay-pool/src/pool/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ use std::time::Duration;
use async_utility::thread::JoinHandle;
use async_utility::{thread, time};
use atomic_destructor::AtomicDestroyer;
use nostr::{ClientMessage, Event, EventId, Filter, SubscriptionId, Timestamp, TryIntoUrl, Url};
use nostr::nips::nip65::{self, RelayMetadata};
use nostr::{
ClientMessage, Event, EventId, Filter, Kind, PublicKey, SubscriptionId, Timestamp, TryIntoUrl,
Url,
};
use nostr_database::{DynNostrDatabase, IntoNostrDatabase, Order};
use tokio::sync::{broadcast, Mutex, RwLock};

Expand All @@ -30,7 +34,7 @@ pub struct InternalRelayPool {
notification_sender: broadcast::Sender<RelayPoolNotification>,
subscriptions: Arc<RwLock<HashMap<SubscriptionId, Vec<Filter>>>>,
blacklist: RelayBlacklist,
// opts: RelayPoolOptions,
opts: RelayPoolOptions,
}

impl AtomicDestroyer for InternalRelayPool {
Expand Down Expand Up @@ -61,7 +65,7 @@ impl InternalRelayPool {
notification_sender,
subscriptions: Arc::new(RwLock::new(HashMap::new())),
blacklist: RelayBlacklist::empty(),
//opts,
opts,
}
}

Expand Down Expand Up @@ -848,3 +852,79 @@ impl InternalRelayPool {
}
}
}

// Gossip methods
impl InternalRelayPool {
pub async fn add_discovery_relay<U>(&self, url: U) -> Result<bool, Error>
where
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
// Compose flags
let mut flags: RelayServiceFlags = RelayServiceFlags::NONE;
flags.add(RelayServiceFlags::DISCOVERY);

// Add relay
let opts: RelayOptions = RelayOptions::default().flags(flags);
self.add_relay(url, opts).await
}

pub async fn add_inbox_relay<U>(&self, url: U) -> Result<bool, Error>
where
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
// Compose flags
let mut flags: RelayServiceFlags = RelayServiceFlags::NONE;
flags.add(RelayServiceFlags::READ);
flags.add(RelayServiceFlags::INBOX);

// Add relay
let opts: RelayOptions = RelayOptions::default().flags(flags);
self.add_relay(url, opts).await
}

pub async fn add_outbox_relay<U>(&self, url: U) -> Result<bool, Error>
where
U: TryIntoUrl,
Error: From<<U as TryIntoUrl>::Err>,
{
// Compose flags
let mut flags: RelayServiceFlags = RelayServiceFlags::NONE;
flags.add(RelayServiceFlags::WRITE);
flags.add(RelayServiceFlags::OUTBOX);

// Add relay
let opts: RelayOptions = RelayOptions::default().flags(flags);
self.add_relay(url, opts).await
}

pub async fn get_relays_for_public_key(
&self,
public_key: PublicKey,
timeout: Duration,
) -> Result<HashMap<Url, Option<RelayMetadata>>, Error> {
// Get discovery relays
let relays = self
.relays_with_flag(RelayServiceFlags::DISCOVERY)
.await
.into_keys();

// Get events
let filter: Filter = Filter::default()
.author(public_key)
.kind(Kind::RelayList)
.limit(1);
let events: Vec<Event> = self
.get_events_from(relays, vec![filter], timeout, FilterOptions::ExitOnEOSE)
.await?;

// Extract relay list (NIP65)
let event: &Event = events
.first()
.ok_or(Error::RelayMetadataNotFound(public_key))?;
Ok(nip65::extract_relay_list(event)
.map(|(u, m)| (u.clone(), *m))
.collect())
}
}
13 changes: 13 additions & 0 deletions crates/nostr-relay-pool/src/pool/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,38 @@
#[derive(Debug, Clone, Copy)]
pub struct RelayPoolOptions {
pub(super) notification_channel_size: usize,
pub(super) gossip: bool,
pub(super) restore_relays_from_database: bool,
}

impl Default for RelayPoolOptions {
fn default() -> Self {
Self {
notification_channel_size: 4096,
gossip: false,
restore_relays_from_database: false,
}
}
}

impl RelayPoolOptions {
/// New default options
#[inline]
pub fn new() -> Self {
Self::default()
}

/// Notification channel size (default: 4096)
#[inline]
pub fn notification_channel_size(mut self, size: usize) -> Self {
self.notification_channel_size = size;
self
}

/// Enable gossip model (default: false)
#[inline]
pub fn gossip(mut self, enable: bool) -> Self {
self.gossip = enable;
self
}
}
2 changes: 1 addition & 1 deletion crates/nostr-relay-pool/src/relay/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl AtomicRelayServiceFlags {
let f: RelayServiceFlags = RelayServiceFlags(_f);
f.has(flags)
}

/// Check if `READ` service is enabled
pub fn has_read(&self) -> bool {
self.has(RelayServiceFlags::READ)
Expand Down
9 changes: 9 additions & 0 deletions crates/nostr-sdk/src/client/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct Options {
pub(super) connection_timeout: Option<Duration>,
send_timeout: Option<Duration>,
nip42_auto_authentication: Arc<AtomicBool>,
pub(super) gossip: bool,
#[cfg(not(target_arch = "wasm32"))]
pub(super) proxy: Proxy,
pub(super) relay_limits: RelayLimits,
Expand All @@ -45,6 +46,7 @@ impl Default for Options {
connection_timeout: None,
send_timeout: Some(DEFAULT_SEND_TIMEOUT),
nip42_auto_authentication: Arc::new(AtomicBool::new(true)),
gossip: false,
#[cfg(not(target_arch = "wasm32"))]
proxy: Proxy::default(),
relay_limits: RelayLimits::default(),
Expand Down Expand Up @@ -176,6 +178,13 @@ impl Options {
self
}

/// Enable gossip model (default: false)
#[inline]
pub fn gossip(mut self, enable: bool) -> Self {
self.gossip = enable;
self
}

#[inline]
pub(super) fn is_nip42_auto_authentication_enabled(&self) -> bool {
self.nip42_auto_authentication.load(Ordering::SeqCst)
Expand Down

0 comments on commit 15c8bde

Please sign in to comment.