diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs
index 8d4bcac70..6b3bb846e 100644
--- a/bench-tps/src/bench.rs
+++ b/bench-tps/src/bench.rs
@@ -8,12 +8,11 @@ use solana_core::gen_keys::GenKeys;
 use solana_drone::drone::request_airdrop_transaction;
 #[cfg(feature = "move")]
 use solana_librapay_api::{create_genesis, upload_mint_program, upload_payment_program};
-#[cfg(feature = "move")]
 use solana_measure::measure::Measure;
 use solana_metrics::datapoint_info;
 use solana_sdk::{
     client::Client,
-    clock::{DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES},
+    clock::{DEFAULT_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE},
     fee_calculator::FeeCalculator,
     hash::Hash,
     pubkey::Pubkey,
@@ -34,10 +33,9 @@ use std::{
     time::{Duration, Instant},
 };

-// The point at which transactions become "too old", in seconds. The cluster keeps blockhashes for
-// approximately MAX_RECENT_BLOCKHASHES/DEFAULT_TICKS_PER_SLOT seconds. The adjustment of 5sec
-// seems about right to minimize BlockhashNotFound errors, based on empirical testing.
-const MAX_TX_QUEUE_AGE: u64 = MAX_RECENT_BLOCKHASHES as u64 / DEFAULT_TICKS_PER_SLOT - 5;
+// The point at which transactions become "too old", in seconds.
+const MAX_TX_QUEUE_AGE: u64 =
+    MAX_PROCESSING_AGE as u64 * DEFAULT_TICKS_PER_SLOT / DEFAULT_TICKS_PER_SECOND;

 #[cfg(feature = "move")]
 use solana_librapay_api::librapay_transaction;
@@ -103,7 +101,7 @@ where
             }
         }
     };
-    println!("Initial transaction count {}", first_tx_count);
+    info!("Initial transaction count {}", first_tx_count);

     let exit_signal = Arc::new(AtomicBool::new(false));

@@ -111,7 +109,7 @@ where
     // collect the max transaction rate and total tx count seen
     let maxes = Arc::new(RwLock::new(Vec::new()));
     let sample_period = 1; // in seconds
