More knobs. Arg for tx count per batch and also sustained mode

sustained mode overlaps tx generation with transfer. This mode seems
to have lower peak performance but higher average performance
This commit is contained in:
Stephen Akridge 2018-07-25 09:00:55 -07:00 committed by sakridge
parent 23ed65b339
commit 6c275ea5ef
1 changed files with 84 additions and 37 deletions

View File

@ -27,7 +27,7 @@ use std::fs::File;
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream, UdpSocket};
use std::process::exit;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::thread::Builder;
@ -155,6 +155,44 @@ fn generate_txs(
}
}
fn do_tx_transfers(
exit_signal: &Arc<AtomicBool>,
shared_txs: &Arc<RwLock<VecDeque<Vec<Transaction>>>>,
leader: &NodeInfo,
shared_tx_thread_count: &Arc<AtomicIsize>,
) {
let client = mk_client(&leader);
loop {
let txs;
{
let mut shared_txs_wl = shared_txs.write().unwrap();
txs = shared_txs_wl.pop_front();
}
if let Some(txs0) = txs {
shared_tx_thread_count.fetch_add(1, Ordering::Relaxed);
println!(
"Transferring 1 unit {} times... to {}",
txs0.len(),
leader.contact_info.tpu
);
let tx_len = txs0.len();
let transfer_start = Instant::now();
for tx in txs0 {
client.transfer_signed(&tx).unwrap();
}
shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed);
println!(
"Tx send done. {} ms {} tps",
duration_as_ms(&transfer_start.elapsed()),
tx_len as f32 / duration_as_s(&transfer_start.elapsed()),
);
}
if exit_signal.load(Ordering::Relaxed) {
break;
}
}
}
fn main() {
env_logger::init();
set_panic_hook("bench-tps");
@ -162,6 +200,8 @@ fn main() {
let mut num_nodes = 1usize;
let mut time_sec = 90;
let mut addr = None;
let mut sustained = false;
let mut tx_count = 500_000;
let matches = App::new("solana-bench-tps")
.arg(
@ -218,6 +258,18 @@ fn main() {
.takes_value(true)
.help("address to advertise to the network"),
)
.arg(
Arg::with_name("sustained")
.long("sustained")
.help("Use sustained performance mode vs. peak mode. This overlaps the tx generation with transfers."),
)
.arg(
Arg::with_name("tx_count")
.long("tx_count")
.value_name("NUMBER")
.takes_value(true)
.help("number of transactions to send in a single batch")
)
.get_matches();
let leader: NodeInfo;
@ -246,6 +298,14 @@ fn main() {
addr = Some(s.to_string());
}
if let Some(s) = matches.value_of("tx_count") {
tx_count = s.to_string().parse().expect("integer");
}
if matches.is_present("sustained") {
sustained = true;
}
let mut drone_addr = leader.contact_info.tpu;
drone_addr.set_port(DRONE_PORT);
@ -280,10 +340,9 @@ fn main() {
let starting_balance = client.poll_get_balance(&id.pubkey()).unwrap();
println!("Token balance: {}", starting_balance);
let txs: i64 = 500_000;
if starting_balance < txs {
let airdrop_amount = txs - starting_balance;
if starting_balance < tx_count {
let airdrop_amount = tx_count - starting_balance;
println!(
"Airdropping {:?} tokens from {}",
airdrop_amount, drone_addr
@ -318,8 +377,8 @@ fn main() {
seed.copy_from_slice(&id.public_key_bytes()[..32]);
let rnd = GenKeys::new(seed);
println!("Creating {} keypairs...", txs / 2);
let keypairs = rnd.gen_n_keypairs(txs / 2);
println!("Creating {} keypairs...", tx_count / 2);
let keypairs = rnd.gen_n_keypairs(tx_count / 2);
let first_tx_count = client.transaction_count();
println!("Initial transaction count {}", first_tx_count);
@ -346,44 +405,23 @@ fn main() {
let shared_txs: Arc<RwLock<VecDeque<Vec<Transaction>>>> =
Arc::new(RwLock::new(VecDeque::new()));
let shared_tx_active_thread_count = Arc::new(AtomicIsize::new(0));
let s_threads: Vec<_> = (0..threads)
.map(|_| {
let exit_signal = exit_signal.clone();
let shared_txs = shared_txs.clone();
let leader = leader.clone();
let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
Builder::new()
.name("solana-client-sender".to_string())
.spawn(move || {
let client = mk_client(&leader);
loop {
let mut txs = None;
{
let mut shared_txs_wl = shared_txs.write().unwrap();
if shared_txs_wl.len() > 0 {
txs = shared_txs_wl.pop_front();
}
}
if let Some(txs0) = txs {
println!(
"Transferring 1 unit {} times... to {}",
txs0.len(),
leader.contact_info.tpu
do_tx_transfers(
&exit_signal,
&shared_txs,
&leader,
&shared_tx_active_thread_count,
);
let tx_len = txs0.len();
let transfer_start = Instant::now();
for tx in txs0 {
client.transfer_signed(&tx).unwrap();
}
println!(
"Tx send done. {} ms {} tps",
duration_as_ms(&transfer_start.elapsed()),
tx_len as f32 / duration_as_s(&transfer_start.elapsed()),
);
}
if exit_signal.load(Ordering::Relaxed) {
break;
}
}
})
.unwrap()
})
@ -405,12 +443,21 @@ fn main() {
&shared_txs,
&id,
&keypairs,
txs,
tx_count,
&mut last_id,
threads,
reclaim_tokens_back_to_source_account,
);
reclaim_tokens_back_to_source_account = !reclaim_tokens_back_to_source_account;
// In sustained mode overlap the transfers with generation
// this has higher average performance but lower peak performance
// in tested environments.
if !sustained {
while shared_tx_active_thread_count.load(Ordering::Relaxed) > 0 {
sleep(Duration::from_millis(100));
}
}
}
// Stop the sampling threads so it will collect the stats