diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 0ed9833fa4..9e906ed9a5 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -16,7 +16,7 @@ use crate::{ contact_info::ContactInfo, crds_gossip::CrdsGossip, crds_gossip_error::CrdsGossipError, - crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, + crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, crds_value::{ self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, SnapshotHash, Version, Vote, MAX_WALLCLOCK, @@ -213,11 +213,13 @@ struct GossipStats { new_push_requests: Counter, new_push_requests2: Counter, new_push_requests_num: Counter, + filter_pull_response: Counter, process_pull_response: Counter, process_pull_response_count: Counter, process_pull_response_len: Counter, process_pull_response_timeout: Counter, - process_pull_response_fail: Counter, + process_pull_response_fail_insert: Counter, + process_pull_response_fail_timeout: Counter, process_pull_response_success: Counter, process_pull_requests: Counter, generate_pull_responses: Counter, @@ -1398,8 +1400,13 @@ impl ClusterInfo { fn generate_new_gossip_requests( &self, stakes: &HashMap, + generate_pull_requests: bool, ) -> Vec<(SocketAddr, Protocol)> { - let pulls: Vec<_> = self.new_pull_requests(stakes); + let pulls: Vec<_> = if generate_pull_requests { + self.new_pull_requests(stakes) + } else { + vec![] + }; let pushes: Vec<_> = self.new_push_requests(); vec![pulls, pushes].into_iter().flatten().collect() } @@ -1410,8 +1417,9 @@ impl ClusterInfo { recycler: &PacketsRecycler, stakes: &HashMap, sender: &PacketSender, + generate_pull_requests: bool, ) -> Result<()> { - let reqs = obj.generate_new_gossip_requests(&stakes); + let reqs = obj.generate_new_gossip_requests(&stakes, generate_pull_requests); if !reqs.is_empty() { let packets = to_packets_with_destination(recycler.clone(), &reqs); sender.send(packets)?; @@ -1496,6 +1504,7 @@ impl ClusterInfo { let message = CrdsData::Version(Version::new(obj.id())); obj.push_message(CrdsValue::new_signed(message, &obj.keypair)); + let mut generate_pull_requests = true; loop { let start = timestamp(); thread_mem_usage::datapoint("solana-gossip"); @@ -1512,7 +1521,8 @@ impl ClusterInfo { None => HashMap::new(), }; - let _ = Self::run_gossip(&obj, &recycler, &stakes, &sender); + let _ = + Self::run_gossip(&obj, &recycler, &stakes, &sender, generate_pull_requests); if exit.load(Ordering::Relaxed) { return; } @@ -1532,6 +1542,7 @@ impl ClusterInfo { let time_left = GOSSIP_SLEEP_MILLIS - elapsed; sleep(Duration::from_millis(time_left)); } + generate_pull_requests = !generate_pull_requests; } }) .unwrap() @@ -1550,6 +1561,7 @@ impl ClusterInfo { let allocated = thread_mem_usage::Allocatedp::default(); let mut gossip_pull_data: Vec = vec![]; let timeouts = me.gossip.read().unwrap().make_timeouts(&stakes, epoch_ms); + let mut pull_responses = HashMap::new(); packets.packets.iter().for_each(|packet| { let from_addr = packet.meta.addr(); limited_deserialize(&packet.data[..packet.meta.size]) @@ -1597,7 +1609,8 @@ impl ClusterInfo { } ret }); - Self::handle_pull_response(me, &from, data, &timeouts); + let pull_entry = pull_responses.entry(from).or_insert_with(Vec::new); + pull_entry.extend(data); datapoint_debug!( "solana-gossip-listen-memory", ("pull_response", (allocated.get() - start) as i64, i64), @@ -1659,6 +1672,11 @@ impl ClusterInfo { } }) }); + + for (from, data) in pull_responses { + Self::handle_pull_response(me, &from, data, &timeouts); + } + // process the collected pulls together let rsp = Self::handle_pull_requests(me, recycler, gossip_pull_data, stakes); if let Some(rsp) = rsp { @@ -1827,9 +1845,21 @@ impl ClusterInfo { } let filtered_len = crds_values.len(); - let (fail, timeout_count, success) = me - .time_gossip_write_lock("process_pull", &me.stats.process_pull_response) - .process_pull_response(from, timeouts, crds_values, timestamp()); + let mut pull_stats = ProcessPullStats::default(); + let (filtered_pulls, filtered_pulls_expired_timeout) = me + .time_gossip_read_lock("filter_pull_resp", &me.stats.filter_pull_response) + .filter_pull_responses(timeouts, crds_values, timestamp(), &mut pull_stats); + + if !filtered_pulls.is_empty() || !filtered_pulls_expired_timeout.is_empty() { + me.time_gossip_write_lock("process_pull_resp", &me.stats.process_pull_response) + .process_pull_responses( + from, + filtered_pulls, + filtered_pulls_expired_timeout, + timestamp(), + &mut pull_stats, + ); + } me.stats .skip_pull_response_shred_version @@ -1840,13 +1870,22 @@ impl ClusterInfo { .add_relaxed(filtered_len as u64); me.stats .process_pull_response_timeout - .add_relaxed(timeout_count as u64); - me.stats.process_pull_response_fail.add_relaxed(fail as u64); + .add_relaxed(pull_stats.timeout_count as u64); + me.stats + .process_pull_response_fail_insert + .add_relaxed(pull_stats.failed_insert as u64); + me.stats + .process_pull_response_fail_timeout + .add_relaxed(pull_stats.failed_timeout as u64); me.stats .process_pull_response_success - .add_relaxed(success as u64); + .add_relaxed(pull_stats.success as u64); - (fail, timeout_count, success) + ( + pull_stats.failed_insert + pull_stats.failed_timeout, + pull_stats.timeout_count, + pull_stats.success, + ) } fn filter_by_shred_version( @@ -2043,11 +2082,26 @@ impl ClusterInfo { self.stats.process_pull_response.clear(), i64 ), + ( + "filter_pull_resp", + self.stats.filter_pull_response.clear(), + i64 + ), ( "process_pull_resp_count", self.stats.process_pull_response_count.clear(), i64 ), + ( + "pull_response_fail_insert", + self.stats.process_pull_response_fail_insert.clear(), + i64 + ), + ( + "pull_response_fail_timeout", + self.stats.process_pull_response_fail_timeout.clear(), + i64 + ), ( "process_pull_resp_timeout", self.stats.process_pull_response_timeout.clear(), @@ -2456,6 +2510,7 @@ mod tests { #[test] fn test_handle_pull() { + solana_logger::setup(); let node = Node::new_localhost(); let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info)); @@ -2550,7 +2605,7 @@ mod tests { .write() .unwrap() .refresh_push_active_set(&HashMap::new()); - let reqs = cluster_info.generate_new_gossip_requests(&HashMap::new()); + let reqs = cluster_info.generate_new_gossip_requests(&HashMap::new(), true); //assert none of the addrs are invalid. reqs.iter().all(|(addr, _)| { let res = ContactInfo::is_valid_address(addr); diff --git a/core/src/crds.rs b/core/src/crds.rs index d17c8c8fc1..ede88881dc 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -93,6 +93,24 @@ impl Crds { pub fn new_versioned(&self, local_timestamp: u64, value: CrdsValue) -> VersionedCrdsValue { VersionedCrdsValue::new(local_timestamp, value) } + pub fn would_insert( + &self, + value: CrdsValue, + local_timestamp: u64, + ) -> Option { + let new_value = self.new_versioned(local_timestamp, value); + let label = new_value.value.label(); + let would_insert = self + .table + .get(&label) + .map(|current| new_value > *current) + .unwrap_or(true); + if would_insert { + Some(new_value) + } else { + None + } + } /// insert the new value, returns the old value if insert succeeds pub fn insert_versioned( &mut self, diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index f6e2739dc7..0d578d89b6 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -6,7 +6,7 @@ use crate::{ crds::{Crds, VersionedCrdsValue}, crds_gossip_error::CrdsGossipError, - crds_gossip_pull::{CrdsFilter, CrdsGossipPull}, + crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats}, crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE}, crds_value::{CrdsValue, CrdsValueLabel}, }; @@ -170,16 +170,34 @@ impl CrdsGossip { self.pull.generate_pull_responses(&self.crds, filters) } - /// process a pull response - pub fn process_pull_response( - &mut self, - from: &Pubkey, + pub fn filter_pull_responses( + &self, timeouts: &HashMap, response: Vec, now: u64, - ) -> (usize, usize, usize) { + process_pull_stats: &mut ProcessPullStats, + ) -> (Vec, Vec) { self.pull - .process_pull_response(&mut self.crds, from, timeouts, response, now) + .filter_pull_responses(&self.crds, timeouts, response, now, process_pull_stats) + } + + /// process a pull response + pub fn process_pull_responses( + &mut self, + from: &Pubkey, + responses: Vec, + responses_expired_timeout: Vec, + now: u64, + process_pull_stats: &mut ProcessPullStats, + ) { + self.pull.process_pull_responses( + &mut self.crds, + from, + responses, + responses_expired_timeout, + now, + process_pull_stats, + ) } pub fn make_timeouts_test(&self) -> HashMap { diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 98d3da9963..aafcb3d151 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -10,7 +10,7 @@ //! of false positives. use crate::contact_info::ContactInfo; -use crate::crds::Crds; +use crate::crds::{Crds, VersionedCrdsValue}; use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS}; use crate::crds_gossip_error::CrdsGossipError; use crate::crds_value::{CrdsValue, CrdsValueLabel}; @@ -20,8 +20,8 @@ use solana_runtime::bloom::Bloom; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use std::cmp; -use std::collections::HashMap; use std::collections::VecDeque; +use std::collections::{HashMap, HashSet}; pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; // The maximum age of a value received over pull responses @@ -118,6 +118,14 @@ impl CrdsFilter { } } +#[derive(Default)] +pub struct ProcessPullStats { + pub success: usize, + pub failed_insert: usize, + pub failed_timeout: usize, + pub timeout_count: usize, +} + #[derive(Clone)] pub struct CrdsGossipPull { /// timestamp of last request @@ -231,19 +239,22 @@ impl CrdsGossipPull { self.filter_crds_values(crds, requests) } - /// process a pull response - pub fn process_pull_response( - &mut self, - crds: &mut Crds, - from: &Pubkey, + // Checks if responses should be inserted and + // returns those responses converted to VersionedCrdsValue + // Separated in two vecs as: + // .0 => responses that update the owner timestamp + // .1 => responses that do not update the owner timestamp + pub fn filter_pull_responses( + &self, + crds: &Crds, timeouts: &HashMap, - response: Vec, + responses: Vec, now: u64, - ) -> (usize, usize, usize) { - let mut failed = 0; - let mut timeout_count = 0; - let mut success = 0; - for r in response { + stats: &mut ProcessPullStats, + ) -> (Vec, Vec) { + let mut versioned = vec![]; + let mut versioned_expired_timestamp = vec![]; + for r in responses { let owner = r.label().pubkey(); // Check if the crds value is older than the msg_timeout if now @@ -262,8 +273,8 @@ impl CrdsGossipPull { if now > r.wallclock().checked_add(timeout).unwrap_or_else(|| 0) || now + timeout < r.wallclock() { - timeout_count += 1; - failed += 1; + stats.timeout_count += 1; + stats.failed_timeout += 1; continue; } } @@ -271,33 +282,62 @@ impl CrdsGossipPull { // Before discarding this value, check if a ContactInfo for the owner // exists in the table. If it doesn't, that implies that this value can be discarded if crds.lookup(&CrdsValueLabel::ContactInfo(owner)).is_none() { - timeout_count += 1; - failed += 1; + stats.timeout_count += 1; + stats.failed_timeout += 1; continue; } else { // Silently insert this old value without bumping record timestamps - failed += crds.insert(r, now).is_err() as usize; + match crds.would_insert(r, now) { + Some(resp) => versioned_expired_timestamp.push(resp), + None => stats.failed_insert += 1, + } continue; } } } } - let old = crds.insert(r, now); + match crds.would_insert(r, now) { + Some(resp) => versioned.push(resp), + None => stats.failed_insert += 1, + } + } + (versioned, versioned_expired_timestamp) + } + + /// process a vec of pull responses + pub fn process_pull_responses( + &mut self, + crds: &mut Crds, + from: &Pubkey, + responses: Vec, + responses_expired_timeout: Vec, + now: u64, + stats: &mut ProcessPullStats, + ) { + let mut owners = HashSet::new(); + for r in responses_expired_timeout { + stats.failed_insert += crds.insert_versioned(r).is_err() as usize; + } + for r in responses { + let owner = r.value.label().pubkey(); + let old = crds.insert_versioned(r); if old.is_err() { - failed += 1; + stats.failed_insert += 1; } else { - success += 1; + stats.success += 1; } old.ok().map(|opt| { - crds.update_record_timestamp(&owner, now); + owners.insert(owner); opt.map(|val| { self.purged_values .push_back((val.value_hash, val.local_timestamp)) }) }); } - crds.update_record_timestamp(from, now); - (failed, timeout_count, success) + owners.insert(*from); + for owner in owners { + crds.update_record_timestamp(&owner, now); + } } // build a set of filters of the current crds table // num_filters - used to increase the likelyhood of a value in crds being added to some filter @@ -387,6 +427,34 @@ impl CrdsGossipPull { .count(); self.purged_values.drain(..cnt); } + + /// For legacy tests + #[cfg(test)] + pub fn process_pull_response( + &mut self, + crds: &mut Crds, + from: &Pubkey, + timeouts: &HashMap, + response: Vec, + now: u64, + ) -> (usize, usize, usize) { + let mut stats = ProcessPullStats::default(); + let (versioned, versioned_expired_timeout) = + self.filter_pull_responses(crds, timeouts, response, now, &mut stats); + self.process_pull_responses( + crds, + from, + versioned, + versioned_expired_timeout, + now, + &mut stats, + ); + ( + stats.failed_timeout + stats.failed_insert, + stats.timeout_count, + stats.success, + ) + } } #[cfg(test)] mod test { diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 04f21150d6..15e133812f 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -5,7 +5,7 @@ use solana_core::cluster_info; use solana_core::contact_info::ContactInfo; use solana_core::crds_gossip::*; use solana_core::crds_gossip_error::CrdsGossipError; -use solana_core::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; +use solana_core::crds_gossip_pull::{ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}; use solana_core::crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS; use solana_core::crds_value::CrdsValueLabel; use solana_core::crds_value::{CrdsData, CrdsValue}; @@ -447,14 +447,14 @@ fn network_run_pull( bytes += serialized_size(&rsp).unwrap() as usize; msgs += rsp.len(); if let Some(node) = network.get(&from) { - node.lock() - .unwrap() - .mark_pull_request_creation_time(&from, now); - overhead += node - .lock() - .unwrap() - .process_pull_response(&from, &timeouts, rsp, now) - .0; + let mut node = node.lock().unwrap(); + node.mark_pull_request_creation_time(&from, now); + let mut stats = ProcessPullStats::default(); + let (vers, vers_expired_timeout) = + node.filter_pull_responses(&timeouts, rsp, now, &mut stats); + node.process_pull_responses(&from, vers, vers_expired_timeout, now, &mut stats); + overhead += stats.failed_insert; + overhead += stats.failed_timeout; } (bytes, msgs, overhead) })