Skip to content

Commit

Permalink
feat: reconnect websocket client on failure (#932)
Browse files Browse the repository at this point in the history
  • Loading branch information
dinhani-cw authored May 27, 2024
1 parent 9f0a8c1 commit 010e11d
Showing 1 changed file with 66 additions and 22 deletions.
88 changes: 66 additions & 22 deletions src/infra/blockchain_client/blockchain_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ use ethers_core::types::Transaction;
use jsonrpsee::core::client::ClientT;
use jsonrpsee::core::client::Subscription;
use jsonrpsee::core::client::SubscriptionClientT;
use jsonrpsee::core::ClientError;
use jsonrpsee::http_client::HttpClient;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::ws_client::WsClient;
use jsonrpsee::ws_client::WsClientBuilder;
use serde_json::Value as JsonValue;
use tokio::sync::RwLock;
use tokio::sync::RwLockReadGuard;

use super::pending_transaction::PendingTransaction;
use crate::eth::primitives::Address;
Expand All @@ -22,14 +25,15 @@ use crate::eth::primitives::SlotIndex;
use crate::eth::primitives::SlotValue;
use crate::eth::primitives::StoragePointInTime;
use crate::eth::primitives::Wei;
use crate::ext::DisplayExt;
use crate::log_and_err;

#[derive(Debug)]
pub struct BlockchainClient {
http: HttpClient,
pub http_url: String,
ws: Option<WsClient>,
pub ws_url: Option<String>,
ws: Option<RwLock<WsClient>>,
ws_url: Option<String>,
timeout: Duration,
}

impl BlockchainClient {
Expand All @@ -43,7 +47,7 @@ impl BlockchainClient {
tracing::info!(%http_url, "starting blockchain client");

// build http provider
let http = match HttpClientBuilder::default().request_timeout(timeout).build(http_url) {
let http = match Self::build_http_client(http_url, timeout) {
Ok(http) => http,
Err(e) => {
tracing::error!(reason = ?e, url = %http_url, "failed to create blockchain http client");
Expand All @@ -53,8 +57,8 @@ impl BlockchainClient {

// build ws provider
let (ws, ws_url) = if let Some(ws_url) = ws_url {
match WsClientBuilder::new().connection_timeout(timeout).build(ws_url).await {
Ok(ws) => (Some(ws), Some(ws_url.to_string())),
match Self::build_ws_client(ws_url, timeout).await {
Ok(ws) => (Some(RwLock::new(ws)), Some(ws_url.to_string())),
Err(e) => {
tracing::error!(reason = ?e, url = %ws_url, "failed to create blockchain websocket client");
return Err(e).context("failed to create blockchain websocket client");
Expand All @@ -64,19 +68,36 @@ impl BlockchainClient {
(None, None)
};

let client = Self {
http,
http_url: http_url.to_string(),
ws,
ws_url,
};
let client = Self { http, ws, ws_url, timeout };

// check health before assuming it is ok
client.fetch_listening().await?;

Ok(client)
}

fn build_http_client(url: &str, timeout: Duration) -> anyhow::Result<HttpClient> {
tracing::info!(%url, timeout = %timeout.to_string_ext(), "creating blockchain http client");
match HttpClientBuilder::default().request_timeout(timeout).build(url) {
Ok(http) => Ok(http),
Err(e) => {
tracing::error!(reason = ?e, %url, timeout = %timeout.to_string_ext(), "failed to create blockchain http client");
Err(e).context("failed to create blockchain http client")
}
}
}

async fn build_ws_client(url: &str, timeout: Duration) -> anyhow::Result<WsClient> {
tracing::info!(%url, timeout = %timeout.to_string_ext(), "creating blockchain websocket client");
match WsClientBuilder::new().connection_timeout(timeout).build(url).await {
Ok(ws) => Ok(ws),
Err(e) => {
tracing::error!(reason = ?e, %url, timeout = %timeout.to_string_ext(), "failed to create blockchain websocket client");
Err(e).context("failed to create blockchain websocket client")
}
}
}

// -------------------------------------------------------------------------
// Websocket
// -------------------------------------------------------------------------
Expand All @@ -87,9 +108,9 @@ impl BlockchainClient {
}

/// Validates it is connected to websocket and returns a reference to the websocket client.
fn require_ws(&self) -> anyhow::Result<&WsClient> {
async fn require_ws(&self) -> anyhow::Result<RwLockReadGuard<'_, WsClient>> {
match &self.ws {
Some(ws) => Ok(ws),
Some(ws) => Ok(ws.read().await),
None => log_and_err!("blockchain client not connected to websocket"),
}
}
Expand Down Expand Up @@ -228,15 +249,38 @@ impl BlockchainClient {

pub async fn subscribe_new_heads(&self) -> anyhow::Result<Subscription<ExternalBlock>> {
tracing::debug!("subscribing to newHeads event");
let ws = self.require_ws().await?;

let mut first_attempt = true;
loop {
let result = ws
.subscribe::<ExternalBlock, Vec<JsonValue>>("eth_subscribe", vec![JsonValue::String("newHeads".to_owned())], "eth_unsubscribe")
.await;

match result {
// subscribed
Ok(sub) => return Ok(sub),

// failed and need to reconnect
e @ Err(ClientError::RestartNeeded(_)) => {
// will try to reconnect websocket client only in first attempt
if first_attempt {
tracing::error!(%first_attempt, reason = ?e, "failed to subscribe to newHeads event. trying to reconnect websocket client now.");
} else {
tracing::error!(%first_attempt, reason = ?e, "failed to subscribe to newHeads event. will not try to reconnect websocket client.");
return e.context("failed to subscribe to newHeads event");
}
first_attempt = false;

// reconnect websocket client
let new_ws_client = Self::build_ws_client(self.ws_url.as_ref().unwrap(), self.timeout).await?;
let mut current_ws_client = self.ws.as_ref().unwrap().write().await;
let _ = std::mem::replace(&mut *current_ws_client, new_ws_client);
}

let ws = self.require_ws()?;
let result = ws
.subscribe::<ExternalBlock, Vec<JsonValue>>("eth_subscribe", vec![JsonValue::String("newHeads".to_owned())], "eth_unsubscribe")
.await;

match result {
Ok(sub) => Ok(sub),
Err(e) => log_and_err!(reason = e, "failed to subscribe to newHeads event"),
// failed and cannot do anything
Err(e) => return log_and_err!(reason = e, "failed to subscribe to newHeads event"),
}
}
}
}

0 comments on commit 010e11d

Please sign in to comment.