Quic handshake timeout (#26306)

* Implement timeout for the quic client and server handshake
This commit is contained in:
ryleung-solana 2022-07-11 14:10:12 +08:00 committed by GitHub
parent 275e47f931
commit 4772a93109
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 168 additions and 117 deletions

View File

@ -12,12 +12,16 @@ use {
itertools::Itertools, itertools::Itertools,
log::*, log::*,
quinn::{ quinn::{
ClientConfig, Endpoint, EndpointConfig, IdleTimeout, NewConnection, VarInt, WriteError, ClientConfig, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, NewConnection,
VarInt, WriteError,
}, },
solana_measure::measure::Measure, solana_measure::measure::Measure,
solana_net_utils::VALIDATOR_PORT_RANGE, solana_net_utils::VALIDATOR_PORT_RANGE,
solana_sdk::{ solana_sdk::{
quic::{QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS}, quic::{
QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_KEEP_ALIVE_MS, QUIC_MAX_TIMEOUT_MS,
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
},
transport::Result as TransportResult, transport::Result as TransportResult,
}, },
std::{ std::{
@ -26,7 +30,7 @@ use {
thread, thread,
time::Duration, time::Duration,
}, },
tokio::sync::RwLock, tokio::{sync::RwLock, time::timeout},
}; };
struct SkipServerVerification; struct SkipServerVerification;
@ -142,21 +146,29 @@ impl QuicNewConnection {
.connect(addr, "connect") .connect(addr, "connect")
.expect("QuicNewConnection::make_connection endpoint.connect"); .expect("QuicNewConnection::make_connection endpoint.connect");
stats.total_connections.fetch_add(1, Ordering::Relaxed); stats.total_connections.fetch_add(1, Ordering::Relaxed);
let connecting_result = connecting.await; if let Ok(connecting_result) = timeout(
if connecting_result.is_err() { Duration::from_millis(QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS),
stats.connection_errors.fetch_add(1, Ordering::Relaxed); connecting,
)
.await
{
if connecting_result.is_err() {
stats.connection_errors.fetch_add(1, Ordering::Relaxed);
}
make_connection_measure.stop();
stats
.make_connection_ms
.fetch_add(make_connection_measure.as_ms(), Ordering::Relaxed);
let connection = connecting_result?;
Ok(Self {
endpoint,
connection: Arc::new(connection),
})
} else {
Err(ConnectionError::TimedOut.into())
} }
make_connection_measure.stop();
stats
.make_connection_ms
.fetch_add(make_connection_measure.as_ms(), Ordering::Relaxed);
let connection = connecting_result?;
Ok(Self {
endpoint,
connection: Arc::new(connection),
})
} }
fn create_endpoint(config: EndpointConfig, client_socket: UdpSocket) -> Endpoint { fn create_endpoint(config: EndpointConfig, client_socket: UdpSocket) -> Endpoint {
@ -179,17 +191,35 @@ impl QuicNewConnection {
stats.total_connections.fetch_add(1, Ordering::Relaxed); stats.total_connections.fetch_add(1, Ordering::Relaxed);
let connection = match connecting.into_0rtt() { let connection = match connecting.into_0rtt() {
Ok((connection, zero_rtt)) => { Ok((connection, zero_rtt)) => {
if zero_rtt.await { if let Ok(zero_rtt) = timeout(
stats.zero_rtt_accepts.fetch_add(1, Ordering::Relaxed); Duration::from_millis(QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS),
zero_rtt,
)
.await
{
if zero_rtt {
stats.zero_rtt_accepts.fetch_add(1, Ordering::Relaxed);
} else {
stats.zero_rtt_rejects.fetch_add(1, Ordering::Relaxed);
}
connection
} else { } else {
stats.zero_rtt_rejects.fetch_add(1, Ordering::Relaxed); return Err(ConnectionError::TimedOut.into());
} }
connection
} }
Err(connecting) => { Err(connecting) => {
stats.connection_errors.fetch_add(1, Ordering::Relaxed); stats.connection_errors.fetch_add(1, Ordering::Relaxed);
let connecting = connecting.await;
connecting? if let Ok(connecting_result) = timeout(
Duration::from_millis(QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS),
connecting,
)
.await
{
connecting_result?
} else {
return Err(ConnectionError::TimedOut.into());
}
} }
}; };
self.connection = Arc::new(connection); self.connection = Arc::new(connection);

View File

@ -6,3 +6,8 @@ pub const QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS: usize = 128;
pub const QUIC_MAX_TIMEOUT_MS: u32 = 2_000; pub const QUIC_MAX_TIMEOUT_MS: u32 = 2_000;
pub const QUIC_KEEP_ALIVE_MS: u64 = 1_000; pub const QUIC_KEEP_ALIVE_MS: u64 = 1_000;
// Based on commonly-used handshake timeouts for various TCP
// applications. Different applications vary, but most seem to
// be in the 30-60 second range
pub const QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS: u64 = 60_000;

View File

@ -15,7 +15,7 @@ use {
solana_perf::packet::PacketBatch, solana_perf::packet::PacketBatch,
solana_sdk::{ solana_sdk::{
packet::{Packet, PACKET_DATA_SIZE}, packet::{Packet, PACKET_DATA_SIZE},
quic::QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, quic::{QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS},
signature::Keypair, signature::Keypair,
timing, timing,
}, },
@ -133,7 +133,7 @@ fn prune_unstaked_connection_table(
} }
async fn setup_connection( async fn setup_connection(
connection: Connecting, connecting: Connecting,
unstaked_connection_table: Arc<Mutex<ConnectionTable>>, unstaked_connection_table: Arc<Mutex<ConnectionTable>>,
staked_connection_table: Arc<Mutex<ConnectionTable>>, staked_connection_table: Arc<Mutex<ConnectionTable>>,
packet_sender: Sender<PacketBatch>, packet_sender: Sender<PacketBatch>,
@ -143,119 +143,129 @@ async fn setup_connection(
max_unstaked_connections: usize, max_unstaked_connections: usize,
stats: Arc<StreamStats>, stats: Arc<StreamStats>,
) { ) {
if let Ok(new_connection) = connection.await { if let Ok(connecting_result) = timeout(
stats.total_connections.fetch_add(1, Ordering::Relaxed); Duration::from_millis(QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS),
stats.total_new_connections.fetch_add(1, Ordering::Relaxed); connecting,
let NewConnection { )
connection, .await
uni_streams, {
.. if let Ok(new_connection) = connecting_result {
} = new_connection; stats.total_connections.fetch_add(1, Ordering::Relaxed);
stats.total_new_connections.fetch_add(1, Ordering::Relaxed);
let NewConnection {
connection,
uni_streams,
..
} = new_connection;
let remote_addr = connection.remote_address(); let remote_addr = connection.remote_address();
let table_and_stake = { let table_and_stake = {
let staked_nodes = staked_nodes.read().unwrap(); let staked_nodes = staked_nodes.read().unwrap();
if let Some(stake) = staked_nodes.stake_map.get(&remote_addr.ip()) { if let Some(stake) = staked_nodes.stake_map.get(&remote_addr.ip()) {
let stake = *stake; let stake = *stake;
drop(staked_nodes); drop(staked_nodes);
let mut connection_table_l = staked_connection_table.lock().unwrap(); let mut connection_table_l = staked_connection_table.lock().unwrap();
if connection_table_l.total_size >= max_staked_connections { if connection_table_l.total_size >= max_staked_connections {
let num_pruned = connection_table_l.prune_random(stake); let num_pruned = connection_table_l.prune_random(stake);
if num_pruned == 0 { if num_pruned == 0 {
if max_unstaked_connections > 0 { if max_unstaked_connections > 0 {
// If we couldn't prune a connection in the staked connection table, let's // If we couldn't prune a connection in the staked connection table, let's
// put this connection in the unstaked connection table. If needed, prune a // put this connection in the unstaked connection table. If needed, prune a
// connection from the unstaked connection table. // connection from the unstaked connection table.
connection_table_l = unstaked_connection_table.lock().unwrap(); connection_table_l = unstaked_connection_table.lock().unwrap();
prune_unstaked_connection_table( prune_unstaked_connection_table(
&mut connection_table_l, &mut connection_table_l,
max_unstaked_connections, max_unstaked_connections,
stats.clone(), stats.clone(),
); );
Some((connection_table_l, stake)) Some((connection_table_l, stake))
} else {
stats
.connection_add_failed_on_pruning
.fetch_add(1, Ordering::Relaxed);
None
}
} else { } else {
stats stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
.connection_add_failed_on_pruning Some((connection_table_l, stake))
.fetch_add(1, Ordering::Relaxed);
None
} }
} else { } else {
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
Some((connection_table_l, stake)) Some((connection_table_l, stake))
} }
} else if max_unstaked_connections > 0 {
drop(staked_nodes);
let mut connection_table_l = unstaked_connection_table.lock().unwrap();
prune_unstaked_connection_table(
&mut connection_table_l,
max_unstaked_connections,
stats.clone(),
);
Some((connection_table_l, 0))
} else { } else {
Some((connection_table_l, stake)) None
}
} else if max_unstaked_connections > 0 {
drop(staked_nodes);
let mut connection_table_l = unstaked_connection_table.lock().unwrap();
prune_unstaked_connection_table(
&mut connection_table_l,
max_unstaked_connections,
stats.clone(),
);
Some((connection_table_l, 0))
} else {
None
}
};
if let Some((mut connection_table_l, stake)) = table_and_stake {
let table_type = connection_table_l.peer_type;
let max_uni_streams = match table_type {
ConnectionPeerType::Unstaked => {
VarInt::from_u64(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u64)
}
ConnectionPeerType::Staked => {
let staked_nodes = staked_nodes.read().unwrap();
VarInt::from_u64(
((stake as f64 / staked_nodes.total_stake as f64)
* QUIC_TOTAL_STAKED_CONCURRENT_STREAMS) as u64,
)
} }
}; };
if let Ok(max_uni_streams) = max_uni_streams { if let Some((mut connection_table_l, stake)) = table_and_stake {
connection.set_max_concurrent_uni_streams(max_uni_streams); let table_type = connection_table_l.peer_type;
let max_uni_streams = match table_type {
ConnectionPeerType::Unstaked => {
VarInt::from_u64(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u64)
}
ConnectionPeerType::Staked => {
let staked_nodes = staked_nodes.read().unwrap();
VarInt::from_u64(
((stake as f64 / staked_nodes.total_stake as f64)
* QUIC_TOTAL_STAKED_CONCURRENT_STREAMS)
as u64,
)
}
};
if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection( if let Ok(max_uni_streams) = max_uni_streams {
&remote_addr, connection.set_max_concurrent_uni_streams(max_uni_streams);
Some(connection),
stake, if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection(
timing::timestamp(), &remote_addr,
max_connections_per_ip, Some(connection),
) {
drop(connection_table_l);
let stats = stats.clone();
let connection_table = match table_type {
ConnectionPeerType::Unstaked => unstaked_connection_table.clone(),
ConnectionPeerType::Staked => staked_connection_table.clone(),
};
tokio::spawn(handle_connection(
uni_streams,
packet_sender,
remote_addr,
last_update,
connection_table,
stream_exit,
stats,
stake, stake,
)); timing::timestamp(),
max_connections_per_ip,
) {
drop(connection_table_l);
let stats = stats.clone();
let connection_table = match table_type {
ConnectionPeerType::Unstaked => unstaked_connection_table.clone(),
ConnectionPeerType::Staked => staked_connection_table.clone(),
};
tokio::spawn(handle_connection(
uni_streams,
packet_sender,
remote_addr,
last_update,
connection_table,
stream_exit,
stats,
stake,
));
} else {
stats.connection_add_failed.fetch_add(1, Ordering::Relaxed);
}
} else { } else {
stats.connection_add_failed.fetch_add(1, Ordering::Relaxed); stats
.connection_add_failed_invalid_stream_count
.fetch_add(1, Ordering::Relaxed);
} }
} else { } else {
connection.close(0u32.into(), &[0u8]);
stats stats
.connection_add_failed_invalid_stream_count .connection_add_failed_unstaked_node
.fetch_add(1, Ordering::Relaxed); .fetch_add(1, Ordering::Relaxed);
} }
} else { } else {
connection.close(0u32.into(), &[0u8]); stats.connection_setup_error.fetch_add(1, Ordering::Relaxed);
stats
.connection_add_failed_unstaked_node
.fetch_add(1, Ordering::Relaxed);
} }
} else { } else {
stats stats

View File

@ -162,6 +162,7 @@ pub struct StreamStats {
pub(crate) connection_add_failed_unstaked_node: AtomicUsize, pub(crate) connection_add_failed_unstaked_node: AtomicUsize,
pub(crate) connection_add_failed_on_pruning: AtomicUsize, pub(crate) connection_add_failed_on_pruning: AtomicUsize,
pub(crate) connection_setup_timeout: AtomicUsize, pub(crate) connection_setup_timeout: AtomicUsize,
pub(crate) connection_setup_error: AtomicUsize,
pub(crate) connection_removed: AtomicUsize, pub(crate) connection_removed: AtomicUsize,
pub(crate) connection_remove_failed: AtomicUsize, pub(crate) connection_remove_failed: AtomicUsize,
} }
@ -233,6 +234,11 @@ impl StreamStats {
self.connection_setup_timeout.swap(0, Ordering::Relaxed), self.connection_setup_timeout.swap(0, Ordering::Relaxed),
i64 i64
), ),
(
"connection_setup_error",
self.connection_setup_error.swap(0, Ordering::Relaxed),
i64
),
( (
"invalid_chunk", "invalid_chunk",
self.total_invalid_chunks.swap(0, Ordering::Relaxed), self.total_invalid_chunks.swap(0, Ordering::Relaxed),