Skip to content

Commit

Permalink
sdk: improve handle_relay_message method
Browse files Browse the repository at this point in the history
  • Loading branch information
yukibtc committed Nov 13, 2023
1 parent 62c406a commit df062df
Showing 1 changed file with 51 additions and 34 deletions.
85 changes: 51 additions & 34 deletions crates/nostr-sdk/src/relay/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ impl RelayPoolTask {
while let Some(msg) = receiver.recv().await {
match msg {
RelayPoolMessage::ReceivedMsg { relay_url, msg } => {
match this.handle_relay_message(msg).await {
Ok(msg) => {
match this.handle_relay_message(relay_url.clone(), msg).await {
Ok(Some(msg)) => {
let _ = this.notification_sender.send(
RelayPoolNotification::Message(
relay_url.clone(),
Expand All @@ -178,40 +178,18 @@ impl RelayPoolTask {
{
Ok(seen) => {
if !seen {
if let Err(e) =
this.database.save_event(&event).await
{
tracing::error!(
"Impossible to save event {}: {e}",
event.id
);
}

let notification = RelayPoolNotification::Event(
relay_url.clone(),
*event.clone(),
);
let _ =
this.notification_sender.send(notification);
this.notification_sender.send(RelayPoolNotification::Event(
relay_url,
*event.clone(),
));
}
}
Err(e) => tracing::error!(
"Impossible to check if event {} was already seen: {e}",
event.id
),
}

// Set event as seen by relay
if let Err(e) = this
.database
.event_id_seen(event.id, Some(relay_url))
.await
{
tracing::error!(
"Impossible to set event {} as seen by relay: {e}",
event.id
);
}
}
RelayMessage::Notice { message } => {
tracing::warn!("Notice from {relay_url}: {message}")
Expand All @@ -226,6 +204,7 @@ impl RelayPoolTask {
_ => (),
}
}
Ok(None) => (),
Err(e) => tracing::error!(
"Impossible to handle relay message from {relay_url}: {e}"
),
Expand Down Expand Up @@ -266,7 +245,11 @@ impl RelayPoolTask {
}
}

async fn handle_relay_message(&self, msg: RawRelayMessage) -> Result<RelayMessage, Error> {
async fn handle_relay_message(
&self,
relay_url: Url,
msg: RawRelayMessage,
) -> Result<Option<RelayMessage>, Error> {
match msg {
RawRelayMessage::Event {
subscription_id,
Expand All @@ -275,6 +258,28 @@ impl RelayPoolTask {
// Deserialize partial event (id, pubkey and sig)
let partial_event: PartialEvent = PartialEvent::from_json(event.to_string())?;

// Set event as seen by relay
if let Err(e) = self
.database
.event_id_seen(partial_event.id, Some(relay_url))
.await
{
tracing::error!(
"Impossible to set event {} as seen by relay: {e}",
partial_event.id
);
}

// Check if event was already saved
if self
.database
.has_event_already_been_saved(partial_event.id)
.await?
{
tracing::trace!("Event {} already saved into database", partial_event.id);
return Ok(None);
}

// Verify signature
partial_event.verify_signature()?;

Expand All @@ -293,13 +298,16 @@ impl RelayPoolTask {
// Verify event ID
event.verify_id()?;

// Save event
self.database.save_event(&event).await?;

// Compose RelayMessage
Ok(RelayMessage::Event {
Ok(Some(RelayMessage::Event {
subscription_id: SubscriptionId::new(subscription_id),
event: Box::new(event),
})
}))
}
m => Ok(RelayMessage::try_from(m)?),
m => Ok(Some(RelayMessage::try_from(m)?)),
}
}
}
Expand Down Expand Up @@ -764,13 +772,20 @@ impl RelayPool {
}

/// Get events of filters
///
/// Get events from local database and relays
pub async fn get_events_of(
&self,
filters: Vec<Filter>,
timeout: Duration,
opts: FilterOptions,
) -> Result<Vec<Event>, Error> {
let events: Arc<Mutex<Vec<Event>>> = Arc::new(Mutex::new(Vec::new()));
let stored_events: Vec<Event> = self
.database
.query(filters.clone())
.await
.unwrap_or_default();
let events: Arc<Mutex<Vec<Event>>> = Arc::new(Mutex::new(stored_events));
let mut handles = Vec::new();
let relays = self.relays().await;
for (url, relay) in relays.into_iter() {
Expand All @@ -796,7 +811,9 @@ impl RelayPool {
Ok(events.lock_owned().await.clone())
}

/// Request events of filter. All events will be sent to notification listener
/// Request events of filter.
///
/// If the events aren't already stored in the database, will be sent to notification listener
/// until the EOSE "end of stored events" message is received from the relay.
pub async fn req_events_of(
&self,
Expand Down

0 comments on commit df062df

Please sign in to comment.