Skip to content

Commit

Permalink
WIP: Add dispatcher and task modules
Browse files Browse the repository at this point in the history
  • Loading branch information
ksedgwic committed Oct 17, 2024
1 parent 2a59106 commit 20ea787
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 1 deletion.
25 changes: 24 additions & 1 deletion src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
app_style::user_requested_visuals_change,
args::Args,
column::Columns,
dispatcher::{self, HandlerTable},
draft::Drafts,
error::{Error, FilterError},
filter::{self, FilterState},
Expand All @@ -16,6 +17,7 @@ use crate::{
notes_holder::NotesHolderStorage,
profile::Profile,
subscriptions::{SubKind, Subscriptions},
task,
thread::Thread,
timeline::{Timeline, TimelineId, TimelineKind, ViewFilter},
ui::{self, DesktopSidePanel},
Expand All @@ -32,6 +34,7 @@ use egui_extras::{Size, StripBuilder};

use nostrdb::{Config, Filter, Ndb, Note, Transaction};

use futures::SinkExt;
use std::collections::HashMap;
use std::path::Path;
use std::time::Duration;
Expand Down Expand Up @@ -61,6 +64,7 @@ pub struct Damus {
pub img_cache: ImageCache,
pub accounts: AccountManager,
pub subscriptions: Subscriptions,
pub dispatch: HandlerTable,

frame_history: crate::frame_history::FrameHistory,

Expand Down Expand Up @@ -472,6 +476,11 @@ fn update_damus(damus: &mut Damus, ctx: &egui::Context) {
.insert("unknownids".to_string(), SubKind::OneShot);
setup_initial_nostrdb_subs(&damus.ndb, &mut damus.note_cache, &mut damus.columns)
.expect("home subscription failed");

let damusref = damus.reference();
tokio::spawn(async move {

Check failure on line 481 in src/app.rs

View workflow job for this annotation

GitHub Actions / Test Suite

`*mut u8` cannot be sent between threads safely

Check failure on line 481 in src/app.rs

View workflow job for this annotation

GitHub Actions / Clippy

`*mut u8` cannot be sent between threads safely
task::setup_user_relays(damusref).await;
});
}

DamusState::NewTimelineSub(new_timeline_id) => {
Expand Down Expand Up @@ -511,14 +520,26 @@ fn update_damus(damus: &mut Damus, ctx: &egui::Context) {
damus.columns.attempt_perform_deletion_request();
}

fn process_event(damus: &mut Damus, _subid: &str, event: &str) {
fn process_event(damus: &mut Damus, subid: &str, event: &str) {
#[cfg(feature = "profiling")]
puffin::profile_function!();

//info!("processing event {}", event);
if let Err(_err) = damus.ndb.process_event(event) {
error!("error processing event {}", event);
}

// Notify waiting subscribers that a pool event has happened
if let Some(handler) = damus.dispatch.get(subid) {
let mut handler_clone = handler.clone();
tokio::spawn(async move {
handler_clone
.sender
.send(dispatcher::Event::Pool)
.await
.ok();
});
}
}

fn handle_eose(damus: &mut Damus, subid: &str, relay_url: &str) -> Result<()> {
Expand Down Expand Up @@ -726,6 +747,7 @@ impl Damus {
debug,
unknown_ids: UnknownIds::default(),
subscriptions: Subscriptions::default(),
dispatch: HandlerTable::default(),
since_optimize: parsed_args.since_optimize,
threads: NotesHolderStorage::default(),
profiles: NotesHolderStorage::default(),
Expand Down Expand Up @@ -822,6 +844,7 @@ impl Damus {
debug,
unknown_ids: UnknownIds::default(),
subscriptions: Subscriptions::default(),
dispatch: HandlerTable::default(),
since_optimize: true,
threads: NotesHolderStorage::default(),
profiles: NotesHolderStorage::default(),
Expand Down
67 changes: 67 additions & 0 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use futures::channel::mpsc;
use std::collections::HashMap;
use std::error::Error;
use std::fmt;
use uuid::Uuid;

use nostrdb::Filter;

use crate::Damus;

#[allow(dead_code)] // until InternalError is used
#[derive(Debug)]
pub enum DispatcherError {
InternalError(String),
}

impl fmt::Display for DispatcherError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DispatcherError::InternalError(msg) => write!(f, "Internal error: {}", msg),
}
}
}

impl Error for DispatcherError {}

pub type DispatcherResult<T> = Result<T, DispatcherError>;

#[derive(Debug)]
pub enum Event {
Pool,
}

/// Used by the relay code to dispatch events to a waiting handlers
#[derive(Debug, Clone)]
pub struct SubscriptionHandler {
pub sender: mpsc::Sender<Event>,
}

/// Maps subscription id to handler for the subscription
pub type HandlerTable = HashMap<String, SubscriptionHandler>;

/// Used by async tasks to receive events
#[allow(dead_code)] // until id is read
#[derive(Debug)]
pub struct Subscription {
pub id: String,
pub receiver: mpsc::Receiver<Event>,
}

pub fn subscribe(
damus: &mut Damus,
filters: &[Filter],
bufsz: usize,
) -> DispatcherResult<Subscription> {
let (sender, receiver) = mpsc::channel::<Event>(bufsz);
let id = Uuid::new_v4().to_string();
damus
.dispatch
.insert(id.clone(), SubscriptionHandler { sender });
damus.pool.subscribe(id.clone(), filters.into());
Ok(Subscription { id, receiver })
}

pub fn _unsubscribe(_sub: Subscription) -> DispatcherResult<()> {
unimplemented!()
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod app_style;
mod args;
mod colors;
mod column;
mod dispatcher;
mod draft;
mod filter;
mod fonts;
Expand All @@ -32,6 +33,7 @@ pub mod relay_pool_manager;
mod result;
mod route;
mod subscriptions;
mod task;
mod test_data;
mod thread;
mod time;
Expand Down
73 changes: 73 additions & 0 deletions src/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use futures::stream::StreamExt;
use tracing::debug;

use nostrdb::{Filter, Ndb, Transaction};

use crate::dispatcher;
use crate::note::NoteRef;
use crate::{with_mut_damus, DamusRef};

pub async fn setup_user_relays(damusref: DamusRef) {
debug!("do_setup_user_relays starting");

let filter = with_mut_damus(&damusref, |damus| {
debug!("setup_user_relays: acquired damus for filter");

let account = damus
.accounts
.get_selected_account()
.as_ref()
.map(|a| a.pubkey.bytes())
.expect("selected account");

// NIP-65
Filter::new()
.authors([account])
.kinds([10002])
.limit(1)
.build()
});

let mut sub = with_mut_damus(&damusref, |mut damus| {
debug!("setup_user_relays: acquired damus for query + subscribe");
let txn = Transaction::new(&damus.ndb).expect("transaction");
let results = query_note_json(&damus.ndb, &txn, &filter);
debug!("setup_user_relays: query #1 results: {:#?}", results);

// Add a relay subscription to the pool
dispatcher::subscribe(&mut damus, &[filter.clone()], 10).expect("subscribe")
});
debug!("setup_user_relays: sub {}", sub.id);

loop {
match sub.receiver.next().await {
Some(ev) => {
debug!("setup_user_relays: saw {:?}", ev);
with_mut_damus(&damusref, |damus| {
let txn = Transaction::new(&damus.ndb).expect("transaction");
let results = query_note_json(&damus.ndb, &txn, &filter);
debug!("setup_user_relays: query #2 results: {:#?}", results);
})
}
None => {
debug!("setup_user_relays: saw None");
break;
}
}
}

debug!("do_setup_user_relays finished");
}

fn query_note_json<'a>(ndb: &Ndb, txn: &'a Transaction, filter: &Filter) -> Vec<String> {
let lim = filter.limit().unwrap_or(crate::filter::default_limit()) as i32;
let results = ndb
.query(&txn, &[filter.clone()], lim)
.expect("query results");
results
.iter()
.map(|qr| NoteRef::new(qr.note_key, qr.note.created_at()))
.filter_map(|nr| ndb.get_note_by_key(&txn, nr.key).ok())
.map(|n| n.json().unwrap())
.collect()
}

0 comments on commit 20ea787

Please sign in to comment.