Skip to content

Commit

Permalink
Reduce the default number of IP echo server threads (#354)
Browse files Browse the repository at this point in the history
The IP echo server currently spins up a worker thread for every thread
on the machine. Observing some data for nodes,
- MNB validators and RPC nodes look to get several hundred of these
  requests per day
- MNB entrypoint nodes look to get 2-3 requests per second on average

In both instances, the current threadpool is severely overprovisioned
which is a waste of resources. This PR plumnbs a flag to control the
number of worker threads for this pool as well as setting a default of
two threads for this server. Two threads allow for one thread to always
listen on the TCP port while the other thread processes requests
  • Loading branch information
steviez authored Apr 1, 2024
1 parent 92c9b45 commit 79e316e
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ pub struct ValidatorConfig {
pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
pub wen_restart_proto_path: Option<PathBuf>,
pub unified_scheduler_handler_threads: Option<usize>,
pub ip_echo_server_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
}
Expand Down Expand Up @@ -338,6 +339,7 @@ impl Default for ValidatorConfig {
use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
wen_restart_proto_path: None,
unified_scheduler_handler_threads: None,
ip_echo_server_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
}
Expand Down Expand Up @@ -1079,6 +1081,7 @@ impl Validator {
None => None,
Some(tcp_listener) => Some(solana_net_utils::ip_echo_server(
tcp_listener,
config.ip_echo_server_threads,
Some(node.info.shred_version()),
)),
};
Expand Down
11 changes: 9 additions & 2 deletions gossip/src/gossip_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use {
solana_client::{
connection_cache::ConnectionCache, rpc_client::RpcClient, tpu_client::TpuClientWrapper,
},
solana_net_utils::DEFAULT_IP_ECHO_SERVER_THREADS,
solana_perf::recycler::Recycler,
solana_runtime::bank_forks::BankForks,
solana_sdk::{
Expand Down Expand Up @@ -159,8 +160,14 @@ pub fn discover(
if let Some(my_gossip_addr) = my_gossip_addr {
info!("Gossip Address: {:?}", my_gossip_addr);
}
let _ip_echo_server = ip_echo
.map(|tcp_listener| solana_net_utils::ip_echo_server(tcp_listener, Some(my_shred_version)));

let _ip_echo_server = ip_echo.map(|tcp_listener| {
solana_net_utils::ip_echo_server(
tcp_listener,
DEFAULT_IP_ECHO_SERVER_THREADS,
Some(my_shred_version),
)
});
let (met_criteria, elapsed, all_peers, tvu_peers) = spy(
spy_ref.clone(),
num_nodes,
Expand Down
1 change: 1 addition & 0 deletions local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
wen_restart_proto_path: config.wen_restart_proto_path.clone(),
unified_scheduler_handler_threads: config.unified_scheduler_handler_threads,
ip_echo_server_threads: config.ip_echo_server_threads,
replay_forks_threads: config.replay_forks_threads,
replay_transactions_threads: config.replay_transactions_threads,
}
Expand Down
1 change: 1 addition & 0 deletions net-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ socket2 = { workspace = true }
solana-logger = { workspace = true }
solana-sdk = { workspace = true }
solana-version = { workspace = true }
static_assertions = { workspace = true }
tokio = { workspace = true, features = ["full"] }
url = { workspace = true }

Expand Down
7 changes: 6 additions & 1 deletion net-utils/src/bin/ip_address_server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use {
clap::{Arg, Command},
solana_net_utils::DEFAULT_IP_ECHO_SERVER_THREADS,
std::net::{Ipv4Addr, SocketAddr, TcpListener},
};

Expand All @@ -21,7 +22,11 @@ fn main() {
.unwrap_or_else(|_| panic!("Unable to parse {port}"));
let bind_addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, port));
let tcp_listener = TcpListener::bind(bind_addr).expect("unable to start tcp listener");
let _runtime = solana_net_utils::ip_echo_server(tcp_listener, /*shred_version=*/ None);
let _runtime = solana_net_utils::ip_echo_server(
tcp_listener,
DEFAULT_IP_ECHO_SERVER_THREADS,
/*shred_version=*/ None,
);
loop {
std::thread::park();
}
Expand Down
11 changes: 11 additions & 0 deletions net-utils/src/ip_echo_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use {
std::{
io,
net::{IpAddr, SocketAddr},
num::NonZeroUsize,
time::Duration,
},
tokio::{
Expand All @@ -18,6 +19,14 @@ use {

pub type IpEchoServer = Runtime;

// Enforce a minimum of two threads:
// - One thread to monitor the TcpListener and spawn async tasks
// - One thread to service the spawned tasks
// The unsafe is safe because we're using a fixed, known non-zero value
pub const MINIMUM_IP_ECHO_SERVER_THREADS: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(2) };
// IP echo requests require little computation and come in fairly infrequently,
// so keep the number of server workers small to avoid overhead
pub const DEFAULT_IP_ECHO_SERVER_THREADS: NonZeroUsize = MINIMUM_IP_ECHO_SERVER_THREADS;
pub const MAX_PORT_COUNT_PER_MESSAGE: usize = 4;

const IO_TIMEOUT: Duration = Duration::from_secs(5);
Expand Down Expand Up @@ -168,13 +177,15 @@ async fn run_echo_server(tcp_listener: std::net::TcpListener, shred_version: Opt
/// connects. Used by |get_public_ip_addr|
pub fn ip_echo_server(
tcp_listener: std::net::TcpListener,
num_server_threads: NonZeroUsize,
// Cluster shred-version of the node running the server.
shred_version: Option<u16>,
) -> IpEchoServer {
tcp_listener.set_nonblocking(true).unwrap();

let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("solIpEchoSrvrRt")
.worker_threads(num_server_threads.get())
.enable_all()
.build()
.expect("new tokio runtime");
Expand Down
17 changes: 14 additions & 3 deletions net-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use {
};

mod ip_echo_server;
pub use ip_echo_server::{ip_echo_server, IpEchoServer, MAX_PORT_COUNT_PER_MESSAGE};
pub use ip_echo_server::{
ip_echo_server, IpEchoServer, DEFAULT_IP_ECHO_SERVER_THREADS, MAX_PORT_COUNT_PER_MESSAGE,
MINIMUM_IP_ECHO_SERVER_THREADS,
};
use ip_echo_server::{IpEchoServerMessage, IpEchoServerResponse};

/// A data type representing a public Udp socket
Expand Down Expand Up @@ -744,7 +747,11 @@ mod tests {
let (_server_port, (server_udp_socket, server_tcp_listener)) =
bind_common_in_range(ip_addr, (3200, 3250)).unwrap();

let _runtime = ip_echo_server(server_tcp_listener, /*shred_version=*/ Some(42));
let _runtime = ip_echo_server(
server_tcp_listener,
DEFAULT_IP_ECHO_SERVER_THREADS,
/*shred_version=*/ Some(42),
);

let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();
assert_eq!(
Expand All @@ -764,7 +771,11 @@ mod tests {
let (client_port, (client_udp_socket, client_tcp_listener)) =
bind_common_in_range(ip_addr, (3200, 3250)).unwrap();

let _runtime = ip_echo_server(server_tcp_listener, /*shred_version=*/ Some(65535));
let _runtime = ip_echo_server(
server_tcp_listener,
DEFAULT_IP_ECHO_SERVER_THREADS,
/*shred_version=*/ Some(65535),
);

let ip_echo_server_addr = server_udp_socket.local_addr().unwrap();
assert_eq!(
Expand Down
1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions validator/src/cli/thread_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ use {

// Need this struct to provide &str whose lifetime matches that of the CLAP Arg's
pub struct DefaultThreadArgs {
pub ip_echo_server_threads: String,
pub replay_forks_threads: String,
pub replay_transactions_threads: String,
}

impl Default for DefaultThreadArgs {
fn default() -> Self {
Self {
ip_echo_server_threads: IpEchoServerThreadsArg::default().to_string(),
replay_forks_threads: ReplayForksThreadsArg::default().to_string(),
replay_transactions_threads: ReplayTransactionsThreadsArg::default().to_string(),
}
Expand All @@ -24,6 +26,7 @@ impl Default for DefaultThreadArgs {

pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
vec![
new_thread_arg::<IpEchoServerThreadsArg>(&defaults.ip_echo_server_threads),
new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
]
Expand All @@ -41,12 +44,18 @@ fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> {
}

pub struct NumThreadConfig {
pub ip_echo_server_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
}

pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
NumThreadConfig {
ip_echo_server_threads: value_t_or_exit!(
matches,
IpEchoServerThreadsArg::NAME,
NonZeroUsize
),
replay_forks_threads: if matches.is_present("replay_slots_concurrently") {
NonZeroUsize::new(4).expect("4 is non-zero")
} else {
Expand Down Expand Up @@ -86,6 +95,20 @@ trait ThreadArg {
}
}

struct IpEchoServerThreadsArg;
impl ThreadArg for IpEchoServerThreadsArg {
const NAME: &'static str = "ip_echo_server_threads";
const LONG_NAME: &'static str = "ip-echo-server-threads";
const HELP: &'static str = "Number of threads to use for the IP echo server";

fn default() -> usize {
solana_net_utils::DEFAULT_IP_ECHO_SERVER_THREADS.get()
}
fn min() -> usize {
solana_net_utils::MINIMUM_IP_ECHO_SERVER_THREADS.get()
}
}

struct ReplayForksThreadsArg;
impl ThreadArg for ReplayForksThreadsArg {
const NAME: &'static str = "replay_forks_threads";
Expand Down
2 changes: 2 additions & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1332,6 +1332,7 @@ pub fn main() {
let full_api = matches.is_present("full_rpc_api");

let cli::thread_args::NumThreadConfig {
ip_echo_server_threads,
replay_forks_threads,
replay_transactions_threads,
} = cli::thread_args::parse_num_threads_args(&matches);
Expand Down Expand Up @@ -1474,6 +1475,7 @@ pub fn main() {
use_snapshot_archives_at_startup::cli::NAME,
UseSnapshotArchivesAtStartup
),
ip_echo_server_threads,
replay_forks_threads,
replay_transactions_threads,
..ValidatorConfig::default()
Expand Down

0 comments on commit 79e316e

Please sign in to comment.