Skip to content

Commit

Permalink
pool: error propagation for InnerRelay::receiver_message_handler
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Kishimoto <[email protected]>
  • Loading branch information
yukibtc committed Dec 11, 2024
1 parent 9df6639 commit fe03c3d
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 44 deletions.
14 changes: 14 additions & 0 deletions crates/nostr-relay-pool/src/relay/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ pub enum Error {
Timeout,
/// Not replied to ping
NotRepliedToPing,
/// Can't parse pong
CantParsePong,
/// Pong not match
PongNotMatch {
/// Expected nonce
expected: u64,
/// Received nonce
received: u64,
},
/// Message response timeout
CantSendChannelMessage {
/// Name of channel
Expand Down Expand Up @@ -126,6 +135,11 @@ impl fmt::Display for Error {
Self::SetPoolNotificationSender(e) => write!(f, "{e}"),
Self::Timeout => write!(f, "timeout"),
Self::NotRepliedToPing => write!(f, "not replied to ping"),
Self::CantParsePong => write!(f, "can't parse pong"),
Self::PongNotMatch { expected, received } => write!(
f,
"pong not match: expected={expected}, received={received}"
),
Self::CantSendChannelMessage { channel } => {
write!(f, "can't send message to the '{channel}' channel")
}
Expand Down
87 changes: 43 additions & 44 deletions crates/nostr-relay-pool/src/relay/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,8 +637,9 @@ impl InnerRelay {

// Wait that one of the futures terminate/complete
tokio::select! {
_ = self.receiver_message_handler(ws_rx, &ping) => {
tracing::trace!(url = %self.url, "Relay received exited.");
res = self.receiver_message_handler(ws_rx, &ping) => match res {
Ok(()) => tracing::trace!(url = %self.url, "Relay received exited."),
Err(e) => tracing::error!(url = %self.url, error = %e, "Relay receiver exited with error.")
},
res = self.sender_message_handler(ws_tx, &ping) => match res {
Ok(()) => tracing::trace!(url = %self.url, "Relay sender exited."),
Expand Down Expand Up @@ -718,60 +719,58 @@ impl InnerRelay {
close_ws(&mut ws_tx).await
}

async fn receiver_message_handler(&self, mut ws_rx: Stream, ping: &PingTracker) {
async fn receiver_message_handler(
&self,
mut ws_rx: Stream,
ping: &PingTracker,
) -> Result<(), Error> {
#[cfg(target_arch = "wasm32")]
let _ping = ping;

while let Some(msg) = ws_rx.next().await {
if let Ok(msg) = msg {
match msg {
#[cfg(not(target_arch = "wasm32"))]
WsMessage::Pong(bytes) => {
if self.flags.has_ping() {
match bytes.try_into() {
Ok(nonce) => {
// Nonce from big-endian bytes
let nonce: u64 = u64::from_be_bytes(nonce);

// Get last nonce
let last_nonce: u64 = ping.last_nonce();

// Check if last nonce not match the current one
if last_nonce != nonce {
tracing::error!(url = %self.url, received = %nonce, expected = %last_nonce, "Pong not match.");
break;
}

tracing::trace!(
url = %self.url, nonce = %nonce,
"Pong match"
);
match msg.map_err(Error::websocket)? {
#[cfg(not(target_arch = "wasm32"))]
WsMessage::Pong(bytes) => {
if self.flags.has_ping() {
match bytes.try_into() {
Ok(nonce) => {
// Nonce from big-endian bytes
let nonce: u64 = u64::from_be_bytes(nonce);

// Get last nonce
let last_nonce: u64 = ping.last_nonce();

// Check if last nonce not matches the received one
if last_nonce != nonce {
return Err(Error::PongNotMatch {
expected: last_nonce,
received: nonce,
});
}

// Set ping as replied
ping.set_replied(true);
// Set ping as replied
ping.set_replied(true);

// Save latency
let sent_at = ping.sent_at().await;
self.stats.save_latency(sent_at.elapsed());
}
Err(e) => {
tracing::error!(url = %self.url, bytes = format!("{e:?}"), "Can't parse pong");
break;
}
// Save latency
let sent_at = ping.sent_at().await;
self.stats.save_latency(sent_at.elapsed());
}
Err(..) => {
return Err(Error::CantParsePong);
}
}
}
WsMessage::Text(json) => {
self.handle_relay_message(&json).await;
}
WsMessage::Binary(_) => {
tracing::warn!(url = %self.url, "Binary messages aren't supported.");
}
#[cfg(not(target_arch = "wasm32"))]
_ => {}
}
WsMessage::Text(json) => self.handle_relay_message(&json).await,
WsMessage::Binary(_) => {
tracing::warn!(url = %self.url, "Binary messages aren't supported.");
}
#[cfg(not(target_arch = "wasm32"))]
_ => {}
}
}

Ok(())
}

async fn ping_handler(&self, ping: &PingTracker) -> Result<(), Error> {
Expand Down

0 comments on commit fe03c3d

Please sign in to comment.