retains peer's contact-info when making pull requests (#16715)

ClusterInfo::new_pull_requests has to lookup contact-infos:
https://github.com/solana-labs/solana/blob/a1ef2bd74/core/src/cluster_info.rs#L1663-L1673

when it was already available when making pull requests:
https://github.com/solana-labs/solana/blob/a1ef2bd74/core/src/crds_gossip_pull.rs#L232
This commit is contained in:
behzad nouri 2021-04-28 13:19:12 +00:00 committed by GitHub
parent 1eaff394da
commit 25054bfd35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 92 additions and 100 deletions

View File

@ -28,7 +28,7 @@ use crate::{
result::{Error, Result}, result::{Error, Result},
weighted_shuffle::weighted_shuffle, weighted_shuffle::weighted_shuffle,
}; };
use rand::{CryptoRng, Rng}; use rand::{seq::SliceRandom, CryptoRng, Rng};
use solana_ledger::shred::Shred; use solana_ledger::shred::Shred;
use solana_sdk::sanitize::{Sanitize, SanitizeError}; use solana_sdk::sanitize::{Sanitize, SanitizeError};
@ -1440,52 +1440,43 @@ impl ClusterInfo {
fn append_entrypoint_to_pulls( fn append_entrypoint_to_pulls(
&self, &self,
thread_pool: &ThreadPool, thread_pool: &ThreadPool,
pulls: &mut Vec<(Pubkey, CrdsFilter, SocketAddr, CrdsValue)>, pulls: &mut Vec<(ContactInfo, Vec<CrdsFilter>)>,
) { ) {
let entrypoint_id_and_gossip = { const THROTTLE_DELAY: u64 = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2;
let entrypoint = {
let mut entrypoints = self.entrypoints.write().unwrap(); let mut entrypoints = self.entrypoints.write().unwrap();
if entrypoints.is_empty() { let entrypoint = match entrypoints.choose_mut(&mut rand::thread_rng()) {
None Some(entrypoint) => entrypoint,
} else { None => return,
let i = thread_rng().gen_range(0, entrypoints.len()); };
let entrypoint = &mut entrypoints[i]; if !pulls.is_empty() {
let now = timestamp();
if pulls.is_empty() { if now <= entrypoint.wallclock.saturating_add(THROTTLE_DELAY) {
// Nobody else to pull from, try an entrypoint return;
Some((entrypoint.id, entrypoint.gossip)) }
} else { entrypoint.wallclock = now;
let now = timestamp(); if self
if now - entrypoint.wallclock <= CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 { .time_gossip_read_lock("entrypoint", &self.stats.entrypoint)
None .crds
} else { .get_nodes_contact_info()
entrypoint.wallclock = now; .any(|node| node.gossip == entrypoint.gossip)
if self {
.time_gossip_read_lock("entrypoint", &self.stats.entrypoint) return; // Found the entrypoint, no need to pull from it
.crds
.get_nodes_contact_info()
.any(|node| node.gossip == entrypoint.gossip)
{
None // Found the entrypoint, no need to pull from it
} else {
Some((entrypoint.id, entrypoint.gossip))
}
}
} }
} }
entrypoint.clone()
}; };
let filters = match pulls.first() {
if let Some((id, gossip)) = entrypoint_id_and_gossip { Some((_, filters)) => filters.clone(),
let r_gossip = self.time_gossip_read_lock("entrypoint", &self.stats.entrypoint2); None => {
let self_info = r_gossip let gossip = self.time_gossip_read_lock("entrypoint", &self.stats.entrypoint2);
.crds gossip
.lookup(&CrdsValueLabel::ContactInfo(self.id())) .pull
.unwrap_or_else(|| panic!("self_id invalid {}", self.id())); .build_crds_filters(thread_pool, &gossip.crds, MAX_BLOOM_SIZE)
r_gossip }
.pull };
.build_crds_filters(thread_pool, &r_gossip.crds, MAX_BLOOM_SIZE) self.stats.pull_from_entrypoint_count.add_relaxed(1);
.into_iter() pulls.push((entrypoint, filters));
.for_each(|filter| pulls.push((id, filter, gossip, self_info.clone())));
}
} }
/// Splits an input feed of serializable data into chunks where the sum of /// Splits an input feed of serializable data into chunks where the sum of
@ -1546,45 +1537,36 @@ impl ClusterInfo {
) -> Vec<(SocketAddr, Protocol)> { ) -> Vec<(SocketAddr, Protocol)> {
let now = timestamp(); let now = timestamp();
let mut pulls: Vec<_> = { let mut pulls: Vec<_> = {
let r_gossip = let gossip = self.time_gossip_read_lock("new_pull_reqs", &self.stats.new_pull_requests);
self.time_gossip_read_lock("new_pull_reqs", &self.stats.new_pull_requests); match gossip.new_pull_request(
r_gossip thread_pool,
.new_pull_request(thread_pool, now, gossip_validators, stakes, MAX_BLOOM_SIZE) now,
.ok() gossip_validators,
.into_iter() stakes,
.filter_map(|(peer, filters, me)| { MAX_BLOOM_SIZE,
let peer_label = CrdsValueLabel::ContactInfo(peer); ) {
r_gossip Err(_) => Vec::default(),
.crds Ok((peer, filters)) => vec![(peer, filters)],
.lookup(&peer_label) }
.and_then(CrdsValue::contact_info)
.map(move |peer_info| {
filters
.into_iter()
.map(move |f| (peer, f, peer_info.gossip, me.clone()))
})
})
.flatten()
.collect()
}; };
self.append_entrypoint_to_pulls(thread_pool, &mut pulls); self.append_entrypoint_to_pulls(thread_pool, &mut pulls);
self.stats let num_requests = pulls.iter().map(|(_, filters)| filters.len() as u64).sum();
.new_pull_requests_count self.stats.new_pull_requests_count.add_relaxed(num_requests);
.add_relaxed(pulls.len() as u64);
// There are at most 2 unique peers here: The randomly
// selected pull peer, and possibly also the entrypoint.
let peers: Vec<Pubkey> = pulls.iter().map(|(peer, _, _, _)| *peer).dedup().collect();
{ {
let mut gossip = let mut gossip =
self.time_gossip_write_lock("mark_pull", &self.stats.mark_pull_request); self.time_gossip_write_lock("mark_pull", &self.stats.mark_pull_request);
for peer in peers { for (peer, _) in &pulls {
gossip.mark_pull_request_creation_time(&peer, now); gossip.mark_pull_request_creation_time(peer.id, now);
} }
} }
let self_info = CrdsData::ContactInfo(self.my_contact_info());
let self_info = CrdsValue::new_signed(self_info, &self.keypair);
pulls pulls
.into_iter() .into_iter()
.map(|(_, filter, gossip, self_info)| { .flat_map(|(peer, filters)| std::iter::repeat(peer.gossip).zip(filters))
(gossip, Protocol::PullRequest(filter, self_info)) .map(|(gossip_addr, filter)| {
let request = Protocol::PullRequest(filter, self_info.clone());
(gossip_addr, request)
}) })
.collect() .collect()
} }
@ -3573,7 +3555,7 @@ mod tests {
.values() .values()
.for_each(|v| v.par_iter().for_each(|v| assert!(v.verify()))); .for_each(|v| v.par_iter().for_each(|v| assert!(v.verify())));
let (_, _, val) = cluster_info cluster_info
.gossip .gossip
.write() .write()
.unwrap() .unwrap()
@ -3586,7 +3568,6 @@ mod tests {
) )
.ok() .ok()
.unwrap(); .unwrap();
assert!(val.verify());
} }
#[test] #[test]
@ -4397,7 +4378,7 @@ mod tests {
.gossip .gossip
.write() .write()
.unwrap() .unwrap()
.mark_pull_request_creation_time(&peer, now); .mark_pull_request_creation_time(peer, now);
} }
assert_eq!( assert_eq!(
cluster_info cluster_info

View File

@ -96,6 +96,7 @@ pub(crate) struct GossipStats {
pub(crate) prune_message_count: Counter, pub(crate) prune_message_count: Counter,
pub(crate) prune_message_len: Counter, pub(crate) prune_message_len: Counter,
pub(crate) prune_received_cache: Counter, pub(crate) prune_received_cache: Counter,
pub(crate) pull_from_entrypoint_count: Counter,
pub(crate) pull_request_ping_pong_check_failed_count: Counter, pub(crate) pull_request_ping_pong_check_failed_count: Counter,
pub(crate) pull_requests_count: Counter, pub(crate) pull_requests_count: Counter,
pub(crate) purge: Counter, pub(crate) purge: Counter,
@ -289,6 +290,11 @@ pub(crate) fn submit_gossip_stats(stats: &GossipStats, gossip: &RwLock<CrdsGossi
stats.new_pull_requests_count.clear(), stats.new_pull_requests_count.clear(),
i64 i64
), ),
(
"pull_from_entrypoint_count",
stats.pull_from_entrypoint_count.clear(),
i64
),
( (
"prune_message_count", "prune_message_count",
stats.prune_message_count.clear(), stats.prune_message_count.clear(),

View File

@ -4,6 +4,7 @@
//! packet::PACKET_DATA_SIZE size. //! packet::PACKET_DATA_SIZE size.
use crate::{ use crate::{
contact_info::ContactInfo,
crds::{Crds, VersionedCrdsValue}, crds::{Crds, VersionedCrdsValue},
crds_gossip_error::CrdsGossipError, crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats}, crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats},
@ -220,7 +221,7 @@ impl CrdsGossip {
gossip_validators: Option<&HashSet<Pubkey>>, gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
bloom_size: usize, bloom_size: usize,
) -> Result<(Pubkey, Vec<CrdsFilter>, CrdsValue), CrdsGossipError> { ) -> Result<(ContactInfo, Vec<CrdsFilter>), CrdsGossipError> {
self.pull.new_pull_request( self.pull.new_pull_request(
thread_pool, thread_pool,
&self.crds, &self.crds,
@ -237,7 +238,7 @@ impl CrdsGossip {
/// This is used for weighted random selection during `new_pull_request` /// This is used for weighted random selection during `new_pull_request`
/// It's important to use the local nodes request creation time as the weight /// It's important to use the local nodes request creation time as the weight
/// instead of the response received time otherwise failed nodes will increase their weight. /// instead of the response received time otherwise failed nodes will increase their weight.
pub fn mark_pull_request_creation_time(&mut self, from: &Pubkey, now: u64) { pub fn mark_pull_request_creation_time(&mut self, from: Pubkey, now: u64) {
self.pull.mark_pull_request_creation_time(from, now) self.pull.mark_pull_request_creation_time(from, now)
} }
/// process a pull request and create a response /// process a pull request and create a response

View File

@ -213,7 +213,7 @@ impl CrdsGossipPull {
gossip_validators: Option<&HashSet<Pubkey>>, gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
bloom_size: usize, bloom_size: usize,
) -> Result<(Pubkey, Vec<CrdsFilter>, CrdsValue), CrdsGossipError> { ) -> Result<(ContactInfo, Vec<CrdsFilter>), CrdsGossipError> {
let options = self.pull_options( let options = self.pull_options(
crds, crds,
&self_id, &self_id,
@ -228,10 +228,8 @@ impl CrdsGossipPull {
let filters = self.build_crds_filters(thread_pool, crds, bloom_size); let filters = self.build_crds_filters(thread_pool, crds, bloom_size);
let index = WeightedIndex::new(options.iter().map(|weighted| weighted.0)).unwrap(); let index = WeightedIndex::new(options.iter().map(|weighted| weighted.0)).unwrap();
let random = index.sample(&mut rand::thread_rng()); let random = index.sample(&mut rand::thread_rng());
let self_info = crds let (_weight, peer) = options[random];
.lookup(&CrdsValueLabel::ContactInfo(*self_id)) Ok((peer.clone(), filters))
.unwrap_or_else(|| panic!("self_id invalid {}", self_id));
Ok((options[random].1.id, filters, self_info.clone()))
} }
fn pull_options<'a>( fn pull_options<'a>(
@ -285,8 +283,8 @@ impl CrdsGossipPull {
/// This is used for weighted random selection during `new_pull_request` /// This is used for weighted random selection during `new_pull_request`
/// It's important to use the local nodes request creation time as the weight /// It's important to use the local nodes request creation time as the weight
/// instead of the response received time otherwise failed nodes will increase their weight. /// instead of the response received time otherwise failed nodes will increase their weight.
pub fn mark_pull_request_creation_time(&mut self, from: &Pubkey, now: u64) { pub fn mark_pull_request_creation_time(&mut self, from: Pubkey, now: u64) {
self.pull_request_time.put(*from, now); self.pull_request_time.put(from, now);
} }
/// Store an old hash in the purged values set /// Store an old hash in the purged values set
@ -941,7 +939,7 @@ mod test {
Err(CrdsGossipError::NoPeers) Err(CrdsGossipError::NoPeers)
); );
crds.insert(entry.clone(), 0).unwrap(); crds.insert(entry, 0).unwrap();
assert_eq!( assert_eq!(
node.new_pull_request( node.new_pull_request(
&thread_pool, &thread_pool,
@ -971,9 +969,8 @@ mod test {
&HashMap::new(), &HashMap::new(),
PACKET_DATA_SIZE, PACKET_DATA_SIZE,
); );
let (to, _, self_info) = req.unwrap(); let (peer, _) = req.unwrap();
assert_eq!(to, new.label().pubkey()); assert_eq!(peer, *new.contact_info().unwrap());
assert_eq!(self_info, entry);
} }
#[test] #[test]
@ -987,7 +984,7 @@ mod test {
))); )));
let node_pubkey = entry.label().pubkey(); let node_pubkey = entry.label().pubkey();
let mut node = CrdsGossipPull::default(); let mut node = CrdsGossipPull::default();
crds.insert(entry.clone(), now).unwrap(); crds.insert(entry, now).unwrap();
let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(), &solana_sdk::pubkey::new_rand(),
0, 0,
@ -1001,7 +998,7 @@ mod test {
// set request creation time to now. // set request creation time to now.
let now = now + 50_000; let now = now + 50_000;
node.mark_pull_request_creation_time(&new.label().pubkey(), now); node.mark_pull_request_creation_time(new.label().pubkey(), now);
// odds of getting the other request should be close to 1. // odds of getting the other request should be close to 1.
let now = now + 1_000; let now = now + 1_000;
@ -1016,9 +1013,8 @@ mod test {
&HashMap::new(), &HashMap::new(),
PACKET_DATA_SIZE, PACKET_DATA_SIZE,
); );
let (to, _, self_info) = req.unwrap(); let (peer, _) = req.unwrap();
assert_eq!(to, old.label().pubkey()); assert_eq!(peer, *old.contact_info().unwrap());
assert_eq!(self_info, entry);
} }
} }
@ -1033,7 +1029,7 @@ mod test {
for k in 0..NUM_REPS { for k in 0..NUM_REPS {
let pubkey = pubkeys[rng.gen_range(0, pubkeys.len())]; let pubkey = pubkeys[rng.gen_range(0, pubkeys.len())];
let now = now + k as u64; let now = now + k as u64;
node.mark_pull_request_creation_time(&pubkey, now); node.mark_pull_request_creation_time(pubkey, now);
*requests.entry(pubkey).or_default() = now; *requests.entry(pubkey).or_default() = now;
} }
assert!(node.pull_request_time.len() <= CRDS_UNIQUE_PUBKEY_CAPACITY); assert!(node.pull_request_time.len() <= CRDS_UNIQUE_PUBKEY_CAPACITY);
@ -1065,6 +1061,7 @@ mod test {
&solana_sdk::pubkey::new_rand(), &solana_sdk::pubkey::new_rand(),
0, 0,
))); )));
let caller = entry.clone();
let node_pubkey = entry.label().pubkey(); let node_pubkey = entry.label().pubkey();
let node = CrdsGossipPull::default(); let node = CrdsGossipPull::default();
node_crds.insert(entry, 0).unwrap(); node_crds.insert(entry, 0).unwrap();
@ -1086,7 +1083,7 @@ mod test {
let mut dest_crds = Crds::default(); let mut dest_crds = Crds::default();
let dest = CrdsGossipPull::default(); let dest = CrdsGossipPull::default();
let (_, filters, caller) = req.unwrap(); let (_, filters) = req.unwrap();
let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let rsp = dest.generate_pull_responses( let rsp = dest.generate_pull_responses(
&dest_crds, &dest_crds,
@ -1141,6 +1138,7 @@ mod test {
&solana_sdk::pubkey::new_rand(), &solana_sdk::pubkey::new_rand(),
0, 0,
))); )));
let caller = entry.clone();
let node_pubkey = entry.label().pubkey(); let node_pubkey = entry.label().pubkey();
let node = CrdsGossipPull::default(); let node = CrdsGossipPull::default();
node_crds.insert(entry, 0).unwrap(); node_crds.insert(entry, 0).unwrap();
@ -1162,7 +1160,7 @@ mod test {
let mut dest_crds = Crds::default(); let mut dest_crds = Crds::default();
let mut dest = CrdsGossipPull::default(); let mut dest = CrdsGossipPull::default();
let (_, filters, caller) = req.unwrap(); let (_, filters) = req.unwrap();
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let rsp = dest.generate_pull_responses( let rsp = dest.generate_pull_responses(
&dest_crds, &dest_crds,
@ -1200,6 +1198,7 @@ mod test {
&solana_sdk::pubkey::new_rand(), &solana_sdk::pubkey::new_rand(),
1, 1,
))); )));
let caller = entry.clone();
let node_pubkey = entry.label().pubkey(); let node_pubkey = entry.label().pubkey();
let mut node = CrdsGossipPull::default(); let mut node = CrdsGossipPull::default();
node_crds.insert(entry, 0).unwrap(); node_crds.insert(entry, 0).unwrap();
@ -1245,7 +1244,7 @@ mod test {
&HashMap::new(), &HashMap::new(),
PACKET_DATA_SIZE, PACKET_DATA_SIZE,
); );
let (_, filters, caller) = req.unwrap(); let (_, filters) = req.unwrap();
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let mut rsp = dest.generate_pull_responses( let mut rsp = dest.generate_pull_responses(
&dest_crds, &dest_crds,

View File

@ -423,7 +423,8 @@ fn network_run_pull(
network_values network_values
.par_iter() .par_iter()
.filter_map(|from| { .filter_map(|from| {
from.lock() let (peer, filters) = from
.lock()
.unwrap() .unwrap()
.new_pull_request( .new_pull_request(
&thread_pool, &thread_pool,
@ -432,7 +433,11 @@ fn network_run_pull(
&HashMap::new(), &HashMap::new(),
cluster_info::MAX_BLOOM_SIZE, cluster_info::MAX_BLOOM_SIZE,
) )
.ok() .ok()?;
let gossip = from.gossip.lock().unwrap();
let label = CrdsValueLabel::ContactInfo(gossip.id);
let self_info = gossip.crds.get(&label).unwrap().value.clone();
Some((peer.id, filters, self_info))
}) })
.collect() .collect()
}; };
@ -478,7 +483,7 @@ fn network_run_pull(
msgs += rsp.len(); msgs += rsp.len();
if let Some(node) = network.get(&from) { if let Some(node) = network.get(&from) {
let mut node = node.lock().unwrap(); let mut node = node.lock().unwrap();
node.mark_pull_request_creation_time(&from, now); node.mark_pull_request_creation_time(from, now);
let mut stats = ProcessPullStats::default(); let mut stats = ProcessPullStats::default();
let (vers, vers_expired_timeout, failed_inserts) = let (vers, vers_expired_timeout, failed_inserts) =
node.filter_pull_responses(&timeouts, rsp, now, &mut stats); node.filter_pull_responses(&timeouts, rsp, now, &mut stats);