From 01f44f531ef82e0acf4765c151855696d2453a51 Mon Sep 17 00:00:00 2001
From: Justin Starry
Date: Wed, 18 Dec 2019 23:50:17 -0500
Subject: [PATCH] Improve bench-tps stability (#7537)

* Improve bench-tps throughput

* Fix tests

* Fix more tests

* Fix move test

* Drop blockhash poll sleep interval
---
 bench-tps/src/bench.rs       | 219 ++++++++++++++++++++++-------------
 bench-tps/src/cli.rs         |  19 ++-
 bench-tps/src/main.rs        |  12 +-
 bench-tps/tests/bench_tps.rs |   3 +-
 4 files changed, 163 insertions(+), 90 deletions(-)

diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs
index 2180d462c5..2f7f4cb8ff 100644
--- a/bench-tps/src/bench.rs
+++ b/bench-tps/src/bench.rs
@@ -88,8 +88,13 @@ where
     let clients: Vec<_> = clients.into_iter().map(Arc::new).collect();
     let client = &clients[0];
 
-    let start = gen_keypairs.len() - (tx_count * 2) as usize;
-    let keypairs = &gen_keypairs[start..];
+    let mut source_keypair_chunks: Vec<Vec<&Keypair>> = Vec::new();
+    let mut dest_keypair_chunks: Vec<VecDeque<&Keypair>> = Vec::new();
+    assert!(gen_keypairs.len() >= 2 * tx_count);
+    for chunk in gen_keypairs.chunks_exact(2 * tx_count) {
+        source_keypair_chunks.push(chunk[..tx_count].iter().collect());
+        dest_keypair_chunks.push(chunk[tx_count..].iter().collect());
+    }
 
     let first_tx_count = loop {
         match client.get_transaction_count() {
@@ -126,9 +131,23 @@ where
 
     let shared_txs: SharedTransactions = Arc::new(RwLock::new(VecDeque::new()));
 
+    let recent_blockhash = Arc::new(RwLock::new(get_recent_blockhash(client.as_ref()).0));
     let shared_tx_active_thread_count = Arc::new(AtomicIsize::new(0));
     let total_tx_sent_count = Arc::new(AtomicUsize::new(0));
 
+    let blockhash_thread = {
+        let exit_signal = exit_signal.clone();
+        let recent_blockhash = recent_blockhash.clone();
+        let client = client.clone();
+        let id = id.pubkey();
+        Builder::new()
+            .name("solana-blockhash-poller".to_string())
+            .spawn(move || {
+                poll_blockhash(&exit_signal, &recent_blockhash, &client, &id);
+            })
+            .unwrap()
+    };
+
     let s_threads: Vec<_> = (0..threads)
         .map(|_| {
             let exit_signal = exit_signal.clone();
@@ -154,58 +173,40 @@ where
 
     // generate and send transactions for the specified duration
     let start = Instant::now();
+    let keypair_chunks = source_keypair_chunks.len() as u64;
     let mut reclaim_lamports_back_to_source_account = false;
     let mut i = keypair0_balance;
-    let mut blockhash = Hash::default();
-    let mut blockhash_time;
     while start.elapsed() < duration {
-        // ping-pong between source and destination accounts for each loop iteration
-        // this seems to be faster than trying to determine the balance of individual
-        // accounts
-        let len = tx_count as usize;
-        blockhash_time = Instant::now();
-        if let Ok((new_blockhash, _fee_calculator)) = client.get_new_blockhash(&blockhash) {
-            blockhash = new_blockhash;
-        } else {
-            if blockhash_time.elapsed().as_secs() > 30 {
-                panic!("Blockhash is not updating");
-            }
-            sleep(Duration::from_millis(100));
-            continue;
-        }
-        datapoint_debug!(
-            "bench-tps-get_blockhash",
-            ("duration", duration_as_us(&blockhash_time.elapsed()), i64)
-        );
-
-        blockhash_time = Instant::now();
-        let balance = client.get_balance(&id.pubkey()).unwrap_or(0);
-        metrics_submit_lamport_balance(balance);
-        datapoint_debug!(
-            "bench-tps-get_balance",
-            ("duration", duration_as_us(&blockhash_time.elapsed()), i64)
-        );
-
+        let chunk_index = (i % keypair_chunks) as usize;
         generate_txs(
             &shared_txs,
-            &blockhash,
-            &keypairs[..len],
-            &keypairs[len..],
+            &recent_blockhash,
+            &source_keypair_chunks[chunk_index],
+            &dest_keypair_chunks[chunk_index],
             threads,
             reclaim_lamports_back_to_source_account,
             &libra_args,
         );
-        // In sustained mode overlap the transfers with generation
-        // this has higher average performance but lower peak performance
-        // in tested environments.
-        if !sustained {
+
+        // In sustained mode, overlap the transfers with generation. This has higher average
+        // performance but lower peak performance in tested environments.
+        if sustained {
+            // Ensure that we don't generate more transactions than we can handle.
+            while shared_txs.read().unwrap().len() > 2 * threads {
+                sleep(Duration::from_millis(1));
+            }
+        } else {
            while shared_tx_active_thread_count.load(Ordering::Relaxed) > 0 {
                sleep(Duration::from_millis(1));
            }
        }
 
+        // Rotate destination keypairs so that the next round of transactions will have different
+        // transaction signatures even when blockhash is reused.
+        dest_keypair_chunks[chunk_index].rotate_left(1);
+
         i += 1;
-        if should_switch_directions(num_lamports_per_account, i) {
+        if should_switch_directions(num_lamports_per_account, keypair_chunks, i) {
             reclaim_lamports_back_to_source_account = !reclaim_lamports_back_to_source_account;
         }
     }
@@ -228,6 +229,11 @@ where
         }
     }
 
+    info!("Waiting for blockhash thread...");
+    if let Err(err) = blockhash_thread.join() {
+        info!(" join() failed with: {:?}", err);
+    }
+
     let balance = client.get_balance(&id.pubkey()).unwrap_or(0);
     metrics_submit_lamport_balance(balance);
 
@@ -252,8 +258,8 @@ fn metrics_submit_lamport_balance(lamport_balance: u64) {
 
 #[cfg(feature = "move")]
 fn generate_move_txs(
-    source: &[Keypair],
-    dest: &[Keypair],
+    source: &[&Keypair],
+    dest: &VecDeque<&Keypair>,
     reclaim: bool,
     move_keypairs: &[Keypair],
     libra_pay_program_id: &Pubkey,
@@ -297,8 +303,8 @@ fn generate_move_txs(
 }
 
 fn generate_system_txs(
-    source: &[Keypair],
-    dest: &[Keypair],
+    source: &[&Keypair],
+    dest: &VecDeque<&Keypair>,
     reclaim: bool,
     blockhash: &Hash,
 ) -> Vec<(Transaction, u64)> {
@@ -321,15 +327,19 @@ fn generate_system_txs(
 
 fn generate_txs(
     shared_txs: &SharedTransactions,
-    blockhash: &Hash,
-    source: &[Keypair],
-    dest: &[Keypair],
+    blockhash: &Arc<RwLock<Hash>>,
+    source: &[&Keypair],
+    dest: &VecDeque<&Keypair>,
     threads: usize,
     reclaim: bool,
     libra_args: &Option<LibraKeys>,
 ) {
+    let blockhash = *blockhash.read().unwrap();
     let tx_count = source.len();
-    info!("Signing transactions... {} (reclaim={})", tx_count, reclaim);
+    info!(
+        "Signing transactions... {} (reclaim={}, blockhash={})",
+        tx_count, reclaim, &blockhash
+    );
     let signing_start = Instant::now();
 
     let transactions = if let Some((
@@ -353,11 +363,11 @@ fn generate_txs(
                 &_libra_keys,
                 _libra_pay_program_id,
                 &_libra_genesis_keypair.pubkey(),
-                blockhash,
+                &blockhash,
             )
         }
     } else {
-        generate_system_txs(source, dest, reclaim, blockhash)
+        generate_system_txs(source, dest, reclaim, &blockhash)
     };
 
     let duration = signing_start.elapsed();
@@ -386,6 +396,38 @@ fn generate_txs(
     }
 }
 
+fn poll_blockhash<T: Client>(
+    exit_signal: &Arc<AtomicBool>,
+    blockhash: &Arc<RwLock<Hash>>,
+    client: &Arc<T>,
+    id: &Pubkey,
+) {
+    let mut blockhash_time;
+    loop {
+        blockhash_time = Instant::now();
+        loop {
+            let old_blockhash = *blockhash.read().unwrap();
+            if let Ok((new_blockhash, _fee)) = client.get_new_blockhash(&old_blockhash) {
+                *blockhash.write().unwrap() = new_blockhash;
+                break;
+            } else {
+                if blockhash_time.elapsed().as_secs() > 30 {
+                    panic!("Blockhash is not updating");
+                }
+                sleep(Duration::from_millis(50));
+                continue;
+            }
+        }
+
+        let balance = client.get_balance(id).unwrap_or(0);
+        metrics_submit_lamport_balance(balance);
+
+        if exit_signal.load(Ordering::Relaxed) {
+            break;
+        }
+    }
+}
+
 fn do_tx_transfers(
     exit_signal: &Arc<AtomicBool>,
     shared_txs: &SharedTransactions,
@@ -398,11 +440,10 @@ fn do_tx_transfers(
         if thread_batch_sleep_ms > 0 {
             sleep(Duration::from_millis(thread_batch_sleep_ms as u64));
         }
-        let txs;
-        {
+        let txs = {
             let mut shared_txs_wl = shared_txs.write().expect("write lock in do_tx_transfers");
-            txs = shared_txs_wl.pop_front();
-        }
+            shared_txs_wl.pop_front()
+        };
         if let Some(txs0) = txs {
             shared_tx_thread_count.fetch_add(1, Ordering::Relaxed);
             info!(
@@ -758,11 +799,15 @@ fn compute_and_report_stats(
     );
 }
 
-// First transfer 3/4 of the lamports to the dest accounts
-// then ping-pong 1/4 of the lamports back to the other account
-// this leaves 1/4 lamport buffer in each account
-fn should_switch_directions(num_lamports_per_account: u64, i: u64) -> bool {
-    i % (num_lamports_per_account / 4) == 0 && (i >= (3 * num_lamports_per_account) / 4)
+// First transfer 2/3 of the lamports to the dest accounts
+// then ping-pong 1/3 of the lamports back to the other account
+// this leaves 1/3 lamport buffer in each account
+fn should_switch_directions(num_lamports_per_account: u64, keypair_chunks: u64, i: u64) -> bool {
+    if i < keypair_chunks * (2 * num_lamports_per_account) / 3 {
+        return false;
+    }
+
+    i % (keypair_chunks * num_lamports_per_account / 3) == 0
 }
 
 pub fn generate_keypairs(seed_keypair: &Keypair, count: u64) -> (Vec<Keypair>, u64) {
@@ -897,9 +942,12 @@ fn fund_move_keys(
         info!("funded libra funding key {}", i);
     }
 
-    let tx_count = keypairs.len();
-    let amount = total / (tx_count as u64);
-    for (i, keys) in keypairs[..tx_count].chunks(NUM_FUNDING_KEYS).enumerate() {
+    let keypair_count = keypairs.len();
+    let amount = total / (keypair_count as u64);
+    for (i, keys) in keypairs[..keypair_count]
+        .chunks(NUM_FUNDING_KEYS)
+        .enumerate()
+    {
         for (j, key) in keys.iter().enumerate() {
             let tx = librapay_transaction::transfer(
                 libra_pay_program_id,
@@ -949,18 +997,18 @@ pub fn generate_and_fund_keypairs(
     client: &T,
     faucet_addr: Option<SocketAddr>,
     funding_key: &Keypair,
-    tx_count: usize,
+    keypair_count: usize,
     lamports_per_account: u64,
     use_move: bool,
 ) -> Result<(Vec<Keypair>, Option<Vec<Keypair>>, u64)> {
-    info!("Creating {} keypairs...", tx_count * 2);
-    let (mut keypairs, extra) = generate_keypairs(funding_key, tx_count as u64 * 2);
+    info!("Creating {} keypairs...", keypair_count);
+    let (mut keypairs, extra) = generate_keypairs(funding_key, keypair_count as u64);
     info!("Get lamports...");
 
     // Sample the first keypair, see if it has lamports, if so then resume.
     // This logic is to prevent lamport loss on repeated solana-bench-tps executions
     let last_keypair_balance = client
-        .get_balance(&keypairs[tx_count * 2 - 1].pubkey())
+        .get_balance(&keypairs[keypair_count - 1].pubkey())
        .unwrap_or(0);
 
     #[cfg(feature = "move")]
@@ -999,7 +1047,7 @@ pub fn generate_and_fund_keypairs(
         // Still fund the solana ones which will be used for fees.
         let seed = [0u8; 32];
         let mut rnd = GenKeys::new(seed);
-        let move_keypairs = rnd.gen_n_keypairs(tx_count as u64 * 2);
+        let move_keypairs = rnd.gen_n_keypairs(keypair_count as u64);
         fund_move_keys(
             client,
             funding_key,
@@ -1032,7 +1080,7 @@ pub fn generate_and_fund_keypairs(
     }
 
     // 'generate_keypairs' generates extra keys to be able to have size-aligned funding batches for fund_keys.
-    keypairs.truncate(2 * tx_count);
+    keypairs.truncate(keypair_count);
 
     Ok((keypairs, move_keypairs_ret, last_keypair_balance))
 }
@@ -1048,17 +1096,21 @@ mod tests {
 
     #[test]
     fn test_switch_directions() {
-        assert_eq!(should_switch_directions(20, 0), false);
-        assert_eq!(should_switch_directions(20, 1), false);
-        assert_eq!(should_switch_directions(20, 14), false);
-        assert_eq!(should_switch_directions(20, 15), true);
-        assert_eq!(should_switch_directions(20, 16), false);
-        assert_eq!(should_switch_directions(20, 19), false);
-        assert_eq!(should_switch_directions(20, 20), true);
-        assert_eq!(should_switch_directions(20, 21), false);
-        assert_eq!(should_switch_directions(20, 99), false);
-        assert_eq!(should_switch_directions(20, 100), true);
-        assert_eq!(should_switch_directions(20, 101), false);
+        assert_eq!(should_switch_directions(30, 1, 0), false);
+        assert_eq!(should_switch_directions(30, 1, 1), false);
+        assert_eq!(should_switch_directions(30, 1, 20), true);
+        assert_eq!(should_switch_directions(30, 1, 21), false);
+        assert_eq!(should_switch_directions(30, 1, 30), true);
+        assert_eq!(should_switch_directions(30, 1, 90), true);
+        assert_eq!(should_switch_directions(30, 1, 91), false);
+
+        assert_eq!(should_switch_directions(30, 2, 0), false);
+        assert_eq!(should_switch_directions(30, 2, 1), false);
+        assert_eq!(should_switch_directions(30, 2, 20), false);
+        assert_eq!(should_switch_directions(30, 2, 40), true);
+        assert_eq!(should_switch_directions(30, 2, 90), false);
+        assert_eq!(should_switch_directions(30, 2, 100), true);
+        assert_eq!(should_switch_directions(30, 2, 101), false);
     }
 
     #[test]
@@ -1072,8 +1124,9 @@ mod tests {
         config.tx_count = 10;
         config.duration = Duration::from_secs(5);
 
+        let keypair_count = config.tx_count * config.keypair_multiplier;
         let (keypairs, _move_keypairs, _keypair_balance) =
-            generate_and_fund_keypairs(&clients[0], None, &config.id, config.tx_count, 20, false)
+            generate_and_fund_keypairs(&clients[0], None, &config.id, keypair_count, 20, false)
                 .unwrap();
 
         do_bench_tps(clients, config, keypairs, 0, None);
@@ -1084,11 +1137,11 @@ mod tests {
         let (genesis_config, id) = create_genesis_config(10_000);
         let bank = Bank::new(&genesis_config);
         let client = BankClient::new(bank);
-        let tx_count = 10;
+        let keypair_count = 20;
         let lamports = 20;
 
         let (keypairs, _move_keypairs, _keypair_balance) =
-            generate_and_fund_keypairs(&client, None, &id, tx_count, lamports, false).unwrap();
+            generate_and_fund_keypairs(&client, None, &id, keypair_count, lamports, false).unwrap();
 
         for kp in &keypairs {
             assert_eq!(
@@ -1107,11 +1160,11 @@ mod tests {
         genesis_config.fee_calculator = fee_calculator;
         let bank = Bank::new(&genesis_config);
         let client = BankClient::new(bank);
-        let tx_count = 10;
+        let keypair_count = 20;
         let lamports = 20;
 
         let (keypairs, _move_keypairs, _keypair_balance) =
-            generate_and_fund_keypairs(&client, None, &id, tx_count, lamports, false).unwrap();
+            generate_and_fund_keypairs(&client, None, &id, keypair_count, lamports, false).unwrap();
 
         let max_fee = client
             .get_recent_blockhash_with_commitment(CommitmentConfig::recent())
diff --git a/bench-tps/src/cli.rs b/bench-tps/src/cli.rs
index a5fec88dd8..af3b90e078 100644
--- a/bench-tps/src/cli.rs
+++ b/bench-tps/src/cli.rs
@@ -15,6 +15,7 @@ pub struct Config {
     pub num_nodes: usize,
     pub duration: Duration,
     pub tx_count: usize,
+    pub keypair_multiplier: usize,
     pub thread_batch_sleep_ms: usize,
     pub sustained: bool,
     pub client_ids_and_stake_file: String,
@@ -36,6 +37,7 @@ impl Default for Config {
             num_nodes: 1,
             duration: Duration::new(std::u64::MAX, 0),
             tx_count: 50_000,
+            keypair_multiplier: 8,
             thread_batch_sleep_ms: 1000,
             sustained: false,
             client_ids_and_stake_file: String::new(),
@@ -122,6 +124,13 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> {
                 .takes_value(true)
                 .help("Number of transactions to send per batch")
         )
+        .arg(
+            Arg::with_name("keypair_multiplier")
+                .long("keypair-multiplier")
+                .value_name("NUM")
+                .takes_value(true)
+                .help("Multiply by transaction count to determine number of keypairs to create")
+        )
         .arg(
             Arg::with_name("thread-batch-sleep-ms")
                 .short("z")
@@ -208,7 +217,15 @@ pub fn extract_args<'a>(matches: &ArgMatches<'a>) -> Config {
     }
 
     if let Some(s) = matches.value_of("tx_count") {
-        args.tx_count = s.to_string().parse().expect("can't parse tx_account");
+        args.tx_count = s.to_string().parse().expect("can't parse tx_count");
+    }
+
+    if let Some(s) = matches.value_of("keypair_multiplier") {
+        args.keypair_multiplier = s
+            .to_string()
+            .parse()
+            .expect("can't parse keypair-multiplier");
+        assert!(args.keypair_multiplier >= 2);
     }
 
     if let Some(t) = matches.value_of("thread-batch-sleep-ms") {
diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs
index 7dbae9e8e9..c66f061a84 100644
--- a/bench-tps/src/main.rs
+++ b/bench-tps/src/main.rs
@@ -24,6 +24,7 @@ fn main() {
         id,
         num_nodes,
         tx_count,
+        keypair_multiplier,
         client_ids_and_stake_file,
         write_to_client_file,
         read_from_client_file,
@@ -34,9 +35,10 @@ fn main() {
         ..
     } = &cli_config;
 
+    let keypair_count = *tx_count * keypair_multiplier;
     if *write_to_client_file {
-        info!("Generating {} keypairs", *tx_count * 2);
-        let (keypairs, _) = generate_keypairs(&id, *tx_count as u64 * 2);
+        info!("Generating {} keypairs", keypair_count);
+        let (keypairs, _) = generate_keypairs(&id, keypair_count as u64);
         let num_accounts = keypairs.len() as u64;
         let max_fee =
             FeeCalculator::new(*target_lamports_per_signature, 0).max_lamports_per_signature;
@@ -102,10 +104,10 @@ fn main() {
            last_balance = primordial_account.balance;
        });
 
-        if keypairs.len() < tx_count * 2 {
+        if keypairs.len() < keypair_count {
            eprintln!(
                "Expected {} accounts in {}, only received {} (--tx_count mismatch?)",
-                tx_count * 2,
+                keypair_count,
                client_ids_and_stake_file,
                keypairs.len(),
            );
@@ -121,7 +123,7 @@ fn main() {
             &client,
             Some(*faucet_addr),
             &id,
-            *tx_count,
+            keypair_count,
             *num_lamports_per_account,
             *use_move,
         )
diff --git a/bench-tps/tests/bench_tps.rs b/bench-tps/tests/bench_tps.rs
index 69685eb09d..7655709403 100644
--- a/bench-tps/tests/bench_tps.rs
+++ b/bench-tps/tests/bench_tps.rs
@@ -47,11 +47,12 @@ fn test_bench_tps_local_cluster(config: Config) {
 
     let lamports_per_account = 100;
 
+    let keypair_count = config.tx_count * config.keypair_multiplier;
     let (keypairs, move_keypairs, _keypair_balance) = generate_and_fund_keypairs(
         &client,
         Some(faucet_addr),
         &config.id,
-        config.tx_count,
+        keypair_count,
         lamports_per_account,
         config.use_move,
     )
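
Reviewer note (not part of the patch): the self-contained Rust sketch below mirrors the keypair
chunking, destination rotation, and direction-switch scheduling introduced above, using plain
integers in place of `Keypair`s. `should_switch_directions` is copied from the diff; the `main`
driver and its constants (16 stand-in keypairs, num_lamports_per_account = 6) are illustrative
assumptions only, chosen so a direction switch occurs within a short run.

    use std::collections::VecDeque;

    // Copied from the patch: flip the transfer direction only after 2/3 of the
    // lamports have moved, then once per additional 1/3 spent across all chunks.
    fn should_switch_directions(num_lamports_per_account: u64, keypair_chunks: u64, i: u64) -> bool {
        if i < keypair_chunks * (2 * num_lamports_per_account) / 3 {
            return false;
        }
        i % (keypair_chunks * num_lamports_per_account / 3) == 0
    }

    fn main() {
        // Stand-ins for generated keypairs; the real code holds `&Keypair` references.
        let gen_keypairs: Vec<u32> = (0..16).collect();
        let tx_count = 4; // with keypair_multiplier = 4 this yields 16 keypairs, i.e. 2 chunks

        // Split into per-round source/destination chunks, as do_bench_tps now does.
        let mut source_chunks: Vec<Vec<u32>> = Vec::new();
        let mut dest_chunks: Vec<VecDeque<u32>> = Vec::new();
        for chunk in gen_keypairs.chunks_exact(2 * tx_count) {
            source_chunks.push(chunk[..tx_count].to_vec());
            dest_chunks.push(chunk[tx_count..].iter().copied().collect());
        }
        let keypair_chunks = source_chunks.len() as u64;

        let num_lamports_per_account = 6;
        let mut reclaim = false;
        for i in 1..=12u64 {
            let chunk_index = (i % keypair_chunks) as usize;
            println!(
                "round {:2}: chunk {} sources {:?} -> dests {:?} (reclaim={})",
                i, chunk_index, source_chunks[chunk_index], dest_chunks[chunk_index], reclaim
            );
            // Rotating the destinations gives the next round different transaction
            // signatures even when the same blockhash is reused.
            dest_chunks[chunk_index].rotate_left(1);
            if should_switch_directions(num_lamports_per_account, keypair_chunks, i) {
                reclaim = !reclaim;
            }
        }
    }

With these illustrative numbers the schedule returns true at i = 8 and i = 12 (after 2/3 of the
lamports have ping-ponged, then once per additional 1/3), so reclaim flips twice during the run.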