diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 7b7b1cda3b..603f0875be 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -367,37 +367,38 @@ impl CrdsGossipPull { ) -> (Vec, Vec, Vec) { let mut active_values = vec![]; let mut expired_values = vec![]; - let mut failed_inserts = vec![]; - let mut maybe_push = |response, values: &mut Vec| { - if crds.upserts(&response) { - values.push(response); - } else { - let response = bincode::serialize(&response).unwrap(); - failed_inserts.push(hash(&response)); - } - }; let default_timeout = timeouts .get(&Pubkey::default()) .copied() .unwrap_or(self.msg_timeout); - for response in responses { + let upsert = |response: CrdsValue| { let owner = response.label().pubkey(); // Check if the crds value is older than the msg_timeout let timeout = timeouts.get(&owner).copied().unwrap_or(default_timeout); // Before discarding this value, check if a ContactInfo for the // owner exists in the table. If it doesn't, that implies that this // value can be discarded - if now <= response.wallclock().saturating_add(timeout) { - maybe_push(response, &mut active_values); + if !crds.upserts(&response) { + Some(response) + } else if now <= response.wallclock().saturating_add(timeout) { + active_values.push(response); + None } else if crds.get_contact_info(owner).is_some() { // Silently insert this old value without bumping record // timestamps - maybe_push(response, &mut expired_values); + expired_values.push(response); + None } else { stats.timeout_count += 1; stats.failed_timeout += 1; + Some(response) } - } + }; + let failed_inserts = responses + .into_iter() + .filter_map(upsert) + .map(|resp| hash(&bincode::serialize(&resp).unwrap())) + .collect(); (active_values, expired_values, failed_inserts) } @@ -1529,7 +1530,7 @@ mod test { node.msg_timeout + 100, ) .0, - 2 + 4 ); let mut node_crds = Crds::default(); @@ -1574,7 +1575,7 @@ mod test { node.msg_timeout + 2, ) .0, - 1 + 2 ); } }