From b5faa11f736fd836d4afdee67d827e86344b06e8 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Wed, 7 Oct 2020 18:29:20 +0000 Subject: [PATCH] removes invalid/outdated pending push messages early (#12555) In CrdsGossipPush::new_push_messages: https://github.com/solana-labs/solana/blob/972619edb/core/src/crds_gossip_push.rs#L211-L228 we already have paid the cost of looking-up the label in crds table and checking the hash value and wallclock only to find out that in some cases the value is invalid or is outdated. So might as well remove the value here rather than wait for the next call to purge_old_pending_push_messages: https://github.com/solana-labs/solana/blob/972619edb/core/src/crds_gossip_push.rs#L372 --- core/src/crds_gossip_push.rs | 85 +++++++++++++++++++----------------- 1 file changed, 46 insertions(+), 39 deletions(-) diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index be2e7cbbef..07a9dc791a 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -218,52 +218,59 @@ impl CrdsGossipPush { /// The list of push messages is created such that all the randomly selected peers have not /// pruned the source addresses. pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> HashMap> { - let mut total_bytes: usize = 0; - let mut values = vec![]; - let mut push_messages: HashMap> = HashMap::new(); trace!("new_push_messages {}", self.push_messages.len()); - for (label, hash) in &self.push_messages { - let res = crds.lookup_versioned(label); - if res.is_none() { - continue; - } - let version = res.unwrap(); - if version.value_hash != *hash { - continue; - } - let value = &version.value; - if value.wallclock() > now || value.wallclock() + self.msg_timeout < now { - continue; - } - total_bytes += serialized_size(value).unwrap() as usize; - if total_bytes > self.max_bytes { - break; - } - values.push(value.clone()); + let push_fanout = self.push_fanout.min(self.active_set.len()); + if push_fanout == 0 { + return HashMap::default(); } - trace!( - "new_push_messages {} {}", - values.len(), - self.active_set.len() - ); - for v in values { + let mut num_pushes = 0; + let mut num_values = 0; + let mut total_bytes: usize = 0; + let mut labels = vec![]; + let mut push_messages: HashMap> = HashMap::new(); + let cutoff = now.saturating_sub(self.msg_timeout); + let lookup = |label, &hash| -> Option<&CrdsValue> { + let value = crds.lookup_versioned(label)?; + if value.value_hash != hash || value.value.wallclock() < cutoff { + None + } else { + Some(&value.value) + } + }; + let mut push_value = |origin: Pubkey, value: &CrdsValue| { //use a consistent index for the same origin so //the active set learns the MST for that origin - let start = v.label().pubkey().as_ref()[0] as usize; - let max = self.push_fanout.min(self.active_set.len()); - for i in start..(start + max) { - let ix = i % self.active_set.len(); - if let Some((p, filter)) = self.active_set.get_index(ix) { - if !filter.contains(&v.label().pubkey()) { - trace!("new_push_messages insert {} {:?}", *p, v); - push_messages.entry(*p).or_default().push(v.clone()); - self.num_pushes += 1; - } + let start = origin.as_ref()[0] as usize; + for i in start..(start + push_fanout) { + let index = i % self.active_set.len(); + let (peer, filter) = self.active_set.get_index(index).unwrap(); + if !filter.contains(&origin) { + trace!("new_push_messages insert {} {:?}", *peer, value); + push_messages.entry(*peer).or_default().push(value.clone()); + num_pushes += 1; + } + } + }; + for (label, hash) in &self.push_messages { + match lookup(label, hash) { + None => labels.push(label.clone()), + Some(value) if value.wallclock() > now => continue, + Some(value) => { + total_bytes += serialized_size(value).unwrap() as usize; + if total_bytes > self.max_bytes { + break; + } + num_values += 1; + labels.push(label.clone()); + push_value(label.pubkey(), value); } - self.push_messages.remove(&v.label()); } } - + self.num_pushes += num_pushes; + trace!("new_push_messages {} {}", num_values, self.active_set.len()); + for label in labels { + self.push_messages.remove(&label); + } for target_pubkey in push_messages.keys() { *self.last_pushed_to.entry(*target_pubkey).or_insert(0) = now; }