From aac626c2c278072f6d9f0c0b40fb5ba6a94678c6 Mon Sep 17 00:00:00 2001 From: sakridge Date: Wed, 1 May 2019 15:58:35 -0700 Subject: [PATCH] Add sample_txs function to perf_utils shared crate (#4104) Shared code between bench-tps and bench-exchange --- bench-exchange/src/bench.rs | 71 ++--------------------------- bench-tps/src/bench.rs | 91 +++++++------------------------------ client/src/lib.rs | 1 + client/src/perf_utils.rs | 76 +++++++++++++++++++++++++++++++ 4 files changed, 96 insertions(+), 143 deletions(-) create mode 100644 client/src/perf_utils.rs diff --git a/bench-exchange/src/bench.rs b/bench-exchange/src/bench.rs index 4797b839a..a686d97c0 100644 --- a/bench-exchange/src/bench.rs +++ b/bench-exchange/src/bench.rs @@ -8,6 +8,7 @@ use rayon::prelude::*; use solana::cluster_info::FULLNODE_PORT_RANGE; use solana::contact_info::ContactInfo; use solana::gen_keys::GenKeys; +use solana_client::perf_utils::{sample_txs, SampleStats}; use solana_client::thin_client::create_client; use solana_client::thin_client::ThinClient; use solana_drone::drone::request_airdrop_transaction; @@ -68,16 +69,6 @@ impl Default for Config { } } -#[derive(Default)] -pub struct SampleStats { - /// Maximum TPS reported by this node - pub tps: f32, - /// Total time taken for those txs - pub elapsed: Duration, - /// Total transactions reported by this node - pub txs: u64, -} - pub fn do_bench_exchange(clients: Vec, config: Config) where T: 'static + Client + Send + Sync, @@ -251,62 +242,6 @@ where ); } -fn sample_txs( - exit_signal: &Arc, - sample_stats: &Arc>>, - sample_period: u64, - client: &Arc, -) where - T: Client, -{ - let mut max_tps = 0.0; - let mut total_elapsed; - let mut total_txs; - let mut now = Instant::now(); - let start_time = now; - let initial_txs = client.get_transaction_count().expect("transaction count"); - let mut last_txs = initial_txs; - - loop { - total_elapsed = start_time.elapsed(); - let elapsed = now.elapsed(); - now = Instant::now(); - let mut txs = client.get_transaction_count().expect("transaction count"); - - if txs < last_txs { - info!("Expected txs({}) >= last_txs({})", txs, last_txs); - txs = last_txs; - } - total_txs = txs - initial_txs; - let sample_txs = txs - last_txs; - last_txs = txs; - - let tps = sample_txs as f32 / duration_as_s(&elapsed); - if tps > max_tps { - max_tps = tps; - } - - info!( - "Sampler {:9.2} TPS, Transactions: {:6}, Total transactions: {} over {} s", - tps, - sample_txs, - total_txs, - total_elapsed.as_secs(), - ); - - if exit_signal.load(Ordering::Relaxed) { - let stats = SampleStats { - tps: max_tps, - elapsed: total_elapsed, - txs: total_txs, - }; - sample_stats.write().unwrap().push(stats); - return; - } - sleep(Duration::from_secs(sample_period)); - } -} - fn do_tx_transfers( exit_signal: &Arc, shared_txs: &SharedTransactions, @@ -873,13 +808,13 @@ pub fn create_token_accounts(client: &Client, signers: &[Arc], accounts } } -fn compute_and_report_stats(maxes: &Arc>>, total_txs_sent: u64) { +fn compute_and_report_stats(maxes: &Arc>>, total_txs_sent: u64) { let mut max_txs = 0; let mut max_elapsed = Duration::new(0, 0); info!("| Max TPS | Total Transactions"); info!("+---------------+--------------------"); - for stats in maxes.read().unwrap().iter() { + for (_sock, stats) in maxes.read().unwrap().iter() { let maybe_flag = match stats.txs { 0 => "!!!!!", _ => "", diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index f1a83c51e..a224511c1 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -3,6 +3,7 @@ use solana_metrics; use log::*; use rayon::prelude::*; use solana::gen_keys::GenKeys; +use solana_client::perf_utils::{sample_txs, SampleStats}; use solana_drone::drone::request_airdrop_transaction; use solana_metrics::influxdb; use solana_sdk::client::Client; @@ -24,13 +25,6 @@ use std::thread::Builder; use std::time::Duration; use std::time::Instant; -pub struct NodeStats { - /// Maximum TPS reported by this node - pub tps: f64, - /// Total transactions reported by this node - pub tx: u64, -} - pub const MAX_SPENDS_PER_TX: usize = 4; pub const NUM_LAMPORTS_PER_ACCOUNT: u64 = 20; @@ -101,7 +95,7 @@ where Builder::new() .name("solana-client-sample".to_string()) .spawn(move || { - sample_tx_count(&exit_signal, &maxes, first_tx_count, sample_period, &client); + sample_txs(&exit_signal, &maxes, sample_period, &client); }) .unwrap() }) @@ -210,7 +204,7 @@ where ); let r_maxes = maxes.read().unwrap(); - r_maxes.first().unwrap().1.tx + r_maxes.first().unwrap().1.txs } fn metrics_submit_lamport_balance(lamport_balance: u64) { @@ -223,65 +217,6 @@ fn metrics_submit_lamport_balance(lamport_balance: u64) { ); } -fn sample_tx_count( - exit_signal: &Arc, - maxes: &Arc>>, - first_tx_count: u64, - sample_period: u64, - client: &Arc, -) { - let mut now = Instant::now(); - let mut initial_tx_count = client.get_transaction_count().expect("transaction count"); - let mut max_tps = 0.0; - let mut total; - - let log_prefix = format!("{:21}:", client.transactions_addr()); - - loop { - let mut tx_count = client.get_transaction_count().expect("transaction count"); - if tx_count < initial_tx_count { - println!( - "expected tx_count({}) >= initial_tx_count({})", - tx_count, initial_tx_count - ); - tx_count = initial_tx_count; - } - let duration = now.elapsed(); - now = Instant::now(); - let sample = tx_count - initial_tx_count; - initial_tx_count = tx_count; - - let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); - let tps = (sample * 1_000_000_000) as f64 / ns as f64; - if tps > max_tps { - max_tps = tps; - } - if tx_count > first_tx_count { - total = tx_count - first_tx_count; - } else { - total = 0; - } - println!( - "{} {:9.2} TPS, Transactions: {:6}, Total transactions: {}", - log_prefix, tps, sample, total - ); - sleep(Duration::new(sample_period, 0)); - - if exit_signal.load(Ordering::Relaxed) { - println!("{} Exiting validator thread", log_prefix); - let stats = NodeStats { - tps: max_tps, - tx: total, - }; - maxes - .write() - .unwrap() - .push((client.transactions_addr(), stats)); - break; - } - } -} - fn generate_txs( shared_txs: &SharedTransactions, blockhash: &Hash, @@ -572,7 +507,7 @@ pub fn airdrop_lamports( } fn compute_and_report_stats( - maxes: &Arc>>, + maxes: &Arc>>, sample_period: u64, tx_send_elapsed: &Duration, total_tx_send_count: usize, @@ -586,14 +521,14 @@ fn compute_and_report_stats( println!("---------------------+---------------+--------------------"); for (sock, stats) in maxes.read().unwrap().iter() { - let maybe_flag = match stats.tx { + let maybe_flag = match stats.txs { 0 => "!!!!!", _ => "", }; println!( "{:20} | {:13.2} | {} {}", - sock, stats.tps, stats.tx, maybe_flag + sock, stats.tps, stats.txs, maybe_flag ); if stats.tps == 0.0 { @@ -604,27 +539,33 @@ fn compute_and_report_stats( if stats.tps > max_of_maxes { max_of_maxes = stats.tps; } - if stats.tx > max_tx_count { - max_tx_count = stats.tx; + if stats.txs > max_tx_count { + max_tx_count = stats.txs; } } if total_maxes > 0.0 { let num_nodes_with_tps = maxes.read().unwrap().len() - nodes_with_zero_tps; - let average_max = total_maxes / num_nodes_with_tps as f64; + let average_max = total_maxes / num_nodes_with_tps as f32; println!( "\nAverage max TPS: {:.2}, {} nodes had 0 TPS", average_max, nodes_with_zero_tps ); } + let total_tx_send_count = total_tx_send_count as u64; + let drop_rate = if total_tx_send_count > max_tx_count { + (total_tx_send_count - max_tx_count) as f64 / total_tx_send_count as f64 + } else { + 0.0 + }; println!( "\nHighest TPS: {:.2} sampling period {}s max transactions: {} clients: {} drop rate: {:.2}", max_of_maxes, sample_period, max_tx_count, maxes.read().unwrap().len(), - (total_tx_send_count as u64 - max_tx_count) as f64 / total_tx_send_count as f64, + drop_rate, ); println!( "\tAverage TPS: {}", diff --git a/client/src/lib.rs b/client/src/lib.rs index 151c52f88..011095b88 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1,6 +1,7 @@ pub mod client_error; mod generic_rpc_client_request; pub mod mock_rpc_client_request; +pub mod perf_utils; pub mod rpc_client; pub mod rpc_client_request; pub mod rpc_request; diff --git a/client/src/perf_utils.rs b/client/src/perf_utils.rs new file mode 100644 index 000000000..eaceae3d7 --- /dev/null +++ b/client/src/perf_utils.rs @@ -0,0 +1,76 @@ +use log::*; +use solana_sdk::client::Client; +use solana_sdk::timing::duration_as_s; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, RwLock}; +use std::thread::sleep; +use std::time::{Duration, Instant}; + +#[derive(Default)] +pub struct SampleStats { + /// Maximum TPS reported by this node + pub tps: f32, + /// Total time taken for those txs + pub elapsed: Duration, + /// Total transactions reported by this node + pub txs: u64, +} + +pub fn sample_txs( + exit_signal: &Arc, + sample_stats: &Arc>>, + sample_period: u64, + client: &Arc, +) where + T: Client, +{ + let mut max_tps = 0.0; + let mut total_elapsed; + let mut total_txs; + let mut now = Instant::now(); + let start_time = now; + let initial_txs = client.get_transaction_count().expect("transaction count"); + let mut last_txs = initial_txs; + + loop { + total_elapsed = start_time.elapsed(); + let elapsed = now.elapsed(); + now = Instant::now(); + let mut txs = client.get_transaction_count().expect("transaction count"); + + if txs < last_txs { + info!("Expected txs({}) >= last_txs({})", txs, last_txs); + txs = last_txs; + } + total_txs = txs - initial_txs; + let sample_txs = txs - last_txs; + last_txs = txs; + + let tps = sample_txs as f32 / duration_as_s(&elapsed); + if tps > max_tps { + max_tps = tps; + } + + info!( + "Sampler {:9.2} TPS, Transactions: {:6}, Total transactions: {} over {} s", + tps, + sample_txs, + total_txs, + total_elapsed.as_secs(), + ); + + if exit_signal.load(Ordering::Relaxed) { + let stats = SampleStats { + tps: max_tps, + elapsed: total_elapsed, + txs: total_txs, + }; + sample_stats + .write() + .unwrap() + .push((client.transactions_addr(), stats)); + return; + } + sleep(Duration::from_secs(sample_period)); + } +}