database: add DatabaseIndexes::index_raw_event
yukibtc committed Dec 5, 2023
1 parent be32b2a commit b8611da
Showing 1 changed file with 136 additions and 24 deletions.
160 changes: 136 additions & 24 deletions crates/nostr-database/src/index.rs
@@ -7,14 +7,24 @@ use std::cmp::Ordering;
use std::collections::{BTreeSet, HashSet};
use std::sync::Arc;

use nostr::event::id;
use nostr::event::raw::RawEvent;
use nostr::secp256k1::XOnlyPublicKey;
use nostr::secp256k1::{self, XOnlyPublicKey};
use nostr::{Event, EventId, Filter, Kind, TagIndexValues, TagIndexes, Timestamp};
use thiserror::Error;
use tokio::sync::RwLock;

/// Public Key Prefix Size
const PUBLIC_KEY_PREFIX_SIZE: usize = 8;

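/// Errors that can occur while indexing a raw event (invalid event id or public key)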
#[derive(Debug, Error)]
enum Error {
#[error(transparent)]
Secp256k1(#[from] secp256k1::Error),
#[error(transparent)]
EventId(#[from] id::Error),
}

/// Event Index
#[derive(Debug, Clone, PartialEq, Eq)]
struct EventIndex {
@@ -82,11 +92,12 @@ impl EventIndex {
}

filter.generic_tags.iter().all(|(tagname, set)| {
let set = TagIndexValues::from(set);
self.tags
.get(tagname)
.map(|valset| valset.intersection(&set).count() > 0)
.unwrap_or(false)
self.tags.get(tagname).map_or(false, |valset| {
TagIndexValues::iter(set)
.filter(|t| valset.contains(t))
.count()
> 0
})
})
}
}
@@ -145,13 +156,105 @@ impl DatabaseIndexes {
I: IntoIterator<Item = RawEvent>,
{
let mut index = self.index.write().await;
let mut deleted = self.deleted.write().await;
let mut to_discard: HashSet<EventId> = HashSet::new();
let now = Timestamp::now();
index.extend(
events
.into_iter()
.filter(|raw| !raw.is_expired(&now) && !raw.is_ephemeral())
.filter_map(|raw| EventIndex::try_from(raw).ok()),
);
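// Index every non-expired, non-ephemeral event; IDs of replaced or deleted events accumulate in `to_discard`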
events
.into_iter()
.filter(|raw| !raw.is_expired(&now) && !raw.is_ephemeral())
.for_each(|event| {
let _ = self.index_raw_event(&mut index, &mut deleted, &mut to_discard, event);
});

// Remove events
if !to_discard.is_empty() {
index.retain(|e| !to_discard.contains(&e.event_id));
}

// TODO: return to_discard events?
}

fn index_raw_event(
&self,
index: &mut BTreeSet<EventIndex>,
deleted: &mut HashSet<EventId>,
to_discard: &mut HashSet<EventId>,
raw: RawEvent,
) -> Result<(), Error> {
let event_id: EventId = EventId::from_slice(&raw.id)?;
let pubkey = XOnlyPublicKey::from_slice(&raw.pubkey)?;
let pubkey_prefix: PublicKeyPrefix = PublicKeyPrefix::from(pubkey);
let timestamp = Timestamp::from(raw.created_at);
let kind = Kind::from(raw.kind);

let mut should_insert: bool = true;

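// Replaceable event: keep only the newest event for this (author, kind) pair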
if kind.is_replaceable() {
let filter: Filter = Filter::new().author(pubkey).kind(kind);
for ev in self.internal_query(index, filter, false) {
if ev.created_at > timestamp {
should_insert = false;
} else if ev.created_at <= timestamp {
to_discard.insert(ev.event_id);
}
}
} else if kind.is_parameterized_replaceable() {
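// Parameterized replaceable event: uniqueness is per (author, kind, `d` identifier); events without an identifier are not indexed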
match raw.identifier() {
Some(identifier) => {
let filter: Filter = Filter::new()
.author(pubkey)
.kind(kind)
.identifier(identifier);
for ev in self.internal_query(index, filter, false) {
if ev.created_at >= timestamp {
should_insert = false;
} else if ev.created_at < timestamp {
to_discard.insert(ev.event_id);
}
}
}
None => should_insert = false,
}
} else if kind == Kind::EventDeletion {
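// Deletion event: discard and mark as deleted the referenced events, but only those authored by the same public key (compared by prefix)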
// Check `e` tags
let ids = raw.event_ids();
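// The `until` bound ensures that only events created at or before the deletion event can be removed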
let filter: Filter = Filter::new().ids(ids).until(timestamp);
if !filter.ids.is_empty() {
for ev in self.internal_query(index, filter, false) {
if ev.pubkey == pubkey_prefix {
to_discard.insert(ev.event_id);
deleted.insert(ev.event_id);
}
}
}

// Check `a` tags
for coordinate in raw.coordinates() {
let coordinate_pubkey_prefix: PublicKeyPrefix =
PublicKeyPrefix::from(coordinate.pubkey);
if coordinate_pubkey_prefix == pubkey_prefix {
let filter: Filter = coordinate.into();
let filter: Filter = filter.until(timestamp);
for ev in self.internal_query(index, filter, false) {
to_discard.insert(ev.event_id);
deleted.insert(ev.event_id);
}
}
}
}

// Insert event
if should_insert {
index.insert(EventIndex {
created_at: timestamp,
event_id,
pubkey: pubkey_prefix,
kind,
tags: TagIndexes::from(raw.tags.into_iter()),
});
}

Ok(())
}

/// Index [`Event`]
@@ -165,13 +268,14 @@ impl DatabaseIndexes {
}

let mut index = self.index.write().await;
let mut deleted = self.deleted.write().await;

let mut should_insert: bool = true;
let mut to_discard: HashSet<EventId> = HashSet::new();

if event.is_replaceable() {
let filter: Filter = Filter::new().author(event.pubkey).kind(event.kind);
for ev in self.internal_query(&index, filter, false).await {
for ev in self.internal_query(&index, filter, false) {
if ev.created_at > event.created_at {
should_insert = false;
} else if ev.created_at <= event.created_at {
@@ -185,7 +289,7 @@ impl DatabaseIndexes {
.author(event.pubkey)
.kind(event.kind)
.identifier(identifier);
for ev in self.internal_query(&index, filter, false).await {
for ev in self.internal_query(&index, filter, false) {
if ev.created_at >= event.created_at {
should_insert = false;
} else if ev.created_at < event.created_at {
@@ -196,16 +300,17 @@ impl DatabaseIndexes {
None => should_insert = false,
}
} else if event.kind == Kind::EventDeletion {
let mut deleted = self.deleted.write().await;
let pubkey_prefix: PublicKeyPrefix = PublicKeyPrefix::from(event.pubkey);

// Check `e` tags
let ids = event.event_ids().copied();
let filter: Filter = Filter::new().ids(ids);
for ev in self.internal_query(&index, filter, false).await {
if ev.pubkey == pubkey_prefix {
to_discard.insert(ev.event_id);
deleted.insert(ev.event_id);
let filter: Filter = Filter::new().ids(ids).until(event.created_at);
if !filter.ids.is_empty() {
for ev in self.internal_query(&index, filter, false) {
if ev.pubkey == pubkey_prefix {
to_discard.insert(ev.event_id);
deleted.insert(ev.event_id);
}
}
}

@@ -215,7 +320,8 @@ impl DatabaseIndexes {
PublicKeyPrefix::from(coordinate.pubkey);
if coordinate_pubkey_prefix == pubkey_prefix {
let filter: Filter = coordinate.into();
for ev in self.internal_query(&index, filter, false).await {
let filter: Filter = filter.until(event.created_at);
for ev in self.internal_query(&index, filter, false) {
to_discard.insert(ev.event_id);
deleted.insert(ev.event_id);
}
@@ -239,7 +345,7 @@ impl DatabaseIndexes {
}
}

async fn internal_query<'a>(
fn internal_query<'a>(
&self,
index: &'a BTreeSet<EventIndex>,
filter: Filter,
@@ -279,7 +385,7 @@ impl DatabaseIndexes {
}

let limit: Option<usize> = filter.limit;
let iter = self.internal_query(&index, filter, true).await;
let iter = self.internal_query(&index, filter, true);
if let Some(limit) = limit {
matching_ids.extend(iter.take(limit))
} else {
@@ -308,7 +414,7 @@ impl DatabaseIndexes {
}

let limit: Option<usize> = filter.limit;
let iter = self.internal_query(&index, filter, true).await;
let iter = self.internal_query(&index, filter, true);
if let Some(limit) = limit {
counter += iter.take(limit).count();
} else {
@@ -406,6 +512,9 @@ mod tests {

// Total events: 6

let filter = Filter::new();
assert_eq!(indexes.count([filter]).await, 6);

// Invalid event deletion (wrong signing keys)
let coordinate =
Coordinate::new(Kind::ParameterizedReplaceable(32122), keys_a.public_key());
@@ -416,6 +525,9 @@ mod tests {

// Total events: 7 (6 + 1)

let filter = Filter::new();
assert_eq!(indexes.count([filter]).await, 7);

// Delete valid event
let coordinate =
Coordinate::new(Kind::ParameterizedReplaceable(32122), keys_a.public_key())

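For reference, a minimal standalone sketch (not part of the commit) of the tag-overlap semantics behind the new `map_or`-based match in `EventIndex`: for every tag name in the filter, at least one wanted value must appear in the event's tag index. Plain std collections stand in for the crate's `TagIndexes`/`TagIndexValues`, and `any` replaces the commit's `filter(...).count() > 0`.

use std::collections::{HashMap, HashSet};

// Hypothetical stand-ins for the crate's TagIndexes / TagIndexValues.
type TagName = char;
type TagIndex = HashMap<TagName, HashSet<String>>;

/// True when, for every tag name in the filter, the event's indexed tags
/// contain at least one of the wanted values.
fn tags_match(event_tags: &TagIndex, filter_tags: &TagIndex) -> bool {
    filter_tags.iter().all(|(name, wanted)| {
        event_tags
            .get(name)
            .map_or(false, |indexed| wanted.iter().any(|v| indexed.contains(v)))
    })
}

fn main() {
    let event_tags: TagIndex = HashMap::from([('t', HashSet::from(["nostr".to_string()]))]);
    let filter_tags: TagIndex =
        HashMap::from([('t', HashSet::from(["nostr".to_string(), "rust".to_string()]))]);
    assert!(tags_match(&event_tags, &filter_tags));
}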