Skip to content

Commit

Permalink
Remove unneeded timeout in turbine retransmit stage (#4335)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexpyattaev authored Jan 8, 2025
1 parent d1b78cf commit 7256608
Showing 1 changed file with 13 additions and 11 deletions.
24 changes: 13 additions & 11 deletions turbine/src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use {
crate::cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS},
bytes::Bytes,
crossbeam_channel::{Receiver, RecvTimeoutError},
crossbeam_channel::{Receiver, RecvError},
lru::LruCache,
rand::Rng,
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
Expand Down Expand Up @@ -193,10 +193,12 @@ fn retransmit(
max_slots: &MaxSlots,
rpc_subscriptions: Option<&RpcSubscriptions>,
slot_status_notifier: Option<&SlotStatusNotifier>,
) -> Result<(), RecvTimeoutError> {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
let mut shreds = shreds_receiver.recv_timeout(RECV_TIMEOUT)?;
) -> Result<(), RecvError> {
// wait for something on the channel
let mut shreds = shreds_receiver.recv()?;
// now the batch has started
let mut timer_start = Measure::start("retransmit");
// drain the channel until it is empty to form a batch
shreds.extend(shreds_receiver.try_iter().flatten());
stats.num_shreds += shreds.len();
stats.total_batches += 1;
Expand Down Expand Up @@ -392,7 +394,9 @@ pub fn retransmitter(
);
let mut rng = rand::thread_rng();
let mut shred_deduper = ShredDeduper::new(&mut rng, DEDUPER_NUM_BITS);

let mut stats = RetransmitStats::new(Instant::now());

#[allow(clippy::manual_clamp)]
let num_threads = get_thread_count().min(8).max(sockets.len());
let thread_pool = ThreadPoolBuilder::new()
Expand All @@ -402,8 +406,8 @@ pub fn retransmitter(
.unwrap();
Builder::new()
.name("solRetransmittr".to_string())
.spawn(move || loop {
match retransmit(
.spawn(move || {
while retransmit(
&thread_pool,
&bank_forks,
&leader_schedule_cache,
Expand All @@ -417,11 +421,9 @@ pub fn retransmitter(
&max_slots,
rpc_subscriptions.as_deref(),
slot_status_notifier.as_ref(),
) {
Ok(()) => (),
Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => break,
}
)
.is_ok()
{}
})
.unwrap()
}
Expand Down

0 comments on commit 7256608

Please sign in to comment.