From 46c123121c1cec7f224737e6d18061ea52576bb6 Mon Sep 17 00:00:00 2001
From: Godmode Galactus
Date: Thu, 13 Apr 2023 16:18:43 +0200
Subject: [PATCH 1/3] optimizing tpu connection by getting limits and sending
 transactions in parallel

---
 src/workers/block_listenser.rs                |   2 +-
 .../tpu_utils/tpu_connection_manager.rs       | 291 +++++++++++-------
 src/workers/tpu_utils/tpu_service.rs          |  83 ++++-
 3 files changed, 252 insertions(+), 124 deletions(-)

diff --git a/src/workers/block_listenser.rs b/src/workers/block_listenser.rs
index 0324f393..219c2d08 100644
--- a/src/workers/block_listenser.rs
+++ b/src/workers/block_listenser.rs
@@ -333,7 +333,7 @@ impl BlockListener {
                     slot: slot as i64,
                     leader_id: 0, // TODO: lookup leader
                     parent_slot: parent_slot as i64,
-                    cluster_time: Utc.timestamp_millis_opt(block_time*1000).unwrap(),
+                    cluster_time: Utc.timestamp_millis_opt(block_time * 1000).unwrap(),
                     local_time: block_info.and_then(|b| b.processed_local_time),
                 }))
                 .expect("Error sending block to postgres service");
diff --git a/src/workers/tpu_utils/tpu_connection_manager.rs b/src/workers/tpu_utils/tpu_connection_manager.rs
index 4d9052ef..68c5bb10 100644
--- a/src/workers/tpu_utils/tpu_connection_manager.rs
+++ b/src/workers/tpu_utils/tpu_connection_manager.rs
@@ -2,7 +2,7 @@ use std::{
     collections::HashMap,
     net::{IpAddr, Ipv4Addr, SocketAddr},
     sync::{
-        atomic::{AtomicBool, Ordering},
+        atomic::{AtomicBool, AtomicU64, Ordering},
         Arc,
     },
     time::Duration,
@@ -15,13 +15,17 @@ use quinn::{
     ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream,
     TokioRuntime, TransportConfig,
 };
-use solana_sdk::pubkey::Pubkey;
+use solana_sdk::{
+    pubkey::Pubkey,
+    quic::{QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO},
+};
+use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
 use tokio::{
     sync::{broadcast::Receiver, broadcast::Sender},
     time::timeout,
 };
 
-use super::rotating_queue::RotatingQueue;
+use super::{rotating_queue::RotatingQueue, tpu_service::IdentityStakes};
 
 pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
 const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(5);
 const CONNECTION_RETRY_COUNT: usize = 10;
 
 lazy_static::lazy_static! {
     static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
         register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap();
-    static ref NB_QUIC_TASKS: GenericGauge<prometheus::core::AtomicI64> =
-        register_int_gauge!(opts!("literpc_nb_quic_tasks", "Number quic tasks that are running")).unwrap();
+    static ref NB_QUIC_ACTIVE_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
+        register_int_gauge!(opts!("literpc_nb_active_connections", "Number of active connections")).unwrap();
     static ref NB_CONNECTIONS_TO_KEEP: GenericGauge<prometheus::core::AtomicI64> =
         register_int_gauge!(opts!("literpc_connections_to_keep", "Number of connections to keep asked by tpu service")).unwrap();
+    static ref NB_QUIC_TASKS: GenericGauge<prometheus::core::AtomicI64> =
+        register_int_gauge!(opts!("literpc_quic_tasks", "Number of quic tasks that are running")).unwrap();
 }
 
 struct ActiveConnection {
@@ -87,23 +93,25 @@ impl ActiveConnection {
     async fn connect(
         identity: Pubkey,
-        already_connected: bool,
+        already_connected: Arc<AtomicBool>,
         endpoint: Endpoint,
         addr: SocketAddr,
         exit_signal: Arc<AtomicBool>,
-    ) -> Option<Arc<Connection>> {
+    ) -> Option<Connection> {
         for _i in 0..CONNECTION_RETRY_COUNT {
-            let conn = if already_connected {
+            let conn = if already_connected.load(Ordering::Relaxed) {
                 info!("making make_connection_0rtt");
                 Self::make_connection_0rtt(endpoint.clone(), addr).await
             } else {
                 info!("making make_connection");
-                Self::make_connection(endpoint.clone(), addr).await
+                let conn = Self::make_connection(endpoint.clone(), addr).await;
+                already_connected.store(true, Ordering::Relaxed);
+                conn
             };
             match conn {
                 Ok(conn) => {
                     NB_QUIC_CONNECTIONS.inc();
-                    return Some(Arc::new(conn));
+                    return Some(conn);
                 }
                 Err(e) => {
                     trace!("Could not connect to {} because of error {}", identity, e);
                 }
             }
@@ -116,50 +124,117 @@ impl ActiveConnection {
         None
     }
 
+    async fn write_all(mut send_stream: SendStream, txs: Vec<Vec<u8>>, identity: Pubkey) {
+        for tx in txs {
+            let write_timeout_res = timeout(
+                QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC,
+                send_stream.write_all(tx.as_slice()),
+            )
+            .await;
+            match write_timeout_res {
+                Ok(write_res) => {
+                    if let Err(e) = write_res {
+                        warn!(
+                            "Error while writing transaction for {}, error {}",
+                            identity, e
+                        );
+                    }
+                }
+                Err(_) => {
+                    warn!("timeout while writing transaction for {}", identity);
+                }
+            }
+        }
+
+        let finish_timeout_res = timeout(
+            QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC,
+            send_stream.finish(),
+        )
+        .await;
+        match finish_timeout_res {
+            Ok(finish_res) => {
+                if let Err(e) = finish_res {
+                    warn!(
+                        "Error while writing transaction for {}, error {}",
+                        identity, e
+                    );
+                }
+            }
+            Err(_) => {
+                warn!("timeout while writing transaction for {}", identity);
+            }
+        }
+    }
+
     async fn open_unistream(
-        connection: &mut Option<Arc<Connection>>,
-        mut reconnect: bool,
         identity: Pubkey,
-        already_connected: bool,
+        already_connected: Arc<AtomicBool>,
         endpoint: Endpoint,
         addr: SocketAddr,
         exit_signal: Arc<AtomicBool>,
-    ) -> Option<SendStream> {
-        loop {
-            if let Some(connection) = connection {
-                match timeout(
-                    QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC,
-                    connection.open_uni(),
-                )
-                .await
-                {
-                    Ok(Ok(unistream)) => return Some(unistream),
-                    Ok(Err(_)) => (),
-                    Err(_) => return None,
-                }
-            } else {
-                reconnect = true
-            }
-
-            if !reconnect {
-                return None;
-            }
-
-            // re connect
-            let Some(conn) = Self::connect(
-                identity,
-                already_connected,
-                endpoint.clone(),
-                addr,
-                exit_signal.clone(),
-            )
-            .await else {
-                return None;
-            };
-
-            // new connection don't reconnect now
-            *connection = Some(conn);
-            reconnect = false;
+    ) -> Option<(Connection, SendStream)> {
+        let connection =
+            Self::connect(identity, already_connected, endpoint, addr, exit_signal).await;
+        if let Some(connection) = connection {
+            match timeout(
+                QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC,
+                connection.open_uni(),
+            )
+            .await
+            {
+                Ok(Ok(unistream)) => return Some((connection, unistream)),
+                Ok(Err(_)) => None,
+                Err(_) => return None,
+            }
+        } else {
+            None
+        }
+    }
+
+    async fn send_transaction_batch(
+        txs: Vec<Vec<u8>>,
+        identity: Pubkey,
+        already_connected: Arc<AtomicBool>,
+        endpoint: Endpoint,
+        socket_addr: SocketAddr,
+        exit_signal: Arc<AtomicBool>,
+    ) {
+        let stream = Self::open_unistream(
+            identity,
+            already_connected,
+            endpoint,
+            socket_addr,
+            exit_signal.clone(),
+        )
+        .await;
+        if let Some((_connection, send_stream)) = stream {
+            if exit_signal.load(Ordering::Relaxed) {
+                return;
+            }
+
+            Self::write_all(send_stream, txs, identity).await;
+        }
+    }
+
+    // copied from solana code base
+    fn compute_receive_window_ratio_for_staked_node(
+        max_stake: u64,
+        min_stake: u64,
+        stake: u64,
+    ) -> u64 {
+        if stake > max_stake {
+            return QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
+        }
+
+        let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
+        let min_ratio = QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO;
+        if max_stake > min_stake {
+            let a = (max_ratio - min_ratio) as f64 / (max_stake - min_stake) as f64;
+            let b = max_ratio as f64 - ((max_stake as f64) * a);
+            let ratio = (a * stake as f64) + b;
+            ratio.round() as u64
+        } else {
+            QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO
+        }
+    }
 
@@ -170,19 +245,42 @@ impl ActiveConnection {
         addr: SocketAddr,
         exit_signal: Arc<AtomicBool>,
         identity: Pubkey,
+        identity_stakes: IdentityStakes,
     ) {
-        NB_QUIC_TASKS.inc();
-        let mut already_connected = false;
-        let mut connection: Option<Arc<Connection>> = None;
+        NB_QUIC_ACTIVE_CONNECTIONS.inc();
+        let already_connected: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
         let mut transaction_reciever = transaction_reciever;
         let mut exit_oneshot_channel = exit_oneshot_channel;
 
+        let max_uni_stream_connections: u64 = compute_max_allowed_uni_streams(
+            identity_stakes.peer_type,
+            identity_stakes.stakes,
+            identity_stakes.total_stakes,
+        ) as u64;
+        let number_of_transactions_per_unistream = match identity_stakes.peer_type {
+            solana_streamer::nonblocking::quic::ConnectionPeerType::Staked => {
+                Self::compute_receive_window_ratio_for_staked_node(
+                    identity_stakes.max_stakes,
+                    identity_stakes.min_stakes,
+                    identity_stakes.stakes,
+                )
+            }
+            solana_streamer::nonblocking::quic::ConnectionPeerType::Unstaked => 1,
+        };
+
+        let task_counter: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
+
         loop {
             // exit signal set
             if exit_signal.load(Ordering::Relaxed) {
                 break;
             }
 
+            if task_counter.load(Ordering::Relaxed) > max_uni_stream_connections {
+                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
+                continue;
+            }
+
             tokio::select! {
                tx_or_timeout = timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, transaction_reciever.recv() ) => {
                    // exit signal set
                    if exit_signal.load(Ordering::Relaxed) {
                        break;
                    }

                    match tx_or_timeout {
                        Ok(tx) => {
-                            let tx: Vec<u8> = match tx {
+                            let first_tx: Vec<u8> = match tx {
                                 Ok(tx) => tx,
                                 Err(e) => {
                                     error!(
                                         "Broadcast channel error on recv error {}", e
                                     );
                                     continue;
                                 }
                             };
-                            let unistream = Self::open_unistream(
-                                &mut connection,
-                                true,
-                                identity,
-                                already_connected,
-                                endpoint.clone(),
-                                addr,
-                                exit_signal.clone(),
-                            ).await;
-                            if !already_connected && connection.is_some() {
-                                already_connected = true;
-                            }
-
-                            match unistream {
-                                Some(mut send_stream) => {
-                                    trace!("Sending {} transaction", identity);
-                                    let write_timeout_res = timeout( QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, send_stream.write_all(tx.as_slice())).await;
-                                    match write_timeout_res {
-                                        Ok(write_res) => {
-                                            if let Err(e) = write_res {
-                                                warn!(
-                                                    "Error while writing transaction for {}, error {}",
-                                                    identity,
-                                                    e
-                                                );
-                                            }
-                                        },
-                                        Err(_) => {
-                                            warn!(
-                                                "timeout while writing transaction for {}",
-                                                identity
-                                            );
-                                        }
-                                    }
-
-                                    let finish_timeout_res = timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, send_stream.finish()).await;
-                                    match finish_timeout_res {
-                                        Ok(finish_res) => {
-                                            if let Err(e) = finish_res {
-                                                warn!(
-                                                    "Error while writing transaction for {}, error {}",
-                                                    identity,
-                                                    e
-                                                );
-                                            }
-                                        },
-                                        Err(_) => {
-                                            warn!(
-                                                "timeout while writing transaction for {}",
-                                                identity
-                                            );
-                                        }
-                                    }
-                                },
-                                None => {
-                                    trace!("could not create a unistream for {}", identity);
-                                    break;
+                            let mut txs = vec![first_tx];
+                            for _ in 1..number_of_transactions_per_unistream {
+                                if let Ok(tx) = transaction_reciever.try_recv() {
+                                    txs.push(tx);
                                 }
                             }
+                            let task_counter = task_counter.clone();
+                            let endpoint = endpoint.clone();
+                            let exit_signal = exit_signal.clone();
+                            let addr = addr.clone();
+                            let already_connected = already_connected.clone();
+
+                            tokio::spawn(async move {
+                                task_counter.fetch_add(1, Ordering::Relaxed);
+                                NB_QUIC_TASKS.inc();
+
+                                Self::send_transaction_batch(txs, identity, already_connected, endpoint, addr, exit_signal).await;
+
+                                NB_QUIC_TASKS.dec();
+                                task_counter.fetch_sub(1, Ordering::Relaxed);
+                            });
                         },
                         Err(_) => {
                             // timed out
-                            if connection.is_some() {
-                                NB_QUIC_CONNECTIONS.dec();
-                                connection = None;
-                            }
                         }
                     }
                 },
@@ -277,17 +333,14 @@ impl ActiveConnection {
                 }
             };
         }
-
-        if connection.is_some() {
-            NB_QUIC_CONNECTIONS.dec();
-        }
-        NB_QUIC_TASKS.dec();
+        NB_QUIC_ACTIVE_CONNECTIONS.dec();
     }
 
     pub fn start_listening(
         &self,
         transaction_reciever: Receiver<Vec<u8>>,
         exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>,
+        identity_stakes: IdentityStakes,
     ) {
         let endpoint = self.endpoint.clone();
         let addr = self.tpu_address;
@@ -301,6 +354,7 @@ impl ActiveConnection {
                 addr,
                 exit_signal,
                 identity,
+                identity_stakes,
             )
             .await;
         });
@@ -365,6 +419,7 @@ impl TpuConnectionManager {
         &self,
         transaction_sender: Arc<Sender<Vec<u8>>>,
         connections_to_keep: HashMap<Pubkey, SocketAddr>,
+        identity_stakes: IdentityStakes,
     ) {
         NB_CONNECTIONS_TO_KEEP.set(connections_to_keep.len() as i64);
         for (identity, socket_addr) in &connections_to_keep {
@@ -376,7 +431,7 @@ impl TpuConnectionManager {
                 let (sx, rx) = tokio::sync::mpsc::channel(1);
                 let transaction_reciever = transaction_sender.subscribe();
-                active_connection.start_listening(transaction_reciever, rx);
+                active_connection.start_listening(transaction_reciever, rx, identity_stakes);
                 self.identity_to_active_connection.insert(
                     *identity,
                     Arc::new(ActiveConnectionWithExitChannel {
diff --git a/src/workers/tpu_utils/tpu_service.rs b/src/workers/tpu_utils/tpu_service.rs
index 56475df7..58a95593 100644
--- a/src/workers/tpu_utils/tpu_service.rs
+++ b/src/workers/tpu_utils/tpu_service.rs
@@ -8,10 +8,14 @@ use solana_client::{
     rpc_response::RpcContactInfo,
 };
 
-use solana_sdk::{pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair, slot_history::Slot};
-use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
+use solana_sdk::{
+    pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair, signer::Signer, slot_history::Slot,
+};
+use solana_streamer::{
+    nonblocking::quic::ConnectionPeerType, tls_certificates::new_self_signed_tls_certificate,
+};
 use std::{
-    collections::VecDeque,
+    collections::{HashMap, VecDeque},
     net::{IpAddr, Ipv4Addr},
     str::FromStr,
     sync::{
@@ -63,6 +67,29 @@ pub struct TpuService {
     pubsub_client: Arc<PubsubClient>,
     broadcast_sender: Arc<Sender<Vec<u8>>>,
     tpu_connection_manager: Arc<TpuConnectionManager>,
+    identity: Arc<Keypair>,
+    identity_stakes: Arc<RwLock<IdentityStakes>>,
+}
+
+#[derive(Debug, Copy, Clone)]
+pub struct IdentityStakes {
+    pub peer_type: ConnectionPeerType,
+    pub stakes: u64,
+    pub total_stakes: u64,
+    pub min_stakes: u64,
+    pub max_stakes: u64,
+}
+
+impl Default for IdentityStakes {
+    fn default() -> Self {
+        IdentityStakes {
+            peer_type: ConnectionPeerType::Unstaked,
+            stakes: 0,
+            total_stakes: 0,
+            max_stakes: 0,
+            min_stakes: 0,
+        }
+    }
 }
 
 impl TpuService {
@@ -95,6 +122,8 @@ impl TpuService {
             pubsub_client: Arc::new(pubsub_client),
             broadcast_sender: Arc::new(sender),
             tpu_connection_manager: Arc::new(tpu_connection_manager),
+            identity,
+            identity_stakes: Arc::new(RwLock::new(IdentityStakes::default())),
         })
     }
 
@@ -106,6 +135,43 @@ impl TpuService {
             }
         });
         NB_CLUSTER_NODES.set(self.cluster_nodes.len() as i64);
+
+        // update stakes for the identity
+        {
+            let vote_accounts = self.rpc_client.get_vote_accounts().await?;
+            let map_of_stakes: HashMap<String, u64> = vote_accounts
+                .current
+                .iter()
+                .map(|x| (x.node_pubkey.clone(), x.activated_stake))
+                .collect();
+
+            if let Some(stakes) = map_of_stakes.get(&self.identity.pubkey().to_string()) {
+                let all_stakes: Vec<u64> = vote_accounts
+                    .current
+                    .iter()
+                    .map(|x| x.activated_stake)
+                    .collect();
+
+                let identity_stakes = IdentityStakes {
+                    peer_type: ConnectionPeerType::Staked,
+                    stakes: *stakes,
+                    min_stakes: all_stakes.iter().min().map_or(0, |x| *x),
+                    max_stakes: all_stakes.iter().max().map_or(0, |x| *x),
+                    total_stakes: all_stakes.iter().sum(),
+                };
+
+                info!(
+                    "Identity stakes {}, {}, {}, {}",
+                    identity_stakes.total_stakes,
+                    identity_stakes.min_stakes,
+                    identity_stakes.max_stakes,
+                    identity_stakes.stakes
+                );
+                let mut lock = self.identity_stakes.write().await;
+                *lock = identity_stakes;
+            }
+        }
         Ok(())
     }
 
@@ -195,8 +261,15 @@ impl TpuService {
                 (Pubkey::from_str(x.pubkey.as_str()).unwrap(), addr)
             })
             .collect();
+
+        let identity_stakes = self.identity_stakes.read().await;
+
         self.tpu_connection_manager
-            .update_connections(self.broadcast_sender.clone(), connections_to_keep)
+            .update_connections(
+                self.broadcast_sender.clone(),
+                connections_to_keep,
+                *identity_stakes,
+            )
             .await;
     }
 
@@ -224,7 +297,7 @@ impl TpuService {
             Duration::from_millis(2000),
             self.pubsub_client.slot_subscribe(),
         )
-        .await;
+            .await;
         match res {
             Ok(sub_res) => {
                 match sub_res {
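
Note on patch 1: the batch size per unistream comes from
compute_receive_window_ratio_for_staked_node (copied from the solana code
base), which linearly interpolates between the minimum and maximum QUIC
receive-window ratios according to the identity's stake. A minimal standalone
sketch of that interpolation follows; RATIO_MIN and RATIO_MAX are illustrative
placeholders, not the real solana_sdk::quic constants.

    // Sketch of the stake-to-ratio interpolation; the ratio bounds here are
    // placeholders for the real solana_sdk::quic constants.
    const RATIO_MIN: u64 = 1;
    const RATIO_MAX: u64 = 10;

    fn receive_window_ratio(max_stake: u64, min_stake: u64, stake: u64) -> u64 {
        if stake > max_stake {
            return RATIO_MAX;
        }
        if max_stake > min_stake {
            // Linear map: min_stake -> RATIO_MIN, max_stake -> RATIO_MAX.
            let a = (RATIO_MAX - RATIO_MIN) as f64 / (max_stake - min_stake) as f64;
            let b = RATIO_MAX as f64 - (max_stake as f64) * a;
            ((a * stake as f64) + b).round() as u64
        } else {
            RATIO_MAX
        }
    }

    fn main() {
        // a = 9 / 1_000_000 and b = 1, so a stake halfway up the range
        // maps to 5.5, which rounds to 6.
        assert_eq!(receive_window_ratio(1_000_000, 0, 500_000), 6);
        // Stakes above the observed maximum are capped at RATIO_MAX.
        assert_eq!(receive_window_ratio(1_000_000, 0, 2_000_000), 10);
    }
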
From ddd45dd4781cbd5ce673fdd759d56c41e1dfff95 Mon Sep 17 00:00:00 2001
From: Godmode Galactus
Date: Fri, 14 Apr 2023 11:50:52 +0200
Subject: [PATCH 2/3] Using same connection and reconnecting when necessary

---
 bench/src/main.rs                             |   6 +-
 .../tpu_utils/tpu_connection_manager.rs       | 164 +++++++++++-------
 2 files changed, 101 insertions(+), 69 deletions(-)

diff --git a/bench/src/main.rs b/bench/src/main.rs
index 314574d2..a0e90d16 100644
--- a/bench/src/main.rs
+++ b/bench/src/main.rs
@@ -12,7 +12,7 @@ use bench::{
 use clap::Parser;
 use log::info;
 use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
-use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
+use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature, signer::Signer};
 
 #[tokio::main]
 async fn main() {
@@ -60,6 +60,8 @@ async fn main() {
 
 async fn bench(rpc_client: Arc<RpcClient>, tx_count: usize) -> Metric {
     let funded_payer = BenchHelper::get_payer().await.unwrap();
+
+    println!("payer {}", funded_payer.pubkey());
     let blockhash = rpc_client.get_latest_blockhash().await.unwrap();
 
     let txs = BenchHelper::generate_txs(tx_count, &funded_payer, blockhash, None);
@@ -106,7 +108,7 @@ async fn bench(rpc_client: Arc<RpcClient>, tx_count: usize) -> Metric {
                 metrics.txs_confirmed += 1;
                 to_remove_txs.push(sig);
             } else if time_elapsed_since_last_confirmed.unwrap().elapsed()
-                > Duration::from_secs(3)
+                > Duration::from_secs(30)
             {
                 metrics.txs_un_confirmed += 1;
                 to_remove_txs.push(sig);
diff --git a/src/workers/tpu_utils/tpu_connection_manager.rs b/src/workers/tpu_utils/tpu_connection_manager.rs
index 68c5bb10..e22d06a2 100644
--- a/src/workers/tpu_utils/tpu_connection_manager.rs
+++ b/src/workers/tpu_utils/tpu_connection_manager.rs
@@ -9,7 +9,7 @@ use std::{
 };
 
 use dashmap::DashMap;
-use log::{error, info, trace, warn};
+use log::{error, trace, warn};
 use prometheus::{core::GenericGauge, opts, register_int_gauge};
 use quinn::{
     ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream,
@@ -21,14 +21,14 @@ use solana_sdk::{
 };
 use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
 use tokio::{
-    sync::{broadcast::Receiver, broadcast::Sender},
+    sync::{broadcast::Receiver, broadcast::Sender, RwLock},
     time::timeout,
 };
 
 use super::{rotating_queue::RotatingQueue, tpu_service::IdentityStakes};
 
 pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
-const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(5);
+const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(10);
 const CONNECTION_RETRY_COUNT: usize = 10;
 
 lazy_static::lazy_static! {
@@ -93,19 +93,16 @@ impl ActiveConnection {
     async fn connect(
         identity: Pubkey,
-        already_connected: Arc<AtomicBool>,
+        already_connected: bool,
         endpoint: Endpoint,
         addr: SocketAddr,
         exit_signal: Arc<AtomicBool>,
     ) -> Option<Connection> {
         for _i in 0..CONNECTION_RETRY_COUNT {
-            let conn = if already_connected.load(Ordering::Relaxed) {
-                info!("making make_connection_0rtt");
+            let conn = if already_connected {
                 Self::make_connection_0rtt(endpoint.clone(), addr).await
             } else {
-                info!("making make_connection");
                 let conn = Self::make_connection(endpoint.clone(), addr).await;
-                already_connected.store(true, Ordering::Relaxed);
                 conn
             };
             match conn {
@@ -124,26 +121,27 @@ impl ActiveConnection {
         None
     }
 
-    async fn write_all(mut send_stream: SendStream, txs: Vec<Vec<u8>>, identity: Pubkey) {
-        for tx in txs {
-            let write_timeout_res = timeout(
-                QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC,
-                send_stream.write_all(tx.as_slice()),
-            )
-            .await;
-            match write_timeout_res {
-                Ok(write_res) => {
-                    if let Err(e) = write_res {
-                        warn!(
-                            "Error while writing transaction for {}, error {}",
-                            identity, e
-                        );
-                    }
-                }
-                Err(_) => {
-                    warn!("timeout while writing transaction for {}", identity);
+    async fn write_all(mut send_stream: SendStream, tx: &Vec<u8>, identity: Pubkey, last_stable_id : Arc<AtomicU64>, connection_stable_id: u64) -> bool {
+        let write_timeout_res = timeout(
+            QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC,
+            send_stream.write_all(tx.as_slice()),
+        )
+        .await;
+        match write_timeout_res {
+            Ok(write_res) => {
+                if let Err(e) = write_res {
+                    trace!(
+                        "Error while writing transaction for {}, error {}",
+                        identity, e
+                    );
+                    // retry
+                    last_stable_id.store(connection_stable_id, Ordering::Relaxed);
+                    return true;
                 }
             }
+            Err(_) => {
+                warn!("timeout while writing transaction for {}", identity);
+            }
         }
 
         let finish_timeout_res = timeout(
@@ -164,55 +162,72 @@ impl ActiveConnection {
                 warn!("timeout while writing transaction for {}", identity);
             }
         }
+
+        false
     }
 
-    async fn open_unistream(
-        identity: Pubkey,
-        already_connected: Arc<AtomicBool>,
-        endpoint: Endpoint,
-        addr: SocketAddr,
-        exit_signal: Arc<AtomicBool>,
-    ) -> Option<(Connection, SendStream)> {
-        let connection =
-            Self::connect(identity, already_connected, endpoint, addr, exit_signal).await;
-        if let Some(connection) = connection {
-            match timeout(
-                QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC,
-                connection.open_uni(),
-            )
-            .await
-            {
-                Ok(Ok(unistream)) => return Some((connection, unistream)),
-                Ok(Err(_)) => None,
-                Err(_) => return None,
-            }
-        } else {
-            None
+    async fn open_unistream(connection: Connection, last_stable_id : Arc<AtomicU64>) -> (Option<SendStream>, bool) {
+        match timeout(
+            QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC,
+            connection.open_uni(),
+        )
+        .await
+        {
+            Ok(Ok(unistream)) => return (Some(unistream), false),
+            Ok(Err(_)) => {
+                // reset connection for next retry
+                last_stable_id.store(connection.stable_id() as u64, Ordering::Relaxed);
+                (None, true)
+            },
+            Err(_) => return (None, false),
         }
     }
 
     async fn send_transaction_batch(
+        connection: Arc<RwLock<Connection>>,
         txs: Vec<Vec<u8>>,
-        identity: Pubkey,
-        already_connected: Arc<AtomicBool>,
+        identity: Pubkey,
         endpoint: Endpoint,
         socket_addr: SocketAddr,
         exit_signal: Arc<AtomicBool>,
+        last_stable_id : Arc<AtomicU64>
     ) {
-        let stream = Self::open_unistream(
-            identity,
-            already_connected,
-            endpoint,
-            socket_addr,
-            exit_signal.clone(),
-        )
-        .await;
-        if let Some((_connection, send_stream)) = stream {
-            if exit_signal.load(Ordering::Relaxed) {
-                return;
-            }
+        for _ in 0..3 {
+            let conn = {
+                let last_stable_id = last_stable_id.load(Ordering::Relaxed) as usize;
+                let conn = connection.read().await;
+                if conn.stable_id() == last_stable_id {
+                    // problematic connection
+                    drop(conn);
+                    let mut conn = connection.write().await;
+                    let new_conn = Self::connect(identity, true, endpoint.clone(), socket_addr.clone(), exit_signal.clone()).await;
+                    if let Some(new_conn) = new_conn {
+                        *conn = new_conn;
+                        conn.clone()
+                    } else {
+                        // could not connect
+                        return;
+                    }
+                } else {
+                    conn.clone()
+                }
+            };
+            let mut retry = false;
+            for tx in &txs {
+                let (stream, retry_conn) = Self::open_unistream(conn.clone(), last_stable_id.clone()).await;
+                if let Some(send_stream) = stream {
+                    if exit_signal.load(Ordering::Relaxed) {
+                        return;
+                    }
 
-            Self::write_all(send_stream, txs, identity).await;
+                    retry = Self::write_all(send_stream, tx, identity, last_stable_id.clone(), conn.stable_id() as u64).await;
+                } else {
+                    retry = retry_conn;
+                }
+            }
+            if !retry {
+                break;
+            }
         }
     }
 
@@ -248,7 +263,6 @@ impl ActiveConnection {
         identity_stakes: IdentityStakes,
     ) {
         NB_QUIC_ACTIVE_CONNECTIONS.inc();
-        let already_connected: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
         let mut transaction_reciever = transaction_reciever;
         let mut exit_oneshot_channel = exit_oneshot_channel;
 
@@ -269,6 +283,8 @@ impl ActiveConnection {
         };
 
         let task_counter: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
+        let mut connection : Option<Arc<RwLock<Connection>>> = None;
+        let last_stable_id : Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
 
         loop {
             // exit signal set
@@ -307,21 +323,35 @@ impl ActiveConnection {
                                     txs.push(tx);
                                 }
                             }
+
+                            if connection.is_none() {
+                                // initial connection
+                                let conn = Self::connect(identity, false, endpoint.clone(), addr.clone(), exit_signal.clone()).await;
+                                if let Some(conn) = conn {
+                                    // could connect
+                                    connection = Some(Arc::new(RwLock::new(conn)));
+                                } else {
+                                    break;
+                                }
+                            }
+
                             let task_counter = task_counter.clone();
                             let endpoint = endpoint.clone();
                             let exit_signal = exit_signal.clone();
                             let addr = addr.clone();
-                            let already_connected = already_connected.clone();
+                            let connection = connection.clone();
+                            let last_stable_id = last_stable_id.clone();
 
                             tokio::spawn(async move {
                                 task_counter.fetch_add(1, Ordering::Relaxed);
                                 NB_QUIC_TASKS.inc();
-
-                                Self::send_transaction_batch(txs, identity, already_connected, endpoint, addr, exit_signal).await;
+                                let connection = connection.unwrap();
+                                Self::send_transaction_batch(connection, txs, identity, endpoint, addr, exit_signal, last_stable_id).await;
 
                                 NB_QUIC_TASKS.dec();
                                 task_counter.fetch_sub(1, Ordering::Relaxed);
                             });
+                            tokio::time::sleep(tokio::time::Duration::from_micros(100)).await;
                         },
                         Err(_) => {
                             // timed out
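
Note on patch 2: all sender tasks share one quinn::Connection behind a tokio
RwLock. A task that fails to write or to open a stream records the
connection's stable_id() in the shared last_stable_id; the next sender that
finds its connection flagged swaps in a fresh one under the write lock. A
minimal sketch of that flag-and-swap pattern follows; Conn and refresh() are
stand-ins for quinn::Connection and Self::connect().

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;
    use tokio::sync::RwLock;

    // Stand-in for quinn::Connection: only the stable id matters here.
    #[derive(Clone)]
    struct Conn {
        stable_id: u64,
    }

    // Stand-in for Self::connect(): yields a connection with a new id.
    async fn refresh(next_id: u64) -> Option<Conn> {
        Some(Conn { stable_id: next_id })
    }

    async fn get_usable_conn(
        shared: &Arc<RwLock<Conn>>,
        last_stable_id: &Arc<AtomicU64>,
    ) -> Option<Conn> {
        let flagged = last_stable_id.load(Ordering::Relaxed);
        let conn = shared.read().await;
        if conn.stable_id == flagged {
            // Connection was flagged broken by an earlier task: release
            // the read lock, then replace it under the write lock.
            drop(conn);
            let mut guard = shared.write().await;
            *guard = refresh(flagged + 1).await?;
            Some(guard.clone())
        } else {
            Some(conn.clone())
        }
    }

    #[tokio::main]
    async fn main() {
        let shared = Arc::new(RwLock::new(Conn { stable_id: 7 }));
        let flag = Arc::new(AtomicU64::new(7)); // id 7 is flagged broken
        let conn = get_usable_conn(&shared, &flag).await.unwrap();
        assert_eq!(conn.stable_id, 8); // a fresh connection was swapped in
    }

Between releasing the read lock and acquiring the write lock another task may
already have replaced the connection, so the worst case reconnects more than
once; the three-attempt retry loop in send_transaction_batch bounds that cost.
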
From 81b422c69a37dc12e13c17cbd1350655dbb0ca8a Mon Sep 17 00:00:00 2001
From: Godmode Galactus
Date: Fri, 14 Apr 2023 12:35:11 +0200
Subject: [PATCH 3/3] increase limits, minor changes, updated the bench
 executable

---
 Cargo.lock                                    |  1 +
 bench/Cargo.toml                              |  1 +
 bench/src/main.rs                             | 33 ++++++++---
 src/lib.rs                                    |  2 +-
 .../tpu_utils/tpu_connection_manager.rs       | 55 ++++++++++++-----
 src/workers/tpu_utils/tpu_service.rs          |  2 +-
 6 files changed, 70 insertions(+), 24 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 246fe7dc..ce9977ee 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -379,6 +379,7 @@ dependencies = [
  "clap 4.1.6",
  "csv",
  "dirs",
+ "futures",
  "log",
  "rand 0.8.5",
  "rand_chacha 0.3.1",
diff --git a/bench/Cargo.toml b/bench/Cargo.toml
index 3d5cd2ee..c7e0a308 100644
--- a/bench/Cargo.toml
+++ b/bench/Cargo.toml
@@ -17,4 +17,5 @@ csv = "1.2.1"
 dirs = "5.0.0"
 rand = "0.8.5"
 rand_chacha = "0.3.1"
+futures = { workspace = true }
 
diff --git a/bench/src/main.rs b/bench/src/main.rs
index a0e90d16..cd4e17e6 100644
--- a/bench/src/main.rs
+++ b/bench/src/main.rs
@@ -10,7 +10,8 @@
 use bench::{
     metrics::{AvgMetric, Metric},
 };
 use clap::Parser;
-use log::info;
+use futures::future::join_all;
+use log::{error, info};
 use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
 use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature, signer::Signer};
 
@@ -39,17 +40,33 @@ async fn main() {
 
     let mut avg_metric = AvgMetric::default();
 
-    for run_num in 0..runs {
-        let metric = bench(rpc_client.clone(), tx_count).await;
-        info!("Run {run_num}: Sent and Confirmed {tx_count} tx(s) in {metric:?} with",);
-        // update avg metric
-        avg_metric += &metric;
-        // write metric to file
-        csv_writer.serialize(metric).unwrap();
+    let mut tasks = vec![];
+
+    for _ in 0..runs {
+        let rpc_client = rpc_client.clone();
+        tasks.push(tokio::spawn(bench(rpc_client.clone(), tx_count)));
         // wait for an interval
         run_interval_ms.tick().await;
     }
 
+    let join_res = join_all(tasks).await;
+
+    let mut run_num = 1;
+    for res in join_res {
+        match res {
+            Ok(metric) => {
+                info!("Run {run_num}: Sent and Confirmed {tx_count} tx(s) in {metric:?} with",);
+                // update avg metric
+                avg_metric += &metric;
+                csv_writer.serialize(metric).unwrap();
+            }
+            Err(_) => {
+                error!("join error for run {}", run_num);
+            }
+        }
+        run_num += 1;
+    }
+
     let avg_metric = Metric::from(avg_metric);
 
     info!("Avg Metric {avg_metric:?}",);
diff --git a/src/lib.rs b/src/lib.rs
index 88946da8..658cbc32 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -19,7 +19,7 @@ pub const DEFAULT_WS_ADDR: &str = "ws://0.0.0.0:8900";
 #[from_env]
 pub const DEFAULT_TX_MAX_RETRIES: u16 = 1;
 #[from_env]
-pub const DEFAULT_TX_BATCH_SIZE: usize = 32;
+pub const DEFAULT_TX_BATCH_SIZE: usize = 100;
 #[from_env]
 pub const DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE: usize = 40_000;
 
diff --git a/src/workers/tpu_utils/tpu_connection_manager.rs b/src/workers/tpu_utils/tpu_connection_manager.rs
index e22d06a2..494dbf16 100644
--- a/src/workers/tpu_utils/tpu_connection_manager.rs
+++ b/src/workers/tpu_utils/tpu_connection_manager.rs
@@ -121,7 +121,13 @@ impl ActiveConnection {
         None
     }
 
-    async fn write_all(mut send_stream: SendStream, tx: &Vec<u8>, identity: Pubkey, last_stable_id : Arc<AtomicU64>, connection_stable_id: u64) -> bool {
+    async fn write_all(
+        mut send_stream: SendStream,
+        tx: &Vec<u8>,
+        identity: Pubkey,
+        last_stable_id: Arc<AtomicU64>,
+        connection_stable_id: u64,
+    ) -> bool {
         let write_timeout_res = timeout(
             QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC,
             send_stream.write_all(tx.as_slice()),
@@ -132,7 +138,8 @@ impl ActiveConnection {
                 if let Err(e) = write_res {
                     trace!(
                         "Error while writing transaction for {}, error {}",
-                        identity, e
+                        identity,
+                        e
                     );
                     // retry
                     last_stable_id.store(connection_stable_id, Ordering::Relaxed);
@@ -152,9 +159,11 @@ impl ActiveConnection {
         match finish_timeout_res {
             Ok(finish_res) => {
                 if let Err(e) = finish_res {
-                    warn!(
+                    last_stable_id.store(connection_stable_id, Ordering::Relaxed);
+                    trace!(
                         "Error while writing transaction for {}, error {}",
-                        identity, e
+                        identity,
+                        e
                     );
                 }
             }
@@ -166,7 +175,10 @@ impl ActiveConnection {
         false
     }
 
-    async fn open_unistream(connection: Connection, last_stable_id : Arc<AtomicU64>) -> (Option<SendStream>, bool) {
+    async fn open_unistream(
+        connection: Connection,
+        last_stable_id: Arc<AtomicU64>,
+    ) -> (Option<SendStream>, bool) {
         match timeout(
             QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC,
             connection.open_uni(),
@@ -178,7 +190,7 @@ impl ActiveConnection {
                 // reset connection for next retry
                 last_stable_id.store(connection.stable_id() as u64, Ordering::Relaxed);
                 (None, true)
-            },
+            }
             Err(_) => return (None, false),
         }
     }
@@ -186,21 +198,28 @@ impl ActiveConnection {
     async fn send_transaction_batch(
         connection: Arc<RwLock<Connection>>,
         txs: Vec<Vec<u8>>,
-        identity: Pubkey, 
+        identity: Pubkey,
         endpoint: Endpoint,
         socket_addr: SocketAddr,
         exit_signal: Arc<AtomicBool>,
-        last_stable_id : Arc<AtomicU64>
+        last_stable_id: Arc<AtomicU64>,
     ) {
         for _ in 0..3 {
-            let conn = { 
+            let conn = {
                 let last_stable_id = last_stable_id.load(Ordering::Relaxed) as usize;
                 let conn = connection.read().await;
                 if conn.stable_id() == last_stable_id {
                     // problematic connection
                     drop(conn);
                     let mut conn = connection.write().await;
-                    let new_conn = Self::connect(identity, true, endpoint.clone(), socket_addr.clone(), exit_signal.clone()).await;
+                    let new_conn = Self::connect(
+                        identity,
+                        true,
+                        endpoint.clone(),
+                        socket_addr.clone(),
+                        exit_signal.clone(),
+                    )
+                    .await;
                     if let Some(new_conn) = new_conn {
                         *conn = new_conn;
                         conn.clone()
@@ -214,13 +233,21 @@ impl ActiveConnection {
             let mut retry = false;
             for tx in &txs {
-                let (stream, retry_conn) = Self::open_unistream(conn.clone(), last_stable_id.clone()).await;
+                let (stream, retry_conn) =
+                    Self::open_unistream(conn.clone(), last_stable_id.clone()).await;
                 if let Some(send_stream) = stream {
                     if exit_signal.load(Ordering::Relaxed) {
                         return;
                     }
 
-                    retry = Self::write_all(send_stream, tx, identity, last_stable_id.clone(), conn.stable_id() as u64).await;
+                    retry = Self::write_all(
+                        send_stream,
+                        tx,
+                        identity,
+                        last_stable_id.clone(),
+                        conn.stable_id() as u64,
+                    )
+                    .await;
                 } else {
                     retry = retry_conn;
                 }
@@ -283,8 +310,8 @@ impl ActiveConnection {
         };
 
         let task_counter: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
-        let mut connection : Option<Arc<RwLock<Connection>>> = None;
-        let last_stable_id : Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
+        let mut connection: Option<Arc<RwLock<Connection>>> = None;
+        let last_stable_id: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
 
         loop {
             // exit signal set
diff --git a/src/workers/tpu_utils/tpu_service.rs b/src/workers/tpu_utils/tpu_service.rs
index 58a95593..cdebd0f5 100644
--- a/src/workers/tpu_utils/tpu_service.rs
+++ b/src/workers/tpu_utils/tpu_service.rs
@@ -35,7 +35,7 @@ const CACHE_NEXT_SLOT_LEADERS_PUBKEY_SIZE: usize = 1024; // Save pubkey and cont
 const CLUSTERINFO_REFRESH_TIME: u64 = 60; // refresh cluster every minute
 const LEADER_SCHEDULE_UPDATE_INTERVAL: u64 = 10; // update leader schedule every 10s
 const AVERAGE_SLOT_CHANGE_TIME_IN_MILLIS: u64 = 400;
-const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 1024;
+const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 16384;
 
 lazy_static::lazy_static! {
     static ref NB_CLUSTER_NODES: GenericGauge<prometheus::core::AtomicI64> =
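
Note on patch 3: the bench no longer runs iterations back to back; it spawns
one task per run, staggered by the run interval, then aggregates the results
with futures::future::join_all. A minimal sketch of that spawn-then-join
shape, assuming the tokio and futures crates; run_once stands in for the real
bench function.

    use futures::future::join_all;
    use std::time::Duration;

    // Stand-in for bench(rpc_client, tx_count).
    async fn run_once(run: u64) -> u64 {
        run * 2 // pretend metric
    }

    #[tokio::main]
    async fn main() {
        let mut interval = tokio::time::interval(Duration::from_millis(100));
        let mut tasks = vec![];
        for run in 0..5 {
            // Stagger task starts instead of serializing whole runs.
            tasks.push(tokio::spawn(run_once(run)));
            interval.tick().await;
        }
        // join_all yields Vec<Result<T, JoinError>>; Err means a panic.
        for (run, res) in join_all(tasks).await.into_iter().enumerate() {
            match res {
                Ok(metric) => println!("run {run}: metric {metric}"),
                Err(e) => eprintln!("run {run} failed to join: {e}"),
            }
        }
    }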