From a698e3474438759f070072d0f5f75b028cf9f526 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 3 May 2021 16:00:17 +0000 Subject: [PATCH] patches local pending push messages processing (#16833) process_push_messages writes local pending push messages to the crds table, but it discards the return value: https://github.com/solana-labs/solana/blob/cf779c63c/core/src/crds_gossip.rs#L96-L102 In order to exclude outdated values from the next pull-request, we need to record the hash of values purged/overridden by the local push messages, otherwise pull-responses will return outdated values back to the node: https://github.com/solana-labs/solana/blob/c1829dd00/core/src/crds_gossip_pull.rs#L447-L452 Additionally, gossip packets arrive and are processed out of order. So, local pending push messages should be flushed *before* generating bloom filters for pull-requests, preventing pull-responses returning the same values back to the node itself. This requires flipping order of generating pull and push messages: https://github.com/solana-labs/solana/blob/cf779c63c/core/src/cluster_info.rs#L1757-L1762 Both above bugs cause redundant traffic and bandwidth waste in gossip pull-responses. --- core/src/cluster_info.rs | 28 +++++++++++++++------------- core/src/crds_gossip.rs | 15 ++------------- 2 files changed, 17 insertions(+), 26 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 9dc108dba1..caf45e826f 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1616,7 +1616,7 @@ impl ClusterInfo { pub fn flush_push_queue(&self) { let pending_push_messages = self.drain_push_queue(); let mut gossip = self.gossip.write().unwrap(); - gossip.process_push_messages(pending_push_messages, timestamp()); + gossip.process_push_message(&self.id, pending_push_messages, timestamp()); } fn new_push_requests( &self, @@ -1667,20 +1667,22 @@ impl ClusterInfo { require_stake_for_gossip: bool, ) -> Vec<(SocketAddr, Protocol)> { self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, &stakes); - let mut pulls: Vec<_> = if generate_pull_requests { - self.new_pull_requests(&thread_pool, gossip_validators, stakes) - } else { - vec![] - }; - let mut pushes: Vec<_> = self.new_push_requests(stakes, require_stake_for_gossip); - self.stats - .packets_sent_pull_requests_count - .add_relaxed(pulls.len() as u64); + // This will flush local pending push messages before generating + // pull-request bloom filters, preventing pull responses to return the + // same values back to the node itself. Note that packets will arrive + // and are processed out of order. + let mut out: Vec<_> = self.new_push_requests(stakes, require_stake_for_gossip); self.stats .packets_sent_push_messages_count - .add_relaxed(pushes.len() as u64); - pulls.append(&mut pushes); - pulls + .add_relaxed(out.len() as u64); + if generate_pull_requests { + let pull_requests = self.new_pull_requests(&thread_pool, gossip_validators, stakes); + self.stats + .packets_sent_pull_requests_count + .add_relaxed(pull_requests.len() as u64); + out.extend(pull_requests); + } + out } /// At random pick a node and try to get updated changes from them diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index cc416d6f86..b70930a932 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -91,24 +91,13 @@ impl CrdsGossip { prune_map } - pub(crate) fn process_push_messages( - &mut self, - pending_push_messages: Vec, - timestamp: u64, - ) { - for push_message in pending_push_messages { - let _ = - self.push - .process_push_message(&mut self.crds, &self.id, push_message, timestamp); - } - } - pub fn new_push_messages( &mut self, pending_push_messages: Vec, now: u64, ) -> HashMap> { - self.process_push_messages(pending_push_messages, now); + let self_pubkey = self.id; + self.process_push_message(&self_pubkey, pending_push_messages, now); self.push.new_push_messages(&self.crds, now) }