Fixing memory leak, and metrics
This commit is contained in:
parent
596957f65e
commit
73d0b069be
|
@ -110,6 +110,14 @@ impl PrioritizationFeesHeap {
|
|||
pub async fn size(&self) -> usize {
|
||||
self.map.lock().await.signatures.len()
|
||||
}
|
||||
|
||||
pub async fn clear(&self) -> usize {
|
||||
let mut lk = self.map.lock().await;
|
||||
lk.map.clear();
|
||||
let size = lk.signatures.len();
|
||||
lk.signatures.clear();
|
||||
size
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
@ -45,6 +45,9 @@ lazy_static::lazy_static! {
|
|||
register_int_gauge!(opts!("literpc_quic_finish_timedout", "Number of times finish timedout")).unwrap();
|
||||
static ref NB_QUIC_FINISH_ERRORED: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_quic_finish_errored", "Number of times finish errored")).unwrap();
|
||||
|
||||
static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap();
|
||||
}
|
||||
|
||||
const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
|
||||
|
@ -210,6 +213,7 @@ impl QuicConnectionUtils {
|
|||
};
|
||||
match conn {
|
||||
Ok(conn) => {
|
||||
NB_QUIC_CONNECTIONS.inc();
|
||||
return Some(conn);
|
||||
}
|
||||
Err(e) => {
|
||||
|
|
|
@ -33,8 +33,6 @@ use crate::{
|
|||
};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap();
|
||||
static ref NB_QUIC_ACTIVE_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_nb_active_connections", "Number quic tasks that are running")).unwrap();
|
||||
static ref NB_CONNECTIONS_TO_KEEP: GenericGauge<prometheus::core::AtomicI64> =
|
||||
|
@ -48,7 +46,7 @@ lazy_static::lazy_static! {
|
|||
))
|
||||
.unwrap();
|
||||
|
||||
static ref TRANSACTIONS_IN_HEAP: GenericGauge<prometheus::core::AtomicI64> =
|
||||
static ref TRANSACTIONS_IN_HEAP: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_transactions_in_priority_heap", "Number of transactions in priority heap")).unwrap();
|
||||
}
|
||||
|
||||
|
@ -88,19 +86,41 @@ impl ActiveConnection {
|
|||
addr: SocketAddr,
|
||||
identity_stakes: IdentityStakesData,
|
||||
) {
|
||||
let priorization_heap = PrioritizationFeesHeap::new(2048);
|
||||
let fill_notify = Arc::new(Notify::new());
|
||||
|
||||
let identity = self.identity;
|
||||
|
||||
NB_QUIC_ACTIVE_CONNECTIONS.inc();
|
||||
|
||||
let max_number_of_connections = self.connection_parameters.max_number_of_connections;
|
||||
|
||||
let max_uni_stream_connections = compute_max_allowed_uni_streams(
|
||||
identity_stakes.peer_type,
|
||||
identity_stakes.stakes,
|
||||
identity_stakes.total_stakes,
|
||||
);
|
||||
let exit_signal = self.exit_signal.clone();
|
||||
let connection_pool = QuicConnectionPool::new(
|
||||
identity,
|
||||
self.endpoints.clone(),
|
||||
addr,
|
||||
self.connection_parameters,
|
||||
exit_signal.clone(),
|
||||
max_number_of_connections,
|
||||
max_uni_stream_connections,
|
||||
);
|
||||
|
||||
let priorization_heap = PrioritizationFeesHeap::new(2 * max_uni_stream_connections);
|
||||
|
||||
let heap_filler_task = {
|
||||
let priorization_heap = priorization_heap.clone();
|
||||
let data_cache = self.data_cache.clone();
|
||||
let fill_notify = fill_notify.clone();
|
||||
let exit_signal = exit_signal.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut current_blockheight =
|
||||
data_cache.block_information_store.get_last_blockheight();
|
||||
loop {
|
||||
while !exit_signal.load(Ordering::Relaxed) {
|
||||
let tx = transaction_reciever.recv().await;
|
||||
match tx {
|
||||
Ok(transaction_sent_info) => {
|
||||
|
@ -140,26 +160,6 @@ impl ActiveConnection {
|
|||
})
|
||||
};
|
||||
|
||||
NB_QUIC_ACTIVE_CONNECTIONS.inc();
|
||||
|
||||
let max_number_of_connections = self.connection_parameters.max_number_of_connections;
|
||||
|
||||
let max_uni_stream_connections = compute_max_allowed_uni_streams(
|
||||
identity_stakes.peer_type,
|
||||
identity_stakes.stakes,
|
||||
identity_stakes.total_stakes,
|
||||
);
|
||||
let exit_signal = self.exit_signal.clone();
|
||||
let connection_pool = QuicConnectionPool::new(
|
||||
identity,
|
||||
self.endpoints.clone(),
|
||||
addr,
|
||||
self.connection_parameters,
|
||||
exit_signal.clone(),
|
||||
max_number_of_connections,
|
||||
max_uni_stream_connections,
|
||||
);
|
||||
|
||||
'main_loop: loop {
|
||||
// exit signal set
|
||||
if exit_signal.load(Ordering::Relaxed) {
|
||||
|
@ -217,7 +217,8 @@ impl ActiveConnection {
|
|||
}
|
||||
|
||||
heap_filler_task.abort();
|
||||
NB_QUIC_CONNECTIONS.dec();
|
||||
let elements_removed = priorization_heap.clear().await;
|
||||
TRANSACTIONS_IN_HEAP.sub(elements_removed as i64);
|
||||
NB_QUIC_ACTIVE_CONNECTIONS.dec();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue