From 74217fc7935e8b7111ad72b768c6d0a9d7663adf Mon Sep 17 00:00:00 2001 From: Eric Semeniuc <3838856+esemeniuc@users.noreply.github.com> Date: Mon, 4 Dec 2023 11:59:33 -0800 Subject: [PATCH] fix mutex --- core/src/proxy/block_engine_stage.rs | 52 ++++++++++++++++++++++------ core/src/proxy/mod.rs | 4 +++ core/src/proxy/relayer_stage.rs | 24 ++++++++++--- 3 files changed, 65 insertions(+), 15 deletions(-) diff --git a/core/src/proxy/block_engine_stage.rs b/core/src/proxy/block_engine_stage.rs index 5dd8510bad..2e34e3aa9f 100644 --- a/core/src/proxy/block_engine_stage.rs +++ b/core/src/proxy/block_engine_stage.rs @@ -35,7 +35,10 @@ use { thread::{self, Builder, JoinHandle}, time::Duration, }, - tokio::time::{interval, sleep, timeout}, + tokio::{ + task, + time::{interval, sleep, timeout}, + }, tonic::{ codegen::InterceptedService, transport::{Channel, Endpoint}, @@ -148,7 +151,13 @@ impl BlockEngineStage { while !exit.load(Ordering::Relaxed) { // Wait until a valid config is supplied (either initially or by admin rpc) // Use if!/else here to avoid extra CONNECTION_BACKOFF wait on successful termination - let local_block_engine_config = block_engine_config.lock().unwrap().clone(); + let local_block_engine_config = { + let block_engine_config = block_engine_config.clone(); + task::spawn_blocking(move || block_engine_config.lock().unwrap().clone()) + .await + .map_err(ProxyError::TokioJoinError) + .expect("Failed to get execute tokio task.") + }; if !Self::is_valid_block_engine_config(&local_block_engine_config) { sleep(CONNECTION_BACKOFF).await; } else if let Err(e) = Self::connect_auth_and_stream( @@ -324,10 +333,16 @@ impl BlockEngineStage { .into_inner(); { - let mut bb_fee = block_builder_fee_info.lock().unwrap(); - bb_fee.block_builder_commission = block_builder_info.commission; - bb_fee.block_builder = - Pubkey::from_str(&block_builder_info.pubkey).unwrap_or(bb_fee.block_builder); + let block_builder_fee_info = block_builder_fee_info.clone(); + task::spawn_blocking(move || { + let mut bb_fee = block_builder_fee_info.lock().unwrap(); + bb_fee.block_builder_commission = block_builder_info.commission; + if let Ok(pk) = Pubkey::from_str(&block_builder_info.pubkey) { + bb_fee.block_builder = pk + } + }) + .await + .map_err(ProxyError::TokioJoinError)?; } Self::consume_bundle_and_packet_stream( @@ -400,7 +415,10 @@ impl BlockEngineStage { return Err(ProxyError::AuthenticationConnectionError("validator identity changed".to_string())); } - if *global_config.lock().unwrap() != *local_config { + let global_config = global_config.clone(); + if *local_config != task::spawn_blocking(move || global_config.lock().unwrap().clone()) + .await + .map_err(ProxyError::TokioJoinError)? { return Err(ProxyError::AuthenticationConnectionError("block engine config changed".to_string())); } @@ -419,7 +437,11 @@ impl BlockEngineStage { ("url", &local_config.block_engine_url, String), ("count", num_refresh_access_token, i64), ); - *access_token.lock().unwrap() = new_token; + + let access_token = access_token.clone(); + task::spawn_blocking(move || *access_token.lock().unwrap() = new_token) + .await + .map_err(ProxyError::TokioJoinError)?; } if let Some(new_token) = maybe_new_refresh { num_full_refreshes += 1; @@ -441,12 +463,20 @@ impl BlockEngineStage { .map_err(|e| ProxyError::MethodError(e.to_string()))? .into_inner(); - let mut bb_fee = block_builder_fee_info.lock().unwrap(); - bb_fee.block_builder_commission = block_builder_info.commission; - bb_fee.block_builder = Pubkey::from_str(&block_builder_info.pubkey).unwrap_or(bb_fee.block_builder); + let block_builder_fee_info = block_builder_fee_info.clone(); + task::spawn_blocking(move || { + let mut bb_fee = block_builder_fee_info.lock().unwrap(); + bb_fee.block_builder_commission = block_builder_info.commission; + if let Ok(pk) = Pubkey::from_str(&block_builder_info.pubkey) { + bb_fee.block_builder = pk + } + }) + .await + .map_err(ProxyError::TokioJoinError)?; } } } + Ok(()) } diff --git a/core/src/proxy/mod.rs b/core/src/proxy/mod.rs index 86d48482aa..9e6a298d4b 100644 --- a/core/src/proxy/mod.rs +++ b/core/src/proxy/mod.rs @@ -21,6 +21,7 @@ use { result, }, thiserror::Error, + tokio::task::JoinError, tonic::Status, }; @@ -97,4 +98,7 @@ pub enum ProxyError { #[error("MethodError: {0:?}")] MethodError(String), + + #[error("TokioJoinError: {0:?}")] + TokioJoinError(JoinError), } diff --git a/core/src/proxy/relayer_stage.rs b/core/src/proxy/relayer_stage.rs index e640bd7554..20e0cf2199 100644 --- a/core/src/proxy/relayer_stage.rs +++ b/core/src/proxy/relayer_stage.rs @@ -38,7 +38,10 @@ use { thread::{self, Builder, JoinHandle}, time::{Duration, Instant}, }, - tokio::time::{interval, sleep, timeout}, + tokio::{ + task, + time::{interval, sleep, timeout}, + }, tonic::{ codegen::InterceptedService, transport::{Channel, Endpoint}, @@ -147,7 +150,13 @@ impl RelayerStage { while !exit.load(Ordering::Relaxed) { // Wait until a valid config is supplied (either initially or by admin rpc) // Use if!/else here to avoid extra CONNECTION_BACKOFF wait on successful termination - let local_relayer_config = relayer_config.lock().unwrap().clone(); + let local_relayer_config = { + let relayer_config = relayer_config.clone(); + task::spawn_blocking(move || relayer_config.lock().unwrap().clone()) + .await + .map_err(ProxyError::TokioJoinError) + .expect("Failed to get execute tokio task.") + }; if !Self::is_valid_relayer_config(&local_relayer_config) { sleep(CONNECTION_BACKOFF).await; } else if let Err(e) = Self::connect_auth_and_stream( @@ -387,7 +396,10 @@ impl RelayerStage { return Err(ProxyError::AuthenticationConnectionError("validator identity changed".to_string())); } - if *global_config.lock().unwrap() != *local_config { + let global_config = global_config.clone(); + if *local_config != task::spawn_blocking(move || global_config.lock().unwrap().clone()) + .await + .map_err(ProxyError::TokioJoinError)? { return Err(ProxyError::AuthenticationConnectionError("relayer config changed".to_string())); } @@ -406,7 +418,11 @@ impl RelayerStage { ("url", &local_config.relayer_url, String), ("count", num_refresh_access_token, i64), ); - *access_token.lock().unwrap() = new_token; + + let access_token = access_token.clone(); + task::spawn_blocking(move || *access_token.lock().unwrap() = new_token) + .await + .map_err(ProxyError::TokioJoinError)?; } if let Some(new_token) = maybe_new_refresh { num_full_refreshes += 1;