diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index da1c49ee4f..91de03c797 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -17,7 +17,7 @@ use crate::blocktree::Blocktree; use crate::contact_info::ContactInfo; use crate::crds_gossip::CrdsGossip; use crate::crds_gossip_error::CrdsGossipError; -use crate::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; +use crate::crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}; use crate::crds_value::{CrdsValue, CrdsValueLabel, EpochSlots, Vote}; use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE}; use crate::repair_service::RepairType; @@ -25,7 +25,7 @@ use crate::result::Result; use crate::staking_utils; use crate::streamer::{BlobReceiver, BlobSender}; use crate::weighted_shuffle::weighted_shuffle; -use bincode::{deserialize, serialize}; +use bincode::{deserialize, serialize, serialized_size}; use core::cmp; use itertools::Itertools; use rand::SeedableRng; @@ -36,8 +36,7 @@ use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_err use solana_netutil::{ bind_in_range, bind_to, find_available_port_in_range, multi_bind_in_range, PortRange, }; -use solana_runtime::bloom::Bloom; -use solana_sdk::hash::Hash; +use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature}; use solana_sdk::timing::{duration_as_ms, timestamp}; @@ -63,8 +62,8 @@ pub const GOSSIP_SLEEP_MILLIS: u64 = 100; /// the number of slots to respond with when responding to `Orphan` requests pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10; -/// Allow protocol messages to carry only 1KB of data a time -const TARGET_PROTOCOL_PAYLOAD_SIZE: u64 = 1024; +/// The maximum size of a protocol payload +const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64; #[derive(Debug, PartialEq, Eq)] pub enum ClusterInfoError { @@ -157,7 +156,7 @@ impl Signable for PruneData { #[allow(clippy::large_enum_variant)] enum Protocol { /// Gossip protocol messages - PullRequest(Bloom, CrdsValue), + PullRequest(CrdsFilter, CrdsValue), PullResponse(Pubkey, Vec), PushMessage(Pubkey, Vec), PruneMessage(Pubkey, PruneData), @@ -832,7 +831,7 @@ impl ClusterInfo { } } // If the network entrypoint hasn't been discovered yet, add it to the crds table - fn add_entrypoint(&mut self, pulls: &mut Vec<(Pubkey, Bloom, SocketAddr, CrdsValue)>) { + fn add_entrypoint(&mut self, pulls: &mut Vec<(Pubkey, CrdsFilter, SocketAddr, CrdsValue)>) { match &self.entrypoint { Some(entrypoint) => { let self_info = self @@ -841,12 +840,13 @@ impl ClusterInfo { .lookup(&CrdsValueLabel::ContactInfo(self.id())) .unwrap_or_else(|| panic!("self_id invalid {}", self.id())); - pulls.push(( - entrypoint.id, - self.gossip.pull.build_crds_filter(&self.gossip.crds), - entrypoint.gossip, - self_info.clone(), - )) + self.gossip + .pull + .build_crds_filters(&self.gossip.crds, Self::max_bloom_size()) + .into_iter() + .for_each(|filter| { + pulls.push((entrypoint.id, filter, entrypoint.gossip, self_info.clone())) + }) } None => (), } @@ -862,43 +862,65 @@ impl ClusterInfo { let mut size = 0; let mut payload = vec![]; while let Some(msg) = msgs.pop() { - // always put at least one msg. The PROTOCOL_PAYLOAD_SIZE is not a hard limit let msg_size = msg.size(); - size += msg_size; - payload.push(msg); - if size > TARGET_PROTOCOL_PAYLOAD_SIZE { + if size + msg_size > MAX_PROTOCOL_PAYLOAD_SIZE as u64 { + if msg_size < MAX_PROTOCOL_PAYLOAD_SIZE as u64 { + msgs.push(msg); + } else { + warn!( + "dropping message larger than the maximum payload size {:?}", + msg + ); + } break; } + size += msg_size; + payload.push(msg); } messages.push(payload); } messages } + // computes the maximum size for pull request blooms + pub fn max_bloom_size() -> usize { + let filter_size = serialized_size(&CrdsFilter::default()) + .expect("unable to serialize default filter") as usize; + let protocol = Protocol::PullRequest( + CrdsFilter::default(), + CrdsValue::ContactInfo(ContactInfo::default()), + ); + let protocol_size = + serialized_size(&protocol).expect("unable to serialize gossip protocol") as usize; + PACKET_DATA_SIZE - (protocol_size - filter_size) + } + fn new_pull_requests(&mut self, stakes: &HashMap) -> Vec<(SocketAddr, Protocol)> { let now = timestamp(); - let pulls: Vec<_> = self + let mut pulls: Vec<_> = self .gossip - .new_pull_request(now, stakes) + .new_pull_request(now, stakes, Self::max_bloom_size()) .ok() .into_iter() - .collect(); - - let mut pr: Vec<_> = pulls - .into_iter() - .filter_map(|(peer, filter, self_info)| { + .filter_map(|(peer, filters, me)| { let peer_label = CrdsValueLabel::ContactInfo(peer); self.gossip .crds .lookup(&peer_label) .and_then(CrdsValue::contact_info) - .map(|peer_info| (peer, filter, peer_info.gossip, self_info)) + .map(move |peer_info| { + filters + .into_iter() + .map(move |f| (peer, f, peer_info.gossip, me.clone())) + }) }) + .flatten() .collect(); - if pr.is_empty() { - self.add_entrypoint(&mut pr); + if pulls.is_empty() { + self.add_entrypoint(&mut pulls); } - pr.into_iter() + pulls + .into_iter() .map(|(peer, filter, gossip, self_info)| { self.gossip.mark_pull_request_creation_time(&peer, now); (gossip, Protocol::PullRequest(filter, self_info)) @@ -1093,7 +1115,7 @@ impl ClusterInfo { fn handle_pull_request( me: &Arc>, - filter: Bloom, + filter: CrdsFilter, caller: CrdsValue, from_addr: &SocketAddr, ) -> Vec { @@ -2063,7 +2085,7 @@ mod tests { let (_, _, val) = cluster_info .gossip - .new_pull_request(timestamp(), &HashMap::new()) + .new_pull_request(timestamp(), &HashMap::new(), ClusterInfo::max_bloom_size()) .ok() .unwrap(); assert!(val.verify()); @@ -2251,7 +2273,7 @@ mod tests { let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp()); cluster_info.set_entrypoint(entrypoint.clone()); let pulls = cluster_info.new_pull_requests(&HashMap::new()); - assert_eq!(1, pulls.len()); + assert_eq!(1, pulls.len() as u64); match pulls.get(0) { Some((addr, msg)) => { assert_eq!(*addr, entrypoint.gossip); @@ -2278,7 +2300,7 @@ mod tests { .write() .unwrap() .new_pull_requests(&HashMap::new()); - assert_eq!(1, pulls.len()); + assert_eq!(1, pulls.len() as u64); assert_eq!(cluster_info.read().unwrap().entrypoint, Some(entrypoint)); } @@ -2307,7 +2329,7 @@ mod tests { fn test_split_messages(value: CrdsValue) { const NUM_VALUES: usize = 30; let value_size = value.size(); - let expected_len = NUM_VALUES / (TARGET_PROTOCOL_PAYLOAD_SIZE / value_size).max(1) as usize; + let expected_len = NUM_VALUES / (MAX_PROTOCOL_PAYLOAD_SIZE / value_size).max(1) as usize; let msgs = vec![value; NUM_VALUES]; let split = ClusterInfo::split_gossip_messages(msgs); diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 4617b6fcbd..88feab7af7 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -5,17 +5,15 @@ use crate::crds::{Crds, VersionedCrdsValue}; use crate::crds_gossip_error::CrdsGossipError; -use crate::crds_gossip_pull::CrdsGossipPull; +use crate::crds_gossip_pull::{CrdsFilter, CrdsGossipPull}; use crate::crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE}; use crate::crds_value::{CrdsValue, CrdsValueLabel}; -use solana_runtime::bloom::Bloom; -use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signable; use std::collections::{HashMap, HashSet}; ///The min size for bloom filters -pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000; +pub const CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS: usize = 500; #[derive(Clone)] pub struct CrdsGossip { @@ -133,9 +131,10 @@ impl CrdsGossip { &self, now: u64, stakes: &HashMap, - ) -> Result<(Pubkey, Bloom, CrdsValue), CrdsGossipError> { + bloom_size: usize, + ) -> Result<(Pubkey, Vec, CrdsValue), CrdsGossipError> { self.pull - .new_pull_request(&self.crds, &self.id, now, stakes) + .new_pull_request(&self.crds, &self.id, now, stakes, bloom_size) } /// time when a request to `from` was initiated @@ -149,7 +148,7 @@ impl CrdsGossip { pub fn process_pull_request( &mut self, caller: CrdsValue, - filter: Bloom, + filter: CrdsFilter, now: u64, ) -> Vec { self.pull diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 94d4b6203a..65073cce2c 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -11,13 +11,12 @@ use crate::contact_info::ContactInfo; use crate::crds::Crds; -use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_BLOOM_SIZE}; +use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS}; use crate::crds_gossip_error::CrdsGossipError; use crate::crds_value::{CrdsValue, CrdsValueLabel}; -use crate::packet::BLOB_DATA_SIZE; -use bincode::serialized_size; use rand; use rand::distributions::{Distribution, WeightedIndex}; +use rand::Rng; use solana_runtime::bloom::Bloom; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; @@ -26,6 +25,94 @@ use std::collections::HashMap; use std::collections::VecDeque; pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; +pub const FALSE_RATE: f64 = 0.1f64; + +#[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq)] +pub struct CrdsFilter { + pub filter: Bloom, + mask: u64, + mask_bits: u32, +} + +impl CrdsFilter { + pub fn new_rand(num_items: usize, max_bytes: usize) -> Self { + let max_bits = (max_bytes * 8) as f64; + let num_keys = Bloom::::num_keys(max_bits, num_items as f64); + let max_items = Self::max_items(max_bits, FALSE_RATE, num_keys); + let mask_bits = Self::mask_bits(num_items as f64, max_items as f64); + let keys = (0..num_keys as u64) + .map(|_| rand::thread_rng().gen()) + .collect(); + let filter = Bloom::new(max_bits as usize, keys); + let seed: u64 = rand::thread_rng().gen_range(0, 2u64.pow(mask_bits)); + let mask = Self::compute_mask(seed, mask_bits); + CrdsFilter { + filter, + mask, + mask_bits, + } + } + // generates a vec of filters that together hold a complete set of Hashes + pub fn new_complete_set(num_items: usize, max_bytes: usize) -> Vec { + let max_bits = (max_bytes * 8) as f64; + let num_keys = Bloom::::num_keys(max_bits, num_items as f64); + let max_items = Self::max_items(max_bits, FALSE_RATE, num_keys); + let mask_bits = Self::mask_bits(num_items as f64, max_items as f64); + // for each possible mask combination, generate a new filter. + let mut filters = vec![]; + for seed in 0..2u64.pow(mask_bits) { + let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize); + let mask = Self::compute_mask(seed, mask_bits); + let filter = CrdsFilter { + filter, + mask, + mask_bits, + }; + filters.push(filter) + } + filters + } + fn compute_mask(seed: u64, mask_bits: u32) -> u64 { + assert!(seed <= 2u64.pow(mask_bits)); + let seed: u64 = seed.checked_shl(64 - mask_bits).unwrap_or(0x0); + seed | (!0u64).checked_shr(mask_bits).unwrap_or(!0x0) as u64 + } + pub fn max_items(max_bits: f64, false_rate: f64, num_keys: f64) -> f64 { + let m = max_bits; + let p = false_rate; + let k = num_keys; + (m / (-k / (1f64 - (p.ln() / k).exp()).ln())).ceil() + } + fn mask_bits(num_items: f64, max_items: f64) -> u32 { + // for small ratios this can result in a negative number, ensure it returns 0 instead + ((num_items / max_items).log2().ceil()).max(0.0) as u32 + } + fn hash_as_u64(item: &Hash) -> u64 { + let arr = item.as_ref(); + let mut accum = 0; + for (i, val) in arr.iter().enumerate().take(8) { + accum |= (u64::from(*val)) << (i * 8) as u64; + } + accum + } + pub fn test_mask(&self, item: &Hash) -> bool { + // only consider the highest mask_bits bits from the hash and set the rest to 1. + let ones = (!0u64).checked_shr(self.mask_bits).unwrap_or(!0u64); + let bits = Self::hash_as_u64(item) | ones; + bits == self.mask + } + pub fn add(&mut self, item: &Hash) { + if self.test_mask(item) { + self.filter.add(item); + } + } + pub fn contains(&self, item: &Hash) -> bool { + if !self.test_mask(item) { + return true; + } + self.filter.contains(item) + } +} #[derive(Clone)] pub struct CrdsGossipPull { @@ -33,8 +120,6 @@ pub struct CrdsGossipPull { pub pull_request_time: HashMap, /// hash and insert time purged_values: VecDeque<(Hash, u64)>, - /// max bytes per message - pub max_bytes: usize, pub crds_timeout: u64, } @@ -43,7 +128,6 @@ impl Default for CrdsGossipPull { Self { purged_values: VecDeque::new(), pull_request_time: HashMap::new(), - max_bytes: BLOB_DATA_SIZE, crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, } } @@ -56,18 +140,19 @@ impl CrdsGossipPull { self_id: &Pubkey, now: u64, stakes: &HashMap, - ) -> Result<(Pubkey, Bloom, CrdsValue), CrdsGossipError> { + bloom_size: usize, + ) -> Result<(Pubkey, Vec, CrdsValue), CrdsGossipError> { let options = self.pull_options(crds, &self_id, now, stakes); if options.is_empty() { return Err(CrdsGossipError::NoPeers); } - let filter = self.build_crds_filter(crds); + let filters = self.build_crds_filters(crds, bloom_size); let index = WeightedIndex::new(options.iter().map(|weighted| weighted.0)).unwrap(); let random = index.sample(&mut rand::thread_rng()); let self_info = crds .lookup(&CrdsValueLabel::ContactInfo(*self_id)) .unwrap_or_else(|| panic!("self_id invalid {}", self_id)); - Ok((options[random].1.id, filter, self_info.clone())) + Ok((options[random].1.id, filters, self_info.clone())) } fn pull_options<'a>( @@ -110,15 +195,15 @@ impl CrdsGossipPull { &mut self, crds: &mut Crds, caller: CrdsValue, - mut filter: Bloom, + filter: CrdsFilter, now: u64, ) -> Vec { - let rv = self.filter_crds_values(crds, &mut filter); + let rv = self.filter_crds_values(crds, &filter); let key = caller.label().pubkey(); let old = crds.insert(caller, now); if let Some(val) = old.ok().and_then(|opt| opt) { self.purged_values - .push_back((val.value_hash, val.local_timestamp)) + .push_back((val.value_hash, val.local_timestamp)); } crds.update_record_timestamp(&key, now); rv @@ -147,33 +232,31 @@ impl CrdsGossipPull { crds.update_record_timestamp(from, now); failed } - /// build a filter of the current crds table - pub fn build_crds_filter(&self, crds: &Crds) -> Bloom { + // build a set of filters of the current crds table + // num_filters - used to increase the likely hood of a value in crds being added to some filter + pub fn build_crds_filters(&self, crds: &Crds, bloom_size: usize) -> Vec { let num = cmp::max( - CRDS_GOSSIP_BLOOM_SIZE, + CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS, crds.table.values().count() + self.purged_values.len(), ); - let mut bloom = Bloom::random(num, 0.1, 4 * 1024 * 8 - 1); + let mut filters = CrdsFilter::new_complete_set(num, bloom_size); for v in crds.table.values() { - bloom.add(&v.value_hash); + filters + .iter_mut() + .for_each(|filter| filter.add(&v.value_hash)); } for (value_hash, _insert_timestamp) in &self.purged_values { - bloom.add(value_hash); + filters.iter_mut().for_each(|filter| filter.add(value_hash)); } - bloom + filters } /// filter values that fail the bloom filter up to max_bytes - fn filter_crds_values(&self, crds: &Crds, filter: &mut Bloom) -> Vec { - let mut max_bytes = self.max_bytes as isize; + fn filter_crds_values(&self, crds: &Crds, filter: &CrdsFilter) -> Vec { let mut ret = vec![]; for v in crds.table.values() { if filter.contains(&v.value_hash) { continue; } - max_bytes -= serialized_size(&v.value).unwrap() as isize; - if max_bytes < 0 { - break; - } ret.push(v.value.clone()); } ret @@ -209,6 +292,9 @@ impl CrdsGossipPull { mod test { use super::*; use crate::contact_info::ContactInfo; + use itertools::Itertools; + use solana_sdk::hash::hash; + use solana_sdk::packet::PACKET_DATA_SIZE; #[test] fn test_new_pull_with_stakes() { @@ -241,19 +327,19 @@ mod test { let id = entry.label().pubkey(); let node = CrdsGossipPull::default(); assert_eq!( - node.new_pull_request(&crds, &id, 0, &HashMap::new()), + node.new_pull_request(&crds, &id, 0, &HashMap::new(), PACKET_DATA_SIZE), Err(CrdsGossipError::NoPeers) ); crds.insert(entry.clone(), 0).unwrap(); assert_eq!( - node.new_pull_request(&crds, &id, 0, &HashMap::new()), + node.new_pull_request(&crds, &id, 0, &HashMap::new(), PACKET_DATA_SIZE), Err(CrdsGossipError::NoPeers) ); let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0)); crds.insert(new.clone(), 0).unwrap(); - let req = node.new_pull_request(&crds, &id, 0, &HashMap::new()); + let req = node.new_pull_request(&crds, &id, 0, &HashMap::new(), PACKET_DATA_SIZE); let (to, _, self_info) = req.unwrap(); assert_eq!(to, new.label().pubkey()); assert_eq!(self_info, entry); @@ -276,7 +362,13 @@ mod test { // odds of getting the other request should be 1 in u64::max_value() for _ in 0..10 { - let req = node.new_pull_request(&crds, &node_pubkey, u64::max_value(), &HashMap::new()); + let req = node.new_pull_request( + &crds, + &node_pubkey, + u64::max_value(), + &HashMap::new(), + PACKET_DATA_SIZE, + ); let (to, _, self_info) = req.unwrap(); assert_eq!(to, old.label().pubkey()); assert_eq!(self_info, entry); @@ -292,13 +384,21 @@ mod test { node_crds.insert(entry.clone(), 0).unwrap(); let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0)); node_crds.insert(new.clone(), 0).unwrap(); - let req = node.new_pull_request(&node_crds, &node_pubkey, 0, &HashMap::new()); + let req = node.new_pull_request( + &node_crds, + &node_pubkey, + 0, + &HashMap::new(), + PACKET_DATA_SIZE, + ); let mut dest_crds = Crds::default(); let mut dest = CrdsGossipPull::default(); - let (_, filter, caller) = req.unwrap(); - let rsp = dest.process_pull_request(&mut dest_crds, caller.clone(), filter, 1); - assert!(rsp.is_empty()); + let (_, filters, caller) = req.unwrap(); + for filter in filters.into_iter() { + let rsp = dest.process_pull_request(&mut dest_crds, caller.clone(), filter, 1); + assert!(rsp.is_empty()); + } assert!(dest_crds.lookup(&caller.label()).is_some()); assert_eq!( dest_crds @@ -347,15 +447,27 @@ mod test { let mut done = false; for _ in 0..30 { // there is a chance of a false positive with bloom filters - let req = node.new_pull_request(&node_crds, &node_pubkey, 0, &HashMap::new()); - let (_, filter, caller) = req.unwrap(); - let rsp = dest.process_pull_request(&mut dest_crds, caller, filter, 0); - // if there is a false positive this is empty - // prob should be around 0.1 per iteration + let req = node.new_pull_request( + &node_crds, + &node_pubkey, + 0, + &HashMap::new(), + PACKET_DATA_SIZE, + ); + let (_, filters, caller) = req.unwrap(); + let mut rsp = vec![]; + for filter in filters { + rsp = dest.process_pull_request(&mut dest_crds, caller.clone(), filter, 0); + // if there is a false positive this is empty + // prob should be around 0.1 per iteration + if rsp.is_empty() { + continue; + } + } + if rsp.is_empty() { continue; } - assert_eq!(rsp.len(), 1); let failed = node.process_pull_response(&mut node_crds, &node_pubkey, rsp, 1); assert_eq!(failed, 0); @@ -406,12 +518,89 @@ mod test { // there is a chance of a false positive with bloom filters // assert that purged value is still in the set // chance of 30 consecutive false positives is 0.1^30 - let filter = node.build_crds_filter(&node_crds); - assert!(filter.contains(&value_hash)); + let filters = node.build_crds_filters(&node_crds, PACKET_DATA_SIZE); + assert!(filters.iter().any(|filter| filter.contains(&value_hash))); } // purge the value node.purge_purged(1); assert_eq!(node.purged_values.len(), 0); } + #[test] + fn test_crds_filter_mask() { + let filter = CrdsFilter::new_rand(1, 128); + assert_eq!(filter.mask, !0x0); + assert_eq!(CrdsFilter::max_items(80f64, 0.01, 8f64), 9f64); + //1000/9 = 111, so 7 bits are needed to mask it + assert_eq!(CrdsFilter::mask_bits(1000f64, 9f64), 7u32); + let filter = CrdsFilter::new_rand(1000, 10); + assert_eq!(filter.mask & 0x00ffffffff, 0x00ffffffff); + } + #[test] + fn test_crds_filter_add_no_mask() { + let mut filter = CrdsFilter::new_rand(1, 128); + let h: Hash = hash(Hash::default().as_ref()); + assert!(!filter.contains(&h)); + filter.add(&h); + assert!(filter.contains(&h)); + let h: Hash = hash(h.as_ref()); + assert!(!filter.contains(&h)); + } + #[test] + fn test_crds_filter_add_mask() { + let mut filter = CrdsFilter::new_rand(1000, 10); + let mut h: Hash = Hash::default(); + while !filter.test_mask(&h) { + h = hash(h.as_ref()); + } + assert!(filter.test_mask(&h)); + //if the mask succeeds, we want the guaranteed negative + assert!(!filter.contains(&h)); + filter.add(&h); + assert!(filter.contains(&h)); + } + #[test] + fn test_crds_filter_complete_set_add_mask() { + let mut filters = CrdsFilter::new_complete_set(1000, 10); + assert!(filters.iter().all(|f| f.mask_bits > 0)); + let mut h: Hash = Hash::default(); + // rev to make the hash::default() miss on the first few test_masks + while !filters.iter().rev().any(|f| f.test_mask(&h)) { + h = hash(h.as_ref()); + } + let filter = filters.iter_mut().find(|f| f.test_mask(&h)).unwrap(); + assert!(filter.test_mask(&h)); + //if the mask succeeds, we want the guaranteed negative + assert!(!filter.contains(&h)); + filter.add(&h); + assert!(filter.contains(&h)); + } + #[test] + fn test_crds_filter_contains_mask() { + let filter = CrdsFilter::new_rand(1000, 10); + assert!(filter.mask_bits > 0); + let mut h: Hash = Hash::default(); + while filter.test_mask(&h) { + h = hash(h.as_ref()); + } + assert!(!filter.test_mask(&h)); + //if the mask fails, the hash is contained in the set, and can be treated as a false + //positive + assert!(filter.contains(&h)); + } + #[test] + fn test_mask() { + for i in 0..16 { + run_test_mask(i); + } + } + + fn run_test_mask(mask_bits: u32) { + let masks: Vec<_> = (0..2u64.pow(mask_bits)) + .into_iter() + .map(|seed| CrdsFilter::compute_mask(seed, mask_bits)) + .dedup() + .collect(); + assert_eq!(masks.len(), 2u64.pow(mask_bits) as usize) + } } diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index 8d61c4694a..90a0866db0 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -10,7 +10,7 @@ use crate::contact_info::ContactInfo; use crate::crds::{Crds, VersionedCrdsValue}; -use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_BLOOM_SIZE}; +use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS}; use crate::crds_gossip_error::CrdsGossipError; use crate::crds_value::{CrdsValue, CrdsValueLabel}; use crate::packet::BLOB_DATA_SIZE; @@ -258,7 +258,7 @@ impl CrdsGossipPush { if new_items.get(&item.id).is_some() { continue; } - let size = cmp::max(CRDS_GOSSIP_BLOOM_SIZE, network_size); + let size = cmp::max(CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS, network_size); let mut bloom = Bloom::random(size, 0.1, 1024 * 8 * 4); bloom.add(&item.id); new_items.insert(item.id, bloom); diff --git a/core/src/packet.rs b/core/src/packet.rs index 1eeff91076..9b1f30b48a 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -335,7 +335,9 @@ pub fn to_blob(resp: T, rsp_addr: SocketAddr) -> Result { let mut b = Blob::default(); let v = bincode::serialize(&resp)?; let len = v.len(); - assert!(len <= BLOB_SIZE); + if len > BLOB_SIZE { + return Err(Error::ToBlobError); + } b.data[..len].copy_from_slice(&v); b.meta.size = len; b.meta.set_addr(&rsp_addr); diff --git a/core/src/result.rs b/core/src/result.rs index aab1e890e7..6b1615da43 100644 --- a/core/src/result.rs +++ b/core/src/result.rs @@ -31,6 +31,7 @@ pub enum Error { PohRecorderError(poh_recorder::PohRecorderError), BlocktreeError(blocktree::BlocktreeError), FsExtra(fs_extra::error::Error), + ToBlobError, } pub type Result = std::result::Result; diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index adc398f6cd..55da346e22 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -1,6 +1,7 @@ use bincode::serialized_size; use log::*; use rayon::prelude::*; +use solana::cluster_info::ClusterInfo; use solana::contact_info::ContactInfo; use solana::crds_gossip::*; use solana::crds_gossip_error::CrdsGossipError; @@ -380,27 +381,36 @@ fn network_run_pull( .filter_map(|from| { from.lock() .unwrap() - .new_pull_request(now, &HashMap::new()) + .new_pull_request(now, &HashMap::new(), ClusterInfo::max_bloom_size()) .ok() }) .collect() }; let transfered: Vec<_> = requests .into_par_iter() - .map(|(to, request, caller_info)| { + .map(|(to, filters, caller_info)| { let mut bytes: usize = 0; let mut msgs: usize = 0; let mut overhead: usize = 0; let from = caller_info.label().pubkey(); - bytes += request.keys.len(); - bytes += (request.bits.len() / 8) as usize; + bytes += filters.iter().map(|f| f.filter.keys.len()).sum::(); + bytes += filters + .iter() + .map(|f| f.filter.bits.len() as usize / 8) + .sum::(); bytes += serialized_size(&caller_info).unwrap() as usize; let rsp = network .get(&to) .map(|node| { - node.lock() - .unwrap() - .process_pull_request(caller_info, request, now) + let mut rsp = vec![]; + for filter in filters { + rsp.append(&mut node.lock().unwrap().process_pull_request( + caller_info.clone(), + filter, + now, + )); + } + rsp }) .unwrap(); bytes += serialized_size(&rsp).unwrap() as usize; diff --git a/runtime/src/bloom.rs b/runtime/src/bloom.rs index ede157eb6e..95f3be37f5 100644 --- a/runtime/src/bloom.rs +++ b/runtime/src/bloom.rs @@ -31,19 +31,27 @@ impl Bloom { _phantom: PhantomData::default(), } } - /// create filter optimal for num size given the `false_rate` + /// create filter optimal for num size given the `FALSE_RATE` /// the keys are randomized for picking data out of a collision resistant hash of size /// `keysize` bytes /// https://hur.st/bloomfilter/ - pub fn random(num: usize, false_rate: f64, max_bits: usize) -> Self { - let min_num_bits = ((num as f64 * false_rate.log(2f64)) - / (1f64 / 2f64.powf(2f64.log(2f64))).log(2f64)) - .ceil() as usize; - let num_bits = cmp::max(1, cmp::min(min_num_bits, max_bits)); - let num_keys = ((num_bits as f64 / num as f64) * 2f64.log(2f64)).round() as usize; + pub fn random(num_items: usize, false_rate: f64, max_bits: usize) -> Self { + let m = Self::num_bits(num_items as f64, false_rate); + let num_bits = cmp::max(1, cmp::min(m as usize, max_bits)); + let num_keys = Self::num_keys(num_bits as f64, num_items as f64) as usize; let keys: Vec = (0..num_keys).map(|_| rand::thread_rng().gen()).collect(); Self::new(num_bits, keys) } + pub fn num_bits(num_items: f64, false_rate: f64) -> f64 { + let n = num_items; + let p = false_rate; + ((n * p.ln()) / (1f64 / 2f64.powf(2f64.ln())).ln()).ceil() + } + pub fn num_keys(num_bits: f64, num_items: f64) -> f64 { + let n = num_items; + let m = num_bits; + 1f64.max(((m / n) * 2f64.ln()).round()) + } fn pos(&self, key: &T, k: u64) -> u64 { key.hash_at_index(k) % self.bits.len() } @@ -98,7 +106,7 @@ mod test { //normal let bloom: Bloom = Bloom::random(10, 0.1, 100); assert_eq!(bloom.keys.len(), 3); - assert_eq!(bloom.bits.len(), 34); + assert_eq!(bloom.bits.len(), 48); //saturated let bloom: Bloom = Bloom::random(100, 0.1, 100); @@ -129,4 +137,24 @@ mod test { b2.keys.sort(); assert_ne!(b1.keys, b2.keys); } + // Bloom filter math in python + // n number of items + // p false rate + // m number of bits + // k number of keys + // + // n = ceil(m / (-k / log(1 - exp(log(p) / k)))) + // p = pow(1 - exp(-k / (m / n)), k) + // m = ceil((n * log(p)) / log(1 / pow(2, log(2)))); + // k = round((m / n) * log(2)); + #[test] + fn test_filter_math() { + assert_eq!(Bloom::::num_bits(100f64, 0.1f64) as u64, 480u64); + assert_eq!(Bloom::::num_bits(100f64, 0.01f64) as u64, 959u64); + assert_eq!(Bloom::::num_keys(1000f64, 50f64) as u64, 14u64); + assert_eq!(Bloom::::num_keys(2000f64, 50f64) as u64, 28u64); + assert_eq!(Bloom::::num_keys(2000f64, 25f64) as u64, 55u64); + //ensure min keys is 1 + assert_eq!(Bloom::::num_keys(20f64, 1000f64) as u64, 1u64); + } }