Add TpuConnectionCache and begin disentangling quic and udp (#28080)
* Add new empty crates and tpu_connection_cache module
* Add BaseTpuConnection trait and impl for udp and quic
* Add ConnectionPool trait and impl for udp and quic (quic-client doesn't build)
* Add a couple quic-specific apis to QuicConfig (from ConnectionCache)
* Re-export quic- and udp-client modules to prep for move
* Move ConnectionCacheStats to new module
* Move consts
* Duplicate ConnectionCache into tpu_connection_cache (doesn't build)
* Move methods to QuicConfig and remove unneeded methods (doesn't build)
* Genericize new ConnectionCache
* Rename new struct
* Copy unit tests (tests don't build)
* Fixup unit tests
* Move quic-specific test to quic-client crate
* DRY by using ConnectionPool method
* Prevent evicting pool about to be used
parent fcd301eeed
commit 10db560278
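The sixteen steps above amount to one structural change: the transport (QUIC or UDP) becomes a compile-time type parameter of the cache rather than a runtime branch inside it. A minimal sketch of the intended call site — hypothetical caller code, not part of this diff, using only items that appear in the hunks below:

use solana_quic_client::QuicPool;
use solana_tpu_client::{tpu_connection::TpuConnection, tpu_connection_cache::TpuConnectionCache};
use std::net::SocketAddr;

// Hypothetical caller, for illustration only. Swapping QuicPool for
// solana_udp_client::UdpPool is the only change needed to send over UDP.
fn send_one(addr: &SocketAddr, wire_transaction: Vec<u8>) {
    let cache = TpuConnectionCache::<QuicPool>::default();
    // Lazily creates or reuses a pool keyed by `addr` (with the protocol's
    // port offset applied), then wraps one pooled connection in the blocking
    // connection type declared by QuicPool's BaseTpuConnection impl.
    let conn = cache.get_connection(addr);
    let _ = conn.send_wire_transaction_async(wire_transaction);
}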
Cargo.lock
@@ -5902,6 +5902,17 @@ dependencies = [
  "url 2.2.2",
 ]
 
+[[package]]
+name = "solana-quic-client"
+version = "1.15.0"
+dependencies = [
+ "solana-logger 1.15.0",
+ "solana-metrics",
+ "solana-sdk 1.15.0",
+ "solana-streamer",
+ "solana-tpu-client",
+]
+
 [[package]]
 name = "solana-rayon-threadlimit"
 version = "1.15.0"
@@ -6589,6 +6600,14 @@ dependencies = [
  "thiserror",
 ]
 
+[[package]]
+name = "solana-udp-client"
+version = "1.15.0"
+dependencies = [
+ "solana-net-utils",
+ "solana-tpu-client",
+]
+
 [[package]]
 name = "solana-upload-perf"
 version = "1.15.0"
Cargo.toml
@@ -59,6 +59,7 @@ members = [
     "programs/vote",
     "programs/zk-token-proof",
     "pubsub-client",
+    "quic-client",
     "rayon-threadlimit",
     "rbpf-cli",
     "remote-wallet",
@@ -87,6 +88,7 @@ members = [
     "tpu-client",
     "transaction-dos",
     "transaction-status",
+    "udp-client",
     "upload-perf",
     "validator",
     "version",
quic-client/Cargo.toml (new file)
@@ -0,0 +1,19 @@
[package]
name = "solana-quic-client"
version = "1.15.0"
description = "Solana Quic Client"
authors = ["Solana Maintainers <maintainers@solana.foundation>"]
repository = "https://github.com/solana-labs/solana"
license = "Apache-2.0"
homepage = "https://solana.com/"
documentation = "https://docs.rs/solana-quic-client"
edition = "2021"

[dependencies]
solana-metrics = { path = "../metrics", version = "=1.15.0" }
solana-sdk = { path = "../sdk", version = "=1.15.0" }
solana-streamer = { path = "../streamer", version = "=1.15.0" }
solana-tpu-client = { path = "../tpu-client", version = "=1.15.0" }

[dev-dependencies]
solana-logger = { path = "../logger", version = "=1.15.0" }
quic-client/src/lib.rs (new file)
@@ -0,0 +1,231 @@
#![allow(clippy::integer_arithmetic)]

pub mod nonblocking;
pub mod quic_client;

use {
    crate::{
        nonblocking::quic_client::{
            QuicClient, QuicClientCertificate, QuicLazyInitializedEndpoint,
            QuicTpuConnection as NonblockingQuicTpuConnection,
        },
        quic_client::QuicTpuConnection as BlockingQuicTpuConnection,
    },
    solana_sdk::{pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair},
    solana_streamer::{
        nonblocking::quic::{compute_max_allowed_uni_streams, ConnectionPeerType},
        streamer::StakedNodes,
        tls_certificates::new_self_signed_tls_certificate_chain,
    },
    solana_tpu_client::{
        connection_cache_stats::ConnectionCacheStats,
        tpu_connection_cache::{BaseTpuConnection, ConnectionPool, ConnectionPoolError},
    },
    std::{
        error::Error,
        net::{IpAddr, Ipv4Addr, SocketAddr},
        sync::{Arc, RwLock},
    },
};

pub struct QuicPool {
    connections: Vec<Arc<Quic>>,
    endpoint: Arc<QuicLazyInitializedEndpoint>,
}
impl ConnectionPool for QuicPool {
    type PoolTpuConnection = Quic;
    type TpuConfig = QuicConfig;
    const PORT_OFFSET: u16 = QUIC_PORT_OFFSET;

    fn new_with_connection(config: &Self::TpuConfig, addr: &SocketAddr) -> Self {
        let mut pool = Self {
            connections: vec![],
            endpoint: config.create_endpoint(),
        };
        pool.add_connection(config, addr);
        pool
    }

    fn add_connection(&mut self, config: &Self::TpuConfig, addr: &SocketAddr) {
        let connection = Arc::new(self.create_pool_entry(config, addr));
        self.connections.push(connection);
    }

    fn num_connections(&self) -> usize {
        self.connections.len()
    }

    fn get(&self, index: usize) -> Result<Arc<Self::PoolTpuConnection>, ConnectionPoolError> {
        self.connections
            .get(index)
            .cloned()
            .ok_or(ConnectionPoolError::IndexOutOfRange)
    }

    fn create_pool_entry(
        &self,
        config: &Self::TpuConfig,
        addr: &SocketAddr,
    ) -> Self::PoolTpuConnection {
        Quic(Arc::new(QuicClient::new(
            self.endpoint.clone(),
            *addr,
            config.compute_max_parallel_streams(),
        )))
    }
}

pub struct QuicConfig {
    client_certificate: Arc<QuicClientCertificate>,
    maybe_staked_nodes: Option<Arc<RwLock<StakedNodes>>>,
    maybe_client_pubkey: Option<Pubkey>,
}

impl Default for QuicConfig {
    fn default() -> Self {
        let (certs, priv_key) = new_self_signed_tls_certificate_chain(
            &Keypair::new(),
            IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
        )
        .expect("Failed to initialize QUIC client certificates");
        Self {
            client_certificate: Arc::new(QuicClientCertificate {
                certificates: certs,
                key: priv_key,
            }),
            maybe_staked_nodes: None,
            maybe_client_pubkey: None,
        }
    }
}

impl QuicConfig {
    fn create_endpoint(&self) -> Arc<QuicLazyInitializedEndpoint> {
        Arc::new(QuicLazyInitializedEndpoint::new(
            self.client_certificate.clone(),
        ))
    }

    fn compute_max_parallel_streams(&self) -> usize {
        let (client_type, stake, total_stake) =
            self.maybe_client_pubkey
                .map_or((ConnectionPeerType::Unstaked, 0, 0), |pubkey| {
                    self.maybe_staked_nodes.as_ref().map_or(
                        (ConnectionPeerType::Unstaked, 0, 0),
                        |stakes| {
                            let rstakes = stakes.read().unwrap();
                            rstakes.pubkey_stake_map.get(&pubkey).map_or(
                                (ConnectionPeerType::Unstaked, 0, rstakes.total_stake),
                                |stake| (ConnectionPeerType::Staked, *stake, rstakes.total_stake),
                            )
                        },
                    )
                });
        compute_max_allowed_uni_streams(client_type, stake, total_stake)
    }

    pub fn update_client_certificate(
        &mut self,
        keypair: &Keypair,
        ipaddr: IpAddr,
    ) -> Result<(), Box<dyn Error>> {
        let (certs, priv_key) = new_self_signed_tls_certificate_chain(keypair, ipaddr)?;
        self.client_certificate = Arc::new(QuicClientCertificate {
            certificates: certs,
            key: priv_key,
        });
        Ok(())
    }

    pub fn set_staked_nodes(
        &mut self,
        staked_nodes: &Arc<RwLock<StakedNodes>>,
        client_pubkey: &Pubkey,
    ) {
        self.maybe_staked_nodes = Some(staked_nodes.clone());
        self.maybe_client_pubkey = Some(*client_pubkey);
    }
}

pub struct Quic(Arc<QuicClient>);
impl BaseTpuConnection for Quic {
    type BlockingConnectionType = BlockingQuicTpuConnection;
    type NonblockingConnectionType = NonblockingQuicTpuConnection;

    fn new_blocking_connection(
        &self,
        _addr: SocketAddr,
        stats: Arc<ConnectionCacheStats>,
    ) -> BlockingQuicTpuConnection {
        BlockingQuicTpuConnection::new_with_client(self.0.clone(), stats)
    }

    fn new_nonblocking_connection(
        &self,
        _addr: SocketAddr,
        stats: Arc<ConnectionCacheStats>,
    ) -> NonblockingQuicTpuConnection {
        NonblockingQuicTpuConnection::new_with_client(self.0.clone(), stats)
    }
}

#[cfg(test)]
mod tests {
    use {
        super::*,
        solana_sdk::quic::{
            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS,
        },
        solana_tpu_client::tpu_connection_cache::TpuConnectionCache,
    };

    #[test]
    fn test_connection_cache_max_parallel_chunks() {
        solana_logger::setup();
        let connection_cache = TpuConnectionCache::<QuicPool>::default();
        let mut tpu_config = connection_cache.tpu_config;
        assert_eq!(
            tpu_config.compute_max_parallel_streams(),
            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
        );

        let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
        let pubkey = Pubkey::new_unique();
        tpu_config.set_staked_nodes(&staked_nodes, &pubkey);
        assert_eq!(
            tpu_config.compute_max_parallel_streams(),
            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
        );

        staked_nodes.write().unwrap().total_stake = 10000;
        assert_eq!(
            tpu_config.compute_max_parallel_streams(),
            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
        );

        staked_nodes
            .write()
            .unwrap()
            .pubkey_stake_map
            .insert(pubkey, 1);
        assert_eq!(
            tpu_config.compute_max_parallel_streams(),
            QUIC_MIN_STAKED_CONCURRENT_STREAMS
        );

        staked_nodes
            .write()
            .unwrap()
            .pubkey_stake_map
            .remove(&pubkey);
        staked_nodes
            .write()
            .unwrap()
            .pubkey_stake_map
            .insert(pubkey, 1000);
        assert_ne!(
            tpu_config.compute_max_parallel_streams(),
            QUIC_MIN_STAKED_CONCURRENT_STREAMS
        );
    }
}
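Because tpu_config is a public field on TpuConnectionCache, the quic-specific APIs moved here (update_client_certificate, set_staked_nodes) remain reachable after the cache is built. A hedged sketch of how a validator-side caller might wire them up — the surrounding setup is assumed and not part of this diff:

use solana_quic_client::QuicPool;
use solana_sdk::{pubkey::Pubkey, signature::Keypair};
use solana_streamer::streamer::StakedNodes;
use solana_tpu_client::tpu_connection_cache::TpuConnectionCache;
use std::{
    net::{IpAddr, Ipv4Addr},
    sync::{Arc, RwLock},
};

// Illustrative only: caller names and setup are assumptions, the API calls
// are the ones added in the hunk above.
fn staked_quic_cache(
    identity: &Keypair,
    identity_pubkey: &Pubkey,
    staked_nodes: &Arc<RwLock<StakedNodes>>,
) -> TpuConnectionCache<QuicPool> {
    let mut cache = TpuConnectionCache::<QuicPool>::new(4);
    // Re-key the self-signed client certificate to the validator identity...
    cache
        .tpu_config
        .update_client_certificate(identity, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))
        .expect("failed to build client certificate");
    // ...and register stake info so compute_max_parallel_streams() can size
    // the number of concurrent unidirectional QUIC streams by stake weight.
    cache.tpu_config.set_staked_nodes(staked_nodes, identity_pubkey);
    cache
}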
quic-client/src/nonblocking/mod.rs (new file)
@@ -0,0 +1 @@
pub mod quic_client;
quic-client/src/nonblocking/quic_client.rs (new file)
@@ -0,0 +1,5 @@
//! Simple nonblocking client that connects to a given UDP port with the QUIC protocol
//! and provides an interface for sending transactions which is restricted by the
//! server's flow control.

pub use solana_tpu_client::nonblocking::quic_client::*;
quic-client/src/quic_client.rs (new file)
@@ -0,0 +1,4 @@
//! Simple client that connects to a given UDP port with the QUIC protocol and provides
//! an interface for sending transactions which is restricted by the server's flow control.

pub use solana_tpu_client::quic_client::*;
tpu-client/src/connection_cache.rs
@@ -1,10 +1,15 @@
+pub use crate::tpu_connection_cache::{
+    DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC,
+};
 use {
     crate::{
+        connection_cache_stats::{ConnectionCacheStats, CONNECTION_STAT_SUBMISSION_INTERVAL},
         nonblocking::{
             quic_client::{QuicClient, QuicClientCertificate, QuicLazyInitializedEndpoint},
             tpu_connection::NonblockingConnection,
         },
-        tpu_connection::{BlockingConnection, ClientStats},
+        tpu_connection::BlockingConnection,
+        tpu_connection_cache::MAX_CONNECTIONS,
     },
     indexmap::map::{Entry, IndexMap},
     rand::{thread_rng, Rng},
@@ -20,214 +25,10 @@ use {
     std::{
         error::Error,
         net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
-        sync::{
-            atomic::{AtomicU64, Ordering},
-            Arc, RwLock,
-        },
+        sync::{atomic::Ordering, Arc, RwLock},
     },
 };
 
-// Should be non-zero
-static MAX_CONNECTIONS: usize = 1024;
-
-/// Used to decide whether the TPU and underlying connection cache should use
-/// QUIC connections.
-pub const DEFAULT_TPU_USE_QUIC: bool = true;
-
-/// Default TPU connection pool size per remote address
-pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 4;
-
-pub const DEFAULT_TPU_ENABLE_UDP: bool = false;
-
-#[derive(Default)]
-pub struct ConnectionCacheStats {
-    cache_hits: AtomicU64,
-    cache_misses: AtomicU64,
-    cache_evictions: AtomicU64,
-    eviction_time_ms: AtomicU64,
-    sent_packets: AtomicU64,
-    total_batches: AtomicU64,
-    batch_success: AtomicU64,
-    batch_failure: AtomicU64,
-    get_connection_ms: AtomicU64,
-    get_connection_lock_ms: AtomicU64,
-    get_connection_hit_ms: AtomicU64,
-    get_connection_miss_ms: AtomicU64,
-
-    // Need to track these separately per-connection
-    // because we need to track the base stat value from quinn
-    pub total_client_stats: ClientStats,
-}
-
-const CONNECTION_STAT_SUBMISSION_INTERVAL: u64 = 2000;
-
-impl ConnectionCacheStats {
-    pub fn add_client_stats(
-        &self,
-        client_stats: &ClientStats,
-        num_packets: usize,
-        is_success: bool,
-    ) {
-        self.total_client_stats.total_connections.fetch_add(
-            client_stats.total_connections.load(Ordering::Relaxed),
-            Ordering::Relaxed,
-        );
-        self.total_client_stats.connection_reuse.fetch_add(
-            client_stats.connection_reuse.load(Ordering::Relaxed),
-            Ordering::Relaxed,
-        );
-        self.total_client_stats.connection_errors.fetch_add(
-            client_stats.connection_errors.load(Ordering::Relaxed),
-            Ordering::Relaxed,
-        );
-        self.total_client_stats.zero_rtt_accepts.fetch_add(
-            client_stats.zero_rtt_accepts.load(Ordering::Relaxed),
-            Ordering::Relaxed,
-        );
-        self.total_client_stats.zero_rtt_rejects.fetch_add(
-            client_stats.zero_rtt_rejects.load(Ordering::Relaxed),
-            Ordering::Relaxed,
-        );
-        self.total_client_stats.make_connection_ms.fetch_add(
-            client_stats.make_connection_ms.load(Ordering::Relaxed),
-            Ordering::Relaxed,
-        );
-        self.sent_packets
-            .fetch_add(num_packets as u64, Ordering::Relaxed);
-        self.total_batches.fetch_add(1, Ordering::Relaxed);
-        if is_success {
-            self.batch_success.fetch_add(1, Ordering::Relaxed);
-        } else {
-            self.batch_failure.fetch_add(1, Ordering::Relaxed);
-        }
-    }
-
-    fn report(&self) {
-        datapoint_info!(
-            "quic-client-connection-stats",
-            (
-                "cache_hits",
-                self.cache_hits.swap(0, Ordering::Relaxed),
-                i64
-            ),
-            (
-                "cache_misses",
-                self.cache_misses.swap(0, Ordering::Relaxed),
-                i64
-            ),
-            (
-                "cache_evictions",
-                self.cache_evictions.swap(0, Ordering::Relaxed),
-                i64
-            ),
-            (
-                "eviction_time_ms",
-                self.eviction_time_ms.swap(0, Ordering::Relaxed),
-                i64
-            ),
-            (
-                "get_connection_ms",
-                self.get_connection_ms.swap(0, Ordering::Relaxed),
-                i64
-            ),
-            (
-                "get_connection_lock_ms",
-                self.get_connection_lock_ms.swap(0, Ordering::Relaxed),
-                i64
-            ),
-            (
-                "get_connection_hit_ms",
-                self.get_connection_hit_ms.swap(0, Ordering::Relaxed),
-                i64
-            ),
-            (
-                "get_connection_miss_ms",
-                self.get_connection_miss_ms.swap(0, Ordering::Relaxed),
-                i64
-            ),
-            (
-                "make_connection_ms",
-                self.total_client_stats
-                    .make_connection_ms
-                    .swap(0, Ordering::Relaxed),
-                i64
-            ),
-            (
-                "total_connections",
-                self.total_client_stats
-                    .total_connections
-                    .swap(0, Ordering::Relaxed),
-                i64
-            ),
-            (
-                "connection_reuse",
-                self.total_client_stats
-                    .connection_reuse
-                    .swap(0, Ordering::Relaxed),
-                i64
-            ),
-            (
-                "connection_errors",
-                self.total_client_stats
-                    .connection_errors
-                    .swap(0, Ordering::Relaxed),
-                i64
-            ),
-            (
-                "zero_rtt_accepts",
-                self.total_client_stats
-                    .zero_rtt_accepts
-                    .swap(0, Ordering::Relaxed),
-                i64
-            ),
-            (
-                "zero_rtt_rejects",
-                self.total_client_stats
-                    .zero_rtt_rejects
-                    .swap(0, Ordering::Relaxed),
-                i64
-            ),
-            (
-                "congestion_events",
-                self.total_client_stats.congestion_events.load_and_reset(),
-                i64
-            ),
-            (
-                "tx_streams_blocked_uni",
-                self.total_client_stats
-                    .tx_streams_blocked_uni
-                    .load_and_reset(),
-                i64
-            ),
-            (
-                "tx_data_blocked",
-                self.total_client_stats.tx_data_blocked.load_and_reset(),
-                i64
-            ),
-            (
-                "tx_acks",
-                self.total_client_stats.tx_acks.load_and_reset(),
-                i64
-            ),
-            (
-                "num_packets",
-                self.sent_packets.swap(0, Ordering::Relaxed),
-                i64
-            ),
-            (
-                "total_batches",
-                self.total_batches.swap(0, Ordering::Relaxed),
-                i64
-            ),
-            (
-                "batch_failure",
-                self.batch_failure.swap(0, Ordering::Relaxed),
-                i64
-            ),
-        );
-    }
-}
-
 pub struct ConnectionCache {
     map: RwLock<IndexMap<SocketAddr, ConnectionPool>>,
     stats: Arc<ConnectionCacheStats>,
tpu-client/src/connection_cache_stats.rs (new file)
@@ -0,0 +1,193 @@
use {
    crate::tpu_connection::ClientStats,
    std::sync::atomic::{AtomicU64, Ordering},
};

#[derive(Default)]
pub struct ConnectionCacheStats {
    pub(crate) cache_hits: AtomicU64,
    pub(crate) cache_misses: AtomicU64,
    pub(crate) cache_evictions: AtomicU64,
    pub(crate) eviction_time_ms: AtomicU64,
    pub(crate) sent_packets: AtomicU64,
    pub(crate) total_batches: AtomicU64,
    pub(crate) batch_success: AtomicU64,
    pub(crate) batch_failure: AtomicU64,
    pub(crate) get_connection_ms: AtomicU64,
    pub(crate) get_connection_lock_ms: AtomicU64,
    pub(crate) get_connection_hit_ms: AtomicU64,
    pub(crate) get_connection_miss_ms: AtomicU64,

    // Need to track these separately per-connection
    // because we need to track the base stat value from quinn
    pub total_client_stats: ClientStats,
}

pub(crate) const CONNECTION_STAT_SUBMISSION_INTERVAL: u64 = 2000;

impl ConnectionCacheStats {
    pub fn add_client_stats(
        &self,
        client_stats: &ClientStats,
        num_packets: usize,
        is_success: bool,
    ) {
        self.total_client_stats.total_connections.fetch_add(
            client_stats.total_connections.load(Ordering::Relaxed),
            Ordering::Relaxed,
        );
        self.total_client_stats.connection_reuse.fetch_add(
            client_stats.connection_reuse.load(Ordering::Relaxed),
            Ordering::Relaxed,
        );
        self.total_client_stats.connection_errors.fetch_add(
            client_stats.connection_errors.load(Ordering::Relaxed),
            Ordering::Relaxed,
        );
        self.total_client_stats.zero_rtt_accepts.fetch_add(
            client_stats.zero_rtt_accepts.load(Ordering::Relaxed),
            Ordering::Relaxed,
        );
        self.total_client_stats.zero_rtt_rejects.fetch_add(
            client_stats.zero_rtt_rejects.load(Ordering::Relaxed),
            Ordering::Relaxed,
        );
        self.total_client_stats.make_connection_ms.fetch_add(
            client_stats.make_connection_ms.load(Ordering::Relaxed),
            Ordering::Relaxed,
        );
        self.sent_packets
            .fetch_add(num_packets as u64, Ordering::Relaxed);
        self.total_batches.fetch_add(1, Ordering::Relaxed);
        if is_success {
            self.batch_success.fetch_add(1, Ordering::Relaxed);
        } else {
            self.batch_failure.fetch_add(1, Ordering::Relaxed);
        }
    }

    pub(crate) fn report(&self) {
        datapoint_info!(
            "quic-client-connection-stats",
            (
                "cache_hits",
                self.cache_hits.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "cache_misses",
                self.cache_misses.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "cache_evictions",
                self.cache_evictions.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "eviction_time_ms",
                self.eviction_time_ms.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "get_connection_ms",
                self.get_connection_ms.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "get_connection_lock_ms",
                self.get_connection_lock_ms.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "get_connection_hit_ms",
                self.get_connection_hit_ms.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "get_connection_miss_ms",
                self.get_connection_miss_ms.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "make_connection_ms",
                self.total_client_stats
                    .make_connection_ms
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "total_connections",
                self.total_client_stats
                    .total_connections
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_reuse",
                self.total_client_stats
                    .connection_reuse
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "connection_errors",
                self.total_client_stats
                    .connection_errors
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "zero_rtt_accepts",
                self.total_client_stats
                    .zero_rtt_accepts
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "zero_rtt_rejects",
                self.total_client_stats
                    .zero_rtt_rejects
                    .swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "congestion_events",
                self.total_client_stats.congestion_events.load_and_reset(),
                i64
            ),
            (
                "tx_streams_blocked_uni",
                self.total_client_stats
                    .tx_streams_blocked_uni
                    .load_and_reset(),
                i64
            ),
            (
                "tx_data_blocked",
                self.total_client_stats.tx_data_blocked.load_and_reset(),
                i64
            ),
            (
                "tx_acks",
                self.total_client_stats.tx_acks.load_and_reset(),
                i64
            ),
            (
                "num_packets",
                self.sent_packets.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "total_batches",
                self.total_batches.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "batch_failure",
                self.batch_failure.swap(0, Ordering::Relaxed),
                i64
            ),
        );
    }
}
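The module above is the stats block lifted out of connection_cache.rs (the removal is in the previous hunk); the only changes in flight are the pub(crate) visibility markers, so the old cache and the new generic one can both drive it. A small hedged sketch of the intended flow from a transport's point of view — the call site is assumed, not shown in this diff:

use solana_tpu_client::{
    connection_cache_stats::ConnectionCacheStats, tpu_connection::ClientStats,
};
use std::sync::Arc;

// Hypothetical transport code: fold per-connection counters into the
// cache-wide totals after sending a batch.
fn record_batch(stats: &Arc<ConnectionCacheStats>) {
    // Per-connection counters accumulated by the transport (quinn stats, etc.).
    let client_stats = ClientStats::default();
    // 64 packets sent, batch succeeded.
    stats.add_client_stats(&client_stats, 64, true);
}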
tpu-client/src/lib.rs
@@ -1,10 +1,12 @@
 #![allow(clippy::integer_arithmetic)]
 
 pub mod connection_cache;
+pub mod connection_cache_stats;
 pub mod nonblocking;
 pub mod quic_client;
 pub mod tpu_client;
 pub mod tpu_connection;
+pub mod tpu_connection_cache;
 pub mod udp_client;
 
 #[macro_use]
tpu-client/src/nonblocking/quic_client.rs
@@ -3,7 +3,7 @@
 //! server's flow control.
 use {
     crate::{
-        connection_cache::ConnectionCacheStats, nonblocking::tpu_connection::TpuConnection,
+        connection_cache_stats::ConnectionCacheStats, nonblocking::tpu_connection::TpuConnection,
         tpu_connection::ClientStats,
     },
     async_mutex::Mutex,
tpu-client/src/quic_client.rs
@@ -3,7 +3,7 @@
 
 use {
     crate::{
-        connection_cache::ConnectionCacheStats,
+        connection_cache_stats::ConnectionCacheStats,
         nonblocking::{
             quic_client::{
                 QuicClient, QuicLazyInitializedEndpoint,
tpu-client/src/tpu_connection_cache.rs (new file)
@@ -0,0 +1,567 @@
use {
    crate::{
        connection_cache_stats::{ConnectionCacheStats, CONNECTION_STAT_SUBMISSION_INTERVAL},
        nonblocking::tpu_connection::TpuConnection as NonblockingTpuConnection,
        tpu_connection::TpuConnection as BlockingTpuConnection,
    },
    indexmap::map::IndexMap,
    rand::{thread_rng, Rng},
    solana_measure::measure::Measure,
    solana_sdk::timing::AtomicInterval,
    std::{
        net::SocketAddr,
        sync::{atomic::Ordering, Arc, RwLock},
    },
    thiserror::Error,
};

// Should be non-zero
pub(crate) static MAX_CONNECTIONS: usize = 1024;

/// Used to decide whether the TPU and underlying connection cache should use
/// QUIC connections.
pub const DEFAULT_TPU_USE_QUIC: bool = true;

/// Default TPU connection pool size per remote address
pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 4;

pub const DEFAULT_TPU_ENABLE_UDP: bool = false;

pub struct TpuConnectionCache<P: ConnectionPool> {
    pub map: RwLock<IndexMap<SocketAddr, P>>,
    pub stats: Arc<ConnectionCacheStats>,
    pub last_stats: AtomicInterval,
    pub connection_pool_size: usize,
    pub tpu_config: P::TpuConfig,
}

impl<P: ConnectionPool> TpuConnectionCache<P> {
    pub fn new(connection_pool_size: usize) -> Self {
        Self {
            connection_pool_size: 1.max(connection_pool_size), // The minimum pool size is 1.
            ..Self::default()
        }
    }

    /// Create a lazy connection object under the exclusive lock of the cache map if there are not
    /// enough used connections in the connection pool for the specified address.
    /// Returns CreateConnectionResult.
    fn create_connection(
        &self,
        lock_timing_ms: &mut u64,
        addr: &SocketAddr,
    ) -> CreateConnectionResult<P::PoolTpuConnection> {
        let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure");
        let mut map = self.map.write().unwrap();
        get_connection_map_lock_measure.stop();
        *lock_timing_ms = lock_timing_ms.saturating_add(get_connection_map_lock_measure.as_ms());
        // Read again, as it is possible that between read lock dropped and the write lock acquired
        // another thread could have setup the connection.

        let should_create_connection = map
            .get(addr)
            .map(|pool| pool.need_new_connection(self.connection_pool_size))
            .unwrap_or(true);

        let (cache_hit, num_evictions, eviction_timing_ms) = if should_create_connection {
            // evict a connection if the cache is reaching upper bounds
            let mut num_evictions = 0;
            let mut get_connection_cache_eviction_measure =
                Measure::start("get_connection_cache_eviction_measure");
            let existing_index = map.get_index_of(addr);
            while map.len() >= MAX_CONNECTIONS {
                let mut rng = thread_rng();
                let n = rng.gen_range(0, MAX_CONNECTIONS);
                if let Some(index) = existing_index {
                    if n == index {
                        continue;
                    }
                }
                map.swap_remove_index(n);
                num_evictions += 1;
            }
            get_connection_cache_eviction_measure.stop();

            map.entry(*addr)
                .and_modify(|pool| {
                    pool.add_connection(&self.tpu_config, addr);
                })
                .or_insert_with(|| P::new_with_connection(&self.tpu_config, addr));

            (
                false,
                num_evictions,
                get_connection_cache_eviction_measure.as_ms(),
            )
        } else {
            (true, 0, 0)
        };

        let pool = map.get(addr).unwrap();
        let connection = pool.borrow_connection();

        CreateConnectionResult {
            connection,
            cache_hit,
            connection_cache_stats: self.stats.clone(),
            num_evictions,
            eviction_timing_ms,
        }
    }

    fn get_or_add_connection(
        &self,
        addr: &SocketAddr,
    ) -> GetConnectionResult<P::PoolTpuConnection> {
        let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure");
        let map = self.map.read().unwrap();
        get_connection_map_lock_measure.stop();

        let port_offset = P::PORT_OFFSET;

        let port = addr
            .port()
            .checked_add(port_offset)
            .unwrap_or_else(|| addr.port());
        let addr = SocketAddr::new(addr.ip(), port);

        let mut lock_timing_ms = get_connection_map_lock_measure.as_ms();

        let report_stats = self
            .last_stats
            .should_update(CONNECTION_STAT_SUBMISSION_INTERVAL);

        let mut get_connection_map_measure = Measure::start("get_connection_hit_measure");
        let CreateConnectionResult {
            connection,
            cache_hit,
            connection_cache_stats,
            num_evictions,
            eviction_timing_ms,
        } = match map.get(&addr) {
            Some(pool) => {
                if pool.need_new_connection(self.connection_pool_size) {
                    // create more connection and put it in the pool
                    drop(map);
                    self.create_connection(&mut lock_timing_ms, &addr)
                } else {
                    let connection = pool.borrow_connection();
                    CreateConnectionResult {
                        connection,
                        cache_hit: true,
                        connection_cache_stats: self.stats.clone(),
                        num_evictions: 0,
                        eviction_timing_ms: 0,
                    }
                }
            }
            None => {
                // Upgrade to write access by dropping read lock and acquire write lock
                drop(map);
                self.create_connection(&mut lock_timing_ms, &addr)
            }
        };
        get_connection_map_measure.stop();

        GetConnectionResult {
            connection,
            cache_hit,
            report_stats,
            map_timing_ms: get_connection_map_measure.as_ms(),
            lock_timing_ms,
            connection_cache_stats,
            num_evictions,
            eviction_timing_ms,
        }
    }

    fn get_connection_and_log_stats(
        &self,
        addr: &SocketAddr,
    ) -> (Arc<P::PoolTpuConnection>, Arc<ConnectionCacheStats>) {
        let mut get_connection_measure = Measure::start("get_connection_measure");
        let GetConnectionResult {
            connection,
            cache_hit,
            report_stats,
            map_timing_ms,
            lock_timing_ms,
            connection_cache_stats,
            num_evictions,
            eviction_timing_ms,
        } = self.get_or_add_connection(addr);

        if report_stats {
            connection_cache_stats.report();
        }

        if cache_hit {
            connection_cache_stats
                .cache_hits
                .fetch_add(1, Ordering::Relaxed);
            connection_cache_stats
                .get_connection_hit_ms
                .fetch_add(map_timing_ms, Ordering::Relaxed);
        } else {
            connection_cache_stats
                .cache_misses
                .fetch_add(1, Ordering::Relaxed);
            connection_cache_stats
                .get_connection_miss_ms
                .fetch_add(map_timing_ms, Ordering::Relaxed);
            connection_cache_stats
                .cache_evictions
                .fetch_add(num_evictions, Ordering::Relaxed);
            connection_cache_stats
                .eviction_time_ms
                .fetch_add(eviction_timing_ms, Ordering::Relaxed);
        }

        get_connection_measure.stop();
        connection_cache_stats
            .get_connection_lock_ms
            .fetch_add(lock_timing_ms, Ordering::Relaxed);
        connection_cache_stats
            .get_connection_ms
            .fetch_add(get_connection_measure.as_ms(), Ordering::Relaxed);

        (connection, connection_cache_stats)
    }

    pub fn get_connection(
        &self,
        addr: &SocketAddr,
    ) -> <P::PoolTpuConnection as BaseTpuConnection>::BlockingConnectionType {
        let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr);
        connection.new_blocking_connection(*addr, connection_cache_stats)
    }

    pub fn get_nonblocking_connection(
        &self,
        addr: &SocketAddr,
    ) -> <P::PoolTpuConnection as BaseTpuConnection>::NonblockingConnectionType {
        let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr);
        connection.new_nonblocking_connection(*addr, connection_cache_stats)
    }
}

impl<P: ConnectionPool> Default for TpuConnectionCache<P> {
    fn default() -> Self {
        Self {
            map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)),
            stats: Arc::new(ConnectionCacheStats::default()),
            last_stats: AtomicInterval::default(),
            connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
            tpu_config: P::TpuConfig::default(),
        }
    }
}

#[derive(Error, Debug)]
pub enum ConnectionPoolError {
    #[error("connection index is out of range of the pool")]
    IndexOutOfRange,
}

pub trait ConnectionPool {
    type PoolTpuConnection: BaseTpuConnection;
    type TpuConfig: Default;
    const PORT_OFFSET: u16 = 0;

    /// Create a new connection pool based on protocol-specific configuration
    fn new_with_connection(config: &Self::TpuConfig, addr: &SocketAddr) -> Self;

    /// Add a connection to the pool
    fn add_connection(&mut self, config: &Self::TpuConfig, addr: &SocketAddr);

    /// Get the number of current connections in the pool
    fn num_connections(&self) -> usize;

    /// Get a connection based on its index in the pool, without checking if the
    fn get(&self, index: usize) -> Result<Arc<Self::PoolTpuConnection>, ConnectionPoolError>;

    /// Get a connection from the pool. It must have at least one connection in the pool.
    /// This randomly picks a connection in the pool.
    fn borrow_connection(&self) -> Arc<Self::PoolTpuConnection> {
        let mut rng = thread_rng();
        let n = rng.gen_range(0, self.num_connections());
        self.get(n).expect("index is within num_connections")
    }
    /// Check if we need to create a new connection, i.e. if the count of the connections
    /// is smaller than the pool size.
    fn need_new_connection(&self, required_pool_size: usize) -> bool {
        self.num_connections() < required_pool_size
    }

    fn create_pool_entry(
        &self,
        config: &Self::TpuConfig,
        addr: &SocketAddr,
    ) -> Self::PoolTpuConnection;
}

pub trait BaseTpuConnection {
    type BlockingConnectionType: BlockingTpuConnection;
    type NonblockingConnectionType: NonblockingTpuConnection;

    fn new_blocking_connection(
        &self,
        addr: SocketAddr,
        stats: Arc<ConnectionCacheStats>,
    ) -> Self::BlockingConnectionType;

    fn new_nonblocking_connection(
        &self,
        addr: SocketAddr,
        stats: Arc<ConnectionCacheStats>,
    ) -> Self::NonblockingConnectionType;
}

struct GetConnectionResult<T: BaseTpuConnection> {
    connection: Arc<T>,
    cache_hit: bool,
    report_stats: bool,
    map_timing_ms: u64,
    lock_timing_ms: u64,
    connection_cache_stats: Arc<ConnectionCacheStats>,
    num_evictions: u64,
    eviction_timing_ms: u64,
}

struct CreateConnectionResult<T: BaseTpuConnection> {
    connection: Arc<T>,
    cache_hit: bool,
    connection_cache_stats: Arc<ConnectionCacheStats>,
    num_evictions: u64,
    eviction_timing_ms: u64,
}

#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::{
            nonblocking::tpu_connection::TpuConnection as NonblockingTpuConnection,
            tpu_connection::TpuConnection as BlockingTpuConnection,
        },
        async_trait::async_trait,
        rand::{Rng, SeedableRng},
        rand_chacha::ChaChaRng,
        solana_sdk::transport::Result as TransportResult,
        std::{
            net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
            sync::Arc,
        },
    };

    const MOCK_PORT_OFFSET: u16 = 42;

    pub struct MockUdpPool {
        connections: Vec<Arc<MockUdp>>,
    }
    impl ConnectionPool for MockUdpPool {
        type PoolTpuConnection = MockUdp;
        type TpuConfig = MockUdpConfig;
        const PORT_OFFSET: u16 = MOCK_PORT_OFFSET;

        fn new_with_connection(config: &Self::TpuConfig, addr: &SocketAddr) -> Self {
            let mut pool = Self {
                connections: vec![],
            };
            pool.add_connection(config, addr);
            pool
        }

        fn add_connection(&mut self, config: &Self::TpuConfig, addr: &SocketAddr) {
            let connection = Arc::new(self.create_pool_entry(config, addr));
            self.connections.push(connection);
        }

        fn num_connections(&self) -> usize {
            self.connections.len()
        }

        fn get(&self, index: usize) -> Result<Arc<Self::PoolTpuConnection>, ConnectionPoolError> {
            self.connections
                .get(index)
                .cloned()
                .ok_or(ConnectionPoolError::IndexOutOfRange)
        }

        fn create_pool_entry(
            &self,
            config: &Self::TpuConfig,
            _addr: &SocketAddr,
        ) -> Self::PoolTpuConnection {
            MockUdp(config.tpu_udp_socket.clone())
        }
    }

    pub struct MockUdpConfig {
        tpu_udp_socket: Arc<UdpSocket>,
    }

    impl Default for MockUdpConfig {
        fn default() -> Self {
            Self {
                tpu_udp_socket: Arc::new(
                    solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))
                        .expect("Unable to bind to UDP socket"),
                ),
            }
        }
    }

    pub struct MockUdp(Arc<UdpSocket>);
    impl BaseTpuConnection for MockUdp {
        type BlockingConnectionType = MockUdpTpuConnection;
        type NonblockingConnectionType = MockUdpTpuConnection;

        fn new_blocking_connection(
            &self,
            addr: SocketAddr,
            _stats: Arc<ConnectionCacheStats>,
        ) -> MockUdpTpuConnection {
            MockUdpTpuConnection {
                _socket: self.0.clone(),
                addr,
            }
        }

        fn new_nonblocking_connection(
            &self,
            addr: SocketAddr,
            _stats: Arc<ConnectionCacheStats>,
        ) -> MockUdpTpuConnection {
            MockUdpTpuConnection {
                _socket: self.0.clone(),
                addr,
            }
        }
    }

    pub struct MockUdpTpuConnection {
        _socket: Arc<UdpSocket>,
        addr: SocketAddr,
    }

    impl BlockingTpuConnection for MockUdpTpuConnection {
        fn tpu_addr(&self) -> &SocketAddr {
            &self.addr
        }
        fn send_wire_transaction_async(&self, _wire_transaction: Vec<u8>) -> TransportResult<()> {
            unimplemented!()
        }
        fn send_wire_transaction_batch<T>(&self, _buffers: &[T]) -> TransportResult<()>
        where
            T: AsRef<[u8]> + Send + Sync,
        {
            unimplemented!()
        }
        fn send_wire_transaction_batch_async(&self, _buffers: Vec<Vec<u8>>) -> TransportResult<()> {
            unimplemented!()
        }
    }

    #[async_trait]
    impl NonblockingTpuConnection for MockUdpTpuConnection {
        fn tpu_addr(&self) -> &SocketAddr {
            &self.addr
        }
        async fn send_wire_transaction<T>(&self, _wire_transaction: T) -> TransportResult<()>
        where
            T: AsRef<[u8]> + Send + Sync,
        {
            unimplemented!()
        }
        async fn send_wire_transaction_batch<T>(&self, _buffers: &[T]) -> TransportResult<()>
        where
            T: AsRef<[u8]> + Send + Sync,
        {
            unimplemented!()
        }
    }

    fn get_addr(rng: &mut ChaChaRng) -> SocketAddr {
        let a = rng.gen_range(1, 255);
        let b = rng.gen_range(1, 255);
        let c = rng.gen_range(1, 255);
        let d = rng.gen_range(1, 255);

        let addr_str = format!("{}.{}.{}.{}:80", a, b, c, d);

        addr_str.parse().expect("Invalid address")
    }

    #[test]
    fn test_tpu_connection_cache() {
        solana_logger::setup();
        // Allow the test to run deterministically
        // with the same pseudorandom sequence between runs
        // and on different platforms - the cryptographic security
        // property isn't important here but ChaChaRng provides a way
        // to get the same pseudorandom sequence on different platforms
        let mut rng = ChaChaRng::seed_from_u64(42);

        // Generate a bunch of random addresses and create TPUConnections to them
        // Since TPUConnection::new is infallible, it shouldn't matter whether or not
        // we can actually connect to those addresses - TPUConnection implementations should either
        // be lazy and not connect until first use or handle connection errors somehow
        // (without crashing, as would be required in a real practical validator)
        let connection_cache = TpuConnectionCache::<MockUdpPool>::default();
        let port_offset = MOCK_PORT_OFFSET;
        let addrs = (0..MAX_CONNECTIONS)
            .into_iter()
            .map(|_| {
                let addr = get_addr(&mut rng);
                connection_cache.get_connection(&addr);
                addr
            })
            .collect::<Vec<_>>();
        {
            let map = connection_cache.map.read().unwrap();
            assert!(map.len() == MAX_CONNECTIONS);
            addrs.iter().for_each(|a| {
                let port = a
                    .port()
                    .checked_add(port_offset)
                    .unwrap_or_else(|| a.port());
                let addr = &SocketAddr::new(a.ip(), port);

                let conn = &map.get(addr).expect("Address not found").connections[0];
                let conn = conn.new_blocking_connection(*addr, connection_cache.stats.clone());
                assert!(addr.ip() == BlockingTpuConnection::tpu_addr(&conn).ip());
            });
        }

        let addr = &get_addr(&mut rng);
        connection_cache.get_connection(addr);

        let port = addr
            .port()
            .checked_add(port_offset)
            .unwrap_or_else(|| addr.port());
        let addr_with_quic_port = SocketAddr::new(addr.ip(), port);
        let map = connection_cache.map.read().unwrap();
        assert!(map.len() == MAX_CONNECTIONS);
        let _conn = map.get(&addr_with_quic_port).expect("Address not found");
    }

    // Test that we can get_connection with a connection cache configured
    // on an address with a port that would overflow to
    // an invalid port.
    #[test]
    fn test_overflow_address() {
        let port = u16::MAX - MOCK_PORT_OFFSET + 1;
        assert!(port.checked_add(MOCK_PORT_OFFSET).is_none());
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
        let connection_cache = TpuConnectionCache::<MockUdpPool>::new(1);

        let conn: MockUdpTpuConnection = connection_cache.get_connection(&addr);
        // We (intentionally) don't have an interface that allows us to distinguish between
        // UDP and Quic connections, so check instead that the port is valid (non-zero)
        // and is the same as the input port (falling back on UDP)
        assert!(BlockingTpuConnection::tpu_addr(&conn).port() != 0);
        assert!(BlockingTpuConnection::tpu_addr(&conn).port() == port);
    }
}
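Two details in the file above are worth calling out. The last commit-message bullet, "Prevent evicting pool about to be used", is the existing_index lookup that the random-eviction loop skips, so the pool the caller is about to borrow from cannot be evicted out from under it. And get_or_add_connection applies P::PORT_OFFSET with checked_add, falling back to the original port on overflow — exactly what test_overflow_address pins down. Restated in isolation as a sketch (QUIC_PORT_OFFSET is 6 in solana-sdk; it is used here only as an example value):

// Mirrors the port mapping in TpuConnectionCache::get_or_add_connection.
fn effective_port(tpu_port: u16, port_offset: u16) -> u16 {
    tpu_port.checked_add(port_offset).unwrap_or(tpu_port)
}

fn main() {
    assert_eq!(effective_port(8003, 6), 8009); // normal case: offset applied
    assert_eq!(effective_port(u16::MAX, 6), u16::MAX); // overflow: port unchanged
}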
tpu-client/src/udp_client.rs
@@ -2,7 +2,7 @@
 //! an interface for sending transactions
 
 use {
-    crate::{connection_cache::ConnectionCacheStats, tpu_connection::TpuConnection},
+    crate::{connection_cache_stats::ConnectionCacheStats, tpu_connection::TpuConnection},
     core::iter::repeat,
     solana_sdk::transport::Result as TransportResult,
     solana_streamer::sendmmsg::batch_send,
tpu-client/tests/quic_client.rs
@@ -6,7 +6,7 @@ mod tests {
         solana_sdk::{packet::PACKET_DATA_SIZE, signature::Keypair},
         solana_streamer::{quic::StreamStats, streamer::StakedNodes},
         solana_tpu_client::{
-            connection_cache::ConnectionCacheStats,
+            connection_cache_stats::ConnectionCacheStats,
             nonblocking::quic_client::QuicLazyInitializedEndpoint,
         },
         std::{
udp-client/Cargo.toml (new file)
@@ -0,0 +1,14 @@
[package]
name = "solana-udp-client"
version = "1.15.0"
description = "Solana UDP Client"
authors = ["Solana Maintainers <maintainers@solana.foundation>"]
repository = "https://github.com/solana-labs/solana"
license = "Apache-2.0"
homepage = "https://solana.com/"
documentation = "https://docs.rs/solana-udp-client"
edition = "2021"

[dependencies]
solana-net-utils = { path = "../net-utils", version = "=1.15.0" }
solana-tpu-client = { path = "../tpu-client", version = "=1.15.0" }
udp-client/src/lib.rs (new file)
@@ -0,0 +1,96 @@
#![allow(clippy::integer_arithmetic)]

pub mod nonblocking;
pub mod udp_client;

use {
    crate::{
        nonblocking::udp_client::UdpTpuConnection as NonblockingUdpTpuConnection,
        udp_client::UdpTpuConnection as BlockingUdpTpuConnection,
    },
    solana_tpu_client::{
        connection_cache_stats::ConnectionCacheStats,
        tpu_connection_cache::{BaseTpuConnection, ConnectionPool, ConnectionPoolError},
    },
    std::{
        net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
        sync::Arc,
    },
};

pub struct UdpPool {
    connections: Vec<Arc<Udp>>,
}
impl ConnectionPool for UdpPool {
    type PoolTpuConnection = Udp;
    type TpuConfig = UdpConfig;

    fn new_with_connection(config: &Self::TpuConfig, addr: &SocketAddr) -> Self {
        let mut pool = Self {
            connections: vec![],
        };
        pool.add_connection(config, addr);
        pool
    }

    fn add_connection(&mut self, config: &Self::TpuConfig, addr: &SocketAddr) {
        let connection = Arc::new(self.create_pool_entry(config, addr));
        self.connections.push(connection);
    }

    fn num_connections(&self) -> usize {
        self.connections.len()
    }

    fn get(&self, index: usize) -> Result<Arc<Self::PoolTpuConnection>, ConnectionPoolError> {
        self.connections
            .get(index)
            .cloned()
            .ok_or(ConnectionPoolError::IndexOutOfRange)
    }

    fn create_pool_entry(
        &self,
        config: &Self::TpuConfig,
        _addr: &SocketAddr,
    ) -> Self::PoolTpuConnection {
        Udp(config.tpu_udp_socket.clone())
    }
}

pub struct UdpConfig {
    tpu_udp_socket: Arc<UdpSocket>,
}

impl Default for UdpConfig {
    fn default() -> Self {
        Self {
            tpu_udp_socket: Arc::new(
                solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))
                    .expect("Unable to bind to UDP socket"),
            ),
        }
    }
}

pub struct Udp(Arc<UdpSocket>);
impl BaseTpuConnection for Udp {
    type BlockingConnectionType = BlockingUdpTpuConnection;
    type NonblockingConnectionType = NonblockingUdpTpuConnection;

    fn new_blocking_connection(
        &self,
        addr: SocketAddr,
        _stats: Arc<ConnectionCacheStats>,
    ) -> BlockingUdpTpuConnection {
        BlockingUdpTpuConnection::new_from_addr(self.0.clone(), addr)
    }

    fn new_nonblocking_connection(
        &self,
        addr: SocketAddr,
        _stats: Arc<ConnectionCacheStats>,
    ) -> NonblockingUdpTpuConnection {
        NonblockingUdpTpuConnection::new_from_addr(self.0.try_clone().unwrap(), addr)
    }
}
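With both crates in place, the UDP path is selected the same way as the QUIC path: by the pool type parameter alone. A minimal hypothetical counterpart to the QUIC sketch earlier — note that UdpPool leaves PORT_OFFSET at the trait's default of 0, so no port mapping is applied:

use solana_tpu_client::tpu_connection_cache::TpuConnectionCache;
use solana_udp_client::UdpPool;

fn udp_cache() -> TpuConnectionCache<UdpPool> {
    // UdpConfig::default() binds a local socket; no certificates or stake
    // information are needed, so there is nothing further to configure.
    TpuConnectionCache::<UdpPool>::new(1)
}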
udp-client/src/nonblocking/mod.rs (new file)
@@ -0,0 +1 @@
pub mod udp_client;
udp-client/src/nonblocking/udp_client.rs (new file)
@@ -0,0 +1,4 @@
//! Simple UDP client that communicates with the given UDP port with UDP and provides
//! an interface for sending transactions

pub use solana_tpu_client::nonblocking::udp_client::*;
udp-client/src/udp_client.rs (new file)
@@ -0,0 +1,4 @@
//! Simple TPU client that communicates with the given UDP port with UDP and provides
//! an interface for sending transactions

pub use solana_tpu_client::udp_client::*;