diff --git a/src/bin/importer_online.rs b/src/bin/importer_online.rs index dbae7066a..cecd72149 100644 --- a/src/bin/importer_online.rs +++ b/src/bin/importer_online.rs @@ -65,7 +65,14 @@ async fn run(config: ImporterOnlineConfig) -> anyhow::Result<()> { let relayer = config.relayer.init(Arc::clone(&storage)).await?; let miner = config.miner.init_external_mode(Arc::clone(&storage), None).await?; let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner), relayer, None).await; //XXX TODO implement the consensus here, in case of it being a follower, it should not even enter here - let chain = Arc::new(BlockchainClient::new_http_ws(&config.base.external_rpc, config.base.external_rpc_ws.as_deref()).await?); + let chain = Arc::new( + BlockchainClient::new_http_ws( + &config.base.external_rpc, + config.base.external_rpc_ws.as_deref(), + config.base.external_rpc_timeout, + ) + .await?, + ); let result = run_importer_online(executor, miner, storage, chain, config.base.sync_interval).await; if let Err(ref e) = result { diff --git a/src/bin/rpc_downloader.rs b/src/bin/rpc_downloader.rs index 94ec2f6c5..2ce15d229 100644 --- a/src/bin/rpc_downloader.rs +++ b/src/bin/rpc_downloader.rs @@ -27,7 +27,7 @@ fn main() -> anyhow::Result<()> { async fn run(config: RpcDownloaderConfig) -> anyhow::Result<()> { let rpc_storage = config.rpc_storage.init().await?; - let chain = Arc::new(BlockchainClient::new_http(&config.external_rpc).await?); + let chain = Arc::new(BlockchainClient::new_http(&config.external_rpc, config.external_rpc_timeout).await?); // download balances and blocks download_balances(Arc::clone(&rpc_storage), &chain, config.initial_accounts).await?; diff --git a/src/bin/run_with_importer.rs b/src/bin/run_with_importer.rs index e11e017fa..0871dc682 100644 --- a/src/bin/run_with_importer.rs +++ b/src/bin/run_with_importer.rs @@ -24,7 +24,7 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> { let consensus = Arc::new(Consensus::new(config.clone().leader_node)); // in development, with no leader configured, the current node ends up being the leader let (http_url, ws_url) = consensus.get_chain_url(config.clone()); consensus.sender.send("Consensus initialized.".to_string()).await.unwrap(); - let chain = Arc::new(BlockchainClient::new_http_ws(&http_url, ws_url.as_deref()).await?); + let chain = Arc::new(BlockchainClient::new_http_ws(&http_url, ws_url.as_deref(), config.online.external_rpc_timeout).await?); let relayer = config.relayer.init(Arc::clone(&storage)).await?; let miner = config.miner.init_external_mode(Arc::clone(&storage), Some(Arc::clone(&consensus))).await?; diff --git a/src/bin/state_validator.rs b/src/bin/state_validator.rs index 43a2bcfc3..7788a4739 100644 --- a/src/bin/state_validator.rs +++ b/src/bin/state_validator.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::time::Duration; use rand::Rng; use stratus::config::StateValidatorConfig; @@ -53,7 +54,7 @@ async fn validate_state( ) -> anyhow::Result<()> { match method { ValidatorMethodConfig::Rpc { url } => { - let chain = BlockchainClient::new_http(&url).await?; + let chain = BlockchainClient::new_http(&url, Duration::from_secs(2)).await?; validate_state_rpc(&chain, storage, start, end, max_sample_size, seed).await } _ => todo!(), diff --git a/src/config.rs b/src/config.rs index 3be3e8a76..259cf0699 100644 --- a/src/config.rs +++ b/src/config.rs @@ -301,6 +301,10 @@ pub struct RelayerConfig { /// RPC address to forward transactions to. #[arg(long = "forward-to", env = "FORWARD_TO")] pub forward_to: Option, + + /// Timeout for blockchain requests (relayer) + #[arg(long = "relayer-timeout", value_parser=parse_duration, env = "RELAYER_TIMEOUT", default_value = "2s")] + pub relayer_timeout: Duration, } impl RelayerConfig { @@ -309,7 +313,7 @@ impl RelayerConfig { match self.forward_to { Some(ref forward_to) => { - let chain = BlockchainClient::new_http(forward_to).await?; + let chain = BlockchainClient::new_http(forward_to, self.relayer_timeout).await?; let relayer = TransactionRelayer::new(storage, chain); Ok(Some(Arc::new(relayer))) } @@ -366,6 +370,10 @@ pub struct RpcDownloaderConfig { #[arg(short = 'r', long = "external-rpc", env = "EXTERNAL_RPC")] pub external_rpc: String, + /// Timeout for blockchain requests + #[arg(long = "external-rpc-timeout", value_parser=parse_duration, env = "EXTERNAL_RPC_TIMEOUT", default_value = "2s")] + pub external_rpc_timeout: Duration, + /// Number of parallel downloads. #[arg(short = 'p', long = "paralellism", env = "PARALELLISM", default_value = "1")] pub paralellism: usize, @@ -476,6 +484,10 @@ pub struct ImporterOnlineBaseConfig { #[arg(short = 'w', long = "external-rpc-ws", env = "EXTERNAL_RPC_WS")] pub external_rpc_ws: Option, + /// Timeout for blockchain requests (importer online) + #[arg(long = "external-rpc-timeout", value_parser=parse_duration, env = "EXTERNAL_RPC_TIMEOUT", default_value = "2s")] + pub external_rpc_timeout: Duration, + #[arg(long = "sync-interval", value_parser=parse_duration, env = "SYNC_INTERVAL", default_value = "100ms")] pub sync_interval: Duration, } diff --git a/src/eth/consensus.rs b/src/eth/consensus.rs index 27a33b19f..702b1d0dd 100644 --- a/src/eth/consensus.rs +++ b/src/eth/consensus.rs @@ -149,7 +149,7 @@ impl Consensus { } async fn append_entries(follower: &str, entries: Vec) -> Result<(), anyhow::Error> { - let client = BlockchainClient::new_http_ws(follower, None).await?; + let client = BlockchainClient::new_http_ws(follower, None, Duration::from_secs(2)).await?; for attempt in 1..=RETRY_ATTEMPTS { let response = client.append_entries(entries.clone()).await; diff --git a/src/infra/blockchain_client/client.rs b/src/infra/blockchain_client/client.rs index ebdc78ac9..2e2131c8a 100644 --- a/src/infra/blockchain_client/client.rs +++ b/src/infra/blockchain_client/client.rs @@ -24,9 +24,6 @@ use crate::eth::primitives::StoragePointInTime; use crate::eth::primitives::Wei; use crate::log_and_err; -/// Default timeout for blockchain operations. -pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(2); - #[derive(Debug)] pub struct BlockchainClient { http: HttpClient, @@ -37,16 +34,16 @@ pub struct BlockchainClient { impl BlockchainClient { /// Creates a new RPC client connected only to HTTP. - pub async fn new_http(http_url: &str) -> anyhow::Result { - Self::new_http_ws(http_url, None).await + pub async fn new_http(http_url: &str, timeout: Duration) -> anyhow::Result { + Self::new_http_ws(http_url, None, timeout).await } /// Creates a new RPC client connected to HTTP and optionally to WS. - pub async fn new_http_ws(http_url: &str, ws_url: Option<&str>) -> anyhow::Result { + pub async fn new_http_ws(http_url: &str, ws_url: Option<&str>, timeout: Duration) -> anyhow::Result { tracing::info!(%http_url, "starting blockchain client"); // build http provider - let http = match HttpClientBuilder::default().request_timeout(DEFAULT_TIMEOUT).build(http_url) { + let http = match HttpClientBuilder::default().request_timeout(timeout).build(http_url) { Ok(http) => http, Err(e) => { tracing::error!(reason = ?e, url = %http_url, "failed to create blockchain http client"); @@ -56,7 +53,7 @@ impl BlockchainClient { // build ws provider let (ws, ws_url) = if let Some(ws_url) = ws_url { - match WsClientBuilder::new().connection_timeout(DEFAULT_TIMEOUT).build(ws_url).await { + match WsClientBuilder::new().connection_timeout(timeout).build(ws_url).await { Ok(ws) => (Some(ws), Some(ws_url.to_string())), Err(e) => { tracing::error!(reason = ?e, url = %ws_url, "failed to create blockchain websocket client");