Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ws): Better Health Checks #95

Merged
merged 10 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/enhanced_websocket_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ async fn main() -> Result<()> {
let api_key: &str = "your_api_key";
let cluster: Cluster = Cluster::MainnetBeta;

let helius: Helius = Helius::new_with_ws(api_key, cluster).await.unwrap();
// Uses custom ping-pong timeouts to ping every 15s and timeout after 45s of no pong
let helius: Helius = Helius::new_with_ws_with_timeouts(api_key, cluster, Some(15), Some(45)).await?;

let key: pubkey::Pubkey = pubkey!("BtsmiEEvnSuUnKxqXj2PZRYpPJAc7C34mGz8gtJ1DAaH");

Expand Down
27 changes: 25 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,23 @@ impl Helius {
/// # Arguments
/// * `api_key` - The API key required for authenticating requests made
/// * `cluster` - The Solana cluster (Devnet or MainnetBeta) that defines the given network environment
/// * `ping_interval_secs` - Optional duration in seconds between ping messages (defaults to 10 seconds if None)
/// * `pong_timeout_secs` - Optional duration in seconds to wait for a pong response before considering the connection dead
///
/// # Returns
/// An instance of `Helius` if successful. A `HeliusError` is returned if an error occurs during configuration or initialization of the HTTP, RPC, or WS client
pub async fn new_with_ws(api_key: &str, cluster: Cluster) -> Result<Self> {
pub async fn new_with_ws_with_timeouts(
api_key: &str,
cluster: Cluster,
ping_interval_secs: Option<u64>,
pong_timeout_secs: Option<u64>,
) -> Result<Self> {
let config: Arc<Config> = Arc::new(Config::new(api_key, cluster)?);
let client: Client = Client::builder().build().map_err(HeliusError::ReqwestError)?;
let rpc_client: Arc<RpcClient> = Arc::new(RpcClient::new(Arc::new(client.clone()), config.clone())?);
let wss: String = format!("{}{}", ENHANCED_WEBSOCKET_URL, api_key);
let ws_client: Arc<EnhancedWebsocket> = Arc::new(EnhancedWebsocket::new(&wss).await?);
let ws_client: Arc<EnhancedWebsocket> =
Arc::new(EnhancedWebsocket::new(&wss, ping_interval_secs, pong_timeout_secs).await?);

Ok(Helius {
config,
Expand All @@ -113,6 +122,20 @@ impl Helius {
})
}

/// Creates a new instance of `Helius` with an enhanced websocket client using default timeout settings.
/// This is a convenience method that uses default values of 10 seconds for ping interval and 3 failed pings
/// before considering the connection dead.
///
/// # Arguments
/// * `api_key` - The API key required for authenticating requests made
/// * `cluster` - The Solana cluster (Devnet or MainnetBeta) that defines the given network environment
///
/// # Returns
/// An instance of `Helius` if successful. A `HeliusError` is returned if an error occurs during configuration or initialization of the HTTP, RPC, or WS client
pub async fn new_with_ws(api_key: &str, cluster: Cluster) -> Result<Self> {
Self::new_with_ws_with_timeouts(api_key, cluster, None, None).await
}

/// Provides a thread-safe way to access RPC functionalities
///
/// # Returns
Expand Down
48 changes: 43 additions & 5 deletions src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use tokio_tungstenite::{
};

pub const ENHANCED_WEBSOCKET_URL: &str = "wss://atlas-mainnet.helius-rpc.com/?api-key=";
const DEFAULT_PING_DURATION_SECONDS: u64 = 10;
pub const DEFAULT_PING_DURATION_SECONDS: u64 = 10;
pub const DEFAULT_MAX_FAILED_PINGS: usize = 3;

// pub type Result<T = ()> = Result<T, HeliusError>;

Expand All @@ -40,7 +41,7 @@ type SubscribeRequestMsg = (String, Value, oneshot::Sender<SubscribeResponseMsg>
type SubscribeResult<'a, T> = Result<(BoxStream<'a, T>, UnsubscribeFn)>;
type RequestMsg = (String, Value, oneshot::Sender<Result<Value>>);

/// A client for subscribing to transaction or account updates from a Helius (geyser) enhanced websocket server.
/// A client for subscribing to transaction or account updates from a Helius (Geyser) enhanced websocket server.
///
/// Forked from Solana's [`PubsubClient`].
pub struct EnhancedWebsocket {
Expand All @@ -52,13 +53,26 @@ pub struct EnhancedWebsocket {

impl EnhancedWebsocket {
/// Expects enhanced websocket endpoint: wss://atlas-mainnet.helius-rpc.com?api-key=<API_KEY>
pub async fn new(url: &str) -> Result<Self> {
pub async fn new(url: &str, ping_interval_secs: Option<u64>, pong_timeout_secs: Option<u64>) -> Result<Self> {
let (ws, _response) = connect_async(url).await.map_err(HeliusError::Tungstenite)?;

let (subscribe_sender, subscribe_receiver) = mpsc::unbounded_channel();
let (_request_sender, request_receiver) = mpsc::unbounded_channel();
let (shutdown_sender, shutdown_receiver) = oneshot::channel();

let ping_interval = ping_interval_secs
.filter(|interval: &u64| *interval != 0)
.unwrap_or(DEFAULT_PING_DURATION_SECONDS);
let max_failed_pings = pong_timeout_secs
.map(|timeout| (timeout as f64 / ping_interval as f64).ceil() as usize)
.map_or(DEFAULT_MAX_FAILED_PINGS, |max_failed_pings| {
if max_failed_pings != 0 {
max_failed_pings
} else {
usize::MAX
}
});

Ok(Self {
subscribe_sender,
shutdown_sender,
Expand All @@ -68,7 +82,8 @@ impl EnhancedWebsocket {
subscribe_receiver,
request_receiver,
shutdown_receiver,
DEFAULT_PING_DURATION_SECONDS,
ping_interval,
max_failed_pings,
)),
})
}
Expand Down Expand Up @@ -189,8 +204,10 @@ impl EnhancedWebsocket {
mut request_receiver: mpsc::UnboundedReceiver<RequestMsg>,
mut shutdown_receiver: oneshot::Receiver<()>,
ping_duration_seconds: u64,
max_failed_pings: usize,
) -> Result<()> {
let mut request_id: u64 = 0;
let mut unmatched_pings: usize = 0;

let mut requests_subscribe = BTreeMap::new();
let mut requests_unsubscribe = BTreeMap::<u64, oneshot::Sender<()>>::new();
Expand All @@ -209,7 +226,23 @@ impl EnhancedWebsocket {
},
// Send `Message::Ping` each 10s if no any other communication
() = sleep(Duration::from_secs(ping_duration_seconds)) => {
// Check if we've exceeded our failed ping threshold
if unmatched_pings >= max_failed_pings {
let frame = CloseFrame {
code: CloseCode::Abnormal,
reason: format!("No pong received after {} pings", max_failed_pings).into()
};

ws.send(Message::Close(Some(frame))).await?;
ws.flush().await?;

return Err(HeliusError::WebsocketClosed(
format!("Connection timeout: no pong received after {} pings", max_failed_pings)
));
}

ws.send(Message::Ping(Vec::new())).await?;
unmatched_pings += 1;
},
// Read message for subscribe
Some((operation, params, response_sender)) = subscribe_receiver.recv() => {
Expand Down Expand Up @@ -242,6 +275,9 @@ impl EnhancedWebsocket {
None => break,
};

// Reset unmatched_pings on any received frame
unmatched_pings = 0;

// Get text from the message
let text = match msg {
Message::Text(text) => text,
Expand All @@ -250,7 +286,9 @@ impl EnhancedWebsocket {
ws.send(Message::Pong(data)).await?;
continue
},
Message::Pong(_data) => continue,
Message::Pong(_data) => {
continue;
},
Message::Close(_frame) => break,
Message::Frame(_frame) => continue,
};
Expand Down
Loading