Fix flaky gossip weighted tests

This commit is contained in:
Sagar Dhawan 2019-02-26 11:45:10 -08:00 committed by Grimes
parent 10ad536e09
commit 4d73bbe48f
2 changed files with 62 additions and 66 deletions

View File

@ -57,20 +57,7 @@ impl CrdsGossipPull {
now: u64, now: u64,
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
) -> Result<(Pubkey, Bloom<Hash>, CrdsValue), CrdsGossipError> { ) -> Result<(Pubkey, Bloom<Hash>, CrdsValue), CrdsGossipError> {
let options: Vec<_> = crds let options = self.pull_options(crds, &self_id, now, stakes);
.table
.values()
.filter_map(|v| v.value.contact_info())
.filter(|v| v.id != self_id && ContactInfo::is_valid_address(&v.gossip))
.map(|item| {
let max_weight = f32::from(u16::max_value()) - 1.0;
let req_time: u64 = *self.pull_request_time.get(&item.id).unwrap_or(&0);
let since = ((now - req_time) / 1024) as u32;
let stake = get_stake(&item.id, stakes);
let weight = get_weight(max_weight, since, stake);
(weight, item)
})
.collect();
if options.is_empty() { if options.is_empty() {
return Err(CrdsGossipError::NoPeers); return Err(CrdsGossipError::NoPeers);
} }
@ -83,6 +70,28 @@ impl CrdsGossipPull {
Ok((options[random].1.id, filter, self_info.clone())) Ok((options[random].1.id, filter, self_info.clone()))
} }
fn pull_options<'a>(
&self,
crds: &'a Crds,
self_id: &Pubkey,
now: u64,
stakes: &HashMap<Pubkey, u64>,
) -> Vec<(f32, &'a ContactInfo)> {
crds.table
.values()
.filter_map(|v| v.value.contact_info())
.filter(|v| v.id != *self_id && ContactInfo::is_valid_address(&v.gossip))
.map(|item| {
let max_weight = f32::from(u16::max_value()) - 1.0;
let req_time: u64 = *self.pull_request_time.get(&item.id).unwrap_or(&0);
let since = ((now - req_time) / 1024) as u32;
let stake = get_stake(&item.id, stakes);
let weight = get_weight(max_weight, since, stake);
(weight, item)
})
.collect()
}
/// time when a request to `from` was initiated /// time when a request to `from` was initiated
/// This is used for weighted random selection during `new_pull_request` /// This is used for weighted random selection during `new_pull_request`
/// It's important to use the local nodes request creation time as the weight /// It's important to use the local nodes request creation time as the weight
@ -202,7 +211,6 @@ mod test {
use crate::contact_info::ContactInfo; use crate::contact_info::ContactInfo;
use crate::crds_value::LeaderId; use crate::crds_value::LeaderId;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use std::f32::consts::E;
#[test] #[test]
fn test_new_pull_with_stakes() { fn test_new_pull_with_stakes() {
@ -218,20 +226,15 @@ mod test {
crds.insert(entry.clone(), 0).unwrap(); crds.insert(entry.clone(), 0).unwrap();
stakes.insert(id, i * 100); stakes.insert(id, i * 100);
} }
// The min balance of the heaviest nodes is at least ln(3000) - 0.5
// This is because the heaviest nodes will have very similar weights
let min_balance = E.powf(3000_f32.ln() - 0.5);
let now = 1024; let now = 1024;
// try upto 10 times because of rng let mut options = node.pull_options(&crds, &me.label().pubkey(), now, &stakes);
for _ in 0..10 { assert!(!options.is_empty());
let msg = node options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap());
.new_pull_request(&crds, me.label().pubkey(), now, &stakes) // check that the highest stake holder is also the heaviest weighted.
.unwrap(); assert_eq!(
if *stakes.get(&msg.0).unwrap_or(&0) >= min_balance.round() as u64 { *stakes.get(&options.get(0).unwrap().1.id).unwrap(),
return; 3000_u64
} );
}
assert!(false, "weighted nodes didn't get picked");
} }
#[test] #[test]

View File

