From 4d73bbe48fb6b3d610029421599d5b5ba11bc258 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Tue, 26 Feb 2019 11:45:10 -0800 Subject: [PATCH] Fix flaky gossip weighted tests --- src/crds_gossip_pull.rs | 59 ++++++++++++++++++----------------- src/crds_gossip_push.rs | 69 ++++++++++++++++++----------------------- 2 files changed, 62 insertions(+), 66 deletions(-) diff --git a/src/crds_gossip_pull.rs b/src/crds_gossip_pull.rs index 8364c7088d..2e0416af6f 100644 --- a/src/crds_gossip_pull.rs +++ b/src/crds_gossip_pull.rs @@ -57,20 +57,7 @@ impl CrdsGossipPull { now: u64, stakes: &HashMap, ) -> Result<(Pubkey, Bloom, CrdsValue), CrdsGossipError> { - let options: Vec<_> = crds - .table - .values() - .filter_map(|v| v.value.contact_info()) - .filter(|v| v.id != self_id && ContactInfo::is_valid_address(&v.gossip)) - .map(|item| { - let max_weight = f32::from(u16::max_value()) - 1.0; - let req_time: u64 = *self.pull_request_time.get(&item.id).unwrap_or(&0); - let since = ((now - req_time) / 1024) as u32; - let stake = get_stake(&item.id, stakes); - let weight = get_weight(max_weight, since, stake); - (weight, item) - }) - .collect(); + let options = self.pull_options(crds, &self_id, now, stakes); if options.is_empty() { return Err(CrdsGossipError::NoPeers); } @@ -83,6 +70,28 @@ impl CrdsGossipPull { Ok((options[random].1.id, filter, self_info.clone())) } + fn pull_options<'a>( + &self, + crds: &'a Crds, + self_id: &Pubkey, + now: u64, + stakes: &HashMap, + ) -> Vec<(f32, &'a ContactInfo)> { + crds.table + .values() + .filter_map(|v| v.value.contact_info()) + .filter(|v| v.id != *self_id && ContactInfo::is_valid_address(&v.gossip)) + .map(|item| { + let max_weight = f32::from(u16::max_value()) - 1.0; + let req_time: u64 = *self.pull_request_time.get(&item.id).unwrap_or(&0); + let since = ((now - req_time) / 1024) as u32; + let stake = get_stake(&item.id, stakes); + let weight = get_weight(max_weight, since, stake); + (weight, item) + }) + .collect() + } + /// time when a request to `from` was initiated /// This is used for weighted random selection during `new_pull_request` /// It's important to use the local nodes request creation time as the weight @@ -202,7 +211,6 @@ mod test { use crate::contact_info::ContactInfo; use crate::crds_value::LeaderId; use solana_sdk::signature::{Keypair, KeypairUtil}; - use std::f32::consts::E; #[test] fn test_new_pull_with_stakes() { @@ -218,20 +226,15 @@ mod test { crds.insert(entry.clone(), 0).unwrap(); stakes.insert(id, i * 100); } - // The min balance of the heaviest nodes is at least ln(3000) - 0.5 - // This is because the heaviest nodes will have very similar weights - let min_balance = E.powf(3000_f32.ln() - 0.5); let now = 1024; - // try upto 10 times because of rng - for _ in 0..10 { - let msg = node - .new_pull_request(&crds, me.label().pubkey(), now, &stakes) - .unwrap(); - if *stakes.get(&msg.0).unwrap_or(&0) >= min_balance.round() as u64 { - return; - } - } - assert!(false, "weighted nodes didn't get picked"); + let mut options = node.pull_options(&crds, &me.label().pubkey(), now, &stakes); + assert!(!options.is_empty()); + options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap()); + // check that the highest stake holder is also the heaviest weighted. + assert_eq!( + *stakes.get(&options.get(0).unwrap().1.id).unwrap(), + 3000_u64 + ); } #[test] diff --git a/src/crds_gossip_push.rs b/src/crds_gossip_push.rs index 270995fd88..f5a4fc2cfe 100644 --- a/src/crds_gossip_push.rs +++ b/src/crds_gossip_push.rs @@ -170,21 +170,7 @@ impl CrdsGossipPush { let need = Self::compute_need(self.num_active, self.active_set.len(), ratio); let mut new_items = HashMap::new(); - let mut options: Vec<_> = crds - .table - .values() - .filter(|v| v.value.contact_info().is_some()) - .map(|v| (v.value.contact_info().unwrap(), v)) - .filter(|(info, _)| info.id != self_id && ContactInfo::is_valid_address(&info.gossip)) - .map(|(info, value)| { - let max_weight = f32::from(u16::max_value()) - 1.0; - let last_updated: u64 = value.local_timestamp; - let since = ((timestamp() - last_updated) / 1024) as u32; - let stake = get_stake(&info.id, stakes); - let weight = get_weight(max_weight, since, stake); - (weight, info) - }) - .collect(); + let mut options: Vec<_> = self.push_options(crds, &self_id, stakes); if options.is_empty() { return; } @@ -218,6 +204,28 @@ impl CrdsGossipPush { } } + fn push_options<'a>( + &self, + crds: &'a Crds, + self_id: &Pubkey, + stakes: &HashMap, + ) -> Vec<(f32, &'a ContactInfo)> { + crds.table + .values() + .filter(|v| v.value.contact_info().is_some()) + .map(|v| (v.value.contact_info().unwrap(), v)) + .filter(|(info, _)| info.id != *self_id && ContactInfo::is_valid_address(&info.gossip)) + .map(|(info, value)| { + let max_weight = f32::from(u16::max_value()) - 1.0; + let last_updated: u64 = value.local_timestamp; + let since = ((timestamp() - last_updated) / 1024) as u32; + let stake = get_stake(&info.id, stakes); + let weight = get_weight(max_weight, since, stake); + (weight, info) + }) + .collect() + } + /// purge old pending push messages pub fn purge_old_pending_push_messages(&mut self, crds: &Crds, min_time: u64) { let old_msgs: Vec = self @@ -259,7 +267,6 @@ mod test { use super::*; use crate::contact_info::ContactInfo; use solana_sdk::signature::{Keypair, KeypairUtil}; - use std::f32::consts::E; #[test] fn test_process_push() { @@ -389,7 +396,7 @@ mod test { fn test_active_set_refresh_with_bank() { let time = timestamp() - 1024; //make sure there's at least a 1 second delay let mut crds = Crds::default(); - let mut push = CrdsGossipPush::default(); + let push = CrdsGossipPush::default(); let mut stakes = HashMap::new(); for i in 1..=100 { let peer = @@ -398,27 +405,13 @@ mod test { crds.insert(peer.clone(), time).unwrap(); stakes.insert(id, i * 100); } - let min_balance = E.powf(7000_f32.ln() - 0.5); - // try upto 10 times because of rng - for _ in 0..10 { - push.refresh_push_active_set(&crds, &stakes, Pubkey::default(), 100, 30); - let mut num_correct = 0; - let mut num_wrong = 0; - push.active_set.iter().for_each(|peer| { - if *stakes.get(peer.0).unwrap_or(&0) >= min_balance as u64 { - num_correct += 1; - } else { - num_wrong += 1; - } - }); - // at least half of the heaviest nodes should be picked - if num_wrong <= num_correct { - return; - } - } - assert!( - false, - "expected at 50% of the active set to contain the heaviest nodes" + let mut options = push.push_options(&crds, &Pubkey::default(), &stakes); + assert!(!options.is_empty()); + options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap()); + // check that the highest stake holder is also the heaviest weighted. + assert_eq!( + *stakes.get(&options.get(0).unwrap().1.id).unwrap(), + 10_000_u64 ); } #[test]