diff --git a/Cargo.lock b/Cargo.lock
index a81fbb1d..5edc2db0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -427,6 +427,7 @@ dependencies = [
  "anyhow",
  "clap 4.2.4",
  "csv",
+ "dashmap",
  "dirs",
  "futures",
  "log",
diff --git a/bench/Cargo.toml b/bench/Cargo.toml
index c2469869..737d8aee 100644
--- a/bench/Cargo.toml
+++ b/bench/Cargo.toml
@@ -18,3 +18,4 @@ dirs = "5.0.0"
 rand = "0.8.5"
 rand_chacha = "0.3.1"
 futures = { workspace = true }
+dashmap = { workspace = true }
diff --git a/bench/src/helpers.rs b/bench/src/helpers.rs
index 622b52bc..8d16a29b 100644
--- a/bench/src/helpers.rs
+++ b/bench/src/helpers.rs
@@ -1,5 +1,3 @@
-use std::{str::FromStr, time::Duration};
-
 use anyhow::Context;
 use rand::{distributions::Alphanumeric, prelude::Distribution, SeedableRng};
 use solana_rpc_client::nonblocking::rpc_client::RpcClient;
@@ -14,6 +12,7 @@ use solana_sdk::{
     system_instruction,
     transaction::Transaction,
 };
+use std::{str::FromStr, time::Duration};
 use tokio::time::Instant;
 
 const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr";
@@ -69,6 +68,14 @@ impl BenchHelper {
         Transaction::new(&[funded_payer], message, blockhash)
     }
 
+    pub fn generate_random_strings(num_of_txs: usize, random_seed: Option<u64>) -> Vec<Vec<u8>> {
+        let seed = random_seed.map_or(0, |x| x);
+        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(seed);
+        (0..num_of_txs)
+            .map(|_| Alphanumeric.sample_iter(&mut rng).take(10).collect())
+            .collect()
+    }
+
     #[inline]
     pub fn generate_txs(
         num_of_txs: usize,
diff --git a/bench/src/main.rs b/bench/src/main.rs
index a4ae94cd..000e7ac1 100644
--- a/bench/src/main.rs
+++ b/bench/src/main.rs
@@ -1,20 +1,23 @@
-use std::{sync::Arc, collections::HashMap};
-
 use bench::{
     cli::Args,
     helpers::BenchHelper,
     metrics::{AvgMetric, Metric},
 };
 use clap::Parser;
+use dashmap::DashMap;
 use futures::future::join_all;
 use log::{error, info};
 use solana_rpc_client::nonblocking::rpc_client::RpcClient;
-use solana_sdk::{commitment_config::CommitmentConfig, signer::Signer, signature::Keypair};
+use solana_sdk::{
+    commitment_config::CommitmentConfig, hash::Hash, signature::Keypair, signer::Signer,
+};
+use std::sync::Arc;
 use tokio::{
+    sync::RwLock,
     time::{Duration, Instant},
 };
 
-#[tokio::main]
+#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
 async fn main() {
     tracing_subscriber::fmt::init();
@@ -43,9 +46,36 @@ async fn main() {
         lite_rpc_addr.clone(),
         CommitmentConfig::confirmed(),
     ));
+    let bh = rpc_client.get_latest_blockhash().await.unwrap();
+    let block_hash: Arc<RwLock<Hash>> = Arc::new(RwLock::new(bh));
+    let _jh = {
+        // block hash updater task
+        let block_hash = block_hash.clone();
+        let rpc_client = rpc_client.clone();
+        tokio::spawn(async move {
+            loop {
+                let bh = rpc_client.get_latest_blockhash().await;
+                match bh {
+                    Ok(bh) => {
+                        let mut lock = block_hash.write().await;
+                        *lock = bh;
+                    },
+                    Err(e) => println!("blockhash update error {}", e),
+                }
+                tokio::time::sleep(Duration::from_millis(500)).await;
+            }
+        })
+    };
+
     for seed in 0..runs {
         let funded_payer = Keypair::from_bytes(funded_payer.to_bytes().as_slice()).unwrap();
-        tasks.push(tokio::spawn(bench(rpc_client.clone(), tx_count, funded_payer, seed as u64)));
+        tasks.push(tokio::spawn(bench(
+            rpc_client.clone(),
+            tx_count,
+            funded_payer,
+            seed as u64,
+            block_hash.clone(),
+        )));
         // wait for an interval
         run_interval_ms.tick().await;
     }
@@ -82,40 +112,73 @@ struct TxSendData {
     sent_instant: Instant,
 }
 
-async fn bench(rpc_client: Arc<RpcClient>, tx_count: usize, funded_payer: Keypair, seed: u64) -> Metric {
-    let blockhash = rpc_client.get_latest_blockhash().await.unwrap();
+async fn bench(
+    rpc_client: Arc<RpcClient>,
+    tx_count: usize,
+    funded_payer: Keypair,
+    seed: u64,
+    block_hash: Arc<RwLock<Hash>>,
+) -> Metric {
+    let map_of_txs = Arc::new(DashMap::new());
+    // transaction sender task
+    {
+        let map_of_txs = map_of_txs.clone();
+        let rpc_client = rpc_client.clone();
+        tokio::spawn(async move {
+            let map_of_txs = map_of_txs.clone();
+            let rand_strings = BenchHelper::generate_random_strings(tx_count, Some(seed));
 
-    let txs = BenchHelper::generate_txs(tx_count, &funded_payer, blockhash, Some(seed));
+            for rand_string in rand_strings {
+                let blockhash = { *block_hash.read().await };
+                let tx = BenchHelper::create_memo_tx(&rand_string, &funded_payer, blockhash);
+                let start_time = Instant::now();
+                if let Ok(signature) = rpc_client.send_transaction(&tx).await {
+                    map_of_txs.insert(
+                        signature,
+                        TxSendData {
+                            sent_duration: start_time.elapsed(),
+                            sent_instant: Instant::now(),
+                        },
+                    );
+                }
+            }
+        });
+    }
 
     let mut metric = Metric::default();
-    let mut map_of_txs = HashMap::new();
-    for tx in txs {
-        let rpc_client = rpc_client.clone();
-        let start_time = Instant::now();
-        if let Ok(signature) = rpc_client.send_transaction(&tx).await {
-            map_of_txs.insert( signature, TxSendData {
-                sent_duration: start_time.elapsed(),
-                sent_instant: Instant::now(),
-            });
-        }
-    }
     let confirmation_time = Instant::now();
-    while confirmation_time.elapsed() < Duration::from_secs(60) && !map_of_txs.is_empty() {
-        let signatures = map_of_txs.iter().map(|x| x.0.clone()).collect::<Vec<_>>();
+    let mut confirmed_count = 0;
+    while confirmation_time.elapsed() < Duration::from_secs(60)
+        && !(map_of_txs.is_empty() && confirmed_count == tx_count)
+    {
+        let signatures = map_of_txs
+            .iter()
+            .map(|x| x.key().clone())
+            .collect::<Vec<_>>();
+        if signatures.is_empty() {
+            tokio::time::sleep(Duration::from_millis(1)).await;
+            continue;
+        }
+
         if let Ok(res) = rpc_client.get_signature_statuses(&signatures).await {
             for i in 0..signatures.len() {
                 let tx_status = &res.value[i];
                 if let Some(_) = tx_status {
                     let signature = signatures[i];
                     let tx_data = map_of_txs.get(&signature).unwrap();
-                    metric.add_successful_transaction( tx_data.sent_duration, tx_data.sent_instant.elapsed());
+                    metric.add_successful_transaction(
+                        tx_data.sent_duration,
+                        tx_data.sent_instant.elapsed(),
+                    );
+                    drop(tx_data);
                     map_of_txs.remove(&signature);
+                    confirmed_count += 1;
                 }
             }
         }
     }
 
-    for (_, tx) in map_of_txs {
+    for tx in map_of_txs.iter() {
         metric.add_unsuccessful_transaction(tx.sent_duration);
     }
     metric.finalize();
diff --git a/bench/src/metrics.rs b/bench/src/metrics.rs
index 79624d0a..f5f9615d 100644
--- a/bench/src/metrics.rs
+++ b/bench/src/metrics.rs
@@ -38,11 +38,13 @@ impl Metric {
     pub fn finalize(&mut self) {
         if self.txs_sent > 0 {
-            self.average_time_to_send_txs = self.total_sent_time.as_millis() as f64 / self.txs_sent as f64;
+            self.average_time_to_send_txs =
+                self.total_sent_time.as_millis() as f64 / self.txs_sent as f64;
         }
 
         if self.txs_confirmed > 0 {
-            self.average_confirmation_time_ms = self.total_confirmation_time.as_millis() as f64 / self.txs_confirmed as f64;
+            self.average_confirmation_time_ms =
+                self.total_confirmation_time.as_millis() as f64 / self.txs_confirmed as f64;
         }
     }
 }
@@ -81,8 +83,10 @@ impl DivAssign<u64> for Metric {
         self.txs_confirmed /= rhs;
         self.txs_un_confirmed /= rhs;
 
-        self.total_confirmation_time = Duration::from_micros((self.total_confirmation_time.as_micros() / rhs as u128) as u64);
-        self.total_sent_time = Duration::from_micros((self.total_sent_time.as_micros() / rhs as u128) as u64);
+        self.total_confirmation_time =
+            Duration::from_micros((self.total_confirmation_time.as_micros() / rhs as u128) as u64);
+        self.total_sent_time =
+            Duration::from_micros((self.total_sent_time.as_micros() / rhs as u128) as u64);
         self.finalize();
     }
 }
diff --git a/src/bridge.rs b/src/bridge.rs
index 4905126a..1ac63bbd 100644
--- a/src/bridge.rs
+++ b/src/bridge.rs
@@ -5,7 +5,7 @@ use crate::{
     rpc::LiteRpcServer,
     workers::{
         tpu_utils::tpu_service::TpuService, BlockListener, Cleaner, MetricsCapture, Postgres,
-        PrometheusSync, TransactionReplay, TransactionReplayer, TxSender, WireTransaction,
+        PrometheusSync, TransactionReplay, TransactionReplayer, TxProps, TxSender, WireTransaction,
         MESSAGES_IN_REPLAY_QUEUE,
     },
     DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE,
@@ -15,6 +15,7 @@
 use std::{ops::Deref, str::FromStr, sync::Arc, time::Duration};
 
 use anyhow::bail;
+use dashmap::DashMap;
 use log::{error, info};
 
 use jsonrpsee::{core::SubscriptionResult, server::ServerBuilder, PendingSubscriptionSink};
@@ -82,17 +83,21 @@ impl LiteBridge {
         let rpc_client = Arc::new(RpcClient::new(rpc_url.clone()));
         let current_slot = rpc_client.get_slot().await?;
 
+        let tx_store: Arc<DashMap<String, TxProps>> = Default::default();
+
         let tpu_service = TpuService::new(
             current_slot,
             fanout_slots,
             Arc::new(identity),
             rpc_client.clone(),
             ws_addr,
+            tx_store.clone(),
         )
         .await?;
+
         let tpu_service = Arc::new(tpu_service);
 
-        let tx_sender = TxSender::new(tpu_service.clone());
+        let tx_sender = TxSender::new(tx_store, tpu_service.clone());
 
         let block_store = BlockStore::new(&rpc_client).await?;
@@ -152,15 +157,17 @@ impl LiteBridge {
         let metrics_capture = MetricsCapture::new(self.tx_sender.clone()).capture();
         let prometheus_sync = PrometheusSync.sync(prometheus_addr);
 
-        let finalized_block_listener = self
-            .block_listner
-            .clone()
-            .listen(CommitmentConfig::finalized(), postgres_send.clone());
+        let finalized_block_listener = self.block_listner.clone().listen(
+            CommitmentConfig::finalized(),
+            postgres_send.clone(),
+            self.tpu_service.get_estimated_slot_holder(),
+        );
 
-        let confirmed_block_listener = self
-            .block_listner
-            .clone()
-            .listen(CommitmentConfig::confirmed(), None);
+        let confirmed_block_listener = self.block_listner.clone().listen(
+            CommitmentConfig::confirmed(),
+            None,
+            self.tpu_service.get_estimated_slot_holder(),
+        );
 
         let processed_block_listener = self.block_listner.clone().listen_processed();
diff --git a/src/workers/block_listenser.rs b/src/workers/block_listenser.rs
index 8c23a5e2..9c85ca77 100644
--- a/src/workers/block_listenser.rs
+++ b/src/workers/block_listenser.rs
@@ -1,5 +1,8 @@
 use std::{
-    sync::{atomic::AtomicU64, Arc},
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc,
+    },
     time::Duration,
 };
 
@@ -406,10 +409,11 @@ impl BlockListener {
         self,
         commitment_config: CommitmentConfig,
         postgres: Option<PostgresMpscSend>,
+        estimated_slot: Arc<AtomicU64>,
     ) -> JoinHandle<anyhow::Result<()>> {
         let (slot_retry_queue_sx, mut slot_retry_queue_rx) = tokio::sync::mpsc::unbounded_channel();
         let (block_schedule_queue_sx, block_schedule_queue_rx) =
-            async_channel::unbounded::<(Slot, u8)>();
+            async_channel::unbounded::<Slot>();
 
         // task to fetch blocks
         for _i in 0..8 {
@@ -420,7 +424,7 @@
             tokio::spawn(async move {
                 loop {
-                    let (slot, error_count) = match block_schedule_queue_rx.recv().await {
+                    let slot = match block_schedule_queue_rx.recv().await {
                         Ok(v) => v,
                         Err(e) => {
                             error!("Recv error on block channel {}", e);
@@ -439,24 +443,12 @@ impl BlockListener {
                         .await
                         .is_err()
                     {
-                        // usually as we index all the slots even if they are not been processed we get some errors for slot
-                        // as they are not in long term storage of the rpc
-                        // we check 5 times before ignoring the slot
-
-                        if error_count > 5 {
-                            // retried for 10 times / there should be no block for this slot
-                            warn!(
-                                "unable to get block at slot {} and commitment {}",
-                                slot, commitment_config.commitment
-                            );
-                            continue;
-                        } else {
-                            // add a task to be queued after a delay
-                            let retry_at = tokio::time::Instant::now()
-                                .checked_add(Duration::from_millis(100))
-                                .unwrap();
-                            let _ = slot_retry_queue_sx.send((slot, error_count, retry_at));
-                            BLOCKS_IN_RETRY_QUEUE.inc();
-                        }
+                        // add a task to be queued after a delay
+                        let retry_at = tokio::time::Instant::now()
+                            .checked_add(Duration::from_millis(10))
+                            .unwrap();
+                        let _ = slot_retry_queue_sx.send((slot, retry_at));
+                        BLOCKS_IN_RETRY_QUEUE.inc();
                     };
                 }
             });
@@ -469,11 +461,11 @@ impl BlockListener {
             let block_schedule_queue_sx = block_schedule_queue_sx.clone();
             let recent_slot = recent_slot.clone();
             tokio::spawn(async move {
-                while let Some((slot, error_count, instant)) = slot_retry_queue_rx.recv().await {
+                while let Some((slot, instant)) = slot_retry_queue_rx.recv().await {
                     BLOCKS_IN_RETRY_QUEUE.dec();
                     let recent_slot = recent_slot.load(std::sync::atomic::Ordering::Relaxed);
                     // if slot is too old ignore
-                    if recent_slot.saturating_sub(slot) > 256 {
+                    if recent_slot.saturating_sub(slot) > 128 {
                         // slot too old to retry
                         // most probably its an empty slot
                         continue;
                     }
@@ -483,7 +475,7 @@ impl BlockListener {
                     if now < instant {
                         tokio::time::sleep_until(instant).await;
                     }
-                    if let Ok(_) = block_schedule_queue_sx.send((slot, error_count + 1)).await {
+                    if let Ok(_) = block_schedule_queue_sx.send(slot).await {
                         if commitment_config.is_finalized() {
                             BLOCKS_IN_FINALIZED_QUEUE.inc();
                         } else {
@@ -494,7 +486,6 @@ impl BlockListener {
             });
         }
 
-        let rpc_client = self.rpc_client.clone();
         tokio::spawn(async move {
             info!("{commitment_config:?} block listner started");
@@ -504,24 +495,14 @@ impl BlockListener {
                 .await
                 .slot;
             // -5 for warmup
-            let mut last_latest_slot = last_latest_slot - 5;
+            let mut last_latest_slot = last_latest_slot.saturating_sub(5);
             recent_slot.store(last_latest_slot, std::sync::atomic::Ordering::Relaxed);
 
-            // storage for recent slots processed
-            let rpc_client = rpc_client.clone();
             loop {
-                let new_slot = match rpc_client.get_slot_with_commitment(commitment_config).await {
-                    Ok(new_slot) => new_slot,
-                    Err(err) => {
-                        warn!("Error while fetching slot {err:?}");
-                        ERRORS_WHILE_FETCHING_SLOTS.inc();
-                        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
-                        continue;
-                    }
-                };
+                let new_slot = estimated_slot.load(Ordering::Relaxed);
 
                 if last_latest_slot == new_slot {
-                    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
+                    tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
                     continue;
                 }
@@ -530,7 +511,7 @@ impl BlockListener {
                 // context for lock
                 {
                     for slot in new_block_slots {
-                        if let Err(e) = block_schedule_queue_sx.send((slot, 0)).await {
+                        if let Err(e) = block_schedule_queue_sx.send(slot).await {
                             error!("error sending of block schedule queue {}", e);
                         } else {
                             if commitment_config.is_finalized() {
@@ -544,8 +525,6 @@ impl BlockListener {
 
                 last_latest_slot = new_slot;
                 recent_slot.store(last_latest_slot, std::sync::atomic::Ordering::Relaxed);
-
-                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
             }
         })
     }
diff --git a/src/workers/tpu_utils/tpu_connection_manager.rs b/src/workers/tpu_utils/tpu_connection_manager.rs
index 4c5bda5c..7243b519 100644
--- a/src/workers/tpu_utils/tpu_connection_manager.rs
+++ b/src/workers/tpu_utils/tpu_connection_manager.rs
@@ -1,5 +1,5 @@
 use std::{
-    collections::HashMap,
+    collections::{HashMap, VecDeque},
     net::{IpAddr, Ipv4Addr, SocketAddr},
     sync::{
         atomic::{AtomicBool, AtomicU64, Ordering},
@@ -17,7 +17,6 @@ use quinn::{
 };
 use solana_sdk::{
     pubkey::Pubkey,
-    quic::{QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO},
 };
 use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
 use tokio::{
@@ -25,10 +24,12 @@ use tokio::{
     time::timeout,
 };
 
+use crate::workers::TxProps;
+
 use super::{rotating_queue::RotatingQueue, tpu_service::IdentityStakes};
 
 pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
-const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(10);
+const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(1);
 const CONNECTION_RETRY_COUNT: usize = 10;
 
 lazy_static::lazy_static! {
@@ -43,19 +44,26 @@ lazy_static::lazy_static! {
 }
 
 struct ActiveConnection {
-    pub endpoint: Endpoint,
-    pub identity: Pubkey,
-    pub tpu_address: SocketAddr,
-    pub exit_signal: Arc<AtomicBool>,
+    endpoint: Endpoint,
+    identity: Pubkey,
+    tpu_address: SocketAddr,
+    exit_signal: Arc<AtomicBool>,
+    txs_sent_store: Arc<DashMap<String, TxProps>>,
 }
 
 impl ActiveConnection {
-    pub fn new(endpoint: Endpoint, tpu_address: SocketAddr, identity: Pubkey) -> Self {
+    pub fn new(
+        endpoint: Endpoint,
+        tpu_address: SocketAddr,
+        identity: Pubkey,
+        txs_sent_store: Arc<DashMap<String, TxProps>>,
+    ) -> Self {
         Self {
             endpoint,
             tpu_address,
             identity,
             exit_signal: Arc::new(AtomicBool::new(false)),
+            txs_sent_store,
         }
     }
@@ -165,10 +173,11 @@ impl ActiveConnection {
                         identity, e
                     );
+                    return true;
                 }
             }
             Err(_) => {
-                warn!("timeout while writing transaction for {}", identity);
+                warn!("timeout while finishing transaction for {}", identity);
             }
         }
@@ -204,8 +213,13 @@ impl ActiveConnection {
         exit_signal: Arc<AtomicBool>,
         last_stable_id: Arc<AtomicU64>,
     ) {
-        for _ in 0..3 {
-            if exit_signal.load(Ordering::Relaxed) {
+
+        let mut queue = VecDeque::new();
+        for tx in txs {
+            queue.push_back(tx);
+        }
+        for _ in 0..CONNECTION_RETRY_COUNT {
+            if queue.is_empty() || exit_signal.load(Ordering::Relaxed) {
                 // return
                 return;
             }
@@ -244,7 +258,8 @@ impl ActiveConnection {
                 }
             };
             let mut retry = false;
-            for tx in &txs {
+            while !queue.is_empty() {
+                let tx = queue.pop_front().unwrap();
                 let (stream, retry_conn) =
                     Self::open_unistream(conn.clone(), last_stable_id.clone()).await;
                 if let Some(send_stream) = stream {
@@ -254,7 +269,7 @@ impl ActiveConnection {
                     retry = Self::write_all(
                         send_stream,
-                        tx,
+                        &tx,
                         identity,
                         last_stable_id.clone(),
                         conn.stable_id() as u64,
@@ -263,6 +278,10 @@ impl ActiveConnection {
                 } else {
                     retry = retry_conn;
                 }
+                if retry {
+                    queue.push_back(tx);
+                    break;
+                }
             }
             if !retry {
                 break;
@@ -270,36 +289,25 @@ impl ActiveConnection {
         }
     }
 
-    // copied from solana code base
-    fn compute_receive_window_ratio_for_staked_node(
-        max_stake: u64,
-        min_stake: u64,
-        stake: u64,
-    ) -> u64 {
-        if stake > max_stake {
-            return QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
-        }
-
-        let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
-        let min_ratio = QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO;
-        if max_stake > min_stake {
-            let a = (max_ratio - min_ratio) as f64 / (max_stake - min_stake) as f64;
-            let b = max_ratio as f64 - ((max_stake as f64) * a);
-            let ratio = (a * stake as f64) + b;
-            ratio.round() as u64
-        } else {
-            QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO
+    fn check_for_confirmation(
+        txs_sent_store: &Arc<DashMap<String, TxProps>>,
+        signature: String,
+    ) -> bool {
+        match txs_sent_store.get(&signature) {
+            Some(props) => props.status.is_some(),
+            None => false,
         }
     }
 
     async fn listen(
-        transaction_reciever: Receiver<Vec<u8>>,
+        transaction_reciever: Receiver<(String, Vec<u8>)>,
         exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>,
         endpoint: Endpoint,
         addr: SocketAddr,
         exit_signal: Arc<AtomicBool>,
         identity: Pubkey,
         identity_stakes: IdentityStakes,
+        txs_sent_store: Arc<DashMap<String, TxProps>>,
     ) {
         NB_QUIC_ACTIVE_CONNECTIONS.inc();
         let mut transaction_reciever = transaction_reciever;
@@ -310,16 +318,7 @@ impl ActiveConnection {
             identity_stakes.stakes,
             identity_stakes.total_stakes,
         ) as u64;
-        let number_of_transactions_per_unistream = match identity_stakes.peer_type {
-            solana_streamer::nonblocking::quic::ConnectionPeerType::Staked => {
-                Self::compute_receive_window_ratio_for_staked_node(
-                    identity_stakes.max_stakes,
-                    identity_stakes.min_stakes,
-                    identity_stakes.stakes,
-                )
-            }
-            solana_streamer::nonblocking::quic::ConnectionPeerType::Unstaked => 1,
-        };
+        let number_of_transactions_per_unistream = 5;
 
         let task_counter: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
         let mut connection: Option<Arc<RwLock<Connection>>> = None;
@@ -331,71 +330,72 @@ impl ActiveConnection {
                 break;
            }
 
-            if task_counter.load(Ordering::Relaxed) > max_uni_stream_connections {
-                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
+            if task_counter.load(Ordering::Relaxed) >= max_uni_stream_connections {
+                tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
                 continue;
             }
 
            tokio::select! {
-                tx_or_timeout = timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, transaction_reciever.recv() ) => {
+                tx = transaction_reciever.recv() => {
                    // exit signal set
                    if exit_signal.load(Ordering::Relaxed) {
                        break;
                    }
 
-                    match tx_or_timeout {
-                        Ok(tx) => {
-                            let first_tx: Vec<u8> = match tx {
-                                Ok(tx) => tx,
-                                Err(e) => {
-                                    error!(
-                                        "Broadcast channel error on recv for {} error {}",
-                                        identity, e
-                                    );
-                                    continue;
-                                }
-                            };
-
-                            let mut txs = vec![first_tx];
-                            for _ in 1..number_of_transactions_per_unistream {
-                                if let Ok(tx) = transaction_reciever.try_recv() {
-                                    txs.push(tx);
-                                }
+                    let first_tx: Vec<u8> = match tx {
+                        Ok((sig, tx)) => {
+                            if Self::check_for_confirmation(&txs_sent_store, sig) {
+                                // transaction is already confirmed/ no need to send
+                                continue;
                            }
-
-                            if connection.is_none() {
-                                // initial connection
-                                let conn = Self::connect(identity, false, endpoint.clone(), addr.clone(), exit_signal.clone()).await;
-                                if let Some(conn) = conn {
-                                    // could connect
-                                    connection = Some(Arc::new(RwLock::new(conn)));
-                                } else {
-                                    break;
-                                }
-                            }
-
-                            let task_counter = task_counter.clone();
-                            let endpoint = endpoint.clone();
-                            let exit_signal = exit_signal.clone();
-                            let addr = addr.clone();
-                            let connection = connection.clone();
-                            let last_stable_id = last_stable_id.clone();
-
-                            tokio::spawn(async move {
-                                task_counter.fetch_add(1, Ordering::Relaxed);
-                                NB_QUIC_TASKS.inc();
-                                let connection = connection.unwrap();
-                                Self::send_transaction_batch(connection, txs, identity, endpoint, addr, exit_signal, last_stable_id).await;
-
-                                NB_QUIC_TASKS.dec();
-                                task_counter.fetch_sub(1, Ordering::Relaxed);
-                            });
-                            tokio::time::sleep(tokio::time::Duration::from_micros(100)).await;
+                            tx
                        },
-                        Err(_) => {
-                            // timed out
+                        Err(e) => {
+                            error!(
+                                "Broadcast channel error on recv for {} error {}",
+                                identity, e
+                            );
+                            continue;
+                        }
+                    };
+
+                    let mut txs = vec![first_tx];
+                    for _ in 1..number_of_transactions_per_unistream {
+                        if let Ok((signature, tx)) = transaction_reciever.try_recv() {
+                            if Self::check_for_confirmation(&txs_sent_store, signature) {
+                                continue;
+                            }
+                            txs.push(tx);
                        }
                    }
+
+                    if connection.is_none() {
+                        // initial connection
+                        let conn = Self::connect(identity, false, endpoint.clone(), addr.clone(), exit_signal.clone()).await;
+                        if let Some(conn) = conn {
+                            // could connect
+                            connection = Some(Arc::new(RwLock::new(conn)));
+                        } else {
+                            break;
+                        }
+                    }
+
+                    let task_counter = task_counter.clone();
+                    let endpoint = endpoint.clone();
+                    let exit_signal = exit_signal.clone();
+                    let addr = addr.clone();
+                    let connection = connection.clone();
+                    let last_stable_id = last_stable_id.clone();
+
+                    tokio::spawn(async move {
+                        task_counter.fetch_add(1, Ordering::Relaxed);
+                        NB_QUIC_TASKS.inc();
+                        let connection = connection.unwrap();
+                        Self::send_transaction_batch(connection, txs, identity, endpoint, addr, exit_signal, last_stable_id).await;
+
+                        NB_QUIC_TASKS.dec();
+                        task_counter.fetch_sub(1, Ordering::Relaxed);
+                    });
                },
                _ = exit_oneshot_channel.recv() => {
                    break;
                }
@@ -409,7 +409,7 @@ impl ActiveConnection {
     pub fn start_listening(
         &self,
-        transaction_reciever: Receiver<Vec<u8>>,
+        transaction_reciever: Receiver<(String, Vec<u8>)>,
         exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>,
         identity_stakes: IdentityStakes,
     ) {
@@ -417,6 +417,7 @@ impl ActiveConnection {
         let addr = self.tpu_address;
         let exit_signal = self.exit_signal.clone();
         let identity = self.identity;
+        let txs_sent_store = self.txs_sent_store.clone();
         tokio::spawn(async move {
             Self::listen(
                 transaction_reciever,
@@ -426,6 +427,7 @@ impl ActiveConnection {
                 exit_signal,
                 identity,
                 identity_stakes,
+                txs_sent_store,
             )
             .await;
         });
     }
@@ -488,16 +490,22 @@ impl TpuConnectionManager {
     pub async fn update_connections(
         &self,
-        transaction_sender: Arc<Sender<Vec<u8>>>,
+        transaction_sender: Arc<Sender<(String, Vec<u8>)>>,
         connections_to_keep: HashMap<Pubkey, SocketAddr>,
         identity_stakes: IdentityStakes,
+        txs_sent_store: Arc<DashMap<String, TxProps>>,
     ) {
         NB_CONNECTIONS_TO_KEEP.set(connections_to_keep.len() as i64);
         for (identity, socket_addr) in &connections_to_keep {
             if self.identity_to_active_connection.get(identity).is_none() {
                 trace!("added a connection for {}, {}", identity, socket_addr);
                 let endpoint = self.endpoints.get();
-                let active_connection = ActiveConnection::new(endpoint, *socket_addr, *identity);
+                let active_connection = ActiveConnection::new(
+                    endpoint,
+                    *socket_addr,
+                    *identity,
+                    txs_sent_store.clone(),
+                );
                 // using mpsc as a oneshot channel/ because with one shot channel we cannot reuse the reciever
                 let (sx, rx) = tokio::sync::mpsc::channel(1);
diff --git a/src/workers/tpu_utils/tpu_service.rs b/src/workers/tpu_utils/tpu_service.rs
index 1eb656d0..a0470e4a 100644
--- a/src/workers/tpu_utils/tpu_service.rs
+++ b/src/workers/tpu_utils/tpu_service.rs
@@ -29,13 +29,15 @@ use tokio::{
     time::{Duration, Instant},
 };
 
+use crate::workers::TxProps;
+
 use super::tpu_connection_manager::TpuConnectionManager;
 
 const CACHE_NEXT_SLOT_LEADERS_PUBKEY_SIZE: usize = 1024; // Save pubkey and contact info of next 1024 leaders in the queue
 const CLUSTERINFO_REFRESH_TIME: u64 = 60; // refresh cluster every minute
 const LEADER_SCHEDULE_UPDATE_INTERVAL: u64 = 10; // update leader schedule every 10s
 const AVERAGE_SLOT_CHANGE_TIME_IN_MILLIS: u64 = 400;
-const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 16384;
+const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000;
 
 lazy_static::lazy_static! {
     static ref NB_CLUSTER_NODES: GenericGauge<prometheus::core::AtomicI64> =
@@ -65,10 +67,11 @@ pub struct TpuService {
     fanout_slots: u64,
     rpc_client: Arc<RpcClient>,
     pubsub_client: Arc<PubsubClient>,
-    broadcast_sender: Arc<tokio::sync::broadcast::Sender<Vec<u8>>>,
+    broadcast_sender: Arc<tokio::sync::broadcast::Sender<(String, Vec<u8>)>>,
     tpu_connection_manager: Arc<TpuConnectionManager>,
     identity: Arc<Keypair>,
     identity_stakes: Arc<RwLock<IdentityStakes>>,
+    txs_sent_store: Arc<DashMap<String, TxProps>>,
 }
 
 #[derive(Debug, Copy, Clone)]
@@ -99,6 +102,7 @@ impl TpuService {
         identity: Arc<Keypair>,
         rpc_client: Arc<RpcClient>,
         rpc_ws_address: String,
+        txs_sent_store: Arc<DashMap<String, TxProps>>,
     ) -> anyhow::Result<Self> {
         let pubsub_client = PubsubClient::new(&rpc_ws_address).await?;
         let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE);
@@ -123,6 +127,7 @@ impl TpuService {
             tpu_connection_manager: Arc::new(tpu_connection_manager),
             identity,
             identity_stakes: Arc::new(RwLock::new(IdentityStakes::default())),
+            txs_sent_store,
         })
     }
@@ -174,8 +179,8 @@ impl TpuService {
         Ok(())
     }
 
-    pub fn send_transaction(&self, transaction: Vec<u8>) -> anyhow::Result<()> {
-        self.broadcast_sender.send(transaction)?;
+    pub fn send_transaction(&self, signature: String, transaction: Vec<u8>) -> anyhow::Result<()> {
+        self.broadcast_sender.send((signature, transaction))?;
         Ok(())
     }
@@ -271,6 +276,7 @@ impl TpuService {
                 self.broadcast_sender.clone(),
                 connections_to_keep,
                 *identity_stakes,
+                self.txs_sent_store.clone(),
             )
             .await;
     }
@@ -441,4 +447,8 @@ impl TpuService {
     pub fn get_estimated_slot(&self) -> u64 {
         self.estimated_slot.load(Ordering::Relaxed)
     }
+
+    pub fn get_estimated_slot_holder(&self) -> Arc<AtomicU64> {
+        self.estimated_slot.clone()
+    }
 }
diff --git a/src/workers/transaction_replayer.rs b/src/workers/transaction_replayer.rs
index 67bbb6ce..249762d0 100644
--- a/src/workers/transaction_replayer.rs
+++ b/src/workers/transaction_replayer.rs
@@ -63,7 +63,9 @@ impl TransactionReplayer {
                     continue;
                 }
                 // ignore reset error
-                let _ = tx_sender.tpu_service.send_transaction(tx_replay.tx.clone());
+                let _ = tx_sender
+                    .tpu_service
+                    .send_transaction(tx_replay.signature.clone(), tx_replay.tx.clone());
 
                 if tx_replay.replay_count < tx_replay.max_replay {
                     tx_replay.replay_count += 1;
diff --git a/src/workers/tx_sender.rs b/src/workers/tx_sender.rs
index a0b9625e..23b48407 100644
--- a/src/workers/tx_sender.rs
+++ b/src/workers/tx_sender.rs
@@ -70,10 +70,13 @@ impl Default for TxProps {
 }
 
 impl TxSender {
-    pub fn new(tpu_service: Arc<TpuService>) -> Self {
+    pub fn new(
+        txs_sent_store: Arc<DashMap<String, TxProps>>,
+        tpu_service: Arc<TpuService>,
+    ) -> Self {
         Self {
             tpu_service,
-            txs_sent_store: Default::default(),
+            txs_sent_store: txs_sent_store,
         }
     }
@@ -105,8 +108,9 @@ impl TxSender {
         let forwarded_local_time = Utc::now();
         let mut quic_responses = vec![];
-        for tx in txs {
-            let quic_response = match tpu_client.send_transaction(tx) {
+        for (tx, (signature, _)) in txs.iter().zip(sigs_and_slots.clone()) {
+            txs_sent.insert(signature.to_owned(), TxProps::default());
+            let quic_response = match tpu_client.send_transaction(signature.clone(), tx.clone()) {
                 Ok(_) => {
                     TXS_SENT.inc_by(1);
                     1
diff --git a/tests/workers.rs b/tests/workers.rs
index 2749c83c..881dd623 100644
--- a/tests/workers.rs
+++ b/tests/workers.rs
@@ -1,11 +1,12 @@
 use std::{sync::Arc, time::Duration};
 
 use bench::helpers::BenchHelper;
+use dashmap::DashMap;
 use futures::future::try_join_all;
 use lite_rpc::{
     block_store::BlockStore,
     encoding::BinaryEncoding,
-    workers::{tpu_utils::tpu_service::TpuService, BlockListener, TxSender},
+    workers::{tpu_utils::tpu_service::TpuService, BlockListener, TxProps, TxSender},
     DEFAULT_RPC_ADDR, DEFAULT_WS_ADDR,
 };
 use solana_rpc_client::nonblocking::rpc_client::RpcClient;
@@ -18,20 +19,21 @@ use tokio::sync::mpsc;
 async fn send_and_confirm_txs() {
     let rpc_client = Arc::new(RpcClient::new(DEFAULT_RPC_ADDR.to_string()));
     let current_slot = rpc_client.get_slot().await.unwrap();
-
+    let txs_sent_store: Arc<DashMap<String, TxProps>> = Default::default();
     let tpu_service = TpuService::new(
         current_slot,
         32,
         Arc::new(Keypair::new()),
         rpc_client.clone(),
         DEFAULT_WS_ADDR.into(),
+        txs_sent_store.clone(),
     )
     .await
    .unwrap();
 
-    let tpu_client = Arc::new(tpu_service);
+    let tpu_client = Arc::new(tpu_service.clone());
 
-    let tx_sender = TxSender::new(tpu_client);
+    let tx_sender = TxSender::new(txs_sent_store, tpu_client);
 
     let block_store = BlockStore::new(&rpc_client).await.unwrap();
     let block_listener = BlockListener::new(rpc_client.clone(), tx_sender.clone(), block_store);
@@ -39,9 +41,11 @@ async fn send_and_confirm_txs() {
     let (tx_send, tx_recv) = mpsc::channel(1024);
 
     let services = try_join_all(vec![
-        block_listener
-            .clone()
-            .listen(CommitmentConfig::confirmed(), None),
+        block_listener.clone().listen(
+            CommitmentConfig::confirmed(),
+            None,
+            tpu_service.get_estimated_slot_holder(),
+        ),
         tx_sender.clone().execute(tx_recv, None),
     ]);
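
Note on the bench changes above: the benchmark now keeps a single recent blockhash behind an Arc<RwLock<Hash>> that a background task refreshes, instead of fetching a blockhash once per bench run. Below is a minimal standalone sketch of that refresher pattern only; it is illustrative, not part of the patch, a plain u64 stands in for Hash, and the RPC call is simulated with a counter.

    use std::sync::Arc;
    use tokio::sync::RwLock;
    use tokio::time::{sleep, Duration};

    #[tokio::main]
    async fn main() {
        // Shared slot for the "latest" value; one writer task, many cheap readers.
        let latest: Arc<RwLock<u64>> = Arc::new(RwLock::new(0));

        {
            let latest = latest.clone();
            tokio::spawn(async move {
                let mut counter = 0u64;
                loop {
                    // In bench/src/main.rs this step is rpc_client.get_latest_blockhash().
                    counter += 1;
                    *latest.write().await = counter;
                    sleep(Duration::from_millis(500)).await;
                }
            });
        }

        // A sender task takes a short read lock to pick up the freshest value.
        sleep(Duration::from_millis(600)).await;
        let snapshot = { *latest.read().await };
        println!("latest value: {snapshot}");
    }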