bumps up min number of bloom items in gossip pull requests (#17236)

When a validator starts, it has an (almost) empty crds table, so it sends
only one pull-request to the entrypoint. The bloom filter in the
pull-request is sized for a 10% false-positive rate given the estimated
number of items. So if `num_items` is badly underestimated, the result is a
very small bloom filter with a very high effective false-positive rate:
https://github.com/solana-labs/solana/blob/2ae57c172/runtime/src/bloom.rs#L70-L80
https://github.com/solana-labs/solana/blob/2ae57c172/core/src/crds_gossip_pull.rs#L48
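For reference, the linked `Bloom::random` is built on the standard Bloom filter
sizing relations. The sketch below is only illustrative (the helper names and the
20,000-item figure are mine, not Solana's); it shows how badly the false-positive
target degrades when a filter is sized for far fewer items than it ends up
discriminating:

    // Standard Bloom filter sizing: m = -n * ln(p) / (ln 2)^2 bits for n items
    // at target false-positive rate p, with k = (m / n) * ln 2 hash keys.
    fn bloom_bits(num_items: f64, false_rate: f64) -> f64 {
        (-num_items * false_rate.ln() / 2f64.ln().powi(2)).ceil()
    }

    // False-positive probability of a filter with m bits and k keys holding
    // n items: p = (1 - e^(-k * n / m))^k.
    fn false_positive_rate(m: f64, k: f64, n: f64) -> f64 {
        (1.0 - (-k * n / m).exp()).powf(k)
    }

    fn main() {
        // Sized for the old minimum of 500 items at the 10% target:
        // roughly 2,400 bits (~300 bytes) and 3 keys.
        let m = bloom_bits(500.0, 0.1);
        let k = (m / 500.0 * 2f64.ln()).round().max(1.0);
        // If that small filter has to stand in for ~20,000 values, the
        // effective false-positive rate saturates near 1.0 instead of 10%.
        println!("bits = {}, keys = {}, fp = {:.2}", m, k, false_positive_rate(m, k, 20_000.0));
    }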

As a result, it is very unlikely that the validator obtains the entrypoint's
contact-info in the response. This prolongs the time the validator spends
looping on:
    > Waiting to adopt entrypoint shred version
https://github.com/solana-labs/solana/blob/ed51cde37/validator/src/main.rs#L390-L412

This commit increases the minimum number of bloom items used when making
gossip pull requests. Effectively, this breaks the entrypoint's crds table
into 64 shards, with one pull-request and a larger bloom filter per shard,
which increases the chances that the responses include the entrypoint's
contact-info, needed for adopting the shred version and completing validator
start.
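For a rough sense of where the 64 comes from: each filter is capped at roughly a
packet's worth of bloom bits, which at the 10% target holds on the order of ~1,100
items, and the filter count is the next power of two that covers `num_items`. The
arithmetic below is an illustrative sketch; the function name and the ~1,100
per-filter capacity are approximations of what the real code derives from
`MAX_BLOOM_SIZE`:

    // Illustrative: number of hash-space shards (one bloom filter and one
    // pull-request per shard) as a function of the item-count estimate.
    fn num_filters(num_items: usize, max_items_per_filter: usize) -> usize {
        let mask_bits = (num_items as f64 / max_items_per_filter as f64)
            .log2()
            .ceil()
            .max(0.0);
        1usize << mask_bits as u32
    }

    fn main() {
        // Old minimum of 500 items on a cold start: one filter, hence a single
        // pull-request to the entrypoint.
        assert_eq!(num_filters(500, 1_100), 1);
        // 20,000 actual items (as in the pre-existing test below): 32 filters.
        assert_eq!(num_filters(20_000, 1_100), 32);
        // New minimum of 65,536 items: 64 filters, i.e. 64 shards of the
        // entrypoint's crds table, matching the updated tests.
        assert_eq!(num_filters(65_536, 1_100), 64);
    }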
behzad nouri 2021-05-21 13:59:26 +00:00 committed by GitHub
parent 662c2aaeec
commit e8b35a4f7b
4 changed files with 89 additions and 101 deletions

cluster_info.rs

@ -1687,18 +1687,12 @@ impl ClusterInfo {
Ok(())
}
fn process_entrypoints(&self, entrypoints_processed: &mut bool) {
if *entrypoints_processed {
return;
}
fn process_entrypoints(&self) -> bool {
let mut entrypoints = self.entrypoints.write().unwrap();
if entrypoints.is_empty() {
// No entrypoint specified. Nothing more to process
*entrypoints_processed = true;
return;
return true;
}
for entrypoint in entrypoints.iter_mut() {
if entrypoint.id == Pubkey::default() {
// If a pull from the entrypoint was successful it should exist in the CRDS table
@ -1727,11 +1721,10 @@ impl ClusterInfo {
.set_shred_version(entrypoint.shred_version);
}
}
*entrypoints_processed = self.my_shred_version() != 0
self.my_shred_version() != 0
&& entrypoints
.iter()
.all(|entrypoint| entrypoint.id != Pubkey::default());
.all(|entrypoint| entrypoint.id != Pubkey::default())
}
fn handle_purge(
@ -1867,8 +1860,7 @@ impl ClusterInfo {
}
self.handle_purge(&thread_pool, &bank_forks, &stakes);
self.process_entrypoints(&mut entrypoints_processed);
entrypoints_processed = entrypoints_processed || self.process_entrypoints();
//TODO: possibly tune this parameter
//we saw a deadlock passing an self.read().unwrap().timeout into sleep
@ -3851,21 +3843,17 @@ mod tests {
cluster_info.set_entrypoint(entrypoint.clone());
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new());
assert!(pings.is_empty());
assert_eq!(1, pulls.len() as u64);
match pulls.get(0) {
Some((addr, msg)) => {
assert_eq!(*addr, entrypoint.gossip);
match msg {
Protocol::PullRequest(_, value) => {
assert!(value.verify());
assert_eq!(value.pubkey(), cluster_info.id())
}
_ => panic!("wrong protocol"),
assert_eq!(pulls.len(), 64);
for (addr, msg) in pulls {
assert_eq!(addr, entrypoint.gossip);
match msg {
Protocol::PullRequest(_, value) => {
assert!(value.verify());
assert_eq!(value.pubkey(), cluster_info.id())
}
_ => panic!("wrong protocol"),
}
None => panic!("entrypoint should be a pull destination"),
}
// now add this message back to the table and make sure after the next pull, the entrypoint is unset
let entrypoint_crdsvalue =
CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint.clone()));
@ -3879,7 +3867,7 @@ mod tests {
);
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new());
assert_eq!(pings.len(), 1);
assert_eq!(1, pulls.len() as u64);
assert_eq!(pulls.len(), 64);
assert_eq!(*cluster_info.entrypoints.read().unwrap(), vec![entrypoint]);
}
@ -4068,24 +4056,30 @@ mod tests {
// fresh timestamp). There should only be one pull request to `other_node`
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
assert!(pings.is_empty());
assert_eq!(1, pulls.len() as u64);
assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
assert_eq!(64, pulls.len());
assert!(pulls.into_iter().all(|(addr, _)| addr == other_node.gossip));
// Pull request 2: pretend it's been a while since we've pulled from `entrypoint`. There should
// now be two pull requests
cluster_info.entrypoints.write().unwrap()[0].wallclock = 0;
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
assert!(pings.is_empty());
assert_eq!(2, pulls.len() as u64);
assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
assert_eq!(pulls.get(1).unwrap().0, entrypoint.gossip);
assert_eq!(pulls.len(), 64 * 2);
assert!(pulls
.iter()
.take(64)
.all(|(addr, _)| *addr == other_node.gossip));
assert!(pulls
.iter()
.skip(64)
.all(|(addr, _)| *addr == entrypoint.gossip));
// Pull request 3: `other_node` is present and `entrypoint` was just pulled from. There should
// only be one pull request to `other_node`
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
assert!(pings.is_empty());
assert_eq!(1, pulls.len() as u64);
assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
assert_eq!(pulls.len(), 64);
assert!(pulls.into_iter().all(|(addr, _)| addr == other_node.gossip));
}
#[test]
@ -4249,8 +4243,7 @@ mod tests {
.any(|entrypoint| *entrypoint == gossiped_entrypoint1_info));
// Adopt the entrypoint's gossiped contact info and verify
let mut entrypoints_processed = false;
ClusterInfo::process_entrypoints(&cluster_info, &mut entrypoints_processed);
let entrypoints_processed = ClusterInfo::process_entrypoints(&cluster_info);
assert_eq!(cluster_info.entrypoints.read().unwrap().len(), 2);
assert!(cluster_info
.entrypoints
@ -4278,8 +4271,7 @@ mod tests {
// Adopt the entrypoint's gossiped contact info and verify
error!("Adopt the entrypoint's gossiped contact info and verify");
let mut entrypoints_processed = false;
ClusterInfo::process_entrypoints(&cluster_info, &mut entrypoints_processed);
let entrypoints_processed = ClusterInfo::process_entrypoints(&cluster_info);
assert_eq!(cluster_info.entrypoints.read().unwrap().len(), 2);
assert!(cluster_info
.entrypoints
@ -4322,8 +4314,7 @@ mod tests {
cluster_info.insert_info(gossiped_entrypoint_info.clone());
// Adopt the entrypoint's gossiped contact info and verify
let mut entrypoints_processed = false;
ClusterInfo::process_entrypoints(&cluster_info, &mut entrypoints_processed);
let entrypoints_processed = ClusterInfo::process_entrypoints(&cluster_info);
assert_eq!(cluster_info.entrypoints.read().unwrap().len(), 1);
assert_eq!(
cluster_info.entrypoints.read().unwrap()[0],

crds_gossip.rs

@ -29,9 +29,6 @@ use std::{
sync::Mutex,
};
///The min size for bloom filters
pub const CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS: usize = 500;
pub struct CrdsGossip {
pub crds: Crds,
pub id: Pubkey,

crds_gossip_pull.rs

@ -13,7 +13,7 @@ use crate::{
cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY},
contact_info::ContactInfo,
crds::{Crds, CrdsError},
crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS},
crds_gossip::{get_stake, get_weight},
crds_gossip_error::CrdsGossipError,
crds_value::CrdsValue,
ping_pong::PingCache,
@ -30,7 +30,6 @@ use solana_sdk::{
signature::{Keypair, Signer},
};
use std::{
cmp,
collections::{HashMap, HashSet, VecDeque},
convert::TryInto,
net::SocketAddr,
@ -468,11 +467,10 @@ impl CrdsGossipPull {
bloom_size: usize,
) -> Vec<CrdsFilter> {
const PAR_MIN_LENGTH: usize = 512;
let num = cmp::max(
CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS,
crds.len() + self.purged_values.len() + self.failed_inserts.len(),
);
let filters = CrdsFilterSet::new(num, bloom_size);
const MIN_NUM_BLOOM_ITEMS: usize = 65_536;
let num_items = crds.len() + self.purged_values.len() + self.failed_inserts.len();
let num_items = MIN_NUM_BLOOM_ITEMS.max(num_items);
let filters = CrdsFilterSet::new(num_items, bloom_size);
thread_pool.install(|| {
crds.par_values()
.with_min_len(PAR_MIN_LENGTH)
@ -915,7 +913,7 @@ mod test {
}
assert_eq!(num_inserts, 20_000);
let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE);
assert_eq!(filters.len(), 32);
assert_eq!(filters.len(), 64);
let hash_values: Vec<_> = crds
.values()
.map(|v| v.value_hash)
@ -1170,24 +1168,29 @@ mod test {
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
);
assert_eq!(rsp[0].len(), 0);
assert_eq!(filters.len(), 1);
filters.push(filters[0].clone());
//should return new value since caller is new
filters[1].0 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS + 1,
)));
assert_eq!(filters.len(), 64);
filters.extend({
// Should return new value since caller is new.
let now = CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS + 1;
let caller = ContactInfo::new_localhost(&Pubkey::new_unique(), now);
let caller = CrdsValue::new_unsigned(CrdsData::ContactInfo(caller));
filters
.iter()
.map(|(_, filter)| (caller.clone(), filter.clone()))
.collect::<Vec<_>>()
});
let rsp = dest.generate_pull_responses(
&dest_crds,
&filters,
/*output_size_limit=*/ usize::MAX,
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
);
assert_eq!(rsp.len(), 2);
assert_eq!(rsp[0].len(), 0);
assert_eq!(rsp[1].len(), 1); // Orders are also preserved.
assert_eq!(rsp.len(), 128);
// There should be only one non-empty response in the 2nd half.
// Orders are also preserved.
assert!(rsp.iter().take(64).all(|r| r.is_empty()));
assert_eq!(rsp.iter().filter(|r| r.is_empty()).count(), 127);
assert_eq!(rsp.iter().find(|r| !r.is_empty()).unwrap().len(), 1);
}
#[test]
@ -1312,7 +1315,7 @@ mod test {
);
let (_, filters) = req.unwrap();
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let mut rsp = dest.generate_pull_responses(
let rsp = dest.generate_pull_responses(
&dest_crds,
&filters,
/*output_size_limit=*/ usize::MAX,
@ -1332,13 +1335,13 @@ mod test {
if rsp.is_empty() {
continue;
}
assert_eq!(rsp.len(), 1);
assert_eq!(rsp.len(), 64);
let failed = node
.process_pull_response(
&mut node_crds,
&node_pubkey,
&node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1),
rsp.pop().unwrap(),
rsp.into_iter().flatten().collect(),
1,
)
.0;

crds_gossip_push.rs

@ -12,14 +12,13 @@ use crate::{
cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY,
contact_info::ContactInfo,
crds::{Crds, Cursor, VersionedCrdsValue},
crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS},
crds_gossip::{get_stake, get_weight},
crds_gossip_error::CrdsGossipError,
crds_value::CrdsValue,
weighted_shuffle::weighted_shuffle,
};
use bincode::serialized_size;
use indexmap::map::IndexMap;
use itertools::Itertools;
use lru::LruCache;
use rand::{seq::SliceRandom, Rng};
use solana_runtime::bloom::{AtomicBloom, Bloom};
@ -264,46 +263,44 @@ impl CrdsGossipPush {
network_size: usize,
ratio: usize,
) {
const BLOOM_FALSE_RATE: f64 = 0.1;
const BLOOM_MAX_BITS: usize = 1024 * 8 * 4;
let mut rng = rand::thread_rng();
let need = Self::compute_need(self.num_active, self.active_set.len(), ratio);
let mut new_items = HashMap::new();
let options: Vec<_> = self.push_options(
crds,
&self_id,
self_shred_version,
stakes,
gossip_validators,
);
if options.is_empty() {
let (weights, peers): (Vec<_>, Vec<_>) = self
.push_options(
crds,
&self_id,
self_shred_version,
stakes,
gossip_validators,
)
.into_iter()
.unzip();
if peers.is_empty() {
return;
}
let mut seed = [0; 32];
rng.fill(&mut seed[..]);
let mut shuffle = weighted_shuffle(
&options.iter().map(|weighted| weighted.0).collect_vec(),
seed,
)
.into_iter();
while new_items.len() < need {
match shuffle.next() {
Some(index) => {
let item = options[index].1;
if self.active_set.get(&item.id).is_some() {
continue;
}
if new_items.get(&item.id).is_some() {
continue;
}
let size = cmp::max(CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS, network_size);
let bloom: AtomicBloom<_> = Bloom::random(size, 0.1, 1024 * 8 * 4).into();
bloom.add(&item.id);
new_items.insert(item.id, bloom);
}
_ => break,
let num_bloom_items = CRDS_UNIQUE_PUBKEY_CAPACITY.max(network_size);
let shuffle = {
let mut seed = [0; 32];
rng.fill(&mut seed[..]);
weighted_shuffle(&weights, seed).into_iter()
};
for peer in shuffle.map(|i| peers[i].id) {
if new_items.len() >= need {
break;
}
if self.active_set.contains_key(&peer) || new_items.contains_key(&peer) {
continue;
}
let bloom = AtomicBloom::from(Bloom::random(
num_bloom_items,
BLOOM_FALSE_RATE,
BLOOM_MAX_BITS,
));
bloom.add(&peer);
new_items.insert(peer, bloom);
}
let mut keys: Vec<Pubkey> = self.active_set.keys().cloned().collect();
keys.shuffle(&mut rng);