Refactor make_connection (#25225)

1. Move the logic for creating the endpoint, creating a new connection, and retrying 0-RTT connections into a wrapper struct, QuicNewConnection, so that it all lives in one place.
2. get_or_add_connection: the logic is much simplified so that it only manages the connection cache -- the QUIC connection is now lazily constructed. get_or_add_connection should no longer be a global hotspot.
3. Per-connection stats updates are moved out of get_or_add_connection so that a bad connection does not impact good connections.
This commit is contained in:
Lijun Wang 2022-05-16 11:13:18 -07:00 committed by GitHub
parent 3e44046a73
commit 196de1c043
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 160 additions and 161 deletions

View File

@ -6,15 +6,13 @@ use {
}, },
indexmap::map::IndexMap, indexmap::map::IndexMap,
lazy_static::lazy_static, lazy_static::lazy_static,
quinn_proto::ConnectionStats,
rand::{thread_rng, Rng}, rand::{thread_rng, Rng},
solana_measure::measure::Measure, solana_measure::measure::Measure,
solana_net_utils::VALIDATOR_PORT_RANGE,
solana_sdk::{ solana_sdk::{
timing::AtomicInterval, transaction::VersionedTransaction, transport::TransportError, timing::AtomicInterval, transaction::VersionedTransaction, transport::TransportError,
}, },
std::{ std::{
net::{IpAddr, Ipv4Addr, SocketAddr}, net::SocketAddr,
sync::{ sync::{
atomic::{AtomicU64, Ordering}, atomic::{AtomicU64, Ordering},
Arc, RwLock, Arc, RwLock,
@ -242,7 +240,6 @@ struct GetConnectionResult {
map_timing_ms: u64, map_timing_ms: u64,
lock_timing_ms: u64, lock_timing_ms: u64,
connection_cache_stats: Arc<ConnectionCacheStats>, connection_cache_stats: Arc<ConnectionCacheStats>,
other_stats: Option<(Arc<ClientStats>, ConnectionStats)>,
num_evictions: u64, num_evictions: u64,
eviction_timing_ms: u64, eviction_timing_ms: u64,
} }
@ -259,81 +256,55 @@ fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult {
.should_update(CONNECTION_STAT_SUBMISSION_INTERVAL); .should_update(CONNECTION_STAT_SUBMISSION_INTERVAL);
let mut get_connection_map_measure = Measure::start("get_connection_hit_measure"); let mut get_connection_map_measure = Measure::start("get_connection_hit_measure");
let ( let (connection, cache_hit, connection_cache_stats, num_evictions, eviction_timing_ms) =
connection, match map.map.get(addr) {
cache_hit, Some(connection) => (connection.clone(), true, map.stats.clone(), 0, 0),
connection_cache_stats, None => {
maybe_stats, // Upgrade to write access by dropping read lock and acquire write lock
num_evictions, drop(map);
eviction_timing_ms, let mut get_connection_map_lock_measure =
) = match map.map.get(addr) { Measure::start("get_connection_map_lock_measure");
Some(connection) => { let mut map = (*CONNECTION_MAP).write().unwrap();
let mut stats = None; get_connection_map_lock_measure.stop();
// update connection stats
if let Connection::Quic(conn) = connection {
stats = conn.stats().map(|s| (conn.base_stats(), s));
}
(connection.clone(), true, map.stats.clone(), stats, 0, 0)
}
None => {
// Upgrade to write access by dropping read lock and acquire write lock
drop(map);
let mut get_connection_map_lock_measure =
Measure::start("get_connection_map_lock_measure");
let mut map = (*CONNECTION_MAP).write().unwrap();
get_connection_map_lock_measure.stop();
lock_timing_ms = lock_timing_ms.saturating_add(get_connection_map_lock_measure.as_ms()); lock_timing_ms =
lock_timing_ms.saturating_add(get_connection_map_lock_measure.as_ms());
// Read again, as it is possible that between read lock dropped and the write lock acquired // Read again, as it is possible that between read lock dropped and the write lock acquired
// another thread could have setup the connection. // another thread could have setup the connection.
match map.map.get(addr) { match map.map.get(addr) {
Some(connection) => { Some(connection) => (connection.clone(), true, map.stats.clone(), 0, 0),
let mut stats = None; None => {
// update connection stats let connection = if map.use_quic {
if let Connection::Quic(conn) = connection { Connection::Quic(Arc::new(QuicTpuConnection::new(*addr)))
stats = conn.stats().map(|s| (conn.base_stats(), s)); } else {
Connection::Udp(Arc::new(UdpTpuConnection::new(*addr)))
};
// evict a connection if the cache is reaching upper bounds
let mut num_evictions = 0;
let mut get_connection_cache_eviction_measure =
Measure::start("get_connection_cache_eviction_measure");
while map.map.len() >= MAX_CONNECTIONS {
let mut rng = thread_rng();
let n = rng.gen_range(0, MAX_CONNECTIONS);
map.map.swap_remove_index(n);
num_evictions += 1;
}
get_connection_cache_eviction_measure.stop();
map.map.insert(*addr, connection.clone());
(
connection,
false,
map.stats.clone(),
num_evictions,
get_connection_cache_eviction_measure.as_ms(),
)
} }
(connection.clone(), true, map.stats.clone(), stats, 0, 0)
}
None => {
let (_, send_socket) = solana_net_utils::bind_in_range(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
VALIDATOR_PORT_RANGE,
)
.unwrap();
let connection = if map.use_quic {
Connection::Quic(Arc::new(QuicTpuConnection::new(send_socket, *addr)))
} else {
Connection::Udp(Arc::new(UdpTpuConnection::new(send_socket, *addr)))
};
// evict a connection if the cache is reaching upper bounds
let mut num_evictions = 0;
let mut get_connection_cache_eviction_measure =
Measure::start("get_connection_cache_eviction_measure");
while map.map.len() >= MAX_CONNECTIONS {
let mut rng = thread_rng();
let n = rng.gen_range(0, MAX_CONNECTIONS);
map.map.swap_remove_index(n);
num_evictions += 1;
}
get_connection_cache_eviction_measure.stop();
map.map.insert(*addr, connection.clone());
(
connection,
false,
map.stats.clone(),
None,
num_evictions,
get_connection_cache_eviction_measure.as_ms(),
)
} }
} }
} };
};
get_connection_map_measure.stop(); get_connection_map_measure.stop();
GetConnectionResult { GetConnectionResult {
@ -343,7 +314,6 @@ fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult {
map_timing_ms: get_connection_map_measure.as_ms(), map_timing_ms: get_connection_map_measure.as_ms(),
lock_timing_ms, lock_timing_ms,
connection_cache_stats, connection_cache_stats,
other_stats: maybe_stats,
num_evictions, num_evictions,
eviction_timing_ms, eviction_timing_ms,
} }
@ -360,11 +330,16 @@ fn get_connection(addr: &SocketAddr) -> (Connection, Arc<ConnectionCacheStats>)
map_timing_ms, map_timing_ms,
lock_timing_ms, lock_timing_ms,
connection_cache_stats, connection_cache_stats,
other_stats,
num_evictions, num_evictions,
eviction_timing_ms, eviction_timing_ms,
} = get_or_add_connection(addr); } = get_or_add_connection(addr);
let other_stats = if let Connection::Quic(conn) = &connection {
conn.stats().map(|s| (conn.base_stats(), s))
} else {
None
};
if report_stats { if report_stats {
connection_cache_stats.report(); connection_cache_stats.report();
} }

View File

@ -15,6 +15,7 @@ use {
ClientConfig, Endpoint, EndpointConfig, IdleTimeout, NewConnection, VarInt, WriteError, ClientConfig, Endpoint, EndpointConfig, IdleTimeout, NewConnection, VarInt, WriteError,
}, },
quinn_proto::ConnectionStats, quinn_proto::ConnectionStats,
solana_net_utils::VALIDATOR_PORT_RANGE,
solana_sdk::{ solana_sdk::{
quic::{ quic::{
QUIC_KEEP_ALIVE_MS, QUIC_MAX_CONCURRENT_STREAMS, QUIC_MAX_TIMEOUT_MS, QUIC_PORT_OFFSET, QUIC_KEEP_ALIVE_MS, QUIC_MAX_CONCURRENT_STREAMS, QUIC_MAX_TIMEOUT_MS, QUIC_PORT_OFFSET,
@ -22,7 +23,7 @@ use {
transport::Result as TransportResult, transport::Result as TransportResult,
}, },
std::{ std::{
net::{SocketAddr, UdpSocket}, net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{atomic::Ordering, Arc}, sync::{atomic::Ordering, Arc},
time::Duration, time::Duration,
}, },
@ -57,9 +58,91 @@ lazy_static! {
.unwrap(); .unwrap();
} }
struct QuicClient { /// A wrapper over NewConnection with additional capability to create the endpoint as part
/// of creating a new connection.
#[derive(Clone)]
struct QuicNewConnection {
endpoint: Endpoint, endpoint: Endpoint,
connection: Arc<Mutex<Option<Arc<NewConnection>>>>, connection: Arc<NewConnection>,
}
impl QuicNewConnection {
    /// Builds a fresh endpoint and connects it to `addr`.
    ///
    /// A UDP socket is bound on `0.0.0.0` inside `VALIDATOR_PORT_RANGE`, an endpoint is
    /// created on top of it, and a client configuration (early data enabled, the
    /// `SkipServerVerification` certificate verifier, idle timeout of
    /// `QUIC_MAX_TIMEOUT_MS`, keep-alive every `QUIC_KEEP_ALIVE_MS`) is installed as the
    /// endpoint default before connecting.
    ///
    /// `stats.total_connections` is incremented for every attempt, and
    /// `stats.connection_errors` is incremented when the handshake fails.
    ///
    /// # Errors
    ///
    /// Returns the handshake failure converted into a `WriteError`.
    ///
    /// # Panics
    ///
    /// Panics if the socket cannot be bound, the endpoint cannot be created, or
    /// `Endpoint::connect` rejects the request.
    async fn make_connection(addr: SocketAddr, stats: &ClientStats) -> Result<Self, WriteError> {
        let (_, socket) = solana_net_utils::bind_in_range(
            IpAddr::V4(Ipv4Addr::UNSPECIFIED),
            VALIDATOR_PORT_RANGE,
        )
        .unwrap();

        let mut tls_config = rustls::ClientConfig::builder()
            .with_safe_defaults()
            .with_custom_certificate_verifier(SkipServerVerification::new())
            .with_no_client_auth();
        tls_config.enable_early_data = true;

        let mut endpoint = Self::create_endpoint(EndpointConfig::default(), socket).await;

        let mut client_config = ClientConfig::new(Arc::new(tls_config));
        // The config was created just above, so its transport Arc has a single owner here.
        let transport = Arc::get_mut(&mut client_config.transport).unwrap();
        transport.max_idle_timeout(Some(IdleTimeout::from(VarInt::from_u32(
            QUIC_MAX_TIMEOUT_MS,
        ))));
        transport.keep_alive_interval(Some(Duration::from_millis(QUIC_KEEP_ALIVE_MS)));
        endpoint.set_default_client_config(client_config);

        let pending = endpoint.connect(addr, "connect").unwrap();
        stats.total_connections.fetch_add(1, Ordering::Relaxed);

        let established = match pending.await {
            Ok(established) => established,
            Err(err) => {
                stats.connection_errors.fetch_add(1, Ordering::Relaxed);
                return Err(err.into());
            }
        };

        Ok(Self {
            endpoint,
            connection: Arc::new(established),
        })
    }

    // Kept private on purpose: the signature leaks types of the concrete QUIC
    // implementation, so it must be reworked before it is ever made public.
    async fn create_endpoint(config: EndpointConfig, client_socket: UdpSocket) -> Endpoint {
        let (endpoint, _) = quinn::Endpoint::new(config, None, client_socket).unwrap();
        endpoint
    }

    // Reconnects to `addr` on the existing endpoint, trying 0-RTT first so that key
    // material from an earlier connection to the same endpoint can be reused.
    //
    // Stats: `total_connections` is always incremented. When the 0-RTT conversion is
    // available, `zero_rtt_accepts` or `zero_rtt_rejects` is incremented depending on
    // what the 0-RTT future resolves to. When the conversion is refused,
    // `connection_errors` is incremented and the regular handshake is awaited instead.
    //
    // On success the stored connection is replaced and a handle to it is returned.
    async fn make_connection_0rtt(
        &mut self,
        addr: SocketAddr,
        stats: &ClientStats,
    ) -> Result<Arc<NewConnection>, WriteError> {
        let pending = self.endpoint.connect(addr, "connect").unwrap();
        stats.total_connections.fetch_add(1, Ordering::Relaxed);

        let established = match pending.into_0rtt() {
            Ok((early_connection, accepted)) => {
                if accepted.await {
                    stats.zero_rtt_accepts.fetch_add(1, Ordering::Relaxed);
                } else {
                    stats.zero_rtt_rejects.fetch_add(1, Ordering::Relaxed);
                }
                early_connection
            }
            Err(pending) => {
                stats.connection_errors.fetch_add(1, Ordering::Relaxed);
                pending.await?
            }
        };

        let shared = Arc::new(established);
        self.connection = Arc::clone(&shared);
        Ok(shared)
    }
}
struct QuicClient {
connection: Arc<Mutex<Option<QuicNewConnection>>>,
addr: SocketAddr, addr: SocketAddr,
stats: Arc<ClientStats>, stats: Arc<ClientStats>,
} }
@ -79,9 +162,9 @@ impl QuicTpuConnection {
} }
impl TpuConnection for QuicTpuConnection { impl TpuConnection for QuicTpuConnection {
fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self { fn new(tpu_addr: SocketAddr) -> Self {
let tpu_addr = SocketAddr::new(tpu_addr.ip(), tpu_addr.port() + QUIC_PORT_OFFSET); let tpu_addr = SocketAddr::new(tpu_addr.ip(), tpu_addr.port() + QUIC_PORT_OFFSET);
let client = Arc::new(QuicClient::new(client_socket, tpu_addr)); let client = Arc::new(QuicClient::new(tpu_addr));
Self { client } Self { client }
} }
@ -156,29 +239,8 @@ impl TpuConnection for QuicTpuConnection {
} }
impl QuicClient { impl QuicClient {
pub fn new(client_socket: UdpSocket, addr: SocketAddr) -> Self { pub fn new(addr: SocketAddr) -> Self {
let _guard = RUNTIME.enter();
let mut crypto = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_custom_certificate_verifier(SkipServerVerification::new())
.with_no_client_auth();
crypto.enable_early_data = true;
let create_endpoint = QuicClient::create_endpoint(EndpointConfig::default(), client_socket);
let mut endpoint = RUNTIME.block_on(create_endpoint);
let mut config = ClientConfig::new(Arc::new(crypto));
let transport_config = Arc::get_mut(&mut config.transport).unwrap();
let timeout = IdleTimeout::from(VarInt::from_u32(QUIC_MAX_TIMEOUT_MS));
transport_config.max_idle_timeout(Some(timeout));
transport_config.keep_alive_interval(Some(Duration::from_millis(QUIC_KEEP_ALIVE_MS)));
endpoint.set_default_client_config(config);
Self { Self {
endpoint,
connection: Arc::new(Mutex::new(None)), connection: Arc::new(Mutex::new(None)),
addr, addr,
stats: Arc::new(ClientStats::default()), stats: Arc::new(ClientStats::default()),
@ -188,13 +250,7 @@ impl QuicClient {
pub fn stats(&self) -> Option<ConnectionStats> { pub fn stats(&self) -> Option<ConnectionStats> {
let conn_guard = self.connection.lock(); let conn_guard = self.connection.lock();
let x = RUNTIME.block_on(conn_guard); let x = RUNTIME.block_on(conn_guard);
x.as_ref().map(|c| c.connection.stats()) x.as_ref().map(|c| c.connection.connection.stats())
}
// If this function becomes public, it should be changed to
// not expose details of the specific Quic implementation we're using
async fn create_endpoint(config: EndpointConfig, client_socket: UdpSocket) -> Endpoint {
quinn::Endpoint::new(config, None, client_socket).unwrap().0
} }
async fn _send_buffer_using_conn( async fn _send_buffer_using_conn(
@ -207,44 +263,6 @@ impl QuicClient {
Ok(()) Ok(())
} }
async fn make_connection(&self, stats: &ClientStats) -> Result<Arc<NewConnection>, WriteError> {
let connecting = self.endpoint.connect(self.addr, "connect").unwrap();
stats.total_connections.fetch_add(1, Ordering::Relaxed);
let connecting_result = connecting.await;
if connecting_result.is_err() {
stats.connection_errors.fetch_add(1, Ordering::Relaxed);
}
let connection = connecting_result?;
Ok(Arc::new(connection))
}
// Attempts to make a faster connection by taking advantage of pre-existing key material.
// Only works if connection to this endpoint was previously established.
async fn make_connection_0rtt(
&self,
stats: &ClientStats,
) -> Result<Arc<NewConnection>, WriteError> {
let connecting = self.endpoint.connect(self.addr, "connect").unwrap();
stats.total_connections.fetch_add(1, Ordering::Relaxed);
let connection = match connecting.into_0rtt() {
Ok((connection, zero_rtt)) => {
if zero_rtt.await {
stats.zero_rtt_accepts.fetch_add(1, Ordering::Relaxed);
} else {
stats.zero_rtt_rejects.fetch_add(1, Ordering::Relaxed);
}
connection
}
Err(connecting) => {
stats.connection_errors.fetch_add(1, Ordering::Relaxed);
let connecting = connecting.await;
connecting?
}
};
Ok(Arc::new(connection))
}
// Attempts to send data, connecting/reconnecting as necessary // Attempts to send data, connecting/reconnecting as necessary
// On success, returns the connection used to successfully send the data // On success, returns the connection used to successfully send the data
async fn _send_buffer( async fn _send_buffer(
@ -255,16 +273,16 @@ impl QuicClient {
let connection = { let connection = {
let mut conn_guard = self.connection.lock().await; let mut conn_guard = self.connection.lock().await;
let maybe_conn = (*conn_guard).clone(); let maybe_conn = conn_guard.clone();
match maybe_conn { match maybe_conn {
Some(conn) => { Some(conn) => {
stats.connection_reuse.fetch_add(1, Ordering::Relaxed); stats.connection_reuse.fetch_add(1, Ordering::Relaxed);
conn.clone() conn.connection.clone()
} }
None => { None => {
let connection = self.make_connection(stats).await?; let conn = QuicNewConnection::make_connection(self.addr, stats).await?;
*conn_guard = Some(connection.clone()); *conn_guard = Some(conn.clone());
connection conn.connection.clone()
} }
} }
}; };
@ -272,10 +290,9 @@ impl QuicClient {
Ok(()) => Ok(connection), Ok(()) => Ok(connection),
_ => { _ => {
let connection = { let connection = {
let connection = self.make_connection_0rtt(stats).await?;
let mut conn_guard = self.connection.lock().await; let mut conn_guard = self.connection.lock().await;
*conn_guard = Some(connection.clone()); let conn = conn_guard.as_mut().unwrap();
connection conn.make_connection_0rtt(self.addr, stats).await?
}; };
Self::_send_buffer_using_conn(data, &connection).await?; Self::_send_buffer_using_conn(data, &connection).await?;
Ok(connection) Ok(connection)

View File

@ -3,7 +3,7 @@ use {
solana_metrics::MovingStat, solana_metrics::MovingStat,
solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult}, solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult},
std::{ std::{
net::{SocketAddr, UdpSocket}, net::SocketAddr,
sync::{atomic::AtomicU64, Arc}, sync::{atomic::AtomicU64, Arc},
}, },
}; };
@ -24,7 +24,7 @@ pub struct ClientStats {
} }
pub trait TpuConnection { pub trait TpuConnection {
fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self; fn new(tpu_addr: SocketAddr) -> Self;
fn tpu_addr(&self) -> &SocketAddr; fn tpu_addr(&self) -> &SocketAddr;

View File

@ -4,10 +4,11 @@
use { use {
crate::tpu_connection::{ClientStats, TpuConnection}, crate::tpu_connection::{ClientStats, TpuConnection},
core::iter::repeat, core::iter::repeat,
solana_net_utils::VALIDATOR_PORT_RANGE,
solana_sdk::transport::Result as TransportResult, solana_sdk::transport::Result as TransportResult,
solana_streamer::sendmmsg::batch_send, solana_streamer::sendmmsg::batch_send,
std::{ std::{
net::{SocketAddr, UdpSocket}, net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::Arc, sync::Arc,
}, },
}; };
@ -18,7 +19,13 @@ pub struct UdpTpuConnection {
} }
impl TpuConnection for UdpTpuConnection { impl TpuConnection for UdpTpuConnection {
fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self { fn new(tpu_addr: SocketAddr) -> Self {
let (_, client_socket) = solana_net_utils::bind_in_range(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
VALIDATOR_PORT_RANGE,
)
.unwrap();
Self { Self {
socket: client_socket, socket: client_socket,
addr: tpu_addr, addr: tpu_addr,

View File

@ -44,7 +44,7 @@ mod tests {
let addr = s.local_addr().unwrap().ip(); let addr = s.local_addr().unwrap().ip();
let port = s.local_addr().unwrap().port() - QUIC_PORT_OFFSET; let port = s.local_addr().unwrap().port() - QUIC_PORT_OFFSET;
let tpu_addr = SocketAddr::new(addr, port); let tpu_addr = SocketAddr::new(addr, port);
let client = QuicTpuConnection::new(UdpSocket::bind("127.0.0.1:0").unwrap(), tpu_addr); let client = QuicTpuConnection::new(tpu_addr);
// Send a full size packet with single byte writes. // Send a full size packet with single byte writes.
let num_bytes = PACKET_DATA_SIZE; let num_bytes = PACKET_DATA_SIZE;