Skip to content

Commit

Permalink
[JIT-1708] Fix TOC TOU condition for relayer and block engine config (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
esemeniuc authored and buffalu committed Dec 28, 2023
1 parent 8067836 commit 667ac8d
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 29 deletions.
45 changes: 27 additions & 18 deletions core/src/proxy/block_engine_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,11 @@ impl BlockEngineStage {
while !exit.load(Ordering::Relaxed) {
// Wait until a valid config is supplied (either initially or by admin rpc)
// Use if!/else here to avoid extra CONNECTION_BACKOFF wait on successful termination
if !Self::is_valid_block_engine_config(&block_engine_config.lock().unwrap()) {
let local_block_engine_config = block_engine_config.lock().unwrap().clone();
if !Self::is_valid_block_engine_config(&local_block_engine_config) {
sleep(CONNECTION_BACKOFF).await;
} else if let Err(e) = Self::connect_auth_and_stream(
&local_block_engine_config,
&block_engine_config,
&cluster_info,
&bundle_tx,
Expand Down Expand Up @@ -183,7 +185,8 @@ impl BlockEngineStage {
}

async fn connect_auth_and_stream(
block_engine_config: &Arc<Mutex<BlockEngineConfig>>,
local_block_engine_config: &BlockEngineConfig,
global_block_engine_config: &Arc<Mutex<BlockEngineConfig>>,
cluster_info: &Arc<ClusterInfo>,
bundle_tx: &Sender<Vec<PacketBundle>>,
packet_tx: &Sender<PacketBatch>,
Expand All @@ -194,17 +197,20 @@ impl BlockEngineStage {
) -> crate::proxy::Result<()> {
// Get a copy of configs here in case they have changed at runtime
let keypair = cluster_info.keypair().clone();
let local_config = block_engine_config.lock().unwrap().clone();

let mut backend_endpoint = Endpoint::from_shared(local_config.block_engine_url.clone())
.map_err(|_| {
ProxyError::BlockEngineConnectionError(format!(
"invalid block engine url value: {}",
local_config.block_engine_url
))
})?
.tcp_keepalive(Some(Duration::from_secs(60)));
if local_config.block_engine_url.starts_with("https") {

let mut backend_endpoint =
Endpoint::from_shared(local_block_engine_config.block_engine_url.clone())
.map_err(|_| {
ProxyError::BlockEngineConnectionError(format!(
"invalid block engine url value: {}",
local_block_engine_config.block_engine_url
))
})?
.tcp_keepalive(Some(Duration::from_secs(60)));
if local_block_engine_config
.block_engine_url
.starts_with("https")
{
backend_endpoint = backend_endpoint
.tls_config(tonic::transport::ClientTlsConfig::new())
.map_err(|_| {
Expand All @@ -214,7 +220,10 @@ impl BlockEngineStage {
})?;
}

debug!("connecting to auth: {}", local_config.block_engine_url);
debug!(
"connecting to auth: {}",
local_block_engine_config.block_engine_url
);
let auth_channel = timeout(*connection_timeout, backend_endpoint.connect())
.await
.map_err(|_| ProxyError::AuthenticationConnectionTimeout)?
Expand All @@ -232,13 +241,13 @@ impl BlockEngineStage {

datapoint_info!(
"block_engine_stage-tokens_generated",
("url", local_config.block_engine_url, String),
("url", local_block_engine_config.block_engine_url, String),
("count", 1, i64),
);

debug!(
"connecting to block engine: {}",
local_config.block_engine_url
local_block_engine_config.block_engine_url
);
let block_engine_channel = timeout(*connection_timeout, backend_endpoint.connect())
.await
Expand All @@ -255,8 +264,8 @@ impl BlockEngineStage {
bundle_tx,
block_engine_client,
packet_tx,
&local_config,
block_engine_config,
local_block_engine_config,
global_block_engine_config,
banking_packet_sender,
exit,
block_builder_fee_info,
Expand Down
27 changes: 16 additions & 11 deletions core/src/proxy/relayer_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,11 @@ impl RelayerStage {
while !exit.load(Ordering::Relaxed) {
// Wait until a valid config is supplied (either initially or by admin rpc)
// Use if!/else here to avoid extra CONNECTION_BACKOFF wait on successful termination
if !Self::is_valid_relayer_config(&relayer_config.lock().unwrap()) {
let local_relayer_config = relayer_config.lock().unwrap().clone();
if !Self::is_valid_relayer_config(&local_relayer_config) {
sleep(CONNECTION_BACKOFF).await;
} else if let Err(e) = Self::connect_auth_and_stream(
&local_relayer_config,
&relayer_config,
&cluster_info,
&heartbeat_tx,
Expand Down Expand Up @@ -181,7 +183,8 @@ impl RelayerStage {
}

async fn connect_auth_and_stream(
relayer_config: &Arc<Mutex<RelayerConfig>>,
local_relayer_config: &RelayerConfig,
global_relayer_config: &Arc<Mutex<RelayerConfig>>,
cluster_info: &Arc<ClusterInfo>,
heartbeat_tx: &Sender<HeartbeatEvent>,
packet_tx: &Sender<PacketBatch>,
Expand All @@ -191,17 +194,16 @@ impl RelayerStage {
) -> crate::proxy::Result<()> {
// Get a copy of configs here in case they have changed at runtime
let keypair = cluster_info.keypair().clone();
let local_config = relayer_config.lock().unwrap().clone();

let mut backend_endpoint = Endpoint::from_shared(local_config.relayer_url.clone())
let mut backend_endpoint = Endpoint::from_shared(local_relayer_config.relayer_url.clone())
.map_err(|_| {
ProxyError::RelayerConnectionError(format!(
"invalid relayer url value: {}",
local_config.relayer_url
local_relayer_config.relayer_url
))
})?
.tcp_keepalive(Some(Duration::from_secs(60)));
if local_config.relayer_url.starts_with("https") {
if local_relayer_config.relayer_url.starts_with("https") {
backend_endpoint = backend_endpoint
.tls_config(tonic::transport::ClientTlsConfig::new())
.map_err(|_| {
Expand All @@ -211,7 +213,7 @@ impl RelayerStage {
})?;
}

debug!("connecting to auth: {}", local_config.relayer_url);
debug!("connecting to auth: {}", local_relayer_config.relayer_url);
let auth_channel = timeout(*connection_timeout, backend_endpoint.connect())
.await
.map_err(|_| ProxyError::AuthenticationConnectionTimeout)?
Expand All @@ -229,11 +231,14 @@ impl RelayerStage {

datapoint_info!(
"relayer_stage-tokens_generated",
("url", local_config.relayer_url, String),
("url", local_relayer_config.relayer_url, String),
("count", 1, i64),
);

debug!("connecting to relayer: {}", local_config.relayer_url);
debug!(
"connecting to relayer: {}",
local_relayer_config.relayer_url
);
let relayer_channel = timeout(*connection_timeout, backend_endpoint.connect())
.await
.map_err(|_| ProxyError::RelayerConnectionTimeout)?
Expand All @@ -250,8 +255,8 @@ impl RelayerStage {
heartbeat_tx,
packet_tx,
banking_packet_sender,
&local_config,
relayer_config,
local_relayer_config,
global_relayer_config,
exit,
auth_client,
access_token,
Expand Down

0 comments on commit 667ac8d

Please sign in to comment.