Add BenchTpsClient trait (#24208)
* Add BenchTpsClient * Impl BenchTpsClient for used clients * Use BenchTpsClient in do_bench * Update integration test to use faucet via rpc * Support keypairs from file that are not prefunded * Remove old perf-utils
This commit is contained in:
parent
c0019edf00
commit
3871c85fd7
|
@ -4446,10 +4446,12 @@ dependencies = [
|
|||
"solana-measure",
|
||||
"solana-metrics",
|
||||
"solana-net-utils",
|
||||
"solana-rpc",
|
||||
"solana-runtime",
|
||||
"solana-sdk",
|
||||
"solana-streamer",
|
||||
"solana-version",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
@ -24,10 +24,12 @@ solana-logger = { path = "../logger", version = "=1.11.0" }
|
|||
solana-measure = { path = "../measure", version = "=1.11.0" }
|
||||
solana-metrics = { path = "../metrics", version = "=1.11.0" }
|
||||
solana-net-utils = { path = "../net-utils", version = "=1.11.0" }
|
||||
solana-rpc = { path = "../rpc", version = "=1.11.0" }
|
||||
solana-runtime = { path = "../runtime", version = "=1.11.0" }
|
||||
solana-sdk = { path = "../sdk", version = "=1.11.0" }
|
||||
solana-streamer = { path = "../streamer", version = "=1.11.0" }
|
||||
solana-version = { path = "../version", version = "=1.11.0" }
|
||||
thiserror = "1.0"
|
||||
|
||||
[dev-dependencies]
|
||||
serial_test = "0.6.0"
|
||||
|
|
|
@ -1,19 +1,21 @@
|
|||
use {
|
||||
crate::cli::Config,
|
||||
crate::{
|
||||
bench_tps_client::*,
|
||||
cli::Config,
|
||||
perf_utils::{sample_txs, SampleStats},
|
||||
},
|
||||
log::*,
|
||||
rayon::prelude::*,
|
||||
solana_client::perf_utils::{sample_txs, SampleStats},
|
||||
solana_core::gen_keys::GenKeys,
|
||||
solana_faucet::faucet::request_airdrop_transaction,
|
||||
solana_measure::measure::Measure,
|
||||
solana_metrics::{self, datapoint_info},
|
||||
solana_sdk::{
|
||||
client::Client,
|
||||
clock::{DEFAULT_MS_PER_SLOT, DEFAULT_S_PER_SLOT, MAX_PROCESSING_AGE},
|
||||
commitment_config::CommitmentConfig,
|
||||
hash::Hash,
|
||||
instruction::{AccountMeta, Instruction},
|
||||
message::Message,
|
||||
native_token::Sol,
|
||||
pubkey::Pubkey,
|
||||
signature::{Keypair, Signer},
|
||||
system_instruction, system_transaction,
|
||||
|
@ -22,7 +24,6 @@ use {
|
|||
},
|
||||
std::{
|
||||
collections::{HashSet, VecDeque},
|
||||
net::SocketAddr,
|
||||
process::exit,
|
||||
sync::{
|
||||
atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering},
|
||||
|
@ -38,16 +39,9 @@ const MAX_TX_QUEUE_AGE: u64 = (MAX_PROCESSING_AGE as f64 * DEFAULT_S_PER_SLOT) a
|
|||
|
||||
pub const MAX_SPENDS_PER_TX: u64 = 4;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum BenchTpsError {
|
||||
AirdropFailure,
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, BenchTpsError>;
|
||||
|
||||
pub type SharedTransactions = Arc<RwLock<VecDeque<Vec<(Transaction, u64)>>>>;
|
||||
|
||||
fn get_latest_blockhash<T: Client>(client: &T) -> Hash {
|
||||
fn get_latest_blockhash<T: BenchTpsClient>(client: &T) -> Hash {
|
||||
loop {
|
||||
match client.get_latest_blockhash_with_commitment(CommitmentConfig::processed()) {
|
||||
Ok((blockhash, _)) => return blockhash,
|
||||
|
@ -61,7 +55,7 @@ fn get_latest_blockhash<T: Client>(client: &T) -> Hash {
|
|||
|
||||
fn wait_for_target_slots_per_epoch<T>(target_slots_per_epoch: u64, client: &Arc<T>)
|
||||
where
|
||||
T: 'static + Client + Send + Sync,
|
||||
T: 'static + BenchTpsClient + Send + Sync,
|
||||
{
|
||||
if target_slots_per_epoch != 0 {
|
||||
info!(
|
||||
|
@ -91,7 +85,7 @@ fn create_sampler_thread<T>(
|
|||
maxes: &Arc<RwLock<Vec<(String, SampleStats)>>>,
|
||||
) -> JoinHandle<()>
|
||||
where
|
||||
T: 'static + Client + Send + Sync,
|
||||
T: 'static + BenchTpsClient + Send + Sync,
|
||||
{
|
||||
info!("Sampling TPS every {} second...", sample_period);
|
||||
let exit_signal = exit_signal.clone();
|
||||
|
@ -169,7 +163,7 @@ fn create_sender_threads<T>(
|
|||
shared_tx_active_thread_count: &Arc<AtomicIsize>,
|
||||
) -> Vec<JoinHandle<()>>
|
||||
where
|
||||
T: 'static + Client + Send + Sync,
|
||||
T: 'static + BenchTpsClient + Send + Sync,
|
||||
{
|
||||
(0..threads)
|
||||
.map(|_| {
|
||||
|
@ -197,7 +191,7 @@ where
|
|||
|
||||
pub fn do_bench_tps<T>(client: Arc<T>, config: Config, gen_keypairs: Vec<Keypair>) -> u64
|
||||
where
|
||||
T: 'static + Client + Send + Sync,
|
||||
T: 'static + BenchTpsClient + Send + Sync,
|
||||
{
|
||||
let Config {
|
||||
id,
|
||||
|
@ -391,7 +385,7 @@ fn generate_txs(
|
|||
}
|
||||
}
|
||||
|
||||
fn get_new_latest_blockhash<T: Client>(client: &Arc<T>, blockhash: &Hash) -> Option<Hash> {
|
||||
fn get_new_latest_blockhash<T: BenchTpsClient>(client: &Arc<T>, blockhash: &Hash) -> Option<Hash> {
|
||||
let start = Instant::now();
|
||||
while start.elapsed().as_secs() < 5 {
|
||||
if let Ok(new_blockhash) = client.get_latest_blockhash() {
|
||||
|
@ -407,7 +401,7 @@ fn get_new_latest_blockhash<T: Client>(client: &Arc<T>, blockhash: &Hash) -> Opt
|
|||
None
|
||||
}
|
||||
|
||||
fn poll_blockhash<T: Client>(
|
||||
fn poll_blockhash<T: BenchTpsClient>(
|
||||
exit_signal: &Arc<AtomicBool>,
|
||||
blockhash: &Arc<RwLock<Hash>>,
|
||||
client: &Arc<T>,
|
||||
|
@ -449,7 +443,7 @@ fn poll_blockhash<T: Client>(
|
|||
}
|
||||
}
|
||||
|
||||
fn do_tx_transfers<T: Client>(
|
||||
fn do_tx_transfers<T: BenchTpsClient>(
|
||||
exit_signal: &Arc<AtomicBool>,
|
||||
shared_txs: &SharedTransactions,
|
||||
shared_tx_thread_count: &Arc<AtomicIsize>,
|
||||
|
@ -467,11 +461,7 @@ fn do_tx_transfers<T: Client>(
|
|||
};
|
||||
if let Some(txs0) = txs {
|
||||
shared_tx_thread_count.fetch_add(1, Ordering::Relaxed);
|
||||
info!(
|
||||
"Transferring 1 unit {} times... to {}",
|
||||
txs0.len(),
|
||||
client.as_ref().tpu_addr(),
|
||||
);
|
||||
info!("Transferring 1 unit {} times...", txs0.len());
|
||||
let tx_len = txs0.len();
|
||||
let transfer_start = Instant::now();
|
||||
let mut old_transactions = false;
|
||||
|
@ -487,7 +477,7 @@ fn do_tx_transfers<T: Client>(
|
|||
transactions.push(tx.0);
|
||||
}
|
||||
|
||||
if let Err(error) = client.async_send_batch(transactions) {
|
||||
if let Err(error) = client.send_batch(transactions) {
|
||||
warn!("send_batch_sync in do_tx_transfers failed: {}", error);
|
||||
}
|
||||
|
||||
|
@ -514,7 +504,11 @@ fn do_tx_transfers<T: Client>(
|
|||
}
|
||||
}
|
||||
|
||||
fn verify_funding_transfer<T: Client>(client: &Arc<T>, tx: &Transaction, amount: u64) -> bool {
|
||||
fn verify_funding_transfer<T: BenchTpsClient>(
|
||||
client: &Arc<T>,
|
||||
tx: &Transaction,
|
||||
amount: u64,
|
||||
) -> bool {
|
||||
for a in &tx.message().account_keys[1..] {
|
||||
match client.get_balance_with_commitment(a, CommitmentConfig::processed()) {
|
||||
Ok(balance) => return balance >= amount,
|
||||
|
@ -525,7 +519,7 @@ fn verify_funding_transfer<T: Client>(client: &Arc<T>, tx: &Transaction, amount:
|
|||
}
|
||||
|
||||
trait FundingTransactions<'a> {
|
||||
fn fund<T: 'static + Client + Send + Sync>(
|
||||
fn fund<T: 'static + BenchTpsClient + Send + Sync>(
|
||||
&mut self,
|
||||
client: &Arc<T>,
|
||||
to_fund: &[(&'a Keypair, Vec<(Pubkey, u64)>)],
|
||||
|
@ -533,12 +527,16 @@ trait FundingTransactions<'a> {
|
|||
);
|
||||
fn make(&mut self, to_fund: &[(&'a Keypair, Vec<(Pubkey, u64)>)]);
|
||||
fn sign(&mut self, blockhash: Hash);
|
||||
fn send<T: Client>(&self, client: &Arc<T>);
|
||||
fn verify<T: 'static + Client + Send + Sync>(&mut self, client: &Arc<T>, to_lamports: u64);
|
||||
fn send<T: BenchTpsClient>(&self, client: &Arc<T>);
|
||||
fn verify<T: 'static + BenchTpsClient + Send + Sync>(
|
||||
&mut self,
|
||||
client: &Arc<T>,
|
||||
to_lamports: u64,
|
||||
);
|
||||
}
|
||||
|
||||
impl<'a> FundingTransactions<'a> for Vec<(&'a Keypair, Transaction)> {
|
||||
fn fund<T: 'static + Client + Send + Sync>(
|
||||
fn fund<T: 'static + BenchTpsClient + Send + Sync>(
|
||||
&mut self,
|
||||
client: &Arc<T>,
|
||||
to_fund: &[(&'a Keypair, Vec<(Pubkey, u64)>)],
|
||||
|
@ -607,16 +605,20 @@ impl<'a> FundingTransactions<'a> for Vec<(&'a Keypair, Transaction)> {
|
|||
debug!("sign {} txs: {}us", self.len(), sign_txs.as_us());
|
||||
}
|
||||
|
||||
fn send<T: Client>(&self, client: &Arc<T>) {
|
||||
fn send<T: BenchTpsClient>(&self, client: &Arc<T>) {
|
||||
let mut send_txs = Measure::start("send_txs");
|
||||
self.iter().for_each(|(_, tx)| {
|
||||
client.async_send_transaction(tx.clone()).expect("transfer");
|
||||
client.send_transaction(tx.clone()).expect("transfer");
|
||||
});
|
||||
send_txs.stop();
|
||||
debug!("send {} txs: {}us", self.len(), send_txs.as_us());
|
||||
}
|
||||
|
||||
fn verify<T: 'static + Client + Send + Sync>(&mut self, client: &Arc<T>, to_lamports: u64) {
|
||||
fn verify<T: 'static + BenchTpsClient + Send + Sync>(
|
||||
&mut self,
|
||||
client: &Arc<T>,
|
||||
to_lamports: u64,
|
||||
) {
|
||||
let starting_txs = self.len();
|
||||
let verified_txs = Arc::new(AtomicUsize::new(0));
|
||||
let too_many_failures = Arc::new(AtomicBool::new(false));
|
||||
|
@ -691,7 +693,7 @@ impl<'a> FundingTransactions<'a> for Vec<(&'a Keypair, Transaction)> {
|
|||
/// fund the dests keys by spending all of the source keys into MAX_SPENDS_PER_TX
|
||||
/// on every iteration. This allows us to replay the transfers because the source is either empty,
|
||||
/// or full
|
||||
pub fn fund_keys<T: 'static + Client + Send + Sync>(
|
||||
pub fn fund_keys<T: 'static + BenchTpsClient + Send + Sync>(
|
||||
client: Arc<T>,
|
||||
source: &Keypair,
|
||||
dests: &[Keypair],
|
||||
|
@ -733,75 +735,6 @@ pub fn fund_keys<T: 'static + Client + Send + Sync>(
|
|||
}
|
||||
}
|
||||
|
||||
pub fn airdrop_lamports<T: Client>(
|
||||
client: &T,
|
||||
faucet_addr: &SocketAddr,
|
||||
id: &Keypair,
|
||||
desired_balance: u64,
|
||||
) -> Result<()> {
|
||||
let starting_balance = client.get_balance(&id.pubkey()).unwrap_or(0);
|
||||
metrics_submit_lamport_balance(starting_balance);
|
||||
info!("starting balance {}", starting_balance);
|
||||
|
||||
if starting_balance < desired_balance {
|
||||
let airdrop_amount = desired_balance - starting_balance;
|
||||
info!(
|
||||
"Airdropping {:?} lamports from {} for {}",
|
||||
airdrop_amount,
|
||||
faucet_addr,
|
||||
id.pubkey(),
|
||||
);
|
||||
|
||||
let blockhash = get_latest_blockhash(client);
|
||||
match request_airdrop_transaction(faucet_addr, &id.pubkey(), airdrop_amount, blockhash) {
|
||||
Ok(transaction) => {
|
||||
let mut tries = 0;
|
||||
loop {
|
||||
tries += 1;
|
||||
let signature = client.async_send_transaction(transaction.clone()).unwrap();
|
||||
let result = client.poll_for_signature_confirmation(&signature, 1);
|
||||
|
||||
if result.is_ok() {
|
||||
break;
|
||||
}
|
||||
if tries >= 5 {
|
||||
panic!(
|
||||
"Error requesting airdrop: to addr: {:?} amount: {} {:?}",
|
||||
faucet_addr, airdrop_amount, result
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
panic!(
|
||||
"Error requesting airdrop: {:?} to addr: {:?} amount: {}",
|
||||
err, faucet_addr, airdrop_amount
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
let current_balance = client
|
||||
.get_balance_with_commitment(&id.pubkey(), CommitmentConfig::processed())
|
||||
.unwrap_or_else(|e| {
|
||||
info!("airdrop error {}", e);
|
||||
starting_balance
|
||||
});
|
||||
info!("current balance {}...", current_balance);
|
||||
|
||||
metrics_submit_lamport_balance(current_balance);
|
||||
if current_balance - starting_balance != airdrop_amount {
|
||||
info!(
|
||||
"Airdrop failed! {} {} {}",
|
||||
id.pubkey(),
|
||||
current_balance,
|
||||
starting_balance
|
||||
);
|
||||
return Err(BenchTpsError::AirdropFailure);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn compute_and_report_stats(
|
||||
maxes: &Arc<RwLock<Vec<(String, SampleStats)>>>,
|
||||
sample_period: u64,
|
||||
|
@ -885,15 +818,33 @@ pub fn generate_keypairs(seed_keypair: &Keypair, count: u64) -> (Vec<Keypair>, u
|
|||
(rnd.gen_n_keypairs(total_keys), extra)
|
||||
}
|
||||
|
||||
pub fn generate_and_fund_keypairs<T: 'static + Client + Send + Sync>(
|
||||
pub fn generate_and_fund_keypairs<T: 'static + BenchTpsClient + Send + Sync>(
|
||||
client: Arc<T>,
|
||||
faucet_addr: Option<SocketAddr>,
|
||||
funding_key: &Keypair,
|
||||
keypair_count: usize,
|
||||
lamports_per_account: u64,
|
||||
) -> Result<Vec<Keypair>> {
|
||||
let rent = client.get_minimum_balance_for_rent_exemption(0)?;
|
||||
let lamports_per_account = lamports_per_account + rent;
|
||||
|
||||
info!("Creating {} keypairs...", keypair_count);
|
||||
let (mut keypairs, extra) = generate_keypairs(funding_key, keypair_count as u64);
|
||||
fund_keypairs(client, funding_key, &keypairs, extra, lamports_per_account)?;
|
||||
|
||||
// 'generate_keypairs' generates extra keys to be able to have size-aligned funding batches for fund_keys.
|
||||
keypairs.truncate(keypair_count);
|
||||
|
||||
Ok(keypairs)
|
||||
}
|
||||
|
||||
pub fn fund_keypairs<T: 'static + BenchTpsClient + Send + Sync>(
|
||||
client: Arc<T>,
|
||||
funding_key: &Keypair,
|
||||
keypairs: &[Keypair],
|
||||
extra: u64,
|
||||
lamports_per_account: u64,
|
||||
) -> Result<()> {
|
||||
let rent = client.get_minimum_balance_for_rent_exemption(0)?;
|
||||
info!("Get lamports...");
|
||||
|
||||
// Sample the first keypair, to prevent lamport loss on repeated solana-bench-tps executions
|
||||
|
@ -901,7 +852,7 @@ pub fn generate_and_fund_keypairs<T: 'static + Client + Send + Sync>(
|
|||
let first_keypair_balance = client.get_balance(&first_key).unwrap_or(0);
|
||||
|
||||
// Sample the last keypair, to check if funding was already completed
|
||||
let last_key = keypairs[keypair_count - 1].pubkey();
|
||||
let last_key = keypairs[keypairs.len() - 1].pubkey();
|
||||
let last_keypair_balance = client.get_balance(&last_key).unwrap_or(0);
|
||||
|
||||
// Repeated runs will eat up keypair balances from transaction fees. In order to quickly
|
||||
|
@ -930,24 +881,35 @@ pub fn generate_and_fund_keypairs<T: 'static + Client + Send + Sync>(
|
|||
funding_key_balance, max_fee, lamports_per_account, extra, total
|
||||
);
|
||||
|
||||
if client.get_balance(&funding_key.pubkey()).unwrap_or(0) < total {
|
||||
airdrop_lamports(client.as_ref(), &faucet_addr.unwrap(), funding_key, total)?;
|
||||
if funding_key_balance < total + rent {
|
||||
error!(
|
||||
"funder has {}, needed {}",
|
||||
Sol(funding_key_balance),
|
||||
Sol(total)
|
||||
);
|
||||
let latest_blockhash = get_latest_blockhash(client.as_ref());
|
||||
if client
|
||||
.request_airdrop_with_blockhash(
|
||||
&funding_key.pubkey(),
|
||||
total + rent - funding_key_balance,
|
||||
&latest_blockhash,
|
||||
)
|
||||
.is_err()
|
||||
{
|
||||
return Err(BenchTpsError::AirdropFailure);
|
||||
}
|
||||
}
|
||||
|
||||
fund_keys(
|
||||
client,
|
||||
funding_key,
|
||||
&keypairs,
|
||||
keypairs,
|
||||
total,
|
||||
max_fee,
|
||||
lamports_per_account,
|
||||
);
|
||||
}
|
||||
|
||||
// 'generate_keypairs' generates extra keys to be able to have size-aligned funding batches for fund_keys.
|
||||
keypairs.truncate(keypair_count);
|
||||
|
||||
Ok(keypairs)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -956,14 +918,14 @@ mod tests {
|
|||
super::*,
|
||||
solana_runtime::{bank::Bank, bank_client::BankClient},
|
||||
solana_sdk::{
|
||||
client::SyncClient, fee_calculator::FeeRateGovernor,
|
||||
genesis_config::create_genesis_config,
|
||||
fee_calculator::FeeRateGovernor, genesis_config::create_genesis_config,
|
||||
native_token::sol_to_lamports,
|
||||
},
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn test_bench_tps_bank_client() {
|
||||
let (genesis_config, id) = create_genesis_config(10_000);
|
||||
let (genesis_config, id) = create_genesis_config(sol_to_lamports(10_000.0));
|
||||
let bank = Bank::new_for_tests(&genesis_config);
|
||||
let client = Arc::new(BankClient::new(bank));
|
||||
|
||||
|
@ -976,48 +938,49 @@ mod tests {
|
|||
|
||||
let keypair_count = config.tx_count * config.keypair_multiplier;
|
||||
let keypairs =
|
||||
generate_and_fund_keypairs(client.clone(), None, &config.id, keypair_count, 20)
|
||||
.unwrap();
|
||||
generate_and_fund_keypairs(client.clone(), &config.id, keypair_count, 20).unwrap();
|
||||
|
||||
do_bench_tps(client, config, keypairs);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bench_tps_fund_keys() {
|
||||
let (genesis_config, id) = create_genesis_config(10_000);
|
||||
let (genesis_config, id) = create_genesis_config(sol_to_lamports(10_000.0));
|
||||
let bank = Bank::new_for_tests(&genesis_config);
|
||||
let client = Arc::new(BankClient::new(bank));
|
||||
let keypair_count = 20;
|
||||
let lamports = 20;
|
||||
let rent = client.get_minimum_balance_for_rent_exemption(0).unwrap();
|
||||
|
||||
let keypairs =
|
||||
generate_and_fund_keypairs(client.clone(), None, &id, keypair_count, lamports).unwrap();
|
||||
generate_and_fund_keypairs(client.clone(), &id, keypair_count, lamports).unwrap();
|
||||
|
||||
for kp in &keypairs {
|
||||
assert_eq!(
|
||||
client
|
||||
.get_balance_with_commitment(&kp.pubkey(), CommitmentConfig::processed())
|
||||
.unwrap(),
|
||||
lamports
|
||||
lamports + rent
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bench_tps_fund_keys_with_fees() {
|
||||
let (mut genesis_config, id) = create_genesis_config(10_000);
|
||||
let (mut genesis_config, id) = create_genesis_config(sol_to_lamports(10_000.0));
|
||||
let fee_rate_governor = FeeRateGovernor::new(11, 0);
|
||||
genesis_config.fee_rate_governor = fee_rate_governor;
|
||||
let bank = Bank::new_for_tests(&genesis_config);
|
||||
let client = Arc::new(BankClient::new(bank));
|
||||
let keypair_count = 20;
|
||||
let lamports = 20;
|
||||
let rent = client.get_minimum_balance_for_rent_exemption(0).unwrap();
|
||||
|
||||
let keypairs =
|
||||
generate_and_fund_keypairs(client.clone(), None, &id, keypair_count, lamports).unwrap();
|
||||
generate_and_fund_keypairs(client.clone(), &id, keypair_count, lamports).unwrap();
|
||||
|
||||
for kp in &keypairs {
|
||||
assert_eq!(client.get_balance(&kp.pubkey()).unwrap(), lamports);
|
||||
assert_eq!(client.get_balance(&kp.pubkey()).unwrap(), lamports + rent);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
use {
|
||||
solana_client::{client_error::ClientError, tpu_client::TpuSenderError},
|
||||
solana_sdk::{
|
||||
commitment_config::CommitmentConfig, epoch_info::EpochInfo, hash::Hash, message::Message,
|
||||
pubkey::Pubkey, signature::Signature, transaction::Transaction, transport::TransportError,
|
||||
},
|
||||
thiserror::Error,
|
||||
};
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum BenchTpsError {
|
||||
#[error("Airdrop failure")]
|
||||
AirdropFailure,
|
||||
#[error("IO error: {0:?}")]
|
||||
IoError(#[from] std::io::Error),
|
||||
#[error("Client error: {0:?}")]
|
||||
ClientError(#[from] ClientError),
|
||||
#[error("TpuClient error: {0:?}")]
|
||||
TpuSenderError(#[from] TpuSenderError),
|
||||
#[error("Transport error: {0:?}")]
|
||||
TransportError(#[from] TransportError),
|
||||
#[error("Custom error: {0}")]
|
||||
Custom(String),
|
||||
}
|
||||
|
||||
pub(crate) type Result<T> = std::result::Result<T, BenchTpsError>;
|
||||
|
||||
pub trait BenchTpsClient {
|
||||
/// Send a signed transaction without confirmation
|
||||
fn send_transaction(&self, transaction: Transaction) -> Result<Signature>;
|
||||
|
||||
/// Send a batch of signed transactions without confirmation.
|
||||
fn send_batch(&self, transactions: Vec<Transaction>) -> Result<()>;
|
||||
|
||||
/// Get latest blockhash
|
||||
fn get_latest_blockhash(&self) -> Result<Hash>;
|
||||
|
||||
/// Get latest blockhash and its last valid block height, using explicit commitment
|
||||
fn get_latest_blockhash_with_commitment(
|
||||
&self,
|
||||
commitment_config: CommitmentConfig,
|
||||
) -> Result<(Hash, u64)>;
|
||||
|
||||
/// Get transaction count
|
||||
fn get_transaction_count(&self) -> Result<u64>;
|
||||
|
||||
/// Get transaction count, using explicit commitment
|
||||
fn get_transaction_count_with_commitment(
|
||||
&self,
|
||||
commitment_config: CommitmentConfig,
|
||||
) -> Result<u64>;
|
||||
|
||||
/// Get epoch info
|
||||
fn get_epoch_info(&self) -> Result<EpochInfo>;
|
||||
|
||||
/// Get account balance
|
||||
fn get_balance(&self, pubkey: &Pubkey) -> Result<u64>;
|
||||
|
||||
/// Get account balance, using explicit commitment
|
||||
fn get_balance_with_commitment(
|
||||
&self,
|
||||
pubkey: &Pubkey,
|
||||
commitment_config: CommitmentConfig,
|
||||
) -> Result<u64>;
|
||||
|
||||
/// Calculate the fee for a `Message`
|
||||
fn get_fee_for_message(&self, message: &Message) -> Result<u64>;
|
||||
|
||||
/// Get the rent-exempt minimum for an account
|
||||
fn get_minimum_balance_for_rent_exemption(&self, data_len: usize) -> Result<u64>;
|
||||
|
||||
/// Return the address of client
|
||||
fn addr(&self) -> String;
|
||||
|
||||
/// Request, submit, and confirm an airdrop transaction
|
||||
fn request_airdrop_with_blockhash(
|
||||
&self,
|
||||
pubkey: &Pubkey,
|
||||
lamports: u64,
|
||||
recent_blockhash: &Hash,
|
||||
) -> Result<Signature>;
|
||||
}
|
||||
|
||||
mod bank_client;
|
||||
mod thin_client;
|
|
@ -0,0 +1,85 @@
|
|||
use {
|
||||
crate::bench_tps_client::{BenchTpsClient, BenchTpsError, Result},
|
||||
solana_runtime::bank_client::BankClient,
|
||||
solana_sdk::{
|
||||
client::{AsyncClient, SyncClient},
|
||||
commitment_config::CommitmentConfig,
|
||||
epoch_info::EpochInfo,
|
||||
hash::Hash,
|
||||
message::Message,
|
||||
pubkey::Pubkey,
|
||||
signature::Signature,
|
||||
transaction::Transaction,
|
||||
},
|
||||
};
|
||||
|
||||
impl BenchTpsClient for BankClient {
|
||||
fn send_transaction(&self, transaction: Transaction) -> Result<Signature> {
|
||||
AsyncClient::async_send_transaction(self, transaction).map_err(|err| err.into())
|
||||
}
|
||||
fn send_batch(&self, transactions: Vec<Transaction>) -> Result<()> {
|
||||
AsyncClient::async_send_batch(self, transactions).map_err(|err| err.into())
|
||||
}
|
||||
fn get_latest_blockhash(&self) -> Result<Hash> {
|
||||
SyncClient::get_latest_blockhash(self).map_err(|err| err.into())
|
||||
}
|
||||
|
||||
fn get_latest_blockhash_with_commitment(
|
||||
&self,
|
||||
commitment_config: CommitmentConfig,
|
||||
) -> Result<(Hash, u64)> {
|
||||
SyncClient::get_latest_blockhash_with_commitment(self, commitment_config)
|
||||
.map_err(|err| err.into())
|
||||
}
|
||||
|
||||
fn get_transaction_count(&self) -> Result<u64> {
|
||||
SyncClient::get_transaction_count(self).map_err(|err| err.into())
|
||||
}
|
||||
|
||||
fn get_transaction_count_with_commitment(
|
||||
&self,
|
||||
commitment_config: CommitmentConfig,
|
||||
) -> Result<u64> {
|
||||
SyncClient::get_transaction_count_with_commitment(self, commitment_config)
|
||||
.map_err(|err| err.into())
|
||||
}
|
||||
|
||||
fn get_epoch_info(&self) -> Result<EpochInfo> {
|
||||
SyncClient::get_epoch_info(self).map_err(|err| err.into())
|
||||
}
|
||||
|
||||
fn get_balance(&self, pubkey: &Pubkey) -> Result<u64> {
|
||||
SyncClient::get_balance(self, pubkey).map_err(|err| err.into())
|
||||
}
|
||||
|
||||
fn get_balance_with_commitment(
|
||||
&self,
|
||||
pubkey: &Pubkey,
|
||||
commitment_config: CommitmentConfig,
|
||||
) -> Result<u64> {
|
||||
SyncClient::get_balance_with_commitment(self, pubkey, commitment_config)
|
||||
.map_err(|err| err.into())
|
||||
}
|
||||
|
||||
fn get_fee_for_message(&self, message: &Message) -> Result<u64> {
|
||||
SyncClient::get_fee_for_message(self, message).map_err(|err| err.into())
|
||||
}
|
||||
|
||||
fn get_minimum_balance_for_rent_exemption(&self, data_len: usize) -> Result<u64> {
|
||||
SyncClient::get_minimum_balance_for_rent_exemption(self, data_len).map_err(|err| err.into())
|
||||
}
|
||||
|
||||
fn addr(&self) -> String {
|
||||
"Local BankClient".to_string()
|
||||
}
|
||||
|
||||
fn request_airdrop_with_blockhash(
|
||||
&self,
|
||||
_pubkey: &Pubkey,
|
||||
_lamports: u64,
|
||||
_recent_blockhash: &Hash,
|
||||
) -> Result<Signature> {
|
||||
// BankClient doesn't support airdrops
|
||||
Err(BenchTpsError::AirdropFailure)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,86 @@
|
|||
use {
|
||||
crate::bench_tps_client::{BenchTpsClient, Result},
|
||||
solana_client::thin_client::ThinClient,
|
||||
solana_sdk::{
|
||||
client::{AsyncClient, Client, SyncClient},
|
||||
commitment_config::CommitmentConfig,
|
||||
epoch_info::EpochInfo,
|
||||
hash::Hash,
|
||||
message::Message,
|
||||
pubkey::Pubkey,
|
||||
signature::Signature,
|
||||
transaction::Transaction,
|
||||
},
|
||||
};
|
||||
|
||||
impl BenchTpsClient for ThinClient {
|
||||
fn send_transaction(&self, transaction: Transaction) -> Result<Signature> {
|
||||
AsyncClient::async_send_transaction(self, transaction).map_err(|err| err.into())
|
||||
}
|
||||
fn send_batch(&self, transactions: Vec<Transaction>) -> Result<()> {
|
||||
AsyncClient::async_send_batch(self, transactions).map_err(|err| err.into())
|
||||
}
|
||||
fn get_latest_blockhash(&self) -> Result<Hash> {
|
||||
SyncClient::get_latest_blockhash(self).map_err(|err| err.into())
|
||||
}
|
||||
|
||||
fn get_latest_blockhash_with_commitment(
|
||||
&self,
|
||||
commitment_config: CommitmentConfig,
|
||||
) -> Result<(Hash, u64)> {
|
||||
SyncClient::get_latest_blockhash_with_commitment(self, commitment_config)
|
||||
.map_err(|err| err.into())
|
||||
}
|
||||
|
||||
fn get_transaction_count(&self) -> Result<u64> {
|
||||
SyncClient::get_transaction_count(self).map_err(|err| err.into())
|
||||
}
|
||||
|
||||
fn get_transaction_count_with_commitment(
|
||||
&self,
|
||||
commitment_config: CommitmentConfig,
|
||||
) -> Result<u64> {
|
||||
SyncClient::get_transaction_count_with_commitment(self, commitment_config)
|
||||
.map_err(|err| err.into())
|
||||
}
|
||||
|
||||
fn get_epoch_info(&self) -> Result<EpochInfo> {
|
||||
SyncClient::get_epoch_info(self).map_err(|err| err.into())
|
||||
}
|
||||
|
||||
fn get_balance(&self, pubkey: &Pubkey) -> Result<u64> {
|
||||
SyncClient::get_balance(self, pubkey).map_err(|err| err.into())
|
||||
}
|
||||
|
||||
fn get_balance_with_commitment(
|
||||
&self,
|
||||
pubkey: &Pubkey,
|
||||
commitment_config: CommitmentConfig,
|
||||
) -> Result<u64> {
|
||||
SyncClient::get_balance_with_commitment(self, pubkey, commitment_config)
|
||||
.map_err(|err| err.into())
|
||||
}
|
||||
|
||||
fn get_fee_for_message(&self, message: &Message) -> Result<u64> {
|
||||
SyncClient::get_fee_for_message(self, message).map_err(|err| err.into())
|
||||
}
|
||||
|
||||
fn get_minimum_balance_for_rent_exemption(&self, data_len: usize) -> Result<u64> {
|
||||
SyncClient::get_minimum_balance_for_rent_exemption(self, data_len).map_err(|err| err.into())
|
||||
}
|
||||
|
||||
fn addr(&self) -> String {
|
||||
Client::tpu_addr(self)
|
||||
}
|
||||
|
||||
fn request_airdrop_with_blockhash(
|
||||
&self,
|
||||
pubkey: &Pubkey,
|
||||
lamports: u64,
|
||||
recent_blockhash: &Hash,
|
||||
) -> Result<Signature> {
|
||||
self.rpc_client()
|
||||
.request_airdrop_with_blockhash(pubkey, lamports, recent_blockhash)
|
||||
.map_err(|err| err.into())
|
||||
}
|
||||
}
|
|
@ -1,6 +1,5 @@
|
|||
use {
|
||||
clap::{crate_description, crate_name, App, Arg, ArgMatches},
|
||||
solana_faucet::faucet::FAUCET_PORT,
|
||||
solana_sdk::{
|
||||
fee_calculator::FeeRateGovernor,
|
||||
pubkey::Pubkey,
|
||||
|
@ -14,7 +13,6 @@ const NUM_LAMPORTS_PER_ACCOUNT_DEFAULT: u64 = solana_sdk::native_token::LAMPORTS
|
|||
/// Holds the configuration for a single run of the benchmark
|
||||
pub struct Config {
|
||||
pub entrypoint_addr: SocketAddr,
|
||||
pub faucet_addr: SocketAddr,
|
||||
pub id: Keypair,
|
||||
pub threads: usize,
|
||||
pub num_nodes: usize,
|
||||
|
@ -37,7 +35,6 @@ impl Default for Config {
|
|||
fn default() -> Config {
|
||||
Config {
|
||||
entrypoint_addr: SocketAddr::from(([127, 0, 0, 1], 8001)),
|
||||
faucet_addr: SocketAddr::from(([127, 0, 0, 1], FAUCET_PORT)),
|
||||
id: Keypair::new(),
|
||||
threads: 4,
|
||||
num_nodes: 1,
|
||||
|
@ -76,7 +73,8 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> {
|
|||
.long("faucet")
|
||||
.value_name("HOST:PORT")
|
||||
.takes_value(true)
|
||||
.help("Location of the faucet; defaults to entrypoint:FAUCET_PORT"),
|
||||
.hidden(true)
|
||||
.help("Deprecated. BenchTps no longer queries the faucet directly"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("identity")
|
||||
|
@ -208,13 +206,6 @@ pub fn extract_args(matches: &ArgMatches) -> Config {
|
|||
});
|
||||
}
|
||||
|
||||
if let Some(addr) = matches.value_of("faucet") {
|
||||
args.faucet_addr = solana_net_utils::parse_host_port(addr).unwrap_or_else(|e| {
|
||||
eprintln!("failed to parse faucet address: {}", e);
|
||||
exit(1)
|
||||
});
|
||||
}
|
||||
|
||||
if matches.is_present("identity") {
|
||||
args.id = read_keypair_file(matches.value_of("identity").unwrap())
|
||||
.expect("can't read client identity");
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
#![allow(clippy::integer_arithmetic)]
|
||||
pub mod bench;
|
||||
mod bench_tps_client;
|
||||
pub mod cli;
|
||||
mod perf_utils;
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
use {
|
||||
log::*,
|
||||
solana_bench_tps::{
|
||||
bench::{do_bench_tps, generate_and_fund_keypairs, generate_keypairs},
|
||||
bench::{do_bench_tps, fund_keypairs, generate_and_fund_keypairs, generate_keypairs},
|
||||
cli,
|
||||
},
|
||||
solana_genesis::Base64Account,
|
||||
|
@ -28,7 +28,6 @@ fn main() {
|
|||
|
||||
let cli::Config {
|
||||
entrypoint_addr,
|
||||
faucet_addr,
|
||||
id,
|
||||
num_nodes,
|
||||
tx_count,
|
||||
|
@ -138,19 +137,24 @@ fn main() {
|
|||
// This prevents the amount of storage needed for bench-tps accounts from creeping up
|
||||
// across multiple runs.
|
||||
keypairs.sort_by_key(|x| x.pubkey().to_string());
|
||||
keypairs
|
||||
} else {
|
||||
generate_and_fund_keypairs(
|
||||
fund_keypairs(
|
||||
client.clone(),
|
||||
Some(*faucet_addr),
|
||||
id,
|
||||
keypair_count,
|
||||
*num_lamports_per_account,
|
||||
&keypairs,
|
||||
keypairs.len().saturating_sub(keypair_count) as u64,
|
||||
last_balance,
|
||||
)
|
||||
.unwrap_or_else(|e| {
|
||||
eprintln!("Error could not fund keys: {:?}", e);
|
||||
exit(1);
|
||||
})
|
||||
});
|
||||
keypairs
|
||||
} else {
|
||||
generate_and_fund_keypairs(client.clone(), id, keypair_count, *num_lamports_per_account)
|
||||
.unwrap_or_else(|e| {
|
||||
eprintln!("Error could not fund keys: {:?}", e);
|
||||
exit(1);
|
||||
})
|
||||
};
|
||||
|
||||
do_bench_tps(client, cli_config, keypairs);
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
use {
|
||||
crate::bench_tps_client::BenchTpsClient,
|
||||
log::*,
|
||||
solana_sdk::{client::Client, commitment_config::CommitmentConfig, timing::duration_as_s},
|
||||
solana_sdk::{commitment_config::CommitmentConfig, timing::duration_as_s},
|
||||
std::{
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
|
@ -27,7 +28,7 @@ pub fn sample_txs<T>(
|
|||
sample_period: u64,
|
||||
client: &Arc<T>,
|
||||
) where
|
||||
T: Client,
|
||||
T: BenchTpsClient,
|
||||
{
|
||||
let mut max_tps = 0.0;
|
||||
let mut total_elapsed;
|
||||
|
@ -81,10 +82,7 @@ pub fn sample_txs<T>(
|
|||
elapsed: total_elapsed,
|
||||
txs: total_txs,
|
||||
};
|
||||
sample_stats
|
||||
.write()
|
||||
.unwrap()
|
||||
.push((client.tpu_addr(), stats));
|
||||
sample_stats.write().unwrap().push((client.addr(), stats));
|
||||
return;
|
||||
}
|
||||
sleep(Duration::from_secs(sample_period));
|
|
@ -13,6 +13,7 @@ use {
|
|||
local_cluster::{ClusterConfig, LocalCluster},
|
||||
validator_configs::make_identical_validator_configs,
|
||||
},
|
||||
solana_rpc::rpc::JsonRpcConfig,
|
||||
solana_sdk::signature::{Keypair, Signer},
|
||||
solana_streamer::socket::SocketAddrSpace,
|
||||
std::{sync::Arc, time::Duration},
|
||||
|
@ -22,13 +23,34 @@ fn test_bench_tps_local_cluster(config: Config) {
|
|||
let native_instruction_processors = vec![];
|
||||
|
||||
solana_logger::setup();
|
||||
|
||||
let faucet_keypair = Keypair::new();
|
||||
let faucet_pubkey = faucet_keypair.pubkey();
|
||||
let (addr_sender, addr_receiver) = unbounded();
|
||||
run_local_faucet_with_port(faucet_keypair, addr_sender, None, 0);
|
||||
let faucet_addr = addr_receiver
|
||||
.recv_timeout(Duration::from_secs(2))
|
||||
.expect("run_local_faucet")
|
||||
.expect("faucet_addr");
|
||||
|
||||
const NUM_NODES: usize = 1;
|
||||
let mut validator_config = ValidatorConfig::default_for_test();
|
||||
validator_config.rpc_config = JsonRpcConfig {
|
||||
faucet_addr: Some(faucet_addr),
|
||||
..JsonRpcConfig::default_for_test()
|
||||
};
|
||||
let cluster = LocalCluster::new(
|
||||
&mut ClusterConfig {
|
||||
node_stakes: vec![999_990; NUM_NODES],
|
||||
cluster_lamports: 200_000_000,
|
||||
validator_configs: make_identical_validator_configs(
|
||||
&ValidatorConfig::default_for_test(),
|
||||
&ValidatorConfig {
|
||||
rpc_config: JsonRpcConfig {
|
||||
faucet_addr: Some(faucet_addr),
|
||||
..JsonRpcConfig::default_for_test()
|
||||
},
|
||||
..ValidatorConfig::default_for_test()
|
||||
},
|
||||
NUM_NODES,
|
||||
),
|
||||
native_instruction_processors,
|
||||
|
@ -37,31 +59,18 @@ fn test_bench_tps_local_cluster(config: Config) {
|
|||
SocketAddrSpace::Unspecified,
|
||||
);
|
||||
|
||||
let faucet_keypair = Keypair::new();
|
||||
cluster.transfer(
|
||||
&cluster.funding_keypair,
|
||||
&faucet_keypair.pubkey(),
|
||||
100_000_000,
|
||||
);
|
||||
cluster.transfer(&cluster.funding_keypair, &faucet_pubkey, 100_000_000);
|
||||
|
||||
let client = Arc::new(create_client(
|
||||
cluster.entry_point_info.rpc,
|
||||
cluster.entry_point_info.tpu,
|
||||
));
|
||||
|
||||
let (addr_sender, addr_receiver) = unbounded();
|
||||
run_local_faucet_with_port(faucet_keypair, addr_sender, None, 0);
|
||||
let faucet_addr = addr_receiver
|
||||
.recv_timeout(Duration::from_secs(2))
|
||||
.expect("run_local_faucet")
|
||||
.expect("faucet_addr");
|
||||
|
||||
let lamports_per_account = 100;
|
||||
|
||||
let keypair_count = config.tx_count * config.keypair_multiplier;
|
||||
let keypairs = generate_and_fund_keypairs(
|
||||
client.clone(),
|
||||
Some(faucet_addr),
|
||||
&config.id,
|
||||
keypair_count,
|
||||
lamports_per_account,
|
||||
|
|
|
@ -9,7 +9,6 @@ pub(crate) mod http_sender;
|
|||
pub(crate) mod mock_sender;
|
||||
pub mod nonblocking;
|
||||
pub mod nonce_utils;
|
||||
pub mod perf_utils;
|
||||
pub mod pubsub_client;
|
||||
pub mod quic_client;
|
||||
pub mod rpc_cache;
|
||||
|
|
|
@ -171,7 +171,7 @@ impl ThinClient {
|
|||
&self.tpu_addrs[self.optimizer.best()]
|
||||
}
|
||||
|
||||
fn rpc_client(&self) -> &RpcClient {
|
||||
pub fn rpc_client(&self) -> &RpcClient {
|
||||
&self.rpc_clients[self.optimizer.best()]
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue