Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
esemeniuc committed Dec 1, 2023
1 parent 74dded7 commit 33f4e5e
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions core/src/proxy/fetch_stage_manager.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::proxy::relayer_stage::RelayerConfig;
use crate::proxy::ProxyResult;
use std::ops::Add;
use std::sync::Mutex;
use {
crate::proxy::{HeartbeatEvent, ProxyError},
Expand Down Expand Up @@ -76,24 +77,29 @@ impl FetchStageManager {
/// Sets fetch_connected to true, pending_disconnect to false
/// Advertises saved contact info
fn start(
relayer_config: &Arc<Mutex<RelayerConfig>>,
global_relayer_config: &Arc<Mutex<RelayerConfig>>,
cluster_info: &Arc<ClusterInfo>,
heartbeat_rx: &Receiver<HeartbeatEvent>,
packet_intercept_rx: &Receiver<PacketBatch>,
packet_tx: &Sender<PacketBatch>,
exit: &Arc<AtomicBool>,
) -> ProxyResult<()> {
let my_fallback_contact_info = cluster_info.my_contact_info();
let current_config = relayer_config.lock().unwrap().clone();
let local_relayer_config = global_relayer_config.lock().unwrap().clone();

let mut fetch_connected = true;
let mut heartbeat_received = false;
let mut pending_disconnect = false;

let mut pending_disconnect_ts = Instant::now();

let config_tick = tick(Duration::from_secs(1));
let heartbeat_tick = tick(HEARTBEAT_TIMEOUT);
let relayer_config_tick = tick(Duration::from_secs(1));
// Add buffer to `DEFAULT_RELAYER_EXPECTED_HEARTBEAT_INTERVAL_MS`
let heartbeat_tick = tick(
local_relayer_config
.expected_heartbeat_interval
.add(Duration::from_secs(1)),
);
let metrics_tick = tick(METRICS_CADENCE);
let mut packets_forwarded = 0;
let mut heartbeats_received = 0;
Expand Down Expand Up @@ -155,8 +161,8 @@ impl FetchStageManager {
}
}
}
recv(config_tick) -> _ => {
if current_config != relayer_config.lock().unwrap().clone() {
recv(relayer_config_tick) -> _ => {
if local_relayer_config != global_relayer_config.lock().unwrap().clone() {
return Err(ProxyError::RelayerConfigChanged);
}
}
Expand Down

0 comments on commit 33f4e5e

Please sign in to comment.