Merge pull request #128 from blockworks-foundation/optimizing_for_high_flow

Optimizing lite-rpc to handle heavy transaction flow
galactus 2023-04-24 20:27:53 +02:00 committed by GitHub
commit 1128832ed2
2 changed files with 30 additions and 50 deletions
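The biggest change, in the first file below, is how transactions are grouped for sending: each QUIC unistream task now carries a small fixed batch of up to TRANSACTIONS_SENT_PER_TASK (5) transactions, drained from the broadcast receiver into a VecDeque, instead of a stake-derived count. A minimal sketch of that batching step, written as a free function for illustration (in the real code it lives inside ActiveConnection::listen):

```rust
use std::collections::VecDeque;
use tokio::sync::broadcast::Receiver;

const TRANSACTIONS_SENT_PER_TASK: usize = 5;

// Sketch only: drain up to TRANSACTIONS_SENT_PER_TASK already-queued
// transactions without awaiting, so one send task carries a small batch.
fn collect_batch(first_tx: Vec<u8>, receiver: &mut Receiver<Vec<u8>>) -> VecDeque<Vec<u8>> {
    let mut txs = VecDeque::new();
    txs.push_back(first_tx);
    for _ in 1..TRANSACTIONS_SENT_PER_TASK {
        if let Ok(tx) = receiver.try_recv() {
            txs.push_back(tx);
        }
    }
    txs
}
```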

View File

@@ -1,5 +1,5 @@
 use std::{
-    collections::HashMap,
+    collections::{HashMap, VecDeque},
     net::{IpAddr, Ipv4Addr, SocketAddr},
     sync::{
         atomic::{AtomicBool, AtomicU64, Ordering},
@@ -15,10 +15,7 @@ use quinn::{
     ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream,
     TokioRuntime, TransportConfig,
 };
-use solana_sdk::{
-    pubkey::Pubkey,
-    quic::{QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO},
-};
+use solana_sdk::pubkey::Pubkey;
 use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
 use tokio::{
     sync::{broadcast::Receiver, broadcast::Sender, RwLock},
@@ -30,6 +27,7 @@ use super::{rotating_queue::RotatingQueue, tpu_service::IdentityStakes};
 pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
 const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(10);
 const CONNECTION_RETRY_COUNT: usize = 10;
+const TRANSACTIONS_SENT_PER_TASK: usize = 5;
 
 lazy_static::lazy_static! {
     static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
@@ -197,14 +195,14 @@ impl ActiveConnection {
     async fn send_transaction_batch(
         connection: Arc<RwLock<Connection>>,
-        txs: Vec<Vec<u8>>,
+        mut txs: VecDeque<Vec<u8>>,
         identity: Pubkey,
         endpoint: Endpoint,
         socket_addr: SocketAddr,
         exit_signal: Arc<AtomicBool>,
         last_stable_id: Arc<AtomicU64>,
     ) {
-        for _ in 0..3 {
+        for _ in 0..CONNECTION_RETRY_COUNT {
             if exit_signal.load(Ordering::Relaxed) {
                 // return
                 return;
@@ -244,24 +242,34 @@ impl ActiveConnection {
                 }
             };
             let mut retry = false;
-            for tx in &txs {
+            while !txs.is_empty() {
+                let tx = txs.pop_front().unwrap();
                 let (stream, retry_conn) =
                     Self::open_unistream(conn.clone(), last_stable_id.clone()).await;
+                if retry_conn {
+                    txs.push_back(tx);
+                    retry = true;
+                    break;
+                }
                 if let Some(send_stream) = stream {
                     if exit_signal.load(Ordering::Relaxed) {
                         return;
                     }
-                    retry = Self::write_all(
+                    if Self::write_all(
                         send_stream,
-                        tx,
+                        &tx,
                         identity,
                         last_stable_id.clone(),
                         conn.stable_id() as u64,
                     )
-                    .await;
-                } else {
-                    retry = retry_conn;
+                    .await
+                    {
+                        txs.push_back(tx);
+                        retry = true;
+                        break;
+                    }
                 }
             }
             if !retry {
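The reworked loop above pops each transaction off the VecDeque; if opening the unistream or write_all fails, the transaction is pushed back and the loop breaks, so the outer `for _ in 0..CONNECTION_RETRY_COUNT` reconnect loop only re-sends what was not delivered. A stripped-down sketch of that requeue pattern, with a hypothetical try_send standing in for open_unistream plus write_all:

```rust
use std::collections::VecDeque;

// Returns true when another attempt (with a fresh connection) is needed.
async fn send_batch_once(txs: &mut VecDeque<Vec<u8>>) -> bool {
    while let Some(tx) = txs.pop_front() {
        if try_send(&tx).await.is_err() {
            // Keep the failed transaction around for the next attempt.
            txs.push_back(tx);
            return true;
        }
    }
    false
}

// Hypothetical stand-in for open_unistream + write_all.
async fn try_send(_tx: &[u8]) -> Result<(), std::io::Error> {
    Ok(())
}
```

Note that push_back places the failed transaction behind the not-yet-sent ones, trading strict ordering for a simpler loop.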
@@ -270,28 +278,6 @@ impl ActiveConnection {
         }
     }
 
-    // copied from solana code base
-    fn compute_receive_window_ratio_for_staked_node(
-        max_stake: u64,
-        min_stake: u64,
-        stake: u64,
-    ) -> u64 {
-        if stake > max_stake {
-            return QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
-        }
-        let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
-        let min_ratio = QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO;
-        if max_stake > min_stake {
-            let a = (max_ratio - min_ratio) as f64 / (max_stake - min_stake) as f64;
-            let b = max_ratio as f64 - ((max_stake as f64) * a);
-            let ratio = (a * stake as f64) + b;
-            ratio.round() as u64
-        } else {
-            QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO
-        }
-    }
-
     async fn listen(
         transaction_reciever: Receiver<Vec<u8>>,
         exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>,
@@ -310,16 +296,7 @@ impl ActiveConnection {
             identity_stakes.stakes,
             identity_stakes.total_stakes,
         ) as u64;
-        let number_of_transactions_per_unistream = match identity_stakes.peer_type {
-            solana_streamer::nonblocking::quic::ConnectionPeerType::Staked => {
-                Self::compute_receive_window_ratio_for_staked_node(
-                    identity_stakes.max_stakes,
-                    identity_stakes.min_stakes,
-                    identity_stakes.stakes,
-                )
-            }
-            solana_streamer::nonblocking::quic::ConnectionPeerType::Unstaked => 1,
-        };
+        let number_of_transactions_per_unistream = TRANSACTIONS_SENT_PER_TASK;
 
         let task_counter: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
         let mut connection: Option<Arc<RwLock<Connection>>> = None;
@@ -331,7 +308,7 @@ impl ActiveConnection {
                     break;
                 }
 
-                if task_counter.load(Ordering::Relaxed) > max_uni_stream_connections {
+                if task_counter.load(Ordering::Relaxed) >= max_uni_stream_connections {
                     tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
                     continue;
                 }
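The comparison above is tightened from > to >= so the number of in-flight send tasks never exceeds max_uni_stream_connections (computed from compute_max_allowed_uni_streams earlier in listen). Roughly the surrounding pattern, assuming the counter is incremented before a task is spawned and decremented when it finishes:

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::time::Duration;

// Sketch only: back off while the concurrency cap is reached.
async fn wait_for_free_slot(task_counter: &AtomicU64, max_uni_stream_connections: u64) {
    while task_counter.load(Ordering::Relaxed) >= max_uni_stream_connections {
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
}

fn spawn_send_task(task_counter: Arc<AtomicU64>) {
    task_counter.fetch_add(1, Ordering::Relaxed);
    tokio::spawn(async move {
        // ... open a unistream and send the batch here ...
        task_counter.fetch_sub(1, Ordering::Relaxed);
    });
}
```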
@@ -356,10 +333,11 @@ impl ActiveConnection {
                     }
                 };
 
-                let mut txs = vec![first_tx];
+                let mut txs = VecDeque::new();
+                txs.push_back(first_tx);
                 for _ in 1..number_of_transactions_per_unistream {
                     if let Ok(tx) = transaction_reciever.try_recv() {
-                        txs.push(tx);
+                        txs.push_back(tx);
                     }
                 }
@@ -403,7 +381,9 @@ impl ActiveConnection {
             };
         }
 
         drop(transaction_reciever);
-        NB_QUIC_CONNECTIONS.dec();
+        if connection.is_some() {
+            NB_QUIC_CONNECTIONS.dec();
+        }
         NB_QUIC_ACTIVE_CONNECTIONS.dec();
     }

View File

@@ -35,7 +35,7 @@ const CACHE_NEXT_SLOT_LEADERS_PUBKEY_SIZE: usize = 1024; // Save pubkey and cont
 const CLUSTERINFO_REFRESH_TIME: u64 = 60; // refresh cluster every minute
 const LEADER_SCHEDULE_UPDATE_INTERVAL: u64 = 10; // update leader schedule every 10s
 const AVERAGE_SLOT_CHANGE_TIME_IN_MILLIS: u64 = 400;
-const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 16384;
+const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000;
 
 lazy_static::lazy_static! {
     static ref NB_CLUSTER_NODES: GenericGauge<prometheus::core::AtomicI64> =
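The second file only raises MAXIMUM_TRANSACTIONS_IN_QUEUE from 16384 to 200_000 so the transaction queue can absorb bursts under heavy flow. Assuming the constant sizes the tokio broadcast channel that fans transactions out to the active leader connections (the usage site is not part of this diff), the effect is simply a larger buffer:

```rust
const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000;

// Hypothetical usage; the diff only shows the constant, not where it is used.
fn make_transaction_channel() -> (
    tokio::sync::broadcast::Sender<Vec<u8>>,
    tokio::sync::broadcast::Receiver<Vec<u8>>,
) {
    tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE)
}
```

A tokio broadcast channel makes lagging receivers skip the oldest messages once the buffer is full, so a larger capacity means fewer transactions are dropped during spikes.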