skips process_push_message for local messages (#18493)

received_cache is not relevant for local messages, and does not need to
be updated:
https://github.com/solana-labs/solana/blob/92c5cdab6/gossip/src/crds_gossip_push.rs#L166-L189
This commit is contained in:
behzad nouri 2021-07-09 01:42:13 +00:00 committed by GitHub
parent aeae14eff0
commit 27cc7577a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 32 additions and 27 deletions

View File

@ -975,7 +975,12 @@ impl ClusterInfo {
reset = true;
}
let mut gossip = self.gossip.write().unwrap();
gossip.process_push_message(&self_pubkey, entries, timestamp());
let now = timestamp();
for entry in entries {
if let Err(err) = gossip.crds.insert(entry, now) {
error!("push_epoch_slots failed: {:?}", err);
}
}
}
fn time_gossip_read_lock<'a>(
@ -1034,10 +1039,10 @@ impl ClusterInfo {
let vote = Vote::new(self_pubkey, vote, now);
let vote = CrdsData::Vote(vote_index, vote);
let vote = CrdsValue::new_signed(vote, &self.keypair());
self.gossip
.write()
.unwrap()
.process_push_message(&self_pubkey, vec![vote], now);
let mut gossip = self.gossip.write().unwrap();
if let Err(err) = gossip.crds.insert(vote, now) {
error!("push_vote failed: {:?}", err);
}
}
pub fn push_vote(&self, tower: &[Slot], vote: Transaction) {
@ -1538,7 +1543,10 @@ impl ClusterInfo {
pub fn flush_push_queue(&self) {
let pending_push_messages = self.drain_push_queue();
let mut gossip = self.gossip.write().unwrap();
gossip.process_push_message(&self.id(), pending_push_messages, timestamp());
let now = timestamp();
for entry in pending_push_messages {
let _ = gossip.crds.insert(entry, now);
}
}
fn new_push_requests(
&self,
@ -3541,7 +3549,6 @@ mod tests {
None, // payer
);
cluster_info.push_vote(&unrefresh_tower, unrefresh_tx.clone());
cluster_info.flush_push_queue();
let mut cursor = Cursor::default();
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes, vec![unrefresh_tx.clone()]);
@ -3563,7 +3570,6 @@ mod tests {
// Trying to refresh vote when it doesn't yet exist in gossip
// shouldn't add the vote
cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot);
cluster_info.flush_push_queue();
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes, vec![]);
let (_, votes) = cluster_info.get_votes(&mut Cursor::default());
@ -3572,7 +3578,6 @@ mod tests {
// Push the new vote for `refresh_slot`
cluster_info.push_vote(&refresh_tower, refresh_tx.clone());
cluster_info.flush_push_queue();
// Should be two votes in gossip
let (_, votes) = cluster_info.get_votes(&mut Cursor::default());
@ -3598,8 +3603,6 @@ mod tests {
);
cluster_info.refresh_vote(latest_refresh_tx.clone(), refresh_slot);
}
cluster_info.flush_push_queue();
// The diff since `max_ts` should only be the latest refreshed vote
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes.len(), 1);
@ -3640,7 +3643,6 @@ mod tests {
);
let tower = vec![7]; // Last slot in the vote.
cluster_info.push_vote(&tower, tx.clone());
cluster_info.flush_push_queue();
let (labels, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes, vec![tx]);

View File

@ -61,16 +61,16 @@ impl CrdsGossip {
}
/// process a push message to the network
/// Returns origins' pubkeys of upserted values.
/// Returns unique origins' pubkeys of upserted values.
pub fn process_push_message(
&mut self,
from: &Pubkey,
values: Vec<CrdsValue>,
now: u64,
) -> Vec<Pubkey> {
) -> HashSet<Pubkey> {
values
.into_iter()
.flat_map(|val| {
.filter_map(|val| {
let origin = val.pubkey();
self.push
.process_push_message(&mut self.crds, from, val, now)
@ -106,8 +106,9 @@ impl CrdsGossip {
pending_push_messages: Vec<CrdsValue>,
now: u64,
) -> HashMap<Pubkey, Vec<CrdsValue>> {
let self_pubkey = self.id;
self.process_push_message(&self_pubkey, pending_push_messages, now);
for entry in pending_push_messages {
let _ = self.crds.insert(entry, now);
}
self.push.new_push_messages(&self.crds, now)
}
@ -161,15 +162,17 @@ impl CrdsGossip {
} else {
offset
};
let entries = chunks
.enumerate()
.map(|(k, chunk)| {
let index = (offset + k as DuplicateShredIndex) % MAX_DUPLICATE_SHREDS;
let data = CrdsData::DuplicateShred(index, chunk);
CrdsValue::new_signed(data, keypair)
})
.collect();
self.process_push_message(&pubkey, entries, timestamp());
let entries = chunks.enumerate().map(|(k, chunk)| {
let index = (offset + k as DuplicateShredIndex) % MAX_DUPLICATE_SHREDS;
let data = CrdsData::DuplicateShred(index, chunk);
CrdsValue::new_signed(data, keypair)
});
let now = timestamp();
for entry in entries {
if let Err(err) = self.crds.insert(entry, now) {
error!("push_duplicate_shred faild: {:?}", err);
}
}
Ok(())
}

View File

@ -164,7 +164,7 @@ impl CrdsGossipPush {
}
/// process a push message to the network
pub fn process_push_message(
pub(crate) fn process_push_message(
&mut self,
crds: &mut Crds,
from: &Pubkey,