@ -170,21 +170,7 @@ impl CrdsGossipPush {
let need = Self::compute_need(self.num_active, self.active_set.len(), ratio); let need = Self::compute_need(self.num_active, self.active_set.len(), ratio);
let mut new_items = HashMap::new(); let mut new_items = HashMap::new();
let mut options: Vec<_> = crds let mut options: Vec<_> = self.push_options(crds, &self_id, stakes);
.table
.values()
.filter(|v| v.value.contact_info().is_some())
.map(|v| (v.value.contact_info().unwrap(), v))
.filter(|(info, _)| info.id != self_id && ContactInfo::is_valid_address(&info.gossip))
.map(|(info, value)| {
let max_weight = f32::from(u16::max_value()) - 1.0;
let last_updated: u64 = value.local_timestamp;
let since = ((timestamp() - last_updated) / 1024) as u32;
let stake = get_stake(&info.id, stakes);
let weight = get_weight(max_weight, since, stake);
(weight, info)
})
.collect();
if options.is_empty() { if options.is_empty() {
return; return;
} }
@ -218,6 +204,28 @@ impl CrdsGossipPush {
} }
} }
fn push_options<'a>(
&self,
crds: &'a Crds,
self_id: &Pubkey,
stakes: &HashMap<Pubkey, u64>,
) -> Vec<(f32, &'a ContactInfo)> {
crds.table
.values()
.filter(|v| v.value.contact_info().is_some())
.map(|v| (v.value.contact_info().unwrap(), v))
.filter(|(info, _)| info.id != *self_id && ContactInfo::is_valid_address(&info.gossip))
.map(|(info, value)| {
let max_weight = f32::from(u16::max_value()) - 1.0;
let last_updated: u64 = value.local_timestamp;
let since = ((timestamp() - last_updated) / 1024) as u32;
let stake = get_stake(&info.id, stakes);
let weight = get_weight(max_weight, since, stake);
(weight, info)
})
.collect()
}
/// purge old pending push messages /// purge old pending push messages
pub fn purge_old_pending_push_messages(&mut self, crds: &Crds, min_time: u64) { pub fn purge_old_pending_push_messages(&mut self, crds: &Crds, min_time: u64) {
let old_msgs: Vec<CrdsValueLabel> = self let old_msgs: Vec<CrdsValueLabel> = self
@ -259,7 +267,6 @@ mod test {
use super::*; use super::*;
use crate::contact_info::ContactInfo; use crate::contact_info::ContactInfo;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use std::f32::consts::E;
#[test] #[test]
fn test_process_push() { fn test_process_push() {
@ -389,7 +396,7 @@ mod test {
fn test_active_set_refresh_with_bank() { fn test_active_set_refresh_with_bank() {
let time = timestamp() - 1024; //make sure there's at least a 1 second delay let time = timestamp() - 1024; //make sure there's at least a 1 second delay
let mut crds = Crds::default(); let mut crds = Crds::default();
let mut push = CrdsGossipPush::default(); let push = CrdsGossipPush::default();
let mut stakes = HashMap::new(); let mut stakes = HashMap::new();
for i in 1..=100 { for i in 1..=100 {
let peer = let peer =
@ -398,27 +405,13 @@ mod test {
crds.insert(peer.clone(), time).unwrap(); crds.insert(peer.clone(), time).unwrap();
stakes.insert(id, i * 100); stakes.insert(id, i * 100);
} }
let min_balance = E.powf(7000_f32.ln() - 0.5); let mut options = push.push_options(&crds, &Pubkey::default(), &stakes);
// try upto 10 times because of rng assert!(!options.is_empty());
for _ in 0..10 { options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap());
push.refresh_push_active_set(&crds, &stakes, Pubkey::default(), 100, 30); // check that the highest stake holder is also the heaviest weighted.
let mut num_correct = 0; assert_eq!(
let mut num_wrong = 0; *stakes.get(&options.get(0).unwrap().1.id).unwrap(),
push.active_set.iter().for_each(|peer| { 10_000_u64
if *stakes.get(peer.0).unwrap_or(&0) >= min_balance as u64 {
num_correct += 1;
} else {
num_wrong += 1;
}
});
// at least half of the heaviest nodes should be picked
if num_wrong <= num_correct {
return;
}
}
assert!(
false,
"expected at 50% of the active set to contain the heaviest nodes"
); );
} }
#[test] #[test]