From 7e4e719c4e324e64fafc48606fd5f811afdffe89 Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Fri, 25 Aug 2023 13:06:29 +0200 Subject: [PATCH 01/13] nostr: add `NEG-OPEN` client message --- crates/nostr/src/message/client.rs | 45 ++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/crates/nostr/src/message/client.rs b/crates/nostr/src/message/client.rs index 3b73da7e4..ef8ed8d10 100644 --- a/crates/nostr/src/message/client.rs +++ b/crates/nostr/src/message/client.rs @@ -44,6 +44,17 @@ pub enum ClientMessage { Close(SubscriptionId), /// Auth Auth(Box), + /// Negentropy Open + NegOpen { + /// Subscription ID + subscription_id: SubscriptionId, + /// Filter + filter: Box, + /// ID size (MUST be between 8 and 32, inclusive) + id_size: u8, + /// Initial message + initial_message: String, + }, } impl Serialize for ClientMessage { @@ -150,6 +161,20 @@ impl ClientMessage { } Self::Close(subscription_id) => json!(["CLOSE", subscription_id]), Self::Auth(event) => json!(["AUTH", event]), + Self::NegOpen { + subscription_id, + filter, + id_size, + initial_message, + } => { + json!([ + "NEG-OPEN", + subscription_id, + filter, + id_size, + initial_message + ]) + } } } @@ -243,6 +268,26 @@ impl ClientMessage { return Ok(Self::new_auth(event)); } + // Negentropy Open + // ["NEG-OPEN", , , , ] + if v[0] == "NEG-OPEN" { + if v_len != 5 { + return Err(MessageHandleError::InvalidMessageFormat); + } + let subscription_id = SubscriptionId::new(v[1].to_string()); + let filter = Filter::from_json(v[2].to_string())?; + let id_size = v[3] + .as_u64() + .ok_or(MessageHandleError::InvalidMessageFormat)? as u8; + let initial_message = v[4].to_string(); + return Ok(Self::NegOpen { + subscription_id, + filter: Box::new(filter), + id_size, + initial_message, + }); + } + Err(MessageHandleError::InvalidMessageFormat) } From aa4184e2f344774c67d5fec66fadf0a59def2ff1 Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Mon, 18 Sep 2023 14:43:16 +0200 Subject: [PATCH 02/13] nostr: add `NEG-MSG` message --- crates/nostr/src/message/client.rs | 25 +++++++++++++++++++++++++ crates/nostr/src/message/relay.rs | 25 +++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/crates/nostr/src/message/client.rs b/crates/nostr/src/message/client.rs index ef8ed8d10..9dd7c7f3e 100644 --- a/crates/nostr/src/message/client.rs +++ b/crates/nostr/src/message/client.rs @@ -55,6 +55,13 @@ pub enum ClientMessage { /// Initial message initial_message: String, }, + /// Negentropy Message + NegMsg { + /// Subscription ID + subscription_id: SubscriptionId, + /// Message + message: String, + }, } impl Serialize for ClientMessage { @@ -175,6 +182,10 @@ impl ClientMessage { initial_message ]) } + Self::NegMsg { + subscription_id, + message, + } => json!(["NEG-MSG", subscription_id, message]), } } @@ -288,6 +299,20 @@ impl ClientMessage { }); } + // Negentropy Message + // ["NEG-MSG", , ] + if v[0] == "NEG-MSG" { + if v_len != 3 { + return Err(MessageHandleError::InvalidMessageFormat); + } + let subscription_id: SubscriptionId = SubscriptionId::new(v[1].to_string()); + let message: String = v[2].to_string(); + return Ok(Self::NegMsg { + subscription_id, + message, + }); + } + Err(MessageHandleError::InvalidMessageFormat) } diff --git a/crates/nostr/src/message/relay.rs b/crates/nostr/src/message/relay.rs index b17b1c9ed..744f4b3e1 100644 --- a/crates/nostr/src/message/relay.rs +++ b/crates/nostr/src/message/relay.rs @@ -56,6 +56,13 @@ pub enum RelayMessage { /// Events count count: usize, }, + /// Negentropy Message + NegMsg { + /// Subscription ID + subscription_id: SubscriptionId, + /// Message + message: String, + }, } impl Serialize for RelayMessage { @@ -153,6 +160,10 @@ impl RelayMessage { subscription_id, count, } => json!(["COUNT", subscription_id, { "count": count }]), + Self::NegMsg { + subscription_id, + message, + } => json!(["NEG-MSG", subscription_id, message]), } } @@ -266,6 +277,20 @@ impl RelayMessage { return Ok(Self::new_count(subscription_id, count)); } + // Negentropy Message + // ["NEG-MSG", , ] + if v[0] == "NEG-MSG" { + if v_len != 3 { + return Err(MessageHandleError::InvalidMessageFormat); + } + let subscription_id: SubscriptionId = SubscriptionId::new(v[1].to_string()); + let message: String = v[2].to_string(); + return Ok(Self::NegMsg { + subscription_id, + message, + }); + } + Err(MessageHandleError::InvalidMessageFormat) } From d0e33400cc34bfd7bad98d06399071eac8cbf353 Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Mon, 18 Sep 2023 14:45:30 +0200 Subject: [PATCH 03/13] nostr: add `NEG-CLOSE` client message --- crates/nostr/src/message/client.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/crates/nostr/src/message/client.rs b/crates/nostr/src/message/client.rs index 9dd7c7f3e..8118bc6c5 100644 --- a/crates/nostr/src/message/client.rs +++ b/crates/nostr/src/message/client.rs @@ -62,6 +62,11 @@ pub enum ClientMessage { /// Message message: String, }, + /// Negentropy Close + NegClose { + /// Subscription ID + subscription_id: SubscriptionId, + }, } impl Serialize for ClientMessage { @@ -186,6 +191,7 @@ impl ClientMessage { subscription_id, message, } => json!(["NEG-MSG", subscription_id, message]), + Self::NegClose { subscription_id } => json!(["NEG-CLOSE", subscription_id]), } } @@ -313,6 +319,16 @@ impl ClientMessage { }); } + // Negentropy Close + // ["NEG-CLOSE", ] + if v[0] == "NEG-CLOSE" { + if v_len != 2 { + return Err(MessageHandleError::InvalidMessageFormat); + } + let subscription_id: SubscriptionId = SubscriptionId::new(v[1].to_string()); + return Ok(Self::NegClose { subscription_id }); + } + Err(MessageHandleError::InvalidMessageFormat) } From 0037c282068b13e0018e01aa56bc83393d60d369 Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Mon, 18 Sep 2023 14:57:29 +0200 Subject: [PATCH 04/13] nostr: add `NEG-ERR` relay message --- crates/nostr/src/message/relay.rs | 91 ++++++++++++++++++++++++++++++- 1 file changed, 90 insertions(+), 1 deletion(-) diff --git a/crates/nostr/src/message/relay.rs b/crates/nostr/src/message/relay.rs index 744f4b3e1..f4f9b4057 100644 --- a/crates/nostr/src/message/relay.rs +++ b/crates/nostr/src/message/relay.rs @@ -6,9 +6,9 @@ use alloc::boxed::Box; use alloc::string::{String, ToString}; +use core::fmt; use bitcoin::secp256k1::{Secp256k1, Verification}; -#[cfg(feature = "std")] use serde::{Deserialize, Deserializer}; use serde::{Serialize, Serializer}; use serde_json::{json, Value}; @@ -18,6 +18,70 @@ use super::MessageHandleError; use crate::SECP256K1; use crate::{Event, EventId, SubscriptionId}; +/// Negentropy error code +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum NegentropyErrorCode { + /// Results too big + ResultsTooBig, + /// Because the NEG-OPEN queries are stateful, relays may choose to time-out inactive queries to recover memory resources + Closed, + /// If an event ID is used as the filter, this error will be returned if the relay does not have this event. + /// The client should retry with the full filter, or upload the event to the relay. + FilterNotFound, + /// The event's content was not valid JSON, or the filter was invalid for some other reason. + FilterInvalid, + /// Other + Other(String), +} + +impl fmt::Display for NegentropyErrorCode { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::ResultsTooBig => write!(f, "RESULTS_TOO_BIG"), + Self::Closed => write!(f, "CLOSED"), + Self::FilterNotFound => write!(f, "FILTER_NOT_FOUND"), + Self::FilterInvalid => write!(f, "FILTER_INVALID"), + Self::Other(e) => write!(f, "{e}"), + } + } +} + +impl From for NegentropyErrorCode +where + S: Into, +{ + fn from(code: S) -> Self { + let code: String = code.into(); + match code.as_str() { + "RESULTS_TOO_BIG" => Self::ResultsTooBig, + "CLOSED" => Self::Closed, + "FILTER_NOT_FOUND" => Self::FilterNotFound, + "FILTER_INVALID" => Self::FilterInvalid, + o => Self::Other(o.to_string()), + } + } +} + +impl Serialize for NegentropyErrorCode { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} + +impl<'de> Deserialize<'de> for NegentropyErrorCode { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let value = Value::deserialize(deserializer)?; + let alphaber: String = serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(Self::from(alphaber)) + } +} + /// Messages sent by relays, received by clients #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum RelayMessage { @@ -63,6 +127,13 @@ pub enum RelayMessage { /// Message message: String, }, + /// Negentropy Error + NegErr { + /// Subscription ID + subscription_id: SubscriptionId, + /// Error code + code: NegentropyErrorCode, + }, } impl Serialize for RelayMessage { @@ -164,6 +235,10 @@ impl RelayMessage { subscription_id, message, } => json!(["NEG-MSG", subscription_id, message]), + Self::NegErr { + subscription_id, + code, + } => json!(["NEG-ERR", subscription_id, code]), } } @@ -291,6 +366,20 @@ impl RelayMessage { }); } + // Negentropy Error + // ["NEG-ERR", , ] + if v[0] == "NEG-ERR" { + if v_len != 3 { + return Err(MessageHandleError::InvalidMessageFormat); + } + let subscription_id: SubscriptionId = SubscriptionId::new(v[1].to_string()); + let code: NegentropyErrorCode = NegentropyErrorCode::from(v[2].to_string()); + return Ok(Self::NegErr { + subscription_id, + code, + }); + } + Err(MessageHandleError::InvalidMessageFormat) } From a6fca2b07483903b252492b99db432fc7b03e11f Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Mon, 18 Sep 2023 15:23:21 +0200 Subject: [PATCH 05/13] nostr: add `neg_open` method --- Cargo.lock | 6 ++++++ crates/nostr/Cargo.toml | 2 ++ crates/nostr/src/lib.rs | 1 + crates/nostr/src/message/client.rs | 16 ++++++++++++++++ crates/nostr/src/prelude.rs | 1 + 5 files changed, 26 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index aae263d21..2e3611522 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -998,6 +998,11 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "negentropy" +version = "0.3.0" +source = "git+https://github.com/yukibtc/rust-negentropy?rev=62a65db938c7088d521980d3fc1e1593713a899d#62a65db938c7088d521980d3fc1e1593713a899d" + [[package]] name = "nom" version = "7.1.3" @@ -1021,6 +1026,7 @@ dependencies = [ "csv", "getrandom", "instant", + "negentropy", "nostr-ots", "num_cpus", "once_cell", diff --git a/crates/nostr/Cargo.toml b/crates/nostr/Cargo.toml index 4891a505e..1fa4eb137 100644 --- a/crates/nostr/Cargo.toml +++ b/crates/nostr/Cargo.toml @@ -25,6 +25,7 @@ std = [ "bitcoin/rand-std", "bip39?/std", "chacha20?/std", + "negentropy/std", "serde/std", "serde_json/std", "tracing/std", @@ -55,6 +56,7 @@ bip39 = { version = "2.0", default-features = false, optional = true } bitcoin = { version = "0.30", default-features = false, features = ["rand", "serde"] } cbc = { version = "0.1", optional = true } chacha20 = { version = "0.9", optional = true } +negentropy = { git = "https://github.com/yukibtc/rust-negentropy", rev = "62a65db938c7088d521980d3fc1e1593713a899d", default-features = false } nostr-ots = { version = "0.2", optional = true } once_cell = { workspace = true, optional = true } reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls-webpki-roots", "socks"], optional = true } diff --git a/crates/nostr/src/lib.rs b/crates/nostr/src/lib.rs index b73d3b46a..d64f60904 100644 --- a/crates/nostr/src/lib.rs +++ b/crates/nostr/src/lib.rs @@ -31,6 +31,7 @@ pub use bitcoin; pub use bitcoin::bech32; pub use bitcoin::hashes; pub use bitcoin::secp256k1; +pub use negentropy; pub use serde_json; pub use url_fork::{self as url, Url}; diff --git a/crates/nostr/src/message/client.rs b/crates/nostr/src/message/client.rs index 8118bc6c5..423c19c0a 100644 --- a/crates/nostr/src/message/client.rs +++ b/crates/nostr/src/message/client.rs @@ -9,6 +9,7 @@ use alloc::string::{String, ToString}; use alloc::vec::Vec; use bitcoin::secp256k1::{Secp256k1, Verification}; +use negentropy::{Bytes, Negentropy}; #[cfg(feature = "std")] use serde::{Deserialize, Deserializer}; use serde::{Serialize, Serializer}; @@ -122,6 +123,21 @@ impl ClientMessage { Self::Auth(Box::new(event)) } + /// Create new `NEG-OPEN` message + pub fn neg_open( + negentropy: &mut Negentropy, + subscription_id: &SubscriptionId, + filter: Filter, + ) -> Result { + let initial_message: Bytes = negentropy.initiate()?; + Ok(Self::NegOpen { + subscription_id: subscription_id.clone(), + filter: Box::new(filter), + id_size: negentropy.id_size() as u8, + initial_message: initial_message.to_hex(), + }) + } + /// Check if is an `EVENT` message pub fn is_event(&self) -> bool { matches!(self, ClientMessage::Event(_)) diff --git a/crates/nostr/src/prelude.rs b/crates/nostr/src/prelude.rs index b61d44054..c7c760266 100644 --- a/crates/nostr/src/prelude.rs +++ b/crates/nostr/src/prelude.rs @@ -13,6 +13,7 @@ pub use bitcoin::bech32::*; pub use bitcoin::hashes::*; pub use bitcoin::secp256k1::*; pub use bitcoin::*; +pub use negentropy::*; pub use serde_json::*; pub use url_fork::*; From decfbcc494942de186ea8d6ab8cb99c556793499 Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Mon, 18 Sep 2023 16:41:47 +0200 Subject: [PATCH 06/13] sdk: log notices from relays --- crates/nostr-sdk/src/relay/pool.rs | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/crates/nostr-sdk/src/relay/pool.rs b/crates/nostr-sdk/src/relay/pool.rs index f184898e3..56a6184ec 100644 --- a/crates/nostr-sdk/src/relay/pool.rs +++ b/crates/nostr-sdk/src/relay/pool.rs @@ -159,18 +159,24 @@ impl RelayPoolTask { msg.clone(), )); - if let RelayMessage::Event { event, .. } = msg { - // Verifies if the event is valid - if event.verify().is_ok() { - // Adds only new events - if this.add_event(event.id).await { - let notification = RelayPoolNotification::Event( - relay_url, - event.as_ref().clone(), - ); - let _ = this.notification_sender.send(notification); + match msg { + RelayMessage::Event { event, .. } => { + // Verifies if the event is valid + if event.verify().is_ok() { + // Adds only new events + if this.add_event(event.id).await { + let notification = RelayPoolNotification::Event( + relay_url, + event.as_ref().clone(), + ); + let _ = this.notification_sender.send(notification); + } } } + RelayMessage::Notice { message } => { + tracing::warn!("Notice from {relay_url}: {message}") + } + _ => (), } } RelayPoolMessage::BatchEvent(ids) => { From 9747a15066fc98eced9f1e2acfb49fffd9e2ba29 Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Mon, 18 Sep 2023 16:42:30 +0200 Subject: [PATCH 07/13] sdk: init `reconcilie` method for `Relay` --- Cargo.lock | 2 +- crates/nostr-sdk/examples/reconcilie.rs | 29 ++++++++ crates/nostr-sdk/src/relay/mod.rs | 93 +++++++++++++++++++++++++ crates/nostr/Cargo.toml | 2 +- 4 files changed, 124 insertions(+), 2 deletions(-) create mode 100644 crates/nostr-sdk/examples/reconcilie.rs diff --git a/Cargo.lock b/Cargo.lock index 2e3611522..e50146cf2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1001,7 +1001,7 @@ dependencies = [ [[package]] name = "negentropy" version = "0.3.0" -source = "git+https://github.com/yukibtc/rust-negentropy?rev=62a65db938c7088d521980d3fc1e1593713a899d#62a65db938c7088d521980d3fc1e1593713a899d" +source = "git+https://github.com/yukibtc/rust-negentropy?rev=547592168356f423dd10773384b798e7c7139893#547592168356f423dd10773384b798e7c7139893" [[package]] name = "nom" diff --git a/crates/nostr-sdk/examples/reconcilie.rs b/crates/nostr-sdk/examples/reconcilie.rs new file mode 100644 index 000000000..9a8b9f39e --- /dev/null +++ b/crates/nostr-sdk/examples/reconcilie.rs @@ -0,0 +1,29 @@ +// Copyright (c) 2022-2023 Yuki Kishimoto +// Distributed under the MIT software license + +use nostr_sdk::prelude::*; + +const BECH32_SK: &str = "nsec1ufnus6pju578ste3v90xd5m2decpuzpql2295m3sknqcjzyys9ls0qlc85"; + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt::init(); + + let secret_key = SecretKey::from_bech32(BECH32_SK)?; + let my_keys = Keys::new(secret_key); + + let client = Client::new(&my_keys); + client.add_relay("wss://relay.damus.io", None).await?; + + client.connect().await; + + let filter = Filter::new() + .author(my_keys.public_key().to_string()) + .limit(10); + let relay = client.relay("wss://relay.damus.io").await?; + relay + .reconcilie(filter, vec![(EventId::all_zeros(), Timestamp::now())]) + .await?; + + Ok(()) +} diff --git a/crates/nostr-sdk/src/relay/mod.rs b/crates/nostr-sdk/src/relay/mod.rs index f29526d78..6b5b48e7f 100644 --- a/crates/nostr-sdk/src/relay/mod.rs +++ b/crates/nostr-sdk/src/relay/mod.rs @@ -19,6 +19,8 @@ use std::time::Instant; use async_utility::futures_util::stream::AbortHandle; use async_utility::{futures_util, thread, time}; use nostr::message::MessageHandleError; +use nostr::negentropy::hex; +use nostr::negentropy::{self, Bytes, Negentropy}; #[cfg(feature = "nip11")] use nostr::nips::nip11::RelayInformationDocument; use nostr::{ClientMessage, Event, EventId, Filter, RelayMessage, SubscriptionId, Timestamp, Url}; @@ -41,6 +43,12 @@ type Message = (RelayEvent, Option>); /// [`Relay`] error #[derive(Debug, Error)] pub enum Error { + /// Negentropy error + #[error(transparent)] + Negentropy(#[from] negentropy::Error), + /// Hex error + #[error(transparent)] + Hex(#[from] hex::Error), /// Channel timeout #[error("channel timeout")] ChannelTimeout, @@ -1546,4 +1554,89 @@ impl Relay { } }); } + + /// Negentropy reconciliation + pub async fn reconcilie( + &self, + filter: Filter, + my_items: Vec<(EventId, Timestamp)>, + ) -> Result<(), Error> { + if !self.opts.read() { + return Err(Error::ReadDisabled); + } + + let id_size: usize = 16; + + let mut negentropy = Negentropy::new(id_size, Some(5_000))?; + + for (id, timestamp) in my_items.into_iter() { + let cutted_id: &[u8] = &id.as_bytes()[..id_size]; + let cutted_id = Bytes::from_slice(cutted_id); + negentropy.add_item(timestamp.as_u64(), cutted_id)?; + } + + negentropy.seal()?; + + let id = SubscriptionId::generate(); + let open_msg = ClientMessage::neg_open(&mut negentropy, &id, filter)?; + + self.send_msg(open_msg, Some(Duration::from_secs(10))) + .await?; + + let mut notifications = self.notification_sender.subscribe(); + while let Ok(notification) = notifications.recv().await { + if let RelayPoolNotification::Message(url, msg) = notification { + if url == self.url { + match msg { + RelayMessage::NegMsg { + subscription_id, + message, + } => { + if subscription_id == id { + let query: Bytes = Bytes::from_hex(message)?; + let mut need_ids: Vec = Vec::new(); + let msg: Option = negentropy.reconcile_with_ids( + &query, + &mut Vec::new(), + &mut need_ids, + )?; + + // TODO: request ids to relay + println!("IDs: {need_ids:?}"); + + match msg { + Some(query) => { + self.send_msg( + ClientMessage::NegMsg { + subscription_id: id.clone(), + message: query.to_hex(), + }, + None, + ) + .await?; + } + None => { + tracing::info!("Reconciliation terminated"); + break; + } + } + } + } + RelayMessage::NegErr { + subscription_id, + code, + } => { + if subscription_id == id { + tracing::error!("Negentropy syncing error: {code}"); + break; + } + } + _ => (), + } + } + } + } + + Ok(()) + } } diff --git a/crates/nostr/Cargo.toml b/crates/nostr/Cargo.toml index 1fa4eb137..baeb5a282 100644 --- a/crates/nostr/Cargo.toml +++ b/crates/nostr/Cargo.toml @@ -56,7 +56,7 @@ bip39 = { version = "2.0", default-features = false, optional = true } bitcoin = { version = "0.30", default-features = false, features = ["rand", "serde"] } cbc = { version = "0.1", optional = true } chacha20 = { version = "0.9", optional = true } -negentropy = { git = "https://github.com/yukibtc/rust-negentropy", rev = "62a65db938c7088d521980d3fc1e1593713a899d", default-features = false } +negentropy = { git = "https://github.com/yukibtc/rust-negentropy", rev = "547592168356f423dd10773384b798e7c7139893", default-features = false } nostr-ots = { version = "0.2", optional = true } once_cell = { workspace = true, optional = true } reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls-webpki-roots", "socks"], optional = true } From 5c35019c0faccfbc20f4a0a8e91875dca0a152ea Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Tue, 19 Sep 2023 09:50:27 +0200 Subject: [PATCH 08/13] nostr: fix NEG messages deserialization --- crates/nostr/src/message/client.rs | 18 +++++++++--------- crates/nostr/src/message/relay.rs | 11 ++++------- crates/nostr/src/message/subscription.rs | 22 +++++++++++++++++++++- 3 files changed, 34 insertions(+), 17 deletions(-) diff --git a/crates/nostr/src/message/client.rs b/crates/nostr/src/message/client.rs index 423c19c0a..217b7f32c 100644 --- a/crates/nostr/src/message/client.rs +++ b/crates/nostr/src/message/client.rs @@ -307,12 +307,12 @@ impl ClientMessage { if v_len != 5 { return Err(MessageHandleError::InvalidMessageFormat); } - let subscription_id = SubscriptionId::new(v[1].to_string()); - let filter = Filter::from_json(v[2].to_string())?; - let id_size = v[3] - .as_u64() - .ok_or(MessageHandleError::InvalidMessageFormat)? as u8; - let initial_message = v[4].to_string(); + let subscription_id: SubscriptionId = serde_json::from_value(v[1].clone())?; + let filter: Filter = Filter::from_json(v[2].to_string())?; + let id_size: u8 = + v[3].as_u64() + .ok_or(MessageHandleError::InvalidMessageFormat)? as u8; + let initial_message: String = serde_json::from_value(v[4].clone())?; return Ok(Self::NegOpen { subscription_id, filter: Box::new(filter), @@ -327,8 +327,8 @@ impl ClientMessage { if v_len != 3 { return Err(MessageHandleError::InvalidMessageFormat); } - let subscription_id: SubscriptionId = SubscriptionId::new(v[1].to_string()); - let message: String = v[2].to_string(); + let subscription_id: SubscriptionId = serde_json::from_value(v[1].clone())?; + let message: String = serde_json::from_value(v[2].clone())?; return Ok(Self::NegMsg { subscription_id, message, @@ -341,7 +341,7 @@ impl ClientMessage { if v_len != 2 { return Err(MessageHandleError::InvalidMessageFormat); } - let subscription_id: SubscriptionId = SubscriptionId::new(v[1].to_string()); + let subscription_id: SubscriptionId = serde_json::from_value(v[1].clone())?; return Ok(Self::NegClose { subscription_id }); } diff --git a/crates/nostr/src/message/relay.rs b/crates/nostr/src/message/relay.rs index f4f9b4057..54a093f7a 100644 --- a/crates/nostr/src/message/relay.rs +++ b/crates/nostr/src/message/relay.rs @@ -314,11 +314,8 @@ impl RelayMessage { } let event_id: EventId = serde_json::from_value(v[1].clone())?; - let status: bool = serde_json::from_value(v[2].clone())?; - let message: String = serde_json::from_value(v[3].clone())?; - return Ok(Self::new_ok(event_id, status, message)); } @@ -358,8 +355,8 @@ impl RelayMessage { if v_len != 3 { return Err(MessageHandleError::InvalidMessageFormat); } - let subscription_id: SubscriptionId = SubscriptionId::new(v[1].to_string()); - let message: String = v[2].to_string(); + let subscription_id: SubscriptionId = serde_json::from_value(v[1].clone())?; + let message: String = serde_json::from_value(v[2].clone())?; return Ok(Self::NegMsg { subscription_id, message, @@ -372,8 +369,8 @@ impl RelayMessage { if v_len != 3 { return Err(MessageHandleError::InvalidMessageFormat); } - let subscription_id: SubscriptionId = SubscriptionId::new(v[1].to_string()); - let code: NegentropyErrorCode = NegentropyErrorCode::from(v[2].to_string()); + let subscription_id: SubscriptionId = serde_json::from_value(v[1].clone())?; + let code: NegentropyErrorCode = serde_json::from_value(v[2].clone())?; return Ok(Self::NegErr { subscription_id, code, diff --git a/crates/nostr/src/message/subscription.rs b/crates/nostr/src/message/subscription.rs index 3503683c0..ddbe385cb 100644 --- a/crates/nostr/src/message/subscription.rs +++ b/crates/nostr/src/message/subscription.rs @@ -162,7 +162,7 @@ impl<'de> Deserialize<'de> for Alphabet { } /// Subscription ID -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct SubscriptionId(String); impl SubscriptionId { @@ -199,6 +199,26 @@ impl fmt::Display for SubscriptionId { } } +impl Serialize for SubscriptionId { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} + +impl<'de> Deserialize<'de> for SubscriptionId { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let value = Value::deserialize(deserializer)?; + let id: String = serde_json::from_value(value).map_err(serde::de::Error::custom)?; + Ok(Self::new(id)) + } +} + /// Subscription filters #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct Filter { From f221dfc36602a3b31af90d96a19307fc6e00aab4 Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Tue, 19 Sep 2023 15:06:36 +0200 Subject: [PATCH 09/13] sdk: update `reconcilie` method --- crates/nostr-sdk/examples/reconcilie.rs | 8 +++----- crates/nostr-sdk/src/relay/mod.rs | 24 +++++++++++++++++------- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/crates/nostr-sdk/examples/reconcilie.rs b/crates/nostr-sdk/examples/reconcilie.rs index 9a8b9f39e..079b147ad 100644 --- a/crates/nostr-sdk/examples/reconcilie.rs +++ b/crates/nostr-sdk/examples/reconcilie.rs @@ -13,17 +13,15 @@ async fn main() -> Result<()> { let my_keys = Keys::new(secret_key); let client = Client::new(&my_keys); - client.add_relay("wss://relay.damus.io", None).await?; + client.add_relay("wss://atl.purplerelay.com", None).await?; client.connect().await; let filter = Filter::new() .author(my_keys.public_key().to_string()) .limit(10); - let relay = client.relay("wss://relay.damus.io").await?; - relay - .reconcilie(filter, vec![(EventId::all_zeros(), Timestamp::now())]) - .await?; + let relay = client.relay("wss://atl.purplerelay.com").await?; + relay.reconcilie(filter, Vec::new()).await?; Ok(()) } diff --git a/crates/nostr-sdk/src/relay/mod.rs b/crates/nostr-sdk/src/relay/mod.rs index 6b5b48e7f..2a4e051bd 100644 --- a/crates/nostr-sdk/src/relay/mod.rs +++ b/crates/nostr-sdk/src/relay/mod.rs @@ -1577,8 +1577,8 @@ impl Relay { negentropy.seal()?; - let id = SubscriptionId::generate(); - let open_msg = ClientMessage::neg_open(&mut negentropy, &id, filter)?; + let sub_id = SubscriptionId::generate(); + let open_msg = ClientMessage::neg_open(&mut negentropy, &sub_id, filter)?; self.send_msg(open_msg, Some(Duration::from_secs(10))) .await?; @@ -1592,7 +1592,7 @@ impl Relay { subscription_id, message, } => { - if subscription_id == id { + if subscription_id == sub_id { let query: Bytes = Bytes::from_hex(message)?; let mut need_ids: Vec = Vec::new(); let msg: Option = negentropy.reconcile_with_ids( @@ -1601,14 +1601,24 @@ impl Relay { &mut need_ids, )?; - // TODO: request ids to relay - println!("IDs: {need_ids:?}"); + let ids: Vec = + need_ids.into_iter().map(|id| id.to_hex()).collect(); + let filter = Filter::new().ids(ids); + self.req_events_of( + vec![filter], + Duration::from_secs(120), + FilterOptions::ExitOnEOSE, + ); match msg { Some(query) => { + tracing::info!( + "Continue with reconciliation with {}", + self.url + ); self.send_msg( ClientMessage::NegMsg { - subscription_id: id.clone(), + subscription_id: sub_id.clone(), message: query.to_hex(), }, None, @@ -1626,7 +1636,7 @@ impl Relay { subscription_id, code, } => { - if subscription_id == id { + if subscription_id == sub_id { tracing::error!("Negentropy syncing error: {code}"); break; } From e99733f369bfd3392c3b0685a82a83d471cf2ec9 Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Thu, 28 Sep 2023 12:33:31 +0200 Subject: [PATCH 10/13] sdk: return negentropy error code --- crates/nostr-sdk/src/relay/mod.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/nostr-sdk/src/relay/mod.rs b/crates/nostr-sdk/src/relay/mod.rs index 2a4e051bd..9bc35acba 100644 --- a/crates/nostr-sdk/src/relay/mod.rs +++ b/crates/nostr-sdk/src/relay/mod.rs @@ -18,6 +18,7 @@ use std::time::Instant; #[cfg(not(target_arch = "wasm32"))] use async_utility::futures_util::stream::AbortHandle; use async_utility::{futures_util, thread, time}; +use nostr::message::relay::NegentropyErrorCode; use nostr::message::MessageHandleError; use nostr::negentropy::hex; use nostr::negentropy::{self, Bytes, Negentropy}; @@ -96,6 +97,9 @@ pub enum Error { /// Filters empty #[error("filters empty")] FiltersEmpty, + /// Reconciliation error + #[error("negentropy reconciliation error: {0}")] + NegentropyReconciliation(NegentropyErrorCode), } /// Relay connection status @@ -1637,8 +1641,7 @@ impl Relay { code, } => { if subscription_id == sub_id { - tracing::error!("Negentropy syncing error: {code}"); - break; + return Err(Error::NegentropyReconciliation(code)); } } _ => (), From 4c5a376ca9149ed61b68770f9bfb790c0c941dbe Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Thu, 28 Sep 2023 12:34:09 +0200 Subject: [PATCH 11/13] ffi: add missing `RelayMessage` --- bindings/nostr-ffi/src/message/relay.rs | 22 ++++++++++++++++++++++ bindings/nostr-ffi/src/nostr.udl | 2 ++ bindings/nostr-sdk-ffi/src/nostr_sdk.udl | 2 ++ 3 files changed, 26 insertions(+) diff --git a/bindings/nostr-ffi/src/message/relay.rs b/bindings/nostr-ffi/src/message/relay.rs index 73e5320a7..59d0924c7 100644 --- a/bindings/nostr-ffi/src/message/relay.rs +++ b/bindings/nostr-ffi/src/message/relay.rs @@ -26,6 +26,14 @@ pub enum RelayMessage { subscription_id: String, count: u64, }, + NegMsg { + subscription_id: String, + message: String, + }, + NegErr { + subscription_id: String, + code: String, + }, } impl From for RelayMessage { @@ -59,6 +67,20 @@ impl From for RelayMessage { subscription_id: subscription_id.to_string(), count: count as u64, }, + NRelayMessage::NegMsg { + subscription_id, + message, + } => Self::NegMsg { + subscription_id: subscription_id.to_string(), + message, + }, + NRelayMessage::NegErr { + subscription_id, + code, + } => Self::NegErr { + subscription_id: subscription_id.to_string(), + code: code.to_string(), + }, } } } diff --git a/bindings/nostr-ffi/src/nostr.udl b/bindings/nostr-ffi/src/nostr.udl index 9f1a33562..b949b8b96 100644 --- a/bindings/nostr-ffi/src/nostr.udl +++ b/bindings/nostr-ffi/src/nostr.udl @@ -136,6 +136,8 @@ interface RelayMessage { Ok(string event_id, boolean status, string message); Auth(string challenge); Count(string subscription_id, u64 count); + NegMsg(string subscription_id, string message); + NegErr(string subscription_id, string code); }; interface ZapRequestData { diff --git a/bindings/nostr-sdk-ffi/src/nostr_sdk.udl b/bindings/nostr-sdk-ffi/src/nostr_sdk.udl index 507ec9dd3..3fdf391ca 100644 --- a/bindings/nostr-sdk-ffi/src/nostr_sdk.udl +++ b/bindings/nostr-sdk-ffi/src/nostr_sdk.udl @@ -156,6 +156,8 @@ interface RelayMessage { Ok(string event_id, boolean status, string message); Auth(string challenge); Count(string subscription_id, u64 count); + NegMsg(string subscription_id, string message); + NegErr(string subscription_id, string code); }; interface ZapRequestData { From fe4b8090cc908969184eccd4a9b7b0f1a848b916 Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Thu, 28 Sep 2023 12:40:54 +0200 Subject: [PATCH 12/13] sdk: add `negentropy` example --- crates/nostr-sdk/Cargo.toml | 3 +++ .../examples/{reconcilie.rs => negentropy.rs} | 12 +++++++++++- 2 files changed, 14 insertions(+), 1 deletion(-) rename crates/nostr-sdk/examples/{reconcilie.rs => negentropy.rs} (66%) diff --git a/crates/nostr-sdk/Cargo.toml b/crates/nostr-sdk/Cargo.toml index 46398cc75..c875802a6 100644 --- a/crates/nostr-sdk/Cargo.toml +++ b/crates/nostr-sdk/Cargo.toml @@ -76,5 +76,8 @@ name = "shutdown-on-drop" name = "subscriptions" required-features = ["all-nips"] +[[example]] +name = "negentropy" + [[example]] name = "nip65" \ No newline at end of file diff --git a/crates/nostr-sdk/examples/reconcilie.rs b/crates/nostr-sdk/examples/negentropy.rs similarity index 66% rename from crates/nostr-sdk/examples/reconcilie.rs rename to crates/nostr-sdk/examples/negentropy.rs index 079b147ad..6c161b4fc 100644 --- a/crates/nostr-sdk/examples/reconcilie.rs +++ b/crates/nostr-sdk/examples/negentropy.rs @@ -17,11 +17,21 @@ async fn main() -> Result<()> { client.connect().await; + let my_items = Vec::new(); let filter = Filter::new() .author(my_keys.public_key().to_string()) .limit(10); let relay = client.relay("wss://atl.purplerelay.com").await?; - relay.reconcilie(filter, Vec::new()).await?; + relay.reconcilie(filter, my_items).await?; + + client + .handle_notifications(|notification| async { + if let RelayPoolNotification::Event(_url, event) = notification { + println!("{:?}", event); + } + Ok(false) // Set to true to exit from the loop + }) + .await?; Ok(()) } From a542ea68b5b442fbb47e4ba8d928c7a4fb7c026f Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Thu, 28 Sep 2023 12:41:55 +0200 Subject: [PATCH 13/13] sdk: close negentropy reconciliation when terminated --- crates/nostr-sdk/src/relay/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/nostr-sdk/src/relay/mod.rs b/crates/nostr-sdk/src/relay/mod.rs index 9bc35acba..5164394bd 100644 --- a/crates/nostr-sdk/src/relay/mod.rs +++ b/crates/nostr-sdk/src/relay/mod.rs @@ -1650,6 +1650,11 @@ impl Relay { } } + let close_msg = ClientMessage::NegClose { + subscription_id: sub_id, + }; + self.send_msg(close_msg, None).await?; + Ok(()) } }