Fmt, clippy and other minor changes

This commit is contained in:
Godmode Galactus 2023-06-22 10:10:35 +02:00
parent ea8d60abc2
commit f63c8538c3
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
7 changed files with 174 additions and 105 deletions

View File

@ -2,6 +2,7 @@ pub mod block_processor;
pub mod block_store;
pub mod leader_schedule;
pub mod notifications;
pub mod quic_connection;
pub mod quic_connection_utils;
pub mod rotating_queue;
pub mod solana_utils;
@ -9,6 +10,5 @@ pub mod structures;
pub mod subscription_handler;
pub mod subscription_sink;
pub mod tx_store;
pub mod quic_connection;
pub type AnyhowJoinHandle = tokio::task::JoinHandle<anyhow::Result<()>>;

View File

@ -1,10 +1,20 @@
use std::{sync::{Arc, atomic::{AtomicBool, AtomicU64, Ordering}}, net::SocketAddr, collections::VecDeque};
use crate::{
quic_connection_utils::{QuicConnectionError, QuicConnectionParameters, QuicConnectionUtils},
rotating_queue::RotatingQueue,
};
use anyhow::bail;
use log::{warn};
use quinn::{Endpoint, Connection};
use log::warn;
use quinn::{Connection, Endpoint};
use solana_sdk::pubkey::Pubkey;
use std::{
collections::VecDeque,
net::SocketAddr,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
};
use tokio::sync::RwLock;
use crate::{rotating_queue::RotatingQueue, quic_connection_utils::{QuicConnectionUtils, QuicConnectionError, QuicConnectionParameters}};
pub type EndpointPool = RotatingQueue<Endpoint>;
@ -14,38 +24,46 @@ pub struct QuicConnection {
last_stable_id: Arc<AtomicU64>,
endpoint: Endpoint,
identity: Pubkey,
socket_address : SocketAddr,
socket_address: SocketAddr,
connection_params: QuicConnectionParameters,
exit_signal: Arc<AtomicBool>,
timeout_counters: Arc<AtomicU64>,
}
impl QuicConnection {
pub async fn new(identity: Pubkey, endpoints: EndpointPool, socket_address: SocketAddr, connection_params: QuicConnectionParameters, exit_signal: Arc<AtomicBool>) -> anyhow::Result<Self> {
let endpoint = endpoints.get().await.expect("endpoint pool is not suppose to be empty");
pub async fn new(
identity: Pubkey,
endpoints: EndpointPool,
socket_address: SocketAddr,
connection_params: QuicConnectionParameters,
exit_signal: Arc<AtomicBool>,
) -> anyhow::Result<Self> {
let endpoint = endpoints
.get()
.await
.expect("endpoint pool is not suppose to be empty");
let connection = QuicConnectionUtils::connect(
identity.clone(),
identity,
false,
endpoint.clone(),
socket_address.clone(),
connection_params.connection_timeout.clone(),
socket_address,
connection_params.connection_timeout,
connection_params.connection_retry_count,
exit_signal.clone(),
).await;
)
.await;
match connection {
Some(connection) => {
Ok(Self {
connection: Arc::new(RwLock::new(connection)),
last_stable_id: Arc::new(AtomicU64::new(0)),
endpoint,
identity,
socket_address,
connection_params,
exit_signal,
timeout_counters: Arc::new(AtomicU64::new(0)),
})
},
Some(connection) => Ok(Self {
connection: Arc::new(RwLock::new(connection)),
last_stable_id: Arc::new(AtomicU64::new(0)),
endpoint,
identity,
socket_address,
connection_params,
exit_signal,
timeout_counters: Arc::new(AtomicU64::new(0)),
}),
None => {
bail!("Could not establish connection");
}
@ -66,11 +84,11 @@ impl QuicConnection {
Some(conn.clone())
} else {
let new_conn = QuicConnectionUtils::connect(
self.identity.clone(),
self.identity,
true,
self.endpoint.clone(),
self.socket_address.clone(),
self.connection_params.connection_timeout.clone(),
self.socket_address,
self.connection_params.connection_timeout,
self.connection_params.connection_retry_count,
self.exit_signal.clone(),
)
@ -88,10 +106,7 @@ impl QuicConnection {
}
}
pub async fn send_transaction_batch(
&self,
txs: Vec<Vec<u8>>
) {
pub async fn send_transaction_batch(&self, txs: Vec<Vec<u8>>) {
let mut queue = VecDeque::new();
for tx in txs {
queue.push_back(tx);
@ -102,51 +117,62 @@ impl QuicConnection {
// return
return;
}
let mut do_retry = false;
while !queue.is_empty() {
let tx = queue.pop_front().unwrap();
let connection = self.get_connection().await;
if self.exit_signal.load(Ordering::Relaxed) {
return;
}
if let Some(connection) = connection {
let current_stable_id = connection.stable_id() as u64;
match QuicConnectionUtils::open_unistream(connection, self.connection_params.unistream_timeout).await {
match QuicConnectionUtils::open_unistream(
connection,
self.connection_params.unistream_timeout,
)
.await
{
Ok(send_stream) => {
match QuicConnectionUtils::write_all(
send_stream,
&tx,
self.identity.clone(),
self.identity,
self.connection_params,
).await {
)
.await
{
Ok(()) => {
// do nothing
},
Err(QuicConnectionError::ConnectionError{retry}) => {
}
Err(QuicConnectionError::ConnectionError { retry }) => {
do_retry = retry;
},
}
Err(QuicConnectionError::TimeOut) => {
self.timeout_counters.fetch_add(1, Ordering::Relaxed);
}
}
},
Err(QuicConnectionError::ConnectionError{retry}) => {
}
Err(QuicConnectionError::ConnectionError { retry }) => {
do_retry = retry;
},
}
Err(QuicConnectionError::TimeOut) => {
self.timeout_counters.fetch_add(1, Ordering::Relaxed);
}
}
if do_retry {
self.last_stable_id.store(current_stable_id, Ordering::Relaxed);
self.last_stable_id
.store(current_stable_id, Ordering::Relaxed);
queue.push_back(tx);
break;
}
} else {
warn!("Could not establish connection with {}", self.identity.to_string());
warn!(
"Could not establish connection with {}",
self.identity.to_string()
);
break;
}
}
@ -167,7 +193,7 @@ impl QuicConnection {
#[derive(Clone)]
pub struct QuicConnectionPool {
connections : RotatingQueue<QuicConnection>,
connections: RotatingQueue<QuicConnection>,
connection_parameters: QuicConnectionParameters,
endpoints: EndpointPool,
identity: Pubkey,
@ -176,8 +202,14 @@ pub struct QuicConnectionPool {
}
impl QuicConnectionPool {
pub fn new(identity: Pubkey, endpoints: EndpointPool, socket_address: SocketAddr, connection_parameters: QuicConnectionParameters, exit_signal: Arc<AtomicBool>) -> Self {
let connections = RotatingQueue::new_empty();
pub fn new(
identity: Pubkey,
endpoints: EndpointPool,
socket_address: SocketAddr,
connection_parameters: QuicConnectionParameters,
exit_signal: Arc<AtomicBool>,
) -> Self {
let connections = RotatingQueue::new_empty();
Self {
connections,
identity,
@ -188,14 +220,18 @@ impl QuicConnectionPool {
}
}
pub async fn send_transaction_batch(
&self,
txs: Vec<Vec<u8>>
) {
pub async fn send_transaction_batch(&self, txs: Vec<Vec<u8>>) {
let connection = match self.connections.get().await {
Some(connection) => connection,
None => {
let new_connection = QuicConnection::new(self.identity.clone(), self.endpoints.clone(), self.socket_address.clone(), self.connection_parameters, self.exit_signal.clone()).await;
let new_connection = QuicConnection::new(
self.identity,
self.endpoints.clone(),
self.socket_address,
self.connection_parameters,
self.exit_signal.clone(),
)
.await;
if new_connection.is_err() {
return;
}
@ -209,14 +245,21 @@ impl QuicConnectionPool {
}
pub async fn add_connection(&self) {
let new_connection = QuicConnection::new(self.identity.clone(), self.endpoints.clone(), self.socket_address.clone(), self.connection_parameters, self.exit_signal.clone()).await;
let new_connection = QuicConnection::new(
self.identity,
self.endpoints.clone(),
self.socket_address,
self.connection_parameters,
self.exit_signal.clone(),
)
.await;
if let Ok(new_connection) = new_connection {
self.connections.add(new_connection).await;
}
}
pub async fn remove_connection(&self, min_count: usize) {
if self.connections.len() > min_count {
pub async fn remove_connection(&self) {
if !self.connections.is_empty() {
self.connections.remove().await;
}
}
@ -224,4 +267,8 @@ impl QuicConnectionPool {
pub fn len(&self) -> usize {
self.connections.len()
}
}
pub fn is_empty(&self) -> bool {
self.connections.is_empty()
}
}

View File

@ -12,26 +12,23 @@ use std::{
},
time::Duration,
};
use tokio::{time::timeout};
use tokio::time::timeout;
const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
pub enum QuicConnectionError {
TimeOut,
ConnectionError {
retry: bool,
},
ConnectionError { retry: bool },
}
#[derive(Clone, Copy)]
pub struct QuicConnectionParameters{
pub connection_timeout : Duration,
pub struct QuicConnectionParameters {
pub connection_timeout: Duration,
pub unistream_timeout: Duration,
pub write_timeout: Duration,
pub finalize_timeout: Duration,
pub connection_retry_count: usize,
pub max_number_of_connections : usize,
pub max_number_of_connections: usize,
pub number_of_transactions_per_unistream: usize,
}
@ -143,8 +140,11 @@ impl QuicConnectionUtils {
identity: Pubkey,
connection_params: QuicConnectionParameters,
) -> Result<(), QuicConnectionError> {
let write_timeout_res =
timeout(connection_params.write_timeout, send_stream.write_all(tx.as_slice())).await;
let write_timeout_res = timeout(
connection_params.write_timeout,
send_stream.write_all(tx.as_slice()),
)
.await;
match write_timeout_res {
Ok(write_res) => {
if let Err(e) = write_res {
@ -153,16 +153,17 @@ impl QuicConnectionUtils {
identity,
e
);
return Err(QuicConnectionError::ConnectionError {retry: true});
return Err(QuicConnectionError::ConnectionError { retry: true });
}
}
Err(_) => {
warn!("timeout while writing transaction for {}", identity);
return Err(QuicConnectionError::TimeOut)
return Err(QuicConnectionError::TimeOut);
}
}
let finish_timeout_res = timeout(connection_params.finalize_timeout, send_stream.finish()).await;
let finish_timeout_res =
timeout(connection_params.finalize_timeout, send_stream.finish()).await;
match finish_timeout_res {
Ok(finish_res) => {
if let Err(e) = finish_res {
@ -171,12 +172,12 @@ impl QuicConnectionUtils {
identity,
e
);
return Err(QuicConnectionError::ConnectionError {retry: false});
return Err(QuicConnectionError::ConnectionError { retry: false });
}
}
Err(_) => {
warn!("timeout while finishing transaction for {}", identity);
return Err(QuicConnectionError::TimeOut)
return Err(QuicConnectionError::TimeOut);
}
}
@ -189,9 +190,7 @@ impl QuicConnectionUtils {
) -> Result<SendStream, QuicConnectionError> {
match timeout(connection_timeout, connection.open_uni()).await {
Ok(Ok(unistream)) => Ok(unistream),
Ok(Err(_)) => {
Err(QuicConnectionError::ConnectionError{retry: true})
}
Ok(Err(_)) => Err(QuicConnectionError::ConnectionError { retry: true }),
Err(_) => Err(QuicConnectionError::TimeOut),
}
}

View File

@ -1,6 +1,9 @@
use std::{
collections::VecDeque,
sync::{Arc, atomic::{AtomicU64, Ordering}},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use tokio::sync::Mutex;
@ -29,13 +32,11 @@ impl<T: Clone> RotatingQueue<T> {
item
}
pub fn new_empty() -> Self
{
let item = Self {
pub fn new_empty() -> Self {
Self {
deque: Arc::new(Mutex::new(VecDeque::<T>::new())),
count: Arc::new(AtomicU64::new(0)),
};
item
}
}
pub async fn get(&self) -> Option<T> {
@ -53,10 +54,10 @@ impl<T: Clone> RotatingQueue<T> {
let mut queue = self.deque.lock().await;
queue.push_front(instance);
self.count.fetch_add(1, Ordering::Relaxed);
}
}
pub async fn remove(&self) {
if self.len() > 0 {
if !self.is_empty() {
let mut queue = self.deque.lock().await;
queue.pop_front();
self.count.fetch_sub(1, Ordering::Relaxed);
@ -66,4 +67,8 @@ impl<T: Clone> RotatingQueue<T> {
pub fn len(&self) -> usize {
self.count.load(Ordering::Relaxed) as usize
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}

View File

@ -23,8 +23,9 @@ use log::info;
use prometheus::{opts, register_int_counter, IntCounter};
use solana_lite_rpc_core::{
block_store::{BlockInformation, BlockStore},
quic_connection_utils::QuicConnectionParameters,
tx_store::{empty_tx_store, TxStore},
AnyhowJoinHandle, quic_connection_utils::QuicConnectionParameters,
AnyhowJoinHandle,
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::{
@ -101,19 +102,19 @@ impl LiteBridge {
let tpu_config = TpuServiceConfig {
fanout_slots,
number_of_leaders_to_cache: 1024,
clusterinfo_refresh_time: Duration::from_secs(60*60),
clusterinfo_refresh_time: Duration::from_secs(60 * 60),
leader_schedule_update_frequency: Duration::from_secs(10),
maximum_transaction_in_queue: 20000,
maximum_number_of_errors: 10,
quic_connection_params : QuicConnectionParameters {
connection_timeout: Duration::from_secs(500),
quic_connection_params: QuicConnectionParameters {
connection_timeout: Duration::from_secs(1),
connection_retry_count: 10,
finalize_timeout: Duration::from_millis(200),
max_number_of_connections: 10,
unistream_timeout: Duration::from_millis(500),
write_timeout: Duration::from_secs(1),
number_of_transactions_per_unistream: 10,
}
},
};
let tpu_service = TpuService::new(

View File

@ -1,10 +1,13 @@
use dashmap::DashMap;
use log::{error, trace};
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use quinn::{Endpoint};
use quinn::Endpoint;
use solana_lite_rpc_core::{
quic_connection_utils::{QuicConnectionUtils, QuicConnectionParameters}, rotating_queue::RotatingQueue,
structures::identity_stakes::IdentityStakes, tx_store::TxStore, quic_connection::{QuicConnectionPool},
quic_connection::QuicConnectionPool,
quic_connection_utils::{QuicConnectionParameters, QuicConnectionUtils},
rotating_queue::RotatingQueue,
structures::identity_stakes::IdentityStakes,
tx_store::TxStore,
};
use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
@ -83,13 +86,21 @@ impl ActiveConnection {
identity_stakes.stakes,
identity_stakes.total_stakes,
) as u64;
let number_of_transactions_per_unistream = self.connection_parameters.number_of_transactions_per_unistream;
let number_of_transactions_per_unistream = self
.connection_parameters
.number_of_transactions_per_unistream;
let max_number_of_connections = self.connection_parameters.max_number_of_connections;
let task_counter: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
let exit_signal = self.exit_signal.clone();
let connection_pool = QuicConnectionPool::new(identity, self.endpoints.clone(), addr, self.connection_parameters, exit_signal.clone());
let connection_pool = QuicConnectionPool::new(
identity,
self.endpoints.clone(),
addr,
self.connection_parameters,
exit_signal.clone(),
);
loop {
// exit signal set
if exit_signal.load(Ordering::Relaxed) {
@ -140,10 +151,14 @@ impl ActiveConnection {
// add more connections to the pool
if connection_pool.len() < max_number_of_connections {
connection_pool.add_connection().await;
NB_QUIC_CONNECTIONS.inc();
}
} else if txs.len() == 1 {
// low traffic / reduce connection till minimum 2
connection_pool.remove_connection(2).await;
// low traffic / reduce connection till minimum 1
if connection_pool.len() > 1 {
connection_pool.remove_connection().await;
NB_QUIC_CONNECTIONS.dec();
}
}
let task_counter = task_counter.clone();
@ -200,12 +215,17 @@ pub struct TpuConnectionManager {
}
impl TpuConnectionManager {
pub async fn new(certificate: rustls::Certificate, key: rustls::PrivateKey, fanout: usize) -> Self {
pub async fn new(
certificate: rustls::Certificate,
key: rustls::PrivateKey,
fanout: usize,
) -> Self {
let number_of_clients = fanout * 2;
Self {
endpoints: RotatingQueue::new(number_of_clients, || {
QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone())
}).await,
})
.await,
identity_to_active_connection: Arc::new(DashMap::new()),
}
}

View File

@ -4,8 +4,9 @@ use prometheus::{core::GenericGauge, opts, register_int_gauge};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_lite_rpc_core::{
leader_schedule::LeaderSchedule, solana_utils::SolanaUtils,
structures::identity_stakes::IdentityStakes, tx_store::TxStore, AnyhowJoinHandle, quic_connection_utils::QuicConnectionParameters,
leader_schedule::LeaderSchedule, quic_connection_utils::QuicConnectionParameters,
solana_utils::SolanaUtils, structures::identity_stakes::IdentityStakes, tx_store::TxStore,
AnyhowJoinHandle,
};
use super::tpu_connection_manager::TpuConnectionManager;
@ -26,7 +27,6 @@ use tokio::{
time::{Duration, Instant},
};
lazy_static::lazy_static! {
static ref NB_CLUSTER_NODES: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_nb_cluster_nodes", "Number of cluster nodes in saved")).unwrap();
@ -106,11 +106,8 @@ impl TpuService {
// update stakes for the identity
{
let mut lock = self.identity_stakes.write().await;
*lock = SolanaUtils::get_stakes_for_identity(
self.rpc_client.clone(),
self.identity,
)
.await?;
*lock = SolanaUtils::get_stakes_for_identity(self.rpc_client.clone(), self.identity)
.await?;
}
Ok(())
}