Merge pull request #194 from blockworks-foundation/removing_unwanted_code

using semaphore in tpu connection manager and removing unwanted code
This commit is contained in:
galactus 2023-09-15 12:07:05 +02:00 committed by GitHub
commit 7d36768b05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 181 additions and 249 deletions

View File

@ -2,25 +2,26 @@ use crate::{
quic_connection_utils::{QuicConnectionError, QuicConnectionParameters, QuicConnectionUtils}, quic_connection_utils::{QuicConnectionError, QuicConnectionParameters, QuicConnectionUtils},
structures::rotating_queue::RotatingQueue, structures::rotating_queue::RotatingQueue,
}; };
use anyhow::bail; use anyhow::Context;
use futures::FutureExt;
use log::warn; use log::warn;
use quinn::{Connection, Endpoint}; use quinn::{Connection, Endpoint};
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use std::{ use std::{
collections::VecDeque,
net::SocketAddr, net::SocketAddr,
sync::{ sync::{
atomic::{AtomicBool, AtomicU64, Ordering}, atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Arc,
}, },
}; };
use tokio::sync::RwLock; use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore};
pub type EndpointPool = RotatingQueue<Endpoint>; pub type EndpointPool = RotatingQueue<Endpoint>;
#[derive(Clone)] #[derive(Clone)]
#[warn(clippy::rc_clone_in_vec_init)]
pub struct QuicConnection { pub struct QuicConnection {
connection: Arc<RwLock<Connection>>, connection: Arc<RwLock<Option<Connection>>>,
last_stable_id: Arc<AtomicU64>, last_stable_id: Arc<AtomicU64>,
endpoint: Endpoint, endpoint: Endpoint,
identity: Pubkey, identity: Pubkey,
@ -31,150 +32,135 @@ pub struct QuicConnection {
} }
impl QuicConnection { impl QuicConnection {
pub async fn new( pub fn new(
identity: Pubkey, identity: Pubkey,
endpoints: EndpointPool, endpoint: Endpoint,
socket_address: SocketAddr, socket_address: SocketAddr,
connection_params: QuicConnectionParameters, connection_params: QuicConnectionParameters,
exit_signal: Arc<AtomicBool>, exit_signal: Arc<AtomicBool>,
) -> anyhow::Result<Self> { ) -> Self {
let endpoint = endpoints Self {
.get() connection: Arc::new(RwLock::new(None)),
.await last_stable_id: Arc::new(AtomicU64::new(0)),
.expect("endpoint pool is not suppose to be empty"); endpoint,
let connection = QuicConnectionUtils::connect(
identity, identity,
false,
endpoint.clone(),
socket_address, socket_address,
connection_params.connection_timeout, connection_params,
connection_params.connection_retry_count, exit_signal,
exit_signal.clone(), timeout_counters: Arc::new(AtomicU64::new(0)),
)
.await;
match connection {
Some(connection) => Ok(Self {
connection: Arc::new(RwLock::new(connection)),
last_stable_id: Arc::new(AtomicU64::new(0)),
endpoint,
identity,
socket_address,
connection_params,
exit_signal,
timeout_counters: Arc::new(AtomicU64::new(0)),
}),
None => {
bail!("Could not establish connection");
}
} }
} }
async fn connect(&self) -> Option<Connection> {
QuicConnectionUtils::connect(
self.identity,
true,
self.endpoint.clone(),
self.socket_address,
self.connection_params.connection_timeout,
self.connection_params.connection_retry_count,
self.exit_signal.clone(),
)
.await
}
async fn get_connection(&self) -> Option<Connection> { async fn get_connection(&self) -> Option<Connection> {
// get new connection reset if necessary // get new connection reset if necessary
let last_stable_id = self.last_stable_id.load(Ordering::Relaxed) as usize; let last_stable_id = self.last_stable_id.load(Ordering::Relaxed) as usize;
let conn = self.connection.read().await; let conn = self.connection.read().await.clone();
if conn.stable_id() == last_stable_id { match conn {
let current_stable_id = conn.stable_id(); Some(connection) => {
// problematic connection if connection.stable_id() == last_stable_id {
drop(conn); let current_stable_id = connection.stable_id();
let mut conn = self.connection.write().await; // problematic connection
// check may be already written by another thread let mut conn = self.connection.write().await;
if conn.stable_id() != current_stable_id { let connection = conn.clone().expect("Connection cannot be None here");
Some(conn.clone()) // check may be already written by another thread
} else { if connection.stable_id() != current_stable_id {
let new_conn = QuicConnectionUtils::connect( Some(connection)
self.identity, } else {
true, let new_conn = self.connect().await;
self.endpoint.clone(), if let Some(new_conn) = new_conn {
self.socket_address, *conn = Some(new_conn);
self.connection_params.connection_timeout, conn.clone()
self.connection_params.connection_retry_count, } else {
self.exit_signal.clone(), // could not connect
) None
.await; }
if let Some(new_conn) = new_conn { }
*conn = new_conn;
Some(conn.clone())
} else { } else {
// could not connect Some(connection.clone())
None
} }
} }
} else { None => {
Some(conn.clone()) let connection = self.connect().await;
*self.connection.write().await = connection.clone();
connection
}
} }
} }
pub async fn send_transaction_batch(&self, txs: Vec<Vec<u8>>) { pub async fn send_transaction(&self, tx: Vec<u8>) {
let mut queue = VecDeque::new();
for tx in txs {
queue.push_back(tx);
}
let connection_retry_count = self.connection_params.connection_retry_count; let connection_retry_count = self.connection_params.connection_retry_count;
for _ in 0..connection_retry_count { for _ in 0..connection_retry_count {
if queue.is_empty() || self.exit_signal.load(Ordering::Relaxed) { if self.exit_signal.load(Ordering::Relaxed) {
// return // return
return; return;
} }
let mut do_retry = false; let mut do_retry = false;
while !queue.is_empty() { let connection = self.get_connection().await;
let tx = queue.pop_front().unwrap();
let connection = self.get_connection().await;
if self.exit_signal.load(Ordering::Relaxed) { if self.exit_signal.load(Ordering::Relaxed) {
return; return;
} }
if let Some(connection) = connection { if let Some(connection) = connection {
let current_stable_id = connection.stable_id() as u64; let current_stable_id = connection.stable_id() as u64;
match QuicConnectionUtils::open_unistream( match QuicConnectionUtils::open_unistream(
connection, connection,
self.connection_params.unistream_timeout, self.connection_params.unistream_timeout,
) )
.await .await
{ {
Ok(send_stream) => { Ok(send_stream) => {
match QuicConnectionUtils::write_all( match QuicConnectionUtils::write_all(
send_stream, send_stream,
&tx, &tx,
self.identity, self.identity,
self.connection_params, self.connection_params,
) )
.await .await
{ {
Ok(()) => { Ok(()) => {
// do nothing // do nothing
} }
Err(QuicConnectionError::ConnectionError { retry }) => { Err(QuicConnectionError::ConnectionError { retry }) => {
do_retry = retry; do_retry = retry;
} }
Err(QuicConnectionError::TimeOut) => { Err(QuicConnectionError::TimeOut) => {
self.timeout_counters.fetch_add(1, Ordering::Relaxed); self.timeout_counters.fetch_add(1, Ordering::Relaxed);
}
} }
} }
Err(QuicConnectionError::ConnectionError { retry }) => {
do_retry = retry;
}
Err(QuicConnectionError::TimeOut) => {
self.timeout_counters.fetch_add(1, Ordering::Relaxed);
}
} }
if do_retry { Err(QuicConnectionError::ConnectionError { retry }) => {
self.last_stable_id do_retry = retry;
.store(current_stable_id, Ordering::Relaxed);
queue.push_back(tx);
break;
} }
} else { Err(QuicConnectionError::TimeOut) => {
warn!( self.timeout_counters.fetch_add(1, Ordering::Relaxed);
"Could not establish connection with {}", }
self.identity.to_string() }
); if do_retry {
self.last_stable_id
.store(current_stable_id, Ordering::Relaxed);
break; break;
} }
} else {
warn!(
"Could not establish connection with {}",
self.identity.to_string()
);
break;
} }
if !do_retry { if !do_retry {
break; break;
@ -193,12 +179,15 @@ impl QuicConnection {
#[derive(Clone)] #[derive(Clone)]
pub struct QuicConnectionPool { pub struct QuicConnectionPool {
connections: RotatingQueue<QuicConnection>, connections: Vec<QuicConnection>,
connection_parameters: QuicConnectionParameters, // counting semaphore is ideal way to manage backpressure on the connection
endpoints: EndpointPool, // because a connection can create only N unistream connections
identity: Pubkey, transactions_in_sending_semaphore: Vec<Arc<Semaphore>>,
socket_address: SocketAddr, }
exit_signal: Arc<AtomicBool>,
pub struct PooledConnection {
pub connection: QuicConnection,
pub permit: OwnedSemaphorePermit,
} }
impl QuicConnectionPool { impl QuicConnectionPool {
@ -208,60 +197,46 @@ impl QuicConnectionPool {
socket_address: SocketAddr, socket_address: SocketAddr,
connection_parameters: QuicConnectionParameters, connection_parameters: QuicConnectionParameters,
exit_signal: Arc<AtomicBool>, exit_signal: Arc<AtomicBool>,
nb_connection: usize,
max_number_of_unistream_connection: usize,
) -> Self { ) -> Self {
let connections = RotatingQueue::new_empty(); let mut connections = vec![];
// should not clone connection each time but create a new one
for _ in 0..nb_connection {
connections.push(QuicConnection::new(
identity,
endpoints.get().expect("Should get and endpoint"),
socket_address,
connection_parameters,
exit_signal.clone(),
));
}
Self { Self {
connections, connections,
identity, transactions_in_sending_semaphore: {
endpoints, // should create a new semaphore each time so avoid vec[elem;count]
socket_address, let mut v = Vec::with_capacity(nb_connection);
connection_parameters, (0..nb_connection).for_each(|_| {
exit_signal, v.push(Arc::new(Semaphore::new(max_number_of_unistream_connection)))
});
v
},
} }
} }
pub async fn send_transaction_batch(&self, txs: Vec<Vec<u8>>) { pub async fn get_pooled_connection(&self) -> anyhow::Result<PooledConnection> {
let connection = match self.connections.get().await { let (permit, index, _others) = futures::future::select_all(
Some(connection) => connection, self.transactions_in_sending_semaphore
None => { .iter()
let new_connection = QuicConnection::new( .map(|x| x.clone().acquire_owned().boxed()),
self.identity,
self.endpoints.clone(),
self.socket_address,
self.connection_parameters,
self.exit_signal.clone(),
)
.await;
if new_connection.is_err() {
return;
}
let new_connection = new_connection.expect("Cannot establish a connection");
self.connections.add(new_connection.clone()).await;
new_connection
}
};
connection.send_transaction_batch(txs).await;
}
pub async fn add_connection(&self) {
let new_connection = QuicConnection::new(
self.identity,
self.endpoints.clone(),
self.socket_address,
self.connection_parameters,
self.exit_signal.clone(),
) )
.await; .await;
if let Ok(new_connection) = new_connection { drop(_others);
self.connections.add(new_connection).await; let permit = permit.context("Cannot aquire permit, connection pool erased")?;
} Ok(PooledConnection {
} connection: self.connections[index].clone(),
permit,
pub async fn remove_connection(&self) { })
if !self.connections.is_empty() {
self.connections.remove().await;
}
} }
pub fn len(&self) -> usize { pub fn len(&self) -> usize {

View File

@ -1,74 +1,46 @@
use std::{ use std::sync::{
collections::VecDeque, atomic::{AtomicU64, Ordering},
sync::{ Arc,
atomic::{AtomicU64, Ordering},
Arc,
},
}; };
use tokio::sync::Mutex;
#[derive(Clone)] #[derive(Clone)]
pub struct RotatingQueue<T> { pub struct RotatingQueue<T: Clone> {
deque: Arc<Mutex<VecDeque<T>>>, elements: Vec<T>,
count: Arc<AtomicU64>, current: Arc<AtomicU64>,
} }
impl<T: Clone> RotatingQueue<T> { impl<T: Clone> RotatingQueue<T> {
pub async fn new<F>(size: usize, creator_functor: F) -> Self pub fn new<F>(size: usize, creator_functor: F) -> Self
where where
F: Fn() -> T, F: Fn() -> T,
{ {
let item = Self { let mut item = Self {
deque: Arc::new(Mutex::new(VecDeque::<T>::new())), elements: Vec::<T>::new(),
count: Arc::new(AtomicU64::new(0)), current: Arc::new(AtomicU64::new(0)),
}; };
{ {
let mut deque = item.deque.lock().await;
for _i in 0..size { for _i in 0..size {
deque.push_back(creator_functor()); item.elements.push(creator_functor());
} }
item.count.store(size as u64, Ordering::Relaxed);
} }
item item
} }
pub fn new_empty() -> Self { pub fn get(&self) -> Option<T> {
Self { if !self.elements.is_empty() {
deque: Arc::new(Mutex::new(VecDeque::<T>::new())), let current = self.current.fetch_add(1, Ordering::Relaxed);
count: Arc::new(AtomicU64::new(0)), let index = current % (self.elements.len() as u64);
} Some(self.elements[index as usize].clone())
}
pub async fn get(&self) -> Option<T> {
let mut deque = self.deque.lock().await;
if !deque.is_empty() {
let current = deque.pop_front().unwrap();
deque.push_back(current.clone());
Some(current)
} else { } else {
None None
} }
} }
pub async fn add(&self, instance: T) {
let mut queue = self.deque.lock().await;
queue.push_front(instance);
self.count.fetch_add(1, Ordering::Relaxed);
}
pub async fn remove(&self) {
if !self.is_empty() {
let mut queue = self.deque.lock().await;
queue.pop_front();
self.count.fetch_sub(1, Ordering::Relaxed);
}
}
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
self.count.load(Ordering::Relaxed) as usize self.elements.len()
} }
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.len() == 0 self.elements.is_empty()
} }
} }

View File

@ -28,7 +28,7 @@ pub const DEFAULT_FANOUT_SIZE: u64 = 10;
#[from_env] #[from_env]
pub const MAX_RETRIES: usize = 40; pub const MAX_RETRIES: usize = 40;
pub const DEFAULT_RETRY_TIMEOUT: u64 = 2; pub const DEFAULT_RETRY_TIMEOUT: u64 = 1;
#[from_env] #[from_env]
pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minute pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minute

View File

@ -145,11 +145,7 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::R
let tpu_config = TpuServiceConfig { let tpu_config = TpuServiceConfig {
fanout_slots: fanout_size, fanout_slots: fanout_size,
number_of_leaders_to_cache: 1024,
clusterinfo_refresh_time: Duration::from_secs(60 * 60),
leader_schedule_update_frequency: Duration::from_secs(10),
maximum_transaction_in_queue: 20000, maximum_transaction_in_queue: 20000,
maximum_number_of_errors: 10,
quic_connection_params: QuicConnectionParameters { quic_connection_params: QuicConnectionParameters {
connection_timeout: Duration::from_secs(1), connection_timeout: Duration::from_secs(1),
connection_retry_count: 10, connection_retry_count: 10,

View File

@ -3,7 +3,7 @@ use log::{error, trace};
use prometheus::{core::GenericGauge, opts, register_int_gauge}; use prometheus::{core::GenericGauge, opts, register_int_gauge};
use quinn::Endpoint; use quinn::Endpoint;
use solana_lite_rpc_core::{ use solana_lite_rpc_core::{
quic_connection::QuicConnectionPool, quic_connection::{PooledConnection, QuicConnectionPool},
quic_connection_utils::{QuicConnectionParameters, QuicConnectionUtils}, quic_connection_utils::{QuicConnectionParameters, QuicConnectionUtils},
stores::tx_store::TxStore, stores::tx_store::TxStore,
structures::{identity_stakes::IdentityStakesData, rotating_queue::RotatingQueue}, structures::{identity_stakes::IdentityStakesData, rotating_queue::RotatingQueue},
@ -14,7 +14,7 @@ use std::{
collections::HashMap, collections::HashMap,
net::SocketAddr, net::SocketAddr,
sync::{ sync::{
atomic::{AtomicBool, AtomicU64, Ordering}, atomic::{AtomicBool, Ordering},
Arc, Arc,
}, },
}; };
@ -85,13 +85,11 @@ impl ActiveConnection {
.number_of_transactions_per_unistream; .number_of_transactions_per_unistream;
let max_number_of_connections = self.connection_parameters.max_number_of_connections; let max_number_of_connections = self.connection_parameters.max_number_of_connections;
let max_uni_stream_connections: u64 = (compute_max_allowed_uni_streams( let max_uni_stream_connections = compute_max_allowed_uni_streams(
identity_stakes.peer_type, identity_stakes.peer_type,
identity_stakes.stakes, identity_stakes.stakes,
identity_stakes.total_stakes, identity_stakes.total_stakes,
) * max_number_of_connections) as u64; );
let task_counter: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
let exit_signal = self.exit_signal.clone(); let exit_signal = self.exit_signal.clone();
let connection_pool = QuicConnectionPool::new( let connection_pool = QuicConnectionPool::new(
identity, identity,
@ -99,6 +97,8 @@ impl ActiveConnection {
addr, addr,
self.connection_parameters, self.connection_parameters,
exit_signal.clone(), exit_signal.clone(),
max_number_of_connections,
max_uni_stream_connections,
); );
loop { loop {
@ -107,11 +107,6 @@ impl ActiveConnection {
break; break;
} }
if task_counter.load(Ordering::Relaxed) >= max_uni_stream_connections {
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
continue;
}
tokio::select! { tokio::select! {
tx = transaction_reciever.recv() => { tx = transaction_reciever.recv() => {
// exit signal set // exit signal set
@ -146,22 +141,22 @@ impl ActiveConnection {
} }
} }
// queue getting full and a connection poll is getting slower let connection_pool = match connection_pool.get_pooled_connection().await {
// add more connections to the pool Ok(connection_pool) => connection_pool,
if connection_pool.len() < max_number_of_connections { Err(_) => break,
connection_pool.add_connection().await; };
NB_QUIC_CONNECTIONS.inc();
}
let task_counter = task_counter.clone();
let connection_pool = connection_pool.clone();
tokio::spawn(async move { tokio::spawn(async move {
task_counter.fetch_add(1, Ordering::Relaxed); let PooledConnection {
connection,
permit
} = connection_pool;
// permit will be used to send all the transaction and then destroyed
let _permit = permit;
NB_QUIC_TASKS.inc(); NB_QUIC_TASKS.inc();
connection_pool.send_transaction_batch(txs).await; for tx in txs {
connection.send_transaction(tx).await;
}
NB_QUIC_TASKS.dec(); NB_QUIC_TASKS.dec();
task_counter.fetch_sub(1, Ordering::Relaxed);
}); });
}, },
_ = exit_oneshot_channel.recv() => { _ = exit_oneshot_channel.recv() => {
@ -216,8 +211,7 @@ impl TpuConnectionManager {
Self { Self {
endpoints: RotatingQueue::new(number_of_clients, || { endpoints: RotatingQueue::new(number_of_clients, || {
QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone()) QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone())
}) }),
.await,
identity_to_active_connection: Arc::new(DashMap::new()), identity_to_active_connection: Arc::new(DashMap::new()),
} }
} }

View File

@ -16,7 +16,6 @@ use std::{
net::{IpAddr, Ipv4Addr}, net::{IpAddr, Ipv4Addr},
sync::Arc, sync::Arc,
}; };
use tokio::time::Duration;
lazy_static::lazy_static! { lazy_static::lazy_static! {
static ref NB_CLUSTER_NODES: GenericGauge<prometheus::core::AtomicI64> = static ref NB_CLUSTER_NODES: GenericGauge<prometheus::core::AtomicI64> =
@ -35,11 +34,7 @@ lazy_static::lazy_static! {
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
pub struct TpuServiceConfig { pub struct TpuServiceConfig {
pub fanout_slots: u64, pub fanout_slots: u64,
pub number_of_leaders_to_cache: usize,
pub clusterinfo_refresh_time: Duration,
pub leader_schedule_update_frequency: Duration,
pub maximum_transaction_in_queue: usize, pub maximum_transaction_in_queue: usize,
pub maximum_number_of_errors: usize,
pub quic_connection_params: QuicConnectionParameters, pub quic_connection_params: QuicConnectionParameters,
pub tpu_connection_path: TpuConnectionPath, pub tpu_connection_path: TpuConnectionPath,
} }