Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Negentropy Syncing #158

Merged
merged 13 commits into from
Sep 28, 2023
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions bindings/nostr-ffi/src/message/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ pub enum RelayMessage {
subscription_id: String,
count: u64,
},
NegMsg {
subscription_id: String,
message: String,
},
NegErr {
subscription_id: String,
code: String,
},
}

impl From<NRelayMessage> for RelayMessage {
Expand Down Expand Up @@ -59,6 +67,20 @@ impl From<NRelayMessage> for RelayMessage {
subscription_id: subscription_id.to_string(),
count: count as u64,
},
NRelayMessage::NegMsg {
subscription_id,
message,
} => Self::NegMsg {
subscription_id: subscription_id.to_string(),
message,
},
NRelayMessage::NegErr {
subscription_id,
code,
} => Self::NegErr {
subscription_id: subscription_id.to_string(),
code: code.to_string(),
},
}
}
}
2 changes: 2 additions & 0 deletions bindings/nostr-ffi/src/nostr.udl
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ interface RelayMessage {
Ok(string event_id, boolean status, string message);
Auth(string challenge);
Count(string subscription_id, u64 count);
NegMsg(string subscription_id, string message);
NegErr(string subscription_id, string code);
};

interface ZapRequestData {
Expand Down
2 changes: 2 additions & 0 deletions bindings/nostr-sdk-ffi/src/nostr_sdk.udl
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ interface RelayMessage {
Ok(string event_id, boolean status, string message);
Auth(string challenge);
Count(string subscription_id, u64 count);
NegMsg(string subscription_id, string message);
NegErr(string subscription_id, string code);
};

interface ZapRequestData {
Expand Down
3 changes: 3 additions & 0 deletions crates/nostr-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,8 @@ name = "shutdown-on-drop"
name = "subscriptions"
required-features = ["all-nips"]

[[example]]
name = "negentropy"

[[example]]
name = "nip65"
37 changes: 37 additions & 0 deletions crates/nostr-sdk/examples/negentropy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) 2022-2023 Yuki Kishimoto
// Distributed under the MIT software license

use nostr_sdk::prelude::*;

const BECH32_SK: &str = "nsec1ufnus6pju578ste3v90xd5m2decpuzpql2295m3sknqcjzyys9ls0qlc85";

#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();

let secret_key = SecretKey::from_bech32(BECH32_SK)?;
let my_keys = Keys::new(secret_key);

let client = Client::new(&my_keys);
client.add_relay("wss://atl.purplerelay.com", None).await?;

client.connect().await;

let my_items = Vec::new();
let filter = Filter::new()
.author(my_keys.public_key().to_string())
.limit(10);
let relay = client.relay("wss://atl.purplerelay.com").await?;
relay.reconcilie(filter, my_items).await?;

client
.handle_notifications(|notification| async {
if let RelayPoolNotification::Event(_url, event) = notification {
println!("{:?}", event);
}
Ok(false) // Set to true to exit from the loop
})
.await?;

Ok(())
}
111 changes: 111 additions & 0 deletions crates/nostr-sdk/src/relay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use std::time::Instant;
#[cfg(not(target_arch = "wasm32"))]
use async_utility::futures_util::stream::AbortHandle;
use async_utility::{futures_util, thread, time};
use nostr::message::relay::NegentropyErrorCode;
use nostr::message::MessageHandleError;
use nostr::negentropy::hex;
use nostr::negentropy::{self, Bytes, Negentropy};
#[cfg(feature = "nip11")]
use nostr::nips::nip11::RelayInformationDocument;
use nostr::{ClientMessage, Event, EventId, Filter, RelayMessage, SubscriptionId, Timestamp, Url};
Expand All @@ -41,6 +44,12 @@ type Message = (RelayEvent, Option<oneshot::Sender<bool>>);
/// [`Relay`] error
#[derive(Debug, Error)]
pub enum Error {
/// Negentropy error
#[error(transparent)]
Negentropy(#[from] negentropy::Error),
/// Hex error
#[error(transparent)]
Hex(#[from] hex::Error),
/// Channel timeout
#[error("channel timeout")]
ChannelTimeout,
Expand Down Expand Up @@ -88,6 +97,9 @@ pub enum Error {
/// Filters empty
#[error("filters empty")]
FiltersEmpty,
/// Reconciliation error
#[error("negentropy reconciliation error: {0}")]
NegentropyReconciliation(NegentropyErrorCode),
}

/// Relay connection status
Expand Down Expand Up @@ -1546,4 +1558,103 @@ impl Relay {
}
});
}

