Remove Default impls for TpuConnectionCache and ConnectionPool clients (#28788)

* Remove TpuConnectionCache impl Default

* Add fallible ctors for Quic/UdpClients

* Add Quic/TpuClient ctor trait

* Remove Quic/UdpClient impl Default
This commit is contained in:
Tyera Eulberg 2022-11-17 20:13:43 -07:00 committed by GitHub
parent 813f1f993a
commit 8b63f73100
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 81 additions and 34 deletions

2
Cargo.lock generated
View File

@ -5963,6 +5963,7 @@ dependencies = [
"solana-sdk 1.15.0", "solana-sdk 1.15.0",
"solana-streamer", "solana-streamer",
"solana-tpu-client", "solana-tpu-client",
"thiserror",
] ]
[[package]] [[package]]
@ -6654,6 +6655,7 @@ version = "1.15.0"
dependencies = [ dependencies = [
"solana-net-utils", "solana-net-utils",
"solana-tpu-client", "solana-tpu-client",
"thiserror",
] ]
[[package]] [[package]]

View File

@ -14,6 +14,7 @@ solana-metrics = { path = "../metrics", version = "=1.15.0" }
solana-sdk = { path = "../sdk", version = "=1.15.0" } solana-sdk = { path = "../sdk", version = "=1.15.0" }
solana-streamer = { path = "../streamer", version = "=1.15.0" } solana-streamer = { path = "../streamer", version = "=1.15.0" }
solana-tpu-client = { path = "../tpu-client", version = "=1.15.0" } solana-tpu-client = { path = "../tpu-client", version = "=1.15.0" }
thiserror = "1.0"
[dev-dependencies] [dev-dependencies]
solana-logger = { path = "../logger", version = "=1.15.0" } solana-logger = { path = "../logger", version = "=1.15.0" }

View File

