QUIC stream timeouts if no data is received (#26116)

This commit is contained in:
Pankaj Garg 2022-06-21 18:56:47 -07:00 committed by GitHub
parent 49648c16c4
commit d5dbfb67fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 89 additions and 30 deletions

View File

@ -32,6 +32,7 @@ use {
};
const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: f64 = 100_000f64;
const WAIT_FOR_STREAM_TIMEOUT_MS: u64 = 1000;
#[allow(clippy::too_many_arguments)]
pub fn spawn_server(
@ -225,35 +226,55 @@ async fn handle_connection(
stats.total_connections.load(Ordering::Relaxed),
);
while !stream_exit.load(Ordering::Relaxed) {
match uni_streams.next().await {
Some(stream_result) => match stream_result {
Ok(mut stream) => {
stats.total_streams.fetch_add(1, Ordering::Relaxed);
stats.total_new_streams.fetch_add(1, Ordering::Relaxed);
let mut maybe_batch = None;
while !stream_exit.load(Ordering::Relaxed) {
if handle_chunk(
&stream.read_chunk(PACKET_DATA_SIZE, false).await,
&mut maybe_batch,
&remote_addr,
&packet_sender,
stats.clone(),
stake,
) {
last_update.store(timing::timestamp(), Ordering::Relaxed);
break;
if let Ok(stream) = tokio::time::timeout(
Duration::from_millis(WAIT_FOR_STREAM_TIMEOUT_MS),
uni_streams.next(),
)
.await
{
match stream {
Some(stream_result) => match stream_result {
Ok(mut stream) => {
stats.total_streams.fetch_add(1, Ordering::Relaxed);
stats.total_new_streams.fetch_add(1, Ordering::Relaxed);
let mut maybe_batch = None;
while !stream_exit.load(Ordering::Relaxed) {
if let Ok(chunk) = tokio::time::timeout(
Duration::from_millis(WAIT_FOR_STREAM_TIMEOUT_MS),
stream.read_chunk(PACKET_DATA_SIZE, false),
)
.await
{
if handle_chunk(
&chunk,
&mut maybe_batch,
&remote_addr,
&packet_sender,
stats.clone(),
stake,
) {
last_update.store(timing::timestamp(), Ordering::Relaxed);
break;
}
} else {
debug!("Timeout in receiving on stream");
stats
.total_stream_read_timeouts
.fetch_add(1, Ordering::Relaxed);
break;
}
}
}
}
Err(e) => {
debug!("stream error: {:?}", e);
Err(e) => {
debug!("stream error: {:?}", e);
stats.total_streams.fetch_sub(1, Ordering::Relaxed);
break;
}
},
None => {
stats.total_streams.fetch_sub(1, Ordering::Relaxed);
break;
}
},
None => {
stats.total_streams.fetch_sub(1, Ordering::Relaxed);
break;
}
}
}
@ -503,6 +524,7 @@ pub mod test {
Arc<AtomicBool>,
crossbeam_channel::Receiver<PacketBatch>,
SocketAddr,
Arc<StreamStats>,
) {
let s = UdpSocket::bind("127.0.0.1:0").unwrap();
let exit = Arc::new(AtomicBool::new(false));
@ -522,10 +544,10 @@ pub mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
stats,
stats.clone(),
)
.unwrap();
(t, exit, receiver, server_address)
(t, exit, receiver, server_address, stats)
}
pub async fn make_client_endpoint(addr: &SocketAddr) -> NewConnection {
@ -668,7 +690,7 @@ pub mod test {
#[tokio::test]
async fn test_quic_server_exit() {
let (t, exit, _receiver, _server_address) = setup_quic_server();
let (t, exit, _receiver, _server_address, _stats) = setup_quic_server();
exit.store(true, Ordering::Relaxed);
t.await.unwrap();
}
@ -676,16 +698,47 @@ pub mod test {
#[tokio::test]
async fn test_quic_timeout() {
solana_logger::setup();
let (t, exit, receiver, server_address) = setup_quic_server();
let (t, exit, receiver, server_address, _stats) = setup_quic_server();
check_timeout(receiver, server_address).await;
exit.store(true, Ordering::Relaxed);
t.await.unwrap();
}
/// Checks that a stream which goes idle after being opened is counted as a
/// read timeout by the server, and that the client can no longer finish it.
///
/// The test opens one unidirectional stream, writes a single byte, then stays
/// silent for twice `WAIT_FOR_STREAM_TIMEOUT_MS` before inspecting the shared
/// `StreamStats` counters returned by `setup_quic_server`.
#[tokio::test]
async fn test_quic_stream_timeout() {
solana_logger::setup();
// `stats` is the same Arc handed to the server, so the counters read below
// are the ones the server updates.
let (t, exit, _receiver, server_address, stats) = setup_quic_server();
let conn1 = make_client_endpoint(&server_address).await;
// Baseline: no stream has been opened and no read has timed out yet.
assert_eq!(stats.total_streams.load(Ordering::Relaxed), 0);
assert_eq!(stats.total_stream_read_timeouts.load(Ordering::Relaxed), 0);
// Send one byte to start the stream
let mut s1 = conn1.connection.open_uni().await.unwrap();
// A write failure is deliberately ignored here; the assertions below are
// what decide the outcome of the test.
s1.write_all(&[0u8]).await.unwrap_or_default();
// Wait long enough for the stream to timeout in receiving chunks
// (twice the timeout constant, leaving one full timeout period of slack).
sleep(Duration::from_millis(WAIT_FOR_STREAM_TIMEOUT_MS * 2)).await;
// Test that the stream was created, but timed out in read
assert_eq!(stats.total_streams.load(Ordering::Relaxed), 1);
assert_eq!(stats.total_stream_read_timeouts.load(Ordering::Relaxed), 1);
// Keep writing single bytes, PACKET_DATA_SIZE times in total, after the
// timeout has been recorded.
for _ in 0..PACKET_DATA_SIZE {
// Ignoring any errors here. s1.finish() will test the error condition
s1.write_all(&[0u8]).await.unwrap_or_default();
}
// Test that more writes are not successful to the stream
// NOTE(review): presumably finish() fails because the server dropped the
// stream after the read timeout - confirm against the QUIC library docs.
s1.finish().await.unwrap_err();
// Signal the server to exit and wait for its task to complete.
exit.store(true, Ordering::Relaxed);
t.await.unwrap();
}
#[tokio::test]
async fn test_quic_server_block_multiple_connections() {
solana_logger::setup();
let (t, exit, _receiver, server_address) = setup_quic_server();
let (t, exit, _receiver, server_address, _stats) = setup_quic_server();
check_block_multiple_connections(server_address).await;
exit.store(true, Ordering::Relaxed);
t.await.unwrap();
@ -694,7 +747,7 @@ pub mod test {
#[tokio::test]
async fn test_quic_server_multiple_writes() {
solana_logger::setup();
let (t, exit, receiver, server_address) = setup_quic_server();
let (t, exit, receiver, server_address, _stats) = setup_quic_server();
check_multiple_writes(receiver, server_address).await;
exit.store(true, Ordering::Relaxed);
t.await.unwrap();

View File

@ -155,6 +155,7 @@ pub struct StreamStats {
pub(crate) total_packet_batches_sent: AtomicUsize,
pub(crate) total_packet_batches_none: AtomicUsize,
pub(crate) total_stream_read_errors: AtomicUsize,
pub(crate) total_stream_read_timeouts: AtomicUsize,
pub(crate) num_evictions: AtomicUsize,
pub(crate) connection_add_failed: AtomicUsize,
pub(crate) connection_add_failed_unstaked_node: AtomicUsize,
@ -246,6 +247,11 @@ impl StreamStats {
self.total_stream_read_errors.swap(0, Ordering::Relaxed),
i64
),
(
"stream_read_timeouts",
self.total_stream_read_timeouts.swap(0, Ordering::Relaxed),
i64
),
);
}
}