Skip to content

Commit

Permalink
[JIT-1812] Fix blocking mutexs (#495)
Browse files Browse the repository at this point in the history
  • Loading branch information
esemeniuc authored and jito-infra committed Feb 27, 2024
1 parent 0e47efe commit ee1246f
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 15 deletions.
51 changes: 40 additions & 11 deletions core/src/proxy/block_engine_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ use {
thread::{self, Builder, JoinHandle},
time::Duration,
},
tokio::time::{interval, sleep, timeout},
tokio::{
task,
time::{interval, sleep, timeout},
},
tonic::{
codegen::InterceptedService,
transport::{Channel, Endpoint},
Expand Down Expand Up @@ -148,7 +151,12 @@ impl BlockEngineStage {
while !exit.load(Ordering::Relaxed) {
// Wait until a valid config is supplied (either initially or by admin rpc)
// Use if!/else here to avoid extra CONNECTION_BACKOFF wait on successful termination
let local_block_engine_config = block_engine_config.lock().unwrap().clone();
let local_block_engine_config = {
let block_engine_config = block_engine_config.clone();
task::spawn_blocking(move || block_engine_config.lock().unwrap().clone())
.await
.unwrap()
};
if !Self::is_valid_block_engine_config(&local_block_engine_config) {
sleep(CONNECTION_BACKOFF).await;
} else if let Err(e) = Self::connect_auth_and_stream(
Expand Down Expand Up @@ -324,10 +332,16 @@ impl BlockEngineStage {
.into_inner();

{
let mut bb_fee = block_builder_fee_info.lock().unwrap();
bb_fee.block_builder_commission = block_builder_info.commission;
bb_fee.block_builder =
Pubkey::from_str(&block_builder_info.pubkey).unwrap_or(bb_fee.block_builder);
let block_builder_fee_info = block_builder_fee_info.clone();
task::spawn_blocking(move || {
let mut bb_fee = block_builder_fee_info.lock().unwrap();
bb_fee.block_builder_commission = block_builder_info.commission;
if let Ok(pk) = Pubkey::from_str(&block_builder_info.pubkey) {
bb_fee.block_builder = pk
}
})
.await
.unwrap();
}

Self::consume_bundle_and_packet_stream(
Expand Down Expand Up @@ -400,7 +414,10 @@ impl BlockEngineStage {
return Err(ProxyError::AuthenticationConnectionError("validator identity changed".to_string()));
}

if *global_config.lock().unwrap() != *local_config {
let global_config = global_config.clone();
if *local_config != task::spawn_blocking(move || global_config.lock().unwrap().clone())
.await
.unwrap() {
return Err(ProxyError::AuthenticationConnectionError("block engine config changed".to_string()));
}

Expand All @@ -419,7 +436,11 @@ impl BlockEngineStage {
("url", &local_config.block_engine_url, String),
("count", num_refresh_access_token, i64),
);
*access_token.lock().unwrap() = new_token;

let access_token = access_token.clone();
task::spawn_blocking(move || *access_token.lock().unwrap() = new_token)
.await
.unwrap();
}
if let Some(new_token) = maybe_new_refresh {
num_full_refreshes += 1;
Expand All @@ -441,12 +462,20 @@ impl BlockEngineStage {
.map_err(|e| ProxyError::MethodError(e.to_string()))?
.into_inner();

let mut bb_fee = block_builder_fee_info.lock().unwrap();
bb_fee.block_builder_commission = block_builder_info.commission;
bb_fee.block_builder = Pubkey::from_str(&block_builder_info.pubkey).unwrap_or(bb_fee.block_builder);
let block_builder_fee_info = block_builder_fee_info.clone();
task::spawn_blocking(move || {
let mut bb_fee = block_builder_fee_info.lock().unwrap();
bb_fee.block_builder_commission = block_builder_info.commission;
if let Ok(pk) = Pubkey::from_str(&block_builder_info.pubkey) {
bb_fee.block_builder = pk
}
})
.await
.unwrap();
}
}
}

Ok(())
}

Expand Down
23 changes: 19 additions & 4 deletions core/src/proxy/relayer_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ use {
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
},
tokio::time::{interval, sleep, timeout},
tokio::{
task,
time::{interval, sleep, timeout},
},
tonic::{
codegen::InterceptedService,
transport::{Channel, Endpoint},
Expand Down Expand Up @@ -147,7 +150,12 @@ impl RelayerStage {
while !exit.load(Ordering::Relaxed) {
// Wait until a valid config is supplied (either initially or by admin rpc)
// Use if!/else here to avoid extra CONNECTION_BACKOFF wait on successful termination
let local_relayer_config = relayer_config.lock().unwrap().clone();
let local_relayer_config = {
let relayer_config = relayer_config.clone();
task::spawn_blocking(move || relayer_config.lock().unwrap().clone())
.await
.expect("Failed to get execute tokio task.")
};
if !Self::is_valid_relayer_config(&local_relayer_config) {
sleep(CONNECTION_BACKOFF).await;
} else if let Err(e) = Self::connect_auth_and_stream(
Expand Down Expand Up @@ -387,7 +395,10 @@ impl RelayerStage {
return Err(ProxyError::AuthenticationConnectionError("validator identity changed".to_string()));
}

if *global_config.lock().unwrap() != *local_config {
let global_config = global_config.clone();
if *local_config != task::spawn_blocking(move || global_config.lock().unwrap().clone())
.await
.unwrap() {
return Err(ProxyError::AuthenticationConnectionError("relayer config changed".to_string()));
}

Expand All @@ -406,7 +417,11 @@ impl RelayerStage {
("url", &local_config.relayer_url, String),
("count", num_refresh_access_token, i64),
);
*access_token.lock().unwrap() = new_token;

let access_token = access_token.clone();
task::spawn_blocking(move || *access_token.lock().unwrap() = new_token)
.await
.unwrap();
}
if let Some(new_token) = maybe_new_refresh {
num_full_refreshes += 1;
Expand Down

0 comments on commit ee1246f

Please sign in to comment.