Remove RotatingQueue, connections share an Endpoint

This commit is contained in:
Christian Kamm 2024-04-04 08:24:30 +02:00
parent 45171acd3b
commit 9212b3bccc
6 changed files with 13 additions and 68 deletions

View File

@ -11,6 +11,5 @@ pub mod notifications;
pub mod prioritization_fee_heap;
pub mod produced_block;
pub mod proxy_request_format;
pub mod rotating_queue;
pub mod slot_notification;
pub mod transaction_sent_info;

View File

@ -1,40 +0,0 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
#[derive(Clone)]
pub struct RotatingQueue<T: Clone> {
elements: Vec<T>,
current: Arc<AtomicUsize>,
}
impl<T: Clone> RotatingQueue<T> {
pub fn new<F>(size: usize, creator_functor: F) -> Self
where
F: Fn() -> T,
{
Self {
elements: std::iter::repeat_with(creator_functor).take(size).collect(),
current: Arc::new(AtomicUsize::new(0)),
}
}
pub fn get(&self) -> Option<T> {
if !self.elements.is_empty() {
let current = self.current.fetch_add(1, Ordering::Relaxed);
let index = current % (self.elements.len());
self.elements.get(index).cloned()
} else {
None
}
}
pub fn len(&self) -> usize {
self.elements.len()
}
pub fn is_empty(&self) -> bool {
self.elements.is_empty()
}
}

View File

@ -461,8 +461,6 @@ async fn start_literpc_client_direct_mode(
) -> anyhow::Result<()> {
info!("Start lite-rpc test client in direct-mode...");
let fanout_slots = 4;
// (String, Vec<u8>) (signature, transaction)
let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE);
let broadcast_sender = Arc::new(sender);
@ -472,8 +470,7 @@ async fn start_literpc_client_direct_mode(
)
.expect("Failed to initialize QUIC connection certificates");
let tpu_connection_manager =
TpuConnectionManager::new(certificate, key, fanout_slots as usize).await;
let tpu_connection_manager = TpuConnectionManager::new(certificate, key).await;
// this effectively controls how many connections we will have
let mut connections_to_keep = HashSet::<(Pubkey, SocketAddr)>::new();

View File

@ -5,7 +5,6 @@ use futures::FutureExt;
use log::warn;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use quinn::{Connection, Endpoint, VarInt};
use solana_lite_rpc_core::structures::rotating_queue::RotatingQueue;
use solana_sdk::pubkey::Pubkey;
use std::{
net::SocketAddr,
@ -16,8 +15,6 @@ use std::{
};
use tokio::sync::{broadcast, OwnedSemaphorePermit, RwLock, Semaphore};
pub type EndpointPool = RotatingQueue<Endpoint>;
lazy_static::lazy_static! {
static ref NB_QUIC_CONNECTION_RESET: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_nb_connection_reset", "Number of times connection was reset")).unwrap();
@ -251,7 +248,7 @@ pub struct PooledConnection {
impl QuicConnectionPool {
pub fn new(
identity: Pubkey,
endpoints: EndpointPool,
endpoint: Endpoint,
socket_address: SocketAddr,
connection_parameters: QuicConnectionParameters,
nb_connection: usize,
@ -262,7 +259,7 @@ impl QuicConnectionPool {
for _ in 0..nb_connection {
connections.push(QuicConnection::new(
identity,
endpoints.get().expect("Should get and endpoint"),
endpoint.clone(),
socket_address,
connection_parameters,
));

View File

@ -8,7 +8,7 @@ use solana_lite_rpc_core::{
stores::data_cache::DataCache,
structures::{
identity_stakes::IdentityStakesData, prioritization_fee_heap::PrioritizationFeesHeap,
rotating_queue::RotatingQueue, transaction_sent_info::SentTransactionInfo,
transaction_sent_info::SentTransactionInfo,
},
};
use solana_sdk::pubkey::Pubkey;
@ -43,7 +43,7 @@ lazy_static::lazy_static! {
#[derive(Clone)]
struct ActiveConnection {
endpoints: RotatingQueue<Endpoint>,
endpoint: Endpoint,
identity: Pubkey,
tpu_address: SocketAddr,
data_cache: DataCache,
@ -53,7 +53,7 @@ struct ActiveConnection {
impl ActiveConnection {
pub fn new(
endpoints: RotatingQueue<Endpoint>,
endpoint: Endpoint,
tpu_address: SocketAddr,
identity: Pubkey,
data_cache: DataCache,
@ -61,7 +61,7 @@ impl ActiveConnection {
) -> Self {
let (exit_notifier, _) = broadcast::channel(1);
Self {
endpoints,
endpoint,
tpu_address,
identity,
data_cache,
@ -92,7 +92,7 @@ impl ActiveConnection {
);
let connection_pool = QuicConnectionPool::new(
identity,
self.endpoints.clone(),
self.endpoint.clone(),
addr,
self.connection_parameters,
max_number_of_connections,
@ -241,21 +241,14 @@ impl ActiveConnection {
}
pub struct TpuConnectionManager {
endpoints: RotatingQueue<Endpoint>,
endpoint: Endpoint,
active_connections: Arc<DashMap<(Pubkey, SocketAddr), ActiveConnection>>,
}
impl TpuConnectionManager {
pub async fn new(
certificate: rustls::Certificate,
key: rustls::PrivateKey,
_fanout: usize,
) -> Self {
let number_of_clients = 1; // fanout * 4;
pub async fn new(certificate: rustls::Certificate, key: rustls::PrivateKey) -> Self {
Self {
endpoints: RotatingQueue::new(number_of_clients, || {
QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone())
}),
endpoint: QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone()),
active_connections: Arc::new(DashMap::new()),
}
}
@ -274,7 +267,7 @@ impl TpuConnectionManager {
if self.active_connections.get(&connection_key).is_none() {
trace!("added a connection for {}, {}", identity, socket_addr);
let active_connection = ActiveConnection::new(
self.endpoints.clone(),
self.endpoint.clone(),
*socket_addr,
*identity,
data_cache.clone(),

View File

@ -81,8 +81,7 @@ impl TpuService {
let connection_manager = match config.tpu_connection_path {
TpuConnectionPath::QuicDirectPath => {
let tpu_connection_manager =
TpuConnectionManager::new(certificate, key, config.fanout_slots as usize).await;
let tpu_connection_manager = TpuConnectionManager::new(certificate, key).await;
DirectTpu {
tpu_connection_manager: Arc::new(tpu_connection_manager),
}