Skip to content

Commit

Permalink
Merge pull request #95 from helius-labs/feat/better-ws-health-checks
Browse files Browse the repository at this point in the history
feat(ws): Better Health Checks
  • Loading branch information
0xIchigo authored Nov 28, 2024
2 parents 385cc15 + 6673763 commit 6a2999a
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 8 deletions.
3 changes: 2 additions & 1 deletion examples/enhanced_websocket_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ async fn main() -> Result<()> {
let api_key: &str = "your_api_key";
let cluster: Cluster = Cluster::MainnetBeta;

let helius: Helius = Helius::new_with_ws(api_key, cluster).await.unwrap();
// Uses custom ping-pong settings: send a ping every 15s and time out after 45s without a pong
let helius: Helius = Helius::new_with_ws_with_timeouts(api_key, cluster, Some(15), Some(45)).await?;

let key: pubkey::Pubkey = pubkey!("BtsmiEEvnSuUnKxqXj2PZRYpPJAc7C34mGz8gtJ1DAaH");

Expand Down
27 changes: 25 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,23 @@ impl Helius {
/// # Arguments
/// * `api_key` - The API key required for authenticating requests made
/// * `cluster` - The Solana cluster (Devnet or MainnetBeta) that defines the given network environment
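/// * `ping_interval_secs` - Optional duration in seconds between ping messages (defaults to 10 seconds if `None` or 0)
/// * `pong_timeout_secs` - Optional duration in seconds to wait for a pong response before considering the connection dead. The value is rounded up to a whole number of ping intervals; `None` defaults to 3 unanswered pings, and 0 effectively disables the check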
///
/// # Returns
/// An instance of `Helius` if successful. A `HeliusError` is returned if an error occurs during configuration or initialization of the HTTP, RPC, or WS client
pub async fn new_with_ws(api_key: &str, cluster: Cluster) -> Result<Self> {
pub async fn new_with_ws_with_timeouts(
api_key: &str,
cluster: Cluster,
ping_interval_secs: Option<u64>,
pong_timeout_secs: Option<u64>,
) -> Result<Self> {
let config: Arc<Config> = Arc::new(Config::new(api_key, cluster)?);
let client: Client = Client::builder().build().map_err(HeliusError::ReqwestError)?;
let rpc_client: Arc<RpcClient> = Arc::new(RpcClient::new(Arc::new(client.clone()), config.clone())?);
let wss: String = format!("{}{}", ENHANCED_WEBSOCKET_URL, api_key);
let ws_client: Arc<EnhancedWebsocket> = Arc::new(EnhancedWebsocket::new(&wss).await?);
let ws_client: Arc<EnhancedWebsocket> =
Arc::new(EnhancedWebsocket::new(&wss, ping_interval_secs, pong_timeout_secs).await?);

Ok(Helius {
config,
Expand All @@ -113,6 +122,20 @@ impl Helius {
})
}

/// Builds a `Helius` client that includes an enhanced websocket connection using the default health-check settings.
/// Shorthand for `new_with_ws_with_timeouts` with both timeout arguments left as `None`, which selects a
/// 10-second ping interval and treats the connection as dead after 3 pings go unanswered.
///
/// # Arguments
/// * `api_key` - The API key used to authenticate requests
/// * `cluster` - The Solana cluster (Devnet or MainnetBeta) identifying the network environment
///
/// # Returns
/// A `Helius` instance on success; otherwise the `HeliusError` reported while setting up the HTTP, RPC, or WS client
pub async fn new_with_ws(api_key: &str, cluster: Cluster) -> Result<Self> {
    // `None` for both settings defers to the websocket client's built-in defaults
    let (ping_interval_secs, pong_timeout_secs): (Option<u64>, Option<u64>) = (None, None);
    Self::new_with_ws_with_timeouts(api_key, cluster, ping_interval_secs, pong_timeout_secs).await
}

/// Provides a thread-safe way to access RPC functionalities
///
/// # Returns
Expand Down
48 changes: 43 additions & 5 deletions src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use tokio_tungstenite::{
};

pub const ENHANCED_WEBSOCKET_URL: &str = "wss://atlas-mainnet.helius-rpc.com/?api-key=";
const DEFAULT_PING_DURATION_SECONDS: u64 = 10;
pub const DEFAULT_PING_DURATION_SECONDS: u64 = 10;
pub const DEFAULT_MAX_FAILED_PINGS: usize = 3;

// pub type Result<T = ()> = Result<T, HeliusError>;

Expand All @@ -40,7 +41,7 @@ type SubscribeRequestMsg = (String, Value, oneshot::Sender<SubscribeResponseMsg>
type SubscribeResult<'a, T> = Result<(BoxStream<'a, T>, UnsubscribeFn)>;
type RequestMsg = (String, Value, oneshot::Sender<Result<Value>>);

/// A client for subscribing to transaction or account updates from a Helius (geyser) enhanced websocket server.
/// A client for subscribing to transaction or account updates from a Helius (Geyser) enhanced websocket server.
///
/// Forked from Solana's [`PubsubClient`].
pub struct EnhancedWebsocket {
Expand All @@ -52,13 +53,26 @@ pub struct EnhancedWebsocket {

impl EnhancedWebsocket {
/// Expects the enhanced websocket endpoint: wss://atlas-mainnet.helius-rpc.com/?api-key=<API_KEY>
pub async fn new(url: &str) -> Result<Self> {
pub async fn new(url: &str, ping_interval_secs: Option<u64>, pong_timeout_secs: Option<u64>) -> Result<Self> {
let (ws, _response) = connect_async(url).await.map_err(HeliusError::Tungstenite)?;

let (subscribe_sender, subscribe_receiver) = mpsc::unbounded_channel();
let (_request_sender, request_receiver) = mpsc::unbounded_channel();
let (shutdown_sender, shutdown_receiver) = oneshot::channel();

let ping_interval = ping_interval_secs
.filter(|interval: &u64| *interval != 0)
.unwrap_or(DEFAULT_PING_DURATION_SECONDS);
let max_failed_pings = pong_timeout_secs
.map(|timeout| (timeout as f64 / ping_interval as f64).ceil() as usize)
.map_or(DEFAULT_MAX_FAILED_PINGS, |max_failed_pings| {
if max_failed_pings != 0 {
max_failed_pings
} else {
usize::MAX
}
});

Ok(Self {
subscribe_sender,
shutdown_sender,
Expand All @@ -68,7 +82,8 @@ impl EnhancedWebsocket {
subscribe_receiver,
request_receiver,
shutdown_receiver,
DEFAULT_PING_DURATION_SECONDS,
ping_interval,
max_failed_pings,
)),
})
}
Expand Down Expand Up @@ -189,8 +204,10 @@ impl EnhancedWebsocket {
mut request_receiver: mpsc::UnboundedReceiver<RequestMsg>,
mut shutdown_receiver: oneshot::Receiver<()>,
ping_duration_seconds: u64,
max_failed_pings: usize,
) -> Result<()> {
let mut request_id: u64 = 0;
let mut unmatched_pings: usize = 0;

let mut requests_subscribe = BTreeMap::new();
let mut requests_unsubscribe = BTreeMap::<u64, oneshot::Sender<()>>::new();
Expand All @@ -209,7 +226,23 @@ impl EnhancedWebsocket {
},
// Send `Message::Ping` every `ping_duration_seconds` seconds when there is no other communication
() = sleep(Duration::from_secs(ping_duration_seconds)) => {
// Check if we've exceeded our failed ping threshold
if unmatched_pings >= max_failed_pings {
let frame = CloseFrame {
code: CloseCode::Abnormal,
reason: format!("No pong received after {} pings", max_failed_pings).into()
};

ws.send(Message::Close(Some(frame))).await?;
ws.flush().await?;

return Err(HeliusError::WebsocketClosed(
format!("Connection timeout: no pong received after {} pings", max_failed_pings)
));
}

ws.send(Message::Ping(Vec::new())).await?;
unmatched_pings += 1;
},
// Read message for subscribe
Some((operation, params, response_sender)) = subscribe_receiver.recv() => {
Expand Down Expand Up @@ -242,6 +275,9 @@ impl EnhancedWebsocket {
None => break,
};

// Reset unmatched_pings on any received frame
unmatched_pings = 0;

// Get text from the message
let text = match msg {
Message::Text(text) => text,
Expand All @@ -250,7 +286,9 @@ impl EnhancedWebsocket {
ws.send(Message::Pong(data)).await?;
continue
},
Message::Pong(_data) => continue,
Message::Pong(_data) => {
continue;
},
Message::Close(_frame) => break,
Message::Frame(_frame) => continue,
};
Expand Down

0 comments on commit 6a2999a

Please sign in to comment.