creating task counters per quic connection
This commit is contained in:
parent
dcc640f80a
commit
3e4c6ed933
|
@ -2,25 +2,26 @@ use crate::{
|
|||
quic_connection_utils::{QuicConnectionError, QuicConnectionParameters, QuicConnectionUtils},
|
||||
structures::rotating_queue::RotatingQueue,
|
||||
};
|
||||
use anyhow::bail;
|
||||
use anyhow::Context;
|
||||
use futures::FutureExt;
|
||||
use log::warn;
|
||||
use quinn::{Connection, Endpoint};
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
net::SocketAddr,
|
||||
sync::{
|
||||
atomic::{AtomicBool, AtomicU64, Ordering},
|
||||
Arc,
|
||||
},
|
||||
};
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore};
|
||||
|
||||
pub type EndpointPool = RotatingQueue<Endpoint>;
|
||||
|
||||
#[derive(Clone)]
|
||||
#[warn(clippy::rc_clone_in_vec_init)]
|
||||
pub struct QuicConnection {
|
||||
connection: Arc<RwLock<Connection>>,
|
||||
connection: Arc<RwLock<Option<Connection>>>,
|
||||
last_stable_id: Arc<AtomicU64>,
|
||||
endpoint: Endpoint,
|
||||
identity: Pubkey,
|
||||
|
@ -31,150 +32,140 @@ pub struct QuicConnection {
|
|||
}
|
||||
|
||||
impl QuicConnection {
|
||||
pub async fn new(
|
||||
pub fn new(
|
||||
identity: Pubkey,
|
||||
endpoints: EndpointPool,
|
||||
endpoint: Endpoint,
|
||||
socket_address: SocketAddr,
|
||||
connection_params: QuicConnectionParameters,
|
||||
exit_signal: Arc<AtomicBool>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let endpoint = endpoints
|
||||
.get()
|
||||
.await
|
||||
.expect("endpoint pool is not suppose to be empty");
|
||||
let connection = QuicConnectionUtils::connect(
|
||||
) -> Self {
|
||||
Self {
|
||||
connection: Arc::new(RwLock::new(None)),
|
||||
last_stable_id: Arc::new(AtomicU64::new(0)),
|
||||
endpoint,
|
||||
identity,
|
||||
false,
|
||||
endpoint.clone(),
|
||||
socket_address,
|
||||
connection_params.connection_timeout,
|
||||
connection_params.connection_retry_count,
|
||||
exit_signal.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
match connection {
|
||||
Some(connection) => Ok(Self {
|
||||
connection: Arc::new(RwLock::new(connection)),
|
||||
last_stable_id: Arc::new(AtomicU64::new(0)),
|
||||
endpoint,
|
||||
identity,
|
||||
socket_address,
|
||||
connection_params,
|
||||
exit_signal,
|
||||
timeout_counters: Arc::new(AtomicU64::new(0)),
|
||||
}),
|
||||
None => {
|
||||
bail!("Could not establish connection");
|
||||
}
|
||||
connection_params,
|
||||
exit_signal,
|
||||
timeout_counters: Arc::new(AtomicU64::new(0)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn connect(&self) -> Option<Connection> {
|
||||
QuicConnectionUtils::connect(
|
||||
self.identity,
|
||||
true,
|
||||
self.endpoint.clone(),
|
||||
self.socket_address,
|
||||
self.connection_params.connection_timeout,
|
||||
self.connection_params.connection_retry_count,
|
||||
self.exit_signal.clone(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_connection(&self) -> Option<Connection> {
|
||||
// get new connection reset if necessary
|
||||
let last_stable_id = self.last_stable_id.load(Ordering::Relaxed) as usize;
|
||||
let conn = self.connection.read().await;
|
||||
if conn.stable_id() == last_stable_id {
|
||||
let current_stable_id = conn.stable_id();
|
||||
// problematic connection
|
||||
drop(conn);
|
||||
let mut conn = self.connection.write().await;
|
||||
// check may be already written by another thread
|
||||
if conn.stable_id() != current_stable_id {
|
||||
Some(conn.clone())
|
||||
} else {
|
||||
let new_conn = QuicConnectionUtils::connect(
|
||||
self.identity,
|
||||
true,
|
||||
self.endpoint.clone(),
|
||||
self.socket_address,
|
||||
self.connection_params.connection_timeout,
|
||||
self.connection_params.connection_retry_count,
|
||||
self.exit_signal.clone(),
|
||||
)
|
||||
.await;
|
||||
if let Some(new_conn) = new_conn {
|
||||
*conn = new_conn;
|
||||
Some(conn.clone())
|
||||
let conn = self.connection.read().await.clone();
|
||||
match conn {
|
||||
Some(connection) => {
|
||||
if connection.stable_id() == last_stable_id {
|
||||
let current_stable_id = connection.stable_id();
|
||||
// problematic connection
|
||||
let mut conn = self.connection.write().await;
|
||||
let connection = conn.clone().expect("Connection cannot be None here");
|
||||
// check may be already written by another thread
|
||||
if connection.stable_id() != current_stable_id {
|
||||
Some(connection)
|
||||
} else {
|
||||
let new_conn = QuicConnectionUtils::connect(
|
||||
self.identity,
|
||||
true,
|
||||
self.endpoint.clone(),
|
||||
self.socket_address,
|
||||
self.connection_params.connection_timeout,
|
||||
self.connection_params.connection_retry_count,
|
||||
self.exit_signal.clone(),
|
||||
)
|
||||
.await;
|
||||
if let Some(new_conn) = new_conn {
|
||||
*conn = Some(new_conn);
|
||||
conn.clone()
|
||||
} else {
|
||||
// could not connect
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// could not connect
|
||||
None
|
||||
Some(connection.clone())
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Some(conn.clone())
|
||||
None => self.connect().await,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send_transaction_batch(&self, txs: Vec<Vec<u8>>) {
|
||||
let mut queue = VecDeque::new();
|
||||
for tx in txs {
|
||||
queue.push_back(tx);
|
||||
}
|
||||
pub async fn send_transaction(&self, tx: Vec<u8>) {
|
||||
let connection_retry_count = self.connection_params.connection_retry_count;
|
||||
for _ in 0..connection_retry_count {
|
||||
if queue.is_empty() || self.exit_signal.load(Ordering::Relaxed) {
|
||||
if self.exit_signal.load(Ordering::Relaxed) {
|
||||
// return
|
||||
return;
|
||||
}
|
||||
|
||||
let mut do_retry = false;
|
||||
while !queue.is_empty() {
|
||||
let tx = queue.pop_front().unwrap();
|
||||
let connection = self.get_connection().await;
|
||||
let connection = self.get_connection().await;
|
||||
|
||||
if self.exit_signal.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
if self.exit_signal.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(connection) = connection {
|
||||
let current_stable_id = connection.stable_id() as u64;
|
||||
match QuicConnectionUtils::open_unistream(
|
||||
connection,
|
||||
self.connection_params.unistream_timeout,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(send_stream) => {
|
||||
match QuicConnectionUtils::write_all(
|
||||
send_stream,
|
||||
&tx,
|
||||
self.identity,
|
||||
self.connection_params,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(()) => {
|
||||
// do nothing
|
||||
}
|
||||
Err(QuicConnectionError::ConnectionError { retry }) => {
|
||||
do_retry = retry;
|
||||
}
|
||||
Err(QuicConnectionError::TimeOut) => {
|
||||
self.timeout_counters.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
if let Some(connection) = connection {
|
||||
let current_stable_id = connection.stable_id() as u64;
|
||||
match QuicConnectionUtils::open_unistream(
|
||||
connection,
|
||||
self.connection_params.unistream_timeout,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(send_stream) => {
|
||||
match QuicConnectionUtils::write_all(
|
||||
send_stream,
|
||||
&tx,
|
||||
self.identity,
|
||||
self.connection_params,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(()) => {
|
||||
// do nothing
|
||||
}
|
||||
Err(QuicConnectionError::ConnectionError { retry }) => {
|
||||
do_retry = retry;
|
||||
}
|
||||
Err(QuicConnectionError::TimeOut) => {
|
||||
self.timeout_counters.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
Err(QuicConnectionError::ConnectionError { retry }) => {
|
||||
do_retry = retry;
|
||||
}
|
||||
Err(QuicConnectionError::TimeOut) => {
|
||||
self.timeout_counters.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
if do_retry {
|
||||
self.last_stable_id
|
||||
.store(current_stable_id, Ordering::Relaxed);
|
||||
queue.push_back(tx);
|
||||
break;
|
||||
Err(QuicConnectionError::ConnectionError { retry }) => {
|
||||
do_retry = retry;
|
||||
}
|
||||
} else {
|
||||
warn!(
|
||||
"Could not establish connection with {}",
|
||||
self.identity.to_string()
|
||||
);
|
||||
Err(QuicConnectionError::TimeOut) => {
|
||||
self.timeout_counters.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
if do_retry {
|
||||
self.last_stable_id
|
||||
.store(current_stable_id, Ordering::Relaxed);
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
warn!(
|
||||
"Could not establish connection with {}",
|
||||
self.identity.to_string()
|
||||
);
|
||||
break;
|
||||
}
|
||||
if !do_retry {
|
||||
break;
|
||||
|
@ -193,12 +184,15 @@ impl QuicConnection {
|
|||
|
||||
#[derive(Clone)]
|
||||
pub struct QuicConnectionPool {
|
||||
connections: RotatingQueue<QuicConnection>,
|
||||
connection_parameters: QuicConnectionParameters,
|
||||
endpoints: EndpointPool,
|
||||
identity: Pubkey,
|
||||
socket_address: SocketAddr,
|
||||
exit_signal: Arc<AtomicBool>,
|
||||
connections: Vec<QuicConnection>,
|
||||
// counting semaphore is ideal way to manage backpressure on the connection
|
||||
// because a connection can create only N unistream connections
|
||||
transactions_in_sending_semaphore: Vec<Arc<Semaphore>>,
|
||||
}
|
||||
|
||||
pub struct PooledConnection {
|
||||
pub connection: QuicConnection,
|
||||
pub permit: OwnedSemaphorePermit,
|
||||
}
|
||||
|
||||
impl QuicConnectionPool {
|
||||
|
@ -208,60 +202,46 @@ impl QuicConnectionPool {
|
|||
socket_address: SocketAddr,
|
||||
connection_parameters: QuicConnectionParameters,
|
||||
exit_signal: Arc<AtomicBool>,
|
||||
nb_connection: usize,
|
||||
max_number_of_unistream_connection: usize,
|
||||
) -> Self {
|
||||
let connections = RotatingQueue::new_empty();
|
||||
let mut connections = vec![];
|
||||
// should not clone connection each time but create a new one
|
||||
for _ in 0..nb_connection {
|
||||
connections.push(QuicConnection::new(
|
||||
identity,
|
||||
endpoints.get().expect("Should get and endpoint"),
|
||||
socket_address,
|
||||
connection_parameters,
|
||||
exit_signal.clone(),
|
||||
));
|
||||
}
|
||||
Self {
|
||||
connections,
|
||||
identity,
|
||||
endpoints,
|
||||
socket_address,
|
||||
connection_parameters,
|
||||
exit_signal,
|
||||
transactions_in_sending_semaphore: {
|
||||
// should create a new semaphore each time so avoid vec[elem;count]
|
||||
let mut v = Vec::with_capacity(nb_connection);
|
||||
(0..nb_connection).for_each(|_| {
|
||||
v.push(Arc::new(Semaphore::new(max_number_of_unistream_connection)))
|
||||
});
|
||||
v
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send_transaction_batch(&self, txs: Vec<Vec<u8>>) {
|
||||
let connection = match self.connections.get().await {
|
||||
Some(connection) => connection,
|
||||
None => {
|
||||
let new_connection = QuicConnection::new(
|
||||
self.identity,
|
||||
self.endpoints.clone(),
|
||||
self.socket_address,
|
||||
self.connection_parameters,
|
||||
self.exit_signal.clone(),
|
||||
)
|
||||
.await;
|
||||
if new_connection.is_err() {
|
||||
return;
|
||||
}
|
||||
let new_connection = new_connection.expect("Cannot establish a connection");
|
||||
self.connections.add(new_connection.clone()).await;
|
||||
new_connection
|
||||
}
|
||||
};
|
||||
|
||||
connection.send_transaction_batch(txs).await;
|
||||
}
|
||||
|
||||
pub async fn add_connection(&self) {
|
||||
let new_connection = QuicConnection::new(
|
||||
self.identity,
|
||||
self.endpoints.clone(),
|
||||
self.socket_address,
|
||||
self.connection_parameters,
|
||||
self.exit_signal.clone(),
|
||||
pub async fn get_pooled_connection(&self) -> anyhow::Result<PooledConnection> {
|
||||
let (permit, index, _others) = futures::future::select_all(
|
||||
self.transactions_in_sending_semaphore
|
||||
.iter()
|
||||
.map(|x| x.clone().acquire_owned().boxed()),
|
||||
)
|
||||
.await;
|
||||
if let Ok(new_connection) = new_connection {
|
||||
self.connections.add(new_connection).await;
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn remove_connection(&self) {
|
||||
if !self.connections.is_empty() {
|
||||
self.connections.remove().await;
|
||||
}
|
||||
drop(_others);
|
||||
let permit = permit.context("Cannot aquire permit, connection pool erased")?;
|
||||
Ok(PooledConnection {
|
||||
connection: self.connections[index].clone(),
|
||||
permit,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
|
|
|
@ -1,74 +1,46 @@
|
|||
use std::{
|
||||
collections::VecDeque,
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
},
|
||||
use std::sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RotatingQueue<T> {
|
||||
deque: Arc<Mutex<VecDeque<T>>>,
|
||||
count: Arc<AtomicU64>,
|
||||
pub struct RotatingQueue<T: Clone> {
|
||||
elements: Vec<T>,
|
||||
current: Arc<AtomicU64>,
|
||||
}
|
||||
|
||||
impl<T: Clone> RotatingQueue<T> {
|
||||
pub async fn new<F>(size: usize, creator_functor: F) -> Self
|
||||
pub fn new<F>(size: usize, creator_functor: F) -> Self
|
||||
where
|
||||
F: Fn() -> T,
|
||||
{
|
||||
let item = Self {
|
||||
deque: Arc::new(Mutex::new(VecDeque::<T>::new())),
|
||||
count: Arc::new(AtomicU64::new(0)),
|
||||
let mut item = Self {
|
||||
elements: Vec::<T>::new(),
|
||||
current: Arc::new(AtomicU64::new(0)),
|
||||
};
|
||||
{
|
||||
let mut deque = item.deque.lock().await;
|
||||
for _i in 0..size {
|
||||
deque.push_back(creator_functor());
|
||||
item.elements.push(creator_functor());
|
||||
}
|
||||
item.count.store(size as u64, Ordering::Relaxed);
|
||||
}
|
||||
item
|
||||
}
|
||||
|
||||
pub fn new_empty() -> Self {
|
||||
Self {
|
||||
deque: Arc::new(Mutex::new(VecDeque::<T>::new())),
|
||||
count: Arc::new(AtomicU64::new(0)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get(&self) -> Option<T> {
|
||||
let mut deque = self.deque.lock().await;
|
||||
if !deque.is_empty() {
|
||||
let current = deque.pop_front().unwrap();
|
||||
deque.push_back(current.clone());
|
||||
Some(current)
|
||||
pub fn get(&self) -> Option<T> {
|
||||
if !self.elements.is_empty() {
|
||||
let current = self.current.fetch_add(1, Ordering::Relaxed);
|
||||
let index = current % (self.elements.len() as u64);
|
||||
Some(self.elements[index as usize].clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn add(&self, instance: T) {
|
||||
let mut queue = self.deque.lock().await;
|
||||
queue.push_front(instance);
|
||||
self.count.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub async fn remove(&self) {
|
||||
if !self.is_empty() {
|
||||
let mut queue = self.deque.lock().await;
|
||||
queue.pop_front();
|
||||
self.count.fetch_sub(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.count.load(Ordering::Relaxed) as usize
|
||||
self.elements.len()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.len() == 0
|
||||
self.elements.is_empty()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ use log::{error, trace};
|
|||
use prometheus::{core::GenericGauge, opts, register_int_gauge};
|
||||
use quinn::Endpoint;
|
||||
use solana_lite_rpc_core::{
|
||||
quic_connection::QuicConnectionPool,
|
||||
quic_connection::{PooledConnection, QuicConnectionPool},
|
||||
quic_connection_utils::{QuicConnectionParameters, QuicConnectionUtils},
|
||||
stores::tx_store::TxStore,
|
||||
structures::{identity_stakes::IdentityStakesData, rotating_queue::RotatingQueue},
|
||||
|
@ -89,9 +89,7 @@ impl ActiveConnection {
|
|||
identity_stakes.peer_type,
|
||||
identity_stakes.stakes,
|
||||
identity_stakes.total_stakes,
|
||||
) * max_number_of_connections;
|
||||
|
||||
let task_counter = Arc::new(tokio::sync::Semaphore::new(max_uni_stream_connections));
|
||||
);
|
||||
let exit_signal = self.exit_signal.clone();
|
||||
let connection_pool = QuicConnectionPool::new(
|
||||
identity,
|
||||
|
@ -99,6 +97,8 @@ impl ActiveConnection {
|
|||
addr,
|
||||
self.connection_parameters,
|
||||
exit_signal.clone(),
|
||||
max_number_of_connections,
|
||||
max_uni_stream_connections,
|
||||
);
|
||||
|
||||
loop {
|
||||
|
@ -141,20 +141,21 @@ impl ActiveConnection {
|
|||
}
|
||||
}
|
||||
|
||||
// queue getting full and a connection poll is getting slower
|
||||
// add more connections to the pool
|
||||
if connection_pool.len() < max_number_of_connections {
|
||||
connection_pool.add_connection().await;
|
||||
NB_QUIC_CONNECTIONS.inc();
|
||||
}
|
||||
|
||||
let task_counter = task_counter.clone();
|
||||
let connection_pool = connection_pool.clone();
|
||||
let permit = task_counter.acquire_owned().await.expect("Should get permit");
|
||||
let connection_pool = match connection_pool.get_pooled_connection().await {
|
||||
Ok(connection_pool) => connection_pool,
|
||||
Err(_) => break,
|
||||
};
|
||||
tokio::spawn(async move {
|
||||
let _ = permit;
|
||||
let PooledConnection {
|
||||
connection,
|
||||
permit
|
||||
} = connection_pool;
|
||||
// permit will be used to send all the transaction and then destroyed
|
||||
let _permit = permit;
|
||||
NB_QUIC_TASKS.inc();
|
||||
connection_pool.send_transaction_batch(txs).await;
|
||||
for tx in txs {
|
||||
connection.send_transaction(tx).await;
|
||||
}
|
||||
NB_QUIC_TASKS.dec();
|
||||
});
|
||||
},
|
||||
|
@ -210,8 +211,7 @@ impl TpuConnectionManager {
|
|||
Self {
|
||||
endpoints: RotatingQueue::new(number_of_clients, || {
|
||||
QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone())
|
||||
})
|
||||
.await,
|
||||
}),
|
||||
identity_to_active_connection: Arc::new(DashMap::new()),
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue