From df062df9f51f2987b2cbe9651380948de80d442b Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto <yukikishimoto@protonmail.com> Date: Mon, 13 Nov 2023 11:11:33 +0100 Subject: [PATCH] sdk: improve `handle_relay_message` method --- crates/nostr-sdk/src/relay/pool.rs | 85 ++++++++++++++++++------------ 1 file changed, 51 insertions(+), 34 deletions(-) diff --git a/crates/nostr-sdk/src/relay/pool.rs b/crates/nostr-sdk/src/relay/pool.rs index 7976bff13..53e0b20ba 100644 --- a/crates/nostr-sdk/src/relay/pool.rs +++ b/crates/nostr-sdk/src/relay/pool.rs @@ -162,8 +162,8 @@ impl RelayPoolTask { while let Some(msg) = receiver.recv().await { match msg { RelayPoolMessage::ReceivedMsg { relay_url, msg } => { - match this.handle_relay_message(msg).await { - Ok(msg) => { + match this.handle_relay_message(relay_url.clone(), msg).await { + Ok(Some(msg)) => { let _ = this.notification_sender.send( RelayPoolNotification::Message( relay_url.clone(), @@ -178,21 +178,11 @@ impl RelayPoolTask { { Ok(seen) => { if !seen { - if let Err(e) = - this.database.save_event(&event).await - { - tracing::error!( - "Impossible to save event {}: {e}", - event.id - ); - } - - let notification = RelayPoolNotification::Event( - relay_url.clone(), - *event.clone(), - ); let _ = - this.notification_sender.send(notification); + this.notification_sender.send(RelayPoolNotification::Event( + relay_url, + *event.clone(), + )); } } Err(e) => tracing::error!( @@ -200,18 +190,6 @@ impl RelayPoolTask { event.id ), } - - // Set event as seen by relay - if let Err(e) = this - .database - .event_id_seen(event.id, Some(relay_url)) - .await - { - tracing::error!( - "Impossible to set event {} as seen by relay: {e}", - event.id - ); - } } RelayMessage::Notice { message } => { tracing::warn!("Notice from {relay_url}: {message}") @@ -226,6 +204,7 @@ impl RelayPoolTask { _ => (), } } + Ok(None) => (), Err(e) => tracing::error!( "Impossible to handle relay message from {relay_url}: {e}" ), @@ -266,7 +245,11 @@ impl RelayPoolTask { } } - async fn handle_relay_message(&self, msg: RawRelayMessage) -> Result<RelayMessage, Error> { + async fn handle_relay_message( + &self, + relay_url: Url, + msg: RawRelayMessage, + ) -> Result<Option<RelayMessage>, Error> { match msg { RawRelayMessage::Event { subscription_id, @@ -275,6 +258,28 @@ impl RelayPoolTask { // Deserialize partial event (id, pubkey and sig) let partial_event: PartialEvent = PartialEvent::from_json(event.to_string())?; + // Set event as seen by relay + if let Err(e) = self + .database + .event_id_seen(partial_event.id, Some(relay_url)) + .await + { + tracing::error!( + "Impossible to set event {} as seen by relay: {e}", + partial_event.id + ); + } + + // Check if event was already saved + if self + .database + .has_event_already_been_saved(partial_event.id) + .await? + { + tracing::trace!("Event {} already saved into database", partial_event.id); + return Ok(None); + } + // Verify signature partial_event.verify_signature()?; @@ -293,13 +298,16 @@ impl RelayPoolTask { // Verify event ID event.verify_id()?; + // Save event + self.database.save_event(&event).await?; + // Compose RelayMessage - Ok(RelayMessage::Event { + Ok(Some(RelayMessage::Event { subscription_id: SubscriptionId::new(subscription_id), event: Box::new(event), - }) + })) } - m => Ok(RelayMessage::try_from(m)?), + m => Ok(Some(RelayMessage::try_from(m)?)), } } } @@ -764,13 +772,20 @@ impl RelayPool { } /// Get events of filters + /// + /// Get events from local database and relays pub async fn get_events_of( &self, filters: Vec<Filter>, timeout: Duration, opts: FilterOptions, ) -> Result<Vec<Event>, Error> { - let events: Arc<Mutex<Vec<Event>>> = Arc::new(Mutex::new(Vec::new())); + let stored_events: Vec<Event> = self + .database + .query(filters.clone()) + .await + .unwrap_or_default(); + let events: Arc<Mutex<Vec<Event>>> = Arc::new(Mutex::new(stored_events)); let mut handles = Vec::new(); let relays = self.relays().await; for (url, relay) in relays.into_iter() { @@ -796,7 +811,9 @@ impl RelayPool { Ok(events.lock_owned().await.clone()) } - /// Request events of filter. All events will be sent to notification listener + /// Request events of filter. + /// + /// If the events aren't already stored in the database, will be sent to notification listener /// until the EOSE "end of stored events" message is received from the relay. pub async fn req_events_of( &self,