/// Negentropy reconciliation
pub async fn reconcilie(
&self,
filter: Filter,
my_items: Vec<(EventId, Timestamp)>,
) -> Result<(), Error> {
if !self.opts.read() {
return Err(Error::ReadDisabled);
}

let id_size: usize = 16;

let mut negentropy = Negentropy::new(id_size, Some(5_000))?;

for (id, timestamp) in my_items.into_iter() {
let cutted_id: &[u8] = &id.as_bytes()[..id_size];
let cutted_id = Bytes::from_slice(cutted_id);
negentropy.add_item(timestamp.as_u64(), cutted_id)?;
}

negentropy.seal()?;

let sub_id = SubscriptionId::generate();
let open_msg = ClientMessage::neg_open(&mut negentropy, &sub_id, filter)?;

self.send_msg(open_msg, Some(Duration::from_secs(10)))
.await?;

let mut notifications = self.notification_sender.subscribe();
while let Ok(notification) = notifications.recv().await {
if let RelayPoolNotification::Message(url, msg) = notification {
if url == self.url {
match msg {
RelayMessage::NegMsg {
subscription_id,
message,
} => {
if subscription_id == sub_id {
let query: Bytes = Bytes::from_hex(message)?;
let mut need_ids: Vec<Bytes> = Vec::new();
let msg: Option<Bytes> = negentropy.reconcile_with_ids(
&query,
&mut Vec::new(),
&mut need_ids,
)?;

let ids: Vec<String> =
need_ids.into_iter().map(|id| id.to_hex()).collect();
let filter = Filter::new().ids(ids);
self.req_events_of(
vec![filter],
Duration::from_secs(120),
FilterOptions::ExitOnEOSE,
);

match msg {
Some(query) => {
tracing::info!(
"Continue with reconciliation with {}",
self.url
);
self.send_msg(
ClientMessage::NegMsg {
subscription_id: sub_id.clone(),
message: query.to_hex(),
},
None,
)
.await?;
}
None => {
tracing::info!("Reconciliation terminated");
break;
}
}
}
}
RelayMessage::NegErr {
subscription_id,
code,
} => {
if subscription_id == sub_id {
return Err(Error::NegentropyReconciliation(code));
}
}
_ => (),
}
}
}
}

let close_msg = ClientMessage::NegClose {
subscription_id: sub_id,
};
self.send_msg(close_msg, None).await?;

Ok(())
}
}
26 changes: 16 additions & 10 deletions crates/nostr-sdk/src/relay/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,18 +159,24 @@ impl RelayPoolTask {
msg.clone(),
));

if let RelayMessage::Event { event, .. } = msg {
// Verifies if the event is valid
if event.verify().is_ok() {
// Adds only new events
if this.add_event(event.id).await {
let notification = RelayPoolNotification::Event(
relay_url,
event.as_ref().clone(),
);
let _ = this.notification_sender.send(notification);
match msg {
RelayMessage::Event { event, .. } => {
// Verifies if the event is valid
if event.verify().is_ok() {
// Adds only new events
if this.add_event(event.id).await {
let notification = RelayPoolNotification::Event(
relay_url,
event.as_ref().clone(),
);
let _ = this.notification_sender.send(notification);
}
}
}
RelayMessage::Notice { message } => {
tracing::warn!("Notice from {relay_url}: {message}")
}
_ => (),
}
}
RelayPoolMessage::BatchEvent(ids) => {
Expand Down
2 changes: 2 additions & 0 deletions crates/nostr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ std = [
"bitcoin/rand-std",
"bip39?/std",
"chacha20?/std",
"negentropy/std",
"serde/std",
"serde_json/std",
"tracing/std",
Expand Down Expand Up @@ -55,6 +56,7 @@ bip39 = { version = "2.0", default-features = false, optional = true }
bitcoin = { version = "0.30", default-features = false, features = ["rand", "serde"] }
cbc = { version = "0.1", optional = true }
chacha20 = { version = "0.9", optional = true }
negentropy = { git = "https://github.com/yukibtc/rust-negentropy", rev = "547592168356f423dd10773384b798e7c7139893", default-features = false }
nostr-ots = { version = "0.2", optional = true }
once_cell = { workspace = true, optional = true }
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls-webpki-roots", "socks"], optional = true }
Expand Down
1 change: 1 addition & 0 deletions crates/nostr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub use bitcoin;
pub use bitcoin::bech32;
pub use bitcoin::hashes;
pub use bitcoin::secp256k1;
pub use negentropy;
pub use serde_json;
pub use url_fork::{self as url, Url};

Expand Down
Loading