records hash of timed-out pull responses

Gossip should record hash of pull responses which are timed out and
fail to insert:
https://github.com/solana-labs/solana/blob/ed51cde37/core/src/crds_gossip_pull.rs#L397-L400

so that they are excluded from the next pull request:
https://github.com/solana-labs/solana/blob/ed51cde37/core/src/crds_gossip_pull.rs#L486-L491

otherwise the next pull request will likely include the same timed out
values and waste bandwidth.
This commit is contained in:
behzad nouri 2021-05-16 10:37:45 -04:00
parent d41266e4e9
commit a7870cda8d
1 changed files with 17 additions and 16 deletions

View File

@ -367,37 +367,38 @@ impl CrdsGossipPull {
) -> (Vec<CrdsValue>, Vec<CrdsValue>, Vec<Hash>) { ) -> (Vec<CrdsValue>, Vec<CrdsValue>, Vec<Hash>) {
let mut active_values = vec![]; let mut active_values = vec![];
let mut expired_values = vec![]; let mut expired_values = vec![];
let mut failed_inserts = vec![];
let mut maybe_push = |response, values: &mut Vec<CrdsValue>| {
if crds.upserts(&response) {
values.push(response);
} else {
let response = bincode::serialize(&response).unwrap();
failed_inserts.push(hash(&response));
}
};
let default_timeout = timeouts let default_timeout = timeouts
.get(&Pubkey::default()) .get(&Pubkey::default())
.copied() .copied()
.unwrap_or(self.msg_timeout); .unwrap_or(self.msg_timeout);
for response in responses { let upsert = |response: CrdsValue| {
let owner = response.label().pubkey(); let owner = response.label().pubkey();
// Check if the crds value is older than the msg_timeout // Check if the crds value is older than the msg_timeout
let timeout = timeouts.get(&owner).copied().unwrap_or(default_timeout); let timeout = timeouts.get(&owner).copied().unwrap_or(default_timeout);
// Before discarding this value, check if a ContactInfo for the // Before discarding this value, check if a ContactInfo for the
// owner exists in the table. If it doesn't, that implies that this // owner exists in the table. If it doesn't, that implies that this
// value can be discarded // value can be discarded
if now <= response.wallclock().saturating_add(timeout) { if !crds.upserts(&response) {
maybe_push(response, &mut active_values); Some(response)
} else if now <= response.wallclock().saturating_add(timeout) {
active_values.push(response);
None
} else if crds.get_contact_info(owner).is_some() { } else if crds.get_contact_info(owner).is_some() {
// Silently insert this old value without bumping record // Silently insert this old value without bumping record
// timestamps // timestamps
maybe_push(response, &mut expired_values); expired_values.push(response);
None
} else { } else {
stats.timeout_count += 1; stats.timeout_count += 1;
stats.failed_timeout += 1; stats.failed_timeout += 1;
Some(response)
} }
} };
let failed_inserts = responses
.into_iter()
.filter_map(upsert)
.map(|resp| hash(&bincode::serialize(&resp).unwrap()))
.collect();
(active_values, expired_values, failed_inserts) (active_values, expired_values, failed_inserts)
} }
@ -1529,7 +1530,7 @@ mod test {
node.msg_timeout + 100, node.msg_timeout + 100,
) )
.0, .0,
2 4
); );
let mut node_crds = Crds::default(); let mut node_crds = Crds::default();
@ -1574,7 +1575,7 @@ mod test {
node.msg_timeout + 2, node.msg_timeout + 2,
) )
.0, .0,
1 2
); );
} }
} }