Bench tps: refactor client creation (#26862)

* relax Sized restriction functions using client

* extract function to build client
This commit is contained in:
kirill lykov 2022-08-05 10:51:15 +02:00 committed by GitHub
parent 2dca239480
commit 5b879067e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 177 additions and 153 deletions

View File

@ -62,7 +62,7 @@ impl<'a> KeypairChunks<'a> {
}
}
struct TransactionChunkGenerator<'a, 'b, T> {
struct TransactionChunkGenerator<'a, 'b, T: ?Sized> {
client: Arc<T>,
account_chunks: KeypairChunks<'a>,
nonce_chunks: Option<KeypairChunks<'b>>,
@ -72,7 +72,7 @@ struct TransactionChunkGenerator<'a, 'b, T> {
impl<'a, 'b, T> TransactionChunkGenerator<'a, 'b, T>
where
T: 'static + BenchTpsClient + Send + Sync,
T: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
fn new(
client: Arc<T>,
@ -165,7 +165,7 @@ where
fn wait_for_target_slots_per_epoch<T>(target_slots_per_epoch: u64, client: &Arc<T>)
where
T: 'static + BenchTpsClient + Send + Sync,
T: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
if target_slots_per_epoch != 0 {
info!(
@ -195,7 +195,7 @@ fn create_sampler_thread<T>(
maxes: &Arc<RwLock<Vec<(String, SampleStats)>>>,
) -> JoinHandle<()>
where
T: 'static + BenchTpsClient + Send + Sync,
T: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
info!("Sampling TPS every {} second...", sample_period);
let exit_signal = exit_signal.clone();
@ -209,7 +209,7 @@ where
.unwrap()
}
fn generate_chunked_transfers<T: 'static + BenchTpsClient + Send + Sync>(
fn generate_chunked_transfers<T: 'static + BenchTpsClient + Send + Sync + ?Sized>(
recent_blockhash: Arc<RwLock<Hash>>,
shared_txs: &SharedTransactions,
shared_tx_active_thread_count: Arc<AtomicIsize>,
@ -264,7 +264,7 @@ fn create_sender_threads<T>(
shared_tx_active_thread_count: &Arc<AtomicIsize>,
) -> Vec<JoinHandle<()>>
where
T: 'static + BenchTpsClient + Send + Sync,
T: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
(0..threads)
.map(|_| {
@ -292,7 +292,7 @@ where
pub fn do_bench_tps<T>(client: Arc<T>, config: Config, gen_keypairs: Vec<Keypair>) -> u64
where
T: 'static + BenchTpsClient + Send + Sync,
T: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
let Config {
id,
@ -441,7 +441,7 @@ fn generate_system_txs(
.collect()
}
fn get_nonce_blockhash<T: 'static + BenchTpsClient + Send + Sync>(
fn get_nonce_blockhash<T: 'static + BenchTpsClient + Send + Sync + ?Sized>(
client: Arc<T>,
nonce_account_pubkey: Pubkey,
) -> Hash {
@ -453,7 +453,7 @@ fn get_nonce_blockhash<T: 'static + BenchTpsClient + Send + Sync>(
nonce_data.blockhash()
}
fn generate_nonced_system_txs<T: 'static + BenchTpsClient + Send + Sync>(
fn generate_nonced_system_txs<T: 'static + BenchTpsClient + Send + Sync + ?Sized>(
client: Arc<T>,
source: &[&Keypair],
dest: &VecDeque<&Keypair>,
@ -495,7 +495,7 @@ fn generate_nonced_system_txs<T: 'static + BenchTpsClient + Send + Sync>(
transactions
}
fn generate_txs<T: 'static + BenchTpsClient + Send + Sync>(
fn generate_txs<T: 'static + BenchTpsClient + Send + Sync + ?Sized>(
shared_txs: &SharedTransactions,
blockhash: &Arc<RwLock<Hash>>,
chunk_generator: &mut TransactionChunkGenerator<'_, '_, T>,
@ -515,7 +515,10 @@ fn generate_txs<T: 'static + BenchTpsClient + Send + Sync>(
}
}
fn get_new_latest_blockhash<T: BenchTpsClient>(client: &Arc<T>, blockhash: &Hash) -> Option<Hash> {
fn get_new_latest_blockhash<T: BenchTpsClient + ?Sized>(
client: &Arc<T>,
blockhash: &Hash,
) -> Option<Hash> {
let start = Instant::now();
while start.elapsed().as_secs() < 5 {
if let Ok(new_blockhash) = client.get_latest_blockhash() {
@ -531,7 +534,7 @@ fn get_new_latest_blockhash<T: BenchTpsClient>(client: &Arc<T>, blockhash: &Hash
None
}
fn poll_blockhash<T: BenchTpsClient>(
fn poll_blockhash<T: BenchTpsClient + ?Sized>(
exit_signal: &Arc<AtomicBool>,
blockhash: &Arc<RwLock<Hash>>,
client: &Arc<T>,
@ -581,7 +584,7 @@ fn poll_blockhash<T: BenchTpsClient>(
}
}
fn do_tx_transfers<T: BenchTpsClient>(
fn do_tx_transfers<T: BenchTpsClient + ?Sized>(
exit_signal: &Arc<AtomicBool>,
shared_txs: &SharedTransactions,
shared_tx_thread_count: &Arc<AtomicIsize>,
@ -734,7 +737,7 @@ fn compute_and_report_stats(
);
}
pub fn generate_and_fund_keypairs<T: 'static + BenchTpsClient + Send + Sync>(
pub fn generate_and_fund_keypairs<T: 'static + BenchTpsClient + Send + Sync + ?Sized>(
client: Arc<T>,
funding_key: &Keypair,
keypair_count: usize,
@ -753,7 +756,7 @@ pub fn generate_and_fund_keypairs<T: 'static + BenchTpsClient + Send + Sync>(
Ok(keypairs)
}
pub fn fund_keypairs<T: 'static + BenchTpsClient + Send + Sync>(
pub fn fund_keypairs<T: 'static + BenchTpsClient + Send + Sync + ?Sized>(
client: Arc<T>,
funding_key: &Keypair,
keypairs: &[Keypair],

View File

@ -18,7 +18,7 @@ pub fn get_keypairs<T>(
read_from_client_file: bool,
) -> Vec<Keypair>
where
T: 'static + BenchTpsClient + Send + Sync,
T: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
if read_from_client_file {
let path = Path::new(client_ids_and_stake_file);

View File

@ -1,9 +1,11 @@
#![allow(clippy::integer_arithmetic)]
use {
clap::value_t,
log::*,
solana_bench_tps::{
bench::do_bench_tps,
bench_tps_client::BenchTpsClient,
cli::{self, ExternalClientType},
keypairs::get_keypairs,
send_batch::generate_keypairs,
@ -17,15 +19,115 @@ use {
solana_genesis::Base64Account,
solana_gossip::gossip_service::{discover_cluster, get_client, get_multi_client},
solana_sdk::{
commitment_config::CommitmentConfig, fee_calculator::FeeRateGovernor, system_program,
commitment_config::CommitmentConfig, fee_calculator::FeeRateGovernor, pubkey::Pubkey,
system_program,
},
solana_streamer::socket::SocketAddrSpace,
std::{collections::HashMap, fs::File, io::prelude::*, path::Path, process::exit, sync::Arc},
std::{
collections::HashMap, fs::File, io::prelude::*, net::SocketAddr, path::Path, process::exit,
sync::Arc,
},
};
/// Number of signatures for all transactions in ~1 week at ~100K TPS
pub const NUM_SIGNATURES_FOR_TXS: u64 = 100_000 * 60 * 60 * 24 * 7;
#[allow(clippy::too_many_arguments)]
fn create_client(
external_client_type: &ExternalClientType,
entrypoint_addr: &SocketAddr,
json_rpc_url: &str,
websocket_url: &str,
multi_client: bool,
use_quic: bool,
tpu_connection_pool_size: usize,
rpc_tpu_sockets: Option<(SocketAddr, SocketAddr)>,
num_nodes: usize,
target_node: Option<Pubkey>,
) -> Arc<dyn BenchTpsClient + Send + Sync> {
match external_client_type {
ExternalClientType::RpcClient => Arc::new(RpcClient::new_with_commitment(
json_rpc_url.to_string(),
CommitmentConfig::confirmed(),
)),
ExternalClientType::ThinClient => {
let connection_cache = match use_quic {
true => Arc::new(ConnectionCache::new(tpu_connection_pool_size)),
false => Arc::new(ConnectionCache::with_udp(tpu_connection_pool_size)),
};
if let Some((rpc, tpu)) = rpc_tpu_sockets {
Arc::new(ThinClient::new(rpc, tpu, connection_cache))
} else {
let nodes =
discover_cluster(entrypoint_addr, num_nodes, SocketAddrSpace::Unspecified)
.unwrap_or_else(|err| {
eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err);
exit(1);
});
if multi_client {
let (client, num_clients) =
get_multi_client(&nodes, &SocketAddrSpace::Unspecified, connection_cache);
if nodes.len() < num_clients {
eprintln!(
"Error: Insufficient nodes discovered. Expecting {} or more",
num_nodes
);
exit(1);
}
Arc::new(client)
} else if let Some(target_node) = target_node {
info!("Searching for target_node: {:?}", target_node);
let mut target_client = None;
for node in nodes {
if node.id == target_node {
target_client = Some(get_client(
&[node],
&SocketAddrSpace::Unspecified,
connection_cache,
));
break;
}
}
Arc::new(target_client.unwrap_or_else(|| {
eprintln!("Target node {} not found", target_node);
exit(1);
}))
} else {
Arc::new(get_client(
&nodes,
&SocketAddrSpace::Unspecified,
connection_cache,
))
}
}
}
ExternalClientType::TpuClient => {
let rpc_client = Arc::new(RpcClient::new_with_commitment(
json_rpc_url.to_string(),
CommitmentConfig::confirmed(),
));
let connection_cache = match use_quic {
true => ConnectionCache::new(tpu_connection_pool_size),
false => ConnectionCache::with_udp(tpu_connection_pool_size),
};
Arc::new(
TpuClient::new_with_connection_cache(
rpc_client,
websocket_url,
TpuClientConfig::default(),
Arc::new(connection_cache),
)
.unwrap_or_else(|err| {
eprintln!("Could not create TpuClient {:?}", err);
exit(1);
}),
)
}
}
}
fn main() {
solana_logger::setup_with_default("solana=info");
solana_metrics::set_panic_hook("bench-tps", /*version:*/ None);
@ -86,124 +188,43 @@ fn main() {
}
info!("Connecting to the cluster");
match external_client_type {
ExternalClientType::RpcClient => {
let client = Arc::new(RpcClient::new_with_commitment(
json_rpc_url.to_string(),
CommitmentConfig::confirmed(),
));
let keypairs = get_keypairs(
client.clone(),
id,
keypair_count,
*num_lamports_per_account,
client_ids_and_stake_file,
*read_from_client_file,
);
do_bench_tps(client, cli_config, keypairs);
}
ExternalClientType::ThinClient => {
let connection_cache = match use_quic {
true => Arc::new(ConnectionCache::new(*tpu_connection_pool_size)),
false => Arc::new(ConnectionCache::with_udp(*tpu_connection_pool_size)),
};
let client = if let Ok(rpc_addr) = value_t!(matches, "rpc_addr", String) {
let rpc = rpc_addr.parse().unwrap_or_else(|e| {
eprintln!("RPC address should parse as socketaddr {:?}", e);
let rpc_tpu_sockets: Option<(SocketAddr, SocketAddr)> =
if let Ok(rpc_addr) = value_t!(matches, "rpc_addr", String) {
let rpc = rpc_addr.parse().unwrap_or_else(|e| {
eprintln!("RPC address should parse as socketaddr {:?}", e);
exit(1);
});
let tpu = value_t!(matches, "tpu_addr", String)
.unwrap()
.parse()
.unwrap_or_else(|e| {
eprintln!("TPU address should parse to a socket: {:?}", e);
exit(1);
});
let tpu = value_t!(matches, "tpu_addr", String)
.unwrap()
.parse()
.unwrap_or_else(|e| {
eprintln!("TPU address should parse to a socket: {:?}", e);
exit(1);
});
Some((rpc, tpu))
} else {
None
};
ThinClient::new(rpc, tpu, connection_cache)
} else {
let nodes =
discover_cluster(entrypoint_addr, *num_nodes, SocketAddrSpace::Unspecified)
.unwrap_or_else(|err| {
eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err);
exit(1);
});
if *multi_client {
let (client, num_clients) =
get_multi_client(&nodes, &SocketAddrSpace::Unspecified, connection_cache);
if nodes.len() < num_clients {
eprintln!(
"Error: Insufficient nodes discovered. Expecting {} or more",
num_nodes
);
exit(1);
}
client
} else if let Some(target_node) = target_node {
info!("Searching for target_node: {:?}", target_node);
let mut target_client = None;
for node in nodes {
if node.id == *target_node {
target_client = Some(get_client(
&[node],
&SocketAddrSpace::Unspecified,
connection_cache,
));
break;
}
}
target_client.unwrap_or_else(|| {
eprintln!("Target node {} not found", target_node);
exit(1);
})
} else {
get_client(&nodes, &SocketAddrSpace::Unspecified, connection_cache)
}
};
let client = Arc::new(client);
let keypairs = get_keypairs(
client.clone(),
id,
keypair_count,
*num_lamports_per_account,
client_ids_and_stake_file,
*read_from_client_file,
);
do_bench_tps(client, cli_config, keypairs);
}
ExternalClientType::TpuClient => {
let rpc_client = Arc::new(RpcClient::new_with_commitment(
json_rpc_url.to_string(),
CommitmentConfig::confirmed(),
));
let connection_cache = match use_quic {
true => ConnectionCache::new(*tpu_connection_pool_size),
false => ConnectionCache::with_udp(*tpu_connection_pool_size),
};
let client = Arc::new(
TpuClient::new_with_connection_cache(
rpc_client,
websocket_url,
TpuClientConfig::default(),
Arc::new(connection_cache),
)
.unwrap_or_else(|err| {
eprintln!("Could not create TpuClient {:?}", err);
exit(1);
}),
);
let keypairs = get_keypairs(
client.clone(),
id,
keypair_count,
*num_lamports_per_account,
client_ids_and_stake_file,
*read_from_client_file,
);
do_bench_tps(client, cli_config, keypairs);
}
}
let client = create_client(
external_client_type,
entrypoint_addr,
json_rpc_url,
websocket_url,
*multi_client,
*use_quic,
*tpu_connection_pool_size,
rpc_tpu_sockets,
*num_nodes,
*target_node,
);
let keypairs = get_keypairs(
client.clone(),
id,
keypair_count,
*num_lamports_per_account,
client_ids_and_stake_file,
*read_from_client_file,
);
do_bench_tps(client, cli_config, keypairs);
}

View File

@ -28,7 +28,7 @@ pub fn sample_txs<T>(
sample_period: u64,
client: &Arc<T>,
) where
T: BenchTpsClient,
T: BenchTpsClient + ?Sized,
{
let mut max_tps = 0.0;
let mut total_elapsed;

View File

@ -27,7 +27,7 @@ use {
},
};
pub fn get_latest_blockhash<T: BenchTpsClient>(client: &T) -> Hash {
pub fn get_latest_blockhash<T: BenchTpsClient + ?Sized>(client: &T) -> Hash {
loop {
match client.get_latest_blockhash_with_commitment(CommitmentConfig::processed()) {
Ok((blockhash, _)) => return blockhash,
@ -58,7 +58,7 @@ pub fn generate_keypairs(seed_keypair: &Keypair, count: u64) -> (Vec<Keypair>, u
/// fund the dests keys by spending all of the source keys into MAX_SPENDS_PER_TX
/// on every iteration. This allows us to replay the transfers because the source is either empty,
/// or full
pub fn fund_keys<T: 'static + BenchTpsClient + Send + Sync>(
pub fn fund_keys<T: 'static + BenchTpsClient + Send + Sync + ?Sized>(
client: Arc<T>,
source: &Keypair,
dests: &[Keypair],
@ -96,7 +96,7 @@ pub fn fund_keys<T: 'static + BenchTpsClient + Send + Sync>(
}
}
pub fn generate_durable_nonce_accounts<T: 'static + BenchTpsClient + Send + Sync>(
pub fn generate_durable_nonce_accounts<T: 'static + BenchTpsClient + Send + Sync + ?Sized>(
client: Arc<T>,
authority_keypairs: &[Keypair],
) -> Vec<Keypair> {
@ -122,7 +122,7 @@ pub fn generate_durable_nonce_accounts<T: 'static + BenchTpsClient + Send + Sync
nonce_keypairs
}
pub fn withdraw_durable_nonce_accounts<T: 'static + BenchTpsClient + Send + Sync>(
pub fn withdraw_durable_nonce_accounts<T: 'static + BenchTpsClient + Send + Sync + ?Sized>(
client: Arc<T>,
authority_keypairs: &[Keypair],
nonce_keypairs: &[Keypair],
@ -145,7 +145,7 @@ const MAX_SPENDS_PER_TX: u64 = 4;
// assume 4MB network buffers, and 512 byte packets
const FUND_CHUNK_LEN: usize = 4 * 1024 * 1024 / 512;
fn verify_funding_transfer<T: BenchTpsClient>(
fn verify_funding_transfer<T: BenchTpsClient + ?Sized>(
client: &Arc<T>,
tx: &Transaction,
amount: u64,
@ -169,11 +169,11 @@ trait SendBatchTransactions<'a, T: Sliceable + Send + Sync> {
);
fn send_transactions<C, F>(&mut self, client: &Arc<C>, to_lamports: u64, log_progress: F)
where
C: 'static + BenchTpsClient + Send + Sync,
C: 'static + BenchTpsClient + Send + Sync + ?Sized,
F: Fn(usize, usize);
fn sign(&mut self, blockhash: Hash);
fn send<C: BenchTpsClient>(&self, client: &Arc<C>);
fn verify<C: 'static + BenchTpsClient + Send + Sync>(
fn send<C: BenchTpsClient + ?Sized>(&self, client: &Arc<C>);
fn verify<C: 'static + BenchTpsClient + Send + Sync + ?Sized>(
&mut self,
client: &Arc<C>,
to_lamports: u64,
@ -207,7 +207,7 @@ where
fn send_transactions<C, F>(&mut self, client: &Arc<C>, to_lamports: u64, log_progress: F)
where
C: 'static + BenchTpsClient + Send + Sync,
C: 'static + BenchTpsClient + Send + Sync + ?Sized,
F: Fn(usize, usize),
{
let mut tries: usize = 0;
@ -241,7 +241,7 @@ where
debug!("sign {} txs: {}us", self.len(), sign_txs.as_us());
}
fn send<C: BenchTpsClient>(&self, client: &Arc<C>) {
fn send<C: BenchTpsClient + ?Sized>(&self, client: &Arc<C>) {
let mut send_txs = Measure::start("send_and_clone_txs");
let batch: Vec<_> = self.iter().map(|(_keypair, tx)| tx.clone()).collect();
client.send_batch(batch).expect("transfer");
@ -249,7 +249,7 @@ where
debug!("send {} {}", self.len(), send_txs);
}
fn verify<C: 'static + BenchTpsClient + Send + Sync>(
fn verify<C: 'static + BenchTpsClient + Send + Sync + ?Sized>(
&mut self,
client: &Arc<C>,
to_lamports: u64,
@ -341,7 +341,7 @@ impl<'a> Sliceable for FundingSigners<'a> {
}
trait FundingTransactions<'a>: SendBatchTransactions<'a, FundingSigners<'a>> {
fn fund<T: 'static + BenchTpsClient + Send + Sync>(
fn fund<T: 'static + BenchTpsClient + Send + Sync + ?Sized>(
&mut self,
client: &Arc<T>,
to_fund: &FundingChunk<'a>,
@ -350,7 +350,7 @@ trait FundingTransactions<'a>: SendBatchTransactions<'a, FundingSigners<'a>> {
}
impl<'a> FundingTransactions<'a> for FundingContainer<'a> {
fn fund<T: 'static + BenchTpsClient + Send + Sync>(
fn fund<T: 'static + BenchTpsClient + Send + Sync + ?Sized>(
&mut self,
client: &Arc<T>,
to_fund: &FundingChunk<'a>,
@ -396,7 +396,7 @@ impl<'a> Sliceable for NonceCreateSigners<'a> {
}
trait NonceTransactions<'a>: SendBatchTransactions<'a, NonceCreateSigners<'a>> {
fn create_accounts<T: 'static + BenchTpsClient + Send + Sync>(
fn create_accounts<T: 'static + BenchTpsClient + Send + Sync + ?Sized>(
&mut self,
client: &Arc<T>,
to_fund: &'a NonceCreateChunk<'a>,
@ -405,7 +405,7 @@ trait NonceTransactions<'a>: SendBatchTransactions<'a, NonceCreateSigners<'a>> {
}
impl<'a> NonceTransactions<'a> for NonceCreateContainer<'a> {
fn create_accounts<T: 'static + BenchTpsClient + Send + Sync>(
fn create_accounts<T: 'static + BenchTpsClient + Send + Sync + ?Sized>(
&mut self,
client: &Arc<T>,
to_fund: &'a NonceCreateChunk<'a>,
@ -453,14 +453,14 @@ impl<'a> Sliceable for NonceWithdrawSigners<'a> {
}
trait NonceWithdrawTransactions<'a>: SendBatchTransactions<'a, NonceWithdrawSigners<'a>> {
fn withdraw_accounts<T: 'static + BenchTpsClient + Send + Sync>(
fn withdraw_accounts<T: 'static + BenchTpsClient + Send + Sync + ?Sized>(
&mut self,
client: &Arc<T>,
to_withdraw: &'a NonceWithdrawChunk<'a>,
);
}
impl<'a> NonceWithdrawTransactions<'a> for NonceWithdrawContainer<'a> {
fn withdraw_accounts<T: 'static + BenchTpsClient + Send + Sync>(
fn withdraw_accounts<T: 'static + BenchTpsClient + Send + Sync + ?Sized>(
&mut self,
client: &Arc<T>,
to_withdraw: &'a NonceWithdrawChunk<'a>,