From 0bbaba34dac5b9b5a9bd8b3970952c27c7b955f7 Mon Sep 17 00:00:00 2001
From: Godmode Galactus
Date: Tue, 2 May 2023 06:51:24 +0200
Subject: [PATCH] more optimizations for tpu connection

---
 .../tpu_utils/tpu_connection_manager.rs       | 173 ++++++++----------
 1 file changed, 72 insertions(+), 101 deletions(-)

diff --git a/src/workers/tpu_utils/tpu_connection_manager.rs b/src/workers/tpu_utils/tpu_connection_manager.rs
index 61874c85..7243b519 100644
--- a/src/workers/tpu_utils/tpu_connection_manager.rs
+++ b/src/workers/tpu_utils/tpu_connection_manager.rs
@@ -1,5 +1,5 @@
 use std::{
-    collections::HashMap,
+    collections::{HashMap, VecDeque},
     net::{IpAddr, Ipv4Addr, SocketAddr},
     sync::{
         atomic::{AtomicBool, AtomicU64, Ordering},
@@ -17,7 +17,6 @@ use quinn::{
 };
 use solana_sdk::{
     pubkey::Pubkey,
-    quic::{QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO},
 };
 use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
 use tokio::{
@@ -30,7 +29,7 @@ use crate::workers::TxProps;
 use super::{rotating_queue::RotatingQueue, tpu_service::IdentityStakes};
 
 pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
-const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(10);
+const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(1);
 const CONNECTION_RETRY_COUNT: usize = 10;
 
 lazy_static::lazy_static! {
@@ -174,10 +173,11 @@
                         identity, e
                     );
+                    return true;
                 }
             }
             Err(_) => {
-                warn!("timeout while writing transaction for {}", identity);
+                warn!("timeout while finishing transaction for {}", identity);
             }
         }
@@ -213,8 +213,13 @@
         exit_signal: Arc<AtomicBool>,
         last_stable_id: Arc<AtomicU64>,
     ) {
-        for _ in 0..3 {
-            if exit_signal.load(Ordering::Relaxed) {
+
+        let mut queue = VecDeque::new();
+        for tx in txs {
+            queue.push_back(tx);
+        }
+        for _ in 0..CONNECTION_RETRY_COUNT {
+            if queue.is_empty() || exit_signal.load(Ordering::Relaxed) {
                 // return
                 return;
             }
@@ -253,7 +258,8 @@
                 }
             };
             let mut retry = false;
-            for tx in &txs {
+            while !queue.is_empty() {
+                let tx = queue.pop_front().unwrap();
                 let (stream, retry_conn) =
                     Self::open_unistream(conn.clone(), last_stable_id.clone()).await;
                 if let Some(send_stream) = stream {
                    retry = Self::write_all(
                        send_stream,
-                        tx,
+                        &tx,
                        identity,
                        last_stable_id.clone(),
                        conn.stable_id() as u64,
                    )
                    .await;
                } else {
                    retry = retry_conn;
                }
+                if retry {
+                    queue.push_back(tx);
+                    break;
+                }
            }
            if !retry {
                break;
            }
        }
    }
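Note on the send_transaction_batch hunks above: the hardcoded three-pass loop over a slice becomes a VecDeque drain with up to CONNECTION_RETRY_COUNT (10) connection attempts, a transaction whose stream fails is pushed back onto the queue before the connection is rebuilt, and the per-stream timeout is cut from 10 s to 1 s. A minimal standalone sketch of the drain-and-requeue pattern; send_one and send_with_retries are hypothetical stand-ins, not names from this codebase:

```rust
use std::collections::VecDeque;

const CONNECTION_RETRY_COUNT: usize = 10;

// Hypothetical sender: Ok(()) on success, Err(()) when the stream
// failed and the transaction deserves another attempt.
fn send_one(_tx: &[u8]) -> Result<(), ()> {
    Ok(())
}

fn send_with_retries(txs: Vec<Vec<u8>>) {
    let mut queue: VecDeque<Vec<u8>> = txs.into();
    // Each outer iteration corresponds to one (re)connection attempt.
    for _ in 0..CONNECTION_RETRY_COUNT {
        if queue.is_empty() {
            return;
        }
        while let Some(tx) = queue.pop_front() {
            if send_one(&tx).is_err() {
                // Requeue the failed transaction, then break out so the
                // outer loop can rebuild the connection.
                queue.push_back(tx);
                break;
            }
        }
    }
}
```

Requeueing at the back rather than the front means a persistently failing transaction drifts to the end of the batch instead of starving everything behind it.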
 
-    // copied from solana code base
-    fn compute_receive_window_ratio_for_staked_node(
-        max_stake: u64,
-        min_stake: u64,
-        stake: u64,
-    ) -> u64 {
-        if stake > max_stake {
-            return QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
-        }
-
-        let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
-        let min_ratio = QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO;
-        if max_stake > min_stake {
-            let a = (max_ratio - min_ratio) as f64 / (max_stake - min_stake) as f64;
-            let b = max_ratio as f64 - ((max_stake as f64) * a);
-            let ratio = (a * stake as f64) + b;
-            ratio.round() as u64
-        } else {
-            QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO
-        }
-    }
-
     fn check_for_confirmation(
         txs_sent_store: &Arc<DashMap<String, TxProps>>,
         signature: String,
@@ -330,16 +318,7 @@
             identity_stakes.stakes,
             identity_stakes.total_stakes,
         ) as u64;
-        let number_of_transactions_per_unistream = match identity_stakes.peer_type {
-            solana_streamer::nonblocking::quic::ConnectionPeerType::Staked => {
-                Self::compute_receive_window_ratio_for_staked_node(
-                    identity_stakes.max_stakes,
-                    identity_stakes.min_stakes,
-                    identity_stakes.stakes,
-                )
-            }
-            solana_streamer::nonblocking::quic::ConnectionPeerType::Unstaked => 1,
-        };
+        let number_of_transactions_per_unistream = 5;
 
         let task_counter: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
         let mut connection: Option<Arc<RwLock<Connection>>> = None;
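With the stake-weighted receive-window helper removed, every peer now gets a flat five transactions per unistream, while compute_max_allowed_uni_streams still scales the number of concurrent streams with stake. The batch itself is assembled in the select! loop below: block on the first recv, then poll try_recv for up to four more already-buffered transactions. A rough sketch of that batching idea, using a hypothetical next_batch over a payload-only tokio broadcast channel (the real channel carries (signature, tx) pairs and also skips already-confirmed signatures):

```rust
use tokio::sync::broadcast;

// Block for the first payload, then opportunistically drain whatever
// is already buffered, up to `max_batch` items in total. Returns None
// on channel error.
async fn next_batch(
    rx: &mut broadcast::Receiver<Vec<u8>>,
    max_batch: usize,
) -> Option<Vec<Vec<u8>>> {
    let first = rx.recv().await.ok()?;
    let mut batch = vec![first];
    while batch.len() < max_batch {
        match rx.try_recv() {
            Ok(tx) => batch.push(tx),
            Err(_) => break, // nothing buffered right now
        }
    }
    Some(batch)
}
```

Unlike this sketch, the patched loop polls try_recv a fixed number of times instead of stopping at the first empty poll, so it tolerates transient gaps in the buffer.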
@@ -351,80 +330,72 @@
                 break;
             }
 
-            if task_counter.load(Ordering::Relaxed) > max_uni_stream_connections {
-                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
+            if task_counter.load(Ordering::Relaxed) >= max_uni_stream_connections {
+                tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
                 continue;
             }
 
             tokio::select! {
-                tx_or_timeout = timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, transaction_reciever.recv() ) => {
+                tx = transaction_reciever.recv() => {
                     // exit signal set
                     if exit_signal.load(Ordering::Relaxed) {
                         break;
                     }
 
-                    match tx_or_timeout {
-                        Ok(tx) => {
-                            let first_tx: Vec<u8> = match tx {
-                                Ok((sig, tx)) => {
-                                    if Self::check_for_confirmation(&txs_sent_store, sig) {
-                                        // transaction is already confirmed/ no need to send
-                                        continue;
-                                    }
-                                    tx
-                                },
-                                Err(e) => {
-                                    error!(
-                                        "Broadcast channel error on recv for {} error {}",
-                                        identity, e
-                                    );
-                                    continue;
-                                }
-                            };
-
-                            let mut txs = vec![first_tx];
-                            for _ in 1..number_of_transactions_per_unistream {
-                                if let Ok((signature, tx)) = transaction_reciever.try_recv() {
-                                    if Self::check_for_confirmation(&txs_sent_store, signature) {
-                                        continue;
-                                    }
-                                    txs.push(tx);
-                                }
-                            }
-
-                            if connection.is_none() {
-                                // initial connection
-                                let conn = Self::connect(identity, false, endpoint.clone(), addr.clone(), exit_signal.clone()).await;
-                                if let Some(conn) = conn {
-                                    // could connect
-                                    connection = Some(Arc::new(RwLock::new(conn)));
-                                } else {
-                                    break;
-                                }
-                            }
-
-                            let task_counter = task_counter.clone();
-                            let endpoint = endpoint.clone();
-                            let exit_signal = exit_signal.clone();
-                            let addr = addr.clone();
-                            let connection = connection.clone();
-                            let last_stable_id = last_stable_id.clone();
-
-                            tokio::spawn(async move {
-                                task_counter.fetch_add(1, Ordering::Relaxed);
-                                NB_QUIC_TASKS.inc();
-                                let connection = connection.unwrap();
-                                Self::send_transaction_batch(connection, txs, identity, endpoint, addr, exit_signal, last_stable_id).await;
-
-                                NB_QUIC_TASKS.dec();
-                                task_counter.fetch_sub(1, Ordering::Relaxed);
-                            });
-                            tokio::time::sleep(tokio::time::Duration::from_micros(100)).await;
-                        },
-                        Err(_) => {
-                            // timed out
-                        }
-                    }
+                    let first_tx: Vec<u8> = match tx {
+                        Ok((sig, tx)) => {
+                            if Self::check_for_confirmation(&txs_sent_store, sig) {
+                                // transaction is already confirmed, no need to send
+                                continue;
+                            }
+                            tx
+                        },
+                        Err(e) => {
+                            error!(
+                                "Broadcast channel error on recv for {} error {}",
+                                identity, e
+                            );
+                            continue;
+                        }
+                    };
+
+                    let mut txs = vec![first_tx];
+                    for _ in 1..number_of_transactions_per_unistream {
+                        if let Ok((signature, tx)) = transaction_reciever.try_recv() {
+                            if Self::check_for_confirmation(&txs_sent_store, signature) {
+                                continue;
+                            }
+                            txs.push(tx);
+                        }
+                    }
+
+                    if connection.is_none() {
+                        // initial connection
+                        let conn = Self::connect(identity, false, endpoint.clone(), addr.clone(), exit_signal.clone()).await;
+                        if let Some(conn) = conn {
+                            // could connect
+                            connection = Some(Arc::new(RwLock::new(conn)));
+                        } else {
+                            break;
+                        }
+                    }
+
+                    let task_counter = task_counter.clone();
+                    let endpoint = endpoint.clone();
+                    let exit_signal = exit_signal.clone();
+                    let addr = addr.clone();
+                    let connection = connection.clone();
+                    let last_stable_id = last_stable_id.clone();
+
+                    tokio::spawn(async move {
+                        task_counter.fetch_add(1, Ordering::Relaxed);
+                        NB_QUIC_TASKS.inc();
+                        let connection = connection.unwrap();
+                        Self::send_transaction_batch(connection, txs, identity, endpoint, addr, exit_signal, last_stable_id).await;
+
+                        NB_QUIC_TASKS.dec();
+                        task_counter.fetch_sub(1, Ordering::Relaxed);
+                    });
                 },
                 _ = exit_oneshot_channel.recv() => {
                     break;
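Tail of the rewrite above: recv is no longer wrapped in a timeout (the exit_oneshot_channel arm of the select! remains the way out), the 100 µs pacing sleep after each spawn is gone, and the in-flight check tightens from > to >= with a 1 ms back-off, so at most max_uni_stream_connections sender tasks run at once. A sketch of that counter-based admission under hypothetical names (spawn_bounded, send_batch):

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::time::Duration;

// Stand-in for Self::send_transaction_batch.
async fn send_batch() {}

// Admit a new sender task only while fewer than `max_tasks` are in
// flight; each task frees its slot when it finishes.
async fn spawn_bounded(counter: Arc<AtomicU64>, max_tasks: u64) {
    while counter.load(Ordering::Relaxed) >= max_tasks {
        // All slots busy: back off briefly instead of busy-spinning.
        tokio::time::sleep(Duration::from_millis(1)).await;
    }
    counter.fetch_add(1, Ordering::Relaxed);
    tokio::spawn(async move {
        send_batch().await;
        counter.fetch_sub(1, Ordering::Relaxed);
    });
}
```

With a single spawning loop, the load-then-fetch_add sequence cannot race with itself; if several loops shared the counter, a tokio::sync::Semaphore would be the sturdier primitive.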