diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index c42d04c00..a4126b7e7 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -1430,8 +1430,7 @@ impl ClusterInfo { let num_requests = pulls.iter().map(|(_, filters)| filters.len() as u64).sum(); self.stats.new_pull_requests_count.add_relaxed(num_requests); { - let mut gossip = - self.time_gossip_write_lock("mark_pull", &self.stats.mark_pull_request); + let gossip = self.time_gossip_read_lock("mark_pull", &self.stats.mark_pull_request); for (peer, _) in &pulls { gossip.mark_pull_request_creation_time(peer.id, now); } @@ -4380,14 +4379,9 @@ mod tests { .unwrap() .mark_pull_request_creation_time(peer, now); } + let gossip = cluster_info.gossip.read().unwrap(); assert_eq!( - cluster_info - .gossip - .read() - .unwrap() - .pull - .pull_request_time - .len(), + gossip.pull.pull_request_time().len(), CRDS_UNIQUE_PUBKEY_CAPACITY ); } diff --git a/gossip/src/cluster_info_metrics.rs b/gossip/src/cluster_info_metrics.rs index b3c228ed6..c3afb7a19 100644 --- a/gossip/src/cluster_info_metrics.rs +++ b/gossip/src/cluster_info_metrics.rs @@ -169,7 +169,7 @@ pub(crate) fn submit_gossip_stats( gossip.crds.len(), gossip.crds.num_nodes(), gossip.crds.num_purged(), - gossip.pull.failed_inserts.len(), + gossip.pull.failed_inserts_size(), ) }; let num_nodes_staked = stakes.values().filter(|stake| **stake > 0).count(); diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index 3749407c5..d6de3c512 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -230,7 +230,7 @@ impl CrdsGossip { /// This is used for weighted random selection during `new_pull_request` /// It's important to use the local nodes request creation time as the weight /// instead of the response received time otherwise failed nodes will increase their weight. - pub fn mark_pull_request_creation_time(&mut self, from: Pubkey, now: u64) { + pub fn mark_pull_request_creation_time(&self, from: Pubkey, now: u64) { self.pull.mark_pull_request_creation_time(from, now) } /// process a pull request and create a response diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index 8b8f364b3..c0a19653f 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -33,9 +33,12 @@ use { std::{ collections::{HashMap, HashSet, VecDeque}, convert::TryInto, - iter::repeat_with, + iter::{repeat, repeat_with}, net::SocketAddr, - sync::Mutex, + sync::{ + atomic::{AtomicUsize, Ordering}, + Mutex, RwLock, + }, time::{Duration, Instant}, }, }; @@ -184,25 +187,25 @@ pub struct ProcessPullStats { pub struct CrdsGossipPull { /// timestamp of last request - pub(crate) pull_request_time: LruCache, + pull_request_time: RwLock>, // Hash value and record time (ms) of the pull responses which failed to be // inserted in crds table; Preserved to stop the sender to send back the // same outdated payload again by adding them to the filter for the next // pull request. - pub failed_inserts: VecDeque<(Hash, u64)>, + failed_inserts: RwLock>, pub crds_timeout: u64, - pub msg_timeout: u64, - pub num_pulls: usize, + msg_timeout: u64, + pub num_pulls: AtomicUsize, } impl Default for CrdsGossipPull { fn default() -> Self { Self { - pull_request_time: LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY), - failed_inserts: VecDeque::new(), + pull_request_time: RwLock::new(LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY)), + failed_inserts: RwLock::default(), crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, - num_pulls: 0, + num_pulls: AtomicUsize::default(), } } } @@ -274,6 +277,7 @@ impl CrdsGossipPull { ) -> Vec<(u64, &'a ContactInfo)> { let mut rng = rand::thread_rng(); let active_cutoff = now.saturating_sub(PULL_ACTIVE_TIMEOUT_MS); + let pull_request_time = self.pull_request_time.read().unwrap(); crds.get_nodes() .filter_map(|value| { let info = value.value.contact_info().unwrap(); @@ -297,8 +301,7 @@ impl CrdsGossipPull { }) .map(|item| { let max_weight = f32::from(u16::max_value()) - 1.0; - let req_time: u64 = self - .pull_request_time + let req_time: u64 = pull_request_time .peek(&item.id) .copied() .unwrap_or_default(); @@ -316,8 +319,8 @@ impl CrdsGossipPull { /// This is used for weighted random selection during `new_pull_request` /// It's important to use the local nodes request creation time as the weight /// instead of the response received time otherwise failed nodes will increase their weight. - pub(crate) fn mark_pull_request_creation_time(&mut self, from: Pubkey, now: u64) { - self.pull_request_time.put(from, now); + pub(crate) fn mark_pull_request_creation_time(&self, from: Pubkey, now: u64) { + self.pull_request_time.write().unwrap().put(from, now); } /// process a pull request @@ -395,7 +398,7 @@ impl CrdsGossipPull { /// process a vec of pull responses pub(crate) fn process_pull_responses( - &mut self, + &self, crds: &mut Crds, from: &Pubkey, responses: Vec, @@ -408,36 +411,42 @@ impl CrdsGossipPull { for response in responses_expired_timeout { let _ = crds.insert(response, now); } + let mut num_inserts = 0; for response in responses { let owner = response.pubkey(); if let Ok(()) = crds.insert(response, now) { - stats.success += 1; - self.num_pulls += 1; + num_inserts += 1; owners.insert(owner); } } + stats.success += num_inserts; + self.num_pulls.fetch_add(num_inserts, Ordering::Relaxed); owners.insert(*from); for owner in owners { crds.update_record_timestamp(&owner, now); } stats.failed_insert += failed_inserts.len(); self.purge_failed_inserts(now); - self.failed_inserts - .extend(failed_inserts.into_iter().zip(std::iter::repeat(now))); + let failed_inserts = failed_inserts.into_iter().zip(repeat(now)); + self.failed_inserts.write().unwrap().extend(failed_inserts); } - pub(crate) fn purge_failed_inserts(&mut self, now: u64) { + pub(crate) fn purge_failed_inserts(&self, now: u64) { if FAILED_INSERTS_RETENTION_MS < now { let cutoff = now - FAILED_INSERTS_RETENTION_MS; - let outdated = self - .failed_inserts + let mut failed_inserts = self.failed_inserts.write().unwrap(); + let outdated = failed_inserts .iter() .take_while(|(_, ts)| *ts < cutoff) .count(); - self.failed_inserts.drain(..outdated); + failed_inserts.drain(..outdated); } } + pub(crate) fn failed_inserts_size(&self) -> usize { + self.failed_inserts.read().unwrap().len() + } + // build a set of filters of the current crds table // num_filters - used to increase the likelyhood of a value in crds being added to some filter pub fn build_crds_filters( @@ -451,7 +460,7 @@ impl CrdsGossipPull { const MIN_NUM_BLOOM_ITEMS: usize = 512; #[cfg(not(debug_assertions))] const MIN_NUM_BLOOM_ITEMS: usize = 65_536; - let num_items = crds.len() + crds.num_purged() + self.failed_inserts.len(); + let num_items = crds.len() + crds.num_purged() + self.failed_inserts.read().unwrap().len(); let num_items = MIN_NUM_BLOOM_ITEMS.max(num_items); let filters = CrdsFilterSet::new(num_items, bloom_size); thread_pool.install(|| { @@ -461,6 +470,8 @@ impl CrdsGossipPull { .chain(crds.purged().with_min_len(PAR_MIN_LENGTH)) .chain( self.failed_inserts + .read() + .unwrap() .par_iter() .with_min_len(PAR_MIN_LENGTH) .map(|(v, _)| *v), @@ -568,7 +579,7 @@ impl CrdsGossipPull { /// For legacy tests #[cfg(test)] fn process_pull_response( - &mut self, + &self, crds: &mut Crds, from: &Pubkey, timeouts: &HashMap, @@ -596,17 +607,29 @@ impl CrdsGossipPull { // Only for tests and simulations. pub(crate) fn mock_clone(&self) -> Self { - let mut pull_request_time = LruCache::new(self.pull_request_time.cap()); - for (k, v) in self.pull_request_time.iter().rev() { - pull_request_time.put(*k, *v); - } + let pull_request_time = { + let pull_request_time = self.pull_request_time.read().unwrap(); + let mut clone = LruCache::new(pull_request_time.cap()); + for (k, v) in pull_request_time.iter().rev() { + clone.put(*k, *v); + } + clone + }; + let failed_inserts = self.failed_inserts.read().unwrap().clone(); Self { - pull_request_time, - failed_inserts: self.failed_inserts.clone(), + pull_request_time: RwLock::new(pull_request_time), + failed_inserts: RwLock::new(failed_inserts), + num_pulls: AtomicUsize::new(self.num_pulls.load(Ordering::Relaxed)), ..*self } } + + #[cfg(test)] + pub(crate) fn pull_request_time(&self) -> std::sync::RwLockReadGuard> { + self.pull_request_time.read().unwrap() + } } + #[cfg(test)] pub(crate) mod tests { use { @@ -911,7 +934,7 @@ pub(crate) mod tests { &node_keypair.pubkey(), 0, ))); - let mut node = CrdsGossipPull::default(); + let node = CrdsGossipPull::default(); let mut pings = Vec::new(); let ping_cache = Mutex::new(PingCache::new( Duration::from_secs(20 * 60), // ttl @@ -1008,7 +1031,7 @@ pub(crate) mod tests { &node_keypair.pubkey(), 0, ))); - let mut node = CrdsGossipPull::default(); + let node = CrdsGossipPull::default(); crds.insert(entry, now).unwrap(); let old = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); ping_cache.mock_pong(old.id, old.gossip, Instant::now()); @@ -1056,7 +1079,7 @@ pub(crate) mod tests { const NUM_REPS: usize = 2 * CRDS_UNIQUE_PUBKEY_CAPACITY; let mut rng = rand::thread_rng(); let pubkeys: Vec<_> = repeat_with(Pubkey::new_unique).take(NUM_REPS).collect(); - let mut node = CrdsGossipPull::default(); + let node = CrdsGossipPull::default(); let mut requests = HashMap::new(); let now = timestamp(); for k in 0..NUM_REPS { @@ -1065,21 +1088,22 @@ pub(crate) mod tests { node.mark_pull_request_creation_time(pubkey, now); *requests.entry(pubkey).or_default() = now; } - assert!(node.pull_request_time.len() <= CRDS_UNIQUE_PUBKEY_CAPACITY); + let pull_request_time = node.pull_request_time.read().unwrap(); + assert!(pull_request_time.len() <= CRDS_UNIQUE_PUBKEY_CAPACITY); // Assert that timestamps match most recent request. - for (pk, ts) in &node.pull_request_time { + for (pk, ts) in pull_request_time.iter() { assert_eq!(*ts, requests[pk]); } // Assert that most recent pull timestamps are maintained. let max_ts = requests .iter() - .filter(|(pk, _)| !node.pull_request_time.contains(*pk)) + .filter(|(pk, _)| !pull_request_time.contains(*pk)) .map(|(_, ts)| *ts) .max() .unwrap(); let min_ts = requests .iter() - .filter(|(pk, _)| node.pull_request_time.contains(*pk)) + .filter(|(pk, _)| pull_request_time.contains(*pk)) .map(|(_, ts)| *ts) .min() .unwrap(); @@ -1236,7 +1260,7 @@ pub(crate) mod tests { ))); let caller = entry.clone(); let node_pubkey = entry.label().pubkey(); - let mut node = CrdsGossipPull::default(); + let node = CrdsGossipPull::default(); node_crds.insert(entry, 0).unwrap(); let mut ping_cache = PingCache::new( Duration::from_secs(20 * 60), // ttl @@ -1449,7 +1473,7 @@ pub(crate) mod tests { #[test] fn test_process_pull_response() { let mut node_crds = Crds::default(); - let mut node = CrdsGossipPull::default(); + let node = CrdsGossipPull::default(); let peer_pubkey = solana_sdk::pubkey::new_rand(); let peer_entry = CrdsValue::new_unsigned(CrdsData::ContactInfo( diff --git a/gossip/tests/gossip.rs b/gossip/tests/gossip.rs index c8d6404d2..9d6c08229 100644 --- a/gossip/tests/gossip.rs +++ b/gossip/tests/gossip.rs @@ -313,19 +313,19 @@ pub fn cluster_info_scale() { let mut num_push_total = 0; let mut num_pushes = 0; let mut num_pulls = 0; - for node in nodes.iter() { + for (node, _, _) in nodes.iter() { //if node.0.get_votes(0).1.len() != (num_nodes * num_votes) { let has_tx = node - .0 .get_votes(&mut Cursor::default()) .1 .iter() .filter(|v| v.message.account_keys == tx.message.account_keys) .count(); - num_old += node.0.gossip.read().unwrap().push.num_old; - num_push_total += node.0.gossip.read().unwrap().push.num_total; - num_pushes += node.0.gossip.read().unwrap().push.num_pushes; - num_pulls += node.0.gossip.read().unwrap().pull.num_pulls; + let gossip = node.gossip.read().unwrap(); + num_old += gossip.push.num_old; + num_push_total += gossip.push.num_total; + num_pushes += gossip.push.num_pushes; + num_pulls += gossip.pull.num_pulls.load(Ordering::Relaxed); if has_tx == 0 { not_done += 1; } @@ -347,11 +347,12 @@ pub fn cluster_info_scale() { num_votes, time, success ); sleep(Duration::from_millis(200)); - for node in nodes.iter() { - node.0.gossip.write().unwrap().push.num_old = 0; - node.0.gossip.write().unwrap().push.num_total = 0; - node.0.gossip.write().unwrap().push.num_pushes = 0; - node.0.gossip.write().unwrap().pull.num_pulls = 0; + for (node, _, _) in nodes.iter() { + let mut gossip = node.gossip.write().unwrap(); + gossip.push.num_old = 0; + gossip.push.num_total = 0; + gossip.push.num_pushes = 0; + gossip.pull.num_pulls.store(0, Ordering::Relaxed); } }