Skip to content

Commit

Permalink
Wrap Damus in DamusApp for synchronization
Browse files Browse the repository at this point in the history
- Add dispatcher and task modules
- android: use wrapped damus
- Set the relay list to exactly the user's list
- never clear the relay list entirely, which would otherwise happen while bootstrapping
- refactor track_user_relays for clarity
- use ndb set_sub_cb to receive sub update notifications
  • Loading branch information
ksedgwic committed Oct 22, 2024
1 parent f93ff70 commit 4162481
Show file tree
Hide file tree
Showing 8 changed files with 393 additions and 29 deletions.
42 changes: 22 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ strum_macros = "0.26"
bitflags = "2.5.0"
uuid = { version = "1.10.0", features = ["v4"] }
indexmap = "2.6.0"
futures = "0.3.31"
once_cell = "1.20.0"

[target.'cfg(target_os = "macos")'.dependencies]
security-framework = "2.11.0"
Expand Down
55 changes: 50 additions & 5 deletions enostr/src/relay/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{ClientMessage, Result};
use nostrdb::Filter;

use std::collections::HashMap;
use std::collections::HashSet;
use std::time::{Duration, Instant};

use url::Url;
Expand All @@ -11,7 +12,7 @@ use url::Url;
use ewebsock::{WsEvent, WsMessage};

#[cfg(not(target_arch = "wasm32"))]
use tracing::{debug, error};
use tracing::{debug, error, info};

