From e8ed7c1c462aed0e73a7abfc099ed7a2acd8249e Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Tue, 28 Jun 2022 13:46:56 -0700 Subject: [PATCH] Close QUIC connection before dropping the entry (#26269) --- streamer/src/nonblocking/quic.rs | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 0819195be4..42fadcc27d 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -7,7 +7,8 @@ use { futures_util::stream::StreamExt, percentage::Percentage, quinn::{ - Connecting, Endpoint, EndpointConfig, Incoming, IncomingUniStreams, NewConnection, VarInt, + Connecting, Connection, Endpoint, EndpointConfig, Incoming, IncomingUniStreams, + NewConnection, VarInt, }, solana_perf::packet::PacketBatch, solana_sdk::{ @@ -177,6 +178,7 @@ async fn setup_connection( if stake != 0 || max_unstaked_connections > 0 { if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection( &remote_addr, + Some(connection), timing::timestamp(), max_connections_per_ip, ) { @@ -375,14 +377,21 @@ struct ConnectionEntry { exit: Arc, last_update: Arc, port: u16, + connection: Option, } impl ConnectionEntry { - fn new(exit: Arc, last_update: Arc, port: u16) -> Self { + fn new( + exit: Arc, + last_update: Arc, + port: u16, + connection: Option, + ) -> Self { Self { exit, last_update, port, + connection, } } @@ -393,6 +402,9 @@ impl ConnectionEntry { impl Drop for ConnectionEntry { fn drop(&mut self) { + if let Some(conn) = self.connection.take() { + conn.close(0u32.into(), &[0u8]); + } self.exit.store(true, Ordering::Relaxed); } } @@ -438,6 +450,7 @@ impl ConnectionTable { fn try_add_connection( &mut self, addr: &SocketAddr, + connection: Option, last_update: u64, max_connections_per_ip: usize, ) -> Option<(Arc, Arc)> { @@ -454,6 +467,7 @@ impl ConnectionTable { exit.clone(), last_update.clone(), addr.port(), + connection, )); self.total_size += 1; Some((last_update, exit)) @@ -843,12 +857,12 @@ pub mod test { .collect(); for (i, socket) in sockets.iter().enumerate() { table - .try_add_connection(socket, i as u64, max_connections_per_ip) + .try_add_connection(socket, None, i as u64, max_connections_per_ip) .unwrap(); } num_entries += 1; table - .try_add_connection(&sockets[0], 5, max_connections_per_ip) + .try_add_connection(&sockets[0], None, 5, max_connections_per_ip) .unwrap(); let new_size = 3; @@ -880,11 +894,11 @@ pub mod test { .collect(); for (i, socket) in sockets.iter().enumerate() { table - .try_add_connection(socket, (i * 2) as u64, max_connections_per_ip) + .try_add_connection(socket, None, (i * 2) as u64, max_connections_per_ip) .unwrap(); table - .try_add_connection(socket, (i * 2 + 1) as u64, max_connections_per_ip) + .try_add_connection(socket, None, (i * 2 + 1) as u64, max_connections_per_ip) .unwrap(); } @@ -893,6 +907,7 @@ pub mod test { table .try_add_connection( &single_connection_addr, + None, (num_ips * 2) as u64, max_connections_per_ip, )