From 5b879067e660f3ac407a6f9696e850d4ec68c03e Mon Sep 17 00:00:00 2001 From: kirill lykov Date: Fri, 5 Aug 2022 10:51:15 +0200 Subject: [PATCH] Bench tps: refactor client creation (#26862) * relax Sized restriction functions using client * extract function to build client --- bench-tps/src/bench.rs | 33 ++--- bench-tps/src/keypairs.rs | 2 +- bench-tps/src/main.rs | 259 +++++++++++++++++++----------------- bench-tps/src/perf_utils.rs | 2 +- bench-tps/src/send_batch.rs | 34 ++--- 5 files changed, 177 insertions(+), 153 deletions(-) diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index 1e19ef089c..53a82c143d 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -62,7 +62,7 @@ impl<'a> KeypairChunks<'a> { } } -struct TransactionChunkGenerator<'a, 'b, T> { +struct TransactionChunkGenerator<'a, 'b, T: ?Sized> { client: Arc, account_chunks: KeypairChunks<'a>, nonce_chunks: Option>, @@ -72,7 +72,7 @@ struct TransactionChunkGenerator<'a, 'b, T> { impl<'a, 'b, T> TransactionChunkGenerator<'a, 'b, T> where - T: 'static + BenchTpsClient + Send + Sync, + T: 'static + BenchTpsClient + Send + Sync + ?Sized, { fn new( client: Arc, @@ -165,7 +165,7 @@ where fn wait_for_target_slots_per_epoch(target_slots_per_epoch: u64, client: &Arc) where - T: 'static + BenchTpsClient + Send + Sync, + T: 'static + BenchTpsClient + Send + Sync + ?Sized, { if target_slots_per_epoch != 0 { info!( @@ -195,7 +195,7 @@ fn create_sampler_thread( maxes: &Arc>>, ) -> JoinHandle<()> where - T: 'static + BenchTpsClient + Send + Sync, + T: 'static + BenchTpsClient + Send + Sync + ?Sized, { info!("Sampling TPS every {} second...", sample_period); let exit_signal = exit_signal.clone(); @@ -209,7 +209,7 @@ where .unwrap() } -fn generate_chunked_transfers( +fn generate_chunked_transfers( recent_blockhash: Arc>, shared_txs: &SharedTransactions, shared_tx_active_thread_count: Arc, @@ -264,7 +264,7 @@ fn create_sender_threads( shared_tx_active_thread_count: &Arc, ) -> Vec> where - T: 'static + BenchTpsClient + Send + Sync, + T: 'static + BenchTpsClient + Send + Sync + ?Sized, { (0..threads) .map(|_| { @@ -292,7 +292,7 @@ where pub fn do_bench_tps(client: Arc, config: Config, gen_keypairs: Vec) -> u64 where - T: 'static + BenchTpsClient + Send + Sync, + T: 'static + BenchTpsClient + Send + Sync + ?Sized, { let Config { id, @@ -441,7 +441,7 @@ fn generate_system_txs( .collect() } -fn get_nonce_blockhash( +fn get_nonce_blockhash( client: Arc, nonce_account_pubkey: Pubkey, ) -> Hash { @@ -453,7 +453,7 @@ fn get_nonce_blockhash( nonce_data.blockhash() } -fn generate_nonced_system_txs( +fn generate_nonced_system_txs( client: Arc, source: &[&Keypair], dest: &VecDeque<&Keypair>, @@ -495,7 +495,7 @@ fn generate_nonced_system_txs( transactions } -fn generate_txs( +fn generate_txs( shared_txs: &SharedTransactions, blockhash: &Arc>, chunk_generator: &mut TransactionChunkGenerator<'_, '_, T>, @@ -515,7 +515,10 @@ fn generate_txs( } } -fn get_new_latest_blockhash(client: &Arc, blockhash: &Hash) -> Option { +fn get_new_latest_blockhash( + client: &Arc, + blockhash: &Hash, +) -> Option { let start = Instant::now(); while start.elapsed().as_secs() < 5 { if let Ok(new_blockhash) = client.get_latest_blockhash() { @@ -531,7 +534,7 @@ fn get_new_latest_blockhash(client: &Arc, blockhash: &Hash None } -fn poll_blockhash( +fn poll_blockhash( exit_signal: &Arc, blockhash: &Arc>, client: &Arc, @@ -581,7 +584,7 @@ fn poll_blockhash( } } -fn do_tx_transfers( +fn do_tx_transfers( exit_signal: &Arc, shared_txs: &SharedTransactions, shared_tx_thread_count: &Arc, @@ -734,7 +737,7 @@ fn compute_and_report_stats( ); } -pub fn generate_and_fund_keypairs( +pub fn generate_and_fund_keypairs( client: Arc, funding_key: &Keypair, keypair_count: usize, @@ -753,7 +756,7 @@ pub fn generate_and_fund_keypairs( Ok(keypairs) } -pub fn fund_keypairs( +pub fn fund_keypairs( client: Arc, funding_key: &Keypair, keypairs: &[Keypair], diff --git a/bench-tps/src/keypairs.rs b/bench-tps/src/keypairs.rs index e165e484d5..601f74c065 100644 --- a/bench-tps/src/keypairs.rs +++ b/bench-tps/src/keypairs.rs @@ -18,7 +18,7 @@ pub fn get_keypairs( read_from_client_file: bool, ) -> Vec where - T: 'static + BenchTpsClient + Send + Sync, + T: 'static + BenchTpsClient + Send + Sync + ?Sized, { if read_from_client_file { let path = Path::new(client_ids_and_stake_file); diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs index d7c77d0322..8d8fde7fda 100644 --- a/bench-tps/src/main.rs +++ b/bench-tps/src/main.rs @@ -1,9 +1,11 @@ #![allow(clippy::integer_arithmetic)] + use { clap::value_t, log::*, solana_bench_tps::{ bench::do_bench_tps, + bench_tps_client::BenchTpsClient, cli::{self, ExternalClientType}, keypairs::get_keypairs, send_batch::generate_keypairs, @@ -17,15 +19,115 @@ use { solana_genesis::Base64Account, solana_gossip::gossip_service::{discover_cluster, get_client, get_multi_client}, solana_sdk::{ - commitment_config::CommitmentConfig, fee_calculator::FeeRateGovernor, system_program, + commitment_config::CommitmentConfig, fee_calculator::FeeRateGovernor, pubkey::Pubkey, + system_program, }, solana_streamer::socket::SocketAddrSpace, - std::{collections::HashMap, fs::File, io::prelude::*, path::Path, process::exit, sync::Arc}, + std::{ + collections::HashMap, fs::File, io::prelude::*, net::SocketAddr, path::Path, process::exit, + sync::Arc, + }, }; /// Number of signatures for all transactions in ~1 week at ~100K TPS pub const NUM_SIGNATURES_FOR_TXS: u64 = 100_000 * 60 * 60 * 24 * 7; +#[allow(clippy::too_many_arguments)] +fn create_client( + external_client_type: &ExternalClientType, + entrypoint_addr: &SocketAddr, + json_rpc_url: &str, + websocket_url: &str, + multi_client: bool, + use_quic: bool, + tpu_connection_pool_size: usize, + rpc_tpu_sockets: Option<(SocketAddr, SocketAddr)>, + num_nodes: usize, + target_node: Option, +) -> Arc { + match external_client_type { + ExternalClientType::RpcClient => Arc::new(RpcClient::new_with_commitment( + json_rpc_url.to_string(), + CommitmentConfig::confirmed(), + )), + ExternalClientType::ThinClient => { + let connection_cache = match use_quic { + true => Arc::new(ConnectionCache::new(tpu_connection_pool_size)), + false => Arc::new(ConnectionCache::with_udp(tpu_connection_pool_size)), + }; + + if let Some((rpc, tpu)) = rpc_tpu_sockets { + Arc::new(ThinClient::new(rpc, tpu, connection_cache)) + } else { + let nodes = + discover_cluster(entrypoint_addr, num_nodes, SocketAddrSpace::Unspecified) + .unwrap_or_else(|err| { + eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err); + exit(1); + }); + if multi_client { + let (client, num_clients) = + get_multi_client(&nodes, &SocketAddrSpace::Unspecified, connection_cache); + if nodes.len() < num_clients { + eprintln!( + "Error: Insufficient nodes discovered. Expecting {} or more", + num_nodes + ); + exit(1); + } + Arc::new(client) + } else if let Some(target_node) = target_node { + info!("Searching for target_node: {:?}", target_node); + let mut target_client = None; + for node in nodes { + if node.id == target_node { + target_client = Some(get_client( + &[node], + &SocketAddrSpace::Unspecified, + connection_cache, + )); + break; + } + } + Arc::new(target_client.unwrap_or_else(|| { + eprintln!("Target node {} not found", target_node); + exit(1); + })) + } else { + Arc::new(get_client( + &nodes, + &SocketAddrSpace::Unspecified, + connection_cache, + )) + } + } + } + ExternalClientType::TpuClient => { + let rpc_client = Arc::new(RpcClient::new_with_commitment( + json_rpc_url.to_string(), + CommitmentConfig::confirmed(), + )); + let connection_cache = match use_quic { + true => ConnectionCache::new(tpu_connection_pool_size), + false => ConnectionCache::with_udp(tpu_connection_pool_size), + }; + + Arc::new( + TpuClient::new_with_connection_cache( + rpc_client, + websocket_url, + TpuClientConfig::default(), + Arc::new(connection_cache), + ) + .unwrap_or_else(|err| { + eprintln!("Could not create TpuClient {:?}", err); + exit(1); + }), + ) + } + } +} + fn main() { solana_logger::setup_with_default("solana=info"); solana_metrics::set_panic_hook("bench-tps", /*version:*/ None); @@ -86,124 +188,43 @@ fn main() { } info!("Connecting to the cluster"); - - match external_client_type { - ExternalClientType::RpcClient => { - let client = Arc::new(RpcClient::new_with_commitment( - json_rpc_url.to_string(), - CommitmentConfig::confirmed(), - )); - let keypairs = get_keypairs( - client.clone(), - id, - keypair_count, - *num_lamports_per_account, - client_ids_and_stake_file, - *read_from_client_file, - ); - do_bench_tps(client, cli_config, keypairs); - } - ExternalClientType::ThinClient => { - let connection_cache = match use_quic { - true => Arc::new(ConnectionCache::new(*tpu_connection_pool_size)), - false => Arc::new(ConnectionCache::with_udp(*tpu_connection_pool_size)), - }; - - let client = if let Ok(rpc_addr) = value_t!(matches, "rpc_addr", String) { - let rpc = rpc_addr.parse().unwrap_or_else(|e| { - eprintln!("RPC address should parse as socketaddr {:?}", e); + let rpc_tpu_sockets: Option<(SocketAddr, SocketAddr)> = + if let Ok(rpc_addr) = value_t!(matches, "rpc_addr", String) { + let rpc = rpc_addr.parse().unwrap_or_else(|e| { + eprintln!("RPC address should parse as socketaddr {:?}", e); + exit(1); + }); + let tpu = value_t!(matches, "tpu_addr", String) + .unwrap() + .parse() + .unwrap_or_else(|e| { + eprintln!("TPU address should parse to a socket: {:?}", e); exit(1); }); - let tpu = value_t!(matches, "tpu_addr", String) - .unwrap() - .parse() - .unwrap_or_else(|e| { - eprintln!("TPU address should parse to a socket: {:?}", e); - exit(1); - }); + Some((rpc, tpu)) + } else { + None + }; - ThinClient::new(rpc, tpu, connection_cache) - } else { - let nodes = - discover_cluster(entrypoint_addr, *num_nodes, SocketAddrSpace::Unspecified) - .unwrap_or_else(|err| { - eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err); - exit(1); - }); - if *multi_client { - let (client, num_clients) = - get_multi_client(&nodes, &SocketAddrSpace::Unspecified, connection_cache); - if nodes.len() < num_clients { - eprintln!( - "Error: Insufficient nodes discovered. Expecting {} or more", - num_nodes - ); - exit(1); - } - client - } else if let Some(target_node) = target_node { - info!("Searching for target_node: {:?}", target_node); - let mut target_client = None; - for node in nodes { - if node.id == *target_node { - target_client = Some(get_client( - &[node], - &SocketAddrSpace::Unspecified, - connection_cache, - )); - break; - } - } - target_client.unwrap_or_else(|| { - eprintln!("Target node {} not found", target_node); - exit(1); - }) - } else { - get_client(&nodes, &SocketAddrSpace::Unspecified, connection_cache) - } - }; - let client = Arc::new(client); - let keypairs = get_keypairs( - client.clone(), - id, - keypair_count, - *num_lamports_per_account, - client_ids_and_stake_file, - *read_from_client_file, - ); - do_bench_tps(client, cli_config, keypairs); - } - ExternalClientType::TpuClient => { - let rpc_client = Arc::new(RpcClient::new_with_commitment( - json_rpc_url.to_string(), - CommitmentConfig::confirmed(), - )); - let connection_cache = match use_quic { - true => ConnectionCache::new(*tpu_connection_pool_size), - false => ConnectionCache::with_udp(*tpu_connection_pool_size), - }; - - let client = Arc::new( - TpuClient::new_with_connection_cache( - rpc_client, - websocket_url, - TpuClientConfig::default(), - Arc::new(connection_cache), - ) - .unwrap_or_else(|err| { - eprintln!("Could not create TpuClient {:?}", err); - exit(1); - }), - ); - let keypairs = get_keypairs( - client.clone(), - id, - keypair_count, - *num_lamports_per_account, - client_ids_and_stake_file, - *read_from_client_file, - ); - do_bench_tps(client, cli_config, keypairs); - } - } + let client = create_client( + external_client_type, + entrypoint_addr, + json_rpc_url, + websocket_url, + *multi_client, + *use_quic, + *tpu_connection_pool_size, + rpc_tpu_sockets, + *num_nodes, + *target_node, + ); + let keypairs = get_keypairs( + client.clone(), + id, + keypair_count, + *num_lamports_per_account, + client_ids_and_stake_file, + *read_from_client_file, + ); + do_bench_tps(client, cli_config, keypairs); } diff --git a/bench-tps/src/perf_utils.rs b/bench-tps/src/perf_utils.rs index 7e31e6408e..5e80695cd5 100644 --- a/bench-tps/src/perf_utils.rs +++ b/bench-tps/src/perf_utils.rs @@ -28,7 +28,7 @@ pub fn sample_txs( sample_period: u64, client: &Arc, ) where - T: BenchTpsClient, + T: BenchTpsClient + ?Sized, { let mut max_tps = 0.0; let mut total_elapsed; diff --git a/bench-tps/src/send_batch.rs b/bench-tps/src/send_batch.rs index 77a7403490..cda4a45bba 100644 --- a/bench-tps/src/send_batch.rs +++ b/bench-tps/src/send_batch.rs @@ -27,7 +27,7 @@ use { }, }; -pub fn get_latest_blockhash(client: &T) -> Hash { +pub fn get_latest_blockhash(client: &T) -> Hash { loop { match client.get_latest_blockhash_with_commitment(CommitmentConfig::processed()) { Ok((blockhash, _)) => return blockhash, @@ -58,7 +58,7 @@ pub fn generate_keypairs(seed_keypair: &Keypair, count: u64) -> (Vec, u /// fund the dests keys by spending all of the source keys into MAX_SPENDS_PER_TX /// on every iteration. This allows us to replay the transfers because the source is either empty, /// or full -pub fn fund_keys( +pub fn fund_keys( client: Arc, source: &Keypair, dests: &[Keypair], @@ -96,7 +96,7 @@ pub fn fund_keys( } } -pub fn generate_durable_nonce_accounts( +pub fn generate_durable_nonce_accounts( client: Arc, authority_keypairs: &[Keypair], ) -> Vec { @@ -122,7 +122,7 @@ pub fn generate_durable_nonce_accounts( +pub fn withdraw_durable_nonce_accounts( client: Arc, authority_keypairs: &[Keypair], nonce_keypairs: &[Keypair], @@ -145,7 +145,7 @@ const MAX_SPENDS_PER_TX: u64 = 4; // assume 4MB network buffers, and 512 byte packets const FUND_CHUNK_LEN: usize = 4 * 1024 * 1024 / 512; -fn verify_funding_transfer( +fn verify_funding_transfer( client: &Arc, tx: &Transaction, amount: u64, @@ -169,11 +169,11 @@ trait SendBatchTransactions<'a, T: Sliceable + Send + Sync> { ); fn send_transactions(&mut self, client: &Arc, to_lamports: u64, log_progress: F) where - C: 'static + BenchTpsClient + Send + Sync, + C: 'static + BenchTpsClient + Send + Sync + ?Sized, F: Fn(usize, usize); fn sign(&mut self, blockhash: Hash); - fn send(&self, client: &Arc); - fn verify( + fn send(&self, client: &Arc); + fn verify( &mut self, client: &Arc, to_lamports: u64, @@ -207,7 +207,7 @@ where fn send_transactions(&mut self, client: &Arc, to_lamports: u64, log_progress: F) where - C: 'static + BenchTpsClient + Send + Sync, + C: 'static + BenchTpsClient + Send + Sync + ?Sized, F: Fn(usize, usize), { let mut tries: usize = 0; @@ -241,7 +241,7 @@ where debug!("sign {} txs: {}us", self.len(), sign_txs.as_us()); } - fn send(&self, client: &Arc) { + fn send(&self, client: &Arc) { let mut send_txs = Measure::start("send_and_clone_txs"); let batch: Vec<_> = self.iter().map(|(_keypair, tx)| tx.clone()).collect(); client.send_batch(batch).expect("transfer"); @@ -249,7 +249,7 @@ where debug!("send {} {}", self.len(), send_txs); } - fn verify( + fn verify( &mut self, client: &Arc, to_lamports: u64, @@ -341,7 +341,7 @@ impl<'a> Sliceable for FundingSigners<'a> { } trait FundingTransactions<'a>: SendBatchTransactions<'a, FundingSigners<'a>> { - fn fund( + fn fund( &mut self, client: &Arc, to_fund: &FundingChunk<'a>, @@ -350,7 +350,7 @@ trait FundingTransactions<'a>: SendBatchTransactions<'a, FundingSigners<'a>> { } impl<'a> FundingTransactions<'a> for FundingContainer<'a> { - fn fund( + fn fund( &mut self, client: &Arc, to_fund: &FundingChunk<'a>, @@ -396,7 +396,7 @@ impl<'a> Sliceable for NonceCreateSigners<'a> { } trait NonceTransactions<'a>: SendBatchTransactions<'a, NonceCreateSigners<'a>> { - fn create_accounts( + fn create_accounts( &mut self, client: &Arc, to_fund: &'a NonceCreateChunk<'a>, @@ -405,7 +405,7 @@ trait NonceTransactions<'a>: SendBatchTransactions<'a, NonceCreateSigners<'a>> { } impl<'a> NonceTransactions<'a> for NonceCreateContainer<'a> { - fn create_accounts( + fn create_accounts( &mut self, client: &Arc, to_fund: &'a NonceCreateChunk<'a>, @@ -453,14 +453,14 @@ impl<'a> Sliceable for NonceWithdrawSigners<'a> { } trait NonceWithdrawTransactions<'a>: SendBatchTransactions<'a, NonceWithdrawSigners<'a>> { - fn withdraw_accounts( + fn withdraw_accounts( &mut self, client: &Arc, to_withdraw: &'a NonceWithdrawChunk<'a>, ); } impl<'a> NonceWithdrawTransactions<'a> for NonceWithdrawContainer<'a> { - fn withdraw_accounts( + fn withdraw_accounts( &mut self, client: &Arc, to_withdraw: &'a NonceWithdrawChunk<'a>,