From 3dad27d84d87d7917bd4684194df967b2c8c76e4 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Mon, 2 May 2022 19:27:40 -0700 Subject: [PATCH] Optimize connection cache eviction logic (#24911) * Optimize connection cache eviction logic * add eviction count and timing to metrics * use swap_remove --- client/src/connection_cache.rs | 90 +++++++++++++++++++++++++--------- 1 file changed, 67 insertions(+), 23 deletions(-) diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index 246b1339d5..4b55644878 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -13,7 +13,7 @@ use { timing::AtomicInterval, transaction::VersionedTransaction, transport::TransportError, }, std::{ - collections::BTreeMap, + collections::HashMap, net::{IpAddr, Ipv4Addr, SocketAddr}, sync::{ atomic::{AtomicU64, Ordering}, @@ -35,6 +35,8 @@ pub enum Connection { struct ConnectionCacheStats { cache_hits: AtomicU64, cache_misses: AtomicU64, + cache_evictions: AtomicU64, + eviction_time_ms: AtomicU64, sent_packets: AtomicU64, total_batches: AtomicU64, batch_success: AtomicU64, @@ -84,6 +86,16 @@ impl ConnectionCacheStats { self.cache_misses.swap(0, Ordering::Relaxed), i64 ), + ( + "cache_evictions", + self.cache_evictions.swap(0, Ordering::Relaxed), + i64 + ), + ( + "eviction_time_ms", + self.eviction_time_ms.swap(0, Ordering::Relaxed), + i64 + ), ( "get_connection_ms", self.get_connection_ms.swap(0, Ordering::Relaxed), @@ -160,7 +172,8 @@ impl ConnectionCacheStats { } struct ConnectionMap { - map: BTreeMap, + map: HashMap, + list_of_peers: Vec, stats: Arc, last_stats: AtomicInterval, use_quic: bool, @@ -169,7 +182,8 @@ struct ConnectionMap { impl ConnectionMap { pub fn new() -> Self { Self { - map: BTreeMap::new(), + map: HashMap::with_capacity(MAX_CONNECTIONS), + list_of_peers: vec![], stats: Arc::new(ConnectionCacheStats::default()), last_stats: AtomicInterval::default(), use_quic: false, @@ -194,10 +208,12 @@ struct GetConnectionResult { connection: Connection, cache_hit: bool, report_stats: bool, - map_timing: u64, - lock_timing: u64, + map_timing_ms: u64, + lock_timing_ms: u64, connection_cache_stats: Arc, other_stats: Option<(Arc, ConnectionStats)>, + num_evictions: u64, + eviction_timing_ms: u64, } fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult { @@ -205,21 +221,28 @@ fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult { let map = (*CONNECTION_MAP).read().unwrap(); get_connection_map_lock_measure.stop(); - let mut lock_timing = get_connection_map_lock_measure.as_ms(); + let mut lock_timing_ms = get_connection_map_lock_measure.as_ms(); let report_stats = map .last_stats .should_update(CONNECTION_STAT_SUBMISSION_INTERVAL); let mut get_connection_map_measure = Measure::start("get_connection_hit_measure"); - let (connection, cache_hit, connection_cache_stats, maybe_stats) = match map.map.get(addr) { + let ( + connection, + cache_hit, + connection_cache_stats, + maybe_stats, + num_evictions, + eviction_timing_ms, + ) = match map.map.get(addr) { Some(connection) => { let mut stats = None; // update connection stats if let Connection::Quic(conn) = connection { stats = conn.stats().map(|s| (conn.base_stats(), s)); } - (connection.clone(), true, map.stats.clone(), stats) + (connection.clone(), true, map.stats.clone(), stats, 0, 0) } None => { let (_, send_socket) = solana_net_utils::bind_in_range( @@ -241,20 +264,31 @@ fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult { let mut map = (*CONNECTION_MAP).write().unwrap(); get_connection_map_lock_measure.stop(); - lock_timing = lock_timing.saturating_add(get_connection_map_lock_measure.as_ms()); + lock_timing_ms = lock_timing_ms.saturating_add(get_connection_map_lock_measure.as_ms()); - // evict a connection if the map is reaching upper bounds - while map.map.len() >= MAX_CONNECTIONS { + // evict a connection if the cache is reaching upper bounds + let mut num_evictions = 0; + let mut get_connection_cache_eviction_measure = + Measure::start("get_connection_cache_eviction_measure"); + while map.list_of_peers.len() >= MAX_CONNECTIONS { let mut rng = thread_rng(); let n = rng.gen_range(0, MAX_CONNECTIONS); - if let Some((nth_addr, _)) = map.map.iter().nth(n) { - let nth_addr = *nth_addr; - map.map.remove(&nth_addr); - } + let nth_addr = map.list_of_peers.swap_remove(n); + map.map.remove(&nth_addr); + num_evictions += 1; } + get_connection_cache_eviction_measure.stop(); map.map.insert(*addr, connection.clone()); - (connection, false, map.stats.clone(), None) + map.list_of_peers.push(*addr); + ( + connection, + false, + map.stats.clone(), + None, + num_evictions, + get_connection_cache_eviction_measure.as_ms(), + ) } }; get_connection_map_measure.stop(); @@ -263,10 +297,12 @@ fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult { connection, cache_hit, report_stats, - map_timing: get_connection_map_measure.as_ms(), - lock_timing, + map_timing_ms: get_connection_map_measure.as_ms(), + lock_timing_ms, connection_cache_stats, other_stats: maybe_stats, + num_evictions, + eviction_timing_ms, } } @@ -278,10 +314,12 @@ fn get_connection(addr: &SocketAddr) -> (Connection, Arc) connection, cache_hit, report_stats, - map_timing, - lock_timing, + map_timing_ms, + lock_timing_ms, connection_cache_stats, other_stats, + num_evictions, + eviction_timing_ms, } = get_or_add_connection(addr); if report_stats { @@ -325,20 +363,26 @@ fn get_connection(addr: &SocketAddr) -> (Connection, Arc) .fetch_add(1, Ordering::Relaxed); connection_cache_stats .get_connection_hit_ms - .fetch_add(map_timing, Ordering::Relaxed); + .fetch_add(map_timing_ms, Ordering::Relaxed); } else { connection_cache_stats .cache_misses .fetch_add(1, Ordering::Relaxed); connection_cache_stats .get_connection_miss_ms - .fetch_add(map_timing, Ordering::Relaxed); + .fetch_add(map_timing_ms, Ordering::Relaxed); + connection_cache_stats + .cache_evictions + .fetch_add(num_evictions, Ordering::Relaxed); + connection_cache_stats + .eviction_time_ms + .fetch_add(eviction_timing_ms, Ordering::Relaxed); } get_connection_measure.stop(); connection_cache_stats .get_connection_lock_ms - .fetch_add(lock_timing, Ordering::Relaxed); + .fetch_add(lock_timing_ms, Ordering::Relaxed); connection_cache_stats .get_connection_ms .fetch_add(get_connection_measure.as_ms(), Ordering::Relaxed);