diff --git a/Cargo.lock b/Cargo.lock index f9f5dbbc5..06d28868c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6420,6 +6420,7 @@ dependencies = [ "solana-ledger", "solana-logger", "solana-pubsub-client", + "solana-quic-client", "solana-rpc-client", "solana-rpc-client-api", "solana-runtime", diff --git a/bench-tps/tests/bench_tps.rs b/bench-tps/tests/bench_tps.rs index 7a2b0fe20..bfff1f7e1 100644 --- a/bench-tps/tests/bench_tps.rs +++ b/bench-tps/tests/bench_tps.rs @@ -7,13 +7,11 @@ use { cli::{Config, InstructionPaddingConfig}, send_batch::generate_durable_nonce_accounts, }, - solana_client::{ - connection_cache::ConnectionCache, - tpu_client::{TpuClient, TpuClientConfig}, - }, + solana_client::tpu_client::{TpuClient, TpuClientConfig}, solana_core::validator::ValidatorConfig, solana_faucet::faucet::run_local_faucet, solana_local_cluster::{ + cluster::Cluster, local_cluster::{ClusterConfig, LocalCluster}, validator_configs::make_identical_validator_configs, }, @@ -78,24 +76,9 @@ fn test_bench_tps_local_cluster(config: Config) { cluster.transfer(&cluster.funding_keypair, &faucet_pubkey, 100_000_000); - let ConnectionCache::Quic(cache) = &*cluster.connection_cache else { - panic!("Expected a Quic ConnectionCache."); - }; - - let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap()); - let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap()); - - let client = Arc::new( - TpuClient::new_with_connection_cache( - Arc::new(RpcClient::new(rpc_url)), - rpc_pubsub_url.as_str(), - TpuClientConfig::default(), - cache.clone(), - ) - .unwrap_or_else(|err| { - panic!("Could not create TpuClient {err:?}"); - }), - ); + let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| { + panic!("Could not create TpuClient with Quic Cache {err:?}"); + })); let lamports_per_account = 100; diff --git a/client/src/tpu_client.rs b/client/src/tpu_client.rs index 038dd8677..555d3aad8 100644 --- a/client/src/tpu_client.rs +++ b/client/src/tpu_client.rs @@ -21,6 +21,8 @@ pub use { solana_tpu_client::tpu_client::{TpuClientConfig, DEFAULT_FANOUT_SLOTS, MAX_FANOUT_SLOTS}, }; +pub type QuicTpuClient = TpuClient; + pub enum TpuClientWrapper { Quic(TpuClient), Udp(TpuClient), diff --git a/dos/src/main.rs b/dos/src/main.rs index 3bf7cce0e..15874a869 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -818,7 +818,7 @@ fn main() { pub mod test { use { super::*, - solana_client::tpu_client::TpuClient, + solana_client::tpu_client::QuicTpuClient, solana_core::validator::ValidatorConfig, solana_faucet::faucet::run_local_faucet, solana_gossip::contact_info::LegacyContactInfo, @@ -827,10 +827,8 @@ pub mod test { local_cluster::{ClusterConfig, LocalCluster}, validator_configs::make_identical_validator_configs, }, - solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}, solana_rpc::rpc::JsonRpcConfig, solana_sdk::timing::timestamp, - solana_tpu_client::tpu_client::TpuClientConfig, }; const TEST_SEND_BATCH_SIZE: usize = 1; @@ -838,32 +836,7 @@ pub mod test { // thin wrapper for the run_dos function // to avoid specifying everywhere generic parameters fn run_dos_no_client(nodes: &[ContactInfo], iterations: usize, params: DosClientParameters) { - run_dos::>( - nodes, iterations, None, params, - ); - } - - fn build_tpu_quic_client( - cluster: &LocalCluster, - ) -> Arc> { - let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap()); - let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap()); - - let ConnectionCache::Quic(cache) = &*cluster.connection_cache else { - panic!("Expected a Quic ConnectionCache."); - }; - - Arc::new( - TpuClient::new_with_connection_cache( - Arc::new(RpcClient::new(rpc_url)), - rpc_pubsub_url.as_str(), - TpuClientConfig::default(), - cache.clone(), - ) - .unwrap_or_else(|err| { - panic!("Could not create TpuClient with Quic Cache {err:?}"); - }), - ) + run_dos::(nodes, iterations, None, params); } #[test] @@ -1003,7 +976,9 @@ pub mod test { .unwrap(); let nodes_slice = [node]; - let client = build_tpu_quic_client(&cluster); + let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| { + panic!("Could not create TpuClient with Quic Cache {err:?}"); + })); // creates one transaction with 8 valid signatures and sends it 10 times run_dos( @@ -1135,7 +1110,9 @@ pub mod test { .unwrap(); let nodes_slice = [node]; - let client = build_tpu_quic_client(&cluster); + let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| { + panic!("Could not create TpuClient with Quic Cache {err:?}"); + })); // creates one transaction and sends it 10 times // this is done in single thread diff --git a/local-cluster/Cargo.toml b/local-cluster/Cargo.toml index 4248fc029..07b300302 100644 --- a/local-cluster/Cargo.toml +++ b/local-cluster/Cargo.toml @@ -24,6 +24,7 @@ solana-gossip = { workspace = true } solana-ledger = { workspace = true } solana-logger = { workspace = true } solana-pubsub-client = { workspace = true } +solana-quic-client = { workspace = true } solana-rpc-client = { workspace = true } solana-rpc-client-api = { workspace = true } solana-runtime = { workspace = true } diff --git a/local-cluster/src/cluster.rs b/local-cluster/src/cluster.rs index 03ec1b7ab..425f65c48 100644 --- a/local-cluster/src/cluster.rs +++ b/local-cluster/src/cluster.rs @@ -1,11 +1,11 @@ use { - solana_client::thin_client::ThinClient, + solana_client::{thin_client::ThinClient, tpu_client::QuicTpuClient}, solana_core::validator::{Validator, ValidatorConfig}, solana_gossip::{cluster_info::Node, contact_info::ContactInfo}, solana_ledger::shred::Shred, - solana_sdk::{pubkey::Pubkey, signature::Keypair}, + solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Keypair}, solana_streamer::socket::SocketAddrSpace, - std::{path::PathBuf, sync::Arc}, + std::{io::Result, path::PathBuf, sync::Arc}, }; pub struct ValidatorInfo { @@ -38,6 +38,11 @@ impl ClusterValidatorInfo { pub trait Cluster { fn get_node_pubkeys(&self) -> Vec; fn get_validator_client(&self, pubkey: &Pubkey) -> Option; + fn build_tpu_quic_client(&self) -> Result; + fn build_tpu_quic_client_with_commitment( + &self, + commitment_config: CommitmentConfig, + ) -> Result; fn get_contact_info(&self, pubkey: &Pubkey) -> Option<&ContactInfo>; fn exit_node(&mut self, pubkey: &Pubkey) -> ClusterValidatorInfo; fn restart_node( diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 9d1b483d8..400f4f73f 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -7,7 +7,12 @@ use { itertools::izip, log::*, solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs, - solana_client::{connection_cache::ConnectionCache, thin_client::ThinClient}, + solana_client::{ + connection_cache::ConnectionCache, + rpc_client::RpcClient, + thin_client::ThinClient, + tpu_client::{QuicTpuClient, TpuClient, TpuClientConfig}, + }, solana_core::{ consensus::tower_storage::FileTowerStorage, validator::{Validator, ValidatorConfig, ValidatorStartProgress}, @@ -802,6 +807,34 @@ impl LocalCluster { ..SnapshotConfig::new_load_only() } } + + fn build_tpu_client(&self, rpc_client_builder: F) -> Result + where + F: FnOnce(String) -> Arc, + { + let rpc_pubsub_url = format!("ws://{}/", self.entry_point_info.rpc_pubsub().unwrap()); + let rpc_url = format!("http://{}", self.entry_point_info.rpc().unwrap()); + + let cache = match &*self.connection_cache { + ConnectionCache::Quic(cache) => cache, + ConnectionCache::Udp(_) => { + return Err(Error::new( + ErrorKind::Other, + "Expected a Quic ConnectionCache. Got UDP", + )) + } + }; + + let tpu_client = TpuClient::new_with_connection_cache( + rpc_client_builder(rpc_url), + rpc_pubsub_url.as_str(), + TpuClientConfig::default(), + cache.clone(), + ) + .map_err(|err| Error::new(ErrorKind::Other, format!("TpuSenderError: {}", err)))?; + + Ok(tpu_client) + } } impl Cluster for LocalCluster { @@ -820,6 +853,19 @@ impl Cluster for LocalCluster { }) } + fn build_tpu_quic_client(&self) -> Result { + self.build_tpu_client(|rpc_url| Arc::new(RpcClient::new(rpc_url))) + } + + fn build_tpu_quic_client_with_commitment( + &self, + commitment_config: CommitmentConfig, + ) -> Result { + self.build_tpu_client(|rpc_url| { + Arc::new(RpcClient::new_with_commitment(rpc_url, commitment_config)) + }) + } + fn exit_node(&mut self, pubkey: &Pubkey) -> ClusterValidatorInfo { let mut node = self.validators.remove(pubkey).unwrap();