prunes gossip nodes based on timeliness of delivered messages

As described here:
https://github.com/solana-labs/solana/issues/28642#issuecomment-1337449607
the current gossip pruning code fails to maintain spanning trees across the
cluster.

This commit instead implements pruning based on the timeliness of
delivered messages. If a message is delivered early enough (in terms of
the number of duplicates already observed for that value), it counts
towards the respective node's score. Once there are enough CRDS upserts
from a specific origin, redundant nodes are pruned based on the tracked
scores.

Since the pruning leaves some configurable redundancy and the scores are
reset frequently, it should better tolerate active-set rotations.
behzad nouri 2022-12-07 11:56:18 -05:00
parent b06656cbba
commit 95d3393008
6 changed files with 231 additions and 195 deletions
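
As a minimal, self-contained sketch of the scoring idea described in the commit message above (illustrative only: node ids are plain integers instead of Pubkeys, and OriginScores and redundancy are made-up stand-ins for the commit's ReceivedCacheEntry and its ingress-node/stake thresholds; stake weighting and the capacity cap are omitted, see received_cache.rs below for the real logic):

use std::collections::HashMap;

// Illustrative thresholds; they mirror the commit's constants
// (ReceivedCacheEntry::NUM_DUPS_THRESHOLD and ReceivedCache::MIN_NUM_UPSERTS).
const NUM_DUPS_THRESHOLD: usize = 2;
const MIN_NUM_UPSERTS: usize = 20;

#[derive(Default)]
struct OriginScores {
    // gossip peer -> number of timely messages it delivered for this origin
    scores: HashMap<u64, usize>,
    // number of messages from this origin that were new (zero duplicates seen)
    num_upserts: usize,
}

impl OriginScores {
    fn record(&mut self, node: u64, num_dups: usize) {
        if num_dups == 0 {
            self.num_upserts += 1;
        }
        if num_dups < NUM_DUPS_THRESHOLD {
            // Timely delivery: credit the node.
            *self.scores.entry(node).or_default() += 1;
        } else {
            // Late delivery: still track the node so it can be pruned later.
            self.scores.entry(node).or_default();
        }
    }

    // Keep the `redundancy` best-scoring nodes; the rest are prune candidates.
    fn prune(&self, redundancy: usize) -> Vec<u64> {
        if self.num_upserts < MIN_NUM_UPSERTS {
            return Vec::new(); // not enough observations yet
        }
        let mut nodes: Vec<_> = self.scores.iter().collect();
        nodes.sort_unstable_by_key(|(_, score)| std::cmp::Reverse(**score));
        nodes.into_iter()
            .skip(redundancy)
            .map(|(&node, _)| node)
            .collect()
    }
}

fn main() {
    let mut entry = OriginScores::default();
    for _ in 0..20 {
        entry.record(1, 0); // node 1 delivers the first copy: timely, counts as an upsert
        entry.record(2, 3); // node 2 delivers the fourth copy: too late to score
    }
    assert_eq!(entry.prune(/*redundancy:*/ 1), vec![2u64]);
}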

gossip/src/cluster_info.rs

@@ -2294,7 +2294,6 @@ impl ClusterInfo {
let prune_messages: Vec<_> = {
let gossip_crds = self.gossip.crds.read().unwrap();
let wallclock = timestamp();
let self_pubkey = self.id();
thread_pool.install(|| {
prunes
.into_par_iter()

gossip/src/crds_gossip.rs

@@ -68,8 +68,7 @@ impl CrdsGossip {
where
I: IntoIterator<Item = Pubkey>,
{
self.push
.prune_received_cache_many(self_pubkey, origins, stakes)
self.push.prune_received_cache(self_pubkey, origins, stakes)
}
pub fn new_push_messages(
@@ -318,10 +317,6 @@ impl CrdsGossip {
timeouts: &HashMap<Pubkey, u64>,
) -> usize {
let mut rv = 0;
if now > 5 * self.push.msg_timeout {
let min = now - 5 * self.push.msg_timeout;
self.push.purge_old_received_cache(min);
}
if now > self.pull.crds_timeout {
//sanity check
assert_eq!(timeouts[self_pubkey], std::u64::MAX);

gossip/src/crds_gossip_push.rs

@@ -15,11 +15,12 @@ use {
crate::{
cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY},
contact_info::ContactInfo,
crds::{Crds, Cursor, GossipRoute},
crds::{Crds, CrdsError, Cursor, GossipRoute},
crds_gossip::{get_stake, get_weight},
crds_gossip_error::CrdsGossipError,
crds_value::CrdsValue,
ping_pong::PingCache,
received_cache::ReceivedCache,
weighted_shuffle::WeightedShuffle,
},
bincode::serialized_size,
@@ -49,15 +50,15 @@ use {
},
};
pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30;
pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;
pub(crate) const CRDS_GOSSIP_NUM_ACTIVE: usize = 30;
const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;
// With a fanout of 6, a 1000 node cluster should only take ~4 hops to converge.
// However since pushes are stake weighed, some trailing nodes
// might need more time to receive values. 30 seconds should be plenty.
pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000;
pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500;
pub const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15;
pub const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 3;
const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500;
const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15;
const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 3;
// Do not push to peers which have not been updated for this long.
const PUSH_ACTIVE_TIMEOUT_MS: u64 = 60_000;
@@ -69,17 +70,9 @@ pub struct CrdsGossipPush {
/// Cursor into the crds table for values to push.
crds_cursor: Mutex<Cursor>,
/// Cache that tracks which validators a message was received from
/// bool indicates it has been pruned.
///
/// This cache represents a lagging view of which validators
/// currently have this node in their `active_set`
#[allow(clippy::type_complexity)]
received_cache: Mutex<
HashMap<
Pubkey, // origin/owner
HashMap</*gossip peer:*/ Pubkey, (/*pruned:*/ bool, /*timestamp:*/ u64)>,
>,
>,
received_cache: Mutex<ReceivedCache>,
last_pushed_to: RwLock<LruCache</*node:*/ Pubkey, /*timestamp:*/ u64>>,
num_active: usize,
push_fanout: usize,
@@ -97,7 +90,7 @@ impl Default for CrdsGossipPush {
max_bytes: PACKET_DATA_SIZE * 64,
active_set: RwLock::default(),
crds_cursor: Mutex::default(),
received_cache: Mutex::default(),
received_cache: Mutex::new(ReceivedCache::new(2 * CRDS_UNIQUE_PUBKEY_CAPACITY)),
last_pushed_to: RwLock::new(LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY)),
num_active: CRDS_GOSSIP_NUM_ACTIVE,
push_fanout: CRDS_GOSSIP_PUSH_FANOUT,
@@ -115,12 +108,7 @@ impl CrdsGossipPush {
crds.read().unwrap().get_entries(&mut cursor).count()
}
fn prune_stake_threshold(self_stake: u64, origin_stake: u64) -> u64 {
let min_path_stake = self_stake.min(origin_stake);
((CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT * min_path_stake as f64).round() as u64).max(1)
}
pub(crate) fn prune_received_cache_many<I>(
pub(crate) fn prune_received_cache<I>(
&self,
self_pubkey: &Pubkey,
origins: I, // Unique pubkeys of crds values' owners.
@@ -133,81 +121,19 @@ impl CrdsGossipPush {
origins
.into_iter()
.flat_map(|origin| {
let peers = Self::prune_received_cache(
self_pubkey,
&origin,
stakes,
received_cache.deref_mut(),
);
peers.into_iter().zip(repeat(origin))
received_cache
.prune(
self_pubkey,
origin,
CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT,
CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES,
stakes,
)
.zip(repeat(origin))
})
.into_group_map()
}
fn prune_received_cache(
self_pubkey: &Pubkey,
origin: &Pubkey,
stakes: &HashMap<Pubkey, u64>,
received_cache: &mut HashMap<
Pubkey, // origin/owner
HashMap</*gossip peer:*/ Pubkey, (/*pruned:*/ bool, /*timestamp:*/ u64)>,
>,
) -> Vec<Pubkey> {
let origin_stake = stakes.get(origin).unwrap_or(&0);
let self_stake = stakes.get(self_pubkey).unwrap_or(&0);
let peers = match received_cache.get_mut(origin) {
None => return Vec::default(),
Some(peers) => peers,
};
let peer_stake_total: u64 = peers
.iter()
.filter(|(_, (pruned, _))| !pruned)
.filter_map(|(peer, _)| stakes.get(peer))
.sum();
let prune_stake_threshold = Self::prune_stake_threshold(*self_stake, *origin_stake);
if peer_stake_total < prune_stake_threshold {
return Vec::new();
}
let mut rng = rand::thread_rng();
let shuffled_staked_peers = {
let peers: Vec<_> = peers
.iter()
.filter(|(_, (pruned, _))| !pruned)
.filter_map(|(peer, _)| Some((*peer, *stakes.get(peer)?)))
.filter(|(_, stake)| *stake > 0)
.collect();
let weights: Vec<_> = peers.iter().map(|(_, stake)| *stake).collect();
WeightedShuffle::new("prune-received-cache", &weights)
.shuffle(&mut rng)
.map(move |i| peers[i])
};
let mut keep = HashSet::new();
let mut peer_stake_sum = 0;
keep.insert(*origin);
for (peer, stake) in shuffled_staked_peers {
if peer == *origin {
continue;
}
keep.insert(peer);
peer_stake_sum += stake;
if peer_stake_sum >= prune_stake_threshold
&& keep.len() >= CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES
{
break;
}
}
for (peer, (pruned, _)) in peers.iter_mut() {
if !*pruned && !keep.contains(peer) {
*pruned = true;
}
}
peers
.keys()
.filter(|peer| !keep.contains(peer))
.copied()
.collect()
}
fn wallclock_window(&self, now: u64) -> impl RangeBounds<u64> {
now.saturating_sub(self.msg_timeout)..=now.saturating_add(self.msg_timeout)
}
@@ -223,33 +149,26 @@ impl CrdsGossipPush {
now: u64,
) -> Vec<Result<Pubkey, CrdsGossipError>> {
self.num_total.fetch_add(values.len(), Ordering::Relaxed);
let values: Vec<_> = {
let wallclock_window = self.wallclock_window(now);
let mut received_cache = self.received_cache.lock().unwrap();
values
.into_iter()
.map(|value| {
if !wallclock_window.contains(&value.wallclock()) {
return Err(CrdsGossipError::PushMessageTimeout);
}
let origin = value.pubkey();
let peers = received_cache.entry(origin).or_default();
peers
.entry(*from)
.and_modify(|(_pruned, timestamp)| *timestamp = now)
.or_insert((/*pruned:*/ false, now));
Ok(value)
})
.collect()
};
let mut received_cache = self.received_cache.lock().unwrap();
let mut crds = crds.write().unwrap();
let wallclock_window = self.wallclock_window(now);
values
.into_iter()
.map(|value| {
let value = value?;
if !wallclock_window.contains(&value.wallclock()) {
return Err(CrdsGossipError::PushMessageTimeout);
}
let origin = value.pubkey();
match crds.insert(value, now, GossipRoute::PushMessage) {
Ok(()) => Ok(origin),
Ok(()) => {
received_cache.record(origin, *from, /*num_dups:*/ 0);
Ok(origin)
}
Err(CrdsError::DuplicatePush(num_dups)) => {
received_cache.record(origin, *from, usize::from(num_dups));
self.num_old.fetch_add(1, Ordering::Relaxed);
Err(CrdsGossipError::PushMessageOldVersion)
}
Err(_) => {
self.num_old.fetch_add(1, Ordering::Relaxed);
Err(CrdsGossipError::PushMessageOldVersion)
@@ -477,14 +396,6 @@ impl CrdsGossipPush {
.collect()
}
/// Purge received push message cache
pub(crate) fn purge_old_received_cache(&self, min_time: u64) {
self.received_cache.lock().unwrap().retain(|_, v| {
v.retain(|_, (_, t)| *t > min_time);
!v.is_empty()
});
}
// Only for tests and simulations.
pub(crate) fn mock_clone(&self) -> Self {
let active_set = {
@@ -502,7 +413,7 @@ impl CrdsGossipPush {
}
clone
};
let received_cache = self.received_cache.lock().unwrap().clone();
let received_cache = self.received_cache.lock().unwrap().mock_clone();
let crds_cursor = *self.crds_cursor.lock().unwrap();
Self {
active_set: RwLock::new(active_set),
@@ -533,68 +444,6 @@ mod tests {
)
}
#[test]
fn test_prune() {
let crds = RwLock::<Crds>::default();
let push = CrdsGossipPush::default();
let mut stakes = HashMap::new();
let self_id = solana_sdk::pubkey::new_rand();
let origin = solana_sdk::pubkey::new_rand();
stakes.insert(self_id, 100);
stakes.insert(origin, 100);
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&origin, 0,
)));
let low_staked_peers = (0..10).map(|_| solana_sdk::pubkey::new_rand());
let mut low_staked_set = HashSet::new();
low_staked_peers.for_each(|p| {
push.process_push_message(&crds, &p, vec![value.clone()], 0);
low_staked_set.insert(p);
stakes.insert(p, 1);
});
let pruned = {
let mut received_cache = push.received_cache.lock().unwrap();
CrdsGossipPush::prune_received_cache(
&self_id,
&origin,
&stakes,
received_cache.deref_mut(),
)
};
assert!(
pruned.is_empty(),
"should not prune if min threshold has not been reached"
);
let high_staked_peer = solana_sdk::pubkey::new_rand();
let high_stake = CrdsGossipPush::prune_stake_threshold(100, 100) + 10;
stakes.insert(high_staked_peer, high_stake);
push.process_push_message(&crds, &high_staked_peer, vec![value], 0);
let pruned = {
let mut received_cache = push.received_cache.lock().unwrap();
CrdsGossipPush::prune_received_cache(
&self_id,
&origin,
&stakes,
received_cache.deref_mut(),
)
};
assert!(
pruned.len() < low_staked_set.len() + 1,
"should not prune all peers"
);
pruned.iter().for_each(|p| {
assert!(
low_staked_set.contains(p),
"only low staked peers should be pruned"
);
});
}
#[test]
fn test_process_push_one() {
let crds = RwLock::<Crds>::default();
@@ -1162,9 +1011,6 @@ mod tests {
[Err(CrdsGossipError::PushMessageOldVersion)],
);
// purge the old pushed
push.purge_old_received_cache(1);
// push it again
assert_eq!(
push.process_push_message(&crds, &Pubkey::default(), vec![value], 0),

gossip/src/lib.rs

@@ -19,6 +19,7 @@ pub mod epoch_slots;
pub mod gossip_error;
pub mod gossip_service;
pub mod ping_pong;
mod received_cache;
pub mod weighted_shuffle;
#[macro_use]

gossip/src/received_cache.rs

@@ -0,0 +1,191 @@
use {
itertools::Itertools,
lru::LruCache,
solana_sdk::pubkey::Pubkey,
std::{cmp::Reverse, collections::HashMap},
};
// For each origin, tracks which nodes have sent messages from that origin and
// their respective score in terms of timeliness of delivered messages.
pub(crate) struct ReceivedCache(LruCache</*origin/owner:*/ Pubkey, ReceivedCacheEntry>);
#[derive(Clone, Default)]
struct ReceivedCacheEntry {
nodes: HashMap<Pubkey, /*score:*/ usize>,
num_upserts: usize,
}
impl ReceivedCache {
// Minimum number of upserts before a cache entry can be pruned.
const MIN_NUM_UPSERTS: usize = 20;
pub(crate) fn new(capacity: usize) -> Self {
Self(LruCache::new(capacity))
}
pub(crate) fn record(&mut self, origin: Pubkey, node: Pubkey, num_dups: usize) {
match self.0.get_mut(&origin) {
Some(entry) => entry.record(node, num_dups),
None => {
let mut entry = ReceivedCacheEntry::default();
entry.record(node, num_dups);
self.0.put(origin, entry);
}
}
}
pub(crate) fn prune(
&mut self,
pubkey: &Pubkey, // This node.
origin: Pubkey, // CRDS value owner.
stake_threshold: f64,
min_ingress_nodes: usize,
stakes: &HashMap<Pubkey, u64>,
) -> impl Iterator<Item = Pubkey> {
match self.0.peek_mut(&origin) {
None => None,
Some(entry) if entry.num_upserts < Self::MIN_NUM_UPSERTS => None,
Some(entry) => Some(
std::mem::take(entry)
.prune(pubkey, &origin, stake_threshold, min_ingress_nodes, stakes)
.filter(move |node| node != &origin),
),
}
.into_iter()
.flatten()
}
// Only for tests and simulations.
pub(crate) fn mock_clone(&self) -> Self {
let mut cache = LruCache::new(self.0.cap());
for (&origin, entry) in self.0.iter().rev() {
cache.put(origin, entry.clone());
}
Self(cache)
}
}
impl ReceivedCacheEntry {
// Limit how big the cache can get if it is spammed
// with old messages with random pubkeys.
const CAPACITY: usize = 50;
// Threshold for the number of duplicates before which a message
// is counted as timely towards node's score.
const NUM_DUPS_THRESHOLD: usize = 2;
fn record(&mut self, node: Pubkey, num_dups: usize) {
if num_dups == 0 {
self.num_upserts = self.num_upserts.saturating_add(1);
}
// If the message has been timely enough increment node's score.
if num_dups < Self::NUM_DUPS_THRESHOLD {
let score = self.nodes.entry(node).or_default();
*score = score.saturating_add(1);
} else if self.nodes.len() < Self::CAPACITY {
// Ensure that node is inserted into the cache for later pruning.
let _ = self.nodes.entry(node).or_default();
}
}
fn prune(
self,
pubkey: &Pubkey, // This node.
origin: &Pubkey, // CRDS value owner.
stake_threshold: f64,
min_ingress_nodes: usize,
stakes: &HashMap<Pubkey, u64>,
) -> impl Iterator<Item = Pubkey> {
debug_assert!((0.0..=1.0).contains(&stake_threshold));
debug_assert!(self.num_upserts >= ReceivedCache::MIN_NUM_UPSERTS);
// Enforce a minimum aggregate ingress stake; see:
// https://github.com/solana-labs/solana/issues/3214
let min_ingress_stake = {
let stake = stakes.get(pubkey).min(stakes.get(origin));
(stake.copied().unwrap_or_default() as f64 * stake_threshold) as u64
};
self.nodes
.into_iter()
.map(|(node, score)| {
let stake = stakes.get(&node).copied().unwrap_or_default();
(node, score, stake)
})
.sorted_unstable_by_key(|&(_, score, stake)| Reverse((score, stake)))
.scan(0u64, |acc, (node, _score, stake)| {
let old = *acc;
*acc = acc.saturating_add(stake);
Some((node, old))
})
.skip(min_ingress_nodes)
.skip_while(move |&(_, stake)| stake < min_ingress_stake)
.map(|(node, _stake)| node)
}
}
#[cfg(test)]
mod tests {
use {
super::*,
std::{collections::HashSet, iter::repeat_with},
};
#[test]
fn test_received_cache() {
let mut cache = ReceivedCache::new(/*capacity:*/ 100);
let pubkey = Pubkey::new_unique();
let origin = Pubkey::new_unique();
let records = vec![
vec![3, 1, 7, 5],
vec![7, 6, 5, 2],
vec![2, 0, 0, 2],
vec![3, 5, 0, 6],
vec![6, 2, 6, 2],
];
let nodes: Vec<_> = repeat_with(Pubkey::new_unique)
.take(records.len())
.collect();
for (node, records) in nodes.iter().zip(records) {
for (num_dups, k) in records.into_iter().enumerate() {
for _ in 0..k {
cache.record(origin, *node, num_dups);
}
}
}
assert_eq!(cache.0.get(&origin).unwrap().num_upserts, 21);
let scores: HashMap<Pubkey, usize> = [
(nodes[0], 4),
(nodes[1], 13),
(nodes[2], 2),
(nodes[3], 8),
(nodes[4], 8),
]
.into_iter()
.collect();
assert_eq!(cache.0.get(&origin).unwrap().nodes, scores);
let stakes = [
(nodes[0], 6),
(nodes[1], 1),
(nodes[2], 5),
(nodes[3], 3),
(nodes[4], 7),
(pubkey, 9),
(origin, 9),
]
.into_iter()
.collect();
let prunes: HashSet<Pubkey> = [nodes[0], nodes[2], nodes[3]].into_iter().collect();
assert_eq!(
cache
.mock_clone()
.prune(&pubkey, origin, 0.5, 2, &stakes)
.collect::<HashSet<_>>(),
prunes
);
let prunes: HashSet<Pubkey> = [nodes[0], nodes[2]].into_iter().collect();
assert_eq!(
cache
.prune(&pubkey, origin, 1.0, 0, &stakes)
.collect::<HashSet<_>>(),
prunes
);
}
}
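
As a worked check of the cumulative-stake scan in ReceivedCacheEntry::prune, the following standalone sketch reproduces the first expected prune set from the test above using the same scores and stakes (illustrative arithmetic only, not part of the commit):

fn main() {
    // (node index, score, stake) for the test's first prune call, already sorted
    // by descending (score, stake), which is how Reverse((score, stake)) orders them.
    let ordered: [(usize, u64, u64); 5] = [(1, 13, 1), (4, 8, 7), (3, 8, 3), (0, 4, 6), (2, 2, 5)];
    // min_ingress_stake = min(self stake, origin stake) as f64 * stake_threshold
    //                   = 9.0 * 0.5 = 4.5, truncated to 4 by the cast to u64.
    let (min_ingress_stake, min_ingress_nodes) = (4u64, 2usize);
    // Pair each node with the stake accumulated *before* it in the sorted order,
    // like the scan(0u64, ..) step in ReceivedCacheEntry::prune.
    let mut acc = 0u64;
    let cumulative: Vec<(usize, u64)> = ordered
        .iter()
        .map(|&(node, _score, stake)| {
            let before = acc;
            acc += stake;
            (node, before)
        })
        .collect();
    // cumulative == [(1, 0), (4, 1), (3, 8), (0, 11), (2, 17)]
    // skip(min_ingress_nodes) always keeps nodes[1] and nodes[4]; each remaining
    // entry already has >= 4 stake accumulated ahead of it, so all are pruned.
    let pruned: Vec<usize> = cumulative
        .into_iter()
        .skip(min_ingress_nodes)
        .skip_while(|&(_, stake_before)| stake_before < min_ingress_stake)
        .map(|(node, _)| node)
        .collect();
    assert_eq!(pruned, vec![3, 0, 2]); // i.e. nodes[3], nodes[0], nodes[2]
}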

gossip/tests/crds_gossip.rs

@@ -655,7 +655,11 @@ fn test_star_network_push_ring_200() {
let thread_pool = build_gossip_thread_pool();
network_simulator(&thread_pool, &mut network, 0.9);
}
// With the new pruning logic, this test is no longer valid and can be deleted.
// Ignoring it for now until the pruning code is stable.
#[test]
#[ignore]
#[serial]
fn test_connected_staked_network() {
solana_logger::setup();