Make connection cache support a specified client endpoint (#29240)

ConnectionCache manages the connections used to send messages over QUIC. In the current implementation, each connection pool lazily creates its own client endpoint. Repair, however, needs all connections to share a single Endpoint so that the server can send its response back to that same Endpoint; this change lets callers supply the client endpoint explicitly.
Lijun Wang committed 2022-12-22 12:35:32 -08:00
parent 49f4e2ae05
commit 0a7d8520b8
4 changed files with 96 additions and 4 deletions
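
For context, below is a minimal sketch of the intended call pattern. The solana_tpu_client crate path and the Endpoint::client construction are assumptions for illustration, and quinn 0.8 endpoints must be created inside a tokio runtime:

use {
    quinn::Endpoint,
    solana_tpu_client::connection_cache::ConnectionCache, // crate path assumed
    std::net::{Ipv4Addr, SocketAddr},
};

fn main() -> std::io::Result<()> {
    // quinn 0.8 registers its UDP socket with the tokio runtime,
    // so enter a runtime context before building the endpoint.
    let rt = tokio::runtime::Runtime::new()?;
    let _guard = rt.enter();

    // One client endpoint shared by every connection in the cache, so a
    // server can send its response back to this same local endpoint.
    let endpoint = Endpoint::client(SocketAddr::from((Ipv4Addr::LOCALHOST, 0)))?;
    let cache = ConnectionCache::new_with_endpoint(4, endpoint);

    // Without a specified endpoint, each connection pool lazily
    // creates its own, as before.
    let _default_cache = ConnectionCache::new(4);
    Ok(())
}

A repair service can instead hand the cache the endpoint returned by its own QUIC server, which is exactly what the new test in this diff does.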

Cargo.lock (generated)

@@ -5088,12 +5088,14 @@ version = "1.15.0"
dependencies = [
"async-trait",
"bincode",
"crossbeam-channel",
"enum_dispatch",
"futures 0.3.24",
"futures-util",
"indexmap",
"indicatif",
"log",
"quinn",
"rand 0.7.3",
"rand_chacha 0.2.2",
"rayon",

Cargo.toml

@@ -18,6 +18,7 @@ futures-util = "0.3.21"
indexmap = "1.9.1"
indicatif = { version = "0.17.1" }
log = "0.4.17"
quinn = "0.8.4"
rand = "0.7.0"
rayon = "1.5.3"
solana-measure = { path = "../measure", version = "=1.15.0" }
@@ -37,6 +38,7 @@ thiserror = "1.0"
tokio = { version = "1", features = ["full"] }
[dev-dependencies]
crossbeam-channel = "0.5"
rand_chacha = "0.2.2"
solana-logger = { path = "../logger", version = "=1.15.0" }

connection_cache.rs

@@ -6,6 +6,7 @@ use {
nonblocking::tpu_connection::NonblockingConnection, tpu_connection::BlockingConnection,
},
indexmap::map::{Entry, IndexMap},
quinn::Endpoint,
rand::{thread_rng, Rng},
solana_measure::measure::Measure,
solana_quic_client::nonblocking::quic_client::{
@@ -40,6 +41,10 @@ pub struct ConnectionCache {
use_quic: bool,
maybe_staked_nodes: Option<Arc<RwLock<StakedNodes>>>,
maybe_client_pubkey: Option<Pubkey>,
// The optional caller-specified endpoint for quic-based client connections.
// If not specified, the connection cache creates one per connection pool as needed.
client_endpoint: Option<Endpoint>,
}
/// Models the pool of connections
@@ -69,11 +74,21 @@ impl ConnectionPool {
impl ConnectionCache {
pub fn new(connection_pool_size: usize) -> Self {
Self::_new_with_endpoint(connection_pool_size, None)
}
/// Create a connection cache with a specific quic client endpoint.
pub fn new_with_endpoint(connection_pool_size: usize, client_endpoint: Endpoint) -> Self {
Self::_new_with_endpoint(connection_pool_size, Some(client_endpoint))
}
fn _new_with_endpoint(connection_pool_size: usize, client_endpoint: Option<Endpoint>) -> Self {
// The minimum pool size is 1.
let connection_pool_size = 1.max(connection_pool_size);
Self {
use_quic: true,
connection_pool_size,
client_endpoint,
..Self::default()
}
}
@@ -118,7 +133,7 @@ impl ConnectionCache {
if self.use_quic() && !force_use_udp {
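// Pass the caller-specified endpoint through; when it is None,
// QuicLazyInitializedEndpoint creates its own endpoint on first use.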
Some(Arc::new(QuicLazyInitializedEndpoint::new(
self.client_certificate.clone(),
-None,
+self.client_endpoint.as_ref().cloned(),
)))
} else {
None
@@ -377,6 +392,7 @@ impl Default for ConnectionCache {
use_quic: DEFAULT_TPU_USE_QUIC,
maybe_staked_nodes: None,
maybe_client_pubkey: None,
client_endpoint: None,
}
}
}
@@ -445,6 +461,7 @@ mod tests {
connection_cache::{ConnectionCache, MAX_CONNECTIONS},
tpu_connection::TpuConnection,
},
crossbeam_channel::unbounded,
rand::{Rng, SeedableRng},
rand_chacha::ChaChaRng,
solana_sdk::{
@@ -453,14 +470,34 @@ mod tests {
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS,
QUIC_PORT_OFFSET, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS,
},
signature::Keypair,
},
-solana_streamer::streamer::StakedNodes,
+solana_streamer::{quic::StreamStats, streamer::StakedNodes},
std::{
-net::{IpAddr, Ipv4Addr, SocketAddr},
-sync::{Arc, RwLock},
+net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
+sync::{
+atomic::{AtomicBool, Ordering},
+Arc, RwLock,
+},
},
};
fn server_args() -> (
UdpSocket,
Arc<AtomicBool>,
Keypair,
IpAddr,
Arc<StreamStats>,
) {
(
UdpSocket::bind("127.0.0.1:0").unwrap(),
Arc::new(AtomicBool::new(false)),
Keypair::new(),
"127.0.0.1".parse().unwrap(),
Arc::new(StreamStats::default()),
)
}
fn get_addr(rng: &mut ChaChaRng) -> SocketAddr {
let a = rng.gen_range(1, 255);
let b = rng.gen_range(1, 255);
@@ -600,4 +637,54 @@ mod tests {
assert!(conn.tpu_addr().port() != 0);
assert!(conn.tpu_addr().port() == port);
}
#[test]
fn test_connection_with_specified_client_endpoint() {
let port = u16::MAX - QUIC_PORT_OFFSET + 1;
assert!(port.checked_add(QUIC_PORT_OFFSET).is_none());
// Start a response receiver:
let (
response_recv_socket,
response_recv_exit,
keypair2,
response_recv_ip,
response_recv_stats,
) = server_args();
let (sender2, _receiver2) = unbounded();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let (response_recv_endpoint, response_recv_thread) = solana_streamer::quic::spawn_server(
response_recv_socket,
&keypair2,
response_recv_ip,
sender2,
response_recv_exit.clone(),
1,
staked_nodes,
10,
10,
response_recv_stats,
1000,
)
.unwrap();
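// Reuse the response receiver's server endpoint as the cache's client
// endpoint, so replies arrive at the endpoint the requests left from.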
let connection_cache = ConnectionCache::new_with_endpoint(1, response_recv_endpoint);
// server port 1:
let port1 = 9001;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port1);
let conn = connection_cache.get_connection(&addr);
assert_eq!(conn.tpu_addr().port(), port1 + QUIC_PORT_OFFSET);
// server port 2:
let port2 = 9002;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port2);
let conn = connection_cache.get_connection(&addr);
assert_eq!(conn.tpu_addr().port(), port2 + QUIC_PORT_OFFSET);
response_recv_exit.store(true, Ordering::Relaxed);
response_recv_thread.join().unwrap();
}
}

Cargo.lock (generated)

@@ -4410,6 +4410,7 @@ dependencies = [
"indexmap",
"indicatif",
"log",
"quinn",
"rand 0.7.3",
"rayon",
"solana-measure",