removes invalid/outdated pending push messages early (#12555)
In CrdsGossipPush::new_push_messages: https://github.com/solana-labs/solana/blob/972619edb/core/src/crds_gossip_push.rs#L211-L228 we already have paid the cost of looking-up the label in crds table and checking the hash value and wallclock only to find out that in some cases the value is invalid or is outdated. So might as well remove the value here rather than wait for the next call to purge_old_pending_push_messages: https://github.com/solana-labs/solana/blob/972619edb/core/src/crds_gossip_push.rs#L372
This commit is contained in:
parent
41ad3dd8f0
commit
b5faa11f73
|
@ -218,52 +218,59 @@ impl CrdsGossipPush {
|
|||
/// The list of push messages is created such that all the randomly selected peers have not
|
||||
/// pruned the source addresses.
|
||||
pub fn new_push_messages(&mut self, crds: &Crds, now: u64) -> HashMap<Pubkey, Vec<CrdsValue>> {
|
||||
let mut total_bytes: usize = 0;
|
||||
let mut values = vec![];
|
||||
let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new();
|
||||
trace!("new_push_messages {}", self.push_messages.len());
|
||||
let push_fanout = self.push_fanout.min(self.active_set.len());
|
||||
if push_fanout == 0 {
|
||||
return HashMap::default();
|
||||
}
|
||||
let mut num_pushes = 0;
|
||||
let mut num_values = 0;
|
||||
let mut total_bytes: usize = 0;
|
||||
let mut labels = vec![];
|
||||
let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new();
|
||||
let cutoff = now.saturating_sub(self.msg_timeout);
|
||||
let lookup = |label, &hash| -> Option<&CrdsValue> {
|
||||
let value = crds.lookup_versioned(label)?;
|
||||
if value.value_hash != hash || value.value.wallclock() < cutoff {
|
||||
None
|
||||
} else {
|
||||
Some(&value.value)
|
||||
}
|
||||
};
|
||||
let mut push_value = |origin: Pubkey, value: &CrdsValue| {
|
||||
//use a consistent index for the same origin so
|
||||
//the active set learns the MST for that origin
|
||||
let start = origin.as_ref()[0] as usize;
|
||||
for i in start..(start + push_fanout) {
|
||||
let index = i % self.active_set.len();
|
||||
let (peer, filter) = self.active_set.get_index(index).unwrap();
|
||||
if !filter.contains(&origin) {
|
||||
trace!("new_push_messages insert {} {:?}", *peer, value);
|
||||
push_messages.entry(*peer).or_default().push(value.clone());
|
||||
num_pushes += 1;
|
||||
}
|
||||
}
|
||||
};
|
||||
for (label, hash) in &self.push_messages {
|
||||
let res = crds.lookup_versioned(label);
|
||||
if res.is_none() {
|
||||
continue;
|
||||
}
|
||||
let version = res.unwrap();
|
||||
if version.value_hash != *hash {
|
||||
continue;
|
||||
}
|
||||
let value = &version.value;
|
||||
if value.wallclock() > now || value.wallclock() + self.msg_timeout < now {
|
||||
continue;
|
||||
}
|
||||
match lookup(label, hash) {
|
||||
None => labels.push(label.clone()),
|
||||
Some(value) if value.wallclock() > now => continue,
|
||||
Some(value) => {
|
||||
total_bytes += serialized_size(value).unwrap() as usize;
|
||||
if total_bytes > self.max_bytes {
|
||||
break;
|
||||
}
|
||||
values.push(value.clone());
|
||||
}
|
||||
trace!(
|
||||
"new_push_messages {} {}",
|
||||
values.len(),
|
||||
self.active_set.len()
|
||||
);
|
||||
for v in values {
|
||||
//use a consistent index for the same origin so
|
||||
//the active set learns the MST for that origin
|
||||
let start = v.label().pubkey().as_ref()[0] as usize;
|
||||
let max = self.push_fanout.min(self.active_set.len());
|
||||
for i in start..(start + max) {
|
||||
let ix = i % self.active_set.len();
|
||||
if let Some((p, filter)) = self.active_set.get_index(ix) {
|
||||
if !filter.contains(&v.label().pubkey()) {
|
||||
trace!("new_push_messages insert {} {:?}", *p, v);
|
||||
push_messages.entry(*p).or_default().push(v.clone());
|
||||
self.num_pushes += 1;
|
||||
num_values += 1;
|
||||
labels.push(label.clone());
|
||||
push_value(label.pubkey(), value);
|
||||
}
|
||||
}
|
||||
self.push_messages.remove(&v.label());
|
||||
}
|
||||
self.num_pushes += num_pushes;
|
||||
trace!("new_push_messages {} {}", num_values, self.active_set.len());
|
||||
for label in labels {
|
||||
self.push_messages.remove(&label);
|
||||
}
|
||||
|
||||
for target_pubkey in push_messages.keys() {
|
||||
*self.last_pushed_to.entry(*target_pubkey).or_insert(0) = now;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue