From e0acd48944cba1cf1ba3ca1915261b7eaec49521 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Fri, 19 Apr 2019 15:04:36 -0600 Subject: [PATCH] Write bench-tps in terms of client (#3904) * Write bench-tps in terms of client * Add transactions_addr method for logging * Move cluster config outside do_bench_tps * Add BankClient test --- Cargo.lock | 1 + bench-tps/Cargo.toml | 1 + bench-tps/src/bench.rs | 212 ++++++++++++++++++++----------------- bench-tps/src/main.rs | 82 +++++++++++++- client/src/thin_client.rs | 6 +- runtime/src/bank_client.rs | 6 +- sdk/src/client.rs | 4 +- sdk/src/transport.rs | 12 +++ 8 files changed, 223 insertions(+), 101 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 70c1be67e..5e77ccd66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2293,6 +2293,7 @@ dependencies = [ "solana-logger 0.14.0", "solana-metrics 0.14.0", "solana-netutil 0.14.0", + "solana-runtime 0.14.0", "solana-sdk 0.14.0", ] diff --git a/bench-tps/Cargo.toml b/bench-tps/Cargo.toml index 40f0624c0..f6e1c415a 100644 --- a/bench-tps/Cargo.toml +++ b/bench-tps/Cargo.toml @@ -17,6 +17,7 @@ solana-drone = { path = "../drone", version = "0.14.0" } solana-logger = { path = "../logger", version = "0.14.0" } solana-metrics = { path = "../metrics", version = "0.14.0" } solana-netutil = { path = "../netutil", version = "0.14.0" } +solana-runtime = { path = "../runtime", version = "0.14.0" } solana-sdk = { path = "../sdk", version = "0.14.0" } [features] diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index 2c07666c1..4fab83f6d 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -1,16 +1,10 @@ use solana_metrics; -use crate::cli::Config; use rayon::prelude::*; -use solana::cluster_info::FULLNODE_PORT_RANGE; -use solana::contact_info::ContactInfo; use solana::gen_keys::GenKeys; -use solana::gossip_service::discover_nodes; -use solana_client::thin_client::create_client; -use solana_client::thin_client::ThinClient; use solana_drone::drone::request_airdrop_transaction; use solana_metrics::influxdb; -use solana_sdk::client::{AsyncClient, SyncClient}; +use solana_sdk::client::Client; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_instruction; use solana_sdk::system_transaction; @@ -36,66 +30,52 @@ pub struct NodeStats { } pub const MAX_SPENDS_PER_TX: usize = 4; +pub const NUM_LAMPORTS_PER_ACCOUNT: u64 = 20; pub type SharedTransactions = Arc>>>; -pub fn do_bench_tps(config: Config) { +pub struct Config { + pub id: Keypair, + pub threads: usize, + pub thread_batch_sleep_ms: usize, + pub duration: Duration, + pub tx_count: usize, + pub sustained: bool, +} + +impl Default for Config { + fn default() -> Self { + Self { + id: Keypair::new(), + threads: 4, + thread_batch_sleep_ms: 0, + duration: Duration::new(std::u64::MAX, 0), + tx_count: 500_000, + sustained: false, + } + } +} + +pub fn do_bench_tps( + clients: Vec, + config: Config, + gen_keypairs: Vec, + keypair0_balance: u64, +) where + T: 'static + Client + Send + Sync, +{ let Config { - network_addr: network, - drone_addr, id, threads, thread_batch_sleep_ms, - num_nodes, duration, tx_count, sustained, } = config; - let nodes = discover_nodes(&network, num_nodes).unwrap_or_else(|err| { - eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err); - exit(1); - }); - if nodes.len() < num_nodes { - eprintln!( - "Error: Insufficient nodes discovered. Expecting {} or more", - num_nodes - ); - exit(1); - } - let cluster_entrypoint = nodes[0].clone(); // Pick the first node, why not? + let clients: Vec<_> = clients.into_iter().map(Arc::new).collect(); + let client = &clients[0]; - let client = create_client(cluster_entrypoint.client_facing_addr(), FULLNODE_PORT_RANGE); - - let mut seed = [0u8; 32]; - seed.copy_from_slice(&id.to_bytes()[..32]); - let mut rnd = GenKeys::new(seed); - - println!("Creating {} keypairs...", tx_count * 2); - let mut total_keys = 0; - let mut target = tx_count * 2; - while target > 0 { - total_keys += target; - target /= MAX_SPENDS_PER_TX; - } - let gen_keypairs = rnd.gen_n_keypairs(total_keys as u64); - - println!("Get lamports..."); - let num_lamports_per_account = 20; - - // Sample the first keypair, see if it has lamports, if so then resume - // to avoid lamport loss - let keypair0_balance = client - .poll_get_balance(&gen_keypairs.last().unwrap().pubkey()) - .unwrap_or(0); - - if num_lamports_per_account > keypair0_balance { - let extra = num_lamports_per_account - keypair0_balance; - let total = extra * (gen_keypairs.len() as u64); - airdrop_lamports(&client, &drone_addr, &id, total); - println!("adding more lamports {}", extra); - fund_keys(&client, &id, &gen_keypairs, extra); - } let start = gen_keypairs.len() - (tx_count * 2) as usize; let keypairs = &gen_keypairs[start..]; @@ -109,15 +89,16 @@ pub fn do_bench_tps(config: Config) { let maxes = Arc::new(RwLock::new(Vec::new())); let sample_period = 1; // in seconds println!("Sampling TPS every {} second...", sample_period); - let v_threads: Vec<_> = nodes - .into_iter() - .map(|v| { + let v_threads: Vec<_> = clients + .iter() + .map(|client| { let exit_signal = exit_signal.clone(); let maxes = maxes.clone(); + let client = client.clone(); Builder::new() .name("solana-client-sample".to_string()) .spawn(move || { - sample_tx_count(&exit_signal, &maxes, first_tx_count, &v, sample_period); + sample_tx_count(&exit_signal, &maxes, first_tx_count, sample_period, &client); }) .unwrap() }) @@ -132,19 +113,19 @@ pub fn do_bench_tps(config: Config) { .map(|_| { let exit_signal = exit_signal.clone(); let shared_txs = shared_txs.clone(); - let cluster_entrypoint = cluster_entrypoint.clone(); let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); let total_tx_sent_count = total_tx_sent_count.clone(); + let client = client.clone(); Builder::new() .name("solana-client-sender".to_string()) .spawn(move || { do_tx_transfers( &exit_signal, &shared_txs, - &cluster_entrypoint, &shared_tx_active_thread_count, &total_tx_sent_count, thread_batch_sleep_ms, + &client, ); }) .unwrap() @@ -156,7 +137,7 @@ pub fn do_bench_tps(config: Config) { let mut reclaim_lamports_back_to_source_account = false; let mut i = keypair0_balance; while start.elapsed() < duration { - let balance = client.poll_get_balance(&id.pubkey()).unwrap_or(0); + let balance = client.get_balance(&id.pubkey()).unwrap_or(0); metrics_submit_lamport_balance(balance); // ping-pong between source and destination accounts for each loop iteration @@ -169,7 +150,7 @@ pub fn do_bench_tps(config: Config) { &keypairs[len..], threads, reclaim_lamports_back_to_source_account, - &cluster_entrypoint, + &client, ); // In sustained mode overlap the transfers with generation // this has higher average performance but lower peak performance @@ -181,7 +162,7 @@ pub fn do_bench_tps(config: Config) { } i += 1; - if should_switch_directions(num_lamports_per_account, i) { + if should_switch_directions(NUM_LAMPORTS_PER_ACCOUNT, i) { reclaim_lamports_back_to_source_account = !reclaim_lamports_back_to_source_account; } } @@ -204,7 +185,7 @@ pub fn do_bench_tps(config: Config) { } } - let balance = client.poll_get_balance(&id.pubkey()).unwrap_or(0); + let balance = client.get_balance(&id.pubkey()).unwrap_or(0); metrics_submit_lamport_balance(balance); compute_and_report_stats( @@ -225,20 +206,19 @@ fn metrics_submit_lamport_balance(lamport_balance: u64) { ); } -fn sample_tx_count( +fn sample_tx_count( exit_signal: &Arc, - maxes: &Arc>>, + maxes: &Arc>>, first_tx_count: u64, - v: &ContactInfo, sample_period: u64, + client: &Arc, ) { - let client = create_client(v.client_facing_addr(), FULLNODE_PORT_RANGE); let mut now = Instant::now(); let mut initial_tx_count = client.get_transaction_count().expect("transaction count"); let mut max_tps = 0.0; let mut total; - let log_prefix = format!("{:21}:", v.tpu.to_string()); + let log_prefix = format!("{:21}:", client.transactions_addr()); loop { let tx_count = client.get_transaction_count().expect("transaction count"); @@ -275,21 +255,23 @@ fn sample_tx_count( tps: max_tps, tx: total, }; - maxes.write().unwrap().push((v.tpu, stats)); + maxes + .write() + .unwrap() + .push((client.transactions_addr(), stats)); break; } } } -fn generate_txs( +fn generate_txs( shared_txs: &SharedTransactions, source: &[Keypair], dest: &[Keypair], threads: usize, reclaim: bool, - contact_info: &ContactInfo, + client: &Arc, ) { - let client = create_client(contact_info.client_facing_addr(), FULLNODE_PORT_RANGE); let blockhash = client.get_recent_blockhash().unwrap(); let tx_count = source.len(); println!("Signing transactions... {} (reclaim={})", tx_count, reclaim); @@ -341,15 +323,14 @@ fn generate_txs( } } -fn do_tx_transfers( +fn do_tx_transfers( exit_signal: &Arc, shared_txs: &SharedTransactions, - contact_info: &ContactInfo, shared_tx_thread_count: &Arc, total_tx_sent_count: &Arc, thread_batch_sleep_ms: usize, + client: &Arc, ) { - let client = create_client(contact_info.client_facing_addr(), FULLNODE_PORT_RANGE); loop { if thread_batch_sleep_ms > 0 { sleep(Duration::from_millis(thread_batch_sleep_ms as u64)); @@ -364,7 +345,7 @@ fn do_tx_transfers( println!( "Transferring 1 unit {} times... to {}", txs0.len(), - contact_info.tpu + client.as_ref().transactions_addr(), ); let tx_len = txs0.len(); let transfer_start = Instant::now(); @@ -399,7 +380,7 @@ fn do_tx_transfers( } } -fn verify_funding_transfer(client: &ThinClient, tx: &Transaction, amount: u64) -> bool { +fn verify_funding_transfer(client: &T, tx: &Transaction, amount: u64) -> bool { for a in &tx.message().account_keys[1..] { if client.get_balance(a).unwrap_or(0) >= amount { return true; @@ -412,7 +393,7 @@ fn verify_funding_transfer(client: &ThinClient, tx: &Transaction, amount: u64) - /// fund the dests keys by spending all of the source keys into MAX_SPENDS_PER_TX /// on every iteration. This allows us to replay the transfers because the source is either empty, /// or full -fn fund_keys(client: &ThinClient, source: &Keypair, dests: &[Keypair], lamports: u64) { +pub fn fund_keys(client: &T, source: &Keypair, dests: &[Keypair], lamports: u64) { let total = lamports * dests.len() as u64; let mut funded: Vec<(&Keypair, u64)> = vec![(source, total)]; let mut notfunded: Vec<&Keypair> = dests.iter().collect(); @@ -514,8 +495,13 @@ fn fund_keys(client: &ThinClient, source: &Keypair, dests: &[Keypair], lamports: } } -fn airdrop_lamports(client: &ThinClient, drone_addr: &SocketAddr, id: &Keypair, tx_count: u64) { - let starting_balance = client.poll_get_balance(&id.pubkey()).unwrap_or(0); +pub fn airdrop_lamports( + client: &T, + drone_addr: &SocketAddr, + id: &Keypair, + tx_count: u64, +) { + let starting_balance = client.get_balance(&id.pubkey()).unwrap_or(0); metrics_submit_lamport_balance(starting_balance); println!("starting balance {}", starting_balance); @@ -532,7 +518,7 @@ fn airdrop_lamports(client: &ThinClient, drone_addr: &SocketAddr, id: &Keypair, match request_airdrop_transaction(&drone_addr, &id.pubkey(), airdrop_amount, blockhash) { Ok(transaction) => { let signature = client.async_send_transaction(transaction).unwrap(); - client.poll_for_signature(&signature).unwrap(); + client.get_signature_status(&signature).unwrap(); } Err(err) => { panic!( @@ -542,7 +528,7 @@ fn airdrop_lamports(client: &ThinClient, drone_addr: &SocketAddr, id: &Keypair, } }; - let current_balance = client.poll_get_balance(&id.pubkey()).unwrap_or_else(|e| { + let current_balance = client.get_balance(&id.pubkey()).unwrap_or_else(|e| { println!("airdrop error {}", e); starting_balance }); @@ -562,7 +548,7 @@ fn airdrop_lamports(client: &ThinClient, drone_addr: &SocketAddr, id: &Keypair, } fn compute_and_report_stats( - maxes: &Arc>>, + maxes: &Arc>>, sample_period: u64, tx_send_elapsed: &Duration, total_tx_send_count: usize, @@ -583,10 +569,7 @@ fn compute_and_report_stats( println!( "{:20} | {:13.2} | {} {}", - (*sock).to_string(), - stats.tps, - stats.tx, - maybe_flag + sock, stats.tps, stats.tx, maybe_flag ); if stats.tps == 0.0 { @@ -632,12 +615,31 @@ fn should_switch_directions(num_lamports_per_account: u64, i: u64) -> bool { i % (num_lamports_per_account / 4) == 0 && (i >= (3 * num_lamports_per_account) / 4) } +pub fn generate_keypairs(id: &Keypair, tx_count: usize) -> Vec { + let mut seed = [0u8; 32]; + seed.copy_from_slice(&id.to_bytes()[..32]); + let mut rnd = GenKeys::new(seed); + + let mut total_keys = 0; + let mut target = tx_count * 2; + while target > 0 { + total_keys += target; + target /= MAX_SPENDS_PER_TX; + } + rnd.gen_n_keypairs(total_keys as u64) +} + #[cfg(test)] mod tests { use super::*; + use solana::cluster_info::FULLNODE_PORT_RANGE; use solana::fullnode::FullnodeConfig; use solana::local_cluster::{ClusterConfig, LocalCluster}; + use solana_client::thin_client::create_client; use solana_drone::drone::run_local_drone; + use solana_runtime::bank::Bank; + use solana_runtime::bank_client::BankClient; + use solana_sdk::genesis_block::GenesisBlock; use std::sync::mpsc::channel; #[test] @@ -674,13 +676,33 @@ mod tests { run_local_drone(drone_keypair, addr_sender, None); let drone_addr = addr_receiver.recv_timeout(Duration::from_secs(2)).unwrap(); - let mut cfg = Config::default(); - cfg.network_addr = cluster.entry_point_info.gossip; - cfg.drone_addr = drone_addr; - cfg.tx_count = 100; - cfg.duration = Duration::from_secs(5); - cfg.num_nodes = NUM_NODES; + let mut config = Config::default(); + config.tx_count = 100; + config.duration = Duration::from_secs(5); - do_bench_tps(cfg); + let keypairs = generate_keypairs(&config.id, config.tx_count); + let client = create_client( + (cluster.entry_point_info.gossip, drone_addr), + FULLNODE_PORT_RANGE, + ); + + do_bench_tps(vec![client], config, keypairs, 0); + } + + #[test] + fn test_bench_tps_bank_client() { + let (genesis_block, id) = GenesisBlock::new(10_000); + let bank = Bank::new(&genesis_block); + let clients = vec![BankClient::new(bank)]; + + let mut config = Config::default(); + config.id = id; + config.tx_count = 10; + config.duration = Duration::from_secs(5); + + let keypairs = generate_keypairs(&config.id, config.tx_count); + fund_keys(&clients[0], &config.id, &keypairs, 20); + + do_bench_tps(clients, config, keypairs, 0); } } diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs index ed8749aae..f3b254a8e 100644 --- a/bench-tps/src/main.rs +++ b/bench-tps/src/main.rs @@ -1,15 +1,91 @@ mod bench; mod cli; -use crate::bench::do_bench_tps; +use crate::bench::{ + airdrop_lamports, do_bench_tps, fund_keys, generate_keypairs, Config, NUM_LAMPORTS_PER_ACCOUNT, +}; +use solana::cluster_info::FULLNODE_PORT_RANGE; +use solana::contact_info::ContactInfo; +use solana::gossip_service::discover_nodes; +use solana_client::thin_client::create_client; +use solana_sdk::client::SyncClient; +use solana_sdk::signature::KeypairUtil; +use std::process::exit; fn main() { solana_logger::setup(); solana_metrics::set_panic_hook("bench-tps"); let matches = cli::build_args().get_matches(); + let cli_config = cli::extract_args(&matches); - let cfg = cli::extract_args(&matches); + let cli::Config { + network_addr, + drone_addr, + id, + threads, + num_nodes, + duration, + tx_count, + thread_batch_sleep_ms, + sustained, + } = cli_config; - do_bench_tps(cfg); + println!("Connecting to the cluster"); + let nodes = discover_nodes(&network_addr, num_nodes).unwrap_or_else(|err| { + eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err); + exit(1); + }); + if nodes.len() < num_nodes { + eprintln!( + "Error: Insufficient nodes discovered. Expecting {} or more", + num_nodes + ); + exit(1); + } + let clients: Vec<_> = nodes + .iter() + .filter_map(|node| { + let cluster_entrypoint = node.clone(); + let cluster_addrs = cluster_entrypoint.client_facing_addr(); + if ContactInfo::is_valid_address(&cluster_addrs.0) + && ContactInfo::is_valid_address(&cluster_addrs.1) + { + let client = create_client(cluster_addrs, FULLNODE_PORT_RANGE); + Some(client) + } else { + None + } + }) + .collect(); + + println!("Creating {} keypairs...", tx_count * 2); + let keypairs = generate_keypairs(&id, tx_count); + + println!("Get lamports..."); + + // Sample the first keypair, see if it has lamports, if so then resume. + // This logic is to prevent lamport loss on repeated solana-bench-tps executions + let keypair0_balance = clients[0] + .get_balance(&keypairs.last().unwrap().pubkey()) + .unwrap_or(0); + + if NUM_LAMPORTS_PER_ACCOUNT > keypair0_balance { + let extra = NUM_LAMPORTS_PER_ACCOUNT - keypair0_balance; + let total = extra * (keypairs.len() as u64); + airdrop_lamports(&clients[0], &drone_addr, &id, total); + println!("adding more lamports {}", extra); + fund_keys(&clients[0], &id, &keypairs, extra); + } + + let config = Config { + id, + threads, + thread_batch_sleep_ms, + duration, + tx_count, + sustained, + }; + + do_bench_tps(clients, config, keypairs, keypair0_balance); } diff --git a/client/src/thin_client.rs b/client/src/thin_client.rs index 8f0f0201a..5db51aabd 100644 --- a/client/src/thin_client.rs +++ b/client/src/thin_client.rs @@ -168,7 +168,11 @@ impl ThinClient { } } -impl Client for ThinClient {} +impl Client for ThinClient { + fn transactions_addr(&self) -> String { + self.transactions_addr.to_string() + } +} impl SyncClient for ThinClient { fn send_message(&self, keypairs: &[&Keypair], message: Message) -> TransportResult { diff --git a/runtime/src/bank_client.rs b/runtime/src/bank_client.rs index 82aec90bd..536572fbc 100644 --- a/runtime/src/bank_client.rs +++ b/runtime/src/bank_client.rs @@ -20,7 +20,11 @@ pub struct BankClient { transaction_sender: Mutex>, } -impl Client for BankClient {} +impl Client for BankClient { + fn transactions_addr(&self) -> String { + "Local BankClient".to_string() + } +} impl AsyncClient for BankClient { fn async_send_transaction(&self, transaction: Transaction) -> io::Result { diff --git a/sdk/src/client.rs b/sdk/src/client.rs index 034627934..acfc1e403 100644 --- a/sdk/src/client.rs +++ b/sdk/src/client.rs @@ -16,7 +16,9 @@ use crate::transaction; use crate::transport::Result; use std::io; -pub trait Client: SyncClient + AsyncClient {} +pub trait Client: SyncClient + AsyncClient { + fn transactions_addr(&self) -> String; +} pub trait SyncClient { /// Create a transaction from the given message, and send it to the diff --git a/sdk/src/transport.rs b/sdk/src/transport.rs index d03d1a50d..ea0308ca7 100644 --- a/sdk/src/transport.rs +++ b/sdk/src/transport.rs @@ -1,4 +1,6 @@ use crate::transaction::TransactionError; +use std::error; +use std::fmt; use std::io; #[derive(Debug)] @@ -7,6 +9,16 @@ pub enum TransportError { TransactionError(TransactionError), } +impl error::Error for TransportError {} +impl fmt::Display for TransportError { + fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + match self { + TransportError::IoError(err) => write!(formatter, "{:?}", err), + TransportError::TransactionError(err) => write!(formatter, "{:?}", err), + } + } +} + impl TransportError { pub fn unwrap(&self) -> TransactionError { if let TransportError::TransactionError(err) = self {