reworks weights for gossip pull-requests peer sampling (#28463)

Amplifying gossip peer-sampling weights by the time since the last
pull-request has the undesired consequence that a node coming back
online will see a huge number of pull-requests all at once.
This "time since last request" term is also unnecessary in the
weights: as long as sampling probabilities are non-zero, a node will
almost surely be selected periodically anyway.
This commit reworks peer-sampling probabilities to use only
(dampened) stakes as weights.
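
For reference, a minimal sketch of the dampened stake-to-weight mapping this change introduces (the exact expression lives in the crds_gossip_pull hunk below; the standalone helper name and main() here are illustrative only). The peer's stake is capped at the local node's own stake, converted from lamports to SOL, and the bit length of the result plus one is squared, so weights grow roughly with the square of log2(stake) rather than linearly, and unstaked nodes keep a non-zero weight.

const LAMPORTS_PER_SOL: u64 = 1_000_000_000;

// Hypothetical standalone version of the weight computed in new_pull_request.
fn pull_request_weight(peer_stake: u64, self_stake: u64) -> u64 {
    let stake = peer_stake.min(self_stake) / LAMPORTS_PER_SOL;
    let bits = u64::from(u64::BITS - stake.leading_zeros());
    bits.saturating_add(1).saturating_pow(2)
}

fn main() {
    let cap = u64::MAX; // assume the local node out-stakes every peer
    assert_eq!(pull_request_weight(0, cap), 1); // unstaked nodes are still sampled
    assert_eq!(pull_request_weight(LAMPORTS_PER_SOL, cap), 4); // 1 SOL
    assert_eq!(pull_request_weight(1_000 * LAMPORTS_PER_SOL, cap), 121); // 1k SOL
    assert_eq!(pull_request_weight(1_000_000 * LAMPORTS_PER_SOL, cap), 441); // 1M SOL
}

A million-fold stake difference therefore maps to only a few-hundred-fold weight difference, which is the dampening the commit message refers to.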
behzad nouri 2023-01-14 15:44:38 +00:00 committed by GitHub
parent 71713a92c1
commit d4ce59eee7
6 changed files with 90 additions and 202 deletions


@ -1487,12 +1487,6 @@ impl ClusterInfo {
self.append_entrypoint_to_pulls(thread_pool, &mut pulls);
let num_requests = pulls.values().map(Vec::len).sum::<usize>() as u64;
self.stats.new_pull_requests_count.add_relaxed(num_requests);
{
let _st = ScopedTimer::from(&self.stats.mark_pull_request);
for peer in pulls.keys() {
self.gossip.mark_pull_request_creation_time(peer.id, now);
}
}
let self_info = CrdsData::LegacyContactInfo(self.my_contact_info());
let self_info = CrdsValue::new_signed(self_info, &self.keypair());
let pulls = pulls
@ -4705,17 +4699,6 @@ RPC Enabled Nodes: 1"#;
(0, 0, NO_ENTRIES),
cluster_info.handle_pull_response(&entrypoint_pubkey, data, &timeouts)
);
let now = timestamp();
for peer in peers {
cluster_info
.gossip
.mark_pull_request_creation_time(peer, now);
}
assert_eq!(
cluster_info.gossip.pull.pull_request_time().len(),
CRDS_UNIQUE_PUBKEY_CAPACITY
);
}
#[test]


@ -123,7 +123,6 @@ pub struct GossipStats {
pub(crate) handle_batch_pull_requests_time: Counter,
pub(crate) handle_batch_pull_responses_time: Counter,
pub(crate) handle_batch_push_messages_time: Counter,
pub(crate) mark_pull_request: Counter,
pub(crate) new_pull_requests: Counter,
pub(crate) new_pull_requests_count: Counter,
pub(crate) new_pull_requests_pings_count: Counter,
@ -373,7 +372,6 @@ pub(crate) fn submit_gossip_stats(
),
("epoch_slots_lookup", stats.epoch_slots_lookup.clear(), i64),
("new_pull_requests", stats.new_pull_requests.clear(), i64),
("mark_pull_request", stats.mark_pull_request.clear(), i64),
(
"gossip_pull_request_no_budget",
stats.gossip_pull_request_no_budget.clear(),


@ -18,6 +18,7 @@ use {
ping_pong::PingCache,
},
itertools::Itertools,
rand::{CryptoRng, Rng},
rayon::ThreadPool,
solana_ledger::shred::Shred,
solana_sdk::{
@ -31,7 +32,7 @@ use {
collections::{HashMap, HashSet},
net::SocketAddr,
sync::{Mutex, RwLock},
time::Duration,
time::{Duration, Instant},
},
};
@ -227,14 +228,6 @@ impl CrdsGossip {
)
}
/// Time when a request to `from` was initiated.
///
/// This is used for weighted random selection during `new_pull_request`
/// It's important to use the local nodes request creation time as the weight
/// instead of the response received time otherwise failed nodes will increase their weight.
pub fn mark_pull_request_creation_time(&self, from: Pubkey, now: u64) {
self.pull.mark_pull_request_creation_time(from, now)
}
/// Process a pull request and create a response.
pub fn process_pull_requests<I>(&self, callers: I, now: u64)
where
@ -339,40 +332,51 @@ impl CrdsGossip {
}
}
/// Computes a normalized (log of actual stake) stake.
pub fn get_stake<S: std::hash::BuildHasher>(id: &Pubkey, stakes: &HashMap<Pubkey, u64, S>) -> f32 {
// cap the max balance to u32 max (it should be plenty)
let bal = f64::from(u32::max_value()).min(*stakes.get(id).unwrap_or(&0) as f64);
1_f32.max((bal as f32).ln())
}
/// Computes bounded weight given some max, a time since last selected, and a stake value.
///
/// The minimum stake is 1 and not 0 to allow 'time since last' picked to factor in.
pub fn get_weight(max_weight: f32, time_since_last_selected: u32, stake: f32) -> f32 {
let mut weight = time_since_last_selected as f32 * stake;
if weight.is_infinite() {
weight = max_weight;
}
1.0_f32.max(weight.min(max_weight))
}
// Dedups gossip addresses, keeping only the one with the highest weight.
pub(crate) fn dedup_gossip_addresses<I, T: PartialOrd>(
nodes: I,
) -> HashMap</*gossip:*/ SocketAddr, (/*weight:*/ T, ContactInfo)>
where
I: IntoIterator<Item = (/*weight:*/ T, ContactInfo)>,
{
// Dedups gossip addresses, keeping only the one with the highest stake.
pub(crate) fn dedup_gossip_addresses(
nodes: impl IntoIterator<Item = ContactInfo>,
stakes: &HashMap<Pubkey, u64>,
) -> HashMap</*gossip:*/ SocketAddr, (/*stake:*/ u64, ContactInfo)> {
nodes
.into_iter()
.into_grouping_map_by(|(_weight, node)| node.gossip)
.aggregate(|acc, _node_gossip, (weight, node)| match acc {
Some((ref w, _)) if w >= &weight => acc,
Some(_) | None => Some((weight, node)),
.into_grouping_map_by(|node| node.gossip)
.aggregate(|acc, _node_gossip, node| {
let stake = stakes.get(&node.id).copied().unwrap_or_default();
match acc {
Some((ref s, _)) if s >= &stake => acc,
Some(_) | None => Some((stake, node)),
}
})
}
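
A self-contained illustration of the grouping pattern the reworked dedup_gossip_addresses above relies on (itertools' into_grouping_map_by followed by aggregate), with plain u64 node ids standing in for ContactInfo; the names and types here are hypothetical:

use itertools::Itertools;
use std::{collections::HashMap, net::SocketAddr};

// Keep, per gossip socket address, only the entry with the highest stake;
// ids missing from the stakes map count as zero stake.
fn dedup_by_stake(
    nodes: impl IntoIterator<Item = (SocketAddr, /*node id:*/ u64)>,
    stakes: &HashMap<u64, u64>,
) -> HashMap<SocketAddr, (/*stake:*/ u64, /*node id:*/ u64)> {
    nodes
        .into_iter()
        .into_grouping_map_by(|(addr, _)| *addr)
        .aggregate(|acc, _addr, (_, id)| {
            let stake = stakes.get(&id).copied().unwrap_or_default();
            match acc {
                Some((best, _)) if best >= stake => acc,
                _ => Some((stake, id)),
            }
        })
}

Keeping a single candidate per socket address means a gossip endpoint advertised under several pubkeys is counted only once in the subsequent weighting.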
// Pings gossip addresses if needed.
// Returns nodes which have recently responded to a ping message.
#[must_use]
pub(crate) fn maybe_ping_gossip_addresses<R: Rng + CryptoRng>(
rng: &mut R,
nodes: impl IntoIterator<Item = ContactInfo>,
keypair: &Keypair,
ping_cache: &Mutex<PingCache>,
pings: &mut Vec<(SocketAddr, Ping)>,
) -> Vec<ContactInfo> {
let mut ping_cache = ping_cache.lock().unwrap();
let mut pingf = move || Ping::new_rand(rng, keypair).ok();
let now = Instant::now();
nodes
.into_iter()
.filter(|node| {
let (check, ping) = {
let node = (node.id, node.gossip);
ping_cache.check(now, node, &mut pingf)
};
if let Some(ping) = ping {
pings.push((node.gossip, ping));
}
check
})
.collect()
}
#[cfg(test)]
mod test {
use {


@ -13,17 +13,16 @@
use {
crate::{
cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY},
cluster_info::Ping,
cluster_info_metrics::GossipStats,
crds::{Crds, GossipRoute, VersionedCrdsValue},
crds_gossip::{self, get_stake, get_weight},
crds_gossip,
crds_gossip_error::CrdsGossipError,
crds_value::CrdsValue,
legacy_contact_info::LegacyContactInfo as ContactInfo,
ping_pong::PingCache,
},
itertools::Itertools,
lru::LruCache,
rand::{
distributions::{Distribution, WeightedIndex},
Rng,
@ -32,6 +31,7 @@ use {
solana_bloom::bloom::{AtomicBloom, Bloom},
solana_sdk::{
hash::{hash, Hash},
native_token::LAMPORTS_PER_SOL,
pubkey::Pubkey,
signature::{Keypair, Signer},
},
@ -45,7 +45,7 @@ use {
atomic::{AtomicI64, AtomicUsize, Ordering},
Mutex, RwLock,
},
time::{Duration, Instant},
time::Duration,
},
};
@ -192,8 +192,6 @@ pub struct ProcessPullStats {
}
pub struct CrdsGossipPull {
/// Timestamp of last request
pull_request_time: RwLock<LruCache<Pubkey, /*timestamp:*/ u64>>,
// Hash value and record time (ms) of the pull responses which failed to be
// inserted in crds table; Preserved to stop the sender to send back the
// same outdated payload again by adding them to the filter for the next
@ -207,7 +205,6 @@ pub struct CrdsGossipPull {
impl Default for CrdsGossipPull {
fn default() -> Self {
Self {
pull_request_time: RwLock::new(LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY)),
failed_inserts: RwLock::default(),
crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
@ -232,8 +229,8 @@ impl CrdsGossipPull {
pings: &mut Vec<(SocketAddr, Ping)>,
socket_addr_space: &SocketAddrSpace,
) -> Result<HashMap<ContactInfo, Vec<CrdsFilter>>, CrdsGossipError> {
// Gossip peers and respective sampling weights.
let peers = self.pull_options(
// Active and valid gossip nodes with matching shred-version.
let nodes = self.pull_options(
crds,
&self_keypair.pubkey(),
self_shred_version,
@ -242,35 +239,37 @@ impl CrdsGossipPull {
stakes,
socket_addr_space,
);
// Check for nodes which have responded to ping messages.
let mut rng = rand::thread_rng();
let peers: Vec<_> = {
let mut ping_cache = ping_cache.lock().unwrap();
let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok();
let now = Instant::now();
peers
.into_iter()
.filter(|(_weight, peer)| {
let node = (peer.id, peer.gossip);
let (check, ping) = ping_cache.check(now, node, &mut pingf);
if let Some(ping) = ping {
pings.push((peer.gossip, ping));
}
check
// Check for nodes which have responded to ping messages.
let nodes = crds_gossip::maybe_ping_gossip_addresses(
&mut rng,
nodes,
self_keypair,
ping_cache,
pings,
);
let stake_cap = stakes
.get(&self_keypair.pubkey())
.copied()
.unwrap_or_default();
let (weights, nodes): (Vec<u64>, Vec<ContactInfo>) =
crds_gossip::dedup_gossip_addresses(nodes, stakes)
.into_values()
.map(|(stake, node)| {
let stake = stake.min(stake_cap) / LAMPORTS_PER_SOL;
let weight = u64::BITS - stake.leading_zeros();
let weight = u64::from(weight).saturating_add(1).saturating_pow(2);
(weight, node)
})
.collect()
};
let (weights, peers): (Vec<_>, Vec<_>) = crds_gossip::dedup_gossip_addresses(peers)
.into_values()
.unzip();
if peers.is_empty() {
.unzip();
if nodes.is_empty() {
return Err(CrdsGossipError::NoPeers);
}
// Associate each pull-request filter with a randomly selected peer.
let filters = self.build_crds_filters(thread_pool, crds, bloom_size);
let dist = WeightedIndex::new(&weights).unwrap();
let peers = repeat_with(|| peers[dist.sample(&mut rng)].clone());
Ok(peers.zip(filters).into_group_map())
let nodes = repeat_with(|| nodes[dist.sample(&mut rng)].clone());
Ok(nodes.zip(filters).into_group_map())
}
fn pull_options(
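
The WeightedIndex sampling at the end of new_pull_request above pairs each CRDS filter with an independently drawn, stake-weighted random peer. A standalone sketch of that assignment step, with string names and u32 values as hypothetical stand-ins for ContactInfo and CrdsFilter, assuming a non-empty node list with a non-zero total weight:

use itertools::Itertools;
use rand::distributions::{Distribution, WeightedIndex};
use std::{collections::HashMap, iter::repeat_with};

// Draw one weighted-random node per filter, then group the filters per node.
fn assign_filters(
    nodes: &[&'static str],
    weights: &[u64],
    filters: Vec<u32>,
) -> HashMap<&'static str, Vec<u32>> {
    let mut rng = rand::thread_rng();
    let dist = WeightedIndex::new(weights).unwrap();
    repeat_with(|| nodes[dist.sample(&mut rng)])
        .zip(filters)
        .into_group_map()
}

Because peers are drawn independently per filter, a high-stake peer may receive several filters in one round while others receive none; over many rounds every peer with non-zero weight is still selected, which is why the time-since-last-request term could be dropped.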
@ -282,11 +281,9 @@ impl CrdsGossipPull {
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>,
socket_addr_space: &SocketAddrSpace,
) -> Vec<(/*weight:*/ u64, ContactInfo)> {
) -> Vec<ContactInfo> {
let mut rng = rand::thread_rng();
let active_cutoff = now.saturating_sub(PULL_ACTIVE_TIMEOUT_MS);
let pull_request_time = self.pull_request_time.read().unwrap();
// crds should be locked last after self.pull_request_time.
let crds = crds.read().unwrap();
crds.get_nodes()
.filter_map(|value| {
@ -309,31 +306,10 @@ impl CrdsGossipPull {
&& gossip_validators
.map_or(true, |gossip_validators| gossip_validators.contains(&v.id))
})
.map(|item| {
let max_weight = f32::from(u16::max_value()) - 1.0;
let req_time: u64 = pull_request_time
.peek(&item.id)
.copied()
.unwrap_or_default();
let since = (now.saturating_sub(req_time).min(3600 * 1000) / 1024) as u32;
let stake = get_stake(&item.id, stakes);
let weight = get_weight(max_weight, since, stake);
// Weights are bounded by max_weight defined above.
// So this type-cast should be safe.
((weight * 100.0) as u64, item.clone())
})
.cloned()
.collect()
}
/// Time when a request to `from` was initiated.
///
/// This is used for weighted random selection during `new_pull_request`
/// It's important to use the local nodes request creation time as the weight
/// instead of the response received time otherwise failed nodes will increase their weight.
pub(crate) fn mark_pull_request_creation_time(&self, from: Pubkey, now: u64) {
self.pull_request_time.write().unwrap().put(from, now);
}
/// Process a pull request
pub(crate) fn process_pull_requests<I>(crds: &RwLock<Crds>, callers: I, now: u64)
where
@ -633,27 +609,13 @@ impl CrdsGossipPull {
// Only for tests and simulations.
pub(crate) fn mock_clone(&self) -> Self {
let pull_request_time = {
let pull_request_time = self.pull_request_time.read().unwrap();
let mut clone = LruCache::new(pull_request_time.cap());
for (k, v) in pull_request_time.iter().rev() {
clone.put(*k, *v);
}
clone
};
let failed_inserts = self.failed_inserts.read().unwrap().clone();
Self {
pull_request_time: RwLock::new(pull_request_time),
failed_inserts: RwLock::new(failed_inserts),
num_pulls: AtomicUsize::new(self.num_pulls.load(Ordering::Relaxed)),
..*self
}
}
#[cfg(test)]
pub(crate) fn pull_request_time(&self) -> std::sync::RwLockReadGuard<LruCache<Pubkey, u64>> {
self.pull_request_time.read().unwrap()
}
}
#[cfg(test)]
@ -673,8 +635,8 @@ pub(crate) mod tests {
solana_sdk::{
hash::{hash, HASH_BYTES},
packet::PACKET_DATA_SIZE,
timing::timestamp,
},
std::time::Instant,
};
#[cfg(debug_assertions)]
@ -739,7 +701,7 @@ pub(crate) mod tests {
}
let now = 1024;
let crds = RwLock::new(crds);
let mut options = node.pull_options(
let options = node.pull_options(
&crds,
&me.label().pubkey(),
0,
@ -749,9 +711,6 @@ pub(crate) mod tests {
&SocketAddrSpace::Unspecified,
);
assert!(!options.is_empty());
options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap());
// check that the highest stake holder is also the heaviest weighted.
assert_eq!(stakes[&options[0].1.id], 3000_u64);
}
#[test]
@ -809,7 +768,7 @@ pub(crate) mod tests {
&SocketAddrSpace::Unspecified,
)
.iter()
.map(|(_, peer)| peer.id)
.map(|peer| peer.id)
.collect::<Vec<_>>();
assert_eq!(options.len(), 1);
assert!(!options.contains(&spy.pubkey()));
@ -827,7 +786,7 @@ pub(crate) mod tests {
&SocketAddrSpace::Unspecified,
)
.iter()
.map(|(_, peer)| peer.id)
.map(|peer| peer.id)
.collect::<Vec<_>>();
assert_eq!(options.len(), 3);
assert!(options.contains(&me.pubkey()));
@ -897,7 +856,7 @@ pub(crate) mod tests {
&SocketAddrSpace::Unspecified,
);
assert_eq!(options.len(), 1);
assert_eq!(options[0].1.id, node_123.pubkey());
assert_eq!(options[0].id, node_123.pubkey());
}
#[test]
@ -1079,7 +1038,6 @@ pub(crate) mod tests {
let peers: Vec<_> = req.unwrap().into_keys().collect();
assert_eq!(peers, vec![new.contact_info().unwrap().clone()]);
node.mark_pull_request_creation_time(new.contact_info().unwrap().id, now);
let offline = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), now);
let offline = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(offline));
crds.write()
@ -1129,13 +1087,11 @@ pub(crate) mod tests {
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ping_cache.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(new));
crds.insert(new.clone(), now, GossipRoute::LocalMessage)
.unwrap();
crds.insert(new, now, GossipRoute::LocalMessage).unwrap();
let crds = RwLock::new(crds);
// set request creation time to now.
let now = now + 50_000;
node.mark_pull_request_creation_time(new.label().pubkey(), now);
// odds of getting the other request should be close to 1.
let now = now + 1_000;
@ -1164,43 +1120,7 @@ pub(crate) mod tests {
.take(100)
.filter(|peer| peer != old)
.count();
assert!(count < 2, "count of peer != old: {count}");
}
#[test]
fn test_pull_request_time() {
const NUM_REPS: usize = 2 * CRDS_UNIQUE_PUBKEY_CAPACITY;
let mut rng = rand::thread_rng();
let pubkeys: Vec<_> = repeat_with(Pubkey::new_unique).take(NUM_REPS).collect();
let node = CrdsGossipPull::default();
let mut requests = HashMap::new();
let now = timestamp();
for k in 0..NUM_REPS {
let pubkey = pubkeys[rng.gen_range(0, pubkeys.len())];
let now = now + k as u64;
node.mark_pull_request_creation_time(pubkey, now);
*requests.entry(pubkey).or_default() = now;
}
let pull_request_time = node.pull_request_time.read().unwrap();
assert!(pull_request_time.len() <= CRDS_UNIQUE_PUBKEY_CAPACITY);
// Assert that timestamps match most recent request.
for (pk, ts) in pull_request_time.iter() {
assert_eq!(*ts, requests[pk]);
}
// Assert that most recent pull timestamps are maintained.
let max_ts = requests
.iter()
.filter(|(pk, _)| !pull_request_time.contains(*pk))
.map(|(_, ts)| *ts)
.max()
.unwrap();
let min_ts = requests
.iter()
.filter(|(pk, _)| pull_request_time.contains(*pk))
.map(|(_, ts)| *ts)
.min()
.unwrap();
assert!(max_ts <= min_ts);
assert!(count < 75, "count of peer != old: {}", count);
}
#[test]


@ -41,7 +41,6 @@ use {
atomic::{AtomicUsize, Ordering},
Mutex, RwLock,
},
time::Instant,
},
};
@ -259,29 +258,14 @@ impl CrdsGossipPush {
socket_addr_space,
);
// Check for nodes which have responded to ping messages.
let nodes: Vec<_> = {
let mut ping_cache = ping_cache.lock().unwrap();
let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok();
let now = Instant::now();
nodes
.into_iter()
.filter(|node| {
let (check, ping) = {
let node = (node.id, node.gossip);
ping_cache.check(now, node, &mut pingf)
};
if let Some(ping) = ping {
pings.push((node.gossip, ping));
}
check
})
.collect()
};
let nodes = nodes.into_iter().map(|node| {
let stake = stakes.get(&node.id).copied().unwrap_or_default();
(stake, node)
});
let nodes = crds_gossip::dedup_gossip_addresses(nodes)
let nodes = crds_gossip::maybe_ping_gossip_addresses(
&mut rng,
nodes,
self_keypair,
ping_cache,
pings,
);
let nodes = crds_gossip::dedup_gossip_addresses(nodes, stakes)
.into_values()
.map(|(_stake, node)| node.id)
.collect::<Vec<_>>();
@ -353,7 +337,7 @@ mod tests {
use {
super::*,
crate::{crds_value::CrdsData, socketaddr},
std::time::Duration,
std::time::{Duration, Instant},
};
fn new_ping_cache() -> PingCache {


@ -565,7 +565,6 @@ fn network_run_pull(
bytes += serialized_size(&rsp).unwrap() as usize;
msgs += rsp.len();
if let Some(node) = network.get(&from) {
node.gossip.mark_pull_request_creation_time(from, now);
let mut stats = ProcessPullStats::default();
let (vers, vers_expired_timeout, failed_inserts) = node
.gossip