From dc35ce8a7be0ee4e9a53addccba0104adb7594cd Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 3 Oct 2024 20:06:59 -0700 Subject: [PATCH] Support max conncurrent connections (#3031) * implementing max concurrent connection limits * add metrics refused_connections_too_many_open_connections * reference counting on concurrent connections --- streamer/src/nonblocking/quic.rs | 117 ++++++++++++++++++++++++++++++- streamer/src/quic.rs | 14 ++++ 2 files changed, 128 insertions(+), 3 deletions(-) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 760902a6aab06b..0bc99e04652da9 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -39,6 +39,7 @@ use { }, solana_transaction_metrics_tracker::signature_if_should_track_packet, std::{ + fmt, iter::repeat_with, net::{IpAddr, SocketAddr, UdpSocket}, pin::Pin, @@ -195,8 +196,7 @@ pub fn spawn_server_multi( coalesce: Duration, ) -> Result { info!("Start {name} quic server on {sockets:?}"); - let concurrent_connections = - (max_staked_connections + max_unstaked_connections) / sockets.len(); + let concurrent_connections = max_staked_connections + max_unstaked_connections; let max_concurrent_connections = concurrent_connections + concurrent_connections / 4; let (config, _) = configure_server(keypair)?; @@ -227,6 +227,7 @@ pub fn spawn_server_multi( stats.clone(), wait_for_chunk_timeout, coalesce, + max_concurrent_connections, )); Ok(SpawnNonBlockingServerResult { endpoints, @@ -236,6 +237,52 @@ pub fn spawn_server_multi( }) } +/// struct ease tracking connections of all stages, so that we do not have to +/// litter the code with open connection tracking. This is added into the +/// connection table as part of the ConnectionEntry. The reference is auto +/// reduced when it is dropped. + +struct ClientConnectionTracker { + stats: Arc, +} + +/// This is required by ConnectionEntry for supporting debug format. +impl fmt::Debug for ClientConnectionTracker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("StreamerClientConnection") + .field( + "open_connections:", + &self.stats.open_connections.load(Ordering::Relaxed), + ) + .finish() + } +} + +impl Drop for ClientConnectionTracker { + /// When this is dropped, reduce the open connection count. + fn drop(&mut self) { + self.stats.open_connections.fetch_sub(1, Ordering::Relaxed); + } +} + +impl ClientConnectionTracker { + /// Check the max_concurrent_connections limit and if it is within the limit + /// create ClientConnectionTracker and increment open connection count. Otherwise returns Err + fn new(stats: Arc, max_concurrent_connections: usize) -> Result { + let open_connections = stats.open_connections.fetch_add(1, Ordering::Relaxed); + if open_connections >= max_concurrent_connections { + stats.open_connections.fetch_sub(1, Ordering::Relaxed); + debug!( + "There are too many concurrent connections opened already: open: {}, max: {}", + open_connections, max_concurrent_connections + ); + return Err(()); + } + + Ok(Self { stats }) + } +} + #[allow(clippy::too_many_arguments)] async fn run_server( name: &'static str, @@ -251,6 +298,7 @@ async fn run_server( stats: Arc, wait_for_chunk_timeout: Duration, coalesce: Duration, + max_concurrent_connections: usize, ) { let rate_limiter = ConnectionRateLimiter::new(max_connections_per_ipaddr_per_min); let overall_connection_rate_limiter = @@ -290,6 +338,7 @@ async fn run_server( }) }) .collect::>(); + while !exit.load(Ordering::Relaxed) { let timeout_connection = select! { ready = accepts.next() => { @@ -320,6 +369,7 @@ async fn run_server( stats .total_incoming_connection_attempts .fetch_add(1, Ordering::Relaxed); + let remote_address = incoming.remote_address(); // first check overall connection rate limit: @@ -354,6 +404,16 @@ async fn run_server( continue; } + let Ok(client_connection_tracker) = + ClientConnectionTracker::new(stats.clone(), max_concurrent_connections) + else { + stats + .refused_connections_too_many_open_connections + .fetch_add(1, Ordering::Relaxed); + incoming.refuse(); + continue; + }; + stats .outstanding_incoming_connection_attempts .fetch_add(1, Ordering::Relaxed); @@ -362,6 +422,7 @@ async fn run_server( Ok(connecting) => { tokio::spawn(setup_connection( connecting, + client_connection_tracker, unstaked_connection_table.clone(), staked_connection_table.clone(), sender.clone(), @@ -496,6 +557,7 @@ impl NewConnectionHandlerParams { } fn handle_and_cache_new_connection( + client_connection_tracker: ClientConnectionTracker, connection: Connection, mut connection_table_l: MutexGuard, connection_table: Arc>, @@ -525,6 +587,7 @@ fn handle_and_cache_new_connection( .try_add_connection( ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey), remote_addr.port(), + client_connection_tracker, Some(connection.clone()), params.peer_type, timing::timestamp(), @@ -571,6 +634,7 @@ fn handle_and_cache_new_connection( } async fn prune_unstaked_connections_and_add_new_connection( + client_connection_tracker: ClientConnectionTracker, connection: Connection, connection_table: Arc>, max_connections: usize, @@ -584,6 +648,7 @@ async fn prune_unstaked_connections_and_add_new_connection( let mut connection_table = connection_table.lock().await; prune_unstaked_connection_table(&mut connection_table, max_connections, stats); handle_and_cache_new_connection( + client_connection_tracker, connection, connection_table, connection_table_clone, @@ -646,6 +711,7 @@ fn compute_recieve_window( #[allow(clippy::too_many_arguments)] async fn setup_connection( connecting: Connecting, + client_connection_tracker: ClientConnectionTracker, unstaked_connection_table: Arc>, staked_connection_table: Arc>, packet_sender: AsyncSender, @@ -712,6 +778,7 @@ async fn setup_connection( if connection_table_l.total_size < max_staked_connections { if let Ok(()) = handle_and_cache_new_connection( + client_connection_tracker, new_connection, connection_table_l, staked_connection_table.clone(), @@ -728,6 +795,7 @@ async fn setup_connection( // put this connection in the unstaked connection table. If needed, prune a // connection from the unstaked connection table. if let Ok(()) = prune_unstaked_connections_and_add_new_connection( + client_connection_tracker, new_connection, unstaked_connection_table.clone(), max_unstaked_connections, @@ -752,6 +820,7 @@ async fn setup_connection( } ConnectionPeerType::Unstaked => { if let Ok(()) = prune_unstaked_connections_and_add_new_connection( + client_connection_tracker, new_connection, unstaked_connection_table.clone(), max_unstaked_connections, @@ -1226,6 +1295,8 @@ struct ConnectionEntry { peer_type: ConnectionPeerType, last_update: Arc, port: u16, + // We do not explicitly use it, but its drop is triggered when ConnectionEntry is dropped. + _client_connection_tracker: ClientConnectionTracker, connection: Option, stream_counter: Arc, } @@ -1236,6 +1307,7 @@ impl ConnectionEntry { peer_type: ConnectionPeerType, last_update: Arc, port: u16, + client_connection_tracker: ClientConnectionTracker, connection: Option, stream_counter: Arc, ) -> Self { @@ -1244,6 +1316,7 @@ impl ConnectionEntry { peer_type, last_update, port, + _client_connection_tracker: client_connection_tracker, connection, stream_counter, } @@ -1334,7 +1407,7 @@ impl ConnectionTable { }) .map(|index| { let connection = self.table[index].first(); - let stake = connection.map(|connection| connection.stake()); + let stake = connection.map(|connection: &ConnectionEntry| connection.stake()); (index, stake) }) .take(sample_size) @@ -1351,6 +1424,7 @@ impl ConnectionTable { &mut self, key: ConnectionTableKey, port: u16, + client_connection_tracker: ClientConnectionTracker, connection: Option, peer_type: ConnectionPeerType, last_update: u64, @@ -1378,6 +1452,7 @@ impl ConnectionTable { peer_type, last_update.clone(), port, + client_connection_tracker, connection, stream_counter.clone(), )); @@ -2002,11 +2077,13 @@ pub mod test { let sockets: Vec<_> = (0..num_entries) .map(|i| SocketAddr::new(IpAddr::V4(Ipv4Addr::new(i, 0, 0, 0)), 0)) .collect(); + let stats = Arc::new(StreamerStats::default()); for (i, socket) in sockets.iter().enumerate() { table .try_add_connection( ConnectionTableKey::IP(socket.ip()), socket.port(), + ClientConnectionTracker::new(stats.clone(), 1000).unwrap(), None, ConnectionPeerType::Unstaked, i as u64, @@ -2019,6 +2096,7 @@ pub mod test { .try_add_connection( ConnectionTableKey::IP(sockets[0].ip()), sockets[0].port(), + ClientConnectionTracker::new(stats.clone(), 1000).unwrap(), None, ConnectionPeerType::Unstaked, 5, @@ -2040,6 +2118,7 @@ pub mod test { table.remove_connection(ConnectionTableKey::IP(socket.ip()), socket.port(), 0); } assert_eq!(table.total_size, 0); + assert_eq!(stats.open_connections.load(Ordering::Relaxed), 0); } #[test] @@ -2051,6 +2130,7 @@ pub mod test { // from a different peer pubkey. let num_entries = 15; let max_connections_per_peer = 10; + let stats = Arc::new(StreamerStats::default()); let pubkeys: Vec<_> = (0..num_entries).map(|_| Pubkey::new_unique()).collect(); for (i, pubkey) in pubkeys.iter().enumerate() { @@ -2058,6 +2138,7 @@ pub mod test { .try_add_connection( ConnectionTableKey::Pubkey(*pubkey), 0, + ClientConnectionTracker::new(stats.clone(), 1000).unwrap(), None, ConnectionPeerType::Unstaked, i as u64, @@ -2075,6 +2156,7 @@ pub mod test { table.remove_connection(ConnectionTableKey::Pubkey(*pubkey), 0, 0); } assert_eq!(table.total_size, 0); + assert_eq!(stats.open_connections.load(Ordering::Relaxed), 0); } #[test] @@ -2084,11 +2166,14 @@ pub mod test { let max_connections_per_peer = 10; let pubkey = Pubkey::new_unique(); + let stats: Arc = Arc::new(StreamerStats::default()); + (0..max_connections_per_peer).for_each(|i| { table .try_add_connection( ConnectionTableKey::Pubkey(pubkey), 0, + ClientConnectionTracker::new(stats.clone(), 1000).unwrap(), None, ConnectionPeerType::Unstaked, i as u64, @@ -2103,6 +2188,7 @@ pub mod test { .try_add_connection( ConnectionTableKey::Pubkey(pubkey), 0, + ClientConnectionTracker::new(stats.clone(), 1000).unwrap(), None, ConnectionPeerType::Unstaked, 10, @@ -2117,6 +2203,7 @@ pub mod test { .try_add_connection( ConnectionTableKey::Pubkey(pubkey2), 0, + ClientConnectionTracker::new(stats.clone(), 1000).unwrap(), None, ConnectionPeerType::Unstaked, 10, @@ -2134,6 +2221,7 @@ pub mod test { table.remove_connection(ConnectionTableKey::Pubkey(pubkey2), 0, 0); assert_eq!(table.total_size, 0); + assert_eq!(stats.open_connections.load(Ordering::Relaxed), 0); } #[test] @@ -2146,11 +2234,14 @@ pub mod test { let sockets: Vec<_> = (0..num_entries) .map(|i| SocketAddr::new(IpAddr::V4(Ipv4Addr::new(i, 0, 0, 0)), 0)) .collect(); + let stats: Arc = Arc::new(StreamerStats::default()); + for (i, socket) in sockets.iter().enumerate() { table .try_add_connection( ConnectionTableKey::IP(socket.ip()), socket.port(), + ClientConnectionTracker::new(stats.clone(), 1000).unwrap(), None, ConnectionPeerType::Staked((i + 1) as u64), i as u64, @@ -2171,6 +2262,8 @@ pub mod test { num_entries as u64 + 1, // threshold_stake ); assert_eq!(pruned, 1); + // We had 5 connections and pruned 1, we should have 4 left + assert_eq!(stats.open_connections.load(Ordering::Relaxed), 4); } #[test] @@ -2183,11 +2276,14 @@ pub mod test { let mut sockets: Vec<_> = (0..num_ips) .map(|i| SocketAddr::new(IpAddr::V4(Ipv4Addr::new(i, 0, 0, 0)), 0)) .collect(); + let stats: Arc = Arc::new(StreamerStats::default()); + for (i, socket) in sockets.iter().enumerate() { table .try_add_connection( ConnectionTableKey::IP(socket.ip()), socket.port(), + ClientConnectionTracker::new(stats.clone(), 1000).unwrap(), None, ConnectionPeerType::Unstaked, (i * 2) as u64, @@ -2199,6 +2295,7 @@ pub mod test { .try_add_connection( ConnectionTableKey::IP(socket.ip()), socket.port(), + ClientConnectionTracker::new(stats.clone(), 1000).unwrap(), None, ConnectionPeerType::Unstaked, (i * 2 + 1) as u64, @@ -2213,6 +2310,7 @@ pub mod test { .try_add_connection( ConnectionTableKey::IP(single_connection_addr.ip()), single_connection_addr.port(), + ClientConnectionTracker::new(stats.clone(), 1000).unwrap(), None, ConnectionPeerType::Unstaked, (num_ips * 2) as u64, @@ -2230,6 +2328,7 @@ pub mod test { table.remove_connection(ConnectionTableKey::IP(socket.ip()), socket.port(), 0); } assert_eq!(table.total_size, 0); + assert_eq!(stats.open_connections.load(Ordering::Relaxed), 0); } #[test] @@ -2342,4 +2441,16 @@ pub mod test { ); assert!(stats.throttled_unstaked_streams.load(Ordering::Relaxed) > 0); } + + #[test] + fn test_client_connection_tracker() { + let stats = Arc::new(StreamerStats::default()); + let tracker_1 = ClientConnectionTracker::new(stats.clone(), 1); + assert!(tracker_1.is_ok()); + assert!(ClientConnectionTracker::new(stats.clone(), 1).is_err()); + assert_eq!(stats.open_connections.load(Ordering::Relaxed), 1); + // dropping the connection, concurrent connections should become 0 + drop(tracker_1); + assert_eq!(stats.open_connections.load(Ordering::Relaxed), 0); + } } diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index e9ca06a10bb133..b5f78c753da92c 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -245,6 +245,9 @@ pub struct StreamerStats { pub(crate) throttled_staked_streams: AtomicUsize, pub(crate) throttled_unstaked_streams: AtomicUsize, pub(crate) connection_rate_limiter_length: AtomicUsize, + // All connections in various states such as Incoming, Connecting, Connection + pub(crate) open_connections: AtomicUsize, + pub(crate) refused_connections_too_many_open_connections: AtomicUsize, pub(crate) outstanding_incoming_connection_attempts: AtomicUsize, pub(crate) total_incoming_connection_attempts: AtomicUsize, pub(crate) quic_endpoints_count: AtomicUsize, @@ -593,6 +596,17 @@ impl StreamerStats { self.quic_endpoints_count.load(Ordering::Relaxed), i64 ), + ( + "open_connections", + self.open_connections.load(Ordering::Relaxed), + i64 + ), + ( + "refused_connections_too_many_open_connections", + self.refused_connections_too_many_open_connections + .swap(0, Ordering::Relaxed), + i64 + ), ); } }