patches local pending push messages processing (#16833)
process_push_messages writes local pending push messages to the crds table, but it discards the return value: https://github.com/solana-labs/solana/blob/cf779c63c/core/src/crds_gossip.rs#L96-L102 In order to exclude outdated values from the next pull-request, we need to record the hashes of values purged/overridden by the local push messages; otherwise, pull-responses will return outdated values back to the node: https://github.com/solana-labs/solana/blob/c1829dd00/core/src/crds_gossip_pull.rs#L447-L452 Additionally, gossip packets arrive and are processed out of order. So, local pending push messages should be flushed *before* generating bloom filters for pull-requests, preventing pull-responses from returning the same values back to the node itself. This requires flipping the order of generating pull and push messages: https://github.com/solana-labs/solana/blob/cf779c63c/core/src/cluster_info.rs#L1757-L1762 Both of the above bugs cause redundant traffic and bandwidth waste in gossip pull-responses.
This commit is contained in:
parent
541aa5ad85
commit
a698e34744
|
@ -1616,7 +1616,7 @@ impl ClusterInfo {
|
|||
pub fn flush_push_queue(&self) {
|
||||
let pending_push_messages = self.drain_push_queue();
|
||||
let mut gossip = self.gossip.write().unwrap();
|
||||
gossip.process_push_messages(pending_push_messages, timestamp());
|
||||
gossip.process_push_message(&self.id, pending_push_messages, timestamp());
|
||||
}
|
||||
fn new_push_requests(
|
||||
&self,
|
||||
|
@ -1667,20 +1667,22 @@ impl ClusterInfo {
|
|||
require_stake_for_gossip: bool,
|
||||
) -> Vec<(SocketAddr, Protocol)> {
|
||||
self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, &stakes);
|
||||
let mut pulls: Vec<_> = if generate_pull_requests {
|
||||
self.new_pull_requests(&thread_pool, gossip_validators, stakes)
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
let mut pushes: Vec<_> = self.new_push_requests(stakes, require_stake_for_gossip);
|
||||
self.stats
|
||||
.packets_sent_pull_requests_count
|
||||
.add_relaxed(pulls.len() as u64);
|
||||
// This will flush local pending push messages before generating
|
||||
// pull-request bloom filters, preventing pull responses to return the
|
||||
// same values back to the node itself. Note that packets will arrive
|
||||
// and are processed out of order.
|
||||
let mut out: Vec<_> = self.new_push_requests(stakes, require_stake_for_gossip);
|
||||
self.stats
|
||||
.packets_sent_push_messages_count
|
||||
.add_relaxed(pushes.len() as u64);
|
||||
pulls.append(&mut pushes);
|
||||
pulls
|
||||
.add_relaxed(out.len() as u64);
|
||||
if generate_pull_requests {
|
||||
let pull_requests = self.new_pull_requests(&thread_pool, gossip_validators, stakes);
|
||||
self.stats
|
||||
.packets_sent_pull_requests_count
|
||||
.add_relaxed(pull_requests.len() as u64);
|
||||
out.extend(pull_requests);
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
/// At random pick a node and try to get updated changes from them
|
||||
|
|
|
@ -91,24 +91,13 @@ impl CrdsGossip {
|
|||
prune_map
|
||||
}
|
||||
|
||||
pub(crate) fn process_push_messages(
|
||||
&mut self,
|
||||
pending_push_messages: Vec<CrdsValue>,
|
||||
timestamp: u64,
|
||||
) {
|
||||
for push_message in pending_push_messages {
|
||||
let _ =
|
||||
self.push
|
||||
.process_push_message(&mut self.crds, &self.id, push_message, timestamp);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_push_messages(
|
||||
&mut self,
|
||||
pending_push_messages: Vec<CrdsValue>,
|
||||
now: u64,
|
||||
) -> HashMap<Pubkey, Vec<CrdsValue>> {
|
||||
self.process_push_messages(pending_push_messages, now);
|
||||
let self_pubkey = self.id;
|
||||
self.process_push_message(&self_pubkey, pending_push_messages, now);
|
||||
self.push.new_push_messages(&self.crds, now)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue