From dcc640f80ab5eb8b8f8e465d54b5aa378200ff62 Mon Sep 17 00:00:00 2001 From: Godmode Galactus Date: Wed, 13 Sep 2023 16:11:02 +0200 Subject: [PATCH 1/3] using semaphore in tpu connection manager and removing unwanted code --- lite-rpc/src/lib.rs | 2 +- lite-rpc/src/main.rs | 4 ---- .../src/tpu_utils/tpu_connection_manager.rs | 18 ++++++------------ services/src/tpu_utils/tpu_service.rs | 5 ----- 4 files changed, 7 insertions(+), 22 deletions(-) diff --git a/lite-rpc/src/lib.rs b/lite-rpc/src/lib.rs index 32b6e69f..43da1f66 100644 --- a/lite-rpc/src/lib.rs +++ b/lite-rpc/src/lib.rs @@ -28,7 +28,7 @@ pub const DEFAULT_FANOUT_SIZE: u64 = 10; #[from_env] pub const MAX_RETRIES: usize = 40; -pub const DEFAULT_RETRY_TIMEOUT: u64 = 2; +pub const DEFAULT_RETRY_TIMEOUT: u64 = 1; #[from_env] pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minute diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs index 9da86402..e7b13670 100644 --- a/lite-rpc/src/main.rs +++ b/lite-rpc/src/main.rs @@ -145,11 +145,7 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc) -> anyhow::R let tpu_config = TpuServiceConfig { fanout_slots: fanout_size, - number_of_leaders_to_cache: 1024, - clusterinfo_refresh_time: Duration::from_secs(60 * 60), - leader_schedule_update_frequency: Duration::from_secs(10), maximum_transaction_in_queue: 20000, - maximum_number_of_errors: 10, quic_connection_params: QuicConnectionParameters { connection_timeout: Duration::from_secs(1), connection_retry_count: 10, diff --git a/services/src/tpu_utils/tpu_connection_manager.rs b/services/src/tpu_utils/tpu_connection_manager.rs index d7fbe13e..bb5978cc 100644 --- a/services/src/tpu_utils/tpu_connection_manager.rs +++ b/services/src/tpu_utils/tpu_connection_manager.rs @@ -14,7 +14,7 @@ use std::{ collections::HashMap, net::SocketAddr, sync::{ - atomic::{AtomicBool, AtomicU64, Ordering}, + atomic::{AtomicBool, Ordering}, Arc, }, }; @@ -85,13 +85,13 @@ impl ActiveConnection { .number_of_transactions_per_unistream; let max_number_of_connections = self.connection_parameters.max_number_of_connections; - let max_uni_stream_connections: u64 = (compute_max_allowed_uni_streams( + let max_uni_stream_connections = compute_max_allowed_uni_streams( identity_stakes.peer_type, identity_stakes.stakes, identity_stakes.total_stakes, - ) * max_number_of_connections) as u64; + ) * max_number_of_connections; - let task_counter: Arc = Arc::new(AtomicU64::new(0)); + let task_counter = Arc::new(tokio::sync::Semaphore::new(max_uni_stream_connections)); let exit_signal = self.exit_signal.clone(); let connection_pool = QuicConnectionPool::new( identity, @@ -107,11 +107,6 @@ impl ActiveConnection { break; } - if task_counter.load(Ordering::Relaxed) >= max_uni_stream_connections { - tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - continue; - } - tokio::select! { tx = transaction_reciever.recv() => { // exit signal set @@ -155,13 +150,12 @@ impl ActiveConnection { let task_counter = task_counter.clone(); let connection_pool = connection_pool.clone(); - + let permit = task_counter.acquire_owned().await.expect("Should get permit"); tokio::spawn(async move { - task_counter.fetch_add(1, Ordering::Relaxed); + let _ = permit; NB_QUIC_TASKS.inc(); connection_pool.send_transaction_batch(txs).await; NB_QUIC_TASKS.dec(); - task_counter.fetch_sub(1, Ordering::Relaxed); }); }, _ = exit_oneshot_channel.recv() => { diff --git a/services/src/tpu_utils/tpu_service.rs b/services/src/tpu_utils/tpu_service.rs index 4c11713c..703293a1 100644 --- a/services/src/tpu_utils/tpu_service.rs +++ b/services/src/tpu_utils/tpu_service.rs @@ -16,7 +16,6 @@ use std::{ net::{IpAddr, Ipv4Addr}, sync::Arc, }; -use tokio::time::Duration; lazy_static::lazy_static! { static ref NB_CLUSTER_NODES: GenericGauge = @@ -35,11 +34,7 @@ lazy_static::lazy_static! { #[derive(Clone, Copy)] pub struct TpuServiceConfig { pub fanout_slots: u64, - pub number_of_leaders_to_cache: usize, - pub clusterinfo_refresh_time: Duration, - pub leader_schedule_update_frequency: Duration, pub maximum_transaction_in_queue: usize, - pub maximum_number_of_errors: usize, pub quic_connection_params: QuicConnectionParameters, pub tpu_connection_path: TpuConnectionPath, } From 3e4c6ed933f010e28fe3e01160d1add7194c76a9 Mon Sep 17 00:00:00 2001 From: Godmode Galactus Date: Thu, 14 Sep 2023 14:21:48 +0200 Subject: [PATCH 2/3] creating task counters per quic connection --- core/src/quic_connection.rs | 314 ++++++++---------- core/src/structures/rotating_queue.rs | 64 +--- .../src/tpu_utils/tpu_connection_manager.rs | 36 +- 3 files changed, 183 insertions(+), 231 deletions(-) diff --git a/core/src/quic_connection.rs b/core/src/quic_connection.rs index dc4b64c2..7fc4ece9 100644 --- a/core/src/quic_connection.rs +++ b/core/src/quic_connection.rs @@ -2,25 +2,26 @@ use crate::{ quic_connection_utils::{QuicConnectionError, QuicConnectionParameters, QuicConnectionUtils}, structures::rotating_queue::RotatingQueue, }; -use anyhow::bail; +use anyhow::Context; +use futures::FutureExt; use log::warn; use quinn::{Connection, Endpoint}; use solana_sdk::pubkey::Pubkey; use std::{ - collections::VecDeque, net::SocketAddr, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, }, }; -use tokio::sync::RwLock; +use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore}; pub type EndpointPool = RotatingQueue; #[derive(Clone)] +#[warn(clippy::rc_clone_in_vec_init)] pub struct QuicConnection { - connection: Arc>, + connection: Arc>>, last_stable_id: Arc, endpoint: Endpoint, identity: Pubkey, @@ -31,150 +32,140 @@ pub struct QuicConnection { } impl QuicConnection { - pub async fn new( + pub fn new( identity: Pubkey, - endpoints: EndpointPool, + endpoint: Endpoint, socket_address: SocketAddr, connection_params: QuicConnectionParameters, exit_signal: Arc, - ) -> anyhow::Result { - let endpoint = endpoints - .get() - .await - .expect("endpoint pool is not suppose to be empty"); - let connection = QuicConnectionUtils::connect( + ) -> Self { + Self { + connection: Arc::new(RwLock::new(None)), + last_stable_id: Arc::new(AtomicU64::new(0)), + endpoint, identity, - false, - endpoint.clone(), socket_address, - connection_params.connection_timeout, - connection_params.connection_retry_count, - exit_signal.clone(), - ) - .await; - - match connection { - Some(connection) => Ok(Self { - connection: Arc::new(RwLock::new(connection)), - last_stable_id: Arc::new(AtomicU64::new(0)), - endpoint, - identity, - socket_address, - connection_params, - exit_signal, - timeout_counters: Arc::new(AtomicU64::new(0)), - }), - None => { - bail!("Could not establish connection"); - } + connection_params, + exit_signal, + timeout_counters: Arc::new(AtomicU64::new(0)), } } + async fn connect(&self) -> Option { + QuicConnectionUtils::connect( + self.identity, + true, + self.endpoint.clone(), + self.socket_address, + self.connection_params.connection_timeout, + self.connection_params.connection_retry_count, + self.exit_signal.clone(), + ) + .await + } + async fn get_connection(&self) -> Option { // get new connection reset if necessary let last_stable_id = self.last_stable_id.load(Ordering::Relaxed) as usize; - let conn = self.connection.read().await; - if conn.stable_id() == last_stable_id { - let current_stable_id = conn.stable_id(); - // problematic connection - drop(conn); - let mut conn = self.connection.write().await; - // check may be already written by another thread - if conn.stable_id() != current_stable_id { - Some(conn.clone()) - } else { - let new_conn = QuicConnectionUtils::connect( - self.identity, - true, - self.endpoint.clone(), - self.socket_address, - self.connection_params.connection_timeout, - self.connection_params.connection_retry_count, - self.exit_signal.clone(), - ) - .await; - if let Some(new_conn) = new_conn { - *conn = new_conn; - Some(conn.clone()) + let conn = self.connection.read().await.clone(); + match conn { + Some(connection) => { + if connection.stable_id() == last_stable_id { + let current_stable_id = connection.stable_id(); + // problematic connection + let mut conn = self.connection.write().await; + let connection = conn.clone().expect("Connection cannot be None here"); + // check may be already written by another thread + if connection.stable_id() != current_stable_id { + Some(connection) + } else { + let new_conn = QuicConnectionUtils::connect( + self.identity, + true, + self.endpoint.clone(), + self.socket_address, + self.connection_params.connection_timeout, + self.connection_params.connection_retry_count, + self.exit_signal.clone(), + ) + .await; + if let Some(new_conn) = new_conn { + *conn = Some(new_conn); + conn.clone() + } else { + // could not connect + None + } + } } else { - // could not connect - None + Some(connection.clone()) } } - } else { - Some(conn.clone()) + None => self.connect().await, } } - pub async fn send_transaction_batch(&self, txs: Vec>) { - let mut queue = VecDeque::new(); - for tx in txs { - queue.push_back(tx); - } + pub async fn send_transaction(&self, tx: Vec) { let connection_retry_count = self.connection_params.connection_retry_count; for _ in 0..connection_retry_count { - if queue.is_empty() || self.exit_signal.load(Ordering::Relaxed) { + if self.exit_signal.load(Ordering::Relaxed) { // return return; } let mut do_retry = false; - while !queue.is_empty() { - let tx = queue.pop_front().unwrap(); - let connection = self.get_connection().await; + let connection = self.get_connection().await; - if self.exit_signal.load(Ordering::Relaxed) { - return; - } + if self.exit_signal.load(Ordering::Relaxed) { + return; + } - if let Some(connection) = connection { - let current_stable_id = connection.stable_id() as u64; - match QuicConnectionUtils::open_unistream( - connection, - self.connection_params.unistream_timeout, - ) - .await - { - Ok(send_stream) => { - match QuicConnectionUtils::write_all( - send_stream, - &tx, - self.identity, - self.connection_params, - ) - .await - { - Ok(()) => { - // do nothing - } - Err(QuicConnectionError::ConnectionError { retry }) => { - do_retry = retry; - } - Err(QuicConnectionError::TimeOut) => { - self.timeout_counters.fetch_add(1, Ordering::Relaxed); - } + if let Some(connection) = connection { + let current_stable_id = connection.stable_id() as u64; + match QuicConnectionUtils::open_unistream( + connection, + self.connection_params.unistream_timeout, + ) + .await + { + Ok(send_stream) => { + match QuicConnectionUtils::write_all( + send_stream, + &tx, + self.identity, + self.connection_params, + ) + .await + { + Ok(()) => { + // do nothing + } + Err(QuicConnectionError::ConnectionError { retry }) => { + do_retry = retry; + } + Err(QuicConnectionError::TimeOut) => { + self.timeout_counters.fetch_add(1, Ordering::Relaxed); } } - Err(QuicConnectionError::ConnectionError { retry }) => { - do_retry = retry; - } - Err(QuicConnectionError::TimeOut) => { - self.timeout_counters.fetch_add(1, Ordering::Relaxed); - } } - if do_retry { - self.last_stable_id - .store(current_stable_id, Ordering::Relaxed); - queue.push_back(tx); - break; + Err(QuicConnectionError::ConnectionError { retry }) => { + do_retry = retry; } - } else { - warn!( - "Could not establish connection with {}", - self.identity.to_string() - ); + Err(QuicConnectionError::TimeOut) => { + self.timeout_counters.fetch_add(1, Ordering::Relaxed); + } + } + if do_retry { + self.last_stable_id + .store(current_stable_id, Ordering::Relaxed); break; } + } else { + warn!( + "Could not establish connection with {}", + self.identity.to_string() + ); + break; } if !do_retry { break; @@ -193,12 +184,15 @@ impl QuicConnection { #[derive(Clone)] pub struct QuicConnectionPool { - connections: RotatingQueue, - connection_parameters: QuicConnectionParameters, - endpoints: EndpointPool, - identity: Pubkey, - socket_address: SocketAddr, - exit_signal: Arc, + connections: Vec, + // counting semaphore is ideal way to manage backpressure on the connection + // because a connection can create only N unistream connections + transactions_in_sending_semaphore: Vec>, +} + +pub struct PooledConnection { + pub connection: QuicConnection, + pub permit: OwnedSemaphorePermit, } impl QuicConnectionPool { @@ -208,60 +202,46 @@ impl QuicConnectionPool { socket_address: SocketAddr, connection_parameters: QuicConnectionParameters, exit_signal: Arc, + nb_connection: usize, + max_number_of_unistream_connection: usize, ) -> Self { - let connections = RotatingQueue::new_empty(); + let mut connections = vec![]; + // should not clone connection each time but create a new one + for _ in 0..nb_connection { + connections.push(QuicConnection::new( + identity, + endpoints.get().expect("Should get and endpoint"), + socket_address, + connection_parameters, + exit_signal.clone(), + )); + } Self { connections, - identity, - endpoints, - socket_address, - connection_parameters, - exit_signal, + transactions_in_sending_semaphore: { + // should create a new semaphore each time so avoid vec[elem;count] + let mut v = Vec::with_capacity(nb_connection); + (0..nb_connection).for_each(|_| { + v.push(Arc::new(Semaphore::new(max_number_of_unistream_connection))) + }); + v + }, } } - pub async fn send_transaction_batch(&self, txs: Vec>) { - let connection = match self.connections.get().await { - Some(connection) => connection, - None => { - let new_connection = QuicConnection::new( - self.identity, - self.endpoints.clone(), - self.socket_address, - self.connection_parameters, - self.exit_signal.clone(), - ) - .await; - if new_connection.is_err() { - return; - } - let new_connection = new_connection.expect("Cannot establish a connection"); - self.connections.add(new_connection.clone()).await; - new_connection - } - }; - - connection.send_transaction_batch(txs).await; - } - - pub async fn add_connection(&self) { - let new_connection = QuicConnection::new( - self.identity, - self.endpoints.clone(), - self.socket_address, - self.connection_parameters, - self.exit_signal.clone(), + pub async fn get_pooled_connection(&self) -> anyhow::Result { + let (permit, index, _others) = futures::future::select_all( + self.transactions_in_sending_semaphore + .iter() + .map(|x| x.clone().acquire_owned().boxed()), ) .await; - if let Ok(new_connection) = new_connection { - self.connections.add(new_connection).await; - } - } - - pub async fn remove_connection(&self) { - if !self.connections.is_empty() { - self.connections.remove().await; - } + drop(_others); + let permit = permit.context("Cannot aquire permit, connection pool erased")?; + Ok(PooledConnection { + connection: self.connections[index].clone(), + permit, + }) } pub fn len(&self) -> usize { diff --git a/core/src/structures/rotating_queue.rs b/core/src/structures/rotating_queue.rs index 5280718b..a3821e83 100644 --- a/core/src/structures/rotating_queue.rs +++ b/core/src/structures/rotating_queue.rs @@ -1,74 +1,46 @@ -use std::{ - collections::VecDeque, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, }; -use tokio::sync::Mutex; #[derive(Clone)] -pub struct RotatingQueue { - deque: Arc>>, - count: Arc, +pub struct RotatingQueue { + elements: Vec, + current: Arc, } impl RotatingQueue { - pub async fn new(size: usize, creator_functor: F) -> Self + pub fn new(size: usize, creator_functor: F) -> Self where F: Fn() -> T, { - let item = Self { - deque: Arc::new(Mutex::new(VecDeque::::new())), - count: Arc::new(AtomicU64::new(0)), + let mut item = Self { + elements: Vec::::new(), + current: Arc::new(AtomicU64::new(0)), }; { - let mut deque = item.deque.lock().await; for _i in 0..size { - deque.push_back(creator_functor()); + item.elements.push(creator_functor()); } - item.count.store(size as u64, Ordering::Relaxed); } item } - pub fn new_empty() -> Self { - Self { - deque: Arc::new(Mutex::new(VecDeque::::new())), - count: Arc::new(AtomicU64::new(0)), - } - } - - pub async fn get(&self) -> Option { - let mut deque = self.deque.lock().await; - if !deque.is_empty() { - let current = deque.pop_front().unwrap(); - deque.push_back(current.clone()); - Some(current) + pub fn get(&self) -> Option { + if !self.elements.is_empty() { + let current = self.current.fetch_add(1, Ordering::Relaxed); + let index = current % (self.elements.len() as u64); + Some(self.elements[index as usize].clone()) } else { None } } - pub async fn add(&self, instance: T) { - let mut queue = self.deque.lock().await; - queue.push_front(instance); - self.count.fetch_add(1, Ordering::Relaxed); - } - - pub async fn remove(&self) { - if !self.is_empty() { - let mut queue = self.deque.lock().await; - queue.pop_front(); - self.count.fetch_sub(1, Ordering::Relaxed); - } - } - pub fn len(&self) -> usize { - self.count.load(Ordering::Relaxed) as usize + self.elements.len() } pub fn is_empty(&self) -> bool { - self.len() == 0 + self.elements.is_empty() } } diff --git a/services/src/tpu_utils/tpu_connection_manager.rs b/services/src/tpu_utils/tpu_connection_manager.rs index bb5978cc..b0eb34d2 100644 --- a/services/src/tpu_utils/tpu_connection_manager.rs +++ b/services/src/tpu_utils/tpu_connection_manager.rs @@ -3,7 +3,7 @@ use log::{error, trace}; use prometheus::{core::GenericGauge, opts, register_int_gauge}; use quinn::Endpoint; use solana_lite_rpc_core::{ - quic_connection::QuicConnectionPool, + quic_connection::{PooledConnection, QuicConnectionPool}, quic_connection_utils::{QuicConnectionParameters, QuicConnectionUtils}, stores::tx_store::TxStore, structures::{identity_stakes::IdentityStakesData, rotating_queue::RotatingQueue}, @@ -89,9 +89,7 @@ impl ActiveConnection { identity_stakes.peer_type, identity_stakes.stakes, identity_stakes.total_stakes, - ) * max_number_of_connections; - - let task_counter = Arc::new(tokio::sync::Semaphore::new(max_uni_stream_connections)); + ); let exit_signal = self.exit_signal.clone(); let connection_pool = QuicConnectionPool::new( identity, @@ -99,6 +97,8 @@ impl ActiveConnection { addr, self.connection_parameters, exit_signal.clone(), + max_number_of_connections, + max_uni_stream_connections, ); loop { @@ -141,20 +141,21 @@ impl ActiveConnection { } } - // queue getting full and a connection poll is getting slower - // add more connections to the pool - if connection_pool.len() < max_number_of_connections { - connection_pool.add_connection().await; - NB_QUIC_CONNECTIONS.inc(); - } - - let task_counter = task_counter.clone(); - let connection_pool = connection_pool.clone(); - let permit = task_counter.acquire_owned().await.expect("Should get permit"); + let connection_pool = match connection_pool.get_pooled_connection().await { + Ok(connection_pool) => connection_pool, + Err(_) => break, + }; tokio::spawn(async move { - let _ = permit; + let PooledConnection { + connection, + permit + } = connection_pool; + // permit will be used to send all the transaction and then destroyed + let _permit = permit; NB_QUIC_TASKS.inc(); - connection_pool.send_transaction_batch(txs).await; + for tx in txs { + connection.send_transaction(tx).await; + } NB_QUIC_TASKS.dec(); }); }, @@ -210,8 +211,7 @@ impl TpuConnectionManager { Self { endpoints: RotatingQueue::new(number_of_clients, || { QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone()) - }) - .await, + }), identity_to_active_connection: Arc::new(DashMap::new()), } } From 41d36a8e3b6d5b28c039aa5403015e0cc52f8959 Mon Sep 17 00:00:00 2001 From: Godmode Galactus Date: Thu, 14 Sep 2023 15:09:35 +0200 Subject: [PATCH 3/3] bug fix add connection when created first time --- core/src/quic_connection.rs | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/core/src/quic_connection.rs b/core/src/quic_connection.rs index 7fc4ece9..ca2720e5 100644 --- a/core/src/quic_connection.rs +++ b/core/src/quic_connection.rs @@ -79,16 +79,7 @@ impl QuicConnection { if connection.stable_id() != current_stable_id { Some(connection) } else { - let new_conn = QuicConnectionUtils::connect( - self.identity, - true, - self.endpoint.clone(), - self.socket_address, - self.connection_params.connection_timeout, - self.connection_params.connection_retry_count, - self.exit_signal.clone(), - ) - .await; + let new_conn = self.connect().await; if let Some(new_conn) = new_conn { *conn = Some(new_conn); conn.clone() @@ -101,7 +92,11 @@ impl QuicConnection { Some(connection.clone()) } } - None => self.connect().await, + None => { + let connection = self.connect().await; + *self.connection.write().await = connection.clone(); + connection + } } }