Fixed a race condition where in multiple thread environment that wasteful (#25176)

Duplicate and useless connections are being created which overloads the server and cause unreliable
connections on the client side.
This commit is contained in:
Lijun Wang 2022-05-14 15:02:13 -07:00 committed by GitHub
parent 35d2a0fd69
commit abd4ef889e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 46 additions and 32 deletions

View File

@ -276,18 +276,6 @@ fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult {
(connection.clone(), true, map.stats.clone(), stats, 0, 0)
}
None => {
let (_, send_socket) = solana_net_utils::bind_in_range(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
VALIDATOR_PORT_RANGE,
)
.unwrap();
let connection = if map.use_quic {
Connection::Quic(Arc::new(QuicTpuConnection::new(send_socket, *addr)))
} else {
Connection::Udp(Arc::new(UdpTpuConnection::new(send_socket, *addr)))
};
// Upgrade to write access by dropping read lock and acquire write lock
drop(map);
let mut get_connection_map_lock_measure =
@ -297,27 +285,53 @@ fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult {
lock_timing_ms = lock_timing_ms.saturating_add(get_connection_map_lock_measure.as_ms());
// evict a connection if the cache is reaching upper bounds
let mut num_evictions = 0;
let mut get_connection_cache_eviction_measure =
Measure::start("get_connection_cache_eviction_measure");
while map.map.len() >= MAX_CONNECTIONS {
let mut rng = thread_rng();
let n = rng.gen_range(0, MAX_CONNECTIONS);
map.map.swap_remove_index(n);
num_evictions += 1;
}
get_connection_cache_eviction_measure.stop();
// Read again, as it is possible that between read lock dropped and the write lock acquired
// another thread could have setup the connection.
match map.map.get(addr) {
Some(connection) => {
let mut stats = None;
// update connection stats
if let Connection::Quic(conn) = connection {
stats = conn.stats().map(|s| (conn.base_stats(), s));
}
(connection.clone(), true, map.stats.clone(), stats, 0, 0)
}
None => {
let (_, send_socket) = solana_net_utils::bind_in_range(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
VALIDATOR_PORT_RANGE,
)
.unwrap();
map.map.insert(*addr, connection.clone());
(
connection,
false,
map.stats.clone(),
None,
num_evictions,
get_connection_cache_eviction_measure.as_ms(),
)
let connection = if map.use_quic {
Connection::Quic(Arc::new(QuicTpuConnection::new(send_socket, *addr)))
} else {
Connection::Udp(Arc::new(UdpTpuConnection::new(send_socket, *addr)))
};
// evict a connection if the cache is reaching upper bounds
let mut num_evictions = 0;
let mut get_connection_cache_eviction_measure =
Measure::start("get_connection_cache_eviction_measure");
while map.map.len() >= MAX_CONNECTIONS {
let mut rng = thread_rng();
let n = rng.gen_range(0, MAX_CONNECTIONS);
map.map.swap_remove_index(n);
num_evictions += 1;
}
get_connection_cache_eviction_measure.stop();
map.map.insert(*addr, connection.clone());
(
connection,
false,
map.stats.clone(),
None,
num_evictions,
get_connection_cache_eviction_measure.as_ms(),
)
}
}
}
};
get_connection_map_measure.stop();