Merge pull request #1 from blockworks-foundation/ckamm/remove-rotating-queue
Remove RotatingQueue, connections share an Endpoint
This commit is contained in:
commit
45a8ac79aa
|
@ -11,6 +11,5 @@ pub mod notifications;
|
|||
pub mod prioritization_fee_heap;
|
||||
pub mod produced_block;
|
||||
pub mod proxy_request_format;
|
||||
pub mod rotating_queue;
|
||||
pub mod slot_notification;
|
||||
pub mod transaction_sent_info;
|
||||
|
|
|
@ -1,40 +0,0 @@
|
|||
use std::sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RotatingQueue<T: Clone> {
|
||||
elements: Vec<T>,
|
||||
current: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl<T: Clone> RotatingQueue<T> {
|
||||
pub fn new<F>(size: usize, creator_functor: F) -> Self
|
||||
where
|
||||
F: Fn() -> T,
|
||||
{
|
||||
Self {
|
||||
elements: std::iter::repeat_with(creator_functor).take(size).collect(),
|
||||
current: Arc::new(AtomicUsize::new(0)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(&self) -> Option<T> {
|
||||
if !self.elements.is_empty() {
|
||||
let current = self.current.fetch_add(1, Ordering::Relaxed);
|
||||
let index = current % (self.elements.len());
|
||||
self.elements.get(index).cloned()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.elements.len()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.elements.is_empty()
|
||||
}
|
||||
}
|
|
@ -461,8 +461,6 @@ async fn start_literpc_client_direct_mode(
|
|||
) -> anyhow::Result<()> {
|
||||
info!("Start lite-rpc test client in direct-mode...");
|
||||
|
||||
let fanout_slots = 4;
|
||||
|
||||
// (String, Vec<u8>) (signature, transaction)
|
||||
let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE);
|
||||
let broadcast_sender = Arc::new(sender);
|
||||
|
@ -472,8 +470,7 @@ async fn start_literpc_client_direct_mode(
|
|||
)
|
||||
.expect("Failed to initialize QUIC connection certificates");
|
||||
|
||||
let tpu_connection_manager =
|
||||
TpuConnectionManager::new(certificate, key, fanout_slots as usize).await;
|
||||
let tpu_connection_manager = TpuConnectionManager::new(certificate, key).await;
|
||||
|
||||
// this effectively controls how many connections we will have
|
||||
let mut connections_to_keep = HashSet::<(Pubkey, SocketAddr)>::new();
|
||||
|
|
|
@ -5,7 +5,6 @@ use futures::FutureExt;
|
|||
use log::warn;
|
||||
use prometheus::{core::GenericGauge, opts, register_int_gauge};
|
||||
use quinn::{Connection, Endpoint, VarInt};
|
||||
use solana_lite_rpc_core::structures::rotating_queue::RotatingQueue;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use std::{
|
||||
net::SocketAddr,
|
||||
|
@ -16,8 +15,6 @@ use std::{
|
|||
};
|
||||
use tokio::sync::{broadcast, OwnedSemaphorePermit, RwLock, Semaphore};
|
||||
|
||||
pub type EndpointPool = RotatingQueue<Endpoint>;
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
static ref NB_QUIC_CONNECTION_RESET: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_quic_nb_connection_reset", "Number of times connection was reset")).unwrap();
|
||||
|
@ -251,7 +248,7 @@ pub struct PooledConnection {
|
|||
impl QuicConnectionPool {
|
||||
pub fn new(
|
||||
identity: Pubkey,
|
||||
endpoints: EndpointPool,
|
||||
endpoint: Endpoint,
|
||||
socket_address: SocketAddr,
|
||||
connection_parameters: QuicConnectionParameters,
|
||||
nb_connection: usize,
|
||||
|
@ -262,7 +259,7 @@ impl QuicConnectionPool {
|
|||
for _ in 0..nb_connection {
|
||||
connections.push(QuicConnection::new(
|
||||
identity,
|
||||
endpoints.get().expect("Should get and endpoint"),
|
||||
endpoint.clone(),
|
||||
socket_address,
|
||||
connection_parameters,
|
||||
));
|
||||
|
|
|
@ -8,7 +8,7 @@ use solana_lite_rpc_core::{
|
|||
stores::data_cache::DataCache,
|
||||
structures::{
|
||||
identity_stakes::IdentityStakesData, prioritization_fee_heap::PrioritizationFeesHeap,
|
||||
rotating_queue::RotatingQueue, transaction_sent_info::SentTransactionInfo,
|
||||
transaction_sent_info::SentTransactionInfo,
|
||||
},
|
||||
};
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
|
@ -43,7 +43,7 @@ lazy_static::lazy_static! {
|
|||
|
||||
#[derive(Clone)]
|
||||
struct ActiveConnection {
|
||||
endpoints: RotatingQueue<Endpoint>,
|
||||
endpoint: Endpoint,
|
||||
identity: Pubkey,
|
||||
tpu_address: SocketAddr,
|
||||
data_cache: DataCache,
|
||||
|
@ -53,7 +53,7 @@ struct ActiveConnection {
|
|||
|
||||
impl ActiveConnection {
|
||||
pub fn new(
|
||||
endpoints: RotatingQueue<Endpoint>,
|
||||
endpoint: Endpoint,
|
||||
tpu_address: SocketAddr,
|
||||
identity: Pubkey,
|
||||
data_cache: DataCache,
|
||||
|
@ -61,7 +61,7 @@ impl ActiveConnection {
|
|||
) -> Self {
|
||||
let (exit_notifier, _) = broadcast::channel(1);
|
||||
Self {
|
||||
endpoints,
|
||||
endpoint,
|
||||
tpu_address,
|
||||
identity,
|
||||
data_cache,
|
||||
|
@ -92,7 +92,7 @@ impl ActiveConnection {
|
|||
);
|
||||
let connection_pool = QuicConnectionPool::new(
|
||||
identity,
|
||||
self.endpoints.clone(),
|
||||
self.endpoint.clone(),
|
||||
addr,
|
||||
self.connection_parameters,
|
||||
max_number_of_connections,
|
||||
|
@ -241,21 +241,14 @@ impl ActiveConnection {
|
|||
}
|
||||
|
||||
pub struct TpuConnectionManager {
|
||||
endpoints: RotatingQueue<Endpoint>,
|
||||
endpoint: Endpoint,
|
||||
active_connections: Arc<DashMap<(Pubkey, SocketAddr), ActiveConnection>>,
|
||||
}
|
||||
|
||||
impl TpuConnectionManager {
|
||||
pub async fn new(
|
||||
certificate: rustls::Certificate,
|
||||
key: rustls::PrivateKey,
|
||||
_fanout: usize,
|
||||
) -> Self {
|
||||
let number_of_clients = 1; // fanout * 4;
|
||||
pub async fn new(certificate: rustls::Certificate, key: rustls::PrivateKey) -> Self {
|
||||
Self {
|
||||
endpoints: RotatingQueue::new(number_of_clients, || {
|
||||
QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone())
|
||||
}),
|
||||
endpoint: QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone()),
|
||||
active_connections: Arc::new(DashMap::new()),
|
||||
}
|
||||
}
|
||||
|
@ -274,7 +267,7 @@ impl TpuConnectionManager {
|
|||
if self.active_connections.get(&connection_key).is_none() {
|
||||
trace!("added a connection for {}, {}", identity, socket_addr);
|
||||
let active_connection = ActiveConnection::new(
|
||||
self.endpoints.clone(),
|
||||
self.endpoint.clone(),
|
||||
*socket_addr,
|
||||
*identity,
|
||||
data_cache.clone(),
|
||||
|
|
|
@ -81,8 +81,7 @@ impl TpuService {
|
|||
|
||||
let connection_manager = match config.tpu_connection_path {
|
||||
TpuConnectionPath::QuicDirectPath => {
|
||||
let tpu_connection_manager =
|
||||
TpuConnectionManager::new(certificate, key, config.fanout_slots as usize).await;
|
||||
let tpu_connection_manager = TpuConnectionManager::new(certificate, key).await;
|
||||
DirectTpu {
|
||||
tpu_connection_manager: Arc::new(tpu_connection_manager),
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue