From e469fe3a82e5f81f27e790146b531324988d90f8 Mon Sep 17 00:00:00 2001 From: Ken Sedgwick Date: Mon, 9 Dec 2024 15:22:33 -0800 Subject: [PATCH] WIP: rough subscription manager API proposal --- Cargo.lock | 4 + crates/notedeck/Cargo.toml | 4 + crates/notedeck/src/lib.rs | 5 + crates/notedeck/src/submgr.rs | 266 ++++++++++++++++++++++++++ crates/notedeck/src/util/mod.rs | 4 + crates/notedeck/src/util/test_util.rs | 72 +++++++ 6 files changed, 355 insertions(+) create mode 100644 crates/notedeck/src/submgr.rs create mode 100644 crates/notedeck/src/util/mod.rs create mode 100644 crates/notedeck/src/util/test_util.rs diff --git a/Cargo.lock b/Cargo.lock index 949abbb1..9494bfe1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2531,18 +2531,22 @@ dependencies = [ "dirs", "egui", "enostr", + "futures", "hex", "image", + "nostr", "nostrdb", "poll-promise", "puffin 0.19.1 (git+https://github.com/jb55/puffin?rev=70ff86d5503815219b01a009afd3669b7903a057)", "security-framework", "serde", "serde_json", + "sha2", "strum", "strum_macros", "tempfile", "thiserror 2.0.7", + "tokio", "tracing", "url", "uuid", diff --git a/crates/notedeck/Cargo.toml b/crates/notedeck/Cargo.toml index 31bdfba2..3e2949cd 100644 --- a/crates/notedeck/Cargo.toml +++ b/crates/notedeck/Cargo.toml @@ -22,9 +22,13 @@ serde = { workspace = true } hex = { workspace = true } thiserror = { workspace = true } puffin = { workspace = true, optional = true } +futures = "0.3.31" [dev-dependencies] tempfile = { workspace = true } +tokio = { workspace = true } +nostr = { workspace = true } +sha2 = "0.10.8" [target.'cfg(target_os = "macos")'.dependencies] security-framework = { workspace = true } diff --git a/crates/notedeck/src/lib.rs b/crates/notedeck/src/lib.rs index 50ad2c7d..0ad7bf4b 100644 --- a/crates/notedeck/src/lib.rs +++ b/crates/notedeck/src/lib.rs @@ -12,6 +12,7 @@ mod notecache; mod result; pub mod storage; mod style; +pub mod submgr; pub mod theme; mod theme_handler; mod time; @@ -20,6 +21,10 @@ pub mod ui; mod unknowns; mod user_account; +/// Various utilities +#[macro_use] +pub mod util; + pub use accounts::{AccountData, Accounts, AccountsAction, AddAccountAction}; pub use app::App; pub use args::Args; diff --git a/crates/notedeck/src/submgr.rs b/crates/notedeck/src/submgr.rs new file mode 100644 index 00000000..b4391edd --- /dev/null +++ b/crates/notedeck/src/submgr.rs @@ -0,0 +1,266 @@ +#![allow(unused)] + +use futures::StreamExt; +use std::cmp::Ordering; +use std::collections::BTreeMap; +use std::error::Error; +use std::fmt; +use thiserror::Error; + +use enostr::Filter; +use nostrdb::{self, Config, Ndb, NoteKey, Subscription, SubscriptionStream}; + +/// The Subscription Manager +/// +/// NOTE - This interface wishes it was called Subscriptions but there +/// already is one. Using a lame (but short) placeholder name instead +/// for now ... +/// +/// ```no_run +/// use std::error::Error; +/// +/// use nostrdb::{Config, Ndb}; +/// use enostr::Filter; +/// use notedeck::submgr::{SubConstraint, SubMgr, SubSpecBuilder, SubError}; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box> { +/// let mut ndb = Ndb::new("the/db/path/", &Config::new())?; +/// let mut submgr = SubMgr::new(&mut ndb); +/// +/// // Define a filter and build the subscription specification +/// let filter = Filter::new().kinds(vec![1, 2, 3]).build(); +/// let spec = SubSpecBuilder::new() +/// .filters(vec![filter]) +/// .constraint(SubConstraint::Local) +/// .build(); +/// +/// // Subscribe and obtain a SubReceiver +/// let mut receiver = submgr.subscribe(spec)?; +/// +/// // Process incoming note keys +/// loop { +/// match receiver.next().await { +/// Ok(note_keys) => { +/// // Process the note keys +/// println!("Received note keys: {:?}", note_keys); +/// }, +/// Err(SubError::StreamEnded) => { +/// // Not really an error; we should clean up +/// break; +/// }, +/// Err(err) => { +/// // Handle other errors +/// eprintln!("Error: {:?}", err); +/// break; +/// }, +/// } +/// } +/// +/// // Unsubscribe when the subscription is no longer needed +/// submgr.unsubscribe(&receiver)?; +/// +/// Ok(()) +/// } +/// ``` + +#[derive(Debug, Error)] +pub enum SubError { + #[error("Stream ended")] + StreamEnded, + + #[error("Internal error: {0}")] + InternalError(String), + + #[error("nostrdb error: {0}")] + NdbError(#[from] nostrdb::Error), +} + +pub type SubResult = Result; + +#[derive(Debug, Clone, Copy)] +pub struct SubId(nostrdb::Subscription); + +impl From for SubId { + fn from(subscription: Subscription) -> Self { + SubId(subscription) + } +} + +impl Ord for SubId { + fn cmp(&self, other: &Self) -> Ordering { + self.0.id().cmp(&other.0.id()) + } +} + +impl PartialOrd for SubId { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for SubId { + fn eq(&self, other: &Self) -> bool { + self.0.id() == other.0.id() + } +} + +impl Eq for SubId {} + +#[derive(Debug, Clone)] +pub enum SubConstraint { + OneShot, // terminate subscription after initial query + Local, // only query the local db, no remote subs + OutboxRelays(Vec), // ensure one of these is in the active relay set + AllowedRelays(Vec), // if not empty, only use these relays + BlockedRelays(Vec), // if not empty, don't use these relays +} + +#[derive(Debug, Default)] +pub struct SubSpecBuilder { + rmtid: Option, + filters: Vec, + constraints: Vec, +} + +impl SubSpecBuilder { + pub fn new() -> Self { + SubSpecBuilder::default() + } + pub fn rmtid(mut self, id: String) -> Self { + self.rmtid = Some(id); + self + } + pub fn filters(mut self, filters: Vec) -> Self { + self.filters.extend(filters); + self + } + pub fn constraint(mut self, constraint: SubConstraint) -> Self { + self.constraints.push(constraint); + self + } + pub fn build(self) -> SubSpec { + let mut outbox_relays = Vec::new(); + let mut allowed_relays = Vec::new(); + let mut blocked_relays = Vec::new(); + let mut is_oneshot = false; + let mut is_local = false; + + for constraint in self.constraints { + match constraint { + SubConstraint::OneShot => is_oneshot = true, + SubConstraint::Local => is_local = true, + SubConstraint::OutboxRelays(relays) => outbox_relays.extend(relays), + SubConstraint::AllowedRelays(relays) => allowed_relays.extend(relays), + SubConstraint::BlockedRelays(relays) => blocked_relays.extend(relays), + } + } + + SubSpec { + rmtid: self.rmtid, + filters: self.filters, + outbox_relays, + allowed_relays, + blocked_relays, + is_oneshot, + is_local, + } + } +} + +#[derive(Debug, Clone)] +pub struct SubSpec { + rmtid: Option, + filters: Vec, + outbox_relays: Vec, + allowed_relays: Vec, + blocked_relays: Vec, + is_oneshot: bool, + is_local: bool, +} + +pub struct SubMgr<'a> { + ndb: &'a mut Ndb, + subs: BTreeMap, +} + +impl<'a> SubMgr<'a> { + pub fn new(ndb: &'a mut Ndb) -> Self { + SubMgr { + ndb, + subs: BTreeMap::new(), + } + } + + pub fn subscribe(&mut self, spec: SubSpec) -> SubResult { + let receiver = self.make_subscription(&spec)?; + self.subs.insert(receiver.id, spec); + Ok(receiver) + } + + pub fn unsubscribe(&mut self, rcvr: &SubReceiver) -> SubResult<()> { + self.subs.remove(&rcvr.id); + Ok(()) + } + + fn make_subscription(&mut self, sub: &SubSpec) -> SubResult { + let subscription = self.ndb.subscribe(&sub.filters)?; + let mut stream = subscription.stream(self.ndb).notes_per_await(1); + Ok(SubReceiver::new(subscription.into(), stream)) + } +} + +pub struct SubReceiver { + id: SubId, + stream: SubscriptionStream, +} + +impl SubReceiver { + pub fn new(id: SubId, stream: SubscriptionStream) -> Self { + SubReceiver { id, stream } + } + + pub async fn next(&mut self) -> SubResult> { + self.stream.next().await.ok_or(SubError::StreamEnded) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use nostrdb::NoteBuilder; + + use crate::testdbs_path_async; + use crate::util::test_util::{test_keypair, ScopedNdb}; + + #[tokio::test] + async fn test_submgr_sub() { + // setup an ndb and submgr to test + let mut sndb = ScopedNdb::new(&testdbs_path_async!()); + let mut submgr = SubMgr::new(&mut sndb.ndb); + + // subscribe to some stuff + let filter = Filter::new().kinds(vec![1]).build(); + let spec = SubSpecBuilder::new() + .filters(vec![filter]) + .constraint(SubConstraint::Local) + .build(); + let mut receiver = submgr.subscribe(spec).expect("receiver"); + + // generate a test event that matches the subscription + let keys1 = test_keypair(1); + let note = NoteBuilder::new() + .kind(1) + .content("hello, world") + .sign(&keys1.secret_key.to_secret_bytes()) + .build() + .expect("note"); + let raw_msg = format!("[\"EVENT\" \"random_string\", {}]", note.json().unwrap()); + sndb.ndb.process_event(&raw_msg).expect("process ok"); + + // receiver should now see the msg + let notekeys = receiver.next().await.expect("notekeys"); + assert_eq!(notekeys, vec![NoteKey::new(1)]); + } +} diff --git a/crates/notedeck/src/util/mod.rs b/crates/notedeck/src/util/mod.rs new file mode 100644 index 00000000..0964f7ae --- /dev/null +++ b/crates/notedeck/src/util/mod.rs @@ -0,0 +1,4 @@ +#[allow(missing_docs)] +#[cfg(test)] +#[macro_use] +pub mod test_util; diff --git a/crates/notedeck/src/util/test_util.rs b/crates/notedeck/src/util/test_util.rs new file mode 100644 index 00000000..d890a101 --- /dev/null +++ b/crates/notedeck/src/util/test_util.rs @@ -0,0 +1,72 @@ +use enostr::{FullKeypair, Pubkey}; +use nostrdb::{Config, Ndb}; +use std::fs; +use std::path::Path; + +// FIXME - make nostrdb::test_util::cleanup_db accessible instead +#[allow(dead_code)] +fn cleanup_db(path: &str) { + let p = Path::new(path); + let _ = fs::remove_file(p.join("data.mdb")); + let _ = fs::remove_file(p.join("lock.mdb")); +} + +// scoped ndb handle that cleans up when dropped +pub struct ScopedNdb { + pub path: String, + pub ndb: Ndb, +} +impl ScopedNdb { + pub fn new(path: &str) -> Self { + cleanup_db(path); // ensure a clean slate before starting + let ndb = Ndb::new(path, &Config::new()) + .unwrap_or_else(|err| panic!("Failed to create Ndb at {}: {}", path, err)); + Self { + path: path.to_string(), + ndb, + } + } +} +impl Drop for ScopedNdb { + fn drop(&mut self) { + cleanup_db(&self.path); // comment this out to leave the db for inspection + } +} + +// generate a testdbs_path for an async test automatically +#[macro_export] +macro_rules! testdbs_path_async { + () => {{ + fn f() {} + fn type_name_of(_: T) -> &'static str { + core::any::type_name::() + } + let name = type_name_of(f); + + // Find and cut the rest of the path + let test_name = match &name[..name.len() - 3].strip_suffix("::{{closure}}") { + Some(stripped) => match &stripped.rfind(':') { + Some(pos) => &stripped[pos + 1..stripped.len()], + None => &stripped, + }, + None => &name[..name.len() - 3], + }; + + format!("target/testdbs/{}", test_name) + }}; +} + +// generate a deterministic keypair for testing +pub fn test_keypair(input: u64) -> FullKeypair { + use sha2::{Digest, Sha256}; + + let mut hasher = Sha256::new(); + hasher.update(input.to_le_bytes()); + let hash = hasher.finalize(); + + let secret_key = nostr::SecretKey::from_slice(&hash).expect("valid secret key"); + let (xopk, _) = secret_key.x_only_public_key(&nostr::SECP256K1); + let pubkey = Pubkey::new(xopk.serialize()); + + FullKeypair::new(pubkey, secret_key) +}