Optimize connection cache eviction logic (#24911)

* Optimize connection cache eviction logic

* add eviction count and timing to metrics

* use swap_remove
This commit is contained in:
Pankaj Garg 2022-05-02 19:27:40 -07:00 committed by GitHub
parent d1f864619e
commit 3dad27d84d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 67 additions and 23 deletions

View File

@ -13,7 +13,7 @@ use {
timing::AtomicInterval, transaction::VersionedTransaction, transport::TransportError, timing::AtomicInterval, transaction::VersionedTransaction, transport::TransportError,
}, },
std::{ std::{
collections::BTreeMap, collections::HashMap,
net::{IpAddr, Ipv4Addr, SocketAddr}, net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{ sync::{
atomic::{AtomicU64, Ordering}, atomic::{AtomicU64, Ordering},
@ -35,6 +35,8 @@ pub enum Connection {
struct ConnectionCacheStats { struct ConnectionCacheStats {
cache_hits: AtomicU64, cache_hits: AtomicU64,
cache_misses: AtomicU64, cache_misses: AtomicU64,
cache_evictions: AtomicU64,
eviction_time_ms: AtomicU64,
sent_packets: AtomicU64, sent_packets: AtomicU64,
total_batches: AtomicU64, total_batches: AtomicU64,
batch_success: AtomicU64, batch_success: AtomicU64,
@ -84,6 +86,16 @@ impl ConnectionCacheStats {
self.cache_misses.swap(0, Ordering::Relaxed), self.cache_misses.swap(0, Ordering::Relaxed),
i64 i64
), ),
(
"cache_evictions",
self.cache_evictions.swap(0, Ordering::Relaxed),
i64
),
(
"eviction_time_ms",
self.eviction_time_ms.swap(0, Ordering::Relaxed),
i64
),
( (
"get_connection_ms", "get_connection_ms",
self.get_connection_ms.swap(0, Ordering::Relaxed), self.get_connection_ms.swap(0, Ordering::Relaxed),
@ -160,7 +172,8 @@ impl ConnectionCacheStats {
} }
struct ConnectionMap { struct ConnectionMap {
map: BTreeMap<SocketAddr, Connection>, map: HashMap<SocketAddr, Connection>,
list_of_peers: Vec<SocketAddr>,
stats: Arc<ConnectionCacheStats>, stats: Arc<ConnectionCacheStats>,
last_stats: AtomicInterval, last_stats: AtomicInterval,
use_quic: bool, use_quic: bool,
@ -169,7 +182,8 @@ struct ConnectionMap {
impl ConnectionMap { impl ConnectionMap {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
map: BTreeMap::new(), map: HashMap::with_capacity(MAX_CONNECTIONS),
list_of_peers: vec![],
stats: Arc::new(ConnectionCacheStats::default()), stats: Arc::new(ConnectionCacheStats::default()),
last_stats: AtomicInterval::default(), last_stats: AtomicInterval::default(),
use_quic: false, use_quic: false,
@ -194,10 +208,12 @@ struct GetConnectionResult {
connection: Connection, connection: Connection,
cache_hit: bool, cache_hit: bool,
report_stats: bool, report_stats: bool,
map_timing: u64, map_timing_ms: u64,
lock_timing: u64, lock_timing_ms: u64,
connection_cache_stats: Arc<ConnectionCacheStats>, connection_cache_stats: Arc<ConnectionCacheStats>,
other_stats: Option<(Arc<ClientStats>, ConnectionStats)>, other_stats: Option<(Arc<ClientStats>, ConnectionStats)>,
num_evictions: u64,
eviction_timing_ms: u64,
} }
fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult { fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult {
@ -205,21 +221,28 @@ fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult {
let map = (*CONNECTION_MAP).read().unwrap(); let map = (*CONNECTION_MAP).read().unwrap();
get_connection_map_lock_measure.stop(); get_connection_map_lock_measure.stop();
let mut lock_timing = get_connection_map_lock_measure.as_ms(); let mut lock_timing_ms = get_connection_map_lock_measure.as_ms();
let report_stats = map let report_stats = map
.last_stats .last_stats
.should_update(CONNECTION_STAT_SUBMISSION_INTERVAL); .should_update(CONNECTION_STAT_SUBMISSION_INTERVAL);
let mut get_connection_map_measure = Measure::start("get_connection_hit_measure"); let mut get_connection_map_measure = Measure::start("get_connection_hit_measure");
let (connection, cache_hit, connection_cache_stats, maybe_stats) = match map.map.get(addr) { let (
connection,
cache_hit,
connection_cache_stats,
maybe_stats,
num_evictions,
eviction_timing_ms,
) = match map.map.get(addr) {
Some(connection) => { Some(connection) => {
let mut stats = None; let mut stats = None;
// update connection stats // update connection stats
if let Connection::Quic(conn) = connection { if let Connection::Quic(conn) = connection {
stats = conn.stats().map(|s| (conn.base_stats(), s)); stats = conn.stats().map(|s| (conn.base_stats(), s));
} }
(connection.clone(), true, map.stats.clone(), stats) (connection.clone(), true, map.stats.clone(), stats, 0, 0)
} }
None => { None => {
let (_, send_socket) = solana_net_utils::bind_in_range( let (_, send_socket) = solana_net_utils::bind_in_range(
@ -241,20 +264,31 @@ fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult {
let mut map = (*CONNECTION_MAP).write().unwrap(); let mut map = (*CONNECTION_MAP).write().unwrap();
get_connection_map_lock_measure.stop(); get_connection_map_lock_measure.stop();
lock_timing = lock_timing.saturating_add(get_connection_map_lock_measure.as_ms()); lock_timing_ms = lock_timing_ms.saturating_add(get_connection_map_lock_measure.as_ms());
// evict a connection if the map is reaching upper bounds // evict a connection if the cache is reaching upper bounds
while map.map.len() >= MAX_CONNECTIONS { let mut num_evictions = 0;
let mut get_connection_cache_eviction_measure =
Measure::start("get_connection_cache_eviction_measure");
while map.list_of_peers.len() >= MAX_CONNECTIONS {
let mut rng = thread_rng(); let mut rng = thread_rng();
let n = rng.gen_range(0, MAX_CONNECTIONS); let n = rng.gen_range(0, MAX_CONNECTIONS);
if let Some((nth_addr, _)) = map.map.iter().nth(n) { let nth_addr = map.list_of_peers.swap_remove(n);
let nth_addr = *nth_addr; map.map.remove(&nth_addr);
map.map.remove(&nth_addr); num_evictions += 1;
}
} }
get_connection_cache_eviction_measure.stop();
map.map.insert(*addr, connection.clone()); map.map.insert(*addr, connection.clone());
(connection, false, map.stats.clone(), None) map.list_of_peers.push(*addr);
(
connection,
false,
map.stats.clone(),
None,
num_evictions,
get_connection_cache_eviction_measure.as_ms(),
)
} }
}; };
get_connection_map_measure.stop(); get_connection_map_measure.stop();
@ -263,10 +297,12 @@ fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult {
connection, connection,
cache_hit, cache_hit,
report_stats, report_stats,
map_timing: get_connection_map_measure.as_ms(), map_timing_ms: get_connection_map_measure.as_ms(),
lock_timing, lock_timing_ms,
connection_cache_stats, connection_cache_stats,
other_stats: maybe_stats, other_stats: maybe_stats,
num_evictions,
eviction_timing_ms,
} }
} }
@ -278,10 +314,12 @@ fn get_connection(addr: &SocketAddr) -> (Connection, Arc<ConnectionCacheStats>)
connection, connection,
cache_hit, cache_hit,
report_stats, report_stats,
map_timing, map_timing_ms,
lock_timing, lock_timing_ms,
connection_cache_stats, connection_cache_stats,
other_stats, other_stats,
num_evictions,
eviction_timing_ms,
} = get_or_add_connection(addr); } = get_or_add_connection(addr);
if report_stats { if report_stats {
@ -325,20 +363,26 @@ fn get_connection(addr: &SocketAddr) -> (Connection, Arc<ConnectionCacheStats>)
.fetch_add(1, Ordering::Relaxed); .fetch_add(1, Ordering::Relaxed);
connection_cache_stats connection_cache_stats
.get_connection_hit_ms .get_connection_hit_ms
.fetch_add(map_timing, Ordering::Relaxed); .fetch_add(map_timing_ms, Ordering::Relaxed);
} else { } else {
connection_cache_stats connection_cache_stats
.cache_misses .cache_misses
.fetch_add(1, Ordering::Relaxed); .fetch_add(1, Ordering::Relaxed);
connection_cache_stats connection_cache_stats
.get_connection_miss_ms .get_connection_miss_ms
.fetch_add(map_timing, Ordering::Relaxed); .fetch_add(map_timing_ms, Ordering::Relaxed);
connection_cache_stats
.cache_evictions
.fetch_add(num_evictions, Ordering::Relaxed);
connection_cache_stats
.eviction_time_ms
.fetch_add(eviction_timing_ms, Ordering::Relaxed);
} }
get_connection_measure.stop(); get_connection_measure.stop();
connection_cache_stats connection_cache_stats
.get_connection_lock_ms .get_connection_lock_ms
.fetch_add(lock_timing, Ordering::Relaxed); .fetch_add(lock_timing_ms, Ordering::Relaxed);
connection_cache_stats connection_cache_stats
.get_connection_ms .get_connection_ms
.fetch_add(get_connection_measure.as_ms(), Ordering::Relaxed); .fetch_add(get_connection_measure.as_ms(), Ordering::Relaxed);