diff --git a/core/src/cluster_slots.rs b/core/src/cluster_slots.rs
index 7a9ca5fd3d..f5099f7720 100644
--- a/core/src/cluster_slots.rs
+++ b/core/src/cluster_slots.rs
@@ -136,6 +136,9 @@ impl ClusterSlots {
     }
 
     pub fn compute_weights(&self, slot: Slot, repair_peers: &[ContactInfo]) -> Vec<u64> {
+        if repair_peers.is_empty() {
+            return Vec::default();
+        }
         let stakes = {
             let validator_stakes = self.validator_stakes.read().unwrap();
             repair_peers
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index fb251de9f9..9e781cfebf 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -7,9 +7,10 @@ use crate::{
     repair_weight::RepairWeight,
     replay_stage::DUPLICATE_THRESHOLD,
     result::Result,
-    serve_repair::{RepairType, ServeRepair},
+    serve_repair::{RepairType, ServeRepair, REPAIR_PEERS_CACHE_CAPACITY},
 };
 use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
+use lru::LruCache;
 use solana_gossip::cluster_info::ClusterInfo;
 use solana_ledger::{
     blockstore::{Blockstore, SlotMeta},
@@ -193,6 +194,7 @@ impl RepairService {
         let mut last_stats = Instant::now();
         let duplicate_slot_repair_statuses: HashMap<Slot, DuplicateSlotRepairStatus> =
             HashMap::new();
+        let mut peers_cache = LruCache::new(REPAIR_PEERS_CACHE_CAPACITY);
 
         loop {
             if exit.load(Ordering::Relaxed) {
@@ -272,14 +274,13 @@ impl RepairService {
                 )
             };
 
-            let mut cache = HashMap::new();
             let mut send_repairs_elapsed = Measure::start("send_repairs_elapsed");
             let mut outstanding_requests = outstanding_requests.write().unwrap();
             repairs.into_iter().for_each(|repair_request| {
                 if let Ok((to, req)) = serve_repair.repair_request(
                     cluster_slots,
                     repair_request,
-                    &mut cache,
+                    &mut peers_cache,
                     &mut repair_stats,
                     &repair_info.repair_validators,
                     &mut outstanding_requests,
diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs
index 43b76249f0..0740f651cf 100644
--- a/core/src/serve_repair.rs
+++ b/core/src/serve_repair.rs
@@ -6,7 +6,11 @@ use crate::{
     result::{Error, Result},
 };
 use bincode::serialize;
-use rand::distributions::{Distribution, WeightedIndex};
+use lru::LruCache;
+use rand::{
+    distributions::{Distribution, WeightedError, WeightedIndex},
+    Rng,
+};
 use solana_gossip::{
     cluster_info::{ClusterInfo, ClusterInfoError},
     contact_info::ContactInfo,
@@ -22,7 +26,7 @@ use solana_perf::packet::{limited_deserialize, Packets, PacketsRecycler};
 use solana_sdk::{clock::Slot, pubkey::Pubkey, timing::duration_as_ms};
 use solana_streamer::streamer::{PacketReceiver, PacketSender};
 use std::{
-    collections::{hash_map::Entry, HashMap, HashSet},
+    collections::HashSet,
     net::SocketAddr,
     sync::atomic::{AtomicBool, Ordering},
     sync::{Arc, RwLock},
@@ -32,6 +36,10 @@
 
 /// the number of slots to respond with when responding to `Orphan` requests
 pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10;
+// Number of slots to cache their respective repair peers and sampling weights.
+pub(crate) const REPAIR_PEERS_CACHE_CAPACITY: usize = 128;
+// Limit cache entries ttl in order to avoid re-using outdated data.
+const REPAIR_PEERS_CACHE_TTL: Duration = Duration::from_secs(10);
 
 #[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)]
 pub enum RepairType {
@@ -99,7 +107,38 @@ pub struct ServeRepair {
     cluster_info: Arc<ClusterInfo>,
 }
 
-type RepairCache = HashMap<Slot, (Vec<ContactInfo>, WeightedIndex<u64>)>;
+// Cache entry for repair peers for a slot.
+pub(crate) struct RepairPeers {
+    asof: Instant,
+    peers: Vec<(Pubkey, /*ContactInfo.serve_repair:*/ SocketAddr)>,
+    weighted_index: WeightedIndex<u64>,
+}
+
+impl RepairPeers {
+    fn new(asof: Instant, peers: &[ContactInfo], weights: &[u64]) -> Result<Self> {
+        if peers.is_empty() {
+            return Err(Error::from(ClusterInfoError::NoPeers));
+        }
+        if peers.len() != weights.len() {
+            return Err(Error::from(WeightedError::InvalidWeight));
+        }
+        let weighted_index = WeightedIndex::new(weights)?;
+        let peers = peers
+            .iter()
+            .map(|peer| (peer.id, peer.serve_repair))
+            .collect();
+        Ok(Self {
+            asof,
+            peers,
+            weighted_index,
+        })
+    }
+
+    fn sample<R: Rng>(&self, rng: &mut R) -> (Pubkey, SocketAddr) {
+        let index = self.weighted_index.sample(rng);
+        self.peers[index]
+    }
+}
 
 impl ServeRepair {
     /// Without a valid keypair gossip will not function. Only useful for tests.
@@ -378,11 +417,11 @@ impl ServeRepair {
         Ok(out)
     }
 
-    pub fn repair_request(
+    pub(crate) fn repair_request(
         &self,
         cluster_slots: &ClusterSlots,
         repair_request: RepairType,
-        cache: &mut RepairCache,
+        peers_cache: &mut LruCache<Slot, RepairPeers>,
         repair_stats: &mut RepairStats,
         repair_validators: &Option<HashSet<Pubkey>>,
         outstanding_requests: &mut OutstandingRepairs,
@@ -390,25 +429,21 @@ impl ServeRepair {
         // find a peer that appears to be accepting replication and has the desired slot, as indicated
         // by a valid tvu port location
        let slot = repair_request.slot();
-        let (repair_peers, weighted_index) = match cache.entry(slot) {
-            Entry::Occupied(entry) => entry.into_mut(),
-            Entry::Vacant(entry) => {
+        let repair_peers = match peers_cache.get(&slot) {
+            Some(entry) if entry.asof.elapsed() < REPAIR_PEERS_CACHE_TTL => entry,
+            _ => {
+                peers_cache.pop(&slot);
                 let repair_peers = self.repair_peers(repair_validators, slot);
-                if repair_peers.is_empty() {
-                    return Err(Error::from(ClusterInfoError::NoPeers));
-                }
                 let weights = cluster_slots.compute_weights(slot, &repair_peers);
-                debug_assert_eq!(weights.len(), repair_peers.len());
-                let weighted_index = WeightedIndex::new(weights)?;
-                entry.insert((repair_peers, weighted_index))
+                let repair_peers = RepairPeers::new(Instant::now(), &repair_peers, &weights)?;
+                peers_cache.put(slot, repair_peers);
+                peers_cache.get(&slot).unwrap()
             }
         };
-        let n = weighted_index.sample(&mut rand::thread_rng());
-        let addr = repair_peers[n].serve_repair; // send the request to the peer's serve_repair port
+        let (peer, addr) = repair_peers.sample(&mut rand::thread_rng());
         let nonce =
             outstanding_requests.add_request(repair_request, solana_sdk::timing::timestamp());
-        let repair_peer_id = repair_peers[n].id;
-        let out = self.map_repair_request(&repair_request, &repair_peer_id, repair_stats, nonce)?;
+        let out = self.map_repair_request(&repair_request, &peer, repair_stats, nonce)?;
         Ok((addr, out))
     }
 
@@ -754,7 +789,7 @@ mod tests {
         let rv = serve_repair.repair_request(
             &cluster_slots,
             RepairType::Shred(0, 0),
-            &mut HashMap::new(),
+            &mut LruCache::new(100),
             &mut RepairStats::default(),
             &None,
             &mut outstanding_requests,
@@ -782,7 +817,7 @@
             .repair_request(
                 &cluster_slots,
                 RepairType::Shred(0, 0),
-                &mut HashMap::new(),
+                &mut LruCache::new(100),
                 &mut RepairStats::default(),
                 &None,
                 &mut outstanding_requests,
@@ -816,7 +851,7 @@
             .repair_request(
                 &cluster_slots,
                 RepairType::Shred(0, 0),
-                &mut HashMap::new(),
+                &mut LruCache::new(100),
                 &mut RepairStats::default(),
                 &None,
                 &mut outstanding_requests,
@@ -997,7 +1032,7 @@
             .repair_request(
                 &cluster_slots,
                 RepairType::Shred(0, 0),
-                &mut HashMap::new(),
+                &mut LruCache::new(100),
                 &mut RepairStats::default(),
                 &trusted_validators,
                 &mut OutstandingRepairs::default(),
@@ -1014,7 +1049,7 @@
             .repair_request(
                 &cluster_slots,
                 RepairType::Shred(0, 0),
-                &mut HashMap::new(),
+                &mut LruCache::new(100),
                 &mut RepairStats::default(),
                 &trusted_validators,
                 &mut OutstandingRepairs::default(),
@@ -1035,7 +1070,7 @@
             .repair_request(
                 &cluster_slots,
                 RepairType::Shred(0, 0),
-                &mut HashMap::new(),
+                &mut LruCache::new(100),
                 &mut RepairStats::default(),
                 &None,
                 &mut OutstandingRepairs::default(),