More optimizations for TPU connection

Godmode Galactus 2023-05-02 06:51:24 +02:00
parent acdac2061e
commit 0bbaba34da
1 changed file with 72 additions and 101 deletions


@@ -1,5 +1,5 @@
 use std::{
-    collections::HashMap,
+    collections::{HashMap, VecDeque},
     net::{IpAddr, Ipv4Addr, SocketAddr},
     sync::{
         atomic::{AtomicBool, AtomicU64, Ordering},
@@ -17,7 +17,6 @@ use quinn::{
 };
 use solana_sdk::{
     pubkey::Pubkey,
-    quic::{QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO},
 };
 use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
 use tokio::{
@@ -30,7 +29,7 @@ use crate::workers::TxProps;
 use super::{rotating_queue::RotatingQueue, tpu_service::IdentityStakes};

 pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
-const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(10);
+const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(1);
 const CONNECTION_RETRY_COUNT: usize = 10;

 lazy_static::lazy_static! {
@@ -174,10 +173,11 @@ impl ActiveConnection {
                         identity,
                         e
                     );
+                    return true;
                 }
             }
             Err(_) => {
-                warn!("timeout while writing transaction for {}", identity);
+                warn!("timeout while finishing transaction for {}", identity);
             }
         }
@@ -213,8 +213,13 @@ impl ActiveConnection {
         exit_signal: Arc<AtomicBool>,
         last_stable_id: Arc<AtomicU64>,
     ) {
-        for _ in 0..3 {
-            if exit_signal.load(Ordering::Relaxed) {
+        let mut queue = VecDeque::new();
+        for tx in txs {
+            queue.push_back(tx);
+        }
+
+        for _ in 0..CONNECTION_RETRY_COUNT {
+            if queue.is_empty() || exit_signal.load(Ordering::Relaxed) {
                 // return
                 return;
             }
@@ -253,7 +258,8 @@ impl ActiveConnection {
                 }
             };
             let mut retry = false;
-            for tx in &txs {
+            while !queue.is_empty() {
+                let tx = queue.pop_front().unwrap();
                 let (stream, retry_conn) =
                     Self::open_unistream(conn.clone(), last_stable_id.clone()).await;
                 if let Some(send_stream) = stream {
@@ -263,7 +269,7 @@ impl ActiveConnection {
                     retry = Self::write_all(
                         send_stream,
-                        tx,
+                        &tx,
                         identity,
                         last_stable_id.clone(),
                         conn.stable_id() as u64,
@@ -272,6 +278,10 @@ impl ActiveConnection {
                 } else {
                     retry = retry_conn;
                 }
+                if retry {
+                    queue.push_back(tx);
+                    break;
+                }
             }
             if !retry {
                 break;
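The hunks above replace the old fixed three-pass loop over `&txs` with a VecDeque drained front to back: a transaction whose unistream fails is pushed to the back, and the outer loop, now bounded by CONNECTION_RETRY_COUNT, retries it after the connection is re-established. A minimal self-contained sketch of that pattern, where the hypothetical `try_send` callback stands in for open_unistream plus write_all and returns false on a retryable failure:

use std::collections::VecDeque;

const CONNECTION_RETRY_COUNT: usize = 10;

// Sketch only: `try_send` is a stand-in, not the real send path.
fn send_with_retries(txs: Vec<Vec<u8>>, mut try_send: impl FnMut(&[u8]) -> bool) {
    let mut queue: VecDeque<Vec<u8>> = txs.into(); // seed the queue with the batch
    for _ in 0..CONNECTION_RETRY_COUNT {
        if queue.is_empty() {
            return; // every transaction went out
        }
        while let Some(tx) = queue.pop_front() {
            if !try_send(&tx) {
                // A failed transaction goes to the back; the outer pass
                // retries it once the connection is usable again.
                queue.push_back(tx);
                break;
            }
        }
    }
}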
@@ -279,28 +289,6 @@ impl ActiveConnection {
             }
         }

-    // copied from solana code base
-    fn compute_receive_window_ratio_for_staked_node(
-        max_stake: u64,
-        min_stake: u64,
-        stake: u64,
-    ) -> u64 {
-        if stake > max_stake {
-            return QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
-        }
-
-        let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
-        let min_ratio = QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO;
-        if max_stake > min_stake {
-            let a = (max_ratio - min_ratio) as f64 / (max_stake - min_stake) as f64;
-            let b = max_ratio as f64 - ((max_stake as f64) * a);
-            let ratio = (a * stake as f64) + b;
-            ratio.round() as u64
-        } else {
-            QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO
-        }
-    }
-
     fn check_for_confirmation(
         txs_sent_store: &Arc<DashMap<String, TxProps>>,
         signature: String,
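The deleted helper linearly interpolated the receive-window ratio between QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO and QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO as stake moves from min_stake to max_stake. For reference, a standalone sketch of that math with illustrative stand-in constants (the real values come from solana_sdk::quic):

// Illustrative stand-ins, not the real solana_sdk::quic values.
const MIN_RATIO: u64 = 2;
const MAX_RATIO: u64 = 10;

fn receive_window_ratio(max_stake: u64, min_stake: u64, stake: u64) -> u64 {
    if stake > max_stake || max_stake <= min_stake {
        return MAX_RATIO;
    }
    // Line through (min_stake, MIN_RATIO) and (max_stake, MAX_RATIO).
    let a = (MAX_RATIO - MIN_RATIO) as f64 / (max_stake - min_stake) as f64;
    let b = MAX_RATIO as f64 - (max_stake as f64) * a;
    ((a * stake as f64) + b).round() as u64
}

// e.g. receive_window_ratio(100, 0, 50) == 6, halfway between 2 and 10.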
@@ -330,16 +318,7 @@ impl ActiveConnection {
             identity_stakes.stakes,
             identity_stakes.total_stakes,
         ) as u64;
-        let number_of_transactions_per_unistream = match identity_stakes.peer_type {
-            solana_streamer::nonblocking::quic::ConnectionPeerType::Staked => {
-                Self::compute_receive_window_ratio_for_staked_node(
-                    identity_stakes.max_stakes,
-                    identity_stakes.min_stakes,
-                    identity_stakes.stakes,
-                )
-            }
-            solana_streamer::nonblocking::quic::ConnectionPeerType::Unstaked => 1,
-        };
+        let number_of_transactions_per_unistream = 5;

         let task_counter: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
         let mut connection: Option<Arc<RwLock<Connection>>> = None;
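With the stake-based computation gone, every batch now targets up to five transactions per unistream: the loop below blocks on the first transaction and then opportunistically drains up to four more with try_recv. Distilled into a sketch, assuming a tokio broadcast channel of raw transaction bytes (the real channel also carries the signature, and the real loop additionally skips already-confirmed transactions):

// Sketch: await the first transaction, then take whatever is already
// queued, up to `batch_size` in total, without waiting.
async fn collect_batch(
    rx: &mut tokio::sync::broadcast::Receiver<Vec<u8>>,
    batch_size: usize,
) -> Option<Vec<Vec<u8>>> {
    let first = rx.recv().await.ok()?;
    let mut batch = vec![first];
    for _ in 1..batch_size {
        if let Ok(tx) = rx.try_recv() {
            batch.push(tx);
        }
    }
    Some(batch)
}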
@@ -351,80 +330,72 @@ impl ActiveConnection {
                     break;
                 }

-                if task_counter.load(Ordering::Relaxed) > max_uni_stream_connections {
-                    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
+                if task_counter.load(Ordering::Relaxed) >= max_uni_stream_connections {
+                    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
                     continue;
                 }

                 tokio::select! {
-                    tx_or_timeout = timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, transaction_reciever.recv() ) => {
+                    tx = transaction_reciever.recv() => {
                         // exit signal set
                         if exit_signal.load(Ordering::Relaxed) {
                             break;
                         }

-                        match tx_or_timeout {
-                            Ok(tx) => {
-                                let first_tx: Vec<u8> = match tx {
-                                    Ok((sig, tx)) => {
-                                        if Self::check_for_confirmation(&txs_sent_store, sig) {
-                                            // transaction is already confirmed/ no need to send
-                                            continue;
-                                        }
-                                        tx
-                                    },
-                                    Err(e) => {
-                                        error!(
-                                            "Broadcast channel error on recv for {} error {}",
-                                            identity, e
-                                        );
-                                        continue;
-                                    }
-                                };
-
-                                let mut txs = vec![first_tx];
-                                for _ in 1..number_of_transactions_per_unistream {
-                                    if let Ok((signature, tx)) = transaction_reciever.try_recv() {
-                                        if Self::check_for_confirmation(&txs_sent_store, signature) {
-                                            continue;
-                                        }
-                                        txs.push(tx);
-                                    }
-                                }
-
-                                if connection.is_none() {
-                                    // initial connection
-                                    let conn = Self::connect(identity, false, endpoint.clone(), addr.clone(), exit_signal.clone()).await;
-                                    if let Some(conn) = conn {
-                                        // could connect
-                                        connection = Some(Arc::new(RwLock::new(conn)));
-                                    } else {
-                                        break;
-                                    }
-                                }
-
-                                let task_counter = task_counter.clone();
-                                let endpoint = endpoint.clone();
-                                let exit_signal = exit_signal.clone();
-                                let addr = addr.clone();
-                                let connection = connection.clone();
-                                let last_stable_id = last_stable_id.clone();
-
-                                tokio::spawn(async move {
-                                    task_counter.fetch_add(1, Ordering::Relaxed);
-                                    NB_QUIC_TASKS.inc();
-                                    let connection = connection.unwrap();
-                                    Self::send_transaction_batch(connection, txs, identity, endpoint, addr, exit_signal, last_stable_id).await;
-                                    NB_QUIC_TASKS.dec();
-                                    task_counter.fetch_sub(1, Ordering::Relaxed);
-                                });
-                                tokio::time::sleep(tokio::time::Duration::from_micros(100)).await;
-                            },
-                            Err(_) => {
-                                // timed out
-                            }
-                        }
+                        let first_tx: Vec<u8> = match tx {
+                            Ok((sig, tx)) => {
+                                if Self::check_for_confirmation(&txs_sent_store, sig) {
+                                    // transaction is already confirmed/ no need to send
+                                    continue;
+                                }
+                                tx
+                            },
+                            Err(e) => {
+                                error!(
+                                    "Broadcast channel error on recv for {} error {}",
+                                    identity, e
+                                );
+                                continue;
+                            }
+                        };
+
+                        let mut txs = vec![first_tx];
+                        for _ in 1..number_of_transactions_per_unistream {
+                            if let Ok((signature, tx)) = transaction_reciever.try_recv() {
+                                if Self::check_for_confirmation(&txs_sent_store, signature) {
+                                    continue;
+                                }
+                                txs.push(tx);
+                            }
+                        }
+
+                        if connection.is_none() {
+                            // initial connection
+                            let conn = Self::connect(identity, false, endpoint.clone(), addr.clone(), exit_signal.clone()).await;
+                            if let Some(conn) = conn {
+                                // could connect
+                                connection = Some(Arc::new(RwLock::new(conn)));
+                            } else {
+                                break;
+                            }
+                        }
+
+                        let task_counter = task_counter.clone();
+                        let endpoint = endpoint.clone();
+                        let exit_signal = exit_signal.clone();
+                        let addr = addr.clone();
+                        let connection = connection.clone();
+                        let last_stable_id = last_stable_id.clone();
+
+                        tokio::spawn(async move {
+                            task_counter.fetch_add(1, Ordering::Relaxed);
+                            NB_QUIC_TASKS.inc();
+                            let connection = connection.unwrap();
+                            Self::send_transaction_batch(connection, txs, identity, endpoint, addr, exit_signal, last_stable_id).await;
+                            NB_QUIC_TASKS.dec();
+                            task_counter.fetch_sub(1, Ordering::Relaxed);
+                        });
                     },
                     _ = exit_oneshot_channel.recv() => {
                         break;
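Throughout this loop, concurrency is capped by an atomic counter rather than a semaphore: the loop naps (now 1ms instead of 10ms) while the counter sits at max_uni_stream_connections, and each spawned sender increments and decrements it around its work. A sketch of that gating pattern in isolation, with a hypothetical `spawn_throttled` helper:

use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::time::Duration;

// Sketch: poll the counter until a slot frees, then run `work` in a task
// that holds the slot for its whole lifetime.
async fn spawn_throttled<F>(task_counter: Arc<AtomicU64>, max_tasks: u64, work: F)
where
    F: std::future::Future<Output = ()> + Send + 'static,
{
    while task_counter.load(Ordering::Relaxed) >= max_tasks {
        tokio::time::sleep(Duration::from_millis(1)).await;
    }
    // Incrementing before the spawn (the diff increments inside the spawned
    // task) closes a small window where several tasks could pass the gate
    // before any increment lands.
    task_counter.fetch_add(1, Ordering::Relaxed);
    let counter = task_counter.clone();
    tokio::spawn(async move {
        work.await;
        counter.fetch_sub(1, Ordering::Relaxed);
    });
}

A tokio::sync::Semaphore would achieve the same cap without the polling sleep; the commit keeps the counter approach and simply tightens the poll interval.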