persists repair-peers cache across repair service loops (#18400)

The repair-peers cache is reset on each iteration of the repair service loop,
so repair peers and their sampling weights are recomputed repeatedly for the same slots:
https://github.com/solana-labs/solana/blob/d2b07dca9/core/src/repair_service.rs#L275

This commit uses an LRU cache to persist repair peers for each slot across
iterations of the loop. In addition to the LRU eviction rules, each entry
carries a 10-second TTL so that outdated data is not reused.
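
In outline, the new cache pairs LRU capacity eviction with a freshness check: each entry is stamped with the `Instant` it was computed, and a lookup treats an entry older than the TTL as a miss, recomputing and replacing it. Below is a minimal sketch of that lookup pattern, using the `lru` crate's `peek`/`pop`/`put`/`get` API (the same crate this change adds as a dependency); the `Entry` type, `get_or_compute` helper, and constants here are illustrative stand-ins, not code from this commit.

    use lru::LruCache;
    use std::time::{Duration, Instant};

    // Illustrative values; the commit itself uses 128 entries and a 10-second TTL.
    // Note: lru releases current at the time of this commit take a plain usize
    // capacity; newer releases take NonZeroUsize.
    const CACHE_CAPACITY: usize = 128;
    const CACHE_TTL: Duration = Duration::from_secs(10);

    // A cached value stamped with the time it was computed, mirroring the
    // `asof` field of the `RepairPeers` entry added below.
    struct Entry<T> {
        asof: Instant,
        value: T,
    }

    // Returns a fresh value for `key`, recomputing and replacing the entry
    // when it is missing or older than the TTL. LRU eviction bounds memory;
    // the TTL bounds staleness.
    fn get_or_compute<T>(
        cache: &mut LruCache<u64, Entry<T>>,
        key: u64,
        compute: impl FnOnce() -> T,
    ) -> &T {
        let fresh = matches!(
            cache.peek(&key),
            Some(entry) if entry.asof.elapsed() < CACHE_TTL
        );
        if !fresh {
            // Drop any stale entry; `put` also evicts the least-recently-used
            // entry once the cache is at capacity.
            cache.pop(&key);
            cache.put(
                key,
                Entry {
                    asof: Instant::now(),
                    value: compute(),
                },
            );
        }
        &cache.get(&key).unwrap().value
    }

    fn main() {
        let mut cache = LruCache::new(CACHE_CAPACITY);
        let v = get_or_compute(&mut cache, 42, || "expensive result".to_string());
        assert_eq!(v, "expensive result");
    }

The diff below inlines the same logic directly in `ServeRepair::repair_request` rather than going through a helper: a cached entry younger than `REPAIR_PEERS_CACHE_TTL` is used as is, and anything else is popped, recomputed, and re-inserted.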
behzad nouri · 2021-07-07 14:12:09 +00:00 · committed by GitHub
parent e752299ac5
commit a0551b4054
3 changed files with 66 additions and 27 deletions

core/src/cluster_slots.rs

@@ -136,6 +136,9 @@ impl ClusterSlots {
     }
 
     pub fn compute_weights(&self, slot: Slot, repair_peers: &[ContactInfo]) -> Vec<u64> {
+        if repair_peers.is_empty() {
+            return Vec::default();
+        }
         let stakes = {
             let validator_stakes = self.validator_stakes.read().unwrap();
             repair_peers

core/src/repair_service.rs

@@ -7,9 +7,10 @@ use crate::{
     repair_weight::RepairWeight,
     replay_stage::DUPLICATE_THRESHOLD,
     result::Result,
-    serve_repair::{RepairType, ServeRepair},
+    serve_repair::{RepairType, ServeRepair, REPAIR_PEERS_CACHE_CAPACITY},
 };
 use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
+use lru::LruCache;
 use solana_gossip::cluster_info::ClusterInfo;
 use solana_ledger::{
     blockstore::{Blockstore, SlotMeta},
@@ -193,6 +194,7 @@ impl RepairService {
         let mut last_stats = Instant::now();
         let duplicate_slot_repair_statuses: HashMap<Slot, DuplicateSlotRepairStatus> =
             HashMap::new();
+        let mut peers_cache = LruCache::new(REPAIR_PEERS_CACHE_CAPACITY);
 
         loop {
             if exit.load(Ordering::Relaxed) {
@@ -272,14 +274,13 @@ impl RepairService {
                 )
             };
-            let mut cache = HashMap::new();
             let mut send_repairs_elapsed = Measure::start("send_repairs_elapsed");
             let mut outstanding_requests = outstanding_requests.write().unwrap();
             repairs.into_iter().for_each(|repair_request| {
                 if let Ok((to, req)) = serve_repair.repair_request(
                     cluster_slots,
                     repair_request,
-                    &mut cache,
+                    &mut peers_cache,
                     &mut repair_stats,
                     &repair_info.repair_validators,
                     &mut outstanding_requests,

core/src/serve_repair.rs

@@ -6,7 +6,11 @@ use crate::{
     result::{Error, Result},
 };
 use bincode::serialize;
-use rand::distributions::{Distribution, WeightedIndex};
+use lru::LruCache;
+use rand::{
+    distributions::{Distribution, WeightedError, WeightedIndex},
+    Rng,
+};
 use solana_gossip::{
     cluster_info::{ClusterInfo, ClusterInfoError},
     contact_info::ContactInfo,
@@ -22,7 +26,7 @@ use solana_perf::packet::{limited_deserialize, Packets, PacketsRecycler};
 use solana_sdk::{clock::Slot, pubkey::Pubkey, timing::duration_as_ms};
 use solana_streamer::streamer::{PacketReceiver, PacketSender};
 use std::{
-    collections::{hash_map::Entry, HashMap, HashSet},
+    collections::HashSet,
     net::SocketAddr,
     sync::atomic::{AtomicBool, Ordering},
     sync::{Arc, RwLock},
@@ -32,6 +36,10 @@ use std::{
 
 /// the number of slots to respond with when responding to `Orphan` requests
 pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10;
+// Number of slots to cache their respective repair peers and sampling weights.
+pub(crate) const REPAIR_PEERS_CACHE_CAPACITY: usize = 128;
+// Limit cache entries ttl in order to avoid re-using outdated data.
+const REPAIR_PEERS_CACHE_TTL: Duration = Duration::from_secs(10);
 
 #[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)]
 pub enum RepairType {
@@ -99,7 +107,38 @@ pub struct ServeRepair {
     cluster_info: Arc<ClusterInfo>,
 }
 
-type RepairCache = HashMap<Slot, (Vec<ContactInfo>, WeightedIndex<u64>)>;
+// Cache entry for repair peers for a slot.
+pub(crate) struct RepairPeers {
+    asof: Instant,
+    peers: Vec<(Pubkey, /*ContactInfo.serve_repair:*/ SocketAddr)>,
+    weighted_index: WeightedIndex<u64>,
+}
+
+impl RepairPeers {
+    fn new(asof: Instant, peers: &[ContactInfo], weights: &[u64]) -> Result<Self> {
+        if peers.is_empty() {
+            return Err(Error::from(ClusterInfoError::NoPeers));
+        }
+        if peers.len() != weights.len() {
+            return Err(Error::from(WeightedError::InvalidWeight));
+        }
+        let weighted_index = WeightedIndex::new(weights)?;
+        let peers = peers
+            .iter()
+            .map(|peer| (peer.id, peer.serve_repair))
+            .collect();
+        Ok(Self {
+            asof,
+            peers,
+            weighted_index,
+        })
+    }
+
+    fn sample<R: Rng>(&self, rng: &mut R) -> (Pubkey, SocketAddr) {
+        let index = self.weighted_index.sample(rng);
+        self.peers[index]
+    }
+}
 
 impl ServeRepair {
     /// Without a valid keypair gossip will not function. Only useful for tests.
impl ServeRepair {
/// Without a valid keypair gossip will not function. Only useful for tests.
@@ -378,11 +417,11 @@ impl ServeRepair {
         Ok(out)
     }
 
-    pub fn repair_request(
+    pub(crate) fn repair_request(
        &self,
        cluster_slots: &ClusterSlots,
        repair_request: RepairType,
-        cache: &mut RepairCache,
+        peers_cache: &mut LruCache<Slot, RepairPeers>,
        repair_stats: &mut RepairStats,
        repair_validators: &Option<HashSet<Pubkey>>,
        outstanding_requests: &mut OutstandingRepairs,
@@ -390,25 +429,21 @@ impl ServeRepair {
         // find a peer that appears to be accepting replication and has the desired slot, as indicated
         // by a valid tvu port location
         let slot = repair_request.slot();
-        let (repair_peers, weighted_index) = match cache.entry(slot) {
-            Entry::Occupied(entry) => entry.into_mut(),
-            Entry::Vacant(entry) => {
+        let repair_peers = match peers_cache.get(&slot) {
+            Some(entry) if entry.asof.elapsed() < REPAIR_PEERS_CACHE_TTL => entry,
+            _ => {
+                peers_cache.pop(&slot);
                 let repair_peers = self.repair_peers(repair_validators, slot);
-                if repair_peers.is_empty() {
-                    return Err(Error::from(ClusterInfoError::NoPeers));
-                }
                 let weights = cluster_slots.compute_weights(slot, &repair_peers);
-                debug_assert_eq!(weights.len(), repair_peers.len());
-                let weighted_index = WeightedIndex::new(weights)?;
-                entry.insert((repair_peers, weighted_index))
+                let repair_peers = RepairPeers::new(Instant::now(), &repair_peers, &weights)?;
+                peers_cache.put(slot, repair_peers);
+                peers_cache.get(&slot).unwrap()
             }
         };
-        let n = weighted_index.sample(&mut rand::thread_rng());
-        let addr = repair_peers[n].serve_repair; // send the request to the peer's serve_repair port
+        let (peer, addr) = repair_peers.sample(&mut rand::thread_rng());
         let nonce =
             outstanding_requests.add_request(repair_request, solana_sdk::timing::timestamp());
-        let repair_peer_id = repair_peers[n].id;
-        let out = self.map_repair_request(&repair_request, &repair_peer_id, repair_stats, nonce)?;
+        let out = self.map_repair_request(&repair_request, &peer, repair_stats, nonce)?;
         Ok((addr, out))
     }
@@ -754,7 +789,7 @@ mod tests {
        let rv = serve_repair.repair_request(
            &cluster_slots,
            RepairType::Shred(0, 0),
-            &mut HashMap::new(),
+            &mut LruCache::new(100),
            &mut RepairStats::default(),
            &None,
            &mut outstanding_requests,
@@ -782,7 +817,7 @@ mod tests {
            .repair_request(
                &cluster_slots,
                RepairType::Shred(0, 0),
-                &mut HashMap::new(),
+                &mut LruCache::new(100),
                &mut RepairStats::default(),
                &None,
                &mut outstanding_requests,
@@ -816,7 +851,7 @@ mod tests {
            .repair_request(
                &cluster_slots,
                RepairType::Shred(0, 0),
-                &mut HashMap::new(),
+                &mut LruCache::new(100),
                &mut RepairStats::default(),
                &None,
                &mut outstanding_requests,
@@ -997,7 +1032,7 @@ mod tests {
            .repair_request(
                &cluster_slots,
                RepairType::Shred(0, 0),
-                &mut HashMap::new(),
+                &mut LruCache::new(100),
                &mut RepairStats::default(),
                &trusted_validators,
                &mut OutstandingRepairs::default(),
@@ -1014,7 +1049,7 @@ mod tests {
            .repair_request(
                &cluster_slots,
                RepairType::Shred(0, 0),
-                &mut HashMap::new(),
+                &mut LruCache::new(100),
                &mut RepairStats::default(),
                &trusted_validators,
                &mut OutstandingRepairs::default(),
@@ -1035,7 +1070,7 @@ mod tests {
            .repair_request(
                &cluster_slots,
                RepairType::Shred(0, 0),
-                &mut HashMap::new(),
+                &mut LruCache::new(100),
                &mut RepairStats::default(),
                &None,
                &mut OutstandingRepairs::default(),