add in method for building a `TpuClient` for `LocalCluster` tests (#258)

* add in method for building a TpuClient for LocalCluster tests

* add cluster trait. leave dependency on solana_client::tpu_client
This commit is contained in:
Greg Cusack 2024-03-18 17:58:11 -07:00 committed by GHA: Update Upstream From Fork
parent 21eff36754
commit f35bda5067
7 changed files with 72 additions and 57 deletions

1
Cargo.lock generated
View File

@ -6334,6 +6334,7 @@ dependencies = [
"solana-ledger",
"solana-logger",
"solana-pubsub-client",
"solana-quic-client",
"solana-rpc-client",
"solana-rpc-client-api",
"solana-runtime",

View File

@ -7,13 +7,11 @@ use {
cli::{Config, InstructionPaddingConfig},
send_batch::generate_durable_nonce_accounts,
},
solana_client::{
connection_cache::ConnectionCache,
tpu_client::{TpuClient, TpuClientConfig},
},
solana_client::tpu_client::{TpuClient, TpuClientConfig},
solana_core::validator::ValidatorConfig,
solana_faucet::faucet::run_local_faucet,
solana_local_cluster::{
cluster::Cluster,
local_cluster::{ClusterConfig, LocalCluster},
validator_configs::make_identical_validator_configs,
},
@ -78,24 +76,9 @@ fn test_bench_tps_local_cluster(config: Config) {
cluster.transfer(&cluster.funding_keypair, &faucet_pubkey, 100_000_000);
let ConnectionCache::Quic(cache) = &*cluster.connection_cache else {
panic!("Expected a Quic ConnectionCache.");
};
let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap());
let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap());
let client = Arc::new(
TpuClient::new_with_connection_cache(
Arc::new(RpcClient::new(rpc_url)),
rpc_pubsub_url.as_str(),
TpuClientConfig::default(),
cache.clone(),
)
.unwrap_or_else(|err| {
panic!("Could not create TpuClient {err:?}");
}),
);
let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}));
let lamports_per_account = 100;

View File

@ -21,6 +21,8 @@ pub use {
solana_tpu_client::tpu_client::{TpuClientConfig, DEFAULT_FANOUT_SLOTS, MAX_FANOUT_SLOTS},
};
pub type QuicTpuClient = TpuClient<QuicPool, QuicConnectionManager, QuicConfig>;
pub enum TpuClientWrapper {
Quic(TpuClient<QuicPool, QuicConnectionManager, QuicConfig>),
Udp(TpuClient<UdpPool, UdpConnectionManager, UdpConfig>),

View File

@ -818,7 +818,7 @@ fn main() {
pub mod test {
use {
super::*,
solana_client::tpu_client::TpuClient,
solana_client::tpu_client::QuicTpuClient,
solana_core::validator::ValidatorConfig,
solana_faucet::faucet::run_local_faucet,
solana_gossip::contact_info::LegacyContactInfo,
@ -827,10 +827,8 @@ pub mod test {
local_cluster::{ClusterConfig, LocalCluster},
validator_configs::make_identical_validator_configs,
},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_rpc::rpc::JsonRpcConfig,
solana_sdk::timing::timestamp,
solana_tpu_client::tpu_client::TpuClientConfig,
};
const TEST_SEND_BATCH_SIZE: usize = 1;
@ -838,32 +836,7 @@ pub mod test {
// thin wrapper for the run_dos function
// to avoid specifying everywhere generic parameters
fn run_dos_no_client(nodes: &[ContactInfo], iterations: usize, params: DosClientParameters) {
run_dos::<TpuClient<QuicPool, QuicConnectionManager, QuicConfig>>(
nodes, iterations, None, params,
);
}
fn build_tpu_quic_client(
cluster: &LocalCluster,
) -> Arc<TpuClient<QuicPool, QuicConnectionManager, QuicConfig>> {
let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap());
let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap());
let ConnectionCache::Quic(cache) = &*cluster.connection_cache else {
panic!("Expected a Quic ConnectionCache.");
};
Arc::new(
TpuClient::new_with_connection_cache(
Arc::new(RpcClient::new(rpc_url)),
rpc_pubsub_url.as_str(),
TpuClientConfig::default(),
cache.clone(),
)
.unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}),
)
run_dos::<QuicTpuClient>(nodes, iterations, None, params);
}
#[test]
@ -1003,7 +976,9 @@ pub mod test {
.unwrap();
let nodes_slice = [node];
let client = build_tpu_quic_client(&cluster);
let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}));
// creates one transaction with 8 valid signatures and sends it 10 times
run_dos(
@ -1135,7 +1110,9 @@ pub mod test {
.unwrap();
let nodes_slice = [node];
let client = build_tpu_quic_client(&cluster);
let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}));
// creates one transaction and sends it 10 times
// this is done in single thread

