From 98596eedf63949889e7695869f00a5249b497448 Mon Sep 17 00:00:00 2001 From: Drew Date: Wed, 15 May 2024 15:02:43 -0400 Subject: [PATCH 1/5] feat: enhanced websocket --- .gitignore | 1 + Cargo.toml | 9 + examples/enhanced_websocket.rs | 29 ++ src/client.rs | 21 ++ src/error.rs | 12 + src/factory.rs | 1 + src/lib.rs | 1 + src/types/enhanced_websocket.rs | 99 ++++++ src/types/mod.rs | 2 + src/websocket.rs | 372 ++++++++++++++++++++ tests/rpc/test_get_asset.rs | 2 + tests/rpc/test_get_asset_batch.rs | 2 + tests/rpc/test_get_asset_proof.rs | 2 + tests/rpc/test_get_asset_proof_batch.rs | 2 + tests/rpc/test_get_assets_by_authority.rs | 2 + tests/rpc/test_get_assets_by_creator.rs | 2 + tests/rpc/test_get_assets_by_group.rs | 2 + tests/rpc/test_get_assets_by_owner.rs | 2 + tests/rpc/test_get_nft_editions.rs | 2 + tests/rpc/test_get_priority_fee_estimate.rs | 2 + tests/rpc/test_get_rwa_asset.rs | 2 + tests/rpc/test_get_signatures_for_asset.rs | 2 + tests/rpc/test_get_token_accounts.rs | 2 + tests/rpc/test_search_assets.rs | 2 + tests/test_mint_api.rs | 2 + 25 files changed, 577 insertions(+) create mode 100644 examples/enhanced_websocket.rs create mode 100644 src/types/enhanced_websocket.rs create mode 100644 src/websocket.rs diff --git a/.gitignore b/.gitignore index 169a160..5261b4f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target /src/main.rs /Cargo.lock +/.idea diff --git a/Cargo.toml b/Cargo.toml index 69f7f50..55ba95a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,15 @@ chrono = { version = "0.4.11", features = ["serde"] } solana-client = "1.18.12" solana-program = "1.18.12" serde-enum-str = "0.4.0" +tokio-tungstenite = { version = "0.21.0", features = ["native-tls", "handshake"] } +tokio-stream = "0.1.15" +solana-rpc-client-api = "1.18.12" +futures-util = "0.3.30" +solana-account-decoder = "1.18.12" +solana-transaction-status = "1.18.12" +futures = "0.3.30" +semver = "1.0.23" +url = "2.5.0" [dev-dependencies] mockito = "1.4.0" diff --git a/examples/enhanced_websocket.rs b/examples/enhanced_websocket.rs new file mode 100644 index 0000000..61dae85 --- /dev/null +++ b/examples/enhanced_websocket.rs @@ -0,0 +1,29 @@ +use helius::error::HeliusError; +use helius::types::{Cluster, RpcTransactionsConfig, TransactionSubscribeFilter, TransactionSubscribeOptions}; +use helius::Helius; +use solana_sdk::pubkey; +use tokio_stream::StreamExt; + +#[tokio::main] +async fn main() -> Result<(), HeliusError> { + let api_key: &str = "your_api_key"; + let cluster: Cluster = Cluster::MainnetBeta; + + let helius: Helius = Helius::new_with_ws(api_key, cluster).await.unwrap(); + + let key = pubkey!("BtsmiEEvnSuUnKxqXj2PZRYpPJAc7C34mGz8gtJ1DAaH"); + + let config = RpcTransactionsConfig { + filter: TransactionSubscribeFilter::standard(&key), + options: TransactionSubscribeOptions::default(), + }; + + if let Some(ws) = helius.ws() { + let (mut stream, _unsub) = ws.transaction_subscribe(config).await?; + while let Some(event) = stream.next().await { + println!("{:#?}", event); + } + } + + Ok(()) +} diff --git a/src/client.rs b/src/client.rs index d7dc0e7..380bd0d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,6 +4,7 @@ use crate::config::Config; use crate::error::Result; use crate::rpc_client::RpcClient; use crate::types::Cluster; +use crate::websocket::{EnhancedWebsocket, ENHANCED_WEBSOCKET_URL}; use reqwest::Client; use solana_client::rpc_client::RpcClient as SolanaRpcClient; @@ -19,6 +20,8 @@ pub struct Helius { pub client: Client, /// A reference-counted RPC client tailored for making requests in a thread-safe manner pub rpc_client: Arc, + /// A reference-counted enhanced (geyser) websocket client + pub ws_client: Option>, } impl Helius { @@ -42,11 +45,25 @@ impl Helius { let config: Arc = Arc::new(Config::new(api_key, cluster)?); let client: Client = Client::new(); let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), config.clone())?); + Ok(Helius { + config, + client, + rpc_client, + ws_client: None, + }) + } + pub async fn new_with_ws(api_key: &str, cluster: Cluster) -> Result { + let config: Arc = Arc::new(Config::new(api_key, cluster)?); + let client: Client = Client::new(); + let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), config.clone())?); + let wss = format!("{}{}", ENHANCED_WEBSOCKET_URL, api_key); + let ws_client = Arc::new(EnhancedWebsocket::new(&wss).await?); Ok(Helius { config, client, rpc_client, + ws_client: Some(ws_client), }) } @@ -61,4 +78,8 @@ impl Helius { pub fn connection(&self) -> Arc { self.rpc_client.solana_client.clone() } + + pub fn ws(&self) -> Option> { + self.ws_client.clone() + } } diff --git a/src/error.rs b/src/error.rs index 3dcbaaa..307f439 100644 --- a/src/error.rs +++ b/src/error.rs @@ -71,6 +71,18 @@ pub enum HeliusError { /// This error includes the HTTP status code and message to help with debugging, acting as a generic catch-all for all other errors #[error("Unknown error has occurred: HTTP {code} - {text}")] Unknown { code: StatusCode, text: String }, + + #[error("unable to connect to server: {0}")] + Tungstenite(#[from] tokio_tungstenite::tungstenite::Error), + + #[error("websocket connection closed (({0})")] + WebsocketClosed(String), + + #[error("enhanced websocket: {message}: {reason}")] + EnhancedWebsocket { reason: String, message: String }, + + #[error("url parse error")] + UrlParseError(#[from] url::ParseError), } impl HeliusError { diff --git a/src/factory.rs b/src/factory.rs index 60ae723..f0bfbca 100644 --- a/src/factory.rs +++ b/src/factory.rs @@ -89,6 +89,7 @@ impl HeliusFactory { config, client, rpc_client, + ws_client: None, }) } } diff --git a/src/lib.rs b/src/lib.rs index 4e34a78..ae5ef37 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,7 @@ pub mod rpc_client; pub mod types; pub mod utils; pub mod webhook; +pub mod websocket; pub use client::Helius; pub use factory::HeliusFactory; diff --git a/src/types/enhanced_websocket.rs b/src/types/enhanced_websocket.rs new file mode 100644 index 0000000..c60e5cd --- /dev/null +++ b/src/types/enhanced_websocket.rs @@ -0,0 +1,99 @@ +use serde::{Deserialize, Serialize}; +use solana_sdk::pubkey::Pubkey; +use solana_transaction_status::EncodedTransactionWithStatusMeta; + +#[derive(Debug, Clone, PartialEq, Default, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TransactionSubscribeFilter { + #[serde(skip_serializing_if = "Option::is_none")] + pub vote: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub failed: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub signature: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub account_include: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub account_exclude: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub account_required: Option>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum UiEnhancedTransactionEncoding { + Base58, + Base64, + #[serde(rename = "base64+zstd")] + Base64Zstd, + JsonParsed, +} + +impl TransactionSubscribeFilter { + pub fn standard(key: &Pubkey) -> Self { + Self { + account_include: Some(vec![key.to_string()]), + ..Default::default() + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum TransactionCommitment { + Processed, + Confirmed, + Finalized, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum TransactionDetails { + Full, + Signatures, + Accounts, + None, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TransactionSubscribeOptions { + #[serde(skip_serializing_if = "Option::is_none")] + pub commitment: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub encoding: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub transaction_details: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub show_rewards: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub max_supported_transaction_version: Option, +} + +impl Default for TransactionSubscribeOptions { + fn default() -> Self { + Self { + commitment: Some(TransactionCommitment::Confirmed), + encoding: Some(UiEnhancedTransactionEncoding::JsonParsed), + transaction_details: Some(TransactionDetails::Full), + show_rewards: Some(true), + max_supported_transaction_version: Some(0), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RpcTransactionsConfig { + pub filter: TransactionSubscribeFilter, + pub options: TransactionSubscribeOptions, +} + +// Websocket transaction response + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TransactionNotification { + pub transaction: EncodedTransactionWithStatusMeta, + pub signature: String, +} diff --git a/src/types/mod.rs b/src/types/mod.rs index 26d0673..de84b8f 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -1,10 +1,12 @@ pub mod enhanced_transaction_types; +pub mod enhanced_websocket; pub mod enums; pub mod options; #[allow(clippy::module_inception)] pub mod types; pub use self::enhanced_transaction_types::*; +pub use self::enhanced_websocket::*; pub use self::enums::*; pub use self::options::*; pub use self::types::*; diff --git a/src/websocket.rs b/src/websocket.rs new file mode 100644 index 0000000..7548786 --- /dev/null +++ b/src/websocket.rs @@ -0,0 +1,372 @@ +#![allow(dead_code)] + +use crate::error::HeliusError; +use crate::types::{RpcTransactionsConfig, TransactionNotification}; +use futures_util::{ + future::{ready, BoxFuture, FutureExt}, + sink::SinkExt, + stream::{BoxStream, StreamExt}, +}; +use serde::de::DeserializeOwned; +use serde_json::{json, Map, Value}; +use solana_account_decoder::UiAccount; +use solana_rpc_client_api::config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}; +use solana_rpc_client_api::filter::maybe_map_filters; +use solana_rpc_client_api::response::RpcKeyedAccount; +use solana_rpc_client_api::{ + error_object::RpcErrorObject, + response::{Response as RpcResponse, RpcVersionInfo}, +}; +use solana_sdk::pubkey::Pubkey; +use std::collections::BTreeMap; +use std::fmt::Debug; +use tokio::{ + net::TcpStream, + sync::{mpsc, oneshot, RwLock}, + task::JoinHandle, + time::{sleep, Duration}, +}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_tungstenite::{ + connect_async, + tungstenite::{ + protocol::frame::{coding::CloseCode, CloseFrame}, + Message, + }, + MaybeTlsStream, WebSocketStream, +}; +use url::Url; + +pub const ENHANCED_WEBSOCKET_URL: &str = "wss://atlas-mainnet.helius-rpc.com?api-key="; + +pub type EnhancedWebsocketResult = Result; + +type UnsubscribeFn = Box BoxFuture<'static, ()> + Send>; +type SubscribeResponseMsg = Result<(mpsc::UnboundedReceiver, UnsubscribeFn), HeliusError>; +type SubscribeRequestMsg = (String, Value, oneshot::Sender); +type SubscribeResult<'a, T> = EnhancedWebsocketResult<(BoxStream<'a, T>, UnsubscribeFn)>; +type RequestMsg = (String, Value, oneshot::Sender>); + +/// A client for subscribing to messages from the RPC server. +/// +/// Forked from Solana's EnhancedWebsocket. +#[derive(Debug)] +pub struct EnhancedWebsocket { + subscribe_sender: mpsc::UnboundedSender, + request_sender: mpsc::UnboundedSender, + shutdown_sender: oneshot::Sender<()>, + node_version: RwLock>, + ws: JoinHandle, +} + +impl EnhancedWebsocket { + /// Expects enhanced websocket endpoint: wss://atlas-mainnet.helius-rpc.com?api-key= + pub async fn new(url: &str) -> EnhancedWebsocketResult { + let url = Url::parse(url)?; + let (ws, _response) = connect_async(url).await.map_err(HeliusError::Tungstenite)?; + + let (subscribe_sender, subscribe_receiver) = mpsc::unbounded_channel(); + let (request_sender, request_receiver) = mpsc::unbounded_channel(); + let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + + Ok(Self { + subscribe_sender, + request_sender, + shutdown_sender, + node_version: RwLock::new(None), + ws: tokio::spawn(EnhancedWebsocket::run_ws( + ws, + subscribe_receiver, + request_receiver, + shutdown_receiver, + )), + }) + } + + pub async fn shutdown(self) -> EnhancedWebsocketResult { + let _ = self.shutdown_sender.send(()); + self.ws.await.unwrap() // WS future should not be cancelled or panicked + } + + pub async fn set_node_version(&self, version: semver::Version) -> Result<(), ()> { + let mut w_node_version = self.node_version.write().await; + *w_node_version = Some(version); + Ok(()) + } + + async fn get_node_version(&self) -> EnhancedWebsocketResult { + let r_node_version = self.node_version.read().await; + if let Some(version) = &*r_node_version { + Ok(version.clone()) + } else { + drop(r_node_version); + let mut w_node_version = self.node_version.write().await; + let node_version = self.get_version().await?; + *w_node_version = Some(node_version.clone()); + Ok(node_version) + } + } + + async fn get_version(&self) -> EnhancedWebsocketResult { + let (response_sender, response_receiver) = oneshot::channel(); + self.request_sender + .send(("getVersion".to_string(), Value::Null, response_sender)) + .map_err(|err| HeliusError::WebsocketClosed(err.to_string()))?; + let result = response_receiver + .await + .map_err(|err| HeliusError::WebsocketClosed(err.to_string()))??; + let node_version: RpcVersionInfo = serde_json::from_value(result)?; + let node_version = + semver::Version::parse(&node_version.solana_core).map_err(|e| HeliusError::EnhancedWebsocket { + reason: format!("failed to parse cluster version: {e}"), + message: "getVersion".to_string(), + })?; + Ok(node_version) + } + + async fn subscribe<'a, T: DeserializeOwned + Send + Debug + 'a>( + &self, + operation: &str, + params: Value, + ) -> SubscribeResult<'a, T> { + let (response_sender, response_receiver) = oneshot::channel(); + self.subscribe_sender + .send((operation.to_string(), params, response_sender)) + .map_err(|err| HeliusError::WebsocketClosed(err.to_string()))?; + + let (notifications, unsubscribe) = response_receiver + .await + .map_err(|err| HeliusError::WebsocketClosed(err.to_string()))??; + Ok(( + UnboundedReceiverStream::new(notifications) + .filter_map(|value| match serde_json::from_value::(value.clone()) { + Err(e) => { + eprintln!( + "Failed to parse websocket notification: {:#?} for value: {:#?}", + e, value + ); + ready(None) + } + Ok(res) => ready(Some(res)), + }) + .boxed(), + unsubscribe, + )) + } + + pub async fn transaction_subscribe( + &self, + config: RpcTransactionsConfig, + ) -> SubscribeResult<'_, TransactionNotification> { + let params = json!([config.filter, config.options]); + self.subscribe("transaction", params).await + } + + pub async fn account_subscribe( + &self, + pubkey: &Pubkey, + config: Option, + ) -> SubscribeResult<'_, RpcResponse> { + let params = json!([pubkey.to_string(), config]); + self.subscribe("account", params).await + } + + pub async fn program_subscribe( + &self, + pubkey: &Pubkey, + mut config: Option, + ) -> SubscribeResult<'_, RpcResponse> { + if let Some(ref mut config) = config { + if let Some(ref mut filters) = config.filters { + let node_version = self.get_node_version().await.ok(); + // If node does not support the pubsub `getVersion` method, assume version is old + // and filters should be mapped (node_version.is_none()). + maybe_map_filters(node_version, filters).map_err(|e| HeliusError::EnhancedWebsocket { + reason: e, + message: "maybe_map_filters".to_string(), + })?; + } + } + + let params = json!([pubkey.to_string(), config]); + self.subscribe("program", params).await + } + + async fn run_ws( + mut ws: WebSocketStream>, + mut subscribe_receiver: mpsc::UnboundedReceiver, + mut request_receiver: mpsc::UnboundedReceiver, + mut shutdown_receiver: oneshot::Receiver<()>, + ) -> EnhancedWebsocketResult { + let mut request_id: u64 = 0; + + let mut requests_subscribe = BTreeMap::new(); + let mut requests_unsubscribe = BTreeMap::>::new(); + let mut other_requests = BTreeMap::new(); + let mut subscriptions = BTreeMap::new(); + let (unsubscribe_sender, mut unsubscribe_receiver) = mpsc::unbounded_channel(); + + loop { + tokio::select! { + // Send close on shutdown signal + _ = &mut shutdown_receiver => { + let frame = CloseFrame { code: CloseCode::Normal, reason: "".into() }; + ws.send(Message::Close(Some(frame))).await?; + ws.flush().await?; + break; + }, + // Send `Message::Ping` each 10s if no any other communication + () = sleep(Duration::from_secs(10)) => { + ws.send(Message::Ping(Vec::new())).await?; + }, + // Read message for subscribe + Some((operation, params, response_sender)) = subscribe_receiver.recv() => { + request_id += 1; + let method = format!("{operation}Subscribe"); + let body = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}); + println!("subscription: {:#}", body); + ws.send(Message::Text(body.to_string())).await?; + requests_subscribe.insert(request_id, (operation, response_sender)); + }, + // Read message for unsubscribe + Some((operation, sid, response_sender)) = unsubscribe_receiver.recv() => { + subscriptions.remove(&sid); + request_id += 1; + let method = format!("{operation}Unsubscribe"); + let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":[sid]}).to_string(); + ws.send(Message::Text(text)).await?; + requests_unsubscribe.insert(request_id, response_sender); + }, + // Read message for other requests + Some((method, params, response_sender)) = request_receiver.recv() => { + request_id += 1; + let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}).to_string(); + ws.send(Message::Text(text)).await?; + other_requests.insert(request_id, response_sender); + } + // Read incoming WebSocket message + next_msg = ws.next() => { + let msg = match next_msg { + Some(msg) => msg?, + None => break, + }; + + // Get text from the message + let text = match msg { + Message::Text(text) => text, + Message::Binary(_data) => continue, // Ignore + Message::Ping(data) => { + ws.send(Message::Pong(data)).await?; + continue + }, + Message::Pong(_data) => continue, + Message::Close(_frame) => break, + Message::Frame(_frame) => continue, + }; + + let mut json: Map = serde_json::from_str(&text)?; + + // Subscribe/Unsubscribe response, example: + // `{"jsonrpc":"2.0","result":5308752,"id":1}` + if let Some(id) = json.get("id") { + let id = id.as_u64().ok_or_else(|| { + HeliusError::EnhancedWebsocket { reason: "invalid `id` field".into(), message: text.clone() } + })?; + + let err = json.get("error").map(|error_object| { + match serde_json::from_value::(error_object.clone()) { + Ok(rpc_error_object) => { + format!("{} ({})", rpc_error_object.message, rpc_error_object.code) + } + Err(err) => format!( + "Failed to deserialize RPC error response: {} [{}]", + serde_json::to_string(error_object).unwrap(), + err + ) + } + }); + + if let Some(response_sender) = other_requests.remove(&id) { + match err { + Some(reason) => { + let _ = response_sender.send(Err(HeliusError::EnhancedWebsocket { reason, message: text.clone()})); + }, + None => { + let json_result = json.get("result").ok_or_else(|| { + HeliusError::EnhancedWebsocket { reason: "missing `result` field".into(), message: text.clone() } + })?; + if response_sender.send(Ok(json_result.clone())).is_err() { + break; + } + } + } + } else if let Some(response_sender) = requests_unsubscribe.remove(&id) { + let _ = response_sender.send(()); // do not care if receiver is closed + } else if let Some((operation, response_sender)) = requests_subscribe.remove(&id) { + match err { + Some(reason) => { + let _ = response_sender.send(Err(HeliusError::EnhancedWebsocket { reason, message: text.clone()})); + }, + None => { + // Subscribe Id + let sid = json.get("result").and_then(Value::as_u64).ok_or_else(|| { + HeliusError::EnhancedWebsocket { reason: "invalid `result` field".into(), message: text.clone() } + })?; + + // Create notifications channel and unsubscribe function + let (notifications_sender, notifications_receiver) = mpsc::unbounded_channel(); + let unsubscribe_sender = unsubscribe_sender.clone(); + let unsubscribe = Box::new(move || async move { + let (response_sender, response_receiver) = oneshot::channel(); + // do nothing if ws already closed + if unsubscribe_sender.send((operation, sid, response_sender)).is_ok() { + let _ = response_receiver.await; // channel can be closed only if ws is closed + } + }.boxed()); + + if response_sender.send(Ok((notifications_receiver, unsubscribe))).is_err() { + break; + } + subscriptions.insert(sid, notifications_sender); + } + } + } else { + eprintln!("Unknown request id: {}", id); + break; + } + continue; + } + + // Notification, example: + // `{"jsonrpc":"2.0","method":"logsNotification","params":{"result":{...},"subscription":3114862}}` + if let Some(Value::Object(params)) = json.get_mut("params") { + if let Some(sid) = params.get("subscription").and_then(Value::as_u64) { + let mut unsubscribe_required = false; + + if let Some(notifications_sender) = subscriptions.get(&sid) { + if let Some(result) = params.remove("result") { + if notifications_sender.send(result).is_err() { + unsubscribe_required = true; + } + } + } else { + unsubscribe_required = true; + } + + if unsubscribe_required { + if let Some(Value::String(method)) = json.remove("method") { + if let Some(operation) = method.strip_suffix("Notification") { + let (response_sender, _response_receiver) = oneshot::channel(); + let _ = unsubscribe_sender.send((operation.to_string(), sid, response_sender)); + } + } + } + } + } + } + } + } + + Ok(()) + } +} diff --git a/tests/rpc/test_get_asset.rs b/tests/rpc/test_get_asset.rs index a5655f6..64848ad 100644 --- a/tests/rpc/test_get_asset.rs +++ b/tests/rpc/test_get_asset.rs @@ -173,6 +173,7 @@ async fn test_get_asset_success() { config, client, rpc_client, + ws_client: None, }; let request: GetAsset = GetAsset { @@ -228,6 +229,7 @@ async fn test_get_asset_failure() { config, client, rpc_client, + ws_client: None, }; let request: GetAsset = GetAsset { diff --git a/tests/rpc/test_get_asset_batch.rs b/tests/rpc/test_get_asset_batch.rs index 902f50b..24462aa 100644 --- a/tests/rpc/test_get_asset_batch.rs +++ b/tests/rpc/test_get_asset_batch.rs @@ -266,6 +266,7 @@ async fn test_get_asset_batch_success() { config, client, rpc_client, + ws_client: None, }; let request: GetAssetBatch = GetAssetBatch { @@ -313,6 +314,7 @@ async fn test_get_asset_batch_failure() { config, client, rpc_client, + ws_client: None, }; let request: GetAssetBatch = GetAssetBatch { diff --git a/tests/rpc/test_get_asset_proof.rs b/tests/rpc/test_get_asset_proof.rs index 14e11c3..f05f8a0 100644 --- a/tests/rpc/test_get_asset_proof.rs +++ b/tests/rpc/test_get_asset_proof.rs @@ -63,6 +63,7 @@ async fn test_get_asset_proof_success() { config, client, rpc_client, + ws_client: None, }; let request: GetAssetProof = GetAssetProof { @@ -110,6 +111,7 @@ async fn test_get_asset_proof_failure() { config, client, rpc_client, + ws_client: None, }; let request: GetAssetProof = GetAssetProof { diff --git a/tests/rpc/test_get_asset_proof_batch.rs b/tests/rpc/test_get_asset_proof_batch.rs index 8e0e1c4..1f67728 100644 --- a/tests/rpc/test_get_asset_proof_batch.rs +++ b/tests/rpc/test_get_asset_proof_batch.rs @@ -66,6 +66,7 @@ async fn test_get_asset_proof_batch_success() { config, client, rpc_client, + ws_client: None, }; let request: GetAssetProofBatch = GetAssetProofBatch { @@ -121,6 +122,7 @@ async fn test_get_asset_proof_failure() { config, client, rpc_client, + ws_client: None, }; let request: GetAssetProofBatch = GetAssetProofBatch { diff --git a/tests/rpc/test_get_assets_by_authority.rs b/tests/rpc/test_get_assets_by_authority.rs index b699795..0b5870c 100644 --- a/tests/rpc/test_get_assets_by_authority.rs +++ b/tests/rpc/test_get_assets_by_authority.rs @@ -230,6 +230,7 @@ async fn test_get_assets_by_authority_success() { config, client, rpc_client, + ws_client: None, }; let request: GetAssetsByAuthority = GetAssetsByAuthority { @@ -275,6 +276,7 @@ async fn test_get_assets_by_authority_failure() { config, client, rpc_client, + ws_client: None, }; let request: GetAssetsByAuthority = GetAssetsByAuthority { diff --git a/tests/rpc/test_get_assets_by_creator.rs b/tests/rpc/test_get_assets_by_creator.rs index 4fed6f7..e7dfd35 100644 --- a/tests/rpc/test_get_assets_by_creator.rs +++ b/tests/rpc/test_get_assets_by_creator.rs @@ -230,6 +230,7 @@ async fn test_get_assets_by_creator_success() { config, client, rpc_client, + ws_client: None, }; let request: GetAssetsByCreator = GetAssetsByCreator { @@ -275,6 +276,7 @@ async fn test_get_assets_by_creator_failure() { config, client, rpc_client, + ws_client: None, }; let request: GetAssetsByCreator = GetAssetsByCreator { diff --git a/tests/rpc/test_get_assets_by_group.rs b/tests/rpc/test_get_assets_by_group.rs index d00522e..c74dbdc 100644 --- a/tests/rpc/test_get_assets_by_group.rs +++ b/tests/rpc/test_get_assets_by_group.rs @@ -145,6 +145,7 @@ async fn test_get_assets_by_group_success() { config, client, rpc_client, + ws_client: None, }; let sorting: AssetSorting = AssetSorting { @@ -200,6 +201,7 @@ async fn test_get_assets_by_group_failure() { config, client, rpc_client, + ws_client: None, }; let sorting: AssetSorting = AssetSorting { diff --git a/tests/rpc/test_get_assets_by_owner.rs b/tests/rpc/test_get_assets_by_owner.rs index 94d0fc6..d983678 100644 --- a/tests/rpc/test_get_assets_by_owner.rs +++ b/tests/rpc/test_get_assets_by_owner.rs @@ -160,6 +160,7 @@ async fn test_get_assets_by_owner_success() { config, client, rpc_client, + ws_client: None, }; let request: GetAssetsByOwner = GetAssetsByOwner { @@ -209,6 +210,7 @@ async fn test_get_assets_by_owner_failure() { config, client, rpc_client, + ws_client: None, }; let request: GetAssetsByOwner = GetAssetsByOwner { diff --git a/tests/rpc/test_get_nft_editions.rs b/tests/rpc/test_get_nft_editions.rs index b31fa17..9909ae6 100644 --- a/tests/rpc/test_get_nft_editions.rs +++ b/tests/rpc/test_get_nft_editions.rs @@ -54,6 +54,7 @@ async fn test_get_nft_editions_success() { config, client, rpc_client, + ws_client: None, }; let request = GetNftEditions { @@ -105,6 +106,7 @@ async fn test_get_nft_editions_failure() { config, client, rpc_client, + ws_client: None, }; let request = GetNftEditions { diff --git a/tests/rpc/test_get_priority_fee_estimate.rs b/tests/rpc/test_get_priority_fee_estimate.rs index 118cb2e..5612225 100644 --- a/tests/rpc/test_get_priority_fee_estimate.rs +++ b/tests/rpc/test_get_priority_fee_estimate.rs @@ -52,6 +52,7 @@ async fn test_get_nft_editions_success() { config, client, rpc_client, + ws_client: None, }; let request: GetPriorityFeeEstimateRequest = GetPriorityFeeEstimateRequest { @@ -105,6 +106,7 @@ async fn test_get_nft_editions_failure() { config, client, rpc_client, + ws_client: None, }; let request: GetPriorityFeeEstimateRequest = GetPriorityFeeEstimateRequest { diff --git a/tests/rpc/test_get_rwa_asset.rs b/tests/rpc/test_get_rwa_asset.rs index ef4994e..660b66c 100644 --- a/tests/rpc/test_get_rwa_asset.rs +++ b/tests/rpc/test_get_rwa_asset.rs @@ -61,6 +61,7 @@ async fn test_get_rwa_asset_success() { config, client, rpc_client, + ws_client: None, }; let request: GetRwaAssetRequest = GetRwaAssetRequest { @@ -102,6 +103,7 @@ async fn test_get_rwa_asset_failure() { config, client, rpc_client, + ws_client: None, }; let request: GetRwaAssetRequest = GetRwaAssetRequest { diff --git a/tests/rpc/test_get_signatures_for_asset.rs b/tests/rpc/test_get_signatures_for_asset.rs index ce48927..9be4ef5 100644 --- a/tests/rpc/test_get_signatures_for_asset.rs +++ b/tests/rpc/test_get_signatures_for_asset.rs @@ -52,6 +52,7 @@ async fn test_get_asset_signatures_success() { config, client, rpc_client, + ws_client: None, }; let request: GetAssetSignatures = GetAssetSignatures { @@ -100,6 +101,7 @@ async fn test_get_asset_signatures_failure() { config, client, rpc_client, + ws_client: None, }; let request: GetAssetSignatures = GetAssetSignatures { diff --git a/tests/rpc/test_get_token_accounts.rs b/tests/rpc/test_get_token_accounts.rs index a5f446c..6184563 100644 --- a/tests/rpc/test_get_token_accounts.rs +++ b/tests/rpc/test_get_token_accounts.rs @@ -59,6 +59,7 @@ async fn test_get_token_accounts_success() { config, client, rpc_client, + ws_client: None, }; let request: GetTokenAccounts = GetTokenAccounts { @@ -110,6 +111,7 @@ async fn test_get_token_accounts_failure() { config, client, rpc_client, + ws_client: None, }; let request = GetTokenAccounts { diff --git a/tests/rpc/test_search_assets.rs b/tests/rpc/test_search_assets.rs index 2863dfc..d5b03c7 100644 --- a/tests/rpc/test_search_assets.rs +++ b/tests/rpc/test_search_assets.rs @@ -160,6 +160,7 @@ async fn test_search_assets_success() { config, client, rpc_client, + ws_client: None, }; let request: SearchAssets = SearchAssets { @@ -206,6 +207,7 @@ async fn test_search_assets_failure() { config, client, rpc_client, + ws_client: None, }; let request: SearchAssets = SearchAssets { diff --git a/tests/test_mint_api.rs b/tests/test_mint_api.rs index 5d49499..d78a7aa 100644 --- a/tests/test_mint_api.rs +++ b/tests/test_mint_api.rs @@ -48,6 +48,7 @@ async fn test_mint_compressed_nft() { config, client, rpc_client, + ws_client: None, }; let request: MintCompressedNftRequest = MintCompressedNftRequest { @@ -121,6 +122,7 @@ async fn test_get_asset_proof_failure() { config, client, rpc_client, + ws_client: None, }; let request: MintCompressedNftRequest = MintCompressedNftRequest { From a312494d273e51c7f8e05a46c5455e4b5921c05a Mon Sep 17 00:00:00 2001 From: Drew Date: Wed, 15 May 2024 15:09:34 -0400 Subject: [PATCH 2/5] account subscription example --- examples/enhanced_websocket_accounts.rs | 24 +++++++++++++++++++ ....rs => enhanced_websocket_transactions.rs} | 0 2 files changed, 24 insertions(+) create mode 100644 examples/enhanced_websocket_accounts.rs rename examples/{enhanced_websocket.rs => enhanced_websocket_transactions.rs} (100%) diff --git a/examples/enhanced_websocket_accounts.rs b/examples/enhanced_websocket_accounts.rs new file mode 100644 index 0000000..1d0d29a --- /dev/null +++ b/examples/enhanced_websocket_accounts.rs @@ -0,0 +1,24 @@ +use helius::error::HeliusError; +use helius::types::Cluster; +use helius::Helius; +use solana_sdk::pubkey; +use tokio_stream::StreamExt; + +#[tokio::main] +async fn main() -> Result<(), HeliusError> { + let api_key: &str = "your_api_key"; + let cluster: Cluster = Cluster::MainnetBeta; + + let helius: Helius = Helius::new_with_ws(api_key, cluster).await.unwrap(); + + let key = pubkey!("BtsmiEEvnSuUnKxqXj2PZRYpPJAc7C34mGz8gtJ1DAaH"); + + if let Some(ws) = helius.ws() { + let (mut stream, _unsub) = ws.account_subscribe(&key, None).await?; + while let Some(event) = stream.next().await { + println!("{:#?}", event); + } + } + + Ok(()) +} diff --git a/examples/enhanced_websocket.rs b/examples/enhanced_websocket_transactions.rs similarity index 100% rename from examples/enhanced_websocket.rs rename to examples/enhanced_websocket_transactions.rs From 626acd03850f8439ecac80571e1f4f9e08a3dce9 Mon Sep 17 00:00:00 2001 From: Drew Date: Mon, 27 May 2024 13:55:16 -0400 Subject: [PATCH 3/5] resolve PR review --- README.md | 6 +- examples/enhanced_websocket_accounts.rs | 4 +- examples/enhanced_websocket_transactions.rs | 4 +- examples/get_asset_batch.rs | 6 +- examples/get_asset_proof_batch.rs | 7 +- examples/get_latest_blockhash.rs | 6 +- examples/get_parse_transactions.rs | 6 +- examples/get_parsed_transaction_history.rs | 6 +- examples/get_priority_fee_estimate.rs | 7 +- examples/mint_cnft.rs | 6 +- src/client.rs | 29 ++++ src/error.rs | 8 +- src/types/enhanced_websocket.rs | 18 +-- src/websocket.rs | 148 ++++++++++---------- tests/rpc/test_get_asset.rs | 6 +- tests/rpc/test_get_asset_batch.rs | 6 +- tests/rpc/test_get_asset_proof.rs | 6 +- tests/rpc/test_get_asset_proof_batch.rs | 8 +- tests/rpc/test_get_assets_by_group.rs | 6 +- tests/rpc/test_get_assets_by_owner.rs | 6 +- tests/rpc/test_get_nft_editions.rs | 6 +- tests/rpc/test_get_priority_fee_estimate.rs | 8 +- tests/rpc/test_get_rwa_asset.rs | 6 +- tests/rpc/test_get_signatures_for_asset.rs | 6 +- tests/rpc/test_get_token_accounts.rs | 6 +- tests/rpc/test_search_assets.rs | 6 +- tests/test_config.rs | 6 +- tests/test_mint_api.rs | 16 ++- 28 files changed, 189 insertions(+), 170 deletions(-) diff --git a/README.md b/README.md index 10b47e9..93fb465 100644 --- a/README.md +++ b/README.md @@ -19,11 +19,11 @@ Remember to run `cargo update` regularly to fetch the latest version of the SDK. ## Usage The SDK provides a [`Helius`](https://github.com/helius-labs/helius-rust-sdk/blob/dev/src/client.rs) instance that can be configured with an API key and a given Solana cluster. Developers can generate a new API key on the [Helius Developer Dashboard](https://dev.helius.xyz/dashboard/app). This instance acts as the main entry point for interacting with the SDK by providing methods to access different Solana and RPC client functionalities. The following code is an example of how to use the SDK to fetch info on [Mad Lad #8420](https://xray.helius.xyz/token/F9Lw3ki3hJ7PF9HQXsBzoY8GyE6sPoEZZdXJBsTTD2rk?network=mainnet): ```rust -use helius::error::HeliusError; +use helius::error::Result; use helius::types::{Cluster, DisplayOptions, GetAssetRequest, GetAssetResponseForAsset}; #[tokio::main] -async fn main() -> Result<(), HeliusError> { +async fn main() -> Result<()> { let api_key: &str = "YOUR_API_KEY"; let cluster: Cluster = Cluster::MainnetBeta; @@ -39,7 +39,7 @@ async fn main() -> Result<(), HeliusError> { }), }; - let response: Result, HeliusError> = helius.rpc().get_asset(request).await; + let response: Result> = helius.rpc().get_asset(request).await; match response { Ok(Some(asset)) => { diff --git a/examples/enhanced_websocket_accounts.rs b/examples/enhanced_websocket_accounts.rs index 1d0d29a..aac7dd1 100644 --- a/examples/enhanced_websocket_accounts.rs +++ b/examples/enhanced_websocket_accounts.rs @@ -1,11 +1,11 @@ -use helius::error::HeliusError; +use helius::error::Result; use helius::types::Cluster; use helius::Helius; use solana_sdk::pubkey; use tokio_stream::StreamExt; #[tokio::main] -async fn main() -> Result<(), HeliusError> { +async fn main() -> Result<()> { let api_key: &str = "your_api_key"; let cluster: Cluster = Cluster::MainnetBeta; diff --git a/examples/enhanced_websocket_transactions.rs b/examples/enhanced_websocket_transactions.rs index 61dae85..409ade8 100644 --- a/examples/enhanced_websocket_transactions.rs +++ b/examples/enhanced_websocket_transactions.rs @@ -1,11 +1,11 @@ -use helius::error::HeliusError; +use helius::error::Result; use helius::types::{Cluster, RpcTransactionsConfig, TransactionSubscribeFilter, TransactionSubscribeOptions}; use helius::Helius; use solana_sdk::pubkey; use tokio_stream::StreamExt; #[tokio::main] -async fn main() -> Result<(), HeliusError> { +async fn main() -> Result<()> { let api_key: &str = "your_api_key"; let cluster: Cluster = Cluster::MainnetBeta; diff --git a/examples/get_asset_batch.rs b/examples/get_asset_batch.rs index b109705..dee29be 100644 --- a/examples/get_asset_batch.rs +++ b/examples/get_asset_batch.rs @@ -1,9 +1,9 @@ -use helius::error::HeliusError; +use helius::error::Result; use helius::types::{Asset, Cluster, GetAssetBatch, GetAssetOptions}; use helius::Helius; #[tokio::main] -async fn main() -> Result<(), HeliusError> { +async fn main() -> Result<()> { let api_key: &str = "your_api_key"; let cluster: Cluster = Cluster::MainnetBeta; @@ -20,7 +20,7 @@ async fn main() -> Result<(), HeliusError> { }), }; - let response: Result>, HeliusError> = helius.rpc().get_asset_batch(request).await; + let response: Result>> = helius.rpc().get_asset_batch(request).await; println!("Assets: {:?}", response); Ok(()) diff --git a/examples/get_asset_proof_batch.rs b/examples/get_asset_proof_batch.rs index 63453cf..848370e 100644 --- a/examples/get_asset_proof_batch.rs +++ b/examples/get_asset_proof_batch.rs @@ -1,11 +1,11 @@ -use helius::error::HeliusError; +use helius::error::Result; use helius::types::{AssetProof, Cluster, GetAssetProofBatch}; use helius::Helius; use std::collections::HashMap; #[tokio::main] -async fn main() -> Result<(), HeliusError> { +async fn main() -> Result<()> { let api_key: &str = "your_api_key"; let cluster: Cluster = Cluster::MainnetBeta; @@ -18,8 +18,7 @@ async fn main() -> Result<(), HeliusError> { ], }; - let response: Result>, HeliusError> = - helius.rpc().get_asset_proof_batch(request).await; + let response: Result>> = helius.rpc().get_asset_proof_batch(request).await; println!("Assets: {:?}", response); Ok(()) diff --git a/examples/get_latest_blockhash.rs b/examples/get_latest_blockhash.rs index a4827ff..25ef796 100644 --- a/examples/get_latest_blockhash.rs +++ b/examples/get_latest_blockhash.rs @@ -1,4 +1,4 @@ -use helius::error::HeliusError; +use helius::error::Result; use helius::types::*; use helius::Helius; @@ -6,13 +6,13 @@ use solana_client::client_error::ClientError; use solana_sdk::hash::Hash; #[tokio::main] -async fn main() -> Result<(), HeliusError> { +async fn main() -> Result<()> { let api_key: &str = "your_api_key"; let cluster: Cluster = Cluster::MainnetBeta; let helius: Helius = Helius::new(api_key, cluster).unwrap(); - let result: Result = helius.connection().get_latest_blockhash(); + let result: std::result::Result = helius.connection().get_latest_blockhash(); println!("{:?}", result); Ok(()) diff --git a/examples/get_parse_transactions.rs b/examples/get_parse_transactions.rs index fb5bdf3..1b0d6e9 100644 --- a/examples/get_parse_transactions.rs +++ b/examples/get_parse_transactions.rs @@ -1,9 +1,9 @@ -use helius::error::HeliusError; +use helius::error::Result; use helius::types::*; use helius::Helius; #[tokio::main] -async fn main() -> Result<(), HeliusError> { +async fn main() -> Result<()> { let api_key: &str = "your_api_key"; let cluster: Cluster = Cluster::MainnetBeta; @@ -15,7 +15,7 @@ async fn main() -> Result<(), HeliusError> { ], }; - let response: Result, HeliusError> = helius.parse_transactions(request).await; + let response: Result> = helius.parse_transactions(request).await; println!("Assets: {:?}", response); Ok(()) diff --git a/examples/get_parsed_transaction_history.rs b/examples/get_parsed_transaction_history.rs index 2e6d26a..57d979b 100644 --- a/examples/get_parsed_transaction_history.rs +++ b/examples/get_parsed_transaction_history.rs @@ -1,9 +1,9 @@ -use helius::error::HeliusError; +use helius::error::Result; use helius::types::*; use helius::Helius; #[tokio::main] -async fn main() -> Result<(), HeliusError> { +async fn main() -> Result<()> { let api_key: &str = "your_api_key"; let cluster: Cluster = Cluster::MainnetBeta; @@ -14,7 +14,7 @@ async fn main() -> Result<(), HeliusError> { before: None, }; - let response: Result, HeliusError> = helius.parsed_transaction_history(request).await; + let response: Result> = helius.parsed_transaction_history(request).await; println!("Assets: {:?}", response); diff --git a/examples/get_priority_fee_estimate.rs b/examples/get_priority_fee_estimate.rs index d5a322d..d832d9f 100644 --- a/examples/get_priority_fee_estimate.rs +++ b/examples/get_priority_fee_estimate.rs @@ -1,9 +1,9 @@ -use helius::error::HeliusError; +use helius::error::Result; use helius::types::*; use helius::Helius; #[tokio::main] -async fn main() -> Result<(), HeliusError> { +async fn main() -> Result<()> { let api_key: &str = "your_api_key"; let cluster: Cluster = Cluster::MainnetBeta; @@ -20,8 +20,7 @@ async fn main() -> Result<(), HeliusError> { }), }; - let response: Result = - helius.rpc().get_priority_fee_estimate(request).await; + let response: Result = helius.rpc().get_priority_fee_estimate(request).await; println!("Assets: {:?}", response); Ok(()) diff --git a/examples/mint_cnft.rs b/examples/mint_cnft.rs index accd59e..147c611 100644 --- a/examples/mint_cnft.rs +++ b/examples/mint_cnft.rs @@ -1,10 +1,10 @@ -use helius::error::HeliusError; +use helius::error::Result; use helius::types::*; use helius::Helius; use serde_json::Value; #[tokio::main] -async fn main() -> Result<(), HeliusError> { +async fn main() -> Result<()> { let api_key: &str = "your_api_key"; let cluster: Cluster = Cluster::MainnetBeta; @@ -39,7 +39,7 @@ async fn main() -> Result<(), HeliusError> { confirm_transaction: Some(true), }; - let response: Result = helius.mint_compressed_nft(request).await; + let response: Result = helius.mint_compressed_nft(request).await; println!("Assets: {:?}", response); Ok(()) diff --git a/src/client.rs b/src/client.rs index 380bd0d..be80550 100644 --- a/src/client.rs +++ b/src/client.rs @@ -53,6 +53,35 @@ impl Helius { }) } + /// The enhanced websocket is optional, and this method is used to create a new instance of `Helius` with an enhanced websocket client. + /// Upon calling this method, the websocket will connect hence the asynchronous function definition omission from the default `new` method. + /// + /// # Example + /// ```rust + /// use helius::Helius; + /// use helius::error::Result; + /// use helius::types::{Cluster, RpcTransactionsConfig, TransactionSubscribeFilter, TransactionSubscribeOptions}; + /// use solana_sdk::pubkey; + /// use tokio_stream::StreamExt; + /// + /// #[tokio::main] + /// async fn main() -> Result<()> { + /// let helius = Helius::new("your_api_key", Cluster::MainnetBeta).expect("Failed to create a Helius client"); + /// // you may monitor transactions for any pubkey, this is just an example. + /// let key = pubkey!("BtsmiEEvnSuUnKxqXj2PZRYpPJAc7C34mGz8gtJ1DAaH"); + /// let config = RpcTransactionsConfig { + /// filter: TransactionSubscribeFilter::standard(&key), + /// options: TransactionSubscribeOptions::default(), + /// }; + /// if let Some(ws) = helius.ws() { + /// let (mut stream, _unsub) = ws.transaction_subscribe(config).await?; + /// while let Some(event) = stream.next().await { + /// println!("{:#?}", event); + /// } + /// } + /// Ok(()) + /// } + /// ``` pub async fn new_with_ws(api_key: &str, cluster: Cluster) -> Result { let config: Arc = Arc::new(Config::new(api_key, cluster)?); let client: Client = Client::new(); diff --git a/src/error.rs b/src/error.rs index 307f439..af6908b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -72,16 +72,16 @@ pub enum HeliusError { #[error("Unknown error has occurred: HTTP {code} - {text}")] Unknown { code: StatusCode, text: String }, - #[error("unable to connect to server: {0}")] + #[error("Unable to connect to server: {0}")] Tungstenite(#[from] tokio_tungstenite::tungstenite::Error), - #[error("websocket connection closed (({0})")] + #[error("Websocket connection closed (({0})")] WebsocketClosed(String), - #[error("enhanced websocket: {message}: {reason}")] + #[error("Enhanced websocket: {message}: {reason}")] EnhancedWebsocket { reason: String, message: String }, - #[error("url parse error")] + #[error("Url parse error")] UrlParseError(#[from] url::ParseError), } diff --git a/src/types/enhanced_websocket.rs b/src/types/enhanced_websocket.rs index c60e5cd..2935a6d 100644 --- a/src/types/enhanced_websocket.rs +++ b/src/types/enhanced_websocket.rs @@ -19,6 +19,15 @@ pub struct TransactionSubscribeFilter { pub account_required: Option>, } +impl TransactionSubscribeFilter { + pub fn standard(key: &Pubkey) -> Self { + Self { + account_include: Some(vec![key.to_string()]), + ..Default::default() + } + } +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum UiEnhancedTransactionEncoding { @@ -29,15 +38,6 @@ pub enum UiEnhancedTransactionEncoding { JsonParsed, } -impl TransactionSubscribeFilter { - pub fn standard(key: &Pubkey) -> Self { - Self { - account_include: Some(vec![key.to_string()]), - ..Default::default() - } - } -} - #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum TransactionCommitment { diff --git a/src/websocket.rs b/src/websocket.rs index 7548786..cd6212d 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -1,6 +1,4 @@ -#![allow(dead_code)] - -use crate::error::HeliusError; +use crate::error::{HeliusError, Result}; use crate::types::{RpcTransactionsConfig, TransactionNotification}; use futures_util::{ future::{ready, BoxFuture, FutureExt}, @@ -10,13 +8,8 @@ use futures_util::{ use serde::de::DeserializeOwned; use serde_json::{json, Map, Value}; use solana_account_decoder::UiAccount; -use solana_rpc_client_api::config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}; -use solana_rpc_client_api::filter::maybe_map_filters; -use solana_rpc_client_api::response::RpcKeyedAccount; -use solana_rpc_client_api::{ - error_object::RpcErrorObject, - response::{Response as RpcResponse, RpcVersionInfo}, -}; +use solana_rpc_client_api::config::RpcAccountInfoConfig; +use solana_rpc_client_api::{error_object::RpcErrorObject, response::Response as RpcResponse}; use solana_sdk::pubkey::Pubkey; use std::collections::BTreeMap; use std::fmt::Debug; @@ -38,40 +31,38 @@ use tokio_tungstenite::{ use url::Url; pub const ENHANCED_WEBSOCKET_URL: &str = "wss://atlas-mainnet.helius-rpc.com?api-key="; +const DEFAULT_PING_DURATION_SECONDS: u64 = 10; -pub type EnhancedWebsocketResult = Result; +// pub type Result = Result; type UnsubscribeFn = Box BoxFuture<'static, ()> + Send>; -type SubscribeResponseMsg = Result<(mpsc::UnboundedReceiver, UnsubscribeFn), HeliusError>; +type SubscribeResponseMsg = Result<(mpsc::UnboundedReceiver, UnsubscribeFn)>; type SubscribeRequestMsg = (String, Value, oneshot::Sender); -type SubscribeResult<'a, T> = EnhancedWebsocketResult<(BoxStream<'a, T>, UnsubscribeFn)>; -type RequestMsg = (String, Value, oneshot::Sender>); +type SubscribeResult<'a, T> = Result<(BoxStream<'a, T>, UnsubscribeFn)>; +type RequestMsg = (String, Value, oneshot::Sender>); -/// A client for subscribing to messages from the RPC server. +/// A client for subscribing to transaction or account updates from a Helius (geyser) enhanced websocket server. /// -/// Forked from Solana's EnhancedWebsocket. -#[derive(Debug)] +/// Forked from Solana's [`PubsubClient`]. pub struct EnhancedWebsocket { subscribe_sender: mpsc::UnboundedSender, - request_sender: mpsc::UnboundedSender, shutdown_sender: oneshot::Sender<()>, node_version: RwLock>, - ws: JoinHandle, + ws: JoinHandle>, } impl EnhancedWebsocket { /// Expects enhanced websocket endpoint: wss://atlas-mainnet.helius-rpc.com?api-key= - pub async fn new(url: &str) -> EnhancedWebsocketResult { + pub async fn new(url: &str) -> Result { let url = Url::parse(url)?; let (ws, _response) = connect_async(url).await.map_err(HeliusError::Tungstenite)?; let (subscribe_sender, subscribe_receiver) = mpsc::unbounded_channel(); - let (request_sender, request_receiver) = mpsc::unbounded_channel(); + let (_request_sender, request_receiver) = mpsc::unbounded_channel(); let (shutdown_sender, shutdown_receiver) = oneshot::channel(); Ok(Self { subscribe_sender, - request_sender, shutdown_sender, node_version: RwLock::new(None), ws: tokio::spawn(EnhancedWebsocket::run_ws( @@ -79,51 +70,22 @@ impl EnhancedWebsocket { subscribe_receiver, request_receiver, shutdown_receiver, + DEFAULT_PING_DURATION_SECONDS, )), }) } - pub async fn shutdown(self) -> EnhancedWebsocketResult { + pub async fn shutdown(self) -> Result<()> { let _ = self.shutdown_sender.send(()); self.ws.await.unwrap() // WS future should not be cancelled or panicked } - pub async fn set_node_version(&self, version: semver::Version) -> Result<(), ()> { + pub async fn set_node_version(&self, version: semver::Version) -> Result<()> { let mut w_node_version = self.node_version.write().await; *w_node_version = Some(version); Ok(()) } - async fn get_node_version(&self) -> EnhancedWebsocketResult { - let r_node_version = self.node_version.read().await; - if let Some(version) = &*r_node_version { - Ok(version.clone()) - } else { - drop(r_node_version); - let mut w_node_version = self.node_version.write().await; - let node_version = self.get_version().await?; - *w_node_version = Some(node_version.clone()); - Ok(node_version) - } - } - - async fn get_version(&self) -> EnhancedWebsocketResult { - let (response_sender, response_receiver) = oneshot::channel(); - self.request_sender - .send(("getVersion".to_string(), Value::Null, response_sender)) - .map_err(|err| HeliusError::WebsocketClosed(err.to_string()))?; - let result = response_receiver - .await - .map_err(|err| HeliusError::WebsocketClosed(err.to_string()))??; - let node_version: RpcVersionInfo = serde_json::from_value(result)?; - let node_version = - semver::Version::parse(&node_version.solana_core).map_err(|e| HeliusError::EnhancedWebsocket { - reason: format!("failed to parse cluster version: {e}"), - message: "getVersion".to_string(), - })?; - Ok(node_version) - } - async fn subscribe<'a, T: DeserializeOwned + Send + Debug + 'a>( &self, operation: &str, @@ -154,6 +116,34 @@ impl EnhancedWebsocket { )) } + /// Stream transactions with numerous configurations and filters to choose from. + /// + /// # Example + /// ```rust + /// use helius::Helius; + /// use helius::error::Result; + /// use helius::types::{Cluster, RpcTransactionsConfig, TransactionSubscribeFilter, TransactionSubscribeOptions}; + /// use solana_sdk::pubkey; + /// use tokio_stream::StreamExt; + /// + /// #[tokio::main] + /// async fn main() -> Result<()> { + /// let helius = Helius::new("your_api_key", Cluster::MainnetBeta).expect("Failed to create a Helius client"); + /// // you may monitor transactions for any pubkey, this is just an example. + /// let key = pubkey!("BtsmiEEvnSuUnKxqXj2PZRYpPJAc7C34mGz8gtJ1DAaH"); + /// let config = RpcTransactionsConfig { + /// filter: TransactionSubscribeFilter::standard(&key), + /// options: TransactionSubscribeOptions::default(), + /// }; + /// if let Some(ws) = helius.ws() { + /// let (mut stream, _unsub) = ws.transaction_subscribe(config).await?; + /// while let Some(event) = stream.next().await { + /// println!("{:#?}", event); + /// } + /// } + /// Ok(()) + /// } + /// ``` pub async fn transaction_subscribe( &self, config: RpcTransactionsConfig, @@ -162,6 +152,30 @@ impl EnhancedWebsocket { self.subscribe("transaction", params).await } + /// Stream accounts with numerous configurations and filters to choose from. + /// + /// # Example + /// ```rust + /// use helius::Helius; + /// use helius::error::Result; + /// use helius::types::{Cluster, RpcTransactionsConfig, TransactionSubscribeFilter, TransactionSubscribeOptions}; + /// use solana_sdk::pubkey; + /// use tokio_stream::StreamExt; + /// + /// #[tokio::main] + /// async fn main() -> Result<()> { + /// let helius = Helius::new("your_api_key", Cluster::MainnetBeta).expect("Failed to create a Helius client"); + /// // you may monitor updates for any account pubkey, this is just an example. + /// let key = pubkey!("BtsmiEEvnSuUnKxqXj2PZRYpPJAc7C34mGz8gtJ1DAaH"); + /// if let Some(ws) = helius.ws() { + /// let (mut stream, _unsub) = ws.account_subscribe(&key, None).await?; + /// while let Some(event) = stream.next().await { + /// println!("{:#?}", event); + /// } + /// } + /// Ok(()) + /// } + /// ``` pub async fn account_subscribe( &self, pubkey: &Pubkey, @@ -171,33 +185,13 @@ impl EnhancedWebsocket { self.subscribe("account", params).await } - pub async fn program_subscribe( - &self, - pubkey: &Pubkey, - mut config: Option, - ) -> SubscribeResult<'_, RpcResponse> { - if let Some(ref mut config) = config { - if let Some(ref mut filters) = config.filters { - let node_version = self.get_node_version().await.ok(); - // If node does not support the pubsub `getVersion` method, assume version is old - // and filters should be mapped (node_version.is_none()). - maybe_map_filters(node_version, filters).map_err(|e| HeliusError::EnhancedWebsocket { - reason: e, - message: "maybe_map_filters".to_string(), - })?; - } - } - - let params = json!([pubkey.to_string(), config]); - self.subscribe("program", params).await - } - async fn run_ws( mut ws: WebSocketStream>, mut subscribe_receiver: mpsc::UnboundedReceiver, mut request_receiver: mpsc::UnboundedReceiver, mut shutdown_receiver: oneshot::Receiver<()>, - ) -> EnhancedWebsocketResult { + ping_duration_seconds: u64, + ) -> Result<()> { let mut request_id: u64 = 0; let mut requests_subscribe = BTreeMap::new(); @@ -216,7 +210,7 @@ impl EnhancedWebsocket { break; }, // Send `Message::Ping` each 10s if no any other communication - () = sleep(Duration::from_secs(10)) => { + () = sleep(Duration::from_secs(ping_duration_seconds)) => { ws.send(Message::Ping(Vec::new())).await?; }, // Read message for subscribe diff --git a/tests/rpc/test_get_asset.rs b/tests/rpc/test_get_asset.rs index 64848ad..33cc907 100644 --- a/tests/rpc/test_get_asset.rs +++ b/tests/rpc/test_get_asset.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use helius::client::Helius; use helius::config::Config; -use helius::error::HeliusError; +use helius::error::Result; use helius::rpc_client::RpcClient; use helius::types::*; @@ -184,7 +184,7 @@ async fn test_get_asset_success() { }), }; - let response: Result, HeliusError> = helius.rpc().get_asset(request).await; + let response: Result> = helius.rpc().get_asset(request).await; assert!(response.is_ok(), "API call failed with error: {:?}", response.err()); let asset_response: Option = response.unwrap(); @@ -240,6 +240,6 @@ async fn test_get_asset_failure() { }), }; - let response: Result, HeliusError> = helius.rpc().get_asset(request).await; + let response: Result> = helius.rpc().get_asset(request).await; assert!(response.is_err(), "Expected an error but got success"); } diff --git a/tests/rpc/test_get_asset_batch.rs b/tests/rpc/test_get_asset_batch.rs index 24462aa..a90d6d2 100644 --- a/tests/rpc/test_get_asset_batch.rs +++ b/tests/rpc/test_get_asset_batch.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use helius::client::Helius; use helius::config::Config; -use helius::error::HeliusError; +use helius::error::Result; use helius::rpc_client::RpcClient; use helius::types::{ ApiResponse, Asset, Attribute, Authorities, Cluster, CollectionMetadata, Compression, Content, Creator, File, @@ -280,7 +280,7 @@ async fn test_get_asset_batch_success() { }), }; - let response: Result>, HeliusError> = helius.rpc().get_asset_batch(request).await; + let response: Result>> = helius.rpc().get_asset_batch(request).await; assert!(response.is_ok(), "API call failed with the error: {:?}", response.err()); let assets: Vec> = response.unwrap(); @@ -325,6 +325,6 @@ async fn test_get_asset_batch_failure() { }), }; - let response: Result>, HeliusError> = helius.rpc().get_asset_batch(request).await; + let response: Result>> = helius.rpc().get_asset_batch(request).await; assert!(response.is_err(), "Expected an error but got a successful response"); } diff --git a/tests/rpc/test_get_asset_proof.rs b/tests/rpc/test_get_asset_proof.rs index f05f8a0..c54b03c 100644 --- a/tests/rpc/test_get_asset_proof.rs +++ b/tests/rpc/test_get_asset_proof.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use helius::client::Helius; use helius::config::Config; -use helius::error::HeliusError; +use helius::error::Result; use helius::rpc_client::RpcClient; use helius::types::*; @@ -70,7 +70,7 @@ async fn test_get_asset_proof_success() { id: "Bu1DEKeawy7txbnCEJE4BU3BKLXaNAKCYcHR4XhndGss".to_string(), }; - let response: Result, HeliusError> = helius.rpc().get_asset_proof(request).await; + let response: Result> = helius.rpc().get_asset_proof(request).await; assert!(response.is_ok(), "API call failed with error: {:?}", response.err()); let asset_response: Option = response.unwrap(); @@ -118,6 +118,6 @@ async fn test_get_asset_proof_failure() { id: "invalid-id-helius-is-the-best".to_string(), }; - let response: Result, HeliusError> = helius.rpc().get_asset_proof(request).await; + let response: Result> = helius.rpc().get_asset_proof(request).await; assert!(response.is_err(), "Expected an error but got success"); } diff --git a/tests/rpc/test_get_asset_proof_batch.rs b/tests/rpc/test_get_asset_proof_batch.rs index 1f67728..1d2701e 100644 --- a/tests/rpc/test_get_asset_proof_batch.rs +++ b/tests/rpc/test_get_asset_proof_batch.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use helius::client::Helius; use helius::config::Config; -use helius::error::HeliusError; +use helius::error::Result; use helius::rpc_client::RpcClient; use helius::types::*; @@ -76,8 +76,7 @@ async fn test_get_asset_proof_batch_success() { ], }; - let response: Result>, HeliusError> = - helius.rpc().get_asset_proof_batch(request).await; + let response: Result>> = helius.rpc().get_asset_proof_batch(request).await; assert!(response.is_ok(), "API call failed with error: {:?}", response.err()); let proofs: HashMap> = response.unwrap(); @@ -129,7 +128,6 @@ async fn test_get_asset_proof_failure() { ids: vec!["Hello there".to_string(), "General Kenobi".to_string()], }; - let response: Result>, HeliusError> = - helius.rpc().get_asset_proof_batch(request).await; + let response: Result>> = helius.rpc().get_asset_proof_batch(request).await; assert!(response.is_err(), "Expected an error but got success"); } diff --git a/tests/rpc/test_get_assets_by_group.rs b/tests/rpc/test_get_assets_by_group.rs index c74dbdc..0998116 100644 --- a/tests/rpc/test_get_assets_by_group.rs +++ b/tests/rpc/test_get_assets_by_group.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use helius::client::Helius; use helius::config::Config; -use helius::error::HeliusError; +use helius::error::Result; use helius::rpc_client::RpcClient; use helius::types::*; @@ -165,7 +165,7 @@ async fn test_get_assets_by_group_success() { cursor: None, }; - let response: Result = helius.rpc().get_assets_by_group(request).await; + let response: Result = helius.rpc().get_assets_by_group(request).await; assert!(response.is_ok(), "Expected an error due to server failure"); let asset: AssetList = response.unwrap(); @@ -218,6 +218,6 @@ async fn test_get_assets_by_group_failure() { ..Default::default() }; - let response: Result = helius.rpc().get_assets_by_group(request).await; + let response: Result = helius.rpc().get_assets_by_group(request).await; assert!(response.is_err(), "Expected an error due to server failure"); } diff --git a/tests/rpc/test_get_assets_by_owner.rs b/tests/rpc/test_get_assets_by_owner.rs index d983678..4c1fb1d 100644 --- a/tests/rpc/test_get_assets_by_owner.rs +++ b/tests/rpc/test_get_assets_by_owner.rs @@ -1,5 +1,5 @@ use helius::config::Config; -use helius::error::HeliusError; +use helius::error::Result; use helius::rpc_client::RpcClient; use helius::types::{ ApiResponse, Asset, AssetList, Attribute, Authorities, Cluster, Compression, Content, Creator, File, @@ -174,7 +174,7 @@ async fn test_get_assets_by_owner_success() { ..Default::default() }; - let response: Result = helius.rpc().get_assets_by_owner(request).await; + let response: Result = helius.rpc().get_assets_by_owner(request).await; assert!(response.is_ok(), "The API call failed: {:?}", response.err()); let api_response: AssetList = response.unwrap(); @@ -224,6 +224,6 @@ async fn test_get_assets_by_owner_failure() { ..Default::default() }; - let response: Result = helius.rpc().get_assets_by_owner(request).await; + let response: Result = helius.rpc().get_assets_by_owner(request).await; assert!(response.is_err(), "Expected an error due to server failure"); } diff --git a/tests/rpc/test_get_nft_editions.rs b/tests/rpc/test_get_nft_editions.rs index 9909ae6..aebb9ff 100644 --- a/tests/rpc/test_get_nft_editions.rs +++ b/tests/rpc/test_get_nft_editions.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use helius::client::Helius; use helius::config::Config; -use helius::error::HeliusError; +use helius::error::Result; use helius::rpc_client::RpcClient; use helius::types::*; @@ -63,7 +63,7 @@ async fn test_get_nft_editions_success() { limit: Some(1), }; - let response: Result = helius.rpc().get_nft_editions(request).await; + let response: Result = helius.rpc().get_nft_editions(request).await; assert!(response.is_ok(), "API call failed with error: {:?}", response.err()); let editions_list: EditionsList = response.unwrap(); @@ -115,6 +115,6 @@ async fn test_get_nft_editions_failure() { limit: Some(1), }; - let response: Result = helius.rpc().get_nft_editions(request).await; + let response: Result = helius.rpc().get_nft_editions(request).await; assert!(response.is_err(), "Expected an error but got success"); } diff --git a/tests/rpc/test_get_priority_fee_estimate.rs b/tests/rpc/test_get_priority_fee_estimate.rs index 5612225..8c0cdd9 100644 --- a/tests/rpc/test_get_priority_fee_estimate.rs +++ b/tests/rpc/test_get_priority_fee_estimate.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use helius::client::Helius; use helius::config::Config; -use helius::error::HeliusError; +use helius::error::Result; use helius::rpc_client::RpcClient; use helius::types::*; @@ -66,8 +66,7 @@ async fn test_get_nft_editions_success() { }), }; - let response: Result = - helius.rpc().get_priority_fee_estimate(request).await; + let response: Result = helius.rpc().get_priority_fee_estimate(request).await; assert!(response.is_ok(), "API call failed with error: {:?}", response.err()); let fee_estimate = response.unwrap(); @@ -120,7 +119,6 @@ async fn test_get_nft_editions_failure() { }), }; - let response: Result = - helius.rpc().get_priority_fee_estimate(request).await; + let response: Result = helius.rpc().get_priority_fee_estimate(request).await; assert!(response.is_err(), "Expected an error but got success"); } diff --git a/tests/rpc/test_get_rwa_asset.rs b/tests/rpc/test_get_rwa_asset.rs index 660b66c..661b752 100644 --- a/tests/rpc/test_get_rwa_asset.rs +++ b/tests/rpc/test_get_rwa_asset.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use helius::client::Helius; use helius::config::Config; -use helius::error::HeliusError; +use helius::error::Result; use helius::rpc_client::RpcClient; use helius::types::*; @@ -68,7 +68,7 @@ async fn test_get_rwa_asset_success() { id: "RadioactiveMan#1".to_string(), }; - let response: Result = helius.rpc().get_rwa_asset(request).await; + let response: Result = helius.rpc().get_rwa_asset(request).await; assert!(response.is_ok()); assert_eq!( response.unwrap().items.asset_controller.unwrap().address, @@ -110,6 +110,6 @@ async fn test_get_rwa_asset_failure() { id: "Flanders'BookofFaith".to_string(), }; - let response: Result = helius.rpc().get_rwa_asset(request).await; + let response: Result = helius.rpc().get_rwa_asset(request).await; assert!(response.is_err(), "Expected an error but got success"); } diff --git a/tests/rpc/test_get_signatures_for_asset.rs b/tests/rpc/test_get_signatures_for_asset.rs index 9be4ef5..f4d9642 100644 --- a/tests/rpc/test_get_signatures_for_asset.rs +++ b/tests/rpc/test_get_signatures_for_asset.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use helius::client::Helius; use helius::config::Config; -use helius::error::HeliusError; +use helius::error::Result; use helius::rpc_client::RpcClient; use helius::types::*; @@ -61,7 +61,7 @@ async fn test_get_asset_signatures_success() { ..Default::default() }; - let response: Result = helius.rpc().get_signatures_for_asset(request).await; + let response: Result = helius.rpc().get_signatures_for_asset(request).await; assert!(response.is_ok(), "API call failed with error: {:?}", response.err()); let signatures: TransactionSignatureList = response.unwrap(); @@ -110,6 +110,6 @@ async fn test_get_asset_signatures_failure() { ..Default::default() }; - let response: Result = helius.rpc().get_signatures_for_asset(request).await; + let response: Result = helius.rpc().get_signatures_for_asset(request).await; assert!(response.is_err(), "Expected an error but got success"); } diff --git a/tests/rpc/test_get_token_accounts.rs b/tests/rpc/test_get_token_accounts.rs index 6184563..021bc13 100644 --- a/tests/rpc/test_get_token_accounts.rs +++ b/tests/rpc/test_get_token_accounts.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use helius::client::Helius; use helius::config::Config; -use helius::error::HeliusError; +use helius::error::Result; use helius::rpc_client::RpcClient; use helius::types::*; @@ -69,7 +69,7 @@ async fn test_get_token_accounts_success() { ..Default::default() }; - let response: Result = helius.rpc().get_token_accounts(request).await; + let response: Result = helius.rpc().get_token_accounts(request).await; assert!(response.is_ok(), "API call failed with error: {:?}", response.err()); let token_accounts: TokenAccountsList = response.unwrap(); @@ -121,6 +121,6 @@ async fn test_get_token_accounts_failure() { ..Default::default() }; - let response: Result = helius.rpc().get_token_accounts(request).await; + let response: Result = helius.rpc().get_token_accounts(request).await; assert!(response.is_err(), "Expected an error but got success"); } diff --git a/tests/rpc/test_search_assets.rs b/tests/rpc/test_search_assets.rs index d5b03c7..72501f6 100644 --- a/tests/rpc/test_search_assets.rs +++ b/tests/rpc/test_search_assets.rs @@ -1,5 +1,5 @@ use helius::config::Config; -use helius::error::HeliusError; +use helius::error::Result; use helius::rpc_client::RpcClient; use helius::types::{ ApiResponse, Asset, AssetList, Attribute, Authorities, Cluster, Compression, Content, Creator, File, Group, @@ -171,7 +171,7 @@ async fn test_search_assets_success() { ..Default::default() }; - let response: Result = helius.rpc().search_assets(request).await; + let response: Result = helius.rpc().search_assets(request).await; assert!(response.is_ok(), "The API call failed: {:?}", response.err()); let api_response: AssetList = response.unwrap(); @@ -218,6 +218,6 @@ async fn test_search_assets_failure() { ..Default::default() }; - let response: Result = helius.rpc().search_assets(request).await; + let response: Result = helius.rpc().search_assets(request).await; assert!(response.is_err(), "Expected an error due to server failure"); } diff --git a/tests/test_config.rs b/tests/test_config.rs index 7429c57..75568aa 100644 --- a/tests/test_config.rs +++ b/tests/test_config.rs @@ -1,16 +1,16 @@ use helius::config::Config; -use helius::error::HeliusError; +use helius::error::{HeliusError, Result}; use helius::types::Cluster; #[test] fn test_config_new_with_empty_api_key() { - let result: Result = Config::new("", Cluster::Devnet); + let result: Result = Config::new("", Cluster::Devnet); assert!(matches!(result, Err(HeliusError::InvalidInput(_)))); } #[test] fn test_config_new_with_valid_api_key() { - let result: Result = Config::new("valid-api-key", Cluster::Devnet); + let result: Result = Config::new("valid-api-key", Cluster::Devnet); assert!(result.is_ok()); let config: Config = result.unwrap(); diff --git a/tests/test_mint_api.rs b/tests/test_mint_api.rs index d78a7aa..44bd117 100644 --- a/tests/test_mint_api.rs +++ b/tests/test_mint_api.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use helius::client::Helius; use helius::config::Config; -use helius::error::HeliusError; +use helius::error::Result; use helius::rpc_client::RpcClient; use helius::types::*; @@ -80,7 +80,7 @@ async fn test_mint_compressed_nft() { confirm_transaction: Some(true), }; - let result: Result = helius.mint_compressed_nft(request).await; + let result: Result = helius.mint_compressed_nft(request).await; assert!(result.is_ok(), "API call failed with error: {:?}", result.err()); let mint_response: MintResponse = result.unwrap(); @@ -154,7 +154,7 @@ async fn test_get_asset_proof_failure() { confirm_transaction: Some(true), }; - let result: Result = helius.mint_compressed_nft(request).await; + let result: Result = helius.mint_compressed_nft(request).await; assert!(result.is_err(), "Expected an error but got success"); } @@ -163,8 +163,9 @@ async fn test_mint_api_authority_from_cluster_success() { let devnet_cluster: Cluster = Cluster::Devnet; let mainnet_cluster: Cluster = Cluster::MainnetBeta; - let devnet_authority: Result = MintApiAuthority::from_cluster(devnet_cluster); - let mainnet_authority: Result = MintApiAuthority::from_cluster(mainnet_cluster); + let devnet_authority: std::result::Result = MintApiAuthority::from_cluster(devnet_cluster); + let mainnet_authority: std::result::Result = + MintApiAuthority::from_cluster(mainnet_cluster); assert_eq!( devnet_authority.unwrap(), @@ -183,8 +184,9 @@ async fn test_mint_api_authority_from_cluster_failure() { let devnet_cluster: Cluster = Cluster::Devnet; let mainnet_cluster: Cluster = Cluster::MainnetBeta; - let devnet_authority: Result = MintApiAuthority::from_cluster(devnet_cluster); - let mainnet_authority: Result = MintApiAuthority::from_cluster(mainnet_cluster); + let devnet_authority: std::result::Result = MintApiAuthority::from_cluster(devnet_cluster); + let mainnet_authority: std::result::Result = + MintApiAuthority::from_cluster(mainnet_cluster); assert_ne!( devnet_authority.unwrap(), From d9357dff509caf2ebbc1a7aef81fa24b1d1f54e0 Mon Sep 17 00:00:00 2001 From: Drew Date: Mon, 27 May 2024 13:59:33 -0400 Subject: [PATCH 4/5] resolve merge conflicts and cargo sort dependencies for readability --- .gitignore | 2 +- Cargo.toml | 24 +++++++++++++----------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index 5261b4f..16c763d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ /target /src/main.rs /Cargo.lock -/.idea +.idea/ diff --git a/Cargo.toml b/Cargo.toml index 55ba95a..f5f2884 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,24 +12,26 @@ readme = "README.md" homepage = "https://www.helius.dev/" [dependencies] +base64 = "0.22.1" +bincode = "1.3.3" +chrono = { version = "0.4.11", features = ["serde"] } +futures = "0.3.30" +futures-util = "0.3.30" reqwest = { version = "0.12.3", features = ["json"] } +semver = "1.0.23" serde = "1.0.198" +serde-enum-str = "0.4.0" serde_json = "1.0.116" -solana-sdk = "1.18.11" -thiserror = "1.0.58" -tokio = { version = "1.37.0", features = ["macros", "rt-multi-thread", "net"] } -chrono = { version = "0.4.11", features = ["serde"] } +solana-account-decoder = "1.18.12" solana-client = "1.18.12" solana-program = "1.18.12" -serde-enum-str = "0.4.0" -tokio-tungstenite = { version = "0.21.0", features = ["native-tls", "handshake"] } -tokio-stream = "0.1.15" solana-rpc-client-api = "1.18.12" -futures-util = "0.3.30" -solana-account-decoder = "1.18.12" +solana-sdk = "1.18.11" solana-transaction-status = "1.18.12" -futures = "0.3.30" -semver = "1.0.23" +thiserror = "1.0.58" +tokio = { version = "1.37.0", features = ["macros", "rt-multi-thread", "net"] } +tokio-stream = "0.1.15" +tokio-tungstenite = { version = "0.21.0", features = ["native-tls", "handshake"] } url = "2.5.0" [dev-dependencies] From c193afbfa5cac2acc32b69dbfd75bcd95454ac1a Mon Sep 17 00:00:00 2001 From: Evan <0xIchigo@protonmail.com> Date: Tue, 28 May 2024 02:21:17 -0400 Subject: [PATCH 5/5] Clean Up PR --- src/client.rs | 35 ++++------------ tests/test_enhanced_transactions.rs | 40 +++++++++++-------- .../test_append_addresses_to_webhook.rs | 15 ++++--- tests/webhook/test_create_webhook.rs | 19 +++++---- tests/webhook/test_delete_webhook.rs | 11 +++-- tests/webhook/test_edit_webhook.rs | 33 ++++++++------- tests/webhook/test_get_all_webhooks.rs | 16 +++++--- tests/webhook/test_get_webhook_by_id.rs | 16 +++++--- .../test_remove_addresses_from_webhook.rs | 16 +++++--- 9 files changed, 106 insertions(+), 95 deletions(-) diff --git a/src/client.rs b/src/client.rs index be80550..ca2404b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -56,38 +56,17 @@ impl Helius { /// The enhanced websocket is optional, and this method is used to create a new instance of `Helius` with an enhanced websocket client. /// Upon calling this method, the websocket will connect hence the asynchronous function definition omission from the default `new` method. /// - /// # Example - /// ```rust - /// use helius::Helius; - /// use helius::error::Result; - /// use helius::types::{Cluster, RpcTransactionsConfig, TransactionSubscribeFilter, TransactionSubscribeOptions}; - /// use solana_sdk::pubkey; - /// use tokio_stream::StreamExt; - /// - /// #[tokio::main] - /// async fn main() -> Result<()> { - /// let helius = Helius::new("your_api_key", Cluster::MainnetBeta).expect("Failed to create a Helius client"); - /// // you may monitor transactions for any pubkey, this is just an example. - /// let key = pubkey!("BtsmiEEvnSuUnKxqXj2PZRYpPJAc7C34mGz8gtJ1DAaH"); - /// let config = RpcTransactionsConfig { - /// filter: TransactionSubscribeFilter::standard(&key), - /// options: TransactionSubscribeOptions::default(), - /// }; - /// if let Some(ws) = helius.ws() { - /// let (mut stream, _unsub) = ws.transaction_subscribe(config).await?; - /// while let Some(event) = stream.next().await { - /// println!("{:#?}", event); - /// } - /// } - /// Ok(()) - /// } - /// ``` + /// # Arguments + /// * `api_key` - The API key required for authenticating requests made + /// * `cluster` - The Solana cluster (Devnet or MainnetBeta) that defines the given network environment + /// # Returns + /// An instance of `Helius` if successful. A `HeliusError` is returned if an error occurs during configuration or initialization of the HTTP, RPC, or WS client pub async fn new_with_ws(api_key: &str, cluster: Cluster) -> Result { let config: Arc = Arc::new(Config::new(api_key, cluster)?); let client: Client = Client::new(); let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), config.clone())?); - let wss = format!("{}{}", ENHANCED_WEBSOCKET_URL, api_key); - let ws_client = Arc::new(EnhancedWebsocket::new(&wss).await?); + let wss: String = format!("{}{}", ENHANCED_WEBSOCKET_URL, api_key); + let ws_client: Arc = Arc::new(EnhancedWebsocket::new(&wss).await?); Ok(Helius { config, client, diff --git a/tests/test_enhanced_transactions.rs b/tests/test_enhanced_transactions.rs index dc95b72..b218779 100644 --- a/tests/test_enhanced_transactions.rs +++ b/tests/test_enhanced_transactions.rs @@ -1,4 +1,5 @@ use helius::config::Config; +use helius::error::Result; use helius::rpc_client::RpcClient; use helius::types::{ AccountData, Cluster, EnhancedTransaction, HeliusEndpoints, InnerInstruction, Instruction, @@ -16,7 +17,7 @@ async fn test_parse_transactions_success() { let mut server: Server = Server::new_with_opts_async(mockito::ServerOpts::default()).await; let url: String = format!("{}/", server.url()); - let mock_response = vec![EnhancedTransaction { + let mock_response: Vec = vec![EnhancedTransaction { account_data: vec![AccountData { account: "".to_string(), native_token_balance: Some(Number::from(10)), @@ -75,22 +76,24 @@ async fn test_parse_transactions_success() { let client: Client = Client::new(); let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), Arc::clone(&config)).unwrap()); - let helius = Helius { + let helius: Helius = Helius { config, client, rpc_client, + ws_client: None, }; - let request = ParseTransactionsRequest { + let request: ParseTransactionsRequest = ParseTransactionsRequest { transactions: vec![ "DiG7v24AXRQqGagDx8pcVxRgVrgFoXUpJgp7xb62ycG9".to_string(), "46tC8n6GyWvUjFxpTE9juG5WZ72RXADpPhY4S1d6wvTi".to_string(), ], }; - let response = helius.parse_transactions(request).await; + let response: Result> = helius.parse_transactions(request).await; assert!(response.is_ok(), "The API call failed: {:?}", response.err()); - let tx_response = response.unwrap(); + + let tx_response: Vec = response.unwrap(); assert_eq!( tx_response[0].signature, "yy5BT9benHhx8fGCvhcAfTtLEHAtRJ3hRTzVL16bdrTCWm63t2vapfrZQZLJC3RcuagekaXjSs2zUGQvbcto8DK".to_string() @@ -113,12 +116,13 @@ async fn test_parse_transactions_failure() { }); let client: Client = Client::new(); let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), Arc::clone(&config)).unwrap()); - let helius = Helius { + let helius: Helius = Helius { config, client, rpc_client, + ws_client: None, }; - let request = ParseTransactionsRequest { + let request: ParseTransactionsRequest = ParseTransactionsRequest { transactions: vec![ "DiG7v24AXRQqGagDx8pcVxRgVrgFoXUpJgp7xb62ycG9".to_string(), "46tC8n6GyWvUjFxpTE9juG5WZ72RXADpPhY4S1d6wvTi".to_string(), @@ -131,7 +135,7 @@ async fn test_parse_transactions_failure() { .with_body(r#"{"error":"Internal Server Error"}"#) .create(); - let response = helius.parse_transactions(request).await; + let response: Result> = helius.parse_transactions(request).await; assert!(response.is_err(), "Expected an error due to server failure"); } #[tokio::test] @@ -139,7 +143,7 @@ async fn test_parse_transaction_history_success() { let mut server: Server = Server::new_with_opts_async(mockito::ServerOpts::default()).await; let url: String = format!("{}/", server.url()); - let mock_response = vec![EnhancedTransaction { + let mock_response: Vec = vec![EnhancedTransaction { account_data: vec![AccountData { account: "".to_string(), native_token_balance: Some(Number::from(10)), @@ -201,21 +205,22 @@ async fn test_parse_transaction_history_success() { let client: Client = Client::new(); let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), Arc::clone(&config)).unwrap()); - let helius = Helius { + let helius: Helius = Helius { config, client, rpc_client, + ws_client: None, }; - let request = ParsedTransactionHistoryRequest { + let request: ParsedTransactionHistoryRequest = ParsedTransactionHistoryRequest { address: "46tC8n6GyWvUjFxpTE9juG5WZ72RXADpPhY4S1d6wvTi".to_string(), before: None, }; - let response = helius.parsed_transaction_history(request).await; - + let response: Result> = helius.parsed_transaction_history(request).await; assert!(response.is_ok(), "The API call failed: {:?}", response.err()); - let tx_response = response.unwrap(); + + let tx_response: Vec = response.unwrap(); assert_eq!( tx_response[0].signature, "yy5BT9benHhx8fGCvhcAfTtLEHAtRJ3hRTzVL16bdrTCWm63t2vapfrZQZLJC3RcuagekaXjSs2zUGQvbcto8DK".to_string() @@ -238,12 +243,13 @@ async fn test_parse_transaction_history_failure() { }); let client: Client = Client::new(); let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), Arc::clone(&config)).unwrap()); - let helius = Helius { + let helius: Helius = Helius { config, client, rpc_client, + ws_client: None, }; - let request = ParsedTransactionHistoryRequest { + let request: ParsedTransactionHistoryRequest = ParsedTransactionHistoryRequest { address: "46tC8n6GyWvUjFxpTE9juG5WZ72RXADpPhY4S1d6wvTi".to_string(), before: None, }; @@ -257,6 +263,6 @@ async fn test_parse_transaction_history_failure() { .with_body(r#"{"error":"Internal Server Error"}"#) .create(); - let response = helius.parsed_transaction_history(request).await; + let response: Result> = helius.parsed_transaction_history(request).await; assert!(response.is_err(), "Expected an error due to server failure"); } diff --git a/tests/webhook/test_append_addresses_to_webhook.rs b/tests/webhook/test_append_addresses_to_webhook.rs index 737f611..f072329 100644 --- a/tests/webhook/test_append_addresses_to_webhook.rs +++ b/tests/webhook/test_append_addresses_to_webhook.rs @@ -1,5 +1,5 @@ use helius::config::Config; -use helius::error::HeliusError; +use helius::error::Result; use helius::rpc_client::RpcClient; use helius::types::{Cluster, HeliusEndpoints, TransactionType, Webhook, WebhookType}; use helius::Helius; @@ -59,13 +59,14 @@ async fn test_append_addresses_to_webhook_success() { let client: Client = Client::new(); let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), Arc::clone(&config)).unwrap()); - let helius = Helius { + let helius: Helius = Helius { config, client, rpc_client, + ws_client: None, }; - let response = helius + let response: Result = helius .append_addresses_to_webhook( "0e8250a1-ceec-4757-ad69", &["71WDyyCsZwyEYDV91Qrb212rdg6woCHYQhFnmZUBxiJ6".to_string()], @@ -73,7 +74,8 @@ async fn test_append_addresses_to_webhook_success() { .await; assert!(response.is_ok(), "The API call failed: {:?}", response.err()); - let webhook_response = response.unwrap(); + let webhook_response: Webhook = response.unwrap(); + assert_eq!(webhook_response.webhook_id, "0e8250a1-ceec-4757-ad69"); assert_eq!( webhook_response.webhook_url, @@ -125,12 +127,13 @@ async fn test_append_addresses_to_webhook_failure() { let client: Client = Client::new(); let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), Arc::clone(&config)).unwrap()); - let helius = Helius { + let helius: Helius = Helius { config, client, rpc_client, + ws_client: None, }; - let response: Result = helius + let response: Result = helius .append_addresses_to_webhook( "0e8250a1-ceec-4757-ad69", &["71WDyyCsZwyEYDV91Qrb212rdg6woCHYQhFnmZUBxiJ6".to_string()], diff --git a/tests/webhook/test_create_webhook.rs b/tests/webhook/test_create_webhook.rs index d259a6e..c4098fb 100644 --- a/tests/webhook/test_create_webhook.rs +++ b/tests/webhook/test_create_webhook.rs @@ -1,5 +1,5 @@ use helius::config::Config; -use helius::error::HeliusError; +use helius::error::Result; use helius::rpc_client::RpcClient; use helius::types::{Cluster, CreateWebhookRequest, HeliusEndpoints, TransactionType, Webhook, WebhookType}; use helius::Helius; @@ -42,13 +42,14 @@ async fn test_create_webhook_success() { let client: Client = Client::new(); let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), Arc::clone(&config)).unwrap()); - let helius = Helius { + let helius: Helius = Helius { config, client, rpc_client, + ws_client: None, }; - let request = CreateWebhookRequest { + let request: CreateWebhookRequest = CreateWebhookRequest { webhook_url: "https://webhook.site/0e8250a1-ceec-4757-ad69-cc6473085bfc".to_string(), transaction_types: vec![TransactionType::Any], account_addresses: vec![], @@ -56,10 +57,11 @@ async fn test_create_webhook_success() { auth_header: None, ..Default::default() }; - let response = helius.create_webhook(request).await; + let response: Result = helius.create_webhook(request).await; assert!(response.is_ok(), "The API call failed: {:?}", response.err()); - let webhook_response = response.unwrap(); + let webhook_response: Webhook = response.unwrap(); + assert_eq!(webhook_response.webhook_id, "0e8250a1-ceec-4757-ad69"); assert_eq!( webhook_response.webhook_url, @@ -86,7 +88,7 @@ async fn test_create_webhook_failure() { rpc: url.to_string(), }, }); - let request = CreateWebhookRequest { + let request: CreateWebhookRequest = CreateWebhookRequest { webhook_url: "https://webhook.site/0e8250a1-ceec-4757-ad69-cc6473085bfc".to_string(), transaction_types: vec![TransactionType::Any], account_addresses: vec![], @@ -97,11 +99,12 @@ async fn test_create_webhook_failure() { let client: Client = Client::new(); let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), Arc::clone(&config)).unwrap()); - let helius = Helius { + let helius: Helius = Helius { config, client, rpc_client, + ws_client: None, }; - let response: Result = helius.create_webhook(request).await; + let response: Result = helius.create_webhook(request).await; assert!(response.is_err(), "Expected an error due to server failure"); } diff --git a/tests/webhook/test_delete_webhook.rs b/tests/webhook/test_delete_webhook.rs index 588f3a0..01d5009 100644 --- a/tests/webhook/test_delete_webhook.rs +++ b/tests/webhook/test_delete_webhook.rs @@ -1,4 +1,5 @@ use helius::config::Config; +use helius::error::Result; use helius::rpc_client::RpcClient; use helius::types::{Cluster, HeliusEndpoints}; use helius::Helius; @@ -28,13 +29,14 @@ async fn test_delete_webhook_success() { let client: Client = Client::new(); let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), Arc::clone(&config)).unwrap()); - let helius = Helius { + let helius: Helius = Helius { config, client, rpc_client, + ws_client: None, }; - let response = helius.delete_webhook("0e8250a1-ceec-4757-ad69").await; + let response: Result<()> = helius.delete_webhook("0e8250a1-ceec-4757-ad69").await; assert!(response.is_ok(), "The API call failed: {:?}", response.err()); } @@ -61,11 +63,12 @@ async fn test_delete_webhook_failure() { let client: Client = Client::new(); let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), Arc::clone(&config)).unwrap()); - let helius = Helius { + let helius: Helius = Helius { config, client, rpc_client, + ws_client: None, }; - let response = helius.delete_webhook("0e8250a1-ceec-4757-ad69").await; + let response: Result<()> = helius.delete_webhook("0e8250a1-ceec-4757-ad69").await; assert!(response.is_err(), "Expected an error due to server failure"); } diff --git a/tests/webhook/test_edit_webhook.rs b/tests/webhook/test_edit_webhook.rs index 857a72e..9f27c89 100644 --- a/tests/webhook/test_edit_webhook.rs +++ b/tests/webhook/test_edit_webhook.rs @@ -1,5 +1,5 @@ use helius::config::Config; -use helius::error::HeliusError; +use helius::error::Result; use helius::rpc_client::RpcClient; use helius::types::{Cluster, EditWebhookRequest, HeliusEndpoints, TransactionType, Webhook, WebhookType}; use helius::Helius; @@ -42,13 +42,14 @@ async fn test_edit_webhook_success() { let client: Client = Client::new(); let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), Arc::clone(&config)).unwrap()); - let helius = Helius { + let helius: Helius = Helius { config, client, rpc_client, + ws_client: None, }; - let request = EditWebhookRequest { + let request: EditWebhookRequest = EditWebhookRequest { webhook_id: "0e8250a1-ceec-4757-ad69".to_string(), webhook_url: "https://webhook.site/0e8250a1-ceec-4757-ad69-cc6473085bfc".to_string(), transaction_types: vec![TransactionType::Any], @@ -58,10 +59,11 @@ async fn test_edit_webhook_success() { txn_status: Default::default(), encoding: Default::default(), }; - let response = helius.edit_webhook(request).await; + let response: Result = helius.edit_webhook(request).await; assert!(response.is_ok(), "The API call failed: {:?}", response.err()); - let webhook_response = response.unwrap(); + let webhook_response: Webhook = response.unwrap(); + assert_eq!(webhook_response.webhook_id, "0e8250a1-ceec-4757-ad69"); assert_eq!( webhook_response.webhook_url, @@ -84,6 +86,7 @@ async fn test_edit_webhook_failure() { .with_header("Content-Type", "application/json") .with_body(r#"{"error":"Internal Server Error"}"#) .create(); + let config: Arc = Arc::new(Config { api_key: "fake_api_key".to_string(), cluster: Cluster::Devnet, @@ -92,7 +95,16 @@ async fn test_edit_webhook_failure() { rpc: url.to_string(), }, }); - let request = EditWebhookRequest { + let client: Client = Client::new(); + let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), Arc::clone(&config)).unwrap()); + let helius: Helius = Helius { + config, + client, + rpc_client, + ws_client: None, + }; + + let request: EditWebhookRequest = EditWebhookRequest { webhook_id: "0e8250a1-ceec-4757-ad69".to_string(), webhook_url: "https://webhook.site/0e8250a1-ceec-4757-ad69-cc6473085bfc".to_string(), transaction_types: vec![TransactionType::Any], @@ -103,13 +115,6 @@ async fn test_edit_webhook_failure() { encoding: Default::default(), }; - let client: Client = Client::new(); - let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), Arc::clone(&config)).unwrap()); - let helius = Helius { - config, - client, - rpc_client, - }; - let response: Result = helius.edit_webhook(request).await; + let response: Result = helius.edit_webhook(request).await; assert!(response.is_err(), "Expected an error due to server failure"); } diff --git a/tests/webhook/test_get_all_webhooks.rs b/tests/webhook/test_get_all_webhooks.rs index c488f32..01814b9 100644 --- a/tests/webhook/test_get_all_webhooks.rs +++ b/tests/webhook/test_get_all_webhooks.rs @@ -1,5 +1,5 @@ use helius::config::Config; -use helius::error::HeliusError; +use helius::error::Result; use helius::rpc_client::RpcClient; use helius::types::{Cluster, HeliusEndpoints, TransactionType, Webhook, WebhookType}; use helius::Helius; @@ -40,16 +40,18 @@ async fn test_get_all_webhooks_success() { let client: Client = Client::new(); let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), Arc::clone(&config)).unwrap()); - let helius = Helius { + let helius: Helius = Helius { config, client, rpc_client, + ws_client: None, }; - let response = helius.get_all_webhooks().await; + let response: Result> = helius.get_all_webhooks().await; assert!(response.is_ok(), "The API call failed: {:?}", response.err()); - let webhook_response = response.unwrap(); + let webhook_response: Vec = response.unwrap(); + assert_eq!(webhook_response.len(), 1); assert_eq!(webhook_response[0].webhook_id, "0e8250a1-ceec-4757-ad69"); assert_eq!( @@ -81,11 +83,13 @@ async fn test_get_all_webhooks_failure() { let client: Client = Client::new(); let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), Arc::clone(&config)).unwrap()); - let helius = Helius { + let helius: Helius = Helius { config, client, rpc_client, + ws_client: None, }; - let response: Result, HeliusError> = helius.get_all_webhooks().await; + + let response: Result> = helius.get_all_webhooks().await; assert!(response.is_err(), "Expected an error due to server failure"); } diff --git a/tests/webhook/test_get_webhook_by_id.rs b/tests/webhook/test_get_webhook_by_id.rs index e988b9b..0dbbc13 100644 --- a/tests/webhook/test_get_webhook_by_id.rs +++ b/tests/webhook/test_get_webhook_by_id.rs @@ -1,5 +1,5 @@ use helius::config::Config; -use helius::error::HeliusError; +use helius::error::Result; use helius::rpc_client::RpcClient; use helius::types::{Cluster, HeliusEndpoints, TransactionType, Webhook, WebhookType}; use helius::Helius; @@ -41,16 +41,18 @@ async fn test_get_webhook_by_id_success() { let client: Client = Client::new(); let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), Arc::clone(&config)).unwrap()); - let helius = Helius { + let helius: Helius = Helius { config, client, rpc_client, + ws_client: None, }; - let response = helius.get_webhook_by_id("0e8250a1-ceec-4757-ad69").await; + let response: Result = helius.get_webhook_by_id("0e8250a1-ceec-4757-ad69").await; assert!(response.is_ok(), "The API call failed: {:?}", response.err()); - let webhook_response = response.unwrap(); + let webhook_response: Webhook = response.unwrap(); + assert_eq!(webhook_response.webhook_id, "0e8250a1-ceec-4757-ad69"); assert_eq!( webhook_response.webhook_url, @@ -81,11 +83,13 @@ async fn test_get_webhook_by_id_failure() { let client: Client = Client::new(); let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), Arc::clone(&config)).unwrap()); - let helius = Helius { + let helius: Helius = Helius { config, client, rpc_client, + ws_client: None, }; - let response: Result = helius.get_webhook_by_id("0e8250a1-ceec-4757-ad69").await; + + let response: Result = helius.get_webhook_by_id("0e8250a1-ceec-4757-ad69").await; assert!(response.is_err(), "Expected an error due to server failure"); } diff --git a/tests/webhook/test_remove_addresses_from_webhook.rs b/tests/webhook/test_remove_addresses_from_webhook.rs index 3022e81..2b55174 100644 --- a/tests/webhook/test_remove_addresses_from_webhook.rs +++ b/tests/webhook/test_remove_addresses_from_webhook.rs @@ -1,5 +1,5 @@ use helius::config::Config; -use helius::error::HeliusError; +use helius::error::Result; use helius::rpc_client::RpcClient; use helius::types::{Cluster, HeliusEndpoints, TransactionType, Webhook, WebhookType}; use helius::Helius; @@ -63,13 +63,14 @@ async fn test_remove_addresses_from_webhook_success() { let client: Client = Client::new(); let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), Arc::clone(&config)).unwrap()); - let helius = Helius { + let helius: Helius = Helius { config, client, rpc_client, + ws_client: None, }; - let response = helius + let response: Result = helius .remove_addresses_from_webhook( "0e8250a1-ceec-4757-ad69", &[ @@ -80,7 +81,8 @@ async fn test_remove_addresses_from_webhook_success() { .await; assert!(response.is_ok(), "The API call failed: {:?}", response.err()); - let webhook_response = response.unwrap(); + let webhook_response: Webhook = response.unwrap(); + assert_eq!(webhook_response.webhook_id, "0e8250a1-ceec-4757-ad69"); assert_eq!( webhook_response.webhook_url, @@ -136,12 +138,14 @@ async fn test_remove_addresses_from_webhook_failure() { let client: Client = Client::new(); let rpc_client: Arc = Arc::new(RpcClient::new(Arc::new(client.clone()), Arc::clone(&config)).unwrap()); - let helius = Helius { + let helius: Helius = Helius { config, client, rpc_client, + ws_client: None, }; - let response: Result = helius + + let response: Result = helius .remove_addresses_from_webhook( "0e8250a1-ceec-4757-ad69", &[