@ -19,15 +19,24 @@ use {
}, },
solana_tpu_client::{ solana_tpu_client::{
connection_cache_stats::ConnectionCacheStats, connection_cache_stats::ConnectionCacheStats,
tpu_connection_cache::{BaseTpuConnection, ConnectionPool, ConnectionPoolError}, tpu_connection_cache::{
BaseTpuConnection, ConnectionPool, ConnectionPoolError, NewTpuConfig,
},
}, },
std::{ std::{
error::Error, error::Error,
net::{IpAddr, Ipv4Addr, SocketAddr}, net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{Arc, RwLock}, sync::{Arc, RwLock},
}, },
thiserror::Error,
}; };
#[derive(Error, Debug)]
pub enum QuicClientError {
#[error("Certificate error: {0}")]
CertificateError(String),
}
pub struct QuicPool { pub struct QuicPool {
connections: Vec<Arc<Quic>>, connections: Vec<Arc<Quic>>,
endpoint: Arc<QuicLazyInitializedEndpoint>, endpoint: Arc<QuicLazyInitializedEndpoint>,
@ -81,21 +90,23 @@ pub struct QuicConfig {
maybe_client_pubkey: Option<Pubkey>, maybe_client_pubkey: Option<Pubkey>,
} }
impl Default for QuicConfig { impl NewTpuConfig for QuicConfig {
fn default() -> Self { type ClientError = QuicClientError;
fn new() -> Result<Self, QuicClientError> {
let (certs, priv_key) = new_self_signed_tls_certificate_chain( let (certs, priv_key) = new_self_signed_tls_certificate_chain(
&Keypair::new(), &Keypair::new(),
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
) )
.expect("Failed to initialize QUIC client certificates"); .map_err(|err| QuicClientError::CertificateError(err.to_string()))?;
Self { Ok(Self {
client_certificate: Arc::new(QuicClientCertificate { client_certificate: Arc::new(QuicClientCertificate {
certificates: certs, certificates: certs,
key: priv_key, key: priv_key,
}), }),
maybe_staked_nodes: None, maybe_staked_nodes: None,
maybe_client_pubkey: None, maybe_client_pubkey: None,
} })
} }
} }
@ -177,13 +188,16 @@ mod tests {
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS,
QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS,
}, },
solana_tpu_client::tpu_connection_cache::TpuConnectionCache, solana_tpu_client::tpu_connection_cache::{
TpuConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE,
},
}; };
#[test] #[test]
fn test_connection_cache_max_parallel_chunks() { fn test_connection_cache_max_parallel_chunks() {
solana_logger::setup(); solana_logger::setup();
let connection_cache = TpuConnectionCache::<QuicPool>::default(); let connection_cache =
TpuConnectionCache::<QuicPool>::new(DEFAULT_TPU_CONNECTION_POOL_SIZE).unwrap();
let mut tpu_config = connection_cache.tpu_config; let mut tpu_config = connection_cache.tpu_config;
assert_eq!( assert_eq!(
tpu_config.compute_max_parallel_streams(), tpu_config.compute_max_parallel_streams(),

View File

@ -36,10 +36,20 @@ pub struct TpuConnectionCache<P: ConnectionPool> {
} }
impl<P: ConnectionPool> TpuConnectionCache<P> { impl<P: ConnectionPool> TpuConnectionCache<P> {
pub fn new(connection_pool_size: usize) -> Self { pub fn new(
connection_pool_size: usize,
) -> Result<Self, <P::TpuConfig as NewTpuConfig>::ClientError> {
let config = P::TpuConfig::new()?;
Ok(Self::new_with_config(connection_pool_size, config))
}
pub fn new_with_config(connection_pool_size: usize, tpu_config: P::TpuConfig) -> Self {
Self { Self {
map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)),
stats: Arc::new(ConnectionCacheStats::default()),
last_stats: AtomicInterval::default(),
connection_pool_size: 1.max(connection_pool_size), // The minimum pool size is 1. connection_pool_size: 1.max(connection_pool_size), // The minimum pool size is 1.
..Self::default() tpu_config,
} }
} }
@ -245,27 +255,22 @@ impl<P: ConnectionPool> TpuConnectionCache<P> {
} }
} }
impl<P: ConnectionPool> Default for TpuConnectionCache<P> {
fn default() -> Self {
Self {
map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)),
stats: Arc::new(ConnectionCacheStats::default()),
last_stats: AtomicInterval::default(),
connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_config: P::TpuConfig::default(),
}
}
}
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum ConnectionPoolError { pub enum ConnectionPoolError {
#[error("connection index is out of range of the pool")] #[error("connection index is out of range of the pool")]
IndexOutOfRange, IndexOutOfRange,
} }
pub trait NewTpuConfig {
type ClientError;
fn new() -> Result<Self, Self::ClientError>
where
Self: Sized;
}
pub trait ConnectionPool { pub trait ConnectionPool {
type PoolTpuConnection: BaseTpuConnection; type PoolTpuConnection: BaseTpuConnection;
type TpuConfig: Default; type TpuConfig: NewTpuConfig;
const PORT_OFFSET: u16 = 0; const PORT_OFFSET: u16 = 0;
/// Create a new connection pool based on protocol-specific configuration /// Create a new connection pool based on protocol-specific configuration
@ -412,6 +417,19 @@ mod tests {
} }
} }
impl NewTpuConfig for MockUdpConfig {
type ClientError = String;
fn new() -> Result<Self, String> {
Ok(Self {
tpu_udp_socket: Arc::new(
solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))
.map_err(|_| "Unable to bind to UDP socket".to_string())?,
),
})
}
}
pub struct MockUdp(Arc<UdpSocket>); pub struct MockUdp(Arc<UdpSocket>);
impl BaseTpuConnection for MockUdp { impl BaseTpuConnection for MockUdp {
type BlockingConnectionType = MockUdpTpuConnection; type BlockingConnectionType = MockUdpTpuConnection;
@ -508,7 +526,8 @@ mod tests {
// we can actually connect to those addresses - TPUConnection implementations should either // we can actually connect to those addresses - TPUConnection implementations should either
// be lazy and not connect until first use or handle connection errors somehow // be lazy and not connect until first use or handle connection errors somehow
// (without crashing, as would be required in a real practical validator) // (without crashing, as would be required in a real practical validator)
let connection_cache = TpuConnectionCache::<MockUdpPool>::default(); let connection_cache =
TpuConnectionCache::<MockUdpPool>::new(DEFAULT_TPU_CONNECTION_POOL_SIZE).unwrap();
let port_offset = MOCK_PORT_OFFSET; let port_offset = MOCK_PORT_OFFSET;
let addrs = (0..MAX_CONNECTIONS) let addrs = (0..MAX_CONNECTIONS)
.into_iter() .into_iter()
@ -555,7 +574,7 @@ mod tests {
let port = u16::MAX - MOCK_PORT_OFFSET + 1; let port = u16::MAX - MOCK_PORT_OFFSET + 1;
assert!(port.checked_add(MOCK_PORT_OFFSET).is_none()); assert!(port.checked_add(MOCK_PORT_OFFSET).is_none());
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
let connection_cache = TpuConnectionCache::<MockUdpPool>::new(1); let connection_cache = TpuConnectionCache::<MockUdpPool>::new(1).unwrap();
let conn: MockUdpTpuConnection = connection_cache.get_connection(&addr); let conn: MockUdpTpuConnection = connection_cache.get_connection(&addr);
// We (intentionally) don't have an interface that allows us to distinguish between // We (intentionally) don't have an interface that allows us to distinguish between

View File

@ -12,3 +12,4 @@ edition = "2021"
[dependencies] [dependencies]
solana-net-utils = { path = "../net-utils", version = "=1.15.0" } solana-net-utils = { path = "../net-utils", version = "=1.15.0" }
solana-tpu-client = { path = "../tpu-client", version = "=1.15.0" } solana-tpu-client = { path = "../tpu-client", version = "=1.15.0" }
thiserror = "1.0"

View File

@ -10,14 +10,23 @@ use {
}, },
solana_tpu_client::{ solana_tpu_client::{
connection_cache_stats::ConnectionCacheStats, connection_cache_stats::ConnectionCacheStats,
tpu_connection_cache::{BaseTpuConnection, ConnectionPool, ConnectionPoolError}, tpu_connection_cache::{
BaseTpuConnection, ConnectionPool, ConnectionPoolError, NewTpuConfig,
},
}, },
std::{ std::{
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::Arc, sync::Arc,
}, },
thiserror::Error,
}; };
#[derive(Error, Debug)]
pub enum UdpClientError {
#[error("IO error: {0:?}")]
IoError(#[from] std::io::Error),
}
pub struct UdpPool { pub struct UdpPool {
connections: Vec<Arc<Udp>>, connections: Vec<Arc<Udp>>,
} }
@ -62,14 +71,15 @@ pub struct UdpConfig {
tpu_udp_socket: Arc<UdpSocket>, tpu_udp_socket: Arc<UdpSocket>,
} }
impl Default for UdpConfig { impl NewTpuConfig for UdpConfig {
fn default() -> Self { type ClientError = UdpClientError;
Self {
tpu_udp_socket: Arc::new( fn new() -> Result<Self, UdpClientError> {
solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))) let socket = solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))
.expect("Unable to bind to UDP socket"), .map_err(Into::<UdpClientError>::into)?;
), Ok(Self {
} tpu_udp_socket: Arc::new(socket),
})
} }
} }