diff --git a/core/src/cluster_slots.rs b/core/src/cluster_slots.rs
index 2b188dfd8f..6b1b353c69 100644
--- a/core/src/cluster_slots.rs
+++ b/core/src/cluster_slots.rs
@@ -1,4 +1,7 @@
-use crate::{cluster_info::ClusterInfo, epoch_slots::EpochSlots, serve_repair::RepairType};
+use crate::{
+    cluster_info::ClusterInfo, contact_info::ContactInfo, epoch_slots::EpochSlots,
+    serve_repair::RepairType,
+};
 use solana_ledger::{bank_forks::BankForks, staking_utils};
 use solana_sdk::{clock::Slot, pubkey::Pubkey};
 use std::{
@@ -108,11 +111,7 @@ impl ClusterSlots {
             .collect()
     }
 
-    pub fn update_peers(
-        &mut self,
-        cluster_info: &RwLock<ClusterInfo>,
-        bank_forks: &RwLock<BankForks>,
-    ) {
+    fn update_peers(&mut self, cluster_info: &RwLock<ClusterInfo>, bank_forks: &RwLock<BankForks>) {
         let root = bank_forks.read().unwrap().root();
         let (epoch, _) = bank_forks
             .read()
@@ -141,14 +140,20 @@ impl ClusterSlots {
             self.epoch = Some(epoch);
         }
     }
-    pub fn peers(&self, slot: Slot) -> Vec<(Rc<Pubkey>, u64)> {
-        let mut peers: HashMap<Rc<Pubkey>, u64> = self.validator_stakes.clone();
-        if let Some(slot_peers) = self.lookup(slot) {
-            slot_peers
-                .iter()
-                .for_each(|(x, y)| *peers.entry(x.clone()).or_insert(0) += *y);
-        }
-        peers.into_iter().filter(|x| *x.0 != self.self_id).collect()
+
+    pub fn compute_weights(&self, slot: Slot, repair_peers: &[ContactInfo]) -> Vec<(u64, usize)> {
+        let slot_peers = self.lookup(slot);
+        repair_peers
+            .iter()
+            .enumerate()
+            .map(|(i, x)| {
+                (
+                    1 + slot_peers.and_then(|v| v.get(&x.id)).cloned().unwrap_or(0)
+                        + self.validator_stakes.get(&x.id).cloned().unwrap_or(0),
+                    i,
+                )
+            })
+            .collect()
     }
 
     pub fn generate_repairs_for_missing_slots(
@@ -217,6 +222,53 @@ mod tests {
         assert_eq!(cs.lookup(1).unwrap().get(&Pubkey::default()), Some(&0));
     }
 
+    #[test]
+    fn test_compute_weights() {
+        let cs = ClusterSlots::default();
+        let ci = ContactInfo::default();
+        assert_eq!(cs.compute_weights(0, &[ci]), vec![(1, 0)]);
+    }
+
+    #[test]
+    fn test_best_peer_2() {
+        let mut cs = ClusterSlots::default();
+        let mut c1 = ContactInfo::default();
+        let mut c2 = ContactInfo::default();
+        let mut map = HashMap::new();
+        let k1 = Pubkey::new_rand();
+        let k2 = Pubkey::new_rand();
+        map.insert(Rc::new(k1.clone()), std::u64::MAX / 2);
+        map.insert(Rc::new(k2.clone()), 0);
+        cs.cluster_slots.insert(0, map);
+        c1.id = k1;
+        c2.id = k2;
+        assert_eq!(
+            cs.compute_weights(0, &[c1, c2]),
+            vec![(std::u64::MAX / 2 + 1, 0), (1, 1)]
+        );
+    }
+
+    #[test]
+    fn test_best_peer_3() {
+        let mut cs = ClusterSlots::default();
+        let mut c1 = ContactInfo::default();
+        let mut c2 = ContactInfo::default();
+        let mut map = HashMap::new();
+        let k1 = Pubkey::new_rand();
+        let k2 = Pubkey::new_rand();
+        map.insert(Rc::new(k2.clone()), 0);
+        cs.cluster_slots.insert(0, map);
+        //make sure default weights are used as well
+        cs.validator_stakes
+            .insert(Rc::new(k1.clone()), std::u64::MAX / 2);
+        c1.id = k1;
+        c2.id = k2;
+        assert_eq!(
+            cs.compute_weights(0, &[c1, c2]),
+            vec![(std::u64::MAX / 2 + 1, 0), (1, 1)]
+        );
+    }
+
     #[test]
     fn test_update_new_staked_slot() {
         let mut cs = ClusterSlots::default();
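For reviewers: `compute_weights` replaces the old `peers()` helper. Instead of returning a filtered stake map, it pairs every candidate in `repair_peers` with `1 + (stake advertised for this slot) + (total validator stake)`, keeping the peer's position in the slice as the second tuple element. A minimal standalone sketch of that rule, with plain byte arrays standing in for the crate's `Rc<Pubkey>` keys (that substitution is mine, not part of the change):

```rust
use std::collections::HashMap;

// Sketch only: `Id` stands in for solana_sdk::pubkey::Pubkey.
type Id = [u8; 32];

fn compute_weights_sketch(
    slot_peers: Option<&HashMap<Id, u64>>, // stake observed for this slot, per peer
    validator_stakes: &HashMap<Id, u64>,   // epoch stakes, per peer
    repair_peers: &[Id],                   // candidates, in lookup order
) -> Vec<(u64, usize)> {
    repair_peers
        .iter()
        .enumerate()
        .map(|(i, id)| {
            let slot_stake = slot_peers.and_then(|m| m.get(id)).copied().unwrap_or(0);
            let epoch_stake = validator_stakes.get(id).copied().unwrap_or(0);
            // The `1 +` floor keeps unknown/unstaked peers selectable, which is
            // what `test_compute_weights` asserts with its expected `(1, 0)`.
            (1 + slot_stake + epoch_stake, i)
        })
        .collect()
}
```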
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index 7174a9f03f..88526b26df 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -12,6 +12,7 @@ use solana_ledger::{
 };
 use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey};
 use std::{
+    collections::HashMap,
     iter::Iterator,
     net::SocketAddr,
     net::UdpSocket,
@@ -129,11 +130,12 @@ impl RepairService {
             };
 
             if let Ok(repairs) = repairs {
+                let mut cache = HashMap::new();
                 let reqs: Vec<((SocketAddr, Vec<u8>), RepairType)> = repairs
                     .into_iter()
                     .filter_map(|repair_request| {
                         serve_repair
-                            .repair_request(&repair_request)
+                            .repair_request(&cluster_slots, &repair_request, &mut cache)
                             .map(|result| (result, repair_request))
                             .ok()
                     })
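The `cache` created once per repair batch means the gossip peer lookup and weight computation happen at most once per slot per pass, rather than once per repair request. A rough sketch of that pattern under simplified types; `lookup_peers_and_weights` is a hypothetical stand-in for the gossip query plus `ClusterSlots::compute_weights` that the real code performs inside `ServeRepair::repair_request`:

```rust
use std::collections::HashMap;

type Slot = u64;
// (peer addresses, (weight, index) pairs) -- simplified stand-ins.
type PeersAndWeights = (Vec<String>, Vec<(u64, usize)>);

fn lookup_peers_and_weights(slot: Slot) -> PeersAndWeights {
    // Hypothetical placeholder for the expensive gossip lookup.
    (vec![format!("peer-for-slot-{}", slot)], vec![(1, 0)])
}

fn send_repairs(requests: &[Slot]) {
    // One cache per batch: repeated requests for the same slot reuse the
    // previously computed peers and weights.
    let mut cache: HashMap<Slot, PeersAndWeights> = HashMap::new();
    for &slot in requests {
        let (peers, weights) = cache
            .entry(slot)
            .or_insert_with(|| lookup_peers_and_weights(slot));
        // A real implementation would pick a peer here and send the request.
        let _ = (peers, weights);
    }
}
```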
diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs
index c0a55143eb..0e107aeba2 100644
--- a/core/src/serve_repair.rs
+++ b/core/src/serve_repair.rs
@@ -2,22 +2,25 @@ use crate::packet::limited_deserialize;
 use crate::streamer::{PacketReceiver, PacketSender};
 use crate::{
     cluster_info::{ClusterInfo, ClusterInfoError},
+    cluster_slots::ClusterSlots,
     contact_info::ContactInfo,
     packet::Packet,
     result::{Error, Result},
+    weighted_shuffle::weighted_best,
 };
 use bincode::serialize;
-use rand::{thread_rng, Rng};
 use solana_ledger::blockstore::Blockstore;
 use solana_measure::thread_mem_usage;
 use solana_metrics::{datapoint_debug, inc_new_counter_debug};
 use solana_perf::packet::{Packets, PacketsRecycler};
 use solana_sdk::{
     clock::Slot,
+    pubkey::Pubkey,
     signature::{Keypair, Signer},
     timing::duration_as_ms,
 };
 use std::{
+    collections::HashMap,
     net::SocketAddr,
     sync::atomic::{AtomicBool, Ordering},
     sync::{Arc, RwLock},
@@ -61,6 +64,8 @@ pub struct ServeRepair {
     cluster_info: Arc<RwLock<ClusterInfo>>,
 }
 
+type RepairCache = HashMap<Slot, (Vec<ContactInfo>, Vec<(u64, usize)>)>;
+
 impl ServeRepair {
     /// Without a valid keypair gossip will not function. Only useful for tests.
     pub fn new_with_invalid_keypair(contact_info: ContactInfo) -> Self {
@@ -269,21 +274,30 @@ impl ServeRepair {
         Ok(out)
     }
 
-    pub fn repair_request(&self, repair_request: &RepairType) -> Result<(SocketAddr, Vec<u8>)> {
+    pub fn repair_request(
+        &self,
+        cluster_slots: &ClusterSlots,
+        repair_request: &RepairType,
+        cache: &mut RepairCache,
+    ) -> Result<(SocketAddr, Vec<u8>)> {
         // find a peer that appears to be accepting replication and has the desired slot, as indicated
         // by a valid tvu port location
-        let valid: Vec<_> = self
-            .cluster_info
-            .read()
-            .unwrap()
-            .repair_peers(repair_request.slot());
-        if valid.is_empty() {
-            return Err(ClusterInfoError::NoPeers.into());
+        if cache.get(&repair_request.slot()).is_none() {
+            let repair_peers: Vec<_> = self
+                .cluster_info
+                .read()
+                .unwrap()
+                .repair_peers(repair_request.slot());
+            if repair_peers.is_empty() {
+                return Err(ClusterInfoError::NoPeers.into());
+            }
+            let weights = cluster_slots.compute_weights(repair_request.slot(), &repair_peers);
+            cache.insert(repair_request.slot(), (repair_peers, weights));
         }
-        let n = thread_rng().gen::<usize>() % valid.len();
-        let addr = valid[n].serve_repair; // send the request to the peer's serve_repair port
+        let (repair_peers, weights) = cache.get(&repair_request.slot()).unwrap();
+        let n = weighted_best(&weights, Pubkey::new_rand().to_bytes());
+        let addr = repair_peers[n].serve_repair; // send the request to the peer's serve_repair port
         let out = self.map_repair_request(repair_request)?;
-
         Ok((addr, out))
     }
 
@@ -563,10 +577,15 @@ mod tests {
 
     #[test]
     fn window_index_request() {
+        let cluster_slots = ClusterSlots::default();
         let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
         let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(me)));
        let serve_repair = ServeRepair::new(cluster_info.clone());
-        let rv = serve_repair.repair_request(&RepairType::Shred(0, 0));
+        let rv = serve_repair.repair_request(
+            &cluster_slots,
+            &RepairType::Shred(0, 0),
+            &mut HashMap::new(),
+        );
         assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));
 
         let serve_repair_addr = socketaddr!([127, 0, 0, 1], 1243);
@@ -587,7 +606,11 @@ mod tests {
         };
         cluster_info.write().unwrap().insert_info(nxt.clone());
         let rv = serve_repair
-            .repair_request(&RepairType::Shred(0, 0))
+            .repair_request(
+                &cluster_slots,
+                &RepairType::Shred(0, 0),
+                &mut HashMap::new(),
+            )
             .unwrap();
         assert_eq!(nxt.serve_repair, serve_repair_addr);
         assert_eq!(rv.0, nxt.serve_repair);
@@ -614,7 +637,11 @@ mod tests {
         while !one || !two {
             //this randomly picks an option, so eventually it should pick both
             let rv = serve_repair
-                .repair_request(&RepairType::Shred(0, 0))
+                .repair_request(
+                    &cluster_slots,
+                    &RepairType::Shred(0, 0),
+                    &mut HashMap::new(),
+                )
                 .unwrap();
             if rv.0 == serve_repair_addr {
                 one = true;
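The uniform `thread_rng().gen::<usize>() % valid.len()` pick becomes `weighted_best` over the cached `(weight, index)` pairs, seeded with fresh random bytes from `Pubkey::new_rand()`. Assuming `weighted_best` performs a stake-proportional random selection and returns the winning entry's stored index (which is what the call site implies; the actual implementation lives in `weighted_shuffle.rs`), the selection behaves roughly like this sketch:

```rust
// Sketch of stake-proportional selection over (weight, index) pairs. The real
// weighted_best takes a 32-byte seed; here a pre-drawn random u64 stands in.
fn weighted_best_sketch(weights_and_indexes: &[(u64, usize)], mut roll: u64) -> usize {
    let total: u64 = weights_and_indexes.iter().map(|(w, _)| *w).sum();
    roll %= total.max(1); // a point on the cumulative weight line
    for &(weight, index) in weights_and_indexes {
        if roll < weight {
            return index;
        }
        roll -= weight;
    }
    // Reachable only for empty input or all-zero weights; callers guarantee
    // at least one peer, and compute_weights floors every weight at 1.
    0
}
```

Because every weight carries the `1 +` floor from `compute_weights`, zero-stake peers remain reachable, so the existing `while !one || !two` test can still terminate, while heavily staked peers that have advertised the slot are picked proportionally more often.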