diff --git a/crates/nostr-sdk/src/relay/pool.rs b/crates/nostr-sdk/src/relay/pool.rs index d27ae3488..26cbe011e 100644 --- a/crates/nostr-sdk/src/relay/pool.rs +++ b/crates/nostr-sdk/src/relay/pool.rs @@ -12,7 +12,7 @@ use std::time::Duration; use async_utility::thread; use nostr::url::Url; -use nostr::{ClientMessage, Event, EventId, Filter, RelayMessage}; +use nostr::{ClientMessage, Event, EventId, Filter, RelayMessage, Timestamp}; use thiserror::Error; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::{broadcast, Mutex, RwLock}; @@ -797,4 +797,31 @@ impl RelayPool { relay.terminate().await?; Ok(()) } + + /// Negentropy reconciliation + pub async fn reconcilie( + &self, + filter: Filter, + my_items: Vec<(EventId, Timestamp)>, + timeout: Duration, + ) -> Result<(), Error> { + let mut handles = Vec::new(); + let relays = self.relays().await; + for (url, relay) in relays.into_iter() { + let filter = filter.clone(); + let my_items = my_items.clone(); + let handle = thread::spawn(async move { + if let Err(e) = relay.reconcilie(filter, my_items, timeout).await { + tracing::error!("Failed to get reconcilie with {url}: {e}"); + } + }); + handles.push(handle); + } + + for handle in handles.into_iter().flatten() { + handle.join().await?; + } + + Ok(()) + } }