From 4bcc9ddb6c09773c6c5663ba5995d9fc7a9f03b4 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Tue, 19 Mar 2024 14:37:08 +0100 Subject: [PATCH] Adding quic timers metrics, more quic bugfixes --- .../src/structures/prioritization_fee_heap.rs | 2 +- lite-rpc/src/cli.rs | 6 ++-- .../tests/quic_proxy_tpu_integrationtest.rs | 2 +- services/src/quic_connection.rs | 21 +++++++----- services/src/quic_connection_utils.rs | 33 +++++++++++++++++-- services/src/transaction_service.rs | 12 ++++--- 6 files changed, 55 insertions(+), 21 deletions(-) diff --git a/core/src/structures/prioritization_fee_heap.rs b/core/src/structures/prioritization_fee_heap.rs index 0b5cb863..c17aa4d6 100644 --- a/core/src/structures/prioritization_fee_heap.rs +++ b/core/src/structures/prioritization_fee_heap.rs @@ -194,7 +194,7 @@ mod tests { let mut height = 0; while instant.elapsed() < Duration::from_secs(45) { - let burst_count = rand::random::() % 1024 + 1; + let burst_count = rand::random::() % 128 + 1; for c in 0..burst_count { let prioritization_fee = rand::random::() % 100000; let info = SentTransactionInfo { diff --git a/lite-rpc/src/cli.rs b/lite-rpc/src/cli.rs index 1a850d30..c4f08fbb 100644 --- a/lite-rpc/src/cli.rs +++ b/lite-rpc/src/cli.rs @@ -296,10 +296,12 @@ fn quic_params_from_environment() -> Option { .map(|millis| millis.parse().unwrap()) .unwrap_or(quic_connection_parameters.number_of_transactions_per_unistream); - quic_connection_parameters.percentage_of_connection_limit_to_create_new = + quic_connection_parameters.unistreams_to_create_new_connection_in_percentage = env::var("QUIC_PERCENTAGE_TO_CREATE_NEW_CONNECTION") .map(|millis| millis.parse().unwrap()) - .unwrap_or(quic_connection_parameters.percentage_of_connection_limit_to_create_new); + .unwrap_or( + quic_connection_parameters.unistreams_to_create_new_connection_in_percentage, + ); Some(quic_connection_parameters) } diff --git a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs index 8952b741..06ec9307 100644 --- a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs +++ b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs @@ -60,7 +60,7 @@ const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameter unistream_timeout: Duration::from_secs(2), write_timeout: Duration::from_secs(2), number_of_transactions_per_unistream: 10, - percentage_of_connection_limit_to_create_new: 10, + unistreams_to_create_new_connection_in_percentage: 10, }; #[test] diff --git a/services/src/quic_connection.rs b/services/src/quic_connection.rs index 502908dd..82e68202 100644 --- a/services/src/quic_connection.rs +++ b/services/src/quic_connection.rs @@ -110,8 +110,14 @@ impl QuicConnection { } None => { NB_QUIC_CONNECTION_REQUESTED.inc(); + // so that only one instance is connecting + let mut lk = self.connection.write().await; + if lk.is_some() { + // connection has recently been established/ just use it + return (*lk).clone(); + } let connection = self.connect(false).await; - *self.connection.write().await = connection.clone(); + *lk = connection.clone(); self.has_connected_once.store(true, Ordering::Relaxed); connection } @@ -211,7 +217,7 @@ pub struct QuicConnectionPool { // counting semaphore is ideal way to manage backpressure on the connection // because a connection can create only N unistream connections transactions_in_sending_semaphore: Vec>, - permit_threshold: usize, + threshold_to_create_new_connection: usize, } pub struct PooledConnection { @@ -250,9 +256,9 @@ impl QuicConnectionPool { }); v }, - permit_threshold: max_number_of_unistream_connection - .saturating_mul(std::cmp::max( - connection_parameters.percentage_of_connection_limit_to_create_new, + threshold_to_create_new_connection: max_number_of_unistream_connection + .saturating_mul(std::cmp::min( + connection_parameters.unistreams_to_create_new_connection_in_percentage, 100, ) as usize) .saturating_div(100), @@ -266,7 +272,7 @@ impl QuicConnectionPool { if !connection.has_connected_atleast_once() || (connection.is_connected().await - && sem.available_permits() > self.permit_threshold) + && sem.available_permits() > self.threshold_to_create_new_connection) { // if it is connection is not yet connected even once or connection is still open if let Ok(permit) = sem.clone().try_acquire_owned() { @@ -289,9 +295,6 @@ impl QuicConnectionPool { let (permit, index) = self.get_permit_and_index().await?; // establish a connection if the connection has not yet been used let connection = self.connections[index].clone(); - if !connection.has_connected_atleast_once() { - connection.get_connection().await; - } Ok(PooledConnection { connection, permit }) } diff --git a/services/src/quic_connection_utils.rs b/services/src/quic_connection_utils.rs index d87c6a38..5a82b38a 100644 --- a/services/src/quic_connection_utils.rs +++ b/services/src/quic_connection_utils.rs @@ -1,5 +1,7 @@ use log::trace; -use prometheus::{core::GenericGauge, opts, register_int_gauge}; +use prometheus::{ + core::GenericGauge, histogram_opts, opts, register_histogram, register_int_gauge, Histogram, +}; use quinn::{ ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream, TokioRuntime, TransportConfig, VarInt, @@ -48,6 +50,23 @@ lazy_static::lazy_static! { static ref NB_QUIC_CONNECTIONS: GenericGauge = register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap(); + + static ref TIME_OF_CONNECT: Histogram = register_histogram!(histogram_opts!( + "literpc_quic_connection_timer_histogram", + "Time to connect to the TPU port", + )) + .unwrap(); + static ref TIME_TO_WRITE: Histogram = register_histogram!(histogram_opts!( + "literpc_quic_write_timer_histogram", + "Time to write on the TPU port", + )) + .unwrap(); + + static ref TIME_TO_FINISH: Histogram = register_histogram!(histogram_opts!( + "literpc_quic_finish_timer_histogram", + "Time to finish on the TPU port", +)) +.unwrap(); } const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu"; @@ -66,7 +85,7 @@ pub struct QuicConnectionParameters { pub connection_retry_count: usize, pub max_number_of_connections: usize, pub number_of_transactions_per_unistream: usize, - pub percentage_of_connection_limit_to_create_new: u8, + pub unistreams_to_create_new_connection_in_percentage: u8, } impl Default for QuicConnectionParameters { @@ -79,7 +98,7 @@ impl Default for QuicConnectionParameters { connection_retry_count: 20, max_number_of_connections: 8, number_of_transactions_per_unistream: 1, - percentage_of_connection_limit_to_create_new: 75, + unistreams_to_create_new_connection_in_percentage: 50, } } } @@ -140,10 +159,12 @@ impl QuicConnectionUtils { addr: SocketAddr, connection_timeout: Duration, ) -> anyhow::Result { + let timer = TIME_OF_CONNECT.start_timer(); let connecting = endpoint.connect(addr, "connect")?; match timeout(connection_timeout, connecting).await { Ok(res) => match res { Ok(connection) => { + timer.observe_duration(); NB_QUIC_CONN_SUCCESSFUL.inc(); Ok(connection) } @@ -233,6 +254,7 @@ impl QuicConnectionUtils { identity: Pubkey, connection_params: QuicConnectionParameters, ) -> Result<(), QuicConnectionError> { + let timer = TIME_TO_WRITE.start_timer(); let write_timeout_res = timeout( connection_params.write_timeout, send_stream.write_all(tx.as_slice()), @@ -248,6 +270,8 @@ impl QuicConnectionUtils { ); NB_QUIC_WRITEALL_ERRORED.inc(); return Err(QuicConnectionError::ConnectionError { retry: true }); + } else { + timer.observe_duration(); } } Err(_) => { @@ -257,6 +281,7 @@ impl QuicConnectionUtils { } } + let timer: prometheus::HistogramTimer = TIME_TO_FINISH.start_timer(); let finish_timeout_res = timeout(connection_params.finalize_timeout, send_stream.finish()).await; match finish_timeout_res { @@ -269,6 +294,8 @@ impl QuicConnectionUtils { ); NB_QUIC_FINISH_ERRORED.inc(); return Err(QuicConnectionError::ConnectionError { retry: false }); + } else { + timer.observe_duration(); } } Err(_) => { diff --git a/services/src/transaction_service.rs b/services/src/transaction_service.rs index a5882228..2e0f0577 100644 --- a/services/src/transaction_service.rs +++ b/services/src/transaction_service.rs @@ -30,11 +30,11 @@ use tokio::{ }; lazy_static::lazy_static! { -static ref PRIORITY_FEES_HISTOGRAM: Histogram = register_histogram!(histogram_opts!( - "literpc_txs_priority_fee", - "Priority fees of transactions sent by lite-rpc", -)) -.unwrap(); + static ref PRIORITY_FEES_HISTOGRAM: Histogram = register_histogram!(histogram_opts!( + "literpc_txs_priority_fee", + "Priority fees of transactions sent by lite-rpc", + )) + .unwrap(); } #[derive(Clone)] @@ -203,3 +203,5 @@ impl TransactionService { Ok(signature.to_string()) } } + +mod test {}