From 38a7a85b5d42b2f292f1f7ef5bf670c713d025c6 Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Mon, 16 Oct 2023 14:28:32 +0200 Subject: [PATCH] sdk: add `handle_relay_message` method to `RelayPoolTask` Improve performance of event deserialization --- crates/nostr-sdk/src/relay/mod.rs | 5 +- crates/nostr-sdk/src/relay/pool.rs | 104 +++++++++++++++++++++-------- 2 files changed, 79 insertions(+), 30 deletions(-) diff --git a/crates/nostr-sdk/src/relay/mod.rs b/crates/nostr-sdk/src/relay/mod.rs index 69e55cf15..7bec01425 100644 --- a/crates/nostr-sdk/src/relay/mod.rs +++ b/crates/nostr-sdk/src/relay/mod.rs @@ -21,7 +21,8 @@ use nostr::negentropy::{self, Bytes, Negentropy}; use nostr::nips::nip11::RelayInformationDocument; use nostr::secp256k1::rand::{self, Rng}; use nostr::{ - ClientMessage, Event, EventId, Filter, JsonUtil, RelayMessage, SubscriptionId, Timestamp, Url, + ClientMessage, Event, EventId, Filter, JsonUtil, RawRelayMessage, RelayMessage, SubscriptionId, + Timestamp, Url, }; use nostr_sdk_net::futures_util::{Future, SinkExt, StreamExt}; use nostr_sdk_net::{self as net, WsMessage}; @@ -781,7 +782,7 @@ impl Relay { let max_size: usize = relay.limits.messages.max_size as usize; relay.stats.add_bytes_received(size); if size <= max_size { - match RelayMessage::from_json(&data) { + match RawRelayMessage::from_json(&data) { Ok(msg) => { tracing::trace!("Received message to {}: {:?}", relay.url, msg); if let Err(err) = relay diff --git a/crates/nostr-sdk/src/relay/pool.rs b/crates/nostr-sdk/src/relay/pool.rs index ced68f46f..0cc06840e 100644 --- a/crates/nostr-sdk/src/relay/pool.rs +++ b/crates/nostr-sdk/src/relay/pool.rs @@ -11,8 +11,11 @@ use std::sync::Arc; use std::time::Duration; use async_utility::thread; -use nostr::url::Url; -use nostr::{ClientMessage, Event, EventId, Filter, RelayMessage, Timestamp}; +use nostr::message::MessageHandleError; +use nostr::{ + event, ClientMessage, Event, EventId, Filter, JsonUtil, MissingPartialEvent, PartialEvent, + RawRelayMessage, RelayMessage, SubscriptionId, Timestamp, Url, +}; use thiserror::Error; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::{broadcast, Mutex, RwLock}; @@ -33,6 +36,18 @@ pub enum Error { /// Relay error #[error(transparent)] Relay(#[from] RelayError), + /// Event error + #[error(transparent)] + Event(#[from] event::Error), + /// Partial Event error + #[error(transparent)] + PartialEvent(#[from] event::partial::Error), + /// Message handler error + #[error(transparent)] + MessageHandler(#[from] MessageHandleError), + /// Thread error + #[error(transparent)] + Thread(#[from] thread::Error), /// No relays #[error("no relays")] NoRelays, @@ -51,9 +66,6 @@ pub enum Error { /// Relay not found #[error("relay not found")] RelayNotFound, - /// Thread error - #[error(transparent)] - Thread(#[from] thread::Error), } /// Relay Pool Message @@ -64,7 +76,7 @@ pub enum RelayPoolMessage { /// Relay url relay_url: Url, /// Relay message - msg: RelayMessage, + msg: RawRelayMessage, }, /// Events sent BatchEvent(Vec), @@ -152,31 +164,35 @@ impl RelayPoolTask { while let Some(msg) = receiver.recv().await { match msg { RelayPoolMessage::ReceivedMsg { relay_url, msg } => { - let _ = this - .notification_sender - .send(RelayPoolNotification::Message( - relay_url.clone(), - msg.clone(), - )); - - match msg { - RelayMessage::Event { event, .. } => { - // Check if event was already seen - if this.add_event(event.id).await { - // Verifies if the event is valid - if event.verify().is_ok() { - let notification = RelayPoolNotification::Event( - relay_url, - event.as_ref().clone(), - ); - let _ = this.notification_sender.send(notification); + match this.handle_relay_message(msg).await { + Ok(msg) => { + let _ = this.notification_sender.send( + RelayPoolNotification::Message( + relay_url.clone(), + msg.clone(), + ), + ); + + match msg { + RelayMessage::Event { event, .. } => { + // Check if event was already seen + if this.add_event(event.id).await { + let notification = RelayPoolNotification::Event( + relay_url, + event.as_ref().clone(), + ); + let _ = this.notification_sender.send(notification); + } } + RelayMessage::Notice { message } => { + tracing::warn!("Notice from {relay_url}: {message}") + } + _ => (), } } - RelayMessage::Notice { message } => { - tracing::warn!("Notice from {relay_url}: {message}") - } - _ => (), + Err(e) => tracing::error!( + "Impossible to handle relay message from {relay_url}: {e}" + ), } } RelayPoolMessage::BatchEvent(ids) => { @@ -217,6 +233,38 @@ impl RelayPoolTask { } } + async fn handle_relay_message(&self, msg: RawRelayMessage) -> Result { + match msg { + RawRelayMessage::Event { + subscription_id, + event, + } => { + // Deserialize partial event (id, pubkey and sig) + let partial_event: PartialEvent = PartialEvent::from_json(event.to_string())?; + + // Verify signature + partial_event.verify_signature()?; + + // Deserialize missing event fields + let missing: MissingPartialEvent = + MissingPartialEvent::from_json(event.to_string())?; + + // Compose full event + let event: Event = partial_event.merge(missing); + + // Verify event ID + event.verify_id()?; + + // Compose RelayMessage + Ok(RelayMessage::Event { + subscription_id: SubscriptionId::new(subscription_id), + event: Box::new(event), + }) + } + m => Ok(RelayMessage::try_from(m)?), + } + } + async fn add_event(&self, event_id: EventId) -> bool { let mut events = self.events.lock().await; if events.contains(&event_id) {