Skip to content

Commit

Permalink
sdk: remove RelayPoolMessage::BatchEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
yukibtc committed Oct 20, 2023
1 parent a1b0c05 commit e93e58e
Showing 1 changed file with 25 additions and 35 deletions.
60 changes: 25 additions & 35 deletions crates/nostr-sdk/src/relay/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use nostr::{
RawRelayMessage, RelayMessage, SubscriptionId, Timestamp, Url,
};
use nostr_sdk_db::memory::MemoryDatabase;
use nostr_sdk_db::DynNostrDatabase;
use nostr_sdk_db::{DatabaseError, DynNostrDatabase};
use thiserror::Error;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::{broadcast, Mutex, RwLock};
Expand Down Expand Up @@ -47,6 +47,9 @@ pub enum Error {
/// Message handler error
#[error(transparent)]
MessageHandler(#[from] MessageHandleError),
/// Database error
#[error(transparent)]
Database(#[from] DatabaseError),
/// Thread error
#[error(transparent)]
Thread(#[from] thread::Error),
Expand Down Expand Up @@ -80,8 +83,6 @@ pub enum RelayPoolMessage {
/// Relay message
msg: RawRelayMessage,
},
/// Events sent
BatchEvent(Vec<EventId>),
/// Relay status changed
RelayStatus {
/// Relay url
Expand Down Expand Up @@ -213,6 +214,13 @@ impl RelayPoolTask {
RelayMessage::Notice { message } => {
tracing::warn!("Notice from {relay_url}: {message}")
}
RelayMessage::Ok {
event_id,
status,
message,
} => {
tracing::debug!("Received OK from {relay_url} for event {event_id}: status={status}, message={message}");
}
_ => (),
}
}
Expand All @@ -221,11 +229,6 @@ impl RelayPoolTask {
),
}
}
RelayPoolMessage::BatchEvent(ids) => {
if let Err(e) = this.database.event_ids_seen(ids, None).await {
tracing::error!("Impossible to set events as seen: {e}");
}
}
RelayPoolMessage::RelayStatus { url, status } => {
let _ = this
.notification_sender
Expand Down Expand Up @@ -501,16 +504,6 @@ impl RelayPool {
Ok(())
}

async fn set_events_as_sent(&self, ids: Vec<EventId>) {
if let Err(e) = self
.pool_task_sender
.send(RelayPoolMessage::BatchEvent(ids))
.await
{
tracing::error!("{e}");
};
}

/// Send client message
pub async fn send_msg(&self, msg: ClientMessage, wait: Option<Duration>) -> Result<(), Error> {
let relays = self.relays().await;
Expand All @@ -520,7 +513,7 @@ impl RelayPool {
}

if let ClientMessage::Event(event) = &msg {
self.set_events_as_sent(vec![event.id]).await;
self.database.save_event(event).await?;
}

let sent_to_at_least_one_relay: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
Expand Down Expand Up @@ -564,17 +557,12 @@ impl RelayPool {
return Err(Error::NoRelays);
}

let ids: Vec<EventId> = msgs
.iter()
.filter_map(|msg| {
if let ClientMessage::Event(event) = msg {
Some(event.id)
} else {
None
}
})
.collect();
self.set_events_as_sent(ids).await;
// Save events into database
for msg in msgs.iter() {
if let ClientMessage::Event(event) = msg {
self.database.save_event(event).await?;
}
}

let sent_to_at_least_one_relay: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
let mut handles = Vec::new();
Expand Down Expand Up @@ -620,7 +608,7 @@ impl RelayPool {
let url: Url = url.try_into_url()?;

if let ClientMessage::Event(event) = &msg {
self.set_events_as_sent(vec![event.id]).await;
self.database.save_event(event).await?;
}

let relays = self.relays().await;
Expand All @@ -640,7 +628,7 @@ impl RelayPool {
return Err(Error::NoRelays);
}

self.set_events_as_sent(vec![event.id]).await;
self.database.save_event(&event).await?;

let sent_to_at_least_one_relay: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
let mut handles = Vec::new();
Expand Down Expand Up @@ -685,8 +673,10 @@ impl RelayPool {
return Err(Error::NoRelays);
}

let ids: Vec<EventId> = events.iter().map(|e| e.id).collect();
self.set_events_as_sent(ids).await;
// Save events into database
for event in events.iter() {
self.database.save_event(event).await?;
}

let sent_to_at_least_one_relay: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
let mut handles = Vec::new();
Expand Down Expand Up @@ -730,7 +720,7 @@ impl RelayPool {
Error: From<<U as TryIntoUrl>::Err>,
{
let url: Url = url.try_into_url()?;
self.set_events_as_sent(vec![event.id]).await;
self.database.save_event(&event).await?;
let relays = self.relays().await;
if let Some(relay) = relays.get(&url) {
Ok(relay.send_event(event, opts).await?)
Expand Down

0 comments on commit e93e58e

Please sign in to comment.