Remove `ThinClient` from `dos/` (#117)

* remove `ThinClient` from `dos/` and replace `ThinClient` with `TpuClient`

* remove test for valid_client_facing_addr since it is no longer used
This commit is contained in:
Greg Cusack 2024-03-11 18:19:48 -04:00 committed by GHA: Update Upstream From Fork
parent 096a1f4e5c
commit d5c5f06ddb
8 changed files with 92 additions and 100 deletions

4
Cargo.lock generated
View File

@ -5896,11 +5896,11 @@ dependencies = [
"solana-measure",
"solana-net-utils",
"solana-perf",
"solana-quic-client",
"solana-rpc",
"solana-rpc-client",
"solana-sdk",
"solana-streamer",
"solana-thin-client",
"solana-tpu-client",
"solana-version",
]
@ -6105,6 +6105,7 @@ dependencies = [
"solana-bloom",
"solana-clap-utils",
"solana-client",
"solana-connection-cache",
"solana-entry",
"solana-frozen-abi",
"solana-frozen-abi-macro",
@ -6118,7 +6119,6 @@ dependencies = [
"solana-runtime",
"solana-sdk",
"solana-streamer",
"solana-thin-client",
"solana-tpu-client",
"solana-version",
"solana-vote",

View File

@ -13,6 +13,7 @@ use {
transport::Result as TransportResult,
},
solana_tpu_client::tpu_client::{Result, TpuClient as BackendTpuClient},
solana_udp_client::{UdpConfig, UdpConnectionManager, UdpPool},
std::sync::Arc,
};
pub use {
@ -20,6 +21,11 @@ pub use {
solana_tpu_client::tpu_client::{TpuClientConfig, DEFAULT_FANOUT_SLOTS, MAX_FANOUT_SLOTS},
};
pub enum TpuClientWrapper {
Quic(TpuClient<QuicPool, QuicConnectionManager, QuicConfig>),
Udp(TpuClient<UdpPool, UdpConnectionManager, UdpConfig>),
}
/// Client which sends transactions directly to the current leader's TPU port over UDP.
/// The client uses RPC to determine the current leader and fetch node contact info
/// This is just a thin wrapper over the "BackendTpuClient", use that directly for more efficiency.

View File

@ -26,6 +26,7 @@ solana-logger = { workspace = true }
solana-measure = { workspace = true }
solana-net-utils = { workspace = true }
solana-perf = { workspace = true }
solana-quic-client = { workspace = true }
solana-rpc = { workspace = true }
solana-rpc-client = { workspace = true }
solana-sdk = { workspace = true }
@ -38,4 +39,3 @@ targets = ["x86_64-unknown-linux-gnu"]
[dev-dependencies]
solana-local-cluster = { workspace = true }
solana-thin-client = { workspace = true }

View File

@ -46,12 +46,15 @@ use {
log::*,
rand::{thread_rng, Rng},
solana_bench_tps::{bench::generate_and_fund_keypairs, bench_tps_client::BenchTpsClient},
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
solana_client::{
connection_cache::ConnectionCache, tpu_client::TpuClientWrapper,
tpu_connection::TpuConnection,
},
solana_core::repair::serve_repair::{RepairProtocol, RepairRequestHeader, ServeRepair},
solana_dos::cli::*,
solana_gossip::{
contact_info::Protocol,
gossip_service::{discover, get_multi_client},
gossip_service::{discover, get_client},
legacy_contact_info::LegacyContactInfo as ContactInfo,
},
solana_measure::measure::Measure,
@ -791,33 +794,30 @@ fn main() {
DEFAULT_TPU_CONNECTION_POOL_SIZE,
),
};
let (client, num_clients) = get_multi_client(
&validators,
&SocketAddrSpace::Unspecified,
Arc::new(connection_cache),
);
if validators.len() < num_clients {
eprintln!(
"Error: Insufficient nodes discovered. Expecting {} or more",
validators.len()
);
exit(1);
}
(gossip_nodes, Some(Arc::new(client)))
let client = get_client(&validators, Arc::new(connection_cache));
(gossip_nodes, Some(client))
} else {
(vec![], None)
};
info!("done found {} nodes", nodes.len());
run_dos(&nodes, 0, client, cmd_params);
if let Some(tpu_client) = client {
match tpu_client {
TpuClientWrapper::Quic(quic_client) => {
run_dos(&nodes, 0, Some(Arc::new(quic_client)), cmd_params);
}
TpuClientWrapper::Udp(udp_client) => {
run_dos(&nodes, 0, Some(Arc::new(udp_client)), cmd_params);
}
};
}
}
#[cfg(test)]
pub mod test {
use {
super::*,
solana_client::thin_client::ThinClient,
solana_client::tpu_client::TpuClient,
solana_core::validator::ValidatorConfig,
solana_faucet::faucet::run_local_faucet,
solana_gossip::contact_info::LegacyContactInfo,
@ -826,8 +826,10 @@ pub mod test {
local_cluster::{ClusterConfig, LocalCluster},
validator_configs::make_identical_validator_configs,
},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_rpc::rpc::JsonRpcConfig,
solana_sdk::timing::timestamp,
solana_tpu_client::tpu_client::TpuClientConfig,
};
const TEST_SEND_BATCH_SIZE: usize = 1;
@ -835,7 +837,32 @@ pub mod test {
// thin wrapper for the run_dos function
// to avoid specifying everywhere generic parameters
fn run_dos_no_client(nodes: &[ContactInfo], iterations: usize, params: DosClientParameters) {
run_dos::<ThinClient>(nodes, iterations, None, params);
run_dos::<TpuClient<QuicPool, QuicConnectionManager, QuicConfig>>(
nodes, iterations, None, params,
);
}
fn build_tpu_quic_client(
cluster: &LocalCluster,
) -> Arc<TpuClient<QuicPool, QuicConnectionManager, QuicConfig>> {
let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap());
let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap());
let ConnectionCache::Quic(cache) = &*cluster.connection_cache else {
panic!("Expected a Quic ConnectionCache.");
};
Arc::new(
TpuClient::new_with_connection_cache(
Arc::new(RpcClient::new(rpc_url)),
rpc_pubsub_url.as_str(),
TpuClientConfig::default(),
cache.clone(),
)
.unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}),
)
}
#[test]
@ -975,14 +1002,7 @@ pub mod test {
.unwrap();
let nodes_slice = [node];
let client = Arc::new(ThinClient::new(
cluster.entry_point_info.rpc().unwrap(),
cluster
.entry_point_info
.tpu(cluster.connection_cache.protocol())
.unwrap(),
cluster.connection_cache.clone(),
));
let client = build_tpu_quic_client(&cluster);
// creates one transaction with 8 valid signatures and sends it 10 times
run_dos(
@ -1114,14 +1134,7 @@ pub mod test {
.unwrap();
let nodes_slice = [node];
let client = Arc::new(ThinClient::new(
cluster.entry_point_info.rpc().unwrap(),
cluster
.entry_point_info
.tpu(cluster.connection_cache.protocol())
.unwrap(),
cluster.connection_cache.clone(),
));
let client = build_tpu_quic_client(&cluster);
// creates one transaction and sends it 10 times
// this is done in single thread

View File

@ -31,6 +31,7 @@ serde_derive = { workspace = true }
solana-bloom = { workspace = true }
solana-clap-utils = { workspace = true }
solana-client = { workspace = true }
solana-connection-cache = { workspace = true }
solana-entry = { workspace = true }
solana-frozen-abi = { workspace = true }
solana-frozen-abi-macro = { workspace = true }
@ -44,7 +45,6 @@ solana-rayon-threadlimit = { workspace = true }
solana-runtime = { workspace = true }
solana-sdk = { workspace = true }
solana-streamer = { workspace = true }
solana-thin-client = { workspace = true }
solana-tpu-client = { workspace = true }
solana-version = { workspace = true }
solana-vote = { workspace = true }

View File

@ -4,7 +4,11 @@ use {
crate::{cluster_info::ClusterInfo, legacy_contact_info::LegacyContactInfo as ContactInfo},
crossbeam_channel::{unbounded, Sender},
rand::{thread_rng, Rng},
solana_client::{connection_cache::ConnectionCache, thin_client::ThinClient},
solana_client::{
connection_cache::ConnectionCache,
rpc_client::RpcClient,
tpu_client::{TpuClient, TpuClientConfig, TpuClientWrapper},
},
solana_perf::recycler::Recycler,
solana_runtime::bank_forks::BankForks,
solana_sdk::{
@ -197,35 +201,37 @@ pub fn discover(
#[deprecated(since = "1.18.6", note = "Interface will change")]
pub fn get_client(
nodes: &[ContactInfo],
socket_addr_space: &SocketAddrSpace,
connection_cache: Arc<ConnectionCache>,
) -> ThinClient {
let protocol = connection_cache.protocol();
let nodes: Vec<_> = nodes
.iter()
.filter_map(|node| node.valid_client_facing_addr(protocol, socket_addr_space))
.collect();
) -> TpuClientWrapper {
let select = thread_rng().gen_range(0..nodes.len());
let (rpc, tpu) = nodes[select];
ThinClient::new(rpc, tpu, connection_cache)
}
#[deprecated(since = "1.18.6", note = "Will be removed in favor of get_client")]
pub fn get_multi_client(
nodes: &[ContactInfo],
socket_addr_space: &SocketAddrSpace,
connection_cache: Arc<ConnectionCache>,
) -> (ThinClient, usize) {
let protocol = connection_cache.protocol();
let (rpc_addrs, tpu_addrs): (Vec<_>, Vec<_>) = nodes
.iter()
.filter_map(|node| node.valid_client_facing_addr(protocol, socket_addr_space))
.unzip();
let num_nodes = tpu_addrs.len();
(
ThinClient::new_from_addrs(rpc_addrs, tpu_addrs, connection_cache),
num_nodes,
)
let rpc_pubsub_url = format!("ws://{}/", nodes[select].rpc_pubsub().unwrap());
let rpc_url = format!("http://{}", nodes[select].rpc().unwrap());
match &*connection_cache {
ConnectionCache::Quic(cache) => TpuClientWrapper::Quic(
TpuClient::new_with_connection_cache(
Arc::new(RpcClient::new(rpc_url)),
rpc_pubsub_url.as_str(),
TpuClientConfig::default(),
cache.clone(),
)
.unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}),
),
ConnectionCache::Udp(cache) => TpuClientWrapper::Udp(
TpuClient::new_with_connection_cache(
Arc::new(RpcClient::new(rpc_url)),
rpc_pubsub_url.as_str(),
TpuClientConfig::default(),
cache.clone(),
)
.unwrap_or_else(|err| {
panic!("Could not create TpuClient with Udp Cache {err:?}");
}),
),
}
}
fn spy(

View File

@ -229,21 +229,6 @@ impl LegacyContactInfo {
pub fn is_valid_address(addr: &SocketAddr, socket_addr_space: &SocketAddrSpace) -> bool {
addr.port() != 0u16 && Self::is_valid_ip(addr.ip()) && socket_addr_space.check(addr)
}
pub(crate) fn valid_client_facing_addr(
&self,
protocol: Protocol,
socket_addr_space: &SocketAddrSpace,
) -> Option<(SocketAddr, SocketAddr)> {
Some((
self.rpc()
.ok()
.filter(|addr| socket_addr_space.check(addr))?,
self.tpu(protocol)
.ok()
.filter(|addr| socket_addr_space.check(addr))?,
))
}
}
impl TryFrom<&ContactInfo> for LegacyContactInfo {
@ -342,24 +327,6 @@ mod tests {
assert!(ci.serve_repair.ip().is_unspecified());
}
#[test]
fn test_valid_client_facing() {
let mut ci = LegacyContactInfo::default();
assert_eq!(
ci.valid_client_facing_addr(Protocol::QUIC, &SocketAddrSpace::Unspecified),
None
);
ci.tpu = socketaddr!(Ipv4Addr::LOCALHOST, 123);
assert_eq!(
ci.valid_client_facing_addr(Protocol::QUIC, &SocketAddrSpace::Unspecified),
None
);
ci.rpc = socketaddr!(Ipv4Addr::LOCALHOST, 234);
assert!(ci
.valid_client_facing_addr(Protocol::QUIC, &SocketAddrSpace::Unspecified)
.is_some());
}
#[test]
fn test_sanitize() {
let mut ci = LegacyContactInfo::default();

View File

@ -5105,6 +5105,7 @@ dependencies = [
"solana-bloom",
"solana-clap-utils",
"solana-client",
"solana-connection-cache",
"solana-entry",
"solana-frozen-abi",
"solana-frozen-abi-macro",
@ -5118,7 +5119,6 @@ dependencies = [
"solana-runtime",
"solana-sdk",
"solana-streamer",
"solana-thin-client",
"solana-tpu-client",
"solana-version",
"solana-vote",