Add TpuClient support to bench-tps (#24227)

* Add fallible send methods, and rpc_client helper

* Add helper to return RpcClient url

* Implement BenchTpsClient for TpuClient

* Add cli rpc and identity handling

* Handle different kinds of clients in main, use TpuClient

* Add tpu_client integration test
This commit is contained in:
Tyera Eulberg 2022-04-12 11:43:29 -04:00 committed by GitHub
parent bdbca3362e
commit 8487030ea6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 466 additions and 101 deletions

3
Cargo.lock generated
View File

@ -4436,6 +4436,8 @@ dependencies = [
"serde_json",
"serde_yaml",
"serial_test",
"solana-clap-utils",
"solana-cli-config",
"solana-client",
"solana-core",
"solana-faucet",
@ -4450,6 +4452,7 @@ dependencies = [
"solana-runtime",
"solana-sdk",
"solana-streamer",
"solana-test-validator",
"solana-version",
"thiserror",
]

View File

@ -15,6 +15,8 @@ log = "0.4.14"
rayon = "1.5.1"
serde_json = "1.0.79"
serde_yaml = "0.8.23"
solana-clap-utils = { path = "../clap-utils", version = "=1.11.0" }
solana-cli-config = { path = "../cli-config", version = "=1.11.0" }
solana-client = { path = "../client", version = "=1.11.0" }
solana-core = { path = "../core", version = "=1.11.0" }
solana-faucet = { path = "../faucet", version = "=1.11.0" }
@ -34,6 +36,7 @@ thiserror = "1.0"
[dev-dependencies]
serial_test = "0.6.0"
solana-local-cluster = { path = "../local-cluster", version = "=1.11.0" }
solana-test-validator = { path = "../test-validator", version = "=1.11.0" }
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]

View File

@ -83,3 +83,4 @@ pub trait BenchTpsClient {
mod bank_client;
mod thin_client;
mod tpu_client;

View File

@ -0,0 +1,99 @@
use {
crate::bench_tps_client::{BenchTpsClient, Result},
solana_client::tpu_client::TpuClient,
solana_sdk::{
commitment_config::CommitmentConfig, epoch_info::EpochInfo, hash::Hash, message::Message,
pubkey::Pubkey, signature::Signature, transaction::Transaction,
},
};
impl BenchTpsClient for TpuClient {
fn send_transaction(&self, transaction: Transaction) -> Result<Signature> {
let signature = transaction.signatures[0];
self.try_send_transaction(&transaction)?;
Ok(signature)
}
fn send_batch(&self, transactions: Vec<Transaction>) -> Result<()> {
for transaction in transactions {
BenchTpsClient::send_transaction(self, transaction)?;
}
Ok(())
}
fn get_latest_blockhash(&self) -> Result<Hash> {
self.rpc_client()
.get_latest_blockhash()
.map_err(|err| err.into())
}
fn get_latest_blockhash_with_commitment(
&self,
commitment_config: CommitmentConfig,
) -> Result<(Hash, u64)> {
self.rpc_client()
.get_latest_blockhash_with_commitment(commitment_config)
.map_err(|err| err.into())
}
fn get_transaction_count(&self) -> Result<u64> {
self.rpc_client()
.get_transaction_count()
.map_err(|err| err.into())
}
fn get_transaction_count_with_commitment(
&self,
commitment_config: CommitmentConfig,
) -> Result<u64> {
self.rpc_client()
.get_transaction_count_with_commitment(commitment_config)
.map_err(|err| err.into())
}
fn get_epoch_info(&self) -> Result<EpochInfo> {
self.rpc_client().get_epoch_info().map_err(|err| err.into())
}
fn get_balance(&self, pubkey: &Pubkey) -> Result<u64> {
self.rpc_client()
.get_balance(pubkey)
.map_err(|err| err.into())
}
fn get_balance_with_commitment(
&self,
pubkey: &Pubkey,
commitment_config: CommitmentConfig,
) -> Result<u64> {
self.rpc_client()
.get_balance_with_commitment(pubkey, commitment_config)
.map(|res| res.value)
.map_err(|err| err.into())
}
fn get_fee_for_message(&self, message: &Message) -> Result<u64> {
self.rpc_client()
.get_fee_for_message(message)
.map_err(|err| err.into())
}
fn get_minimum_balance_for_rent_exemption(&self, data_len: usize) -> Result<u64> {
self.rpc_client()
.get_minimum_balance_for_rent_exemption(data_len)
.map_err(|err| err.into())
}
fn addr(&self) -> String {
self.rpc_client().url()
}
fn request_airdrop_with_blockhash(
&self,
pubkey: &Pubkey,
lamports: u64,
recent_blockhash: &Hash,
) -> Result<Signature> {
self.rpc_client()
.request_airdrop_with_blockhash(pubkey, lamports, recent_blockhash)
.map_err(|err| err.into())
}
}

View File

@ -1,5 +1,7 @@
use {
clap::{crate_description, crate_name, App, Arg, ArgMatches},
solana_clap_utils::input_validators::{is_url, is_url_or_moniker},
solana_cli_config::{ConfigInput, CONFIG_FILE},
solana_sdk::{
fee_calculator::FeeRateGovernor,
pubkey::Pubkey,
@ -10,9 +12,26 @@ use {
const NUM_LAMPORTS_PER_ACCOUNT_DEFAULT: u64 = solana_sdk::native_token::LAMPORTS_PER_SOL;
pub enum ExternalClientType {
// Submits transactions directly to leaders using a ThinClient, broadcasting to multiple
// leaders when num_nodes > 1
ThinClient,
// Submits transactions directly to leaders using a TpuClient, broadcasting to upcoming leaders
// via TpuClient default configuration
TpuClient,
}
impl Default for ExternalClientType {
fn default() -> Self {
Self::ThinClient
}
}
/// Holds the configuration for a single run of the benchmark
pub struct Config {
pub entrypoint_addr: SocketAddr,
pub json_rpc_url: String,
pub websocket_url: String,
pub id: Keypair,
pub threads: usize,
pub num_nodes: usize,
@ -29,12 +48,15 @@ pub struct Config {
pub num_lamports_per_account: u64,
pub target_slots_per_epoch: u64,
pub target_node: Option<Pubkey>,
pub external_client_type: ExternalClientType,
}
impl Default for Config {
fn default() -> Config {
Config {
entrypoint_addr: SocketAddr::from(([127, 0, 0, 1], 8001)),
json_rpc_url: ConfigInput::default().json_rpc_url,
websocket_url: ConfigInput::default().websocket_url,
id: Keypair::new(),
threads: 4,
num_nodes: 1,
@ -51,6 +73,7 @@ impl Default for Config {
num_lamports_per_account: NUM_LAMPORTS_PER_ACCOUNT_DEFAULT,
target_slots_per_epoch: 0,
target_node: None,
external_client_type: ExternalClientType::default(),
}
}
}
@ -59,6 +82,42 @@ impl Default for Config {
pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> {
App::new(crate_name!()).about(crate_description!())
.version(version)
.arg({
let arg = Arg::with_name("config_file")
.short("C")
.long("config")
.value_name("FILEPATH")
.takes_value(true)
.global(true)
.help("Configuration file to use");
if let Some(ref config_file) = *CONFIG_FILE {
arg.default_value(config_file)
} else {
arg
}
})
.arg(
Arg::with_name("json_rpc_url")
.short("u")
.long("url")
.value_name("URL_OR_MONIKER")
.takes_value(true)
.global(true)
.validator(is_url_or_moniker)
.help(
"URL for Solana's JSON RPC or moniker (or their first letter): \
[mainnet-beta, testnet, devnet, localhost]",
),
)
.arg(
Arg::with_name("websocket_url")
.long("ws")
.value_name("URL")
.takes_value(true)
.global(true)
.validator(is_url)
.help("WebSocket URL for the solana cluster"),
)
.arg(
Arg::with_name("entrypoint")
.short("n")
@ -189,6 +248,12 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> {
"Wait until epochs are this many slots long.",
),
)
.arg(
Arg::with_name("tpu_client")
.long("use-tpu-client")
.takes_value(false)
.help("Submit transactions with a TpuClient")
)
}
/// Parses a clap `ArgMatches` structure into a `Config`
@ -199,6 +264,35 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> {
pub fn extract_args(matches: &ArgMatches) -> Config {
let mut args = Config::default();
let config = if let Some(config_file) = matches.value_of("config_file") {
solana_cli_config::Config::load(config_file).unwrap_or_default()
} else {
solana_cli_config::Config::default()
};
let (_, json_rpc_url) = ConfigInput::compute_json_rpc_url_setting(
matches.value_of("json_rpc_url").unwrap_or(""),
&config.json_rpc_url,
);
args.json_rpc_url = json_rpc_url;
let (_, websocket_url) = ConfigInput::compute_websocket_url_setting(
matches.value_of("websocket_url").unwrap_or(""),
&config.websocket_url,
matches.value_of("json_rpc_url").unwrap_or(""),
&config.json_rpc_url,
);
args.websocket_url = websocket_url;
let (_, id_path) = ConfigInput::compute_keypair_path_setting(
matches.value_of("identity").unwrap_or(""),
&config.keypair_path,
);
args.id = read_keypair_file(id_path).expect("could not parse identity path");
if matches.is_present("tpu_client") {
args.external_client_type = ExternalClientType::TpuClient;
}
if let Some(addr) = matches.value_of("entrypoint") {
args.entrypoint_addr = solana_net_utils::parse_host_port(addr).unwrap_or_else(|e| {
eprintln!("failed to parse entrypoint address: {}", e);
@ -206,11 +300,6 @@ pub fn extract_args(matches: &ArgMatches) -> Config {
});
}
if matches.is_present("identity") {
args.id = read_keypair_file(matches.value_of("identity").unwrap())
.expect("can't read client identity");
}
if let Some(t) = matches.value_of("threads") {
args.threads = t.to_string().parse().expect("can't parse threads");
}

72
bench-tps/src/keypairs.rs Normal file
View File

@ -0,0 +1,72 @@
use {
crate::{
bench::{fund_keypairs, generate_and_fund_keypairs},
bench_tps_client::BenchTpsClient,
},
log::*,
solana_genesis::Base64Account,
solana_sdk::signature::{Keypair, Signer},
std::{collections::HashMap, fs::File, path::Path, process::exit, sync::Arc},
};
pub fn get_keypairs<T>(
client: Arc<T>,
id: &Keypair,
keypair_count: usize,
num_lamports_per_account: u64,
client_ids_and_stake_file: &str,
read_from_client_file: bool,
) -> Vec<Keypair>
where
T: 'static + BenchTpsClient + Send + Sync,
{
if read_from_client_file {
let path = Path::new(client_ids_and_stake_file);
let file = File::open(path).unwrap();
info!("Reading {}", client_ids_and_stake_file);
let accounts: HashMap<String, Base64Account> = serde_yaml::from_reader(file).unwrap();
let mut keypairs = vec![];
let mut last_balance = 0;
accounts
.into_iter()
.for_each(|(keypair, primordial_account)| {
let bytes: Vec<u8> = serde_json::from_str(keypair.as_str()).unwrap();
keypairs.push(Keypair::from_bytes(&bytes).unwrap());
last_balance = primordial_account.balance;
});
if keypairs.len() < keypair_count {
eprintln!(
"Expected {} accounts in {}, only received {} (--tx_count mismatch?)",
keypair_count,
client_ids_and_stake_file,
keypairs.len(),
);
exit(1);
}
// Sort keypairs so that do_bench_tps() uses the same subset of accounts for each run.
// This prevents the amount of storage needed for bench-tps accounts from creeping up
// across multiple runs.
keypairs.sort_by_key(|x| x.pubkey().to_string());
fund_keypairs(
client,
id,
&keypairs,
keypairs.len().saturating_sub(keypair_count) as u64,
last_balance,
)
.unwrap_or_else(|e| {
eprintln!("Error could not fund keys: {:?}", e);
exit(1);
});
keypairs
} else {
generate_and_fund_keypairs(client, id, keypair_count, num_lamports_per_account)
.unwrap_or_else(|e| {
eprintln!("Error could not fund keys: {:?}", e);
exit(1);
})
}
}

View File

@ -1,5 +1,6 @@
#![allow(clippy::integer_arithmetic)]
pub mod bench;
mod bench_tps_client;
pub mod bench_tps_client;
pub mod cli;
pub mod keypairs;
mod perf_utils;

View File

@ -2,15 +2,18 @@
use {
log::*,
solana_bench_tps::{
bench::{do_bench_tps, fund_keypairs, generate_and_fund_keypairs, generate_keypairs},
cli,
bench::{do_bench_tps, generate_keypairs},
cli::{self, ExternalClientType},
keypairs::get_keypairs,
},
solana_client::{
rpc_client::RpcClient,
tpu_client::{TpuClient, TpuClientConfig},
},
solana_genesis::Base64Account,
solana_gossip::gossip_service::{discover_cluster, get_client, get_multi_client},
solana_sdk::{
fee_calculator::FeeRateGovernor,
signature::{Keypair, Signer},
system_program,
commitment_config::CommitmentConfig, fee_calculator::FeeRateGovernor, system_program,
},
solana_streamer::socket::SocketAddrSpace,
std::{collections::HashMap, fs::File, io::prelude::*, path::Path, process::exit, sync::Arc},
@ -28,6 +31,8 @@ fn main() {
let cli::Config {
entrypoint_addr,
json_rpc_url,
websocket_url,
id,
num_nodes,
tx_count,
@ -39,6 +44,7 @@ fn main() {
multi_client,
num_lamports_per_account,
target_node,
external_client_type,
..
} = &cli_config;
@ -74,88 +80,72 @@ fn main() {
}
info!("Connecting to the cluster");
let nodes = discover_cluster(entrypoint_addr, *num_nodes, SocketAddrSpace::Unspecified)
.unwrap_or_else(|err| {
eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err);
exit(1);
});
let client = if *multi_client {
let (client, num_clients) = get_multi_client(&nodes, &SocketAddrSpace::Unspecified);
if nodes.len() < num_clients {
eprintln!(
"Error: Insufficient nodes discovered. Expecting {} or more",
num_nodes
);
exit(1);
}
Arc::new(client)
} else if let Some(target_node) = target_node {
info!("Searching for target_node: {:?}", target_node);
let mut target_client = None;
for node in nodes {
if node.id == *target_node {
target_client = Some(Arc::new(get_client(&[node], &SocketAddrSpace::Unspecified)));
break;
}
}
target_client.unwrap_or_else(|| {
eprintln!("Target node {} not found", target_node);
exit(1);
})
} else {
Arc::new(get_client(&nodes, &SocketAddrSpace::Unspecified))
};
let keypairs = if *read_from_client_file {
let path = Path::new(&client_ids_and_stake_file);
let file = File::open(path).unwrap();
info!("Reading {}", client_ids_and_stake_file);
let accounts: HashMap<String, Base64Account> = serde_yaml::from_reader(file).unwrap();
let mut keypairs = vec![];
let mut last_balance = 0;
accounts
.into_iter()
.for_each(|(keypair, primordial_account)| {
let bytes: Vec<u8> = serde_json::from_str(keypair.as_str()).unwrap();
keypairs.push(Keypair::from_bytes(&bytes).unwrap());
last_balance = primordial_account.balance;
});
if keypairs.len() < keypair_count {
eprintln!(
"Expected {} accounts in {}, only received {} (--tx_count mismatch?)",
match external_client_type {
ExternalClientType::ThinClient => {
let nodes = discover_cluster(entrypoint_addr, *num_nodes, SocketAddrSpace::Unspecified)
.unwrap_or_else(|err| {
eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err);
exit(1);
});
let client = if *multi_client {
let (client, num_clients) = get_multi_client(&nodes, &SocketAddrSpace::Unspecified);
if nodes.len() < num_clients {
eprintln!(
"Error: Insufficient nodes discovered. Expecting {} or more",
num_nodes
);
exit(1);
}
Arc::new(client)
} else if let Some(target_node) = target_node {
info!("Searching for target_node: {:?}", target_node);
let mut target_client = None;
for node in nodes {
if node.id == *target_node {
target_client =
Some(Arc::new(get_client(&[node], &SocketAddrSpace::Unspecified)));
break;
}
}
target_client.unwrap_or_else(|| {
eprintln!("Target node {} not found", target_node);
exit(1);
})
} else {
Arc::new(get_client(&nodes, &SocketAddrSpace::Unspecified))
};
let keypairs = get_keypairs(
client.clone(),
id,
keypair_count,
*num_lamports_per_account,
client_ids_and_stake_file,
keypairs.len(),
*read_from_client_file,
);
exit(1);
do_bench_tps(client, cli_config, keypairs);
}
// Sort keypairs so that do_bench_tps() uses the same subset of accounts for each run.
// This prevents the amount of storage needed for bench-tps accounts from creeping up
// across multiple runs.
keypairs.sort_by_key(|x| x.pubkey().to_string());
fund_keypairs(
client.clone(),
id,
&keypairs,
keypairs.len().saturating_sub(keypair_count) as u64,
last_balance,
)
.unwrap_or_else(|e| {
eprintln!("Error could not fund keys: {:?}", e);
exit(1);
});
keypairs
} else {
generate_and_fund_keypairs(client.clone(), id, keypair_count, *num_lamports_per_account)
.unwrap_or_else(|e| {
eprintln!("Error could not fund keys: {:?}", e);
exit(1);
})
};
do_bench_tps(client, cli_config, keypairs);
ExternalClientType::TpuClient => {
let rpc_client = Arc::new(RpcClient::new_with_commitment(
json_rpc_url.to_string(),
CommitmentConfig::confirmed(),
));
let client = Arc::new(
TpuClient::new(rpc_client, websocket_url, TpuClientConfig::default())
.unwrap_or_else(|err| {
eprintln!("Could not create TpuClient {:?}", err);
exit(1);
}),
);
let keypairs = get_keypairs(
client.clone(),
id,
keypair_count,
*num_lamports_per_account,
client_ids_and_stake_file,
*read_from_client_file,
);
do_bench_tps(client, cli_config, keypairs);
}
}
}

View File

@ -6,16 +6,24 @@ use {
bench::{do_bench_tps, generate_and_fund_keypairs},
cli::Config,
},
solana_client::thin_client::create_client,
solana_client::{
rpc_client::RpcClient,
thin_client::create_client,
tpu_client::{TpuClient, TpuClientConfig},
},
solana_core::validator::ValidatorConfig,
solana_faucet::faucet::run_local_faucet_with_port,
solana_faucet::faucet::{run_local_faucet, run_local_faucet_with_port},
solana_local_cluster::{
local_cluster::{ClusterConfig, LocalCluster},
validator_configs::make_identical_validator_configs,
},
solana_rpc::rpc::JsonRpcConfig,
solana_sdk::signature::{Keypair, Signer},
solana_sdk::{
commitment_config::CommitmentConfig,
signature::{Keypair, Signer},
},
solana_streamer::socket::SocketAddrSpace,
solana_test_validator::TestValidator,
std::{sync::Arc, time::Duration},
};
@ -83,6 +91,43 @@ fn test_bench_tps_local_cluster(config: Config) {
assert!(_total > 100);
}
fn test_bench_tps_test_validator(config: Config) {
solana_logger::setup();
let mint_keypair = Keypair::new();
let mint_pubkey = mint_keypair.pubkey();
let faucet_addr = run_local_faucet(mint_keypair, None);
let test_validator =
TestValidator::with_no_fees(mint_pubkey, Some(faucet_addr), SocketAddrSpace::Unspecified);
let rpc_client = Arc::new(RpcClient::new_with_commitment(
test_validator.rpc_url(),
CommitmentConfig::processed(),
));
let websocket_url = test_validator.rpc_pubsub_url();
let client =
Arc::new(TpuClient::new(rpc_client, &websocket_url, TpuClientConfig::default()).unwrap());
let lamports_per_account = 100;
let keypair_count = config.tx_count * config.keypair_multiplier;
let keypairs = generate_and_fund_keypairs(
client.clone(),
&config.id,
keypair_count,
lamports_per_account,
)
.unwrap();
let _total = do_bench_tps(client, config, keypairs);
#[cfg(not(debug_assertions))]
assert!(_total > 100);
}
#[test]
#[serial]
fn test_bench_tps_local_cluster_solana() {
@ -92,3 +137,13 @@ fn test_bench_tps_local_cluster_solana() {
..Config::default()
});
}
#[test]
#[serial]
fn test_bench_tps_tpu_client() {
test_bench_tps_test_validator(Config {
tx_count: 100,
duration: Duration::from_secs(10),
..Config::default()
});
}

View File

@ -197,6 +197,10 @@ impl RpcSender for HttpSender {
return Ok(json["result"].take());
}
}
fn url(&self) -> String {
self.url.clone()
}
}
#[cfg(test)]

View File

@ -470,4 +470,8 @@ impl RpcSender for MockSender {
};
Ok(val)
}
fn url(&self) -> String {
format!("MockSender: {}", self.url)
}
}

View File

@ -502,6 +502,11 @@ impl RpcClient {
Self::new_with_timeout(url, timeout)
}
/// Get the configured url of the client's sender
pub fn url(&self) -> String {
self.sender.url()
}
async fn get_node_version(&self) -> Result<semver::Version, RpcError> {
let r_node_version = self.node_version.read().await;
if let Some(version) = &*r_node_version {

View File

@ -535,6 +535,11 @@ impl RpcClient {
Self::new_with_timeout(url, timeout)
}
/// Get the configured url of the client's sender
pub fn url(&self) -> String {
self.rpc_client.url()
}
/// Get the configured default [commitment level][cl].
///
/// [cl]: https://docs.solana.com/developing/clients/jsonrpc-api#configuring-state-commitment

View File

@ -32,4 +32,5 @@ pub trait RpcSender {
params: serde_json::Value,
) -> Result<serde_json::Value>;
fn get_transport_stats(&self) -> RpcTransportStats;
fn url(&self) -> String;
}

View File

@ -90,20 +90,49 @@ impl TpuClient {
/// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
pub fn send_wire_transaction(&self, wire_transaction: &[u8]) -> bool {
let mut sent = false;
self.try_send_wire_transaction(wire_transaction).is_ok()
}
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size
/// Returns the last error if all sends fail
pub fn try_send_transaction(
&self,
transaction: &Transaction,
) -> std::result::Result<(), std::io::Error> {
let wire_transaction = serialize(transaction).expect("serialization should succeed");
self.try_send_wire_transaction(&wire_transaction)
}
/// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
/// Returns the last error if all sends fail
fn try_send_wire_transaction(
&self,
wire_transaction: &[u8],
) -> std::result::Result<(), std::io::Error> {
let mut last_error: Option<std::io::Error> = None;
let mut some_success = false;
for tpu_address in self
.leader_tpu_service
.leader_tpu_sockets(self.fanout_slots)
{
if self
.send_socket
.send_to(wire_transaction, tpu_address)
.is_ok()
{
sent = true;
let result = self.send_socket.send_to(wire_transaction, tpu_address);
if let Err(err) = result {
last_error = Some(err);
} else {
some_success = true;
}
}
sent
if !some_success {
Err(if let Some(err) = last_error {
err
} else {
std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted")
})
} else {
Ok(())
}
}
/// Create a new client that disconnects when dropped
@ -266,6 +295,10 @@ impl TpuClient {
}
Err(TpuSenderError::Custom("Max retries exceeded".into()))
}
pub fn rpc_client(&self) -> &RpcClient {
&self.rpc_client
}
}
impl Drop for TpuClient {