pings peers before sending push messages (#28537)

This commit is contained in:
behzad nouri 2022-10-25 00:01:23 +00:00 committed by GitHub
parent 2354a0a343
commit f703275fc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 228 additions and 112 deletions

View File

@ -427,7 +427,7 @@ impl ClusterInfo {
socket_addr_space,
};
me.insert_self();
me.push_self(&HashMap::new(), None);
me.push_self();
me
}
@ -466,11 +466,7 @@ impl ClusterInfo {
&self.socket_addr_space
}
fn push_self(
&self,
stakes: &HashMap<Pubkey, u64>,
gossip_validators: Option<&HashSet<Pubkey>>,
) {
fn push_self(&self) {
let now = timestamp();
self.my_contact_info.write().unwrap().wallclock = now;
let entries: Vec<_> = vec![
@ -484,18 +480,45 @@ impl ClusterInfo {
.lock()
.unwrap()
.extend(entries);
let ContactInfo {
id: self_pubkey,
shred_version,
..
} = *self.my_contact_info.read().unwrap();
}
/// Refreshes the gossip push active set and sends out the ping messages
/// collected while doing so.
///
/// * `recycler` - cloned and handed to the packet batch built for the pings.
/// * `stakes` - forwarded unchanged to `self.gossip.refresh_push_active_set`.
/// * `gossip_validators` - forwarded unchanged to the same call.
/// * `sender` - channel on which the ping packet batch is sent.
fn refresh_push_active_set(
&self,
recycler: &PacketBatchRecycler,
stakes: &HashMap<Pubkey, u64>,
gossip_validators: Option<&HashSet<Pubkey>>,
sender: &PacketBatchSender,
) {
// Only the shred version is read from this node's contact-info.
let ContactInfo { shred_version, .. } = *self.my_contact_info.read().unwrap();
let self_keypair: Arc<Keypair> = self.keypair().clone();
// Filled by the call below with (destination address, ping) pairs.
let mut pings = Vec::new();
self.gossip.refresh_push_active_set(
// NOTE(review): `self_pubkey` is not bound anywhere in this function as
// shown; this line looks like a leftover of the pre-change argument list
// (replaced by `&self_keypair` below) -- confirm.
&self_pubkey,
&self_keypair,
shred_version,
stakes,
gossip_validators,
&self.ping_cache,
&mut pings,
&self.socket_addr_space,
);
// NOTE(review): these pings originate from the push active-set refresh but
// are tallied under the pull-requests ping counter -- confirm intended.
self.stats
.new_pull_requests_pings_count
.add_relaxed(pings.len() as u64);
// Wrap each ping in the gossip protocol message, keeping its destination.
let pings: Vec<_> = pings
.into_iter()
.map(|(addr, ping)| (addr, Protocol::PingMessage(ping)))
.collect();
// Nothing is counted or sent when no pings were generated.
if !pings.is_empty() {
self.stats
.packets_sent_gossip_requests_count
.add_relaxed(pings.len() as u64);
let packet_batch = PacketBatch::new_unpinned_with_recycler_data_and_dests(
recycler.clone(),
"refresh_push_active_set",
&pings,
);
// The send result is discarded; a failed send is not reported here.
let _ = sender.send(packet_batch);
}
}
// TODO kill insert_info, only used by tests
@ -645,7 +668,7 @@ impl ClusterInfo {
CrdsData::Version(Version::new(self.id())),
&self.keypair(),
));
self.push_self(&HashMap::new(), None);
self.push_self();
}
pub fn lookup_contact_info<F, Y>(&self, id: &Pubkey, map: F) -> Option<Y>
@ -1691,7 +1714,7 @@ impl ClusterInfo {
Builder::new()
.name("solGossip".to_string())
.spawn(move || {
let mut last_push = timestamp();
let mut last_push = 0;
let mut last_contact_info_trace = timestamp();
let mut last_contact_info_save = timestamp();
let mut entrypoints_processed = false;
@ -1754,7 +1777,13 @@ impl ClusterInfo {
//TODO: possibly tune this parameter
//we saw a deadlock passing a self.read().unwrap().timeout into sleep
if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 {
self.push_self(&stakes, gossip_validators.as_ref());
self.push_self();
self.refresh_push_active_set(
&recycler,
&stakes,
gossip_validators.as_ref(),
&sender,
);
last_push = timestamp();
}
let elapsed = timestamp() - start;
@ -3552,10 +3581,12 @@ RPC Enabled Nodes: 1"#;
));
cluster_info.insert_info(spy);
cluster_info.gossip.refresh_push_active_set(
&cluster_info.id(),
&cluster_info.keypair(),
cluster_info.my_shred_version(),
&HashMap::new(), // stakes
None, // gossip validators
&cluster_info.ping_cache,
&mut Vec::new(), // pings
&SocketAddrSpace::Unspecified,
);
let reqs = cluster_info.generate_new_gossip_requests(
@ -3683,10 +3714,12 @@ RPC Enabled Nodes: 1"#;
.mock_pong(peer.id, peer.gossip, Instant::now());
cluster_info.insert_info(peer);
cluster_info.gossip.refresh_push_active_set(
&cluster_info.id(),
&cluster_info.keypair(),
cluster_info.my_shred_version(),
&HashMap::new(), // stakes
None, // gossip validators
&cluster_info.ping_cache,
&mut Vec::new(), // pings
&SocketAddrSpace::Unspecified,
);
//check that all types of gossip messages are signed correctly
@ -3987,10 +4020,7 @@ RPC Enabled Nodes: 1"#;
let mut node = cluster_info.my_contact_info.write().unwrap();
node.shred_version = 42;
}
cluster_info.push_self(
&HashMap::default(), // stakes
None, // gossip validators
);
cluster_info.push_self();
cluster_info.flush_push_queue();
// Should now include both epoch slots.
let slots = cluster_info.get_epoch_slots(&mut Cursor::default());

View File

@ -184,10 +184,12 @@ impl CrdsGossip {
/// Refresh the push active set.
pub fn refresh_push_active_set(
&self,
self_pubkey: &Pubkey,
self_keypair: &Keypair,
self_shred_version: u16,
stakes: &HashMap<Pubkey, u64>,
gossip_validators: Option<&HashSet<Pubkey>>,
ping_cache: &Mutex<PingCache>,
pings: &mut Vec<(SocketAddr, Ping)>,
socket_addr_space: &SocketAddrSpace,
) {
let network_size = self.crds.read().unwrap().num_nodes();
@ -195,10 +197,12 @@ impl CrdsGossip {
&self.crds,
stakes,
gossip_validators,
self_pubkey,
self_keypair,
self_shred_version,
network_size,
CRDS_GOSSIP_NUM_ACTIVE,
ping_cache,
pings,
socket_addr_space,
)
}
@ -378,7 +382,8 @@ mod test {
#[test]
fn test_prune_errors() {
let crds_gossip = CrdsGossip::default();
let id = Pubkey::new(&[0; 32]);
let keypair = Keypair::new();
let id = keypair.pubkey();
let ci = ContactInfo::new_localhost(&Pubkey::new(&[1; 32]), 0);
let prune_pubkey = Pubkey::new(&[2; 32]);
crds_gossip
@ -391,11 +396,19 @@ mod test {
GossipRoute::LocalMessage,
)
.unwrap();
let ping_cache = PingCache::new(
Duration::from_secs(20 * 60), // ttl
Duration::from_secs(20 * 60) / 64, // rate_limit_delay
128, // capacity
);
let ping_cache = Mutex::new(ping_cache);
crds_gossip.refresh_push_active_set(
&id,
&keypair,
0, // shred version
&HashMap::new(), // stakes
None, // gossip validators
&ping_cache,
&mut Vec::new(), // pings
&SocketAddrSpace::Unspecified,
);
let now = timestamp();

View File

@ -13,12 +13,13 @@
use {
crate::{
cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY,
cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY},
contact_info::ContactInfo,
crds::{Crds, Cursor, GossipRoute},
crds_gossip::{get_stake, get_weight},
crds_gossip_error::CrdsGossipError,
crds_value::CrdsValue,
ping_pong::PingCache,
weighted_shuffle::WeightedShuffle,
},
bincode::serialized_size,
@ -27,17 +28,24 @@ use {
lru::LruCache,
rand::{seq::SliceRandom, Rng},
solana_bloom::bloom::{AtomicBloom, Bloom},
solana_sdk::{packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp},
solana_sdk::{
packet::PACKET_DATA_SIZE,
pubkey::Pubkey,
signature::{Keypair, Signer},
timing::timestamp,
},
solana_streamer::socket::SocketAddrSpace,
std::{
cmp,
collections::{HashMap, HashSet},
iter::repeat,
net::SocketAddr,
ops::{DerefMut, RangeBounds},
sync::{
atomic::{AtomicUsize, Ordering},
Mutex, RwLock,
},
time::Instant,
},
};
@ -334,15 +342,18 @@ impl CrdsGossipPush {
/// # Arguments
///
/// * ratio - active_set.len()/ratio is the number of actives to rotate
#[allow(clippy::too_many_arguments)]
pub(crate) fn refresh_push_active_set(
&self,
crds: &RwLock<Crds>,
stakes: &HashMap<Pubkey, u64>,
gossip_validators: Option<&HashSet<Pubkey>>,
self_id: &Pubkey,
self_keypair: &Keypair,
self_shred_version: u16,
network_size: usize,
ratio: usize,
ping_cache: &Mutex<PingCache>,
pings: &mut Vec<(SocketAddr, Ping)>,
socket_addr_space: &SocketAddrSpace,
) {
const BLOOM_FALSE_RATE: f64 = 0.1;
@ -353,17 +364,31 @@ impl CrdsGossipPush {
const MIN_NUM_BLOOM_ITEMS: usize = CRDS_UNIQUE_PUBKEY_CAPACITY;
let mut rng = rand::thread_rng();
let mut new_items = HashMap::new();
// Gossip peers and respective sampling weights.
let peers = self.push_options(
crds,
&self_keypair.pubkey(),
self_shred_version,
stakes,
gossip_validators,
socket_addr_space,
);
// Check for nodes which have responded to ping messages.
let (weights, peers): (Vec<_>, Vec<_>) = {
self.push_options(
crds,
self_id,
self_shred_version,
stakes,
gossip_validators,
socket_addr_space,
)
.into_iter()
.unzip()
let mut ping_cache = ping_cache.lock().unwrap();
let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok();
let now = Instant::now();
peers
.into_iter()
.filter_map(|(weight, peer)| {
let node = (peer.id, peer.gossip);
let (check, ping) = ping_cache.check(now, node, &mut pingf);
if let Some(ping) = ping {
pings.push((peer.gossip, ping));
}
check.then_some((weight, peer.id))
})
.unzip()
};
if peers.is_empty() {
return;
@ -406,7 +431,7 @@ impl CrdsGossipPush {
stakes: &HashMap<Pubkey, u64>,
gossip_validators: Option<&HashSet<Pubkey>>,
socket_addr_space: &SocketAddrSpace,
) -> Vec<(/*weight:*/ u64, /*node:*/ Pubkey)> {
) -> Vec<(/*weight:*/ u64, /*node:*/ ContactInfo)> {
let now = timestamp();
let mut rng = rand::thread_rng();
let max_weight = u16::MAX as f32 - 1.0;
@ -443,7 +468,7 @@ impl CrdsGossipPush {
let weight = get_weight(max_weight, since, stake);
// Weights are bounded by max_weight defined above.
// So this type-cast should be safe.
((weight * 100.0) as u64, info.id)
((weight * 100.0) as u64, info.clone())
})
.collect()
}
@ -489,12 +514,21 @@ impl CrdsGossipPush {
}
#[cfg(test)]
mod test {
mod tests {
use {
super::*,
crate::{contact_info::ContactInfo, crds_value::CrdsData},
std::time::Duration,
};
/// Builds the `PingCache` used by the tests in this module: a 20-minute ttl,
/// a rate-limit delay of one 64th of that ttl, and room for 128 entries.
fn new_ping_cache() -> PingCache {
    let ttl = Duration::from_secs(20 * 60);
    let rate_limit_delay = ttl / 64;
    let capacity = 128;
    PingCache::new(ttl, rate_limit_delay, capacity)
}
#[test]
fn test_prune() {
let crds = RwLock::<Crds>::default();
@ -660,33 +694,39 @@ mod test {
let now = timestamp();
let mut crds = Crds::default();
let push = CrdsGossipPush::default();
let value1 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
0,
)));
let mut ping_cache = new_ping_cache();
let value1 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ping_cache.mock_pong(value1.id, value1.gossip, Instant::now());
let value1 = CrdsValue::new_unsigned(CrdsData::ContactInfo(value1));
assert_eq!(
crds.insert(value1.clone(), now, GossipRoute::LocalMessage),
Ok(())
);
let keypair = Keypair::new();
let crds = RwLock::new(crds);
let ping_cache = Mutex::new(ping_cache);
push.refresh_push_active_set(
&crds,
&HashMap::new(),
None,
&Pubkey::default(),
0,
1,
1,
&HashMap::new(), // stakes
None, // gossip_validators
&keypair,
0, // self_shred_version
1, // network_size
1, // ratio
&ping_cache,
&mut Vec::new(), // pings
&SocketAddrSpace::Unspecified,
);
let active_set = push.active_set.read().unwrap();
assert!(active_set.get(&value1.label().pubkey()).is_some());
let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
0,
)));
let value2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ping_cache
.lock()
.unwrap()
.mock_pong(value2.id, value2.gossip, Instant::now());
let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(value2));
assert!(active_set.get(&value2.label().pubkey()).is_none());
drop(active_set);
assert_eq!(
@ -698,12 +738,14 @@ mod test {
for _ in 0..30 {
push.refresh_push_active_set(
&crds,
&HashMap::new(),
None,
&Pubkey::default(),
0,
1,
1,
&HashMap::new(), // stakes
None, // gossip_validators
&keypair,
0, // self_shred_version
1, // network_size
1, // ratio
&ping_cache,
&mut Vec::new(), // pings
&SocketAddrSpace::Unspecified,
);
let active_set = push.active_set.read().unwrap();
@ -716,9 +758,12 @@ mod test {
assert!(active_set.get(&value2.label().pubkey()).is_some());
}
for _ in 0..push.num_active {
let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(
ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0),
));
let value2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ping_cache
.lock()
.unwrap()
.mock_pong(value2.id, value2.gossip, Instant::now());
let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(value2));
assert_eq!(
crds.write()
.unwrap()
@ -726,14 +771,17 @@ mod test {
Ok(())
);
}
push.refresh_push_active_set(
&crds,
&HashMap::new(),
None,
&Pubkey::default(),
0,
1,
1,
&HashMap::new(), // stakes
None, // gossip_validators
&keypair,
0, // self_shred_version
1, // network_size
1, // ratio
&ping_cache,
&mut Vec::new(), // pings
&SocketAddrSpace::Unspecified,
);
assert_eq!(push.active_set.read().unwrap().len(), push.num_active);
@ -768,7 +816,7 @@ mod test {
assert!(!options.is_empty());
options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap());
// check that the highest stake holder is also the heaviest weighted.
assert_eq!(stakes[&options[0].1], 10_000_u64);
assert_eq!(stakes[&options[0].1.id], 10_000_u64);
}
#[test]
@ -826,7 +874,7 @@ mod test {
&SocketAddrSpace::Unspecified,
)
.iter()
.map(|(_, pk)| *pk)
.map(|(_, node)| node.id)
.collect::<Vec<_>>();
assert_eq!(options.len(), 1);
assert!(!options.contains(&spy.pubkey()));
@ -906,7 +954,7 @@ mod test {
);
assert_eq!(options.len(), 1);
assert_eq!(options[0].1, node_123.pubkey());
assert_eq!(options[0].1.id, node_123.pubkey());
}
#[test]
@ -914,23 +962,26 @@ mod test {
let now = timestamp();
let mut crds = Crds::default();
let push = CrdsGossipPush::default();
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
0,
)));
let mut ping_cache = new_ping_cache();
let peer = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ping_cache.mock_pong(peer.id, peer.gossip, Instant::now());
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(peer));
assert_eq!(
crds.insert(peer.clone(), now, GossipRoute::LocalMessage),
Ok(())
);
let crds = RwLock::new(crds);
let ping_cache = Mutex::new(ping_cache);
push.refresh_push_active_set(
&crds,
&HashMap::new(),
None,
&Pubkey::default(),
0,
1,
1,
&HashMap::new(), // stakes
None, // gossip_validators
&Keypair::new(),
0, // self_shred_version
1, // network_size
1, // ratio
&ping_cache,
&mut Vec::new(), // pings
&SocketAddrSpace::Unspecified,
);
@ -954,11 +1005,13 @@ mod test {
let mut rng = rand::thread_rng();
let mut crds = Crds::default();
let push = CrdsGossipPush::default();
let mut ping_cache = new_ping_cache();
let peers: Vec<_> = vec![0, 0, now]
.into_iter()
.map(|wallclock| {
let mut peer = ContactInfo::new_rand(&mut rng, /*pubkey=*/ None);
peer.wallclock = wallclock;
ping_cache.mock_pong(peer.id, peer.gossip, Instant::now());
CrdsValue::new_unsigned(CrdsData::ContactInfo(peer))
})
.collect();
@ -976,14 +1029,17 @@ mod test {
push.process_push_message(&crds, &Pubkey::default(), vec![peers[2].clone()], now),
[Ok(origin[2])],
);
let ping_cache = Mutex::new(ping_cache);
push.refresh_push_active_set(
&crds,
&HashMap::new(),
None,
&Pubkey::default(),
0,
1,
1,
&HashMap::new(), // stakes
None, // gossip_validators
&Keypair::new(),
0, // self_shred_version
1, // network_size
1, // ratio
&ping_cache,
&mut Vec::new(),
&SocketAddrSpace::Unspecified,
);
@ -1011,14 +1067,17 @@ mod test {
Ok(())
);
let crds = RwLock::new(crds);
let ping_cache = Mutex::new(new_ping_cache());
push.refresh_push_active_set(
&crds,
&HashMap::new(),
None,
&Pubkey::default(),
0,
1,
1,
&HashMap::new(), // stakes
None, // gossip_validators
&Keypair::new(),
0, // self_shred_version
1, // network_size
1, // ratio
&ping_cache,
&mut Vec::new(), // pings
&SocketAddrSpace::Unspecified,
);
@ -1049,14 +1108,17 @@ mod test {
)));
assert_eq!(crds.insert(peer, 0, GossipRoute::LocalMessage), Ok(()));
let crds = RwLock::new(crds);
let ping_cache = Mutex::new(new_ping_cache());
push.refresh_push_active_set(
&crds,
&HashMap::new(),
None,
&Pubkey::default(),
0,
1,
1,
&HashMap::new(), // stakes
None, // gossip_validators
&Keypair::new(),
0, // self_shred_version
1, // network_size
1, // ratio
&ping_cache,
&mut Vec::new(), // pings
&SocketAddrSpace::Unspecified,
);

View File

@ -52,11 +52,7 @@ impl Node {
gossip: Arc<CrdsGossip>,
stake: u64,
) -> Self {
let ping_cache = Arc::new(Mutex::new(PingCache::new(
Duration::from_secs(20 * 60), // ttl
Duration::from_secs(20 * 60) / 64, // delay
2048, // capacity
)));
let ping_cache = Arc::new(new_ping_cache());
Node {
keypair,
contact_info,
@ -268,12 +264,13 @@ fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_conver
// make sure there is someone in the active set
let network_values: Vec<Node> = network.values().cloned().collect();
network_values.par_iter().for_each(|node| {
let node_pubkey = node.keypair.pubkey();
node.gossip.refresh_push_active_set(
&node_pubkey,
&node.keypair,
0, // shred version
&HashMap::new(), // stakes
None, // gossip validators
&node.ping_cache,
&mut Vec::new(), // pings
&SocketAddrSpace::Unspecified,
);
});
@ -428,12 +425,13 @@ fn network_run_push(
}
if now % CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS == 0 && now > 0 {
network_values.par_iter().for_each(|node| {
let node_pubkey = node.keypair.pubkey();
node.gossip.refresh_push_active_set(
&node_pubkey,
&node.keypair,
0, // shred version
&HashMap::new(), // stakes
None, // gossip validators
&node.ping_cache,
&mut Vec::new(), // pings
&SocketAddrSpace::Unspecified,
);
});
@ -614,6 +612,15 @@ fn build_gossip_thread_pool() -> ThreadPool {
.unwrap()
}
/// Builds a mutex-guarded `PingCache` for the gossip simulation tests: a
/// 20-minute ttl, a rate-limit delay of one 64th of that ttl, and room for
/// 2048 entries.
fn new_ping_cache() -> Mutex<PingCache> {
    let ttl = Duration::from_secs(20 * 60);
    Mutex::new(PingCache::new(
        ttl,      // ttl
        ttl / 64, // rate_limit_delay
        2048,     // capacity
    ))
}
#[test]
#[serial]
fn test_star_network_pull_50() {
@ -713,7 +720,8 @@ fn test_star_network_large_push() {
#[test]
fn test_prune_errors() {
let crds_gossip = CrdsGossip::default();
let id = Pubkey::new(&[0; 32]);
let keypair = Keypair::new();
let id = keypair.pubkey();
let ci = ContactInfo::new_localhost(&Pubkey::new(&[1; 32]), 0);
let prune_pubkey = Pubkey::new(&[2; 32]);
crds_gossip
@ -726,11 +734,14 @@ fn test_prune_errors() {
GossipRoute::LocalMessage,
)
.unwrap();
let ping_cache = new_ping_cache();
crds_gossip.refresh_push_active_set(
&id,
&keypair,
0, // shred version
&HashMap::new(), // stakes
None, // gossip validators
&ping_cache,
&mut Vec::new(), // pings
&SocketAddrSpace::Unspecified,
);
let now = timestamp();