Skip to content

Commit

Permalink
Fixing memory leak, and metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus committed Mar 19, 2024
1 parent 596957f commit 73d0b06
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 26 deletions.
8 changes: 8 additions & 0 deletions core/src/structures/prioritization_fee_heap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ impl PrioritizationFeesHeap {
pub async fn size(&self) -> usize {
self.map.lock().await.signatures.len()
}

pub async fn clear(&self) -> usize {
let mut lk = self.map.lock().await;
lk.map.clear();
let size = lk.signatures.len();
lk.signatures.clear();
size
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions services/src/quic_connection_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ lazy_static::lazy_static! {
register_int_gauge!(opts!("literpc_quic_finish_timedout", "Number of times finish timedout")).unwrap();
static ref NB_QUIC_FINISH_ERRORED: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_finish_errored", "Number of times finish errored")).unwrap();

static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap();
}

const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
Expand Down Expand Up @@ -210,6 +213,7 @@ impl QuicConnectionUtils {
};
match conn {
Ok(conn) => {
NB_QUIC_CONNECTIONS.inc();
return Some(conn);
}
Err(e) => {
Expand Down
53 changes: 27 additions & 26 deletions services/src/tpu_utils/tpu_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ use crate::{
};

lazy_static::lazy_static! {
static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap();
static ref NB_QUIC_ACTIVE_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_nb_active_connections", "Number quic tasks that are running")).unwrap();
static ref NB_CONNECTIONS_TO_KEEP: GenericGauge<prometheus::core::AtomicI64> =
Expand All @@ -48,7 +46,7 @@ lazy_static::lazy_static! {
))
.unwrap();

static ref TRANSACTIONS_IN_HEAP: GenericGauge<prometheus::core::AtomicI64> =
static ref TRANSACTIONS_IN_HEAP: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_transactions_in_priority_heap", "Number of transactions in priority heap")).unwrap();
}

Expand Down Expand Up @@ -88,19 +86,41 @@ impl ActiveConnection {
addr: SocketAddr,
identity_stakes: IdentityStakesData,
) {
let priorization_heap = PrioritizationFeesHeap::new(2048);
let fill_notify = Arc::new(Notify::new());

let identity = self.identity;

NB_QUIC_ACTIVE_CONNECTIONS.inc();

let max_number_of_connections = self.connection_parameters.max_number_of_connections;

let max_uni_stream_connections = compute_max_allowed_uni_streams(
identity_stakes.peer_type,
identity_stakes.stakes,
identity_stakes.total_stakes,
);
let exit_signal = self.exit_signal.clone();
let connection_pool = QuicConnectionPool::new(
identity,
self.endpoints.clone(),
addr,
self.connection_parameters,
exit_signal.clone(),
max_number_of_connections,
max_uni_stream_connections,
);

let priorization_heap = PrioritizationFeesHeap::new(2 * max_uni_stream_connections);

let heap_filler_task = {
let priorization_heap = priorization_heap.clone();
let data_cache = self.data_cache.clone();
let fill_notify = fill_notify.clone();
let exit_signal = exit_signal.clone();
tokio::spawn(async move {
let mut current_blockheight =
data_cache.block_information_store.get_last_blockheight();
loop {
while !exit_signal.load(Ordering::Relaxed) {
let tx = transaction_reciever.recv().await;
match tx {
Ok(transaction_sent_info) => {
Expand Down Expand Up @@ -140,26 +160,6 @@ impl ActiveConnection {
})
};

NB_QUIC_ACTIVE_CONNECTIONS.inc();

let max_number_of_connections = self.connection_parameters.max_number_of_connections;

let max_uni_stream_connections = compute_max_allowed_uni_streams(
identity_stakes.peer_type,
identity_stakes.stakes,
identity_stakes.total_stakes,
);
let exit_signal = self.exit_signal.clone();
let connection_pool = QuicConnectionPool::new(
identity,
self.endpoints.clone(),
addr,
self.connection_parameters,
exit_signal.clone(),
max_number_of_connections,
max_uni_stream_connections,
);

'main_loop: loop {
// exit signal set
if exit_signal.load(Ordering::Relaxed) {
Expand Down Expand Up @@ -217,7 +217,8 @@ impl ActiveConnection {
}

heap_filler_task.abort();
NB_QUIC_CONNECTIONS.dec();
let elements_removed = priorization_heap.clear().await;
TRANSACTIONS_IN_HEAP.sub(elements_removed as i64);
NB_QUIC_ACTIVE_CONNECTIONS.dec();
}

Expand Down

0 comments on commit 73d0b06

Please sign in to comment.