Skip to content

Commit

Permalink
pool: cleanup relay Error variants
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Kishimoto <[email protected]>
  • Loading branch information
yukibtc committed Dec 6, 2024
1 parent 1304b45 commit 1b507e0
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
* lmdb: use `async-utility` to spawn blocking tasks ([Yuki Kishimoto])
* ndb: bump `nostr-ndb` to 0.4 ([Yuki Kishimoto])
* pool: add `PingTracker` and improve relay ping management ([Yuki Kishimoto])
* pool: cleanup relay `Error` variants ([Yuki Kishimoto])

### Added

Expand Down
29 changes: 8 additions & 21 deletions crates/nostr-relay-pool/src/relay/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
use std::fmt;
use std::time::Duration;

use nostr::event;
use nostr::event::builder;
use nostr::message::relay::NegentropyErrorCode;
use nostr::message::MessageHandleError;
use nostr::{event, Kind};
use nostr_database::DatabaseError;
use tokio::sync::{broadcast, SetError};

Expand All @@ -18,6 +18,8 @@ use crate::RelayPoolNotification;
/// Relay error
#[derive(Debug)]
pub enum Error {
/// WebSocket error
WebSocket(Box<dyn std::error::Error + Send + Sync>),
/// Shared state error
SharedState(SharedStateError),
/// MessageHandle error
Expand All @@ -36,8 +38,6 @@ pub enum Error {
Database(DatabaseError),
/// OnceCell error
SetPoolNotificationSender(SetError<broadcast::Sender<RelayPoolNotification>>),
/// WebSocket timeout
WebSocketTimeout,
/// Generic timeout
Timeout,
/// Not replied to ping
Expand All @@ -52,7 +52,7 @@ pub enum Error {
/// Relay not connected
NotConnected,
/// Received shutdown
Shutdown,
ReceivedShutdown,
/// Relay message
RelayMessage(String),
/// Batch messages empty
Expand All @@ -66,7 +66,7 @@ pub enum Error {
/// Reconciliation error
NegentropyReconciliation(NegentropyErrorCode),
/// Negentropy not supported
NegentropyMaybeNotSupported,
NegentropyNotSupported,
/// Unknown negentropy error
UnknownNegentropyError,
/// Relay message too large
Expand Down Expand Up @@ -97,17 +97,8 @@ pub enum Error {
/// Min. difficulty
min: u8,
},
/// Unexpected kind
UnexpectedKind {
/// Expected kind
expected: Kind,
/// Found kind
found: Kind,
},
/// Notification Handler error
Handler(String),
/// WebSocket error
WebSocket(Box<dyn std::error::Error + Send + Sync>),
/// Max latency exceeded
MaximumLatencyExceeded {
/// Max
Expand All @@ -126,6 +117,7 @@ impl std::error::Error for Error {}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::WebSocket(e) => write!(f, "{e}"),
Self::SharedState(e) => write!(f, "{e}"),
Self::MessageHandle(e) => write!(f, "{e}"),
Self::Event(e) => write!(f, "{e}"),
Expand All @@ -135,22 +127,21 @@ impl fmt::Display for Error {
Self::NegentropyDeprecated(e) => write!(f, "{e}"),
Self::Database(e) => write!(f, "{e}"),
Self::SetPoolNotificationSender(e) => write!(f, "{e}"),
Self::WebSocketTimeout => write!(f, "WebSocket timeout"),
Self::Timeout => write!(f, "timeout"),
Self::NotRepliedToPing => write!(f, "not replied to ping"),
Self::CantSendChannelMessage { channel } => {
write!(f, "can't send message to the '{channel}' channel")
}
Self::NotReady => write!(f, "relay is initialized but not ready"),
Self::NotConnected => write!(f, "relay not connected"),
Self::Shutdown => write!(f, "received shutdown"),
Self::ReceivedShutdown => write!(f, "received shutdown"),
Self::RelayMessage(message) => write!(f, "{message}"),
Self::BatchMessagesEmpty => write!(f, "can't batch empty list of messages"),
Self::ReadDisabled => write!(f, "read actions are disabled"),
Self::WriteDisabled => write!(f, "write actions are disabled"),
Self::FiltersEmpty => write!(f, "filters empty"),
Self::NegentropyReconciliation(e) => write!(f, "{e}"),
Self::NegentropyMaybeNotSupported => write!(f, "negentropy (maybe) not supported"),
Self::NegentropyNotSupported => write!(f, "negentropy not supported"),
Self::UnknownNegentropyError => write!(f, "unknown negentropy error"),
Self::RelayMessageTooLarge { size, max_size } => write!(
f,
Expand All @@ -166,11 +157,7 @@ impl fmt::Display for Error {
),
Self::EventExpired => write!(f, "event expired"),
Self::PowDifficultyTooLow { min } => write!(f, "POW difficulty too low (min. {min})"),
Self::UnexpectedKind { expected, found } => {
write!(f, "Unexpected kind: expected={expected}, found={found}")
}
Self::Handler(e) => write!(f, "{e}"),
Self::WebSocket(e) => write!(f, "{e}"),
Self::MaximumLatencyExceeded { max, current } => write!(
f,
"Maximum latency exceeded: max={}ms, current={}ms",
Expand Down
16 changes: 8 additions & 8 deletions crates/nostr-relay-pool/src/relay/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1565,7 +1565,7 @@ impl InnerRelay {
return Err(Error::NotConnected);
}
}
RelayNotification::Shutdown => return Err(Error::Shutdown),
RelayNotification::Shutdown => return Err(Error::ReceivedShutdown),
_ => (),
}
}
Expand Down Expand Up @@ -1675,7 +1675,7 @@ impl InnerRelay {
{
Ok(..) => {}
Err(e) => match e {
Error::NegentropyMaybeNotSupported
Error::NegentropyNotSupported
| Error::Negentropy(negentropy::Error::UnsupportedProtocolVersion) => {
tracing::warn!("Negentropy protocol '{}' (maybe) not supported, trying the deprecated one.", negentropy::PROTOCOL_VERSION);
self.sync_deprecated(filter, items, opts, &mut output)
Expand Down Expand Up @@ -1750,7 +1750,7 @@ impl InnerRelay {
|| message.contains("negentropy")
|| message.contains("NEG-"))
{
return Err(Error::NegentropyMaybeNotSupported);
return Err(Error::NegentropyNotSupported);
} else if message.contains("bad msg: invalid message")
&& message.contains("NEG-OPEN")
{
Expand Down Expand Up @@ -1978,7 +1978,7 @@ impl InnerRelay {
}
}
RelayNotification::Shutdown => {
return Err(Error::Shutdown);
return Err(Error::ReceivedShutdown);
}
_ => (),
};
Expand Down Expand Up @@ -2048,7 +2048,7 @@ impl InnerRelay {
|| message.contains("negentropy")
|| message.contains("NEG-"))
{
return Err(Error::NegentropyMaybeNotSupported);
return Err(Error::NegentropyNotSupported);
} else if message.contains("bad msg: invalid message")
&& message.contains("NEG-OPEN")
{
Expand Down Expand Up @@ -2278,7 +2278,7 @@ impl InnerRelay {
return Err(Error::NotConnected);
}
}
RelayNotification::Shutdown => return Err(Error::Shutdown),
RelayNotification::Shutdown => return Err(Error::ReceivedShutdown),
_ => (),
};

Expand Down Expand Up @@ -2306,14 +2306,14 @@ where
let mut stream = futures_util::stream::iter(msgs.into_iter().map(Ok));
match time::timeout(Some(WEBSOCKET_TX_TIMEOUT), tx.send_all(&mut stream)).await {
Some(res) => res.map_err(Error::websocket),
None => Err(Error::WebSocketTimeout),
None => Err(Error::Timeout),
}
}

/// Send WebSocket messages with timeout set to [WEBSOCKET_TX_TIMEOUT].
async fn close_ws(tx: &mut Sink) -> Result<(), Error> {
match time::timeout(Some(WEBSOCKET_TX_TIMEOUT), tx.close()).await {
Some(res) => res.map_err(Error::websocket),
None => Err(Error::WebSocketTimeout),
None => Err(Error::Timeout),
}
}

0 comments on commit 1b507e0

Please sign in to comment.