diff --git a/src/workers/tpu_utils/tpu_connection_manager.rs b/src/workers/tpu_utils/tpu_connection_manager.rs index 4c5bda5c..68bcb1db 100644 --- a/src/workers/tpu_utils/tpu_connection_manager.rs +++ b/src/workers/tpu_utils/tpu_connection_manager.rs @@ -1,5 +1,5 @@ use std::{ - collections::HashMap, + collections::{HashMap, VecDeque}, net::{IpAddr, Ipv4Addr, SocketAddr}, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, @@ -15,10 +15,7 @@ use quinn::{ ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream, TokioRuntime, TransportConfig, }; -use solana_sdk::{ - pubkey::Pubkey, - quic::{QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO}, -}; +use solana_sdk::pubkey::Pubkey; use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams; use tokio::{ sync::{broadcast::Receiver, broadcast::Sender, RwLock}, @@ -30,6 +27,7 @@ use super::{rotating_queue::RotatingQueue, tpu_service::IdentityStakes}; pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu"; const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(10); const CONNECTION_RETRY_COUNT: usize = 10; +const TRANSACTIONS_SENT_PER_TASK: usize = 5; lazy_static::lazy_static! { static ref NB_QUIC_CONNECTIONS: GenericGauge = @@ -197,14 +195,14 @@ impl ActiveConnection { async fn send_transaction_batch( connection: Arc>, - txs: Vec>, + mut txs: VecDeque>, identity: Pubkey, endpoint: Endpoint, socket_addr: SocketAddr, exit_signal: Arc, last_stable_id: Arc, ) { - for _ in 0..3 { + for _ in 0..CONNECTION_RETRY_COUNT { if exit_signal.load(Ordering::Relaxed) { // return return; @@ -244,24 +242,34 @@ impl ActiveConnection { } }; let mut retry = false; - for tx in &txs { + while !txs.is_empty() { + let tx = txs.pop_front().unwrap(); let (stream, retry_conn) = Self::open_unistream(conn.clone(), last_stable_id.clone()).await; + if retry_conn { + txs.push_back(tx); + retry = true; + break; + } + if let Some(send_stream) = stream { if exit_signal.load(Ordering::Relaxed) { return; } - retry = Self::write_all( + if Self::write_all( send_stream, - tx, + &tx, identity, last_stable_id.clone(), conn.stable_id() as u64, ) - .await; - } else { - retry = retry_conn; + .await + { + txs.push_back(tx); + retry = true; + break; + } } } if !retry { @@ -270,28 +278,6 @@ impl ActiveConnection { } } - // copied from solana code base - fn compute_receive_window_ratio_for_staked_node( - max_stake: u64, - min_stake: u64, - stake: u64, - ) -> u64 { - if stake > max_stake { - return QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO; - } - - let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO; - let min_ratio = QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO; - if max_stake > min_stake { - let a = (max_ratio - min_ratio) as f64 / (max_stake - min_stake) as f64; - let b = max_ratio as f64 - ((max_stake as f64) * a); - let ratio = (a * stake as f64) + b; - ratio.round() as u64 - } else { - QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO - } - } - async fn listen( transaction_reciever: Receiver>, exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>, @@ -310,16 +296,7 @@ impl ActiveConnection { identity_stakes.stakes, identity_stakes.total_stakes, ) as u64; - let number_of_transactions_per_unistream = match identity_stakes.peer_type { - solana_streamer::nonblocking::quic::ConnectionPeerType::Staked => { - Self::compute_receive_window_ratio_for_staked_node( - identity_stakes.max_stakes, - identity_stakes.min_stakes, - identity_stakes.stakes, - ) - } - solana_streamer::nonblocking::quic::ConnectionPeerType::Unstaked => 1, - }; + let number_of_transactions_per_unistream = TRANSACTIONS_SENT_PER_TASK; let task_counter: Arc = Arc::new(AtomicU64::new(0)); let mut connection: Option>> = None; @@ -331,7 +308,7 @@ impl ActiveConnection { break; } - if task_counter.load(Ordering::Relaxed) > max_uni_stream_connections { + if task_counter.load(Ordering::Relaxed) >= max_uni_stream_connections { tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; continue; } @@ -356,10 +333,11 @@ impl ActiveConnection { } }; - let mut txs = vec![first_tx]; + let mut txs = VecDeque::new(); + txs.push_back(first_tx); for _ in 1..number_of_transactions_per_unistream { if let Ok(tx) = transaction_reciever.try_recv() { - txs.push(tx); + txs.push_back(tx); } } @@ -403,7 +381,9 @@ impl ActiveConnection { }; } drop(transaction_reciever); - NB_QUIC_CONNECTIONS.dec(); + if connection.is_some() { + NB_QUIC_CONNECTIONS.dec(); + } NB_QUIC_ACTIVE_CONNECTIONS.dec(); } diff --git a/src/workers/tpu_utils/tpu_service.rs b/src/workers/tpu_utils/tpu_service.rs index cdebd0f5..a08d79b3 100644 --- a/src/workers/tpu_utils/tpu_service.rs +++ b/src/workers/tpu_utils/tpu_service.rs @@ -35,7 +35,7 @@ const CACHE_NEXT_SLOT_LEADERS_PUBKEY_SIZE: usize = 1024; // Save pubkey and cont const CLUSTERINFO_REFRESH_TIME: u64 = 60; // refresh cluster every minute const LEADER_SCHEDULE_UPDATE_INTERVAL: u64 = 10; // update leader schedule every 10s const AVERAGE_SLOT_CHANGE_TIME_IN_MILLIS: u64 = 400; -const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 16384; +const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000; lazy_static::lazy_static! { static ref NB_CLUSTER_NODES: GenericGauge =