diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index be2e7cbbef..07a9dc791a 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -218,52 +218,59 @@ impl CrdsGossipPush { /// The list of push messages is created such that all the randomly selected peers have not /// pruned the source addresses. pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> HashMap> { - let mut total_bytes: usize = 0; - let mut values = vec![]; - let mut push_messages: HashMap> = HashMap::new(); trace!("new_push_messages {}", self.push_messages.len()); - for (label, hash) in &self.push_messages { - let res = crds.lookup_versioned(label); - if res.is_none() { - continue; - } - let version = res.unwrap(); - if version.value_hash != *hash { - continue; - } - let value = &version.value; - if value.wallclock() > now || value.wallclock() + self.msg_timeout < now { - continue; - } - total_bytes += serialized_size(value).unwrap() as usize; - if total_bytes > self.max_bytes { - break; - } - values.push(value.clone()); + let push_fanout = self.push_fanout.min(self.active_set.len()); + if push_fanout == 0 { + return HashMap::default(); } - trace!( - "new_push_messages {} {}", - values.len(), - self.active_set.len() - ); - for v in values { + let mut num_pushes = 0; + let mut num_values = 0; + let mut total_bytes: usize = 0; + let mut labels = vec![]; + let mut push_messages: HashMap> = HashMap::new(); + let cutoff = now.saturating_sub(self.msg_timeout); + let lookup = |label, &hash| -> Option<&CrdsValue> { + let value = crds.lookup_versioned(label)?; + if value.value_hash != hash || value.value.wallclock() < cutoff { + None + } else { + Some(&value.value) + } + }; + let mut push_value = |origin: Pubkey, value: &CrdsValue| { //use a consistent index for the same origin so //the active set learns the MST for that origin - let start = v.label().pubkey().as_ref()[0] as usize; - let max = self.push_fanout.min(self.active_set.len()); - for i in start..(start + max) { - let ix = i % self.active_set.len(); - if let Some((p, filter)) = self.active_set.get_index(ix) { - if !filter.contains(&v.label().pubkey()) { - trace!("new_push_messages insert {} {:?}", *p, v); - push_messages.entry(*p).or_default().push(v.clone()); - self.num_pushes += 1; - } + let start = origin.as_ref()[0] as usize; + for i in start..(start + push_fanout) { + let index = i % self.active_set.len(); + let (peer, filter) = self.active_set.get_index(index).unwrap(); + if !filter.contains(&origin) { + trace!("new_push_messages insert {} {:?}", *peer, value); + push_messages.entry(*peer).or_default().push(value.clone()); + num_pushes += 1; + } + } + }; + for (label, hash) in &self.push_messages { + match lookup(label, hash) { + None => labels.push(label.clone()), + Some(value) if value.wallclock() > now => continue, + Some(value) => { + total_bytes += serialized_size(value).unwrap() as usize; + if total_bytes > self.max_bytes { + break; + } + num_values += 1; + labels.push(label.clone()); + push_value(label.pubkey(), value); } - self.push_messages.remove(&v.label()); } } - + self.num_pushes += num_pushes; + trace!("new_push_messages {} {}", num_values, self.active_set.len()); + for label in labels { + self.push_messages.remove(&label); + } for target_pubkey in push_messages.keys() { *self.last_pushed_to.entry(*target_pubkey).or_insert(0) = now; }