From d4ce59eee7e2537f02c827b6918de9af559b06bd Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Sat, 14 Jan 2023 15:44:38 +0000 Subject: [PATCH] reworks weights for gossip pull-requests peer sampling (#28463) Amplifying gossip peer sampling weights by the time since last pull-request has undesired consequence that a node coming back online will see a huge number of pull requests all at once. This "time since last request" is also unnecessary to include in weights because as long as sampling probabilities are non-zero, a node will be almost surely periodically selected in the sample. The commit reworks peer sampling probabilities by just using (dampened) stakes as weights. --- gossip/src/cluster_info.rs | 17 ---- gossip/src/cluster_info_metrics.rs | 2 - gossip/src/crds_gossip.rs | 80 ++++++++------- gossip/src/crds_gossip_pull.rs | 158 +++++++---------------------- gossip/src/crds_gossip_push.rs | 34 ++----- gossip/tests/crds_gossip.rs | 1 - 6 files changed, 90 insertions(+), 202 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 7f40ae678..8f926eba3 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -1487,12 +1487,6 @@ impl ClusterInfo { self.append_entrypoint_to_pulls(thread_pool, &mut pulls); let num_requests = pulls.values().map(Vec::len).sum::() as u64; self.stats.new_pull_requests_count.add_relaxed(num_requests); - { - let _st = ScopedTimer::from(&self.stats.mark_pull_request); - for peer in pulls.keys() { - self.gossip.mark_pull_request_creation_time(peer.id, now); - } - } let self_info = CrdsData::LegacyContactInfo(self.my_contact_info()); let self_info = CrdsValue::new_signed(self_info, &self.keypair()); let pulls = pulls @@ -4705,17 +4699,6 @@ RPC Enabled Nodes: 1"#; (0, 0, NO_ENTRIES), cluster_info.handle_pull_response(&entrypoint_pubkey, data, &timeouts) ); - - let now = timestamp(); - for peer in peers { - cluster_info - .gossip - .mark_pull_request_creation_time(peer, now); - } - assert_eq!( - cluster_info.gossip.pull.pull_request_time().len(), - CRDS_UNIQUE_PUBKEY_CAPACITY - ); } #[test] diff --git a/gossip/src/cluster_info_metrics.rs b/gossip/src/cluster_info_metrics.rs index 44f025776..1a94793a4 100644 --- a/gossip/src/cluster_info_metrics.rs +++ b/gossip/src/cluster_info_metrics.rs @@ -123,7 +123,6 @@ pub struct GossipStats { pub(crate) handle_batch_pull_requests_time: Counter, pub(crate) handle_batch_pull_responses_time: Counter, pub(crate) handle_batch_push_messages_time: Counter, - pub(crate) mark_pull_request: Counter, pub(crate) new_pull_requests: Counter, pub(crate) new_pull_requests_count: Counter, pub(crate) new_pull_requests_pings_count: Counter, @@ -373,7 +372,6 @@ pub(crate) fn submit_gossip_stats( ), ("epoch_slots_lookup", stats.epoch_slots_lookup.clear(), i64), ("new_pull_requests", stats.new_pull_requests.clear(), i64), - ("mark_pull_request", stats.mark_pull_request.clear(), i64), ( "gossip_pull_request_no_budget", stats.gossip_pull_request_no_budget.clear(), diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index 77950434b..18f74b21f 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -18,6 +18,7 @@ use { ping_pong::PingCache, }, itertools::Itertools, + rand::{CryptoRng, Rng}, rayon::ThreadPool, solana_ledger::shred::Shred, solana_sdk::{ @@ -31,7 +32,7 @@ use { collections::{HashMap, HashSet}, net::SocketAddr, sync::{Mutex, RwLock}, - time::Duration, + time::{Duration, Instant}, }, }; @@ -227,14 +228,6 @@ impl CrdsGossip { ) } - /// Time when a request to `from` was initiated. - /// - /// This is used for weighted random selection during `new_pull_request` - /// It's important to use the local nodes request creation time as the weight - /// instead of the response received time otherwise failed nodes will increase their weight. - pub fn mark_pull_request_creation_time(&self, from: Pubkey, now: u64) { - self.pull.mark_pull_request_creation_time(from, now) - } /// Process a pull request and create a response. pub fn process_pull_requests(&self, callers: I, now: u64) where @@ -339,40 +332,51 @@ impl CrdsGossip { } } -/// Computes a normalized (log of actual stake) stake. -pub fn get_stake(id: &Pubkey, stakes: &HashMap) -> f32 { - // cap the max balance to u32 max (it should be plenty) - let bal = f64::from(u32::max_value()).min(*stakes.get(id).unwrap_or(&0) as f64); - 1_f32.max((bal as f32).ln()) -} - -/// Computes bounded weight given some max, a time since last selected, and a stake value. -/// -/// The minimum stake is 1 and not 0 to allow 'time since last' picked to factor in. -pub fn get_weight(max_weight: f32, time_since_last_selected: u32, stake: f32) -> f32 { - let mut weight = time_since_last_selected as f32 * stake; - if weight.is_infinite() { - weight = max_weight; - } - 1.0_f32.max(weight.min(max_weight)) -} - -// Dedups gossip addresses, keeping only the one with the highest weight. -pub(crate) fn dedup_gossip_addresses( - nodes: I, -) -> HashMap -where - I: IntoIterator, -{ +// Dedups gossip addresses, keeping only the one with the highest stake. +pub(crate) fn dedup_gossip_addresses( + nodes: impl IntoIterator, + stakes: &HashMap, +) -> HashMap { nodes .into_iter() - .into_grouping_map_by(|(_weight, node)| node.gossip) - .aggregate(|acc, _node_gossip, (weight, node)| match acc { - Some((ref w, _)) if w >= &weight => acc, - Some(_) | None => Some((weight, node)), + .into_grouping_map_by(|node| node.gossip) + .aggregate(|acc, _node_gossip, node| { + let stake = stakes.get(&node.id).copied().unwrap_or_default(); + match acc { + Some((ref s, _)) if s >= &stake => acc, + Some(_) | None => Some((stake, node)), + } }) } +// Pings gossip addresses if needed. +// Returns nodes which have recently responded to a ping message. +#[must_use] +pub(crate) fn maybe_ping_gossip_addresses( + rng: &mut R, + nodes: impl IntoIterator, + keypair: &Keypair, + ping_cache: &Mutex, + pings: &mut Vec<(SocketAddr, Ping)>, +) -> Vec { + let mut ping_cache = ping_cache.lock().unwrap(); + let mut pingf = move || Ping::new_rand(rng, keypair).ok(); + let now = Instant::now(); + nodes + .into_iter() + .filter(|node| { + let (check, ping) = { + let node = (node.id, node.gossip); + ping_cache.check(now, node, &mut pingf) + }; + if let Some(ping) = ping { + pings.push((node.gossip, ping)); + } + check + }) + .collect() +} + #[cfg(test)] mod test { use { diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index f94f49d85..7b2b49ea3 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -13,17 +13,16 @@ use { crate::{ - cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY}, + cluster_info::Ping, cluster_info_metrics::GossipStats, crds::{Crds, GossipRoute, VersionedCrdsValue}, - crds_gossip::{self, get_stake, get_weight}, + crds_gossip, crds_gossip_error::CrdsGossipError, crds_value::CrdsValue, legacy_contact_info::LegacyContactInfo as ContactInfo, ping_pong::PingCache, }, itertools::Itertools, - lru::LruCache, rand::{ distributions::{Distribution, WeightedIndex}, Rng, @@ -32,6 +31,7 @@ use { solana_bloom::bloom::{AtomicBloom, Bloom}, solana_sdk::{ hash::{hash, Hash}, + native_token::LAMPORTS_PER_SOL, pubkey::Pubkey, signature::{Keypair, Signer}, }, @@ -45,7 +45,7 @@ use { atomic::{AtomicI64, AtomicUsize, Ordering}, Mutex, RwLock, }, - time::{Duration, Instant}, + time::Duration, }, }; @@ -192,8 +192,6 @@ pub struct ProcessPullStats { } pub struct CrdsGossipPull { - /// Timestamp of last request - pull_request_time: RwLock>, // Hash value and record time (ms) of the pull responses which failed to be // inserted in crds table; Preserved to stop the sender to send back the // same outdated payload again by adding them to the filter for the next @@ -207,7 +205,6 @@ pub struct CrdsGossipPull { impl Default for CrdsGossipPull { fn default() -> Self { Self { - pull_request_time: RwLock::new(LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY)), failed_inserts: RwLock::default(), crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, @@ -232,8 +229,8 @@ impl CrdsGossipPull { pings: &mut Vec<(SocketAddr, Ping)>, socket_addr_space: &SocketAddrSpace, ) -> Result>, CrdsGossipError> { - // Gossip peers and respective sampling weights. - let peers = self.pull_options( + // Active and valid gossip nodes with matching shred-version. + let nodes = self.pull_options( crds, &self_keypair.pubkey(), self_shred_version, @@ -242,35 +239,37 @@ impl CrdsGossipPull { stakes, socket_addr_space, ); - // Check for nodes which have responded to ping messages. let mut rng = rand::thread_rng(); - let peers: Vec<_> = { - let mut ping_cache = ping_cache.lock().unwrap(); - let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok(); - let now = Instant::now(); - peers - .into_iter() - .filter(|(_weight, peer)| { - let node = (peer.id, peer.gossip); - let (check, ping) = ping_cache.check(now, node, &mut pingf); - if let Some(ping) = ping { - pings.push((peer.gossip, ping)); - } - check + // Check for nodes which have responded to ping messages. + let nodes = crds_gossip::maybe_ping_gossip_addresses( + &mut rng, + nodes, + self_keypair, + ping_cache, + pings, + ); + let stake_cap = stakes + .get(&self_keypair.pubkey()) + .copied() + .unwrap_or_default(); + let (weights, nodes): (Vec, Vec) = + crds_gossip::dedup_gossip_addresses(nodes, stakes) + .into_values() + .map(|(stake, node)| { + let stake = stake.min(stake_cap) / LAMPORTS_PER_SOL; + let weight = u64::BITS - stake.leading_zeros(); + let weight = u64::from(weight).saturating_add(1).saturating_pow(2); + (weight, node) }) - .collect() - }; - let (weights, peers): (Vec<_>, Vec<_>) = crds_gossip::dedup_gossip_addresses(peers) - .into_values() - .unzip(); - if peers.is_empty() { + .unzip(); + if nodes.is_empty() { return Err(CrdsGossipError::NoPeers); } // Associate each pull-request filter with a randomly selected peer. let filters = self.build_crds_filters(thread_pool, crds, bloom_size); let dist = WeightedIndex::new(&weights).unwrap(); - let peers = repeat_with(|| peers[dist.sample(&mut rng)].clone()); - Ok(peers.zip(filters).into_group_map()) + let nodes = repeat_with(|| nodes[dist.sample(&mut rng)].clone()); + Ok(nodes.zip(filters).into_group_map()) } fn pull_options( @@ -282,11 +281,9 @@ impl CrdsGossipPull { gossip_validators: Option<&HashSet>, stakes: &HashMap, socket_addr_space: &SocketAddrSpace, - ) -> Vec<(/*weight:*/ u64, ContactInfo)> { + ) -> Vec { let mut rng = rand::thread_rng(); let active_cutoff = now.saturating_sub(PULL_ACTIVE_TIMEOUT_MS); - let pull_request_time = self.pull_request_time.read().unwrap(); - // crds should be locked last after self.pull_request_time. let crds = crds.read().unwrap(); crds.get_nodes() .filter_map(|value| { @@ -309,31 +306,10 @@ impl CrdsGossipPull { && gossip_validators .map_or(true, |gossip_validators| gossip_validators.contains(&v.id)) }) - .map(|item| { - let max_weight = f32::from(u16::max_value()) - 1.0; - let req_time: u64 = pull_request_time - .peek(&item.id) - .copied() - .unwrap_or_default(); - let since = (now.saturating_sub(req_time).min(3600 * 1000) / 1024) as u32; - let stake = get_stake(&item.id, stakes); - let weight = get_weight(max_weight, since, stake); - // Weights are bounded by max_weight defined above. - // So this type-cast should be safe. - ((weight * 100.0) as u64, item.clone()) - }) + .cloned() .collect() } - /// Time when a request to `from` was initiated. - /// - /// This is used for weighted random selection during `new_pull_request` - /// It's important to use the local nodes request creation time as the weight - /// instead of the response received time otherwise failed nodes will increase their weight. - pub(crate) fn mark_pull_request_creation_time(&self, from: Pubkey, now: u64) { - self.pull_request_time.write().unwrap().put(from, now); - } - /// Process a pull request pub(crate) fn process_pull_requests(crds: &RwLock, callers: I, now: u64) where @@ -633,27 +609,13 @@ impl CrdsGossipPull { // Only for tests and simulations. pub(crate) fn mock_clone(&self) -> Self { - let pull_request_time = { - let pull_request_time = self.pull_request_time.read().unwrap(); - let mut clone = LruCache::new(pull_request_time.cap()); - for (k, v) in pull_request_time.iter().rev() { - clone.put(*k, *v); - } - clone - }; let failed_inserts = self.failed_inserts.read().unwrap().clone(); Self { - pull_request_time: RwLock::new(pull_request_time), failed_inserts: RwLock::new(failed_inserts), num_pulls: AtomicUsize::new(self.num_pulls.load(Ordering::Relaxed)), ..*self } } - - #[cfg(test)] - pub(crate) fn pull_request_time(&self) -> std::sync::RwLockReadGuard> { - self.pull_request_time.read().unwrap() - } } #[cfg(test)] @@ -673,8 +635,8 @@ pub(crate) mod tests { solana_sdk::{ hash::{hash, HASH_BYTES}, packet::PACKET_DATA_SIZE, - timing::timestamp, }, + std::time::Instant, }; #[cfg(debug_assertions)] @@ -739,7 +701,7 @@ pub(crate) mod tests { } let now = 1024; let crds = RwLock::new(crds); - let mut options = node.pull_options( + let options = node.pull_options( &crds, &me.label().pubkey(), 0, @@ -749,9 +711,6 @@ pub(crate) mod tests { &SocketAddrSpace::Unspecified, ); assert!(!options.is_empty()); - options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap()); - // check that the highest stake holder is also the heaviest weighted. - assert_eq!(stakes[&options[0].1.id], 3000_u64); } #[test] @@ -809,7 +768,7 @@ pub(crate) mod tests { &SocketAddrSpace::Unspecified, ) .iter() - .map(|(_, peer)| peer.id) + .map(|peer| peer.id) .collect::>(); assert_eq!(options.len(), 1); assert!(!options.contains(&spy.pubkey())); @@ -827,7 +786,7 @@ pub(crate) mod tests { &SocketAddrSpace::Unspecified, ) .iter() - .map(|(_, peer)| peer.id) + .map(|peer| peer.id) .collect::>(); assert_eq!(options.len(), 3); assert!(options.contains(&me.pubkey())); @@ -897,7 +856,7 @@ pub(crate) mod tests { &SocketAddrSpace::Unspecified, ); assert_eq!(options.len(), 1); - assert_eq!(options[0].1.id, node_123.pubkey()); + assert_eq!(options[0].id, node_123.pubkey()); } #[test] @@ -1079,7 +1038,6 @@ pub(crate) mod tests { let peers: Vec<_> = req.unwrap().into_keys().collect(); assert_eq!(peers, vec![new.contact_info().unwrap().clone()]); - node.mark_pull_request_creation_time(new.contact_info().unwrap().id, now); let offline = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), now); let offline = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(offline)); crds.write() @@ -1129,13 +1087,11 @@ pub(crate) mod tests { let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); ping_cache.mock_pong(new.id, new.gossip, Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(new)); - crds.insert(new.clone(), now, GossipRoute::LocalMessage) - .unwrap(); + crds.insert(new, now, GossipRoute::LocalMessage).unwrap(); let crds = RwLock::new(crds); // set request creation time to now. let now = now + 50_000; - node.mark_pull_request_creation_time(new.label().pubkey(), now); // odds of getting the other request should be close to 1. let now = now + 1_000; @@ -1164,43 +1120,7 @@ pub(crate) mod tests { .take(100) .filter(|peer| peer != old) .count(); - assert!(count < 2, "count of peer != old: {count}"); - } - - #[test] - fn test_pull_request_time() { - const NUM_REPS: usize = 2 * CRDS_UNIQUE_PUBKEY_CAPACITY; - let mut rng = rand::thread_rng(); - let pubkeys: Vec<_> = repeat_with(Pubkey::new_unique).take(NUM_REPS).collect(); - let node = CrdsGossipPull::default(); - let mut requests = HashMap::new(); - let now = timestamp(); - for k in 0..NUM_REPS { - let pubkey = pubkeys[rng.gen_range(0, pubkeys.len())]; - let now = now + k as u64; - node.mark_pull_request_creation_time(pubkey, now); - *requests.entry(pubkey).or_default() = now; - } - let pull_request_time = node.pull_request_time.read().unwrap(); - assert!(pull_request_time.len() <= CRDS_UNIQUE_PUBKEY_CAPACITY); - // Assert that timestamps match most recent request. - for (pk, ts) in pull_request_time.iter() { - assert_eq!(*ts, requests[pk]); - } - // Assert that most recent pull timestamps are maintained. - let max_ts = requests - .iter() - .filter(|(pk, _)| !pull_request_time.contains(*pk)) - .map(|(_, ts)| *ts) - .max() - .unwrap(); - let min_ts = requests - .iter() - .filter(|(pk, _)| pull_request_time.contains(*pk)) - .map(|(_, ts)| *ts) - .min() - .unwrap(); - assert!(max_ts <= min_ts); + assert!(count < 75, "count of peer != old: {}", count); } #[test] diff --git a/gossip/src/crds_gossip_push.rs b/gossip/src/crds_gossip_push.rs index df81a2279..c310b1e52 100644 --- a/gossip/src/crds_gossip_push.rs +++ b/gossip/src/crds_gossip_push.rs @@ -41,7 +41,6 @@ use { atomic::{AtomicUsize, Ordering}, Mutex, RwLock, }, - time::Instant, }, }; @@ -259,29 +258,14 @@ impl CrdsGossipPush { socket_addr_space, ); // Check for nodes which have responded to ping messages. - let nodes: Vec<_> = { - let mut ping_cache = ping_cache.lock().unwrap(); - let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok(); - let now = Instant::now(); - nodes - .into_iter() - .filter(|node| { - let (check, ping) = { - let node = (node.id, node.gossip); - ping_cache.check(now, node, &mut pingf) - }; - if let Some(ping) = ping { - pings.push((node.gossip, ping)); - } - check - }) - .collect() - }; - let nodes = nodes.into_iter().map(|node| { - let stake = stakes.get(&node.id).copied().unwrap_or_default(); - (stake, node) - }); - let nodes = crds_gossip::dedup_gossip_addresses(nodes) + let nodes = crds_gossip::maybe_ping_gossip_addresses( + &mut rng, + nodes, + self_keypair, + ping_cache, + pings, + ); + let nodes = crds_gossip::dedup_gossip_addresses(nodes, stakes) .into_values() .map(|(_stake, node)| node.id) .collect::>(); @@ -353,7 +337,7 @@ mod tests { use { super::*, crate::{crds_value::CrdsData, socketaddr}, - std::time::Duration, + std::time::{Duration, Instant}, }; fn new_ping_cache() -> PingCache { diff --git a/gossip/tests/crds_gossip.rs b/gossip/tests/crds_gossip.rs index 222507731..0cac130cb 100644 --- a/gossip/tests/crds_gossip.rs +++ b/gossip/tests/crds_gossip.rs @@ -565,7 +565,6 @@ fn network_run_pull( bytes += serialized_size(&rsp).unwrap() as usize; msgs += rsp.len(); if let Some(node) = network.get(&from) { - node.gossip.mark_pull_request_creation_time(from, now); let mut stats = ProcessPullStats::default(); let (vers, vers_expired_timeout, failed_inserts) = node .gossip