Update accounts-cluster-bench to work against public clusters (#30781)

* Move client ctor outside start_sig_clear_thread

* Add TransactionExecutor::new_with_url method and dedupe

* Remove unnecessary pub

* Update airdrop to use RpcClient

* Construct client outside run_accounts_bench

* Add config and url parameters
This commit is contained in:
Tyera 2023-03-20 09:46:06 -06:00 committed by GitHub
parent 4de59881b7
commit cbbae8fcd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 126 additions and 112 deletions

1
Cargo.lock generated
View File

@ -4813,6 +4813,7 @@ dependencies = [
"rayon",
"solana-account-decoder",
"solana-clap-utils",
"solana-cli-config",
"solana-client",
"solana-core",
"solana-faucet",

View File

@ -15,6 +15,7 @@ rand = { workspace = true }
rayon = { workspace = true }
solana-account-decoder = { workspace = true }
solana-clap-utils = { workspace = true }
solana-cli-config = { workspace = true }
solana-client = { workspace = true }
solana-faucet = { workspace = true }
solana-gossip = { workspace = true }

View File

@ -5,9 +5,9 @@ use {
rand::{thread_rng, Rng},
rayon::prelude::*,
solana_account_decoder::parse_token::spl_token_pubkey,
solana_clap_utils::input_parsers::pubkey_of,
solana_clap_utils::{input_parsers::pubkey_of, input_validators::is_url_or_moniker},
solana_cli_config::{ConfigInput, CONFIG_FILE},
solana_client::transaction_executor::TransactionExecutor,
solana_faucet::faucet::{request_airdrop_transaction, FAUCET_PORT},
solana_gossip::gossip_service::discover,
solana_rpc_client::rpc_client::RpcClient,
solana_runtime::inline_spl_token,
@ -17,7 +17,6 @@ use {
instruction::{AccountMeta, Instruction},
message::Message,
pubkey::Pubkey,
rpc_port::DEFAULT_RPC_PORT,
signature::{read_keypair_file, Keypair, Signer},
system_instruction, system_program,
transaction::Transaction,
@ -26,7 +25,6 @@ use {
solana_transaction_status::parse_token::spl_token_instruction,
std::{
cmp::min,
net::{Ipv4Addr, SocketAddr},
process::exit,
sync::{
atomic::{AtomicU64, Ordering},
@ -83,12 +81,7 @@ pub fn poll_get_fee_for_message(client: &RpcClient, message: &mut Message) -> (O
}
}
pub fn airdrop_lamports(
client: &RpcClient,
faucet_addr: &SocketAddr,
id: &Keypair,
desired_balance: u64,
) -> bool {
fn airdrop_lamports(client: &RpcClient, id: &Keypair, desired_balance: u64) -> bool {
let starting_balance = client.get_balance(&id.pubkey()).unwrap_or(0);
info!("starting balance {}", starting_balance);
@ -97,39 +90,19 @@ pub fn airdrop_lamports(
info!(
"Airdropping {:?} lamports from {} for {}",
airdrop_amount,
faucet_addr,
client.url(),
id.pubkey(),
);
let blockhash = poll_get_latest_blockhash(client);
match request_airdrop_transaction(
faucet_addr,
&id.pubkey(),
airdrop_amount,
blockhash.unwrap(),
) {
Ok(transaction) => {
let mut tries = 0;
loop {
tries += 1;
let result = client.send_and_confirm_transaction(&transaction);
if result.is_ok() {
break;
}
if tries >= 5 {
let blockhash = client.get_latest_blockhash().unwrap();
if let Err(err) =
client.request_airdrop_with_blockhash(&id.pubkey(), airdrop_amount, &blockhash)
{
panic!(
"Error requesting airdrop: to addr: {faucet_addr:?} amount: {airdrop_amount} {result:?}"
)
}
}
}
Err(err) => {
panic!(
"Error requesting airdrop: {err:?} to addr: {faucet_addr:?} amount: {airdrop_amount}"
"Error requesting airdrop: {err:?} to addr: {0:?} amount: {airdrop_amount}",
id.pubkey()
);
}
};
let current_balance = client.get_balance(&id.pubkey()).unwrap_or_else(|e| {
panic!("airdrop error {e}");
@ -256,8 +229,7 @@ fn make_close_message(
#[allow(clippy::too_many_arguments)]
fn run_accounts_bench(
entrypoint_addr: SocketAddr,
faucet_addr: SocketAddr,
client: Arc<RpcClient>,
payer_keypairs: &[&Keypair],
iterations: usize,
maybe_space: Option<u64>,
@ -269,10 +241,7 @@ fn run_accounts_bench(
reclaim_accounts: bool,
) {
assert!(num_instructions > 0);
let client =
RpcClient::new_socket_with_commitment(entrypoint_addr, CommitmentConfig::confirmed());
info!("Targeting {}", entrypoint_addr);
info!("Targeting {}", client.url());
let mut latest_blockhash = Instant::now();
let mut last_log = Instant::now();
@ -303,7 +272,7 @@ fn run_accounts_bench(
info!("Starting balance(s): {:?}", balances);
let executor = TransactionExecutor::new(entrypoint_addr);
let executor = TransactionExecutor::new_with_rpc_client(client.clone());
// Create and close messages both require 2 signatures, fake a 2 signature message to calculate fees
let mut message = Message::new(
@ -344,12 +313,7 @@ fn run_accounts_bench(
"Balance {} is less than needed: {}, doing airdrop...",
balance, lamports
);
if !airdrop_lamports(
&client,
&faucet_addr,
payer_keypairs[i],
lamports * 100_000,
) {
if !airdrop_lamports(&client, payer_keypairs[i], lamports * 100_000) {
warn!("failed airdrop, exiting");
return;
}
@ -445,7 +409,7 @@ fn run_accounts_bench(
executor.close();
if reclaim_accounts {
let executor = TransactionExecutor::new(entrypoint_addr);
let executor = TransactionExecutor::new_with_rpc_client(client.clone());
loop {
let max_closed_seed = seed_tracker.max_closed.load(Ordering::Relaxed);
let max_created_seed = seed_tracker.max_created.load(Ordering::Relaxed);
@ -526,11 +490,38 @@ fn main() {
let matches = App::new(crate_name!())
.about(crate_description!())
.version(solana_version::version!())
.arg({
let arg = Arg::with_name("config_file")
.short("C")
.long("config")
.value_name("FILEPATH")
.takes_value(true)
.help("Configuration file to use");
if let Some(ref config_file) = *CONFIG_FILE {
arg.default_value(config_file)
} else {
arg
}
})
.arg(
Arg::with_name("json_rpc_url")
.short("u")
.long("url")
.value_name("URL_OR_MONIKER")
.takes_value(true)
.validator(is_url_or_moniker)
.conflicts_with("entrypoint")
.help(
"URL for Solana's JSON RPC or moniker (or their first letter): \
[mainnet-beta, testnet, devnet, localhost]",
),
)
.arg(
Arg::with_name("entrypoint")
.long("entrypoint")
.takes_value(true)
.value_name("HOST:PORT")
.conflicts_with("json_rpc_url")
.help("RPC entrypoint address. Usually <ip>:8899"),
)
.arg(
@ -538,6 +529,7 @@ fn main() {
.long("faucet")
.takes_value(true)
.value_name("HOST:PORT")
.hidden(true)
.help("Faucet entrypoint address. Usually <ip>:9900"),
)
.arg(
@ -617,22 +609,6 @@ fn main() {
let skip_gossip = !matches.is_present("check_gossip");
let port = if skip_gossip { DEFAULT_RPC_PORT } else { 8001 };
let mut entrypoint_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, port));
if let Some(addr) = matches.value_of("entrypoint") {
entrypoint_addr = solana_net_utils::parse_host_port(addr).unwrap_or_else(|e| {
eprintln!("failed to parse entrypoint address: {e}");
exit(1)
});
}
let mut faucet_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, FAUCET_PORT));
if let Some(addr) = matches.value_of("faucet_addr") {
faucet_addr = solana_net_utils::parse_host_port(addr).unwrap_or_else(|e| {
eprintln!("failed to parse entrypoint address: {e}");
exit(1)
});
}
let space = value_t!(matches, "space", u64).ok();
let lamports = value_t!(matches, "lamports", u64).ok();
let batch_size = value_t!(matches, "batch_size", usize).unwrap_or(4);
@ -658,6 +634,12 @@ fn main() {
payer_keypair_refs.push(keypair);
}
let client = if let Some(addr) = matches.value_of("entrypoint") {
let entrypoint_addr = solana_net_utils::parse_host_port(addr).unwrap_or_else(|e| {
eprintln!("failed to parse entrypoint address: {e}");
exit(1)
});
let rpc_addr = if !skip_gossip {
info!("Finding cluster entry: {:?}", entrypoint_addr);
let (gossip_nodes, _validators) = discover(
@ -683,9 +665,28 @@ fn main() {
entrypoint_addr
};
run_accounts_bench(
Arc::new(RpcClient::new_socket_with_commitment(
rpc_addr,
faucet_addr,
CommitmentConfig::confirmed(),
))
} else {
let config = if let Some(config_file) = matches.value_of("config_file") {
solana_cli_config::Config::load(config_file).unwrap_or_default()
} else {
solana_cli_config::Config::default()
};
let (_, json_rpc_url) = ConfigInput::compute_json_rpc_url_setting(
matches.value_of("json_rpc_url").unwrap_or(""),
&config.json_rpc_url,
);
Arc::new(RpcClient::new_with_commitment(
json_rpc_url,
CommitmentConfig::confirmed(),
))
};
run_accounts_bench(
client,
&payer_keypair_refs,
iterations,
space,
@ -730,7 +731,6 @@ pub mod test {
..ClusterConfig::default()
};
let faucet_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 9900));
let cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);
let iterations = 10;
let maybe_space = None;
@ -739,9 +739,13 @@ pub mod test {
let maybe_lamports = None;
let num_instructions = 2;
let mut start = Measure::start("total accounts run");
let rpc_addr = cluster.entry_point_info.rpc().unwrap();
let client = Arc::new(RpcClient::new_socket_with_commitment(
rpc_addr,
CommitmentConfig::confirmed(),
));
run_accounts_bench(
cluster.entry_point_info.rpc().unwrap(),
faucet_addr,
client,
&[&cluster.funding_keypair],
iterations,
maybe_space,
@ -768,8 +772,10 @@ pub mod test {
Some(faucet_addr),
SocketAddrSpace::Unspecified,
);
let rpc_client =
RpcClient::new_with_commitment(test_validator.rpc_url(), CommitmentConfig::processed());
let rpc_client = Arc::new(RpcClient::new_with_commitment(
test_validator.rpc_url(),
CommitmentConfig::processed(),
));
// Created funder
let funder = Keypair::new();
@ -837,12 +843,7 @@ pub mod test {
let keypair1 = Keypair::new();
let keypair2 = Keypair::new();
run_accounts_bench(
test_validator
.rpc_url()
.replace("http://", "")
.parse()
.unwrap(),
faucet_addr,
rpc_client,
&[&keypair0, &keypair1, &keypair2],
iterations,
Some(account_len as u64),

View File

@ -27,17 +27,31 @@ pub struct TransactionExecutor {
cleared: Arc<RwLock<Vec<u64>>>,
exit: Arc<AtomicBool>,
counter: AtomicU64,
client: RpcClient,
client: Arc<RpcClient>,
}
impl TransactionExecutor {
pub fn new(entrypoint_addr: SocketAddr) -> Self {
let client = Arc::new(RpcClient::new_socket_with_commitment(
entrypoint_addr,
CommitmentConfig::confirmed(),
));
Self::new_with_rpc_client(client)
}
pub fn new_with_url<U: ToString>(url: U) -> Self {
let client = Arc::new(RpcClient::new_with_commitment(
url,
CommitmentConfig::confirmed(),
));
Self::new_with_rpc_client(client)
}
pub fn new_with_rpc_client(client: Arc<RpcClient>) -> Self {
let sigs = Arc::new(RwLock::new(Vec::new()));
let cleared = Arc::new(RwLock::new(Vec::new()));
let exit = Arc::new(AtomicBool::new(false));
let sig_clear_t = Self::start_sig_clear_thread(&exit, &sigs, &cleared, entrypoint_addr);
let client =
RpcClient::new_socket_with_commitment(entrypoint_addr, CommitmentConfig::confirmed());
let sig_clear_t = Self::start_sig_clear_thread(&exit, &sigs, &cleared, &client);
Self {
sigs,
cleared,
@ -85,18 +99,15 @@ impl TransactionExecutor {
exit: &Arc<AtomicBool>,
sigs: &Arc<RwLock<PendingQueue>>,
cleared: &Arc<RwLock<Vec<u64>>>,
entrypoint_addr: SocketAddr,
client: &Arc<RpcClient>,
) -> JoinHandle<()> {
let sigs = sigs.clone();
let exit = exit.clone();
let cleared = cleared.clone();
let client = client.clone();
Builder::new()
.name("solSigClear".to_string())
.spawn(move || {
let client = RpcClient::new_socket_with_commitment(
entrypoint_addr,
CommitmentConfig::confirmed(),
);
let mut success = 0;
let mut error_count = 0;
let mut timed_out = 0;