diff --git a/core/src/structures/prioritization_fee_heap.rs b/core/src/structures/prioritization_fee_heap.rs index a3755765..0b5cb863 100644 --- a/core/src/structures/prioritization_fee_heap.rs +++ b/core/src/structures/prioritization_fee_heap.rs @@ -110,6 +110,14 @@ impl PrioritizationFeesHeap { pub async fn size(&self) -> usize { self.map.lock().await.signatures.len() } + + pub async fn clear(&self) -> usize { + let mut lk = self.map.lock().await; + lk.map.clear(); + let size = lk.signatures.len(); + lk.signatures.clear(); + size + } } #[cfg(test)] diff --git a/services/src/quic_connection_utils.rs b/services/src/quic_connection_utils.rs index 90fd24c8..d87c6a38 100644 --- a/services/src/quic_connection_utils.rs +++ b/services/src/quic_connection_utils.rs @@ -45,6 +45,9 @@ lazy_static::lazy_static! { register_int_gauge!(opts!("literpc_quic_finish_timedout", "Number of times finish timedout")).unwrap(); static ref NB_QUIC_FINISH_ERRORED: GenericGauge = register_int_gauge!(opts!("literpc_quic_finish_errored", "Number of times finish errored")).unwrap(); + + static ref NB_QUIC_CONNECTIONS: GenericGauge = + register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap(); } const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu"; @@ -210,6 +213,7 @@ impl QuicConnectionUtils { }; match conn { Ok(conn) => { + NB_QUIC_CONNECTIONS.inc(); return Some(conn); } Err(e) => { diff --git a/services/src/tpu_utils/tpu_connection_manager.rs b/services/src/tpu_utils/tpu_connection_manager.rs index 8d05343d..fba8a28c 100644 --- a/services/src/tpu_utils/tpu_connection_manager.rs +++ b/services/src/tpu_utils/tpu_connection_manager.rs @@ -33,8 +33,6 @@ use crate::{ }; lazy_static::lazy_static! { - static ref NB_QUIC_CONNECTIONS: GenericGauge = - register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap(); static ref NB_QUIC_ACTIVE_CONNECTIONS: GenericGauge = register_int_gauge!(opts!("literpc_nb_active_connections", "Number quic tasks that are running")).unwrap(); static ref NB_CONNECTIONS_TO_KEEP: GenericGauge = @@ -48,7 +46,7 @@ lazy_static::lazy_static! { )) .unwrap(); - static ref TRANSACTIONS_IN_HEAP: GenericGauge = + static ref TRANSACTIONS_IN_HEAP: GenericGauge = register_int_gauge!(opts!("literpc_transactions_in_priority_heap", "Number of transactions in priority heap")).unwrap(); } @@ -88,19 +86,41 @@ impl ActiveConnection { addr: SocketAddr, identity_stakes: IdentityStakesData, ) { - let priorization_heap = PrioritizationFeesHeap::new(2048); let fill_notify = Arc::new(Notify::new()); let identity = self.identity; + NB_QUIC_ACTIVE_CONNECTIONS.inc(); + + let max_number_of_connections = self.connection_parameters.max_number_of_connections; + + let max_uni_stream_connections = compute_max_allowed_uni_streams( + identity_stakes.peer_type, + identity_stakes.stakes, + identity_stakes.total_stakes, + ); + let exit_signal = self.exit_signal.clone(); + let connection_pool = QuicConnectionPool::new( + identity, + self.endpoints.clone(), + addr, + self.connection_parameters, + exit_signal.clone(), + max_number_of_connections, + max_uni_stream_connections, + ); + + let priorization_heap = PrioritizationFeesHeap::new(2 * max_uni_stream_connections); + let heap_filler_task = { let priorization_heap = priorization_heap.clone(); let data_cache = self.data_cache.clone(); let fill_notify = fill_notify.clone(); + let exit_signal = exit_signal.clone(); tokio::spawn(async move { let mut current_blockheight = data_cache.block_information_store.get_last_blockheight(); - loop { + while !exit_signal.load(Ordering::Relaxed) { let tx = transaction_reciever.recv().await; match tx { Ok(transaction_sent_info) => { @@ -140,26 +160,6 @@ impl ActiveConnection { }) }; - NB_QUIC_ACTIVE_CONNECTIONS.inc(); - - let max_number_of_connections = self.connection_parameters.max_number_of_connections; - - let max_uni_stream_connections = compute_max_allowed_uni_streams( - identity_stakes.peer_type, - identity_stakes.stakes, - identity_stakes.total_stakes, - ); - let exit_signal = self.exit_signal.clone(); - let connection_pool = QuicConnectionPool::new( - identity, - self.endpoints.clone(), - addr, - self.connection_parameters, - exit_signal.clone(), - max_number_of_connections, - max_uni_stream_connections, - ); - 'main_loop: loop { // exit signal set if exit_signal.load(Ordering::Relaxed) { @@ -217,7 +217,8 @@ impl ActiveConnection { } heap_filler_task.abort(); - NB_QUIC_CONNECTIONS.dec(); + let elements_removed = priorization_heap.clear().await; + TRANSACTIONS_IN_HEAP.sub(elements_removed as i64); NB_QUIC_ACTIVE_CONNECTIONS.dec(); }