Bench tps: improve fund_keys (#6225)

automerge
This commit is contained in:
Tyera Eulberg 2019-10-04 02:16:07 -06:00 committed by Grimes
parent 844d231d74
commit aa3694cca8
3 changed files with 92 additions and 56 deletions

View File

@ -8,12 +8,11 @@ use solana_core::gen_keys::GenKeys;
use solana_drone::drone::request_airdrop_transaction;
#[cfg(feature = "move")]
use solana_librapay_api::{create_genesis, upload_mint_program, upload_payment_program};
#[cfg(feature = "move")]
use solana_measure::measure::Measure;
use solana_metrics::datapoint_info;
use solana_sdk::{
client::Client,
clock::{DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES},
clock::{DEFAULT_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE},
fee_calculator::FeeCalculator,
hash::Hash,
pubkey::Pubkey,
@ -34,10 +33,9 @@ use std::{
time::{Duration, Instant},
};
// The point at which transactions become "too old", in seconds. The cluster keeps blockhashes for
// approximately MAX_RECENT_BLOCKHASHES/DEFAULT_TICKS_PER_SLOT seconds. The adjustment of 5sec
// seems about right to minimize BlockhashNotFound errors, based on empirical testing.
const MAX_TX_QUEUE_AGE: u64 = MAX_RECENT_BLOCKHASHES as u64 / DEFAULT_TICKS_PER_SLOT - 5;
// The point at which transactions become "too old", in seconds.
// Evaluated with integer arithmetic, left to right: MAX_PROCESSING_AGE (cast to u64) is first
// multiplied by DEFAULT_TICKS_PER_SECOND and the product is then divided by
// DEFAULT_TICKS_PER_SLOT, so any remainder of the division is truncated.
// NOTE(review): if MAX_PROCESSING_AGE is measured in slots, a result in seconds would need
// ticks-per-slot divided by ticks-per-second (the inverse of the ratio used here) -- confirm
// the unit of MAX_PROCESSING_AGE in solana_sdk::clock before relying on this value.
const MAX_TX_QUEUE_AGE: u64 =
MAX_PROCESSING_AGE as u64 * DEFAULT_TICKS_PER_SECOND / DEFAULT_TICKS_PER_SLOT;
#[cfg(feature = "move")]
use solana_librapay_api::librapay_transaction;
@ -103,7 +101,7 @@ where
}
}
};
println!("Initial transaction count {}", first_tx_count);
info!("Initial transaction count {}", first_tx_count);
let exit_signal = Arc::new(AtomicBool::new(false));
@ -111,7 +109,7 @@ where
// collect the max transaction rate and total tx count seen
let maxes = Arc::new(RwLock::new(Vec::new()));
let sample_period = 1; // in seconds
println!("Sampling TPS every {} second...", sample_period);
info!("Sampling TPS every {} second...", sample_period);
let v_threads: Vec<_> = clients
.iter()
.map(|client| {
@ -205,18 +203,18 @@ where
// Stop the sampling threads so they will collect the stats
exit_signal.store(true, Ordering::Relaxed);
println!("Waiting for validator threads...");
info!("Waiting for validator threads...");
for t in v_threads {
if let Err(err) = t.join() {
println!(" join() failed with: {:?}", err);
info!(" join() failed with: {:?}", err);
}
}
// join the tx send threads
println!("Waiting for transmit threads...");
info!("Waiting for transmit threads...");
for t in s_threads {
if let Err(err) = t.join() {
println!(" join() failed with: {:?}", err);
info!(" join() failed with: {:?}", err);
}
}
@ -235,7 +233,7 @@ where
}
fn metrics_submit_lamport_balance(lamport_balance: u64) {
println!("Token balance: {}", lamport_balance);
info!("Token balance: {}", lamport_balance);
datapoint_info!(
"bench-tps-lamport_balance",
("balance", lamport_balance, i64)
@ -321,7 +319,7 @@ fn generate_txs(
libra_args: &Option<LibraKeys>,
) {
let tx_count = source.len();
println!("Signing transactions... {} (reclaim={})", tx_count, reclaim);
info!("Signing transactions... {} (reclaim={})", tx_count, reclaim);
let signing_start = Instant::now();
let transactions = if let Some((
@ -356,7 +354,7 @@ fn generate_txs(
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let bsps = (tx_count) as f64 / ns as f64;
let nsps = ns as f64 / (tx_count) as f64;
println!(
info!(
"Done. {:.2} thousand signatures per second, {:.2} us per signature, {} ms total time, {}",
bsps * 1_000_000_f64,
nsps / 1_000_f64,
@ -397,7 +395,7 @@ fn do_tx_transfers<T: Client>(
}
if let Some(txs0) = txs {
shared_tx_thread_count.fetch_add(1, Ordering::Relaxed);
println!(
info!(
"Transferring 1 unit {} times... to {}",
txs0.len(),
client.as_ref().tpu_addr(),
@ -423,7 +421,7 @@ fn do_tx_transfers<T: Client>(
}
shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed);
total_tx_sent_count.fetch_add(tx_len, Ordering::Relaxed);
println!(
info!(
"Tx send done. {} ms {} tps",
duration_as_ms(&transfer_start.elapsed()),
tx_len as f32 / duration_as_s(&transfer_start.elapsed()),
@ -446,7 +444,6 @@ fn verify_funding_transfer<T: Client>(client: &T, tx: &Transaction, amount: u64)
return true;
}
}
false
}
@ -465,7 +462,7 @@ pub fn fund_keys<T: Client>(
let mut notfunded: Vec<&Keypair> = dests.iter().collect();
let lamports_per_account = (total - (extra * max_fee)) / (notfunded.len() as u64 + 1);
println!(
info!(
"funding keys {} with lamports: {:?} total: {}",
dests.len(),
client.get_balance(&source.pubkey()),
@ -474,7 +471,8 @@ pub fn fund_keys<T: Client>(
while !notfunded.is_empty() {
let mut new_funded: Vec<(&Keypair, u64)> = vec![];
let mut to_fund = vec![];
println!("creating from... {}", funded.len());
info!("creating from... {}", funded.len());
let mut build_to_fund = Measure::start("build_to_fund");
for f in &mut funded {
let max_units = cmp::min(notfunded.len() as u64, MAX_SPENDS_PER_TX);
if max_units == 0 {
@ -496,6 +494,8 @@ pub fn fund_keys<T: Client>(
}
extra -= 1;
}
build_to_fund.stop();
debug!("build to_fund vec: {}us", build_to_fund.as_us());
// try to transfer a "few" at a time with recent blockhash
// assume 4MB network buffers, and 512 byte packets
@ -504,6 +504,7 @@ pub fn fund_keys<T: Client>(
to_fund.chunks(FUND_CHUNK_LEN).for_each(|chunk| {
let mut tries = 0;
let mut make_txs = Measure::start("make_txs");
// this set of transactions just initializes us for bookkeeping
#[allow(clippy::clone_double_ref)] // sigh
let mut to_fund_txs: Vec<_> = chunk
@ -515,6 +516,12 @@ pub fn fund_keys<T: Client>(
(k.clone(), tx)
})
.collect();
make_txs.stop();
debug!(
"make {} unsigned txs: {}us",
to_fund_txs.len(),
make_txs.as_us()
);
let amount = chunk[0].1[0].1;
@ -523,7 +530,7 @@ pub fn fund_keys<T: Client>(
.iter()
.fold(0, |len, (_, tx)| len + tx.message().instructions.len());
println!(
info!(
"{} {} to {} in {} txs",
if tries == 0 {
"transferring"
@ -538,30 +545,65 @@ pub fn fund_keys<T: Client>(
let (blockhash, _fee_calculator) = get_recent_blockhash(client);
// re-sign retained to_fund_txes with updated blockhash
let mut sign_txs = Measure::start("sign_txs");
to_fund_txs.par_iter_mut().for_each(|(k, tx)| {
tx.sign(&[*k], blockhash);
});
sign_txs.stop();
debug!("sign {} txs: {}us", to_fund_txs.len(), sign_txs.as_us());
let mut send_txs = Measure::start("send_txs");
to_fund_txs.iter().for_each(|(_, tx)| {
client.async_send_transaction(tx.clone()).expect("transfer");
});
send_txs.stop();
debug!("send {} txs: {}us", to_fund_txs.len(), send_txs.as_us());
let mut verify_txs = Measure::start("verify_txs");
let mut starting_txs = to_fund_txs.len();
let mut verified_txs = 0;
let mut failed_verify = 0;
// Only loop multiple times for small (quick) transaction batches
for _ in 0..(if starting_txs < 1000 { 3 } else { 1 }) {
let mut timer = Instant::now();
to_fund_txs.retain(|(_, tx)| {
if timer.elapsed() >= Duration::from_secs(5) {
if failed_verify > 0 {
debug!("total txs failed verify: {}", failed_verify);
}
info!(
"Verifying transfers... {} remaining",
starting_txs - verified_txs
);
timer = Instant::now();
}
let verified = verify_funding_transfer(client, &tx, amount);
if verified {
verified_txs += 1;
} else {
failed_verify += 1;
}
!verified
});
if to_fund_txs.is_empty() {
break;
}
debug!("Looping verifications");
info!("Verifying transfers... {} remaining", to_fund_txs.len());
sleep(Duration::from_millis(100));
}
starting_txs -= to_fund_txs.len();
verify_txs.stop();
debug!("verified {} txs: {}us", starting_txs, verify_txs.as_us());
// retry anything that seems to have dropped through cracks
// again since these txs are all or nothing, they're fine to
// retry
for _ in 0..10 {
to_fund_txs.retain(|(_, tx)| !verify_funding_transfer(client, &tx, amount));
if to_fund_txs.is_empty() {
break;
}
sleep(Duration::from_millis(100));
}
tries += 1;
}
println!("transferred");
info!("transferred");
});
println!("funded: {} left: {}", new_funded.len(), notfunded.len());
info!("funded: {} left: {}", new_funded.len(), notfunded.len());
funded = new_funded;
}
}
@ -574,11 +616,11 @@ pub fn airdrop_lamports<T: Client>(
) -> Result<()> {
let starting_balance = client.get_balance(&id.pubkey()).unwrap_or(0);
metrics_submit_lamport_balance(starting_balance);
println!("starting balance {}", starting_balance);
info!("starting balance {}", starting_balance);
if starting_balance < tx_count {
let airdrop_amount = tx_count - starting_balance;
println!(
info!(
"Airdropping {:?} lamports from {} for {}",
airdrop_amount,
drone_addr,
@ -607,14 +649,14 @@ pub fn airdrop_lamports<T: Client>(
};
let current_balance = client.get_balance(&id.pubkey()).unwrap_or_else(|e| {
println!("airdrop error {}", e);
info!("airdrop error {}", e);
starting_balance
});
println!("current balance {}...", current_balance);
info!("current balance {}...", current_balance);
metrics_submit_lamport_balance(current_balance);
if current_balance - starting_balance != airdrop_amount {
println!(
info!(
"Airdrop failed! {} {} {}",
id.pubkey(),
current_balance,
@ -637,8 +679,8 @@ fn compute_and_report_stats(
let mut max_tx_count = 0;
let mut nodes_with_zero_tps = 0;
let mut total_maxes = 0.0;
println!(" Node address | Max TPS | Total Transactions");
println!("---------------------+---------------+--------------------");
info!(" Node address | Max TPS | Total Transactions");
info!("---------------------+---------------+--------------------");
for (sock, stats) in maxes.read().unwrap().iter() {
let maybe_flag = match stats.txs {
@ -646,7 +688,7 @@ fn compute_and_report_stats(
_ => "",
};
println!(
info!(
"{:20} | {:13.2} | {} {}",
sock, stats.tps, stats.txs, maybe_flag
);
@ -667,7 +709,7 @@ fn compute_and_report_stats(
if total_maxes > 0.0 {
let num_nodes_with_tps = maxes.read().unwrap().len() - nodes_with_zero_tps;
let average_max = total_maxes / num_nodes_with_tps as f32;
println!(
info!(
"\nAverage max TPS: {:.2}, {} nodes had 0 TPS",
average_max, nodes_with_zero_tps
);
@ -679,7 +721,7 @@ fn compute_and_report_stats(
} else {
0.0
};
println!(
info!(
"\nHighest TPS: {:.2} sampling period {}s max transactions: {} clients: {} drop rate: {:.2}",
max_of_maxes,
sample_period,
@ -687,7 +729,7 @@ fn compute_and_report_stats(
maxes.read().unwrap().len(),
drop_rate,
);
println!(
info!(
"\tAverage TPS: {}",
max_tx_count as f32 / duration_as_s(tx_send_elapsed)
);
@ -906,7 +948,7 @@ pub fn generate_and_fund_keypairs<T: Client>(
total *= 3;
}
println!("Previous key balance: {} max_fee: {} lamports_per_account: {} extra: {} desired_balance: {} total: {}",
info!("Previous key balance: {} max_fee: {} lamports_per_account: {} extra: {} desired_balance: {} total: {}",
last_keypair_balance, fee_calculator.max_lamports_per_signature, lamports_per_account, extra,
account_desired_balance, total
);

View File

@ -1,11 +1,8 @@
use std::net::SocketAddr;
use std::process::exit;
use std::time::Duration;
use clap::{crate_description, crate_name, crate_version, App, Arg, ArgMatches};
use solana_drone::drone::DRONE_PORT;
use solana_sdk::fee_calculator::FeeCalculator;
use solana_sdk::signature::{read_keypair, Keypair, KeypairUtil};
use std::{net::SocketAddr, process::exit, time::Duration};
// 64 * 1024 = 65_536.
// NOTE(review): presumably the default lamport balance given to each account when the CLI
// does not override it -- inferred from the name only; confirm against the argument parser.
const NUM_LAMPORTS_PER_ACCOUNT_DEFAULT: u64 = 64 * 1024;

View File

@ -1,3 +1,4 @@
use log::*;
use solana_bench_tps::bench::{do_bench_tps, generate_and_fund_keypairs, generate_keypairs};
use solana_bench_tps::cli;
use solana_core::gossip_service::{discover_cluster, get_multi_client};
@ -5,11 +6,7 @@ use solana_genesis::PrimordialAccountDetails;
use solana_sdk::fee_calculator::FeeCalculator;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_program;
use std::collections::HashMap;
use std::fs::File;
use std::io::prelude::*;
use std::path::Path;
use std::process::exit;
use std::{collections::HashMap, fs::File, io::prelude::*, path::Path, process::exit};
/// Number of signatures for all transactions in ~1 week at ~100K TPS
/// (100_000 per second * 60 s * 60 min * 24 h * 7 days = 60_480_000_000).
pub const NUM_SIGNATURES_FOR_TXS: u64 = 100_000 * 60 * 60 * 24 * 7;
@ -37,7 +34,7 @@ fn main() {
} = &cli_config;
if *write_to_client_file {
println!("Generating {} keypairs", *tx_count * 2);
info!("Generating {} keypairs", *tx_count * 2);
let (keypairs, _) = generate_keypairs(&id, *tx_count as u64 * 2);
let num_accounts = keypairs.len() as u64;
let max_fee = FeeCalculator::new(*target_lamports_per_signature).max_lamports_per_signature;
@ -57,7 +54,7 @@ fn main() {
);
});
println!("Writing {}", client_ids_and_stake_file);
info!("Writing {}", client_ids_and_stake_file);
let serialized = serde_yaml::to_string(&accounts).unwrap();
let path = Path::new(&client_ids_and_stake_file);
let mut file = File::create(path).unwrap();
@ -65,7 +62,7 @@ fn main() {
return;
}
println!("Connecting to the cluster");
info!("Connecting to the cluster");
let (nodes, _replicators) =
discover_cluster(&entrypoint_addr, *num_nodes).unwrap_or_else(|err| {
eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err);
@ -86,7 +83,7 @@ fn main() {
let path = Path::new(&client_ids_and_stake_file);
let file = File::open(path).unwrap();
println!("Reading {}", client_ids_and_stake_file);
info!("Reading {}", client_ids_and_stake_file);
let accounts: HashMap<String, PrimordialAccountDetails> =
serde_yaml::from_reader(file).unwrap();
let mut keypairs = vec![];