Skip to content

Commit

Permalink
database: speed up DatabaseIndexes::bulk_index with threads
Browse files Browse the repository at this point in the history
  • Loading branch information
yukibtc committed Dec 5, 2023
1 parent 4224ea7 commit b50949a
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 32 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ license = "MIT"

[workspace.dependencies]
async-trait = "0.1"
async-utility = "0.1"
nostr = { version = "0.25", path = "./crates/nostr", default-features = false }
nostr-database = { version = "0.1", path = "./crates/nostr-database", default-features = false }
once_cell = "1.18"
Expand Down
1 change: 1 addition & 0 deletions crates/nostr-database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ default = []
flatbuf = ["dep:flatbuffers"]

[dependencies]
async-utility = { workspace = true }
async-trait = { workspace = true }
flatbuffers = { version = "23.5", optional = true }
nostr = { workspace = true, features = ["std"] }
Expand Down
73 changes: 42 additions & 31 deletions crates/nostr-database/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::cmp::Ordering;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;

use async_utility::thread;
use nostr::event::id;
use nostr::event::raw::RawEvent;
use nostr::secp256k1::XOnlyPublicKey;
Expand Down Expand Up @@ -209,38 +210,32 @@ impl DatabaseIndexes {
where
I: IntoIterator<Item = RawEvent>,
{
let mut index = self.index.write().await;
let mut deleted = self.deleted.write().await;

let mut to_discard: HashSet<EventId> = HashSet::new();
let now = Timestamp::now();
let mut handles = Vec::new();

events
.into_iter()
.filter(|raw| !raw.is_expired(&now) && !raw.is_ephemeral())
.for_each(|event| {
let _ = self.index_raw_event(&mut index, &mut deleted, &mut to_discard, event);
let this = self.clone();
let handle = thread::spawn(async move {
this.index_raw_event(event).await.unwrap(); // TODO: remove unwrap?
});
handles.push(handle);
});

// Remove events
if !to_discard.is_empty() {
index.retain(|e| !to_discard.contains(&e.event_id));
deleted.extend(to_discard);
for handle in handles.into_iter().flatten() {
let _ = handle.join().await; // TODO: propagate error?
}

// TODO: return to_discard events?
}

fn index_raw_event(
&self,
index: &mut BTreeSet<EventIndex>,
deleted: &mut HashSet<EventId>,
to_discard: &mut HashSet<EventId>,
raw: RawEvent,
) -> Result<(), Error> {
async fn index_raw_event(&self, raw: RawEvent) -> Result<(), Error> {
// Parse event ID
let event_id: EventId = EventId::from_slice(&raw.id)?;

let index = self.index.read().await;
let deleted = self.deleted.read().await;

// Check if was deleted
if deleted.contains(&event_id) {
return Ok(());
Expand All @@ -252,10 +247,11 @@ impl DatabaseIndexes {
let kind = Kind::from(raw.kind);

let mut should_insert: bool = true;
let mut to_discard: HashSet<EventId> = HashSet::new();

if kind.is_replaceable() {
let filter: FilterIndex = FilterIndex::default().author(pubkey_prefix).kind(kind);
for ev in self.internal_query(index, deleted, filter) {
for ev in self.internal_query(&index, &deleted, filter) {
if ev.created_at > timestamp {
should_insert = false;
} else if ev.created_at <= timestamp {
Expand All @@ -269,7 +265,7 @@ impl DatabaseIndexes {
.author(pubkey_prefix)
.kind(kind)
.identifier(identifier);
for ev in self.internal_query(index, deleted, filter) {
for ev in self.internal_query(&index, &deleted, filter) {
if ev.created_at >= timestamp {
should_insert = false;
} else if ev.created_at < timestamp {
Expand All @@ -284,7 +280,7 @@ impl DatabaseIndexes {
let ids = raw.event_ids();
let filter: Filter = Filter::new().ids(ids).until(timestamp);
if !filter.ids.is_empty() {
for ev in self.internal_query(index, deleted, filter) {
for ev in self.internal_query(&index, &deleted, filter) {
if ev.pubkey == pubkey_prefix {
to_discard.insert(ev.event_id);
}
Expand All @@ -298,22 +294,37 @@ impl DatabaseIndexes {
if coordinate_pubkey_prefix == pubkey_prefix {
let filter: Filter = coordinate.into();
let filter: Filter = filter.until(timestamp);
for ev in self.internal_query(index, deleted, filter) {
for ev in self.internal_query(&index, &deleted, filter) {
to_discard.insert(ev.event_id);
}
}
}
}

// Insert event
if should_insert {
index.insert(EventIndex {
created_at: timestamp,
event_id,
pubkey: pubkey_prefix,
kind,
tags: TagIndexes::from(raw.tags.into_iter()),
});
// Drop acquired read locks
drop(index);
drop(deleted);

if should_insert || !to_discard.is_empty() {
let mut index = self.index.write().await;

// Remove events
if !to_discard.is_empty() {
let mut deleted = self.deleted.write().await;
index.retain(|e| !to_discard.contains(&e.event_id));
deleted.extend(to_discard);
}

// Insert event
if should_insert {
index.insert(EventIndex {
created_at: timestamp,
event_id,
pubkey: pubkey_prefix,
kind,
tags: TagIndexes::from(raw.tags.into_iter()),
});
}
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion crates/nostr-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ nip46 = ["nostr/nip46"]
nip47 = ["nostr/nip47"]

[dependencies]
async-utility = "0.1"
async-utility = { workspace = true }
nostr = { workspace = true, features = ["std"] }
nostr-database = { workspace = true }
nostr-sdk-net = { version = "0.25", path = "../nostr-sdk-net" }
Expand Down

0 comments on commit b50949a

Please sign in to comment.