diff --git a/examples/enhanced_websocket_transactions.rs b/examples/enhanced_websocket_transactions.rs index 5115c62..da287ab 100644 --- a/examples/enhanced_websocket_transactions.rs +++ b/examples/enhanced_websocket_transactions.rs @@ -9,7 +9,8 @@ async fn main() -> Result<()> { let api_key: &str = "your_api_key"; let cluster: Cluster = Cluster::MainnetBeta; - let helius: Helius = Helius::new_with_ws(api_key, cluster).await.unwrap(); + // Uses custom ping-pong timeouts to ping every 15s and timeout after 45s of no pong + let helius: Helius = Helius::new_with_ws_with_timeouts(api_key, cluster, Some(15), Some(45)).await?; let key: pubkey::Pubkey = pubkey!("BtsmiEEvnSuUnKxqXj2PZRYpPJAc7C34mGz8gtJ1DAaH"); diff --git a/src/client.rs b/src/client.rs index 8d4ce9b..9aa567e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -95,14 +95,23 @@ impl Helius { /// # Arguments /// * `api_key` - The API key required for authenticating requests made /// * `cluster` - The Solana cluster (Devnet or MainnetBeta) that defines the given network environment + /// * `ping_interval_secs` - Optional duration in seconds between ping messages (defaults to 10 seconds if None) + /// * `pong_timeout_secs` - Optional duration in seconds to wait for a pong response before considering the connection dead + /// /// # Returns /// An instance of `Helius` if successful. A `HeliusError` is returned if an error occurs during configuration or initialization of the HTTP, RPC, or WS client - pub async fn new_with_ws(api_key: &str, cluster: Cluster) -> Result { + pub async fn new_with_ws_with_timeouts( + api_key: &str, + cluster: Cluster, + ping_interval_secs: Option, + pong_timeout_secs: Option, + ) -> Result { let config: Arc = Arc::new(Config::new(api_key, cluster)?); let client: Client = Client::builder().build().map_err(HeliusError::ReqwestError)?; let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), config.clone())?); let wss: String = format!("{}{}", ENHANCED_WEBSOCKET_URL, api_key); - let ws_client: Arc = Arc::new(EnhancedWebsocket::new(&wss).await?); + let ws_client: Arc = + Arc::new(EnhancedWebsocket::new(&wss, ping_interval_secs, pong_timeout_secs).await?); Ok(Helius { config, @@ -113,6 +122,20 @@ impl Helius { }) } + /// Creates a new instance of `Helius` with an enhanced websocket client using default timeout settings. + /// This is a convenience method that uses default values of 10 seconds for ping interval and 3 failed pings + /// before considering the connection dead. + /// + /// # Arguments + /// * `api_key` - The API key required for authenticating requests made + /// * `cluster` - The Solana cluster (Devnet or MainnetBeta) that defines the given network environment + /// + /// # Returns + /// An instance of `Helius` if successful. A `HeliusError` is returned if an error occurs during configuration or initialization of the HTTP, RPC, or WS client + pub async fn new_with_ws(api_key: &str, cluster: Cluster) -> Result { + Self::new_with_ws_with_timeouts(api_key, cluster, None, None).await + } + /// Provides a thread-safe way to access RPC functionalities /// /// # Returns diff --git a/src/websocket.rs b/src/websocket.rs index fc6c43c..13d6284 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -30,7 +30,8 @@ use tokio_tungstenite::{ }; pub const ENHANCED_WEBSOCKET_URL: &str = "wss://atlas-mainnet.helius-rpc.com/?api-key="; -const DEFAULT_PING_DURATION_SECONDS: u64 = 10; +pub const DEFAULT_PING_DURATION_SECONDS: u64 = 10; +pub const DEFAULT_MAX_FAILED_PINGS: usize = 3; // pub type Result = Result; @@ -40,7 +41,7 @@ type SubscribeRequestMsg = (String, Value, oneshot::Sender type SubscribeResult<'a, T> = Result<(BoxStream<'a, T>, UnsubscribeFn)>; type RequestMsg = (String, Value, oneshot::Sender>); -/// A client for subscribing to transaction or account updates from a Helius (geyser) enhanced websocket server. +/// A client for subscribing to transaction or account updates from a Helius (Geyser) enhanced websocket server. /// /// Forked from Solana's [`PubsubClient`]. pub struct EnhancedWebsocket { @@ -52,13 +53,26 @@ pub struct EnhancedWebsocket { impl EnhancedWebsocket { /// Expects enhanced websocket endpoint: wss://atlas-mainnet.helius-rpc.com?api-key= - pub async fn new(url: &str) -> Result { + pub async fn new(url: &str, ping_interval_secs: Option, pong_timeout_secs: Option) -> Result { let (ws, _response) = connect_async(url).await.map_err(HeliusError::Tungstenite)?; let (subscribe_sender, subscribe_receiver) = mpsc::unbounded_channel(); let (_request_sender, request_receiver) = mpsc::unbounded_channel(); let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + let ping_interval = ping_interval_secs + .filter(|interval: &u64| *interval != 0) + .unwrap_or(DEFAULT_PING_DURATION_SECONDS); + let max_failed_pings = pong_timeout_secs + .map(|timeout| (timeout as f64 / ping_interval as f64).ceil() as usize) + .map_or(DEFAULT_MAX_FAILED_PINGS, |max_failed_pings| { + if max_failed_pings != 0 { + max_failed_pings + } else { + usize::MAX + } + }); + Ok(Self { subscribe_sender, shutdown_sender, @@ -68,7 +82,8 @@ impl EnhancedWebsocket { subscribe_receiver, request_receiver, shutdown_receiver, - DEFAULT_PING_DURATION_SECONDS, + ping_interval, + max_failed_pings, )), }) } @@ -189,8 +204,10 @@ impl EnhancedWebsocket { mut request_receiver: mpsc::UnboundedReceiver, mut shutdown_receiver: oneshot::Receiver<()>, ping_duration_seconds: u64, + max_failed_pings: usize, ) -> Result<()> { let mut request_id: u64 = 0; + let mut unmatched_pings: usize = 0; let mut requests_subscribe = BTreeMap::new(); let mut requests_unsubscribe = BTreeMap::>::new(); @@ -209,7 +226,23 @@ impl EnhancedWebsocket { }, // Send `Message::Ping` each 10s if no any other communication () = sleep(Duration::from_secs(ping_duration_seconds)) => { + // Check if we've exceeded our failed ping threshold + if unmatched_pings >= max_failed_pings { + let frame = CloseFrame { + code: CloseCode::Abnormal, + reason: format!("No pong received after {} pings", max_failed_pings).into() + }; + + ws.send(Message::Close(Some(frame))).await?; + ws.flush().await?; + + return Err(HeliusError::WebsocketClosed( + format!("Connection timeout: no pong received after {} pings", max_failed_pings) + )); + } + ws.send(Message::Ping(Vec::new())).await?; + unmatched_pings += 1; }, // Read message for subscribe Some((operation, params, response_sender)) = subscribe_receiver.recv() => { @@ -242,6 +275,9 @@ impl EnhancedWebsocket { None => break, }; + // Reset unmatched_pings on any received frame + unmatched_pings = 0; + // Get text from the message let text = match msg { Message::Text(text) => text, @@ -250,7 +286,9 @@ impl EnhancedWebsocket { ws.send(Message::Pong(data)).await?; continue }, - Message::Pong(_data) => continue, + Message::Pong(_data) => { + continue; + }, Message::Close(_frame) => break, Message::Frame(_frame) => continue, };