From 596957f65e877d3c1712eaa72db9b7df6cb2a2a2 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Mon, 18 Mar 2024 21:57:32 +0100 Subject: [PATCH 1/5] Priority fee related statistics --- services/src/quic_connection_utils.rs | 10 +++++----- .../src/tpu_utils/tpu_connection_manager.rs | 20 ++++++++++++++++++- services/src/transaction_service.rs | 11 ++++++++++ 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/services/src/quic_connection_utils.rs b/services/src/quic_connection_utils.rs index bc3749d4..90fd24c8 100644 --- a/services/src/quic_connection_utils.rs +++ b/services/src/quic_connection_utils.rs @@ -69,14 +69,14 @@ pub struct QuicConnectionParameters { impl Default for QuicConnectionParameters { fn default() -> Self { Self { - connection_timeout: Duration::from_millis(5000), - unistream_timeout: Duration::from_millis(5000), - write_timeout: Duration::from_millis(5000), - finalize_timeout: Duration::from_millis(5000), + connection_timeout: Duration::from_millis(10000), + unistream_timeout: Duration::from_millis(10000), + write_timeout: Duration::from_millis(10000), + finalize_timeout: Duration::from_millis(10000), connection_retry_count: 20, max_number_of_connections: 8, number_of_transactions_per_unistream: 1, - percentage_of_connection_limit_to_create_new: 50, + percentage_of_connection_limit_to_create_new: 75, } } } diff --git a/services/src/tpu_utils/tpu_connection_manager.rs b/services/src/tpu_utils/tpu_connection_manager.rs index 3fd2cb10..8d05343d 100644 --- a/services/src/tpu_utils/tpu_connection_manager.rs +++ b/services/src/tpu_utils/tpu_connection_manager.rs @@ -1,6 +1,8 @@ use dashmap::DashMap; use log::{error, trace}; -use prometheus::{core::GenericGauge, opts, register_int_gauge}; +use prometheus::{ + core::GenericGauge, histogram_opts, opts, register_histogram, register_int_gauge, Histogram, +}; use quinn::Endpoint; use solana_lite_rpc_core::{ stores::data_cache::DataCache, @@ -39,6 +41,15 @@ lazy_static::lazy_static! { register_int_gauge!(opts!("literpc_connections_to_keep", "Number of connections to keep asked by tpu service")).unwrap(); static ref NB_QUIC_TASKS: GenericGauge = register_int_gauge!(opts!("literpc_quic_tasks", "Number of connections to keep asked by tpu service")).unwrap(); + + static ref TT_SENT_TIMER: Histogram = register_histogram!(histogram_opts!( + "literpc_txs_send_timer", + "Time to send transaction batch", + )) + .unwrap(); + + static ref TRANSACTIONS_IN_HEAP: GenericGauge = + register_int_gauge!(opts!("literpc_transactions_in_priority_heap", "Number of transactions in priority heap")).unwrap(); } #[derive(Clone)] @@ -101,6 +112,8 @@ impl ActiveConnection { } priorization_heap.insert(transaction_sent_info).await; + TRANSACTIONS_IN_HEAP.inc(); + fill_notify.notify_one(); // give little more priority to read the transaction sender with this wait let last_blockheight = @@ -166,6 +179,7 @@ impl ActiveConnection { // wait to get notification from fill event break; }; + TRANSACTIONS_IN_HEAP.dec(); // check if transaction is already confirmed if self.data_cache.txs.is_transaction_confirmed(&tx.signature) { @@ -186,8 +200,12 @@ impl ActiveConnection { tokio::spawn(async move { // permit will be used to send all the transaction and then destroyed let _permit = permit; + let timer = TT_SENT_TIMER.start_timer(); + NB_QUIC_TASKS.inc(); + connection.send_transaction(tx.transaction).await; + timer.observe_duration(); NB_QUIC_TASKS.dec(); }); } diff --git a/services/src/transaction_service.rs b/services/src/transaction_service.rs index 0906b39b..a5882228 100644 --- a/services/src/transaction_service.rs +++ b/services/src/transaction_service.rs @@ -9,6 +9,7 @@ use crate::{ tx_sender::TxSender, }; use anyhow::bail; +use prometheus::{histogram_opts, register_histogram, Histogram}; use solana_lite_rpc_core::{ solana_utils::SerializableTransaction, structures::transaction_sent_info::SentTransactionInfo, types::SlotStream, @@ -28,6 +29,14 @@ use tokio::{ time::Instant, }; +lazy_static::lazy_static! { +static ref PRIORITY_FEES_HISTOGRAM: Histogram = register_histogram!(histogram_opts!( + "literpc_txs_priority_fee", + "Priority fees of transactions sent by lite-rpc", +)) +.unwrap(); +} + #[derive(Clone)] pub struct TransactionServiceBuilder { tx_sender: TxSender, @@ -157,6 +166,8 @@ impl TransactionService { prioritization_fee }; + PRIORITY_FEES_HISTOGRAM.observe(prioritization_fee as f64); + let max_replay = max_retries.map_or(self.max_retries, |x| x as usize); let transaction_info = SentTransactionInfo { signature: signature.to_string(), From 73d0b069beb8070d5ad5c9448724cd508840c607 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Tue, 19 Mar 2024 10:35:29 +0100 Subject: [PATCH 2/5] Fixing memory leak, and metrics --- .../src/structures/prioritization_fee_heap.rs | 8 +++ services/src/quic_connection_utils.rs | 4 ++ .../src/tpu_utils/tpu_connection_manager.rs | 53 ++++++++++--------- 3 files changed, 39 insertions(+), 26 deletions(-) diff --git a/core/src/structures/prioritization_fee_heap.rs b/core/src/structures/prioritization_fee_heap.rs index a3755765..0b5cb863 100644 --- a/core/src/structures/prioritization_fee_heap.rs +++ b/core/src/structures/prioritization_fee_heap.rs @@ -110,6 +110,14 @@ impl PrioritizationFeesHeap { pub async fn size(&self) -> usize { self.map.lock().await.signatures.len() } + + pub async fn clear(&self) -> usize { + let mut lk = self.map.lock().await; + lk.map.clear(); + let size = lk.signatures.len(); + lk.signatures.clear(); + size + } } #[cfg(test)] diff --git a/services/src/quic_connection_utils.rs b/services/src/quic_connection_utils.rs index 90fd24c8..d87c6a38 100644 --- a/services/src/quic_connection_utils.rs +++ b/services/src/quic_connection_utils.rs @@ -45,6 +45,9 @@ lazy_static::lazy_static! { register_int_gauge!(opts!("literpc_quic_finish_timedout", "Number of times finish timedout")).unwrap(); static ref NB_QUIC_FINISH_ERRORED: GenericGauge = register_int_gauge!(opts!("literpc_quic_finish_errored", "Number of times finish errored")).unwrap(); + + static ref NB_QUIC_CONNECTIONS: GenericGauge = + register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap(); } const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu"; @@ -210,6 +213,7 @@ impl QuicConnectionUtils { }; match conn { Ok(conn) => { + NB_QUIC_CONNECTIONS.inc(); return Some(conn); } Err(e) => { diff --git a/services/src/tpu_utils/tpu_connection_manager.rs b/services/src/tpu_utils/tpu_connection_manager.rs index 8d05343d..fba8a28c 100644 --- a/services/src/tpu_utils/tpu_connection_manager.rs +++ b/services/src/tpu_utils/tpu_connection_manager.rs @@ -33,8 +33,6 @@ use crate::{ }; lazy_static::lazy_static! { - static ref NB_QUIC_CONNECTIONS: GenericGauge = - register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap(); static ref NB_QUIC_ACTIVE_CONNECTIONS: GenericGauge = register_int_gauge!(opts!("literpc_nb_active_connections", "Number quic tasks that are running")).unwrap(); static ref NB_CONNECTIONS_TO_KEEP: GenericGauge = @@ -48,7 +46,7 @@ lazy_static::lazy_static! { )) .unwrap(); - static ref TRANSACTIONS_IN_HEAP: GenericGauge = + static ref TRANSACTIONS_IN_HEAP: GenericGauge = register_int_gauge!(opts!("literpc_transactions_in_priority_heap", "Number of transactions in priority heap")).unwrap(); } @@ -88,19 +86,41 @@ impl ActiveConnection { addr: SocketAddr, identity_stakes: IdentityStakesData, ) { - let priorization_heap = PrioritizationFeesHeap::new(2048); let fill_notify = Arc::new(Notify::new()); let identity = self.identity; + NB_QUIC_ACTIVE_CONNECTIONS.inc(); + + let max_number_of_connections = self.connection_parameters.max_number_of_connections; + + let max_uni_stream_connections = compute_max_allowed_uni_streams( + identity_stakes.peer_type, + identity_stakes.stakes, + identity_stakes.total_stakes, + ); + let exit_signal = self.exit_signal.clone(); + let connection_pool = QuicConnectionPool::new( + identity, + self.endpoints.clone(), + addr, + self.connection_parameters, + exit_signal.clone(), + max_number_of_connections, + max_uni_stream_connections, + ); + + let priorization_heap = PrioritizationFeesHeap::new(2 * max_uni_stream_connections); + let heap_filler_task = { let priorization_heap = priorization_heap.clone(); let data_cache = self.data_cache.clone(); let fill_notify = fill_notify.clone(); + let exit_signal = exit_signal.clone(); tokio::spawn(async move { let mut current_blockheight = data_cache.block_information_store.get_last_blockheight(); - loop { + while !exit_signal.load(Ordering::Relaxed) { let tx = transaction_reciever.recv().await; match tx { Ok(transaction_sent_info) => { @@ -140,26 +160,6 @@ impl ActiveConnection { }) }; - NB_QUIC_ACTIVE_CONNECTIONS.inc(); - - let max_number_of_connections = self.connection_parameters.max_number_of_connections; - - let max_uni_stream_connections = compute_max_allowed_uni_streams( - identity_stakes.peer_type, - identity_stakes.stakes, - identity_stakes.total_stakes, - ); - let exit_signal = self.exit_signal.clone(); - let connection_pool = QuicConnectionPool::new( - identity, - self.endpoints.clone(), - addr, - self.connection_parameters, - exit_signal.clone(), - max_number_of_connections, - max_uni_stream_connections, - ); - 'main_loop: loop { // exit signal set if exit_signal.load(Ordering::Relaxed) { @@ -217,7 +217,8 @@ impl ActiveConnection { } heap_filler_task.abort(); - NB_QUIC_CONNECTIONS.dec(); + let elements_removed = priorization_heap.clear().await; + TRANSACTIONS_IN_HEAP.sub(elements_removed as i64); NB_QUIC_ACTIVE_CONNECTIONS.dec(); } From 4bcc9ddb6c09773c6c5663ba5995d9fc7a9f03b4 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Tue, 19 Mar 2024 14:37:08 +0100 Subject: [PATCH 3/5] Adding quic timers metrics, more quic bugfixes --- .../src/structures/prioritization_fee_heap.rs | 2 +- lite-rpc/src/cli.rs | 6 ++-- .../tests/quic_proxy_tpu_integrationtest.rs | 2 +- services/src/quic_connection.rs | 21 +++++++----- services/src/quic_connection_utils.rs | 33 +++++++++++++++++-- services/src/transaction_service.rs | 12 ++++--- 6 files changed, 55 insertions(+), 21 deletions(-) diff --git a/core/src/structures/prioritization_fee_heap.rs b/core/src/structures/prioritization_fee_heap.rs index 0b5cb863..c17aa4d6 100644 --- a/core/src/structures/prioritization_fee_heap.rs +++ b/core/src/structures/prioritization_fee_heap.rs @@ -194,7 +194,7 @@ mod tests { let mut height = 0; while instant.elapsed() < Duration::from_secs(45) { - let burst_count = rand::random::() % 1024 + 1; + let burst_count = rand::random::() % 128 + 1; for c in 0..burst_count { let prioritization_fee = rand::random::() % 100000; let info = SentTransactionInfo { diff --git a/lite-rpc/src/cli.rs b/lite-rpc/src/cli.rs index 1a850d30..c4f08fbb 100644 --- a/lite-rpc/src/cli.rs +++ b/lite-rpc/src/cli.rs @@ -296,10 +296,12 @@ fn quic_params_from_environment() -> Option { .map(|millis| millis.parse().unwrap()) .unwrap_or(quic_connection_parameters.number_of_transactions_per_unistream); - quic_connection_parameters.percentage_of_connection_limit_to_create_new = + quic_connection_parameters.unistreams_to_create_new_connection_in_percentage = env::var("QUIC_PERCENTAGE_TO_CREATE_NEW_CONNECTION") .map(|millis| millis.parse().unwrap()) - .unwrap_or(quic_connection_parameters.percentage_of_connection_limit_to_create_new); + .unwrap_or( + quic_connection_parameters.unistreams_to_create_new_connection_in_percentage, + ); Some(quic_connection_parameters) } diff --git a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs index 8952b741..06ec9307 100644 --- a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs +++ b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs @@ -60,7 +60,7 @@ const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameter unistream_timeout: Duration::from_secs(2), write_timeout: Duration::from_secs(2), number_of_transactions_per_unistream: 10, - percentage_of_connection_limit_to_create_new: 10, + unistreams_to_create_new_connection_in_percentage: 10, }; #[test] diff --git a/services/src/quic_connection.rs b/services/src/quic_connection.rs index 502908dd..82e68202 100644 --- a/services/src/quic_connection.rs +++ b/services/src/quic_connection.rs @@ -110,8 +110,14 @@ impl QuicConnection { } None => { NB_QUIC_CONNECTION_REQUESTED.inc(); + // so that only one instance is connecting + let mut lk = self.connection.write().await; + if lk.is_some() { + // connection has recently been established/ just use it + return (*lk).clone(); + } let connection = self.connect(false).await; - *self.connection.write().await = connection.clone(); + *lk = connection.clone(); self.has_connected_once.store(true, Ordering::Relaxed); connection } @@ -211,7 +217,7 @@ pub struct QuicConnectionPool { // counting semaphore is ideal way to manage backpressure on the connection // because a connection can create only N unistream connections transactions_in_sending_semaphore: Vec>, - permit_threshold: usize, + threshold_to_create_new_connection: usize, } pub struct PooledConnection { @@ -250,9 +256,9 @@ impl QuicConnectionPool { }); v }, - permit_threshold: max_number_of_unistream_connection - .saturating_mul(std::cmp::max( - connection_parameters.percentage_of_connection_limit_to_create_new, + threshold_to_create_new_connection: max_number_of_unistream_connection + .saturating_mul(std::cmp::min( + connection_parameters.unistreams_to_create_new_connection_in_percentage, 100, ) as usize) .saturating_div(100), @@ -266,7 +272,7 @@ impl QuicConnectionPool { if !connection.has_connected_atleast_once() || (connection.is_connected().await - && sem.available_permits() > self.permit_threshold) + && sem.available_permits() > self.threshold_to_create_new_connection) { // if it is connection is not yet connected even once or connection is still open if let Ok(permit) = sem.clone().try_acquire_owned() { @@ -289,9 +295,6 @@ impl QuicConnectionPool { let (permit, index) = self.get_permit_and_index().await?; // establish a connection if the connection has not yet been used let connection = self.connections[index].clone(); - if !connection.has_connected_atleast_once() { - connection.get_connection().await; - } Ok(PooledConnection { connection, permit }) } diff --git a/services/src/quic_connection_utils.rs b/services/src/quic_connection_utils.rs index d87c6a38..5a82b38a 100644 --- a/services/src/quic_connection_utils.rs +++ b/services/src/quic_connection_utils.rs @@ -1,5 +1,7 @@ use log::trace; -use prometheus::{core::GenericGauge, opts, register_int_gauge}; +use prometheus::{ + core::GenericGauge, histogram_opts, opts, register_histogram, register_int_gauge, Histogram, +}; use quinn::{ ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream, TokioRuntime, TransportConfig, VarInt, @@ -48,6 +50,23 @@ lazy_static::lazy_static! { static ref NB_QUIC_CONNECTIONS: GenericGauge = register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap(); + + static ref TIME_OF_CONNECT: Histogram = register_histogram!(histogram_opts!( + "literpc_quic_connection_timer_histogram", + "Time to connect to the TPU port", + )) + .unwrap(); + static ref TIME_TO_WRITE: Histogram = register_histogram!(histogram_opts!( + "literpc_quic_write_timer_histogram", + "Time to write on the TPU port", + )) + .unwrap(); + + static ref TIME_TO_FINISH: Histogram = register_histogram!(histogram_opts!( + "literpc_quic_finish_timer_histogram", + "Time to finish on the TPU port", +)) +.unwrap(); } const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu"; @@ -66,7 +85,7 @@ pub struct QuicConnectionParameters { pub connection_retry_count: usize, pub max_number_of_connections: usize, pub number_of_transactions_per_unistream: usize, - pub percentage_of_connection_limit_to_create_new: u8, + pub unistreams_to_create_new_connection_in_percentage: u8, } impl Default for QuicConnectionParameters { @@ -79,7 +98,7 @@ impl Default for QuicConnectionParameters { connection_retry_count: 20, max_number_of_connections: 8, number_of_transactions_per_unistream: 1, - percentage_of_connection_limit_to_create_new: 75, + unistreams_to_create_new_connection_in_percentage: 50, } } } @@ -140,10 +159,12 @@ impl QuicConnectionUtils { addr: SocketAddr, connection_timeout: Duration, ) -> anyhow::Result { + let timer = TIME_OF_CONNECT.start_timer(); let connecting = endpoint.connect(addr, "connect")?; match timeout(connection_timeout, connecting).await { Ok(res) => match res { Ok(connection) => { + timer.observe_duration(); NB_QUIC_CONN_SUCCESSFUL.inc(); Ok(connection) } @@ -233,6 +254,7 @@ impl QuicConnectionUtils { identity: Pubkey, connection_params: QuicConnectionParameters, ) -> Result<(), QuicConnectionError> { + let timer = TIME_TO_WRITE.start_timer(); let write_timeout_res = timeout( connection_params.write_timeout, send_stream.write_all(tx.as_slice()), @@ -248,6 +270,8 @@ impl QuicConnectionUtils { ); NB_QUIC_WRITEALL_ERRORED.inc(); return Err(QuicConnectionError::ConnectionError { retry: true }); + } else { + timer.observe_duration(); } } Err(_) => { @@ -257,6 +281,7 @@ impl QuicConnectionUtils { } } + let timer: prometheus::HistogramTimer = TIME_TO_FINISH.start_timer(); let finish_timeout_res = timeout(connection_params.finalize_timeout, send_stream.finish()).await; match finish_timeout_res { @@ -269,6 +294,8 @@ impl QuicConnectionUtils { ); NB_QUIC_FINISH_ERRORED.inc(); return Err(QuicConnectionError::ConnectionError { retry: false }); + } else { + timer.observe_duration(); } } Err(_) => { diff --git a/services/src/transaction_service.rs b/services/src/transaction_service.rs index a5882228..2e0f0577 100644 --- a/services/src/transaction_service.rs +++ b/services/src/transaction_service.rs @@ -30,11 +30,11 @@ use tokio::{ }; lazy_static::lazy_static! { -static ref PRIORITY_FEES_HISTOGRAM: Histogram = register_histogram!(histogram_opts!( - "literpc_txs_priority_fee", - "Priority fees of transactions sent by lite-rpc", -)) -.unwrap(); + static ref PRIORITY_FEES_HISTOGRAM: Histogram = register_histogram!(histogram_opts!( + "literpc_txs_priority_fee", + "Priority fees of transactions sent by lite-rpc", + )) + .unwrap(); } #[derive(Clone)] @@ -203,3 +203,5 @@ impl TransactionService { Ok(signature.to_string()) } } + +mod test {} From 6802852bbef6c0c6e05d4cecafa32dc6f2c2a392 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Tue, 19 Mar 2024 16:08:27 +0100 Subject: [PATCH 4/5] Minor optimization while creating quic connection --- services/src/tpu_utils/tpu_connection_manager.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/services/src/tpu_utils/tpu_connection_manager.rs b/services/src/tpu_utils/tpu_connection_manager.rs index fba8a28c..be3a56de 100644 --- a/services/src/tpu_utils/tpu_connection_manager.rs +++ b/services/src/tpu_utils/tpu_connection_manager.rs @@ -160,6 +160,16 @@ impl ActiveConnection { }) }; + // create atleast one connection before waiting from transactions + if let Ok(PooledConnection { connection, permit }) = + connection_pool.get_pooled_connection().await + { + tokio::task::spawn(async move { + let _permit = permit; + connection.get_connection().await; + }); + } + 'main_loop: loop { // exit signal set if exit_signal.load(Ordering::Relaxed) { From 1754e92ff3d2b402bc58738811a75d11254cae79 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Tue, 19 Mar 2024 16:59:34 +0100 Subject: [PATCH 5/5] Changing default value for create connection limit --- services/src/quic_connection_utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/src/quic_connection_utils.rs b/services/src/quic_connection_utils.rs index 5a82b38a..dc864610 100644 --- a/services/src/quic_connection_utils.rs +++ b/services/src/quic_connection_utils.rs @@ -98,7 +98,7 @@ impl Default for QuicConnectionParameters { connection_retry_count: 20, max_number_of_connections: 8, number_of_transactions_per_unistream: 1, - unistreams_to_create_new_connection_in_percentage: 50, + unistreams_to_create_new_connection_in_percentage: 10, } } }