diff --git a/Cargo.lock b/Cargo.lock index 1ffd9992..e7ba8b28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "ab_glyph" @@ -1510,9 +1510,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", @@ -1525,9 +1525,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -1535,15 +1535,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" dependencies = [ "futures-core", "futures-task", @@ -1552,15 +1552,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -1569,21 +1569,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -2606,6 +2606,7 @@ dependencies = [ "ehttp 0.2.0", "enostr", "env_logger 0.10.2", + "futures", "hex", "image", "indexmap", diff --git a/Cargo.toml b/Cargo.toml index ca1cf02c..2a3a724c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ dirs = "5.0.1" tracing-appender = "0.2.3" urlencoding = "2.1.3" open = "5.3.0" +futures = "0.3.31" [dev-dependencies] tempfile = "3.13.0" diff --git a/enostr/src/relay/pool.rs b/enostr/src/relay/pool.rs index c138a495..0abac3b7 100644 --- a/enostr/src/relay/pool.rs +++ b/enostr/src/relay/pool.rs @@ -3,6 +3,7 @@ use crate::{ClientMessage, Result}; use nostrdb::Filter; use std::collections::HashMap; +use std::collections::HashSet; use std::time::{Duration, Instant}; use url::Url; @@ -11,7 +12,7 @@ use url::Url; use ewebsock::{WsEvent, WsMessage}; #[cfg(not(target_arch = "wasm32"))] -use tracing::{debug, error}; +use tracing::{debug, error, info}; #[derive(Debug)] pub struct PoolEvent<'a> { @@ -159,7 +160,7 @@ impl RelayPool { url: String, wakeup: impl Fn() + Send + Sync + Clone + 'static, ) -> Result<()> { - let url = Self::canonicalize_url(url); + let url = Self::canonicalize_url(&url); // Check if the URL already exists in the pool. if self.has(&url) { return Ok(()); @@ -176,12 +177,55 @@ impl RelayPool { Ok(()) } + // Add and remove relays to match the provided list + pub fn set_relays( + &mut self, + urls: &Vec, + wakeup: impl Fn() + Send + Sync + Clone + 'static, + ) -> Result<()> { + // Canonicalize the new URLs. + let new_urls = urls + .iter() + .map(|u| Self::canonicalize_url(u)) + .collect::>(); + + // Get the old URLs from the existing relays. + let old_urls = self + .relays + .iter() + .map(|pr| pr.relay.url.clone()) + .collect::>(); + + debug!("old relays: {:?}", old_urls); + debug!("new relays: {:?}", new_urls); + + if new_urls.len() == 0 { + info!("bootstrapping, not clearing the relay list ..."); + return Ok(()); + } + + // Remove the relays that are in old_urls but not in new_urls. + let to_remove: HashSet<_> = old_urls.difference(&new_urls).cloned().collect(); + self.relays.retain(|pr| !to_remove.contains(&pr.relay.url)); + + // FIXME - how do we close connections the removed relays? + + // Add the relays that are in new_urls but not in old_urls. + let to_add: HashSet<_> = new_urls.difference(&old_urls).cloned().collect(); + for url in to_add { + if let Err(e) = self.add_url(url.clone(), wakeup.clone()) { + error!("Failed to add relay with URL {}: {:?}", url, e); + } + } + + Ok(()) + } - // standardize the format (ie, trailing slashes) - fn canonicalize_url(url: String) -> String { + // standardize the format (ie, trailing slashes) to avoid dups + fn canonicalize_url(url: &String) -> String { match Url::parse(&url) { Ok(parsed_url) => parsed_url.to_string(), - Err(_) => url, // If parsing fails, return the original URL. + Err(_) => url.clone(), // If parsing fails, return the original URL. } } diff --git a/src/app.rs b/src/app.rs index 0f475170..9c1bd0e4 100644 --- a/src/app.rs +++ b/src/app.rs @@ -18,6 +18,7 @@ use crate::{ storage::{Directory, FileKeyStorage, KeyStorageType}, subscriptions::{SubKind, Subscriptions}, support::Support, + task, thread::Thread, timeline::{Timeline, TimelineId, TimelineKind, ViewFilter}, ui::{self, DesktopSidePanel}, @@ -475,6 +476,13 @@ fn update_damus(damus: &mut Damus, ctx: &egui::Context) { .insert("unknownids".to_string(), SubKind::OneShot); setup_initial_nostrdb_subs(&damus.ndb, &mut damus.note_cache, &mut damus.columns) .expect("home subscription failed"); + + // This is only safe because we are absolutely single threaded ... + let damus_ptr = &mut *damus as *mut Damus; + task::spawn_sendable(async move { + let damus = unsafe { &mut *damus_ptr }; + task::track_user_relays(damus).await; + }); } DamusState::NewTimelineSub(new_timeline_id) => { diff --git a/src/lib.rs b/src/lib.rs index 4ed39bfd..31faee14 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,6 +32,7 @@ mod result; mod route; mod subscriptions; mod support; +mod task; mod test_data; mod thread; mod time; diff --git a/src/task.rs b/src/task.rs new file mode 100644 index 00000000..b8d25b29 --- /dev/null +++ b/src/task.rs @@ -0,0 +1,138 @@ +use std::future::Future; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::Poll; +use tokio::task; + +use tracing::{debug, error}; + +use enostr::RelayPool; +use nostrdb::{Filter, Ndb, Transaction}; +use uuid::Uuid; + +use crate::note::NoteRef; +use crate::Damus; + +pub async fn track_user_relays(damus: &mut Damus) { + debug!("track_user_relays starting"); + + let filter = user_relay_filter(damus); + + // Do we have a user relay list stored in nostrdb? Start with that ... + let txn = Transaction::new(&damus.ndb).expect("transaction"); + let relays = query_nip65_relays(&damus.ndb, &txn, &filter); + debug!("track_user_relays: initial from nostrdb: {:#?}", relays); + set_relays(&mut damus.pool, relays); + drop(txn); + + // Subscribe to user relay list updates + let ndbsub = damus + .ndb + .subscribe(&[filter.clone()]) + .expect("ndb subscription"); + let poolid = Uuid::new_v4().to_string(); + damus.pool.subscribe(poolid.clone(), vec![filter.clone()]); + + // Wait for updates to the subscription + loop { + match damus.ndb.wait_for_notes(ndbsub, 10).await { + Ok(vec) => { + debug!("saw {:?}", vec); + let txn = Transaction::new(&damus.ndb).expect("transaction"); + let relays = query_nip65_relays(&damus.ndb, &txn, &filter); + debug!( + "track_user_relays: subscription from nostrdb: {:#?}", + relays + ); + set_relays(&mut damus.pool, relays); + } + Err(err) => error!("err: {:?}", err), + } + } +} + +fn user_relay_filter(damus: &mut Damus) -> Filter { + let account = damus + .accounts + .get_selected_account() + .as_ref() + .map(|a| a.pubkey.bytes()) + .expect("selected account"); + + // NIP-65 + Filter::new() + .authors([account]) + .kinds([10002]) + .limit(1) + .build() +} + +// useful for debugging +fn _query_note_json(ndb: &Ndb, txn: &Transaction, filter: &Filter) -> Vec { + let lim = filter.limit().unwrap_or(crate::filter::default_limit()) as i32; + let results = ndb + .query(txn, &[filter.clone()], lim) + .expect("query results"); + results + .iter() + .map(|qr| NoteRef::new(qr.note_key, qr.note.created_at())) + .filter_map(|nr| ndb.get_note_by_key(txn, nr.key).ok()) + .map(|n| n.json().unwrap()) + .collect() +} + +fn query_nip65_relays(ndb: &Ndb, txn: &Transaction, filter: &Filter) -> Vec { + let lim = filter.limit().unwrap_or(crate::filter::default_limit()) as i32; + let results = ndb + .query(txn, &[filter.clone()], lim) + .expect("query results"); + results + .iter() + .map(|qr| NoteRef::new(qr.note_key, qr.note.created_at())) + .filter_map(|nr| ndb.get_note_by_key(txn, nr.key).ok()) + .flat_map(|n| { + n.tags() + .iter() + .filter_map(|ti| ti.get_unchecked(1).variant().str()) + .map(|s| s.to_string()) + }) + .collect() +} + +fn set_relays(pool: &mut RelayPool, relays: Vec) { + let wakeup = move || { + // FIXME - how do we repaint? + }; + if let Err(e) = pool.set_relays(&relays, wakeup) { + error!("{:?}", e) + } +} + +// Generic task spawning helpers + +struct SendableFuture { + future: Pin>, + _marker: PhantomData<*const ()>, +} + +unsafe impl Send for SendableFuture {} + +impl Future for SendableFuture { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + self.get_mut().future.as_mut().poll(cx) + } +} + +pub fn spawn_sendable(future: F) +where + F: Future + 'static, +{ + let future = Box::pin(future); + let sendable_future = SendableFuture { + future, + _marker: PhantomData, + }; + task::spawn(sendable_future); +}