Improve bench-tps logging

This commit is contained in:
Michael Vines 2018-07-22 16:20:07 -07:00 committed by Grimes
parent 7ebbaaeb2d
commit 4ecd0a0e45
2 changed files with 54 additions and 27 deletions

View File

@ -39,9 +39,9 @@ pub struct NodeStats {
}
fn sample_tx_count(
exit: &Arc<AtomicBool>,
exit_signal: &Arc<AtomicBool>,
maxes: &Arc<RwLock<Vec<(SocketAddr, NodeStats)>>>,
first_count: u64,
first_tx_count: u64,
v: &NodeInfo,
sample_period: u64,
) {
@ -50,28 +50,33 @@ fn sample_tx_count(
let mut initial_tx_count = client.transaction_count();
let mut max_tps = 0.0;
let mut total;
let log_prefix = format!("{:21}:", v.contact_info.tpu.to_string());
loop {
let tx_count = client.transaction_count();
assert!(tx_count >= initial_tx_count);
let duration = now.elapsed();
now = Instant::now();
let sample = tx_count - initial_tx_count;
initial_tx_count = tx_count;
println!("{}: Transactions processed {}", v.contact_info.tpu, sample);
println!("{} Transactions processed {}", log_prefix, sample);
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let tps = (sample * 1_000_000_000) as f64 / ns as f64;
if tps > max_tps {
max_tps = tps;
}
println!("{}: {:.2} tps", v.contact_info.tpu, tps);
total = tx_count - first_count;
println!(
"{}: Total Transactions processed {}",
v.contact_info.tpu, total
);
println!("{} {:.2} TPS", log_prefix, tps);
if tx_count > first_tx_count {
total = tx_count - first_tx_count;
} else {
total = 0;
}
println!("{} Total transactions processed {}", log_prefix, total);
sleep(Duration::new(sample_period, 0));
if exit.load(Ordering::Relaxed) {
println!("exiting validator thread");
if exit_signal.load(Ordering::Relaxed) {
println!("{} Exiting validator thread", log_prefix);
let stats = NodeStats {
tps: max_tps,
tx: total,
@ -93,7 +98,7 @@ fn generate_and_send_txs(
threads: usize,
reclaim: bool,
) {
println!("Signing transactions... {}", txs / 2,);
println!("Signing transactions... {} (reclaim={})", txs / 2, reclaim);
let signing_start = Instant::now();
let transactions: Vec<_> = if !reclaim {
@ -120,7 +125,7 @@ fn generate_and_send_txs(
);
println!(
"Transfering {} transactions in {} batches",
"Transferring {} transactions in {} batches",
txs / 2,
threads
);
@ -132,7 +137,7 @@ fn generate_and_send_txs(
.zip(tx_clients)
.for_each(|(txs, client)| {
println!(
"Transferring 1 unit {} times... to {:?}",
"Transferring 1 unit {} times... to {}",
txs.len(),
leader.contact_info.tpu
);
@ -240,10 +245,14 @@ fn main() {
let mut c_threads = vec![];
let validators = converge(&leader, &signal, num_nodes, &mut c_threads);
println!(" Node identifier | Gossip address");
println!("-----------------+--------------------");
println!(" Node address | Node identifier");
println!("----------------------+------------------");
for node in &validators {
println!("{:16x} | {}", node.debug_id(), node.contact_info.ncp);
println!(
" {:20} | {:16x}",
node.contact_info.tpu.to_string(),
node.debug_id()
);
}
println!("Nodes: {}", validators.len());
@ -291,10 +300,10 @@ fn main() {
println!("Creating keypairs...");
let keypairs = rnd.gen_n_keypairs(txs / 2);
let first_count = client.transaction_count();
println!("initial count {}", first_count);
let first_tx_count = client.transaction_count();
println!("Initial transaction count {}", first_tx_count);
println!("Sampling tps every second...",);
println!("Sampling TPS every second...",);
// Setup a thread per validator to sample every period
// collect the max transaction rate and total tx count seen
@ -303,12 +312,12 @@ fn main() {
let v_threads: Vec<_> = validators
.into_iter()
.map(|v| {
let exit = signal.clone();
let exit_signal = signal.clone();
let maxes = maxes.clone();
Builder::new()
.name("solana-client-sample".to_string())
.spawn(move || {
sample_tx_count(&exit, &maxes, first_count, &v, sample_period);
sample_tx_count(&exit_signal, &maxes, first_tx_count, &v, sample_period);
})
.unwrap()
})
@ -332,7 +341,11 @@ fn main() {
false,
);
}
println!("Get last ID...");
last_id = client.get_last_id();
println!("Got last ID {:?}", last_id);
now = Instant::now();
while now.elapsed() < time {
generate_and_send_txs(
@ -359,8 +372,22 @@ fn main() {
let mut total_txs = 0;
let mut nodes_with_zero_tps = 0;
let mut total_maxes = 0.0;
println!(" Node address | Max TPS");
println!("---------------------+---------");
for (sock, stats) in maxes.read().unwrap().iter() {
println!("Node:{}, Max TPS: {:.2}", *sock, stats.tps);
let mut maybe_flag = match stats.tx {
0 => "!!!!!",
_ => "",
};
println!(
"{:20} | {:.2} {}",
(*sock).to_string(),
stats.tps,
maybe_flag
);
if stats.tx == 0 {
nodes_with_zero_tps += 1;
}
@ -431,7 +458,7 @@ fn spy_node() -> (NodeInfo, UdpSocket) {
fn converge(
leader: &NodeInfo,
exit: &Arc<AtomicBool>,
exit_signal: &Arc<AtomicBool>,
num_nodes: usize,
threads: &mut Vec<JoinHandle<()>>,
) -> Vec<NodeInfo> {
@ -448,7 +475,7 @@ fn converge(
window.clone(),
spy_gossip,
gossip_send_socket,
exit.clone(),
exit_signal.clone(),
).expect("DataReplicator::new");
let mut rv = vec![];
//wait for the network to converge, 30 seconds should be plenty

View File

@ -146,7 +146,7 @@ impl ThinClient {
/// Request the transaction count. If the response packet is dropped by the network,
/// this method will hang.
pub fn transaction_count(&mut self) -> u64 {
info!("transaction_count");
debug!("transaction_count");
let req = Request::GetTransactionCount;
let data =
serialize(&req).expect("serialize GetTransactionCount in pub fn transaction_count");
@ -157,7 +157,7 @@ impl ThinClient {
.expect("buffer error in pub fn transaction_count");
if let Ok(resp) = self.recv_response() {
info!("recv_response {:?}", resp);
debug!("transaction_count recv_response: {:?}", resp);
if let Response::TransactionCount { .. } = resp {
done = true;
}