From 83cdf7a31aa171963bb0bbf140826a370b763364 Mon Sep 17 00:00:00 2001 From: Yuki Kishimoto Date: Fri, 6 Dec 2024 16:16:02 +0100 Subject: [PATCH] pool: very small adj. Signed-off-by: Yuki Kishimoto --- crates/nostr-relay-pool/src/relay/inner.rs | 42 +++++++++------------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/crates/nostr-relay-pool/src/relay/inner.rs b/crates/nostr-relay-pool/src/relay/inner.rs index 4f6d1dbf3..6f3c30acd 100644 --- a/crates/nostr-relay-pool/src/relay/inner.rs +++ b/crates/nostr-relay-pool/src/relay/inner.rs @@ -39,10 +39,6 @@ use crate::pool::RelayPoolNotification; use crate::relay::status::AtomicRelayStatus; use crate::shared::SharedState; -struct NostrMessage { - msgs: Vec, -} - #[derive(Debug, Clone, Copy)] enum RelayServiceEvent { /// None @@ -53,7 +49,10 @@ enum RelayServiceEvent { #[derive(Debug)] struct RelayChannels { - nostr: (Sender, Mutex>), + nostr: ( + Sender>, + Mutex>>, + ), ping: (watch::Sender, Mutex>), service: ( watch::Sender, @@ -63,7 +62,7 @@ struct RelayChannels { impl RelayChannels { pub fn new() -> Self { - let (tx_nostr, rx_nostr) = mpsc::channel::(1024); + let (tx_nostr, rx_nostr) = mpsc::channel::>(1024); let (tx_ping, rx_ping) = watch::channel::(0); let (tx_service, rx_service) = watch::channel::(RelayServiceEvent::None); @@ -74,16 +73,16 @@ impl RelayChannels { } } - pub fn send_nostr_msg(&self, msg: NostrMessage) -> Result<(), Error> { + pub fn send_client_msgs(&self, msgs: Vec) -> Result<(), Error> { self.nostr .0 - .try_send(msg) + .try_send(msgs) .map_err(|_| Error::CantSendChannelMessage { channel: String::from("nostr"), }) } - pub async fn rx_nostr(&self) -> MutexGuard<'_, Receiver> { + pub async fn rx_nostr(&self) -> MutexGuard<'_, Receiver>> { self.nostr.1.lock().await } @@ -673,8 +672,8 @@ impl InnerRelay { loop { tokio::select! { // Nostr channel receiver - Some(NostrMessage { msgs }) = rx_nostr.recv() => { - // Serialize messages to JSON and compose WebSocket text message + Some(msgs) = rx_nostr.recv() => { + // Serialize messages to JSON and compose WebSocket text messages let msgs: Vec = msgs .into_iter() .map(|msg| WsMessage::Text(msg.as_json())) @@ -684,23 +683,19 @@ impl InnerRelay { let size: usize = msgs.iter().map(|msg| msg.len()).sum(); let len: usize = msgs.len(); - // Compose log msg without prefix ("Sending" or "Sent") - let partial_log_msg: String = if len == 1 { + // Log + if len == 1 { let json = &msgs[0]; // SAFETY: len checked above (len == 1) - format!("'{json}' to '{}'", self.url) + tracing::debug!("Sending '{json}' to '{}' (size: {size} bytes)", self.url); } else { - format!("{len} messages to '{}'", self.url) + tracing::debug!("Sending {len} messages to '{}' (size: {size} bytes)", self.url); }; - tracing::trace!("Sending {partial_log_msg} (size: {size} bytes)"); - // Send WebSocket messages send_ws_msgs(&mut ws_tx, msgs).await?; // Increase sent bytes self.stats.add_bytes_sent(size); - - tracing::debug!("Sent {partial_log_msg} (size: {size} bytes)"); } // Ping channel receiver Ok(()) = rx_ping.changed() => { @@ -713,7 +708,7 @@ impl InnerRelay { let msg = WsMessage::Ping(nonce.to_be_bytes().to_vec()); // Send WebSocket message - send_ws_msgs(&mut ws_tx, [msg]).await?; + send_ws_msgs(&mut ws_tx, vec![msg]).await?; // Set ping as just sent ping.just_sent().await; @@ -1126,7 +1121,7 @@ impl InnerRelay { } // Send message - self.channels.send_nostr_msg(NostrMessage { msgs }) + self.channels.send_client_msgs(msgs) } #[inline] @@ -2299,10 +2294,7 @@ impl InnerRelay { } /// Send WebSocket messages with timeout set to [WEBSOCKET_TX_TIMEOUT]. -async fn send_ws_msgs(tx: &mut Sink, msgs: I) -> Result<(), Error> -where - I: IntoIterator, -{ +async fn send_ws_msgs(tx: &mut Sink, msgs: Vec) -> Result<(), Error> { let mut stream = futures_util::stream::iter(msgs.into_iter().map(Ok)); match time::timeout(Some(WEBSOCKET_TX_TIMEOUT), tx.send_all(&mut stream)).await { Some(res) => res.map_err(Error::websocket),