Skip to content

Commit

Permalink
Plumb CLI arg to control number of TVU receive threads/sockets (#550)
Browse files Browse the repository at this point in the history
The parameter directly controls the number of sockets that are created;
the sockets later have one thread created per socket to listen.
  • Loading branch information
steviez authored Apr 15, 2024
1 parent e0b0bcc commit 7138ea7
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 2 deletions.
16 changes: 14 additions & 2 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ use {
io::BufReader,
iter::repeat,
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
num::NonZeroUsize,
ops::{Deref, Div},
path::{Path, PathBuf},
result::Result,
Expand Down Expand Up @@ -144,6 +145,11 @@ const MIN_STAKE_FOR_GOSSIP: u64 = solana_sdk::native_token::LAMPORTS_PER_SOL;
/// Minimum number of staked nodes for enforcing stakes in gossip.
const MIN_NUM_STAKED_NODES: usize = 500;

// Must have at least one socket to monitor the TVU port
// The unsafes are safe because we're using fixed, known non-zero values
pub const MINIMUM_NUM_TVU_SOCKETS: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1) };
pub const DEFAULT_NUM_TVU_SOCKETS: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(8) };

#[derive(Debug, PartialEq, Eq, Error)]
pub enum ClusterInfoError {
#[error("NoPeers")]
Expand Down Expand Up @@ -2792,6 +2798,8 @@ pub struct NodeConfig {
pub bind_ip_addr: IpAddr,
pub public_tpu_addr: Option<SocketAddr>,
pub public_tpu_forwards_addr: Option<SocketAddr>,
/// The number of TVU sockets to create
pub num_tvu_sockets: NonZeroUsize,
}

#[derive(Debug)]
Expand Down Expand Up @@ -2993,13 +3001,15 @@ impl Node {
bind_ip_addr,
public_tpu_addr,
public_tpu_forwards_addr,
num_tvu_sockets,
} = config;

let (gossip_port, (gossip, ip_echo)) =
Self::get_gossip_port(&gossip_addr, port_range, bind_ip_addr);

let (tvu_port, tvu_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tvu multi_bind");
multi_bind_in_range(bind_ip_addr, port_range, num_tvu_sockets.get())
.expect("tvu multi_bind");
let (tvu_quic_port, tvu_quic) = Self::bind(bind_ip_addr, port_range);
let (tpu_port, tpu_sockets) =
multi_bind_in_range(bind_ip_addr, port_range, 32).expect("tpu multi_bind");
Expand Down Expand Up @@ -3576,7 +3586,7 @@ mod tests {
}

fn check_sockets(sockets: &[UdpSocket], ip: IpAddr, range: (u16, u16)) {
assert!(sockets.len() > 1);
assert!(!sockets.is_empty());
let port = sockets[0].local_addr().unwrap().port();
for socket in sockets.iter() {
check_socket(socket, ip, range);
Expand Down Expand Up @@ -3608,6 +3618,7 @@ mod tests {
bind_ip_addr: IpAddr::V4(ip),
public_tpu_addr: None,
public_tpu_forwards_addr: None,
num_tvu_sockets: MINIMUM_NUM_TVU_SOCKETS,
};

let node = Node::new_with_external_ip(&solana_sdk::pubkey::new_rand(), config);
Expand All @@ -3631,6 +3642,7 @@ mod tests {
bind_ip_addr: ip,
public_tpu_addr: None,
public_tpu_forwards_addr: None,
num_tvu_sockets: MINIMUM_NUM_TVU_SOCKETS,
};

let node = Node::new_with_external_ip(&solana_sdk::pubkey::new_rand(), config);
Expand Down
20 changes: 20 additions & 0 deletions validator/src/cli/thread_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub struct DefaultThreadArgs {
pub ip_echo_server_threads: String,
pub replay_forks_threads: String,
pub replay_transactions_threads: String,
pub tvu_receive_threads: String,
}

impl Default for DefaultThreadArgs {
Expand All @@ -20,6 +21,7 @@ impl Default for DefaultThreadArgs {
ip_echo_server_threads: IpEchoServerThreadsArg::default().to_string(),
replay_forks_threads: ReplayForksThreadsArg::default().to_string(),
replay_transactions_threads: ReplayTransactionsThreadsArg::default().to_string(),
tvu_receive_threads: TvuReceiveThreadsArg::default().to_string(),
}
}
}
Expand All @@ -29,6 +31,7 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
new_thread_arg::<IpEchoServerThreadsArg>(&defaults.ip_echo_server_threads),
new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
new_thread_arg::<TvuReceiveThreadsArg>(&defaults.tvu_receive_threads),
]
}

Expand All @@ -47,6 +50,7 @@ pub struct NumThreadConfig {
pub ip_echo_server_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
pub tvu_receive_threads: NonZeroUsize,
}

pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
Expand All @@ -66,6 +70,7 @@ pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
ReplayTransactionsThreadsArg::NAME,
NonZeroUsize
),
tvu_receive_threads: value_t_or_exit!(matches, TvuReceiveThreadsArg::NAME, NonZeroUsize),
}
}

Expand Down Expand Up @@ -136,3 +141,18 @@ impl ThreadArg for ReplayTransactionsThreadsArg {
get_max_thread_count()
}
}

struct TvuReceiveThreadsArg;
impl ThreadArg for TvuReceiveThreadsArg {
const NAME: &'static str = "tvu_receive_threads";
const LONG_NAME: &'static str = "tvu-receive-threads";
const HELP: &'static str =
"Number of threads (and sockets) to use for receiving shreds on the TVU port";

fn default() -> usize {
solana_gossip::cluster_info::DEFAULT_NUM_TVU_SOCKETS.get()
}
fn min() -> usize {
solana_gossip::cluster_info::MINIMUM_NUM_TVU_SOCKETS.get()
}
}
2 changes: 2 additions & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,7 @@ pub fn main() {
ip_echo_server_threads,
replay_forks_threads,
replay_transactions_threads,
tvu_receive_threads,
} = cli::thread_args::parse_num_threads_args(&matches);

let mut validator_config = ValidatorConfig {
Expand Down Expand Up @@ -1853,6 +1854,7 @@ pub fn main() {
bind_ip_addr: bind_address,
public_tpu_addr,
public_tpu_forwards_addr,
num_tvu_sockets: tvu_receive_threads,
};

let cluster_entrypoints = entrypoint_addrs
Expand Down

0 comments on commit 7138ea7

Please sign in to comment.