diff --git a/CHANGELOG.md b/CHANGELOG.md index d4ee3bf73..a20c2c23f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,10 +29,12 @@ * nostr: move `TagsIndexes` into `Tags` struct ([Yuki Kishimoto]) * nostr: use `OnceCell` implementation from `std` lib instead of `once_cell` ([Yuki Kishimoto]) +* nostr: change `impl Ord for Event` behaviour (descending order instead of ascending) ([Yuki Kishimoto]) * relay-builder: refactor `Session::check_rate_limit` method ([Yuki Kishimoto]) * pool: changes in `RelayPool::remove_relay` behavior ([Yuki Kishimoto]) * pool: deprecate `RelayPool::remove_all_relays` ([Yuki Kishimoto]) * sdk: deprecate `Client::get_events_of` and `Client::get_events_from` methods ([Yuki Kishimoto]) +* sdk: use `Events` instead of `Vec` in fetch and query methods ([Yuki Kishimoto]) * database: improve `BTreeCappedSet` ([Yuki Kishimoto]) * database: not save invalid event deletion ([Yuki Kishimoto]) * lmdb: not save event deletion ([Yuki Kishimoto]) @@ -46,6 +48,7 @@ * nostr: add some shorthand constructors for `TagKind::SingleLetter` ([Yuki Kishimoto]) * nostr: add `Tags` struct ([Yuki Kishimoto]) * database: add `Backend::is_persistent` method ([Yuki Kishimoto]) +* database: add `Events` struct ([Yuki Kishimoto]) * relay-builder: add `LocalRelay` and `RelayBuilder` ([Yuki Kishimoto]) * relay-builder: allow to serve local relay as hidden service ([Yuki Kishimoto]) * relay-builder: allow to set number of max connections allowed ([Yuki Kishimoto]) diff --git a/crates/nostr-database/src/events.rs b/crates/nostr-database/src/events.rs new file mode 100644 index 000000000..0ca72b8d9 --- /dev/null +++ b/crates/nostr-database/src/events.rs @@ -0,0 +1,159 @@ +// Copyright (c) 2022-2023 Yuki Kishimoto +// Copyright (c) 2023-2024 Rust Nostr Developers +// Distributed under the MIT software license + +use std::collections::btree_set::IntoIter; +use std::collections::BTreeSet; + +use nostr::{Event, Filter}; + +use crate::tree::{BTreeCappedSet, Capacity, 
OverCapacityPolicy}; + +const POLICY: OverCapacityPolicy = OverCapacityPolicy::Last; + +/// Descending sorted collection of events +#[derive(Debug, Clone)] +pub struct Events { + set: BTreeCappedSet<Event>, +} + +impl Events { + /// New collection + #[inline] + pub fn new(filters: &[Filter]) -> Self { + // Check how many filters are passed and return the limit + let limit: Option<usize> = match (filters.len(), filters.first()) { + (1, Some(filter)) => filter.limit, + _ => None, + }; + + match limit { + Some(limit) => Self::bounded(limit), + None => Self::unbounded(), + } + } + + /// New bounded collection + #[inline] + pub fn bounded(limit: usize) -> Self { + Self { + set: BTreeCappedSet::bounded_with_policy(limit, POLICY), + } + } + + /// New unbounded collection + #[inline] + pub fn unbounded() -> Self { + Self { + set: BTreeCappedSet::unbounded(), + } + } + + /// Returns the number of events in the collection. + #[inline] + pub fn len(&self) -> usize { + self.set.len() + } + + /// Returns `true` if the collection contains no events. + #[inline] + pub fn is_empty(&self) -> bool { + self.set.is_empty() + } + + /// Check if contains [`Event`] + #[inline] + pub fn contains(&self, event: &Event) -> bool { + self.set.contains(event) + } + + /// Insert [`Event`] + /// + /// If the set did not previously contain an equal value, `true` is returned. + #[inline] + pub fn insert(&mut self, event: Event) -> bool { + self.set.insert(event).inserted + } + + /// Insert events + #[inline] + pub fn extend<I>(&mut self, events: I) + where + I: IntoIterator<Item = Event>, + { + self.set.extend(events); + } + + /// Merge events collections into a single one. 
+ pub fn merge(mut self, other: Self) -> Self { + // Get min capacity + let mut min: Capacity = core::cmp::min(self.set.capacity(), other.set.capacity()); + + // Check over capacity policy + if let Capacity::Bounded { + max, + policy: OverCapacityPolicy::First, + } = min + { + min = Capacity::Bounded { + max, + policy: POLICY, + }; + }; + + // Update capacity + self.set.change_capacity(min); + + // Extend + self.extend(other.set); + + self + } + + /// Get first [`Event`] (descending order) + #[inline] + pub fn first(&self) -> Option<&Event> { + // Lookup ID: EVENT_ORD_IMPL + self.set.first() + } + + /// Get last [`Event`] (descending order) + #[inline] + pub fn last(&self) -> Option<&Event> { + // Lookup ID: EVENT_ORD_IMPL + self.set.last() + } + + /// Iterate events in descending order + #[inline] + pub fn iter(&self) -> impl Iterator { + // Lookup ID: EVENT_ORD_IMPL + self.set.iter() + } + + /// Convert collection to vector of events. + #[inline] + pub fn to_vec(self) -> Vec { + self.into_iter().collect() + } +} + +impl IntoIterator for Events { + type Item = Event; + type IntoIter = IntoIter; + + fn into_iter(self) -> Self::IntoIter { + // Lookup ID: EVENT_ORD_IMPL + self.set.into_iter() + } +} + +impl From> for Events { + fn from(set: BTreeSet) -> Self { + Self { + set: BTreeCappedSet::from(set), + } + } +} + +// TODO: add unit tests diff --git a/crates/nostr-database/src/helper.rs b/crates/nostr-database/src/helper.rs index f0184ff2f..006804c4c 100644 --- a/crates/nostr-database/src/helper.rs +++ b/crates/nostr-database/src/helper.rs @@ -4,7 +4,6 @@ //! 
Nostr Database Helper -use std::cmp::Ordering; use std::collections::{BTreeSet, HashMap, HashSet}; use std::iter; use std::ops::Deref; @@ -15,34 +14,9 @@ use nostr::{Alphabet, Event, EventId, Filter, Kind, PublicKey, SingleLetterTag, use tokio::sync::{OwnedRwLockReadGuard, RwLock}; use crate::tree::{BTreeCappedSet, Capacity, InsertResult, OverCapacityPolicy}; +use crate::Events; -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -struct DatabaseEvent { - event: Arc, -} - -impl PartialOrd for DatabaseEvent { - #[inline] - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for DatabaseEvent { - #[inline] - fn cmp(&self, other: &Self) -> Ordering { - self.event.cmp(&other.event).reverse() - } -} - -impl Deref for DatabaseEvent { - type Target = Event; - - #[inline] - fn deref(&self) -> &Self::Target { - &self.event - } -} +type DatabaseEvent = Arc; struct QueryByAuthorParams { author: PublicKey, @@ -201,6 +175,7 @@ impl InternalDatabaseHelper { let now: Timestamp = Timestamp::now(); events .into_iter() + .rev() // Lookup ID: EVENT_ORD_IMPL .filter(|e| !e.kind.is_ephemeral()) .map(|event| self.internal_index_event(&event, &now)) .flat_map(|res| res.to_discard) @@ -216,6 +191,7 @@ impl InternalDatabaseHelper { let now: Timestamp = Timestamp::now(); events .into_iter() + .rev() // Lookup ID: EVENT_ORD_IMPL .filter(|e| !e.is_expired() && !e.kind.is_ephemeral()) .filter(move |event| self.internal_index_event(event, &now).to_store) } @@ -906,7 +882,10 @@ mod tests { Event::from_json(EVENTS[1]).unwrap(), Event::from_json(EVENTS[0]).unwrap(), ]; - assert_eq!(indexes.query([Filter::new()]).await, expected_output); + assert_eq!( + indexes.query([Filter::new()]).await.to_vec(), + expected_output + ); assert_eq!(indexes.count([Filter::new()]).await, 8); // Test get previously deleted replaceable event (check if was deleted by indexes) @@ -932,7 +911,8 @@ mod tests { .query([Filter::new() .kind(Kind::ParameterizedReplaceable(32122)) 
.author(keys_b.public_key())]) - .await, + .await + .to_vec(), vec![ Event::from_json(EVENTS[5]).unwrap(), Event::from_json(EVENTS[4]).unwrap(), @@ -946,14 +926,16 @@ mod tests { .kind(Kind::ParameterizedReplaceable(32122)) .author(keys_b.public_key()) .identifier("id-3")]) - .await, + .await + .to_vec(), vec![Event::from_json(EVENTS[4]).unwrap()] ); assert_eq!( indexes .query([Filter::new().author(keys_a.public_key())]) - .await, + .await + .to_vec(), vec![ Event::from_json(EVENTS[12]).unwrap(), Event::from_json(EVENTS[8]).unwrap(), @@ -968,7 +950,8 @@ mod tests { .query([Filter::new() .author(keys_a.public_key()) .kinds([Kind::TextNote, Kind::Custom(32121)])]) - .await, + .await + .to_vec(), vec![ Event::from_json(EVENTS[1]).unwrap(), Event::from_json(EVENTS[0]).unwrap(), @@ -980,7 +963,8 @@ mod tests { .query([Filter::new() .authors([keys_a.public_key(), keys_b.public_key()]) .kinds([Kind::TextNote, Kind::Custom(32121)])]) - .await, + .await + .to_vec(), vec![ Event::from_json(EVENTS[1]).unwrap(), Event::from_json(EVENTS[0]).unwrap(), @@ -989,7 +973,10 @@ mod tests { // Test get param replaceable events using identifier assert_eq!( - indexes.query([Filter::new().identifier("id-1")]).await, + indexes + .query([Filter::new().identifier("id-1")]) + .await + .to_vec(), vec![ Event::from_json(EVENTS[6]).unwrap(), Event::from_json(EVENTS[5]).unwrap(), @@ -999,7 +986,10 @@ mod tests { // Test get param replaceable events with multiple tags using identifier assert_eq!( - indexes.query([Filter::new().identifier("multi-id")]).await, + indexes + .query([Filter::new().identifier("multi-id")]) + .await + .to_vec(), vec![Event::from_json(EVENTS[13]).unwrap()] ); // As above but by using kind and pubkey @@ -1009,7 +999,8 @@ mod tests { .pubkey(keys_a.public_key()) .kind(Kind::Custom(30333)) .limit(1)]) - .await, + .await + .to_vec(), vec![Event::from_json(EVENTS[13]).unwrap()] ); @@ -1023,7 +1014,8 @@ mod tests { .query([Filter::new() .kind(Kind::Metadata) 
.author(keys_a.public_key())]) - .await, + .await + .to_vec(), vec![first_ev_metadata.clone()] ); @@ -1037,7 +1029,8 @@ mod tests { .query([Filter::new() .kind(Kind::Metadata) .author(keys_a.public_key())]) - .await, + .await + .to_vec(), vec![ev] ); } diff --git a/crates/nostr-database/src/lib.rs b/crates/nostr-database/src/lib.rs index 9beaae846..900007cfa 100644 --- a/crates/nostr-database/src/lib.rs +++ b/crates/nostr-database/src/lib.rs @@ -20,6 +20,7 @@ use nostr::nips::nip65::{self, RelayMetadata}; use nostr::{Event, EventId, Filter, JsonUtil, Kind, Metadata, PublicKey, Timestamp, Url}; mod error; +mod events; #[cfg(feature = "flatbuf")] pub mod flatbuffers; pub mod helper; @@ -30,6 +31,7 @@ mod tree; mod util; pub use self::error::DatabaseError; +pub use self::events::Events; #[cfg(feature = "flatbuf")] pub use self::flatbuffers::{FlatBufferBuilder, FlatBufferDecode, FlatBufferEncode}; pub use self::helper::{DatabaseEventResult, DatabaseHelper}; @@ -152,14 +154,14 @@ pub trait NostrDatabase: fmt::Debug + Send + Sync { async fn count(&self, filters: Vec) -> Result; /// Query store with filters - async fn query(&self, filters: Vec) -> Result, DatabaseError>; + async fn query(&self, filters: Vec) -> Result; /// Get `negentropy` items async fn negentropy_items( &self, filter: Filter, ) -> Result, DatabaseError> { - let events: Vec = self.query(vec![filter]).await?; + let events: Events = self.query(vec![filter]).await?; Ok(events.into_iter().map(|e| (e.id, e.created_at)).collect()) } @@ -181,7 +183,7 @@ pub trait NostrDatabaseExt: NostrDatabase { .author(public_key) .kind(Kind::Metadata) .limit(1); - let events: Vec = self.query(vec![filter]).await?; + let events: Events = self.query(vec![filter]).await?; match events.first() { Some(event) => match Metadata::from_json(&event.content) { Ok(metadata) => Ok(Profile::new(public_key, metadata)), @@ -204,7 +206,7 @@ pub trait NostrDatabaseExt: NostrDatabase { .author(public_key) .kind(Kind::ContactList) .limit(1); - 
let events: Vec = self.query(vec![filter]).await?; + let events: Events = self.query(vec![filter]).await?; match events.first() { Some(event) => Ok(event.tags.public_keys().copied().collect()), None => Ok(Vec::new()), @@ -218,7 +220,7 @@ pub trait NostrDatabaseExt: NostrDatabase { .author(public_key) .kind(Kind::ContactList) .limit(1); - let events: Vec = self.query(vec![filter]).await?; + let events: Events = self.query(vec![filter]).await?; match events.first() { Some(event) => { // Get contacts metadata @@ -258,7 +260,7 @@ pub trait NostrDatabaseExt: NostrDatabase { .author(public_key) .kind(Kind::RelayList) .limit(1); - let events: Vec = self.query(vec![filter]).await?; + let events: Events = self.query(vec![filter]).await?; // Extract relay list (NIP65) match events.first() { @@ -282,7 +284,7 @@ pub trait NostrDatabaseExt: NostrDatabase { { // Query let filter: Filter = Filter::default().authors(public_keys).kind(Kind::RelayList); - let events: Vec = self.query(vec![filter]).await?; + let events: Events = self.query(vec![filter]).await?; let mut map = HashMap::with_capacity(events.len()); diff --git a/crates/nostr-database/src/memory.rs b/crates/nostr-database/src/memory.rs index bb05779e8..decc80fba 100644 --- a/crates/nostr-database/src/memory.rs +++ b/crates/nostr-database/src/memory.rs @@ -15,7 +15,7 @@ use nostr::{Event, EventId, Filter, Timestamp, Url}; use tokio::sync::Mutex; use crate::{ - util, Backend, DatabaseError, DatabaseEventResult, DatabaseEventStatus, DatabaseHelper, + util, Backend, DatabaseError, DatabaseEventResult, DatabaseEventStatus, DatabaseHelper, Events, NostrDatabase, }; @@ -177,7 +177,7 @@ impl NostrDatabase for MemoryDatabase { } #[tracing::instrument(skip_all, level = "trace")] - async fn query(&self, filters: Vec) -> Result, DatabaseError> { + async fn query(&self, filters: Vec) -> Result { Ok(self.helper.query(filters).await) } diff --git a/crates/nostr-indexeddb/src/lib.rs b/crates/nostr-indexeddb/src/lib.rs index 
65a121f31..1f906324f 100644 --- a/crates/nostr-indexeddb/src/lib.rs +++ b/crates/nostr-indexeddb/src/lib.rs @@ -462,7 +462,7 @@ impl_nostr_database!({ #[inline] #[tracing::instrument(skip_all, level = "trace")] - async fn query(&self, filters: Vec) -> Result, DatabaseError> { + async fn query(&self, filters: Vec) -> Result { Ok(self.helper.query(filters).await) } diff --git a/crates/nostr-lmdb/src/lib.rs b/crates/nostr-lmdb/src/lib.rs index e448911be..56ac01312 100644 --- a/crates/nostr-lmdb/src/lib.rs +++ b/crates/nostr-lmdb/src/lib.rs @@ -127,7 +127,7 @@ impl NostrDatabase for NostrLMDB { #[inline] #[tracing::instrument(skip_all, level = "trace")] - async fn query(&self, filters: Vec) -> Result, DatabaseError> { + async fn query(&self, filters: Vec) -> Result { self.db.query(filters).await.map_err(DatabaseError::backend) } @@ -326,7 +326,7 @@ mod tests { .kind(Kind::Metadata)]) .await .unwrap(); - assert_eq!(events, vec![expected_event.clone()]); + assert_eq!(events.to_vec(), vec![expected_event.clone()]); // Check if number of events in database match the expected assert_eq!(db.count_all().await, added_events + 1); @@ -358,7 +358,7 @@ mod tests { .kind(Kind::Metadata)]) .await .unwrap(); - assert_eq!(events, vec![new_expected_event]); + assert_eq!(events.to_vec(), vec![new_expected_event]); // Check if number of events in database match the expected assert_eq!(db.count_all().await, added_events + 1); @@ -390,7 +390,7 @@ mod tests { // Test filter query let events = db.query(vec![coordinate.clone().into()]).await.unwrap(); - assert_eq!(events, vec![expected_event.clone()]); + assert_eq!(events.to_vec(), vec![expected_event.clone()]); // Check if number of events in database match the expected assert_eq!(db.count_all().await, added_events + 1); @@ -422,7 +422,7 @@ mod tests { // Test filter query let events = db.query(vec![coordinate.into()]).await.unwrap(); - assert_eq!(events, vec![new_expected_event]); + assert_eq!(events.to_vec(), vec![new_expected_event]); // 
Check if number of events in database match the expected assert_eq!(db.count_all().await, added_events + 1); @@ -474,7 +474,7 @@ mod tests { Event::from_json(EVENTS[0]).unwrap(), ]; assert_eq!( - db.query(vec![Filter::new()]).await.unwrap(), + db.query(vec![Filter::new()]).await.unwrap().to_vec(), expected_output ); assert_eq!(db.count_all().await, 8); diff --git a/crates/nostr-lmdb/src/store/mod.rs b/crates/nostr-lmdb/src/store/mod.rs index 2dd243f06..36d44c6b1 100644 --- a/crates/nostr-lmdb/src/store/mod.rs +++ b/crates/nostr-lmdb/src/store/mod.rs @@ -3,15 +3,17 @@ // Copyright (c) 2023-2024 Rust Nostr Developers // Distributed under the MIT software license +use std::collections::BTreeSet; use std::fs; use std::path::Path; use std::sync::Arc; use heed::{RoTxn, RwTxn}; -use nostr::prelude::*; -use nostr_database::FlatBufferBuilder; +use nostr_database::prelude::*; use tokio::sync::Mutex; +use crate::store::types::DatabaseEvent; + mod error; mod lmdb; mod types; @@ -285,14 +287,16 @@ impl Store { .await? } - pub async fn query(&self, filters: Vec) -> Result, Error> { + // Lookup ID: EVENT_ORD_IMPL + pub async fn query(&self, filters: Vec) -> Result { self.interact(move |db| { let txn: RoTxn = db.read_txn()?; - let output = db.query(&txn, filters)?; - Ok(output + let output: BTreeSet = db.query(&txn, filters)?; + let set: BTreeSet = output .into_iter() .filter_map(|e| e.to_event().ok()) - .collect()) + .collect(); + Ok(Events::from(set)) }) .await? 
} diff --git a/crates/nostr-ndb/src/lib.rs b/crates/nostr-ndb/src/lib.rs index 24518efa6..4887acd01 100644 --- a/crates/nostr-ndb/src/lib.rs +++ b/crates/nostr-ndb/src/lib.rs @@ -144,12 +144,12 @@ impl NostrDatabase for NdbDatabase { } #[tracing::instrument(skip_all, level = "trace")] - async fn query(&self, filters: Vec) -> Result, DatabaseError> { + async fn query(&self, filters: Vec) -> Result { let txn: Transaction = Transaction::new(&self.db).map_err(DatabaseError::backend)?; + let mut events: Events = Events::new(&filters); let res: Vec = self.ndb_query(&txn, filters)?; - let mut events: Vec = Vec::with_capacity(res.len()); for r in res.into_iter() { - events.push(ndb_note_to_event(r.note)?); + events.insert(ndb_note_to_event(r.note)?); } Ok(events) } diff --git a/crates/nostr-relay-pool/src/pool/internal.rs b/crates/nostr-relay-pool/src/pool/internal.rs index 8a839b233..e8d10e7f1 100644 --- a/crates/nostr-relay-pool/src/pool/internal.rs +++ b/crates/nostr-relay-pool/src/pool/internal.rs @@ -4,9 +4,7 @@ //! 
Relay Pool -use std::collections::btree_set::IntoIter; -use std::collections::{BTreeSet, HashMap, HashSet}; -use std::iter::Rev; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Duration; @@ -15,7 +13,7 @@ use async_utility::thread::JoinHandle; use async_utility::{thread, time}; use atomic_destructor::AtomicDestroyer; use nostr::prelude::*; -use nostr_database::{DynNostrDatabase, IntoNostrDatabase}; +use nostr_database::{DynNostrDatabase, Events, IntoNostrDatabase}; use tokio::sync::{broadcast, mpsc, Mutex, RwLock, RwLockReadGuard}; use tokio_stream::wrappers::ReceiverStream; @@ -725,7 +723,7 @@ impl InternalRelayPool { filters: Vec, timeout: Duration, opts: FilterOptions, - ) -> Result, Error> { + ) -> Result { let urls: Vec = self.read_relay_urls().await; self.fetch_events_from(urls, filters, timeout, opts).await } @@ -736,19 +734,13 @@ impl InternalRelayPool { filters: Vec, timeout: Duration, opts: FilterOptions, - ) -> Result, Error> + ) -> Result where I: IntoIterator, U: TryIntoUrl, Error: From<::Err>, { - // Check how many filters are passed and return the limit - let limit: Option = match (filters.len(), filters.first()) { - (1, Some(filter)) => filter.limit, - _ => None, - }; - - let mut events: BTreeSet = BTreeSet::new(); + let mut events: Events = Events::new(&filters); // Stream events let mut stream = self @@ -758,14 +750,7 @@ impl InternalRelayPool { events.insert(event); } - // Iterate set and revert order (events are sorted in ascending order in the BTreeSet) - let iter: Rev> = events.into_iter().rev(); - - // Check limit - match limit { - Some(limit) => Ok(iter.take(limit).collect()), - None => Ok(iter.collect()), - } + Ok(events) } #[inline] diff --git a/crates/nostr-relay-pool/src/pool/mod.rs b/crates/nostr-relay-pool/src/pool/mod.rs index 8707a936c..ad7c57522 100644 --- a/crates/nostr-relay-pool/src/pool/mod.rs +++ b/crates/nostr-relay-pool/src/pool/mod.rs @@ -11,7 +11,7 @@ use std::time::Duration; use 
atomic_destructor::{AtomicDestructor, StealthClone}; use nostr::prelude::*; -use nostr_database::{DynNostrDatabase, IntoNostrDatabase, MemoryDatabase}; +use nostr_database::{DynNostrDatabase, Events, IntoNostrDatabase, MemoryDatabase}; use tokio::sync::broadcast; pub use tokio_stream::wrappers::ReceiverStream; @@ -485,7 +485,7 @@ impl RelayPool { filters: Vec, timeout: Duration, opts: FilterOptions, - ) -> Result, Error> { + ) -> Result { self.inner.fetch_events(filters, timeout, opts).await } @@ -497,7 +497,11 @@ impl RelayPool { timeout: Duration, opts: FilterOptions, ) -> Result, Error> { - self.inner.fetch_events(filters, timeout, opts).await + Ok(self + .inner + .fetch_events(filters, timeout, opts) + .await? + .to_vec()) } /// Fetch events from specific relays @@ -508,7 +512,7 @@ impl RelayPool { filters: Vec, timeout: Duration, opts: FilterOptions, - ) -> Result, Error> + ) -> Result where I: IntoIterator, U: TryIntoUrl, @@ -533,9 +537,11 @@ impl RelayPool { U: TryIntoUrl, Error: From<::Err>, { - self.inner + Ok(self + .inner .fetch_events_from(urls, filters, timeout, opts) - .await + .await? + .to_vec()) } /// Stream events of filters from relays with `READ` flag. 
diff --git a/crates/nostr-relay-pool/src/relay/internal.rs b/crates/nostr-relay-pool/src/relay/internal.rs index ba984d4a4..1cca3e8bf 100644 --- a/crates/nostr-relay-pool/src/relay/internal.rs +++ b/crates/nostr-relay-pool/src/relay/internal.rs @@ -16,16 +16,8 @@ use async_wsocket::{ConnectionMode, Sink, Stream, WsMessage}; use atomic_destructor::AtomicDestroyer; use negentropy::{Bytes, Id, Negentropy, NegentropyStorageVector}; use negentropy_deprecated::{Bytes as BytesDeprecated, Negentropy as NegentropyDeprecated}; -use nostr::message::MessageHandleError; -use nostr::nips::nip01::Coordinate; -#[cfg(feature = "nip11")] -use nostr::nips::nip11::RelayInformationDocument; use nostr::secp256k1::rand::{self, Rng}; -use nostr::{ - ClientMessage, Event, EventId, Filter, JsonUtil, Kind, MissingPartialEvent, PartialEvent, - RawRelayMessage, RelayMessage, SubscriptionId, Timestamp, Url, -}; -use nostr_database::{DatabaseEventStatus, DynNostrDatabase}; +use nostr_database::prelude::*; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::{broadcast, oneshot, watch, Mutex, MutexGuard, RwLock}; @@ -1607,11 +1599,11 @@ impl InternalRelay { filters: Vec, timeout: Duration, opts: FilterOptions, - ) -> Result, Error> { - let events: Mutex> = Mutex::new(Vec::new()); + ) -> Result { + let events: Mutex = Mutex::new(Events::unbounded()); self.fetch_events_with_callback(filters, timeout, opts, |event| async { let mut events = events.lock().await; - events.push(event); + events.insert(event); }) .await?; Ok(events.into_inner()) diff --git a/crates/nostr-relay-pool/src/relay/mod.rs b/crates/nostr-relay-pool/src/relay/mod.rs index 6a3825b50..6da40339d 100644 --- a/crates/nostr-relay-pool/src/relay/mod.rs +++ b/crates/nostr-relay-pool/src/relay/mod.rs @@ -12,12 +12,7 @@ use std::time::Duration; use async_wsocket::futures_util::Future; use async_wsocket::ConnectionMode; use atomic_destructor::AtomicDestructor; -#[cfg(feature = "nip11")] -use 
nostr::nips::nip11::RelayInformationDocument; -use nostr::{ - ClientMessage, Event, EventId, Filter, RelayMessage, Result, SubscriptionId, Timestamp, Url, -}; -use nostr_database::{DynNostrDatabase, MemoryDatabase}; +use nostr_database::prelude::*; use tokio::sync::broadcast; mod constants; @@ -394,7 +389,7 @@ impl Relay { filters: Vec, timeout: Duration, opts: FilterOptions, - ) -> Result, Error> { + ) -> Result { self.inner.fetch_events(filters, timeout, opts).await } @@ -406,7 +401,7 @@ impl Relay { timeout: Duration, opts: FilterOptions, ) -> Result, Error> { - self.fetch_events(filters, timeout, opts).await + Ok(self.fetch_events(filters, timeout, opts).await?.to_vec()) } /// Count events diff --git a/crates/nostr-sdk/examples/nip65.rs b/crates/nostr-sdk/examples/nip65.rs index 9e347ce90..c6fafcb65 100644 --- a/crates/nostr-sdk/examples/nip65.rs +++ b/crates/nostr-sdk/examples/nip65.rs @@ -18,7 +18,7 @@ async fn main() -> Result<()> { client.connect().await; let filter = Filter::new().author(public_key).kind(Kind::RelayList); - let events: Vec = client + let events: Events = client .fetch_events(vec![filter], Some(Duration::from_secs(10))) .await?; let event = events.first().unwrap(); diff --git a/crates/nostr-sdk/src/client/mod.rs b/crates/nostr-sdk/src/client/mod.rs index bf0e17628..e083b62fa 100644 --- a/crates/nostr-sdk/src/client/mod.rs +++ b/crates/nostr-sdk/src/client/mod.rs @@ -4,10 +4,9 @@ //! Client -use std::collections::btree_set::IntoIter; -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{HashMap, HashSet}; use std::future::Future; -use std::iter::{self, Rev}; +use std::iter; use std::sync::Arc; use std::time::Duration; @@ -826,8 +825,6 @@ impl Client { /// Fetch events from relays /// - /// The returned events are sorted by newest first, if there is a limit only the newest are returned. 
- /// /// If `gossip` is enabled (see [`Options::gossip`]) the events will be requested also to /// NIP-65 relays (automatically discovered) of public keys included in filters (if any). /// @@ -855,7 +852,7 @@ impl Client { &self, filters: Vec, timeout: Option, - ) -> Result, Error> { + ) -> Result { let timeout: Duration = timeout.unwrap_or(self.opts.timeout); if self.opts.gossip { @@ -875,7 +872,7 @@ impl Client { filters: Vec, _source: EventSource, ) -> Result, Error> { - self.fetch_events(filters, None).await + Ok(self.fetch_events(filters, None).await?.to_vec()) } /// Fetch events from specific relays @@ -885,7 +882,7 @@ impl Client { urls: I, filters: Vec, timeout: Option, - ) -> Result, Error> + ) -> Result where I: IntoIterator, U: TryIntoUrl, @@ -911,7 +908,10 @@ impl Client { U: TryIntoUrl, pool::Error: From<::Err>, { - self.fetch_events_from(urls, filters, timeout).await + Ok(self + .fetch_events_from(urls, filters, timeout) + .await? + .to_vec()) } /// Stream events of filters @@ -1165,7 +1165,8 @@ impl Client { .author(public_key) .kind(Kind::Metadata) .limit(1); - let events: Vec = self.fetch_events(vec![filter], timeout).await?; + // TODO: add fetch_event and use that + let events: Events = self.fetch_events(vec![filter], timeout).await?; match events.first() { Some(event) => Ok(Metadata::try_from(event)?), None => Err(Error::MetadataNotFound), @@ -1287,7 +1288,7 @@ impl Client { pub async fn get_contact_list(&self, timeout: Option) -> Result, Error> { let mut contact_list: Vec = Vec::new(); let filters: Vec = self.get_contact_list_filters().await?; - let events: Vec = self.fetch_events(filters, timeout).await?; + let events: Events = self.fetch_events(filters, timeout).await?; // Get first event (result of `fetch_events` is sorted DESC by timestamp) if let Some(event) = events.into_iter().next() { @@ -1316,7 +1317,7 @@ impl Client { ) -> Result, Error> { let mut pubkeys: Vec = Vec::new(); let filters: Vec = self.get_contact_list_filters().await?; - 
let events: Vec = self.fetch_events(filters, timeout).await?; + let events: Events = self.fetch_events(filters, timeout).await?; for event in events.into_iter() { pubkeys.extend(event.tags.public_keys()); @@ -1345,7 +1346,7 @@ impl Client { .limit(1), ); } - let events: Vec = self.fetch_events(filters, timeout).await?; + let events: Events = self.fetch_events(filters, timeout).await?; for event in events.into_iter() { let metadata = Metadata::from_json(&event.content)?; if let Some(m) = contacts.get_mut(&event.pubkey) { @@ -1785,7 +1786,7 @@ impl Client { // Query from database let database = self.database(); - let mut stored_events = database.query(vec![filter.clone()]).await?; + let stored_events: Events = database.query(vec![filter.clone()]).await?; // Get DISCOVERY and READ relays // TODO: avoid clone of both url and relay @@ -1799,15 +1800,15 @@ impl Client { .into_keys(); // Get events from discovery and read relays - let mut events: Vec = self + let events: Events = self .fetch_events_from(relays, vec![filter], Some(Duration::from_secs(10))) .await?; - // Join database and relays events - events.append(&mut stored_events); + // Merge database and relays events + let merged: Events = events.merge(stored_events); // Update gossip graph - self.gossip_graph.update(events).await; + self.gossip_graph.update(merged).await; } Ok(()) @@ -1886,14 +1887,8 @@ impl Client { &self, filters: Vec, timeout: Duration, - ) -> Result, Error> { - // Check how many filters are passed and return the limit - let limit: Option = match (filters.len(), filters.first()) { - (1, Some(filter)) => filter.limit, - _ => None, - }; - - let mut events: BTreeSet = BTreeSet::new(); + ) -> Result { + let mut events: Events = Events::new(&filters); // Stream events let mut stream: ReceiverStream = @@ -1903,13 +1898,7 @@ impl Client { events.insert(event); } - let iter: Rev> = events.into_iter().rev(); - - // Check limit - match limit { - Some(limit) => Ok(iter.take(limit).collect()), - None => 
Ok(iter.collect()), - } + Ok(events) } async fn gossip_subscribe( diff --git a/crates/nostr-sdk/src/client/zapper.rs b/crates/nostr-sdk/src/client/zapper.rs index c5a90f5e2..124847265 100644 --- a/crates/nostr-sdk/src/client/zapper.rs +++ b/crates/nostr-sdk/src/client/zapper.rs @@ -6,7 +6,7 @@ use std::str::FromStr; use lnurl_pay::api::Lud06OrLud16; use lnurl_pay::{LightningAddress, LnUrl}; -use nostr::prelude::*; +use nostr_database::prelude::*; use super::{Client, Error}; @@ -100,7 +100,7 @@ impl Client { ZapEntity::Event(event_id) => { // Get event let filter: Filter = Filter::new().id(event_id); - let events: Vec<Event> = self + let events: Events = self .fetch_events(vec![filter], Some(self.opts.timeout)) .await?; let event: &Event = events.first().ok_or(Error::EventNotFound(event_id))?; diff --git a/crates/nostr-sqlite/src/lib.rs b/crates/nostr-sqlite/src/lib.rs index 788c37291..29837bdcb 100644 --- a/crates/nostr-sqlite/src/lib.rs +++ b/crates/nostr-sqlite/src/lib.rs @@ -284,7 +284,7 @@ impl NostrDatabase for SQLiteDatabase { #[inline] #[tracing::instrument(skip_all)] - async fn query(&self, filters: Vec<Filter>) -> Result<Vec<Event>, DatabaseError> { + async fn query(&self, filters: Vec<Filter>) -> Result<Events, DatabaseError> { Ok(self.helper.query(filters).await) } diff --git a/crates/nostr/src/event/mod.rs b/crates/nostr/src/event/mod.rs index f6ef53665..346190db6 100644 --- a/crates/nostr/src/event/mod.rs +++ b/crates/nostr/src/event/mod.rs @@ -143,9 +143,11 @@ impl PartialOrd for Event { impl Ord for Event { fn cmp(&self, other: &Self) -> Ordering { if self.created_at != other.created_at { - // Ascending order + // Descending order // NOT EDIT, will break many things!! - self.created_at.cmp(&other.created_at) + // If the change is essential, search for EVENT_ORD_IMPL comment + // in the code and adjust things. + self.created_at.cmp(&other.created_at).reverse() } else { self.id.cmp(&other.id) }