factors out common gossip {push,pull}_options code (#29737)

This commit is contained in:
behzad nouri 2023-01-18 17:43:09 +00:00 committed by GitHub
parent aef8692c8f
commit 9f2910e962
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 61 additions and 440 deletions

View File

@ -332,6 +332,51 @@ impl CrdsGossip {
}
}
// Returns active and valid cluster nodes to gossip with.
pub(crate) fn get_gossip_nodes<R: Rng>(
rng: &mut R,
now: u64,
pubkey: &Pubkey, // This node.
// By default, should only push to or pull from gossip nodes with the same
// shred-version. Except for spy nodes (shred_version == 0u16) which can
// pull from any node.
verify_shred_version: impl Fn(/*shred_version:*/ u16) -> bool,
crds: &RwLock<Crds>,
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>,
socket_addr_space: &SocketAddrSpace,
) -> Vec<ContactInfo> {
// Exclude nodes which have not been active for this long.
const ACTIVE_TIMEOUT: Duration = Duration::from_secs(60);
let active_cutoff = now.saturating_sub(ACTIVE_TIMEOUT.as_millis() as u64);
let crds = crds.read().unwrap();
crds.get_nodes()
.filter_map(|value| {
let node = value.value.contact_info().unwrap();
// Exclude nodes which have not been active recently.
if value.local_timestamp < active_cutoff {
// In order to mitigate eclipse attack, for staked nodes
// continue retrying periodically.
let stake = stakes.get(&node.id).copied().unwrap_or_default();
if stake == 0u64 || !rng.gen_ratio(1, 16) {
return None;
}
}
Some(node)
})
.filter(|node| {
&node.id != pubkey
&& verify_shred_version(node.shred_version)
&& ContactInfo::is_valid_address(&node.gossip, socket_addr_space)
&& match gossip_validators {
Some(nodes) => nodes.contains(&node.id),
None => true,
}
})
.cloned()
.collect()
}
// Dedups gossip addresses, keeping only the one with the highest stake.
pub(crate) fn dedup_gossip_addresses(
nodes: impl IntoIterator<Item = ContactInfo>,

View File

@ -54,8 +54,6 @@ pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
pub const CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS: u64 = 60000;
// Retention period of hashes of received outdated values.
const FAILED_INSERTS_RETENTION_MS: u64 = 20_000;
// Do not pull from peers which have not been updated for this long.
const PULL_ACTIVE_TIMEOUT_MS: u64 = 60_000;
pub const FALSE_RATE: f64 = 0.1f64;
pub const KEYS: f64 = 8f64;
@ -229,17 +227,20 @@ impl CrdsGossipPull {
pings: &mut Vec<(SocketAddr, Ping)>,
socket_addr_space: &SocketAddrSpace,
) -> Result<HashMap<ContactInfo, Vec<CrdsFilter>>, CrdsGossipError> {
let mut rng = rand::thread_rng();
// Active and valid gossip nodes with matching shred-version.
let nodes = self.pull_options(
crds,
&self_keypair.pubkey(),
self_shred_version,
let nodes = crds_gossip::get_gossip_nodes(
&mut rng,
now,
&self_keypair.pubkey(),
// Pull from nodes with the same shred version, unless this is a
// spy node which then can pull from any node.
|shred_version| self_shred_version == 0u16 || shred_version == self_shred_version,
crds,
gossip_validators,
stakes,
socket_addr_space,
);
let mut rng = rand::thread_rng();
// Check for nodes which have responded to ping messages.
let nodes = crds_gossip::maybe_ping_gossip_addresses(
&mut rng,
@ -272,44 +273,6 @@ impl CrdsGossipPull {
Ok(nodes.zip(filters).into_group_map())
}
fn pull_options(
&self,
crds: &RwLock<Crds>,
self_id: &Pubkey,
self_shred_version: u16,
now: u64,
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>,
socket_addr_space: &SocketAddrSpace,
) -> Vec<ContactInfo> {
let mut rng = rand::thread_rng();
let active_cutoff = now.saturating_sub(PULL_ACTIVE_TIMEOUT_MS);
let crds = crds.read().unwrap();
crds.get_nodes()
.filter_map(|value| {
let info = value.value.contact_info().unwrap();
// Stop pulling from nodes which have not been active recently.
if value.local_timestamp < active_cutoff {
// In order to mitigate eclipse attack, for staked nodes
// continue retrying periodically.
let stake = stakes.get(&info.id).unwrap_or(&0);
if *stake == 0 || !rng.gen_ratio(1, 16) {
return None;
}
}
Some(info)
})
.filter(|v| {
v.id != *self_id
&& ContactInfo::is_valid_address(&v.gossip, socket_addr_space)
&& (self_shred_version == 0 || self_shred_version == v.shred_version)
&& gossip_validators
.map_or(true, |gossip_validators| gossip_validators.contains(&v.id))
})
.cloned()
.collect()
}
/// Process a pull request
pub(crate) fn process_pull_requests<I>(crds: &RwLock<Crds>, callers: I, now: u64)
where
@ -625,7 +588,6 @@ pub(crate) mod tests {
crate::{
cluster_info::MAX_BLOOM_SIZE,
crds_value::{CrdsData, Vote},
socketaddr,
},
itertools::Itertools,
rand::{seq::SliceRandom, thread_rng, SeedableRng},
@ -679,186 +641,6 @@ pub(crate) mod tests {
}
}
#[test]
fn test_new_pull_with_stakes() {
let mut crds = Crds::default();
let mut stakes = HashMap::new();
let node = CrdsGossipPull::default();
let me = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
0,
)));
crds.insert(me.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
for i in 1..=30 {
let entry = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(
ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0),
));
let id = entry.label().pubkey();
crds.insert(entry.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
stakes.insert(id, i * 100);
}
let now = 1024;
let crds = RwLock::new(crds);
let options = node.pull_options(
&crds,
&me.label().pubkey(),
0,
now,
None,
&stakes,
&SocketAddrSpace::Unspecified,
);
assert!(!options.is_empty());
}
#[test]
fn test_no_pulls_from_different_shred_versions() {
let mut crds = Crds::default();
let stakes = HashMap::new();
let node = CrdsGossipPull::default();
let gossip = socketaddr!("127.0.0.1:1234");
let me = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo {
id: solana_sdk::pubkey::new_rand(),
shred_version: 123,
gossip,
..ContactInfo::default()
}));
let spy = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo {
id: solana_sdk::pubkey::new_rand(),
shred_version: 0,
gossip,
..ContactInfo::default()
}));
let node_123 = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo {
id: solana_sdk::pubkey::new_rand(),
shred_version: 123,
gossip,
..ContactInfo::default()
}));
let node_456 = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo {
id: solana_sdk::pubkey::new_rand(),
shred_version: 456,
gossip,
..ContactInfo::default()
}));
crds.insert(me.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
crds.insert(spy.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
crds.insert(node_123.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
crds.insert(node_456.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
let crds = RwLock::new(crds);
// shred version 123 should ignore nodes with versions 0 and 456
let options = node
.pull_options(
&crds,
&me.label().pubkey(),
123,
0,
None,
&stakes,
&SocketAddrSpace::Unspecified,
)
.iter()
.map(|peer| peer.id)
.collect::<Vec<_>>();
assert_eq!(options.len(), 1);
assert!(!options.contains(&spy.pubkey()));
assert!(options.contains(&node_123.pubkey()));
// spy nodes will see all
let options = node
.pull_options(
&crds,
&spy.label().pubkey(),
0,
0,
None,
&stakes,
&SocketAddrSpace::Unspecified,
)
.iter()
.map(|peer| peer.id)
.collect::<Vec<_>>();
assert_eq!(options.len(), 3);
assert!(options.contains(&me.pubkey()));
assert!(options.contains(&node_123.pubkey()));
assert!(options.contains(&node_456.pubkey()));
}
#[test]
fn test_pulls_only_from_allowed() {
let mut crds = Crds::default();
let stakes = HashMap::new();
let node = CrdsGossipPull::default();
let gossip = socketaddr!("127.0.0.1:1234");
let me = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo {
id: solana_sdk::pubkey::new_rand(),
gossip,
..ContactInfo::default()
}));
let node_123 = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo {
id: solana_sdk::pubkey::new_rand(),
gossip,
..ContactInfo::default()
}));
crds.insert(me.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
crds.insert(node_123.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
let crds = RwLock::new(crds);
// Empty gossip_validators -- will pull from nobody
let mut gossip_validators = HashSet::new();
let options = node.pull_options(
&crds,
&me.label().pubkey(),
0,
0,
Some(&gossip_validators),
&stakes,
&SocketAddrSpace::Unspecified,
);
assert!(options.is_empty());
// Unknown pubkey in gossip_validators -- will pull from nobody
gossip_validators.insert(solana_sdk::pubkey::new_rand());
let options = node.pull_options(
&crds,
&me.label().pubkey(),
0,
0,
Some(&gossip_validators),
&stakes,
&SocketAddrSpace::Unspecified,
);
assert!(options.is_empty());
// node_123 pubkey in gossip_validators -- will pull from it
gossip_validators.insert(node_123.pubkey());
let options = node.pull_options(
&crds,
&me.label().pubkey(),
0,
0,
Some(&gossip_validators),
&stakes,
&SocketAddrSpace::Unspecified,
);
assert_eq!(options.len(), 1);
assert_eq!(options[0].id, node_123.pubkey());
}
#[test]
fn test_crds_filter_set_add() {
let mut rng = thread_rng();

View File

@ -17,14 +17,12 @@ use {
crds::{Crds, CrdsError, Cursor, GossipRoute},
crds_gossip,
crds_value::CrdsValue,
legacy_contact_info::LegacyContactInfo as ContactInfo,
ping_pong::PingCache,
push_active_set::PushActiveSet,
received_cache::ReceivedCache,
},
bincode::serialized_size,
itertools::Itertools,
rand::Rng,
solana_sdk::{
packet::PACKET_DATA_SIZE,
pubkey::Pubkey,
@ -52,8 +50,6 @@ pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000;
const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500;
const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15;
const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 3;
// Do not push to peers which have not been updated for this long.
const PUSH_ACTIVE_TIMEOUT_MS: u64 = 60_000;
pub struct CrdsGossipPush {
/// Max bytes per message
@ -249,12 +245,15 @@ impl CrdsGossipPush {
) {
let mut rng = rand::thread_rng();
// Active and valid gossip nodes with matching shred-version.
let nodes = self.push_options(
crds,
let nodes = crds_gossip::get_gossip_nodes(
&mut rng,
timestamp(), // now
&self_keypair.pubkey(),
self_shred_version,
stakes,
// Only push to nodes with the same shred version.
|shred_version| shred_version == self_shred_version,
crds,
gossip_validators,
stakes,
socket_addr_space,
);
// Check for nodes which have responded to ping messages.
@ -276,45 +275,6 @@ impl CrdsGossipPush {
active_set.rotate(&mut rng, self.push_fanout * 3, network_size, &nodes, stakes)
}
fn push_options(
&self,
crds: &RwLock<Crds>,
self_id: &Pubkey,
self_shred_version: u16,
stakes: &HashMap<Pubkey, u64>,
gossip_validators: Option<&HashSet<Pubkey>>,
socket_addr_space: &SocketAddrSpace,
) -> Vec<ContactInfo> {
let now = timestamp();
let mut rng = rand::thread_rng();
let active_cutoff = now.saturating_sub(PUSH_ACTIVE_TIMEOUT_MS);
let crds = crds.read().unwrap();
crds.get_nodes()
.filter_map(|value| {
let info = value.value.contact_info().unwrap();
// Stop pushing to nodes which have not been active recently.
if value.local_timestamp < active_cutoff {
// In order to mitigate eclipse attack, for staked nodes
// continue retrying periodically.
let stake = stakes.get(&info.id).unwrap_or(&0);
if *stake == 0 || !rng.gen_ratio(1, 16) {
return None;
}
}
Some(info)
})
.filter(|info| {
info.id != *self_id
&& ContactInfo::is_valid_address(&info.gossip, socket_addr_space)
&& self_shred_version == info.shred_version
&& gossip_validators.map_or(true, |gossip_validators| {
gossip_validators.contains(&info.id)
})
})
.cloned()
.collect()
}
// Only for tests and simulations.
pub(crate) fn mock_clone(&self) -> Self {
let active_set = self.active_set.read().unwrap().mock_clone();
@ -336,7 +296,7 @@ impl CrdsGossipPush {
mod tests {
use {
super::*,
crate::{crds_value::CrdsData, socketaddr},
crate::{crds_value::CrdsData, legacy_contact_info::LegacyContactInfo as ContactInfo},
std::time::{Duration, Instant},
};
@ -434,172 +394,6 @@ mod tests {
);
}
#[test]
fn test_active_set_refresh_with_bank() {
solana_logger::setup();
let time = timestamp() - 1024; //make sure there's at least a 1 second delay
let mut crds = Crds::default();
let push = CrdsGossipPush::default();
let mut stakes = HashMap::new();
for i in 1..=100 {
let peer = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(
ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), time),
));
let id = peer.label().pubkey();
crds.insert(peer.clone(), time, GossipRoute::LocalMessage)
.unwrap();
stakes.insert(id, i * 100);
}
let crds = RwLock::new(crds);
let options = push.push_options(
&crds,
&Pubkey::default(),
0,
&stakes,
None,
&SocketAddrSpace::Unspecified,
);
assert!(!options.is_empty());
}
#[test]
fn test_no_pushes_to_from_different_shred_versions() {
let now = timestamp();
let mut crds = Crds::default();
let stakes = HashMap::new();
let node = CrdsGossipPush::default();
let gossip = socketaddr!("127.0.0.1:1234");
let me = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo {
id: solana_sdk::pubkey::new_rand(),
shred_version: 123,
gossip,
..ContactInfo::default()
}));
let spy = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo {
id: solana_sdk::pubkey::new_rand(),
shred_version: 0,
gossip,
..ContactInfo::default()
}));
let node_123 = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo {
id: solana_sdk::pubkey::new_rand(),
shred_version: 123,
gossip,
..ContactInfo::default()
}));
let node_456 = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo {
id: solana_sdk::pubkey::new_rand(),
shred_version: 456,
gossip,
..ContactInfo::default()
}));
crds.insert(me.clone(), now, GossipRoute::LocalMessage)
.unwrap();
crds.insert(spy.clone(), now, GossipRoute::LocalMessage)
.unwrap();
crds.insert(node_123.clone(), now, GossipRoute::LocalMessage)
.unwrap();
crds.insert(node_456, now, GossipRoute::LocalMessage)
.unwrap();
let crds = RwLock::new(crds);
// shred version 123 should ignore nodes with versions 0 and 456
let options = node
.push_options(
&crds,
&me.label().pubkey(),
123,
&stakes,
None,
&SocketAddrSpace::Unspecified,
)
.iter()
.map(|node| node.id)
.collect::<Vec<_>>();
assert_eq!(options.len(), 1);
assert!(!options.contains(&spy.pubkey()));
assert!(options.contains(&node_123.pubkey()));
// spy nodes should not push to people on different shred versions
let options = node.push_options(
&crds,
&spy.label().pubkey(),
0,
&stakes,
None,
&SocketAddrSpace::Unspecified,
);
assert!(options.is_empty());
}
#[test]
fn test_pushes_only_to_allowed() {
let now = timestamp();
let mut crds = Crds::default();
let stakes = HashMap::new();
let node = CrdsGossipPush::default();
let gossip = socketaddr!("127.0.0.1:1234");
let me = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo {
id: solana_sdk::pubkey::new_rand(),
gossip,
..ContactInfo::default()
}));
let node_123 = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo {
id: solana_sdk::pubkey::new_rand(),
gossip,
..ContactInfo::default()
}));
crds.insert(me.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
crds.insert(node_123.clone(), now, GossipRoute::LocalMessage)
.unwrap();
let crds = RwLock::new(crds);
// Unknown pubkey in gossip_validators -- will push to nobody
let mut gossip_validators = HashSet::new();
let options = node.push_options(
&crds,
&me.label().pubkey(),
0,
&stakes,
Some(&gossip_validators),
&SocketAddrSpace::Unspecified,
);
assert!(options.is_empty());
// Unknown pubkey in gossip_validators -- will push to nobody
gossip_validators.insert(solana_sdk::pubkey::new_rand());
let options = node.push_options(
&crds,
&me.label().pubkey(),
0,
&stakes,
Some(&gossip_validators),
&SocketAddrSpace::Unspecified,
);
assert!(options.is_empty());
// node_123 pubkey in gossip_validators -- will push to it
gossip_validators.insert(node_123.pubkey());
let options = node.push_options(
&crds,
&me.label().pubkey(),
0,
&stakes,
Some(&gossip_validators),
&SocketAddrSpace::Unspecified,
);
assert_eq!(options.len(), 1);
assert_eq!(options[0].id, node_123.pubkey());
}
#[test]
fn test_new_push_messages() {
let now = timestamp();