prunes received-cache only once per unique owner's key (#17039)
parent 0aa7824884
commit 0e646d10bb
@@ -2423,8 +2423,9 @@ impl ClusterInfo {
         self.stats
             .skip_push_message_shred_version
             .add_relaxed(num_crds_values - num_filtered_crds_values);
-        // Update crds values and obtain updated keys.
-        let updated_labels: Vec<_> = {
+        // Origins' pubkeys of updated crds values.
+        // TODO: Should this also include origins of new crds values?
+        let origins: HashSet<_> = {
             let mut gossip =
                 self.time_gossip_write_lock("process_push", &self.stats.process_push_message);
             let now = timestamp();
@@ -2433,13 +2434,13 @@ impl ClusterInfo {
                 .flat_map(|(from, crds_values)| {
                     gossip.process_push_message(&from, crds_values, now)
                 })
-                .map(|v| v.value.label())
+                .map(|v| v.value.pubkey())
                 .collect()
         };
         // Generate prune messages.
         let prunes = self
             .time_gossip_write_lock("prune_received_cache", &self.stats.prune_received_cache)
-            .prune_received_cache(updated_labels, stakes);
+            .prune_received_cache(origins, stakes);
         let prunes: Vec<(Pubkey /*from*/, Vec<Pubkey> /*origins*/)> = prunes
             .into_iter()
             .flat_map(|(from, prunes)| {
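Note on the change above: `label()` is unique per crds value, so the old `updated_labels` vector held one entry per updated value and the received-cache was pruned once per value, even when many values shared the same owner. Collecting owners' pubkeys into a `HashSet` prunes once per unique origin instead. A minimal standalone sketch of the difference, with a hypothetical `Value` type standing in for the crds types:

use std::collections::HashSet;

// Hypothetical stand-in for a crds value: same owner, distinct labels.
struct Value {
    owner: u64,       // origin pubkey, simplified to u64
    label: (u64, u8), // (owner, value-type discriminant), unique per value
}

fn main() {
    let updated = vec![
        Value { owner: 1, label: (1, 0) },
        Value { owner: 1, label: (1, 1) },
        Value { owner: 2, label: (2, 0) },
    ];
    // Old: one entry per value; origin 1 would be pruned twice.
    let labels: Vec<_> = updated.iter().map(|v| v.label).collect();
    // New: one entry per unique origin.
    let origins: HashSet<_> = updated.iter().map(|v| v.owner).collect();
    assert_eq!(labels.len(), 3);
    assert_eq!(origins.len(), 2);
}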
@@ -10,10 +10,11 @@ use crate::{
     crds_gossip_error::CrdsGossipError,
     crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats},
     crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE},
-    crds_value::{CrdsData, CrdsValue, CrdsValueLabel},
+    crds_value::{CrdsData, CrdsValue},
     duplicate_shred::{self, DuplicateShredIndex, LeaderScheduleFn, MAX_DUPLICATE_SHREDS},
     ping_pong::PingCache,
 };
+use itertools::Itertools;
 use rayon::ThreadPool;
 use solana_ledger::shred::Shred;
 use solana_sdk::{
@@ -80,21 +81,24 @@ impl CrdsGossip {
     }
 
     /// remove redundant paths in the network
-    pub fn prune_received_cache(
+    pub fn prune_received_cache<I>(
         &mut self,
-        labels: Vec<CrdsValueLabel>,
+        origins: I, // Unique pubkeys of crds values' owners.
         stakes: &HashMap<Pubkey, u64>,
-    ) -> HashMap<Pubkey, HashSet<Pubkey>> {
-        let id = &self.id;
-        let push = &mut self.push;
-        let mut prune_map: HashMap<Pubkey, HashSet<_>> = HashMap::new();
-        for origin in labels.iter().map(|k| k.pubkey()) {
-            let peers = push.prune_received_cache(id, &origin, stakes);
-            for from in peers {
-                prune_map.entry(from).or_default().insert(origin);
-            }
-        }
-        prune_map
+    ) -> HashMap</*gossip peer:*/ Pubkey, /*origins:*/ Vec<Pubkey>>
+    where
+        I: IntoIterator<Item = Pubkey>,
+    {
+        let self_pubkey = self.id;
+        origins
+            .into_iter()
+            .flat_map(|origin| {
+                self.push
+                    .prune_received_cache(&self_pubkey, &origin, stakes)
+                    .into_iter()
+                    .zip(std::iter::repeat(origin))
+            })
+            .into_group_map()
     }
 
     pub fn new_push_messages(
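The rewrite above drops the manual `prune_map` accumulation in favor of `Itertools::into_group_map`, which collects an iterator of `(key, value)` pairs into a `HashMap<K, Vec<V>>`; since the incoming origins are unique, a `Vec` suffices where the old code used a `HashSet`. A small sketch of the pattern with plain integers, mimicking the `zip(std::iter::repeat(origin))` pairing in the diff (assumes the itertools crate as a dependency):

use itertools::Itertools;
use std::collections::HashMap;

fn main() {
    // (gossip peer, origin) pairs: each origin contributes the peers
    // from which it should be pruned.
    let pairs = vec![(10, 1), (20, 1), (10, 2)];
    let grouped: HashMap<i32, Vec<i32>> = pairs.into_iter().into_group_map();
    assert_eq!(grouped[&10], vec![1, 2]);
    assert_eq!(grouped[&20], vec![1]);
}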
@@ -53,7 +53,10 @@ pub struct CrdsGossipPush {
     /// bool indicates it has been pruned.
     /// This cache represents a lagging view of which validators
     /// currently have this node in their `active_set`
-    received_cache: HashMap<Pubkey, HashMap<Pubkey, (bool, u64)>>,
+    received_cache: HashMap<
+        Pubkey, // origin/owner
+        HashMap</*gossip peer:*/ Pubkey, (/*pruned:*/ bool, /*timestamp:*/ u64)>,
+    >,
     last_pushed_to: LruCache<Pubkey, u64>,
     pub num_active: usize,
     pub push_fanout: usize,
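The annotated type spells out the cache's shape: per origin, the map of gossip peers that forwarded that origin's values, each carrying a pruned flag and a timestamp. A simplified sketch of how such an entry might be recorded and later marked pruned, using `u64` as a stand-in for `Pubkey`:

use std::collections::HashMap;

type Pubkey = u64; // stand-in for solana_sdk::pubkey::Pubkey

fn main() {
    // received_cache[origin][gossip peer] = (pruned, timestamp)
    let mut received_cache: HashMap<Pubkey, HashMap<Pubkey, (bool, u64)>> =
        HashMap::new();

    // Peer 7 forwarded a value owned by origin 3 at time 42.
    let entry = received_cache.entry(3).or_default().entry(7).or_insert((false, 0));
    entry.1 = entry.1.max(42);

    // Pruning later flips the flag without forgetting the peer.
    if let Some(peers) = received_cache.get_mut(&3) {
        if let Some((pruned, _)) = peers.get_mut(&7) {
            *pruned = true;
        }
    }
    assert_eq!(received_cache[&3][&7], (true, 42));
}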
@@ -102,67 +105,58 @@ impl CrdsGossipPush {
     ) -> Vec<Pubkey> {
         let origin_stake = stakes.get(origin).unwrap_or(&0);
         let self_stake = stakes.get(self_pubkey).unwrap_or(&0);
-        let cache = self.received_cache.get(origin);
-        if cache.is_none() {
-            return Vec::new();
-        }
-        let peers = cache.unwrap();
-
+        let peers = match self.received_cache.get_mut(origin) {
+            None => return Vec::default(),
+            Some(peers) => peers,
+        };
         let peer_stake_total: u64 = peers
             .iter()
-            .filter(|v| !(v.1).0)
-            .map(|v| stakes.get(v.0).unwrap_or(&0))
+            .filter(|(_, (pruned, _))| !pruned)
+            .filter_map(|(peer, _)| stakes.get(peer))
             .sum();
         let prune_stake_threshold = Self::prune_stake_threshold(*self_stake, *origin_stake);
         if peer_stake_total < prune_stake_threshold {
             return Vec::new();
         }
-
-        let staked_peers: Vec<(Pubkey, u64)> = peers
-            .iter()
-            .filter(|v| !(v.1).0)
-            .filter_map(|p| stakes.get(p.0).map(|s| (*p.0, *s)))
-            .filter(|(_, s)| *s > 0)
-            .collect();
-
-        let mut seed = [0; 32];
-        rand::thread_rng().fill(&mut seed[..]);
-        let shuffle = weighted_shuffle(
-            &staked_peers.iter().map(|(_, stake)| *stake).collect_vec(),
-            seed,
-        );
-
+        let shuffled_staked_peers = {
+            let peers: Vec<_> = peers
+                .iter()
+                .filter(|(_, (pruned, _))| !pruned)
+                .filter_map(|(peer, _)| Some((*peer, *stakes.get(peer)?)))
+                .filter(|(_, stake)| *stake > 0)
+                .collect();
+            let mut seed = [0; 32];
+            rand::thread_rng().fill(&mut seed[..]);
+            let weights: Vec<_> = peers.iter().map(|(_, stake)| *stake).collect();
+            weighted_shuffle(&weights, seed)
+                .into_iter()
+                .map(move |i| peers[i])
+        };
         let mut keep = HashSet::new();
         let mut peer_stake_sum = 0;
         keep.insert(*origin);
-        for next in shuffle {
-            let (next_peer, next_stake) = staked_peers[next];
-            if next_peer == *origin {
+        for (peer, stake) in shuffled_staked_peers {
+            if peer == *origin {
                 continue;
             }
-            keep.insert(next_peer);
-            peer_stake_sum += next_stake;
+            keep.insert(peer);
+            peer_stake_sum += stake;
             if peer_stake_sum >= prune_stake_threshold
                 && keep.len() >= CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES
             {
                 break;
             }
         }
-
-        let pruned_peers: Vec<Pubkey> = peers
+        for (peer, (pruned, _)) in peers.iter_mut() {
+            if !*pruned && !keep.contains(peer) {
+                *pruned = true;
+            }
+        }
+        peers
             .keys()
-            .filter(|p| !keep.contains(p))
-            .cloned()
-            .collect();
-        pruned_peers.iter().for_each(|p| {
-            self.received_cache
-                .get_mut(origin)
-                .unwrap()
-                .get_mut(p)
-                .unwrap()
-                .0 = true;
-        });
-        pruned_peers
+            .filter(|peer| !keep.contains(peer))
+            .copied()
+            .collect()
     }
 
     fn wallclock_window(&self, now: u64) -> impl RangeBounds<u64> {
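The restructured method keeps the same algorithm: stake-weight-shuffle the unpruned peers, keep a prefix whose cumulative stake meets the prune threshold (and at least `CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES` peers, with the origin always kept), and prune the rest. The main cleanup is marking pruned peers in place through the `get_mut` borrow taken once at the top, instead of re-looking up the cache per pruned peer. A simplified sketch of the keep/prune selection, with a deterministic stake-descending order standing in for `weighted_shuffle`:

use std::collections::HashSet;

fn select_keep(
    origin: u64,
    peers: &[(u64, u64)], // (peer, stake), unpruned peers only
    prune_stake_threshold: u64,
    min_ingress_nodes: usize,
) -> HashSet<u64> {
    // Deterministic stand-in for the stake-weighted shuffle.
    let mut ordered: Vec<_> = peers.to_vec();
    ordered.sort_by_key(|&(_, stake)| std::cmp::Reverse(stake));
    let mut keep = HashSet::new();
    let mut stake_sum = 0;
    keep.insert(origin); // the origin itself is never pruned
    for (peer, stake) in ordered {
        if peer == origin {
            continue;
        }
        keep.insert(peer);
        stake_sum += stake;
        // Stop once the kept prefix covers the threshold and minimum count.
        if stake_sum >= prune_stake_threshold && keep.len() >= min_ingress_nodes {
            break;
        }
    }
    keep
}

fn main() {
    let peers = [(1, 50), (2, 30), (3, 5), (4, 1)];
    let keep = select_keep(9, &peers, 60, 2);
    // Peers 1 and 2 cover the threshold; 3 and 4 would be pruned.
    assert!(keep.contains(&1) && keep.contains(&2));
    assert!(!keep.contains(&3) && !keep.contains(&4));
}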
@@ -357,15 +357,11 @@ fn network_run_push(
                 })
                 .unwrap();
 
-            let updated_labels: Vec<_> =
-                updated.into_iter().map(|u| u.value.label()).collect();
+            let origins: HashSet<_> =
+                updated.into_iter().map(|u| u.value.pubkey()).collect();
             let prunes_map = network
                 .get(&to)
-                .map(|node| {
-                    node.lock()
-                        .unwrap()
-                        .prune_received_cache(updated_labels, &stakes)
-                })
+                .map(|node| node.lock().unwrap().prune_received_cache(origins, &stakes))
                 .unwrap();
 
             for (from, prune_set) in prunes_map {