includes origin's stake in gossip push nodes sampling (#29343)

Gossip push samples nodes by stake. This is unnecessarily wasteful and
creates too much congestion at high staked nodes if the CRDS value to be
propagated is from a node with low or zero stake.
This commit instead maintains several active-sets for push, each
corresponding to a stake bucket. Peer sampling weights are accordingly
capped by the respective bucket's stake.
This commit is contained in:
behzad nouri 2023-01-11 19:46:32 +00:00 committed by GitHub
parent 28071d5bf4
commit d89cf0d28b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 457 additions and 263 deletions

View File

@ -1526,7 +1526,7 @@ impl ClusterInfo {
let (mut push_messages, num_entries, num_nodes) = {
let _st = ScopedTimer::from(&self.stats.new_push_requests);
self.gossip
.new_push_messages(self.drain_push_queue(), timestamp())
.new_push_messages(&self_id, self.drain_push_queue(), timestamp(), stakes)
};
self.stats
.push_fanout_num_entries
@ -1815,7 +1815,7 @@ impl ClusterInfo {
.unwrap()
}
fn handle_batch_prune_messages(&self, messages: Vec<PruneData>) {
fn handle_batch_prune_messages(&self, messages: Vec<PruneData>, stakes: &HashMap<Pubkey, u64>) {
let _st = ScopedTimer::from(&self.stats.handle_batch_prune_messages_time);
if messages.is_empty() {
return;
@ -1840,6 +1840,7 @@ impl ClusterInfo {
&data.prunes,
data.wallclock,
now,
stakes,
) {
Err(CrdsGossipError::PruneMessageTimeout) => {
prune_message_timeout += 1;
@ -2451,7 +2452,7 @@ impl ClusterInfo {
push_messages.retain(|(_, data)| !data.is_empty());
}
self.handle_batch_ping_messages(ping_messages, recycler, response_sender);
self.handle_batch_prune_messages(prune_messages);
self.handle_batch_prune_messages(prune_messages, stakes);
self.handle_batch_push_messages(
push_messages,
thread_pool,
@ -3710,6 +3711,7 @@ RPC Enabled Nodes: 1"#;
Arc::new(keypair),
SocketAddrSpace::Unspecified,
);
let stakes = HashMap::<Pubkey, u64>::default();
cluster_info
.ping_cache
.lock()
@ -3719,16 +3721,19 @@ RPC Enabled Nodes: 1"#;
cluster_info.gossip.refresh_push_active_set(
&cluster_info.keypair(),
cluster_info.my_shred_version(),
&HashMap::new(), // stakes
None, // gossip validators
&stakes,
None, // gossip validators
&cluster_info.ping_cache,
&mut Vec::new(), // pings
&SocketAddrSpace::Unspecified,
);
//check that all types of gossip messages are signed correctly
let (push_messages, _, _) = cluster_info
.gossip
.new_push_messages(cluster_info.drain_push_queue(), timestamp());
let (push_messages, _, _) = cluster_info.gossip.new_push_messages(
&cluster_info.id(),
cluster_info.drain_push_queue(),
timestamp(),
&stakes,
);
// there should be some pushes ready
assert!(!push_messages.is_empty());
push_messages

View File

@ -11,7 +11,7 @@ use {
crds::{Crds, GossipRoute},
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats},
crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE},
crds_gossip_push::CrdsGossipPush,
crds_value::{CrdsData, CrdsValue},
duplicate_shred::{self, DuplicateShredIndex, LeaderScheduleFn, MAX_DUPLICATE_SHREDS},
legacy_contact_info::LegacyContactInfo as ContactInfo,
@ -69,8 +69,10 @@ impl CrdsGossip {
pub fn new_push_messages(
&self,
pubkey: &Pubkey, // This node.
pending_push_messages: Vec<CrdsValue>,
now: u64,
stakes: &HashMap<Pubkey, u64>,
) -> (
HashMap<Pubkey, Vec<CrdsValue>>,
usize, // number of values
@ -82,7 +84,7 @@ impl CrdsGossip {
let _ = crds.insert(entry, now, GossipRoute::LocalMessage);
}
}
self.push.new_push_messages(&self.crds, now)
self.push.new_push_messages(pubkey, &self.crds, now, stakes)
}
pub(crate) fn push_duplicate_shred(
@ -157,11 +159,13 @@ impl CrdsGossip {
origin: &[Pubkey],
wallclock: u64,
now: u64,
stakes: &HashMap<Pubkey, u64>,
) -> Result<(), CrdsGossipError> {
if now > wallclock.saturating_add(self.push.prune_timeout) {
Err(CrdsGossipError::PruneMessageTimeout)
} else if self_pubkey == destination {
self.push.process_prune_msg(self_pubkey, peer, origin);
self.push
.process_prune_msg(self_pubkey, peer, origin, stakes);
Ok(())
} else {
Err(CrdsGossipError::BadPruneDestination)
@ -187,7 +191,6 @@ impl CrdsGossip {
self_keypair,
self_shred_version,
network_size,
CRDS_GOSSIP_NUM_ACTIVE,
ping_cache,
pings,
socket_addr_space,
@ -419,6 +422,7 @@ mod test {
&[prune_pubkey],
now,
now,
&HashMap::<Pubkey, u64>::default(), // stakes
);
assert_eq!(res.err(), Some(CrdsGossipError::BadPruneDestination));
//correct dest
@ -429,6 +433,7 @@ mod test {
&[prune_pubkey], // origins
now,
now,
&HashMap::<Pubkey, u64>::default(), // stakes
);
res.unwrap();
//test timeout
@ -440,6 +445,7 @@ mod test {
&[prune_pubkey], // origins
now,
timeout,
&HashMap::<Pubkey, u64>::default(), // stakes
);
assert_eq!(res.err(), Some(CrdsGossipError::PruneMessageTimeout));
}

View File

@ -15,19 +15,16 @@ use {
crate::{
cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY},
crds::{Crds, CrdsError, Cursor, GossipRoute},
crds_gossip::{self, get_stake, get_weight},
crds_gossip,
crds_value::CrdsValue,
legacy_contact_info::LegacyContactInfo as ContactInfo,
ping_pong::PingCache,
push_active_set::PushActiveSet,
received_cache::ReceivedCache,
weighted_shuffle::WeightedShuffle,
},
bincode::serialized_size,
indexmap::map::IndexMap,
itertools::Itertools,
lru::LruCache,
rand::{seq::SliceRandom, Rng},
solana_bloom::bloom::{AtomicBloom, Bloom},
rand::Rng,
solana_sdk::{
packet::PACKET_DATA_SIZE,
pubkey::Pubkey,
@ -36,7 +33,6 @@ use {
},
solana_streamer::socket::SocketAddrSpace,
std::{
cmp,
collections::{HashMap, HashSet},
iter::repeat,
net::SocketAddr,
@ -49,7 +45,6 @@ use {
},
};
pub(crate) const CRDS_GOSSIP_NUM_ACTIVE: usize = 30;
const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;
// With a fanout of 6, a 1000 node cluster should only take ~4 hops to converge.
// However since pushes are stake weighed, some trailing nodes
@ -65,15 +60,13 @@ pub struct CrdsGossipPush {
/// Max bytes per message
max_bytes: usize,
/// Active set of validators for push
active_set: RwLock<IndexMap<Pubkey, AtomicBloom<Pubkey>>>,
active_set: RwLock<PushActiveSet>,
/// Cursor into the crds table for values to push.
crds_cursor: Mutex<Cursor>,
/// Cache that tracks which validators a message was received from
/// This cache represents a lagging view of which validators
/// currently have this node in their `active_set`
received_cache: Mutex<ReceivedCache>,
last_pushed_to: RwLock<LruCache</*node:*/ Pubkey, /*timestamp:*/ u64>>,
num_active: usize,
push_fanout: usize,
pub(crate) msg_timeout: u64,
pub prune_timeout: u64,
@ -90,8 +83,6 @@ impl Default for CrdsGossipPush {
active_set: RwLock::default(),
crds_cursor: Mutex::default(),
received_cache: Mutex::new(ReceivedCache::new(2 * CRDS_UNIQUE_PUBKEY_CAPACITY)),
last_pushed_to: RwLock::new(LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY)),
num_active: CRDS_GOSSIP_NUM_ACTIVE,
push_fanout: CRDS_GOSSIP_PUSH_FANOUT,
msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS,
prune_timeout: CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS,
@ -149,7 +140,7 @@ impl CrdsGossipPush {
let mut received_cache = self.received_cache.lock().unwrap();
let mut crds = crds.write().unwrap();
let wallclock_window = self.wallclock_window(now);
let mut origins = HashSet::with_capacity(messages.len());
let mut origins = HashSet::new();
for (from, values) in messages {
self.num_total.fetch_add(values.len(), Ordering::Relaxed);
for value in values {
@ -183,19 +174,16 @@ impl CrdsGossipPush {
/// pruned the source addresses.
pub(crate) fn new_push_messages(
&self,
pubkey: &Pubkey, // This node.
crds: &RwLock<Crds>,
now: u64,
stakes: &HashMap<Pubkey, u64>,
) -> (
HashMap<Pubkey, Vec<CrdsValue>>,
usize, // number of values
usize, // number of push messages
) {
let active_set = self.active_set.read().unwrap();
let active_set_len = active_set.len();
let push_fanout = self.push_fanout.min(active_set_len);
if push_fanout == 0 {
return (HashMap::default(), 0, 0);
}
let mut num_pushes = 0;
let mut num_values = 0;
let mut total_bytes: usize = 0;
@ -216,28 +204,21 @@ impl CrdsGossipPush {
}
num_values += 1;
let origin = value.pubkey();
// Use a consistent index for the same origin so the active set
// learns the MST for that origin.
let offset = origin.as_ref()[0] as usize;
for i in offset..offset + push_fanout {
let index = i % active_set_len;
let (peer, filter) = active_set.get_index(index).unwrap();
if !filter.contains(&origin) || value.should_force_push(peer) {
trace!("new_push_messages insert {} {:?}", *peer, value);
push_messages.entry(*peer).or_default().push(value.clone());
num_pushes += 1;
}
let nodes = active_set.get_nodes(
pubkey,
&origin,
|node| value.should_force_push(node),
stakes,
);
for node in nodes.take(self.push_fanout) {
push_messages.entry(*node).or_default().push(value.clone());
num_pushes += 1;
}
}
drop(crds);
drop(crds_cursor);
drop(active_set);
self.num_pushes.fetch_add(num_pushes, Ordering::Relaxed);
trace!("new_push_messages {} {}", num_values, active_set_len);
let mut last_pushed_to = self.last_pushed_to.write().unwrap();
for target_pubkey in push_messages.keys().copied() {
last_pushed_to.put(target_pubkey, now);
}
(push_messages, num_values, num_pushes)
}
@ -247,26 +228,13 @@ impl CrdsGossipPush {
self_pubkey: &Pubkey,
peer: &Pubkey,
origins: &[Pubkey],
stakes: &HashMap<Pubkey, u64>,
) {
if let Some(filter) = self.active_set.read().unwrap().get(peer) {
for origin in origins {
if origin != self_pubkey {
filter.add(origin);
}
}
}
}
fn compute_need(num_active: usize, active_set_len: usize, ratio: usize) -> usize {
let num = active_set_len / ratio;
cmp::min(num_active, (num_active - active_set_len) + num)
let active_set = self.active_set.read().unwrap();
active_set.prune(self_pubkey, peer, origins, stakes);
}
/// Refresh the push active set.
///
/// # Arguments
///
/// * ratio - active_set.len()/ratio is the number of actives to rotate
#[allow(clippy::too_many_arguments)]
pub(crate) fn refresh_push_active_set(
&self,
@ -276,21 +244,13 @@ impl CrdsGossipPush {
self_keypair: &Keypair,
self_shred_version: u16,
network_size: usize,
ratio: usize,
ping_cache: &Mutex<PingCache>,
pings: &mut Vec<(SocketAddr, Ping)>,
socket_addr_space: &SocketAddrSpace,
) {
const BLOOM_FALSE_RATE: f64 = 0.1;
const BLOOM_MAX_BITS: usize = 1024 * 8 * 4;
#[cfg(debug_assertions)]
const MIN_NUM_BLOOM_ITEMS: usize = 512;
#[cfg(not(debug_assertions))]
const MIN_NUM_BLOOM_ITEMS: usize = CRDS_UNIQUE_PUBKEY_CAPACITY;
let mut rng = rand::thread_rng();
let mut new_items = HashMap::new();
// Gossip peers and respective sampling weights.
let peers = self.push_options(
// Active and valid gossip nodes with matching shred-version.
let nodes = self.push_options(
crds,
&self_keypair.pubkey(),
self_shred_version,
@ -299,57 +259,37 @@ impl CrdsGossipPush {
socket_addr_space,
);
// Check for nodes which have responded to ping messages.
let peers: Vec<_> = {
let nodes: Vec<_> = {
let mut ping_cache = ping_cache.lock().unwrap();
let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok();
let now = Instant::now();
peers
nodes
.into_iter()
.filter(|(_weight, peer)| {
let node = (peer.id, peer.gossip);
let (check, ping) = ping_cache.check(now, node, &mut pingf);
.filter(|node| {
let (check, ping) = {
let node = (node.id, node.gossip);
ping_cache.check(now, node, &mut pingf)
};
if let Some(ping) = ping {
pings.push((peer.gossip, ping));
pings.push((node.gossip, ping));
}
check
})
.collect()
};
let (weights, peers): (Vec<_>, Vec<_>) = crds_gossip::dedup_gossip_addresses(peers)
let nodes = nodes.into_iter().map(|node| {
let stake = stakes.get(&node.id).copied().unwrap_or_default();
(stake, node)
});
let nodes = crds_gossip::dedup_gossip_addresses(nodes)
.into_values()
.map(|(weight, node)| (weight, node.id))
.unzip();
if peers.is_empty() {
.map(|(_stake, node)| node.id)
.collect::<Vec<_>>();
if nodes.is_empty() {
return;
}
let num_bloom_items = MIN_NUM_BLOOM_ITEMS.max(network_size);
let shuffle = WeightedShuffle::new("push-options", &weights).shuffle(&mut rng);
let mut active_set = self.active_set.write().unwrap();
let need = Self::compute_need(self.num_active, active_set.len(), ratio);
for peer in shuffle.map(|i| peers[i]) {
if new_items.len() >= need {
break;
}
if active_set.contains_key(&peer) || new_items.contains_key(&peer) {
continue;
}
let bloom = AtomicBloom::from(Bloom::random(
num_bloom_items,
BLOOM_FALSE_RATE,
BLOOM_MAX_BITS,
));
bloom.add(&peer);
new_items.insert(peer, bloom);
}
let mut keys: Vec<Pubkey> = active_set.keys().cloned().collect();
keys.shuffle(&mut rng);
let num = keys.len() / ratio;
for k in &keys[..num] {
active_set.swap_remove(k);
}
for (k, v) in new_items {
active_set.insert(k, v);
}
active_set.rotate(&mut rng, self.push_fanout * 3, network_size, &nodes, stakes)
}
fn push_options(
@ -360,13 +300,10 @@ impl CrdsGossipPush {
stakes: &HashMap<Pubkey, u64>,
gossip_validators: Option<&HashSet<Pubkey>>,
socket_addr_space: &SocketAddrSpace,
) -> Vec<(/*weight:*/ u64, /*node:*/ ContactInfo)> {
) -> Vec<ContactInfo> {
let now = timestamp();
let mut rng = rand::thread_rng();
let max_weight = u16::MAX as f32 - 1.0;
let active_cutoff = now.saturating_sub(PUSH_ACTIVE_TIMEOUT_MS);
let last_pushed_to = self.last_pushed_to.read().unwrap();
// crds should be locked last after self.last_pushed_to.
let crds = crds.read().unwrap();
crds.get_nodes()
.filter_map(|value| {
@ -390,41 +327,18 @@ impl CrdsGossipPush {
gossip_validators.contains(&info.id)
})
})
.map(|info| {
let last_pushed_to = last_pushed_to.peek(&info.id).copied().unwrap_or_default();
let since = (now.saturating_sub(last_pushed_to).min(3600 * 1000) / 1024) as u32;
let stake = get_stake(&info.id, stakes);
let weight = get_weight(max_weight, since, stake);
// Weights are bounded by max_weight defined above.
// So this type-cast should be safe.
((weight * 100.0) as u64, info.clone())
})
.cloned()
.collect()
}
// Only for tests and simulations.
pub(crate) fn mock_clone(&self) -> Self {
let active_set = {
let active_set = self.active_set.read().unwrap();
active_set
.iter()
.map(|(k, v)| (*k, v.mock_clone()))
.collect()
};
let last_pushed_to = {
let last_pushed_to = self.last_pushed_to.read().unwrap();
let mut clone = LruCache::new(last_pushed_to.cap());
for (k, v) in last_pushed_to.iter().rev() {
clone.put(*k, *v);
}
clone
};
let active_set = self.active_set.read().unwrap().mock_clone();
let received_cache = self.received_cache.lock().unwrap().mock_clone();
let crds_cursor = *self.crds_cursor.lock().unwrap();
Self {
active_set: RwLock::new(active_set),
received_cache: Mutex::new(received_cache),
last_pushed_to: RwLock::new(last_pushed_to),
crds_cursor: Mutex::new(crds_cursor),
num_total: AtomicUsize::new(self.num_total.load(Ordering::Relaxed)),
num_old: AtomicUsize::new(self.num_old.load(Ordering::Relaxed)),
@ -535,113 +449,7 @@ mod tests {
[origin].into_iter().collect()
);
}
#[test]
fn test_compute_need() {
assert_eq!(CrdsGossipPush::compute_need(30, 0, 10), 30);
assert_eq!(CrdsGossipPush::compute_need(30, 1, 10), 29);
assert_eq!(CrdsGossipPush::compute_need(30, 30, 10), 3);
assert_eq!(CrdsGossipPush::compute_need(30, 29, 10), 3);
}
#[test]
fn test_refresh_active_set() {
solana_logger::setup();
let now = timestamp();
let mut crds = Crds::default();
let push = CrdsGossipPush::default();
let mut ping_cache = new_ping_cache();
let value1 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ping_cache.mock_pong(value1.id, value1.gossip, Instant::now());
let value1 = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(value1));
assert_eq!(
crds.insert(value1.clone(), now, GossipRoute::LocalMessage),
Ok(())
);
let keypair = Keypair::new();
let crds = RwLock::new(crds);
let ping_cache = Mutex::new(ping_cache);
push.refresh_push_active_set(
&crds,
&HashMap::new(), // stakes
None, // gossip_validators
&keypair,
0, // self_shred_version
1, // network_sizer
1, // ratio
&ping_cache,
&mut Vec::new(), // pings
&SocketAddrSpace::Unspecified,
);
let active_set = push.active_set.read().unwrap();
assert!(active_set.get(&value1.label().pubkey()).is_some());
let mut value2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
value2.gossip.set_port(1245);
ping_cache
.lock()
.unwrap()
.mock_pong(value2.id, value2.gossip, Instant::now());
let value2 = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(value2));
assert!(active_set.get(&value2.label().pubkey()).is_none());
drop(active_set);
assert_eq!(
crds.write()
.unwrap()
.insert(value2.clone(), now, GossipRoute::LocalMessage),
Ok(())
);
for _ in 0..30 {
push.refresh_push_active_set(
&crds,
&HashMap::new(), // stakes
None, // gossip_validators
&keypair,
0, // self_shred_version
1, // network_size
1, // ratio
&ping_cache,
&mut Vec::new(), // pings
&SocketAddrSpace::Unspecified,
);
let active_set = push.active_set.read().unwrap();
if active_set.get(&value2.label().pubkey()).is_some() {
break;
}
}
{
let active_set = push.active_set.read().unwrap();
assert!(active_set.get(&value2.label().pubkey()).is_some());
}
for k in 0..push.num_active {
let mut value2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
value2.gossip.set_port(1246 + k as u16);
ping_cache
.lock()
.unwrap()
.mock_pong(value2.id, value2.gossip, Instant::now());
let value2 = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(value2));
assert_eq!(
crds.write()
.unwrap()
.insert(value2.clone(), now, GossipRoute::LocalMessage),
Ok(())
);
}
push.refresh_push_active_set(
&crds,
&HashMap::new(), // stakes
None, // gossip_validators
&keypair,
0, // self_shred_version
1, // network_size
1, // ratio
&ping_cache,
&mut Vec::new(), // pings
&SocketAddrSpace::Unspecified,
);
assert_eq!(push.active_set.read().unwrap().len(), push.num_active);
}
#[test]
fn test_active_set_refresh_with_bank() {
solana_logger::setup();
@ -657,10 +465,9 @@ mod tests {
crds.insert(peer.clone(), time, GossipRoute::LocalMessage)
.unwrap();
stakes.insert(id, i * 100);
push.last_pushed_to.write().unwrap().put(id, time);
}
let crds = RwLock::new(crds);
let mut options = push.push_options(
let options = push.push_options(
&crds,
&Pubkey::default(),
0,
@ -669,9 +476,6 @@ mod tests {
&SocketAddrSpace::Unspecified,
);
assert!(!options.is_empty());
options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap());
// check that the highest stake holder is also the heaviest weighted.
assert_eq!(stakes[&options[0].1.id], 10_000_u64);
}
#[test]
@ -729,7 +533,7 @@ mod tests {
&SocketAddrSpace::Unspecified,
)
.iter()
.map(|(_, node)| node.id)
.map(|node| node.id)
.collect::<Vec<_>>();
assert_eq!(options.len(), 1);
assert!(!options.contains(&spy.pubkey()));
@ -809,7 +613,7 @@ mod tests {
);
assert_eq!(options.len(), 1);
assert_eq!(options[0].1.id, node_123.pubkey());
assert_eq!(options[0].id, node_123.pubkey());
}
#[test]
@ -834,7 +638,6 @@ mod tests {
&Keypair::new(),
0, // self_shred_version
1, // network_size
1, // ratio
&ping_cache,
&mut Vec::new(), // pings
&SocketAddrSpace::Unspecified,
@ -850,8 +653,16 @@ mod tests {
push.process_push_message(&crds, vec![(Pubkey::default(), vec![new_msg])], 0),
[origin].into_iter().collect()
);
assert_eq!(push.active_set.read().unwrap().len(), 1);
assert_eq!(push.new_push_messages(&crds, 0).0, expected);
assert_eq!(
push.new_push_messages(
&Pubkey::default(),
&crds,
0,
&HashMap::<Pubkey, u64>::default(), // stakes
)
.0,
expected
);
}
#[test]
fn test_personalized_push_messages() {
@ -895,7 +706,6 @@ mod tests {
&Keypair::new(),
0, // self_shred_version
1, // network_size
1, // ratio
&ping_cache,
&mut Vec::new(),
&SocketAddrSpace::Unspecified,
@ -908,8 +718,16 @@ mod tests {
]
.into_iter()
.collect();
assert_eq!(push.active_set.read().unwrap().len(), 3);
assert_eq!(push.new_push_messages(&crds, now).0, expected);
assert_eq!(
push.new_push_messages(
&Pubkey::default(),
&crds,
now,
&HashMap::<Pubkey, u64>::default(), // stakes
)
.0,
expected
);
}
#[test]
fn test_process_prune() {
@ -932,7 +750,6 @@ mod tests {
&Keypair::new(),
0, // self_shred_version
1, // network_size
1, // ratio
&ping_cache,
&mut Vec::new(), // pings
&SocketAddrSpace::Unspecified,
@ -951,8 +768,18 @@ mod tests {
&self_id,
&peer.label().pubkey(),
&[new_msg.label().pubkey()],
&HashMap::<Pubkey, u64>::default(), // stakes
);
assert_eq!(
push.new_push_messages(
&self_id,
&crds,
0,
&HashMap::<Pubkey, u64>::default(), // stakes
)
.0,
expected
);
assert_eq!(push.new_push_messages(&crds, 0).0, expected);
}
#[test]
fn test_purge_old_pending_push_messages() {
@ -971,7 +798,6 @@ mod tests {
&Keypair::new(),
0, // self_shred_version
1, // network_size
1, // ratio
&ping_cache,
&mut Vec::new(), // pings
&SocketAddrSpace::Unspecified,
@ -986,7 +812,16 @@ mod tests {
push.process_push_message(&crds, vec![(Pubkey::default(), vec![new_msg])], 1),
[origin].into_iter().collect()
);
assert_eq!(push.new_push_messages(&crds, 0).0, expected);
assert_eq!(
push.new_push_messages(
&Pubkey::default(),
&crds,
0,
&HashMap::<Pubkey, u64>::default(), // stakes
)
.0,
expected
);
}
#[test]

View File

@ -19,6 +19,7 @@ pub mod gossip_service;
#[macro_use]
pub mod legacy_contact_info;
pub mod ping_pong;
mod push_active_set;
mod received_cache;
pub mod weighted_shuffle;

View File

@ -0,0 +1,337 @@
use {
crate::weighted_shuffle::WeightedShuffle,
indexmap::IndexMap,
rand::Rng,
solana_bloom::bloom::{AtomicBloom, Bloom},
solana_sdk::{native_token::LAMPORTS_PER_SOL, pubkey::Pubkey},
std::collections::HashMap,
};
// Number of stake buckets, and so the number of independent active-set
// entries maintained for push. Stakes are mapped to a bucket index by
// get_stake_bucket below.
const NUM_PUSH_ACTIVE_SET_ENTRIES: usize = 25;

// Each entry corresponds to a stake bucket for
//     min stake of { this node, crds value owner }
// The entry represents set of gossip nodes to actively
// push to for crds values belonging to the bucket.
pub(crate) struct PushActiveSet([PushActiveSetEntry; NUM_PUSH_ACTIVE_SET_ENTRIES]);
// Keys are gossip nodes to push messages to.
// Values are which origins the node has pruned.
// NOTE: insertion order of the IndexMap is significant — rotate() evicts
// from the front (oldest entries) via shift_remove_index(0).
#[derive(Default)]
struct PushActiveSetEntry(IndexMap</*node:*/ Pubkey, /*origins:*/ AtomicBloom<Pubkey>>);
impl PushActiveSet {
    // Lower bound on the bloom-filter item count used when rotating entries;
    // smaller under debug_assertions (unoptimized/test builds).
    #[cfg(debug_assertions)]
    const MIN_NUM_BLOOM_ITEMS: usize = 512;
    #[cfg(not(debug_assertions))]
    const MIN_NUM_BLOOM_ITEMS: usize = crate::cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY;

    // Returns the gossip nodes to push values from `origin` to, drawn from
    // the active-set entry for the stake bucket of
    //     min stake of { this node, origin }.
    // Nodes which have pruned `origin` are skipped unless `should_force_push`
    // returns true for them.
    pub(crate) fn get_nodes<'a>(
        &'a self,
        pubkey: &Pubkey,    // This node.
        origin: &'a Pubkey, // CRDS value owner.
        // If true forces gossip push even if the node has pruned the origin.
        should_force_push: impl FnMut(&Pubkey) -> bool + 'a,
        stakes: &HashMap<Pubkey, u64>,
    ) -> impl Iterator<Item = &Pubkey> + 'a {
        // Option::min orders None below Some, so if either party is unstaked
        // (absent from `stakes`) the lowest stake bucket is selected.
        let stake = stakes.get(pubkey).min(stakes.get(origin));
        self.get_entry(stake).get_nodes(origin, should_force_push)
    }

    // Prunes origins for the given gossip node.
    // We will stop pushing messages from the specified origins to the node.
    pub(crate) fn prune(
        &self,
        pubkey: &Pubkey,    // This node.
        node: &Pubkey,      // Gossip node.
        origins: &[Pubkey], // CRDS value owners.
        stakes: &HashMap<Pubkey, u64>,
    ) {
        let stake = stakes.get(pubkey);
        for origin in origins {
            // Never prune self as an origin.
            if origin == pubkey {
                continue;
            }
            // Same bucket selection as get_nodes: min of the two stakes, so
            // that prunes land in the entry used for pushing.
            let stake = stake.min(stakes.get(origin));
            self.get_entry(stake).prune(node, origin)
        }
    }

    pub(crate) fn rotate<R: Rng>(
        &mut self,
        rng: &mut R,
        size: usize, // Number of nodes to retain in each active-set entry.
        cluster_size: usize,
        // Gossip nodes to be sampled for each push active set.
        nodes: &[Pubkey],
        stakes: &HashMap<Pubkey, u64>,
    ) {
        let num_bloom_filter_items = cluster_size.max(Self::MIN_NUM_BLOOM_ITEMS);
        // Active set of nodes to push to are sampled from these gossip nodes,
        // using sampling probabilities obtained from the stake bucket of each
        // node.
        let buckets: Vec<_> = nodes
            .iter()
            .map(|node| get_stake_bucket(stakes.get(node)))
            .collect();
        // (k, entry) represents push active set where the stake bucket of
        //     min stake of {this node, crds value owner}
        // is equal to `k`. The `entry` maintains set of gossip nodes to
        // actively push to for crds values belonging to this bucket.
        for (k, entry) in self.0.iter_mut().enumerate() {
            let weights: Vec<u64> = buckets
                .iter()
                .map(|&bucket| {
                    // bucket <- get_stake_bucket(min stake of {
                    //  this node, crds value owner and gossip peer
                    // })
                    // weight <- (bucket + 1)^2
                    // min stake of {...} is a proxy for how much we care about
                    // the link, and tries to mirror similar logic on the
                    // receiving end when pruning incoming links:
                    // https://github.com/solana-labs/solana/blob/81394cf92/gossip/src/received_cache.rs#L100-L105
                    let bucket = bucket.min(k) as u64;
                    bucket.saturating_add(1).saturating_pow(2)
                })
                .collect();
            entry.rotate(rng, size, num_bloom_filter_items, nodes, &weights);
        }
    }

    // Maps a (possibly missing) stake to the active-set entry for its bucket.
    fn get_entry(&self, stake: Option<&u64>) -> &PushActiveSetEntry {
        &self.0[get_stake_bucket(stake)]
    }

    // Only for tests and simulations.
    pub(crate) fn mock_clone(&self) -> Self {
        Self(std::array::from_fn(|k| {
            PushActiveSetEntry(
                self.0[k]
                    .0
                    .iter()
                    .map(|(&node, filter)| (node, filter.mock_clone()))
                    .collect(),
            )
        }))
    }
}
impl PushActiveSetEntry {
    const BLOOM_FALSE_RATE: f64 = 0.1;
    const BLOOM_MAX_BITS: usize = 1024 * 8 * 4;

    // Iterates over nodes to push values from `origin` to, in the entry's
    // (insertion) order. A node which has recorded `origin` in its prune
    // filter is skipped unless `should_force_push` returns true for it.
    fn get_nodes<'a>(
        &'a self,
        origin: &'a Pubkey,
        // If true forces gossip push even if the node has pruned the origin.
        mut should_force_push: impl FnMut(&Pubkey) -> bool + 'a,
    ) -> impl Iterator<Item = &Pubkey> + 'a {
        self.0.iter().filter_map(move |(node, filter)| {
            // Skip only when the origin is pruned AND the push is not forced;
            // should_force_push is consulted only for pruned origins.
            if filter.contains(origin) && !should_force_push(node) {
                None
            } else {
                Some(node)
            }
        })
    }

    // Records that `node` has pruned `origin`; a no-op if `node` is not (or
    // no longer) part of this entry.
    fn prune(
        &self,
        node: &Pubkey,   // Gossip node.
        origin: &Pubkey, // CRDS value owner
    ) {
        if let Some(filter) = self.0.get(node) {
            filter.add(origin);
        }
    }

    // Refreshes the entry by sampling `nodes` with the given weights,
    // retaining at most `size` of them and evicting the oldest.
    fn rotate<R: Rng>(
        &mut self,
        rng: &mut R,
        size: usize, // Number of nodes to retain.
        num_bloom_filter_items: usize,
        nodes: &[Pubkey],
        weights: &[u64],
    ) {
        debug_assert_eq!(nodes.len(), weights.len());
        debug_assert!(weights.iter().all(|&weight| weight != 0u64));
        let sampled = WeightedShuffle::new("rotate-active-set", weights).shuffle(rng);
        for index in sampled {
            // Admit fresh nodes until the map holds one more than `size`;
            // the surplus lets the eviction below discard the oldest entry.
            if self.0.len() > size {
                break;
            }
            let node = &nodes[index];
            if self.0.contains_key(node) {
                continue;
            }
            let filter = AtomicBloom::from(Bloom::random(
                num_bloom_filter_items,
                Self::BLOOM_FALSE_RATE,
                Self::BLOOM_MAX_BITS,
            ));
            // Pre-prune the node itself: never push a value back to its owner.
            filter.add(node);
            self.0.insert(*node, filter);
        }
        // Evict from the front (oldest first), preserving the order of the
        // remaining entries.
        while self.0.len() > size {
            self.0.shift_remove_index(0);
        }
    }
}
impl Default for PushActiveSet {
    fn default() -> Self {
        // Start with an empty active-set entry for every stake bucket.
        let entries = std::array::from_fn(|_bucket| PushActiveSetEntry::default());
        Self(entries)
    }
}
// Maps stake to bucket index.
fn get_stake_bucket(stake: Option<&u64>) -> usize {
    // Work in whole SOL; a missing stake counts as zero.
    let stake = stake.copied().unwrap_or_default() / LAMPORTS_PER_SOL;
    // Position of the highest set bit, i.e. floor(log2(stake)) + 1;
    // zero stake maps to bucket 0.
    let bucket = (u64::BITS - stake.leading_zeros()) as usize;
    // Clamp to the last bucket so arbitrarily large stakes stay in range.
    bucket.min(NUM_PUSH_ACTIVE_SET_ENTRIES - 1)
}
#[cfg(test)]
mod tests {
    // NOTE: the expected node orderings below are pinned to the seeded
    // ChaChaRng traces; any change to sampling order invalidates them.
    use {super::*, rand::SeedableRng, rand_chacha::ChaChaRng, std::iter::repeat_with};

    #[test]
    fn test_get_stake_bucket() {
        // Missing stake maps to the lowest bucket.
        assert_eq!(get_stake_bucket(None), 0);
        // Buckets grow with log2 of the stake in SOL.
        let buckets = [0, 1, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5];
        for (k, bucket) in buckets.into_iter().enumerate() {
            let stake = (k as u64) * LAMPORTS_PER_SOL;
            assert_eq!(get_stake_bucket(Some(&stake)), bucket);
        }
        // Bucket boundaries at powers of two (in SOL).
        for (stake, bucket) in [
            (4_194_303, 22),
            (4_194_304, 23),
            (8_388_607, 23),
            (8_388_608, 24),
        ] {
            let stake = stake * LAMPORTS_PER_SOL;
            assert_eq!(get_stake_bucket(Some(&stake)), bucket);
        }
        // Arbitrarily large stakes saturate at the last bucket.
        assert_eq!(
            get_stake_bucket(Some(&u64::MAX)),
            NUM_PUSH_ACTIVE_SET_ENTRIES - 1
        );
    }

    #[test]
    fn test_push_active_set() {
        const CLUSTER_SIZE: usize = 117;
        const MAX_STAKE: u64 = (1 << 20) * LAMPORTS_PER_SOL;
        let mut rng = ChaChaRng::from_seed([189u8; 32]);
        let pubkey = Pubkey::new_unique();
        let nodes: Vec<_> = repeat_with(Pubkey::new_unique).take(20).collect();
        let stakes = repeat_with(|| rng.gen_range(1, MAX_STAKE));
        let mut stakes: HashMap<_, _> = nodes.iter().copied().zip(stakes).collect();
        stakes.insert(pubkey, rng.gen_range(1, MAX_STAKE));
        let mut active_set = PushActiveSet::default();
        assert!(active_set.0.iter().all(|entry| entry.0.is_empty()));
        active_set.rotate(&mut rng, 5, CLUSTER_SIZE, &nodes, &stakes);
        assert!(active_set.0.iter().all(|entry| entry.0.len() == 5));
        // Assert that for all entries, each filter already prunes the key.
        for entry in &active_set.0 {
            for (node, filter) in entry.0.iter() {
                assert!(filter.contains(node));
            }
        }
        let other = &nodes[5];
        let origin = &nodes[17];
        // `origin` (nodes[17]) is excluded from its own push list; `other`
        // additionally drops nodes[5] (itself) from the 5-node entry.
        assert!(active_set
            .get_nodes(&pubkey, origin, |_| false, &stakes)
            .eq([13, 5, 18, 16, 0].into_iter().map(|k| &nodes[k])));
        assert!(active_set
            .get_nodes(&pubkey, other, |_| false, &stakes)
            .eq([13, 18, 16, 0].into_iter().map(|k| &nodes[k])));
        // Pruning `origin` on nodes 5 and 16 removes them from origin's list
        // only; `other`'s list is unaffected.
        active_set.prune(&pubkey, &nodes[5], &[*origin], &stakes);
        active_set.prune(&pubkey, &nodes[3], &[*origin], &stakes);
        active_set.prune(&pubkey, &nodes[16], &[*origin], &stakes);
        assert!(active_set
            .get_nodes(&pubkey, origin, |_| false, &stakes)
            .eq([13, 18, 0].into_iter().map(|k| &nodes[k])));
        assert!(active_set
            .get_nodes(&pubkey, other, |_| false, &stakes)
            .eq([13, 18, 16, 0].into_iter().map(|k| &nodes[k])));
        // Rotating to a larger size admits new nodes with fresh (un-pruned)
        // filters.
        active_set.rotate(&mut rng, 7, CLUSTER_SIZE, &nodes, &stakes);
        assert!(active_set.0.iter().all(|entry| entry.0.len() == 7));
        assert!(active_set
            .get_nodes(&pubkey, origin, |_| false, &stakes)
            .eq([18, 0, 7, 15, 11].into_iter().map(|k| &nodes[k])));
        assert!(active_set
            .get_nodes(&pubkey, other, |_| false, &stakes)
            .eq([18, 16, 0, 7, 15, 11].into_iter().map(|k| &nodes[k])));
        // Pruning multiple origins at once affects both lists.
        let origins = [*origin, *other];
        active_set.prune(&pubkey, &nodes[18], &origins, &stakes);
        active_set.prune(&pubkey, &nodes[0], &origins, &stakes);
        active_set.prune(&pubkey, &nodes[15], &origins, &stakes);
        assert!(active_set
            .get_nodes(&pubkey, origin, |_| false, &stakes)
            .eq([7, 11].into_iter().map(|k| &nodes[k])));
        assert!(active_set
            .get_nodes(&pubkey, other, |_| false, &stakes)
            .eq([16, 7, 11].into_iter().map(|k| &nodes[k])));
    }

    #[test]
    fn test_push_active_set_entry() {
        const NUM_BLOOM_FILTER_ITEMS: usize = 100;
        let mut rng = ChaChaRng::from_seed([147u8; 32]);
        let nodes: Vec<_> = repeat_with(Pubkey::new_unique).take(20).collect();
        let weights: Vec<_> = repeat_with(|| rng.gen_range(1, 1000)).take(20).collect();
        let mut entry = PushActiveSetEntry::default();
        entry.rotate(
            &mut rng,
            5, // size
            NUM_BLOOM_FILTER_ITEMS,
            &nodes,
            &weights,
        );
        assert_eq!(entry.0.len(), 5);
        let keys = [&nodes[16], &nodes[11], &nodes[17], &nodes[14], &nodes[5]];
        assert!(entry.0.keys().eq(keys));
        // A member node is excluded from its own pushes (its filter is seeded
        // with its key on insert) unless the push is forced.
        for origin in &nodes {
            if !keys.contains(&origin) {
                assert!(entry.get_nodes(origin, |_| false).eq(keys));
            } else {
                assert!(entry.get_nodes(origin, |_| true).eq(keys));
                assert!(entry
                    .get_nodes(origin, |_| false)
                    .eq(keys.into_iter().filter(|&key| key != origin)));
            }
        }
        // Assert that each filter already prunes the key.
        for (node, filter) in entry.0.iter() {
            assert!(filter.contains(node));
        }
        for origin in keys {
            assert!(entry.get_nodes(origin, |_| true).eq(keys));
            assert!(entry
                .get_nodes(origin, |_| false)
                .eq(keys.into_iter().filter(|&node| node != origin)));
        }
        // Assert that prune excludes node from get.
        // nodes[19] is not in the entry, so pruning it is a no-op.
        let origin = &nodes[3];
        entry.prune(&nodes[11], origin);
        entry.prune(&nodes[14], origin);
        entry.prune(&nodes[19], origin);
        assert!(entry.get_nodes(origin, |_| true).eq(keys));
        assert!(entry.get_nodes(origin, |_| false).eq(keys
            .into_iter()
            .filter(|&&node| node != nodes[11] && node != nodes[14])));
        // Assert that rotate adds new nodes.
        // Each rotate evicts the oldest (front) entries down to `size`.
        entry.rotate(&mut rng, 5, NUM_BLOOM_FILTER_ITEMS, &nodes, &weights);
        let keys = [&nodes[11], &nodes[17], &nodes[14], &nodes[5], &nodes[7]];
        assert!(entry.0.keys().eq(keys));
        entry.rotate(&mut rng, 6, NUM_BLOOM_FILTER_ITEMS, &nodes, &weights);
        let keys = [
            &nodes[17], &nodes[14], &nodes[5], &nodes[7], &nodes[1], &nodes[13],
        ];
        assert!(entry.0.keys().eq(keys));
        entry.rotate(&mut rng, 4, NUM_BLOOM_FILTER_ITEMS, &nodes, &weights);
        let keys = [&nodes[5], &nodes[7], &nodes[1], &nodes[13]];
        assert!(entry.0.keys().eq(keys));
    }
}

View File

@ -349,7 +349,12 @@ fn network_run_push(
Duration::from_millis(node.gossip.pull.crds_timeout),
);
node.gossip.purge(&node_pubkey, thread_pool, now, &timeouts);
(node_pubkey, node.gossip.new_push_messages(vec![], now).0)
(
node_pubkey,
node.gossip
.new_push_messages(&node_pubkey, vec![], now, &stakes)
.0,
)
})
.collect();
let transfered: Vec<_> = requests
@ -401,6 +406,7 @@ fn network_run_push(
&prune_keys,
now,
now,
&stakes,
)
.unwrap()
})
@ -750,6 +756,7 @@ fn test_prune_errors() {
&SocketAddrSpace::Unspecified,
);
let now = timestamp();
let stakes = HashMap::<Pubkey, u64>::default();
//incorrect dest
let mut res = crds_gossip.process_prune_msg(
&id, // self_pubkey
@ -758,6 +765,7 @@ fn test_prune_errors() {
&[prune_pubkey], // origins
now,
now,
&stakes,
);
assert_eq!(res.err(), Some(CrdsGossipError::BadPruneDestination));
//correct dest
@ -768,6 +776,7 @@ fn test_prune_errors() {
&[prune_pubkey], // origins
now,
now,
&stakes,
);
res.unwrap();
//test timeout
@ -779,6 +788,7 @@ fn test_prune_errors() {
&[prune_pubkey], // origins
now,
timeout,
&stakes,
);
assert_eq!(res.err(), Some(CrdsGossipError::PruneMessageTimeout));
}