Skip to content

Commit

Permalink
rocksdb: add query_single_filter method
Browse files Browse the repository at this point in the history
  • Loading branch information
yukibtc committed Oct 28, 2023
1 parent e588c12 commit 7cefaac
Showing 1 changed file with 39 additions and 19 deletions.
58 changes: 39 additions & 19 deletions crates/nostr-sdk-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::path::Path;
use std::sync::Arc;

use async_trait::async_trait;
use nostr::{Event, EventId, Filter, Timestamp, Url};
use nostr::{Event, EventId, Filter, FiltersMatchEvent, Timestamp, Url};
use nostr_sdk_db::{Backend, DatabaseError, DatabaseOptions, NostrDatabase};
use nostr_sdk_fbs::{FlatBufferBuilder, FlatBufferUtils};
use rocksdb::{
Expand Down Expand Up @@ -91,6 +91,29 @@ impl RocksDatabase {
fn cf_handle(&self, name: &str) -> Result<Arc<BoundColumnFamily>, DatabaseError> {
self.db.cf_handle(name).ok_or(DatabaseError::NotFound)
}

fn query_single_filter(
&self,
filter: &Filter,
ids_to_get: &mut HashSet<[u8; 32]>,
) -> Result<(), DatabaseError> {
if !filter.kinds.is_empty() {
let kind_index_cf = self.cf_handle(KIND_INDEX_CF)?;
let keys = filter.kinds.iter().map(|k| k.as_u64().to_be_bytes());
for v in self
.db
.batched_multi_get_cf(&kind_index_cf, keys, false)
.into_iter()
.flatten()
.flatten()
{
let set: HashSet<[u8; 32]> = HashSet::decode(&v).map_err(DatabaseError::backend)?;
ids_to_get.extend(set);
}
}

Ok(())
}
}

#[async_trait]
Expand Down Expand Up @@ -190,37 +213,34 @@ impl NostrDatabase for RocksDatabase {
let mut events: Vec<Event> = Vec::new();

let cf = this.cf_handle(EVENTS_CF)?;
let kind_index_cf = this.cf_handle(KIND_INDEX_CF)?;

let mut ids_to_get: HashSet<[u8; 32]> = HashSet::new();

let filter = filters.first().unwrap();
if !filter.kinds.is_empty() {
let keys = filter.kinds.iter().map(|k| k.as_u64().to_be_bytes());
for v in this
.db
.batched_multi_get_cf(&kind_index_cf, keys, false)
.into_iter()
.flatten()
.flatten()
{
let set: HashSet<[u8; 32]> =
HashSet::decode(&v).map_err(DatabaseError::backend)?;
ids_to_get.extend(set);
}
} else {
tracing::debug!("No kinds set to query");
for filter in filters.iter() {
this.query_single_filter(filter, &mut ids_to_get)?;
}

//let mut counter = 0;

for v in this
.db
.batched_multi_get_cf(&cf, ids_to_get, false)
.into_iter()
.flatten()
.flatten()
{
/* if let Some(limit) = filter.limit {
if counter >= limit && limit != 0 {
break;
}
} */

let event: Event = Event::decode(&v).map_err(DatabaseError::backend)?;
events.push(event);
if filters.match_event(&event) {
events.push(event);
}

//counter += 1;
}

/* let iter = this.db.full_iterator_cf(&cf, IteratorMode::Start);
Expand Down

0 comments on commit 7cefaac

Please sign in to comment.