diff --git a/Cargo.lock b/Cargo.lock index 1fafac8e1..c69d02349 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -566,31 +566,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crossbeam-deque" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" -dependencies = [ - "crossbeam-epoch", - "crossbeam-utils", -] - -[[package]] -name = "crossbeam-epoch" -version = "0.9.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" -dependencies = [ - "crossbeam-utils", -] - -[[package]] -name = "crossbeam-utils" -version = "0.8.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" - [[package]] name = "crypto-common" version = "0.1.6" @@ -1451,7 +1426,6 @@ dependencies = [ "async-trait", "flatbuffers", "nostr", - "rayon", "thiserror", "tokio", "tracing", @@ -1802,26 +1776,6 @@ dependencies = [ "getrandom", ] -[[package]] -name = "rayon" -version = "1.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c27db03db7734835b3f53954b534c91069375ce6ccaa2e065441e07d9b6cdb1" -dependencies = [ - "either", - "rayon-core", -] - -[[package]] -name = "rayon-core" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ce3fb6ad83f861aac485e76e1985cd109d9a3713802152be56c3b1f0e0658ed" -dependencies = [ - "crossbeam-deque", - "crossbeam-utils", -] - [[package]] name = "redox_syscall" version = "0.4.1" diff --git a/bindings/nostr-sdk-ffi/src/database.rs b/bindings/nostr-sdk-ffi/src/database.rs index 512b33d03..845220ae8 100644 --- a/bindings/nostr-sdk-ffi/src/database.rs +++ b/bindings/nostr-sdk-ffi/src/database.rs @@ -6,7 +6,7 @@ use std::ops::Deref; use std::sync::Arc; use nostr_ffi::{Event, EventId, Filter, PublicKey}; -use nostr_sdk::database::{DynNostrDatabase, IntoNostrDatabase, NostrDatabaseExt}; +use nostr_sdk::database::{DynNostrDatabase, IntoNostrDatabase, NostrDatabaseExt, Order}; use nostr_sdk::{block_on, SQLiteDatabase}; use uniffi::Object; @@ -78,7 +78,7 @@ impl NostrDatabase { .collect(); Ok(self .inner - .query(filters) + .query(filters, Order::Desc) .await? .into_iter() .map(|e| Arc::new(e.into())) diff --git a/bindings/nostr-sdk-js/src/database.rs b/bindings/nostr-sdk-js/src/database.rs index a598ac59b..8b51e64a8 100644 --- a/bindings/nostr-sdk-js/src/database.rs +++ b/bindings/nostr-sdk-js/src/database.rs @@ -9,7 +9,7 @@ use nostr_js::error::{into_err, Result}; use nostr_js::event::{JsEvent, JsEventArray, JsEventId}; use nostr_js::key::JsPublicKey; use nostr_js::message::JsFilter; -use nostr_sdk::database::{DynNostrDatabase, IntoNostrDatabase, NostrDatabaseExt}; +use nostr_sdk::database::{DynNostrDatabase, IntoNostrDatabase, NostrDatabaseExt, Order}; use nostr_sdk::WebDatabase; use wasm_bindgen::prelude::*; @@ -94,7 +94,7 @@ impl JsNostrDatabase { let filters = filters.into_iter().map(|f| f.inner()).collect(); Ok(self .inner - .query(filters) + .query(filters, Order::Desc) .await .map_err(into_err)? .into_iter() diff --git a/clippy.toml b/clippy.toml new file mode 100644 index 000000000..d707fe59e --- /dev/null +++ b/clippy.toml @@ -0,0 +1 @@ +too-many-arguments-threshold = 13 \ No newline at end of file diff --git a/crates/nostr-database/Cargo.toml b/crates/nostr-database/Cargo.toml index 50d068d3a..591508c6e 100644 --- a/crates/nostr-database/Cargo.toml +++ b/crates/nostr-database/Cargo.toml @@ -20,7 +20,6 @@ flatbuf = ["dep:flatbuffers"] async-trait.workspace = true flatbuffers = { version = "23.5", optional = true } nostr = { workspace = true, features = ["std"] } -rayon = "1.8" thiserror.workspace = true tokio = { workspace = true, features = ["sync"] } tracing = { workspace = true, features = ["std", "attributes"] } diff --git a/crates/nostr-database/Makefile b/crates/nostr-database/Makefile index d7691e91e..fa7b9ec9a 100644 --- a/crates/nostr-database/Makefile +++ b/crates/nostr-database/Makefile @@ -1,5 +1,3 @@ -all: build - flatbuf: flatc --rust -o ./src/flatbuffers ./fbs/event.fbs flatc --rust -o ./src/flatbuffers ./fbs/event_seen_by.fbs \ No newline at end of file diff --git a/crates/nostr-database/examples/indexes.rs b/crates/nostr-database/examples/indexes.rs index 5758e24c1..2323232e3 100644 --- a/crates/nostr-database/examples/indexes.rs +++ b/crates/nostr-database/examples/indexes.rs @@ -3,7 +3,7 @@ // Distributed under the MIT software license use nostr::prelude::*; -use nostr_database::DatabaseIndexes; +use nostr_database::{DatabaseIndexes, Order}; use tracing_subscriber::fmt::format::FmtSpan; #[tokio::main] @@ -59,12 +59,15 @@ async fn main() { } let ids = index - .query(vec![Filter::new() - .kinds(vec![Kind::Metadata, Kind::Custom(123), Kind::TextNote]) - .limit(20) - //.kind(Kind::Custom(123)) - //.identifier("myid5000") - .author(keys_a.public_key())]) + .query( + vec![Filter::new() + .kinds(vec![Kind::Metadata, Kind::Custom(123), Kind::TextNote]) + .limit(20) + //.kind(Kind::Custom(123)) + //.identifier("myid5000") + .author(keys_a.public_key())], + Order::Desc, + ) .await; println!("Got {} ids", ids.len()); diff --git a/crates/nostr-database/examples/memory.rs b/crates/nostr-database/examples/memory.rs index 867d7f8ef..aedf622c6 100644 --- a/crates/nostr-database/examples/memory.rs +++ b/crates/nostr-database/examples/memory.rs @@ -5,7 +5,7 @@ use nostr::prelude::*; use nostr::{EventBuilder, Filter, Keys, Kind, Metadata, Tag}; use nostr_database::memory::MemoryDatabase; -use nostr_database::{DatabaseOptions, NostrDatabase}; +use nostr_database::{DatabaseOptions, NostrDatabase, Order}; use tracing_subscriber::fmt::format::FmtSpan; #[tokio::main] @@ -63,12 +63,15 @@ async fn main() { } let events = database - .query(vec![Filter::new() - .kinds(vec![Kind::Metadata, Kind::Custom(123), Kind::TextNote]) - .limit(20) - //.kind(Kind::Custom(123)) - //.identifier("myid5000") - .author(keys_a.public_key())]) + .query( + vec![Filter::new() + .kinds(vec![Kind::Metadata, Kind::Custom(123), Kind::TextNote]) + .limit(20) + //.kind(Kind::Custom(123)) + //.identifier("myid5000") + .author(keys_a.public_key())], + Order::Desc, + ) .await .unwrap(); println!("Got {} events", events.len()); diff --git a/crates/nostr-database/src/index.rs b/crates/nostr-database/src/index.rs index 988de8b16..29b475133 100644 --- a/crates/nostr-database/src/index.rs +++ b/crates/nostr-database/src/index.rs @@ -12,12 +12,12 @@ use nostr::event::id; use nostr::nips::nip01::Coordinate; use nostr::secp256k1::XOnlyPublicKey; use nostr::{Alphabet, Event, EventId, Filter, GenericTagValue, Kind, Timestamp}; -use rayon::prelude::*; use thiserror::Error; use tokio::sync::RwLock; use crate::raw::RawEvent; use crate::tag_indexes::{TagIndexValues, TagIndexes}; +use crate::Order; /// Public Key Prefix Size const PUBLIC_KEY_PREFIX_SIZE: usize = 8; @@ -28,19 +28,25 @@ enum Error { EventId(#[from] id::Error), } +type ArcEventId = Arc; +type ArcEventIndex = Arc; +type ArcTagIndexes = Arc; +type ParameterizedReplaceableIndexes = + HashMap<(Kind, PublicKeyPrefix, ArcTagIndexes), ArcEventIndex>; + /// Event Index -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] struct EventIndex { /// Timestamp (seconds) created_at: Timestamp, /// Event ID - event_id: EventId, + event_id: ArcEventId, /// Public key prefix pubkey: PublicKeyPrefix, /// Kind kind: Kind, /// Tag indexes - tags: TagIndexes, + tags: ArcTagIndexes, } impl PartialOrd for EventIndex { @@ -64,10 +70,10 @@ impl TryFrom for EventIndex { fn try_from(raw: RawEvent) -> Result { Ok(Self { created_at: raw.created_at, - event_id: EventId::from_slice(&raw.id)?, + event_id: Arc::new(EventId::from_slice(&raw.id)?), pubkey: PublicKeyPrefix::from(raw.pubkey), kind: raw.kind, - tags: TagIndexes::from(raw.tags.into_iter()), + tags: Arc::new(TagIndexes::from(raw.tags.into_iter())), }) } } @@ -76,10 +82,10 @@ impl From<&Event> for EventIndex { fn from(e: &Event) -> Self { Self { created_at: e.created_at, - event_id: e.id, + event_id: Arc::new(e.id), pubkey: PublicKeyPrefix::from(e.pubkey), kind: e.kind, - tags: TagIndexes::from(e.tags.iter().map(|t| t.as_vec())), + tags: Arc::new(TagIndexes::from(e.tags.iter().map(|t| t.as_vec()))), } } } @@ -203,6 +209,123 @@ impl From for FilterIndex { } } +#[allow(missing_docs)] +pub enum EventOrRawEvent<'a> { + Event(&'a Event), + EventOwned(Event), + Raw(RawEvent), +} + +impl<'a> From for EventOrRawEvent<'a> { + fn from(value: Event) -> Self { + Self::EventOwned(value) + } +} + +impl<'a> From<&'a Event> for EventOrRawEvent<'a> { + fn from(value: &'a Event) -> Self { + Self::Event(value) + } +} + +impl<'a> From for EventOrRawEvent<'a> { + fn from(value: RawEvent) -> Self { + Self::Raw(value) + } +} + +impl<'a> EventOrRawEvent<'a> { + fn pubkey(&self) -> PublicKeyPrefix { + match self { + Self::Event(e) => PublicKeyPrefix::from(e.pubkey), + Self::EventOwned(e) => PublicKeyPrefix::from(e.pubkey), + Self::Raw(r) => PublicKeyPrefix::from(r.pubkey), + } + } + + fn created_at(&self) -> Timestamp { + match self { + Self::Event(e) => e.created_at, + Self::EventOwned(e) => e.created_at, + Self::Raw(r) => r.created_at, + } + } + + fn kind(&self) -> Kind { + match self { + Self::Event(e) => e.kind, + Self::EventOwned(e) => e.kind, + Self::Raw(r) => r.kind, + } + } + + fn tags(self) -> TagIndexes { + match self { + Self::Event(e) => TagIndexes::from(e.tags.iter().map(|t| t.as_vec())), + Self::EventOwned(e) => TagIndexes::from(e.tags.iter().map(|t| t.as_vec())), + Self::Raw(r) => TagIndexes::from(r.tags.into_iter()), + } + } + + fn identifier(&self) -> Option<&str> { + match self { + Self::Event(e) => e.identifier(), + Self::EventOwned(e) => e.identifier(), + Self::Raw(r) => r.identifier(), + } + } + + fn event_ids(&self) -> Box + '_> { + match self { + Self::Event(e) => Box::new(e.event_ids().copied()), + Self::EventOwned(e) => Box::new(e.event_ids().copied()), + Self::Raw(r) => Box::new(r.event_ids()), + } + } + + fn coordinates(&self) -> Box + '_> { + match self { + Self::Event(e) => Box::new(e.coordinates()), + Self::EventOwned(e) => Box::new(e.coordinates()), + Self::Raw(r) => Box::new(r.coordinates()), + } + } +} + +enum QueryPattern { + Replaceable, + ParamReplaceable, + Generic, +} + +impl From<&Filter> for QueryPattern { + fn from(filter: &Filter) -> Self { + let kinds_len = filter.kinds.len(); + let first_kind = filter.kinds.iter().next(); + let authors_len = filter.authors.len(); + let ids_len = filter.ids.len(); + let generic_tags_len = filter.generic_tags.len(); + + if kinds_len == 1 + && first_kind.map_or(false, |k| k.is_replaceable()) + && authors_len == 1 + && ids_len == 0 + && generic_tags_len == 0 + { + Self::Replaceable + } else if kinds_len == 1 + && first_kind.map_or(false, |k| k.is_parameterized_replaceable()) + && authors_len == 1 + && generic_tags_len != 0 + && ids_len == 0 + { + Self::ParamReplaceable + } else { + Self::Generic + } + } +} + /// Event Index Result #[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct EventIndexResult { @@ -215,8 +338,14 @@ pub struct EventIndexResult { /// Database Indexes #[derive(Debug, Clone, Default)] pub struct DatabaseIndexes { - index: Arc>>, - deleted_ids: Arc>>, + index: Arc>>, + /// Event IDs index + ids_index: Arc>>, + /// Replaceable index + kind_author_index: Arc>>, + /// Param. replaceable index + kind_author_tags_index: Arc>, + deleted_ids: Arc>>, deleted_coordinates: Arc>>, } @@ -228,135 +357,197 @@ impl DatabaseIndexes { /// Bulk index #[tracing::instrument(skip_all)] - pub async fn bulk_index(&self, events: BTreeSet) -> HashSet { + pub async fn bulk_index<'a, E>(&self, events: BTreeSet) -> HashSet + where + E: Into>, + { let mut index = self.index.write().await; + let mut ids_index = self.ids_index.write().await; + let mut kind_author_index = self.kind_author_index.write().await; + let mut kind_author_tags_index = self.kind_author_tags_index.write().await; let mut deleted_ids = self.deleted_ids.write().await; let mut deleted_coordinates = self.deleted_coordinates.write().await; let mut to_discard: HashSet = HashSet::new(); - let now = Timestamp::now(); + let now: Timestamp = Timestamp::now(); events .into_iter() - .filter(|raw| !raw.kind.is_ephemeral()) + .map(|e| e.into()) + .filter(|e| !e.kind().is_ephemeral()) .for_each(|event| { - let _ = self.index_raw_event( + let res = self.internal_index_event( &mut index, + &mut ids_index, + &mut kind_author_index, + &mut kind_author_tags_index, &mut deleted_ids, &mut deleted_coordinates, - &mut to_discard, event, &now, ); + if let Ok(res) = res { + to_discard.extend(res.to_discard); + } }); - // Remove events - if !to_discard.is_empty() { - index.retain(|e| !to_discard.contains(&e.event_id)); - deleted_ids.par_extend(to_discard.par_iter()); - } - to_discard } - fn index_raw_event( + fn internal_index_event<'a, E>( &self, - index: &mut BTreeSet, - deleted_ids: &mut HashSet, + index: &mut BTreeSet, + ids_index: &mut HashMap, + kind_author_index: &mut HashMap<(Kind, PublicKeyPrefix), ArcEventIndex>, + kind_author_tags_index: &mut ParameterizedReplaceableIndexes, + deleted_ids: &mut HashSet, deleted_coordinates: &mut HashMap, - to_discard: &mut HashSet, - raw: RawEvent, + event: E, now: &Timestamp, - ) -> Result<(), Error> { + ) -> Result + where + E: Into>, + { + let event = event.into(); + // Parse event ID - let event_id: EventId = EventId::from_slice(&raw.id)?; + let event_id: ArcEventId = match &event { + EventOrRawEvent::Event(e) => Arc::new(e.id), + EventOrRawEvent::EventOwned(e) => Arc::new(e.id), + EventOrRawEvent::Raw(r) => Arc::new(EventId::from_slice(&r.id)?), + }; // Check if was deleted if deleted_ids.contains(&event_id) { - return Ok(()); + return Ok(EventIndexResult { + to_store: false, + to_discard: HashSet::new(), + }); } + let mut to_discard: HashSet = HashSet::new(); + // Check if is expired - if raw.is_expired(now) { - to_discard.insert(event_id); - return Ok(()); + if let EventOrRawEvent::Raw(raw) = &event { + if raw.is_expired(now) { + let mut to_discard = HashSet::with_capacity(1); + to_discard.insert(*event_id); + return Ok(EventIndexResult { + to_store: false, + to_discard, + }); + } } // Compose others fields - let pubkey_prefix: PublicKeyPrefix = PublicKeyPrefix::from(raw.pubkey); + let pubkey_prefix: PublicKeyPrefix = event.pubkey(); + let created_at: Timestamp = event.created_at(); + let kind: Kind = event.kind(); let mut should_insert: bool = true; - if raw.kind.is_replaceable() { - let filter: FilterIndex = FilterIndex::default().author(pubkey_prefix).kind(raw.kind); - for ev in self.internal_query(index, deleted_ids, filter) { - if ev.created_at > raw.created_at { + if kind.is_replaceable() { + let filter: FilterIndex = FilterIndex::default().author(pubkey_prefix).kind(kind); + if let Some(ev) = + self.internal_query_by_kind_and_author(kind_author_index, deleted_ids, filter) + { + if ev.created_at > created_at || ev.event_id == event_id { should_insert = false; - } else if ev.created_at <= raw.created_at { - to_discard.insert(ev.event_id); + } else { + to_discard.insert(ev.clone()); } } - } else if raw.kind.is_parameterized_replaceable() { - match raw.identifier() { + } else if kind.is_parameterized_replaceable() { + match event.identifier() { Some(identifier) => { let filter: FilterIndex = FilterIndex::default() .author(pubkey_prefix) - .kind(raw.kind) + .kind(kind) .identifier(identifier); - for ev in self.internal_query(index, deleted_ids, filter) { - if ev.created_at >= raw.created_at { + if let Some(ev) = self.internal_query_by_kind_author_tag( + kind_author_tags_index, + deleted_ids, + filter, + ) { + if ev.created_at > created_at || ev.event_id == event_id { should_insert = false; - } else if ev.created_at < raw.created_at { - to_discard.insert(ev.event_id); + } else { + to_discard.insert(ev.clone()); } } } None => should_insert = false, } - } else if raw.kind == Kind::EventDeletion { + } else if kind == Kind::EventDeletion { // Check `e` tags - let ids = raw.event_ids(); - let filter: Filter = Filter::new().ids(ids).until(raw.created_at); - if !filter.ids.is_empty() { - to_discard.par_extend( - self.internal_parallel_query(index, deleted_ids, filter) - .filter(|ev| ev.pubkey == pubkey_prefix) - .map(|ev| ev.event_id), - ); + for id in event.event_ids() { + if let Some(ev) = ids_index.get(&Arc::new(id)) { + if ev.pubkey == pubkey_prefix && ev.created_at <= created_at { + to_discard.insert(ev.clone()); + } + } } // Check `a` tags - for coordinate in raw.coordinates() { + for coordinate in event.coordinates() { let coordinate_pubkey_prefix: PublicKeyPrefix = PublicKeyPrefix::from(coordinate.pubkey); if coordinate_pubkey_prefix == pubkey_prefix { // Save deleted coordinate at certain timestamp - deleted_coordinates.insert(coordinate.clone(), raw.created_at); + deleted_coordinates.insert(coordinate.clone(), created_at); let filter: Filter = coordinate.into(); - let filter: Filter = filter.until(raw.created_at); + let filter: Filter = filter.until(created_at); // Not check if ev.pubkey match the pubkey_prefix because asume that query // returned only the events owned by pubkey_prefix - to_discard.par_extend( - self.internal_parallel_query(index, deleted_ids, filter) - .map(|ev| ev.event_id), + to_discard.extend( + self.internal_generic_query(index, deleted_ids, filter) + .cloned(), ); } } } + // Remove events + if !to_discard.is_empty() { + for ev in to_discard.iter() { + index.remove(ev); + ids_index.remove(&ev.event_id); + + if ev.kind.is_replaceable() { + kind_author_index.remove(&(ev.kind, ev.pubkey)); + } else if ev.kind.is_parameterized_replaceable() { + kind_author_tags_index.remove(&(ev.kind, ev.pubkey, ev.tags.clone())); + } + } + + deleted_ids.extend(to_discard.iter().map(|ev| ev.event_id.clone())); + } + // Insert event if should_insert { - index.insert(EventIndex { - created_at: raw.created_at, - event_id, + let e: ArcEventIndex = Arc::new(EventIndex { + created_at, + event_id: event_id.clone(), pubkey: pubkey_prefix, - kind: raw.kind, - tags: TagIndexes::from(raw.tags.into_iter()), + kind, + tags: Arc::new(event.tags()), }); + + index.insert(e.clone()); + ids_index.insert(event_id, e.clone()); + + if kind.is_replaceable() { + kind_author_index.insert((kind, pubkey_prefix), e); + } else if kind.is_parameterized_replaceable() { + kind_author_tags_index.insert((kind, pubkey_prefix, e.tags.clone()), e); + } } - Ok(()) + Ok(EventIndexResult { + to_store: should_insert, + to_discard: to_discard.into_iter().map(|ev| *ev.event_id).collect(), + }) } /// Index [`Event`] @@ -371,139 +562,165 @@ impl DatabaseIndexes { // Acquire write lock let mut index = self.index.write().await; + let mut ids_index = self.ids_index.write().await; + let mut kind_author_index = self.kind_author_index.write().await; + let mut kind_author_tags_index = self.kind_author_tags_index.write().await; let mut deleted_ids = self.deleted_ids.write().await; let mut deleted_coordinates = self.deleted_coordinates.write().await; - let mut should_insert: bool = true; - let mut to_discard: HashSet = HashSet::new(); - - // Check if was deleted - if deleted_ids.contains(&event.id) { - to_discard.insert(event.id); - return EventIndexResult { - to_store: false, - to_discard, - }; - } + let now = Timestamp::now(); - if event.is_replaceable() { - let filter: Filter = Filter::new().author(event.pubkey).kind(event.kind); - for ev in self.internal_query(&index, &deleted_ids, filter) { - if ev.created_at > event.created_at { - should_insert = false; - } else if ev.created_at <= event.created_at { - to_discard.insert(ev.event_id); - } - } - } else if event.is_parameterized_replaceable() { - match event.identifier() { - Some(identifier) => { - let filter: Filter = Filter::new() - .author(event.pubkey) - .kind(event.kind) - .identifier(identifier); - for ev in self.internal_query(&index, &deleted_ids, filter) { - if ev.created_at >= event.created_at { - should_insert = false; - } else if ev.created_at < event.created_at { - to_discard.insert(ev.event_id); - } - } - } - None => should_insert = false, - } - } else if event.kind == Kind::EventDeletion { - let pubkey_prefix: PublicKeyPrefix = PublicKeyPrefix::from(event.pubkey); + self.internal_index_event( + &mut index, + &mut ids_index, + &mut kind_author_index, + &mut kind_author_tags_index, + &mut deleted_ids, + &mut deleted_coordinates, + event, + &now, + ) + .unwrap_or_default() + } - // Check `e` tags - let ids = event.event_ids().copied(); - let filter: Filter = Filter::new().ids(ids).until(event.created_at); - if !filter.ids.is_empty() { - to_discard.par_extend( - self.internal_parallel_query(&index, &deleted_ids, filter) - .filter(|ev| ev.pubkey == pubkey_prefix) - .map(|ev| ev.event_id), - ); - } + /// Query by [`Kind`] and [`PublicKeyPrefix`] (replaceable) + fn internal_query_by_kind_and_author<'a, T>( + &self, + kind_author_index: &'a HashMap<(Kind, PublicKeyPrefix), ArcEventIndex>, + deleted_ids: &'a HashSet, + filter: T, + ) -> Option<&'a ArcEventIndex> + where + T: Into, + { + let FilterIndex { + authors, + kinds, + since, + until, + .. + } = filter.into(); + + let kind = kinds.iter().next()?; + let author = authors.iter().next()?; + + if !kind.is_replaceable() { + return None; + } - // Check `a` tags - for coordinate in event.coordinates() { - let coordinate_pubkey_prefix: PublicKeyPrefix = - PublicKeyPrefix::from(coordinate.pubkey); - if coordinate_pubkey_prefix == pubkey_prefix { - // Save deleted coordinate at certain timestamp - deleted_coordinates.insert(coordinate.clone(), event.created_at); + let ev = kind_author_index.get(&(*kind, *author))?; - let filter: Filter = coordinate.into(); - let filter: Filter = filter.until(event.created_at); - to_discard.par_extend( - self.internal_parallel_query(&index, &deleted_ids, filter) - .map(|ev| ev.event_id), - ); - } - } + if deleted_ids.contains(&ev.event_id) { + return None; } - // Remove events - if !to_discard.is_empty() { - index.retain(|e| !to_discard.contains(&e.event_id)); - deleted_ids.par_extend(to_discard.par_iter()); + if let Some(since) = since { + if ev.created_at < since { + return None; + } } - // Insert event - if should_insert { - index.insert(EventIndex::from(event)); + if let Some(until) = until { + if ev.created_at > until { + return None; + } } - EventIndexResult { - to_store: should_insert, - to_discard, - } + Some(ev) } - fn internal_query<'a, T>( + /// Query by [`Kind`], [`PublicKeyPrefix`] and [`TagIndexes`] (param. replaceable) + fn internal_query_by_kind_author_tag<'a, T>( &self, - index: &'a BTreeSet, - deleted_ids: &'a HashSet, + kind_author_tag_index: &'a ParameterizedReplaceableIndexes, + deleted_ids: &'a HashSet, filter: T, - ) -> impl Iterator + ) -> Option<&'a ArcEventIndex> where T: Into, { - self.internal_parallel_query(index, deleted_ids, filter) - .collect::>() - .into_iter() + let FilterIndex { + authors, + kinds, + since, + until, + generic_tags, + .. + } = filter.into(); + + let kind = kinds.iter().next()?; + let author = authors.iter().next()?; + + if !kind.is_parameterized_replaceable() { + return None; + } + + let tags = { + let mut tag_index: TagIndexes = TagIndexes::default(); + for (tagnamechar, set) in generic_tags.into_iter() { + for inner in TagIndexValues::iter(set.iter()) { + tag_index.entry(tagnamechar).or_default().insert(inner); + } + } + Arc::new(tag_index) + }; + + let ev = kind_author_tag_index.get(&(*kind, *author, tags))?; + + if deleted_ids.contains(&ev.event_id) { + return None; + } + + if let Some(since) = since { + if ev.created_at < since { + return None; + } + } + + if let Some(until) = until { + if ev.created_at > until { + return None; + } + } + + Some(ev) } - fn internal_parallel_query<'a, T>( + /// Generic query + fn internal_generic_query<'a, T>( &self, - index: &'a BTreeSet, - deleted_ids: &'a HashSet, + index: &'a BTreeSet, + deleted_ids: &'a HashSet, filter: T, - ) -> impl ParallelIterator + ) -> impl Iterator where T: Into, { let filter: FilterIndex = filter.into(); - index.par_iter().filter(move |event| { + index.iter().filter(move |event| { !deleted_ids.contains(&event.event_id) && filter.match_event(event) }) } /// Query #[tracing::instrument(skip_all, level = "trace")] - pub async fn query(&self, filters: I) -> Vec + pub async fn query(&self, filters: I, order: Order) -> Vec where I: IntoIterator, { let index = self.index.read().await; + let kind_author_index = self.kind_author_index.read().await; + let kind_author_tags_index = self.kind_author_tags_index.read().await; let deleted_ids = self.deleted_ids.read().await; - let mut matching_ids: BTreeSet<&EventIndex> = BTreeSet::new(); + let mut matching_ids: BTreeSet<&ArcEventIndex> = BTreeSet::new(); for filter in filters.into_iter() { if filter.is_empty() { - return index.iter().map(|e| e.event_id).collect(); + return match order { + Order::Asc => index.iter().map(|e| *e.event_id).rev().collect(), + Order::Desc => index.iter().map(|e| *e.event_id).collect(), + }; } if let (Some(since), Some(until)) = (filter.since, filter.until) { @@ -512,18 +729,50 @@ impl DatabaseIndexes { } } - if let Some(limit) = filter.limit { - matching_ids.par_extend( - self.internal_query(&index, &deleted_ids, filter) - .take(limit) - .par_bridge(), - ) - } else { - matching_ids.par_extend(self.internal_parallel_query(&index, &deleted_ids, filter)) + match QueryPattern::from(&filter) { + QueryPattern::Replaceable => { + if let Some(ev) = self.internal_query_by_kind_and_author( + &kind_author_index, + &deleted_ids, + filter, + ) { + matching_ids.insert(ev); + }; + } + QueryPattern::ParamReplaceable => { + if let Some(ev) = self.internal_query_by_kind_author_tag( + &kind_author_tags_index, + &deleted_ids, + filter, + ) { + matching_ids.insert(ev); + }; + } + QueryPattern::Generic => { + if let Some(limit) = filter.limit { + matching_ids.extend( + self.internal_generic_query(&index, &deleted_ids, filter) + .take(limit), + ) + } else { + matching_ids.extend(self.internal_generic_query( + &index, + &deleted_ids, + filter, + )) + } + } } } - matching_ids.into_iter().map(|e| e.event_id).collect() + match order { + Order::Asc => matching_ids + .into_iter() + .map(|ev| *ev.event_id) + .rev() + .collect(), + Order::Desc => matching_ids.into_iter().map(|ev| *ev.event_id).collect(), + } } /// Count events @@ -551,7 +800,7 @@ impl DatabaseIndexes { let limit: Option = filter.limit; let count = self - .internal_parallel_query(&index, &deleted_ids, filter) + .internal_generic_query(&index, &deleted_ids, filter) .count(); if let Some(limit) = limit { let count = if limit >= count { limit } else { count }; @@ -597,113 +846,176 @@ impl DatabaseIndexes { #[cfg(test)] mod tests { - use nostr::nips::nip01::Coordinate; use nostr::secp256k1::SecretKey; - use nostr::{EventBuilder, FromBech32, Keys, Tag}; + use nostr::{FromBech32, JsonUtil, Keys}; use super::*; - const SECRET_KEY_A: &str = "nsec1ufnus6pju578ste3v90xd5m2decpuzpql2295m3sknqcjzyys9ls0qlc85"; - const SECRET_KEY_B: &str = "nsec1j4c6269y9w0q2er2xjw8sv2ehyrtfxq3jwgdlxj6qfn8z4gjsq5qfvfk99"; + const SECRET_KEY_A: &str = "nsec1j4c6269y9w0q2er2xjw8sv2ehyrtfxq3jwgdlxj6qfn8z4gjsq5qfvfk99"; // aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4 + const SECRET_KEY_B: &str = "nsec1ufnus6pju578ste3v90xd5m2decpuzpql2295m3sknqcjzyys9ls0qlc85"; // 79dff8f82963424e0bb02708a22e44b4980893e3a4be0fa3cb60a43b946764e3 + + const EVENTS: [&str; 13] = [ + r#"{"id":"b7b1fb52ad8461a03e949820ae29a9ea07e35bcd79c95c4b59b0254944f62805","pubkey":"aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4","created_at":1704644581,"kind":1,"tags":[],"content":"Text note","sig":"ed73a8a4e7c26cd797a7b875c634d9ecb6958c57733305fed23b978109d0411d21b3e182cb67c8ad750884e30ca383b509382ae6187b36e76ee76e6a142c4284"}"#, + r#"{"id":"7296747d91c53f1d71778ef3e12d18b66d494a41f688ef244d518abf37c959b6","pubkey":"aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4","created_at":1704644586,"kind":32121,"tags":[["d","id-1"]],"content":"Empty 1","sig":"8848989a8e808f7315e950f871b231c1dff7752048f8957d4a541881d2005506c30e85c7dd74dab022b3e01329c88e69c9d5d55d961759272a738d150b7dbefc"}"#, + r#"{"id":"ec6ea04ba483871062d79f78927df7979f67545b53f552e47626cb1105590442","pubkey":"aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4","created_at":1704644591,"kind":32122,"tags":[["d","id-1"]],"content":"Empty 2","sig":"89946113a97484850fe35fefdb9120df847b305de1216dae566616fe453565e8707a4da7e68843b560fa22a932f81fc8db2b5a2acb4dcfd3caba9a91320aac92"}"#, + r#"{"id":"63b8b829aa31a2de870c3a713541658fcc0187be93af2032ec2ca039befd3f70","pubkey":"aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4","created_at":1704644596,"kind":32122,"tags":[["d","id-2"]],"content":"","sig":"607b1a67bef57e48d17df4e145718d10b9df51831d1272c149f2ab5ad4993ae723f10a81be2403ae21b2793c8ed4c129e8b031e8b240c6c90c9e6d32f62d26ff"}"#, + r#"{"id":"6fe9119c7db13ae13e8ecfcdd2e5bf98e2940ba56a2ce0c3e8fba3d88cd8e69d","pubkey":"79dff8f82963424e0bb02708a22e44b4980893e3a4be0fa3cb60a43b946764e3","created_at":1704644601,"kind":32122,"tags":[["d","id-3"]],"content":"","sig":"d07146547a726fc9b4ec8d67bbbe690347d43dadfe5d9890a428626d38c617c52e6945f2b7144c4e0c51d1e2b0be020614a5cadc9c0256b2e28069b70d9fc26e"}"#, + r#"{"id":"a82f6ebfc709f4e7c7971e6bf738e30a3bc112cfdb21336054711e6779fd49ef","pubkey":"79dff8f82963424e0bb02708a22e44b4980893e3a4be0fa3cb60a43b946764e3","created_at":1704644606,"kind":32122,"tags":[["d","id-1"]],"content":"","sig":"96d3349b42ed637712b4d07f037457ab6e9180d58857df77eb5fa27ff1fd68445c72122ec53870831ada8a4d9a0b484435f80d3ff21a862238da7a723a0d073c"}"#, + r#"{"id":"8ab0cb1beceeb68f080ec11a3920b8cc491ecc7ec5250405e88691d733185832","pubkey":"aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4","created_at":1704644611,"kind":32122,"tags":[["d","id-1"]],"content":"Test","sig":"49153b482d7110e2538eb48005f1149622247479b1c0057d902df931d5cea105869deeae908e4e3b903e3140632dc780b3f10344805eab77bb54fb79c4e4359d"}"#, + r#"{"id":"63dc49a8f3278a2de8dc0138939de56d392b8eb7a18c627e4d78789e2b0b09f2","pubkey":"79dff8f82963424e0bb02708a22e44b4980893e3a4be0fa3cb60a43b946764e3","created_at":1704644616,"kind":5,"tags":[["a","32122:aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4:"]],"content":"","sig":"977e54e5d57d1fbb83615d3a870037d9eb5182a679ca8357523bbf032580689cf481f76c88c7027034cfaf567ba9d9fe25fc8cd334139a0117ad5cf9fe325eef"}"#, + r#"{"id":"6975ace0f3d66967f330d4758fbbf45517d41130e2639b54ca5142f37757c9eb","pubkey":"aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4","created_at":1704644621,"kind":5,"tags":[["a","32122:aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4:id-2"]],"content":"","sig":"9bb09e4759899d86e447c3fa1be83905fe2eda74a5068a909965ac14fcdabaed64edaeb732154dab734ca41f2fc4d63687870e6f8e56e3d9e180e4a2dd6fb2d2"}"#, + r#"{"id":"33f5b4e6a38e107638c20f4536db35191d4b8651ba5a2cefec983b9ec2d65084","pubkey":"aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4","created_at":1704645586,"kind":0,"tags":[],"content":"{\"name\":\"Key A\"}","sig":"285d090f45a6adcae717b33771149f7840a8c27fb29025d63f1ab8d95614034a54e9f4f29cee9527c4c93321a7ebff287387b7a19ba8e6f764512a40e7120429"}"#, + r#"{"id":"90a761aec9b5b60b399a76826141f529db17466deac85696a17e4a243aa271f9","pubkey":"aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4","created_at":1704645606,"kind":0,"tags":[],"content":"{\"name\":\"key-a\",\"display_name\":\"Key A\",\"lud16\":\"keya@ln.address\"}","sig":"ec8f49d4c722b7ccae102d49befff08e62db775e5da43ef51b25c47dfdd6a09dc7519310a3a63cbdb6ec6b3250e6f19518eb47be604edeb598d16cdc071d3dbc"}"#, + r#"{"id":"a295422c636d3532875b75739e8dae3cdb4dd2679c6e4994c9a39c7ebf8bc620","pubkey":"79dff8f82963424e0bb02708a22e44b4980893e3a4be0fa3cb60a43b946764e3","created_at":1704646569,"kind":5,"tags":[["e","90a761aec9b5b60b399a76826141f529db17466deac85696a17e4a243aa271f9"]],"content":"","sig":"d4dc8368a4ad27eef63cacf667345aadd9617001537497108234fc1686d546c949cbb58e007a4d4b632c65ea135af4fbd7a089cc60ab89b6901f5c3fc6a47b29"}"#, + r#"{"id":"999e3e270100d7e1eaa98fcfab4a98274872c1f2dfdab024f32e42a5a12d5b5e","pubkey":"aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4","created_at":1704646606,"kind":5,"tags":[["e","90a761aec9b5b60b399a76826141f529db17466deac85696a17e4a243aa271f9"]],"content":"","sig":"4f3a33fd52784cea7ca8428fd35d94d65049712e9aa11a70b1a16a1fcd761c7b7e27afac325728b1c00dfa11e33e78b2efd0430a7e4b28f4ede5b579b3f32614"}"#, + ]; #[tokio::test] async fn test_database_indexes() { - let indexes = DatabaseIndexes::new(); - // Keys let keys_a = Keys::new(SecretKey::from_bech32(SECRET_KEY_A).unwrap()); let keys_b = Keys::new(SecretKey::from_bech32(SECRET_KEY_B).unwrap()); - // Build some events - let events = [ - EventBuilder::new_text_note("Text note", []) - .to_event(&keys_a) - .unwrap(), - EventBuilder::new( - Kind::ParameterizedReplaceable(32121), - "Empty 1", - [Tag::Identifier(String::from("abdefgh:12345678"))], - ) - .to_event(&keys_a) - .unwrap(), - EventBuilder::new( - Kind::ParameterizedReplaceable(32122), - "Empty 2", - [Tag::Identifier(String::from("abdefgh:12345678"))], - ) - .to_event(&keys_a) - .unwrap(), - EventBuilder::new( - Kind::ParameterizedReplaceable(32122), - "", - [Tag::Identifier(String::from("ijklmnop:87654321"))], - ) - .to_event(&keys_a) - .unwrap(), - EventBuilder::new( - Kind::ParameterizedReplaceable(32122), - "", - [Tag::Identifier(String::from("abdefgh:87654321"))], - ) - .to_event(&keys_b) - .unwrap(), - EventBuilder::new( - Kind::ParameterizedReplaceable(32122), - "", - [Tag::Identifier(String::from("abdefgh:12345678"))], - ) - .to_event(&keys_b) - .unwrap(), - EventBuilder::new( - Kind::ParameterizedReplaceable(32122), - "Test", - [Tag::Identifier(String::from("abdefgh:12345678"))], - ) - .to_event(&keys_a) - .unwrap(), - ]; + let indexes = DatabaseIndexes::new(); - for event in events.iter() { - indexes.index_event(event).await; + // Build indexes + let mut events: BTreeSet = BTreeSet::new(); + for event in EVENTS.into_iter() { + let event = Event::from_json(event).unwrap(); + let raw: RawEvent = event.into(); + events.insert(raw); } + indexes.bulk_index(events).await; + + // Test expected output + let expected_output = vec![ + Event::from_json(EVENTS[12]).unwrap().id, + Event::from_json(EVENTS[11]).unwrap().id, + // Event 10 deleted by event 12 + // Event 9 replaced by event 10 + Event::from_json(EVENTS[8]).unwrap().id, + Event::from_json(EVENTS[7]).unwrap().id, + Event::from_json(EVENTS[6]).unwrap().id, + Event::from_json(EVENTS[5]).unwrap().id, + Event::from_json(EVENTS[4]).unwrap().id, + // Event 3 deleted by Event 8 + // Event 2 replaced by Event 6 + Event::from_json(EVENTS[1]).unwrap().id, + Event::from_json(EVENTS[0]).unwrap().id, + ]; + assert_eq!( + indexes.query([Filter::new()], Order::Desc).await, + expected_output + ); + assert_eq!(indexes.count([Filter::new()]).await, 9); + + // Test get previously deleted replaceable event (check if was deleted by indexes) + assert!(indexes + .query( + [Filter::new() + .kind(Kind::Metadata) + .author(keys_a.public_key())], + Order::Desc + ) + .await + .is_empty()); + + // Test get previously deleted param. replaceable event (check if was deleted by indexes) + assert!(indexes + .query( + [Filter::new() + .kind(Kind::ParameterizedReplaceable(32122)) + .author(keys_a.public_key()) + .identifier("id-2")], + Order::Desc + ) + .await + .is_empty()); - // Total events: 6 - - let filter = Filter::new(); - assert_eq!(indexes.count([filter]).await, 6); - - // Invalid event deletion (wrong signing keys) - let coordinate = - Coordinate::new(Kind::ParameterizedReplaceable(32122), keys_a.public_key()); - let event = EventBuilder::delete([coordinate]) - .to_event(&keys_b) - .unwrap(); - indexes.index_event(&event).await; - - // Total events: 7 (6 + 1) + // Test get param replaceable events WITHOUT using indexes (identifier not passed) + // Test ascending order + assert_eq!( + indexes + .query( + [Filter::new() + .kind(Kind::ParameterizedReplaceable(32122)) + .author(keys_b.public_key())], + Order::Asc + ) + .await, + vec![ + Event::from_json(EVENTS[4]).unwrap().id, + Event::from_json(EVENTS[5]).unwrap().id, + ] + ); - let filter = Filter::new(); - assert_eq!(indexes.count([filter]).await, 7); + // Test get param replaceable events using indexes + assert_eq!( + indexes + .query( + [Filter::new() + .kind(Kind::ParameterizedReplaceable(32122)) + .author(keys_b.public_key()) + .identifier("id-3")], + Order::Desc + ) + .await, + vec![Event::from_json(EVENTS[4]).unwrap().id,] + ); - // Delete valid event - let coordinate = - Coordinate::new(Kind::ParameterizedReplaceable(32122), keys_a.public_key()) - .identifier("ijklmnop:87654321"); - let event = EventBuilder::delete([coordinate]) - .to_event(&keys_a) - .unwrap(); - indexes.index_event(&event).await; + assert_eq!( + indexes + .query([Filter::new().author(keys_a.public_key())], Order::Desc) + .await, + vec![ + Event::from_json(EVENTS[12]).unwrap().id, + Event::from_json(EVENTS[8]).unwrap().id, + Event::from_json(EVENTS[6]).unwrap().id, + Event::from_json(EVENTS[1]).unwrap().id, + Event::from_json(EVENTS[0]).unwrap().id, + ] + ); - // Total events: 7 (7 - 1 + 1) + assert_eq!( + indexes + .query( + [Filter::new() + .author(keys_a.public_key()) + .kinds([Kind::TextNote, Kind::Custom(32121)])], + Order::Desc + ) + .await, + vec![ + Event::from_json(EVENTS[1]).unwrap().id, + Event::from_json(EVENTS[0]).unwrap().id, + ] + ); - // Check total number of indexes - let filter = Filter::new(); - assert_eq!(indexes.count([filter]).await, 7); + assert_eq!( + indexes + .query( + [Filter::new() + .authors([keys_a.public_key(), keys_b.public_key()]) + .kinds([Kind::TextNote, Kind::Custom(32121)])], + Order::Desc + ) + .await, + vec![ + Event::from_json(EVENTS[1]).unwrap().id, + Event::from_json(EVENTS[0]).unwrap().id, + ] + ); - // Check if query len and count match + // Test get param replaceable events using identifier assert_eq!( - indexes.query([Filter::new()]).await.len(), - indexes.count([Filter::new()]).await + indexes + .query([Filter::new().identifier("id-1")], Order::Desc) + .await, + vec![ + Event::from_json(EVENTS[6]).unwrap().id, + Event::from_json(EVENTS[5]).unwrap().id, + Event::from_json(EVENTS[1]).unwrap().id, + ] ); } } diff --git a/crates/nostr-database/src/lib.rs b/crates/nostr-database/src/lib.rs index 519856cd6..ee2f83543 100644 --- a/crates/nostr-database/src/lib.rs +++ b/crates/nostr-database/src/lib.rs @@ -52,6 +52,16 @@ pub enum Backend { Custom(String), } +/// Query result order +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum Order { + /// Ascending + Asc, + /// Descending (default) + #[default] + Desc, +} + /// A type-erased [`NostrDatabase`]. pub type DynNostrDatabase = dyn NostrDatabase; @@ -147,10 +157,14 @@ pub trait NostrDatabase: AsyncTraitDeps { async fn count(&self, filters: Vec) -> Result; /// Query store with filters - async fn query(&self, filters: Vec) -> Result, Self::Err>; + async fn query(&self, filters: Vec, order: Order) -> Result, Self::Err>; /// Get event IDs by filters - async fn event_ids_by_filters(&self, filters: Vec) -> Result, Self::Err>; + async fn event_ids_by_filters( + &self, + filters: Vec, + order: Order, + ) -> Result, Self::Err>; /// Get `negentropy` items async fn negentropy_items( @@ -167,12 +181,13 @@ pub trait NostrDatabase: AsyncTraitDeps { #[cfg_attr(not(target_arch = "wasm32"), async_trait)] pub trait NostrDatabaseExt: NostrDatabase { /// Get profile metadata + #[tracing::instrument(skip_all, level = "trace")] async fn profile(&self, public_key: XOnlyPublicKey) -> Result { let filter = Filter::new() .author(public_key) .kind(Kind::Metadata) .limit(1); - let events: Vec = self.query(vec![filter]).await?; + let events: Vec = self.query(vec![filter], Order::Desc).await?; match events.first() { Some(event) => match Metadata::from_json(&event.content) { Ok(metadata) => Ok(Profile::new(public_key, metadata)), @@ -186,6 +201,7 @@ pub trait NostrDatabaseExt: NostrDatabase { } /// Get contact list public keys + #[tracing::instrument(skip_all, level = "trace")] async fn contacts_public_keys( &self, public_key: XOnlyPublicKey, @@ -194,7 +210,7 @@ pub trait NostrDatabaseExt: NostrDatabase { .author(public_key) .kind(Kind::ContactList) .limit(1); - let events: Vec = self.query(vec![filter]).await?; + let events: Vec = self.query(vec![filter], Order::Desc).await?; match events.first() { Some(event) => Ok(event.public_keys().copied().collect()), None => Ok(Vec::new()), @@ -208,7 +224,7 @@ pub trait NostrDatabaseExt: NostrDatabase { .author(public_key) .kind(Kind::ContactList) .limit(1); - let events: Vec = self.query(vec![filter]).await?; + let events: Vec = self.query(vec![filter], Order::Desc).await?; match events.first() { Some(event) => { // Get contacts metadata @@ -216,7 +232,7 @@ pub trait NostrDatabaseExt: NostrDatabase { .authors(event.public_keys().copied()) .kind(Kind::Metadata); let mut contacts: HashSet = self - .query(vec![filter]) + .query(vec![filter], Order::Desc) .await? .into_iter() .map(|e| { @@ -323,13 +339,17 @@ impl NostrDatabase for EraseNostrDatabaseError { self.0.count(filters).await.map_err(Into::into) } - async fn query(&self, filters: Vec) -> Result, Self::Err> { - self.0.query(filters).await.map_err(Into::into) + async fn query(&self, filters: Vec, order: Order) -> Result, Self::Err> { + self.0.query(filters, order).await.map_err(Into::into) } - async fn event_ids_by_filters(&self, filters: Vec) -> Result, Self::Err> { + async fn event_ids_by_filters( + &self, + filters: Vec, + order: Order, + ) -> Result, Self::Err> { self.0 - .event_ids_by_filters(filters) + .event_ids_by_filters(filters, order) .await .map_err(Into::into) } diff --git a/crates/nostr-database/src/memory.rs b/crates/nostr-database/src/memory.rs index 97516fd67..42d4ef762 100644 --- a/crates/nostr-database/src/memory.rs +++ b/crates/nostr-database/src/memory.rs @@ -14,6 +14,7 @@ use tokio::sync::RwLock; use crate::{ Backend, DatabaseError, DatabaseIndexes, DatabaseOptions, EventIndexResult, NostrDatabase, + Order, }; /// Memory Database (RAM) @@ -165,9 +166,9 @@ impl NostrDatabase for MemoryDatabase { } #[tracing::instrument(skip_all, level = "trace")] - async fn query(&self, filters: Vec) -> Result, Self::Err> { + async fn query(&self, filters: Vec, order: Order) -> Result, Self::Err> { if self.opts.events { - let ids = self.indexes.query(filters).await; + let ids = self.indexes.query(filters, order).await; let events = self.events.read().await; let mut list: Vec = Vec::new(); @@ -182,9 +183,13 @@ impl NostrDatabase for MemoryDatabase { } } - async fn event_ids_by_filters(&self, filters: Vec) -> Result, Self::Err> { + async fn event_ids_by_filters( + &self, + filters: Vec, + order: Order, + ) -> Result, Self::Err> { if self.opts.events { - Ok(self.indexes.query(filters).await) + Ok(self.indexes.query(filters, order).await) } else { Err(DatabaseError::FeatureDisabled) } diff --git a/crates/nostr-database/src/raw.rs b/crates/nostr-database/src/raw.rs index 5259ecb2f..f2af26782 100644 --- a/crates/nostr-database/src/raw.rs +++ b/crates/nostr-database/src/raw.rs @@ -108,6 +108,19 @@ impl RawEvent { } } +impl From<&Event> for RawEvent { + fn from(event: &Event) -> Self { + Self { + id: event.id.to_bytes(), + pubkey: event.pubkey.serialize(), + created_at: event.created_at, + kind: event.kind, + tags: event.tags.iter().map(|t| t.as_vec()).collect(), + content: event.content.clone(), + } + } +} + impl From for RawEvent { fn from(event: Event) -> Self { Self { diff --git a/crates/nostr-database/src/tag_indexes.rs b/crates/nostr-database/src/tag_indexes.rs index 56acaa810..5697093fb 100644 --- a/crates/nostr-database/src/tag_indexes.rs +++ b/crates/nostr-database/src/tag_indexes.rs @@ -4,7 +4,7 @@ //! Tag Indexes -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet}; use std::ops::{Deref, DerefMut}; use nostr::hashes::siphash24::Hash as SipHash24; @@ -17,18 +17,11 @@ pub const TAG_INDEX_VALUE_SIZE: usize = 8; /// Tag Indexes #[derive(Debug, Clone, Default, PartialEq, Eq, Hash)] pub struct TagIndexes { - inner: HashMap, -} - -impl TagIndexes { - #[allow(missing_docs)] - pub fn new(map: BTreeMap) -> Self { - Self { inner: map } - } + inner: BTreeMap, } impl Deref for TagIndexes { - type Target = HashMap; + type Target = BTreeMap; fn deref(&self) -> &Self::Target { &self.inner } @@ -79,11 +72,11 @@ where /// Tag Index Values #[derive(Debug, Clone, Default, PartialEq, Eq, Hash)] pub struct TagIndexValues { - inner: HashSet<[u8; TAG_INDEX_VALUE_SIZE]>, + inner: BTreeSet<[u8; TAG_INDEX_VALUE_SIZE]>, } impl Deref for TagIndexValues { - type Target = HashSet<[u8; TAG_INDEX_VALUE_SIZE]>; + type Target = BTreeSet<[u8; TAG_INDEX_VALUE_SIZE]>; fn deref(&self) -> &Self::Target { &self.inner } @@ -96,7 +89,6 @@ impl DerefMut for TagIndexValues { } impl TagIndexValues { - #[allow(missing_docs)] pub fn iter<'a, I>(iter: I) -> impl Iterator + 'a where I: Iterator + 'a, diff --git a/crates/nostr-indexeddb/examples/webapp/src/app.rs b/crates/nostr-indexeddb/examples/webapp/src/app.rs index 1f0c51cde..1eb2d3212 100644 --- a/crates/nostr-indexeddb/examples/webapp/src/app.rs +++ b/crates/nostr-indexeddb/examples/webapp/src/app.rs @@ -2,7 +2,7 @@ // Copyright (c) 2023-2024 Rust Nostr Developers // Distributed under the MIT software license -use nostr_indexeddb::database::NostrDatabase; +use nostr_indexeddb::database::{NostrDatabase, Order}; use nostr_indexeddb::nostr::prelude::*; use nostr_indexeddb::WebDatabase; use wasm_bindgen_futures::spawn_local; @@ -31,7 +31,7 @@ pub fn app() -> Html { .query(vec![Filter::new() .kinds(vec![Kind::Metadata, Kind::Custom(123), Kind::TextNote]) .limit(20) - .author(keys_a.public_key())]) + .author(keys_a.public_key())], Order::Desc) .await .unwrap(); console::log_1(&format!("Events: {events:?}").into()); diff --git a/crates/nostr-indexeddb/src/lib.rs b/crates/nostr-indexeddb/src/lib.rs index 51f7cb0d6..70eb171cf 100644 --- a/crates/nostr-indexeddb/src/lib.rs +++ b/crates/nostr-indexeddb/src/lib.rs @@ -29,7 +29,7 @@ use nostr::{Event, EventId, Filter, Timestamp, Url}; use nostr_database::NostrDatabase; use nostr_database::{ Backend, DatabaseError, DatabaseIndexes, DatabaseOptions, EventIndexResult, FlatBufferBuilder, - FlatBufferDecode, FlatBufferEncode, RawEvent, + FlatBufferDecode, FlatBufferEncode, Order, RawEvent, }; use tokio::sync::Mutex; use wasm_bindgen::JsValue; @@ -400,14 +400,17 @@ impl_nostr_database!({ } #[tracing::instrument(skip_all, level = "trace")] - async fn query(&self, filters: Vec) -> Result, IndexedDBError> { - let ids = self.indexes.query(filters).await; - + async fn query( + &self, + filters: Vec, + order: Order, + ) -> Result, IndexedDBError> { let tx = self .db .transaction_on_one_with_mode(EVENTS_CF, IdbTransactionMode::Readonly)?; let store = tx.object_store(EVENTS_CF)?; + let ids = self.indexes.query(filters, order).await; let mut events: Vec = Vec::with_capacity(ids.len()); for event_id in ids.into_iter() { @@ -426,22 +429,22 @@ impl_nostr_database!({ async fn event_ids_by_filters( &self, filters: Vec, + order: Order, ) -> Result, IndexedDBError> { - Ok(self.indexes.query(filters).await) + Ok(self.indexes.query(filters, order).await) } async fn negentropy_items( &self, filter: Filter, ) -> Result, IndexedDBError> { - let ids = self.indexes.query(vec![filter]).await; - let tx = self .db .transaction_on_one_with_mode(EVENTS_CF, IdbTransactionMode::Readonly)?; let store = tx.object_store(EVENTS_CF)?; - let mut events: Vec<(EventId, Timestamp)> = Vec::new(); + let ids = self.indexes.query(vec![filter], Order::Desc).await; + let mut events: Vec<(EventId, Timestamp)> = Vec::with_capacity(ids.len()); for event_id in ids.into_iter() { let key = JsValue::from(event_id.to_hex()); diff --git a/crates/nostr-rocksdb/examples/rocksdb.rs b/crates/nostr-rocksdb/examples/rocksdb.rs index 578d5c8a0..5d1a5e464 100644 --- a/crates/nostr-rocksdb/examples/rocksdb.rs +++ b/crates/nostr-rocksdb/examples/rocksdb.rs @@ -5,7 +5,7 @@ use std::time::Duration; use nostr::prelude::*; -use nostr_database::NostrDatabase; +use nostr_database::{NostrDatabase, Order}; use nostr_rocksdb::RocksDatabase; use tracing_subscriber::fmt::format::FmtSpan; @@ -35,16 +35,16 @@ async fn main() { ); /* for i in 0..100_000 { - let event = EventBuilder::new_text_note(format!("Event #{i}"), &[]) + let event = EventBuilder::new_text_note(format!("Event #{i}"), []) .to_event(&keys_a) .unwrap(); database.save_event(&event).await.unwrap(); let event = EventBuilder::new_text_note( format!("Reply to event #{i}"), - &[ + [ Tag::event(event.id), - Tag::PubKey(event.pubkey, None), + Tag::public_key(event.pubkey), ], ) .to_event(&keys_b) @@ -54,7 +54,7 @@ async fn main() { for i in 0..10 { let metadata = Metadata::new().name(format!("Name #{i}")); - let event = EventBuilder::set_metadata(metadata) + let event = EventBuilder::set_metadata(&metadata) .to_event(&keys_a) .unwrap(); database.save_event(&event).await.unwrap(); @@ -64,14 +64,14 @@ async fn main() { let event = EventBuilder::new( Kind::Custom(123), "Custom with d tag", - &[Tag::Identifier(format!("myid{i}"))], + [Tag::Identifier(format!("myid{i}"))], ) .to_event(&keys_a) .unwrap(); database.save_event(&event).await.unwrap(); } */ - /* let event_id = EventId::all_zeros(); + /* let event_id = EventId::all_zeros(); database.event_id_seen(event_id, Some(Url::parse("wss://relay.damus.io").unwrap())).await.unwrap(); database.event_id_seen(event_id, Some(Url::parse("wss://relay.nostr.info").unwrap())).await.unwrap(); database.event_id_seen(event_id, Some(Url::parse("wss://relay.damus.io").unwrap())).await.unwrap(); @@ -80,17 +80,20 @@ async fn main() { println!("Seen on: {relays:?}"); */ let events = database - .query(vec![ - Filter::new() - //.kinds(vec![Kind::Custom(123), Kind::TextNote]) - .kind(Kind::Custom(123)) - .identifier("myid5000") - .author(keys_a.public_key()), - Filter::new() - .limit(1) - .kind(Kind::Metadata) - .author(keys_a.public_key()), - ]) + .query( + vec![ + Filter::new() + //.kinds(vec![Kind::Custom(123), Kind::TextNote]) + .kind(Kind::Custom(33333)) + .identifier("myid5") + .author(keys_a.public_key()), + Filter::new() + .limit(1) + .kind(Kind::Metadata) + .author(keys_a.public_key()), + ], + Order::Desc, + ) .await .unwrap(); println!("Events: {:?}", events); diff --git a/crates/nostr-rocksdb/src/lib.rs b/crates/nostr-rocksdb/src/lib.rs index 884eb667c..0c34bbd26 100644 --- a/crates/nostr-rocksdb/src/lib.rs +++ b/crates/nostr-rocksdb/src/lib.rs @@ -19,7 +19,7 @@ use async_trait::async_trait; use nostr::{nips::nip01::Coordinate, Event, EventId, Filter, Timestamp, Url}; use nostr_database::{ Backend, DatabaseError, DatabaseIndexes, DatabaseOptions, EventIndexResult, FlatBufferBuilder, - FlatBufferDecode, FlatBufferEncode, NostrDatabase, RawEvent, + FlatBufferDecode, FlatBufferEncode, NostrDatabase, Order, RawEvent, }; use rocksdb::{ BoundColumnFamily, ColumnFamilyDescriptor, DBCompactionStyle, DBCompressionType, IteratorMode, @@ -275,8 +275,8 @@ impl NostrDatabase for RocksDatabase { } #[tracing::instrument(skip_all, level = "trace")] - async fn query(&self, filters: Vec) -> Result, Self::Err> { - let ids: Vec = self.indexes.query(filters).await; + async fn query(&self, filters: Vec, order: Order) -> Result, Self::Err> { + let ids: Vec = self.indexes.query(filters, order).await; let this = self.clone(); tokio::task::spawn_blocking(move || { @@ -301,15 +301,19 @@ impl NostrDatabase for RocksDatabase { .map_err(DatabaseError::backend)? } - async fn event_ids_by_filters(&self, filters: Vec) -> Result, Self::Err> { - Ok(self.indexes.query(filters).await) + async fn event_ids_by_filters( + &self, + filters: Vec, + order: Order, + ) -> Result, Self::Err> { + Ok(self.indexes.query(filters, order).await) } async fn negentropy_items( &self, filter: Filter, ) -> Result, Self::Err> { - let ids: Vec = self.indexes.query(vec![filter]).await; + let ids: Vec = self.indexes.query(vec![filter], Order::Desc).await; let this = self.clone(); tokio::task::spawn_blocking(move || { diff --git a/crates/nostr-sdk/examples/rocksdb.rs b/crates/nostr-sdk/examples/rocksdb.rs index 236023c96..a1b4b80c8 100644 --- a/crates/nostr-sdk/examples/rocksdb.rs +++ b/crates/nostr-sdk/examples/rocksdb.rs @@ -29,7 +29,7 @@ async fn main() -> Result<()> { // Query events from database let filter = Filter::new().author(public_key).limit(10); - let events = client.database().query(vec![filter]).await?; + let events = client.database().query(vec![filter], Order::Desc).await?; println!("Events: {events:?}"); Ok(()) diff --git a/crates/nostr-sdk/examples/sqlite.rs b/crates/nostr-sdk/examples/sqlite.rs index fa02ce72d..1b31a9f51 100644 --- a/crates/nostr-sdk/examples/sqlite.rs +++ b/crates/nostr-sdk/examples/sqlite.rs @@ -36,7 +36,7 @@ async fn main() -> Result<()> { // Query events from database let filter = Filter::new().author(my_keys.public_key()).limit(10); - let events = client.database().query(vec![filter]).await?; + let events = client.database().query(vec![filter], Order::Desc).await?; println!("Events: {events:?}"); Ok(()) diff --git a/crates/nostr-sdk/src/prelude.rs b/crates/nostr-sdk/src/prelude.rs index 01ae3e873..ae5a18150 100644 --- a/crates/nostr-sdk/src/prelude.rs +++ b/crates/nostr-sdk/src/prelude.rs @@ -9,6 +9,7 @@ // External crates pub use nostr::prelude::*; +pub use nostr_database::*; // Internal modules pub use crate::client::*; diff --git a/crates/nostr-sdk/src/relay/mod.rs b/crates/nostr-sdk/src/relay/mod.rs index 5904e31be..5e8c08e03 100644 --- a/crates/nostr-sdk/src/relay/mod.rs +++ b/crates/nostr-sdk/src/relay/mod.rs @@ -26,7 +26,7 @@ use nostr::{ ClientMessage, Event, EventId, Filter, JsonUtil, Keys, RawRelayMessage, RelayMessage, SubscriptionId, Timestamp, Url, }; -use nostr_database::{DatabaseError, DynNostrDatabase}; +use nostr_database::{DatabaseError, DynNostrDatabase, Order}; use nostr_sdk_net::futures_util::{Future, SinkExt, StreamExt}; use nostr_sdk_net::{self as net, WsMessage}; use thiserror::Error; @@ -1370,7 +1370,7 @@ impl Relay { ) -> Result, Error> { let stored_events: Vec = self .database - .query(filters.clone()) + .query(filters.clone(), Order::Desc) .await .unwrap_or_default(); let events: Mutex> = Mutex::new(stored_events); @@ -1567,7 +1567,7 @@ impl Relay { .filter_map(|id| EventId::from_slice(&id).ok()); let filter = Filter::new().ids(ids); let events: Vec = - self.database.query(vec![filter]).await?; + self.database.query(vec![filter], Order::Desc).await?; let msgs: Vec = events .into_iter() .map(ClientMessage::new_event) diff --git a/crates/nostr-sdk/src/relay/pool.rs b/crates/nostr-sdk/src/relay/pool.rs index 4a5e876eb..9f2b00ca3 100644 --- a/crates/nostr-sdk/src/relay/pool.rs +++ b/crates/nostr-sdk/src/relay/pool.rs @@ -16,7 +16,7 @@ use nostr::{ event, ClientMessage, Event, EventId, Filter, JsonUtil, MissingPartialEvent, PartialEvent, RawRelayMessage, RelayMessage, SubscriptionId, Timestamp, Url, }; -use nostr_database::{DatabaseError, DynNostrDatabase, IntoNostrDatabase, MemoryDatabase}; +use nostr_database::{DatabaseError, DynNostrDatabase, IntoNostrDatabase, MemoryDatabase, Order}; use thiserror::Error; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::{broadcast, Mutex, RwLock}; @@ -795,7 +795,7 @@ impl RelayPool { // Get stored events let stored_events: Vec = self .database - .query(filters.clone()) + .query(filters.clone(), Order::Desc) .await .unwrap_or_default(); diff --git a/crates/nostr-sqlite/examples/sqlite.rs b/crates/nostr-sqlite/examples/sqlite.rs index 81f8f0fb4..72bc47ff7 100644 --- a/crates/nostr-sqlite/examples/sqlite.rs +++ b/crates/nostr-sqlite/examples/sqlite.rs @@ -5,7 +5,7 @@ use std::time::Duration; use nostr::prelude::*; -use nostr_database::NostrDatabase; +use nostr_database::{NostrDatabase, Order}; use nostr_sqlite::SQLiteDatabase; use tracing_subscriber::fmt::format::FmtSpan; @@ -89,12 +89,15 @@ async fn main() { println!("Seen on: {relays:?}"); let events = database - .query(vec![Filter::new() - .kinds(vec![Kind::Metadata, Kind::Custom(123), Kind::TextNote]) - .limit(20) - //.kind(Kind::Custom(123)) - //.identifier("myid5000") - .author(keys_a.public_key())]) + .query( + vec![Filter::new() + .kinds(vec![Kind::Metadata, Kind::Custom(123), Kind::TextNote]) + .limit(20) + //.kind(Kind::Custom(123)) + //.identifier("myid5000") + .author(keys_a.public_key())], + Order::Desc, + ) .await .unwrap(); println!("Got {} events", events.len()); diff --git a/crates/nostr-sqlite/src/lib.rs b/crates/nostr-sqlite/src/lib.rs index 19ec1272e..78b4c489c 100644 --- a/crates/nostr-sqlite/src/lib.rs +++ b/crates/nostr-sqlite/src/lib.rs @@ -21,7 +21,7 @@ use nostr::nips::nip01::Coordinate; use nostr::{Event, EventId, Filter, Timestamp, Url}; use nostr_database::{ Backend, DatabaseIndexes, DatabaseOptions, EventIndexResult, FlatBufferBuilder, - FlatBufferDecode, FlatBufferEncode, NostrDatabase, RawEvent, + FlatBufferDecode, FlatBufferEncode, NostrDatabase, Order, RawEvent, }; use rusqlite::config::DbConfig; use tokio::sync::RwLock; @@ -273,9 +273,9 @@ impl NostrDatabase for SQLiteDatabase { } #[tracing::instrument(skip_all, level = "trace")] - async fn query(&self, filters: Vec) -> Result, Self::Err> { - let ids: Vec = self.indexes.query(filters).await; + async fn query(&self, filters: Vec, order: Order) -> Result, Self::Err> { let conn = self.acquire().await?; + let ids: Vec = self.indexes.query(filters, order).await; conn.interact(move |conn| { let mut stmt = conn.prepare_cached("SELECT event FROM events WHERE event_id = ?;")?; let mut events = Vec::with_capacity(ids.len()); @@ -291,16 +291,20 @@ impl NostrDatabase for SQLiteDatabase { .await? } - async fn event_ids_by_filters(&self, filters: Vec) -> Result, Self::Err> { - Ok(self.indexes.query(filters).await) + async fn event_ids_by_filters( + &self, + filters: Vec, + order: Order, + ) -> Result, Self::Err> { + Ok(self.indexes.query(filters, order).await) } async fn negentropy_items( &self, filter: Filter, ) -> Result, Self::Err> { - let ids: Vec = self.indexes.query(vec![filter]).await; let conn = self.acquire().await?; + let ids: Vec = self.indexes.query(vec![filter], Order::Desc).await; conn.interact(move |conn| { let mut stmt = conn.prepare_cached("SELECT event FROM events WHERE event_id = ?;")?; let mut events = Vec::with_capacity(ids.len());