From c17bf570b4c9b01c9d37f2154fa9a431689c4ece Mon Sep 17 00:00:00 2001 From: Evan <0xIchigo@protonmail.com> Date: Wed, 27 Nov 2024 16:01:59 -0500 Subject: [PATCH 01/10] Add Better Ping Pong Health Check --- src/websocket.rs | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/src/websocket.rs b/src/websocket.rs index fc6c43c..960d2b4 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -31,6 +31,7 @@ use tokio_tungstenite::{ pub const ENHANCED_WEBSOCKET_URL: &str = "wss://atlas-mainnet.helius-rpc.com/?api-key="; const DEFAULT_PING_DURATION_SECONDS: u64 = 10; +const DEFAULT_MAX_FAILED_PINGS: usize = 3; // pub type Result = Result; @@ -52,13 +53,20 @@ pub struct EnhancedWebsocket { impl EnhancedWebsocket { /// Expects enhanced websocket endpoint: wss://atlas-mainnet.helius-rpc.com?api-key= - pub async fn new(url: &str) -> Result { + pub async fn new(url: &str, ping_interval_secs: Option, pong_timeout_secs: Option) -> Result { let (ws, _response) = connect_async(url).await.map_err(HeliusError::Tungstenite)?; let (subscribe_sender, subscribe_receiver) = mpsc::unbounded_channel(); let (_request_sender, request_receiver) = mpsc::unbounded_channel(); let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + let ping_interval = ping_interval_secs.unwrap_or(DEFAULT_PING_DURATION_SECONDS); + let max_failed_pings = if let Some(timeout) = pong_timeout_secs { + (timeout as f64 / ping_interval as f64).ceil() as usize + } else { + DEFAULT_MAX_FAILED_PINGS + }; + Ok(Self { subscribe_sender, shutdown_sender, @@ -68,7 +76,8 @@ impl EnhancedWebsocket { subscribe_receiver, request_receiver, shutdown_receiver, - DEFAULT_PING_DURATION_SECONDS, + ping_interval, + max_failed_pings, )), }) } @@ -189,8 +198,10 @@ impl EnhancedWebsocket { mut request_receiver: mpsc::UnboundedReceiver, mut shutdown_receiver: oneshot::Receiver<()>, ping_duration_seconds: u64, + max_failed_pings: usize, ) -> Result<()> { let mut request_id: u64 = 0; + let mut unmatched_pings: usize = 0; let mut requests_subscribe = BTreeMap::new(); let mut requests_unsubscribe = BTreeMap::>::new(); @@ -209,7 +220,23 @@ impl EnhancedWebsocket { }, // Send `Message::Ping` each 10s if no any other communication () = sleep(Duration::from_secs(ping_duration_seconds)) => { + // Check if we've exceeded our failed ping threshold + if unmatched_pings >= max_failed_pings { + let frame = CloseFrame { + code: CloseCode::Abnormal, + reason: format!("No pong received after {} pings", max_failed_pings).into() + }; + + ws.send(Message::Close(Some(frame))).await?; + ws.flush().await?; + + return Err(HeliusError::WebsocketClosed( + format!("Connection timeout: no pong received after {} pings", max_failed_pings) + )); + } + ws.send(Message::Ping(Vec::new())).await?; + unmatched_pings += 1; }, // Read message for subscribe Some((operation, params, response_sender)) = subscribe_receiver.recv() => { @@ -250,7 +277,10 @@ impl EnhancedWebsocket { ws.send(Message::Pong(data)).await?; continue }, - Message::Pong(_data) => continue, + Message::Pong(_data) => { + unmatched_pings = 0; + continue; + }, Message::Close(_frame) => break, Message::Frame(_frame) => continue, }; From f02987b172bc6d521fd4538b692f9874a1e6c122 Mon Sep 17 00:00:00 2001 From: Evan <0xIchigo@protonmail.com> Date: Wed, 27 Nov 2024 16:02:30 -0500 Subject: [PATCH 02/10] Handle New Params and Create new_with_ws_default --- src/client.rs | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/src/client.rs b/src/client.rs index 8d4ce9b..16b670b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -95,14 +95,19 @@ impl Helius { /// # Arguments /// * `api_key` - The API key required for authenticating requests made /// * `cluster` - The Solana cluster (Devnet or MainnetBeta) that defines the given network environment + /// * `ping_interval_secs` - Optional duration in seconds between ping messages (defaults to 10 seconds if None) + /// * `pong_timeout_secs` - Optional duration in seconds to wait for a pong response before considering the connection dead + /// /// # Returns /// An instance of `Helius` if successful. A `HeliusError` is returned if an error occurs during configuration or initialization of the HTTP, RPC, or WS client - pub async fn new_with_ws(api_key: &str, cluster: Cluster) -> Result { + pub async fn new_with_ws(api_key: &str, cluster: Cluster, ping_interval_secs: Option, pong_timeout_secs: Option) -> Result { let config: Arc = Arc::new(Config::new(api_key, cluster)?); let client: Client = Client::builder().build().map_err(HeliusError::ReqwestError)?; let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), config.clone())?); let wss: String = format!("{}{}", ENHANCED_WEBSOCKET_URL, api_key); - let ws_client: Arc = Arc::new(EnhancedWebsocket::new(&wss).await?); + let ws_client: Arc = Arc::new( + EnhancedWebsocket::new(&wss, ping_interval_secs, pong_timeout_secs).await? + ); Ok(Helius { config, @@ -113,6 +118,20 @@ impl Helius { }) } + /// Creates a new instance of `Helius` with an enhanced websocket client using default timeout settings. + /// This is a convenience method that uses default values of 10 seconds for ping interval and 3 failed pings + /// before considering the connection dead. + /// + /// # Arguments + /// * `api_key` - The API key required for authenticating requests made + /// * `cluster` - The Solana cluster (Devnet or MainnetBeta) that defines the given network environment + /// + /// # Returns + /// An instance of `Helius` if successful. A `HeliusError` is returned if an error occurs during configuration or initialization of the HTTP, RPC, or WS client + pub async fn new_with_ws_default(api_key: &str, cluster: Cluster) -> Result { + Self::new_with_ws(api_key, cluster, None, None).await + } + /// Provides a thread-safe way to access RPC functionalities /// /// # Returns From 05d8e3b4e5450e2db19da1a7b451abe2907f58ca Mon Sep 17 00:00:00 2001 From: Evan <0xIchigo@protonmail.com> Date: Wed, 27 Nov 2024 16:02:41 -0500 Subject: [PATCH 03/10] Update Examples --- examples/enhanced_websocket_accounts.rs | 2 +- examples/enhanced_websocket_transactions.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/enhanced_websocket_accounts.rs b/examples/enhanced_websocket_accounts.rs index 9a4af96..746df05 100644 --- a/examples/enhanced_websocket_accounts.rs +++ b/examples/enhanced_websocket_accounts.rs @@ -9,7 +9,7 @@ async fn main() -> Result<()> { let api_key: &str = "your_api_key"; let cluster: Cluster = Cluster::MainnetBeta; - let helius: Helius = Helius::new_with_ws(api_key, cluster).await?; + let helius: Helius = Helius::new_with_ws_default(api_key, cluster).await?; let key: pubkey::Pubkey = pubkey!("BtsmiEEvnSuUnKxqXj2PZRYpPJAc7C34mGz8gtJ1DAaH"); diff --git a/examples/enhanced_websocket_transactions.rs b/examples/enhanced_websocket_transactions.rs index 5115c62..ebbcfcc 100644 --- a/examples/enhanced_websocket_transactions.rs +++ b/examples/enhanced_websocket_transactions.rs @@ -9,7 +9,8 @@ async fn main() -> Result<()> { let api_key: &str = "your_api_key"; let cluster: Cluster = Cluster::MainnetBeta; - let helius: Helius = Helius::new_with_ws(api_key, cluster).await.unwrap(); + // Uses custom ping-pong timeouts to ping every 15s and timeout after 45s of no pong + let helius: Helius = Helius::new_with_ws(api_key, cluster, Some(15), Some(45)).await.unwrap(); let key: pubkey::Pubkey = pubkey!("BtsmiEEvnSuUnKxqXj2PZRYpPJAc7C34mGz8gtJ1DAaH"); From c3ad8299f82bb6c8e934e5dbed5578583a78249d Mon Sep 17 00:00:00 2001 From: Evan <0xIchigo@protonmail.com> Date: Wed, 27 Nov 2024 16:02:57 -0500 Subject: [PATCH 04/10] Formattooorrr --- src/client.rs | 14 +++++++++----- src/websocket.rs | 4 ++-- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/client.rs b/src/client.rs index 16b670b..257c402 100644 --- a/src/client.rs +++ b/src/client.rs @@ -97,17 +97,21 @@ impl Helius { /// * `cluster` - The Solana cluster (Devnet or MainnetBeta) that defines the given network environment /// * `ping_interval_secs` - Optional duration in seconds between ping messages (defaults to 10 seconds if None) /// * `pong_timeout_secs` - Optional duration in seconds to wait for a pong response before considering the connection dead - /// + /// /// # Returns /// An instance of `Helius` if successful. A `HeliusError` is returned if an error occurs during configuration or initialization of the HTTP, RPC, or WS client - pub async fn new_with_ws(api_key: &str, cluster: Cluster, ping_interval_secs: Option, pong_timeout_secs: Option) -> Result { + pub async fn new_with_ws( + api_key: &str, + cluster: Cluster, + ping_interval_secs: Option, + pong_timeout_secs: Option, + ) -> Result { let config: Arc = Arc::new(Config::new(api_key, cluster)?); let client: Client = Client::builder().build().map_err(HeliusError::ReqwestError)?; let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), config.clone())?); let wss: String = format!("{}{}", ENHANCED_WEBSOCKET_URL, api_key); - let ws_client: Arc = Arc::new( - EnhancedWebsocket::new(&wss, ping_interval_secs, pong_timeout_secs).await? - ); + let ws_client: Arc = + Arc::new(EnhancedWebsocket::new(&wss, ping_interval_secs, pong_timeout_secs).await?); Ok(Helius { config, diff --git a/src/websocket.rs b/src/websocket.rs index 960d2b4..1327843 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -62,9 +62,9 @@ impl EnhancedWebsocket { let ping_interval = ping_interval_secs.unwrap_or(DEFAULT_PING_DURATION_SECONDS); let max_failed_pings = if let Some(timeout) = pong_timeout_secs { - (timeout as f64 / ping_interval as f64).ceil() as usize + (timeout as f64 / ping_interval as f64).ceil() as usize } else { - DEFAULT_MAX_FAILED_PINGS + DEFAULT_MAX_FAILED_PINGS }; Ok(Self { From d358d16ab27b641169d5388e682d70609320cb0d Mon Sep 17 00:00:00 2001 From: Evan <0xIchigo@protonmail.com> Date: Wed, 27 Nov 2024 16:13:07 -0500 Subject: [PATCH 05/10] Refactor to Make Backwards Compatible --- examples/enhanced_websocket_accounts.rs | 2 +- examples/enhanced_websocket_transactions.rs | 2 +- src/client.rs | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/examples/enhanced_websocket_accounts.rs b/examples/enhanced_websocket_accounts.rs index 746df05..9a4af96 100644 --- a/examples/enhanced_websocket_accounts.rs +++ b/examples/enhanced_websocket_accounts.rs @@ -9,7 +9,7 @@ async fn main() -> Result<()> { let api_key: &str = "your_api_key"; let cluster: Cluster = Cluster::MainnetBeta; - let helius: Helius = Helius::new_with_ws_default(api_key, cluster).await?; + let helius: Helius = Helius::new_with_ws(api_key, cluster).await?; let key: pubkey::Pubkey = pubkey!("BtsmiEEvnSuUnKxqXj2PZRYpPJAc7C34mGz8gtJ1DAaH"); diff --git a/examples/enhanced_websocket_transactions.rs b/examples/enhanced_websocket_transactions.rs index ebbcfcc..da287ab 100644 --- a/examples/enhanced_websocket_transactions.rs +++ b/examples/enhanced_websocket_transactions.rs @@ -10,7 +10,7 @@ async fn main() -> Result<()> { let cluster: Cluster = Cluster::MainnetBeta; // Uses custom ping-pong timeouts to ping every 15s and timeout after 45s of no pong - let helius: Helius = Helius::new_with_ws(api_key, cluster, Some(15), Some(45)).await.unwrap(); + let helius: Helius = Helius::new_with_ws_with_timeouts(api_key, cluster, Some(15), Some(45)).await?; let key: pubkey::Pubkey = pubkey!("BtsmiEEvnSuUnKxqXj2PZRYpPJAc7C34mGz8gtJ1DAaH"); diff --git a/src/client.rs b/src/client.rs index 257c402..9aa567e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -100,7 +100,7 @@ impl Helius { /// /// # Returns /// An instance of `Helius` if successful. A `HeliusError` is returned if an error occurs during configuration or initialization of the HTTP, RPC, or WS client - pub async fn new_with_ws( + pub async fn new_with_ws_with_timeouts( api_key: &str, cluster: Cluster, ping_interval_secs: Option, @@ -132,8 +132,8 @@ impl Helius { /// /// # Returns /// An instance of `Helius` if successful. A `HeliusError` is returned if an error occurs during configuration or initialization of the HTTP, RPC, or WS client - pub async fn new_with_ws_default(api_key: &str, cluster: Cluster) -> Result { - Self::new_with_ws(api_key, cluster, None, None).await + pub async fn new_with_ws(api_key: &str, cluster: Cluster) -> Result { + Self::new_with_ws_with_timeouts(api_key, cluster, None, None).await } /// Provides a thread-safe way to access RPC functionalities From afe0787857ba21c56ac51c99a551fa7ea9b25fce Mon Sep 17 00:00:00 2001 From: Evan <0xIchigo@protonmail.com> Date: Thu, 28 Nov 2024 13:43:52 -0500 Subject: [PATCH 06/10] Update unmatched_pings Reset --- src/websocket.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/websocket.rs b/src/websocket.rs index 1327843..fd6b56c 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -269,6 +269,9 @@ impl EnhancedWebsocket { None => break, }; + // Reset unmatched_pings on any received frame + unmatched_pings = 0; + // Get text from the message let text = match msg { Message::Text(text) => text, @@ -278,7 +281,6 @@ impl EnhancedWebsocket { continue }, Message::Pong(_data) => { - unmatched_pings = 0; continue; }, Message::Close(_frame) => break, From 6c91455ad3c37580bc0a278bbfcb354d7bf43b23 Mon Sep 17 00:00:00 2001 From: Evan <0xIchigo@protonmail.com> Date: Thu, 28 Nov 2024 13:47:47 -0500 Subject: [PATCH 07/10] Handle Ping Pong Params Better --- src/websocket.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/websocket.rs b/src/websocket.rs index fd6b56c..ff40a11 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -60,12 +60,16 @@ impl EnhancedWebsocket { let (_request_sender, request_receiver) = mpsc::unbounded_channel(); let (shutdown_sender, shutdown_receiver) = oneshot::channel(); - let ping_interval = ping_interval_secs.unwrap_or(DEFAULT_PING_DURATION_SECONDS); - let max_failed_pings = if let Some(timeout) = pong_timeout_secs { - (timeout as f64 / ping_interval as f64).ceil() as usize - } else { - DEFAULT_MAX_FAILED_PINGS - }; + let ping_interval = ping_interval_secs.filter(|interval: &u64| *interval !=0).unwrap_or(DEFAULT_PING_DURATION_SECONDS); + let max_failed_pings = pong_timeout_secs + .map(|timeout| (timeout as f64 / ping_interval as f64).ceil() as usize) + .map_or(DEFAULT_MAX_FAILED_PINGS, |max_failed_pings| { + if max_failed_pings != 0 { + max_failed_pings + } else { + usize::MAX + } + }); Ok(Self { subscribe_sender, From bdcb553838051e2816a70654badb56f06f92b436 Mon Sep 17 00:00:00 2001 From: Evan <0xIchigo@protonmail.com> Date: Thu, 28 Nov 2024 13:48:07 -0500 Subject: [PATCH 08/10] Make Constants Public --- src/websocket.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/websocket.rs b/src/websocket.rs index ff40a11..e0cba03 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -30,8 +30,8 @@ use tokio_tungstenite::{ }; pub const ENHANCED_WEBSOCKET_URL: &str = "wss://atlas-mainnet.helius-rpc.com/?api-key="; -const DEFAULT_PING_DURATION_SECONDS: u64 = 10; -const DEFAULT_MAX_FAILED_PINGS: usize = 3; +pub const DEFAULT_PING_DURATION_SECONDS: u64 = 10; +pub const DEFAULT_MAX_FAILED_PINGS: usize = 3; // pub type Result = Result; From b844a88a2219c05aa9ece6d8f1594dd16838d4a7 Mon Sep 17 00:00:00 2001 From: Evan <0xIchigo@protonmail.com> Date: Thu, 28 Nov 2024 13:48:29 -0500 Subject: [PATCH 09/10] Formattooorrr --- src/websocket.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/websocket.rs b/src/websocket.rs index e0cba03..9370de1 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -60,16 +60,18 @@ impl EnhancedWebsocket { let (_request_sender, request_receiver) = mpsc::unbounded_channel(); let (shutdown_sender, shutdown_receiver) = oneshot::channel(); - let ping_interval = ping_interval_secs.filter(|interval: &u64| *interval !=0).unwrap_or(DEFAULT_PING_DURATION_SECONDS); + let ping_interval = ping_interval_secs + .filter(|interval: &u64| *interval != 0) + .unwrap_or(DEFAULT_PING_DURATION_SECONDS); let max_failed_pings = pong_timeout_secs - .map(|timeout| (timeout as f64 / ping_interval as f64).ceil() as usize) - .map_or(DEFAULT_MAX_FAILED_PINGS, |max_failed_pings| { - if max_failed_pings != 0 { - max_failed_pings - } else { - usize::MAX - } - }); + .map(|timeout| (timeout as f64 / ping_interval as f64).ceil() as usize) + .map_or(DEFAULT_MAX_FAILED_PINGS, |max_failed_pings| { + if max_failed_pings != 0 { + max_failed_pings + } else { + usize::MAX + } + }); Ok(Self { subscribe_sender, From 66737634d943c1c26744f39eeef2967bdba829fd Mon Sep 17 00:00:00 2001 From: Evan <0xIchigo@protonmail.com> Date: Thu, 28 Nov 2024 14:06:55 -0500 Subject: [PATCH 10/10] Update Comment --- src/websocket.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/websocket.rs b/src/websocket.rs index 9370de1..13d6284 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -41,7 +41,7 @@ type SubscribeRequestMsg = (String, Value, oneshot::Sender type SubscribeResult<'a, T> = Result<(BoxStream<'a, T>, UnsubscribeFn)>; type RequestMsg = (String, Value, oneshot::Sender>); -/// A client for subscribing to transaction or account updates from a Helius (geyser) enhanced websocket server. +/// A client for subscribing to transaction or account updates from a Helius (Geyser) enhanced websocket server. /// /// Forked from Solana's [`PubsubClient`]. pub struct EnhancedWebsocket {