Write bench-tps in terms of client (#3904)

* Write bench-tps in terms of client

* Add transactions_addr method for logging

* Move cluster config outside do_bench_tps

* Add BankClient test
This commit is contained in:
Tyera Eulberg 2019-04-19 15:04:36 -06:00 committed by GitHub
parent afb00432d4
commit e0acd48944
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 223 additions and 101 deletions

1
Cargo.lock generated
View File

@ -2293,6 +2293,7 @@ dependencies = [
"solana-logger 0.14.0",
"solana-metrics 0.14.0",
"solana-netutil 0.14.0",
"solana-runtime 0.14.0",
"solana-sdk 0.14.0",
]

View File

@ -17,6 +17,7 @@ solana-drone = { path = "../drone", version = "0.14.0" }
solana-logger = { path = "../logger", version = "0.14.0" }
solana-metrics = { path = "../metrics", version = "0.14.0" }
solana-netutil = { path = "../netutil", version = "0.14.0" }
solana-runtime = { path = "../runtime", version = "0.14.0" }
solana-sdk = { path = "../sdk", version = "0.14.0" }
[features]

View File

@ -1,16 +1,10 @@
use solana_metrics;
use crate::cli::Config;
use rayon::prelude::*;
use solana::cluster_info::FULLNODE_PORT_RANGE;
use solana::contact_info::ContactInfo;
use solana::gen_keys::GenKeys;
use solana::gossip_service::discover_nodes;
use solana_client::thin_client::create_client;
use solana_client::thin_client::ThinClient;
use solana_drone::drone::request_airdrop_transaction;
use solana_metrics::influxdb;
use solana_sdk::client::{AsyncClient, SyncClient};
use solana_sdk::client::Client;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_instruction;
use solana_sdk::system_transaction;
@ -36,66 +30,52 @@ pub struct NodeStats {
}
pub const MAX_SPENDS_PER_TX: usize = 4;
pub const NUM_LAMPORTS_PER_ACCOUNT: u64 = 20;
pub type SharedTransactions = Arc<RwLock<VecDeque<Vec<(Transaction, u64)>>>>;
pub fn do_bench_tps(config: Config) {
pub struct Config {
pub id: Keypair,
pub threads: usize,
pub thread_batch_sleep_ms: usize,
pub duration: Duration,
pub tx_count: usize,
pub sustained: bool,
}
impl Default for Config {
fn default() -> Self {
Self {
id: Keypair::new(),
threads: 4,
thread_batch_sleep_ms: 0,
duration: Duration::new(std::u64::MAX, 0),
tx_count: 500_000,
sustained: false,
}
}
}
pub fn do_bench_tps<T>(
clients: Vec<T>,
config: Config,
gen_keypairs: Vec<Keypair>,
keypair0_balance: u64,
) where
T: 'static + Client + Send + Sync,
{
let Config {
network_addr: network,
drone_addr,
id,
threads,
thread_batch_sleep_ms,
num_nodes,
duration,
tx_count,
sustained,
} = config;
let nodes = discover_nodes(&network, num_nodes).unwrap_or_else(|err| {
eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err);
exit(1);
});
if nodes.len() < num_nodes {
eprintln!(
"Error: Insufficient nodes discovered. Expecting {} or more",
num_nodes
);
exit(1);
}
let cluster_entrypoint = nodes[0].clone(); // Pick the first node, why not?
let clients: Vec<_> = clients.into_iter().map(Arc::new).collect();
let client = &clients[0];
let client = create_client(cluster_entrypoint.client_facing_addr(), FULLNODE_PORT_RANGE);
let mut seed = [0u8; 32];
seed.copy_from_slice(&id.to_bytes()[..32]);
let mut rnd = GenKeys::new(seed);
println!("Creating {} keypairs...", tx_count * 2);
let mut total_keys = 0;
let mut target = tx_count * 2;
while target > 0 {
total_keys += target;
target /= MAX_SPENDS_PER_TX;
}
let gen_keypairs = rnd.gen_n_keypairs(total_keys as u64);
println!("Get lamports...");
let num_lamports_per_account = 20;
// Sample the first keypair, see if it has lamports, if so then resume
// to avoid lamport loss
let keypair0_balance = client
.poll_get_balance(&gen_keypairs.last().unwrap().pubkey())
.unwrap_or(0);
if num_lamports_per_account > keypair0_balance {
let extra = num_lamports_per_account - keypair0_balance;
let total = extra * (gen_keypairs.len() as u64);
airdrop_lamports(&client, &drone_addr, &id, total);
println!("adding more lamports {}", extra);
fund_keys(&client, &id, &gen_keypairs, extra);
}
let start = gen_keypairs.len() - (tx_count * 2) as usize;
let keypairs = &gen_keypairs[start..];
@ -109,15 +89,16 @@ pub fn do_bench_tps(config: Config) {
let maxes = Arc::new(RwLock::new(Vec::new()));
let sample_period = 1; // in seconds
println!("Sampling TPS every {} second...", sample_period);
let v_threads: Vec<_> = nodes
.into_iter()
.map(|v| {
let v_threads: Vec<_> = clients
.iter()
.map(|client| {
let exit_signal = exit_signal.clone();
let maxes = maxes.clone();
let client = client.clone();
Builder::new()
.name("solana-client-sample".to_string())
.spawn(move || {
sample_tx_count(&exit_signal, &maxes, first_tx_count, &v, sample_period);
sample_tx_count(&exit_signal, &maxes, first_tx_count, sample_period, &client);
})
.unwrap()
})
@ -132,19 +113,19 @@ pub fn do_bench_tps(config: Config) {
.map(|_| {
let exit_signal = exit_signal.clone();
let shared_txs = shared_txs.clone();
let cluster_entrypoint = cluster_entrypoint.clone();
let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
let total_tx_sent_count = total_tx_sent_count.clone();
let client = client.clone();
Builder::new()
.name("solana-client-sender".to_string())
.spawn(move || {
do_tx_transfers(
&exit_signal,
&shared_txs,
&cluster_entrypoint,
&shared_tx_active_thread_count,
&total_tx_sent_count,
thread_batch_sleep_ms,
&client,
);
})
.unwrap()
@ -156,7 +137,7 @@ pub fn do_bench_tps(config: Config) {
let mut reclaim_lamports_back_to_source_account = false;
let mut i = keypair0_balance;
while start.elapsed() < duration {
let balance = client.poll_get_balance(&id.pubkey()).unwrap_or(0);
let balance = client.get_balance(&id.pubkey()).unwrap_or(0);
metrics_submit_lamport_balance(balance);
// ping-pong between source and destination accounts for each loop iteration
@ -169,7 +150,7 @@ pub fn do_bench_tps(config: Config) {
&keypairs[len..],
threads,
reclaim_lamports_back_to_source_account,
&cluster_entrypoint,
&client,
);
// In sustained mode overlap the transfers with generation
// this has higher average performance but lower peak performance
@ -181,7 +162,7 @@ pub fn do_bench_tps(config: Config) {
}
i += 1;
if should_switch_directions(num_lamports_per_account, i) {
if should_switch_directions(NUM_LAMPORTS_PER_ACCOUNT, i) {
reclaim_lamports_back_to_source_account = !reclaim_lamports_back_to_source_account;
}
}
@ -204,7 +185,7 @@ pub fn do_bench_tps(config: Config) {
}
}
let balance = client.poll_get_balance(&id.pubkey()).unwrap_or(0);
let balance = client.get_balance(&id.pubkey()).unwrap_or(0);
metrics_submit_lamport_balance(balance);
compute_and_report_stats(
@ -225,20 +206,19 @@ fn metrics_submit_lamport_balance(lamport_balance: u64) {
);
}
fn sample_tx_count(
fn sample_tx_count<T: Client>(
exit_signal: &Arc<AtomicBool>,
maxes: &Arc<RwLock<Vec<(SocketAddr, NodeStats)>>>,
maxes: &Arc<RwLock<Vec<(String, NodeStats)>>>,
first_tx_count: u64,
v: &ContactInfo,
sample_period: u64,
client: &Arc<T>,
) {
let client = create_client(v.client_facing_addr(), FULLNODE_PORT_RANGE);
let mut now = Instant::now();
let mut initial_tx_count = client.get_transaction_count().expect("transaction count");
let mut max_tps = 0.0;
let mut total;
let log_prefix = format!("{:21}:", v.tpu.to_string());
let log_prefix = format!("{:21}:", client.transactions_addr());
loop {
let tx_count = client.get_transaction_count().expect("transaction count");
@ -275,21 +255,23 @@ fn sample_tx_count(
tps: max_tps,
tx: total,
};
maxes.write().unwrap().push((v.tpu, stats));
maxes
.write()
.unwrap()
.push((client.transactions_addr(), stats));
break;
}
}
}
fn generate_txs(
fn generate_txs<T: Client>(
shared_txs: &SharedTransactions,
source: &[Keypair],
dest: &[Keypair],
threads: usize,
reclaim: bool,
contact_info: &ContactInfo,
client: &Arc<T>,
) {
let client = create_client(contact_info.client_facing_addr(), FULLNODE_PORT_RANGE);
let blockhash = client.get_recent_blockhash().unwrap();
let tx_count = source.len();
println!("Signing transactions... {} (reclaim={})", tx_count, reclaim);
@ -341,15 +323,14 @@ fn generate_txs(
}
}
fn do_tx_transfers(
fn do_tx_transfers<T: Client>(
exit_signal: &Arc<AtomicBool>,
shared_txs: &SharedTransactions,
contact_info: &ContactInfo,
shared_tx_thread_count: &Arc<AtomicIsize>,
total_tx_sent_count: &Arc<AtomicUsize>,
thread_batch_sleep_ms: usize,
client: &Arc<T>,
) {
let client = create_client(contact_info.client_facing_addr(), FULLNODE_PORT_RANGE);
loop {
if thread_batch_sleep_ms > 0 {
sleep(Duration::from_millis(thread_batch_sleep_ms as u64));
@ -364,7 +345,7 @@ fn do_tx_transfers(
println!(
"Transferring 1 unit {} times... to {}",
txs0.len(),
contact_info.tpu
client.as_ref().transactions_addr(),
);
let tx_len = txs0.len();
let transfer_start = Instant::now();
@ -399,7 +380,7 @@ fn do_tx_transfers(
}
}
fn verify_funding_transfer(client: &ThinClient, tx: &Transaction, amount: u64) -> bool {
fn verify_funding_transfer<T: Client>(client: &T, tx: &Transaction, amount: u64) -> bool {
for a in &tx.message().account_keys[1..] {
if client.get_balance(a).unwrap_or(0) >= amount {
return true;
@ -412,7 +393,7 @@ fn verify_funding_transfer(client: &ThinClient, tx: &Transaction, amount: u64) -
/// fund the dests keys by spending all of the source keys into MAX_SPENDS_PER_TX
/// on every iteration. This allows us to replay the transfers because the source is either empty,
/// or full
fn fund_keys(client: &ThinClient, source: &Keypair, dests: &[Keypair], lamports: u64) {
pub fn fund_keys<T: Client>(client: &T, source: &Keypair, dests: &[Keypair], lamports: u64) {
let total = lamports * dests.len() as u64;
let mut funded: Vec<(&Keypair, u64)> = vec![(source, total)];
let mut notfunded: Vec<&Keypair> = dests.iter().collect();
@ -514,8 +495,13 @@ fn fund_keys(client: &ThinClient, source: &Keypair, dests: &[Keypair], lamports:
}
}
fn airdrop_lamports(client: &ThinClient, drone_addr: &SocketAddr, id: &Keypair, tx_count: u64) {
let starting_balance = client.poll_get_balance(&id.pubkey()).unwrap_or(0);
pub fn airdrop_lamports<T: Client>(
client: &T,
drone_addr: &SocketAddr,
id: &Keypair,
tx_count: u64,
) {
let starting_balance = client.get_balance(&id.pubkey()).unwrap_or(0);
metrics_submit_lamport_balance(starting_balance);
println!("starting balance {}", starting_balance);
@ -532,7 +518,7 @@ fn airdrop_lamports(client: &ThinClient, drone_addr: &SocketAddr, id: &Keypair,
match request_airdrop_transaction(&drone_addr, &id.pubkey(), airdrop_amount, blockhash) {
Ok(transaction) => {
let signature = client.async_send_transaction(transaction).unwrap();
client.poll_for_signature(&signature).unwrap();
client.get_signature_status(&signature).unwrap();
}
Err(err) => {
panic!(
@ -542,7 +528,7 @@ fn airdrop_lamports(client: &ThinClient, drone_addr: &SocketAddr, id: &Keypair,
}
};
let current_balance = client.poll_get_balance(&id.pubkey()).unwrap_or_else(|e| {
let current_balance = client.get_balance(&id.pubkey()).unwrap_or_else(|e| {
println!("airdrop error {}", e);
starting_balance
});
@ -562,7 +548,7 @@ fn airdrop_lamports(client: &ThinClient, drone_addr: &SocketAddr, id: &Keypair,
}
fn compute_and_report_stats(
maxes: &Arc<RwLock<Vec<(SocketAddr, NodeStats)>>>,
maxes: &Arc<RwLock<Vec<(String, NodeStats)>>>,
sample_period: u64,
tx_send_elapsed: &Duration,
total_tx_send_count: usize,
@ -583,10 +569,7 @@ fn compute_and_report_stats(
println!(
"{:20} | {:13.2} | {} {}",
(*sock).to_string(),
stats.tps,
stats.tx,
maybe_flag
sock, stats.tps, stats.tx, maybe_flag
);
if stats.tps == 0.0 {
@ -632,12 +615,31 @@ fn should_switch_directions(num_lamports_per_account: u64, i: u64) -> bool {
i % (num_lamports_per_account / 4) == 0 && (i >= (3 * num_lamports_per_account) / 4)
}
pub fn generate_keypairs(id: &Keypair, tx_count: usize) -> Vec<Keypair> {
let mut seed = [0u8; 32];
seed.copy_from_slice(&id.to_bytes()[..32]);
let mut rnd = GenKeys::new(seed);
let mut total_keys = 0;
let mut target = tx_count * 2;
while target > 0 {
total_keys += target;
target /= MAX_SPENDS_PER_TX;
}
rnd.gen_n_keypairs(total_keys as u64)
}
#[cfg(test)]
mod tests {
use super::*;
use solana::cluster_info::FULLNODE_PORT_RANGE;
use solana::fullnode::FullnodeConfig;
use solana::local_cluster::{ClusterConfig, LocalCluster};
use solana_client::thin_client::create_client;
use solana_drone::drone::run_local_drone;
use solana_runtime::bank::Bank;
use solana_runtime::bank_client::BankClient;
use solana_sdk::genesis_block::GenesisBlock;
use std::sync::mpsc::channel;
#[test]
@ -674,13 +676,33 @@ mod tests {
run_local_drone(drone_keypair, addr_sender, None);
let drone_addr = addr_receiver.recv_timeout(Duration::from_secs(2)).unwrap();
let mut cfg = Config::default();
cfg.network_addr = cluster.entry_point_info.gossip;
cfg.drone_addr = drone_addr;
cfg.tx_count = 100;
cfg.duration = Duration::from_secs(5);
cfg.num_nodes = NUM_NODES;
let mut config = Config::default();
config.tx_count = 100;
config.duration = Duration::from_secs(5);
do_bench_tps(cfg);
let keypairs = generate_keypairs(&config.id, config.tx_count);
let client = create_client(
(cluster.entry_point_info.gossip, drone_addr),
FULLNODE_PORT_RANGE,
);
do_bench_tps(vec![client], config, keypairs, 0);
}
#[test]
fn test_bench_tps_bank_client() {
let (genesis_block, id) = GenesisBlock::new(10_000);
let bank = Bank::new(&genesis_block);
let clients = vec![BankClient::new(bank)];
let mut config = Config::default();
config.id = id;
config.tx_count = 10;
config.duration = Duration::from_secs(5);
let keypairs = generate_keypairs(&config.id, config.tx_count);
fund_keys(&clients[0], &config.id, &keypairs, 20);
do_bench_tps(clients, config, keypairs, 0);
}
}

View File

@ -1,15 +1,91 @@
mod bench;
mod cli;
use crate::bench::do_bench_tps;
use crate::bench::{
airdrop_lamports, do_bench_tps, fund_keys, generate_keypairs, Config, NUM_LAMPORTS_PER_ACCOUNT,
};
use solana::cluster_info::FULLNODE_PORT_RANGE;
use solana::contact_info::ContactInfo;
use solana::gossip_service::discover_nodes;
use solana_client::thin_client::create_client;
use solana_sdk::client::SyncClient;
use solana_sdk::signature::KeypairUtil;
use std::process::exit;
fn main() {
solana_logger::setup();
solana_metrics::set_panic_hook("bench-tps");
let matches = cli::build_args().get_matches();
let cli_config = cli::extract_args(&matches);
let cfg = cli::extract_args(&matches);
let cli::Config {
network_addr,
drone_addr,
id,
threads,
num_nodes,
duration,
tx_count,
thread_batch_sleep_ms,
sustained,
} = cli_config;
do_bench_tps(cfg);
println!("Connecting to the cluster");
let nodes = discover_nodes(&network_addr, num_nodes).unwrap_or_else(|err| {
eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err);
exit(1);
});
if nodes.len() < num_nodes {
eprintln!(
"Error: Insufficient nodes discovered. Expecting {} or more",
num_nodes
);
exit(1);
}
let clients: Vec<_> = nodes
.iter()
.filter_map(|node| {
let cluster_entrypoint = node.clone();
let cluster_addrs = cluster_entrypoint.client_facing_addr();
if ContactInfo::is_valid_address(&cluster_addrs.0)
&& ContactInfo::is_valid_address(&cluster_addrs.1)
{
let client = create_client(cluster_addrs, FULLNODE_PORT_RANGE);
Some(client)
} else {
None
}
})
.collect();
println!("Creating {} keypairs...", tx_count * 2);
let keypairs = generate_keypairs(&id, tx_count);
println!("Get lamports...");
// Sample the first keypair, see if it has lamports, if so then resume.
// This logic is to prevent lamport loss on repeated solana-bench-tps executions
let keypair0_balance = clients[0]
.get_balance(&keypairs.last().unwrap().pubkey())
.unwrap_or(0);
if NUM_LAMPORTS_PER_ACCOUNT > keypair0_balance {
let extra = NUM_LAMPORTS_PER_ACCOUNT - keypair0_balance;
let total = extra * (keypairs.len() as u64);
airdrop_lamports(&clients[0], &drone_addr, &id, total);
println!("adding more lamports {}", extra);
fund_keys(&clients[0], &id, &keypairs, extra);
}
let config = Config {
id,
threads,
thread_batch_sleep_ms,
duration,
tx_count,
sustained,
};
do_bench_tps(clients, config, keypairs, keypair0_balance);
}

View File

@ -168,7 +168,11 @@ impl ThinClient {
}
}
impl Client for ThinClient {}
impl Client for ThinClient {
fn transactions_addr(&self) -> String {
self.transactions_addr.to_string()
}
}
impl SyncClient for ThinClient {
fn send_message(&self, keypairs: &[&Keypair], message: Message) -> TransportResult<Signature> {

View File

@ -20,7 +20,11 @@ pub struct BankClient {
transaction_sender: Mutex<Sender<Transaction>>,
}
impl Client for BankClient {}
impl Client for BankClient {
fn transactions_addr(&self) -> String {
"Local BankClient".to_string()
}
}
impl AsyncClient for BankClient {
fn async_send_transaction(&self, transaction: Transaction) -> io::Result<Signature> {

View File

@ -16,7 +16,9 @@ use crate::transaction;
use crate::transport::Result;
use std::io;
pub trait Client: SyncClient + AsyncClient {}
pub trait Client: SyncClient + AsyncClient {
fn transactions_addr(&self) -> String;
}
pub trait SyncClient {
/// Create a transaction from the given message, and send it to the

View File

@ -1,4 +1,6 @@
use crate::transaction::TransactionError;
use std::error;
use std::fmt;
use std::io;
#[derive(Debug)]
@ -7,6 +9,16 @@ pub enum TransportError {
TransactionError(TransactionError),
}
impl error::Error for TransportError {}
impl fmt::Display for TransportError {
fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
match self {
TransportError::IoError(err) => write!(formatter, "{:?}", err),
TransportError::TransactionError(err) => write!(formatter, "{:?}", err),
}
}
}
impl TransportError {
pub fn unwrap(&self) -> TransactionError {
if let TransportError::TransactionError(err) = self {