Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JIT-1812] Fix blocking mutexs #495

Merged
merged 1 commit into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 40 additions & 11 deletions core/src/proxy/block_engine_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ use {
thread::{self, Builder, JoinHandle},
time::Duration,
},
tokio::time::{interval, sleep, timeout},
tokio::{
task,
time::{interval, sleep, timeout},
},
tonic::{
codegen::InterceptedService,
transport::{Channel, Endpoint},
Expand Down Expand Up @@ -148,7 +151,12 @@ impl BlockEngineStage {
while !exit.load(Ordering::Relaxed) {
// Wait until a valid config is supplied (either initially or by admin rpc)
// Use if!/else here to avoid extra CONNECTION_BACKOFF wait on successful termination
let local_block_engine_config = block_engine_config.lock().unwrap().clone();
let local_block_engine_config = {
let block_engine_config = block_engine_config.clone();
task::spawn_blocking(move || block_engine_config.lock().unwrap().clone())
.await
.unwrap()
};
if !Self::is_valid_block_engine_config(&local_block_engine_config) {
sleep(CONNECTION_BACKOFF).await;
} else if let Err(e) = Self::connect_auth_and_stream(
Expand Down Expand Up @@ -324,10 +332,16 @@ impl BlockEngineStage {
.into_inner();

{
let mut bb_fee = block_builder_fee_info.lock().unwrap();
bb_fee.block_builder_commission = block_builder_info.commission;
bb_fee.block_builder =
Pubkey::from_str(&block_builder_info.pubkey).unwrap_or(bb_fee.block_builder);
let block_builder_fee_info = block_builder_fee_info.clone();
task::spawn_blocking(move || {
let mut bb_fee = block_builder_fee_info.lock().unwrap();
bb_fee.block_builder_commission = block_builder_info.commission;
if let Ok(pk) = Pubkey::from_str(&block_builder_info.pubkey) {
bb_fee.block_builder = pk
}
})
.await
.unwrap();
}

Self::consume_bundle_and_packet_stream(
Expand Down Expand Up @@ -400,7 +414,10 @@ impl BlockEngineStage {
return Err(ProxyError::AuthenticationConnectionError("validator identity changed".to_string()));
}

if *global_config.lock().unwrap() != *local_config {
let global_config = global_config.clone();
if *local_config != task::spawn_blocking(move || global_config.lock().unwrap().clone())
.await
.unwrap() {
return Err(ProxyError::AuthenticationConnectionError("block engine config changed".to_string()));
}

Expand All @@ -419,7 +436,11 @@ impl BlockEngineStage {
("url", &local_config.block_engine_url, String),
("count", num_refresh_access_token, i64),
);
*access_token.lock().unwrap() = new_token;

let access_token = access_token.clone();
task::spawn_blocking(move || *access_token.lock().unwrap() = new_token)
.await
.unwrap();
}
if let Some(new_token) = maybe_new_refresh {
num_full_refreshes += 1;
Expand All @@ -441,12 +462,20 @@ impl BlockEngineStage {
.map_err(|e| ProxyError::MethodError(e.to_string()))?
.into_inner();

let mut bb_fee = block_builder_fee_info.lock().unwrap();
bb_fee.block_builder_commission = block_builder_info.commission;
bb_fee.block_builder = Pubkey::from_str(&block_builder_info.pubkey).unwrap_or(bb_fee.block_builder);
let block_builder_fee_info = block_builder_fee_info.clone();
task::spawn_blocking(move || {
let mut bb_fee = block_builder_fee_info.lock().unwrap();
bb_fee.block_builder_commission = block_builder_info.commission;
if let Ok(pk) = Pubkey::from_str(&block_builder_info.pubkey) {
bb_fee.block_builder = pk
}
})
.await
.unwrap();
}
}
}

Ok(())
}

Expand Down
23 changes: 19 additions & 4 deletions core/src/proxy/relayer_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ use {
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
},
tokio::time::{interval, sleep, timeout},
tokio::{
task,
time::{interval, sleep, timeout},
},
tonic::{
codegen::InterceptedService,
transport::{Channel, Endpoint},
Expand Down Expand Up @@ -147,7 +150,12 @@ impl RelayerStage {
while !exit.load(Ordering::Relaxed) {
// Wait until a valid config is supplied (either initially or by admin rpc)
// Use if!/else here to avoid extra CONNECTION_BACKOFF wait on successful termination
let local_relayer_config = relayer_config.lock().unwrap().clone();
let local_relayer_config = {
let relayer_config = relayer_config.clone();
task::spawn_blocking(move || relayer_config.lock().unwrap().clone())
.await
.expect("Failed to get execute tokio task.")
};
if !Self::is_valid_relayer_config(&local_relayer_config) {
sleep(CONNECTION_BACKOFF).await;
} else if let Err(e) = Self::connect_auth_and_stream(
Expand Down Expand Up @@ -387,7 +395,10 @@ impl RelayerStage {
return Err(ProxyError::AuthenticationConnectionError("validator identity changed".to_string()));
}

if *global_config.lock().unwrap() != *local_config {
let global_config = global_config.clone();
if *local_config != task::spawn_blocking(move || global_config.lock().unwrap().clone())
.await
.unwrap() {
return Err(ProxyError::AuthenticationConnectionError("relayer config changed".to_string()));
}

Expand All @@ -406,7 +417,11 @@ impl RelayerStage {
("url", &local_config.relayer_url, String),
("count", num_refresh_access_token, i64),
);
*access_token.lock().unwrap() = new_token;

let access_token = access_token.clone();
task::spawn_blocking(move || *access_token.lock().unwrap() = new_token)
.await
.unwrap();
}
if let Some(new_token) = maybe_new_refresh {
num_full_refreshes += 1;
Expand Down
Loading