diff --git a/bench/Cargo.toml b/bench/Cargo.toml index c7e0a308..c2469869 100644 --- a/bench/Cargo.toml +++ b/bench/Cargo.toml @@ -18,4 +18,3 @@ dirs = "5.0.0" rand = "0.8.5" rand_chacha = "0.3.1" futures = { workspace = true } - diff --git a/bench/src/main.rs b/bench/src/main.rs index b8502cf5..a4ae94cd 100644 --- a/bench/src/main.rs +++ b/bench/src/main.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, collections::HashMap}; use bench::{ cli::Args, @@ -9,9 +9,8 @@ use clap::Parser; use futures::future::join_all; use log::{error, info}; use solana_rpc_client::nonblocking::rpc_client::RpcClient; -use solana_sdk::{commitment_config::CommitmentConfig, signer::Signer}; +use solana_sdk::{commitment_config::CommitmentConfig, signer::Signer, signature::Keypair}; use tokio::{ - sync::Mutex, time::{Duration, Instant}, }; @@ -37,12 +36,16 @@ async fn main() { let mut tasks = vec![]; - for _ in 0..runs { - let rpc_client = Arc::new(RpcClient::new_with_commitment( - lite_rpc_addr.clone(), - CommitmentConfig::confirmed(), - )); - tasks.push(tokio::spawn(bench(rpc_client.clone(), tx_count))); + let funded_payer = BenchHelper::get_payer().await.unwrap(); + println!("payer : {}", funded_payer.pubkey()); + + let rpc_client = Arc::new(RpcClient::new_with_commitment( + lite_rpc_addr.clone(), + CommitmentConfig::confirmed(), + )); + for seed in 0..runs { + let funded_payer = Keypair::from_bytes(funded_payer.to_bytes().as_slice()).unwrap(); + tasks.push(tokio::spawn(bench(rpc_client.clone(), tx_count, funded_payer, seed as u64))); // wait for an interval run_interval_ms.tick().await; } @@ -73,44 +76,48 @@ async fn main() { csv_writer.flush().unwrap(); } -async fn bench(rpc_client: Arc, tx_count: usize) -> Metric { - let funded_payer = BenchHelper::get_payer().await.unwrap(); +#[derive(Clone, Debug, Copy)] +struct TxSendData { + sent_duration: Duration, + sent_instant: Instant, +} - println!("payer {}", funded_payer.pubkey()); +async fn bench(rpc_client: Arc, tx_count: usize, funded_payer: Keypair, seed: u64) -> Metric { let blockhash = rpc_client.get_latest_blockhash().await.unwrap(); - let txs = BenchHelper::generate_txs(tx_count, &funded_payer, blockhash, None); + let txs = BenchHelper::generate_txs(tx_count, &funded_payer, blockhash, Some(seed)); - let metric = Arc::new(Mutex::new(Metric::default())); - let mut tx_sent_tasks = vec![]; + let mut metric = Metric::default(); + let mut map_of_txs = HashMap::new(); for tx in txs { let rpc_client = rpc_client.clone(); - let metric = metric.clone(); - let task = tokio::spawn(async move { - let start_time = Instant::now(); - let signature = rpc_client.send_transaction(&tx).await.unwrap(); - let sent_duration = start_time.elapsed(); - let mut confirmed = false; - while start_time.elapsed() < Duration::from_secs(60) { - tokio::time::sleep(Duration::from_millis(1)).await; - if let Ok(b) = rpc_client.confirm_transaction(&signature).await { - if b == true { - confirmed = true; - break; - } + let start_time = Instant::now(); + if let Ok(signature) = rpc_client.send_transaction(&tx).await { + map_of_txs.insert( signature, TxSendData { + sent_duration: start_time.elapsed(), + sent_instant: Instant::now(), + }); + } + } + let confirmation_time = Instant::now(); + while confirmation_time.elapsed() < Duration::from_secs(60) && !map_of_txs.is_empty() { + let signatures = map_of_txs.iter().map(|x| x.0.clone()).collect::>(); + if let Ok(res) = rpc_client.get_signature_statuses(&signatures).await { + for i in 0..signatures.len() { + let tx_status = &res.value[i]; + if let Some(_) = tx_status { + let signature = signatures[i]; + let tx_data = map_of_txs.get(&signature).unwrap(); + metric.add_successful_transaction( tx_data.sent_duration, tx_data.sent_instant.elapsed()); + map_of_txs.remove(&signature); } } - let send_and_confirm_time = start_time.elapsed(); - let mut metric = metric.lock().await; - if confirmed { - metric.add_successful_transaction(sent_duration, send_and_confirm_time); - } else { - metric.add_unsuccessful_transaction(); - } - }); - tx_sent_tasks.push(task); + } } - futures::future::join_all(tx_sent_tasks).await; - let m = *metric.lock().await; - m + + for (_, tx) in map_of_txs { + metric.add_unsuccessful_transaction(tx.sent_duration); + } + metric.finalize(); + metric } diff --git a/bench/src/metrics.rs b/bench/src/metrics.rs index e55dfe11..79624d0a 100644 --- a/bench/src/metrics.rs +++ b/bench/src/metrics.rs @@ -1,39 +1,50 @@ use std::{ - ops::{AddAssign, Div, DivAssign, Mul}, + ops::{AddAssign, DivAssign}, time::Duration, }; #[derive(Clone, Copy, Debug, Default, serde::Serialize)] pub struct Metric { pub txs_sent: u64, - pub time_to_send_txs: f64, pub txs_confirmed: u64, pub txs_un_confirmed: u64, pub average_confirmation_time_ms: f64, + pub average_time_to_send_txs: f64, + + #[serde(skip_serializing)] + total_sent_time: Duration, + #[serde(skip_serializing)] + total_confirmation_time: Duration, } impl Metric { pub fn add_successful_transaction( &mut self, time_to_send: Duration, - time_to_send_and_confrim: Duration, + time_to_confrim: Duration, ) { - self.time_to_send_txs = (self.time_to_send_txs.mul(self.txs_confirmed as f64) - + time_to_send.as_millis() as f64) - .div(self.txs_confirmed as f64 + 1.0); - self.average_confirmation_time_ms = (self - .average_confirmation_time_ms - .mul(self.txs_confirmed as f64) - + time_to_send_and_confrim.as_millis() as f64) - .div(self.txs_confirmed as f64 + 1.0); + self.total_sent_time += time_to_send; + self.total_confirmation_time += time_to_confrim; + self.txs_confirmed += 1; self.txs_sent += 1; } - pub fn add_unsuccessful_transaction(&mut self) { + pub fn add_unsuccessful_transaction(&mut self, time_to_send: Duration) { + self.total_sent_time += time_to_send; self.txs_un_confirmed += 1; self.txs_sent += 1; } + + pub fn finalize(&mut self) { + if self.txs_sent > 0 { + self.average_time_to_send_txs = self.total_sent_time.as_millis() as f64 / self.txs_sent as f64; + } + + if self.txs_confirmed > 0 { + self.average_confirmation_time_ms = self.total_confirmation_time.as_millis() as f64 / self.txs_confirmed as f64; + } + } } #[derive(Default)] @@ -51,10 +62,12 @@ impl Metric { impl AddAssign<&Self> for Metric { fn add_assign(&mut self, rhs: &Self) { self.txs_sent += rhs.txs_sent; - self.time_to_send_txs += rhs.time_to_send_txs; self.txs_confirmed += rhs.txs_confirmed; self.txs_un_confirmed += rhs.txs_un_confirmed; - self.average_confirmation_time_ms += rhs.average_confirmation_time_ms; + + self.total_confirmation_time += rhs.total_confirmation_time; + self.total_sent_time += rhs.total_sent_time; + self.finalize(); } } @@ -65,10 +78,12 @@ impl DivAssign for Metric { return; } self.txs_sent /= rhs; - self.time_to_send_txs /= rhs as f64; self.txs_confirmed /= rhs; self.txs_un_confirmed /= rhs; - self.average_confirmation_time_ms /= rhs as f64; + + self.total_confirmation_time = Duration::from_micros((self.total_confirmation_time.as_micros() / rhs as u128) as u64); + self.total_sent_time = Duration::from_micros((self.total_sent_time.as_micros() / rhs as u128) as u64); + self.finalize(); } }