Fix gossip messages growing beyond blob size (#5460)

* fixed bloom filter math

* Add split each pull request into multiple pulls with different filters

* Rework CrdsFilter to generate all possible masks to cover the keyspace

* Limit the bloom sizes such that each pull request is no larger than mtu
This commit is contained in:
Sagar Dhawan 2019-08-12 13:51:29 -07:00 committed by GitHub
parent b6151b5200
commit a8eb0409b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 301 additions and 92 deletions

View File

@ -17,7 +17,7 @@ use crate::blocktree::Blocktree;
use crate::contact_info::ContactInfo; use crate::contact_info::ContactInfo;
use crate::crds_gossip::CrdsGossip; use crate::crds_gossip::CrdsGossip;
use crate::crds_gossip_error::CrdsGossipError; use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; use crate::crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS};
use crate::crds_value::{CrdsValue, CrdsValueLabel, EpochSlots, Vote}; use crate::crds_value::{CrdsValue, CrdsValueLabel, EpochSlots, Vote};
use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE}; use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE};
use crate::repair_service::RepairType; use crate::repair_service::RepairType;
@ -25,7 +25,7 @@ use crate::result::Result;
use crate::staking_utils; use crate::staking_utils;
use crate::streamer::{BlobReceiver, BlobSender}; use crate::streamer::{BlobReceiver, BlobSender};
use crate::weighted_shuffle::weighted_shuffle; use crate::weighted_shuffle::weighted_shuffle;
use bincode::{deserialize, serialize}; use bincode::{deserialize, serialize, serialized_size};
use core::cmp; use core::cmp;
use itertools::Itertools; use itertools::Itertools;
use rand::SeedableRng; use rand::SeedableRng;
@ -36,8 +36,7 @@ use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_err
use solana_netutil::{ use solana_netutil::{
bind_in_range, bind_to, find_available_port_in_range, multi_bind_in_range, PortRange, bind_in_range, bind_to, find_available_port_in_range, multi_bind_in_range, PortRange,
}; };
use solana_runtime::bloom::Bloom; use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature}; use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature};
use solana_sdk::timing::{duration_as_ms, timestamp}; use solana_sdk::timing::{duration_as_ms, timestamp};
@ -157,7 +156,7 @@ impl Signable for PruneData {
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
enum Protocol { enum Protocol {
/// Gossip protocol messages /// Gossip protocol messages
PullRequest(Bloom<Hash>, CrdsValue), PullRequest(CrdsFilter, CrdsValue),
PullResponse(Pubkey, Vec<CrdsValue>), PullResponse(Pubkey, Vec<CrdsValue>),
PushMessage(Pubkey, Vec<CrdsValue>), PushMessage(Pubkey, Vec<CrdsValue>),
PruneMessage(Pubkey, PruneData), PruneMessage(Pubkey, PruneData),
@ -832,7 +831,7 @@ impl ClusterInfo {
} }
} }
// If the network entrypoint hasn't been discovered yet, add it to the crds table // If the network entrypoint hasn't been discovered yet, add it to the crds table
fn add_entrypoint(&mut self, pulls: &mut Vec<(Pubkey, Bloom<Hash>, SocketAddr, CrdsValue)>) { fn add_entrypoint(&mut self, pulls: &mut Vec<(Pubkey, CrdsFilter, SocketAddr, CrdsValue)>) {
match &self.entrypoint { match &self.entrypoint {
Some(entrypoint) => { Some(entrypoint) => {
let self_info = self let self_info = self
@ -841,12 +840,13 @@ impl ClusterInfo {
.lookup(&CrdsValueLabel::ContactInfo(self.id())) .lookup(&CrdsValueLabel::ContactInfo(self.id()))
.unwrap_or_else(|| panic!("self_id invalid {}", self.id())); .unwrap_or_else(|| panic!("self_id invalid {}", self.id()));
pulls.push(( self.gossip
entrypoint.id, .pull
self.gossip.pull.build_crds_filter(&self.gossip.crds), .build_crds_filters(&self.gossip.crds, Self::max_bloom_size())
entrypoint.gossip, .into_iter()
self_info.clone(), .for_each(|filter| {
)) pulls.push((entrypoint.id, filter, entrypoint.gossip, self_info.clone()))
})
} }
None => (), None => (),
} }
@ -875,30 +875,45 @@ impl ClusterInfo {
messages messages
} }
// computes the maximum size for pull request blooms
pub fn max_bloom_size() -> usize {
let filter_size = serialized_size(&CrdsFilter::default())
.expect("unable to serialize default filter") as usize;
let protocol = Protocol::PullRequest(
CrdsFilter::default(),
CrdsValue::ContactInfo(ContactInfo::default()),
);
let protocol_size =
serialized_size(&protocol).expect("unable to serialize gossip protocol") as usize;
PACKET_DATA_SIZE - (protocol_size - filter_size)
}
fn new_pull_requests(&mut self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> { fn new_pull_requests(&mut self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
let now = timestamp(); let now = timestamp();
let pulls: Vec<_> = self let mut pulls: Vec<_> = self
.gossip .gossip
.new_pull_request(now, stakes) .new_pull_request(now, stakes, Self::max_bloom_size())
.ok() .ok()
.into_iter() .into_iter()
.collect(); .filter_map(|(peer, filters, me)| {
let mut pr: Vec<_> = pulls
.into_iter()
.filter_map(|(peer, filter, self_info)| {
let peer_label = CrdsValueLabel::ContactInfo(peer); let peer_label = CrdsValueLabel::ContactInfo(peer);
self.gossip self.gossip
.crds .crds
.lookup(&peer_label) .lookup(&peer_label)
.and_then(CrdsValue::contact_info) .and_then(CrdsValue::contact_info)
.map(|peer_info| (peer, filter, peer_info.gossip, self_info)) .map(move |peer_info| {
filters
.into_iter()
.map(move |f| (peer, f, peer_info.gossip, me.clone()))
})
}) })
.flatten()
.collect(); .collect();
if pr.is_empty() { if pulls.is_empty() {
self.add_entrypoint(&mut pr); self.add_entrypoint(&mut pulls);
} }
pr.into_iter() pulls
.into_iter()
.map(|(peer, filter, gossip, self_info)| { .map(|(peer, filter, gossip, self_info)| {
self.gossip.mark_pull_request_creation_time(&peer, now); self.gossip.mark_pull_request_creation_time(&peer, now);
(gossip, Protocol::PullRequest(filter, self_info)) (gossip, Protocol::PullRequest(filter, self_info))
@ -1093,7 +1108,7 @@ impl ClusterInfo {
fn handle_pull_request( fn handle_pull_request(
me: &Arc<RwLock<Self>>, me: &Arc<RwLock<Self>>,
filter: Bloom<Hash>, filter: CrdsFilter,
caller: CrdsValue, caller: CrdsValue,
from_addr: &SocketAddr, from_addr: &SocketAddr,
) -> Vec<SharedBlob> { ) -> Vec<SharedBlob> {
@ -2063,7 +2078,7 @@ mod tests {
let (_, _, val) = cluster_info let (_, _, val) = cluster_info
.gossip .gossip
.new_pull_request(timestamp(), &HashMap::new()) .new_pull_request(timestamp(), &HashMap::new(), ClusterInfo::max_bloom_size())
.ok() .ok()
.unwrap(); .unwrap();
assert!(val.verify()); assert!(val.verify());
@ -2251,7 +2266,7 @@ mod tests {
let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp()); let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp());
cluster_info.set_entrypoint(entrypoint.clone()); cluster_info.set_entrypoint(entrypoint.clone());
let pulls = cluster_info.new_pull_requests(&HashMap::new()); let pulls = cluster_info.new_pull_requests(&HashMap::new());
assert_eq!(1, pulls.len()); assert_eq!(1, pulls.len() as u64);
match pulls.get(0) { match pulls.get(0) {
Some((addr, msg)) => { Some((addr, msg)) => {
assert_eq!(*addr, entrypoint.gossip); assert_eq!(*addr, entrypoint.gossip);
@ -2278,7 +2293,7 @@ mod tests {
.write() .write()
.unwrap() .unwrap()
.new_pull_requests(&HashMap::new()); .new_pull_requests(&HashMap::new());
assert_eq!(1, pulls.len()); assert_eq!(1, pulls.len() as u64);
assert_eq!(cluster_info.read().unwrap().entrypoint, Some(entrypoint)); assert_eq!(cluster_info.read().unwrap().entrypoint, Some(entrypoint));
} }

View File

@ -5,17 +5,15 @@
use crate::crds::{Crds, VersionedCrdsValue}; use crate::crds::{Crds, VersionedCrdsValue};
use crate::crds_gossip_error::CrdsGossipError; use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_gossip_pull::CrdsGossipPull; use crate::crds_gossip_pull::{CrdsFilter, CrdsGossipPull};
use crate::crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE}; use crate::crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE};
use crate::crds_value::{CrdsValue, CrdsValueLabel}; use crate::crds_value::{CrdsValue, CrdsValueLabel};
use solana_runtime::bloom::Bloom;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signable; use solana_sdk::signature::Signable;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
///The min size for bloom filters ///The min size for bloom filters
pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000; pub const CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS: usize = 500;
#[derive(Clone)] #[derive(Clone)]
pub struct CrdsGossip { pub struct CrdsGossip {
@ -133,9 +131,10 @@ impl CrdsGossip {
&self, &self,
now: u64, now: u64,
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
) -> Result<(Pubkey, Bloom<Hash>, CrdsValue), CrdsGossipError> { bloom_size: usize,
) -> Result<(Pubkey, Vec<CrdsFilter>, CrdsValue), CrdsGossipError> {
self.pull self.pull
.new_pull_request(&self.crds, &self.id, now, stakes) .new_pull_request(&self.crds, &self.id, now, stakes, bloom_size)
} }
/// time when a request to `from` was initiated /// time when a request to `from` was initiated
@ -149,7 +148,7 @@ impl CrdsGossip {
pub fn process_pull_request( pub fn process_pull_request(
&mut self, &mut self,
caller: CrdsValue, caller: CrdsValue,
filter: Bloom<Hash>, filter: CrdsFilter,
now: u64, now: u64,
) -> Vec<CrdsValue> { ) -> Vec<CrdsValue> {
self.pull self.pull

View File

@ -11,13 +11,12 @@
use crate::contact_info::ContactInfo; use crate::contact_info::ContactInfo;
use crate::crds::Crds; use crate::crds::Crds;
use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_BLOOM_SIZE}; use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS};
use crate::crds_gossip_error::CrdsGossipError; use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_value::{CrdsValue, CrdsValueLabel}; use crate::crds_value::{CrdsValue, CrdsValueLabel};
use crate::packet::BLOB_DATA_SIZE;
use bincode::serialized_size;
use rand; use rand;
use rand::distributions::{Distribution, WeightedIndex}; use rand::distributions::{Distribution, WeightedIndex};
use rand::Rng;
use solana_runtime::bloom::Bloom; use solana_runtime::bloom::Bloom;
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
@ -26,6 +25,78 @@ use std::collections::HashMap;
use std::collections::VecDeque; use std::collections::VecDeque;
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
pub const KEYS: f64 = 8f64;
pub const FALSE_RATE: f64 = 0.01f64;
#[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq)]
pub struct CrdsFilter {
pub filter: Bloom<Hash>,
mask: u64,
}
impl CrdsFilter {
pub fn new_rand(num_items: usize, max_bytes: usize) -> Self {
let max_bits = (max_bytes * 8) as f64;
let max_items = Self::max_items(max_bits, FALSE_RATE, KEYS);
let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize);
let mask_bits = Self::mask_bits(num_items as f64, max_items as f64);
let seed: u64 = rand::thread_rng().gen_range(0, 2u64.pow(mask_bits));
let mask = Self::compute_mask(seed, mask_bits);
CrdsFilter { filter, mask }
}
// generates a vec of filters that together hold a complete set of Hashes
pub fn new_complete_set(num_items: usize, max_bytes: usize) -> Vec<Self> {
let max_bits = (max_bytes * 8) as f64;
let max_items = Self::max_items(max_bits, FALSE_RATE, KEYS);
let mask_bits = Self::mask_bits(num_items as f64, max_items as f64);
// for each possible mask combination, generate a new filter.
let mut filters = vec![];
for seed in 0..2u64.pow(mask_bits) {
let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize);
let mask = Self::compute_mask(seed, mask_bits);
filters.push(CrdsFilter { filter, mask })
}
filters
}
fn compute_mask(seed: u64, mask_bits: u32) -> u64 {
assert!(seed <= 2u64.pow(mask_bits));
let seed: u64 = seed.checked_shl(64 - mask_bits).unwrap_or(0x0);
seed | (!0u64).checked_shr(mask_bits).unwrap_or(!0x0) as u64
}
pub fn max_items(max_bits: f64, false_rate: f64, num_keys: f64) -> f64 {
let m = max_bits;
let p = false_rate;
let k = num_keys;
(m / (-k / (1f64 - (p.ln() / k).exp()).ln())).ceil()
}
fn mask_bits(num_items: f64, max_items: f64) -> u32 {
// for small ratios this can result in a negative number, ensure it returns 0 instead
((num_items / max_items).log2().ceil()).max(0.0) as u32
}
fn hash_as_u64(item: &Hash) -> u64 {
let arr = item.as_ref();
let mut accum = 0;
for (i, val) in arr.iter().enumerate().take(8) {
accum |= (u64::from(*val)) << (i * 8) as u64;
}
accum
}
pub fn test_mask(&self, item: &Hash) -> bool {
let bits = Self::hash_as_u64(item);
(bits & self.mask) == bits
}
pub fn add(&mut self, item: &Hash) {
if self.test_mask(item) {
self.filter.add(item);
}
}
pub fn contains(&self, item: &Hash) -> bool {
if !self.test_mask(item) {
return true;
}
self.filter.contains(item)
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct CrdsGossipPull { pub struct CrdsGossipPull {
@ -33,8 +104,6 @@ pub struct CrdsGossipPull {
pub pull_request_time: HashMap<Pubkey, u64>, pub pull_request_time: HashMap<Pubkey, u64>,
/// hash and insert time /// hash and insert time
purged_values: VecDeque<(Hash, u64)>, purged_values: VecDeque<(Hash, u64)>,
/// max bytes per message
pub max_bytes: usize,
pub crds_timeout: u64, pub crds_timeout: u64,
} }
@ -43,7 +112,6 @@ impl Default for CrdsGossipPull {
Self { Self {
purged_values: VecDeque::new(), purged_values: VecDeque::new(),
pull_request_time: HashMap::new(), pull_request_time: HashMap::new(),
max_bytes: BLOB_DATA_SIZE,
crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
} }
} }
@ -56,18 +124,19 @@ impl CrdsGossipPull {
self_id: &Pubkey, self_id: &Pubkey,
now: u64, now: u64,
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
) -> Result<(Pubkey, Bloom<Hash>, CrdsValue), CrdsGossipError> { bloom_size: usize,
) -> Result<(Pubkey, Vec<CrdsFilter>, CrdsValue), CrdsGossipError> {
let options = self.pull_options(crds, &self_id, now, stakes); let options = self.pull_options(crds, &self_id, now, stakes);
if options.is_empty() { if options.is_empty() {
return Err(CrdsGossipError::NoPeers); return Err(CrdsGossipError::NoPeers);
} }
let filter = self.build_crds_filter(crds); let filters = self.build_crds_filters(crds, bloom_size);
let index = WeightedIndex::new(options.iter().map(|weighted| weighted.0)).unwrap(); let index = WeightedIndex::new(options.iter().map(|weighted| weighted.0)).unwrap();
let random = index.sample(&mut rand::thread_rng()); let random = index.sample(&mut rand::thread_rng());
let self_info = crds let self_info = crds
.lookup(&CrdsValueLabel::ContactInfo(*self_id)) .lookup(&CrdsValueLabel::ContactInfo(*self_id))
.unwrap_or_else(|| panic!("self_id invalid {}", self_id)); .unwrap_or_else(|| panic!("self_id invalid {}", self_id));
Ok((options[random].1.id, filter, self_info.clone())) Ok((options[random].1.id, filters, self_info.clone()))
} }
fn pull_options<'a>( fn pull_options<'a>(
@ -110,10 +179,10 @@ impl CrdsGossipPull {
&mut self, &mut self,
crds: &mut Crds, crds: &mut Crds,
caller: CrdsValue, caller: CrdsValue,
mut filter: Bloom<Hash>, filter: CrdsFilter,
now: u64, now: u64,
) -> Vec<CrdsValue> { ) -> Vec<CrdsValue> {
let rv = self.filter_crds_values(crds, &mut filter); let rv = self.filter_crds_values(crds, &filter);
let key = caller.label().pubkey(); let key = caller.label().pubkey();
let old = crds.insert(caller, now); let old = crds.insert(caller, now);
if let Some(val) = old.ok().and_then(|opt| opt) { if let Some(val) = old.ok().and_then(|opt| opt) {
@ -147,33 +216,31 @@ impl CrdsGossipPull {
crds.update_record_timestamp(from, now); crds.update_record_timestamp(from, now);
failed failed
} }
/// build a filter of the current crds table // build a set of filters of the current crds table
pub fn build_crds_filter(&self, crds: &Crds) -> Bloom<Hash> { // num_filters - used to increase the likely hood of a value in crds being added to some filter
pub fn build_crds_filters(&self, crds: &Crds, bloom_size: usize) -> Vec<CrdsFilter> {
let num = cmp::max( let num = cmp::max(
CRDS_GOSSIP_BLOOM_SIZE, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS,
crds.table.values().count() + self.purged_values.len(), crds.table.values().count() + self.purged_values.len(),
); );
let mut bloom = Bloom::random(num, 0.1, 4 * 1024 * 8 - 1); let mut filters = CrdsFilter::new_complete_set(num, bloom_size);
for v in crds.table.values() { for v in crds.table.values() {
bloom.add(&v.value_hash); filters
.iter_mut()
.for_each(|filter| filter.add(&v.value_hash));
} }
for (value_hash, _insert_timestamp) in &self.purged_values { for (value_hash, _insert_timestamp) in &self.purged_values {
bloom.add(value_hash); filters.iter_mut().for_each(|filter| filter.add(value_hash));
} }
bloom filters
} }
/// filter values that fail the bloom filter up to max_bytes /// filter values that fail the bloom filter up to max_bytes
fn filter_crds_values(&self, crds: &Crds, filter: &mut Bloom<Hash>) -> Vec<CrdsValue> { fn filter_crds_values(&self, crds: &Crds, filter: &CrdsFilter) -> Vec<CrdsValue> {
let mut max_bytes = self.max_bytes as isize;
let mut ret = vec![]; let mut ret = vec![];
for v in crds.table.values() { for v in crds.table.values() {
if filter.contains(&v.value_hash) { if filter.contains(&v.value_hash) {
continue; continue;
} }
max_bytes -= serialized_size(&v.value).unwrap() as isize;
if max_bytes < 0 {
break;
}
ret.push(v.value.clone()); ret.push(v.value.clone());
} }
ret ret
@ -209,6 +276,9 @@ impl CrdsGossipPull {
mod test { mod test {
use super::*; use super::*;
use crate::contact_info::ContactInfo; use crate::contact_info::ContactInfo;
use itertools::Itertools;
use solana_sdk::hash::hash;
use solana_sdk::packet::PACKET_DATA_SIZE;
#[test] #[test]
fn test_new_pull_with_stakes() { fn test_new_pull_with_stakes() {
@ -241,19 +311,19 @@ mod test {
let id = entry.label().pubkey(); let id = entry.label().pubkey();
let node = CrdsGossipPull::default(); let node = CrdsGossipPull::default();
assert_eq!( assert_eq!(
node.new_pull_request(&crds, &id, 0, &HashMap::new()), node.new_pull_request(&crds, &id, 0, &HashMap::new(), PACKET_DATA_SIZE),
Err(CrdsGossipError::NoPeers) Err(CrdsGossipError::NoPeers)
); );
crds.insert(entry.clone(), 0).unwrap(); crds.insert(entry.clone(), 0).unwrap();
assert_eq!( assert_eq!(
node.new_pull_request(&crds, &id, 0, &HashMap::new()), node.new_pull_request(&crds, &id, 0, &HashMap::new(), PACKET_DATA_SIZE),
Err(CrdsGossipError::NoPeers) Err(CrdsGossipError::NoPeers)
); );
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0)); let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
crds.insert(new.clone(), 0).unwrap(); crds.insert(new.clone(), 0).unwrap();
let req = node.new_pull_request(&crds, &id, 0, &HashMap::new()); let req = node.new_pull_request(&crds, &id, 0, &HashMap::new(), PACKET_DATA_SIZE);
let (to, _, self_info) = req.unwrap(); let (to, _, self_info) = req.unwrap();
assert_eq!(to, new.label().pubkey()); assert_eq!(to, new.label().pubkey());
assert_eq!(self_info, entry); assert_eq!(self_info, entry);
@ -276,7 +346,13 @@ mod test {
// odds of getting the other request should be 1 in u64::max_value() // odds of getting the other request should be 1 in u64::max_value()
for _ in 0..10 { for _ in 0..10 {
let req = node.new_pull_request(&crds, &node_pubkey, u64::max_value(), &HashMap::new()); let req = node.new_pull_request(
&crds,
&node_pubkey,
u64::max_value(),
&HashMap::new(),
PACKET_DATA_SIZE,
);
let (to, _, self_info) = req.unwrap(); let (to, _, self_info) = req.unwrap();
assert_eq!(to, old.label().pubkey()); assert_eq!(to, old.label().pubkey());
assert_eq!(self_info, entry); assert_eq!(self_info, entry);
@ -292,13 +368,21 @@ mod test {
node_crds.insert(entry.clone(), 0).unwrap(); node_crds.insert(entry.clone(), 0).unwrap();
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0)); let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
node_crds.insert(new.clone(), 0).unwrap(); node_crds.insert(new.clone(), 0).unwrap();
let req = node.new_pull_request(&node_crds, &node_pubkey, 0, &HashMap::new()); let req = node.new_pull_request(
&node_crds,
&node_pubkey,
0,
&HashMap::new(),
PACKET_DATA_SIZE,
);
let mut dest_crds = Crds::default(); let mut dest_crds = Crds::default();
let mut dest = CrdsGossipPull::default(); let mut dest = CrdsGossipPull::default();
let (_, filter, caller) = req.unwrap(); let (_, filters, caller) = req.unwrap();
let rsp = dest.process_pull_request(&mut dest_crds, caller.clone(), filter, 1); for filter in filters.into_iter() {
assert!(rsp.is_empty()); let rsp = dest.process_pull_request(&mut dest_crds, caller.clone(), filter, 1);
assert!(rsp.is_empty());
}
assert!(dest_crds.lookup(&caller.label()).is_some()); assert!(dest_crds.lookup(&caller.label()).is_some());
assert_eq!( assert_eq!(
dest_crds dest_crds
@ -347,15 +431,27 @@ mod test {
let mut done = false; let mut done = false;
for _ in 0..30 { for _ in 0..30 {
// there is a chance of a false positive with bloom filters // there is a chance of a false positive with bloom filters
let req = node.new_pull_request(&node_crds, &node_pubkey, 0, &HashMap::new()); let req = node.new_pull_request(
let (_, filter, caller) = req.unwrap(); &node_crds,
let rsp = dest.process_pull_request(&mut dest_crds, caller, filter, 0); &node_pubkey,
// if there is a false positive this is empty 0,
// prob should be around 0.1 per iteration &HashMap::new(),
PACKET_DATA_SIZE,
);
let (_, filters, caller) = req.unwrap();
let mut rsp = vec![];
for filter in filters {
rsp = dest.process_pull_request(&mut dest_crds, caller.clone(), filter, 0);
// if there is a false positive this is empty
// prob should be around 0.1 per iteration
if rsp.is_empty() {
continue;
}
}
if rsp.is_empty() { if rsp.is_empty() {
continue; continue;
} }
assert_eq!(rsp.len(), 1); assert_eq!(rsp.len(), 1);
let failed = node.process_pull_response(&mut node_crds, &node_pubkey, rsp, 1); let failed = node.process_pull_response(&mut node_crds, &node_pubkey, rsp, 1);
assert_eq!(failed, 0); assert_eq!(failed, 0);
@ -406,12 +502,72 @@ mod test {
// there is a chance of a false positive with bloom filters // there is a chance of a false positive with bloom filters
// assert that purged value is still in the set // assert that purged value is still in the set
// chance of 30 consecutive false positives is 0.1^30 // chance of 30 consecutive false positives is 0.1^30
let filter = node.build_crds_filter(&node_crds); let filters = node.build_crds_filters(&node_crds, PACKET_DATA_SIZE);
assert!(filter.contains(&value_hash)); assert!(filters.iter().any(|filter| filter.contains(&value_hash)));
} }
// purge the value // purge the value
node.purge_purged(1); node.purge_purged(1);
assert_eq!(node.purged_values.len(), 0); assert_eq!(node.purged_values.len(), 0);
} }
#[test]
fn test_crds_filter_mask() {
let filter = CrdsFilter::new_rand(1, 128);
assert_eq!(filter.mask, !0x0);
assert_eq!(CrdsFilter::max_items(80f64, 0.01, 8f64), 9f64);
//1000/9 = 111, so 7 bits are needed to mask it
assert_eq!(CrdsFilter::mask_bits(1000f64, 9f64), 7u32);
let filter = CrdsFilter::new_rand(1000, 10);
assert_eq!(filter.mask & 0x00ffffffff, 0x00ffffffff);
}
#[test]
fn test_crds_filter_add_no_mask() {
let mut filter = CrdsFilter::new_rand(1, 128);
let h: Hash = hash(Hash::default().as_ref());
assert!(!filter.contains(&h));
filter.add(&h);
assert!(filter.contains(&h));
let h: Hash = hash(h.as_ref());
assert!(!filter.contains(&h));
}
#[test]
fn test_crds_filter_add_mask() {
let mut filter = CrdsFilter::new_rand(1000, 10);
let mut h: Hash = Hash::default();
while !filter.test_mask(&h) {
h = hash(h.as_ref());
}
assert!(filter.test_mask(&h));
//if the mask succeeds, we want the guaranteed negative
assert!(!filter.contains(&h));
filter.add(&h);
assert!(filter.contains(&h));
}
#[test]
fn test_crds_filter_contains_mask() {
let filter = CrdsFilter::new_rand(1000, 10);
let mut h: Hash = Hash::default();
while filter.test_mask(&h) {
h = hash(h.as_ref());
}
assert!(!filter.test_mask(&h));
//if the mask fails, the hash is contained in the set, and can be treated as a false
//positive
assert!(filter.contains(&h));
}
#[test]
fn test_mask() {
for i in 0..16 {
run_test_mask(i);
}
}
fn run_test_mask(mask_bits: u32) {
let masks: Vec<_> = (0..2u64.pow(mask_bits))
.into_iter()
.map(|seed| CrdsFilter::compute_mask(seed, mask_bits))
.dedup()
.collect();
assert_eq!(masks.len(), 2u64.pow(mask_bits) as usize)
}
} }

View File

@ -10,7 +10,7 @@
use crate::contact_info::ContactInfo; use crate::contact_info::ContactInfo;
use crate::crds::{Crds, VersionedCrdsValue}; use crate::crds::{Crds, VersionedCrdsValue};
use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_BLOOM_SIZE}; use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS};
use crate::crds_gossip_error::CrdsGossipError; use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_value::{CrdsValue, CrdsValueLabel}; use crate::crds_value::{CrdsValue, CrdsValueLabel};
use crate::packet::BLOB_DATA_SIZE; use crate::packet::BLOB_DATA_SIZE;
@ -258,7 +258,7 @@ impl CrdsGossipPush {
if new_items.get(&item.id).is_some() { if new_items.get(&item.id).is_some() {
continue; continue;
} }
let size = cmp::max(CRDS_GOSSIP_BLOOM_SIZE, network_size); let size = cmp::max(CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS, network_size);
let mut bloom = Bloom::random(size, 0.1, 1024 * 8 * 4); let mut bloom = Bloom::random(size, 0.1, 1024 * 8 * 4);
bloom.add(&item.id); bloom.add(&item.id);
new_items.insert(item.id, bloom); new_items.insert(item.id, bloom);

View File

@ -335,7 +335,9 @@ pub fn to_blob<T: Serialize>(resp: T, rsp_addr: SocketAddr) -> Result<Blob> {
let mut b = Blob::default(); let mut b = Blob::default();
let v = bincode::serialize(&resp)?; let v = bincode::serialize(&resp)?;
let len = v.len(); let len = v.len();
assert!(len <= BLOB_SIZE); if len > BLOB_SIZE {
return Err(Error::ToBlobError);
}
b.data[..len].copy_from_slice(&v); b.data[..len].copy_from_slice(&v);
b.meta.size = len; b.meta.size = len;
b.meta.set_addr(&rsp_addr); b.meta.set_addr(&rsp_addr);

View File

@ -31,6 +31,7 @@ pub enum Error {
PohRecorderError(poh_recorder::PohRecorderError), PohRecorderError(poh_recorder::PohRecorderError),
BlocktreeError(blocktree::BlocktreeError), BlocktreeError(blocktree::BlocktreeError),
FsExtra(fs_extra::error::Error), FsExtra(fs_extra::error::Error),
ToBlobError,
} }
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;

View File

@ -1,6 +1,7 @@
use bincode::serialized_size; use bincode::serialized_size;
use log::*; use log::*;
use rayon::prelude::*; use rayon::prelude::*;
use solana::cluster_info::ClusterInfo;
use solana::contact_info::ContactInfo; use solana::contact_info::ContactInfo;
use solana::crds_gossip::*; use solana::crds_gossip::*;
use solana::crds_gossip_error::CrdsGossipError; use solana::crds_gossip_error::CrdsGossipError;
@ -380,27 +381,36 @@ fn network_run_pull(
.filter_map(|from| { .filter_map(|from| {
from.lock() from.lock()
.unwrap() .unwrap()
.new_pull_request(now, &HashMap::new()) .new_pull_request(now, &HashMap::new(), ClusterInfo::max_bloom_size())
.ok() .ok()
}) })
.collect() .collect()
}; };
let transfered: Vec<_> = requests let transfered: Vec<_> = requests
.into_par_iter() .into_par_iter()
.map(|(to, request, caller_info)| { .map(|(to, filters, caller_info)| {
let mut bytes: usize = 0; let mut bytes: usize = 0;
let mut msgs: usize = 0; let mut msgs: usize = 0;
let mut overhead: usize = 0; let mut overhead: usize = 0;
let from = caller_info.label().pubkey(); let from = caller_info.label().pubkey();
bytes += request.keys.len(); bytes += filters.iter().map(|f| f.filter.keys.len()).sum::<usize>();
bytes += (request.bits.len() / 8) as usize; bytes += filters
.iter()
.map(|f| f.filter.bits.len() as usize / 8)
.sum::<usize>();
bytes += serialized_size(&caller_info).unwrap() as usize; bytes += serialized_size(&caller_info).unwrap() as usize;
let rsp = network let rsp = network
.get(&to) .get(&to)
.map(|node| { .map(|node| {
node.lock() let mut rsp = vec![];
.unwrap() for filter in filters {
.process_pull_request(caller_info, request, now) rsp.append(&mut node.lock().unwrap().process_pull_request(
caller_info.clone(),
filter,
now,
));
}
rsp
}) })
.unwrap(); .unwrap();
bytes += serialized_size(&rsp).unwrap() as usize; bytes += serialized_size(&rsp).unwrap() as usize;

View File

@ -31,19 +31,27 @@ impl<T: BloomHashIndex> Bloom<T> {
_phantom: PhantomData::default(), _phantom: PhantomData::default(),
} }
} }
/// create filter optimal for num size given the `false_rate` /// create filter optimal for num size given the `FALSE_RATE`
/// the keys are randomized for picking data out of a collision resistant hash of size /// the keys are randomized for picking data out of a collision resistant hash of size
/// `keysize` bytes /// `keysize` bytes
/// https://hur.st/bloomfilter/ /// https://hur.st/bloomfilter/
pub fn random(num: usize, false_rate: f64, max_bits: usize) -> Self { pub fn random(num_items: usize, false_rate: f64, max_bits: usize) -> Self {
let min_num_bits = ((num as f64 * false_rate.log(2f64)) let m = Self::num_bits(num_items as f64, false_rate);
/ (1f64 / 2f64.powf(2f64.log(2f64))).log(2f64)) let num_bits = cmp::max(1, cmp::min(m as usize, max_bits));
.ceil() as usize; let num_keys = Self::num_keys(num_bits as f64, num_items as f64) as usize;
let num_bits = cmp::max(1, cmp::min(min_num_bits, max_bits));
let num_keys = ((num_bits as f64 / num as f64) * 2f64.log(2f64)).round() as usize;
let keys: Vec<u64> = (0..num_keys).map(|_| rand::thread_rng().gen()).collect(); let keys: Vec<u64> = (0..num_keys).map(|_| rand::thread_rng().gen()).collect();
Self::new(num_bits, keys) Self::new(num_bits, keys)
} }
pub fn num_bits(num_items: f64, false_rate: f64) -> f64 {
let n = num_items;
let p = false_rate;
((n * p.ln()) / (1f64 / 2f64.powf(2f64.ln())).ln()).ceil()
}
pub fn num_keys(num_bits: f64, num_items: f64) -> f64 {
let n = num_items;
let m = num_bits;
((m / n) * 2f64.ln()).round()
}
fn pos(&self, key: &T, k: u64) -> u64 { fn pos(&self, key: &T, k: u64) -> u64 {
key.hash_at_index(k) % self.bits.len() key.hash_at_index(k) % self.bits.len()
} }
@ -98,7 +106,7 @@ mod test {
//normal //normal
let bloom: Bloom<Hash> = Bloom::random(10, 0.1, 100); let bloom: Bloom<Hash> = Bloom::random(10, 0.1, 100);
assert_eq!(bloom.keys.len(), 3); assert_eq!(bloom.keys.len(), 3);
assert_eq!(bloom.bits.len(), 34); assert_eq!(bloom.bits.len(), 48);
//saturated //saturated
let bloom: Bloom<Hash> = Bloom::random(100, 0.1, 100); let bloom: Bloom<Hash> = Bloom::random(100, 0.1, 100);
@ -129,4 +137,22 @@ mod test {
b2.keys.sort(); b2.keys.sort();
assert_ne!(b1.keys, b2.keys); assert_ne!(b1.keys, b2.keys);
} }
// Bloom filter math in python
// n number of items
// p false rate
// m number of bits
// k number of keys
//
// n = ceil(m / (-k / log(1 - exp(log(p) / k))))
// p = pow(1 - exp(-k / (m / n)), k)
// m = ceil((n * log(p)) / log(1 / pow(2, log(2))));
// k = round((m / n) * log(2));
#[test]
fn test_filter_math() {
assert_eq!(Bloom::<Hash>::num_bits(100f64, 0.1f64) as u64, 480u64);
assert_eq!(Bloom::<Hash>::num_bits(100f64, 0.01f64) as u64, 959u64);
assert_eq!(Bloom::<Hash>::num_keys(1000f64, 50f64) as u64, 14u64);
assert_eq!(Bloom::<Hash>::num_keys(2000f64, 50f64) as u64, 28u64);
assert_eq!(Bloom::<Hash>::num_keys(2000f64, 25f64) as u64, 55u64);
}
} }