bench-tps in a cargo test

This commit is contained in:
Stephen Akridge 2019-03-22 11:39:25 -07:00 committed by sakridge
parent 573dec63da
commit 4916cd8da5
3 changed files with 264 additions and 251 deletions

View File

@ -1,8 +1,11 @@
use solana_metrics;
use crate::cli::Config;
use rayon::prelude::*;
use solana::cluster_info::FULLNODE_PORT_RANGE;
use solana::contact_info::ContactInfo;
use solana::gen_keys::GenKeys;
use solana::gossip_service::discover;
use solana_client::thin_client::create_client;
use solana_client::thin_client::ThinClient;
use solana_drone::drone::request_airdrop_transaction;
@ -21,6 +24,7 @@ use std::process::exit;
use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::thread::Builder;
use std::time::Duration;
use std::time::Instant;
@ -35,7 +39,214 @@ pub const MAX_SPENDS_PER_TX: usize = 4;
pub type SharedTransactions = Arc<RwLock<VecDeque<Vec<(Transaction, u64)>>>>;
pub fn metrics_submit_lamport_balance(lamport_balance: u64) {
pub fn do_bench_tps(config: Config) {
let Config {
network_addr: network,
drone_addr,
id,
threads,
thread_batch_sleep_ms,
num_nodes,
duration,
tx_count,
sustained,
reject_extra_nodes,
converge_only,
} = config;
let nodes = discover(&network, num_nodes).unwrap_or_else(|err| {
eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err);
exit(1);
});
if nodes.len() < num_nodes {
eprintln!(
"Error: Insufficient nodes discovered. Expecting {} or more",
num_nodes
);
exit(1);
}
if reject_extra_nodes && nodes.len() > num_nodes {
eprintln!(
"Error: Extra nodes discovered. Expecting exactly {}",
num_nodes
);
exit(1);
}
if converge_only {
return;
}
let cluster_entrypoint = nodes[0].clone(); // Pick the first node, why not?
let client = create_client(cluster_entrypoint.client_facing_addr(), FULLNODE_PORT_RANGE);
let mut barrier_client =
create_client(cluster_entrypoint.client_facing_addr(), FULLNODE_PORT_RANGE);
let mut seed = [0u8; 32];
seed.copy_from_slice(&id.public_key_bytes()[..32]);
let mut rnd = GenKeys::new(seed);
println!("Creating {} keypairs...", tx_count * 2);
let mut total_keys = 0;
let mut target = tx_count * 2;
while target > 0 {
total_keys += target;
target /= MAX_SPENDS_PER_TX;
}
let gen_keypairs = rnd.gen_n_keypairs(total_keys as u64);
let barrier_source_keypair = Keypair::new();
let barrier_dest_id = Keypair::new().pubkey();
println!("Get lamports...");
let num_lamports_per_account = 20;
// Sample the first keypair, see if it has lamports, if so then resume
// to avoid lamport loss
let keypair0_balance = client
.poll_get_balance(&gen_keypairs.last().unwrap().pubkey())
.unwrap_or(0);
if num_lamports_per_account > keypair0_balance {
let extra = num_lamports_per_account - keypair0_balance;
let total = extra * (gen_keypairs.len() as u64);
airdrop_lamports(&client, &drone_addr, &id, total);
println!("adding more lamports {}", extra);
fund_keys(&client, &id, &gen_keypairs, extra);
}
let start = gen_keypairs.len() - (tx_count * 2) as usize;
let keypairs = &gen_keypairs[start..];
airdrop_lamports(&barrier_client, &drone_addr, &barrier_source_keypair, 1);
println!("Get last ID...");
let mut blockhash = client.get_recent_blockhash().unwrap();
println!("Got last ID {:?}", blockhash);
let first_tx_count = client.get_transaction_count().expect("transaction count");
println!("Initial transaction count {}", first_tx_count);
let exit_signal = Arc::new(AtomicBool::new(false));
// Setup a thread per validator to sample every period
// collect the max transaction rate and total tx count seen
let maxes = Arc::new(RwLock::new(Vec::new()));
let sample_period = 1; // in seconds
println!("Sampling TPS every {} second...", sample_period);
let v_threads: Vec<_> = nodes
.into_iter()
.map(|v| {
let exit_signal = exit_signal.clone();
let maxes = maxes.clone();
Builder::new()
.name("solana-client-sample".to_string())
.spawn(move || {
sample_tx_count(&exit_signal, &maxes, first_tx_count, &v, sample_period);
})
.unwrap()
})
.collect();
let shared_txs: SharedTransactions = Arc::new(RwLock::new(VecDeque::new()));
let shared_tx_active_thread_count = Arc::new(AtomicIsize::new(0));
let total_tx_sent_count = Arc::new(AtomicUsize::new(0));
let s_threads: Vec<_> = (0..threads)
.map(|_| {
let exit_signal = exit_signal.clone();
let shared_txs = shared_txs.clone();
let cluster_entrypoint = cluster_entrypoint.clone();
let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
let total_tx_sent_count = total_tx_sent_count.clone();
Builder::new()
.name("solana-client-sender".to_string())
.spawn(move || {
do_tx_transfers(
&exit_signal,
&shared_txs,
&cluster_entrypoint,
&shared_tx_active_thread_count,
&total_tx_sent_count,
thread_batch_sleep_ms,
);
})
.unwrap()
})
.collect();
// generate and send transactions for the specified duration
let start = Instant::now();
let mut reclaim_lamports_back_to_source_account = false;
let mut i = keypair0_balance;
while start.elapsed() < duration {
let balance = client.poll_get_balance(&id.pubkey()).unwrap_or(0);
metrics_submit_lamport_balance(balance);
// ping-pong between source and destination accounts for each loop iteration
// this seems to be faster than trying to determine the balance of individual
// accounts
let len = tx_count as usize;
generate_txs(
&shared_txs,
&keypairs[..len],
&keypairs[len..],
threads,
reclaim_lamports_back_to_source_account,
&cluster_entrypoint,
);
// In sustained mode overlap the transfers with generation
// this has higher average performance but lower peak performance
// in tested environments.
if !sustained {
while shared_tx_active_thread_count.load(Ordering::Relaxed) > 0 {
sleep(Duration::from_millis(100));
}
}
// It's not feasible (would take too much time) to confirm each of the `tx_count / 2`
// transactions sent by `generate_txs()` so instead send and confirm a single transaction
// to validate the network is still functional.
send_barrier_transaction(
&mut barrier_client,
&mut blockhash,
&barrier_source_keypair,
&barrier_dest_id,
);
i += 1;
if should_switch_directions(num_lamports_per_account, i) {
reclaim_lamports_back_to_source_account = !reclaim_lamports_back_to_source_account;
}
}
// Stop the sampling threads so it will collect the stats
exit_signal.store(true, Ordering::Relaxed);
println!("Waiting for validator threads...");
for t in v_threads {
if let Err(err) = t.join() {
println!(" join() failed with: {:?}", err);
}
}
// join the tx send threads
println!("Waiting for transmit threads...");
for t in s_threads {
if let Err(err) = t.join() {
println!(" join() failed with: {:?}", err);
}
}
let balance = client.poll_get_balance(&id.pubkey()).unwrap_or(0);
metrics_submit_lamport_balance(balance);
compute_and_report_stats(
&maxes,
sample_period,
&start.elapsed(),
total_tx_sent_count.load(Ordering::Relaxed),
);
}
fn metrics_submit_lamport_balance(lamport_balance: u64) {
println!("Token balance: {}", lamport_balance);
solana_metrics::submit(
influxdb::Point::new("bench-tps")
@ -45,7 +256,7 @@ pub fn metrics_submit_lamport_balance(lamport_balance: u64) {
);
}
pub fn sample_tx_count(
fn sample_tx_count(
exit_signal: &Arc<AtomicBool>,
maxes: &Arc<RwLock<Vec<(SocketAddr, NodeStats)>>>,
first_tx_count: u64,
@ -102,7 +313,7 @@ pub fn sample_tx_count(
}
/// Send loopback payment of 0 lamports and confirm the network processed it
pub fn send_barrier_transaction(
fn send_barrier_transaction(
barrier_client: &mut ThinClient,
blockhash: &mut Hash,
source_keypair: &Keypair,
@ -177,7 +388,7 @@ pub fn send_barrier_transaction(
}
}
pub fn generate_txs(
fn generate_txs(
shared_txs: &SharedTransactions,
source: &[Keypair],
dest: &[Keypair],
@ -237,7 +448,7 @@ pub fn generate_txs(
}
}
pub fn do_tx_transfers(
fn do_tx_transfers(
exit_signal: &Arc<AtomicBool>,
shared_txs: &SharedTransactions,
contact_info: &ContactInfo,
@ -295,7 +506,7 @@ pub fn do_tx_transfers(
}
}
pub fn verify_funding_transfer(client: &ThinClient, tx: &Transaction, amount: u64) -> bool {
fn verify_funding_transfer(client: &ThinClient, tx: &Transaction, amount: u64) -> bool {
for a in &tx.account_keys[1..] {
if client.get_balance(a).unwrap_or(0) >= amount {
return true;
@ -308,7 +519,7 @@ pub fn verify_funding_transfer(client: &ThinClient, tx: &Transaction, amount: u6
/// fund the dests keys by spending all of the source keys into MAX_SPENDS_PER_TX
/// on every iteration. This allows us to replay the transfers because the source is either empty,
/// or full
pub fn fund_keys(client: &ThinClient, source: &Keypair, dests: &[Keypair], lamports: u64) {
fn fund_keys(client: &ThinClient, source: &Keypair, dests: &[Keypair], lamports: u64) {
let total = lamports * dests.len() as u64;
let mut funded: Vec<(&Keypair, u64)> = vec![(source, total)];
let mut notfunded: Vec<&Keypair> = dests.iter().collect();
@ -401,7 +612,7 @@ pub fn fund_keys(client: &ThinClient, source: &Keypair, dests: &[Keypair], lampo
}
}
pub fn airdrop_lamports(client: &ThinClient, drone_addr: &SocketAddr, id: &Keypair, tx_count: u64) {
fn airdrop_lamports(client: &ThinClient, drone_addr: &SocketAddr, id: &Keypair, tx_count: u64) {
let starting_balance = client.poll_get_balance(&id.pubkey()).unwrap_or(0);
metrics_submit_lamport_balance(starting_balance);
println!("starting balance {}", starting_balance);
@ -448,7 +659,7 @@ pub fn airdrop_lamports(client: &ThinClient, drone_addr: &SocketAddr, id: &Keypa
}
}
pub fn compute_and_report_stats(
fn compute_and_report_stats(
maxes: &Arc<RwLock<Vec<(SocketAddr, NodeStats)>>>,
sample_period: u64,
tx_send_elapsed: &Duration,
@ -515,13 +726,18 @@ pub fn compute_and_report_stats(
// First transfer 3/4 of the lamports to the dest accounts
// then ping-pong 1/4 of the lamports back to the other account
// this leaves 1/4 lamport buffer in each account
pub fn should_switch_directions(num_lamports_per_account: u64, i: u64) -> bool {
fn should_switch_directions(num_lamports_per_account: u64, i: u64) -> bool {
i % (num_lamports_per_account / 4) == 0 && (i >= (3 * num_lamports_per_account) / 4)
}
#[cfg(test)]
mod tests {
use super::*;
use solana::fullnode::FullnodeConfig;
use solana::local_cluster::LocalCluster;
use solana_drone::drone::run_local_drone;
use std::sync::mpsc::channel;
#[test]
fn test_switch_directions() {
assert_eq!(should_switch_directions(20, 0), false);
@ -536,4 +752,29 @@ mod tests {
assert_eq!(should_switch_directions(20, 100), true);
assert_eq!(should_switch_directions(20, 101), false);
}
#[test]
#[ignore]
fn test_bench_tps() {
let fullnode_config = FullnodeConfig::default();
const NUM_NODES: usize = 1;
let cluster =
LocalCluster::new_with_config(&[999_990; NUM_NODES], 2_000_000, &fullnode_config);
let drone_keypair = Keypair::new();
cluster.transfer(&cluster.funding_keypair, &drone_keypair.pubkey(), 1_000_000);
let (addr_sender, addr_receiver) = channel();
run_local_drone(drone_keypair, addr_sender);
let drone_addr = addr_receiver.recv_timeout(Duration::from_secs(2)).unwrap();
let mut cfg = Config::default();
cfg.network_addr = cluster.entry_point_info.gossip;
cfg.drone_addr = drone_addr;
cfg.tx_count = 100;
cfg.duration = Duration::from_secs(5);
cfg.num_nodes = NUM_NODES;
do_bench_tps(cfg);
}
}

View File

@ -1,21 +1,7 @@
mod bench;
mod cli;
use crate::bench::*;
use solana::cluster_info::FULLNODE_PORT_RANGE;
use solana::gen_keys::GenKeys;
use solana::gossip_service::discover;
use solana_client::thin_client::create_client;
use solana_metrics;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::collections::VecDeque;
use std::process::exit;
use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::thread::Builder;
use std::time::Duration;
use std::time::Instant;
use crate::bench::do_bench_tps;
fn main() {
solana_logger::setup();
@ -25,227 +11,5 @@ fn main() {
let cfg = cli::extract_args(&matches);
let cli::Config {
network_addr: network,
drone_addr,
id,
threads,
thread_batch_sleep_ms,
num_nodes,
duration,
tx_count,
sustained,
reject_extra_nodes,
converge_only,
} = cfg;
let nodes = discover(&network, num_nodes).unwrap_or_else(|err| {
eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err);
exit(1);
});
if nodes.len() < num_nodes {
eprintln!(
"Error: Insufficient nodes discovered. Expecting {} or more",
num_nodes
);
exit(1);
}
if reject_extra_nodes && nodes.len() > num_nodes {
eprintln!(
"Error: Extra nodes discovered. Expecting exactly {}",
num_nodes
);
exit(1);
}
if converge_only {
return;
}
let cluster_entrypoint = nodes[0].clone(); // Pick the first node, why not?
let client = create_client(cluster_entrypoint.client_facing_addr(), FULLNODE_PORT_RANGE);
let mut barrier_client =
create_client(cluster_entrypoint.client_facing_addr(), FULLNODE_PORT_RANGE);
let mut seed = [0u8; 32];
seed.copy_from_slice(&id.public_key_bytes()[..32]);
let mut rnd = GenKeys::new(seed);
println!("Creating {} keypairs...", tx_count * 2);
let mut total_keys = 0;
let mut target = tx_count * 2;
while target > 0 {
total_keys += target;
target /= MAX_SPENDS_PER_TX;
}
let gen_keypairs = rnd.gen_n_keypairs(total_keys as u64);
let barrier_source_keypair = Keypair::new();
let barrier_dest_id = Keypair::new().pubkey();
println!("Get lamports...");
let num_lamports_per_account = 20;
// Sample the first keypair, see if it has lamports, if so then resume
// to avoid lamport loss
let keypair0_balance = client
.poll_get_balance(&gen_keypairs.last().unwrap().pubkey())
.unwrap_or(0);
if num_lamports_per_account > keypair0_balance {
let extra = num_lamports_per_account - keypair0_balance;
let total = extra * (gen_keypairs.len() as u64);
airdrop_lamports(&client, &drone_addr, &id, total);
println!("adding more lamports {}", extra);
fund_keys(&client, &id, &gen_keypairs, extra);
}
let start = gen_keypairs.len() - (tx_count * 2) as usize;
let keypairs = &gen_keypairs[start..];
airdrop_lamports(&barrier_client, &drone_addr, &barrier_source_keypair, 1);
println!("Get last ID...");
let mut blockhash = client.get_recent_blockhash().unwrap();
println!("Got last ID {:?}", blockhash);
let first_tx_count = client.get_transaction_count().expect("transaction count");
println!("Initial transaction count {}", first_tx_count);
let exit_signal = Arc::new(AtomicBool::new(false));
// Setup a thread per validator to sample every period
// collect the max transaction rate and total tx count seen
let maxes = Arc::new(RwLock::new(Vec::new()));
let sample_period = 1; // in seconds
println!("Sampling TPS every {} second...", sample_period);
let v_threads: Vec<_> = nodes
.into_iter()
.map(|v| {
let exit_signal = exit_signal.clone();
let maxes = maxes.clone();
Builder::new()
.name("solana-client-sample".to_string())
.spawn(move || {
sample_tx_count(&exit_signal, &maxes, first_tx_count, &v, sample_period);
})
.unwrap()
})
.collect();
let shared_txs: SharedTransactions = Arc::new(RwLock::new(VecDeque::new()));
let shared_tx_active_thread_count = Arc::new(AtomicIsize::new(0));
let total_tx_sent_count = Arc::new(AtomicUsize::new(0));
let s_threads: Vec<_> = (0..threads)
.map(|_| {
let exit_signal = exit_signal.clone();
let shared_txs = shared_txs.clone();
let cluster_entrypoint = cluster_entrypoint.clone();
let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
let total_tx_sent_count = total_tx_sent_count.clone();
Builder::new()
.name("solana-client-sender".to_string())
.spawn(move || {
do_tx_transfers(
&exit_signal,
&shared_txs,
&cluster_entrypoint,
&shared_tx_active_thread_count,
&total_tx_sent_count,
thread_batch_sleep_ms,
);
})
.unwrap()
})
.collect();
// generate and send transactions for the specified duration
let start = Instant::now();
let mut reclaim_lamports_back_to_source_account = false;
let mut i = keypair0_balance;
while start.elapsed() < duration {
let balance = client.poll_get_balance(&id.pubkey()).unwrap_or(0);
metrics_submit_lamport_balance(balance);
// ping-pong between source and destination accounts for each loop iteration
// this seems to be faster than trying to determine the balance of individual
// accounts
let len = tx_count as usize;
generate_txs(
&shared_txs,
&keypairs[..len],
&keypairs[len..],
threads,
reclaim_lamports_back_to_source_account,
&cluster_entrypoint,
);
// In sustained mode overlap the transfers with generation
// this has higher average performance but lower peak performance
// in tested environments.
if !sustained {
while shared_tx_active_thread_count.load(Ordering::Relaxed) > 0 {
sleep(Duration::from_millis(100));
}
}
// It's not feasible (would take too much time) to confirm each of the `tx_count / 2`
// transactions sent by `generate_txs()` so instead send and confirm a single transaction
// to validate the network is still functional.
send_barrier_transaction(
&mut barrier_client,
&mut blockhash,
&barrier_source_keypair,
&barrier_dest_id,
);
i += 1;
if should_switch_directions(num_lamports_per_account, i) {
reclaim_lamports_back_to_source_account = !reclaim_lamports_back_to_source_account;
}
}
// Stop the sampling threads so it will collect the stats
exit_signal.store(true, Ordering::Relaxed);
println!("Waiting for validator threads...");
for t in v_threads {
if let Err(err) = t.join() {
println!(" join() failed with: {:?}", err);
}
}
// join the tx send threads
println!("Waiting for transmit threads...");
for t in s_threads {
if let Err(err) = t.join() {
println!(" join() failed with: {:?}", err);
}
}
let balance = client.poll_get_balance(&id.pubkey()).unwrap_or(0);
metrics_submit_lamport_balance(balance);
compute_and_report_stats(
&maxes,
sample_period,
&start.elapsed(),
total_tx_sent_count.load(Ordering::Relaxed),
);
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_switch_directions() {
assert_eq!(should_switch_directions(20, 0), false);
assert_eq!(should_switch_directions(20, 1), false);
assert_eq!(should_switch_directions(20, 14), false);
assert_eq!(should_switch_directions(20, 15), true);
assert_eq!(should_switch_directions(20, 16), false);
assert_eq!(should_switch_directions(20, 19), false);
assert_eq!(should_switch_directions(20, 20), true);
assert_eq!(should_switch_directions(20, 21), false);
assert_eq!(should_switch_directions(20, 99), false);
assert_eq!(should_switch_directions(20, 100), true);
assert_eq!(should_switch_directions(20, 101), false);
}
do_bench_tps(cfg);
}

View File

@ -183,7 +183,7 @@ impl LocalCluster {
// Send each validator some lamports to vote
let validator_balance =
Self::transfer(&client, &self.funding_keypair, &validator_pubkey, stake);
Self::transfer_with_client(&client, &self.funding_keypair, &validator_pubkey, stake);
info!(
"validator {} balance {}",
validator_pubkey, validator_balance
@ -217,7 +217,7 @@ impl LocalCluster {
FULLNODE_PORT_RANGE,
);
Self::transfer(
Self::transfer_with_client(
&client,
&self.funding_keypair,
&replicator_keypair.pubkey(),
@ -252,7 +252,15 @@ impl LocalCluster {
}
}
fn transfer(
pub fn transfer(&self, source_keypair: &Keypair, dest_pubkey: &Pubkey, lamports: u64) -> u64 {
let client = create_client(
self.entry_point_info.client_facing_addr(),
FULLNODE_PORT_RANGE,
);
Self::transfer_with_client(&client, source_keypair, dest_pubkey, lamports)
}
fn transfer_with_client(
client: &ThinClient,
source_keypair: &Keypair,
dest_pubkey: &Pubkey,