-    println!("Sampling TPS every {} second...", sample_period);
+    info!("Sampling TPS every {} second...", sample_period);
     let v_threads: Vec<_> = clients
         .iter()
         .map(|client| {
@@ -205,18 +203,18 @@ where
     // Stop the sampling threads so it will collect the stats
     exit_signal.store(true, Ordering::Relaxed);

-    println!("Waiting for validator threads...");
+    info!("Waiting for validator threads...");
     for t in v_threads {
         if let Err(err) = t.join() {
-            println!(" join() failed with: {:?}", err);
+            info!(" join() failed with: {:?}", err);
         }
     }

     // join the tx send threads
-    println!("Waiting for transmit threads...");
+    info!("Waiting for transmit threads...");
     for t in s_threads {
         if let Err(err) = t.join() {
-            println!(" join() failed with: {:?}", err);
+            info!(" join() failed with: {:?}", err);
         }
     }

@@ -235,7 +233,7 @@ where
 }

 fn metrics_submit_lamport_balance(lamport_balance: u64) {
-    println!("Token balance: {}", lamport_balance);
+    info!("Token balance: {}", lamport_balance);
     datapoint_info!(
         "bench-tps-lamport_balance",
         ("balance", lamport_balance, i64)
@@ -321,7 +319,7 @@ fn generate_txs(
     libra_args: &Option<LibraKeys>,
 ) {
     let tx_count = source.len();
-    println!("Signing transactions... {} (reclaim={})", tx_count, reclaim);
+    info!("Signing transactions... {} (reclaim={})", tx_count, reclaim);
     let signing_start = Instant::now();

     let transactions = if let Some((
@@ -356,7 +354,7 @@ fn generate_txs(
     let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
     let bsps = (tx_count) as f64 / ns as f64;
     let nsps = ns as f64 / (tx_count) as f64;
-    println!(
+    info!(
         "Done. 
{:.2} thousand signatures per second, {:.2} us per signature, {} ms total time, {}", bsps * 1_000_000_f64, nsps / 1_000_f64, @@ -397,7 +395,7 @@ fn do_tx_transfers( } if let Some(txs0) = txs { shared_tx_thread_count.fetch_add(1, Ordering::Relaxed); - println!( + info!( "Transferring 1 unit {} times... to {}", txs0.len(), client.as_ref().tpu_addr(), @@ -423,7 +421,7 @@ fn do_tx_transfers( } shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed); total_tx_sent_count.fetch_add(tx_len, Ordering::Relaxed); - println!( + info!( "Tx send done. {} ms {} tps", duration_as_ms(&transfer_start.elapsed()), tx_len as f32 / duration_as_s(&transfer_start.elapsed()), @@ -446,7 +444,6 @@ fn verify_funding_transfer(client: &T, tx: &Transaction, amount: u64) return true; } } - false } @@ -465,7 +462,7 @@ pub fn fund_keys( let mut notfunded: Vec<&Keypair> = dests.iter().collect(); let lamports_per_account = (total - (extra * max_fee)) / (notfunded.len() as u64 + 1); - println!( + info!( "funding keys {} with lamports: {:?} total: {}", dests.len(), client.get_balance(&source.pubkey()), @@ -474,7 +471,8 @@ pub fn fund_keys( while !notfunded.is_empty() { let mut new_funded: Vec<(&Keypair, u64)> = vec![]; let mut to_fund = vec![]; - println!("creating from... {}", funded.len()); + info!("creating from... {}", funded.len()); + let mut build_to_fund = Measure::start("build_to_fund"); for f in &mut funded { let max_units = cmp::min(notfunded.len() as u64, MAX_SPENDS_PER_TX); if max_units == 0 { @@ -496,6 +494,8 @@ pub fn fund_keys( } extra -= 1; } + build_to_fund.stop(); + debug!("build to_fund vec: {}us", build_to_fund.as_us()); // try to transfer a "few" at a time with recent blockhash // assume 4MB network buffers, and 512 byte packets @@ -504,6 +504,7 @@ pub fn fund_keys( to_fund.chunks(FUND_CHUNK_LEN).for_each(|chunk| { let mut tries = 0; + let mut make_txs = Measure::start("make_txs"); // this set of transactions just initializes us for bookkeeping #[allow(clippy::clone_double_ref)] // sigh let mut to_fund_txs: Vec<_> = chunk @@ -515,6 +516,12 @@ pub fn fund_keys( (k.clone(), tx) }) .collect(); + make_txs.stop(); + debug!( + "make {} unsigned txs: {}us", + to_fund_txs.len(), + make_txs.as_us() + ); let amount = chunk[0].1[0].1; @@ -523,7 +530,7 @@ pub fn fund_keys( .iter() .fold(0, |len, (_, tx)| len + tx.message().instructions.len()); - println!( + info!( "{} {} to {} in {} txs", if tries == 0 { "transferring" @@ -538,30 +545,65 @@ pub fn fund_keys( let (blockhash, _fee_calculator) = get_recent_blockhash(client); // re-sign retained to_fund_txes with updated blockhash + let mut sign_txs = Measure::start("sign_txs"); to_fund_txs.par_iter_mut().for_each(|(k, tx)| { tx.sign(&[*k], blockhash); }); + sign_txs.stop(); + debug!("sign {} txs: {}us", to_fund_txs.len(), sign_txs.as_us()); + let mut send_txs = Measure::start("send_txs"); to_fund_txs.iter().for_each(|(_, tx)| { client.async_send_transaction(tx.clone()).expect("transfer"); }); + send_txs.stop(); + debug!("send {} txs: {}us", to_fund_txs.len(), send_txs.as_us()); + + let mut verify_txs = Measure::start("verify_txs"); + let mut starting_txs = to_fund_txs.len(); + let mut verified_txs = 0; + let mut failed_verify = 0; + // Only loop multiple times for small (quick) transaction batches + for _ in 0..(if starting_txs < 1000 { 3 } else { 1 }) { + let mut timer = Instant::now(); + to_fund_txs.retain(|(_, tx)| { + if timer.elapsed() >= Duration::from_secs(5) { + if failed_verify > 0 { + debug!("total txs failed verify: {}", failed_verify); + } + info!( + 
"Verifying transfers... {} remaining", + starting_txs - verified_txs + ); + timer = Instant::now(); + } + let verified = verify_funding_transfer(client, &tx, amount); + if verified { + verified_txs += 1; + } else { + failed_verify += 1; + } + !verified + }); + if to_fund_txs.is_empty() { + break; + } + debug!("Looping verifications"); + info!("Verifying transfers... {} remaining", to_fund_txs.len()); + sleep(Duration::from_millis(100)); + } + starting_txs -= to_fund_txs.len(); + verify_txs.stop(); + debug!("verified {} txs: {}us", starting_txs, verify_txs.as_us()); // retry anything that seems to have dropped through cracks // again since these txs are all or nothing, they're fine to // retry - for _ in 0..10 { - to_fund_txs.retain(|(_, tx)| !verify_funding_transfer(client, &tx, amount)); - if to_fund_txs.is_empty() { - break; - } - sleep(Duration::from_millis(100)); - } - tries += 1; } - println!("transferred"); + info!("transferred"); }); - println!("funded: {} left: {}", new_funded.len(), notfunded.len()); + info!("funded: {} left: {}", new_funded.len(), notfunded.len()); funded = new_funded; } } @@ -574,11 +616,11 @@ pub fn airdrop_lamports( ) -> Result<()> { let starting_balance = client.get_balance(&id.pubkey()).unwrap_or(0); metrics_submit_lamport_balance(starting_balance); - println!("starting balance {}", starting_balance); + info!("starting balance {}", starting_balance); if starting_balance < tx_count { let airdrop_amount = tx_count - starting_balance; - println!( + info!( "Airdropping {:?} lamports from {} for {}", airdrop_amount, drone_addr, @@ -607,14 +649,14 @@ pub fn airdrop_lamports( }; let current_balance = client.get_balance(&id.pubkey()).unwrap_or_else(|e| { - println!("airdrop error {}", e); + info!("airdrop error {}", e); starting_balance }); - println!("current balance {}...", current_balance); + info!("current balance {}...", current_balance); metrics_submit_lamport_balance(current_balance); if current_balance - starting_balance != airdrop_amount { - println!( + info!( "Airdrop failed! 
{} {} {}", id.pubkey(), current_balance, @@ -637,8 +679,8 @@ fn compute_and_report_stats( let mut max_tx_count = 0; let mut nodes_with_zero_tps = 0; let mut total_maxes = 0.0; - println!(" Node address | Max TPS | Total Transactions"); - println!("---------------------+---------------+--------------------"); + info!(" Node address | Max TPS | Total Transactions"); + info!("---------------------+---------------+--------------------"); for (sock, stats) in maxes.read().unwrap().iter() { let maybe_flag = match stats.txs { @@ -646,7 +688,7 @@ fn compute_and_report_stats( _ => "", }; - println!( + info!( "{:20} | {:13.2} | {} {}", sock, stats.tps, stats.txs, maybe_flag ); @@ -667,7 +709,7 @@ fn compute_and_report_stats( if total_maxes > 0.0 { let num_nodes_with_tps = maxes.read().unwrap().len() - nodes_with_zero_tps; let average_max = total_maxes / num_nodes_with_tps as f32; - println!( + info!( "\nAverage max TPS: {:.2}, {} nodes had 0 TPS", average_max, nodes_with_zero_tps ); @@ -679,7 +721,7 @@ fn compute_and_report_stats( } else { 0.0 }; - println!( + info!( "\nHighest TPS: {:.2} sampling period {}s max transactions: {} clients: {} drop rate: {:.2}", max_of_maxes, sample_period, @@ -687,7 +729,7 @@ fn compute_and_report_stats( maxes.read().unwrap().len(), drop_rate, ); - println!( + info!( "\tAverage TPS: {}", max_tx_count as f32 / duration_as_s(tx_send_elapsed) ); @@ -906,7 +948,7 @@ pub fn generate_and_fund_keypairs( total *= 3; } - println!("Previous key balance: {} max_fee: {} lamports_per_account: {} extra: {} desired_balance: {} total: {}", + info!("Previous key balance: {} max_fee: {} lamports_per_account: {} extra: {} desired_balance: {} total: {}", last_keypair_balance, fee_calculator.max_lamports_per_signature, lamports_per_account, extra, account_desired_balance, total ); diff --git a/bench-tps/src/cli.rs b/bench-tps/src/cli.rs index 8ac13be69..a04b8d080 100644 --- a/bench-tps/src/cli.rs +++ b/bench-tps/src/cli.rs @@ -1,11 +1,8 @@ -use std::net::SocketAddr; -use std::process::exit; -use std::time::Duration; - use clap::{crate_description, crate_name, crate_version, App, Arg, ArgMatches}; use solana_drone::drone::DRONE_PORT; use solana_sdk::fee_calculator::FeeCalculator; use solana_sdk::signature::{read_keypair, Keypair, KeypairUtil}; +use std::{net::SocketAddr, process::exit, time::Duration}; const NUM_LAMPORTS_PER_ACCOUNT_DEFAULT: u64 = 64 * 1024; diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs index 175af487e..37233f27f 100644 --- a/bench-tps/src/main.rs +++ b/bench-tps/src/main.rs @@ -1,3 +1,4 @@ +use log::*; use solana_bench_tps::bench::{do_bench_tps, generate_and_fund_keypairs, generate_keypairs}; use solana_bench_tps::cli; use solana_core::gossip_service::{discover_cluster, get_multi_client}; @@ -5,11 +6,7 @@ use solana_genesis::PrimordialAccountDetails; use solana_sdk::fee_calculator::FeeCalculator; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_program; -use std::collections::HashMap; -use std::fs::File; -use std::io::prelude::*; -use std::path::Path; -use std::process::exit; +use std::{collections::HashMap, fs::File, io::prelude::*, path::Path, process::exit}; /// Number of signatures for all transactions in ~1 week at ~100K TPS pub const NUM_SIGNATURES_FOR_TXS: u64 = 100_000 * 60 * 60 * 24 * 7; @@ -37,7 +34,7 @@ fn main() { } = &cli_config; if *write_to_client_file { - println!("Generating {} keypairs", *tx_count * 2); + info!("Generating {} keypairs", *tx_count * 2); let (keypairs, _) = generate_keypairs(&id, *tx_count as u64 * 2); 
let num_accounts = keypairs.len() as u64;
         let max_fee = FeeCalculator::new(*target_lamports_per_signature).max_lamports_per_signature;
@@ -57,7 +54,7 @@ fn main() {
             );
         });

-        println!("Writing {}", client_ids_and_stake_file);
+        info!("Writing {}", client_ids_and_stake_file);
         let serialized = serde_yaml::to_string(&accounts).unwrap();
         let path = Path::new(&client_ids_and_stake_file);
         let mut file = File::create(path).unwrap();
@@ -65,7 +62,7 @@ fn main() {
         return;
     }

-    println!("Connecting to the cluster");
+    info!("Connecting to the cluster");
     let (nodes, _replicators) =
         discover_cluster(&entrypoint_addr, *num_nodes).unwrap_or_else(|err| {
             eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err);
@@ -86,7 +83,7 @@ fn main() {
         let path = Path::new(&client_ids_and_stake_file);
         let file = File::open(path).unwrap();

-        println!("Reading {}", client_ids_and_stake_file);
+        info!("Reading {}", client_ids_and_stake_file);
         let accounts: HashMap<String, PrimordialAccountDetails> =
             serde_yaml::from_reader(file).unwrap();
         let mut keypairs = vec![];