From 4916cd8da560a62813d112372ed2f531f92484c8 Mon Sep 17 00:00:00 2001
From: Stephen Akridge
Date: Fri, 22 Mar 2019 11:39:25 -0700
Subject: [PATCH] bench-tps in a cargo test

---
 bench-tps/src/bench.rs    | 261 ++++++++++++++++++++++++++++++++++++--
 bench-tps/src/main.rs     | 240 +----------------------------------
 core/src/local_cluster.rs |  14 +-
 3 files changed, 264 insertions(+), 251 deletions(-)

diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs
index 2fa775bf1..95f6875b0 100644
--- a/bench-tps/src/bench.rs
+++ b/bench-tps/src/bench.rs
@@ -1,8 +1,11 @@
 use solana_metrics;
 
+use crate::cli::Config;
 use rayon::prelude::*;
 use solana::cluster_info::FULLNODE_PORT_RANGE;
 use solana::contact_info::ContactInfo;
+use solana::gen_keys::GenKeys;
+use solana::gossip_service::discover;
 use solana_client::thin_client::create_client;
 use solana_client::thin_client::ThinClient;
 use solana_drone::drone::request_airdrop_transaction;
@@ -21,6 +24,7 @@ use std::process::exit;
 use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
 use std::sync::{Arc, RwLock};
 use std::thread::sleep;
+use std::thread::Builder;
 use std::time::Duration;
 use std::time::Instant;
 
@@ -35,7 +39,214 @@ pub const MAX_SPENDS_PER_TX: usize = 4;
 
 pub type SharedTransactions = Arc<RwLock<VecDeque<Vec<(Transaction, u64)>>>>;
 
-pub fn metrics_submit_lamport_balance(lamport_balance: u64) {
+pub fn do_bench_tps(config: Config) {
+    let Config {
+        network_addr: network,
+        drone_addr,
+        id,
+        threads,
+        thread_batch_sleep_ms,
+        num_nodes,
+        duration,
+        tx_count,
+        sustained,
+        reject_extra_nodes,
+        converge_only,
+    } = config;
+
+    let nodes = discover(&network, num_nodes).unwrap_or_else(|err| {
+        eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err);
+        exit(1);
+    });
+    if nodes.len() < num_nodes {
+        eprintln!(
+            "Error: Insufficient nodes discovered. Expecting {} or more",
+            num_nodes
+        );
+        exit(1);
+    }
+    if reject_extra_nodes && nodes.len() > num_nodes {
+        eprintln!(
+            "Error: Extra nodes discovered. Expecting exactly {}",
+            num_nodes
+        );
+        exit(1);
+    }
+
+    if converge_only {
+        return;
+    }
+    let cluster_entrypoint = nodes[0].clone(); // Pick the first node, why not?
+
+    let client = create_client(cluster_entrypoint.client_facing_addr(), FULLNODE_PORT_RANGE);
+    let mut barrier_client =
+        create_client(cluster_entrypoint.client_facing_addr(), FULLNODE_PORT_RANGE);
+
+    let mut seed = [0u8; 32];
+    seed.copy_from_slice(&id.public_key_bytes()[..32]);
+    let mut rnd = GenKeys::new(seed);
+
+    println!("Creating {} keypairs...", tx_count * 2);
+    let mut total_keys = 0;
+    let mut target = tx_count * 2;
+    while target > 0 {
+        total_keys += target;
+        target /= MAX_SPENDS_PER_TX;
+    }
+    let gen_keypairs = rnd.gen_n_keypairs(total_keys as u64);
+    let barrier_source_keypair = Keypair::new();
+    let barrier_dest_id = Keypair::new().pubkey();
+
+    println!("Get lamports...");
+    let num_lamports_per_account = 20;
+
+    // Sample the first keypair, see if it has lamports, if so then resume
+    // to avoid lamport loss
+    let keypair0_balance = client
+        .poll_get_balance(&gen_keypairs.last().unwrap().pubkey())
+        .unwrap_or(0);
+
+    if num_lamports_per_account > keypair0_balance {
+        let extra = num_lamports_per_account - keypair0_balance;
+        let total = extra * (gen_keypairs.len() as u64);
+        airdrop_lamports(&client, &drone_addr, &id, total);
+        println!("adding more lamports {}", extra);
+        fund_keys(&client, &id, &gen_keypairs, extra);
+    }
+    let start = gen_keypairs.len() - (tx_count * 2) as usize;
+    let keypairs = &gen_keypairs[start..];
+    airdrop_lamports(&barrier_client, &drone_addr, &barrier_source_keypair, 1);
+
+    println!("Get last ID...");
+    let mut blockhash = client.get_recent_blockhash().unwrap();
+    println!("Got last ID {:?}", blockhash);
+
+    let first_tx_count = client.get_transaction_count().expect("transaction count");
+    println!("Initial transaction count {}", first_tx_count);
+
+    let exit_signal = Arc::new(AtomicBool::new(false));
+
+    // Setup a thread per validator to sample every period
+    // collect the max transaction rate and total tx count seen
+    let maxes = Arc::new(RwLock::new(Vec::new()));
+    let sample_period = 1; // in seconds
+    println!("Sampling TPS every {} second...", sample_period);
+    let v_threads: Vec<_> = nodes
+        .into_iter()
+        .map(|v| {
+            let exit_signal = exit_signal.clone();
+            let maxes = maxes.clone();
+            Builder::new()
+                .name("solana-client-sample".to_string())
+                .spawn(move || {
+                    sample_tx_count(&exit_signal, &maxes, first_tx_count, &v, sample_period);
+                })
+                .unwrap()
+        })
+        .collect();
+
+    let shared_txs: SharedTransactions = Arc::new(RwLock::new(VecDeque::new()));
+
+    let shared_tx_active_thread_count = Arc::new(AtomicIsize::new(0));
+    let total_tx_sent_count = Arc::new(AtomicUsize::new(0));
+
+    let s_threads: Vec<_> = (0..threads)
+        .map(|_| {
+            let exit_signal = exit_signal.clone();
+            let shared_txs = shared_txs.clone();
+            let cluster_entrypoint = cluster_entrypoint.clone();
+            let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
+            let total_tx_sent_count = total_tx_sent_count.clone();
+            Builder::new()
+                .name("solana-client-sender".to_string())
+                .spawn(move || {
+                    do_tx_transfers(
+                        &exit_signal,
+                        &shared_txs,
+                        &cluster_entrypoint,
+                        &shared_tx_active_thread_count,
+                        &total_tx_sent_count,
+                        thread_batch_sleep_ms,
+                    );
+                })
+                .unwrap()
+        })
+        .collect();
+
+    // generate and send transactions for the specified duration
+    let start = Instant::now();
+    let mut reclaim_lamports_back_to_source_account = false;
+    let mut i = keypair0_balance;
+    while start.elapsed() < duration {
+        let balance = client.poll_get_balance(&id.pubkey()).unwrap_or(0);
+        metrics_submit_lamport_balance(balance);
+
+        // ping-pong between source and destination accounts for each loop iteration
+        // this seems to be faster than trying to determine the balance of individual
+        // accounts
+        let len = tx_count as usize;
+        generate_txs(
+            &shared_txs,
+            &keypairs[..len],
+            &keypairs[len..],
+            threads,
+            reclaim_lamports_back_to_source_account,
+            &cluster_entrypoint,
+        );
+        // In sustained mode overlap the transfers with generation
+        // this has higher average performance but lower peak performance
+        // in tested environments.
+        if !sustained {
+            while shared_tx_active_thread_count.load(Ordering::Relaxed) > 0 {
+                sleep(Duration::from_millis(100));
+            }
+        }
+        // It's not feasible (would take too much time) to confirm each of the `tx_count / 2`
+        // transactions sent by `generate_txs()` so instead send and confirm a single transaction
+        // to validate the network is still functional.
+        send_barrier_transaction(
+            &mut barrier_client,
+            &mut blockhash,
+            &barrier_source_keypair,
+            &barrier_dest_id,
+        );
+
+        i += 1;
+        if should_switch_directions(num_lamports_per_account, i) {
+            reclaim_lamports_back_to_source_account = !reclaim_lamports_back_to_source_account;
+        }
+    }
+
+    // Stop the sampling threads so they will collect the stats
+    exit_signal.store(true, Ordering::Relaxed);
+
+    println!("Waiting for validator threads...");
+    for t in v_threads {
+        if let Err(err) = t.join() {
+            println!("  join() failed with: {:?}", err);
+        }
+    }
+
+    // join the tx send threads
+    println!("Waiting for transmit threads...");
+    for t in s_threads {
+        if let Err(err) = t.join() {
+            println!("  join() failed with: {:?}", err);
+        }
+    }
+
+    let balance = client.poll_get_balance(&id.pubkey()).unwrap_or(0);
+    metrics_submit_lamport_balance(balance);
+
+    compute_and_report_stats(
+        &maxes,
+        sample_period,
+        &start.elapsed(),
+        total_tx_sent_count.load(Ordering::Relaxed),
+    );
+}
+
+fn metrics_submit_lamport_balance(lamport_balance: u64) {
     println!("Token balance: {}", lamport_balance);
     solana_metrics::submit(
         influxdb::Point::new("bench-tps")
@@ -45,7 +256,7 @@ pub fn metrics_submit_lamport_balance(lamport_balance: u64) {
     );
 }
 
-pub fn sample_tx_count(
+fn sample_tx_count(
     exit_signal: &Arc<AtomicBool>,
     maxes: &Arc<RwLock<Vec<(SocketAddr, NodeStats)>>>,
     first_tx_count: u64,
@@ -102,7 +313,7 @@ pub fn sample_tx_count(
 }
 
 /// Send loopback payment of 0 lamports and confirm the network processed it
-pub fn send_barrier_transaction(
+fn send_barrier_transaction(
     barrier_client: &mut ThinClient,
     blockhash: &mut Hash,
     source_keypair: &Keypair,
@@ -177,7 +388,7 @@ pub fn send_barrier_transaction(
     }
 }
 
-pub fn generate_txs(
+fn generate_txs(
     shared_txs: &SharedTransactions,
     source: &[Keypair],
     dest: &[Keypair],
@@ -237,7 +448,7 @@ pub fn generate_txs(
     }
 }
 
-pub fn do_tx_transfers(
+fn do_tx_transfers(
     exit_signal: &Arc<AtomicBool>,
     shared_txs: &SharedTransactions,
     contact_info: &ContactInfo,
@@ -295,7 +506,7 @@ pub fn do_tx_transfers(
     }
 }
 
-pub fn verify_funding_transfer(client: &ThinClient, tx: &Transaction, amount: u64) -> bool {
+fn verify_funding_transfer(client: &ThinClient, tx: &Transaction, amount: u64) -> bool {
     for a in &tx.account_keys[1..] {
         if client.get_balance(a).unwrap_or(0) >= amount {
             return true;
@@ -308,7 +519,7 @@ pub fn verify_funding_transfer(client: &ThinClient, tx: &Transaction, amount: u6
 /// fund the dests keys by spending all of the source keys into MAX_SPENDS_PER_TX
 /// on every iteration. This allows us to replay the transfers because the source is either empty,
 /// or full
-pub fn fund_keys(client: &ThinClient, source: &Keypair, dests: &[Keypair], lamports: u64) {
+fn fund_keys(client: &ThinClient, source: &Keypair, dests: &[Keypair], lamports: u64) {
     let total = lamports * dests.len() as u64;
     let mut funded: Vec<(&Keypair, u64)> = vec![(source, total)];
     let mut notfunded: Vec<&Keypair> = dests.iter().collect();
@@ -401,7 +612,7 @@ pub fn fund_keys(client: &ThinClient, source: &Keypair, dests: &[Keypair], lampo
     }
 }
 
-pub fn airdrop_lamports(client: &ThinClient, drone_addr: &SocketAddr, id: &Keypair, tx_count: u64) {
+fn airdrop_lamports(client: &ThinClient, drone_addr: &SocketAddr, id: &Keypair, tx_count: u64) {
     let starting_balance = client.poll_get_balance(&id.pubkey()).unwrap_or(0);
     metrics_submit_lamport_balance(starting_balance);
     println!("starting balance {}", starting_balance);
@@ -448,7 +659,7 @@ pub fn airdrop_lamports(client: &ThinClient, drone_addr: &SocketAddr, id: &Keypa
     }
 }
 
-pub fn compute_and_report_stats(
+fn compute_and_report_stats(
     maxes: &Arc<RwLock<Vec<(SocketAddr, NodeStats)>>>,
     sample_period: u64,
     tx_send_elapsed: &Duration,
@@ -515,13 +726,18 @@ pub fn compute_and_report_stats(
 // First transfer 3/4 of the lamports to the dest accounts
 // then ping-pong 1/4 of the lamports back to the other account
 // this leaves 1/4 lamport buffer in each account
-pub fn should_switch_directions(num_lamports_per_account: u64, i: u64) -> bool {
+fn should_switch_directions(num_lamports_per_account: u64, i: u64) -> bool {
     i % (num_lamports_per_account / 4) == 0 && (i >= (3 * num_lamports_per_account) / 4)
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
+    use solana::fullnode::FullnodeConfig;
+    use solana::local_cluster::LocalCluster;
+    use solana_drone::drone::run_local_drone;
+    use std::sync::mpsc::channel;
+
     #[test]
     fn test_switch_directions() {
         assert_eq!(should_switch_directions(20, 0), false);
         assert_eq!(should_switch_directions(20, 1), false);
         assert_eq!(should_switch_directions(20, 14), false);
         assert_eq!(should_switch_directions(20, 15), true);
         assert_eq!(should_switch_directions(20, 16), false);
         assert_eq!(should_switch_directions(20, 19), false);
         assert_eq!(should_switch_directions(20, 20), true);
         assert_eq!(should_switch_directions(20, 21), false);
         assert_eq!(should_switch_directions(20, 99), false);
         assert_eq!(should_switch_directions(20, 100), true);
         assert_eq!(should_switch_directions(20, 101), false);
     }
+
+    #[test]
+    #[ignore]
+    fn test_bench_tps() {
+        let fullnode_config = FullnodeConfig::default();
+        const NUM_NODES: usize = 1;
+        let cluster =
+            LocalCluster::new_with_config(&[999_990; NUM_NODES], 2_000_000, &fullnode_config);
+
+        let drone_keypair = Keypair::new();
+        cluster.transfer(&cluster.funding_keypair, &drone_keypair.pubkey(), 1_000_000);
+
+        let (addr_sender, addr_receiver) = channel();
+        run_local_drone(drone_keypair, addr_sender);
+        let drone_addr = addr_receiver.recv_timeout(Duration::from_secs(2)).unwrap();
+
+        let mut cfg = Config::default();
+        cfg.network_addr = cluster.entry_point_info.gossip;
+        cfg.drone_addr = drone_addr;
+        cfg.tx_count = 100;
+        cfg.duration = Duration::from_secs(5);
+        cfg.num_nodes = NUM_NODES;
+
+        do_bench_tps(cfg);
+    }
 }
diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs
index bfed3bf56..ed8749aae 100644
--- a/bench-tps/src/main.rs
+++ b/bench-tps/src/main.rs
@@ -1,21 +1,7 @@
 mod bench;
 mod cli;
 
-use crate::bench::*;
-use solana::cluster_info::FULLNODE_PORT_RANGE;
-use solana::gen_keys::GenKeys;
-use solana::gossip_service::discover;
-use solana_client::thin_client::create_client;
-use solana_metrics;
-use solana_sdk::signature::{Keypair, KeypairUtil};
-use std::collections::VecDeque;
-use std::process::exit;
-use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
-use std::sync::{Arc, RwLock};
-use std::thread::sleep;
-use std::thread::Builder;
-use std::time::Duration;
-use std::time::Instant;
+use crate::bench::do_bench_tps;
 
 fn main() {
     solana_logger::setup();
@@ -25,227 +11,5 @@
 
     let cfg = cli::extract_args(&matches);
 
-    let cli::Config {
-        network_addr: network,
-        drone_addr,
-        id,
-        threads,
-        thread_batch_sleep_ms,
-        num_nodes,
-        duration,
-        tx_count,
-        sustained,
-        reject_extra_nodes,
-        converge_only,
-    } = cfg;
-
-    let nodes = discover(&network, num_nodes).unwrap_or_else(|err| {
-        eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err);
-        exit(1);
-    });
-    if nodes.len() < num_nodes {
-        eprintln!(
-            "Error: Insufficient nodes discovered. Expecting {} or more",
-            num_nodes
-        );
-        exit(1);
-    }
-    if reject_extra_nodes && nodes.len() > num_nodes {
-        eprintln!(
-            "Error: Extra nodes discovered. Expecting exactly {}",
-            num_nodes
-        );
-        exit(1);
-    }
-
-    if converge_only {
-        return;
-    }
-    let cluster_entrypoint = nodes[0].clone(); // Pick the first node, why not?
-
-    let client = create_client(cluster_entrypoint.client_facing_addr(), FULLNODE_PORT_RANGE);
-    let mut barrier_client =
-        create_client(cluster_entrypoint.client_facing_addr(), FULLNODE_PORT_RANGE);
-
-    let mut seed = [0u8; 32];
-    seed.copy_from_slice(&id.public_key_bytes()[..32]);
-    let mut rnd = GenKeys::new(seed);
-
-    println!("Creating {} keypairs...", tx_count * 2);
-    let mut total_keys = 0;
-    let mut target = tx_count * 2;
-    while target > 0 {
-        total_keys += target;
-        target /= MAX_SPENDS_PER_TX;
-    }
-    let gen_keypairs = rnd.gen_n_keypairs(total_keys as u64);
-    let barrier_source_keypair = Keypair::new();
-    let barrier_dest_id = Keypair::new().pubkey();
-
-    println!("Get lamports...");
-    let num_lamports_per_account = 20;
-
-    // Sample the first keypair, see if it has lamports, if so then resume
-    // to avoid lamport loss
-    let keypair0_balance = client
-        .poll_get_balance(&gen_keypairs.last().unwrap().pubkey())
-        .unwrap_or(0);
-
-    if num_lamports_per_account > keypair0_balance {
-        let extra = num_lamports_per_account - keypair0_balance;
-        let total = extra * (gen_keypairs.len() as u64);
-        airdrop_lamports(&client, &drone_addr, &id, total);
-        println!("adding more lamports {}", extra);
-        fund_keys(&client, &id, &gen_keypairs, extra);
-    }
-    let start = gen_keypairs.len() - (tx_count * 2) as usize;
-    let keypairs = &gen_keypairs[start..];
-    airdrop_lamports(&barrier_client, &drone_addr, &barrier_source_keypair, 1);
-
-    println!("Get last ID...");
-    let mut blockhash = client.get_recent_blockhash().unwrap();
-    println!("Got last ID {:?}", blockhash);
-
-    let first_tx_count = client.get_transaction_count().expect("transaction count");
-    println!("Initial transaction count {}", first_tx_count);
-
-    let exit_signal = Arc::new(AtomicBool::new(false));
-
-    // Setup a thread per validator to sample every period
-    // collect the max transaction rate and total tx count seen
-    let maxes = Arc::new(RwLock::new(Vec::new()));
-    let sample_period = 1; // in seconds
-    println!("Sampling TPS every {} second...", sample_period);
-    let v_threads: Vec<_> = nodes
-        .into_iter()
-        .map(|v| {
-            let exit_signal = exit_signal.clone();
-            let maxes = maxes.clone();
-            Builder::new()
-                .name("solana-client-sample".to_string())
-                .spawn(move || {
-                    sample_tx_count(&exit_signal, &maxes, first_tx_count, &v, sample_period);
-                })
-                .unwrap()
-        })
-        .collect();
-
-    let shared_txs: SharedTransactions = Arc::new(RwLock::new(VecDeque::new()));
-
-    let shared_tx_active_thread_count = Arc::new(AtomicIsize::new(0));
-    let total_tx_sent_count = Arc::new(AtomicUsize::new(0));
-
-    let s_threads: Vec<_> = (0..threads)
-        .map(|_| {
-            let exit_signal = exit_signal.clone();
-            let shared_txs = shared_txs.clone();
-            let cluster_entrypoint = cluster_entrypoint.clone();
-            let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
-            let total_tx_sent_count = total_tx_sent_count.clone();
-            Builder::new()
-                .name("solana-client-sender".to_string())
-                .spawn(move || {
-                    do_tx_transfers(
-                        &exit_signal,
-                        &shared_txs,
-                        &cluster_entrypoint,
-                        &shared_tx_active_thread_count,
-                        &total_tx_sent_count,
-                        thread_batch_sleep_ms,
-                    );
-                })
-                .unwrap()
-        })
-        .collect();
-
-    // generate and send transactions for the specified duration
-    let start = Instant::now();
-    let mut reclaim_lamports_back_to_source_account = false;
-    let mut i = keypair0_balance;
-    while start.elapsed() < duration {
-        let balance = client.poll_get_balance(&id.pubkey()).unwrap_or(0);
-        metrics_submit_lamport_balance(balance);
-
-        // ping-pong between source and destination accounts for each loop iteration
-        // this seems to be faster than trying to determine the balance of individual
-        // accounts
-        let len = tx_count as usize;
-        generate_txs(
-            &shared_txs,
-            &keypairs[..len],
-            &keypairs[len..],
-            threads,
-            reclaim_lamports_back_to_source_account,
-            &cluster_entrypoint,
-        );
-        // In sustained mode overlap the transfers with generation
-        // this has higher average performance but lower peak performance
-        // in tested environments.
-        if !sustained {
-            while shared_tx_active_thread_count.load(Ordering::Relaxed) > 0 {
-                sleep(Duration::from_millis(100));
-            }
-        }
-        // It's not feasible (would take too much time) to confirm each of the `tx_count / 2`
-        // transactions sent by `generate_txs()` so instead send and confirm a single transaction
-        // to validate the network is still functional.
-        send_barrier_transaction(
-            &mut barrier_client,
-            &mut blockhash,
-            &barrier_source_keypair,
-            &barrier_dest_id,
-        );
-
-        i += 1;
-        if should_switch_directions(num_lamports_per_account, i) {
-            reclaim_lamports_back_to_source_account = !reclaim_lamports_back_to_source_account;
-        }
-    }
-
-    // Stop the sampling threads so it will collect the stats
-    exit_signal.store(true, Ordering::Relaxed);
-
-    println!("Waiting for validator threads...");
-    for t in v_threads {
-        if let Err(err) = t.join() {
-            println!("  join() failed with: {:?}", err);
-        }
-    }
-
-    // join the tx send threads
-    println!("Waiting for transmit threads...");
-    for t in s_threads {
-        if let Err(err) = t.join() {
-            println!("  join() failed with: {:?}", err);
-        }
-    }
-
-    let balance = client.poll_get_balance(&id.pubkey()).unwrap_or(0);
-    metrics_submit_lamport_balance(balance);
-
-    compute_and_report_stats(
-        &maxes,
-        sample_period,
-        &start.elapsed(),
-        total_tx_sent_count.load(Ordering::Relaxed),
-    );
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    #[test]
-    fn test_switch_directions() {
-        assert_eq!(should_switch_directions(20, 0), false);
-        assert_eq!(should_switch_directions(20, 1), false);
-        assert_eq!(should_switch_directions(20, 14), false);
-        assert_eq!(should_switch_directions(20, 15), true);
-        assert_eq!(should_switch_directions(20, 16), false);
-        assert_eq!(should_switch_directions(20, 19), false);
-        assert_eq!(should_switch_directions(20, 20), true);
-        assert_eq!(should_switch_directions(20, 21), false);
-        assert_eq!(should_switch_directions(20, 99), false);
-        assert_eq!(should_switch_directions(20, 100), true);
-        assert_eq!(should_switch_directions(20, 101), false);
-    }
+    do_bench_tps(cfg);
 }
diff --git a/core/src/local_cluster.rs b/core/src/local_cluster.rs
index 49a036183..e507c53a5 100644
--- a/core/src/local_cluster.rs
+++ b/core/src/local_cluster.rs
@@ -183,7 +183,7 @@ impl LocalCluster {
 
         // Send each validator some lamports to vote
         let validator_balance =
-            Self::transfer(&client, &self.funding_keypair, &validator_pubkey, stake);
+            Self::transfer_with_client(&client, &self.funding_keypair, &validator_pubkey, stake);
         info!(
             "validator {} balance {}",
             validator_pubkey, validator_balance
@@ -217,7 +217,7 @@ impl LocalCluster {
             FULLNODE_PORT_RANGE,
         );
 
-        Self::transfer(
+        Self::transfer_with_client(
             &client,
             &self.funding_keypair,
             &replicator_keypair.pubkey(),
@@ -252,7 +252,15 @@ impl LocalCluster {
         }
     }
 
-    fn transfer(
+    pub fn transfer(&self, source_keypair: &Keypair, dest_pubkey: &Pubkey, lamports: u64) -> u64 {
+        let client = create_client(
+            self.entry_point_info.client_facing_addr(),
+            FULLNODE_PORT_RANGE,
+        );
+        Self::transfer_with_client(&client, source_keypair, dest_pubkey, lamports)
+    }
+
+    fn transfer_with_client(
        client: &ThinClient,
        source_keypair: &Keypair,
        dest_pubkey:
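
Reviewer note (not part of the patch): the keypair arithmetic in do_bench_tps is easy to
misread, so here is a standalone sketch of the same fan-out computation. fund_keys splits
each funded key into at most MAX_SPENDS_PER_TX new keys per round, so the generator must
create one keypair for every node of that funding tree:

    // Illustrative only; mirrors the keypair-count loop in do_bench_tps.
    const MAX_SPENDS_PER_TX: usize = 4;

    fn total_keys_needed(tx_count: usize) -> usize {
        let mut total_keys = 0;
        let mut target = tx_count * 2; // one source and one dest account per transaction
        while target > 0 {
            total_keys += target;
            target /= MAX_SPENDS_PER_TX;
        }
        total_keys
    }

    fn main() {
        // With tx_count = 100, as in test_bench_tps: 200 + 50 + 12 + 3 = 265 keypairs.
        assert_eq!(total_keys_needed(100), 265);
    }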
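Running the new test: test_bench_tps is marked #[ignore], so the default test run skips it.
Something like `cargo test -p solana-bench-tps test_bench_tps -- --ignored` (package name
assumed from the bench-tps/ directory) should exercise it: the test stands up a single-node
LocalCluster, funds a local drone through the new LocalCluster::transfer helper, and drives
do_bench_tps for five seconds.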