Skip to content

Commit

Permalink
sdk: fallback to READ relays if no relay list is set when breaking do…
Browse files Browse the repository at this point in the history
…wn filters

Signed-off-by: Yuki Kishimoto <[email protected]>
  • Loading branch information
yukibtc committed Nov 24, 2024
1 parent 5353d13 commit f85c974
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
* pool: immediately terminate relay connection on `Relay::disconnect` call ([Yuki Kishimoto])
* pool: return error if relay doesn't exist when removing it ([Yuki Kishimoto])
* sdk: cleanup `Client` methods ([Yuki Kishimoto])
* sdk: fallback to READ relays if no relay list is set when breaking down filters ([Yuki Kishimoto])
* relay-builder: port selection by using random port generation ([Yuki Kishimoto])
* lmdb: optimize vector initialization in unit tests ([Xiao Yu])
* lmdb: commit also read txn ([Yuki Kishimoto])
Expand Down
75 changes: 59 additions & 16 deletions crates/nostr-sdk/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

//! Client
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeSet, HashMap, HashSet};
use std::future::Future;
use std::iter;
use std::sync::Arc;
Expand Down Expand Up @@ -1709,25 +1709,64 @@ impl Client {
.await?;

// Broken down filters
let mut broken_down = self.gossip_graph.break_down_filters(filters).await;
let broken_down = self.gossip_graph.break_down_filters(filters).await;

let mut filters: HashMap<Url, BTreeSet<Filter>> = broken_down.filters;

// Get read relays
let read_relays = self
.pool
.relays_with_flag(RelayServiceFlags::READ, FlagCheck::All)
.await;

// Extend filters with read relays and "other" filters (the filters that aren't linked to public keys)
if let Some(other) = broken_down.other {
for url in read_relays.into_keys() {
broken_down
.filters
.entry(url)
.and_modify(|f| {
f.extend(other.clone());
})
.or_default()
.extend(other.clone())
match (broken_down.orphans, broken_down.others) {
(Some(orphans), Some(others)) => {
for url in read_relays.into_keys() {
filters
.entry(url)
.and_modify(|f| {
f.extend(orphans.clone());
f.extend(others.clone());
})
.or_insert_with(|| {
let mut new = BTreeSet::new();
new.extend(orphans.clone());
new.extend(others.clone());
new
});
}
}
(Some(orphans), None) => {
for url in read_relays.into_keys() {
filters
.entry(url)
.and_modify(|f| {
f.extend(orphans.clone());
})
.or_insert_with(|| {
let mut new = BTreeSet::new();
new.extend(orphans.clone());
new
});
}
}
(None, Some(others)) => {
// Extend filters with read relays and "other" filters (the filters that aren't linked to public keys)
for url in read_relays.into_keys() {
filters
.entry(url)
.and_modify(|f| {
f.extend(others.clone());
})
.or_insert_with(|| {
let mut new = BTreeSet::new();
new.extend(others.clone());
new
});
}
}
(None, None) => {
// Nothing to do
}
}

Expand All @@ -1738,12 +1777,16 @@ impl Client {
}
}

// Check if filters aren't empty
if broken_down.filters.is_empty() {
// Check if filters are empty
if filters.is_empty() {
return Err(Error::GossipFiltersEmpty);
}

Ok(broken_down.filters)
// Convert btree filters to vec
Ok(filters
.into_iter()
.map(|(u, f)| (u, f.into_iter().collect()))
.collect())
}

async fn gossip_send_event(&self, event: Event, nip17: bool) -> Result<Output<EventId>, Error> {
Expand Down
118 changes: 88 additions & 30 deletions crates/nostr-sdk/src/gossip/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ const P_TAG: SingleLetterTag = SingleLetterTag::lowercase(Alphabet::P);
#[derive(Debug)]
pub struct BrokenDownFilters {
/// Filters by url
pub filters: HashMap<Url, Vec<Filter>>,
pub filters: HashMap<Url, BTreeSet<Filter>>,
/// Filters that match a certain pattern but where no relays are available
pub orphans: Option<BTreeSet<Filter>>,
/// Filters that can be sent to read relays (generic query, not related to public keys)
pub other: Option<Vec<Filter>>,
pub others: Option<BTreeSet<Filter>>,
pub urls: HashSet<Url>,
}

Expand Down Expand Up @@ -351,10 +353,14 @@ impl GossipGraph {
self.map_nip65_relays(txn, public_keys, RelayMetadata::Read)
}

pub async fn break_down_filters(&self, filters: Vec<Filter>) -> BrokenDownFilters {
pub async fn break_down_filters<I>(&self, filters: I) -> BrokenDownFilters
where
I: IntoIterator<Item = Filter>,
{
let mut map: HashMap<Url, BTreeSet<Filter>> = HashMap::new();
let mut other = Vec::new();
let mut urls = HashSet::new();
let mut orphans: BTreeSet<Filter> = BTreeSet::new();
let mut others: BTreeSet<Filter> = BTreeSet::new();
let mut urls: HashSet<Url> = HashSet::new();

let txn = self.public_keys.read().await;

Expand All @@ -366,6 +372,7 @@ impl GossipGraph {
.collect()
});

// Match pattern
match (&filter.authors, &p_tag) {
(Some(authors), None) => {
// Get map of outbox relays
Expand All @@ -374,6 +381,12 @@ impl GossipGraph {
// Extend with NIP17 relays
outbox.extend(self.map_nip17_relays(&txn, authors));

// No relay available for the authors
if outbox.is_empty() {
orphans.insert(filter.clone());
continue;
}

// Construct new filters
for (relay, pk_set) in outbox.into_iter() {
urls.insert(relay.clone());
Expand All @@ -398,6 +411,12 @@ impl GossipGraph {
// Extend with NIP17 relays
inbox.extend(self.map_nip17_relays(&txn, p_public_keys));

// No relay available for the p tags
if inbox.is_empty() {
orphans.insert(filter.clone());
continue;
}

// Construct new filters
for (relay, pk_set) in inbox.into_iter() {
urls.insert(relay.clone());
Expand Down Expand Up @@ -425,6 +444,12 @@ impl GossipGraph {
// Extend with NIP17 relays
relays.extend(self.get_nip17_relays(&txn, authors.union(p_public_keys)));

// No relay available for the authors and p tags
if relays.is_empty() {
orphans.insert(filter.clone());
continue;
}

for relay in relays.into_iter() {
urls.insert(relay.clone());

Expand All @@ -439,17 +464,25 @@ impl GossipGraph {
}
// Nothing to do, add to `other` list
(None, None) => {
other.push(filter);
others.insert(filter);
}
}
}

tracing::debug!(gossip = %map.len(), orphans = %orphans.len(), others = %others.len(), "Broken down filters:");

BrokenDownFilters {
filters: map
.into_iter()
.map(|(u, f)| (u, f.into_iter().collect::<Vec<_>>()))
.collect(),
other: if other.is_empty() { None } else { Some(other) },
filters: map,
orphans: if orphans.is_empty() {
None
} else {
Some(orphans)
},
others: if others.is_empty() {
None
} else {
Some(others)
},
urls,
}
}
Expand All @@ -476,6 +509,18 @@ mod tests {
("wss://relay.snort.social", Some(RelayMetadata::Read)),
];

macro_rules! btreeset {
($( $x:expr ),* $(,)?) => {
{
let mut set = BTreeSet::new();
$(
set.insert($x);
)*
set
}
};
}

fn build_relay_list_event(
secret_key: &str,
relays: Vec<(&str, Option<RelayMetadata>)>,
Expand Down Expand Up @@ -517,50 +562,52 @@ mod tests {
let graph = setup_graph().await;

// Single filter, single author
let filters = vec![Filter::new().author(keys_a.public_key)];
let filters = btreeset![Filter::new().author(keys_a.public_key)];
let broken_down = graph.break_down_filters(filters.clone()).await;

assert_eq!(broken_down.filters.get(&damus_url).unwrap(), &filters);
assert_eq!(broken_down.filters.get(&nostr_bg_url).unwrap(), &filters);
assert_eq!(broken_down.filters.get(&nos_lol_url).unwrap(), &filters);
assert!(!broken_down.filters.contains_key(&nostr_mom_url));
assert!(broken_down.other.is_none());
assert!(broken_down.orphans.is_none());
assert!(broken_down.others.is_none());

// Multiple filters, multiple authors
let authors_filter = Filter::new().authors([keys_a.public_key, keys_b.public_key]);
let search_filter = Filter::new().search("Test").limit(10);
let filters = vec![authors_filter.clone(), search_filter.clone()];
let filters = btreeset![authors_filter.clone(), search_filter.clone()];
let broken_down = graph.break_down_filters(filters.clone()).await;

assert_eq!(
broken_down.filters.get(&damus_url).unwrap(),
&vec![authors_filter]
&btreeset![authors_filter]
);
assert_eq!(
broken_down.filters.get(&nostr_bg_url).unwrap(),
&vec![Filter::new().author(keys_a.public_key)]
&btreeset![Filter::new().author(keys_a.public_key)]
);
assert_eq!(
broken_down.filters.get(&nos_lol_url).unwrap(),
&vec![Filter::new().author(keys_a.public_key)]
&btreeset![Filter::new().author(keys_a.public_key)]
);
assert!(!broken_down.filters.contains_key(&nostr_mom_url));
assert_eq!(
broken_down.filters.get(&nostr_info_url).unwrap(),
&vec![Filter::new().author(keys_b.public_key)]
&btreeset![Filter::new().author(keys_b.public_key)]
);
assert_eq!(
broken_down.filters.get(&relay_rip_url).unwrap(),
&vec![Filter::new().author(keys_b.public_key)]
&btreeset![Filter::new().author(keys_b.public_key)]
);
assert!(!broken_down.filters.contains_key(&snort_url));
assert_eq!(broken_down.other, Some(vec![search_filter]));
assert!(broken_down.orphans.is_none());
assert_eq!(broken_down.others, Some(btreeset![search_filter]));

// Multiple filters, multiple authors and single p tags
let authors_filter = Filter::new().authors([keys_a.public_key, keys_b.public_key]);
let p_tag_filter = Filter::new().pubkey(keys_a.public_key);
let search_filter = Filter::new().search("Test").limit(10);
let filters = vec![
let filters = btreeset![
authors_filter.clone(),
p_tag_filter.clone(),
search_filter.clone(),
Expand All @@ -569,36 +616,37 @@ mod tests {

assert_eq!(
broken_down.filters.get(&damus_url).unwrap(),
&vec![p_tag_filter.clone(), authors_filter]
&btreeset![p_tag_filter.clone(), authors_filter]
);
assert_eq!(
broken_down.filters.get(&nostr_bg_url).unwrap(),
&vec![
&btreeset![
p_tag_filter.clone(),
Filter::new().author(keys_a.public_key),
]
);
assert_eq!(
broken_down.filters.get(&nos_lol_url).unwrap(),
&vec![Filter::new().author(keys_a.public_key)]
&btreeset![Filter::new().author(keys_a.public_key)]
);
assert_eq!(
broken_down.filters.get(&nostr_mom_url).unwrap(),
&vec![p_tag_filter]
&btreeset![p_tag_filter]
);
assert_eq!(
broken_down.filters.get(&nostr_info_url).unwrap(),
&vec![Filter::new().author(keys_b.public_key)]
&btreeset![Filter::new().author(keys_b.public_key)]
);
assert_eq!(
broken_down.filters.get(&relay_rip_url).unwrap(),
&vec![Filter::new().author(keys_b.public_key)]
&btreeset![Filter::new().author(keys_b.public_key)]
);
assert!(!broken_down.filters.contains_key(&snort_url));
assert_eq!(broken_down.other, Some(vec![search_filter]));
assert!(broken_down.orphans.is_none());
assert_eq!(broken_down.others, Some(btreeset![search_filter]));

// Single filter, both author and p tag
let filters = vec![Filter::new()
let filters = btreeset![Filter::new()
.author(keys_a.public_key)
.pubkey(keys_b.public_key)];
let broken_down = graph.break_down_filters(filters.clone()).await;
Expand All @@ -610,6 +658,16 @@ mod tests {
assert_eq!(broken_down.filters.get(&nostr_info_url).unwrap(), &filters);
assert_eq!(broken_down.filters.get(&relay_rip_url).unwrap(), &filters);
assert_eq!(broken_down.filters.get(&snort_url).unwrap(), &filters);
assert!(broken_down.other.is_none());
assert!(broken_down.orphans.is_none());
assert!(broken_down.others.is_none());

// test orphan filters
let random_keys = Keys::generate();
let filters = btreeset![Filter::new().author(random_keys.public_key)];
let broken_down = graph.break_down_filters(filters.clone()).await;

assert!(broken_down.filters.is_empty());
assert_eq!(broken_down.orphans, Some(filters.clone()));
assert!(broken_down.others.is_none());
}
}

0 comments on commit f85c974

Please sign in to comment.