diff --git a/Cargo.lock b/Cargo.lock index fa20ffeec..f101289b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1427,6 +1427,7 @@ name = "nostr-database" version = "0.1.0" dependencies = [ "async-trait", + "async-utility", "flatbuffers", "nostr", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index ebbbc8d91..e07b56f90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ license = "MIT" [workspace.dependencies] async-trait = "0.1" +async-utility = "0.1" nostr = { version = "0.25", path = "./crates/nostr", default-features = false } nostr-database = { version = "0.1", path = "./crates/nostr-database", default-features = false } once_cell = "1.18" diff --git a/crates/nostr-database/Cargo.toml b/crates/nostr-database/Cargo.toml index 3831a66df..9ad409993 100644 --- a/crates/nostr-database/Cargo.toml +++ b/crates/nostr-database/Cargo.toml @@ -17,6 +17,7 @@ default = [] flatbuf = ["dep:flatbuffers"] [dependencies] +async-utility = { workspace = true } async-trait = { workspace = true } flatbuffers = { version = "23.5", optional = true } nostr = { workspace = true, features = ["std"] } diff --git a/crates/nostr-database/src/index.rs b/crates/nostr-database/src/index.rs index ff9520c37..2b0ef9846 100644 --- a/crates/nostr-database/src/index.rs +++ b/crates/nostr-database/src/index.rs @@ -7,6 +7,7 @@ use std::cmp::Ordering; use std::collections::{BTreeSet, HashMap, HashSet}; use std::sync::Arc; +use async_utility::thread; use nostr::event::id; use nostr::event::raw::RawEvent; use nostr::secp256k1::XOnlyPublicKey; @@ -209,38 +210,32 @@ impl DatabaseIndexes { where I: IntoIterator, { - let mut index = self.index.write().await; - let mut deleted = self.deleted.write().await; - - let mut to_discard: HashSet = HashSet::new(); let now = Timestamp::now(); + let mut handles = Vec::new(); events .into_iter() .filter(|raw| !raw.is_expired(&now) && !raw.is_ephemeral()) .for_each(|event| { - let _ = self.index_raw_event(&mut index, &mut deleted, &mut to_discard, event); + let this = self.clone(); + let handle = thread::spawn(async move { + this.index_raw_event(event).await.unwrap(); // TODO: remove unwrap? + }); + handles.push(handle); }); - // Remove events - if !to_discard.is_empty() { - index.retain(|e| !to_discard.contains(&e.event_id)); - deleted.extend(to_discard); + for handle in handles.into_iter().flatten() { + let _ = handle.join().await; // TODO: propagate error? } - - // TODO: return to_discard events? } - fn index_raw_event( - &self, - index: &mut BTreeSet, - deleted: &mut HashSet, - to_discard: &mut HashSet, - raw: RawEvent, - ) -> Result<(), Error> { + async fn index_raw_event(&self, raw: RawEvent) -> Result<(), Error> { // Parse event ID let event_id: EventId = EventId::from_slice(&raw.id)?; + let index = self.index.read().await; + let deleted = self.deleted.read().await; + // Check if was deleted if deleted.contains(&event_id) { return Ok(()); @@ -252,10 +247,11 @@ impl DatabaseIndexes { let kind = Kind::from(raw.kind); let mut should_insert: bool = true; + let mut to_discard: HashSet = HashSet::new(); if kind.is_replaceable() { let filter: FilterIndex = FilterIndex::default().author(pubkey_prefix).kind(kind); - for ev in self.internal_query(index, deleted, filter) { + for ev in self.internal_query(&index, &deleted, filter) { if ev.created_at > timestamp { should_insert = false; } else if ev.created_at <= timestamp { @@ -269,7 +265,7 @@ impl DatabaseIndexes { .author(pubkey_prefix) .kind(kind) .identifier(identifier); - for ev in self.internal_query(index, deleted, filter) { + for ev in self.internal_query(&index, &deleted, filter) { if ev.created_at >= timestamp { should_insert = false; } else if ev.created_at < timestamp { @@ -284,7 +280,7 @@ impl DatabaseIndexes { let ids = raw.event_ids(); let filter: Filter = Filter::new().ids(ids).until(timestamp); if !filter.ids.is_empty() { - for ev in self.internal_query(index, deleted, filter) { + for ev in self.internal_query(&index, &deleted, filter) { if ev.pubkey == pubkey_prefix { to_discard.insert(ev.event_id); } @@ -298,22 +294,37 @@ impl DatabaseIndexes { if coordinate_pubkey_prefix == pubkey_prefix { let filter: Filter = coordinate.into(); let filter: Filter = filter.until(timestamp); - for ev in self.internal_query(index, deleted, filter) { + for ev in self.internal_query(&index, &deleted, filter) { to_discard.insert(ev.event_id); } } } } - // Insert event - if should_insert { - index.insert(EventIndex { - created_at: timestamp, - event_id, - pubkey: pubkey_prefix, - kind, - tags: TagIndexes::from(raw.tags.into_iter()), - }); + // Drop aquired read locks + drop(index); + drop(deleted); + + if should_insert || !to_discard.is_empty() { + let mut index = self.index.write().await; + + // Remove events + if !to_discard.is_empty() { + let mut deleted = self.deleted.write().await; + index.retain(|e| !to_discard.contains(&e.event_id)); + deleted.extend(to_discard); + } + + // Insert event + if should_insert { + index.insert(EventIndex { + created_at: timestamp, + event_id, + pubkey: pubkey_prefix, + kind, + tags: TagIndexes::from(raw.tags.into_iter()), + }); + } } Ok(()) diff --git a/crates/nostr-sdk/Cargo.toml b/crates/nostr-sdk/Cargo.toml index 93aa55ea5..0228472d5 100644 --- a/crates/nostr-sdk/Cargo.toml +++ b/crates/nostr-sdk/Cargo.toml @@ -32,7 +32,7 @@ nip46 = ["nostr/nip46"] nip47 = ["nostr/nip47"] [dependencies] -async-utility = "0.1" +async-utility = { workspace = true } nostr = { workspace = true, features = ["std"] } nostr-database = { workspace = true } nostr-sdk-net = { version = "0.25", path = "../nostr-sdk-net" }