#[derive(Debug)]
pub struct PoolEvent<'a> {
Expand Down Expand Up @@ -159,7 +160,7 @@ impl RelayPool {
url: String,
wakeup: impl Fn() + Send + Sync + Clone + 'static,
) -> Result<()> {
let url = Self::canonicalize_url(url);
let url = Self::canonicalize_url(&url);
// Check if the URL already exists in the pool.
if self.has(&url) {
return Ok(());
Expand All @@ -177,11 +178,55 @@ impl RelayPool {
Ok(())
}

// standardize the format (ie, trailing slashes)
fn canonicalize_url(url: String) -> String {
// Add and remove relays so the pool matches the provided list exactly.
//
// URLs are canonicalized first so spelling differences (e.g. a missing
// trailing slash) don't cause a relay to be dropped and re-added. An
// empty `urls` list is treated as "no information yet" (seen while
// bootstrapping) and leaves the pool untouched. Errors adding an
// individual relay are logged and skipped, not propagated.
pub fn set_relays(
    &mut self,
    urls: &[String],
    wakeup: impl Fn() + Send + Sync + Clone + 'static,
) -> Result<()> {
    // Canonicalize the new URLs.
    let new_urls: HashSet<String> = urls.iter().map(|u| Self::canonicalize_url(u)).collect();

    // Get the old URLs from the existing relays.
    let old_urls: HashSet<String> = self
        .relays
        .iter()
        .map(|pr| pr.relay.url.clone())
        .collect();

    debug!("old relays: {:?}", old_urls);
    debug!("new relays: {:?}", new_urls);

    // Never clear the pool entirely: an empty list shows up while
    // bootstrapping, before the user's relay list has been loaded.
    if new_urls.is_empty() {
        info!("bootstrapping, not clearing the relay list ...");
        return Ok(());
    }

    // Drop relays that are no longer in the list. Every retained URL is
    // in old_urls by construction, so keeping exactly the ones present
    // in new_urls is equivalent to removing old_urls - new_urls.
    self.relays.retain(|pr| new_urls.contains(&pr.relay.url));

    // FIXME - how do we close connections to the removed relays?

    // Add the relays that are in new_urls but not in old_urls.
    for url in new_urls.difference(&old_urls) {
        if let Err(e) = self.add_url(url.clone(), wakeup.clone()) {
            error!("Failed to add relay with URL {}: {:?}", url, e);
        }
    }

    Ok(())
}

// Standardize the URL format (ie, trailing slashes) to avoid dups.
//
// If parsing fails the input is returned unchanged, so an unparsable
// URL is still usable as a (non-canonical) pool key.
fn canonicalize_url(url: &str) -> String {
    // `map_or_else` clones only on the error path; the happy path
    // re-serializes the parsed URL in canonical form.
    Url::parse(url).map_or_else(|_| url.to_owned(), |parsed| parsed.to_string())
}

Expand Down
108 changes: 108 additions & 0 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
app_style::user_requested_visuals_change,
args::Args,
column::Columns,
dispatcher::{self, HandlerTable},
draft::Drafts,
error::{Error, FilterError},
filter::{self, FilterState},
Expand All @@ -16,6 +17,7 @@ use crate::{
notes_holder::NotesHolderStorage,
profile::Profile,
subscriptions::{SubKind, Subscriptions},
task,
thread::Thread,
timeline::{Timeline, TimelineId, TimelineKind, ViewFilter},
ui::{self, DesktopSidePanel},
Expand Down Expand Up @@ -46,6 +48,7 @@ pub enum DamusState {

/// We derive Deserialize/Serialize so we can persist app state on shutdown.
pub struct Damus {
reference: Option<Weak<Mutex<Damus>>>,
state: DamusState,
pub note_cache: NoteCache,
pub pool: RelayPool,
Expand All @@ -60,6 +63,7 @@ pub struct Damus {
pub img_cache: ImageCache,
pub accounts: AccountManager,
pub subscriptions: Subscriptions,
pub dispatch: HandlerTable,

frame_history: crate::frame_history::FrameHistory,

Expand Down Expand Up @@ -471,6 +475,11 @@ fn update_damus(damus: &mut Damus, ctx: &egui::Context) {
.insert("unknownids".to_string(), SubKind::OneShot);
setup_initial_nostrdb_subs(&damus.ndb, &mut damus.note_cache, &mut damus.columns)
.expect("home subscription failed");

let damusref = damus.reference();
tokio::spawn(async move {
task::track_user_relays(damusref).await;
});
}

DamusState::NewTimelineSub(new_timeline_id) => {
Expand Down Expand Up @@ -663,6 +672,7 @@ impl Damus {

let mut config = Config::new();
config.set_ingester_threads(4);
config.set_sub_cb(ndb_sub_updated);

let mut accounts = AccountManager::new(
// TODO: should pull this from settings
Expand Down Expand Up @@ -720,10 +730,12 @@ impl Damus {
}

Self {
reference: None,
pool,
debug,
unknown_ids: UnknownIds::default(),
subscriptions: Subscriptions::default(),
dispatch: HandlerTable::default(),
since_optimize: parsed_args.since_optimize,
threads: NotesHolderStorage::default(),
profiles: NotesHolderStorage::default(),
Expand All @@ -740,6 +752,19 @@ impl Damus {
}
}

/// Install the weak back-reference to this `Damus`'s own `Arc<Mutex<_>>`
/// wrapper (set up by `DamusApp::new`). Any previously stored reference
/// is discarded.
pub fn set_reference(&mut self, reference: Weak<Mutex<Damus>>) {
    // `Option::replace` stores the new value; the old one (if any) is dropped.
    self.reference.replace(reference);
}

/// Get a strong (`Arc`) handle to the shared `Damus` state.
///
/// # Panics
///
/// Panics if `set_reference` has not been called yet, or if all strong
/// references have already been dropped (upgrade fails).
pub fn reference(&self) -> DamusRef {
    // `upgrade()` already yields an owned `Arc`, so no extra clone is needed.
    self.reference
        .as_ref()
        .expect("weak damus reference")
        .upgrade()
        .expect("strong damus reference")
}

pub fn pool_mut(&mut self) -> &mut RelayPool {
&mut self.pool
}
Expand Down Expand Up @@ -803,9 +828,11 @@ impl Damus {
let mut config = Config::new();
config.set_ingester_threads(2);
Self {
reference: None,
debug,
unknown_ids: UnknownIds::default(),
subscriptions: Subscriptions::default(),
dispatch: HandlerTable::default(),
since_optimize: true,
threads: NotesHolderStorage::default(),
profiles: NotesHolderStorage::default(),
Expand Down Expand Up @@ -1056,3 +1083,84 @@ impl eframe::App for Damus {
render_damus(self, ctx);
}
}

use futures::SinkExt;
use std::sync::{Arc, Mutex, Weak};
use tokio::runtime::Handle;

pub type DamusRef = Arc<Mutex<Damus>>;

/// Lock the shared [`Damus`] state and run `f` with exclusive access,
/// returning whatever `f` returns. The lock is released when `f` finishes.
///
/// The bound is `FnOnce`, the most general closure kind: any `FnMut` or
/// `Fn` closure (as existing callers pass) still satisfies it.
///
/// # Panics
///
/// Panics if the mutex is poisoned by a panic in another thread.
pub fn with_mut_damus<F, T>(damusref: &DamusRef, f: F) -> T
where
    F: FnOnce(&mut Damus) -> T,
{
    // `Arc` derefs to the inner `Mutex`, so no `.as_ref()` is needed.
    let mut damus = damusref.lock().unwrap();
    f(&mut damus)
}

/// A wrapper so access to Damus can be synchronized.
///
/// `eframe` drives the UI through this wrapper (see the `eframe::App`
/// impl below), while background tasks reach the same state through
/// cloned `DamusRef` (`Arc<Mutex<Damus>>`) handles.
pub struct DamusApp {
    // Shared, mutex-guarded application state. `DamusApp::new` also
    // registers a clone of this Arc in the global ONCEDAMUSREF so the
    // ndb C callback can reach it.
    damus: DamusRef,
}

impl DamusApp {
    /// Wrap the shared `Damus` state and wire up the global hooks.
    ///
    /// Registers the `Arc` in `ONCEDAMUSREF` (read by the `ndb_sub_updated`
    /// C callback) and captures the current Tokio runtime handle in
    /// `TOKIORUNTIME` so that callback can spawn async work. Finally gives
    /// `Damus` a weak back-reference to itself via `set_reference`.
    ///
    /// # Panics
    ///
    /// Panics if called more than once (either global already initialized)
    /// or if there is no current Tokio runtime.
    pub fn new(damus: DamusRef) -> Self {
        if ONCEDAMUSREF.set(Arc::clone(&damus)).is_err() {
            panic!("ONCEDAMUSREF was already initialized.");
        }

        // Must be called from within a Tokio runtime context.
        let handle = tokio::runtime::Handle::try_current().expect("current tokio runtime");
        if TOKIORUNTIME.set(handle.clone()).is_err() {
            panic!("Failed to set Tokio runtime handle");
        }

        // Weak back-reference lets code holding only `&mut Damus` mint
        // strong handles later (see `Damus::reference`).
        let weak_damus = Arc::downgrade(&damus);
        damus.lock().unwrap().set_reference(weak_damus);
        Self { damus }
    }

    /// Lock the shared state and run `f` with exclusive access.
    pub fn with_mut_damus<F, T>(&mut self, f: F) -> T
    where
        F: FnMut(&mut Damus) -> T,
    {
        with_mut_damus(&self.damus, f)
    }
}

impl eframe::App for DamusApp {
    /// Persist application state by delegating to the wrapped `Damus`,
    /// holding its lock for the duration of the save.
    fn save(&mut self, storage: &mut dyn eframe::Storage) {
        self.with_mut_damus(|app| app.save(storage));
    }

    /// Drive one frame of the UI by delegating to the wrapped `Damus`,
    /// holding its lock for the duration of the frame.
    fn update(&mut self, ctx: &egui::Context, frame: &mut eframe::Frame) {
        self.with_mut_damus(|app| app.update(ctx, frame));
    }
}

use once_cell::sync::OnceCell;

static ONCEDAMUSREF: OnceCell<Arc<Mutex<Damus>>> = OnceCell::new();
static TOKIORUNTIME: OnceCell<Handle> = OnceCell::new();

/// C callback invoked by nostrdb when subscription `subid` has updates
/// (registered via `config.set_sub_cb` in `Damus::new`).
///
/// NOTE(review): this presumably runs on a nostrdb ingester thread, not
/// the UI thread — it must stay short, and taking the Damus lock here
/// could deadlock if any holder of that lock ever blocks on nostrdb.
/// Confirm the lock ordering.
extern "C" fn ndb_sub_updated(_ctx: *mut std::ffi::c_void, subid: u64) {
    // Panics if `DamusApp::new` hasn't initialized the global yet; the
    // callback is registered during `Damus::new`, so there may be a
    // window where it could fire first — TODO confirm.
    let damus = ONCEDAMUSREF
        .get()
        .expect("ONCEDAMUSREF is not initialized.")
        .lock()
        .unwrap();

    // Forward the update to whoever registered a handler for this sub id;
    // unknown ids are silently ignored.
    if let Some(sink) = damus.dispatch.get(&subid) {
        // Clone the sink so the spawned task owns it; the Damus lock is
        // released when this function returns, not held by the task.
        let mut sink_clone = sink.clone();
        if let Some(handler) = TOKIORUNTIME.get() {
            // Hop onto the Tokio runtime: a C callback cannot `.await`.
            handler.spawn(async move {
                sink_clone
                    .sender
                    .send(dispatcher::Event::NdbSubUpdate)
                    .await
                    .ok(); // best-effort: receiver may have gone away
            });
        } else {
            eprintln!("Tokio runtime handle is not set.");
        }
    }
}
Loading

0 comments on commit 4162481

Please sign in to comment.