From 44a8da9e62e416fe4aec28fe4ddf9de8f50986e8 Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Thu, 12 Oct 2023 15:08:05 +0200 Subject: [PATCH] sdk: init integration of `NostrDatabase` --- crates/nostr-sdk/Cargo.toml | 1 + crates/nostr-sdk/src/client/blocking.rs | 5 -- crates/nostr-sdk/src/client/mod.rs | 5 +- crates/nostr-sdk/src/relay/mod.rs | 9 ++- crates/nostr-sdk/src/relay/options.rs | 6 -- crates/nostr-sdk/src/relay/pool.rs | 101 +++++++++++------------- 6 files changed, 57 insertions(+), 70 deletions(-) diff --git a/crates/nostr-sdk/Cargo.toml b/crates/nostr-sdk/Cargo.toml index 3f5229388..31ded334e 100644 --- a/crates/nostr-sdk/Cargo.toml +++ b/crates/nostr-sdk/Cargo.toml @@ -31,6 +31,7 @@ nip47 = ["nostr/nip47"] [dependencies] async-utility = "0.1" nostr = { version = "0.24", path = "../nostr", default-features = false, features = ["std"] } +nostr-sdk-db = { version = "0.1", path = "../nostr-sdk-db" } nostr-sdk-net = { version = "0.24", path = "../nostr-sdk-net" } once_cell = { workspace = true } thiserror = { workspace = true } diff --git a/crates/nostr-sdk/src/client/blocking.rs b/crates/nostr-sdk/src/client/blocking.rs index d501e2422..e59b9a24c 100644 --- a/crates/nostr-sdk/src/client/blocking.rs +++ b/crates/nostr-sdk/src/client/blocking.rs @@ -99,11 +99,6 @@ impl Client { RUNTIME.block_on(async { self.client.shutdown().await }) } - /// Clear already seen events - pub fn clear_already_seen_events(&self) { - RUNTIME.block_on(async { self.client.clear_already_seen_events().await }) - } - pub fn notifications(&self) -> broadcast::Receiver { self.client.notifications() } diff --git a/crates/nostr-sdk/src/client/mod.rs b/crates/nostr-sdk/src/client/mod.rs index 49da69080..1e3379a35 100644 --- a/crates/nostr-sdk/src/client/mod.rs +++ b/crates/nostr-sdk/src/client/mod.rs @@ -265,9 +265,8 @@ impl Client { } /// Clear already seen events - pub async fn clear_already_seen_events(&self) { - self.pool.clear_already_seen_events().await; - } + #[deprecated] + pub async fn clear_already_seen_events(&self) {} /// Get new notification listener pub fn notifications(&self) -> broadcast::Receiver { diff --git a/crates/nostr-sdk/src/relay/mod.rs b/crates/nostr-sdk/src/relay/mod.rs index 69e55cf15..be17d2956 100644 --- a/crates/nostr-sdk/src/relay/mod.rs +++ b/crates/nostr-sdk/src/relay/mod.rs @@ -23,6 +23,7 @@ use nostr::secp256k1::rand::{self, Rng}; use nostr::{ ClientMessage, Event, EventId, Filter, JsonUtil, RelayMessage, SubscriptionId, Timestamp, Url, }; +use nostr_sdk_db::DynNostrDatabase; use nostr_sdk_net::futures_util::{Future, SinkExt, StreamExt}; use nostr_sdk_net::{self as net, WsMessage}; use thiserror::Error; @@ -255,7 +256,7 @@ pub struct Relay { document: Arc>, opts: RelayOptions, stats: RelayConnectionStats, - // auto_connect_loop_running: Arc, + database: Arc, scheduled_for_stop: Arc, scheduled_for_termination: Arc, pool_sender: Sender, @@ -277,6 +278,7 @@ impl Relay { #[cfg(not(target_arch = "wasm32"))] pub fn new( url: Url, + database: Arc, pool_sender: Sender, notification_sender: broadcast::Sender, proxy: Option, @@ -293,7 +295,7 @@ impl Relay { document: Arc::new(RwLock::new(RelayInformationDocument::new())), opts, stats: RelayConnectionStats::new(), - // auto_connect_loop_running: Arc::new(AtomicBool::new(false)), + database, scheduled_for_stop: Arc::new(AtomicBool::new(false)), scheduled_for_termination: Arc::new(AtomicBool::new(false)), pool_sender, @@ -309,6 +311,7 @@ impl Relay { #[cfg(target_arch = "wasm32")] pub fn new( url: Url, + database: Arc, pool_sender: Sender, notification_sender: broadcast::Sender, opts: RelayOptions, @@ -323,7 +326,7 @@ impl Relay { document: Arc::new(RwLock::new(RelayInformationDocument::new())), opts, stats: RelayConnectionStats::new(), - // auto_connect_loop_running: Arc::new(AtomicBool::new(false)), + database, scheduled_for_stop: Arc::new(AtomicBool::new(false)), scheduled_for_termination: Arc::new(AtomicBool::new(false)), pool_sender, diff --git a/crates/nostr-sdk/src/relay/options.rs b/crates/nostr-sdk/src/relay/options.rs index a2357a921..8a35d3a61 100644 --- a/crates/nostr-sdk/src/relay/options.rs +++ b/crates/nostr-sdk/src/relay/options.rs @@ -215,11 +215,6 @@ pub struct RelayPoolOptions { pub notification_channel_size: usize, /// Task channel size (default: 1024) pub task_channel_size: usize, - /// Max seen events by Task thread (default: 1_000_000) - /// - /// A lower number can cause receiving in notification channel - /// the same event multiple times - pub task_max_seen_events: usize, /// Shutdown on [RelayPool](super::pool::RelayPool) drop pub shutdown_on_drop: bool, } @@ -229,7 +224,6 @@ impl Default for RelayPoolOptions { Self { notification_channel_size: 1024, task_channel_size: 1024, - task_max_seen_events: 1_000_000, shutdown_on_drop: false, } } diff --git a/crates/nostr-sdk/src/relay/pool.rs b/crates/nostr-sdk/src/relay/pool.rs index ced68f46f..86246133a 100644 --- a/crates/nostr-sdk/src/relay/pool.rs +++ b/crates/nostr-sdk/src/relay/pool.rs @@ -3,7 +3,7 @@ //! Relay Pool -use std::collections::{HashMap, VecDeque}; +use std::collections::HashMap; #[cfg(not(target_arch = "wasm32"))] use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; @@ -13,6 +13,8 @@ use std::time::Duration; use async_utility::thread; use nostr::url::Url; use nostr::{ClientMessage, Event, EventId, Filter, RelayMessage, Timestamp}; +use nostr_sdk_db::memory::MemoryDatabase; +use nostr_sdk_db::DynNostrDatabase; use thiserror::Error; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::{broadcast, Mutex, RwLock}; @@ -103,25 +105,23 @@ pub enum RelayPoolNotification { #[derive(Debug, Clone)] struct RelayPoolTask { + database: Arc, receiver: Arc>>, notification_sender: broadcast::Sender, - events: Arc>>, running: Arc, - max_seen_events: usize, } impl RelayPoolTask { pub fn new( + database: Arc, pool_task_receiver: Receiver, notification_sender: broadcast::Sender, - max_seen_events: usize, ) -> Self { Self { + database, receiver: Arc::new(Mutex::new(pool_task_receiver)), - events: Arc::new(Mutex::new(VecDeque::new())), notification_sender, running: Arc::new(AtomicBool::new(false)), - max_seen_events, } } @@ -135,11 +135,6 @@ impl RelayPoolTask { .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(value)); } - pub async fn clear_already_seen_events(&self) { - let mut events = self.events.lock().await; - events.clear(); - } - pub fn run(&self) { if self.is_running() { tracing::warn!("Relay Pool Task is already running!") @@ -162,15 +157,35 @@ impl RelayPoolTask { match msg { RelayMessage::Event { event, .. } => { // Check if event was already seen - if this.add_event(event.id).await { - // Verifies if the event is valid - if event.verify().is_ok() { - let notification = RelayPoolNotification::Event( - relay_url, - event.as_ref().clone(), - ); - let _ = this.notification_sender.send(notification); + match this.database.has_event_already_been_seen(event.id).await + { + Ok(seen) => { + if !seen { + // Verifies if the event is valid + if event.verify().is_ok() { + let notification = RelayPoolNotification::Event( + relay_url.clone(), + event.as_ref().clone(), + ); + let _ = + this.notification_sender.send(notification); + } + } } + Err(e) => tracing::error!( + "Impossible to check if event {} was already seen: {e}", + event.id + ), + } + + // Set event as seen by relay + if let Err(e) = + this.database.event_id_seen(event.id, Some(relay_url)).await + { + tracing::error!( + "Impossible to set event {} as seen by relay: {e}", + event.id + ); } } RelayMessage::Notice { message } => { @@ -180,7 +195,9 @@ impl RelayPoolTask { } } RelayPoolMessage::BatchEvent(ids) => { - this.add_events(ids).await; + if let Err(e) = this.database.event_ids_seen(ids, None).await { + tracing::error!("Impossible to set events as seen: {e}"); + } } RelayPoolMessage::RelayStatus { url, status } => { let _ = this @@ -216,38 +233,12 @@ impl RelayPoolTask { }); } } - - async fn add_event(&self, event_id: EventId) -> bool { - let mut events = self.events.lock().await; - if events.contains(&event_id) { - false - } else { - while events.len() >= self.max_seen_events { - events.pop_front(); - } - events.push_back(event_id); - true - } - } - - async fn add_events(&self, ids: Vec) { - if !ids.is_empty() { - let mut events = self.events.lock().await; - for event_id in ids.into_iter() { - if !events.contains(&event_id) { - while events.len() >= self.max_seen_events { - events.pop_front(); - } - events.push_back(event_id); - } - } - } - } } /// Relay Pool #[derive(Debug, Clone)] pub struct RelayPool { + database: Arc, relays: Arc>>, pool_task_sender: Sender, notification_sender: broadcast::Sender, @@ -284,13 +275,16 @@ impl RelayPool { let (notification_sender, _) = broadcast::channel(opts.notification_channel_size); let (pool_task_sender, pool_task_receiver) = mpsc::channel(opts.task_channel_size); + let database = Arc::new(MemoryDatabase::new()); + let relay_pool_task = RelayPoolTask::new( + database.clone(), pool_task_receiver, notification_sender.clone(), - opts.task_max_seen_events, ); let pool = Self { + database, relays: Arc::new(RwLock::new(HashMap::new())), pool_task_sender, notification_sender, @@ -337,16 +331,16 @@ impl RelayPool { Ok(()) } - /// Clear already seen events - pub async fn clear_already_seen_events(&self) { - self.pool_task.clear_already_seen_events().await; - } - /// Get new notification listener pub fn notifications(&self) -> broadcast::Receiver { self.notification_sender.subscribe() } + /// Get database + pub fn database(&self) -> Arc { + self.database.clone() + } + /// Get relays pub async fn relays(&self) -> HashMap { let relays = self.relays.read().await; @@ -392,6 +386,7 @@ impl RelayPool { if !relays.contains_key(&url) { let relay = Relay::new( url, + self.database.clone(), self.pool_task_sender.clone(), self.notification_sender.clone(), proxy,