View File

@ -24,6 +24,7 @@ solana-gossip = { workspace = true }
solana-ledger = { workspace = true }
solana-logger = { workspace = true }
solana-pubsub-client = { workspace = true }
solana-quic-client = { workspace = true }
solana-rpc-client = { workspace = true }
solana-rpc-client-api = { workspace = true }
solana-runtime = { workspace = true }

View File

@ -1,11 +1,11 @@
use {
solana_client::thin_client::ThinClient,
solana_client::{thin_client::ThinClient, tpu_client::QuicTpuClient},
solana_core::validator::{Validator, ValidatorConfig},
solana_gossip::{cluster_info::Node, contact_info::ContactInfo},
solana_ledger::shred::Shred,
solana_sdk::{pubkey::Pubkey, signature::Keypair},
solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Keypair},
solana_streamer::socket::SocketAddrSpace,
std::{path::PathBuf, sync::Arc},
std::{io::Result, path::PathBuf, sync::Arc},
};
pub struct ValidatorInfo {
@ -38,6 +38,11 @@ impl ClusterValidatorInfo {
pub trait Cluster {
fn get_node_pubkeys(&self) -> Vec<Pubkey>;
fn get_validator_client(&self, pubkey: &Pubkey) -> Option<ThinClient>;
fn build_tpu_quic_client(&self) -> Result<QuicTpuClient>;
fn build_tpu_quic_client_with_commitment(
&self,
commitment_config: CommitmentConfig,
) -> Result<QuicTpuClient>;
fn get_contact_info(&self, pubkey: &Pubkey) -> Option<&ContactInfo>;
fn exit_node(&mut self, pubkey: &Pubkey) -> ClusterValidatorInfo;
fn restart_node(

View File

@ -7,7 +7,12 @@ use {
itertools::izip,
log::*,
solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs,
solana_client::{connection_cache::ConnectionCache, thin_client::ThinClient},
solana_client::{
connection_cache::ConnectionCache,
rpc_client::RpcClient,
thin_client::ThinClient,
tpu_client::{QuicTpuClient, TpuClient, TpuClientConfig},
},
solana_core::{
consensus::tower_storage::FileTowerStorage,
validator::{Validator, ValidatorConfig, ValidatorStartProgress},
@ -802,6 +807,34 @@ impl LocalCluster {
..SnapshotConfig::new_load_only()
}
}
fn build_tpu_client<F>(&self, rpc_client_builder: F) -> Result<QuicTpuClient>
where
F: FnOnce(String) -> Arc<RpcClient>,
{
let rpc_pubsub_url = format!("ws://{}/", self.entry_point_info.rpc_pubsub().unwrap());
let rpc_url = format!("http://{}", self.entry_point_info.rpc().unwrap());
let cache = match &*self.connection_cache {
ConnectionCache::Quic(cache) => cache,
ConnectionCache::Udp(_) => {
return Err(Error::new(
ErrorKind::Other,
"Expected a Quic ConnectionCache. Got UDP",
))
}
};
let tpu_client = TpuClient::new_with_connection_cache(
rpc_client_builder(rpc_url),
rpc_pubsub_url.as_str(),
TpuClientConfig::default(),
cache.clone(),
)
.map_err(|err| Error::new(ErrorKind::Other, format!("TpuSenderError: {}", err)))?;
Ok(tpu_client)
}
}
impl Cluster for LocalCluster {
@ -820,6 +853,19 @@ impl Cluster for LocalCluster {
})
}
fn build_tpu_quic_client(&self) -> Result<QuicTpuClient> {
self.build_tpu_client(|rpc_url| Arc::new(RpcClient::new(rpc_url)))
}
fn build_tpu_quic_client_with_commitment(
&self,
commitment_config: CommitmentConfig,
) -> Result<QuicTpuClient> {
self.build_tpu_client(|rpc_url| {
Arc::new(RpcClient::new_with_commitment(rpc_url, commitment_config))
})
}
fn exit_node(&mut self, pubkey: &Pubkey) -> ClusterValidatorInfo {
let mut node = self.validators.remove(pubkey).unwrap();