Refactor connection cache to support generic msgs (#29774)

tpu-client/tpu_connection_cache is refactored out of that module and moved to connection-cache/connection_cache, and the logic in client/connection_cache is consolidated into connection-cache/connection_cache as well. client/connection_cache now only keeps a thin wrapper which forwards calls to connection-cache/connection_cache and deals with constructing the quic/udp connection caches for clients that use both.
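For illustration, a rough sketch of how a caller that supports both transports would use the wrapper (the crate path is assumed from the diff below; the constructors `new` and `with_udp` are the ones shown there):

```rust
// Hedged sketch: the client-side wrapper only decides between quic and udp;
// everything else is forwarded to connection-cache's ConnectionCache.
use solana_client::connection_cache::ConnectionCache;

fn make_cache(use_quic: bool, pool_size: usize) -> ConnectionCache {
    if use_quic {
        ConnectionCache::new(pool_size) // builds a QuicConnectionManager internally
    } else {
        ConnectionCache::with_udp(pool_size) // builds a UdpConnectionManager internally
    }
}
```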

TpuConnection is refactored into ClientConnection to make it generic, and its functions are renamed to suit other workflows, e.g. tpu_addr -> server_addr, send_wire_transaction -> send_data, etc.
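A minimal usage sketch of the renamed trait (signatures taken from the `ClientConnection` trait added in connection-cache below):

```rust
// Sketch only: send a pre-serialized payload over any transport. `server_addr`
// replaces `tpu_addr` and `send_data` replaces `send_wire_transaction`, so
// nothing in this code is TPU-specific.
use solana_connection_cache::client_connection::ClientConnection;
use solana_sdk::transport::Result as TransportResult;

fn send_payload(conn: &dyn ClientConnection, payload: &[u8]) -> TransportResult<()> {
    // The connection already knows its destination; the caller no longer
    // cares whether quic or udp sits underneath.
    println!("sending {} bytes to {}", payload.len(), conn.server_addr());
    conn.send_data(payload)
}
```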

The enum dispatch is removed so that the bulk of the quic and udp code is agnostic of the other transport. This makes it possible for a client to load only quic or only udp into its runtime.
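A hedged before/after sketch of what removing the dispatch buys (the enum shown in the comment is the one deleted below; `broadcast` is a hypothetical helper):

```rust
// Before (deleted in this diff): enum_dispatch tied the transports together,
// so any crate using the cache had to compile and link both quic and udp:
//
//     #[enum_dispatch]
//     pub enum BlockingConnection {
//         UdpTpuConnection,
//         QuicTpuConnection,
//     }
//
// After: the cache hands out trait objects, so a runtime can depend on
// solana-quic-client or solana-udp-client alone.
use solana_connection_cache::client_connection::ClientConnection;
use std::sync::Arc;

fn broadcast(conns: &[Arc<dyn ClientConnection>], data: &[u8]) {
    for conn in conns {
        // Dynamic dispatch; no match over transport variants needed.
        let _ = conn.send_data(data);
    }
}
```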

The generic type parameter in tpu-client/tpu_connection_cache is removed so that both quic and udp connection caches can be created and used through the same object type when sending data, instead of branching on the transport at each send. Generic type parameters and associated types are dropped from the other types as well, to make the traits "object safe" for this purpose.
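For illustration, a sketch of the object-safety constraint being worked around (the trait names here are hypothetical; the real object-safe traits are `ConnectionPool` and `BaseClientConnection` in the diff below):

```rust
use std::sync::Arc;

// A trait with an associated type can only be used as `dyn Trait` when that
// type is named, which would pin the pool to a single transport:
trait PoolWithAssoc {
    type Connection;
    fn get_conn(&self, index: usize) -> Arc<Self::Connection>;
}
// let pool: Box<dyn PoolWithAssoc>; // error: associated type must be specified

// Returning trait objects instead keeps the trait object safe, so a single
// `Box<dyn ObjectSafePool>` can hold either a quic or a udp pool:
trait Connection {
    fn send_data(&self, buffer: &[u8]);
}
trait ObjectSafePool {
    fn get_conn(&self, index: usize) -> Arc<dyn Connection>;
}
```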

I have annotated the code to explain the reasoning and the refactoring source -> destination.

There are no functional changes.

bench-tps has been run against rpc-client, thin-client, and tpu-client, and the performance numbers largely match those from before the refactoring.
Lijun Wang 2023-02-01 18:10:06 -08:00 committed by GitHub
parent 1cab61d3b7
commit ada6136a6c
54 changed files with 910 additions and 1484 deletions

Cargo.lock generated

@ -5181,6 +5181,7 @@ dependencies = [
"rand 0.7.3",
"rand_chacha 0.2.2",
"rayon",
"solana-connection-cache",
"solana-logger 1.16.0",
"solana-measure",
"solana-metrics",
@ -5248,6 +5249,28 @@ dependencies = [
"solana-sdk 1.16.0",
]
[[package]]
name = "solana-connection-cache"
version = "1.16.0"
dependencies = [
"async-trait",
"bincode",
"futures-util",
"indexmap",
"indicatif",
"log",
"rand 0.7.3",
"rand_chacha 0.2.2",
"rayon",
"solana-logger 1.16.0",
"solana-measure",
"solana-metrics",
"solana-net-utils",
"solana-sdk 1.16.0",
"thiserror",
"tokio",
]
[[package]]
name = "solana-core"
version = "1.16.0"
@ -6190,6 +6213,7 @@ dependencies = [
"quinn-proto",
"quinn-udp",
"rustls 0.20.6",
"solana-connection-cache",
"solana-logger 1.16.0",
"solana-measure",
"solana-metrics",
@ -6765,6 +6789,7 @@ dependencies = [
"bincode",
"log",
"rayon",
"solana-connection-cache",
"solana-logger 1.16.0",
"solana-rpc-client",
"solana-rpc-client-api",
@ -6817,6 +6842,7 @@ dependencies = [
"rand 0.7.3",
"rand_chacha 0.2.2",
"rayon",
"solana-connection-cache",
"solana-logger 1.16.0",
"solana-measure",
"solana-metrics",
@ -6885,6 +6911,7 @@ name = "solana-udp-client"
version = "1.16.0"
dependencies = [
"async-trait",
"solana-connection-cache",
"solana-net-utils",
"solana-sdk 1.16.0",
"solana-streamer",


@ -18,6 +18,7 @@ members = [
"cli-output",
"client",
"client-test",
"connection-cache",
"core",
"dos",
"download-utils",


@ -32,7 +32,7 @@ use {
transaction::Transaction,
},
solana_streamer::socket::SocketAddrSpace,
solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_CONNECTION_POOL_SIZE,
solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE,
std::{
sync::{atomic::Ordering, Arc, RwLock},
thread::sleep,


@ -8,9 +8,7 @@ use {
pubkey::Pubkey,
signature::{read_keypair_file, Keypair},
},
solana_tpu_client::tpu_connection_cache::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC,
},
solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC},
std::{
net::{Ipv4Addr, SocketAddr},
process::exit,


@ -31,7 +31,7 @@ use {
stake::{instruction::LockupArgs, state::Lockup},
transaction::{TransactionError, VersionedTransaction},
},
solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_ENABLE_UDP,
solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
solana_vote_program::vote_state::VoteAuthorize,
std::{collections::HashMap, error, io::stdout, str::FromStr, sync::Arc, time::Duration},
thiserror::Error,


@ -17,7 +17,7 @@ use {
},
solana_remote_wallet::remote_wallet::RemoteWalletManager,
solana_rpc_client_api::config::RpcSendTransactionConfig,
solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_ENABLE_UDP,
solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
std::{collections::HashMap, error, path::PathBuf, sync::Arc, time::Duration},
};


@ -21,6 +21,8 @@ log = "0.4.17"
quinn = "0.9.3"
rand = "0.7.0"
rayon = "1.5.3"
solana-connection-cache = { path = "../connection-cache", version = "=1.16.0" }
solana-measure = { path = "../measure", version = "=1.16.0" }
solana-metrics = { path = "../metrics", version = "=1.16.0" }
solana-net-utils = { path = "../net-utils", version = "=1.16.0" }


@ -1,475 +1,136 @@
pub use solana_tpu_client::tpu_connection_cache::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC,
};
use {
crate::{
nonblocking::tpu_connection::NonblockingConnection, tpu_connection::BlockingConnection,
},
indexmap::map::{Entry, IndexMap},
quinn::Endpoint,
rand::{thread_rng, Rng},
solana_measure::measure::Measure,
solana_quic_client::nonblocking::quic_client::{
QuicClient, QuicClientCertificate, QuicLazyInitializedEndpoint,
},
solana_sdk::{
pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair, timing::AtomicInterval,
},
solana_streamer::{
nonblocking::quic::{compute_max_allowed_uni_streams, ConnectionPeerType},
streamer::StakedNodes,
tls_certificates::new_self_signed_tls_certificate,
},
solana_tpu_client::{
connection_cache_stats::{ConnectionCacheStats, CONNECTION_STAT_SUBMISSION_INTERVAL},
tpu_connection_cache::MAX_CONNECTIONS,
solana_connection_cache::{
client_connection::ClientConnection as BlockingClientConnection,
connection_cache::{
ConnectionCache as BackendConnectionCache, NewConnectionConfig, ProtocolType,
},
nonblocking::client_connection::ClientConnection as NonblockingClientConnection,
},
solana_quic_client::{QuicConfig, QuicConnectionManager},
solana_sdk::{pubkey::Pubkey, signature::Keypair},
solana_streamer::streamer::StakedNodes,
solana_udp_client::UdpConnectionManager,
std::{
error::Error,
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{atomic::Ordering, Arc, RwLock},
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{Arc, RwLock},
},
};
pub const DEFAULT_CONNECTION_POOL_SIZE: usize = 4;
pub const DEFAULT_CONNECTION_CACHE_USE_QUIC: bool = true;
pub const MAX_CONNECTIONS: usize = 1024;
/// A thin wrapper over connection-cache/ConnectionCache to ease
/// construction of the ConnectionCache for code dealing with both udp and quic.
/// For scenarios using only udp or only quic, use connection-cache/ConnectionCache directly.
pub struct ConnectionCache {
map: RwLock<IndexMap<SocketAddr, ConnectionPool>>,
stats: Arc<ConnectionCacheStats>,
last_stats: AtomicInterval,
connection_pool_size: usize,
tpu_udp_socket: Arc<UdpSocket>,
client_certificate: Arc<QuicClientCertificate>,
use_quic: bool,
maybe_staked_nodes: Option<Arc<RwLock<StakedNodes>>>,
maybe_client_pubkey: Option<Pubkey>,
// The optional specified endpoint for the quic based client connections
// If not specified, the connection cache creates one as needed.
client_endpoint: Option<Endpoint>,
}
/// Models the pool of connections
struct ConnectionPool {
/// The connections in the pool
connections: Vec<Arc<BaseTpuConnection>>,
/// Connections in this pool share the same endpoint
endpoint: Option<Arc<QuicLazyInitializedEndpoint>>,
}
impl ConnectionPool {
/// Get a connection from the pool. It must have at least one connection in the pool.
/// This randomly picks a connection in the pool.
fn borrow_connection(&self) -> Arc<BaseTpuConnection> {
let mut rng = thread_rng();
let n = rng.gen_range(0, self.connections.len());
self.connections[n].clone()
}
/// Check if we need to create a new connection. If the count of the connections
/// is smaller than the pool size.
fn need_new_connection(&self, required_pool_size: usize) -> bool {
self.connections.len() < required_pool_size
}
cache: BackendConnectionCache,
}
impl ConnectionCache {
/// Create a quic connection_cache
pub fn new(connection_pool_size: usize) -> Self {
Self::_new_with_endpoint(connection_pool_size, None)
Self::new_with_client_options(connection_pool_size, None, None, None)
}
/// Create a connection cache with a specific quic client endpoint.
pub fn new_with_endpoint(connection_pool_size: usize, client_endpoint: Endpoint) -> Self {
Self::_new_with_endpoint(connection_pool_size, Some(client_endpoint))
}
fn _new_with_endpoint(connection_pool_size: usize, client_endpoint: Option<Endpoint>) -> Self {
/// Create a quic connection_cache with more client options
pub fn new_with_client_options(
connection_pool_size: usize,
client_endpoint: Option<Endpoint>,
cert_info: Option<(&Keypair, IpAddr)>,
stake_info: Option<(&Arc<RwLock<StakedNodes>>, &Pubkey)>,
) -> Self {
// The minimum pool size is 1.
let connection_pool_size = 1.max(connection_pool_size);
Self {
use_quic: true,
connection_pool_size,
client_endpoint,
..Self::default()
let mut config = QuicConfig::new().unwrap();
if let Some(client_endpoint) = client_endpoint {
config.update_client_endpoint(client_endpoint);
}
if let Some(cert_info) = cert_info {
config
.update_client_certificate(cert_info.0, cert_info.1)
.unwrap();
}
if let Some(stake_info) = stake_info {
config.set_staked_nodes(stake_info.0, stake_info.1);
}
let connection_manager =
Box::new(QuicConnectionManager::new_with_connection_config(config));
let cache = BackendConnectionCache::new(connection_manager, connection_pool_size).unwrap();
Self { cache }
}
#[deprecated(
since = "1.15.0",
note = "This method does not do anything. Please use `new_with_client_options` instead to set the client certificate."
)]
pub fn update_client_certificate(
&mut self,
keypair: &Keypair,
ipaddr: IpAddr,
_keypair: &Keypair,
_ipaddr: IpAddr,
) -> Result<(), Box<dyn Error>> {
let (cert, priv_key) = new_self_signed_tls_certificate(keypair, ipaddr)?;
self.client_certificate = Arc::new(QuicClientCertificate {
certificate: cert,
key: priv_key,
});
Ok(())
}
#[deprecated(
since = "1.15.0",
note = "This method does not do anything. Please use `new_with_client_options` instead to set staked nodes information."
)]
pub fn set_staked_nodes(
&mut self,
staked_nodes: &Arc<RwLock<StakedNodes>>,
client_pubkey: &Pubkey,
_staked_nodes: &Arc<RwLock<StakedNodes>>,
_client_pubkey: &Pubkey,
) {
self.maybe_staked_nodes = Some(staked_nodes.clone());
self.maybe_client_pubkey = Some(*client_pubkey);
}
pub fn with_udp(connection_pool_size: usize) -> Self {
// The minimum pool size is 1.
let connection_pool_size = 1.max(connection_pool_size);
Self {
use_quic: false,
connection_pool_size,
..Self::default()
}
let connection_manager = Box::<UdpConnectionManager>::default();
let cache = BackendConnectionCache::new(connection_manager, connection_pool_size).unwrap();
Self { cache }
}
pub fn use_quic(&self) -> bool {
self.use_quic
matches!(self.cache.get_protocol_type(), ProtocolType::QUIC)
}
fn create_endpoint(&self, force_use_udp: bool) -> Option<Arc<QuicLazyInitializedEndpoint>> {
if self.use_quic() && !force_use_udp {
Some(Arc::new(QuicLazyInitializedEndpoint::new(
self.client_certificate.clone(),
self.client_endpoint.as_ref().cloned(),
)))
} else {
None
}
pub fn get_connection(&self, addr: &SocketAddr) -> Arc<dyn BlockingClientConnection> {
self.cache.get_connection(addr)
}
fn compute_max_parallel_streams(&self) -> usize {
let (client_type, stake, total_stake) =
self.maybe_client_pubkey
.map_or((ConnectionPeerType::Unstaked, 0, 0), |pubkey| {
self.maybe_staked_nodes.as_ref().map_or(
(ConnectionPeerType::Unstaked, 0, 0),
|stakes| {
let rstakes = stakes.read().unwrap();
rstakes.pubkey_stake_map.get(&pubkey).map_or(
(ConnectionPeerType::Unstaked, 0, rstakes.total_stake),
|stake| (ConnectionPeerType::Staked, *stake, rstakes.total_stake),
)
},
)
});
compute_max_allowed_uni_streams(client_type, stake, total_stake)
}
/// Create a lazy connection object under the exclusive lock of the cache map if there is not
/// enough used connections in the connection pool for the specified address.
/// Returns CreateConnectionResult.
fn create_connection(
&self,
lock_timing_ms: &mut u64,
addr: &SocketAddr,
force_use_udp: bool,
) -> CreateConnectionResult {
let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure");
let mut map = self.map.write().unwrap();
get_connection_map_lock_measure.stop();
*lock_timing_ms = lock_timing_ms.saturating_add(get_connection_map_lock_measure.as_ms());
// Read again, as it is possible that between read lock dropped and the write lock acquired
// another thread could have setup the connection.
let (to_create_connection, endpoint) =
map.get(addr)
.map_or((true, self.create_endpoint(force_use_udp)), |pool| {
(
pool.need_new_connection(self.connection_pool_size),
pool.endpoint.clone(),
)
});
let (cache_hit, num_evictions, eviction_timing_ms) = if to_create_connection {
let connection = if !self.use_quic() || force_use_udp {
BaseTpuConnection::Udp(self.tpu_udp_socket.clone())
} else {
BaseTpuConnection::Quic(Arc::new(QuicClient::new(
endpoint.as_ref().unwrap().clone(),
*addr,
self.compute_max_parallel_streams(),
)))
};
let connection = Arc::new(connection);
// evict a connection if the cache is reaching upper bounds
let mut num_evictions = 0;
let mut get_connection_cache_eviction_measure =
Measure::start("get_connection_cache_eviction_measure");
while map.len() >= MAX_CONNECTIONS {
let mut rng = thread_rng();
let n = rng.gen_range(0, MAX_CONNECTIONS);
map.swap_remove_index(n);
num_evictions += 1;
}
get_connection_cache_eviction_measure.stop();
match map.entry(*addr) {
Entry::Occupied(mut entry) => {
let pool = entry.get_mut();
pool.connections.push(connection);
}
Entry::Vacant(entry) => {
entry.insert(ConnectionPool {
connections: vec![connection],
endpoint,
});
}
}
(
false,
num_evictions,
get_connection_cache_eviction_measure.as_ms(),
)
} else {
(true, 0, 0)
};
let pool = map.get(addr).unwrap();
let connection = pool.borrow_connection();
CreateConnectionResult {
connection,
cache_hit,
connection_cache_stats: self.stats.clone(),
num_evictions,
eviction_timing_ms,
}
}
fn get_or_add_connection(&self, addr: &SocketAddr) -> GetConnectionResult {
let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure");
let map = self.map.read().unwrap();
get_connection_map_lock_measure.stop();
let port_offset = if self.use_quic() { QUIC_PORT_OFFSET } else { 0 };
let port = addr
.port()
.checked_add(port_offset)
.unwrap_or_else(|| addr.port());
let force_use_udp = port == addr.port();
let addr = SocketAddr::new(addr.ip(), port);
let mut lock_timing_ms = get_connection_map_lock_measure.as_ms();
let report_stats = self
.last_stats
.should_update(CONNECTION_STAT_SUBMISSION_INTERVAL);
let mut get_connection_map_measure = Measure::start("get_connection_hit_measure");
let CreateConnectionResult {
connection,
cache_hit,
connection_cache_stats,
num_evictions,
eviction_timing_ms,
} = match map.get(&addr) {
Some(pool) => {
if pool.need_new_connection(self.connection_pool_size) {
// create more connection and put it in the pool
drop(map);
self.create_connection(&mut lock_timing_ms, &addr, force_use_udp)
} else {
let connection = pool.borrow_connection();
CreateConnectionResult {
connection,
cache_hit: true,
connection_cache_stats: self.stats.clone(),
num_evictions: 0,
eviction_timing_ms: 0,
}
}
}
None => {
// Upgrade to write access by dropping read lock and acquire write lock
drop(map);
self.create_connection(&mut lock_timing_ms, &addr, force_use_udp)
}
};
get_connection_map_measure.stop();
GetConnectionResult {
connection,
cache_hit,
report_stats,
map_timing_ms: get_connection_map_measure.as_ms(),
lock_timing_ms,
connection_cache_stats,
num_evictions,
eviction_timing_ms,
}
}
fn get_connection_and_log_stats(
pub fn get_nonblocking_connection(
&self,
addr: &SocketAddr,
) -> (Arc<BaseTpuConnection>, Arc<ConnectionCacheStats>) {
let mut get_connection_measure = Measure::start("get_connection_measure");
let GetConnectionResult {
connection,
cache_hit,
report_stats,
map_timing_ms,
lock_timing_ms,
connection_cache_stats,
num_evictions,
eviction_timing_ms,
} = self.get_or_add_connection(addr);
if report_stats {
connection_cache_stats.report();
}
if cache_hit {
connection_cache_stats
.cache_hits
.fetch_add(1, Ordering::Relaxed);
connection_cache_stats
.get_connection_hit_ms
.fetch_add(map_timing_ms, Ordering::Relaxed);
} else {
connection_cache_stats
.cache_misses
.fetch_add(1, Ordering::Relaxed);
connection_cache_stats
.get_connection_miss_ms
.fetch_add(map_timing_ms, Ordering::Relaxed);
connection_cache_stats
.cache_evictions
.fetch_add(num_evictions, Ordering::Relaxed);
connection_cache_stats
.eviction_time_ms
.fetch_add(eviction_timing_ms, Ordering::Relaxed);
}
get_connection_measure.stop();
connection_cache_stats
.get_connection_lock_ms
.fetch_add(lock_timing_ms, Ordering::Relaxed);
connection_cache_stats
.get_connection_ms
.fetch_add(get_connection_measure.as_ms(), Ordering::Relaxed);
(connection, connection_cache_stats)
}
pub fn get_connection(&self, addr: &SocketAddr) -> BlockingConnection {
let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr);
connection.new_blocking_connection(*addr, connection_cache_stats)
}
pub fn get_nonblocking_connection(&self, addr: &SocketAddr) -> NonblockingConnection {
let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr);
connection.new_nonblocking_connection(*addr, connection_cache_stats)
) -> Arc<dyn NonblockingClientConnection> {
self.cache.get_nonblocking_connection(addr)
}
}
impl Default for ConnectionCache {
fn default() -> Self {
let (cert, priv_key) =
new_self_signed_tls_certificate(&Keypair::new(), IpAddr::V4(Ipv4Addr::UNSPECIFIED))
.expect("Failed to initialize QUIC client certificates");
Self {
map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)),
stats: Arc::new(ConnectionCacheStats::default()),
last_stats: AtomicInterval::default(),
connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_udp_socket: Arc::new(
solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED))
.expect("Unable to bind to UDP socket"),
),
client_certificate: Arc::new(QuicClientCertificate {
certificate: cert,
key: priv_key,
}),
use_quic: DEFAULT_TPU_USE_QUIC,
maybe_staked_nodes: None,
maybe_client_pubkey: None,
client_endpoint: None,
if DEFAULT_CONNECTION_CACHE_USE_QUIC {
let cert_info = (&Keypair::new(), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)));
ConnectionCache::new_with_client_options(
DEFAULT_CONNECTION_POOL_SIZE,
None,
Some(cert_info),
None,
)
} else {
ConnectionCache::with_udp(DEFAULT_CONNECTION_POOL_SIZE)
}
}
}
enum BaseTpuConnection {
Udp(Arc<UdpSocket>),
Quic(Arc<QuicClient>),
}
impl BaseTpuConnection {
fn new_blocking_connection(
&self,
addr: SocketAddr,
stats: Arc<ConnectionCacheStats>,
) -> BlockingConnection {
use crate::{quic_client::QuicTpuConnection, udp_client::UdpTpuConnection};
match self {
BaseTpuConnection::Udp(udp_socket) => {
UdpTpuConnection::new_from_addr(udp_socket.clone(), addr).into()
}
BaseTpuConnection::Quic(quic_client) => {
QuicTpuConnection::new_with_client(quic_client.clone(), stats).into()
}
}
}
fn new_nonblocking_connection(
&self,
addr: SocketAddr,
stats: Arc<ConnectionCacheStats>,
) -> NonblockingConnection {
use crate::nonblocking::{quic_client::QuicTpuConnection, udp_client::UdpTpuConnection};
match self {
BaseTpuConnection::Udp(udp_socket) => {
UdpTpuConnection::new_from_addr(udp_socket.try_clone().unwrap(), addr).into()
}
BaseTpuConnection::Quic(quic_client) => {
QuicTpuConnection::new_with_client(quic_client.clone(), stats).into()
}
}
}
}
struct GetConnectionResult {
connection: Arc<BaseTpuConnection>,
cache_hit: bool,
report_stats: bool,
map_timing_ms: u64,
lock_timing_ms: u64,
connection_cache_stats: Arc<ConnectionCacheStats>,
num_evictions: u64,
eviction_timing_ms: u64,
}
struct CreateConnectionResult {
connection: Arc<BaseTpuConnection>,
cache_hit: bool,
connection_cache_stats: Arc<ConnectionCacheStats>,
num_evictions: u64,
eviction_timing_ms: u64,
}
#[cfg(test)]
mod tests {
use {
crate::{
connection_cache::{ConnectionCache, MAX_CONNECTIONS},
tpu_connection::TpuConnection,
},
crate::connection_cache::ConnectionCache,
crossbeam_channel::unbounded,
rand::{Rng, SeedableRng},
rand_chacha::ChaChaRng,
solana_sdk::{
pubkey::Pubkey,
quic::{
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS,
QUIC_PORT_OFFSET, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS,
},
signature::Keypair,
},
solana_sdk::{quic::QUIC_PORT_OFFSET, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, quic::StreamStats,
streamer::StakedNodes,
@ -499,145 +160,6 @@ mod tests {
)
}
fn get_addr(rng: &mut ChaChaRng) -> SocketAddr {
let a = rng.gen_range(1, 255);
let b = rng.gen_range(1, 255);
let c = rng.gen_range(1, 255);
let d = rng.gen_range(1, 255);
let addr_str = format!("{a}.{b}.{c}.{d}:80");
addr_str.parse().expect("Invalid address")
}
#[test]
fn test_connection_cache() {
solana_logger::setup();
// Allow the test to run deterministically
// with the same pseudorandom sequence between runs
// and on different platforms - the cryptographic security
// property isn't important here but ChaChaRng provides a way
// to get the same pseudorandom sequence on different platforms
let mut rng = ChaChaRng::seed_from_u64(42);
// Generate a bunch of random addresses and create TPUConnections to them
// Since TPUConnection::new is infallible, it should't matter whether or not
// we can actually connect to those addresses - TPUConnection implementations should either
// be lazy and not connect until first use or handle connection errors somehow
// (without crashing, as would be required in a real practical validator)
let connection_cache = ConnectionCache::default();
let port_offset = if connection_cache.use_quic() {
QUIC_PORT_OFFSET
} else {
0
};
let addrs = (0..MAX_CONNECTIONS)
.map(|_| {
let addr = get_addr(&mut rng);
connection_cache.get_connection(&addr);
addr
})
.collect::<Vec<_>>();
{
let map = connection_cache.map.read().unwrap();
assert!(map.len() == MAX_CONNECTIONS);
addrs.iter().for_each(|a| {
let port = a
.port()
.checked_add(port_offset)
.unwrap_or_else(|| a.port());
let addr = &SocketAddr::new(a.ip(), port);
let conn = &map.get(addr).expect("Address not found").connections[0];
let conn = conn.new_blocking_connection(*addr, connection_cache.stats.clone());
assert!(addr.ip() == conn.tpu_addr().ip());
});
}
let addr = &get_addr(&mut rng);
connection_cache.get_connection(addr);
let port = addr
.port()
.checked_add(port_offset)
.unwrap_or_else(|| addr.port());
let addr_with_quic_port = SocketAddr::new(addr.ip(), port);
let map = connection_cache.map.read().unwrap();
assert!(map.len() == MAX_CONNECTIONS);
let _conn = map.get(&addr_with_quic_port).expect("Address not found");
}
#[test]
fn test_connection_cache_max_parallel_chunks() {
solana_logger::setup();
let mut connection_cache = ConnectionCache::default();
assert_eq!(
connection_cache.compute_max_parallel_streams(),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let pubkey = Pubkey::new_unique();
connection_cache.set_staked_nodes(&staked_nodes, &pubkey);
assert_eq!(
connection_cache.compute_max_parallel_streams(),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
staked_nodes.write().unwrap().total_stake = 10000;
assert_eq!(
connection_cache.compute_max_parallel_streams(),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
staked_nodes
.write()
.unwrap()
.pubkey_stake_map
.insert(pubkey, 1);
let delta =
(QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64;
assert_eq!(
connection_cache.compute_max_parallel_streams(),
(QUIC_MIN_STAKED_CONCURRENT_STREAMS as f64 + (1f64 / 10000f64) * delta) as usize
);
staked_nodes
.write()
.unwrap()
.pubkey_stake_map
.remove(&pubkey);
staked_nodes
.write()
.unwrap()
.pubkey_stake_map
.insert(pubkey, 1000);
assert_ne!(
connection_cache.compute_max_parallel_streams(),
QUIC_MIN_STAKED_CONCURRENT_STREAMS
);
}
// Test that we can get_connection with a connection cache configured for quic
// on an address with a port that, if QUIC_PORT_OFFSET were added to it, it would overflow to
// an invalid port.
#[test]
fn test_overflow_address() {
let port = u16::MAX - QUIC_PORT_OFFSET + 1;
assert!(port.checked_add(QUIC_PORT_OFFSET).is_none());
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let connection_cache = ConnectionCache::new(1);
let conn = connection_cache.get_connection(&addr);
// We (intentionally) don't have an interface that allows us to distinguish between
// UDP and Quic connections, so check instead that the port is valid (non-zero)
// and is the same as the input port (falling back on UDP)
assert!(conn.tpu_addr().port() != 0);
assert!(conn.tpu_addr().port() == port);
}
#[test]
fn test_connection_with_specified_client_endpoint() {
let port = u16::MAX - QUIC_PORT_OFFSET + 1;
@ -670,19 +192,20 @@ mod tests {
)
.unwrap();
let connection_cache = ConnectionCache::new_with_endpoint(1, response_recv_endpoint);
let connection_cache =
ConnectionCache::new_with_client_options(1, Some(response_recv_endpoint), None, None);
// server port 1:
let port1 = 9001;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port1);
let conn = connection_cache.get_connection(&addr);
assert_eq!(conn.tpu_addr().port(), port1 + QUIC_PORT_OFFSET);
assert_eq!(conn.server_addr().port(), port1 + QUIC_PORT_OFFSET);
// server port 2:
let port2 = 9002;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port2);
let conn = connection_cache.get_connection(&addr);
assert_eq!(conn.tpu_addr().port(), port2 + QUIC_PORT_OFFSET);
assert_eq!(conn.server_addr().port(), port2 + QUIC_PORT_OFFSET);
response_recv_exit.store(true, Ordering::Relaxed);
response_recv_thread.join().unwrap();


@ -9,7 +9,6 @@ pub mod tpu_connection;
pub mod transaction_executor;
pub mod udp_client;
#[macro_use]
extern crate solana_metrics;
pub use solana_rpc_client::mock_sender_for_cli;


@ -1,59 +1,8 @@
//! Simple nonblocking client that connects to a given UDP port with the QUIC protocol
//! and provides an interface for sending transactions which is restricted by the
//! server's flow control.
#[deprecated(
since = "1.15.0",
note = "Please use `solana_quic_client::nonblocking::quic_client::QuicClientConnection` instead."
)]
pub use solana_quic_client::nonblocking::quic_client::QuicClientConnection as QuicTpuConnection;
pub use solana_quic_client::nonblocking::quic_client::{
QuicClient, QuicClientCertificate, QuicError, QuicLazyInitializedEndpoint, QuicTpuConnection,
QuicClient, QuicClientCertificate, QuicLazyInitializedEndpoint,
};
use {
crate::nonblocking::tpu_connection::TpuConnection,
async_trait::async_trait,
log::*,
solana_sdk::transport::Result as TransportResult,
solana_tpu_client::tpu_connection::ClientStats,
std::{net::SocketAddr, sync::Arc},
};
#[async_trait]
impl TpuConnection for QuicTpuConnection {
fn tpu_addr(&self) -> &SocketAddr {
self.client.tpu_addr()
}
async fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
let stats = ClientStats::default();
let len = buffers.len();
let res = self
.client
.send_batch(buffers, &stats, self.connection_stats.clone())
.await;
self.connection_stats
.add_client_stats(&stats, len, res.is_ok());
res?;
Ok(())
}
async fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
let stats = Arc::new(ClientStats::default());
let send_buffer =
self.client
.send_buffer(wire_transaction, &stats, self.connection_stats.clone());
if let Err(e) = send_buffer.await {
warn!(
"Failed to send transaction async to {}, error: {:?} ",
self.tpu_addr(),
e
);
datapoint_warn!("send-wire-async", ("failure", 1, i64),);
self.connection_stats.add_client_stats(&stats, 1, false);
} else {
self.connection_stats.add_client_stats(&stats, 1, true);
}
Ok(())
}
}


@ -2,7 +2,6 @@ pub use solana_tpu_client::nonblocking::tpu_client::{LeaderTpuService, TpuSender
use {
crate::{
connection_cache::ConnectionCache,
nonblocking::tpu_connection::TpuConnection,
tpu_client::{TpuClientConfig, MAX_FANOUT_SLOTS},
},
bincode::serialize,
@ -46,7 +45,7 @@ async fn send_wire_transaction_to_addr(
wire_transaction: Vec<u8>,
) -> TransportResult<()> {
let conn = connection_cache.get_nonblocking_connection(addr);
conn.send_wire_transaction(wire_transaction.clone()).await
conn.send_data(&wire_transaction).await
}
async fn send_wire_transaction_batch_to_addr(
@ -55,7 +54,7 @@ async fn send_wire_transaction_batch_to_addr(
wire_transactions: &[Vec<u8>],
) -> TransportResult<()> {
let conn = connection_cache.get_nonblocking_connection(addr);
conn.send_wire_transaction_batch(wire_transactions).await
conn.send_data_batch(wire_transactions).await
}
impl TpuClient {


@ -1,42 +1,5 @@
//! Trait defining async send functions, to be used for UDP or QUIC sending
use {
async_trait::async_trait,
enum_dispatch::enum_dispatch,
solana_quic_client::nonblocking::quic_client::QuicTpuConnection,
solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult},
solana_udp_client::nonblocking::udp_client::UdpTpuConnection,
std::net::SocketAddr,
};
// Due to the existence of `crate::connection_cache::Connection`, if this is named
// `Connection`, enum_dispatch gets confused between the two and throws errors when
// trying to convert later.
#[enum_dispatch]
pub enum NonblockingConnection {
QuicTpuConnection,
UdpTpuConnection,
}
#[async_trait]
#[enum_dispatch(NonblockingConnection)]
pub trait TpuConnection {
fn tpu_addr(&self) -> &SocketAddr;
async fn serialize_and_send_transaction(
&self,
transaction: &VersionedTransaction,
) -> TransportResult<()> {
let wire_transaction =
bincode::serialize(transaction).expect("serialize Transaction in send_batch");
self.send_wire_transaction(&wire_transaction).await
}
async fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync;
async fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync;
}
#[deprecated(
since = "1.15.0",
note = "Please use `solana_connection_cache::nonblocking::client_connection::ClientConnection` instead."
)]
pub use solana_connection_cache::nonblocking::client_connection::ClientConnection as TpuConnection;


@ -1,35 +1,5 @@
//! Simple UDP client that communicates with the given UDP port with UDP and provides
//! an interface for sending transactions
pub use solana_udp_client::nonblocking::udp_client::UdpTpuConnection;
use {
crate::nonblocking::tpu_connection::TpuConnection, async_trait::async_trait,
core::iter::repeat, solana_sdk::transport::Result as TransportResult,
solana_streamer::nonblocking::sendmmsg::batch_send, std::net::SocketAddr,
};
#[async_trait]
impl TpuConnection for UdpTpuConnection {
fn tpu_addr(&self) -> &SocketAddr {
&self.addr
}
async fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
self.socket
.send_to(wire_transaction.as_ref(), self.addr)
.await?;
Ok(())
}
async fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
let pkts: Vec<_> = buffers.iter().zip(repeat(self.tpu_addr())).collect();
batch_send(&self.socket, &pkts).await?;
Ok(())
}
}
#[deprecated(
since = "1.15.0",
note = "Please use `solana_udp_client::nonblocking::udp_client::UdpClientConnection` instead."
)]
pub use solana_udp_client::nonblocking::udp_client::UdpClientConnection as UdpTpuConnection;


@ -1,44 +1,5 @@
//! Simple client that connects to a given UDP port with the QUIC protocol and provides
//! an interface for sending transactions which is restricted by the server's flow control.
pub use solana_quic_client::quic_client::QuicTpuConnection;
use {
crate::{
nonblocking::tpu_connection::TpuConnection as NonblockingTpuConnection,
tpu_connection::TpuConnection,
},
solana_quic_client::quic_client::temporary_pub::*,
solana_sdk::transport::Result as TransportResult,
std::net::SocketAddr,
};
impl TpuConnection for QuicTpuConnection {
fn tpu_addr(&self) -> &SocketAddr {
self.inner.tpu_addr()
}
fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
RUNTIME.block_on(self.inner.send_wire_transaction_batch(buffers))?;
Ok(())
}
fn send_wire_transaction_async(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
let _lock = ASYNC_TASK_SEMAPHORE.acquire();
let inner = self.inner.clone();
let _handle = RUNTIME
.spawn(async move { send_wire_transaction_async(inner, wire_transaction).await });
Ok(())
}
fn send_wire_transaction_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
let _lock = ASYNC_TASK_SEMAPHORE.acquire();
let inner = self.inner.clone();
let _handle =
RUNTIME.spawn(async move { send_wire_transaction_batch_async(inner, buffers).await });
Ok(())
}
}
#[deprecated(
since = "1.15.0",
note = "Please use `solana_quic_client::quic_client::QuicClientConnection` instead."
)]
pub use solana_quic_client::quic_client::QuicClientConnection as QuicTpuConnection;


@ -4,8 +4,9 @@
//! unstable and may change in future releases.
use {
crate::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
crate::connection_cache::ConnectionCache,
log::*,
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_rpc_client::rpc_client::RpcClient,
solana_rpc_client_api::{config::RpcProgramAccountsConfig, response::Response},
solana_sdk::{
@ -144,7 +145,7 @@ impl ThinClient {
let conn = self.connection_cache.get_connection(self.tpu_addr());
// Send the transaction if there has been no confirmation (e.g. the first time)
#[allow(clippy::needless_borrow)]
conn.send_wire_transaction(&wire_transaction)?;
conn.send_data(&wire_transaction)?;
}
if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation(
@ -531,7 +532,9 @@ impl AsyncClient for ThinClient {
transaction: VersionedTransaction,
) -> TransportResult<Signature> {
let conn = self.connection_cache.get_connection(self.tpu_addr());
conn.serialize_and_send_transaction(&transaction)?;
let wire_transaction =
bincode::serialize(&transaction).expect("serialize Transaction in send_batch");
conn.send_data(&wire_transaction)?;
Ok(transaction.signatures[0])
}
@ -540,7 +543,11 @@ impl AsyncClient for ThinClient {
batch: Vec<VersionedTransaction>,
) -> TransportResult<()> {
let conn = self.connection_cache.get_connection(self.tpu_addr());
conn.par_serialize_and_send_transaction_batch(&batch[..])?;
let buffers = batch
.into_par_iter()
.map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
.collect::<Vec<_>>();
conn.send_data_batch(&buffers)?;
Ok(())
}
}


@ -1,56 +1,6 @@
pub use solana_tpu_client::tpu_connection::ClientStats;
use {
enum_dispatch::enum_dispatch,
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_quic_client::quic_client::QuicTpuConnection,
solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult},
solana_udp_client::udp_client::UdpTpuConnection,
std::net::SocketAddr,
};
#[enum_dispatch]
pub enum BlockingConnection {
UdpTpuConnection,
QuicTpuConnection,
}
#[enum_dispatch(BlockingConnection)]
pub trait TpuConnection {
fn tpu_addr(&self) -> &SocketAddr;
fn serialize_and_send_transaction(
&self,
transaction: &VersionedTransaction,
) -> TransportResult<()> {
let wire_transaction =
bincode::serialize(transaction).expect("serialize Transaction in send_batch");
self.send_wire_transaction(wire_transaction)
}
fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
self.send_wire_transaction_batch(&[wire_transaction])
}
fn send_wire_transaction_async(&self, wire_transaction: Vec<u8>) -> TransportResult<()>;
fn par_serialize_and_send_transaction_batch(
&self,
transactions: &[VersionedTransaction],
) -> TransportResult<()> {
let buffers = transactions
.into_par_iter()
.map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
.collect::<Vec<_>>();
self.send_wire_transaction_batch(&buffers)
}
fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync;
fn send_wire_transaction_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()>;
}
#[deprecated(
since = "1.15.0",
note = "Please use `solana_connection_cache::client_connection::ClientConnection` instead."
)]
pub use solana_connection_cache::client_connection::ClientConnection as TpuConnection;
pub use solana_connection_cache::client_connection::ClientStats;


@ -1,35 +1,5 @@
//! Simple TPU client that communicates with the given UDP port with UDP and provides
//! an interface for sending transactions
pub use solana_udp_client::udp_client::UdpTpuConnection;
use {
crate::tpu_connection::TpuConnection, core::iter::repeat,
solana_sdk::transport::Result as TransportResult, solana_streamer::sendmmsg::batch_send,
std::net::SocketAddr,
};
impl TpuConnection for UdpTpuConnection {
fn tpu_addr(&self) -> &SocketAddr {
&self.addr
}
fn send_wire_transaction_async(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
self.socket.send_to(wire_transaction.as_ref(), self.addr)?;
Ok(())
}
fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
let pkts: Vec<_> = buffers.iter().zip(repeat(self.tpu_addr())).collect();
batch_send(&self.socket, &pkts)?;
Ok(())
}
fn send_wire_transaction_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
let pkts: Vec<_> = buffers.into_iter().zip(repeat(self.tpu_addr())).collect();
batch_send(&self.socket, &pkts)?;
Ok(())
}
}
#[deprecated(
since = "1.15.0",
note = "Please use `solana_udp_client::udp_client::UdpClientConnection` instead."
)]
pub use solana_udp_client::udp_client::UdpClientConnection as UdpTpuConnection;


@ -0,0 +1,30 @@
[package]
name = "solana-connection-cache"
version = "1.16.0"
description = "Solana Connection Cache"
authors = ["Solana Maintainers <maintainers@solana.foundation>"]
repository = "https://github.com/solana-labs/solana"
license = "Apache-2.0"
homepage = "https://solana.com/"
documentation = "https://docs.rs/solana-connection-cache"
edition = "2021"
[dependencies]
async-trait = "0.1.57"
bincode = "1.3.3"
futures-util = "0.3.25"
indexmap = "1.9.1"
indicatif = { version = "0.17.1", optional = true }
log = "0.4.17"
rand = "0.7.0"
rayon = "1.5.3"
solana-measure = { path = "../measure", version = "=1.16.0" }
solana-metrics = { path = "../metrics", version = "=1.16.0" }
solana-net-utils = { path = "../net-utils", version = "=1.16.0" }
solana-sdk = { path = "../sdk", version = "=1.16.0" }
thiserror = "1.0"
tokio = { version = "1", features = ["full"] }
[dev-dependencies]
rand_chacha = "0.2.2"
solana-logger = { path = "../logger", version = "=1.16.0" }


@ -0,0 +1,34 @@
use {
solana_metrics::MovingStat,
solana_sdk::transport::Result as TransportResult,
std::{net::SocketAddr, sync::atomic::AtomicU64},
};
#[derive(Default)]
pub struct ClientStats {
pub total_connections: AtomicU64,
pub connection_reuse: AtomicU64,
pub connection_errors: AtomicU64,
pub zero_rtt_accepts: AtomicU64,
pub zero_rtt_rejects: AtomicU64,
// these will be the last values of these stats
pub congestion_events: MovingStat,
pub streams_blocked_uni: MovingStat,
pub data_blocked: MovingStat,
pub acks: MovingStat,
pub make_connection_ms: AtomicU64,
pub send_timeout: AtomicU64,
}
pub trait ClientConnection: Sync + Send {
fn server_addr(&self) -> &SocketAddr;
fn send_data(&self, buffer: &[u8]) -> TransportResult<()>;
fn send_data_async(&self, buffer: Vec<u8>) -> TransportResult<()>;
fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()>;
fn send_data_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()>;
}


@ -1,14 +1,15 @@
use {
crate::{
client_connection::ClientConnection as BlockingClientConnection,
connection_cache_stats::{ConnectionCacheStats, CONNECTION_STAT_SUBMISSION_INTERVAL},
nonblocking::tpu_connection::TpuConnection as NonblockingTpuConnection,
tpu_connection::TpuConnection as BlockingTpuConnection,
nonblocking::client_connection::ClientConnection as NonblockingClientConnection,
},
indexmap::map::IndexMap,
rand::{thread_rng, Rng},
solana_measure::measure::Measure,
solana_sdk::timing::AtomicInterval,
std::{
any::Any,
net::SocketAddr,
sync::{atomic::Ordering, Arc, RwLock},
},
@ -18,38 +19,56 @@ use {
// Should be non-zero
pub static MAX_CONNECTIONS: usize = 1024;
/// Used to decide whether the TPU and underlying connection cache should use
/// QUIC connections.
pub const DEFAULT_TPU_USE_QUIC: bool = true;
/// Default connection pool size per remote address
pub const DEFAULT_CONNECTION_POOL_SIZE: usize = 4;
/// Default TPU connection pool size per remote address
pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 4;
/// Defines the protocol types an implementation supports.
pub enum ProtocolType {
UDP,
QUIC,
}
pub const DEFAULT_TPU_ENABLE_UDP: bool = false;
pub trait ConnectionManager: Sync + Send {
fn new_connection_pool(&self) -> Box<dyn ConnectionPool>;
fn new_connection_config(&self) -> Box<dyn NewConnectionConfig>;
fn get_port_offset(&self) -> u16;
fn get_protocol_type(&self) -> ProtocolType;
}
pub struct TpuConnectionCache<P: ConnectionPool> {
pub map: RwLock<IndexMap<SocketAddr, P>>,
pub struct ConnectionCache {
pub map: RwLock<IndexMap<SocketAddr, Box<dyn ConnectionPool>>>,
pub connection_manager: Box<dyn ConnectionManager>,
pub stats: Arc<ConnectionCacheStats>,
pub last_stats: AtomicInterval,
pub connection_pool_size: usize,
pub tpu_config: P::TpuConfig,
pub connection_config: Box<dyn NewConnectionConfig>,
}
impl<P: ConnectionPool> TpuConnectionCache<P> {
impl ConnectionCache {
pub fn new(
connection_manager: Box<dyn ConnectionManager>,
connection_pool_size: usize,
) -> Result<Self, <P::TpuConfig as NewTpuConfig>::ClientError> {
let config = P::TpuConfig::new()?;
Ok(Self::new_with_config(connection_pool_size, config))
) -> Result<Self, ClientError> {
let config = connection_manager.new_connection_config();
Ok(Self::new_with_config(
connection_pool_size,
config,
connection_manager,
))
}
pub fn new_with_config(connection_pool_size: usize, tpu_config: P::TpuConfig) -> Self {
pub fn new_with_config(
connection_pool_size: usize,
connection_config: Box<dyn NewConnectionConfig>,
connection_manager: Box<dyn ConnectionManager>,
) -> Self {
Self {
map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)),
stats: Arc::new(ConnectionCacheStats::default()),
connection_manager,
last_stats: AtomicInterval::default(),
connection_pool_size: 1.max(connection_pool_size), // The minimum pool size is 1.
tpu_config,
connection_config,
}
}
@ -60,7 +79,7 @@ impl<P: ConnectionPool> TpuConnectionCache<P> {
&self,
lock_timing_ms: &mut u64,
addr: &SocketAddr,
) -> CreateConnectionResult<P::PoolTpuConnection> {
) -> CreateConnectionResult {
let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure");
let mut map = self.map.write().unwrap();
get_connection_map_lock_measure.stop();
@ -94,10 +113,13 @@ impl<P: ConnectionPool> TpuConnectionCache<P> {
map.entry(*addr)
.and_modify(|pool| {
pool.add_connection(&self.tpu_config, addr);
pool.add_connection(&*self.connection_config, addr);
})
.or_insert_with(|| P::new_with_connection(&self.tpu_config, addr));
.or_insert_with(|| {
let mut pool = self.connection_manager.new_connection_pool();
pool.add_connection(&*self.connection_config, addr);
pool
});
(
false,
num_evictions,
@ -119,15 +141,12 @@ impl<P: ConnectionPool> TpuConnectionCache<P> {
}
}
fn get_or_add_connection(
&self,
addr: &SocketAddr,
) -> GetConnectionResult<P::PoolTpuConnection> {
fn get_or_add_connection(&self, addr: &SocketAddr) -> GetConnectionResult {
let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure");
let map = self.map.read().unwrap();
get_connection_map_lock_measure.stop();
let port_offset = P::PORT_OFFSET;
let port_offset = self.connection_manager.get_port_offset();
let port = addr
.port()
@ -188,7 +207,7 @@ impl<P: ConnectionPool> TpuConnectionCache<P> {
fn get_connection_and_log_stats(
&self,
addr: &SocketAddr,
) -> (Arc<P::PoolTpuConnection>, Arc<ConnectionCacheStats>) {
) -> (Arc<dyn BaseClientConnection>, Arc<ConnectionCacheStats>) {
let mut get_connection_measure = Measure::start("get_connection_measure");
let GetConnectionResult {
connection,
@ -238,10 +257,7 @@ impl<P: ConnectionPool> TpuConnectionCache<P> {
(connection, connection_cache_stats)
}
pub fn get_connection(
&self,
addr: &SocketAddr,
) -> <P::PoolTpuConnection as BaseTpuConnection>::BlockingConnectionType {
pub fn get_connection(&self, addr: &SocketAddr) -> Arc<dyn BlockingClientConnection> {
let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr);
connection.new_blocking_connection(*addr, connection_cache_stats)
}
@ -249,10 +265,14 @@ impl<P: ConnectionPool> TpuConnectionCache<P> {
pub fn get_nonblocking_connection(
&self,
addr: &SocketAddr,
) -> <P::PoolTpuConnection as BaseTpuConnection>::NonblockingConnectionType {
) -> Arc<dyn NonblockingClientConnection> {
let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr);
connection.new_nonblocking_connection(*addr, connection_cache_stats)
}
pub fn get_protocol_type(&self) -> ProtocolType {
self.connection_manager.get_protocol_type()
}
}
#[derive(Error, Debug)]
@ -261,33 +281,37 @@ pub enum ConnectionPoolError {
IndexOutOfRange,
}
pub trait NewTpuConfig {
type ClientError: std::fmt::Debug;
fn new() -> Result<Self, Self::ClientError>
where
Self: Sized;
#[derive(Error, Debug)]
pub enum ClientError {
#[error("Certificate error: {0}")]
CertificateError(String),
#[error("IO error: {0:?}")]
IoError(#[from] std::io::Error),
}
pub trait ConnectionPool {
type PoolTpuConnection: BaseTpuConnection;
type TpuConfig: NewTpuConfig;
const PORT_OFFSET: u16 = 0;
pub trait NewConnectionConfig: Sync + Send {
fn new() -> Result<Self, ClientError>
where
Self: Sized;
fn as_any(&self) -> &dyn Any;
/// Create a new connection pool based on protocol-specific configuration
fn new_with_connection(config: &Self::TpuConfig, addr: &SocketAddr) -> Self;
fn as_mut_any(&mut self) -> &mut dyn Any;
}
pub trait ConnectionPool: Sync + Send {
/// Add a connection to the pool
fn add_connection(&mut self, config: &Self::TpuConfig, addr: &SocketAddr);
fn add_connection(&mut self, config: &dyn NewConnectionConfig, addr: &SocketAddr);
/// Get the number of current connections in the pool
fn num_connections(&self) -> usize;
/// Get a connection based on its index in the pool, without checking if the
fn get(&self, index: usize) -> Result<Arc<Self::PoolTpuConnection>, ConnectionPoolError>;
fn get(&self, index: usize) -> Result<Arc<dyn BaseClientConnection>, ConnectionPoolError>;
/// Get a connection from the pool. It must have at least one connection in the pool.
/// This randomly picks a connection in the pool.
fn borrow_connection(&self) -> Arc<Self::PoolTpuConnection> {
fn borrow_connection(&self) -> Arc<dyn BaseClientConnection> {
let mut rng = thread_rng();
let n = rng.gen_range(0, self.num_connections());
self.get(n).expect("index is within num_connections")
@ -300,30 +324,27 @@ pub trait ConnectionPool {
fn create_pool_entry(
&self,
config: &Self::TpuConfig,
config: &dyn NewConnectionConfig,
addr: &SocketAddr,
) -> Self::PoolTpuConnection;
) -> Arc<dyn BaseClientConnection>;
}
pub trait BaseTpuConnection {
type BlockingConnectionType: BlockingTpuConnection;
type NonblockingConnectionType: NonblockingTpuConnection;
pub trait BaseClientConnection: Sync + Send {
fn new_blocking_connection(
&self,
addr: SocketAddr,
stats: Arc<ConnectionCacheStats>,
) -> Self::BlockingConnectionType;
) -> Arc<dyn BlockingClientConnection>;
fn new_nonblocking_connection(
&self,
addr: SocketAddr,
stats: Arc<ConnectionCacheStats>,
) -> Self::NonblockingConnectionType;
) -> Arc<dyn NonblockingClientConnection>;
}
struct GetConnectionResult<T: BaseTpuConnection> {
connection: Arc<T>,
struct GetConnectionResult {
connection: Arc<dyn BaseClientConnection>,
cache_hit: bool,
report_stats: bool,
map_timing_ms: u64,
@ -333,8 +354,8 @@ struct GetConnectionResult<T: BaseTpuConnection> {
eviction_timing_ms: u64,
}
struct CreateConnectionResult<T: BaseTpuConnection> {
connection: Arc<T>,
struct CreateConnectionResult {
connection: Arc<dyn BaseClientConnection>,
cache_hit: bool,
connection_cache_stats: Arc<ConnectionCacheStats>,
num_evictions: u64,
@ -346,8 +367,8 @@ mod tests {
use {
super::*,
crate::{
nonblocking::tpu_connection::TpuConnection as NonblockingTpuConnection,
tpu_connection::TpuConnection as BlockingTpuConnection,
client_connection::ClientConnection as BlockingClientConnection,
nonblocking::client_connection::ClientConnection as NonblockingClientConnection,
},
async_trait::async_trait,
rand::{Rng, SeedableRng},
@ -362,23 +383,11 @@ mod tests {
const MOCK_PORT_OFFSET: u16 = 42;
pub struct MockUdpPool {
connections: Vec<Arc<MockUdp>>,
connections: Vec<Arc<dyn BaseClientConnection>>,
}
impl ConnectionPool for MockUdpPool {
type PoolTpuConnection = MockUdp;
type TpuConfig = MockUdpConfig;
const PORT_OFFSET: u16 = MOCK_PORT_OFFSET;
fn new_with_connection(config: &Self::TpuConfig, addr: &SocketAddr) -> Self {
let mut pool = Self {
connections: vec![],
};
pool.add_connection(config, addr);
pool
}
fn add_connection(&mut self, config: &Self::TpuConfig, addr: &SocketAddr) {
let connection = Arc::new(self.create_pool_entry(config, addr));
fn add_connection(&mut self, config: &dyn NewConnectionConfig, addr: &SocketAddr) {
let connection = self.create_pool_entry(config, addr);
self.connections.push(connection);
}
@ -386,7 +395,7 @@ mod tests {
self.connections.len()
}
fn get(&self, index: usize) -> Result<Arc<Self::PoolTpuConnection>, ConnectionPoolError> {
fn get(&self, index: usize) -> Result<Arc<dyn BaseClientConnection>, ConnectionPoolError> {
self.connections
.get(index)
.cloned()
@ -395,21 +404,26 @@ mod tests {
fn create_pool_entry(
&self,
config: &Self::TpuConfig,
config: &dyn NewConnectionConfig,
_addr: &SocketAddr,
) -> Self::PoolTpuConnection {
MockUdp(config.tpu_udp_socket.clone())
) -> Arc<dyn BaseClientConnection> {
let config: &MockUdpConfig = match config.as_any().downcast_ref::<MockUdpConfig>() {
Some(b) => b,
None => panic!("Expecting a MockUdpConfig!"),
};
Arc::new(MockUdp(config.udp_socket.clone()))
}
}
pub struct MockUdpConfig {
tpu_udp_socket: Arc<UdpSocket>,
udp_socket: Arc<UdpSocket>,
}
impl Default for MockUdpConfig {
fn default() -> Self {
Self {
tpu_udp_socket: Arc::new(
udp_socket: Arc::new(
solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED))
.expect("Unable to bind to UDP socket"),
),
@ -417,85 +431,105 @@ mod tests {
}
}
impl NewTpuConfig for MockUdpConfig {
type ClientError = String;
fn new() -> Result<Self, String> {
impl NewConnectionConfig for MockUdpConfig {
fn new() -> Result<Self, ClientError> {
Ok(Self {
tpu_udp_socket: Arc::new(
udp_socket: Arc::new(
solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED))
.map_err(|_| "Unable to bind to UDP socket".to_string())?,
.map_err(Into::<ClientError>::into)?,
),
})
}
fn as_any(&self) -> &dyn Any {
self
}
fn as_mut_any(&mut self) -> &mut dyn Any {
self
}
}
pub struct MockUdp(Arc<UdpSocket>);
impl BaseTpuConnection for MockUdp {
type BlockingConnectionType = MockUdpTpuConnection;
type NonblockingConnectionType = MockUdpTpuConnection;
impl BaseClientConnection for MockUdp {
fn new_blocking_connection(
&self,
addr: SocketAddr,
_stats: Arc<ConnectionCacheStats>,
) -> MockUdpTpuConnection {
MockUdpTpuConnection {
) -> Arc<dyn BlockingClientConnection> {
Arc::new(MockUdpConnection {
_socket: self.0.clone(),
addr,
}
})
}
fn new_nonblocking_connection(
&self,
addr: SocketAddr,
_stats: Arc<ConnectionCacheStats>,
) -> MockUdpTpuConnection {
MockUdpTpuConnection {
) -> Arc<dyn NonblockingClientConnection> {
Arc::new(MockUdpConnection {
_socket: self.0.clone(),
addr,
}
})
}
}
pub struct MockUdpTpuConnection {
pub struct MockUdpConnection {
_socket: Arc<UdpSocket>,
addr: SocketAddr,
}
impl BlockingTpuConnection for MockUdpTpuConnection {
fn tpu_addr(&self) -> &SocketAddr {
#[derive(Default)]
pub struct MockConnectionManager {}
impl ConnectionManager for MockConnectionManager {
fn new_connection_pool(&self) -> Box<dyn ConnectionPool> {
Box::new(MockUdpPool {
connections: Vec::default(),
})
}
fn new_connection_config(&self) -> Box<dyn NewConnectionConfig> {
Box::new(MockUdpConfig::new().unwrap())
}
fn get_port_offset(&self) -> u16 {
MOCK_PORT_OFFSET
}
fn get_protocol_type(&self) -> ProtocolType {
ProtocolType::UDP
}
}
impl BlockingClientConnection for MockUdpConnection {
fn server_addr(&self) -> &SocketAddr {
&self.addr
}
fn send_wire_transaction_async(&self, _wire_transaction: Vec<u8>) -> TransportResult<()> {
fn send_data(&self, _buffer: &[u8]) -> TransportResult<()> {
unimplemented!()
}
fn send_wire_transaction_batch<T>(&self, _buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
fn send_data_async(&self, _data: Vec<u8>) -> TransportResult<()> {
unimplemented!()
}
fn send_wire_transaction_batch_async(&self, _buffers: Vec<Vec<u8>>) -> TransportResult<()> {
fn send_data_batch(&self, _buffers: &[Vec<u8>]) -> TransportResult<()> {
unimplemented!()
}
fn send_data_batch_async(&self, _buffers: Vec<Vec<u8>>) -> TransportResult<()> {
unimplemented!()
}
}
#[async_trait]
impl NonblockingTpuConnection for MockUdpTpuConnection {
fn tpu_addr(&self) -> &SocketAddr {
impl NonblockingClientConnection for MockUdpConnection {
fn server_addr(&self) -> &SocketAddr {
&self.addr
}
async fn send_wire_transaction<T>(&self, _wire_transaction: T) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
async fn send_data(&self, _data: &[u8]) -> TransportResult<()> {
unimplemented!()
}
async fn send_wire_transaction_batch<T>(&self, _buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
async fn send_data_batch(&self, _buffers: &[Vec<u8>]) -> TransportResult<()> {
unimplemented!()
}
}
@ -512,7 +546,7 @@ mod tests {
}
#[test]
fn test_tpu_connection_cache() {
fn test_connection_cache() {
solana_logger::setup();
// Allow the test to run deterministically
// with the same pseudorandom sequence between runs
@ -521,13 +555,14 @@ mod tests {
// to get the same pseudorandom sequence on different platforms
let mut rng = ChaChaRng::seed_from_u64(42);
// Generate a bunch of random addresses and create TPUConnections to them
// Since TPUConnection::new is infallible, it should't matter whether or not
// we can actually connect to those addresses - TPUConnection implementations should either
// Generate a bunch of random addresses and create connections to them
// Since ClientConnection::new is infallible, it shouldn't matter whether or not
// we can actually connect to those addresses - ClientConnection implementations should either
// be lazy and not connect until first use or handle connection errors somehow
// (without crashing, as would be required in a real practical validator)
let connection_manager = Box::<MockConnectionManager>::default();
let connection_cache =
TpuConnectionCache::<MockUdpPool>::new(DEFAULT_TPU_CONNECTION_POOL_SIZE).unwrap();
ConnectionCache::new(connection_manager, DEFAULT_CONNECTION_POOL_SIZE).unwrap();
let port_offset = MOCK_PORT_OFFSET;
let addrs = (0..MAX_CONNECTIONS)
.map(|_| {
@ -546,9 +581,9 @@ mod tests {
.unwrap_or_else(|| a.port());
let addr = &SocketAddr::new(a.ip(), port);
let conn = &map.get(addr).expect("Address not found").connections[0];
let conn = &map.get(addr).expect("Address not found").get(0).unwrap();
let conn = conn.new_blocking_connection(*addr, connection_cache.stats.clone());
assert!(addr.ip() == BlockingTpuConnection::tpu_addr(&conn).ip());
assert!(addr.ip() == conn.server_addr().ip());
});
}
@ -573,13 +608,14 @@ mod tests {
let port = u16::MAX - MOCK_PORT_OFFSET + 1;
assert!(port.checked_add(MOCK_PORT_OFFSET).is_none());
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let connection_cache = TpuConnectionCache::<MockUdpPool>::new(1).unwrap();
let connection_manager = Box::<MockConnectionManager>::default();
let connection_cache = ConnectionCache::new(connection_manager, 1).unwrap();
let conn: MockUdpTpuConnection = connection_cache.get_connection(&addr);
let conn = connection_cache.get_connection(&addr);
// We (intentionally) don't have an interface that allows us to distinguish between
// UDP and Quic connections, so check instead that the port is valid (non-zero)
// and is the same as the input port (falling back on UDP)
assert!(BlockingTpuConnection::tpu_addr(&conn).port() != 0);
assert!(BlockingTpuConnection::tpu_addr(&conn).port() == port);
assert!(conn.server_addr().port() != 0);
assert!(conn.server_addr().port() == port);
}
}

View File

@ -1,5 +1,5 @@
use {
crate::tpu_connection::ClientStats,
crate::client_connection::ClientStats,
std::sync::atomic::{AtomicU64, Ordering},
};
@ -161,22 +161,16 @@ impl ConnectionCacheStats {
i64
),
(
"tx_streams_blocked_uni",
self.total_client_stats
.tx_streams_blocked_uni
.load_and_reset(),
"streams_blocked_uni",
self.total_client_stats.streams_blocked_uni.load_and_reset(),
i64
),
(
"tx_data_blocked",
self.total_client_stats.tx_data_blocked.load_and_reset(),
i64
),
(
"tx_acks",
self.total_client_stats.tx_acks.load_and_reset(),
"data_blocked",
self.total_client_stats.data_blocked.load_and_reset(),
i64
),
("acks", self.total_client_stats.acks.load_and_reset(), i64),
(
"num_packets",
self.sent_packets.swap(0, Ordering::Relaxed),

View File

@ -0,0 +1,9 @@
#![allow(clippy::integer_arithmetic)]
pub mod client_connection;
pub mod connection_cache;
pub mod connection_cache_stats;
pub mod nonblocking;
#[macro_use]
extern crate solana_metrics;

View File

@ -0,0 +1,15 @@
//! Trait defining async send functions, to be used for UDP or QUIC sending
use {
async_trait::async_trait, solana_sdk::transport::Result as TransportResult,
std::net::SocketAddr,
};
#[async_trait]
pub trait ClientConnection {
fn server_addr(&self) -> &SocketAddr;
async fn send_data(&self, buffer: &[u8]) -> TransportResult<()>;
async fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()>;
}
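Because send_data and send_data_batch now take concrete slice types instead of a generic T: AsRef<[u8]>, this trait is object safe. A minimal sketch of what that buys callers (the broadcast helper below is hypothetical, not part of this change):
// Hypothetical helper: holds protocol-agnostic trait objects and fans a
// payload out to each of them; only possible because the trait is object safe.
async fn broadcast(
    connections: &[std::sync::Arc<dyn ClientConnection>],
    payload: &[u8],
) -> TransportResult<()> {
    for connection in connections {
        connection.send_data(payload).await?;
    }
    Ok(())
}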

View File

@ -0,0 +1 @@
pub mod client_connection;

View File

@ -7,7 +7,7 @@ use {
tracer_packet_stats::TracerPacketStats,
unprocessed_transaction_storage::UnprocessedTransactionStorage,
},
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
solana_client::connection_cache::ConnectionCache,
solana_gossip::cluster_info::ClusterInfo,
solana_measure::measure::Measure,
solana_perf::{data_budget::DataBudget, packet::Packet},
@ -197,7 +197,7 @@ impl Forwarder {
.forwarded_transaction_count
.fetch_add(packet_vec_len, Ordering::Relaxed);
let conn = connection_cache.get_connection(&addr);
conn.send_wire_transaction_batch_async(packet_vec)
conn.send_data_batch_async(packet_vec)
};
measure.stop();

View File

@ -13,7 +13,7 @@ use {
solana_streamer::streamer::{
self, PacketBatchReceiver, PacketBatchSender, StreamerReceiveStats,
},
solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_ENABLE_UDP,
solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
std::{
net::UdpSocket,
sync::{

View File

@ -759,11 +759,12 @@ impl Validator {
let connection_cache = match use_quic {
true => {
let mut connection_cache = ConnectionCache::new(tpu_connection_pool_size);
connection_cache
.update_client_certificate(&identity_keypair, node.info.gossip.ip())
.expect("Failed to update QUIC client certificates");
connection_cache.set_staked_nodes(&staked_nodes, &identity_keypair.pubkey());
let connection_cache = ConnectionCache::new_with_client_options(
tpu_connection_pool_size,
None,
Some((&identity_keypair, node.info.gossip.ip())),
Some((&staked_nodes, &identity_keypair.pubkey())),
);
Arc::new(connection_cache)
}
false => Arc::new(ConnectionCache::with_udp(tpu_connection_pool_size)),
@ -2089,7 +2090,7 @@ mod tests {
crossbeam_channel::{bounded, RecvTimeoutError},
solana_ledger::{create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader},
solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig},
solana_tpu_client::tpu_connection_cache::{
solana_tpu_client::tpu_client::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC,
},
std::{fs::remove_dir_all, thread, time::Duration},

View File

@ -3,7 +3,7 @@
use {
rand::{thread_rng, Rng},
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
solana_client::connection_cache::ConnectionCache,
solana_gossip::cluster_info::ClusterInfo,
solana_poh::poh_recorder::PohRecorder,
std::{
@ -50,7 +50,7 @@ impl WarmQuicCacheService {
.lookup_contact_info(&leader_pubkey, |leader| leader.tpu)
{
let conn = connection_cache.get_connection(&addr);
if let Err(err) = conn.send_wire_transaction([0u8]) {
if let Err(err) = conn.send_data(&[0u8]) {
warn!(
"Failed to warmup QUIC connection to the leader {:?}, Error {:?}",
leader_pubkey, err

View File

@ -45,7 +45,7 @@ use {
log::*,
rand::{thread_rng, Rng},
solana_bench_tps::{bench::generate_and_fund_keypairs, bench_tps_client::BenchTpsClient},
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
solana_client::connection_cache::ConnectionCache,
solana_core::serve_repair::{RepairProtocol, RepairRequestHeader, ServeRepair},
solana_dos::cli::*,
solana_gossip::{
@ -67,7 +67,7 @@ use {
transaction::Transaction,
},
solana_streamer::socket::SocketAddrSpace,
solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_CONNECTION_POOL_SIZE,
solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE,
std::{
net::{SocketAddr, UdpSocket},
process::exit,
@ -285,7 +285,7 @@ fn create_sender_thread(
Ok(tx_batch) => {
let len = tx_batch.batch.len();
let mut measure_send_txs = Measure::start("measure_send_txs");
let res = connection.send_wire_transaction_batch_async(tx_batch.batch);
let res = connection.send_data_batch_async(tx_batch.batch);
measure_send_txs.stop();
time_send_ns += measure_send_txs.as_ns();

View File

@ -44,7 +44,7 @@ use {
},
solana_stake_program::{config::create_account as create_stake_config_account, stake_state},
solana_streamer::socket::SocketAddrSpace,
solana_tpu_client::tpu_connection_cache::{
solana_tpu_client::tpu_client::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC,
},
solana_vote_program::{

View File

@ -4495,6 +4495,7 @@ dependencies = [
"quinn",
"rand 0.7.3",
"rayon",
"solana-connection-cache",
"solana-measure",
"solana-metrics",
"solana-net-utils",
@ -4532,6 +4533,25 @@ dependencies = [
"solana-sdk 1.16.0",
]
[[package]]
name = "solana-connection-cache"
version = "1.16.0"
dependencies = [
"async-trait",
"bincode",
"futures-util",
"indexmap",
"log",
"rand 0.7.3",
"rayon",
"solana-measure",
"solana-metrics",
"solana-net-utils",
"solana-sdk 1.16.0",
"thiserror",
"tokio",
]
[[package]]
name = "solana-core"
version = "1.16.0"
@ -5180,6 +5200,7 @@ dependencies = [
"quinn-proto",
"quinn-udp",
"rustls 0.20.6",
"solana-connection-cache",
"solana-measure",
"solana-metrics",
"solana-net-utils",
@ -6047,6 +6068,8 @@ version = "1.16.0"
dependencies = [
"bincode",
"log",
"rayon",
"solana-connection-cache",
"solana-rpc-client",
"solana-rpc-client-api",
"solana-sdk 1.16.0",
@ -6065,6 +6088,7 @@ dependencies = [
"log",
"rand 0.7.3",
"rayon",
"solana-connection-cache",
"solana-measure",
"solana-metrics",
"solana-net-utils",
@ -6105,6 +6129,7 @@ name = "solana-udp-client"
version = "1.16.0"
dependencies = [
"async-trait",
"solana-connection-cache",
"solana-net-utils",
"solana-sdk 1.16.0",
"solana-streamer",

View File

@ -20,6 +20,8 @@ quinn = "0.9.3"
quinn-proto = "0.9.2"
quinn-udp = "0.3.2"
rustls = { version = "0.20.6", default-features = false, features = ["dangerous_configuration", "logging"] }
solana-connection-cache = { path = "../connection-cache", version = "=1.16.0" }
solana-measure = { path = "../measure", version = "=1.16.0" }
solana-metrics = { path = "../metrics", version = "=1.16.0" }
solana-net-utils = { path = "../net-utils", version = "=1.16.0" }

View File

@ -9,10 +9,20 @@ extern crate solana_metrics;
use {
crate::{
nonblocking::quic_client::{
QuicClient, QuicClientCertificate, QuicLazyInitializedEndpoint,
QuicTpuConnection as NonblockingQuicTpuConnection,
QuicClient, QuicClientCertificate,
QuicClientConnection as NonblockingQuicClientConnection, QuicLazyInitializedEndpoint,
},
quic_client::QuicTpuConnection as BlockingQuicTpuConnection,
quic_client::QuicClientConnection as BlockingQuicClientConnection,
},
quinn::Endpoint,
solana_connection_cache::{
client_connection::ClientConnection as BlockingClientConnection,
connection_cache::{
BaseClientConnection, ClientError, ConnectionManager, ConnectionPool,
ConnectionPoolError, NewConnectionConfig, ProtocolType,
},
connection_cache_stats::ConnectionCacheStats,
nonblocking::client_connection::ClientConnection as NonblockingClientConnection,
},
solana_sdk::{pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair},
solana_streamer::{
@ -20,13 +30,8 @@ use {
streamer::StakedNodes,
tls_certificates::new_self_signed_tls_certificate,
},
solana_tpu_client::{
connection_cache_stats::ConnectionCacheStats,
tpu_connection_cache::{
BaseTpuConnection, ConnectionPool, ConnectionPoolError, NewTpuConfig,
},
},
std::{
any::Any,
error::Error,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{Arc, RwLock},
@ -41,25 +46,12 @@ pub enum QuicClientError {
}
pub struct QuicPool {
connections: Vec<Arc<Quic>>,
connections: Vec<Arc<dyn BaseClientConnection>>,
endpoint: Arc<QuicLazyInitializedEndpoint>,
}
impl ConnectionPool for QuicPool {
type PoolTpuConnection = Quic;
type TpuConfig = QuicConfig;
const PORT_OFFSET: u16 = QUIC_PORT_OFFSET;
fn new_with_connection(config: &Self::TpuConfig, addr: &SocketAddr) -> Self {
let mut pool = Self {
connections: vec![],
endpoint: config.create_endpoint(),
};
pool.add_connection(config, addr);
pool
}
fn add_connection(&mut self, config: &Self::TpuConfig, addr: &SocketAddr) {
let connection = Arc::new(self.create_pool_entry(config, addr));
fn add_connection(&mut self, config: &dyn NewConnectionConfig, addr: &SocketAddr) {
let connection = self.create_pool_entry(config, addr);
self.connections.push(connection);
}
@ -67,7 +59,7 @@ impl ConnectionPool for QuicPool {
self.connections.len()
}
fn get(&self, index: usize) -> Result<Arc<Self::PoolTpuConnection>, ConnectionPoolError> {
fn get(&self, index: usize) -> Result<Arc<dyn BaseClientConnection>, ConnectionPoolError> {
self.connections
.get(index)
.cloned()
@ -76,14 +68,15 @@ impl ConnectionPool for QuicPool {
fn create_pool_entry(
&self,
config: &Self::TpuConfig,
config: &dyn NewConnectionConfig,
addr: &SocketAddr,
) -> Self::PoolTpuConnection {
Quic(Arc::new(QuicClient::new(
) -> Arc<dyn BaseClientConnection> {
let config = QuicConfig::downcast_ref(config);
Arc::new(Quic(Arc::new(QuicClient::new(
self.endpoint.clone(),
*addr,
config.compute_max_parallel_streams(),
)))
))))
}
}
@ -91,15 +84,17 @@ pub struct QuicConfig {
client_certificate: Arc<QuicClientCertificate>,
maybe_staked_nodes: Option<Arc<RwLock<StakedNodes>>>,
maybe_client_pubkey: Option<Pubkey>,
// The optional specified endpoint for the QUIC-based client connections.
// If not specified, the connection cache will create one as needed.
client_endpoint: Option<Endpoint>,
}
impl NewTpuConfig for QuicConfig {
type ClientError = QuicClientError;
fn new() -> Result<Self, QuicClientError> {
impl NewConnectionConfig for QuicConfig {
fn new() -> Result<Self, ClientError> {
let (cert, priv_key) =
new_self_signed_tls_certificate(&Keypair::new(), IpAddr::V4(Ipv4Addr::UNSPECIFIED))
.map_err(|err| QuicClientError::CertificateError(err.to_string()))?;
.map_err(|err| ClientError::CertificateError(err.to_string()))?;
Ok(Self {
client_certificate: Arc::new(QuicClientCertificate {
certificate: cert,
@ -107,16 +102,25 @@ impl NewTpuConfig for QuicConfig {
}),
maybe_staked_nodes: None,
maybe_client_pubkey: None,
client_endpoint: None,
})
}
fn as_any(&self) -> &dyn Any {
self
}
fn as_mut_any(&mut self) -> &mut dyn Any {
self
}
}
impl QuicConfig {
fn create_endpoint(&self) -> Arc<QuicLazyInitializedEndpoint> {
Arc::new(QuicLazyInitializedEndpoint::new(
fn create_endpoint(&self) -> QuicLazyInitializedEndpoint {
QuicLazyInitializedEndpoint::new(
self.client_certificate.clone(),
None,
))
self.client_endpoint.as_ref().cloned(),
)
}
fn compute_max_parallel_streams(&self) -> usize {
@ -158,30 +162,84 @@ impl QuicConfig {
self.maybe_staked_nodes = Some(staked_nodes.clone());
self.maybe_client_pubkey = Some(*client_pubkey);
}
pub fn update_client_endpoint(&mut self, client_endpoint: Endpoint) {
self.client_endpoint = Some(client_endpoint);
}
/// Convenient function to downcast a generic NewConnectionConfig reference to QuicConfig
pub fn downcast_ref(config: &dyn NewConnectionConfig) -> &Self {
match config.as_any().downcast_ref::<QuicConfig>() {
Some(config) => config,
None => panic!("Expecting a QuicConfig!"),
}
}
}
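The as_any/downcast_ref pair above is the standard escape hatch for recovering a concrete type behind an object-safe trait. A standalone sketch of the pattern with hypothetical names (AnyConfig and PortConfig stand in for NewConnectionConfig and QuicConfig):
use std::any::Any;
trait AnyConfig {
    // Each implementor returns itself as `&dyn Any` so callers can downcast.
    fn as_any(&self) -> &dyn Any;
}
struct PortConfig {
    port: u16,
}
impl AnyConfig for PortConfig {
    fn as_any(&self) -> &dyn Any {
        self
    }
}
fn read_port(config: &dyn AnyConfig) -> u16 {
    // Panics on a type mismatch, mirroring QuicConfig::downcast_ref above.
    config
        .as_any()
        .downcast_ref::<PortConfig>()
        .expect("Expecting a PortConfig!")
        .port
}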
pub struct Quic(Arc<QuicClient>);
impl BaseTpuConnection for Quic {
type BlockingConnectionType = BlockingQuicTpuConnection;
type NonblockingConnectionType = NonblockingQuicTpuConnection;
impl BaseClientConnection for Quic {
fn new_blocking_connection(
&self,
_addr: SocketAddr,
stats: Arc<ConnectionCacheStats>,
) -> BlockingQuicTpuConnection {
BlockingQuicTpuConnection::new_with_client(self.0.clone(), stats)
) -> Arc<dyn BlockingClientConnection> {
Arc::new(BlockingQuicClientConnection::new_with_client(
self.0.clone(),
stats,
))
}
fn new_nonblocking_connection(
&self,
_addr: SocketAddr,
stats: Arc<ConnectionCacheStats>,
) -> NonblockingQuicTpuConnection {
NonblockingQuicTpuConnection::new_with_client(self.0.clone(), stats)
) -> Arc<dyn NonblockingClientConnection> {
Arc::new(NonblockingQuicClientConnection::new_with_client(
self.0.clone(),
stats,
))
}
}
#[derive(Default)]
pub struct QuicConnectionManager {
connection_config: Option<Box<dyn NewConnectionConfig>>,
}
impl ConnectionManager for QuicConnectionManager {
fn new_connection_pool(&self) -> Box<dyn ConnectionPool> {
Box::new(QuicPool {
connections: Vec::default(),
endpoint: Arc::new(self.connection_config.as_ref().map_or(
QuicLazyInitializedEndpoint::default(),
|config| {
let config = QuicConfig::downcast_ref(config.as_ref());
config.create_endpoint()
},
)),
})
}
fn new_connection_config(&self) -> Box<dyn NewConnectionConfig> {
Box::new(QuicConfig::new().unwrap())
}
fn get_port_offset(&self) -> u16 {
QUIC_PORT_OFFSET
}
fn get_protocol_type(&self) -> ProtocolType {
ProtocolType::QUIC
}
}
impl QuicConnectionManager {
pub fn new_with_connection_config(config: QuicConfig) -> Self {
Self {
connection_config: Some(Box::new(config)),
}
}
}
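Combined with the ConnectionCache constructor exercised in the tests earlier in this diff, wiring the QUIC manager in looks roughly like this (a sketch: the address is a placeholder and errors are unwrapped for brevity):
use {
    solana_connection_cache::connection_cache::{ConnectionCache, DEFAULT_CONNECTION_POOL_SIZE},
    std::net::SocketAddr,
};
fn quic_cache_sketch() {
    let config = QuicConfig::new().unwrap();
    let manager = Box::new(QuicConnectionManager::new_with_connection_config(config));
    let cache = ConnectionCache::new(manager, DEFAULT_CONNECTION_POOL_SIZE).unwrap();
    // get_connection hands back a blocking trait object; whether it is QUIC
    // or UDP underneath is invisible to the caller.
    let addr: SocketAddr = "127.0.0.1:8001".parse().unwrap();
    let _connection = cache.get_connection(&addr);
}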
#[cfg(test)]
mod tests {
use {
@ -190,33 +248,29 @@ mod tests {
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS,
QUIC_TOTAL_STAKED_CONCURRENT_STREAMS,
},
solana_tpu_client::tpu_connection_cache::{
TpuConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE,
},
};
#[test]
fn test_connection_cache_max_parallel_chunks() {
solana_logger::setup();
let connection_cache =
TpuConnectionCache::<QuicPool>::new(DEFAULT_TPU_CONNECTION_POOL_SIZE).unwrap();
let mut tpu_config = connection_cache.tpu_config;
let mut connection_config = QuicConfig::new().unwrap();
assert_eq!(
tpu_config.compute_max_parallel_streams(),
connection_config.compute_max_parallel_streams(),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let pubkey = Pubkey::new_unique();
tpu_config.set_staked_nodes(&staked_nodes, &pubkey);
connection_config.set_staked_nodes(&staked_nodes, &pubkey);
assert_eq!(
tpu_config.compute_max_parallel_streams(),
connection_config.compute_max_parallel_streams(),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
staked_nodes.write().unwrap().total_stake = 10000;
assert_eq!(
tpu_config.compute_max_parallel_streams(),
connection_config.compute_max_parallel_streams(),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
@ -230,7 +284,7 @@ mod tests {
(QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64;
assert_eq!(
tpu_config.compute_max_parallel_streams(),
connection_config.compute_max_parallel_streams(),
(QUIC_MIN_STAKED_CONCURRENT_STREAMS as f64 + (1f64 / 10000f64) * delta) as usize
);
@ -245,7 +299,7 @@ mod tests {
.pubkey_stake_map
.insert(pubkey, 1000);
assert_ne!(
tpu_config.compute_max_parallel_streams(),
connection_config.compute_max_parallel_streams(),
QUIC_MIN_STAKED_CONCURRENT_STREAMS
);
}

View File

@ -1,5 +1,5 @@
//! Simple nonblocking client that connects to a given UDP port with the QUIC protocol
//! and provides an interface for sending transactions which is restricted by the
//! and provides an interface for sending data which is restricted by the
//! server's flow control.
use {
async_mutex::Mutex,
@ -11,6 +11,10 @@ use {
ClientConfig, ConnectError, Connection, ConnectionError, Endpoint, EndpointConfig,
IdleTimeout, TokioRuntime, TransportConfig, VarInt, WriteError,
},
solana_connection_cache::{
client_connection::ClientStats, connection_cache_stats::ConnectionCacheStats,
nonblocking::client_connection::ClientConnection,
},
solana_measure::measure::Measure,
solana_net_utils::VALIDATOR_PORT_RANGE,
solana_rpc_client_api::client_error::ErrorKind as ClientErrorKind,
@ -25,10 +29,6 @@ use {
solana_streamer::{
nonblocking::quic::ALPN_TPU_PROTOCOL_ID, tls_certificates::new_self_signed_tls_certificate,
},
solana_tpu_client::{
connection_cache_stats::ConnectionCacheStats, nonblocking::tpu_connection::TpuConnection,
tpu_connection::ClientStats,
},
std::{
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{atomic::Ordering, Arc},
@ -397,21 +397,21 @@ impl QuicClient {
connection_stats
.total_client_stats
.tx_streams_blocked_uni
.streams_blocked_uni
.update_stat(
&self.stats.tx_streams_blocked_uni,
&self.stats.streams_blocked_uni,
new_stats.frame_tx.streams_blocked_uni,
);
connection_stats
.total_client_stats
.tx_data_blocked
.update_stat(&self.stats.tx_data_blocked, new_stats.frame_tx.data_blocked);
.data_blocked
.update_stat(&self.stats.data_blocked, new_stats.frame_tx.data_blocked);
connection_stats
.total_client_stats
.tx_acks
.update_stat(&self.stats.tx_acks, new_stats.frame_tx.acks);
.acks
.update_stat(&self.stats.acks, new_stats.frame_tx.acks);
last_connection_id = connection.stable_id();
match Self::_send_buffer_using_conn(data, &connection).await {
@ -438,7 +438,7 @@ impl QuicClient {
// if we come here, that means we have exhausted maximum retries, return the error
info!(
"Ran into an error sending transactions {:?}, exhausted retries to {}",
"Ran into an error sending data {:?}, exhausted retries to {}",
last_error, self.addr
);
// If we get here but last_error is None, then we have a logic error
@ -470,12 +470,12 @@ impl QuicClient {
where
T: AsRef<[u8]>,
{
// Start off by "testing" the connection by sending the first transaction
// Start off by "testing" the connection by sending the first buffer
// This will also connect to the server if not already connected
// and reconnect and retry if the first send attempt failed
// (for example due to a timed out connection), returning an error
// or the connection that was used to successfully send the transaction.
// We will use the returned connection to send the rest of the transactions in the batch
// or the connection that was used to successfully send the buffer.
// We will use the returned connection to send the rest of the buffers in the batch
// to avoid touching the mutex in self, and not bother reconnecting if we fail along the way
// since testing even in the ideal GCE environment has found no cases
// where reconnecting and retrying in the middle of a batch send
@ -515,7 +515,7 @@ impl QuicClient {
Ok(())
}
pub fn tpu_addr(&self) -> &SocketAddr {
pub fn server_addr(&self) -> &SocketAddr {
&self.addr
}
@ -524,12 +524,12 @@ impl QuicClient {
}
}
pub struct QuicTpuConnection {
pub struct QuicClientConnection {
pub client: Arc<QuicClient>,
pub connection_stats: Arc<ConnectionCacheStats>,
}
impl QuicTpuConnection {
impl QuicClientConnection {
pub fn base_stats(&self) -> Arc<ClientStats> {
self.client.stats()
}
@ -563,15 +563,12 @@ impl QuicTpuConnection {
}
#[async_trait]
impl TpuConnection for QuicTpuConnection {
fn tpu_addr(&self) -> &SocketAddr {
self.client.tpu_addr()
impl ClientConnection for QuicClientConnection {
fn server_addr(&self) -> &SocketAddr {
self.client.server_addr()
}
async fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
async fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
let stats = ClientStats::default();
let len = buffers.len();
let res = self
@ -584,18 +581,15 @@ impl TpuConnection for QuicTpuConnection {
Ok(())
}
async fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
async fn send_data(&self, data: &[u8]) -> TransportResult<()> {
let stats = Arc::new(ClientStats::default());
let send_buffer =
self.client
.send_buffer(wire_transaction, &stats, self.connection_stats.clone());
let send_buffer = self
.client
.send_buffer(data, &stats, self.connection_stats.clone());
if let Err(e) = send_buffer.await {
warn!(
"Failed to send transaction async to {}, error: {:?} ",
self.tpu_addr(),
"Failed to send data async to {}, error: {:?} ",
self.server_addr(),
e
);
datapoint_warn!("send-wire-async", ("failure", 1, i64),);

View File

@ -1,18 +1,18 @@
//! Simple client that connects to a given UDP port with the QUIC protocol and provides
//! an interface for sending transactions which is restricted by the server's flow control.
//! an interface for sending data which is restricted by the server's flow control.
use {
crate::nonblocking::quic_client::{
QuicClient, QuicLazyInitializedEndpoint, QuicTpuConnection as NonblockingQuicTpuConnection,
QuicClient, QuicClientConnection as NonblockingQuicConnection, QuicLazyInitializedEndpoint,
},
lazy_static::lazy_static,
log::*,
solana_sdk::transport::{Result as TransportResult, TransportError},
solana_tpu_client::{
solana_connection_cache::{
client_connection::{ClientConnection, ClientStats},
connection_cache_stats::ConnectionCacheStats,
nonblocking::tpu_connection::TpuConnection as NonblockingTpuConnection,
tpu_connection::{ClientStats, TpuConnection},
nonblocking::client_connection::ClientConnection as NonblockingClientConnection,
},
solana_sdk::transport::{Result as TransportResult, TransportError},
std::{
net::SocketAddr,
sync::{atomic::Ordering, Arc, Condvar, Mutex, MutexGuard},
@ -21,125 +21,119 @@ use {
tokio::{runtime::Runtime, time::timeout},
};
pub mod temporary_pub {
use super::*;
pub const MAX_OUTSTANDING_TASK: u64 = 2000;
pub const SEND_DATA_TIMEOUT_MS: u64 = 10000;
pub const MAX_OUTSTANDING_TASK: u64 = 2000;
pub const SEND_TRANSACTION_TIMEOUT_MS: u64 = 10000;
/// A semaphore used for limiting the number of asynchronous tasks spawned to the
/// runtime. Before spawning a task, use acquire. After the task is done (be it
/// success or failure), call release.
struct AsyncTaskSemaphore {
/// Keep the counter info about the usage
counter: Mutex<u64>,
/// Conditional variable for signaling when counter is decremented
cond_var: Condvar,
/// The maximum usage allowed by this semaphore.
permits: u64,
}
/// A semaphore used for limiting the number of asynchronous tasks spawned to the
/// runtime. Before spawning a task, use acquire. After the task is done (be it
/// success or failure), call release.
pub struct AsyncTaskSemaphore {
/// Keep the counter info about the usage
counter: Mutex<u64>,
/// Conditional variable for signaling when counter is decremented
cond_var: Condvar,
/// The maximum usage allowed by this semaphore.
permits: u64,
}
impl AsyncTaskSemaphore {
pub fn new(permits: u64) -> Self {
Self {
counter: Mutex::new(0),
cond_var: Condvar::new(),
permits,
}
}
/// When this function returns, the lock has been acquired and the usage count
/// incremented. When the returned MutexGuard is dropped, the lock is released
/// without decrementing the usage count.
pub fn acquire(&self) -> MutexGuard<u64> {
let mut count = self.counter.lock().unwrap();
*count += 1;
while *count > self.permits {
count = self.cond_var.wait(count).unwrap();
}
count
}
/// Acquire the lock and decrement the usage count
pub fn release(&self) {
let mut count = self.counter.lock().unwrap();
*count -= 1;
self.cond_var.notify_one();
impl AsyncTaskSemaphore {
pub fn new(permits: u64) -> Self {
Self {
counter: Mutex::new(0),
cond_var: Condvar::new(),
permits,
}
}
lazy_static! {
pub static ref ASYNC_TASK_SEMAPHORE: AsyncTaskSemaphore =
AsyncTaskSemaphore::new(MAX_OUTSTANDING_TASK);
pub static ref RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("quic-client")
.enable_all()
.build()
.unwrap();
/// When this function returns, the lock has been acquired and the usage count
/// incremented. When the returned MutexGuard is dropped, the lock is released
/// without decrementing the usage count.
pub fn acquire(&self) -> MutexGuard<u64> {
let mut count = self.counter.lock().unwrap();
*count += 1;
while *count > self.permits {
count = self.cond_var.wait(count).unwrap();
}
count
}
pub async fn send_wire_transaction_async(
connection: Arc<NonblockingQuicTpuConnection>,
wire_transaction: Vec<u8>,
) -> TransportResult<()> {
let result = timeout(
Duration::from_millis(SEND_TRANSACTION_TIMEOUT_MS),
connection.send_wire_transaction(wire_transaction),
)
.await;
ASYNC_TASK_SEMAPHORE.release();
handle_send_result(result, connection)
/// Acquire the lock and decrement the usage count
pub fn release(&self) {
let mut count = self.counter.lock().unwrap();
*count -= 1;
self.cond_var.notify_one();
}
}
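A standalone sketch of the semaphore's behavior, assuming AsyncTaskSemaphore is in scope; with permits = 1, a second acquire blocks until release is called:
use std::sync::Arc;
fn semaphore_sketch() {
    let semaphore = Arc::new(AsyncTaskSemaphore::new(1));
    // Count goes 0 -> 1; dropping the guard unlocks the mutex, but the count
    // stays incremented until release() is called.
    drop(semaphore.acquire());
    let semaphore2 = Arc::clone(&semaphore);
    let waiter = std::thread::spawn(move || {
        // The count would exceed the permit limit, so this waits on the condvar.
        drop(semaphore2.acquire());
    });
    // Decrement and notify_one: the blocked acquire above may now return.
    semaphore.release();
    waiter.join().unwrap();
}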
pub async fn send_wire_transaction_batch_async(
connection: Arc<NonblockingQuicTpuConnection>,
buffers: Vec<Vec<u8>>,
) -> TransportResult<()> {
let time_out = SEND_TRANSACTION_TIMEOUT_MS * buffers.len() as u64;
lazy_static! {
static ref ASYNC_TASK_SEMAPHORE: AsyncTaskSemaphore =
AsyncTaskSemaphore::new(MAX_OUTSTANDING_TASK);
static ref RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("quic-client")
.enable_all()
.build()
.unwrap();
}
let result = timeout(
Duration::from_millis(time_out),
connection.send_wire_transaction_batch(&buffers),
)
.await;
ASYNC_TASK_SEMAPHORE.release();
handle_send_result(result, connection)
}
async fn send_data_async(
connection: Arc<NonblockingQuicConnection>,
buffer: Vec<u8>,
) -> TransportResult<()> {
let result = timeout(
Duration::from_millis(SEND_DATA_TIMEOUT_MS),
connection.send_data(&buffer),
)
.await;
ASYNC_TASK_SEMAPHORE.release();
handle_send_result(result, connection)
}
/// Check the send result and update stats if it timed out. Returns the checked result.
pub fn handle_send_result(
result: Result<Result<(), TransportError>, tokio::time::error::Elapsed>,
connection: Arc<NonblockingQuicTpuConnection>,
) -> Result<(), TransportError> {
match result {
Ok(result) => result,
Err(_err) => {
let client_stats = ClientStats::default();
client_stats.send_timeout.fetch_add(1, Ordering::Relaxed);
let stats = connection.connection_stats();
stats.add_client_stats(&client_stats, 0, false);
info!("Timedout sending transaction {:?}", connection.tpu_addr());
Err(TransportError::Custom(
"Timedout sending transaction".to_string(),
))
}
async fn send_data_batch_async(
connection: Arc<NonblockingQuicConnection>,
buffers: Vec<Vec<u8>>,
) -> TransportResult<()> {
let time_out = SEND_DATA_TIMEOUT_MS * buffers.len() as u64;
let result = timeout(
Duration::from_millis(time_out),
connection.send_data_batch(&buffers),
)
.await;
ASYNC_TASK_SEMAPHORE.release();
handle_send_result(result, connection)
}
/// Check the send result and update stats if it timed out. Returns the checked result.
fn handle_send_result(
result: Result<Result<(), TransportError>, tokio::time::error::Elapsed>,
connection: Arc<NonblockingQuicConnection>,
) -> Result<(), TransportError> {
match result {
Ok(result) => result,
Err(_err) => {
let client_stats = ClientStats::default();
client_stats.send_timeout.fetch_add(1, Ordering::Relaxed);
let stats = connection.connection_stats();
stats.add_client_stats(&client_stats, 0, false);
info!("Timedout sending data {:?}", connection.server_addr());
Err(TransportError::Custom("Timedout sending data".to_string()))
}
}
}
use temporary_pub::*;
pub struct QuicTpuConnection {
pub inner: Arc<NonblockingQuicTpuConnection>,
pub struct QuicClientConnection {
pub inner: Arc<NonblockingQuicConnection>,
}
impl QuicTpuConnection {
impl QuicClientConnection {
pub fn new(
endpoint: Arc<QuicLazyInitializedEndpoint>,
tpu_addr: SocketAddr,
server_addr: SocketAddr,
connection_stats: Arc<ConnectionCacheStats>,
) -> Self {
let inner = Arc::new(NonblockingQuicTpuConnection::new(
let inner = Arc::new(NonblockingQuicConnection::new(
endpoint,
tpu_addr,
server_addr,
connection_stats,
));
Self { inner }
@ -149,7 +143,7 @@ impl QuicTpuConnection {
client: Arc<QuicClient>,
connection_stats: Arc<ConnectionCacheStats>,
) -> Self {
let inner = Arc::new(NonblockingQuicTpuConnection::new_with_client(
let inner = Arc::new(NonblockingQuicConnection::new_with_client(
client,
connection_stats,
));
@ -157,33 +151,33 @@ impl QuicTpuConnection {
}
}
impl TpuConnection for QuicTpuConnection {
fn tpu_addr(&self) -> &SocketAddr {
self.inner.tpu_addr()
impl ClientConnection for QuicClientConnection {
fn server_addr(&self) -> &SocketAddr {
self.inner.server_addr()
}
fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
RUNTIME.block_on(self.inner.send_wire_transaction_batch(buffers))?;
fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
RUNTIME.block_on(self.inner.send_data_batch(buffers))?;
Ok(())
}
fn send_wire_transaction_async(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
fn send_data_async(&self, data: Vec<u8>) -> TransportResult<()> {
let _lock = ASYNC_TASK_SEMAPHORE.acquire();
let inner = self.inner.clone();
let _handle = RUNTIME
.spawn(async move { send_wire_transaction_async(inner, wire_transaction).await });
let _handle = RUNTIME.spawn(async move { send_data_async(inner, data).await });
Ok(())
}
fn send_wire_transaction_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
fn send_data_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
let _lock = ASYNC_TASK_SEMAPHORE.acquire();
let inner = self.inner.clone();
let _handle =
RUNTIME.spawn(async move { send_wire_transaction_batch_async(inner, buffers).await });
let _handle = RUNTIME.spawn(async move { send_data_batch_async(inner, buffers).await });
Ok(())
}
fn send_data(&self, buffer: &[u8]) -> TransportResult<()> {
RUNTIME.block_on(self.inner.send_data(buffer))?;
Ok(())
}
}

View File

@ -3,6 +3,7 @@ mod tests {
use {
crossbeam_channel::{unbounded, Receiver},
log::*,
solana_connection_cache::connection_cache_stats::ConnectionCacheStats,
solana_perf::packet::PacketBatch,
solana_quic_client::nonblocking::quic_client::{
QuicClientCertificate, QuicLazyInitializedEndpoint,
@ -12,7 +13,6 @@ mod tests {
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, quic::StreamStats,
streamer::StakedNodes, tls_certificates::new_self_signed_tls_certificate,
},
solana_tpu_client::connection_cache_stats::ConnectionCacheStats,
std::{
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{
@ -67,8 +67,8 @@ mod tests {
#[test]
fn test_quic_client_multiple_writes() {
use {
solana_quic_client::quic_client::QuicTpuConnection,
solana_tpu_client::tpu_connection::TpuConnection,
solana_connection_cache::client_connection::ClientConnection,
solana_quic_client::quic_client::QuicClientConnection,
};
solana_logger::setup();
let (sender, receiver) = unbounded();
@ -93,7 +93,7 @@ mod tests {
let port = s.local_addr().unwrap().port();
let tpu_addr = SocketAddr::new(addr, port);
let connection_cache_stats = Arc::new(ConnectionCacheStats::default());
let client = QuicTpuConnection::new(
let client = QuicClientConnection::new(
Arc::new(QuicLazyInitializedEndpoint::default()),
tpu_addr,
connection_cache_stats,
@ -104,7 +104,7 @@ mod tests {
let num_expected_packets: usize = 3000;
let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets];
assert!(client.send_wire_transaction_batch_async(packets).is_ok());
assert!(client.send_data_batch_async(packets).is_ok());
check_packets(receiver, num_bytes, num_expected_packets);
exit.store(true, Ordering::Relaxed);
@ -114,8 +114,8 @@ mod tests {
#[tokio::test]
async fn test_nonblocking_quic_client_multiple_writes() {
use {
solana_quic_client::nonblocking::quic_client::QuicTpuConnection,
solana_tpu_client::nonblocking::tpu_connection::TpuConnection,
solana_connection_cache::nonblocking::client_connection::ClientConnection,
solana_quic_client::nonblocking::quic_client::QuicClientConnection,
};
solana_logger::setup();
let (sender, receiver) = unbounded();
@ -140,7 +140,7 @@ mod tests {
let port = s.local_addr().unwrap().port();
let tpu_addr = SocketAddr::new(addr, port);
let connection_cache_stats = Arc::new(ConnectionCacheStats::default());
let client = QuicTpuConnection::new(
let client = QuicClientConnection::new(
Arc::new(QuicLazyInitializedEndpoint::default()),
tpu_addr,
connection_cache_stats,
@ -150,8 +150,7 @@ mod tests {
let num_bytes = PACKET_DATA_SIZE;
let num_expected_packets: usize = 3000;
let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets];
assert!(client.send_wire_transaction_batch(&packets).await.is_ok());
assert!(client.send_data_batch(&packets).await.is_ok());
check_packets(receiver, num_bytes, num_expected_packets);
exit.store(true, Ordering::Relaxed);
@ -168,8 +167,8 @@ mod tests {
/// In this we demonstrate that the request sender and the response receiver use the
/// same quic Endpoint, and the same UDP socket.
use {
solana_quic_client::quic_client::QuicTpuConnection,
solana_tpu_client::tpu_connection::TpuConnection,
solana_connection_cache::client_connection::ClientConnection,
solana_quic_client::quic_client::QuicClientConnection,
};
solana_logger::setup();
@ -239,15 +238,13 @@ mod tests {
let endpoint =
QuicLazyInitializedEndpoint::new(client_certificate, Some(response_recv_endpoint));
let request_sender =
QuicTpuConnection::new(Arc::new(endpoint), tpu_addr, connection_cache_stats);
QuicClientConnection::new(Arc::new(endpoint), tpu_addr, connection_cache_stats);
// Send a full size packet with single byte writes as a request.
let num_bytes = PACKET_DATA_SIZE;
let num_expected_packets: usize = 3000;
let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets];
assert!(request_sender
.send_wire_transaction_batch_async(packets)
.is_ok());
assert!(request_sender.send_data_batch_async(packets).is_ok());
check_packets(receiver, num_bytes, num_expected_packets);
info!("Received requests!");
@ -264,16 +261,14 @@ mod tests {
let endpoint2 = QuicLazyInitializedEndpoint::new(client_certificate2, None);
let connection_cache_stats2 = Arc::new(ConnectionCacheStats::default());
let response_sender =
QuicTpuConnection::new(Arc::new(endpoint2), server_addr, connection_cache_stats2);
QuicClientConnection::new(Arc::new(endpoint2), server_addr, connection_cache_stats2);
// Send a full size packet with single byte writes.
let num_bytes = PACKET_DATA_SIZE;
let num_expected_packets: usize = 3000;
let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets];
assert!(response_sender
.send_wire_transaction_batch_async(packets)
.is_ok());
assert!(response_sender.send_data_batch_async(packets).is_ok());
check_packets(receiver2, num_bytes, num_expected_packets);
info!("Received responses!");

View File

@ -7,7 +7,7 @@ use {
serde_json::{json, Value},
solana_account_decoder::UiAccount,
solana_client::{
connection_cache::{ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE},
connection_cache::ConnectionCache,
tpu_client::{TpuClient, TpuClientConfig},
},
solana_pubsub_client::nonblocking::pubsub_client::PubsubClient,
@ -29,6 +29,7 @@ use {
},
solana_streamer::socket::SocketAddrSpace,
solana_test_validator::TestValidator,
solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE,
solana_transaction_status::TransactionStatus,
std::{
collections::HashSet,

View File

@ -2,7 +2,7 @@ use {
crate::tpu_info::TpuInfo,
crossbeam_channel::{Receiver, RecvTimeoutError},
log::*,
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
solana_client::connection_cache::ConnectionCache,
solana_measure::measure::Measure,
solana_metrics::datapoint_warn,
solana_runtime::{bank::Bank, bank_forks::BankForks},
@ -706,7 +706,7 @@ impl SendTransactionService {
connection_cache: &Arc<ConnectionCache>,
) -> Result<(), TransportError> {
let conn = connection_cache.get_connection(tpu_address);
conn.send_wire_transaction_async(wire_transaction.to_vec())
conn.send_data_async(wire_transaction.to_vec())
}
fn send_transactions_with_metrics(
@ -716,7 +716,7 @@ impl SendTransactionService {
) -> Result<(), TransportError> {
let wire_transactions = wire_transactions.iter().map(|t| t.to_vec()).collect();
let conn = connection_cache.get_connection(tpu_address);
conn.send_wire_transaction_batch_async(wire_transactions)
conn.send_data_batch_async(wire_transactions)
}
fn send_transactions(

View File

@ -44,7 +44,7 @@ use {
signature::{read_keypair_file, write_keypair_file, Keypair, Signer},
},
solana_streamer::socket::SocketAddrSpace,
solana_tpu_client::tpu_connection_cache::{
solana_tpu_client::tpu_client::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC,
},
std::{

View File

@ -12,13 +12,14 @@ edition = "2021"
[dependencies]
bincode = "1.3.3"
log = "0.4.17"
rayon = "1.5.3"
solana-connection-cache = { path = "../connection-cache", version = "=1.16.0", default-features = false }
solana-rpc-client = { path = "../rpc-client", version = "=1.16.0", default-features = false }
solana-rpc-client-api = { path = "../rpc-client-api", version = "=1.16.0" }
solana-sdk = { path = "../sdk", version = "=1.16.0" }
solana-tpu-client = { path = "../tpu-client", version = "=1.16.0", default-features = false }
[dev-dependencies]
rayon = "1.5.3"
solana-logger = { path = "../logger", version = "=1.16.0" }
[package.metadata.docs.rs]

View File

@ -5,6 +5,8 @@
use {
log::*,
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_connection_cache::connection_cache::ConnectionCache,
solana_rpc_client::rpc_client::RpcClient,
solana_rpc_client_api::{config::RpcProgramAccountsConfig, response::Response},
solana_sdk::{
@ -25,10 +27,6 @@ use {
transaction::{self, Transaction, VersionedTransaction},
transport::Result as TransportResult,
},
solana_tpu_client::{
tpu_connection::TpuConnection,
tpu_connection_cache::{ConnectionPool, TpuConnectionCache},
},
std::{
io,
net::SocketAddr,
@ -113,21 +111,21 @@ pub mod temporary_pub {
use temporary_pub::*;
/// An object for querying and sending transactions to the network.
pub struct ThinClient<P: ConnectionPool> {
pub struct ThinClient {
rpc_clients: Vec<RpcClient>,
tpu_addrs: Vec<SocketAddr>,
optimizer: ClientOptimizer,
connection_cache: Arc<TpuConnectionCache<P>>,
connection_cache: Arc<ConnectionCache>,
}
impl<P: ConnectionPool> ThinClient<P> {
impl ThinClient {
/// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP
/// and the Tpu at `tpu_addr` over `transactions_socket` using Quic or UDP
/// (currently hardcoded to UDP)
pub fn new(
rpc_addr: SocketAddr,
tpu_addr: SocketAddr,
connection_cache: Arc<TpuConnectionCache<P>>,
connection_cache: Arc<ConnectionCache>,
) -> Self {
Self::new_from_client(RpcClient::new_socket(rpc_addr), tpu_addr, connection_cache)
}
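With the generic parameter gone, constructing a ThinClient looks roughly like this (a sketch: the UDP manager is chosen arbitrarily, the addresses are placeholders, and the solana_udp_client re-export path is assumed):
use {
    solana_connection_cache::connection_cache::{ConnectionCache, DEFAULT_CONNECTION_POOL_SIZE},
    solana_udp_client::UdpConnectionManager, // path assumed from this diff
    std::{net::SocketAddr, sync::Arc},
};
fn thin_client_sketch() -> ThinClient {
    let cache = ConnectionCache::new(
        Box::<UdpConnectionManager>::default(),
        DEFAULT_CONNECTION_POOL_SIZE,
    )
    .unwrap();
    let rpc_addr: SocketAddr = "127.0.0.1:8899".parse().unwrap();
    let tpu_addr: SocketAddr = "127.0.0.1:8001".parse().unwrap();
    ThinClient::new(rpc_addr, tpu_addr, Arc::new(cache))
}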
@ -136,7 +134,7 @@ impl<P: ConnectionPool> ThinClient<P> {
rpc_addr: SocketAddr,
tpu_addr: SocketAddr,
timeout: Duration,
connection_cache: Arc<TpuConnectionCache<P>>,
connection_cache: Arc<ConnectionCache>,
) -> Self {
let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout);
Self::new_from_client(rpc_client, tpu_addr, connection_cache)
@ -145,7 +143,7 @@ impl<P: ConnectionPool> ThinClient<P> {
fn new_from_client(
rpc_client: RpcClient,
tpu_addr: SocketAddr,
connection_cache: Arc<TpuConnectionCache<P>>,
connection_cache: Arc<ConnectionCache>,
) -> Self {
Self {
rpc_clients: vec![rpc_client],
@ -158,7 +156,7 @@ impl<P: ConnectionPool> ThinClient<P> {
pub fn new_from_addrs(
rpc_addrs: Vec<SocketAddr>,
tpu_addrs: Vec<SocketAddr>,
connection_cache: Arc<TpuConnectionCache<P>>,
connection_cache: Arc<ConnectionCache>,
) -> Self {
assert!(!rpc_addrs.is_empty());
assert_eq!(rpc_addrs.len(), tpu_addrs.len());
@ -221,7 +219,7 @@ impl<P: ConnectionPool> ThinClient<P> {
let conn = self.connection_cache.get_connection(self.tpu_addr());
// Send the transaction if there has been no confirmation (e.g. the first time)
#[allow(clippy::needless_borrow)]
conn.send_wire_transaction(&wire_transaction)?;
conn.send_data(&wire_transaction)?;
}
if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation(
@ -316,13 +314,13 @@ impl<P: ConnectionPool> ThinClient<P> {
}
}
impl<P: ConnectionPool> Client for ThinClient<P> {
impl Client for ThinClient {
fn tpu_addr(&self) -> String {
self.tpu_addr().to_string()
}
}
impl<P: ConnectionPool> SyncClient for ThinClient<P> {
impl SyncClient for ThinClient {
fn send_and_confirm_message<T: Signers>(
&self,
keypairs: &T,
@ -602,13 +600,15 @@ impl<P: ConnectionPool> SyncClient for ThinClient<P> {
}
}
impl<P: ConnectionPool> AsyncClient for ThinClient<P> {
impl AsyncClient for ThinClient {
fn async_send_versioned_transaction(
&self,
transaction: VersionedTransaction,
) -> TransportResult<Signature> {
let conn = self.connection_cache.get_connection(self.tpu_addr());
conn.serialize_and_send_transaction(&transaction)?;
let wire_transaction =
bincode::serialize(&transaction).expect("serialize Transaction in send_batch");
conn.send_data(&wire_transaction)?;
Ok(transaction.signatures[0])
}
@ -617,7 +617,11 @@ impl<P: ConnectionPool> AsyncClient for ThinClient<P> {
batch: Vec<VersionedTransaction>,
) -> TransportResult<()> {
let conn = self.connection_cache.get_connection(self.tpu_addr());
conn.par_serialize_and_send_transaction_batch(&batch[..])?;
let buffers = batch
.into_par_iter()
.map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
.collect::<Vec<_>>();
conn.send_data_batch(&buffers)?;
Ok(())
}
}
@ -636,7 +640,7 @@ fn min_index(array: &[u64]) -> (u64, usize) {
#[cfg(test)]
mod tests {
use {super::*, rayon::prelude::*};
use super::*;
#[test]
fn test_client_optimizer() {

View File

@ -18,6 +18,8 @@ indicatif = { version = "0.17.1", optional = true }
log = "0.4.17"
rand = "0.7.0"
rayon = "1.5.3"
solana-connection-cache = { path = "../connection-cache", version = "=1.16.0" }
solana-measure = { path = "../measure", version = "=1.16.0" }
solana-metrics = { path = "../metrics", version = "=1.16.0" }
solana-net-utils = { path = "../net-utils", version = "=1.16.0" }

View File

@ -1,10 +1,6 @@
#![allow(clippy::integer_arithmetic)]
pub mod connection_cache_stats;
pub mod nonblocking;
pub mod tpu_client;
pub mod tpu_connection;
pub mod tpu_connection_cache;
#[macro_use]
extern crate solana_metrics;

View File

@ -1,2 +1 @@
pub mod tpu_client;
pub mod tpu_connection;

View File

@ -7,16 +7,13 @@ use {
solana_sdk::{message::Message, signers::Signers, transaction::TransactionError},
};
use {
crate::{
nonblocking::tpu_connection::TpuConnection,
tpu_client::{RecentLeaderSlots, TpuClientConfig, MAX_FANOUT_SLOTS},
tpu_connection_cache::{
ConnectionPool, TpuConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE,
},
},
crate::tpu_client::{RecentLeaderSlots, TpuClientConfig, MAX_FANOUT_SLOTS},
bincode::serialize,
futures_util::{future::join_all, stream::StreamExt},
log::*,
solana_connection_cache::connection_cache::{
ConnectionCache, ConnectionManager, DEFAULT_CONNECTION_POOL_SIZE,
},
solana_pubsub_client::nonblocking::pubsub_client::{PubsubClient, PubsubClientError},
solana_rpc_client::nonblocking::rpc_client::RpcClient,
solana_rpc_client_api::{
@ -253,33 +250,33 @@ impl LeaderTpuCache {
/// Client which sends transactions directly to the current leader's TPU port over UDP.
/// The client uses RPC to determine the current leader and fetch node contact info
pub struct TpuClient<P: ConnectionPool> {
pub struct TpuClient {
fanout_slots: u64,
leader_tpu_service: LeaderTpuService,
exit: Arc<AtomicBool>,
rpc_client: Arc<RpcClient>,
connection_cache: Arc<TpuConnectionCache<P>>,
connection_cache: Arc<ConnectionCache>,
}
async fn send_wire_transaction_to_addr<P: ConnectionPool>(
connection_cache: &TpuConnectionCache<P>,
async fn send_wire_transaction_to_addr(
connection_cache: &ConnectionCache,
addr: &SocketAddr,
wire_transaction: Vec<u8>,
) -> TransportResult<()> {
let conn = connection_cache.get_nonblocking_connection(addr);
conn.send_wire_transaction(wire_transaction.clone()).await
conn.send_data(&wire_transaction).await
}
async fn send_wire_transaction_batch_to_addr<P: ConnectionPool>(
connection_cache: &TpuConnectionCache<P>,
async fn send_wire_transaction_batch_to_addr(
connection_cache: &ConnectionCache,
addr: &SocketAddr,
wire_transactions: &[Vec<u8>],
) -> TransportResult<()> {
let conn = connection_cache.get_nonblocking_connection(addr);
conn.send_wire_transaction_batch(wire_transactions).await
conn.send_data_batch(wire_transactions).await
}
impl<P: ConnectionPool> TpuClient<P> {
impl TpuClient {
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size
pub async fn send_transaction(&self, transaction: &Transaction) -> bool {
@ -394,9 +391,11 @@ impl<P: ConnectionPool> TpuClient<P> {
rpc_client: Arc<RpcClient>,
websocket_url: &str,
config: TpuClientConfig,
connection_manager: Box<dyn ConnectionManager>,
) -> Result<Self> {
let connection_cache =
Arc::new(TpuConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE).unwrap()); // TODO: Handle error properly, as the TpuConnectionCache ctor is now fallible.
let connection_cache = Arc::new(
ConnectionCache::new(connection_manager, DEFAULT_CONNECTION_POOL_SIZE).unwrap(),
); // TODO: Handle error properly, as the ConnectionCache ctor is now fallible.
Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache).await
}
@ -405,7 +404,7 @@ impl<P: ConnectionPool> TpuClient<P> {
rpc_client: Arc<RpcClient>,
websocket_url: &str,
config: TpuClientConfig,
connection_cache: Arc<TpuConnectionCache<P>>,
connection_cache: Arc<ConnectionCache>,
) -> Result<Self> {
let exit = Arc::new(AtomicBool::new(false));
let leader_tpu_service =
@ -554,7 +553,7 @@ impl<P: ConnectionPool> TpuClient<P> {
}
}
impl<P: ConnectionPool> Drop for TpuClient<P> {
impl Drop for TpuClient {
fn drop(&mut self) {
self.exit.store(true, Ordering::Relaxed);
}
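For illustration, the new construction path for the nonblocking client looks roughly like this (a sketch: the URLs are placeholders, the QuicConnectionManager path is assumed, and Result is the module's TpuSenderError alias):
use {
    solana_quic_client::quic_client::QuicConnectionManager, // path assumed
    solana_rpc_client::nonblocking::rpc_client::RpcClient,
    std::sync::Arc,
};
async fn tpu_client_sketch() -> Result<TpuClient> {
    let rpc_client = Arc::new(RpcClient::new("http://127.0.0.1:8899".to_string()));
    // The manager decides the wire protocol; TpuClient itself is now untyped.
    let connection_manager = Box::new(QuicConnectionManager::default());
    TpuClient::new(
        rpc_client,
        "ws://127.0.0.1:8900",
        TpuClientConfig::default(),
        connection_manager,
    )
    .await
}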

View File

@ -1,29 +0,0 @@
//! Trait defining async send functions, to be used for UDP or QUIC sending
use {
async_trait::async_trait,
solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult},
std::net::SocketAddr,
};
#[async_trait]
pub trait TpuConnection {
fn tpu_addr(&self) -> &SocketAddr;
async fn serialize_and_send_transaction(
&self,
transaction: &VersionedTransaction,
) -> TransportResult<()> {
let wire_transaction =
bincode::serialize(transaction).expect("serialize Transaction in send_batch");
self.send_wire_transaction(&wire_transaction).await
}
async fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync;
async fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync;
}

View File

@ -1,10 +1,8 @@
pub use crate::nonblocking::tpu_client::TpuSenderError;
use {
crate::{
nonblocking::tpu_client::TpuClient as NonblockingTpuClient,
tpu_connection_cache::{ConnectionPool, TpuConnectionCache},
},
crate::nonblocking::tpu_client::TpuClient as NonblockingTpuClient,
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_connection_cache::connection_cache::{ConnectionCache, ConnectionManager},
solana_rpc_client::rpc_client::RpcClient,
solana_sdk::{clock::Slot, transaction::Transaction, transport::Result as TransportResult},
std::{
@ -19,6 +17,10 @@ use {
tokio::time::Duration,
};
pub const DEFAULT_TPU_ENABLE_UDP: bool = false;
pub const DEFAULT_TPU_USE_QUIC: bool = true;
pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 4;
pub mod temporary_pub {
use super::*;
@ -57,14 +59,14 @@ impl Default for TpuClientConfig {
/// Client which sends transactions directly to the current leader's TPU port over UDP.
/// The client uses RPC to determine the current leader and fetch node contact info
pub struct TpuClient<P: ConnectionPool> {
pub struct TpuClient {
_deprecated: UdpSocket, // TpuClient now uses the connection_cache to choose a send_socket
//todo: get rid of this field
rpc_client: Arc<RpcClient>,
tpu_client: Arc<NonblockingTpuClient<P>>,
tpu_client: Arc<NonblockingTpuClient>,
}
impl<P: ConnectionPool> TpuClient<P> {
impl TpuClient {
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size
pub fn send_transaction(&self, transaction: &Transaction) -> bool {
@ -108,9 +110,14 @@ impl<P: ConnectionPool> TpuClient<P> {
rpc_client: Arc<RpcClient>,
websocket_url: &str,
config: TpuClientConfig,
connection_manager: Box<dyn ConnectionManager>,
) -> Result<Self> {
let create_tpu_client =
NonblockingTpuClient::new(rpc_client.get_inner_client().clone(), websocket_url, config);
let create_tpu_client = NonblockingTpuClient::new(
rpc_client.get_inner_client().clone(),
websocket_url,
config,
connection_manager,
);
let tpu_client =
tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?;
@ -126,7 +133,7 @@ impl<P: ConnectionPool> TpuClient<P> {
rpc_client: Arc<RpcClient>,
websocket_url: &str,
config: TpuClientConfig,
connection_cache: Arc<TpuConnectionCache<P>>,
connection_cache: Arc<ConnectionCache>,
) -> Result<Self> {
let create_tpu_client = NonblockingTpuClient::new_with_connection_cache(
rpc_client.get_inner_client().clone(),

View File

@ -1,63 +0,0 @@
use {
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_metrics::MovingStat,
solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult},
std::{net::SocketAddr, sync::atomic::AtomicU64},
};
#[derive(Default)]
pub struct ClientStats {
pub total_connections: AtomicU64,
pub connection_reuse: AtomicU64,
pub connection_errors: AtomicU64,
pub zero_rtt_accepts: AtomicU64,
pub zero_rtt_rejects: AtomicU64,
// these will be the last values of these stats
pub congestion_events: MovingStat,
pub tx_streams_blocked_uni: MovingStat,
pub tx_data_blocked: MovingStat,
pub tx_acks: MovingStat,
pub make_connection_ms: AtomicU64,
pub send_timeout: AtomicU64,
}
pub trait TpuConnection {
fn tpu_addr(&self) -> &SocketAddr;
fn serialize_and_send_transaction(
&self,
transaction: &VersionedTransaction,
) -> TransportResult<()> {
let wire_transaction =
bincode::serialize(transaction).expect("serialize Transaction in send_batch");
self.send_wire_transaction(wire_transaction)
}
fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
self.send_wire_transaction_batch(&[wire_transaction])
}
fn send_wire_transaction_async(&self, wire_transaction: Vec<u8>) -> TransportResult<()>;
fn par_serialize_and_send_transaction_batch(
&self,
transactions: &[VersionedTransaction],
) -> TransportResult<()> {
let buffers = transactions
.into_par_iter()
.map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
.collect::<Vec<_>>();
self.send_wire_transaction_batch(&buffers)
}
fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync;
fn send_wire_transaction_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()>;
}

View File

@ -11,6 +11,8 @@ edition = "2021"
[dependencies]
async-trait = "0.1.57"
solana-connection-cache = { path = "../connection-cache", version = "=1.16.0", default-features = false }
solana-net-utils = { path = "../net-utils", version = "=1.16.0" }
solana-sdk = { path = "../sdk", version = "=1.16.0" }
solana-streamer = { path = "../streamer", version = "=1.16.0" }

View File

@ -5,45 +5,31 @@ pub mod udp_client;
use {
crate::{
nonblocking::udp_client::UdpTpuConnection as NonblockingUdpTpuConnection,
udp_client::UdpTpuConnection as BlockingUdpTpuConnection,
nonblocking::udp_client::UdpClientConnection as NonblockingUdpConnection,
udp_client::UdpClientConnection as BlockingUdpConnection,
},
solana_tpu_client::{
connection_cache_stats::ConnectionCacheStats,
tpu_connection_cache::{
BaseTpuConnection, ConnectionPool, ConnectionPoolError, NewTpuConfig,
solana_connection_cache::{
client_connection::ClientConnection as BlockingClientConnection,
connection_cache::{
BaseClientConnection, ClientError, ConnectionManager, ConnectionPool,
ConnectionPoolError, NewConnectionConfig, ProtocolType,
},
connection_cache_stats::ConnectionCacheStats,
nonblocking::client_connection::ClientConnection as NonblockingClientConnection,
},
std::{
any::Any,
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::Arc,
},
thiserror::Error,
};
#[derive(Error, Debug)]
pub enum UdpClientError {
#[error("IO error: {0:?}")]
IoError(#[from] std::io::Error),
}
pub struct UdpPool {
connections: Vec<Arc<Udp>>,
connections: Vec<Arc<dyn BaseClientConnection>>,
}
impl ConnectionPool for UdpPool {
type PoolTpuConnection = Udp;
type TpuConfig = UdpConfig;
fn new_with_connection(config: &Self::TpuConfig, addr: &SocketAddr) -> Self {
let mut pool = Self {
connections: vec![],
};
pool.add_connection(config, addr);
pool
}
fn add_connection(&mut self, config: &Self::TpuConfig, addr: &SocketAddr) {
let connection = Arc::new(self.create_pool_entry(config, addr));
fn add_connection(&mut self, config: &dyn NewConnectionConfig, addr: &SocketAddr) {
let connection = self.create_pool_entry(config, addr);
self.connections.push(connection);
}
@ -51,7 +37,7 @@ impl ConnectionPool for UdpPool {
self.connections.len()
}
fn get(&self, index: usize) -> Result<Arc<Self::PoolTpuConnection>, ConnectionPoolError> {
fn get(&self, index: usize) -> Result<Arc<dyn BaseClientConnection>, ConnectionPoolError> {
self.connections
.get(index)
.cloned()
@ -60,47 +46,80 @@ impl ConnectionPool for UdpPool {
fn create_pool_entry(
&self,
config: &Self::TpuConfig,
config: &dyn NewConnectionConfig,
_addr: &SocketAddr,
) -> Self::PoolTpuConnection {
Udp(config.tpu_udp_socket.clone())
) -> Arc<dyn BaseClientConnection> {
let config: &UdpConfig = match config.as_any().downcast_ref::<UdpConfig>() {
Some(b) => b,
None => panic!("Expecting a UdpConfig!"),
};
Arc::new(Udp(config.udp_socket.clone()))
}
}
pub struct UdpConfig {
tpu_udp_socket: Arc<UdpSocket>,
udp_socket: Arc<UdpSocket>,
}
impl NewTpuConfig for UdpConfig {
type ClientError = UdpClientError;
fn new() -> Result<Self, UdpClientError> {
impl NewConnectionConfig for UdpConfig {
fn new() -> Result<Self, ClientError> {
let socket = solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED))
.map_err(Into::<UdpClientError>::into)?;
.map_err(Into::<ClientError>::into)?;
Ok(Self {
tpu_udp_socket: Arc::new(socket),
udp_socket: Arc::new(socket),
})
}
fn as_any(&self) -> &dyn Any {
self
}
fn as_mut_any(&mut self) -> &mut dyn Any {
self
}
}
pub struct Udp(Arc<UdpSocket>);
impl BaseTpuConnection for Udp {
type BlockingConnectionType = BlockingUdpTpuConnection;
type NonblockingConnectionType = NonblockingUdpTpuConnection;
impl BaseClientConnection for Udp {
fn new_blocking_connection(
&self,
addr: SocketAddr,
_stats: Arc<ConnectionCacheStats>,
) -> BlockingUdpTpuConnection {
BlockingUdpTpuConnection::new_from_addr(self.0.clone(), addr)
) -> Arc<dyn BlockingClientConnection> {
Arc::new(BlockingUdpConnection::new_from_addr(self.0.clone(), addr))
}
fn new_nonblocking_connection(
&self,
addr: SocketAddr,
_stats: Arc<ConnectionCacheStats>,
) -> NonblockingUdpTpuConnection {
NonblockingUdpTpuConnection::new_from_addr(self.0.try_clone().unwrap(), addr)
) -> Arc<dyn NonblockingClientConnection> {
Arc::new(NonblockingUdpConnection::new_from_addr(
self.0.try_clone().unwrap(),
addr,
))
}
}
#[derive(Default)]
pub struct UdpConnectionManager {}
impl ConnectionManager for UdpConnectionManager {
fn new_connection_pool(&self) -> Box<dyn ConnectionPool> {
Box::new(UdpPool {
connections: Vec::default(),
})
}
fn new_connection_config(&self) -> Box<dyn NewConnectionConfig> {
Box::new(UdpConfig::new().unwrap())
}
fn get_port_offset(&self) -> u16 {
0
}
fn get_protocol_type(&self) -> ProtocolType {
ProtocolType::UDP
}
}
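This manager is the counterpart to QuicConnectionManager: since ConnectionCache only ever sees Box<dyn ConnectionManager>, a UDP-only binary never has to link the QUIC client at all. A sketch under those assumptions:
use solana_connection_cache::{
    client_connection::ClientConnection,
    connection_cache::{ConnectionCache, ConnectionManager},
};
fn udp_only_sketch() -> solana_sdk::transport::Result<()> {
    let manager: Box<dyn ConnectionManager> = Box::new(UdpConnectionManager::default());
    let cache = ConnectionCache::new(manager, 1).unwrap();
    let addr = "127.0.0.1:8001".parse().unwrap();
    // A blocking trait object; the caller never names a protocol type.
    let connection = cache.get_connection(&addr);
    connection.send_data(b"ping")
}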

View File

@ -1,50 +1,43 @@
//! Simple UDP client that communicates with the given UDP port with UDP and provides
//! an interface for sending transactions
//! an interface for sending data
use {
async_trait::async_trait, core::iter::repeat, solana_sdk::transport::Result as TransportResult,
solana_streamer::nonblocking::sendmmsg::batch_send,
solana_tpu_client::nonblocking::tpu_connection::TpuConnection, std::net::SocketAddr,
async_trait::async_trait, core::iter::repeat,
solana_connection_cache::nonblocking::client_connection::ClientConnection,
solana_sdk::transport::Result as TransportResult,
solana_streamer::nonblocking::sendmmsg::batch_send, std::net::SocketAddr,
tokio::net::UdpSocket,
};
pub struct UdpTpuConnection {
pub struct UdpClientConnection {
pub socket: UdpSocket,
pub addr: SocketAddr,
}
impl UdpTpuConnection {
pub fn new_from_addr(socket: std::net::UdpSocket, tpu_addr: SocketAddr) -> Self {
impl UdpClientConnection {
pub fn new_from_addr(socket: std::net::UdpSocket, server_addr: SocketAddr) -> Self {
socket.set_nonblocking(true).unwrap();
let socket = UdpSocket::from_std(socket).unwrap();
Self {
socket,
addr: tpu_addr,
addr: server_addr,
}
}
}
#[async_trait]
impl TpuConnection for UdpTpuConnection {
fn tpu_addr(&self) -> &SocketAddr {
impl ClientConnection for UdpClientConnection {
fn server_addr(&self) -> &SocketAddr {
&self.addr
}
async fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
self.socket
.send_to(wire_transaction.as_ref(), self.addr)
.await?;
async fn send_data(&self, buffer: &[u8]) -> TransportResult<()> {
self.socket.send_to(buffer, self.addr).await?;
Ok(())
}
async fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
let pkts: Vec<_> = buffers.iter().zip(repeat(self.tpu_addr())).collect();
async fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
let pkts: Vec<_> = buffers.iter().zip(repeat(self.server_addr())).collect();
batch_send(&self.socket, &pkts).await?;
Ok(())
}
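
Dropping the generic `T: AsRef<[u8]> + Send + Sync` parameters here is not just a rename: a trait with generic methods is not object safe, and the refactor needs these traits to live behind `Arc<dyn ...>`. An illustrative snippet of that rule (neither trait is from the crate):

// A trait with a generic method compiles on its own...
trait NotObjectSafe {
    fn send<T: AsRef<[u8]>>(&self, data: T);
}

// ...but only a non-generic signature can live behind a trait object.
trait ObjectSafe {
    fn send(&self, data: &[u8]);
}

struct Conn;
impl ObjectSafe for Conn {
    fn send(&self, _data: &[u8]) {}
}

fn main() {
    // let _: Box<dyn NotObjectSafe>;   // error[E0038]: `NotObjectSafe`
    //                                  // cannot be made into an object
    let c: Box<dyn ObjectSafe> = Box::new(Conn);
    c.send(b"hello");
}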
@ -60,20 +53,17 @@ mod tests {
tokio::net::UdpSocket,
};
async fn check_send_one(connection: &UdpTpuConnection, reader: &UdpSocket) {
async fn check_send_one(connection: &UdpClientConnection, reader: &UdpSocket) {
let packet = vec![111u8; PACKET_DATA_SIZE];
connection.send_wire_transaction(&packet).await.unwrap();
connection.send_data(&packet).await.unwrap();
let mut packets = vec![Packet::default(); 32];
let recv = recv_mmsg(reader, &mut packets[..]).await.unwrap();
assert_eq!(1, recv);
}
async fn check_send_batch(connection: &UdpTpuConnection, reader: &UdpSocket) {
async fn check_send_batch(connection: &UdpClientConnection, reader: &UdpSocket) {
let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect();
connection
.send_wire_transaction_batch(&packets)
.await
.unwrap();
connection.send_data_batch(&packets).await.unwrap();
let mut packets = vec![Packet::default(); 32];
let recv = recv_mmsg(reader, &mut packets[..]).await.unwrap();
assert_eq!(32, recv);
@ -85,7 +75,7 @@ mod tests {
let addr = addr_str.parse().unwrap();
let socket =
solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED)).unwrap();
let connection = UdpTpuConnection::new_from_addr(socket, addr);
let connection = UdpClientConnection::new_from_addr(socket, addr);
let reader = UdpSocket::bind(addr_str).await.expect("bind");
check_send_one(&connection, &reader).await;
check_send_batch(&connection, &reader).await;

View File

@ -1,63 +1,58 @@
//! Simple TPU client that communicates over UDP with the given port and provides
//! an interface for sending transactions
//! Simple client that communicates over UDP with the given port and provides
//! an interface for sending data
use {
core::iter::repeat,
solana_connection_cache::client_connection::ClientConnection,
solana_sdk::transport::Result as TransportResult,
solana_streamer::sendmmsg::batch_send,
solana_tpu_client::{
connection_cache_stats::ConnectionCacheStats, tpu_connection::TpuConnection,
},
std::{
net::{SocketAddr, UdpSocket},
sync::Arc,
},
};
pub struct UdpTpuConnection {
pub struct UdpClientConnection {
pub socket: Arc<UdpSocket>,
pub addr: SocketAddr,
}
impl UdpTpuConnection {
pub fn new_from_addr(local_socket: Arc<UdpSocket>, tpu_addr: SocketAddr) -> Self {
impl UdpClientConnection {
pub fn new_from_addr(local_socket: Arc<UdpSocket>, server_addr: SocketAddr) -> Self {
Self {
socket: local_socket,
addr: tpu_addr,
addr: server_addr,
}
}
pub fn new(
local_socket: Arc<UdpSocket>,
tpu_addr: SocketAddr,
_connection_stats: Arc<ConnectionCacheStats>,
) -> Self {
Self::new_from_addr(local_socket, tpu_addr)
}
}
impl TpuConnection for UdpTpuConnection {
fn tpu_addr(&self) -> &SocketAddr {
impl ClientConnection for UdpClientConnection {
fn server_addr(&self) -> &SocketAddr {
&self.addr
}
fn send_wire_transaction_async(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
self.socket.send_to(wire_transaction.as_ref(), self.addr)?;
fn send_data_async(&self, data: Vec<u8>) -> TransportResult<()> {
self.socket.send_to(data.as_ref(), self.addr)?;
Ok(())
}
fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]> + Send + Sync,
{
let pkts: Vec<_> = buffers.iter().zip(repeat(self.tpu_addr())).collect();
fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
let pkts: Vec<_> = buffers.iter().zip(repeat(self.server_addr())).collect();
batch_send(&self.socket, &pkts)?;
Ok(())
}
fn send_wire_transaction_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
let pkts: Vec<_> = buffers.into_iter().zip(repeat(self.tpu_addr())).collect();
fn send_data_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
let pkts: Vec<_> = buffers
.into_iter()
.zip(repeat(self.server_addr()))
.collect();
batch_send(&self.socket, &pkts)?;
Ok(())
}
fn send_data(&self, buffer: &[u8]) -> TransportResult<()> {
self.socket.send_to(buffer, self.addr)?;
Ok(())
}
}
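
For completeness, a minimal usage sketch of the renamed blocking API. The `solana_udp_client::udp_client` module path is an assumption for illustration (the diff does not show file names), and the addresses are placeholders:

use {
    solana_connection_cache::client_connection::ClientConnection,
    solana_udp_client::udp_client::UdpClientConnection, // module path assumed
    std::{net::UdpSocket, sync::Arc},
};

fn main() {
    // Bind an ephemeral local socket; the server address is a placeholder.
    let socket = Arc::new(UdpSocket::bind("0.0.0.0:0").unwrap());
    let server_addr = "127.0.0.1:8001".parse().unwrap();
    let connection = UdpClientConnection::new_from_addr(socket, server_addr);

    // One datagram, then a batch; both return TransportResult<()>.
    connection.send_data(b"hello").unwrap();
    connection.send_data_batch(&vec![vec![0u8; 8]; 4]).unwrap();
}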

View File

@ -35,7 +35,7 @@ use {
solana_send_transaction_service::send_transaction_service::{
self, MAX_BATCH_SEND_RATE_MS, MAX_TRANSACTION_BATCH_SIZE,
},
solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_CONNECTION_POOL_SIZE,
solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE,
std::{path::PathBuf, str::FromStr},
};

View File

@ -51,7 +51,7 @@ use {
},
solana_send_transaction_service::send_transaction_service::{self},
solana_streamer::socket::SocketAddrSpace,
solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_ENABLE_UDP,
solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
solana_validator::{
admin_rpc_service,
admin_rpc_service::{load_staked_nodes_overrides, StakedNodesOverrides},