Bidirectional quic communication support (#29155)

* Support bi-directional quic communication, use the same endpoint for the quic server and client
This is needed for supporting using quic for repair

* Added comments on the bi-directional communication tests

* Removed some debug logs

* clippy issue
This commit is contained in:
Lijun Wang 2022-12-09 10:59:43 -08:00 committed by GitHub
parent 6a90abd056
commit ecea802fe6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 195 additions and 34 deletions

View File

@ -118,6 +118,7 @@ impl ConnectionCache {
if self.use_quic() && !force_use_udp {
Some(Arc::new(QuicLazyInitializedEndpoint::new(
self.client_certificate.clone(),
None,
)))
} else {
None

View File

@ -157,7 +157,7 @@ impl Tpu {
let (verified_sender, verified_receiver) = unbounded();
let stats = Arc::new(StreamStats::default());
let tpu_quic_t = spawn_server(
let (_, tpu_quic_t) = spawn_server(
transactions_quic_sockets,
keypair,
cluster_info.my_contact_info().tpu.ip(),
@ -172,7 +172,7 @@ impl Tpu {
)
.unwrap();
let tpu_forwards_quic_t = spawn_server(
let (_, tpu_forwards_quic_t) = spawn_server(
transactions_forwards_quic_sockets,
keypair,
cluster_info.my_contact_info().tpu_forwards.ip(),

View File

@ -117,6 +117,7 @@ impl QuicConfig {
fn create_endpoint(&self) -> Arc<QuicLazyInitializedEndpoint> {
Arc::new(QuicLazyInitializedEndpoint::new(
self.client_certificate.clone(),
None,
))
}

View File

@ -71,6 +71,7 @@ pub struct QuicClientCertificate {
pub struct QuicLazyInitializedEndpoint {
endpoint: RwLock<Option<Arc<Endpoint>>>,
client_certificate: Arc<QuicClientCertificate>,
client_endpoint: Option<Endpoint>,
}
#[derive(Error, Debug)]
@ -90,19 +91,30 @@ impl From<QuicError> for ClientErrorKind {
}
impl QuicLazyInitializedEndpoint {
pub fn new(client_certificate: Arc<QuicClientCertificate>) -> Self {
pub fn new(
client_certificate: Arc<QuicClientCertificate>,
client_endpoint: Option<Endpoint>,
) -> Self {
Self {
endpoint: RwLock::new(None),
client_certificate,
client_endpoint,
}
}
fn create_endpoint(&self) -> Endpoint {
let (_, client_socket) = solana_net_utils::bind_in_range(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
VALIDATOR_PORT_RANGE,
)
.expect("QuicLazyInitializedEndpoint::create_endpoint bind_in_range");
let mut endpoint = if let Some(endpoint) = &self.client_endpoint {
endpoint.clone()
} else {
let client_socket = solana_net_utils::bind_in_range(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
VALIDATOR_PORT_RANGE,
)
.expect("QuicLazyInitializedEndpoint::create_endpoint bind_in_range")
.1;
QuicNewConnection::create_endpoint(EndpointConfig::default(), client_socket)
};
let mut crypto = rustls::ClientConfig::builder()
.with_safe_defaults()
@ -115,9 +127,6 @@ impl QuicLazyInitializedEndpoint {
crypto.enable_early_data = true;
crypto.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()];
let mut endpoint =
QuicNewConnection::create_endpoint(EndpointConfig::default(), client_socket);
let mut config = ClientConfig::new(Arc::new(crypto));
let transport_config = Arc::get_mut(&mut config.transport)
.expect("QuicLazyInitializedEndpoint::create_endpoint Arc::get_mut");
@ -126,6 +135,7 @@ impl QuicLazyInitializedEndpoint {
transport_config.keep_alive_interval(Some(Duration::from_millis(QUIC_KEEP_ALIVE_MS)));
endpoint.set_default_client_config(config);
endpoint
}
@ -160,10 +170,13 @@ impl Default for QuicLazyInitializedEndpoint {
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
)
.expect("Failed to create QUIC client certificate");
Self::new(Arc::new(QuicClientCertificate {
certificates: certs,
key: priv_key,
}))
Self::new(
Arc::new(QuicClientCertificate {
certificates: certs,
key: priv_key,
}),
None,
)
}
}

View File

@ -2,13 +2,19 @@
mod tests {
use {
crossbeam_channel::{unbounded, Receiver},
log::*,
solana_perf::packet::PacketBatch,
solana_quic_client::nonblocking::quic_client::QuicLazyInitializedEndpoint,
solana_quic_client::nonblocking::quic_client::{
QuicClientCertificate, QuicLazyInitializedEndpoint,
},
solana_sdk::{packet::PACKET_DATA_SIZE, signature::Keypair},
solana_streamer::{quic::StreamStats, streamer::StakedNodes},
solana_streamer::{
quic::StreamStats, streamer::StakedNodes,
tls_certificates::new_self_signed_tls_certificate_chain,
},
solana_tpu_client::connection_cache_stats::ConnectionCacheStats,
std::{
net::{IpAddr, SocketAddr, UdpSocket},
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
@ -68,7 +74,7 @@ mod tests {
let (sender, receiver) = unbounded();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let (s, exit, keypair, ip, stats) = server_args();
let t = solana_streamer::quic::spawn_server(
let (_, t) = solana_streamer::quic::spawn_server(
s.try_clone().unwrap(),
&keypair,
ip,
@ -115,7 +121,7 @@ mod tests {
let (sender, receiver) = unbounded();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let (s, exit, keypair, ip, stats) = server_args();
let t = solana_streamer::nonblocking::quic::spawn_server(
let (_, t) = solana_streamer::nonblocking::quic::spawn_server(
s.try_clone().unwrap(),
&keypair,
ip,
@ -151,4 +157,140 @@ mod tests {
exit.store(true, Ordering::Relaxed);
t.await.unwrap();
}
#[test]
fn test_quic_bi_direction() {
/// This tests bi-directional quic communication. There are the following components
/// The request receiver -- responsible for receiving requests
/// The request sender -- responsible sending requests to the request reciever using quic
/// The response receiver -- responsible for receiving the responses to the requests
/// The response sender -- responsible for sending responses to the response receiver.
/// In this we demonstrate that the request sender and the response receiver use the
/// same quic Endpoint, and the same UDP socket.
use {
solana_quic_client::quic_client::QuicTpuConnection,
solana_tpu_client::tpu_connection::TpuConnection,
};
solana_logger::setup();
// Request Receiver
let (sender, receiver) = unbounded();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let (request_recv_socket, request_recv_exit, keypair, request_recv_ip, request_recv_stats) =
server_args();
let (request_recv_endpoint, request_recv_thread) = solana_streamer::quic::spawn_server(
request_recv_socket.try_clone().unwrap(),
&keypair,
request_recv_ip,
sender,
request_recv_exit.clone(),
1,
staked_nodes.clone(),
10,
10,
request_recv_stats,
1000,
)
.unwrap();
drop(request_recv_endpoint);
// Response Receiver:
let (
response_recv_socket,
response_recv_exit,
keypair2,
response_recv_ip,
response_recv_stats,
) = server_args();
let (sender2, receiver2) = unbounded();
let addr = response_recv_socket.local_addr().unwrap().ip();
let port = response_recv_socket.local_addr().unwrap().port();
let server_addr = SocketAddr::new(addr, port);
let (response_recv_endpoint, response_recv_thread) = solana_streamer::quic::spawn_server(
response_recv_socket,
&keypair2,
response_recv_ip,
sender2,
response_recv_exit.clone(),
1,
staked_nodes,
10,
10,
response_recv_stats,
1000,
)
.unwrap();
// Request Sender, it uses the same endpoint as the response receiver:
let addr = request_recv_socket.local_addr().unwrap().ip();
let port = request_recv_socket.local_addr().unwrap().port();
let tpu_addr = SocketAddr::new(addr, port);
let connection_cache_stats = Arc::new(ConnectionCacheStats::default());
let (certs, priv_key) = new_self_signed_tls_certificate_chain(
&Keypair::new(),
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
)
.expect("Failed to initialize QUIC client certificates");
let client_certificate = Arc::new(QuicClientCertificate {
certificates: certs,
key: priv_key,
});
let endpoint =
QuicLazyInitializedEndpoint::new(client_certificate, Some(response_recv_endpoint));
let request_sender =
QuicTpuConnection::new(Arc::new(endpoint), tpu_addr, connection_cache_stats);
// Send a full size packet with single byte writes as a request.
let num_bytes = PACKET_DATA_SIZE;
let num_expected_packets: usize = 3000;
let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets];
assert!(request_sender
.send_wire_transaction_batch_async(packets)
.is_ok());
check_packets(receiver, num_bytes, num_expected_packets);
info!("Received requests!");
// Response sender
let (certs, priv_key) = new_self_signed_tls_certificate_chain(
&Keypair::new(),
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
)
.expect("Failed to initialize QUIC client certificates");
let client_certificate2 = Arc::new(QuicClientCertificate {
certificates: certs,
key: priv_key,
});
let endpoint2 = QuicLazyInitializedEndpoint::new(client_certificate2, None);
let connection_cache_stats2 = Arc::new(ConnectionCacheStats::default());
let response_sender =
QuicTpuConnection::new(Arc::new(endpoint2), server_addr, connection_cache_stats2);
// Send a full size packet with single byte writes.
let num_bytes = PACKET_DATA_SIZE;
let num_expected_packets: usize = 3000;
let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets];
assert!(response_sender
.send_wire_transaction_batch_async(packets)
.is_ok());
check_packets(receiver2, num_bytes, num_expected_packets);
info!("Received responses!");
// Drop the clients explicitly to avoid hung on drops
drop(request_sender);
drop(response_sender);
request_recv_exit.store(true, Ordering::Relaxed);
request_recv_thread.join().unwrap();
info!("Request receiver exited!");
response_recv_exit.store(true, Ordering::Relaxed);
response_recv_thread.join().unwrap();
info!("Response receiver exited!");
}
}

View File

@ -71,10 +71,11 @@ pub fn spawn_server(
max_unstaked_connections: usize,
stats: Arc<StreamStats>,
wait_for_chunk_timeout_ms: u64,
) -> Result<JoinHandle<()>, QuicServerError> {
) -> Result<(Endpoint, JoinHandle<()>), QuicServerError> {
info!("Start quic server on {:?}", sock);
let (config, _cert) = configure_server(keypair, gossip_host)?;
let (_, incoming) = {
let (endpoint, incoming) = {
Endpoint::new(EndpointConfig::default(), Some(config), sock)
.map_err(|_e| QuicServerError::EndpointFailed)?
};
@ -90,7 +91,7 @@ pub fn spawn_server(
stats,
wait_for_chunk_timeout_ms,
));
Ok(handle)
Ok((endpoint, handle))
}
pub async fn run_server(
@ -126,6 +127,7 @@ pub async fn run_server(
}
if let Ok(Some(connection)) = timeout_connection {
info!("Got a connection {:?}", connection.remote_address());
tokio::spawn(setup_connection(
connection,
unstaked_connection_table.clone(),
@ -139,6 +141,8 @@ pub async fn run_server(
wait_for_chunk_timeout_ms,
));
sleep(Duration::from_micros(WAIT_BETWEEN_NEW_CONNECTIONS_US)).await;
} else {
info!("Timed out waiting for connection");
}
}
}
@ -1005,7 +1009,7 @@ pub mod test {
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(option_staked_nodes.unwrap_or_default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
let (_, t) = spawn_server(
s,
&keypair,
ip,
@ -1317,7 +1321,7 @@ pub mod test {
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
let (_, t) = spawn_server(
s,
&keypair,
ip,
@ -1348,7 +1352,7 @@ pub mod test {
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
let (_, t) = spawn_server(
s,
&keypair,
ip,

View File

@ -5,7 +5,7 @@ use {
},
crossbeam_channel::Sender,
pem::Pem,
quinn::{IdleTimeout, ServerConfig, VarInt},
quinn::{Endpoint, IdleTimeout, ServerConfig, VarInt},
rustls::{server::ClientCertVerified, Certificate, DistinguishedNames},
solana_perf::packet::PacketBatch,
solana_sdk::{
@ -27,7 +27,7 @@ use {
pub const MAX_STAKED_CONNECTIONS: usize = 2000;
pub const MAX_UNSTAKED_CONNECTIONS: usize = 500;
const NUM_QUIC_STREAMER_WORKER_THREADS: usize = 4;
const NUM_QUIC_STREAMER_WORKER_THREADS: usize = 1;
struct SkipClientVerification;
@ -313,9 +313,9 @@ pub fn spawn_server(
max_unstaked_connections: usize,
stats: Arc<StreamStats>,
wait_for_chunk_timeout_ms: u64,
) -> Result<thread::JoinHandle<()>, QuicServerError> {
) -> Result<(Endpoint, thread::JoinHandle<()>), QuicServerError> {
let runtime = rt();
let task = {
let (endpoint, task) = {
let _guard = runtime.enter();
crate::nonblocking::quic::spawn_server(
sock,
@ -339,7 +339,7 @@ pub fn spawn_server(
}
})
.unwrap();
Ok(handle)
Ok((endpoint, handle))
}
#[cfg(test)]
@ -363,7 +363,7 @@ mod test {
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
let (_, t) = spawn_server(
s,
&keypair,
ip,
@ -419,7 +419,7 @@ mod test {
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
let (_, t) = spawn_server(
s,
&keypair,
ip,
@ -462,7 +462,7 @@ mod test {
let server_address = s.local_addr().unwrap();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let stats = Arc::new(StreamStats::default());
let t = spawn_server(
let (_, t) = spawn_server(
s,
&keypair,
ip,