Merge branch 'production'

godmodegalactus 2024-03-19 18:09:59 +01:00
commit 35d13a25de
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
7 changed files with 123 additions and 45 deletions

View File

@@ -112,6 +112,14 @@ impl PrioritizationFeesHeap {
     pub async fn size(&self) -> usize {
         self.map.lock().await.signatures.len()
     }
+
+    pub async fn clear(&self) -> usize {
+        let mut lk = self.map.lock().await;
+        lk.map.clear();
+        let size = lk.signatures.len();
+        lk.signatures.clear();
+        size
+    }
 }

 #[cfg(test)]
@@ -189,8 +197,8 @@ mod tests {
         let mut height = 0;
         while instant.elapsed() < Duration::from_secs(45) {
-            let burst_count = rand::random::<u64>() % 1024 + 1;
-            for _ in 0..burst_count {
+            let burst_count = rand::random::<u64>() % 128 + 1;
+            for _c in 0..burst_count {
                 let prioritization_fee = rand::random::<u64>() % 100000;
                 let info = SentTransactionInfo {
                     signature: Signature::new_unique(),
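For orientation, here is a minimal sketch of the locking pattern behind the new clear() helper. The Inner and Heap types below are hypothetical stand-ins (the real heap keys transactions by priority fee and tracks their signatures); only the take-the-lock-once, clear-both-structures, return-the-count shape mirrors the diff.

    use std::collections::{BTreeMap, HashSet};
    use std::sync::Arc;
    use tokio::sync::Mutex;

    // Hypothetical stand-ins for the heap's inner state.
    struct Inner {
        map: BTreeMap<u64, Vec<String>>, // fee -> queued items
        signatures: HashSet<String>,     // signatures currently queued
    }

    #[derive(Clone)]
    struct Heap {
        inner: Arc<Mutex<Inner>>,
    }

    impl Heap {
        // Same shape as the new clear(): one lock acquisition, clear both
        // structures, report how many queued entries were dropped.
        async fn clear(&self) -> usize {
            let mut lk = self.inner.lock().await;
            lk.map.clear();
            let size = lk.signatures.len();
            lk.signatures.clear();
            size
        }
    }

    #[tokio::main]
    async fn main() {
        let heap = Heap {
            inner: Arc::new(Mutex::new(Inner {
                map: BTreeMap::new(),
                signatures: HashSet::new(),
            })),
        };
        println!("removed {} entries", heap.clear().await);
    }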

View File

@@ -389,10 +389,12 @@ fn quic_params_from_environment() -> Option<QuicConnectionParameters> {
         .map(|millis| millis.parse().unwrap())
         .unwrap_or(quic_connection_parameters.number_of_transactions_per_unistream);

-    quic_connection_parameters.percentage_of_connection_limit_to_create_new =
+    quic_connection_parameters.unistreams_to_create_new_connection_in_percentage =
         env::var("QUIC_PERCENTAGE_TO_CREATE_NEW_CONNECTION")
             .map(|millis| millis.parse().unwrap())
-            .unwrap_or(quic_connection_parameters.percentage_of_connection_limit_to_create_new);
+            .unwrap_or(
+                quic_connection_parameters.unistreams_to_create_new_connection_in_percentage,
+            );

     Some(quic_connection_parameters)
 }
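The override pattern used throughout this function (parse the variable if it is set, otherwise keep the value already in the struct) can be tried in isolation. In the sketch below the Params struct is a hypothetical stand-in; only the env-var name and the map/unwrap_or shape come from the diff.

    use std::env;

    #[derive(Debug)]
    struct Params {
        // stand-in for unistreams_to_create_new_connection_in_percentage
        new_connection_percentage: u8,
    }

    fn main() {
        let mut params = Params { new_connection_percentage: 10 };

        // If the variable is set it must parse as u8, otherwise keep the default.
        params.new_connection_percentage = env::var("QUIC_PERCENTAGE_TO_CREATE_NEW_CONNECTION")
            .map(|v| v.parse().unwrap())
            .unwrap_or(params.new_connection_percentage);

        println!("{:?}", params);
    }

As in the original, an unset variable silently falls back to the default, while a set-but-malformed value panics at startup via unwrap().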

View File

@@ -60,7 +60,7 @@ const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameters {
     unistream_timeout: Duration::from_secs(2),
     write_timeout: Duration::from_secs(2),
     number_of_transactions_per_unistream: 10,
-    percentage_of_connection_limit_to_create_new: 10,
+    unistreams_to_create_new_connection_in_percentage: 10,
 };

 #[test]

View File

@@ -110,8 +110,14 @@ impl QuicConnection {
             }
             None => {
                 NB_QUIC_CONNECTION_REQUESTED.inc();
+                // so that only one instance is connecting
+                let mut lk = self.connection.write().await;
+                if lk.is_some() {
+                    // connection has recently been established/ just use it
+                    return (*lk).clone();
+                }
                 let connection = self.connect(false).await;
-                *self.connection.write().await = connection.clone();
+                *lk = connection.clone();
                 self.has_connected_once.store(true, Ordering::Relaxed);
                 connection
             }
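The new ordering above is a double-checked pattern over an async RwLock: take the write lock before dialing, re-check whether another task already filled the slot, and keep the lock held across the connect so only one task dials at a time. A minimal sketch of the same idea, with String standing in for quinn::Connection and connect() as a hypothetical dialer:

    use std::sync::Arc;
    use tokio::sync::RwLock;

    // Hypothetical dialer; String stands in for quinn::Connection.
    async fn connect() -> Option<String> {
        Some("connection".to_string())
    }

    async fn get_connection(slot: Arc<RwLock<Option<String>>>) -> Option<String> {
        {
            // fast path: a connection already exists
            let lk = slot.read().await;
            if lk.is_some() {
                return lk.clone();
            }
        }
        // slow path: the write lock is held across the dial, so concurrent
        // callers wait here instead of opening duplicate connections
        let mut lk = slot.write().await;
        if lk.is_some() {
            // another task won the race while we waited for the lock
            return lk.clone();
        }
        let conn = connect().await;
        *lk = conn.clone();
        conn
    }

    #[tokio::main]
    async fn main() {
        let slot = Arc::new(RwLock::new(None));
        println!("{:?}", get_connection(slot).await);
    }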
@@ -211,7 +217,7 @@ pub struct QuicConnectionPool {
     // counting semaphore is ideal way to manage backpressure on the connection
     // because a connection can create only N unistream connections
     transactions_in_sending_semaphore: Vec<Arc<Semaphore>>,
-    permit_threshold: usize,
+    threshold_to_create_new_connection: usize,
 }

 pub struct PooledConnection {
@@ -250,9 +256,9 @@ impl QuicConnectionPool {
                 });
                 v
             },
-            permit_threshold: max_number_of_unistream_connection
-                .saturating_mul(std::cmp::max(
-                    connection_parameters.percentage_of_connection_limit_to_create_new,
+            threshold_to_create_new_connection: max_number_of_unistream_connection
+                .saturating_mul(std::cmp::min(
+                    connection_parameters.unistreams_to_create_new_connection_in_percentage,
                     100,
                 ) as usize)
                 .saturating_div(100),
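Beyond the rename, swapping std::cmp::max for std::cmp::min fixes the clamp: max(percentage, 100) could never be below 100, so the threshold always equaled the full unistream limit, while min(percentage, 100) caps the setting at 100 %. A small worked example (the value 128 for the unistream limit is illustrative, not taken from the diff):

    fn threshold(max_unistreams: usize, percentage: u8) -> usize {
        // same arithmetic as the fixed constructor
        max_unistreams
            .saturating_mul(std::cmp::min(percentage, 100) as usize)
            .saturating_div(100)
    }

    fn main() {
        // 10 % of 128 unistream permits -> another connection is tried once
        // the available permits drop to 12 or fewer
        assert_eq!(threshold(128, 10), 12);

        // the old max(percentage, 100) variant degenerated to the full limit
        let old = 128usize
            .saturating_mul(std::cmp::max(10u8, 100) as usize)
            .saturating_div(100);
        assert_eq!(old, 128);
        println!("new threshold: {}, old threshold: {}", threshold(128, 10), old);
    }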
@@ -266,7 +272,7 @@ impl QuicConnectionPool {
             if !connection.has_connected_atleast_once()
                 || (connection.is_connected().await
-                    && sem.available_permits() > self.permit_threshold)
+                    && sem.available_permits() > self.threshold_to_create_new_connection)
             {
                 // if it is connection is not yet connected even once or connection is still open
                 if let Ok(permit) = sem.clone().try_acquire_owned() {
@@ -289,9 +295,6 @@ impl QuicConnectionPool {
         let (permit, index) = self.get_permit_and_index().await?;
         // establish a connection if the connection has not yet been used
         let connection = self.connections[index].clone();
-        if !connection.has_connected_atleast_once() {
-            connection.get_connection().await;
-        }
         Ok(PooledConnection { connection, permit })
     }

View File

@@ -1,5 +1,7 @@
 use log::trace;
-use prometheus::{core::GenericGauge, opts, register_int_gauge};
+use prometheus::{
+    core::GenericGauge, histogram_opts, opts, register_histogram, register_int_gauge, Histogram,
+};
 use quinn::{
     ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream,
     TokioRuntime, TransportConfig, VarInt,
@@ -45,6 +47,26 @@ lazy_static::lazy_static! {
     register_int_gauge!(opts!("literpc_quic_finish_timedout", "Number of times finish timedout")).unwrap();
     static ref NB_QUIC_FINISH_ERRORED: GenericGauge<prometheus::core::AtomicI64> =
     register_int_gauge!(opts!("literpc_quic_finish_errored", "Number of times finish errored")).unwrap();
+    static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
+    register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap();
+
+    static ref TIME_OF_CONNECT: Histogram = register_histogram!(histogram_opts!(
+        "literpc_quic_connection_timer_histogram",
+        "Time to connect to the TPU port",
+    ))
+    .unwrap();
+
+    static ref TIME_TO_WRITE: Histogram = register_histogram!(histogram_opts!(
+        "literpc_quic_write_timer_histogram",
+        "Time to write on the TPU port",
+    ))
+    .unwrap();
+
+    static ref TIME_TO_FINISH: Histogram = register_histogram!(histogram_opts!(
+        "literpc_quic_finish_timer_histogram",
+        "Time to finish on the TPU port",
+    ))
+    .unwrap();
 }

 const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
@@ -63,20 +85,20 @@ pub struct QuicConnectionParameters {
     pub connection_retry_count: usize,
     pub max_number_of_connections: usize,
     pub number_of_transactions_per_unistream: usize,
-    pub percentage_of_connection_limit_to_create_new: u8,
+    pub unistreams_to_create_new_connection_in_percentage: u8,
 }

 impl Default for QuicConnectionParameters {
     fn default() -> Self {
         Self {
-            connection_timeout: Duration::from_millis(5000),
-            unistream_timeout: Duration::from_millis(5000),
-            write_timeout: Duration::from_millis(5000),
-            finalize_timeout: Duration::from_millis(5000),
+            connection_timeout: Duration::from_millis(10000),
+            unistream_timeout: Duration::from_millis(10000),
+            write_timeout: Duration::from_millis(10000),
+            finalize_timeout: Duration::from_millis(10000),
             connection_retry_count: 20,
             max_number_of_connections: 8,
             number_of_transactions_per_unistream: 1,
-            percentage_of_connection_limit_to_create_new: 50,
+            unistreams_to_create_new_connection_in_percentage: 10,
         }
     }
 }
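Callers that only care about one knob can combine the doubled timeouts and the new 10 % default with struct-update syntax. The sketch below uses a reduced, hypothetical stand-in struct limited to fields visible in this hunk; the real QuicConnectionParameters has more fields, but the ..Default::default() pattern is the same.

    use std::time::Duration;

    // Reduced, hypothetical stand-in for QuicConnectionParameters.
    #[derive(Debug, Clone, Copy)]
    struct Params {
        connection_timeout: Duration,
        connection_retry_count: usize,
        unistreams_to_create_new_connection_in_percentage: u8,
    }

    impl Default for Params {
        fn default() -> Self {
            Self {
                connection_timeout: Duration::from_millis(10000),
                connection_retry_count: 20,
                unistreams_to_create_new_connection_in_percentage: 10,
            }
        }
    }

    fn main() {
        // override only the percentage, keep the new defaults for everything else
        let params = Params {
            unistreams_to_create_new_connection_in_percentage: 25,
            ..Default::default()
        };
        println!("{:?}", params);
    }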
@@ -137,10 +159,12 @@ impl QuicConnectionUtils {
         addr: SocketAddr,
         connection_timeout: Duration,
     ) -> anyhow::Result<Connection> {
+        let timer = TIME_OF_CONNECT.start_timer();
         let connecting = endpoint.connect(addr, "connect")?;
         match timeout(connection_timeout, connecting).await {
             Ok(res) => match res {
                 Ok(connection) => {
+                    timer.observe_duration();
                     NB_QUIC_CONN_SUCCESSFUL.inc();
                     Ok(connection)
                 }
@@ -210,6 +234,7 @@ impl QuicConnectionUtils {
             };
             match conn {
                 Ok(conn) => {
+                    NB_QUIC_CONNECTIONS.inc();
                     return Some(conn);
                 }
                 Err(e) => {
@@ -229,6 +254,7 @@ impl QuicConnectionUtils {
         identity: Pubkey,
         connection_params: QuicConnectionParameters,
     ) -> Result<(), QuicConnectionError> {
+        let timer = TIME_TO_WRITE.start_timer();
         let write_timeout_res = timeout(
             connection_params.write_timeout,
             send_stream.write_all(tx.as_slice()),
@@ -244,6 +270,8 @@ impl QuicConnectionUtils {
                     );
                     NB_QUIC_WRITEALL_ERRORED.inc();
                     return Err(QuicConnectionError::ConnectionError { retry: true });
+                } else {
+                    timer.observe_duration();
                 }
             }
             Err(_) => {
@@ -253,6 +281,7 @@ impl QuicConnectionUtils {
             }
         }

+        let timer: prometheus::HistogramTimer = TIME_TO_FINISH.start_timer();
         let finish_timeout_res =
             timeout(connection_params.finalize_timeout, send_stream.finish()).await;
         match finish_timeout_res {
@@ -265,6 +294,8 @@ impl QuicConnectionUtils {
                     );
                     NB_QUIC_FINISH_ERRORED.inc();
                     return Err(QuicConnectionError::ConnectionError { retry: false });
+                } else {
+                    timer.observe_duration();
                 }
             }
             Err(_) => {
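All three timers above follow the same prometheus idiom: start_timer() returns a HistogramTimer, and the elapsed seconds are recorded either by an explicit observe_duration() or, per the crate's documented behavior, automatically when the timer is dropped (unless stop_and_discard() is called), so even the early error returns above produce an observation. A standalone sketch with a made-up metric name:

    use prometheus::{histogram_opts, register_histogram, Histogram};

    lazy_static::lazy_static! {
        static ref DEMO_TIMER: Histogram = register_histogram!(histogram_opts!(
            "demo_operation_seconds",
            "Time spent in the demo operation"
        ))
        .unwrap();
    }

    fn main() {
        let timer = DEMO_TIMER.start_timer();
        std::thread::sleep(std::time::Duration::from_millis(50));
        // record the elapsed seconds into the histogram buckets
        timer.observe_duration();

        // a timer that goes out of scope without observe_duration() is still
        // recorded on drop
        {
            let _dropped = DEMO_TIMER.start_timer();
        }

        println!("samples: {}", DEMO_TIMER.get_sample_count());
    }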

View File

@@ -33,8 +33,6 @@ use crate::{
 };

 lazy_static::lazy_static! {
-    static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
-    register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap();
     static ref NB_QUIC_ACTIVE_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
     register_int_gauge!(opts!("literpc_nb_active_connections", "Number quic tasks that are running")).unwrap();
     static ref NB_CONNECTIONS_TO_KEEP: GenericGauge<prometheus::core::AtomicI64> =
@@ -46,6 +44,9 @@ lazy_static::lazy_static! {
         "Time to send transaction batch",
     ))
     .unwrap();
+
+    static ref TRANSACTIONS_IN_HEAP: GenericGauge<prometheus::core::AtomicI64> =
+    register_int_gauge!(opts!("literpc_transactions_in_priority_heap", "Number of transactions in priority heap")).unwrap();
 }

 #[derive(Clone)]
@@ -84,19 +85,41 @@ impl ActiveConnection {
         addr: SocketAddr,
         identity_stakes: IdentityStakesData,
     ) {
-        let priorization_heap = PrioritizationFeesHeap::new(2048);
         let fill_notify = Arc::new(Notify::new());
         let identity = self.identity;

+        NB_QUIC_ACTIVE_CONNECTIONS.inc();
+
+        let max_number_of_connections = self.connection_parameters.max_number_of_connections;
+
+        let max_uni_stream_connections = compute_max_allowed_uni_streams(
+            identity_stakes.peer_type,
+            identity_stakes.stakes,
+            identity_stakes.total_stakes,
+        );
+        let exit_signal = self.exit_signal.clone();
+        let connection_pool = QuicConnectionPool::new(
+            identity,
+            self.endpoints.clone(),
+            addr,
+            self.connection_parameters,
+            exit_signal.clone(),
+            max_number_of_connections,
+            max_uni_stream_connections,
+        );
+
+        let priorization_heap = PrioritizationFeesHeap::new(2 * max_uni_stream_connections);
+
         let heap_filler_task = {
             let priorization_heap = priorization_heap.clone();
             let data_cache = self.data_cache.clone();
             let fill_notify = fill_notify.clone();
+            let exit_signal = exit_signal.clone();

             tokio::spawn(async move {
                 let mut current_blockheight =
                     data_cache.block_information_store.get_last_blockheight();
-                loop {
+                while !exit_signal.load(Ordering::Relaxed) {
                     let tx = transaction_reciever.recv().await;
                     match tx {
                         Ok(transaction_sent_info) => {
@@ -108,6 +131,8 @@ impl ActiveConnection {
                             }

                             priorization_heap.insert(transaction_sent_info).await;
+                            TRANSACTIONS_IN_HEAP.inc();
+
                             fill_notify.notify_one();
                             // give little more priority to read the transaction sender with this wait
                             let last_blockheight =
@@ -134,25 +159,15 @@ impl ActiveConnection {
             })
         };

-        NB_QUIC_ACTIVE_CONNECTIONS.inc();
-
-        let max_number_of_connections = self.connection_parameters.max_number_of_connections;
-
-        let max_uni_stream_connections = compute_max_allowed_uni_streams(
-            identity_stakes.peer_type,
-            identity_stakes.stakes,
-            identity_stakes.total_stakes,
-        );
-        let exit_signal = self.exit_signal.clone();
-        let connection_pool = QuicConnectionPool::new(
-            identity,
-            self.endpoints.clone(),
-            addr,
-            self.connection_parameters,
-            exit_signal.clone(),
-            max_number_of_connections,
-            max_uni_stream_connections,
-        );
+        // create atleast one connection before waiting from transactions
+        if let Ok(PooledConnection { connection, permit }) =
+            connection_pool.get_pooled_connection().await
+        {
+            tokio::task::spawn(async move {
+                let _permit = permit;
+                connection.get_connection().await;
+            });
+        }

         'main_loop: loop {
             // exit signal set
@@ -173,6 +188,7 @@ impl ActiveConnection {
                 // wait to get notification from fill event
                 break;
             };
+            TRANSACTIONS_IN_HEAP.dec();

             // check if transaction is already confirmed
             if self.data_cache.txs.is_transaction_confirmed(&tx.signature) {
@@ -193,8 +209,12 @@ impl ActiveConnection {
             tokio::spawn(async move {
                 // permit will be used to send all the transaction and then destroyed
                 let _permit = permit;
+                let timer = TT_SENT_TIMER.start_timer();
                 NB_QUIC_TASKS.inc();
+
                 connection.send_transaction(tx.transaction).await;
+                timer.observe_duration();
+
                 NB_QUIC_TASKS.dec();
             });
         }
@@ -207,7 +227,8 @@ impl ActiveConnection {
         }

         heap_filler_task.abort();
-        NB_QUIC_CONNECTIONS.dec();
+        let elements_removed = priorization_heap.clear().await;
+        TRANSACTIONS_IN_HEAP.sub(elements_removed as i64);
         NB_QUIC_ACTIVE_CONNECTIONS.dec();
     }
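The TRANSACTIONS_IN_HEAP gauge introduced here is maintained in three places: inc() when a transaction is inserted into the priority heap, dec() when one is popped for sending, and sub(n) when the heap is drained on shutdown via the new clear(). A self-contained sketch of the same bookkeeping, with a made-up gauge name and a plain queue standing in for the heap:

    use prometheus::{opts, register_int_gauge, IntGauge};
    use std::collections::VecDeque;

    lazy_static::lazy_static! {
        static ref IN_HEAP: IntGauge =
            register_int_gauge!(opts!("demo_items_in_heap", "Items currently queued")).unwrap();
    }

    fn main() {
        let mut heap: VecDeque<u64> = VecDeque::new();

        // insert path: push + inc
        for fee in [5, 1, 9] {
            heap.push_back(fee);
            IN_HEAP.inc();
        }

        // send path: pop + dec
        if heap.pop_front().is_some() {
            IN_HEAP.dec();
        }

        // shutdown path: clear + sub by the number of removed entries
        let removed = heap.len();
        heap.clear();
        IN_HEAP.sub(removed as i64);

        assert_eq!(IN_HEAP.get(), 0);
        println!("gauge back to {}", IN_HEAP.get());
    }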

View File

@@ -9,6 +9,7 @@ use crate::{
     tx_sender::TxSender,
 };
 use anyhow::bail;
+use prometheus::{histogram_opts, register_histogram, Histogram};
 use solana_lite_rpc_core::{
     solana_utils::SerializableTransaction, structures::transaction_sent_info::SentTransactionInfo,
     types::SlotStream,
@@ -28,6 +29,14 @@ use tokio::{
     time::Instant,
 };

+lazy_static::lazy_static! {
+    static ref PRIORITY_FEES_HISTOGRAM: Histogram = register_histogram!(histogram_opts!(
+        "literpc_txs_priority_fee",
+        "Priority fees of transactions sent by lite-rpc",
+    ))
+    .unwrap();
+}
+
 #[derive(Clone)]
 pub struct TransactionServiceBuilder {
     tx_sender: TxSender,
@@ -157,6 +166,8 @@ impl TransactionService {
             prioritization_fee
         };

+        PRIORITY_FEES_HISTOGRAM.observe(prioritization_fee as f64);
+
         let max_replay = max_retries.map_or(self.max_retries, |x| x as usize);
         let transaction_info = SentTransactionInfo {
             signature,
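Unlike the timing histograms, this one records a raw value per sent transaction via observe(). Registering with only a name and help string uses the prometheus crate's default buckets, which top out at 10.0 and are shaped for latencies; a variant with explicit buckets may fit fee values better. The sketch below shows that option with a made-up metric name and exponential_buckets, which is an assumption rather than what the diff does.

    use prometheus::{exponential_buckets, histogram_opts, register_histogram, Histogram};

    lazy_static::lazy_static! {
        // Hypothetical fee histogram with buckets 1, 10, 100, ... 10^7.
        static ref FEES: Histogram = register_histogram!(histogram_opts!(
            "demo_priority_fee",
            "Priority fee values observed",
            exponential_buckets(1.0, 10.0, 8).unwrap()
        ))
        .unwrap();
    }

    fn main() {
        for fee in [0u64, 100, 5_000, 1_000_000] {
            // same call shape as PRIORITY_FEES_HISTOGRAM.observe(prioritization_fee as f64)
            FEES.observe(fee as f64);
        }
        println!("samples: {}", FEES.get_sample_count());
    }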
@@ -192,3 +203,5 @@ impl TransactionService {
         Ok(signature.to_string())
     }
 }
+
+mod test {}