Skip to content

Commit

Permalink
chore: add timeout config for BlockchainClient
Browse files Browse the repository at this point in the history
  • Loading branch information
carneiro-cw committed May 23, 2024
1 parent 7c54397 commit 50ca7cc
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 14 deletions.
9 changes: 8 additions & 1 deletion src/bin/importer_online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,14 @@ async fn run(config: ImporterOnlineConfig) -> anyhow::Result<()> {
let relayer = config.relayer.init(Arc::clone(&storage)).await?;
let miner = config.miner.init_external_mode(Arc::clone(&storage), None).await?;
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, None).await; //XXX TODO implement the consensus here, in case of it being a follower, it should not even enter here
let chain = Arc::new(BlockchainClient::new_http_ws(&config.base.external_rpc, config.base.external_rpc_ws.as_deref()).await?);
let chain = Arc::new(
BlockchainClient::new_http_ws(
&config.base.external_rpc,
config.base.external_rpc_ws.as_deref(),
config.base.external_rpc_timeout,
)
.await?,
);

let result = run_importer_online(executor, miner, storage, chain, config.base.sync_interval).await;
if let Err(ref e) = result {
Expand Down
2 changes: 1 addition & 1 deletion src/bin/rpc_downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn main() -> anyhow::Result<()> {

async fn run(config: RpcDownloaderConfig) -> anyhow::Result<()> {
let rpc_storage = config.rpc_storage.init().await?;
let chain = Arc::new(BlockchainClient::new_http(&config.external_rpc).await?);
let chain = Arc::new(BlockchainClient::new_http(&config.external_rpc, config.external_rpc_timeout).await?);

// download balances and blocks
download_balances(Arc::clone(&rpc_storage), &chain, config.initial_accounts).await?;
Expand Down
2 changes: 1 addition & 1 deletion src/bin/run_with_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> {
let consensus = Arc::new(Consensus::new(config.clone().leader_node)); // in development, with no leader configured, the current node ends up being the leader
let (http_url, ws_url) = consensus.get_chain_url(config.clone());
consensus.sender.send("Consensus initialized.".to_string()).await.unwrap();
let chain = Arc::new(BlockchainClient::new_http_ws(&http_url, ws_url.as_deref()).await?);
let chain = Arc::new(BlockchainClient::new_http_ws(&http_url, ws_url.as_deref(), config.online.external_rpc_timeout).await?);

let relayer = config.relayer.init(Arc::clone(&storage)).await?;
let miner = config.miner.init_external_mode(Arc::clone(&storage), Some(Arc::clone(&consensus))).await?;
Expand Down
3 changes: 2 additions & 1 deletion src/bin/state_validator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::sync::Arc;
use std::time::Duration;

use rand::Rng;
use stratus::config::StateValidatorConfig;
Expand Down Expand Up @@ -53,7 +54,7 @@ async fn validate_state(
) -> anyhow::Result<()> {
match method {
ValidatorMethodConfig::Rpc { url } => {
let chain = BlockchainClient::new_http(&url).await?;
let chain = BlockchainClient::new_http(&url, Duration::from_secs(2)).await?;
validate_state_rpc(&chain, storage, start, end, max_sample_size, seed).await
}
_ => todo!(),
Expand Down
14 changes: 13 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,10 @@ pub struct RelayerConfig {
/// RPC address to forward transactions to.
#[arg(long = "forward-to", env = "FORWARD_TO")]
pub forward_to: Option<String>,

/// Timeout for blockchain requests (relayer)
#[arg(long = "relayer-timeout", value_parser=parse_duration, env = "RELAYER_TIMEOUT", default_value = "2s")]
pub relayer_timeout: Duration,
}

impl RelayerConfig {
Expand All @@ -309,7 +313,7 @@ impl RelayerConfig {

match self.forward_to {
Some(ref forward_to) => {
let chain = BlockchainClient::new_http(forward_to).await?;
let chain = BlockchainClient::new_http(forward_to, self.relayer_timeout).await?;
let relayer = TransactionRelayer::new(storage, chain);
Ok(Some(Arc::new(relayer)))
}
Expand Down Expand Up @@ -366,6 +370,10 @@ pub struct RpcDownloaderConfig {
#[arg(short = 'r', long = "external-rpc", env = "EXTERNAL_RPC")]
pub external_rpc: String,

/// Timeout for blockchain requests
#[arg(long = "external-rpc-timeout", value_parser=parse_duration, env = "EXTERNAL_RPC_TIMEOUT", default_value = "2s")]
pub external_rpc_timeout: Duration,

/// Number of parallel downloads.
#[arg(short = 'p', long = "paralellism", env = "PARALELLISM", default_value = "1")]
pub paralellism: usize,
Expand Down Expand Up @@ -476,6 +484,10 @@ pub struct ImporterOnlineBaseConfig {
#[arg(short = 'w', long = "external-rpc-ws", env = "EXTERNAL_RPC_WS")]
pub external_rpc_ws: Option<String>,

/// Timeout for blockchain requests (importer online)
#[arg(long = "external-rpc-timeout", value_parser=parse_duration, env = "EXTERNAL_RPC_TIMEOUT", default_value = "2s")]
pub external_rpc_timeout: Duration,

#[arg(long = "sync-interval", value_parser=parse_duration, env = "SYNC_INTERVAL", default_value = "100ms")]
pub sync_interval: Duration,
}
Expand Down
2 changes: 1 addition & 1 deletion src/eth/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl Consensus {
}

async fn append_entries(follower: &str, entries: Vec<Entry>) -> Result<(), anyhow::Error> {
let client = BlockchainClient::new_http_ws(follower, None).await?;
let client = BlockchainClient::new_http_ws(follower, None, Duration::from_secs(2)).await?;

for attempt in 1..=RETRY_ATTEMPTS {
let response = client.append_entries(entries.clone()).await;
Expand Down
13 changes: 5 additions & 8 deletions src/infra/blockchain_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ use crate::eth::primitives::StoragePointInTime;
use crate::eth::primitives::Wei;
use crate::log_and_err;

/// Default timeout for blockchain operations.
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(2);

#[derive(Debug)]
pub struct BlockchainClient {
http: HttpClient,
Expand All @@ -37,16 +34,16 @@ pub struct BlockchainClient {

impl BlockchainClient {
/// Creates a new RPC client connected only to HTTP.
pub async fn new_http(http_url: &str) -> anyhow::Result<Self> {
Self::new_http_ws(http_url, None).await
pub async fn new_http(http_url: &str, timeout: Duration) -> anyhow::Result<Self> {
Self::new_http_ws(http_url, None, timeout).await
}

/// Creates a new RPC client connected to HTTP and optionally to WS.
pub async fn new_http_ws(http_url: &str, ws_url: Option<&str>) -> anyhow::Result<Self> {
pub async fn new_http_ws(http_url: &str, ws_url: Option<&str>, timeout: Duration) -> anyhow::Result<Self> {
tracing::info!(%http_url, "starting blockchain client");

// build http provider
let http = match HttpClientBuilder::default().request_timeout(DEFAULT_TIMEOUT).build(http_url) {
let http = match HttpClientBuilder::default().request_timeout(timeout).build(http_url) {
Ok(http) => http,
Err(e) => {
tracing::error!(reason = ?e, url = %http_url, "failed to create blockchain http client");
Expand All @@ -56,7 +53,7 @@ impl BlockchainClient {

// build ws provider
let (ws, ws_url) = if let Some(ws_url) = ws_url {
match WsClientBuilder::new().connection_timeout(DEFAULT_TIMEOUT).build(ws_url).await {
match WsClientBuilder::new().connection_timeout(timeout).build(ws_url).await {
Ok(ws) => (Some(ws), Some(ws_url.to_string())),
Err(e) => {
tracing::error!(reason = ?e, url = %ws_url, "failed to create blockchain websocket client");
Expand Down

0 comments on commit 50ca7cc

Please sign in to comment.