From 9212b3bccc395306d32097397f71b58581f340db Mon Sep 17 00:00:00 2001 From: Christian Kamm Date: Thu, 4 Apr 2024 08:24:30 +0200 Subject: [PATCH] Remove RotatingQueue, connections share an Endpoint --- core/src/structures/mod.rs | 1 - core/src/structures/rotating_queue.rs | 40 ------------------- .../tests/quic_proxy_tpu_integrationtest.rs | 5 +-- services/src/quic_connection.rs | 7 +--- .../src/tpu_utils/tpu_connection_manager.rs | 25 +++++------- services/src/tpu_utils/tpu_service.rs | 3 +- 6 files changed, 13 insertions(+), 68 deletions(-) delete mode 100644 core/src/structures/rotating_queue.rs diff --git a/core/src/structures/mod.rs b/core/src/structures/mod.rs index 889d7c62..47700800 100644 --- a/core/src/structures/mod.rs +++ b/core/src/structures/mod.rs @@ -11,6 +11,5 @@ pub mod notifications; pub mod prioritization_fee_heap; pub mod produced_block; pub mod proxy_request_format; -pub mod rotating_queue; pub mod slot_notification; pub mod transaction_sent_info; diff --git a/core/src/structures/rotating_queue.rs b/core/src/structures/rotating_queue.rs deleted file mode 100644 index 5d425872..00000000 --- a/core/src/structures/rotating_queue.rs +++ /dev/null @@ -1,40 +0,0 @@ -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; - -#[derive(Clone)] -pub struct RotatingQueue { - elements: Vec, - current: Arc, -} - -impl RotatingQueue { - pub fn new(size: usize, creator_functor: F) -> Self - where - F: Fn() -> T, - { - Self { - elements: std::iter::repeat_with(creator_functor).take(size).collect(), - current: Arc::new(AtomicUsize::new(0)), - } - } - - pub fn get(&self) -> Option { - if !self.elements.is_empty() { - let current = self.current.fetch_add(1, Ordering::Relaxed); - let index = current % (self.elements.len()); - self.elements.get(index).cloned() - } else { - None - } - } - - pub fn len(&self) -> usize { - self.elements.len() - } - - pub fn is_empty(&self) -> bool { - self.elements.is_empty() - } -} diff --git a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs index 19c024e8..70db2988 100644 --- a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs +++ b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs @@ -461,8 +461,6 @@ async fn start_literpc_client_direct_mode( ) -> anyhow::Result<()> { info!("Start lite-rpc test client in direct-mode..."); - let fanout_slots = 4; - // (String, Vec) (signature, transaction) let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE); let broadcast_sender = Arc::new(sender); @@ -472,8 +470,7 @@ async fn start_literpc_client_direct_mode( ) .expect("Failed to initialize QUIC connection certificates"); - let tpu_connection_manager = - TpuConnectionManager::new(certificate, key, fanout_slots as usize).await; + let tpu_connection_manager = TpuConnectionManager::new(certificate, key).await; // this effectively controls how many connections we will have let mut connections_to_keep = HashSet::<(Pubkey, SocketAddr)>::new(); diff --git a/services/src/quic_connection.rs b/services/src/quic_connection.rs index 9111c52e..8d9a1643 100644 --- a/services/src/quic_connection.rs +++ b/services/src/quic_connection.rs @@ -5,7 +5,6 @@ use futures::FutureExt; use log::warn; use prometheus::{core::GenericGauge, opts, register_int_gauge}; use quinn::{Connection, Endpoint, VarInt}; -use solana_lite_rpc_core::structures::rotating_queue::RotatingQueue; use solana_sdk::pubkey::Pubkey; use std::{ net::SocketAddr, @@ -16,8 +15,6 @@ use std::{ }; use tokio::sync::{broadcast, OwnedSemaphorePermit, RwLock, Semaphore}; -pub type EndpointPool = RotatingQueue; - lazy_static::lazy_static! { static ref NB_QUIC_CONNECTION_RESET: GenericGauge = register_int_gauge!(opts!("literpc_quic_nb_connection_reset", "Number of times connection was reset")).unwrap(); @@ -251,7 +248,7 @@ pub struct PooledConnection { impl QuicConnectionPool { pub fn new( identity: Pubkey, - endpoints: EndpointPool, + endpoint: Endpoint, socket_address: SocketAddr, connection_parameters: QuicConnectionParameters, nb_connection: usize, @@ -262,7 +259,7 @@ impl QuicConnectionPool { for _ in 0..nb_connection { connections.push(QuicConnection::new( identity, - endpoints.get().expect("Should get and endpoint"), + endpoint.clone(), socket_address, connection_parameters, )); diff --git a/services/src/tpu_utils/tpu_connection_manager.rs b/services/src/tpu_utils/tpu_connection_manager.rs index cb50803a..ca1b0cff 100644 --- a/services/src/tpu_utils/tpu_connection_manager.rs +++ b/services/src/tpu_utils/tpu_connection_manager.rs @@ -8,7 +8,7 @@ use solana_lite_rpc_core::{ stores::data_cache::DataCache, structures::{ identity_stakes::IdentityStakesData, prioritization_fee_heap::PrioritizationFeesHeap, - rotating_queue::RotatingQueue, transaction_sent_info::SentTransactionInfo, + transaction_sent_info::SentTransactionInfo, }, }; use solana_sdk::pubkey::Pubkey; @@ -43,7 +43,7 @@ lazy_static::lazy_static! { #[derive(Clone)] struct ActiveConnection { - endpoints: RotatingQueue, + endpoint: Endpoint, identity: Pubkey, tpu_address: SocketAddr, data_cache: DataCache, @@ -53,7 +53,7 @@ struct ActiveConnection { impl ActiveConnection { pub fn new( - endpoints: RotatingQueue, + endpoint: Endpoint, tpu_address: SocketAddr, identity: Pubkey, data_cache: DataCache, @@ -61,7 +61,7 @@ impl ActiveConnection { ) -> Self { let (exit_notifier, _) = broadcast::channel(1); Self { - endpoints, + endpoint, tpu_address, identity, data_cache, @@ -92,7 +92,7 @@ impl ActiveConnection { ); let connection_pool = QuicConnectionPool::new( identity, - self.endpoints.clone(), + self.endpoint.clone(), addr, self.connection_parameters, max_number_of_connections, @@ -241,21 +241,14 @@ impl ActiveConnection { } pub struct TpuConnectionManager { - endpoints: RotatingQueue, + endpoint: Endpoint, active_connections: Arc>, } impl TpuConnectionManager { - pub async fn new( - certificate: rustls::Certificate, - key: rustls::PrivateKey, - _fanout: usize, - ) -> Self { - let number_of_clients = 1; // fanout * 4; + pub async fn new(certificate: rustls::Certificate, key: rustls::PrivateKey) -> Self { Self { - endpoints: RotatingQueue::new(number_of_clients, || { - QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone()) - }), + endpoint: QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone()), active_connections: Arc::new(DashMap::new()), } } @@ -274,7 +267,7 @@ impl TpuConnectionManager { if self.active_connections.get(&connection_key).is_none() { trace!("added a connection for {}, {}", identity, socket_addr); let active_connection = ActiveConnection::new( - self.endpoints.clone(), + self.endpoint.clone(), *socket_addr, *identity, data_cache.clone(), diff --git a/services/src/tpu_utils/tpu_service.rs b/services/src/tpu_utils/tpu_service.rs index b5727cb7..0d770db5 100644 --- a/services/src/tpu_utils/tpu_service.rs +++ b/services/src/tpu_utils/tpu_service.rs @@ -81,8 +81,7 @@ impl TpuService { let connection_manager = match config.tpu_connection_path { TpuConnectionPath::QuicDirectPath => { - let tpu_connection_manager = - TpuConnectionManager::new(certificate, key, config.fanout_slots as usize).await; + let tpu_connection_manager = TpuConnectionManager::new(certificate, key).await; DirectTpu { tpu_connection_manager: Arc::new(tpu_connection_manager), }