diff --git a/core/src/proxy/block_engine_stage.rs b/core/src/proxy/block_engine_stage.rs index 5dd8510bad..caf45ac0d9 100644 --- a/core/src/proxy/block_engine_stage.rs +++ b/core/src/proxy/block_engine_stage.rs @@ -35,7 +35,10 @@ use { thread::{self, Builder, JoinHandle}, time::Duration, }, - tokio::time::{interval, sleep, timeout}, + tokio::{ + task, + time::{interval, sleep, timeout}, + }, tonic::{ codegen::InterceptedService, transport::{Channel, Endpoint}, @@ -148,7 +151,12 @@ impl BlockEngineStage { while !exit.load(Ordering::Relaxed) { // Wait until a valid config is supplied (either initially or by admin rpc) // Use if!/else here to avoid extra CONNECTION_BACKOFF wait on successful termination - let local_block_engine_config = block_engine_config.lock().unwrap().clone(); + let local_block_engine_config = { + let block_engine_config = block_engine_config.clone(); + task::spawn_blocking(move || block_engine_config.lock().unwrap().clone()) + .await + .unwrap() + }; if !Self::is_valid_block_engine_config(&local_block_engine_config) { sleep(CONNECTION_BACKOFF).await; } else if let Err(e) = Self::connect_auth_and_stream( @@ -324,10 +332,16 @@ impl BlockEngineStage { .into_inner(); { - let mut bb_fee = block_builder_fee_info.lock().unwrap(); - bb_fee.block_builder_commission = block_builder_info.commission; - bb_fee.block_builder = - Pubkey::from_str(&block_builder_info.pubkey).unwrap_or(bb_fee.block_builder); + let block_builder_fee_info = block_builder_fee_info.clone(); + task::spawn_blocking(move || { + let mut bb_fee = block_builder_fee_info.lock().unwrap(); + bb_fee.block_builder_commission = block_builder_info.commission; + if let Ok(pk) = Pubkey::from_str(&block_builder_info.pubkey) { + bb_fee.block_builder = pk + } + }) + .await + .unwrap(); } Self::consume_bundle_and_packet_stream( @@ -400,7 +414,10 @@ impl BlockEngineStage { return Err(ProxyError::AuthenticationConnectionError("validator identity changed".to_string())); } - if *global_config.lock().unwrap() != *local_config { + let global_config = global_config.clone(); + if *local_config != task::spawn_blocking(move || global_config.lock().unwrap().clone()) + .await + .unwrap() { return Err(ProxyError::AuthenticationConnectionError("block engine config changed".to_string())); } @@ -419,7 +436,11 @@ impl BlockEngineStage { ("url", &local_config.block_engine_url, String), ("count", num_refresh_access_token, i64), ); - *access_token.lock().unwrap() = new_token; + + let access_token = access_token.clone(); + task::spawn_blocking(move || *access_token.lock().unwrap() = new_token) + .await + .unwrap(); } if let Some(new_token) = maybe_new_refresh { num_full_refreshes += 1; @@ -441,12 +462,20 @@ impl BlockEngineStage { .map_err(|e| ProxyError::MethodError(e.to_string()))? .into_inner(); - let mut bb_fee = block_builder_fee_info.lock().unwrap(); - bb_fee.block_builder_commission = block_builder_info.commission; - bb_fee.block_builder = Pubkey::from_str(&block_builder_info.pubkey).unwrap_or(bb_fee.block_builder); + let block_builder_fee_info = block_builder_fee_info.clone(); + task::spawn_blocking(move || { + let mut bb_fee = block_builder_fee_info.lock().unwrap(); + bb_fee.block_builder_commission = block_builder_info.commission; + if let Ok(pk) = Pubkey::from_str(&block_builder_info.pubkey) { + bb_fee.block_builder = pk + } + }) + .await + .unwrap(); } } } + Ok(()) } diff --git a/core/src/proxy/relayer_stage.rs b/core/src/proxy/relayer_stage.rs index e640bd7554..0c8ce22877 100644 --- a/core/src/proxy/relayer_stage.rs +++ b/core/src/proxy/relayer_stage.rs @@ -38,7 +38,10 @@ use { thread::{self, Builder, JoinHandle}, time::{Duration, Instant}, }, - tokio::time::{interval, sleep, timeout}, + tokio::{ + task, + time::{interval, sleep, timeout}, + }, tonic::{ codegen::InterceptedService, transport::{Channel, Endpoint}, @@ -147,7 +150,12 @@ impl RelayerStage { while !exit.load(Ordering::Relaxed) { // Wait until a valid config is supplied (either initially or by admin rpc) // Use if!/else here to avoid extra CONNECTION_BACKOFF wait on successful termination - let local_relayer_config = relayer_config.lock().unwrap().clone(); + let local_relayer_config = { + let relayer_config = relayer_config.clone(); + task::spawn_blocking(move || relayer_config.lock().unwrap().clone()) + .await + .expect("Failed to get execute tokio task.") + }; if !Self::is_valid_relayer_config(&local_relayer_config) { sleep(CONNECTION_BACKOFF).await; } else if let Err(e) = Self::connect_auth_and_stream( @@ -387,7 +395,10 @@ impl RelayerStage { return Err(ProxyError::AuthenticationConnectionError("validator identity changed".to_string())); } - if *global_config.lock().unwrap() != *local_config { + let global_config = global_config.clone(); + if *local_config != task::spawn_blocking(move || global_config.lock().unwrap().clone()) + .await + .unwrap() { return Err(ProxyError::AuthenticationConnectionError("relayer config changed".to_string())); } @@ -406,7 +417,11 @@ impl RelayerStage { ("url", &local_config.relayer_url, String), ("count", num_refresh_access_token, i64), ); - *access_token.lock().unwrap() = new_token; + + let access_token = access_token.clone(); + task::spawn_blocking(move || *access_token.lock().unwrap() = new_token) + .await + .unwrap(); } if let Some(new_token) = maybe_new_refresh { num_full_refreshes += 1;