diff --git a/crates/nostr-sdk/src/relay/pool.rs b/crates/nostr-sdk/src/relay/pool.rs index c8d9bbea2..b6c4ebff2 100644 --- a/crates/nostr-sdk/src/relay/pool.rs +++ b/crates/nostr-sdk/src/relay/pool.rs @@ -17,7 +17,7 @@ use nostr::{ RawRelayMessage, RelayMessage, SubscriptionId, Timestamp, Url, }; use nostr_sdk_db::memory::MemoryDatabase; -use nostr_sdk_db::DynNostrDatabase; +use nostr_sdk_db::{DatabaseError, DynNostrDatabase}; use thiserror::Error; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::{broadcast, Mutex, RwLock}; @@ -47,6 +47,9 @@ pub enum Error { /// Message handler error #[error(transparent)] MessageHandler(#[from] MessageHandleError), + /// Database error + #[error(transparent)] + Database(#[from] DatabaseError), /// Thread error #[error(transparent)] Thread(#[from] thread::Error), @@ -80,8 +83,6 @@ pub enum RelayPoolMessage { /// Relay message msg: RawRelayMessage, }, - /// Events sent - BatchEvent(Vec), /// Relay status changed RelayStatus { /// Relay url @@ -213,6 +214,13 @@ impl RelayPoolTask { RelayMessage::Notice { message } => { tracing::warn!("Notice from {relay_url}: {message}") } + RelayMessage::Ok { + event_id, + status, + message, + } => { + tracing::debug!("Received OK from {relay_url} for event {event_id}: status={status}, message={message}"); + } _ => (), } } @@ -221,11 +229,6 @@ impl RelayPoolTask { ), } } - RelayPoolMessage::BatchEvent(ids) => { - if let Err(e) = this.database.event_ids_seen(ids, None).await { - tracing::error!("Impossible to set events as seen: {e}"); - } - } RelayPoolMessage::RelayStatus { url, status } => { let _ = this .notification_sender @@ -501,16 +504,6 @@ impl RelayPool { Ok(()) } - async fn set_events_as_sent(&self, ids: Vec) { - if let Err(e) = self - .pool_task_sender - .send(RelayPoolMessage::BatchEvent(ids)) - .await - { - tracing::error!("{e}"); - }; - } - /// Send client message pub async fn send_msg(&self, msg: ClientMessage, wait: Option) -> Result<(), Error> { let relays = self.relays().await; @@ -520,7 +513,7 @@ impl RelayPool { } if let ClientMessage::Event(event) = &msg { - self.set_events_as_sent(vec![event.id]).await; + self.database.save_event(event).await?; } let sent_to_at_least_one_relay: Arc = Arc::new(AtomicBool::new(false)); @@ -564,17 +557,12 @@ impl RelayPool { return Err(Error::NoRelays); } - let ids: Vec = msgs - .iter() - .filter_map(|msg| { - if let ClientMessage::Event(event) = msg { - Some(event.id) - } else { - None - } - }) - .collect(); - self.set_events_as_sent(ids).await; + // Save events into database + for msg in msgs.iter() { + if let ClientMessage::Event(event) = msg { + self.database.save_event(event).await?; + } + } let sent_to_at_least_one_relay: Arc = Arc::new(AtomicBool::new(false)); let mut handles = Vec::new(); @@ -620,7 +608,7 @@ impl RelayPool { let url: Url = url.try_into_url()?; if let ClientMessage::Event(event) = &msg { - self.set_events_as_sent(vec![event.id]).await; + self.database.save_event(event).await?; } let relays = self.relays().await; @@ -640,7 +628,7 @@ impl RelayPool { return Err(Error::NoRelays); } - self.set_events_as_sent(vec![event.id]).await; + self.database.save_event(&event).await?; let sent_to_at_least_one_relay: Arc = Arc::new(AtomicBool::new(false)); let mut handles = Vec::new(); @@ -685,8 +673,10 @@ impl RelayPool { return Err(Error::NoRelays); } - let ids: Vec = events.iter().map(|e| e.id).collect(); - self.set_events_as_sent(ids).await; + // Save events into database + for event in events.iter() { + self.database.save_event(event).await?; + } let sent_to_at_least_one_relay: Arc = Arc::new(AtomicBool::new(false)); let mut handles = Vec::new(); @@ -730,7 +720,7 @@ impl RelayPool { Error: From<::Err>, { let url: Url = url.try_into_url()?; - self.set_events_as_sent(vec![event.id]).await; + self.database.save_event(&event).await?; let relays = self.relays().await; if let Some(relay) = relays.get(&url) { Ok(relay.send_event(event, opts).await?)