Adding quic timers metrics, more quic bugfixes
This commit is contained in:
parent
73d0b069be
commit
4bcc9ddb6c
|
@ -194,7 +194,7 @@ mod tests {
|
|||
|
||||
let mut height = 0;
|
||||
while instant.elapsed() < Duration::from_secs(45) {
|
||||
let burst_count = rand::random::<u64>() % 1024 + 1;
|
||||
let burst_count = rand::random::<u64>() % 128 + 1;
|
||||
for c in 0..burst_count {
|
||||
let prioritization_fee = rand::random::<u64>() % 100000;
|
||||
let info = SentTransactionInfo {
|
||||
|
|
|
@ -296,10 +296,12 @@ fn quic_params_from_environment() -> Option<QuicConnectionParameters> {
|
|||
.map(|millis| millis.parse().unwrap())
|
||||
.unwrap_or(quic_connection_parameters.number_of_transactions_per_unistream);
|
||||
|
||||
quic_connection_parameters.percentage_of_connection_limit_to_create_new =
|
||||
quic_connection_parameters.unistreams_to_create_new_connection_in_percentage =
|
||||
env::var("QUIC_PERCENTAGE_TO_CREATE_NEW_CONNECTION")
|
||||
.map(|millis| millis.parse().unwrap())
|
||||
.unwrap_or(quic_connection_parameters.percentage_of_connection_limit_to_create_new);
|
||||
.unwrap_or(
|
||||
quic_connection_parameters.unistreams_to_create_new_connection_in_percentage,
|
||||
);
|
||||
|
||||
Some(quic_connection_parameters)
|
||||
}
|
||||
|
|
|
@ -60,7 +60,7 @@ const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameter
|
|||
unistream_timeout: Duration::from_secs(2),
|
||||
write_timeout: Duration::from_secs(2),
|
||||
number_of_transactions_per_unistream: 10,
|
||||
percentage_of_connection_limit_to_create_new: 10,
|
||||
unistreams_to_create_new_connection_in_percentage: 10,
|
||||
};
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -110,8 +110,14 @@ impl QuicConnection {
|
|||
}
|
||||
None => {
|
||||
NB_QUIC_CONNECTION_REQUESTED.inc();
|
||||
// so that only one instance is connecting
|
||||
let mut lk = self.connection.write().await;
|
||||
if lk.is_some() {
|
||||
// connection has recently been established/ just use it
|
||||
return (*lk).clone();
|
||||
}
|
||||
let connection = self.connect(false).await;
|
||||
*self.connection.write().await = connection.clone();
|
||||
*lk = connection.clone();
|
||||
self.has_connected_once.store(true, Ordering::Relaxed);
|
||||
connection
|
||||
}
|
||||
|
@ -211,7 +217,7 @@ pub struct QuicConnectionPool {
|
|||
// counting semaphore is ideal way to manage backpressure on the connection
|
||||
// because a connection can create only N unistream connections
|
||||
transactions_in_sending_semaphore: Vec<Arc<Semaphore>>,
|
||||
permit_threshold: usize,
|
||||
threshold_to_create_new_connection: usize,
|
||||
}
|
||||
|
||||
pub struct PooledConnection {
|
||||
|
@ -250,9 +256,9 @@ impl QuicConnectionPool {
|
|||
});
|
||||
v
|
||||
},
|
||||
permit_threshold: max_number_of_unistream_connection
|
||||
.saturating_mul(std::cmp::max(
|
||||
connection_parameters.percentage_of_connection_limit_to_create_new,
|
||||
threshold_to_create_new_connection: max_number_of_unistream_connection
|
||||
.saturating_mul(std::cmp::min(
|
||||
connection_parameters.unistreams_to_create_new_connection_in_percentage,
|
||||
100,
|
||||
) as usize)
|
||||
.saturating_div(100),
|
||||
|
@ -266,7 +272,7 @@ impl QuicConnectionPool {
|
|||
|
||||
if !connection.has_connected_atleast_once()
|
||||
|| (connection.is_connected().await
|
||||
&& sem.available_permits() > self.permit_threshold)
|
||||
&& sem.available_permits() > self.threshold_to_create_new_connection)
|
||||
{
|
||||
// if it is connection is not yet connected even once or connection is still open
|
||||
if let Ok(permit) = sem.clone().try_acquire_owned() {
|
||||
|
@ -289,9 +295,6 @@ impl QuicConnectionPool {
|
|||
let (permit, index) = self.get_permit_and_index().await?;
|
||||
// establish a connection if the connection has not yet been used
|
||||
let connection = self.connections[index].clone();
|
||||
if !connection.has_connected_atleast_once() {
|
||||
connection.get_connection().await;
|
||||
}
|
||||
Ok(PooledConnection { connection, permit })
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
use log::trace;
|
||||
use prometheus::{core::GenericGauge, opts, register_int_gauge};
|
||||
use prometheus::{
|
||||
core::GenericGauge, histogram_opts, opts, register_histogram, register_int_gauge, Histogram,
|
||||
};
|
||||
use quinn::{
|
||||
ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream,
|
||||
TokioRuntime, TransportConfig, VarInt,
|
||||
|
@ -48,6 +50,23 @@ lazy_static::lazy_static! {
|
|||
|
||||
static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap();
|
||||
|
||||
static ref TIME_OF_CONNECT: Histogram = register_histogram!(histogram_opts!(
|
||||
"literpc_quic_connection_timer_histogram",
|
||||
"Time to connect to the TPU port",
|
||||
))
|
||||
.unwrap();
|
||||
static ref TIME_TO_WRITE: Histogram = register_histogram!(histogram_opts!(
|
||||
"literpc_quic_write_timer_histogram",
|
||||
"Time to write on the TPU port",
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
static ref TIME_TO_FINISH: Histogram = register_histogram!(histogram_opts!(
|
||||
"literpc_quic_finish_timer_histogram",
|
||||
"Time to finish on the TPU port",
|
||||
))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
|
||||
|
@ -66,7 +85,7 @@ pub struct QuicConnectionParameters {
|
|||
pub connection_retry_count: usize,
|
||||
pub max_number_of_connections: usize,
|
||||
pub number_of_transactions_per_unistream: usize,
|
||||
pub percentage_of_connection_limit_to_create_new: u8,
|
||||
pub unistreams_to_create_new_connection_in_percentage: u8,
|
||||
}
|
||||
|
||||
impl Default for QuicConnectionParameters {
|
||||
|
@ -79,7 +98,7 @@ impl Default for QuicConnectionParameters {
|
|||
connection_retry_count: 20,
|
||||
max_number_of_connections: 8,
|
||||
number_of_transactions_per_unistream: 1,
|
||||
percentage_of_connection_limit_to_create_new: 75,
|
||||
unistreams_to_create_new_connection_in_percentage: 50,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -140,10 +159,12 @@ impl QuicConnectionUtils {
|
|||
addr: SocketAddr,
|
||||
connection_timeout: Duration,
|
||||
) -> anyhow::Result<Connection> {
|
||||
let timer = TIME_OF_CONNECT.start_timer();
|
||||
let connecting = endpoint.connect(addr, "connect")?;
|
||||
match timeout(connection_timeout, connecting).await {
|
||||
Ok(res) => match res {
|
||||
Ok(connection) => {
|
||||
timer.observe_duration();
|
||||
NB_QUIC_CONN_SUCCESSFUL.inc();
|
||||
Ok(connection)
|
||||
}
|
||||
|
@ -233,6 +254,7 @@ impl QuicConnectionUtils {
|
|||
identity: Pubkey,
|
||||
connection_params: QuicConnectionParameters,
|
||||
) -> Result<(), QuicConnectionError> {
|
||||
let timer = TIME_TO_WRITE.start_timer();
|
||||
let write_timeout_res = timeout(
|
||||
connection_params.write_timeout,
|
||||
send_stream.write_all(tx.as_slice()),
|
||||
|
@ -248,6 +270,8 @@ impl QuicConnectionUtils {
|
|||
);
|
||||
NB_QUIC_WRITEALL_ERRORED.inc();
|
||||
return Err(QuicConnectionError::ConnectionError { retry: true });
|
||||
} else {
|
||||
timer.observe_duration();
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
|
@ -257,6 +281,7 @@ impl QuicConnectionUtils {
|
|||
}
|
||||
}
|
||||
|
||||
let timer: prometheus::HistogramTimer = TIME_TO_FINISH.start_timer();
|
||||
let finish_timeout_res =
|
||||
timeout(connection_params.finalize_timeout, send_stream.finish()).await;
|
||||
match finish_timeout_res {
|
||||
|
@ -269,6 +294,8 @@ impl QuicConnectionUtils {
|
|||
);
|
||||
NB_QUIC_FINISH_ERRORED.inc();
|
||||
return Err(QuicConnectionError::ConnectionError { retry: false });
|
||||
} else {
|
||||
timer.observe_duration();
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
|
|
|
@ -30,11 +30,11 @@ use tokio::{
|
|||
};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
static ref PRIORITY_FEES_HISTOGRAM: Histogram = register_histogram!(histogram_opts!(
|
||||
"literpc_txs_priority_fee",
|
||||
"Priority fees of transactions sent by lite-rpc",
|
||||
))
|
||||
.unwrap();
|
||||
static ref PRIORITY_FEES_HISTOGRAM: Histogram = register_histogram!(histogram_opts!(
|
||||
"literpc_txs_priority_fee",
|
||||
"Priority fees of transactions sent by lite-rpc",
|
||||
))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -203,3 +203,5 @@ impl TransactionService {
|
|||
Ok(signature.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
mod test {}
|
||||
|
|
Loading…
Reference in New Issue