Creating more connections with each leader

godmodegalactus 2023-06-21 19:04:20 +00:00 committed by Godmode Galactus
parent 999df3aea2
commit ea8d60abc2
7 changed files with 400 additions and 223 deletions

core/src/lib.rs

@ -9,5 +9,6 @@ pub mod structures;
pub mod subscription_handler;
pub mod subscription_sink;
pub mod tx_store;
pub mod quic_connection;
pub type AnyhowJoinHandle = tokio::task::JoinHandle<anyhow::Result<()>>;

core/src/quic_connection.rs (new file, 227 lines added)

@ -0,0 +1,227 @@
use std::{sync::{Arc, atomic::{AtomicBool, AtomicU64, Ordering}}, net::SocketAddr, collections::VecDeque};
use anyhow::bail;
use log::{warn};
use quinn::{Endpoint, Connection};
use solana_sdk::pubkey::Pubkey;
use tokio::sync::RwLock;
use crate::{rotating_queue::RotatingQueue, quic_connection_utils::{QuicConnectionUtils, QuicConnectionError, QuicConnectionParameters}};
pub type EndpointPool = RotatingQueue<Endpoint>;
#[derive(Clone)]
pub struct QuicConnection {
connection: Arc<RwLock<Connection>>,
last_stable_id: Arc<AtomicU64>,
endpoint: Endpoint,
identity: Pubkey,
socket_address : SocketAddr,
connection_params: QuicConnectionParameters,
exit_signal: Arc<AtomicBool>,
timeout_counters: Arc<AtomicU64>,
}
impl QuicConnection {
pub async fn new(identity: Pubkey, endpoints: EndpointPool, socket_address: SocketAddr, connection_params: QuicConnectionParameters, exit_signal: Arc<AtomicBool>) -> anyhow::Result<Self> {
let endpoint = endpoints.get().await.expect("endpoint pool is not supposed to be empty");
let connection = QuicConnectionUtils::connect(
identity.clone(),
false,
endpoint.clone(),
socket_address.clone(),
connection_params.connection_timeout.clone(),
connection_params.connection_retry_count,
exit_signal.clone(),
).await;
match connection {
Some(connection) => {
Ok(Self {
connection: Arc::new(RwLock::new(connection)),
last_stable_id: Arc::new(AtomicU64::new(0)),
endpoint,
identity,
socket_address,
connection_params,
exit_signal,
timeout_counters: Arc::new(AtomicU64::new(0)),
})
},
None => {
bail!("Could not establish connection");
}
}
}
async fn get_connection(&self) -> Option<Connection> {
// get the connection, resetting it if necessary
let last_stable_id = self.last_stable_id.load(Ordering::Relaxed) as usize;
let conn = self.connection.read().await;
if conn.stable_id() == last_stable_id {
let current_stable_id = conn.stable_id();
// problematic connection
drop(conn);
let mut conn = self.connection.write().await;
// another task may already have replaced the connection
if conn.stable_id() != current_stable_id {
Some(conn.clone())
} else {
let new_conn = QuicConnectionUtils::connect(
self.identity.clone(),
true,
self.endpoint.clone(),
self.socket_address.clone(),
self.connection_params.connection_timeout.clone(),
self.connection_params.connection_retry_count,
self.exit_signal.clone(),
)
.await;
if let Some(new_conn) = new_conn {
*conn = new_conn;
Some(conn.clone())
} else {
// could not connect
None
}
}
} else {
Some(conn.clone())
}
}
pub async fn send_transaction_batch(
&self,
txs: Vec<Vec<u8>>
) {
let mut queue = VecDeque::new();
for tx in txs {
queue.push_back(tx);
}
let connection_retry_count = self.connection_params.connection_retry_count;
for _ in 0..connection_retry_count {
if queue.is_empty() || self.exit_signal.load(Ordering::Relaxed) {
// return
return;
}
let mut do_retry = false;
while !queue.is_empty() {
let tx = queue.pop_front().unwrap();
let connection = self.get_connection().await;
if self.exit_signal.load(Ordering::Relaxed) {
return;
}
if let Some(connection) = connection {
let current_stable_id = connection.stable_id() as u64;
match QuicConnectionUtils::open_unistream(connection, self.connection_params.unistream_timeout).await {
Ok(send_stream) => {
match QuicConnectionUtils::write_all(
send_stream,
&tx,
self.identity.clone(),
self.connection_params,
).await {
Ok(()) => {
// do nothing
},
Err(QuicConnectionError::ConnectionError{retry}) => {
do_retry = retry;
},
Err(QuicConnectionError::TimeOut) => {
self.timeout_counters.fetch_add(1, Ordering::Relaxed);
}
}
},
Err(QuicConnectionError::ConnectionError{retry}) => {
do_retry = retry;
},
Err(QuicConnectionError::TimeOut) => {
self.timeout_counters.fetch_add(1, Ordering::Relaxed);
}
}
if do_retry {
self.last_stable_id.store(current_stable_id, Ordering::Relaxed);
queue.push_back(tx);
break;
}
} else {
warn!("Could not establish connection with {}", self.identity.to_string());
break;
}
}
if !do_retry {
break;
}
}
}
pub fn get_timeout_count(&self) -> u64 {
self.timeout_counters.load(Ordering::Relaxed)
}
pub fn reset_timeouts(&self) {
self.timeout_counters.store(0, Ordering::Relaxed);
}
}
#[derive(Clone)]
pub struct QuicConnectionPool {
connections : RotatingQueue<QuicConnection>,
connection_parameters: QuicConnectionParameters,
endpoints: EndpointPool,
identity: Pubkey,
socket_address: SocketAddr,
exit_signal: Arc<AtomicBool>,
}
impl QuicConnectionPool {
pub fn new(identity: Pubkey, endpoints: EndpointPool, socket_address: SocketAddr, connection_parameters: QuicConnectionParameters, exit_signal: Arc<AtomicBool>) -> Self {
let connections = RotatingQueue::new_empty();
Self {
connections,
identity,
endpoints,
socket_address,
connection_parameters,
exit_signal,
}
}
pub async fn send_transaction_batch(
&self,
txs: Vec<Vec<u8>>
) {
let connection = match self.connections.get().await {
Some(connection) => connection,
None => {
let new_connection = QuicConnection::new(self.identity.clone(), self.endpoints.clone(), self.socket_address.clone(), self.connection_parameters, self.exit_signal.clone()).await;
if new_connection.is_err() {
return;
}
let new_connection = new_connection.expect("Cannot establish a connection");
self.connections.add(new_connection.clone()).await;
new_connection
}
};
connection.send_transaction_batch(txs).await;
}
pub async fn add_connection(&self) {
let new_connection = QuicConnection::new(self.identity.clone(), self.endpoints.clone(), self.socket_address.clone(), self.connection_parameters, self.exit_signal.clone()).await;
if let Ok(new_connection) = new_connection {
self.connections.add(new_connection).await;
}
}
pub async fn remove_connection(&self, min_count: usize) {
if self.connections.len() > min_count {
self.connections.remove().await;
}
}
pub fn len(&self) -> usize {
self.connections.len()
}
}
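For orientation, here is a minimal sketch (not part of the commit) of how the new pool might be driven from a caller. The helper function and its argument names are illustrative assumptions; the QuicConnectionPool calls mirror the API added above.

use std::{net::SocketAddr, sync::{atomic::AtomicBool, Arc}};
use solana_lite_rpc_core::{
    quic_connection::{EndpointPool, QuicConnectionPool},
    quic_connection_utils::QuicConnectionParameters,
};
use solana_sdk::pubkey::Pubkey;
// Hypothetical helper: send a batch of serialized transactions to one leader.
async fn send_to_leader(
    leader_identity: Pubkey,          // assumed to come from the leader schedule
    tpu_addr: SocketAddr,             // leader's QUIC TPU address (assumed)
    endpoints: EndpointPool,          // shared rotating queue of quinn endpoints
    params: QuicConnectionParameters,
    txs: Vec<Vec<u8>>,
) {
    let exit_signal = Arc::new(AtomicBool::new(false));
    // The pool starts empty; the first send lazily opens a QuicConnection.
    let pool = QuicConnectionPool::new(leader_identity, endpoints, tpu_addr, params, exit_signal);
    pool.send_transaction_batch(txs).await;
    // Callers can grow the pool under load and later shrink it back down,
    // keeping at least two connections alive.
    pool.add_connection().await;
    pool.remove_connection(2).await;
}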

core/src/quic_connection_utils.rs

@ -5,18 +5,36 @@ use quinn::{
};
use solana_sdk::pubkey::Pubkey;
use std::{
collections::VecDeque,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use tokio::{sync::RwLock, time::timeout};
use tokio::{time::timeout};
const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
pub enum QuicConnectionError {
TimeOut,
ConnectionError {
retry: bool,
},
}
#[derive(Clone, Copy)]
pub struct QuicConnectionParameters{
pub connection_timeout : Duration,
pub unistream_timeout: Duration,
pub write_timeout: Duration,
pub finalize_timeout: Duration,
pub connection_retry_count: usize,
pub max_number_of_connections : usize,
pub number_of_transactions_per_unistream: usize,
}
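// Not part of this diff: a note on where each knob above is consumed.
//   connection_timeout                   -> QuicConnectionUtils::connect()
//   unistream_timeout                    -> QuicConnectionUtils::open_unistream()
//   write_timeout                        -> send_stream.write_all() inside write_all()
//   finalize_timeout                     -> send_stream.finish() inside write_all()
//   connection_retry_count               -> retry loops in connect() and QuicConnection::send_transaction_batch()
//   max_number_of_connections            -> growth limit for QuicConnectionPool in ActiveConnection::listen()
//   number_of_transactions_per_unistream -> batching threshold in ActiveConnection::listen()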
pub struct QuicConnectionUtils {}
impl QuicConnectionUtils {
@ -97,7 +115,6 @@ impl QuicConnectionUtils {
connection_timeout: Duration,
connection_retry_count: usize,
exit_signal: Arc<AtomicBool>,
on_connect: fn(),
) -> Option<Connection> {
for _ in 0..connection_retry_count {
let conn = if already_connected {
@ -107,7 +124,6 @@ impl QuicConnectionUtils {
};
match conn {
Ok(conn) => {
on_connect();
return Some(conn);
}
Err(e) => {
@ -125,12 +141,10 @@ impl QuicConnectionUtils {
mut send_stream: SendStream,
tx: &Vec<u8>,
identity: Pubkey,
last_stable_id: Arc<AtomicU64>,
connection_stable_id: u64,
connection_timeout: Duration,
) -> bool {
connection_params: QuicConnectionParameters,
) -> Result<(), QuicConnectionError> {
let write_timeout_res =
timeout(connection_timeout, send_stream.write_all(tx.as_slice())).await;
timeout(connection_params.write_timeout, send_stream.write_all(tx.as_slice())).await;
match write_timeout_res {
Ok(write_res) => {
if let Err(e) = write_res {
@ -139,142 +153,46 @@ impl QuicConnectionUtils {
identity,
e
);
// retry
last_stable_id.store(connection_stable_id, Ordering::Relaxed);
return true;
return Err(QuicConnectionError::ConnectionError {retry: true});
}
}
Err(_) => {
warn!("timeout while writing transaction for {}", identity);
return Err(QuicConnectionError::TimeOut)
}
}
let finish_timeout_res = timeout(connection_timeout, send_stream.finish()).await;
let finish_timeout_res = timeout(connection_params.finalize_timeout, send_stream.finish()).await;
match finish_timeout_res {
Ok(finish_res) => {
if let Err(e) = finish_res {
last_stable_id.store(connection_stable_id, Ordering::Relaxed);
trace!(
"Error while writing transaction for {}, error {}",
"Error while finishing transaction for {}, error {}",
identity,
e
);
return true;
return Err(QuicConnectionError::ConnectionError {retry: false});
}
}
Err(_) => {
warn!("timeout while finishing transaction for {}", identity);
return Err(QuicConnectionError::TimeOut)
}
}
false
Ok(())
}
pub async fn open_unistream(
connection: Connection,
last_stable_id: Arc<AtomicU64>,
connection_timeout: Duration,
) -> (Option<SendStream>, bool) {
) -> Result<SendStream, QuicConnectionError> {
match timeout(connection_timeout, connection.open_uni()).await {
Ok(Ok(unistream)) => (Some(unistream), false),
Ok(Ok(unistream)) => Ok(unistream),
Ok(Err(_)) => {
// reset connection for next retry
last_stable_id.store(connection.stable_id() as u64, Ordering::Relaxed);
(None, true)
}
Err(_) => (None, false),
}
}
#[allow(clippy::too_many_arguments)]
pub async fn send_transaction_batch(
connection: Arc<RwLock<Connection>>,
txs: Vec<Vec<u8>>,
identity: Pubkey,
endpoint: Endpoint,
socket_addr: SocketAddr,
exit_signal: Arc<AtomicBool>,
last_stable_id: Arc<AtomicU64>,
connection_timeout: Duration,
connection_retry_count: usize,
on_connect: fn(),
) {
let mut queue = VecDeque::new();
for tx in txs {
queue.push_back(tx);
}
for _ in 0..connection_retry_count {
if queue.is_empty() || exit_signal.load(Ordering::Relaxed) {
// return
return;
}
// get new connection reset if necessary
let conn = {
let last_stable_id = last_stable_id.load(Ordering::Relaxed) as usize;
let conn = connection.read().await;
if conn.stable_id() == last_stable_id {
let current_stable_id = conn.stable_id();
// problematic connection
drop(conn);
let mut conn = connection.write().await;
// check may be already written by another thread
if conn.stable_id() != current_stable_id {
conn.clone()
} else {
let new_conn = Self::connect(
identity,
true,
endpoint.clone(),
socket_addr,
connection_timeout,
connection_retry_count,
exit_signal.clone(),
on_connect,
)
.await;
if let Some(new_conn) = new_conn {
*conn = new_conn;
conn.clone()
} else {
// could not connect
return;
}
}
} else {
conn.clone()
}
};
let mut retry = false;
while !queue.is_empty() {
let tx = queue.pop_front().unwrap();
let (stream, retry_conn) =
Self::open_unistream(conn.clone(), last_stable_id.clone(), connection_timeout)
.await;
if let Some(send_stream) = stream {
if exit_signal.load(Ordering::Relaxed) {
return;
}
retry = Self::write_all(
send_stream,
&tx,
identity,
last_stable_id.clone(),
conn.stable_id() as u64,
connection_timeout,
)
.await;
} else {
retry = retry_conn;
}
if retry {
queue.push_back(tx);
break;
}
}
if !retry {
break;
Err(QuicConnectionError::ConnectionError{retry: true})
}
Err(_) => Err(QuicConnectionError::TimeOut),
}
}
}

core/src/rotating_queue.rs

@ -1,34 +1,69 @@
use std::{
collections::VecDeque,
sync::{Arc, RwLock},
sync::{Arc, atomic::{AtomicU64, Ordering}},
};
use tokio::sync::Mutex;
#[derive(Clone)]
pub struct RotatingQueue<T> {
deque: Arc<RwLock<VecDeque<T>>>,
deque: Arc<Mutex<VecDeque<T>>>,
count: Arc<AtomicU64>,
}
impl<T: Clone> RotatingQueue<T> {
pub fn new<F>(size: usize, creator_functor: F) -> Self
pub async fn new<F>(size: usize, creator_functor: F) -> Self
where
F: Fn() -> T,
{
let item = Self {
deque: Arc::new(RwLock::new(VecDeque::<T>::new())),
deque: Arc::new(Mutex::new(VecDeque::<T>::new())),
count: Arc::new(AtomicU64::new(0)),
};
{
let mut deque = item.deque.write().unwrap();
let mut deque = item.deque.lock().await;
for _i in 0..size {
deque.push_back(creator_functor());
}
item.count.store(size as u64, Ordering::Relaxed);
}
item
}
pub fn get(&self) -> T {
let mut deque = self.deque.write().unwrap();
let current = deque.pop_front().unwrap();
deque.push_back(current.clone());
current
pub fn new_empty() -> Self
{
let item = Self {
deque: Arc::new(Mutex::new(VecDeque::<T>::new())),
count: Arc::new(AtomicU64::new(0)),
};
item
}
pub async fn get(&self) -> Option<T> {
let mut deque = self.deque.lock().await;
if !deque.is_empty() {
let current = deque.pop_front().unwrap();
deque.push_back(current.clone());
Some(current)
} else {
None
}
}
pub async fn add(&self, instance: T) {
let mut queue = self.deque.lock().await;
queue.push_front(instance);
self.count.fetch_add(1, Ordering::Relaxed);
}
pub async fn remove(&self) {
if self.len() > 0 {
let mut queue = self.deque.lock().await;
queue.pop_front();
self.count.fetch_sub(1, Ordering::Relaxed);
}
}
pub fn len(&self) -> usize {
self.count.load(Ordering::Relaxed) as usize
}
}
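A minimal usage sketch (illustrative, not part of the commit) of the reworked queue, assuming a Tokio runtime with the macros feature enabled:

use solana_lite_rpc_core::rotating_queue::RotatingQueue;
#[tokio::main]
async fn main() {
    // Build a queue of three items with the creator functor.
    let queue = RotatingQueue::new(3, || 0u64).await;
    assert_eq!(queue.len(), 3);
    // get() pops the front and pushes it back, handing items out round-robin.
    assert_eq!(queue.get().await, Some(0));
    // An empty queue now returns None instead of panicking.
    let empty: RotatingQueue<u64> = RotatingQueue::new_empty();
    assert!(empty.get().await.is_none());
    // add()/remove() resize the pool at runtime; len() reads the atomic counter.
    empty.add(42).await;
    assert_eq!(empty.len(), 1);
    empty.remove().await;
    assert_eq!(empty.len(), 0);
}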

lite-rpc/src/bridge.rs

@ -10,7 +10,7 @@ use solana_lite_rpc_services::{
block_listenser::BlockListener,
metrics_capture::MetricsCapture,
prometheus_sync::PrometheusSync,
tpu_utils::tpu_service::TpuService,
tpu_utils::tpu_service::{TpuService, TpuServiceConfig},
transaction_replayer::TransactionReplayer,
transaction_service::{TransactionService, TransactionServiceBuilder},
tx_sender::WireTransaction,
@ -24,7 +24,7 @@ use prometheus::{opts, register_int_counter, IntCounter};
use solana_lite_rpc_core::{
block_store::{BlockInformation, BlockStore},
tx_store::{empty_tx_store, TxStore},
AnyhowJoinHandle,
AnyhowJoinHandle, quic_connection_utils::QuicConnectionParameters,
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::{
@ -98,10 +98,28 @@ impl LiteBridge {
let tx_store = empty_tx_store();
let tpu_service = TpuService::new(
current_slot,
let tpu_config = TpuServiceConfig {
fanout_slots,
number_of_leaders_to_cache: 1024,
clusterinfo_refresh_time: Duration::from_secs(60*60),
leader_schedule_update_frequency: Duration::from_secs(10),
maximum_transaction_in_queue: 20000,
maximum_number_of_errors: 10,
quic_connection_params : QuicConnectionParameters {
connection_timeout: Duration::from_secs(500),
connection_retry_count: 10,
finalize_timeout: Duration::from_millis(200),
max_number_of_connections: 10,
unistream_timeout: Duration::from_millis(500),
write_timeout: Duration::from_secs(1),
number_of_transactions_per_unistream: 10,
}
};
let tpu_service = TpuService::new(
tpu_config,
Arc::new(identity),
current_slot,
rpc_client.clone(),
ws_addr,
tx_store.clone(),

services/src/tpu_utils/tpu_connection_manager.rs

@ -1,10 +1,10 @@
use dashmap::DashMap;
use log::{error, trace};
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use quinn::{Connection, Endpoint};
use quinn::{Endpoint};
use solana_lite_rpc_core::{
quic_connection_utils::QuicConnectionUtils, rotating_queue::RotatingQueue,
structures::identity_stakes::IdentityStakes, tx_store::TxStore,
quic_connection_utils::{QuicConnectionUtils, QuicConnectionParameters}, rotating_queue::RotatingQueue,
structures::identity_stakes::IdentityStakes, tx_store::TxStore, quic_connection::{QuicConnectionPool},
};
use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
@ -15,12 +15,8 @@ use std::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
time::Duration,
};
use tokio::sync::{broadcast::Receiver, broadcast::Sender, RwLock};
pub const QUIC_CONNECTION_TIMEOUT: Duration = Duration::from_secs(1);
pub const CONNECTION_RETRY_COUNT: usize = 10;
use tokio::sync::{broadcast::Receiver, broadcast::Sender};
lazy_static::lazy_static! {
static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
@ -33,34 +29,34 @@ lazy_static::lazy_static! {
register_int_gauge!(opts!("literpc_quic_tasks", "Number of connections to keep asked by tpu service")).unwrap();
}
#[derive(Clone)]
struct ActiveConnection {
endpoint: Endpoint,
endpoints: RotatingQueue<Endpoint>,
identity: Pubkey,
tpu_address: SocketAddr,
exit_signal: Arc<AtomicBool>,
txs_sent_store: TxStore,
connection_parameters: QuicConnectionParameters,
}
impl ActiveConnection {
pub fn new(
endpoint: Endpoint,
endpoints: RotatingQueue<Endpoint>,
tpu_address: SocketAddr,
identity: Pubkey,
txs_sent_store: TxStore,
connection_parameters: QuicConnectionParameters,
) -> Self {
Self {
endpoint,
endpoints,
tpu_address,
identity,
exit_signal: Arc::new(AtomicBool::new(false)),
txs_sent_store,
connection_parameters,
}
}
fn on_connect() {
NB_QUIC_CONNECTIONS.inc();
}
fn check_for_confirmation(txs_sent_store: &TxStore, signature: String) -> bool {
match txs_sent_store.get(&signature) {
Some(props) => props.status.is_some(),
@ -70,30 +66,30 @@ impl ActiveConnection {
#[allow(clippy::too_many_arguments)]
async fn listen(
&self,
transaction_reciever: Receiver<(String, Vec<u8>)>,
exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>,
endpoint: Endpoint,
addr: SocketAddr,
exit_signal: Arc<AtomicBool>,
identity: Pubkey,
identity_stakes: IdentityStakes,
txs_sent_store: TxStore,
) {
NB_QUIC_ACTIVE_CONNECTIONS.inc();
let mut transaction_reciever = transaction_reciever;
let mut exit_oneshot_channel = exit_oneshot_channel;
let identity = self.identity;
let max_uni_stream_connections: u64 = compute_max_allowed_uni_streams(
identity_stakes.peer_type,
identity_stakes.stakes,
identity_stakes.total_stakes,
) as u64;
let number_of_transactions_per_unistream = 5;
let number_of_transactions_per_unistream = self.connection_parameters.number_of_transactions_per_unistream;
let max_number_of_connections = self.connection_parameters.max_number_of_connections;
let task_counter: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
let mut connection: Option<Arc<RwLock<Connection>>> = None;
let last_stable_id: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
let exit_signal = self.exit_signal.clone();
let connection_pool = QuicConnectionPool::new(identity, self.endpoints.clone(), addr, self.connection_parameters, exit_signal.clone());
loop {
// exit signal set
if exit_signal.load(Ordering::Relaxed) {
@ -139,51 +135,24 @@ impl ActiveConnection {
}
}
if connection.is_none() {
// initial connection
let conn = QuicConnectionUtils::connect(
identity,
false,
endpoint.clone(),
addr,
QUIC_CONNECTION_TIMEOUT,
CONNECTION_RETRY_COUNT,
exit_signal.clone(),
Self::on_connect).await;
if let Some(conn) = conn {
// could connect
connection = Some(Arc::new(RwLock::new(conn)));
} else {
break;
if txs.len() >= number_of_transactions_per_unistream - 1 {
// the queue is getting full and the connection pool is getting slower
// add more connections to the pool
if connection_pool.len() < max_number_of_connections {
connection_pool.add_connection().await;
}
} else if txs.len() == 1 {
// low traffic: reduce the pool down to a minimum of 2 connections
connection_pool.remove_connection(2).await;
}
let task_counter = task_counter.clone();
let endpoint = endpoint.clone();
let exit_signal = exit_signal.clone();
let connection = connection.clone();
let last_stable_id = last_stable_id.clone();
let connection_pool = connection_pool.clone();
tokio::spawn(async move {
task_counter.fetch_add(1, Ordering::Relaxed);
NB_QUIC_TASKS.inc();
let connection = connection.unwrap();
QuicConnectionUtils::send_transaction_batch(
connection,
txs,
identity,
endpoint,
addr,
exit_signal,
last_stable_id,
QUIC_CONNECTION_TIMEOUT,
CONNECTION_RETRY_COUNT,
|| {
// do nothing as we are using the same connection
}
).await;
connection_pool.send_transaction_batch(txs).await;
NB_QUIC_TASKS.dec();
task_counter.fetch_sub(1, Ordering::Relaxed);
});
@ -204,19 +173,14 @@ impl ActiveConnection {
exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>,
identity_stakes: IdentityStakes,
) {
let endpoint = self.endpoint.clone();
let addr = self.tpu_address;
let exit_signal = self.exit_signal.clone();
let identity = self.identity;
let txs_sent_store = self.txs_sent_store.clone();
let this = self.clone();
tokio::spawn(async move {
Self::listen(
this.listen(
transaction_reciever,
exit_oneshot_channel,
endpoint,
addr,
exit_signal,
identity,
identity_stakes,
txs_sent_store,
)
@ -236,12 +200,12 @@ pub struct TpuConnectionManager {
}
impl TpuConnectionManager {
pub fn new(certificate: rustls::Certificate, key: rustls::PrivateKey, fanout: usize) -> Self {
let number_of_clients = if fanout > 5 { fanout / 4 } else { 1 };
pub async fn new(certificate: rustls::Certificate, key: rustls::PrivateKey, fanout: usize) -> Self {
let number_of_clients = fanout * 2;
Self {
endpoints: RotatingQueue::new(number_of_clients, || {
QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone())
}),
}).await,
identity_to_active_connection: Arc::new(DashMap::new()),
}
}
@ -252,17 +216,18 @@ impl TpuConnectionManager {
connections_to_keep: HashMap<Pubkey, SocketAddr>,
identity_stakes: IdentityStakes,
txs_sent_store: TxStore,
connection_parameters: QuicConnectionParameters,
) {
NB_CONNECTIONS_TO_KEEP.set(connections_to_keep.len() as i64);
for (identity, socket_addr) in &connections_to_keep {
if self.identity_to_active_connection.get(identity).is_none() {
trace!("added a connection for {}, {}", identity, socket_addr);
let endpoint = self.endpoints.get();
let active_connection = ActiveConnection::new(
endpoint,
self.endpoints.clone(),
*socket_addr,
*identity,
txs_sent_store.clone(),
connection_parameters,
);
// using mpsc as a oneshot channel, because with a oneshot channel we cannot reuse the receiver
let (sx, rx) = tokio::sync::mpsc::channel(1);

services/src/tpu_utils/tpu_service.rs

@ -5,7 +5,7 @@ use solana_client::nonblocking::rpc_client::RpcClient;
use solana_lite_rpc_core::{
leader_schedule::LeaderSchedule, solana_utils::SolanaUtils,
structures::identity_stakes::IdentityStakes, tx_store::TxStore, AnyhowJoinHandle,
structures::identity_stakes::IdentityStakes, tx_store::TxStore, AnyhowJoinHandle, quic_connection_utils::QuicConnectionParameters,
};
use super::tpu_connection_manager::TpuConnectionManager;
@ -26,11 +26,6 @@ use tokio::{
time::{Duration, Instant},
};
const CACHE_NEXT_SLOT_LEADERS_PUBKEY_SIZE: usize = 1024; // Save pubkey and contact info of next 1024 leaders in the queue
const CLUSTERINFO_REFRESH_TIME: u64 = 60 * 60; // stakes every 1hrs
const LEADER_SCHEDULE_UPDATE_INTERVAL: u64 = 10; // update leader schedule every 10s
const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000;
const MAX_NB_ERRORS: usize = 10;
lazy_static::lazy_static! {
static ref NB_CLUSTER_NODES: GenericGauge<prometheus::core::AtomicI64> =
@ -46,31 +41,42 @@ lazy_static::lazy_static! {
register_int_gauge!(opts!("literpc_estimated_slot", "Estimated slot seen by last rpc")).unwrap();
}
#[derive(Clone, Copy)]
pub struct TpuServiceConfig {
pub fanout_slots: u64,
pub number_of_leaders_to_cache: usize,
pub clusterinfo_refresh_time: Duration,
pub leader_schedule_update_frequency: Duration,
pub maximum_transaction_in_queue: usize,
pub maximum_number_of_errors: usize,
pub quic_connection_params: QuicConnectionParameters,
}
#[derive(Clone)]
pub struct TpuService {
current_slot: Arc<AtomicU64>,
estimated_slot: Arc<AtomicU64>,
fanout_slots: u64,
rpc_client: Arc<RpcClient>,
rpc_ws_address: String,
broadcast_sender: Arc<tokio::sync::broadcast::Sender<(String, Vec<u8>)>>,
tpu_connection_manager: Arc<TpuConnectionManager>,
identity: Arc<Keypair>,
identity_stakes: Arc<RwLock<IdentityStakes>>,
txs_sent_store: TxStore,
leader_schedule: Arc<LeaderSchedule>,
config: TpuServiceConfig,
identity: Pubkey,
}
impl TpuService {
pub async fn new(
current_slot: Slot,
fanout_slots: u64,
config: TpuServiceConfig,
identity: Arc<Keypair>,
current_slot: Slot,
rpc_client: Arc<RpcClient>,
rpc_ws_address: String,
txs_sent_store: TxStore,
) -> anyhow::Result<Self> {
let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE);
let (sender, _) = tokio::sync::broadcast::channel(config.maximum_transaction_in_queue);
let (certificate, key) = new_self_signed_tls_certificate(
identity.as_ref(),
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
@ -78,20 +84,20 @@ impl TpuService {
.expect("Failed to initialize QUIC client certificates");
let tpu_connection_manager =
TpuConnectionManager::new(certificate, key, fanout_slots as usize);
TpuConnectionManager::new(certificate, key, config.fanout_slots as usize).await;
Ok(Self {
current_slot: Arc::new(AtomicU64::new(current_slot)),
estimated_slot: Arc::new(AtomicU64::new(current_slot)),
leader_schedule: Arc::new(LeaderSchedule::new(CACHE_NEXT_SLOT_LEADERS_PUBKEY_SIZE)),
fanout_slots,
leader_schedule: Arc::new(LeaderSchedule::new(config.number_of_leaders_to_cache)),
rpc_client,
rpc_ws_address,
broadcast_sender: Arc::new(sender),
tpu_connection_manager: Arc::new(tpu_connection_manager),
identity,
identity_stakes: Arc::new(RwLock::new(IdentityStakes::default())),
txs_sent_store,
identity: identity.pubkey(),
config,
})
}
@ -102,7 +108,7 @@ impl TpuService {
let mut lock = self.identity_stakes.write().await;
*lock = SolanaUtils::get_stakes_for_identity(
self.rpc_client.clone(),
self.identity.pubkey(),
self.identity,
)
.await?;
}
@ -136,7 +142,7 @@ impl TpuService {
current_slot
};
let fanout = self.fanout_slots;
let fanout = self.config.fanout_slots;
let last_slot = estimated_slot + fanout;
let next_leaders = self.leader_schedule.get_leaders(load_slot, last_slot).await;
@ -159,6 +165,7 @@ impl TpuService {
connections_to_keep,
*identity_stakes,
self.txs_sent_store.clone(),
self.config.quic_connection_params,
)
.await;
}
@ -191,6 +198,14 @@ impl TpuService {
nb_errror += 1;
log::info!("Got error while polling slot {}", err);
if nb_errror > self.config.maximum_number_of_errors {
error!(
"Reached max amount of errors to fetch latest slot, exiting poll slot loop"
);
break;
}
} else {
nb_errror = 0;
}
bail!("Reached max amount of errors to fetch latest slot, exiting poll slot loop")
@ -209,10 +224,8 @@ impl TpuService {
let this = self.clone();
let update_leader_schedule_service = tokio::spawn(async move {
let mut last_cluster_info_update = Instant::now();
let leader_schedule_update_interval =
Duration::from_secs(LEADER_SCHEDULE_UPDATE_INTERVAL);
let cluster_info_update_interval = Duration::from_secs(CLUSTERINFO_REFRESH_TIME);
let leader_schedule_update_interval = this.config.leader_schedule_update_frequency;
let cluster_info_update_interval = this.config.clusterinfo_refresh_time;
loop {
tokio::time::sleep(leader_schedule_update_interval).await;
info!("update leader schedule and cluster nodes");