Enforce stream receive timeout (#28513)

In the QUIC server's handle_connection, when a read of the next chunk timed out we looped back and kept waiting for it indefinitely. If the client never provides another chunk, the server waits hopelessly and wastes resources. Instead, a wait_for_chunk_timeout_ms parameter (defaulting to DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, 10 seconds) is introduced to bound the wait; the stream is dropped if it times out.
Lijun Wang 2022-11-02 10:09:32 -07:00 committed by GitHub
parent ee0320c9f3
commit f156bc12ca
4 changed files with 37 additions and 10 deletions
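
The essence of the change is to wrap each chunk read in a timeout and drop the stream when the timer fires, instead of retrying forever. Below is a minimal sketch of that pattern, not the patch itself: it assumes quinn's RecvStream::read_chunk API, the PACKET_DATA_SIZE value is illustrative, and the helper name read_stream_bounded is hypothetical.

use std::time::Duration;

use quinn::RecvStream;

// Illustrative values; the real constants come from solana-sdk / solana-streamer.
const PACKET_DATA_SIZE: usize = 1232;
const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS: u64 = 10_000;

// Hypothetical helper: read chunks until the stream ends or the peer stalls
// longer than `wait_for_chunk_timeout_ms` between chunks.
async fn read_stream_bounded(mut stream: RecvStream, wait_for_chunk_timeout_ms: u64) {
    loop {
        match tokio::time::timeout(
            Duration::from_millis(wait_for_chunk_timeout_ms),
            stream.read_chunk(PACKET_DATA_SIZE, false),
        )
        .await
        {
            // Nothing arrived within the deadline: give up instead of looping
            // back to wait for the same chunk again.
            Err(_elapsed) => break,
            // Stream finished cleanly or failed; nothing more to read.
            Ok(Ok(None)) | Ok(Err(_)) => break,
            // Got a chunk in time: hand the payload off and keep reading.
            Ok(Ok(Some(chunk))) => {
                let _payload = chunk.bytes; // forward to packet batching here
            }
        }
    }
    // Dropping `stream` here releases the resources the old code would have
    // held while waiting indefinitely.
}

In the patch, the deadline is threaded through spawn_server/run_server down to handle_connection as the new wait_for_chunk_timeout_ms argument; the TPU passes DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS, while tests use much smaller values to trigger the timeout quickly.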

View File

@ -30,6 +30,7 @@ use {
},
solana_sdk::{pubkey::Pubkey, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
quic::{spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
streamer::StakedNodes,
},
@ -169,6 +170,7 @@ impl Tpu {
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
stats.clone(),
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
)
.unwrap();
@ -183,6 +185,7 @@ impl Tpu {
MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
0, // Prevent unstaked nodes from forwarding transactions
stats,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
)
.unwrap();

View File

@ -42,6 +42,7 @@ use {
const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: f64 = 100_000f64;
const WAIT_FOR_STREAM_TIMEOUT_MS: u64 = 100;
pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS: u64 = 10000;
pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
@ -69,6 +70,7 @@ pub fn spawn_server(
max_staked_connections: usize,
max_unstaked_connections: usize,
stats: Arc<StreamStats>,
wait_for_chunk_timeout_ms: u64,
) -> Result<JoinHandle<()>, QuicServerError> {
let (config, _cert) = configure_server(keypair, gossip_host)?;
@ -86,6 +88,7 @@ pub fn spawn_server(
max_staked_connections,
max_unstaked_connections,
stats,
wait_for_chunk_timeout_ms,
));
Ok(handle)
}
@ -99,6 +102,7 @@ pub async fn run_server(
max_staked_connections: usize,
max_unstaked_connections: usize,
stats: Arc<StreamStats>,
wait_for_chunk_timeout_ms: u64,
) {
debug!("spawn quic server");
let mut last_datapoint = Instant::now();
@ -132,6 +136,7 @@ pub async fn run_server(
max_staked_connections,
max_unstaked_connections,
stats.clone(),
wait_for_chunk_timeout_ms,
));
sleep(Duration::from_micros(WAIT_BETWEEN_NEW_CONNECTIONS_US)).await;
}
@ -242,6 +247,7 @@ fn handle_and_cache_new_connection(
mut connection_table_l: MutexGuard<ConnectionTable>,
connection_table: Arc<Mutex<ConnectionTable>>,
params: &NewConnectionHandlerParams,
wait_for_chunk_timeout_ms: u64,
) -> Result<(), ConnectionHandlerError> {
let NewConnection {
connection,
@ -300,6 +306,7 @@ fn handle_and_cache_new_connection(
params.stats.clone(),
params.stake,
peer_type,
wait_for_chunk_timeout_ms,
));
Ok(())
} else {
@ -328,6 +335,7 @@ fn prune_unstaked_connections_and_add_new_connection(
connection_table: Arc<Mutex<ConnectionTable>>,
max_connections: usize,
params: &NewConnectionHandlerParams,
wait_for_chunk_timeout_ms: u64,
) -> Result<(), ConnectionHandlerError> {
let stats = params.stats.clone();
if max_connections > 0 {
@ -337,6 +345,7 @@ fn prune_unstaked_connections_and_add_new_connection(
connection_table_l,
connection_table,
params,
wait_for_chunk_timeout_ms,
)
} else {
new_connection.connection.close(
@ -391,6 +400,7 @@ fn compute_recieve_window(
}
}
#[allow(clippy::too_many_arguments)]
async fn setup_connection(
connecting: Connecting,
unstaked_connection_table: Arc<Mutex<ConnectionTable>>,
@ -401,6 +411,7 @@ async fn setup_connection(
max_staked_connections: usize,
max_unstaked_connections: usize,
stats: Arc<StreamStats>,
wait_for_chunk_timeout_ms: u64,
) {
if let Ok(connecting_result) = timeout(
Duration::from_millis(QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS),
@ -445,6 +456,7 @@ async fn setup_connection(
connection_table_l,
staked_connection_table.clone(),
&params,
wait_for_chunk_timeout_ms,
) {
stats
.connection_added_from_staked_peer
@ -460,6 +472,7 @@ async fn setup_connection(
unstaked_connection_table.clone(),
max_unstaked_connections,
&params,
wait_for_chunk_timeout_ms,
) {
stats
.connection_added_from_staked_peer
@ -479,6 +492,7 @@ async fn setup_connection(
unstaked_connection_table.clone(),
max_unstaked_connections,
&params,
wait_for_chunk_timeout_ms,
) {
stats
.connection_added_from_unstaked_peer
@ -510,6 +524,7 @@ async fn handle_connection(
stats: Arc<StreamStats>,
stake: u64,
peer_type: ConnectionPeerType,
wait_for_chunk_timeout_ms: u64,
) {
debug!(
"quic new connection {} streams: {} connections: {}",
@ -538,7 +553,7 @@ async fn handle_connection(
let mut maybe_batch = None;
while !stream_exit.load(Ordering::Relaxed) {
if let Ok(chunk) = tokio::time::timeout(
- Duration::from_millis(WAIT_FOR_STREAM_TIMEOUT_MS),
+ Duration::from_millis(wait_for_chunk_timeout_ms),
stream.read_chunk(PACKET_DATA_SIZE, false),
)
.await
@ -560,6 +575,7 @@ async fn handle_connection(
stats
.total_stream_read_timeouts
.fetch_add(1, Ordering::Relaxed);
break;
}
}
stats.total_streams.fetch_sub(1, Ordering::Relaxed);
@ -989,6 +1005,7 @@ pub mod test {
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
stats.clone(),
1,
)
.unwrap();
(t, exit, receiver, server_address, stats)
@ -1118,7 +1135,7 @@ pub mod test {
total_packets += packets.len();
all_packets.push(packets)
}
- if total_packets > num_expected_packets {
+ if total_packets >= num_expected_packets {
break;
}
}
@ -1173,19 +1190,17 @@ pub mod test {
s1.write_all(&[0u8]).await.unwrap_or_default();
// Wait long enough for the stream to timeout in receiving chunks
- let sleep_time = (WAIT_FOR_STREAM_TIMEOUT_MS * 1000).min(2000);
+ let sleep_time = (WAIT_FOR_STREAM_TIMEOUT_MS * 1000).min(3000);
sleep(Duration::from_millis(sleep_time)).await;
// Test that the stream was created, but timed out in read
- assert_eq!(stats.total_streams.load(Ordering::Relaxed), 1);
+ assert_eq!(stats.total_streams.load(Ordering::Relaxed), 0);
assert_ne!(stats.total_stream_read_timeouts.load(Ordering::Relaxed), 0);
- // Test that more writes are still successful to the stream (i.e. the stream was writable
- // even after the timeouts)
- for _ in 0..PACKET_DATA_SIZE {
- s1.write_all(&[0u8]).await.unwrap();
- }
- s1.finish().await.unwrap();
+ // Test that more writes to the stream will fail (i.e. the stream is no longer writable
+ // after the timeouts)
+ assert!(s1.write_all(&[0u8]).await.is_err());
+ assert!(s1.finish().await.is_err());
exit.store(true, Ordering::Relaxed);
t.await.unwrap();
@ -1302,6 +1317,7 @@ pub mod test {
MAX_STAKED_CONNECTIONS,
0, // Do not allow any connection from unstaked clients/nodes
stats,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
)
.unwrap();
@ -1332,6 +1348,7 @@ pub mod test {
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
stats.clone(),
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
)
.unwrap();

View File

@ -307,6 +307,7 @@ pub fn spawn_server(
max_staked_connections: usize,
max_unstaked_connections: usize,
stats: Arc<StreamStats>,
wait_for_chunk_timeout_ms: u64,
) -> Result<thread::JoinHandle<()>, QuicServerError> {
let runtime = rt();
let task = {
@ -322,6 +323,7 @@ pub fn spawn_server(
max_staked_connections,
max_unstaked_connections,
stats,
wait_for_chunk_timeout_ms,
)
}?;
let handle = thread::Builder::new()
@ -367,6 +369,7 @@ mod test {
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
stats,
100,
)
.unwrap();
(t, exit, receiver, server_address)
@ -422,6 +425,7 @@ mod test {
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
stats,
100,
)
.unwrap();
@ -464,6 +468,7 @@ mod test {
MAX_STAKED_CONNECTIONS,
0, // Do not allow any connection from unstaked clients/nodes
stats,
100,
)
.unwrap();

View File

@ -78,6 +78,7 @@ mod tests {
10,
10,
stats,
1000,
)
.unwrap();
@ -123,6 +124,7 @@ mod tests {
10,
10,
stats,
1000,
)
.unwrap();