diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 7099132891..a6ee16eb78 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -285,7 +285,7 @@ impl CrdsGossip { now: u64, process_pull_stats: &mut ProcessPullStats, ) { - let success = self.pull.process_pull_responses( + self.pull.process_pull_responses( &mut self.crds, from, responses, @@ -294,7 +294,6 @@ impl CrdsGossip { now, process_pull_stats, ); - self.push.push_pull_responses(success, now); } pub fn make_timeouts_test(&self) -> HashMap { @@ -316,10 +315,6 @@ impl CrdsGossip { timeouts: &HashMap, ) -> usize { let mut rv = 0; - if now > self.push.msg_timeout { - let min = now - self.push.msg_timeout; - self.push.purge_old_pending_push_messages(&self.crds, min); - } if now > 5 * self.push.msg_timeout { let min = now - 5 * self.push.msg_timeout; self.push.purge_old_received_cache(min); diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 70192b3598..add744f043 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -15,7 +15,7 @@ use crate::{ crds::{Crds, CrdsError}, crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS}, crds_gossip_error::CrdsGossipError, - crds_value::{CrdsValue, CrdsValueLabel}, + crds_value::CrdsValue, ping_pong::PingCache, }; use itertools::Itertools; @@ -412,8 +412,7 @@ impl CrdsGossipPull { mut failed_inserts: Vec, now: u64, stats: &mut ProcessPullStats, - ) -> Vec<(CrdsValueLabel, Hash, u64)> { - let mut success = vec![]; + ) { let mut owners = HashSet::new(); for response in responses_expired_timeout { match crds.insert(response, now) { @@ -424,17 +423,14 @@ impl CrdsGossipPull { } } for response in responses { - let label = response.label(); - let wallclock = response.wallclock(); + let owner = response.pubkey(); match crds.insert(response, now) { Err(CrdsError::InsertFailed(value_hash)) => failed_inserts.push(value_hash), Err(CrdsError::UnknownStakes) => (), Ok(old) => { stats.success += 1; self.num_pulls += 1; - owners.insert(label.pubkey()); - let value_hash = crds.get(&label).unwrap().value_hash; - success.push((label, value_hash, wallclock)); + owners.insert(owner); if let Some(val) = old { self.purged_values.push_back((val.value_hash, now)) } @@ -449,7 +445,6 @@ impl CrdsGossipPull { self.purge_failed_inserts(now); self.failed_inserts .extend(failed_inserts.into_iter().zip(std::iter::repeat(now))); - success } pub fn purge_failed_inserts(&mut self, now: u64) { diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index 3ce4ae39b1..bbe87666ee 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -11,10 +11,10 @@ use crate::{ cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY, contact_info::ContactInfo, - crds::{Crds, VersionedCrdsValue}, + crds::{Crds, Cursor, VersionedCrdsValue}, crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS}, crds_gossip_error::CrdsGossipError, - crds_value::{CrdsValue, CrdsValueLabel}, + crds_value::CrdsValue, weighted_shuffle::weighted_shuffle, }; use bincode::serialized_size; @@ -23,10 +23,11 @@ use itertools::Itertools; use lru::LruCache; use rand::{seq::SliceRandom, Rng}; use solana_runtime::bloom::{AtomicBloom, Bloom}; -use solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp}; +use solana_sdk::{packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp}; use std::{ cmp, collections::{HashMap, HashSet}, + ops::RangeBounds, }; pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30; @@ -46,8 +47,8 @@ pub struct CrdsGossipPush { pub max_bytes: usize, /// active set of validators for push active_set: IndexMap>, - /// push message queue - push_messages: HashMap, + /// Cursor into the crds table for values to push. + crds_cursor: Cursor, /// Cache that tracks which validators a message was received from /// bool indicates it has been pruned. /// This cache represents a lagging view of which validators @@ -69,7 +70,7 @@ impl Default for CrdsGossipPush { // Allow upto 64 Crds Values per PUSH max_bytes: PACKET_DATA_SIZE * 64, active_set: IndexMap::new(), - push_messages: HashMap::new(), + crds_cursor: Cursor::default(), received_cache: HashMap::new(), last_pushed_to: LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY), num_active: CRDS_GOSSIP_NUM_ACTIVE, @@ -83,8 +84,9 @@ impl Default for CrdsGossipPush { } } impl CrdsGossipPush { - pub fn num_pending(&self) -> usize { - self.push_messages.len() + pub fn num_pending(&self, crds: &Crds) -> usize { + let mut cursor = self.crds_cursor; + crds.get_entries(&mut cursor).count() } fn prune_stake_threshold(self_stake: u64, origin_stake: u64) -> u64 { @@ -163,6 +165,10 @@ impl CrdsGossipPush { pruned_peers } + fn wallclock_window(&self, now: u64) -> impl RangeBounds { + now.saturating_sub(self.msg_timeout)..=now.saturating_add(self.msg_timeout) + } + /// process a push message to the network pub fn process_push_message( &mut self, @@ -172,39 +178,20 @@ impl CrdsGossipPush { now: u64, ) -> Result, CrdsGossipError> { self.num_total += 1; - let range = now.saturating_sub(self.msg_timeout)..=now.saturating_add(self.msg_timeout); - if !range.contains(&value.wallclock()) { + if !self.wallclock_window(now).contains(&value.wallclock()) { return Err(CrdsGossipError::PushMessageTimeout); } - let label = value.label(); - let origin = label.pubkey(); + let origin = value.pubkey(); self.received_cache .entry(origin) .or_default() .entry(*from) .and_modify(|(_pruned, timestamp)| *timestamp = now) .or_insert((/*pruned:*/ false, now)); - match crds.insert(value, now) { - Err(_) => { - self.num_old += 1; - Err(CrdsGossipError::PushMessageOldVersion) - } - Ok(old) => { - let value_hash = crds.get(&label).unwrap().value_hash; - self.push_messages.insert(label, value_hash); - Ok(old) - } - } - } - - /// push pull responses - pub fn push_pull_responses(&mut self, values: Vec<(CrdsValueLabel, Hash, u64)>, now: u64) { - for (label, value_hash, wc) in values { - if now > wc.checked_add(self.msg_timeout).unwrap_or(0) { - continue; - } - self.push_messages.insert(label, value_hash); - } + crds.insert(value, now).map_err(|_| { + self.num_old += 1; + CrdsGossipError::PushMessageOldVersion + }) } /// New push message to broadcast to peers. @@ -213,7 +200,6 @@ impl CrdsGossipPush { /// The list of push messages is created such that all the randomly selected peers have not /// pruned the source addresses. pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> HashMap> { - trace!("new_push_messages {}", self.push_messages.len()); let push_fanout = self.push_fanout.min(self.active_set.len()); if push_fanout == 0 { return HashMap::default(); @@ -221,22 +207,24 @@ impl CrdsGossipPush { let mut num_pushes = 0; let mut num_values = 0; let mut total_bytes: usize = 0; - let mut labels = vec![]; let mut push_messages: HashMap> = HashMap::new(); - let cutoff = now.saturating_sub(self.msg_timeout); - let lookup = |label, &hash| -> Option<&CrdsValue> { - let value = crds.lookup_versioned(label)?; - if value.value_hash != hash || value.value.wallclock() < cutoff { - None - } else { - Some(&value.value) + let wallclock_window = self.wallclock_window(now); + let entries = crds + .get_entries(&mut self.crds_cursor) + .map(|entry| &entry.value) + .filter(|value| wallclock_window.contains(&value.wallclock())); + for value in entries { + let serialized_size = serialized_size(&value).unwrap(); + total_bytes = total_bytes.saturating_add(serialized_size as usize); + if total_bytes > self.max_bytes { + break; } - }; - let mut push_value = |origin: Pubkey, value: &CrdsValue| { - //use a consistent index for the same origin so - //the active set learns the MST for that origin - let start = origin.as_ref()[0] as usize; - for i in start..(start + push_fanout) { + num_values += 1; + let origin = value.pubkey(); + // Use a consistent index for the same origin so the active set + // learns the MST for that origin. + let offset = origin.as_ref()[0] as usize; + for i in offset..offset + push_fanout { let index = i % self.active_set.len(); let (peer, filter) = self.active_set.get_index(index).unwrap(); if !filter.contains(&origin) || value.should_force_push(peer) { @@ -245,27 +233,9 @@ impl CrdsGossipPush { num_pushes += 1; } } - }; - for (label, hash) in &self.push_messages { - match lookup(label, hash) { - None => labels.push(label.clone()), - Some(value) if value.wallclock() > now => continue, - Some(value) => { - total_bytes += serialized_size(value).unwrap() as usize; - if total_bytes > self.max_bytes { - break; - } - num_values += 1; - labels.push(label.clone()); - push_value(label.pubkey(), value); - } - } } self.num_pushes += num_pushes; trace!("new_push_messages {} {}", num_values, self.active_set.len()); - for label in labels { - self.push_messages.remove(&label); - } for target_pubkey in push_messages.keys().copied() { self.last_pushed_to.put(target_pubkey, now); } @@ -400,15 +370,6 @@ impl CrdsGossipPush { .collect() } - /// purge old pending push messages - pub fn purge_old_pending_push_messages(&mut self, crds: &Crds, min_time: u64) { - self.push_messages.retain(|k, hash| { - matches!(crds.lookup_versioned(k), Some(versioned) if - versioned.value.wallclock() >= min_time - && versioned.value_hash == *hash) - }); - } - /// purge received push message cache pub fn purge_old_received_cache(&mut self, min_time: u64) { self.received_cache.retain(|_, v| { @@ -430,7 +391,6 @@ impl CrdsGossipPush { } Self { active_set, - push_messages: self.push_messages.clone(), received_cache: self.received_cache.clone(), last_pushed_to, ..*self @@ -879,7 +839,6 @@ mod test { push.process_push_message(&mut crds, &Pubkey::default(), new_msg, 1), Ok(None) ); - push.purge_old_pending_push_messages(&crds, 0); assert_eq!(push.new_push_messages(&crds, 0), expected); } diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 0e2c18b877..656332bfab 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -415,7 +415,10 @@ fn network_run_push( } total = network_values .par_iter() - .map(|v| v.lock().unwrap().push.num_pending()) + .map(|node| { + let gossip = node.gossip.lock().unwrap(); + gossip.push.num_pending(&gossip.crds) + }) .sum(); trace!( "network_run_push_{}: now: {} queue: {} bytes: {} num_msgs: {} prunes: {} stake_pruned: {} delivered: {}",