From abd4ef889eb172c15e62118fb5ad8da525f7b569 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Sat, 14 May 2022 15:02:13 -0700 Subject: [PATCH] Fixed a race condition where in multiple thread environment that wasteful (#25176) Duplicate and useless connections are being created which overloads the server and cause unreliable connections on the client side. --- client/src/connection_cache.rs | 78 ++++++++++++++++++++-------------- 1 file changed, 46 insertions(+), 32 deletions(-) diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index c4ebe54d3..61d5a10b7 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -276,18 +276,6 @@ fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult { (connection.clone(), true, map.stats.clone(), stats, 0, 0) } None => { - let (_, send_socket) = solana_net_utils::bind_in_range( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - VALIDATOR_PORT_RANGE, - ) - .unwrap(); - - let connection = if map.use_quic { - Connection::Quic(Arc::new(QuicTpuConnection::new(send_socket, *addr))) - } else { - Connection::Udp(Arc::new(UdpTpuConnection::new(send_socket, *addr))) - }; - // Upgrade to write access by dropping read lock and acquire write lock drop(map); let mut get_connection_map_lock_measure = @@ -297,27 +285,53 @@ fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult { lock_timing_ms = lock_timing_ms.saturating_add(get_connection_map_lock_measure.as_ms()); - // evict a connection if the cache is reaching upper bounds - let mut num_evictions = 0; - let mut get_connection_cache_eviction_measure = - Measure::start("get_connection_cache_eviction_measure"); - while map.map.len() >= MAX_CONNECTIONS { - let mut rng = thread_rng(); - let n = rng.gen_range(0, MAX_CONNECTIONS); - map.map.swap_remove_index(n); - num_evictions += 1; - } - get_connection_cache_eviction_measure.stop(); + // Read again, as it is possible that between read lock dropped and the write lock acquired + // another thread could have setup the connection. + match map.map.get(addr) { + Some(connection) => { + let mut stats = None; + // update connection stats + if let Connection::Quic(conn) = connection { + stats = conn.stats().map(|s| (conn.base_stats(), s)); + } + (connection.clone(), true, map.stats.clone(), stats, 0, 0) + } + None => { + let (_, send_socket) = solana_net_utils::bind_in_range( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + VALIDATOR_PORT_RANGE, + ) + .unwrap(); - map.map.insert(*addr, connection.clone()); - ( - connection, - false, - map.stats.clone(), - None, - num_evictions, - get_connection_cache_eviction_measure.as_ms(), - ) + let connection = if map.use_quic { + Connection::Quic(Arc::new(QuicTpuConnection::new(send_socket, *addr))) + } else { + Connection::Udp(Arc::new(UdpTpuConnection::new(send_socket, *addr))) + }; + + // evict a connection if the cache is reaching upper bounds + let mut num_evictions = 0; + let mut get_connection_cache_eviction_measure = + Measure::start("get_connection_cache_eviction_measure"); + while map.map.len() >= MAX_CONNECTIONS { + let mut rng = thread_rng(); + let n = rng.gen_range(0, MAX_CONNECTIONS); + map.map.swap_remove_index(n); + num_evictions += 1; + } + get_connection_cache_eviction_measure.stop(); + + map.map.insert(*addr, connection.clone()); + ( + connection, + false, + map.stats.clone(), + None, + num_evictions, + get_connection_cache_eviction_measure.as_ms(), + ) + } + } } }; get_connection_map_measure.stop();