diff --git a/src/infra/blockchain_client/blockchain_client.rs b/src/infra/blockchain_client/blockchain_client.rs index eb5494f47..3da94ffb5 100644 --- a/src/infra/blockchain_client/blockchain_client.rs +++ b/src/infra/blockchain_client/blockchain_client.rs @@ -6,11 +6,14 @@ use ethers_core::types::Transaction; use jsonrpsee::core::client::ClientT; use jsonrpsee::core::client::Subscription; use jsonrpsee::core::client::SubscriptionClientT; +use jsonrpsee::core::ClientError; use jsonrpsee::http_client::HttpClient; use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::ws_client::WsClient; use jsonrpsee::ws_client::WsClientBuilder; use serde_json::Value as JsonValue; +use tokio::sync::RwLock; +use tokio::sync::RwLockReadGuard; use super::pending_transaction::PendingTransaction; use crate::eth::primitives::Address; @@ -22,14 +25,15 @@ use crate::eth::primitives::SlotIndex; use crate::eth::primitives::SlotValue; use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::Wei; +use crate::ext::DisplayExt; use crate::log_and_err; #[derive(Debug)] pub struct BlockchainClient { http: HttpClient, - pub http_url: String, - ws: Option, - pub ws_url: Option, + ws: Option>, + ws_url: Option, + timeout: Duration, } impl BlockchainClient { @@ -43,7 +47,7 @@ impl BlockchainClient { tracing::info!(%http_url, "starting blockchain client"); // build http provider - let http = match HttpClientBuilder::default().request_timeout(timeout).build(http_url) { + let http = match Self::build_http_client(http_url, timeout) { Ok(http) => http, Err(e) => { tracing::error!(reason = ?e, url = %http_url, "failed to create blockchain http client"); @@ -53,8 +57,8 @@ impl BlockchainClient { // build ws provider let (ws, ws_url) = if let Some(ws_url) = ws_url { - match WsClientBuilder::new().connection_timeout(timeout).build(ws_url).await { - Ok(ws) => (Some(ws), Some(ws_url.to_string())), + match Self::build_ws_client(ws_url, timeout).await { + Ok(ws) => (Some(RwLock::new(ws)), Some(ws_url.to_string())), Err(e) => { tracing::error!(reason = ?e, url = %ws_url, "failed to create blockchain websocket client"); return Err(e).context("failed to create blockchain websocket client"); @@ -64,12 +68,7 @@ impl BlockchainClient { (None, None) }; - let client = Self { - http, - http_url: http_url.to_string(), - ws, - ws_url, - }; + let client = Self { http, ws, ws_url, timeout }; // check health before assuming it is ok client.fetch_listening().await?; @@ -77,6 +76,28 @@ impl BlockchainClient { Ok(client) } + fn build_http_client(url: &str, timeout: Duration) -> anyhow::Result { + tracing::info!(%url, timeout = %timeout.to_string_ext(), "creating blockchain http client"); + match HttpClientBuilder::default().request_timeout(timeout).build(url) { + Ok(http) => Ok(http), + Err(e) => { + tracing::error!(reason = ?e, %url, timeout = %timeout.to_string_ext(), "failed to create blockchain http client"); + Err(e).context("failed to create blockchain http client") + } + } + } + + async fn build_ws_client(url: &str, timeout: Duration) -> anyhow::Result { + tracing::info!(%url, timeout = %timeout.to_string_ext(), "creating blockchain websocket client"); + match WsClientBuilder::new().connection_timeout(timeout).build(url).await { + Ok(ws) => Ok(ws), + Err(e) => { + tracing::error!(reason = ?e, %url, timeout = %timeout.to_string_ext(), "failed to create blockchain websocket client"); + Err(e).context("failed to create blockchain websocket client") + } + } + } + // ------------------------------------------------------------------------- // Websocket // ------------------------------------------------------------------------- @@ -87,9 +108,9 @@ impl BlockchainClient { } /// Validates it is connected to websocket and returns a reference to the websocket client. - fn require_ws(&self) -> anyhow::Result<&WsClient> { + async fn require_ws(&self) -> anyhow::Result> { match &self.ws { - Some(ws) => Ok(ws), + Some(ws) => Ok(ws.read().await), None => log_and_err!("blockchain client not connected to websocket"), } } @@ -228,15 +249,38 @@ impl BlockchainClient { pub async fn subscribe_new_heads(&self) -> anyhow::Result> { tracing::debug!("subscribing to newHeads event"); + let ws = self.require_ws().await?; + + let mut first_attempt = true; + loop { + let result = ws + .subscribe::>("eth_subscribe", vec![JsonValue::String("newHeads".to_owned())], "eth_unsubscribe") + .await; + + match result { + // subscribed + Ok(sub) => return Ok(sub), + + // failed and need to reconnect + e @ Err(ClientError::RestartNeeded(_)) => { + // will try to reconnect websocket client only in first attempt + if first_attempt { + tracing::error!(%first_attempt, reason = ?e, "failed to subscribe to newHeads event. trying to reconnect websocket client now."); + } else { + tracing::error!(%first_attempt, reason = ?e, "failed to subscribe to newHeads event. will not try to reconnect websocket client."); + return e.context("failed to subscribe to newHeads event"); + } + first_attempt = false; + + // reconnect websocket client + let new_ws_client = Self::build_ws_client(self.ws_url.as_ref().unwrap(), self.timeout).await?; + let mut current_ws_client = self.ws.as_ref().unwrap().write().await; + let _ = std::mem::replace(&mut *current_ws_client, new_ws_client); + } - let ws = self.require_ws()?; - let result = ws - .subscribe::>("eth_subscribe", vec![JsonValue::String("newHeads".to_owned())], "eth_unsubscribe") - .await; - - match result { - Ok(sub) => Ok(sub), - Err(e) => log_and_err!(reason = e, "failed to subscribe to newHeads event"), + // failed and cannot do anything + Err(e) => return log_and_err!(reason = e, "failed to subscribe to newHeads event"), + } } } }