From 97d57d168b12c20c94e6dc8c6b844a06ff97cb06 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Tue, 13 Aug 2019 10:29:26 -0700 Subject: [PATCH] Revert "Fix gossip messages growing beyond blob size (#5460)" (#5512) This reverts commit a8eb0409b74f6d71f30495ab475e6118862b3886. --- core/src/cluster_info.rs | 69 ++++------ core/src/crds_gossip.rs | 13 +- core/src/crds_gossip_pull.rs | 236 ++++++----------------------------- core/src/crds_gossip_push.rs | 4 +- core/src/packet.rs | 4 +- core/src/result.rs | 1 - core/tests/crds_gossip.rs | 24 ++-- runtime/src/bloom.rs | 42 ++----- 8 files changed, 92 insertions(+), 301 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 4562752d12..da1c49ee4f 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -17,7 +17,7 @@ use crate::blocktree::Blocktree; use crate::contact_info::ContactInfo; use crate::crds_gossip::CrdsGossip; use crate::crds_gossip_error::CrdsGossipError; -use crate::crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}; +use crate::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; use crate::crds_value::{CrdsValue, CrdsValueLabel, EpochSlots, Vote}; use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE}; use crate::repair_service::RepairType; @@ -25,7 +25,7 @@ use crate::result::Result; use crate::staking_utils; use crate::streamer::{BlobReceiver, BlobSender}; use crate::weighted_shuffle::weighted_shuffle; -use bincode::{deserialize, serialize, serialized_size}; +use bincode::{deserialize, serialize}; use core::cmp; use itertools::Itertools; use rand::SeedableRng; @@ -36,7 +36,8 @@ use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_err use solana_netutil::{ bind_in_range, bind_to, find_available_port_in_range, multi_bind_in_range, PortRange, }; -use solana_sdk::packet::PACKET_DATA_SIZE; +use solana_runtime::bloom::Bloom; +use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature}; use solana_sdk::timing::{duration_as_ms, timestamp}; @@ -156,7 +157,7 @@ impl Signable for PruneData { #[allow(clippy::large_enum_variant)] enum Protocol { /// Gossip protocol messages - PullRequest(CrdsFilter, CrdsValue), + PullRequest(Bloom, CrdsValue), PullResponse(Pubkey, Vec), PushMessage(Pubkey, Vec), PruneMessage(Pubkey, PruneData), @@ -831,7 +832,7 @@ impl ClusterInfo { } } // If the network entrypoint hasn't been discovered yet, add it to the crds table - fn add_entrypoint(&mut self, pulls: &mut Vec<(Pubkey, CrdsFilter, SocketAddr, CrdsValue)>) { + fn add_entrypoint(&mut self, pulls: &mut Vec<(Pubkey, Bloom, SocketAddr, CrdsValue)>) { match &self.entrypoint { Some(entrypoint) => { let self_info = self @@ -840,13 +841,12 @@ impl ClusterInfo { .lookup(&CrdsValueLabel::ContactInfo(self.id())) .unwrap_or_else(|| panic!("self_id invalid {}", self.id())); - self.gossip - .pull - .build_crds_filters(&self.gossip.crds, Self::max_bloom_size()) - .into_iter() - .for_each(|filter| { - pulls.push((entrypoint.id, filter, entrypoint.gossip, self_info.clone())) - }) + pulls.push(( + entrypoint.id, + self.gossip.pull.build_crds_filter(&self.gossip.crds), + entrypoint.gossip, + self_info.clone(), + )) } None => (), } @@ -875,45 +875,30 @@ impl ClusterInfo { messages } - // computes the maximum size for pull request blooms - pub fn max_bloom_size() -> usize { - let filter_size = serialized_size(&CrdsFilter::default()) - .expect("unable to serialize default filter") as usize; - let protocol = Protocol::PullRequest( - CrdsFilter::default(), - CrdsValue::ContactInfo(ContactInfo::default()), - ); - let protocol_size = - serialized_size(&protocol).expect("unable to serialize gossip protocol") as usize; - PACKET_DATA_SIZE - (protocol_size - filter_size) - } - fn new_pull_requests(&mut self, stakes: &HashMap) -> Vec<(SocketAddr, Protocol)> { let now = timestamp(); - let mut pulls: Vec<_> = self + let pulls: Vec<_> = self .gossip - .new_pull_request(now, stakes, Self::max_bloom_size()) + .new_pull_request(now, stakes) .ok() .into_iter() - .filter_map(|(peer, filters, me)| { + .collect(); + + let mut pr: Vec<_> = pulls + .into_iter() + .filter_map(|(peer, filter, self_info)| { let peer_label = CrdsValueLabel::ContactInfo(peer); self.gossip .crds .lookup(&peer_label) .and_then(CrdsValue::contact_info) - .map(move |peer_info| { - filters - .into_iter() - .map(move |f| (peer, f, peer_info.gossip, me.clone())) - }) + .map(|peer_info| (peer, filter, peer_info.gossip, self_info)) }) - .flatten() .collect(); - if pulls.is_empty() { - self.add_entrypoint(&mut pulls); + if pr.is_empty() { + self.add_entrypoint(&mut pr); } - pulls - .into_iter() + pr.into_iter() .map(|(peer, filter, gossip, self_info)| { self.gossip.mark_pull_request_creation_time(&peer, now); (gossip, Protocol::PullRequest(filter, self_info)) @@ -1108,7 +1093,7 @@ impl ClusterInfo { fn handle_pull_request( me: &Arc>, - filter: CrdsFilter, + filter: Bloom, caller: CrdsValue, from_addr: &SocketAddr, ) -> Vec { @@ -2078,7 +2063,7 @@ mod tests { let (_, _, val) = cluster_info .gossip - .new_pull_request(timestamp(), &HashMap::new(), ClusterInfo::max_bloom_size()) + .new_pull_request(timestamp(), &HashMap::new()) .ok() .unwrap(); assert!(val.verify()); @@ -2266,7 +2251,7 @@ mod tests { let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp()); cluster_info.set_entrypoint(entrypoint.clone()); let pulls = cluster_info.new_pull_requests(&HashMap::new()); - assert_eq!(1, pulls.len() as u64); + assert_eq!(1, pulls.len()); match pulls.get(0) { Some((addr, msg)) => { assert_eq!(*addr, entrypoint.gossip); @@ -2293,7 +2278,7 @@ mod tests { .write() .unwrap() .new_pull_requests(&HashMap::new()); - assert_eq!(1, pulls.len() as u64); + assert_eq!(1, pulls.len()); assert_eq!(cluster_info.read().unwrap().entrypoint, Some(entrypoint)); } diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 88feab7af7..4617b6fcbd 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -5,15 +5,17 @@ use crate::crds::{Crds, VersionedCrdsValue}; use crate::crds_gossip_error::CrdsGossipError; -use crate::crds_gossip_pull::{CrdsFilter, CrdsGossipPull}; +use crate::crds_gossip_pull::CrdsGossipPull; use crate::crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE}; use crate::crds_value::{CrdsValue, CrdsValueLabel}; +use solana_runtime::bloom::Bloom; +use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signable; use std::collections::{HashMap, HashSet}; ///The min size for bloom filters -pub const CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS: usize = 500; +pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000; #[derive(Clone)] pub struct CrdsGossip { @@ -131,10 +133,9 @@ impl CrdsGossip { &self, now: u64, stakes: &HashMap, - bloom_size: usize, - ) -> Result<(Pubkey, Vec, CrdsValue), CrdsGossipError> { + ) -> Result<(Pubkey, Bloom, CrdsValue), CrdsGossipError> { self.pull - .new_pull_request(&self.crds, &self.id, now, stakes, bloom_size) + .new_pull_request(&self.crds, &self.id, now, stakes) } /// time when a request to `from` was initiated @@ -148,7 +149,7 @@ impl CrdsGossip { pub fn process_pull_request( &mut self, caller: CrdsValue, - filter: CrdsFilter, + filter: Bloom, now: u64, ) -> Vec { self.pull diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index aeab9b0c5f..94d4b6203a 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -11,12 +11,13 @@ use crate::contact_info::ContactInfo; use crate::crds::Crds; -use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS}; +use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_BLOOM_SIZE}; use crate::crds_gossip_error::CrdsGossipError; use crate::crds_value::{CrdsValue, CrdsValueLabel}; +use crate::packet::BLOB_DATA_SIZE; +use bincode::serialized_size; use rand; use rand::distributions::{Distribution, WeightedIndex}; -use rand::Rng; use solana_runtime::bloom::Bloom; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; @@ -25,78 +26,6 @@ use std::collections::HashMap; use std::collections::VecDeque; pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; -pub const KEYS: f64 = 8f64; -pub const FALSE_RATE: f64 = 0.01f64; - -#[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq)] -pub struct CrdsFilter { - pub filter: Bloom, - mask: u64, -} - -impl CrdsFilter { - pub fn new_rand(num_items: usize, max_bytes: usize) -> Self { - let max_bits = (max_bytes * 8) as f64; - let max_items = Self::max_items(max_bits, FALSE_RATE, KEYS); - let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize); - let mask_bits = Self::mask_bits(num_items as f64, max_items as f64); - let seed: u64 = rand::thread_rng().gen_range(0, 2u64.pow(mask_bits)); - let mask = Self::compute_mask(seed, mask_bits); - CrdsFilter { filter, mask } - } - // generates a vec of filters that together hold a complete set of Hashes - pub fn new_complete_set(num_items: usize, max_bytes: usize) -> Vec { - let max_bits = (max_bytes * 8) as f64; - let max_items = Self::max_items(max_bits, FALSE_RATE, KEYS); - let mask_bits = Self::mask_bits(num_items as f64, max_items as f64); - // for each possible mask combination, generate a new filter. - let mut filters = vec![]; - for seed in 0..2u64.pow(mask_bits) { - let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize); - let mask = Self::compute_mask(seed, mask_bits); - filters.push(CrdsFilter { filter, mask }) - } - filters - } - fn compute_mask(seed: u64, mask_bits: u32) -> u64 { - assert!(seed <= 2u64.pow(mask_bits)); - let seed: u64 = seed.checked_shl(64 - mask_bits).unwrap_or(0x0); - seed | (!0u64).checked_shr(mask_bits).unwrap_or(!0x0) as u64 - } - pub fn max_items(max_bits: f64, false_rate: f64, num_keys: f64) -> f64 { - let m = max_bits; - let p = false_rate; - let k = num_keys; - (m / (-k / (1f64 - (p.ln() / k).exp()).ln())).ceil() - } - fn mask_bits(num_items: f64, max_items: f64) -> u32 { - // for small ratios this can result in a negative number, ensure it returns 0 instead - ((num_items / max_items).log2().ceil()).max(0.0) as u32 - } - fn hash_as_u64(item: &Hash) -> u64 { - let arr = item.as_ref(); - let mut accum = 0; - for (i, val) in arr.iter().enumerate().take(8) { - accum |= (u64::from(*val)) << (i * 8) as u64; - } - accum - } - pub fn test_mask(&self, item: &Hash) -> bool { - let bits = Self::hash_as_u64(item); - (bits & self.mask) == bits - } - pub fn add(&mut self, item: &Hash) { - if self.test_mask(item) { - self.filter.add(item); - } - } - pub fn contains(&self, item: &Hash) -> bool { - if !self.test_mask(item) { - return true; - } - self.filter.contains(item) - } -} #[derive(Clone)] pub struct CrdsGossipPull { @@ -104,6 +33,8 @@ pub struct CrdsGossipPull { pub pull_request_time: HashMap, /// hash and insert time purged_values: VecDeque<(Hash, u64)>, + /// max bytes per message + pub max_bytes: usize, pub crds_timeout: u64, } @@ -112,6 +43,7 @@ impl Default for CrdsGossipPull { Self { purged_values: VecDeque::new(), pull_request_time: HashMap::new(), + max_bytes: BLOB_DATA_SIZE, crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, } } @@ -124,19 +56,18 @@ impl CrdsGossipPull { self_id: &Pubkey, now: u64, stakes: &HashMap, - bloom_size: usize, - ) -> Result<(Pubkey, Vec, CrdsValue), CrdsGossipError> { + ) -> Result<(Pubkey, Bloom, CrdsValue), CrdsGossipError> { let options = self.pull_options(crds, &self_id, now, stakes); if options.is_empty() { return Err(CrdsGossipError::NoPeers); } - let filters = self.build_crds_filters(crds, bloom_size); + let filter = self.build_crds_filter(crds); let index = WeightedIndex::new(options.iter().map(|weighted| weighted.0)).unwrap(); let random = index.sample(&mut rand::thread_rng()); let self_info = crds .lookup(&CrdsValueLabel::ContactInfo(*self_id)) .unwrap_or_else(|| panic!("self_id invalid {}", self_id)); - Ok((options[random].1.id, filters, self_info.clone())) + Ok((options[random].1.id, filter, self_info.clone())) } fn pull_options<'a>( @@ -179,10 +110,10 @@ impl CrdsGossipPull { &mut self, crds: &mut Crds, caller: CrdsValue, - filter: CrdsFilter, + mut filter: Bloom, now: u64, ) -> Vec { - let rv = self.filter_crds_values(crds, &filter); + let rv = self.filter_crds_values(crds, &mut filter); let key = caller.label().pubkey(); let old = crds.insert(caller, now); if let Some(val) = old.ok().and_then(|opt| opt) { @@ -216,31 +147,33 @@ impl CrdsGossipPull { crds.update_record_timestamp(from, now); failed } - // build a set of filters of the current crds table - // num_filters - used to increase the likely hood of a value in crds being added to some filter - pub fn build_crds_filters(&self, crds: &Crds, bloom_size: usize) -> Vec { + /// build a filter of the current crds table + pub fn build_crds_filter(&self, crds: &Crds) -> Bloom { let num = cmp::max( - CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS, + CRDS_GOSSIP_BLOOM_SIZE, crds.table.values().count() + self.purged_values.len(), ); - let mut filters = CrdsFilter::new_complete_set(num, bloom_size); + let mut bloom = Bloom::random(num, 0.1, 4 * 1024 * 8 - 1); for v in crds.table.values() { - filters - .iter_mut() - .for_each(|filter| filter.add(&v.value_hash)); + bloom.add(&v.value_hash); } for (value_hash, _insert_timestamp) in &self.purged_values { - filters.iter_mut().for_each(|filter| filter.add(value_hash)); + bloom.add(value_hash); } - filters + bloom } /// filter values that fail the bloom filter up to max_bytes - fn filter_crds_values(&self, crds: &Crds, filter: &CrdsFilter) -> Vec { + fn filter_crds_values(&self, crds: &Crds, filter: &mut Bloom) -> Vec { + let mut max_bytes = self.max_bytes as isize; let mut ret = vec![]; for v in crds.table.values() { if filter.contains(&v.value_hash) { continue; } + max_bytes -= serialized_size(&v.value).unwrap() as isize; + if max_bytes < 0 { + break; + } ret.push(v.value.clone()); } ret @@ -276,9 +209,6 @@ impl CrdsGossipPull { mod test { use super::*; use crate::contact_info::ContactInfo; - use itertools::Itertools; - use solana_sdk::hash::hash; - use solana_sdk::packet::PACKET_DATA_SIZE; #[test] fn test_new_pull_with_stakes() { @@ -311,19 +241,19 @@ mod test { let id = entry.label().pubkey(); let node = CrdsGossipPull::default(); assert_eq!( - node.new_pull_request(&crds, &id, 0, &HashMap::new(), PACKET_DATA_SIZE), + node.new_pull_request(&crds, &id, 0, &HashMap::new()), Err(CrdsGossipError::NoPeers) ); crds.insert(entry.clone(), 0).unwrap(); assert_eq!( - node.new_pull_request(&crds, &id, 0, &HashMap::new(), PACKET_DATA_SIZE), + node.new_pull_request(&crds, &id, 0, &HashMap::new()), Err(CrdsGossipError::NoPeers) ); let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0)); crds.insert(new.clone(), 0).unwrap(); - let req = node.new_pull_request(&crds, &id, 0, &HashMap::new(), PACKET_DATA_SIZE); + let req = node.new_pull_request(&crds, &id, 0, &HashMap::new()); let (to, _, self_info) = req.unwrap(); assert_eq!(to, new.label().pubkey()); assert_eq!(self_info, entry); @@ -346,13 +276,7 @@ mod test { // odds of getting the other request should be 1 in u64::max_value() for _ in 0..10 { - let req = node.new_pull_request( - &crds, - &node_pubkey, - u64::max_value(), - &HashMap::new(), - PACKET_DATA_SIZE, - ); + let req = node.new_pull_request(&crds, &node_pubkey, u64::max_value(), &HashMap::new()); let (to, _, self_info) = req.unwrap(); assert_eq!(to, old.label().pubkey()); assert_eq!(self_info, entry); @@ -368,21 +292,13 @@ mod test { node_crds.insert(entry.clone(), 0).unwrap(); let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0)); node_crds.insert(new.clone(), 0).unwrap(); - let req = node.new_pull_request( - &node_crds, - &node_pubkey, - 0, - &HashMap::new(), - PACKET_DATA_SIZE, - ); + let req = node.new_pull_request(&node_crds, &node_pubkey, 0, &HashMap::new()); let mut dest_crds = Crds::default(); let mut dest = CrdsGossipPull::default(); - let (_, filters, caller) = req.unwrap(); - for filter in filters.into_iter() { - let rsp = dest.process_pull_request(&mut dest_crds, caller.clone(), filter, 1); - assert!(rsp.is_empty()); - } + let (_, filter, caller) = req.unwrap(); + let rsp = dest.process_pull_request(&mut dest_crds, caller.clone(), filter, 1); + assert!(rsp.is_empty()); assert!(dest_crds.lookup(&caller.label()).is_some()); assert_eq!( dest_crds @@ -431,27 +347,15 @@ mod test { let mut done = false; for _ in 0..30 { // there is a chance of a false positive with bloom filters - let req = node.new_pull_request( - &node_crds, - &node_pubkey, - 0, - &HashMap::new(), - PACKET_DATA_SIZE, - ); - let (_, filters, caller) = req.unwrap(); - let mut rsp = vec![]; - for filter in filters { - rsp = dest.process_pull_request(&mut dest_crds, caller.clone(), filter, 0); - // if there is a false positive this is empty - // prob should be around 0.1 per iteration - if rsp.is_empty() { - continue; - } - } - + let req = node.new_pull_request(&node_crds, &node_pubkey, 0, &HashMap::new()); + let (_, filter, caller) = req.unwrap(); + let rsp = dest.process_pull_request(&mut dest_crds, caller, filter, 0); + // if there is a false positive this is empty + // prob should be around 0.1 per iteration if rsp.is_empty() { continue; } + assert_eq!(rsp.len(), 1); let failed = node.process_pull_response(&mut node_crds, &node_pubkey, rsp, 1); assert_eq!(failed, 0); @@ -502,72 +406,12 @@ mod test { // there is a chance of a false positive with bloom filters // assert that purged value is still in the set // chance of 30 consecutive false positives is 0.1^30 - let filters = node.build_crds_filters(&node_crds, PACKET_DATA_SIZE); - assert!(filters.iter().any(|filter| filter.contains(&value_hash))); + let filter = node.build_crds_filter(&node_crds); + assert!(filter.contains(&value_hash)); } // purge the value node.purge_purged(1); assert_eq!(node.purged_values.len(), 0); } - #[test] - fn test_crds_filter_mask() { - let filter = CrdsFilter::new_rand(1, 128); - assert_eq!(filter.mask, !0x0); - assert_eq!(CrdsFilter::max_items(80f64, 0.01, 8f64), 9f64); - //1000/9 = 111, so 7 bits are needed to mask it - assert_eq!(CrdsFilter::mask_bits(1000f64, 9f64), 7u32); - let filter = CrdsFilter::new_rand(1000, 10); - assert_eq!(filter.mask & 0x00ffffffff, 0x00ffffffff); - } - #[test] - fn test_crds_filter_add_no_mask() { - let mut filter = CrdsFilter::new_rand(1, 128); - let h: Hash = hash(Hash::default().as_ref()); - assert!(!filter.contains(&h)); - filter.add(&h); - assert!(filter.contains(&h)); - let h: Hash = hash(h.as_ref()); - assert!(!filter.contains(&h)); - } - #[test] - fn test_crds_filter_add_mask() { - let mut filter = CrdsFilter::new_rand(1000, 10); - let mut h: Hash = Hash::default(); - while !filter.test_mask(&h) { - h = hash(h.as_ref()); - } - assert!(filter.test_mask(&h)); - //if the mask succeeds, we want the guaranteed negative - assert!(!filter.contains(&h)); - filter.add(&h); - assert!(filter.contains(&h)); - } - #[test] - fn test_crds_filter_contains_mask() { - let filter = CrdsFilter::new_rand(1000, 10); - let mut h: Hash = Hash::default(); - while filter.test_mask(&h) { - h = hash(h.as_ref()); - } - assert!(!filter.test_mask(&h)); - //if the mask fails, the hash is contained in the set, and can be treated as a false - //positive - assert!(filter.contains(&h)); - } - #[test] - fn test_mask() { - for i in 0..16 { - run_test_mask(i); - } - } - - fn run_test_mask(mask_bits: u32) { - let masks: Vec<_> = (0..2u64.pow(mask_bits)) - .into_iter() - .map(|seed| CrdsFilter::compute_mask(seed, mask_bits)) - .dedup() - .collect(); - assert_eq!(masks.len(), 2u64.pow(mask_bits) as usize) - } } diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index 90a0866db0..8d61c4694a 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -10,7 +10,7 @@ use crate::contact_info::ContactInfo; use crate::crds::{Crds, VersionedCrdsValue}; -use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS}; +use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_BLOOM_SIZE}; use crate::crds_gossip_error::CrdsGossipError; use crate::crds_value::{CrdsValue, CrdsValueLabel}; use crate::packet::BLOB_DATA_SIZE; @@ -258,7 +258,7 @@ impl CrdsGossipPush { if new_items.get(&item.id).is_some() { continue; } - let size = cmp::max(CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS, network_size); + let size = cmp::max(CRDS_GOSSIP_BLOOM_SIZE, network_size); let mut bloom = Bloom::random(size, 0.1, 1024 * 8 * 4); bloom.add(&item.id); new_items.insert(item.id, bloom); diff --git a/core/src/packet.rs b/core/src/packet.rs index 9b1f30b48a..1eeff91076 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -335,9 +335,7 @@ pub fn to_blob(resp: T, rsp_addr: SocketAddr) -> Result { let mut b = Blob::default(); let v = bincode::serialize(&resp)?; let len = v.len(); - if len > BLOB_SIZE { - return Err(Error::ToBlobError); - } + assert!(len <= BLOB_SIZE); b.data[..len].copy_from_slice(&v); b.meta.size = len; b.meta.set_addr(&rsp_addr); diff --git a/core/src/result.rs b/core/src/result.rs index 6b1615da43..aab1e890e7 100644 --- a/core/src/result.rs +++ b/core/src/result.rs @@ -31,7 +31,6 @@ pub enum Error { PohRecorderError(poh_recorder::PohRecorderError), BlocktreeError(blocktree::BlocktreeError), FsExtra(fs_extra::error::Error), - ToBlobError, } pub type Result = std::result::Result; diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 55da346e22..adc398f6cd 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -1,7 +1,6 @@ use bincode::serialized_size; use log::*; use rayon::prelude::*; -use solana::cluster_info::ClusterInfo; use solana::contact_info::ContactInfo; use solana::crds_gossip::*; use solana::crds_gossip_error::CrdsGossipError; @@ -381,36 +380,27 @@ fn network_run_pull( .filter_map(|from| { from.lock() .unwrap() - .new_pull_request(now, &HashMap::new(), ClusterInfo::max_bloom_size()) + .new_pull_request(now, &HashMap::new()) .ok() }) .collect() }; let transfered: Vec<_> = requests .into_par_iter() - .map(|(to, filters, caller_info)| { + .map(|(to, request, caller_info)| { let mut bytes: usize = 0; let mut msgs: usize = 0; let mut overhead: usize = 0; let from = caller_info.label().pubkey(); - bytes += filters.iter().map(|f| f.filter.keys.len()).sum::(); - bytes += filters - .iter() - .map(|f| f.filter.bits.len() as usize / 8) - .sum::(); + bytes += request.keys.len(); + bytes += (request.bits.len() / 8) as usize; bytes += serialized_size(&caller_info).unwrap() as usize; let rsp = network .get(&to) .map(|node| { - let mut rsp = vec![]; - for filter in filters { - rsp.append(&mut node.lock().unwrap().process_pull_request( - caller_info.clone(), - filter, - now, - )); - } - rsp + node.lock() + .unwrap() + .process_pull_request(caller_info, request, now) }) .unwrap(); bytes += serialized_size(&rsp).unwrap() as usize; diff --git a/runtime/src/bloom.rs b/runtime/src/bloom.rs index 80dab4e19b..ede157eb6e 100644 --- a/runtime/src/bloom.rs +++ b/runtime/src/bloom.rs @@ -31,27 +31,19 @@ impl Bloom { _phantom: PhantomData::default(), } } - /// create filter optimal for num size given the `FALSE_RATE` + /// create filter optimal for num size given the `false_rate` /// the keys are randomized for picking data out of a collision resistant hash of size /// `keysize` bytes /// https://hur.st/bloomfilter/ - pub fn random(num_items: usize, false_rate: f64, max_bits: usize) -> Self { - let m = Self::num_bits(num_items as f64, false_rate); - let num_bits = cmp::max(1, cmp::min(m as usize, max_bits)); - let num_keys = Self::num_keys(num_bits as f64, num_items as f64) as usize; + pub fn random(num: usize, false_rate: f64, max_bits: usize) -> Self { + let min_num_bits = ((num as f64 * false_rate.log(2f64)) + / (1f64 / 2f64.powf(2f64.log(2f64))).log(2f64)) + .ceil() as usize; + let num_bits = cmp::max(1, cmp::min(min_num_bits, max_bits)); + let num_keys = ((num_bits as f64 / num as f64) * 2f64.log(2f64)).round() as usize; let keys: Vec = (0..num_keys).map(|_| rand::thread_rng().gen()).collect(); Self::new(num_bits, keys) } - pub fn num_bits(num_items: f64, false_rate: f64) -> f64 { - let n = num_items; - let p = false_rate; - ((n * p.ln()) / (1f64 / 2f64.powf(2f64.ln())).ln()).ceil() - } - pub fn num_keys(num_bits: f64, num_items: f64) -> f64 { - let n = num_items; - let m = num_bits; - ((m / n) * 2f64.ln()).round() - } fn pos(&self, key: &T, k: u64) -> u64 { key.hash_at_index(k) % self.bits.len() } @@ -106,7 +98,7 @@ mod test { //normal let bloom: Bloom = Bloom::random(10, 0.1, 100); assert_eq!(bloom.keys.len(), 3); - assert_eq!(bloom.bits.len(), 48); + assert_eq!(bloom.bits.len(), 34); //saturated let bloom: Bloom = Bloom::random(100, 0.1, 100); @@ -137,22 +129,4 @@ mod test { b2.keys.sort(); assert_ne!(b1.keys, b2.keys); } - // Bloom filter math in python - // n number of items - // p false rate - // m number of bits - // k number of keys - // - // n = ceil(m / (-k / log(1 - exp(log(p) / k)))) - // p = pow(1 - exp(-k / (m / n)), k) - // m = ceil((n * log(p)) / log(1 / pow(2, log(2)))); - // k = round((m / n) * log(2)); - #[test] - fn test_filter_math() { - assert_eq!(Bloom::::num_bits(100f64, 0.1f64) as u64, 480u64); - assert_eq!(Bloom::::num_bits(100f64, 0.01f64) as u64, 959u64); - assert_eq!(Bloom::::num_keys(1000f64, 50f64) as u64, 14u64); - assert_eq!(Bloom::::num_keys(2000f64, 50f64) as u64, 28u64); - assert_eq!(Bloom::::num_keys(2000f64, 25f64) as u64, 55u64); - } }