Skip to content

Commit

Permalink
v1.18: Parameterize max streams per ms (backport of jito-foundation#707
Browse files Browse the repository at this point in the history
…) (#828)

Make PPS a parameter instead of the hard coded
  • Loading branch information
mergify[bot] authored and yihau committed Apr 26, 2024
1 parent eb2c43d commit 14f5fa7
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 16 deletions.
4 changes: 3 additions & 1 deletion client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ mod tests {
crossbeam_channel::unbounded,
solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::SpawnServerResult,
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
quic::SpawnServerResult,
streamer::StakedNodes,
},
std::{
Expand Down Expand Up @@ -273,6 +274,7 @@ mod tests {
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down
4 changes: 3 additions & 1 deletion core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use {
solana_runtime::{bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache},
solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
quic::{spawn_server, SpawnServerResult, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
streamer::StakedNodes,
},
Expand Down Expand Up @@ -167,6 +167,7 @@ impl Tpu {
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
)
Expand All @@ -191,6 +192,7 @@ impl Tpu {
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
0, // Prevent unstaked nodes from forwarding transactions
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
)
Expand Down
10 changes: 8 additions & 2 deletions quic-client/tests/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ mod tests {
},
solana_sdk::{net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::SpawnServerResult,
streamer::StakedNodes, tls_certificates::new_self_signed_tls_certificate,
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
quic::SpawnServerResult,
streamer::StakedNodes,
tls_certificates::new_self_signed_tls_certificate,
},
std::{
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
Expand Down Expand Up @@ -83,6 +85,7 @@ mod tests {
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -163,6 +166,7 @@ mod tests {
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
Duration::from_secs(1), // wait_for_chunk_timeout
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -225,6 +229,7 @@ mod tests {
staked_nodes.clone(),
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -253,6 +258,7 @@ mod tests {
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down
18 changes: 15 additions & 3 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use {
crate::{
nonblocking::stream_throttle::{
ConnectionStreamCounter, StakedStreamLoadEMA, MAX_STREAMS_PER_MS,
STREAM_STOP_CODE_THROTTLING, STREAM_THROTTLING_INTERVAL_MS,
ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_STOP_CODE_THROTTLING,
STREAM_THROTTLING_INTERVAL_MS,
},
quic::{configure_server, QuicServerError, StreamStats},
streamer::StakedNodes,
Expand Down Expand Up @@ -74,6 +74,9 @@ const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = b"exceed_max_stre
const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4;
const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";

/// Limit to 250K PPS
pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 250;

// A sequence of bytes that is part of a packet
// along with where in the packet it is
struct PacketChunk {
Expand Down Expand Up @@ -122,6 +125,7 @@ pub fn spawn_server(
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
) -> Result<(Endpoint, Arc<StreamStats>, JoinHandle<()>), QuicServerError> {
Expand All @@ -145,6 +149,7 @@ pub fn spawn_server(
staked_nodes,
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
stats.clone(),
wait_for_chunk_timeout,
coalesce,
Expand All @@ -162,6 +167,7 @@ async fn run_server(
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
stats: Arc<StreamStats>,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
Expand All @@ -174,6 +180,7 @@ async fn run_server(
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
stats.clone(),
max_unstaked_connections,
max_streams_per_ms,
));
let staked_connection_table: Arc<Mutex<ConnectionTable>> =
Arc::new(Mutex::new(ConnectionTable::new()));
Expand Down Expand Up @@ -204,6 +211,7 @@ async fn run_server(
staked_nodes.clone(),
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
stats.clone(),
wait_for_chunk_timeout,
stream_load_ema.clone(),
Expand Down Expand Up @@ -482,6 +490,7 @@ async fn setup_connection(
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
stats: Arc<StreamStats>,
wait_for_chunk_timeout: Duration,
stream_load_ema: Arc<StakedStreamLoadEMA>,
Expand All @@ -503,7 +512,7 @@ async fn setup_connection(
// The heuristic is that the stake should be large engouh to have 1 stream pass throuh within one throttle
// interval during which we allow max (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) streams.
let min_stake_ratio =
1_f64 / (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) as f64;
1_f64 / (max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS) as f64;
let stake_ratio = stake as f64 / total_stake as f64;
let peer_type = if stake_ratio < min_stake_ratio {
// If it is a staked connection with ultra low stake ratio, treat it as unstaked.
Expand Down Expand Up @@ -1288,6 +1297,7 @@ pub mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
Duration::from_secs(2),
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -1724,6 +1734,7 @@ pub mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
0, // Do not allow any connection from unstaked clients/nodes
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -1755,6 +1766,7 @@ pub mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down
26 changes: 18 additions & 8 deletions streamer/src/nonblocking/stream_throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ use {
},
};

/// Limit to 250K PPS
pub const MAX_STREAMS_PER_MS: u64 = 250;
const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 20;
pub const STREAM_THROTTLING_INTERVAL_MS: u64 = 100;
pub const STREAM_STOP_CODE_THROTTLING: u32 = 15;
Expand All @@ -35,14 +33,18 @@ pub(crate) struct StakedStreamLoadEMA {
}

impl StakedStreamLoadEMA {
pub(crate) fn new(stats: Arc<StreamStats>, max_unstaked_connections: usize) -> Self {
pub(crate) fn new(
stats: Arc<StreamStats>,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
) -> Self {
let allow_unstaked_streams = max_unstaked_connections > 0;
let max_staked_load_in_ema_window = if allow_unstaked_streams {
(MAX_STREAMS_PER_MS
- Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(MAX_STREAMS_PER_MS))
(max_streams_per_ms
- Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(max_streams_per_ms))
* EMA_WINDOW_MS
} else {
MAX_STREAMS_PER_MS * EMA_WINDOW_MS
max_streams_per_ms * EMA_WINDOW_MS
};

let max_num_unstaked_connections =
Expand All @@ -56,7 +58,7 @@ impl StakedStreamLoadEMA {

let max_unstaked_load_in_throttling_window = if allow_unstaked_streams {
Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT)
.apply_to(MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS)
.apply_to(max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS)
.saturating_div(max_num_unstaked_connections)
} else {
0
Expand Down Expand Up @@ -228,7 +230,9 @@ pub mod test {
use {
super::*,
crate::{
nonblocking::stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS,
nonblocking::{
quic::DEFAULT_MAX_STREAMS_PER_MS, stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS,
},
quic::{StreamStats, MAX_UNSTAKED_CONNECTIONS},
},
std::{
Expand All @@ -242,6 +246,7 @@ pub mod test {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
// 25K packets per ms * 20% / 500 max unstaked connections
assert_eq!(
Expand All @@ -258,6 +263,7 @@ pub mod test {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));

// EMA load is used for staked connections to calculate max number of allowed streams.
Expand Down Expand Up @@ -349,6 +355,7 @@ pub mod test {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
0,
DEFAULT_MAX_STREAMS_PER_MS,
));

// EMA load is used for staked connections to calculate max number of allowed streams.
Expand Down Expand Up @@ -436,6 +443,7 @@ pub mod test {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
stream_load_ema
.load_in_recent_interval
Expand Down Expand Up @@ -464,6 +472,7 @@ pub mod test {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
stream_load_ema
.load_in_recent_interval
Expand All @@ -483,6 +492,7 @@ pub mod test {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
stream_load_ema
.load_in_recent_interval
Expand Down
9 changes: 8 additions & 1 deletion streamer/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ pub fn spawn_server(
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
) -> Result<SpawnServerResult, QuicServerError> {
Expand All @@ -488,6 +489,7 @@ pub fn spawn_server(
staked_nodes,
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
wait_for_chunk_timeout,
coalesce,
)
Expand Down Expand Up @@ -515,7 +517,9 @@ pub fn spawn_server(
mod test {
use {
super::*,
crate::nonblocking::quic::{test::*, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
crate::nonblocking::quic::{
test::*, DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
crossbeam_channel::unbounded,
solana_sdk::net::DEFAULT_TPU_COALESCE,
std::net::SocketAddr,
Expand Down Expand Up @@ -549,6 +553,7 @@ mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -609,6 +614,7 @@ mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -656,6 +662,7 @@ mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
0, // Do not allow any connection from unstaked clients/nodes
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down

0 comments on commit 14f5fa7

Please sign in to comment.