Skip to content

Commit

Permalink
Merge #201: database: improve NostrDatabase::count method
Browse files Browse the repository at this point in the history
  • Loading branch information
yukibtc committed Dec 4, 2023
2 parents 358ab9a + 1028a4f commit 29b113e
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 67 deletions.
12 changes: 8 additions & 4 deletions bindings/nostr-sdk-ffi/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ impl NostrDatabase {
})
}

pub fn count(&self) -> Result<u64> {
block_on(async move { Ok(self.inner.count().await? as u64) })
}

/// Save [`Event`] into store
///
/// Return `true` if event was successfully saved into database.
Expand All @@ -60,6 +56,14 @@ impl NostrDatabase {
block_on(async move { Ok(Arc::new(self.inner.event_by_id(**event_id).await?.into())) })
}

pub fn count(&self, filters: Vec<Arc<Filter>>) -> Result<u64> {
let filters = filters
.into_iter()
.map(|f| f.as_ref().deref().clone())
.collect();
block_on(async move { Ok(self.inner.count(filters).await? as u64) })
}

pub fn query(&self, filters: Vec<Arc<Filter>>) -> Result<Vec<Arc<Event>>> {
block_on(async move {
let filters = filters
Expand Down
4 changes: 2 additions & 2 deletions bindings/nostr-sdk-ffi/src/nostr_sdk.udl
Original file line number Diff line number Diff line change
Expand Up @@ -595,15 +595,15 @@ interface NostrDatabase {
[Throws=NostrSdkError, Name=sqlite]
constructor(string path);

[Throws=NostrSdkError]
u64 count();
[Throws=NostrSdkError]
boolean save_event(Event event);
[Throws=NostrSdkError]
sequence<string>? event_seen_on_relays(EventId event_id);
[Throws=NostrSdkError]
Event event_by_id(EventId event_id);
[Throws=NostrSdkError]
u64 count(sequence<Filter> filters);
[Throws=NostrSdkError]
sequence<Event> query(sequence<Filter> filters);
[Throws=NostrSdkError]
void wipe();
Expand Down
57 changes: 47 additions & 10 deletions crates/nostr-database/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,19 @@ impl EventIndex {
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PublicKeyPrefix([u8; PUBLIC_KEY_PREFIX_SIZE]);

impl From<XOnlyPublicKey> for PublicKeyPrefix {
fn from(pk: XOnlyPublicKey) -> Self {
impl From<&XOnlyPublicKey> for PublicKeyPrefix {
fn from(pk: &XOnlyPublicKey) -> Self {
let pk: [u8; 32] = pk.serialize();
Self::from(pk)
}
}

impl From<XOnlyPublicKey> for PublicKeyPrefix {
fn from(pk: XOnlyPublicKey) -> Self {
Self::from(&pk)
}
}

impl From<[u8; 32]> for PublicKeyPrefix {
fn from(pk: [u8; 32]) -> Self {
let mut pubkey = [0u8; PUBLIC_KEY_PREFIX_SIZE];
Expand Down Expand Up @@ -239,11 +245,8 @@ impl DatabaseIndexes {
filter: Filter,
allow_empty_filter: bool,
) -> impl Iterator<Item = &'a EventIndex> {
let authors: HashSet<PublicKeyPrefix> = filter
.authors
.iter()
.map(|p| PublicKeyPrefix::from(*p))
.collect();
let authors: HashSet<PublicKeyPrefix> =
filter.authors.iter().map(PublicKeyPrefix::from).collect();
index.iter().filter(move |m| {
if (filter.is_empty() && allow_empty_filter) || !filter.is_empty() {
(filter.ids.is_empty() || filter.ids.contains(&m.event_id))
Expand Down Expand Up @@ -287,6 +290,35 @@ impl DatabaseIndexes {
matching_ids.into_iter().map(|e| e.event_id).collect()
}

/// Count events
#[tracing::instrument(skip_all, level = "trace")]
pub async fn count<I>(&self, filters: I) -> usize
where
I: IntoIterator<Item = Filter>,
{
let index = self.index.read().await;

let mut counter: usize = 0;

for filter in filters.into_iter() {
if let (Some(since), Some(until)) = (filter.since, filter.until) {
if since > until {
continue;
}
}

let limit: Option<usize> = filter.limit;
let iter = self.internal_query(&index, filter, true).await;
if let Some(limit) = limit {
counter += iter.take(limit).count();
} else {
counter += iter.count();
}
}

counter
}

/// Check if an event was deleted
pub async fn has_been_deleted(&self, event_id: &EventId) -> bool {
let deleted = self.deleted.read().await;
Expand All @@ -312,7 +344,7 @@ mod tests {
const SECRET_KEY_B: &str = "nsec1j4c6269y9w0q2er2xjw8sv2ehyrtfxq3jwgdlxj6qfn8z4gjsq5qfvfk99";

#[tokio::test]
async fn test_event_deletion() {
async fn test_database_indexes() {
let indexes = DatabaseIndexes::new();

// Keys
Expand Down Expand Up @@ -397,7 +429,12 @@ mod tests {

// Check total number of indexes
let filter = Filter::new();
let res = indexes.query([filter]).await;
assert_eq!(res.len(), 7);
assert_eq!(indexes.count([filter]).await, 7);

// Check if query len and count match
assert_eq!(
indexes.query([Filter::new()]).await.len(),
indexes.count([Filter::new()]).await
);
}
}
16 changes: 9 additions & 7 deletions crates/nostr-database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,6 @@ pub trait NostrDatabase: AsyncTraitDeps {
/// Database options
fn opts(&self) -> DatabaseOptions;

/// Count number of [`Event`] stored
async fn count(&self) -> Result<usize, Self::Err>;

/// Save [`Event`] into store
///
/// Return `true` if event was successfully saved into database.
Expand All @@ -126,6 +123,11 @@ pub trait NostrDatabase: AsyncTraitDeps {
/// Get [`Event`] by [`EventId`]
async fn event_by_id(&self, event_id: EventId) -> Result<Event, Self::Err>;

/// Count number of [`Event`] found by filters
///
/// Use `Filter::new()` or `Filter::default()` to count all events.
async fn count(&self, filters: Vec<Filter>) -> Result<usize, Self::Err>;

/// Query store with filters
async fn query(&self, filters: Vec<Filter>) -> Result<Vec<Event>, Self::Err>;

Expand Down Expand Up @@ -244,10 +246,6 @@ impl<T: NostrDatabase> NostrDatabase for EraseNostrDatabaseError<T> {
self.0.opts()
}

async fn count(&self) -> Result<usize, Self::Err> {
self.0.count().await.map_err(Into::into)
}

async fn save_event(&self, event: &Event) -> Result<bool, Self::Err> {
self.0.save_event(event).await.map_err(Into::into)
}
Expand Down Expand Up @@ -287,6 +285,10 @@ impl<T: NostrDatabase> NostrDatabase for EraseNostrDatabaseError<T> {
self.0.event_by_id(event_id).await.map_err(Into::into)
}

async fn count(&self, filters: Vec<Filter>) -> Result<usize, Self::Err> {
self.0.count(filters).await.map_err(Into::into)
}

async fn query(&self, filters: Vec<Filter>) -> Result<Vec<Event>, Self::Err> {
self.0.query(filters).await.map_err(Into::into)
}
Expand Down
10 changes: 5 additions & 5 deletions crates/nostr-database/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,6 @@ impl NostrDatabase for MemoryDatabase {
self.opts
}

async fn count(&self) -> Result<usize, Self::Err> {
let events = self.events.read().await;
Ok(events.len())
}

async fn save_event(&self, event: &Event) -> Result<bool, Self::Err> {
if self.opts.events {
let EventIndexResult {
Expand Down Expand Up @@ -147,6 +142,11 @@ impl NostrDatabase for MemoryDatabase {
}
}

#[tracing::instrument(skip_all, level = "trace")]
async fn count(&self, filters: Vec<Filter>) -> Result<usize, Self::Err> {
Ok(self.indexes.count(filters).await)
}

#[tracing::instrument(skip_all, level = "trace")]
async fn query(&self, filters: Vec<Filter>) -> Result<Vec<Event>, Self::Err> {
if self.opts.events {
Expand Down
13 changes: 4 additions & 9 deletions crates/nostr-indexeddb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,6 @@ impl_nostr_database!({
DatabaseOptions::default()
}

async fn count(&self) -> Result<usize, IndexedDBError> {
let tx = self
.db
.transaction_on_one_with_mode(EVENTS_CF, IdbTransactionMode::Readonly)?;
let store = tx.object_store(EVENTS_CF)?;
let count: u32 = store.count()?.await?;
Ok(count as usize)
}

#[tracing::instrument(skip_all, level = "trace")]
async fn save_event(&self, event: &Event) -> Result<bool, IndexedDBError> {
// Index event
Expand Down Expand Up @@ -346,6 +337,10 @@ impl_nostr_database!({
}
}

async fn count(&self, filters: Vec<Filter>) -> Result<usize, IndexedDBError> {
Ok(self.indexes.count(filters).await)
}

#[tracing::instrument(skip_all, level = "trace")]
async fn query(&self, filters: Vec<Filter>) -> Result<Vec<Event>, IndexedDBError> {
let ids = self.indexes.query(filters).await;
Expand Down
5 changes: 4 additions & 1 deletion crates/nostr-rocksdb/examples/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ async fn main() {

let database = RocksDatabase::open("./db/rocksdb").await.unwrap();

println!("Events stored: {}", database.count().await.unwrap());
println!(
"Events stored: {}",
database.count(vec![Filter::new()]).await.unwrap()
);

/* for i in 0..100_000 {
let event = EventBuilder::new_text_note(format!("Event #{i}"), &[])
Expand Down
19 changes: 5 additions & 14 deletions crates/nostr-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,20 +135,6 @@ impl NostrDatabase for RocksDatabase {
DatabaseOptions::default()
}

async fn count(&self) -> Result<usize, Self::Err> {
let this = self.clone();
tokio::task::spawn_blocking(move || {
let cf = this.cf_handle(EVENTS_CF)?;
Ok(this
.db
.full_iterator_cf(&cf, IteratorMode::Start)
.flatten()
.count())
})
.await
.unwrap()
}

#[tracing::instrument(skip_all, level = "trace")]
async fn save_event(&self, event: &Event) -> Result<bool, Self::Err> {
// Index event
Expand Down Expand Up @@ -250,6 +236,11 @@ impl NostrDatabase for RocksDatabase {
.map_err(DatabaseError::backend)?
}

#[tracing::instrument(skip_all, level = "trace")]
async fn count(&self, filters: Vec<Filter>) -> Result<usize, Self::Err> {
Ok(self.indexes.count(filters).await)
}

#[tracing::instrument(skip_all, level = "trace")]
async fn query(&self, filters: Vec<Filter>) -> Result<Vec<Event>, Self::Err> {
let ids: Vec<EventId> = self.indexes.query(filters).await;
Expand Down
5 changes: 4 additions & 1 deletion crates/nostr-sqlite/examples/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ async fn main() {

let database = SQLiteDatabase::open("./db/sqlite.db").await.unwrap();

println!("Events stored: {}", database.count().await.unwrap());
println!(
"Events stored: {}",
database.count(vec![Filter::new()]).await.unwrap()
);

/* for i in 0..100_000 {
let event = EventBuilder::new_text_note(format!("Event #{i}"), &[])
Expand Down
19 changes: 5 additions & 14 deletions crates/nostr-sqlite/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,6 @@ impl NostrDatabase for SQLiteDatabase {
DatabaseOptions::default()
}

async fn count(&self) -> Result<usize, Self::Err> {
let conn = self.acquire().await?;
conn.interact(move |conn| {
let mut stmt = conn.prepare_cached("SELECT COUNT(*) FROM events;")?;
let mut rows = stmt.query([])?;
let row = rows
.next()?
.ok_or_else(|| Error::NotFound("count result".into()))?;
let count: usize = row.get(0)?;
Ok(count)
})
.await?
}

#[tracing::instrument(skip_all, level = "trace")]
async fn save_event(&self, event: &Event) -> Result<bool, Self::Err> {
// Index event
Expand Down Expand Up @@ -244,6 +230,11 @@ impl NostrDatabase for SQLiteDatabase {
.await?
}

#[tracing::instrument(skip_all, level = "trace")]
async fn count(&self, filters: Vec<Filter>) -> Result<usize, Self::Err> {
Ok(self.indexes.count(filters).await)
}

#[tracing::instrument(skip_all, level = "trace")]
async fn query(&self, filters: Vec<Filter>) -> Result<Vec<Event>, Self::Err> {
let ids: Vec<EventId> = self.indexes.query(filters).await;
Expand Down

0 comments on commit 29b113e

Please sign in to comment.