diff --git a/core/src/structures/prioritization_fee_heap.rs b/core/src/structures/prioritization_fee_heap.rs
index 22a510c8..6bd26a8a 100644
--- a/core/src/structures/prioritization_fee_heap.rs
+++ b/core/src/structures/prioritization_fee_heap.rs
@@ -112,6 +112,14 @@ impl PrioritizationFeesHeap {
     pub async fn size(&self) -> usize {
         self.map.lock().await.signatures.len()
     }
+
+    pub async fn clear(&self) -> usize {
+        let mut lk = self.map.lock().await;
+        lk.map.clear();
+        let size = lk.signatures.len();
+        lk.signatures.clear();
+        size
+    }
 }
 
 #[cfg(test)]
@@ -189,8 +197,8 @@ mod tests {
         let mut height = 0;
 
         while instant.elapsed() < Duration::from_secs(45) {
-            let burst_count = rand::random::<usize>() % 1024 + 1;
-            for _ in 0..burst_count {
+            let burst_count = rand::random::<usize>() % 128 + 1;
+            for _c in 0..burst_count {
                 let prioritization_fee = rand::random::<u64>() % 100000;
                 let info = SentTransactionInfo {
                     signature: Signature::new_unique(),
diff --git a/lite-rpc/src/cli.rs b/lite-rpc/src/cli.rs
index 82aba777..b8c334ca 100644
--- a/lite-rpc/src/cli.rs
+++ b/lite-rpc/src/cli.rs
@@ -389,10 +389,12 @@ fn quic_params_from_environment() -> Option<QuicConnectionParameters> {
         .map(|millis| millis.parse().unwrap())
         .unwrap_or(quic_connection_parameters.number_of_transactions_per_unistream);
 
-    quic_connection_parameters.percentage_of_connection_limit_to_create_new =
+    quic_connection_parameters.unistreams_to_create_new_connection_in_percentage =
         env::var("QUIC_PERCENTAGE_TO_CREATE_NEW_CONNECTION")
             .map(|millis| millis.parse().unwrap())
-            .unwrap_or(quic_connection_parameters.percentage_of_connection_limit_to_create_new);
+            .unwrap_or(
+                quic_connection_parameters.unistreams_to_create_new_connection_in_percentage,
+            );
 
     Some(quic_connection_parameters)
 }
diff --git a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs
index eaceadb1..2033dd95 100644
--- a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs
+++ b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs
@@ -60,7 +60,7 @@ const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameter
     unistream_timeout: Duration::from_secs(2),
     write_timeout: Duration::from_secs(2),
     number_of_transactions_per_unistream: 10,
-    percentage_of_connection_limit_to_create_new: 10,
+    unistreams_to_create_new_connection_in_percentage: 10,
 };
 
 #[test]
diff --git a/services/src/quic_connection.rs b/services/src/quic_connection.rs
index 502908dd..82e68202 100644
--- a/services/src/quic_connection.rs
+++ b/services/src/quic_connection.rs
@@ -110,8 +110,14 @@ impl QuicConnection {
             }
             None => {
                 NB_QUIC_CONNECTION_REQUESTED.inc();
+                // take the write lock so that only one instance is connecting
+                let mut lk = self.connection.write().await;
+                if lk.is_some() {
+                    // a connection has just been established by another caller, use it
+                    return (*lk).clone();
+                }
                 let connection = self.connect(false).await;
-                *self.connection.write().await = connection.clone();
+                *lk = connection.clone();
                 self.has_connected_once.store(true, Ordering::Relaxed);
                 connection
             }
@@ -211,7 +217,7 @@ pub struct QuicConnectionPool {
     // counting semaphore is ideal way to manage backpressure on the connection
     // because a connection can create only N unistream connections
     transactions_in_sending_semaphore: Vec<Arc<Semaphore>>,
-    permit_threshold: usize,
+    threshold_to_create_new_connection: usize,
 }
 
 pub struct PooledConnection {
@@ -250,9 +256,9 @@ impl QuicConnectionPool {
                 });
                 v
             },
-            permit_threshold: max_number_of_unistream_connection
-                .saturating_mul(std::cmp::max(
-                    connection_parameters.percentage_of_connection_limit_to_create_new,
+            threshold_to_create_new_connection: max_number_of_unistream_connection
+                .saturating_mul(std::cmp::min(
+                    connection_parameters.unistreams_to_create_new_connection_in_percentage,
                     100,
                 ) as usize)
                 .saturating_div(100),
@@ -266,7 +272,7 @@ impl QuicConnectionPool {
         if !connection.has_connected_atleast_once()
             || (connection.is_connected().await
-                && sem.available_permits() > self.permit_threshold)
+                && sem.available_permits() > self.threshold_to_create_new_connection)
         {
             // if it is connection is not yet connected even once or connection is still open
             if let Ok(permit) = sem.clone().try_acquire_owned() {
@@ -289,9 +295,6 @@ impl QuicConnectionPool {
         let (permit, index) = self.get_permit_and_index().await?;
         // establish a connection if the connection has not yet been used
         let connection = self.connections[index].clone();
-        if !connection.has_connected_atleast_once() {
-            connection.get_connection().await;
-        }
         Ok(PooledConnection { connection, permit })
     }
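Note on the threshold hunk above: the old expression clamped with `std::cmp::max(pct, 100)`, so any configured percentage up to 100 was rounded up to 100 and the threshold equaled the full unistream limit; since `available_permits()` can never exceed that limit, the reuse branch could never trigger for an already-connected entry and the pool kept rotating to new connections. Switching to `std::cmp::min` fixes that, and the write-lock re-check in `get_connection` stops concurrent callers from dialing duplicate connections. A minimal sketch of the corrected arithmetic, with a hypothetical free function standing in for the code inside `QuicConnectionPool::new`:

```rust
/// Hypothetical helper mirroring the arithmetic in `QuicConnectionPool::new`.
fn threshold_to_create_new_connection(
    max_number_of_unistream_connection: usize,
    percentage: u8,
) -> usize {
    max_number_of_unistream_connection
        // clamp at 100 so a misconfigured value cannot exceed the limit
        .saturating_mul(std::cmp::min(percentage, 100) as usize)
        .saturating_div(100)
}

#[test]
fn threshold_examples() {
    // 128 unistreams at the new 10% default: fall back to another
    // connection once 12 permits or fewer remain available.
    assert_eq!(threshold_to_create_new_connection(128, 10), 12);
    // values above 100% are clamped instead of inflating the threshold
    assert_eq!(threshold_to_create_new_connection(128, 150), 128);
}
```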
diff --git a/services/src/quic_connection_utils.rs b/services/src/quic_connection_utils.rs
index bc3749d4..dc864610 100644
--- a/services/src/quic_connection_utils.rs
+++ b/services/src/quic_connection_utils.rs
@@ -1,5 +1,7 @@
 use log::trace;
-use prometheus::{core::GenericGauge, opts, register_int_gauge};
+use prometheus::{
+    core::GenericGauge, histogram_opts, opts, register_histogram, register_int_gauge, Histogram,
+};
 use quinn::{
     ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream,
     TokioRuntime, TransportConfig, VarInt,
 };
@@ -45,6 +47,26 @@
         register_int_gauge!(opts!("literpc_quic_finish_timedout", "Number of times finish timedout")).unwrap();
     static ref NB_QUIC_FINISH_ERRORED: GenericGauge<prometheus::core::AtomicI64> =
         register_int_gauge!(opts!("literpc_quic_finish_errored", "Number of times finish errored")).unwrap();
+
+    static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
+        register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap();
+
+    static ref TIME_OF_CONNECT: Histogram = register_histogram!(histogram_opts!(
+        "literpc_quic_connection_timer_histogram",
+        "Time to connect to the TPU port",
+    ))
+    .unwrap();
+    static ref TIME_TO_WRITE: Histogram = register_histogram!(histogram_opts!(
+        "literpc_quic_write_timer_histogram",
+        "Time to write on the TPU port",
+    ))
+    .unwrap();
+
+    static ref TIME_TO_FINISH: Histogram = register_histogram!(histogram_opts!(
+        "literpc_quic_finish_timer_histogram",
+        "Time to finish on the TPU port",
+    ))
+    .unwrap();
 }
 
 const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
@@ -63,20 +85,20 @@ pub struct QuicConnectionParameters {
     pub connection_retry_count: usize,
     pub max_number_of_connections: usize,
     pub number_of_transactions_per_unistream: usize,
-    pub percentage_of_connection_limit_to_create_new: u8,
+    pub unistreams_to_create_new_connection_in_percentage: u8,
 }
 
 impl Default for QuicConnectionParameters {
     fn default() -> Self {
         Self {
-            connection_timeout: Duration::from_millis(5000),
-            unistream_timeout: Duration::from_millis(5000),
-            write_timeout: Duration::from_millis(5000),
-            finalize_timeout: Duration::from_millis(5000),
+            connection_timeout: Duration::from_millis(10000),
+            unistream_timeout: Duration::from_millis(10000),
+            write_timeout: Duration::from_millis(10000),
+            finalize_timeout: Duration::from_millis(10000),
             connection_retry_count: 20,
             max_number_of_connections: 8,
             number_of_transactions_per_unistream: 1,
-            percentage_of_connection_limit_to_create_new: 50,
+            unistreams_to_create_new_connection_in_percentage: 10,
         }
     }
 }
@@ -137,10 +159,12 @@ impl QuicConnectionUtils {
         addr: SocketAddr,
         connection_timeout: Duration,
     ) -> anyhow::Result<Connection> {
+        let timer = TIME_OF_CONNECT.start_timer();
         let connecting = endpoint.connect(addr, "connect")?;
         match timeout(connection_timeout, connecting).await {
             Ok(res) => match res {
                 Ok(connection) => {
+                    timer.observe_duration();
                     NB_QUIC_CONN_SUCCESSFUL.inc();
                     Ok(connection)
                 }
@@ -210,6 +234,7 @@ impl QuicConnectionUtils {
         };
         match conn {
             Ok(conn) => {
+                NB_QUIC_CONNECTIONS.inc();
                 return Some(conn);
             }
             Err(e) => {
@@ -229,6 +254,7 @@ impl QuicConnectionUtils {
         identity: Pubkey,
         connection_params: QuicConnectionParameters,
     ) -> Result<(), QuicConnectionError> {
+        let timer = TIME_TO_WRITE.start_timer();
         let write_timeout_res = timeout(
             connection_params.write_timeout,
             send_stream.write_all(tx.as_slice()),
@@ -244,6 +270,8 @@ impl QuicConnectionUtils {
                     );
                     NB_QUIC_WRITEALL_ERRORED.inc();
                     return Err(QuicConnectionError::ConnectionError { retry: true });
+                } else {
+                    timer.observe_duration();
+                }
             }
             Err(_) => {
@@ -253,6 +281,7 @@ impl QuicConnectionUtils {
             }
         }
 
+        let timer: prometheus::HistogramTimer = TIME_TO_FINISH.start_timer();
         let finish_timeout_res =
             timeout(connection_params.finalize_timeout, send_stream.finish()).await;
         match finish_timeout_res {
@@ -265,6 +294,8 @@ impl QuicConnectionUtils {
                     );
                     NB_QUIC_FINISH_ERRORED.inc();
                     return Err(QuicConnectionError::ConnectionError { retry: false });
+                } else {
+                    timer.observe_duration();
+                }
             }
             Err(_) => {
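All three new histograms use the same `HistogramTimer` pattern; here is a self-contained sketch with a hypothetical metric name. One subtlety worth knowing: as far as I can tell from the prometheus crate, a timer that is merely dropped also records its duration (unless `stop_and_discard()` is called), so the explicit `observe_duration()` calls above make the success paths explicit rather than being strictly required.

```rust
use lazy_static::lazy_static;
use prometheus::{histogram_opts, register_histogram, Histogram};

lazy_static! {
    // Hypothetical metric; the PR's real ones are TIME_OF_CONNECT,
    // TIME_TO_WRITE and TIME_TO_FINISH.
    static ref DEMO_SECONDS: Histogram = register_histogram!(histogram_opts!(
        "demo_operation_seconds",
        "Time spent in a demo operation",
    ))
    .unwrap();
}

async fn timed_operation() {
    let timer = DEMO_SECONDS.start_timer();
    // ... await the connect / write / finish here ...
    // Consuming the timer records the elapsed time (in seconds)
    // into the histogram's buckets.
    timer.observe_duration();
}
```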
diff --git a/services/src/tpu_utils/tpu_connection_manager.rs b/services/src/tpu_utils/tpu_connection_manager.rs
index e87ad46d..70913667 100644
--- a/services/src/tpu_utils/tpu_connection_manager.rs
+++ b/services/src/tpu_utils/tpu_connection_manager.rs
@@ -33,8 +33,6 @@ use crate::{
 };
 
 lazy_static::lazy_static! {
-    static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
-        register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap();
     static ref NB_QUIC_ACTIVE_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
         register_int_gauge!(opts!("literpc_nb_active_connections", "Number quic tasks that are running")).unwrap();
     static ref NB_CONNECTIONS_TO_KEEP: GenericGauge<prometheus::core::AtomicI64> =
@@ -46,6 +44,9 @@ lazy_static::lazy_static! {
     "Time to send transaction batch",
 ))
 .unwrap();
+
+    static ref TRANSACTIONS_IN_HEAP: GenericGauge<prometheus::core::AtomicI64> =
+        register_int_gauge!(opts!("literpc_transactions_in_priority_heap", "Number of transactions in priority heap")).unwrap();
 }
 
 #[derive(Clone)]
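The new `TRANSACTIONS_IN_HEAP` gauge only stays truthful if every code path that adds or removes a heap entry also touches the gauge; the hunks below wire that up as +1 per insert, -1 per pop, and one final subtraction of whatever `clear()` evicts at shutdown. A minimal sketch of that contract, with a hypothetical gauge name:

```rust
use lazy_static::lazy_static;
use prometheus::{core::GenericGauge, opts, register_int_gauge};

lazy_static! {
    // Hypothetical mirror of TRANSACTIONS_IN_HEAP.
    static ref IN_HEAP: GenericGauge<prometheus::core::AtomicI64> =
        register_int_gauge!(opts!("demo_in_heap", "Entries currently in the heap")).unwrap();
}

fn on_insert() {
    IN_HEAP.inc(); // one per successful heap insert
}

fn on_pop() {
    IN_HEAP.dec(); // one per transaction handed to a sender task
}

fn on_shutdown(elements_removed: usize) {
    // clear() returns how many entries it dropped, so the gauge can be
    // settled in a single subtraction instead of leaking the remainder.
    IN_HEAP.sub(elements_removed as i64);
}
```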
{ "Time to send transaction batch", )) .unwrap(); + + static ref TRANSACTIONS_IN_HEAP: GenericGauge = + register_int_gauge!(opts!("literpc_transactions_in_priority_heap", "Number of transactions in priority heap")).unwrap(); } #[derive(Clone)] @@ -84,19 +85,41 @@ impl ActiveConnection { addr: SocketAddr, identity_stakes: IdentityStakesData, ) { - let priorization_heap = PrioritizationFeesHeap::new(2048); let fill_notify = Arc::new(Notify::new()); let identity = self.identity; + NB_QUIC_ACTIVE_CONNECTIONS.inc(); + + let max_number_of_connections = self.connection_parameters.max_number_of_connections; + + let max_uni_stream_connections = compute_max_allowed_uni_streams( + identity_stakes.peer_type, + identity_stakes.stakes, + identity_stakes.total_stakes, + ); + let exit_signal = self.exit_signal.clone(); + let connection_pool = QuicConnectionPool::new( + identity, + self.endpoints.clone(), + addr, + self.connection_parameters, + exit_signal.clone(), + max_number_of_connections, + max_uni_stream_connections, + ); + + let priorization_heap = PrioritizationFeesHeap::new(2 * max_uni_stream_connections); + let heap_filler_task = { let priorization_heap = priorization_heap.clone(); let data_cache = self.data_cache.clone(); let fill_notify = fill_notify.clone(); + let exit_signal = exit_signal.clone(); tokio::spawn(async move { let mut current_blockheight = data_cache.block_information_store.get_last_blockheight(); - loop { + while !exit_signal.load(Ordering::Relaxed) { let tx = transaction_reciever.recv().await; match tx { Ok(transaction_sent_info) => { @@ -108,6 +131,8 @@ impl ActiveConnection { } priorization_heap.insert(transaction_sent_info).await; + TRANSACTIONS_IN_HEAP.inc(); + fill_notify.notify_one(); // give little more priority to read the transaction sender with this wait let last_blockheight = @@ -134,25 +159,15 @@ impl ActiveConnection { }) }; - NB_QUIC_ACTIVE_CONNECTIONS.inc(); - - let max_number_of_connections = self.connection_parameters.max_number_of_connections; - - let max_uni_stream_connections = compute_max_allowed_uni_streams( - identity_stakes.peer_type, - identity_stakes.stakes, - identity_stakes.total_stakes, - ); - let exit_signal = self.exit_signal.clone(); - let connection_pool = QuicConnectionPool::new( - identity, - self.endpoints.clone(), - addr, - self.connection_parameters, - exit_signal.clone(), - max_number_of_connections, - max_uni_stream_connections, - ); + // create atleast one connection before waiting from transactions + if let Ok(PooledConnection { connection, permit }) = + connection_pool.get_pooled_connection().await + { + tokio::task::spawn(async move { + let _permit = permit; + connection.get_connection().await; + }); + } 'main_loop: loop { // exit signal set @@ -173,6 +188,7 @@ impl ActiveConnection { // wait to get notification from fill event break; }; + TRANSACTIONS_IN_HEAP.dec(); // check if transaction is already confirmed if self.data_cache.txs.is_transaction_confirmed(&tx.signature) { @@ -193,8 +209,12 @@ impl ActiveConnection { tokio::spawn(async move { // permit will be used to send all the transaction and then destroyed let _permit = permit; + let timer = TT_SENT_TIMER.start_timer(); + NB_QUIC_TASKS.inc(); + connection.send_transaction(tx.transaction).await; + timer.observe_duration(); NB_QUIC_TASKS.dec(); }); } @@ -207,7 +227,8 @@ impl ActiveConnection { } heap_filler_task.abort(); - NB_QUIC_CONNECTIONS.dec(); + let elements_removed = priorization_heap.clear().await; + TRANSACTIONS_IN_HEAP.sub(elements_removed as i64); 
diff --git a/services/src/transaction_service.rs b/services/src/transaction_service.rs
index dfc1f87d..2c00efd6 100644
--- a/services/src/transaction_service.rs
+++ b/services/src/transaction_service.rs
@@ -9,6 +9,7 @@ use crate::{
     tx_sender::TxSender,
 };
 use anyhow::bail;
+use prometheus::{histogram_opts, register_histogram, Histogram};
 use solana_lite_rpc_core::{
     solana_utils::SerializableTransaction, structures::transaction_sent_info::SentTransactionInfo,
     types::SlotStream,
@@ -28,6 +29,14 @@ use tokio::{
     time::Instant,
 };
 
+lazy_static::lazy_static! {
+    static ref PRIORITY_FEES_HISTOGRAM: Histogram = register_histogram!(histogram_opts!(
+        "literpc_txs_priority_fee",
+        "Priority fees of transactions sent by lite-rpc",
+    ))
+    .unwrap();
+}
+
 #[derive(Clone)]
 pub struct TransactionServiceBuilder {
     tx_sender: TxSender,
@@ -157,6 +166,8 @@ impl TransactionService {
             prioritization_fee
         };
 
+        PRIORITY_FEES_HISTOGRAM.observe(prioritization_fee as f64);
+
         let max_replay = max_retries.map_or(self.max_retries, |x| x as usize);
         let transaction_info = SentTransactionInfo {
             signature,
@@ -192,3 +203,5 @@ impl TransactionService {
         Ok(signature.to_string())
     }
 }
+
+mod test {}
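One possible follow-up on `PRIORITY_FEES_HISTOGRAM`: priority fees span several orders of magnitude, while the prometheus crate's default buckets only reach 10, so nearly every observation would land in the top bucket. If finer resolution is wanted, explicit exponential buckets could be passed through `histogram_opts!`; the bounds below are purely illustrative and the metric name is hypothetical:

```rust
use lazy_static::lazy_static;
use prometheus::{exponential_buckets, histogram_opts, register_histogram, Histogram};

lazy_static! {
    // Hypothetical variant of PRIORITY_FEES_HISTOGRAM with explicit buckets:
    // 1, 10, 100, ..., 10_000_000.
    static ref PRIORITY_FEES_DEMO: Histogram = register_histogram!(histogram_opts!(
        "demo_txs_priority_fee",
        "Priority fees of transactions sent by lite-rpc",
        exponential_buckets(1.0, 10.0, 8).unwrap()
    ))
    .unwrap();
}

fn record_fee(prioritization_fee: u64) {
    PRIORITY_FEES_DEMO.observe(prioritization_fee as f64);
}
```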