diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index dcb141114d..f100093d66 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -218,7 +218,10 @@ struct GossipStats { process_pull_response_count: Counter, process_pull_response_len: Counter, process_pull_response_timeout: Counter, + process_pull_response_fail: Counter, + process_pull_response_success: Counter, process_pull_requests: Counter, + generate_pull_responses: Counter, process_prune: Counter, process_push_message: Counter, prune_received_cache: Counter, @@ -228,6 +231,11 @@ struct GossipStats { push_message: Counter, new_pull_requests: Counter, mark_pull_request: Counter, + skip_pull_response_shred_version: Counter, + skip_pull_shred_version: Counter, + skip_push_message_shred_version: Counter, + push_message_count: Counter, + push_message_value_count: Counter, } pub struct ClusterInfo { @@ -1526,12 +1534,17 @@ impl ClusterInfo { if contact_info.id == me.id() { warn!("PullRequest ignored, I'm talking to myself"); inc_new_counter_debug!("cluster_info-window-request-loopback", 1); - } else { + } else if contact_info.shred_version == 0 + || contact_info.shred_version == me.my_shred_version() + || me.my_shred_version() == 0 + { gossip_pull_data.push(PullData { from_addr, caller, filter, }); + } else { + me.stats.skip_pull_shred_version.add_relaxed(1); } } datapoint_debug!( @@ -1620,6 +1633,26 @@ impl ClusterInfo { } } + fn update_data_budget(&self, stakes: &HashMap) { + let mut w_outbound_budget = self.outbound_budget.write().unwrap(); + + let now = timestamp(); + const INTERVAL_MS: u64 = 100; + // allow 50kBps per staked validator, epoch slots + votes ~= 1.5kB/slot ~= 4kB/s + const BYTES_PER_INTERVAL: usize = 5000; + const MAX_BUDGET_MULTIPLE: usize = 5; // allow budget build-up to 5x the interval default + + if now - w_outbound_budget.last_timestamp_ms > INTERVAL_MS { + let len = std::cmp::max(stakes.len(), 2); + w_outbound_budget.bytes += len * BYTES_PER_INTERVAL; + w_outbound_budget.bytes = std::cmp::min( + w_outbound_budget.bytes, + MAX_BUDGET_MULTIPLE * len * BYTES_PER_INTERVAL, + ); + w_outbound_budget.last_timestamp_ms = now; + } + } + // Pull requests take an incoming bloom filter of contained entries from a node // and tries to send back to them the values it detects are missing. fn handle_pull_requests( @@ -1632,33 +1665,19 @@ impl ClusterInfo { let mut caller_and_filters = vec![]; let mut addrs = vec![]; let mut time = Measure::start("handle_pull_requests"); - { - let mut w_outbound_budget = me.outbound_budget.write().unwrap(); - - let now = timestamp(); - const INTERVAL_MS: u64 = 100; - // allow 50kBps per staked validator, epoch slots + votes ~= 1.5kB/slot ~= 4kB/s - const BYTES_PER_INTERVAL: usize = 5000; - const MAX_BUDGET_MULTIPLE: usize = 5; // allow budget build-up to 5x the interval default - - if now - w_outbound_budget.last_timestamp_ms > INTERVAL_MS { - let len = std::cmp::max(stakes.len(), 2); - w_outbound_budget.bytes += len * BYTES_PER_INTERVAL; - w_outbound_budget.bytes = std::cmp::min( - w_outbound_budget.bytes, - MAX_BUDGET_MULTIPLE * len * BYTES_PER_INTERVAL, - ); - w_outbound_budget.last_timestamp_ms = now; - } - } + me.update_data_budget(stakes); for pull_data in requests { caller_and_filters.push((pull_data.caller, pull_data.filter)); addrs.push(pull_data.from_addr); } let now = timestamp(); let self_id = me.id(); + let pull_responses = me - .time_gossip_write_lock("process_pull_reqs", &me.stats.process_pull_requests) + .time_gossip_read_lock("generate_pull_responses", &me.stats.generate_pull_responses) + .generate_pull_responses(&caller_and_filters); + + me.time_gossip_write_lock("process_pull_reqs", &me.stats.process_pull_requests) .process_pull_requests(caller_and_filters, now); // Filter bad to addresses @@ -1755,37 +1774,94 @@ impl ClusterInfo { Some(packets) } + // Returns (failed, timeout, success) fn handle_pull_response( me: &Self, from: &Pubkey, - data: Vec, + mut crds_values: Vec, timeouts: &HashMap, - ) { - let len = data.len(); + ) -> (usize, usize, usize) { + let len = crds_values.len(); trace!("PullResponse me: {} from: {} len={}", me.id, from, len); - let (_fail, timeout_count) = me + + if let Some(shred_version) = me.lookup_contact_info(from, |ci| ci.shred_version) { + Self::filter_by_shred_version( + from, + &mut crds_values, + shred_version, + me.my_shred_version(), + ); + } + let filtered_len = crds_values.len(); + + let (fail, timeout_count, success) = me .time_gossip_write_lock("process_pull", &me.stats.process_pull_response) - .process_pull_response(from, timeouts, data, timestamp()); + .process_pull_response(from, timeouts, crds_values, timestamp()); + + me.stats + .skip_pull_response_shred_version + .add_relaxed((len - filtered_len) as u64); me.stats.process_pull_response_count.add_relaxed(1); - me.stats.process_pull_response_len.add_relaxed(len as u64); + me.stats + .process_pull_response_len + .add_relaxed(filtered_len as u64); me.stats .process_pull_response_timeout .add_relaxed(timeout_count as u64); + me.stats.process_pull_response_fail.add_relaxed(fail as u64); + me.stats + .process_pull_response_success + .add_relaxed(success as u64); + + (fail, timeout_count, success) + } + + fn filter_by_shred_version( + from: &Pubkey, + crds_values: &mut Vec, + shred_version: u16, + my_shred_version: u16, + ) { + if my_shred_version != 0 && shred_version != 0 && shred_version != my_shred_version { + // Allow someone to update their own ContactInfo so they + // can change shred versions if needed. + crds_values.retain(|crds_value| match &crds_value.data { + CrdsData::ContactInfo(contact_info) => contact_info.id == *from, + _ => false, + }); + } } fn handle_push_message( me: &Self, recycler: &PacketsRecycler, from: &Pubkey, - data: Vec, + mut crds_values: Vec, stakes: &HashMap, ) -> Option { let self_id = me.id(); - inc_new_counter_debug!("cluster_info-push_message", 1); + me.stats.push_message_count.add_relaxed(1); + let len = crds_values.len(); + + if let Some(shred_version) = me.lookup_contact_info(from, |ci| ci.shred_version) { + Self::filter_by_shred_version( + from, + &mut crds_values, + shred_version, + me.my_shred_version(), + ); + } + let filtered_len = crds_values.len(); + me.stats + .push_message_value_count + .add_relaxed(filtered_len as u64); + me.stats + .skip_push_message_shred_version + .add_relaxed((len - filtered_len) as u64); let updated: Vec<_> = me .time_gossip_write_lock("process_push", &me.stats.process_push_message) - .process_push_message(from, data, timestamp()); + .process_push_message(from, crds_values, timestamp()); let updated_labels: Vec<_> = updated.into_iter().map(|u| u.value.label()).collect(); let prunes_map: HashMap> = me @@ -1945,6 +2021,11 @@ impl ClusterInfo { self.stats.process_pull_requests.clear(), i64 ), + ( + "generate_pull_responses", + self.stats.generate_pull_responses.clear(), + i64 + ), ("process_prune", self.stats.process_prune.clear(), i64), ( "process_push_message", @@ -1974,6 +2055,34 @@ impl ClusterInfo { i64 ), ); + datapoint_info!( + "cluster_info_shred_version_skip", + ( + "skip_push_message_shred_version", + self.stats.skip_push_message_shred_version.clear(), + i64 + ), + ( + "skip_pull_response_shred_version", + self.stats.skip_pull_response_shred_version.clear(), + i64 + ), + ( + "skip_pull_shred_version", + self.stats.skip_pull_shred_version.clear(), + i64 + ), + ( + "push_message_count", + self.stats.push_message_count.clear(), + i64 + ), + ( + "push_message_value_count", + self.stats.push_message_value_count.clear(), + i64 + ), + ); *last_print = Instant::now(); } @@ -2289,6 +2398,91 @@ mod tests { assert!(ClusterInfo::is_spy_node(&node)); } + #[test] + fn test_handle_pull() { + let node = Node::new_localhost(); + let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info)); + + let entrypoint_pubkey = Pubkey::new_rand(); + let data = test_crds_values(entrypoint_pubkey); + let timeouts = HashMap::new(); + assert_eq!( + (0, 0, 1), + ClusterInfo::handle_pull_response( + &cluster_info, + &entrypoint_pubkey, + data.clone(), + &timeouts + ) + ); + + let entrypoint_pubkey2 = Pubkey::new_rand(); + assert_eq!( + (1, 0, 0), + ClusterInfo::handle_pull_response(&cluster_info, &entrypoint_pubkey2, data, &timeouts) + ); + } + + fn test_crds_values(pubkey: Pubkey) -> Vec { + let entrypoint = ContactInfo::new_localhost(&pubkey, timestamp()); + let entrypoint_crdsvalue = + CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint.clone())); + vec![entrypoint_crdsvalue] + } + + #[test] + fn test_filter_shred_version() { + let from = Pubkey::new_rand(); + let my_shred_version = 1; + let other_shred_version = 1; + + // Allow same shred_version + let mut values = test_crds_values(from); + ClusterInfo::filter_by_shred_version( + &from, + &mut values, + other_shred_version, + my_shred_version, + ); + assert_eq!(values.len(), 1); + + // Allow shred_version=0. + let other_shred_version = 0; + ClusterInfo::filter_by_shred_version( + &from, + &mut values, + other_shred_version, + my_shred_version, + ); + assert_eq!(values.len(), 1); + + // Change to sender's ContactInfo version, allow that. + let other_shred_version = 2; + ClusterInfo::filter_by_shred_version( + &from, + &mut values, + other_shred_version, + my_shred_version, + ); + assert_eq!(values.len(), 1); + + let snapshot_hash_data = CrdsValue::new_unsigned(CrdsData::SnapshotHashes(SnapshotHash { + from: Pubkey::new_rand(), + hashes: vec![], + wallclock: 0, + })); + values.push(snapshot_hash_data); + // Change to sender's ContactInfo version, allow that. + let other_shred_version = 2; + ClusterInfo::filter_by_shred_version( + &from, + &mut values, + other_shred_version, + my_shred_version, + ); + assert_eq!(values.len(), 1); + } + #[test] fn test_cluster_spy_gossip() { //check that gossip doesn't try to push to invalid addresses diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 7cce505532..f6e2739dc7 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -158,14 +158,18 @@ impl CrdsGossip { self.pull.mark_pull_request_creation_time(from, now) } /// process a pull request and create a response - pub fn process_pull_requests( - &mut self, - filters: Vec<(CrdsValue, CrdsFilter)>, - now: u64, - ) -> Vec> { + pub fn process_pull_requests(&mut self, filters: Vec<(CrdsValue, CrdsFilter)>, now: u64) { self.pull - .process_pull_requests(&mut self.crds, filters, now) + .process_pull_requests(&mut self.crds, filters, now); } + + pub fn generate_pull_responses( + &self, + filters: &[(CrdsValue, CrdsFilter)], + ) -> Vec> { + self.pull.generate_pull_responses(&self.crds, filters) + } + /// process a pull response pub fn process_pull_response( &mut self, @@ -173,7 +177,7 @@ impl CrdsGossip { timeouts: &HashMap, response: Vec, now: u64, - ) -> (usize, usize) { + ) -> (usize, usize, usize) { self.pull .process_pull_response(&mut self.crds, from, timeouts, response, now) } diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 212ebb5920..98d3da9963 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -204,14 +204,13 @@ impl CrdsGossipPull { self.purged_values.push_back((hash, timestamp)) } - /// process a pull request and create a response + /// process a pull request pub fn process_pull_requests( &mut self, crds: &mut Crds, requests: Vec<(CrdsValue, CrdsFilter)>, now: u64, - ) -> Vec> { - let rv = self.filter_crds_values(crds, &requests); + ) { requests.into_iter().for_each(|(caller, _)| { let key = caller.label().pubkey(); let old = crds.insert(caller, now); @@ -221,8 +220,17 @@ impl CrdsGossipPull { } crds.update_record_timestamp(&key, now); }); - rv } + + /// Create gossip responses to pull requests + pub fn generate_pull_responses( + &self, + crds: &Crds, + requests: &[(CrdsValue, CrdsFilter)], + ) -> Vec> { + self.filter_crds_values(crds, requests) + } + /// process a pull response pub fn process_pull_response( &mut self, @@ -231,9 +239,10 @@ impl CrdsGossipPull { timeouts: &HashMap, response: Vec, now: u64, - ) -> (usize, usize) { + ) -> (usize, usize, usize) { let mut failed = 0; let mut timeout_count = 0; + let mut success = 0; for r in response { let owner = r.label().pubkey(); // Check if the crds value is older than the msg_timeout @@ -274,7 +283,11 @@ impl CrdsGossipPull { } } let old = crds.insert(r, now); - failed += old.is_err() as usize; + if old.is_err() { + failed += 1; + } else { + success += 1; + } old.ok().map(|opt| { crds.update_record_timestamp(&owner, now); opt.map(|val| { @@ -284,7 +297,7 @@ impl CrdsGossipPull { }); } crds.update_record_timestamp(from, now); - (failed, timeout_count) + (failed, timeout_count, success) } // build a set of filters of the current crds table // num_filters - used to increase the likelyhood of a value in crds being added to some filter @@ -573,8 +586,9 @@ mod test { let mut dest_crds = Crds::default(); let mut dest = CrdsGossipPull::default(); let (_, filters, caller) = req.unwrap(); - let filters = filters.into_iter().map(|f| (caller.clone(), f)).collect(); - let rsp = dest.process_pull_requests(&mut dest_crds, filters, 1); + let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); + let rsp = dest.generate_pull_responses(&dest_crds, &filters); + dest.process_pull_requests(&mut dest_crds, filters, 1); assert!(rsp.iter().all(|rsp| rsp.is_empty())); assert!(dest_crds.lookup(&caller.label()).is_some()); assert_eq!( @@ -643,8 +657,9 @@ mod test { PACKET_DATA_SIZE, ); let (_, filters, caller) = req.unwrap(); - let filters = filters.into_iter().map(|f| (caller.clone(), f)).collect(); - let mut rsp = dest.process_pull_requests(&mut dest_crds, filters, 0); + let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); + let mut rsp = dest.generate_pull_responses(&dest_crds, &filters); + dest.process_pull_requests(&mut dest_crds, filters, 0); // if there is a false positive this is empty // prob should be around 0.1 per iteration if rsp.is_empty() { diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index dc0cd1f8bd..04f21150d6 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -426,23 +426,21 @@ fn network_run_pull( .map(|f| f.filter.bits.len() as usize / 8) .sum::(); bytes += serialized_size(&caller_info).unwrap() as usize; - let filters = filters + let filters: Vec<_> = filters .into_iter() .map(|f| (caller_info.clone(), f)) .collect(); - let rsp = network + let rsp: Vec<_> = network .get(&to) .map(|node| { - let mut rsp = vec![]; - rsp.append( - &mut node - .lock() - .unwrap() - .process_pull_requests(filters, now) - .into_iter() - .flatten() - .collect(), - ); + let rsp = node + .lock() + .unwrap() + .generate_pull_responses(&filters) + .into_iter() + .flatten() + .collect(); + node.lock().unwrap().process_pull_requests(filters, now); rsp }) .unwrap();