Close QUIC connection before dropping the entry (#26269)

This commit is contained in:
Pankaj Garg 2022-06-28 13:46:56 -07:00 committed by GitHub
parent fa77cc5e48
commit e8ed7c1c46
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 21 additions and 6 deletions

View File

@ -7,7 +7,8 @@ use {
futures_util::stream::StreamExt,
percentage::Percentage,
quinn::{
Connecting, Endpoint, EndpointConfig, Incoming, IncomingUniStreams, NewConnection, VarInt,
Connecting, Connection, Endpoint, EndpointConfig, Incoming, IncomingUniStreams,
NewConnection, VarInt,
},
solana_perf::packet::PacketBatch,
solana_sdk::{
@ -177,6 +178,7 @@ async fn setup_connection(
if stake != 0 || max_unstaked_connections > 0 {
if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection(
&remote_addr,
Some(connection),
timing::timestamp(),
max_connections_per_ip,
) {
@ -375,14 +377,21 @@ struct ConnectionEntry {
exit: Arc<AtomicBool>,
last_update: Arc<AtomicU64>,
port: u16,
connection: Option<Connection>,
}
impl ConnectionEntry {
fn new(exit: Arc<AtomicBool>, last_update: Arc<AtomicU64>, port: u16) -> Self {
fn new(
exit: Arc<AtomicBool>,
last_update: Arc<AtomicU64>,
port: u16,
connection: Option<Connection>,
) -> Self {
Self {
exit,
last_update,
port,
connection,
}
}
@ -393,6 +402,9 @@ impl ConnectionEntry {
impl Drop for ConnectionEntry {
fn drop(&mut self) {
if let Some(conn) = self.connection.take() {
conn.close(0u32.into(), &[0u8]);
}
self.exit.store(true, Ordering::Relaxed);
}
}
@ -438,6 +450,7 @@ impl ConnectionTable {
fn try_add_connection(
&mut self,
addr: &SocketAddr,
connection: Option<Connection>,
last_update: u64,
max_connections_per_ip: usize,
) -> Option<(Arc<AtomicU64>, Arc<AtomicBool>)> {
@ -454,6 +467,7 @@ impl ConnectionTable {
exit.clone(),
last_update.clone(),
addr.port(),
connection,
));
self.total_size += 1;
Some((last_update, exit))
@ -843,12 +857,12 @@ pub mod test {
.collect();
for (i, socket) in sockets.iter().enumerate() {
table
.try_add_connection(socket, i as u64, max_connections_per_ip)
.try_add_connection(socket, None, i as u64, max_connections_per_ip)
.unwrap();
}
num_entries += 1;
table
.try_add_connection(&sockets[0], 5, max_connections_per_ip)
.try_add_connection(&sockets[0], None, 5, max_connections_per_ip)
.unwrap();
let new_size = 3;
@ -880,11 +894,11 @@ pub mod test {
.collect();
for (i, socket) in sockets.iter().enumerate() {
table
.try_add_connection(socket, (i * 2) as u64, max_connections_per_ip)
.try_add_connection(socket, None, (i * 2) as u64, max_connections_per_ip)
.unwrap();
table
.try_add_connection(socket, (i * 2 + 1) as u64, max_connections_per_ip)
.try_add_connection(socket, None, (i * 2 + 1) as u64, max_connections_per_ip)
.unwrap();
}
@ -893,6 +907,7 @@ pub mod test {
table
.try_add_connection(
&single_connection_addr,
None,
(num_ips * 2) as u64,
max_connections_per_ip,
)