Skip to content

Commit

Permalink
pool: very small adj.
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Kishimoto <[email protected]>
  • Loading branch information
yukibtc committed Dec 6, 2024
1 parent 1b507e0 commit 83cdf7a
Showing 1 changed file with 17 additions and 25 deletions.
42 changes: 17 additions & 25 deletions crates/nostr-relay-pool/src/relay/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ use crate::pool::RelayPoolNotification;
use crate::relay::status::AtomicRelayStatus;
use crate::shared::SharedState;

struct NostrMessage {
msgs: Vec<ClientMessage>,
}

#[derive(Debug, Clone, Copy)]
enum RelayServiceEvent {
/// None
Expand All @@ -53,7 +49,10 @@ enum RelayServiceEvent {

#[derive(Debug)]
struct RelayChannels {
nostr: (Sender<NostrMessage>, Mutex<Receiver<NostrMessage>>),
nostr: (
Sender<Vec<ClientMessage>>,
Mutex<Receiver<Vec<ClientMessage>>>,
),
ping: (watch::Sender<u64>, Mutex<watch::Receiver<u64>>),
service: (
watch::Sender<RelayServiceEvent>,
Expand All @@ -63,7 +62,7 @@ struct RelayChannels {

impl RelayChannels {
pub fn new() -> Self {
let (tx_nostr, rx_nostr) = mpsc::channel::<NostrMessage>(1024);
let (tx_nostr, rx_nostr) = mpsc::channel::<Vec<ClientMessage>>(1024);
let (tx_ping, rx_ping) = watch::channel::<u64>(0);
let (tx_service, rx_service) = watch::channel::<RelayServiceEvent>(RelayServiceEvent::None);

Expand All @@ -74,16 +73,16 @@ impl RelayChannels {
}
}

pub fn send_nostr_msg(&self, msg: NostrMessage) -> Result<(), Error> {
pub fn send_client_msgs(&self, msgs: Vec<ClientMessage>) -> Result<(), Error> {
self.nostr
.0
.try_send(msg)
.try_send(msgs)
.map_err(|_| Error::CantSendChannelMessage {
channel: String::from("nostr"),
})
}

pub async fn rx_nostr(&self) -> MutexGuard<'_, Receiver<NostrMessage>> {
pub async fn rx_nostr(&self) -> MutexGuard<'_, Receiver<Vec<ClientMessage>>> {
self.nostr.1.lock().await
}

Expand Down Expand Up @@ -673,8 +672,8 @@ impl InnerRelay {
loop {
tokio::select! {
// Nostr channel receiver
Some(NostrMessage { msgs }) = rx_nostr.recv() => {
// Serialize messages to JSON and compose WebSocket text message
Some(msgs) = rx_nostr.recv() => {
// Serialize messages to JSON and compose WebSocket text messages
let msgs: Vec<WsMessage> = msgs
.into_iter()
.map(|msg| WsMessage::Text(msg.as_json()))
Expand All @@ -684,23 +683,19 @@ impl InnerRelay {
let size: usize = msgs.iter().map(|msg| msg.len()).sum();
let len: usize = msgs.len();

// Compose log msg without prefix ("Sending" or "Sent")
let partial_log_msg: String = if len == 1 {
// Log
if len == 1 {
let json = &msgs[0]; // SAFETY: len checked above (len == 1)
format!("'{json}' to '{}'", self.url)
tracing::debug!("Sending '{json}' to '{}' (size: {size} bytes)", self.url);
} else {
format!("{len} messages to '{}'", self.url)
tracing::debug!("Sending {len} messages to '{}' (size: {size} bytes)", self.url);
};

tracing::trace!("Sending {partial_log_msg} (size: {size} bytes)");

// Send WebSocket messages
send_ws_msgs(&mut ws_tx, msgs).await?;

// Increase sent bytes
self.stats.add_bytes_sent(size);

tracing::debug!("Sent {partial_log_msg} (size: {size} bytes)");
}
// Ping channel receiver
Ok(()) = rx_ping.changed() => {
Expand All @@ -713,7 +708,7 @@ impl InnerRelay {
let msg = WsMessage::Ping(nonce.to_be_bytes().to_vec());

// Send WebSocket message
send_ws_msgs(&mut ws_tx, [msg]).await?;
send_ws_msgs(&mut ws_tx, vec![msg]).await?;

// Set ping as just sent
ping.just_sent().await;
Expand Down Expand Up @@ -1126,7 +1121,7 @@ impl InnerRelay {
}

// Send message
self.channels.send_nostr_msg(NostrMessage { msgs })
self.channels.send_client_msgs(msgs)
}

#[inline]
Expand Down Expand Up @@ -2299,10 +2294,7 @@ impl InnerRelay {
}

/// Send WebSocket messages with timeout set to [WEBSOCKET_TX_TIMEOUT].
async fn send_ws_msgs<I>(tx: &mut Sink, msgs: I) -> Result<(), Error>
where
I: IntoIterator<Item = WsMessage>,
{
async fn send_ws_msgs(tx: &mut Sink, msgs: Vec<WsMessage>) -> Result<(), Error> {
let mut stream = futures_util::stream::iter(msgs.into_iter().map(Ok));
match time::timeout(Some(WEBSOCKET_TX_TIMEOUT), tx.send_all(&mut stream)).await {
Some(res) => res.map_err(Error::websocket),
Expand Down

0 comments on commit 83cdf7a

Please sign in to comment.