filters out inactive nodes from push options (#12674)

* filters out inactive nodes from push options

https://github.com/solana-labs/solana/pull/12620
patched the DDoS issue with nodes that go offline:
https://github.com/solana-labs/solana/issues/12409

However, offline nodes still see a (much smaller) traffic spike, likely
because no origins are pruned from their bloom filter in the active set:
https://github.com/solana-labs/solana/blob/aaf3790d8/core/src/crds_gossip_push.rs#L276-L286
and so multiple nodes push redundant duplicate messages to them
simultaneously:
https://github.com/solana-labs/solana/blob/aaf3790d8/core/src/crds_gossip_push.rs#L254-L255

This commit will filter out inactive peers from potential push targets
entirely. To mitigate eclipse attacks, staked nodes are retried
periodically.

* uses current timestamp in test/crds_gossip
This commit is contained in:
behzad nouri 2020-10-06 13:48:32 +00:00 committed by GitHub
parent 4fc6cf4bcc
commit a5c6a78f6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 66 additions and 39 deletions

View File

@ -281,8 +281,7 @@ impl CrdsGossipPull {
) {
requests.into_iter().for_each(|(caller, _)| {
let key = caller.label().pubkey();
let old = crds.insert(caller, now);
if let Some(val) = old.ok().and_then(|opt| opt) {
if let Ok(Some(val)) = crds.insert(caller, now) {
self.purged_values
.push_back((val.value_hash, val.local_timestamp));
}

View File

@ -19,7 +19,7 @@ use crate::{
use bincode::serialized_size;
use indexmap::map::IndexMap;
use itertools::Itertools;
use rand::{self, seq::SliceRandom, thread_rng, RngCore};
use rand::{seq::SliceRandom, Rng};
use solana_runtime::bloom::Bloom;
use solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp};
use std::{
@ -36,6 +36,8 @@ pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000;
pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500;
pub const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15;
pub const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 2;
// Do not push to peers which have not been updated for this long.
const PUSH_ACTIVE_TIMEOUT_MS: u64 = 60_000;
// 10 minutes
const MAX_PUSHED_TO_TIMEOUT_MS: u64 = 10 * 60 * 1000;
@ -126,7 +128,7 @@ impl CrdsGossipPush {
.collect();
let mut seed = [0; 32];
seed[0..8].copy_from_slice(&thread_rng().next_u64().to_le_bytes());
rand::thread_rng().fill(&mut seed[..]);
let shuffle = weighted_shuffle(
staked_peers.iter().map(|(_, stake)| *stake).collect_vec(),
seed,
@ -302,6 +304,7 @@ impl CrdsGossipPush {
network_size: usize,
ratio: usize,
) {
let mut rng = rand::thread_rng();
let need = Self::compute_need(self.num_active, self.active_set.len(), ratio);
let mut new_items = HashMap::new();
@ -317,7 +320,7 @@ impl CrdsGossipPush {
}
let mut seed = [0; 32];
seed[0..8].copy_from_slice(&thread_rng().next_u64().to_le_bytes());
rng.fill(&mut seed[..]);
let mut shuffle = weighted_shuffle(
options.iter().map(|weighted| weighted.0).collect_vec(),
seed,
@ -343,7 +346,7 @@ impl CrdsGossipPush {
}
}
let mut keys: Vec<Pubkey> = self.active_set.keys().cloned().collect();
keys.shuffle(&mut rand::thread_rng());
keys.shuffle(&mut rng);
let num = keys.len() / ratio;
for k in &keys[..num] {
self.active_set.swap_remove(k);
@ -361,11 +364,26 @@ impl CrdsGossipPush {
stakes: &HashMap<Pubkey, u64>,
gossip_validators: Option<&HashSet<Pubkey>>,
) -> Vec<(f32, &'a ContactInfo)> {
let now = timestamp();
let mut rng = rand::thread_rng();
let max_weight = u16::MAX as f32 - 1.0;
let active_cutoff = now.saturating_sub(PUSH_ACTIVE_TIMEOUT_MS);
crds.table
.values()
.filter(|v| v.value.contact_info().is_some())
.map(|v| (v.value.contact_info().unwrap(), v))
.filter(|(info, _)| {
.filter_map(|value| {
let info = value.value.contact_info()?;
// Stop pushing to nodes which have not been active recently.
if value.local_timestamp < active_cutoff {
// In order to mitigate eclipse attack, for staked nodes
// continue retrying periodically.
let stake = stakes.get(&info.id).unwrap_or(&0);
if *stake == 0 || rng.gen_ratio(7, 8) {
return None;
}
}
Some(info)
})
.filter(|info| {
info.id != *self_id
&& ContactInfo::is_valid_address(&info.gossip)
&& self_shred_version == info.shred_version
@ -373,10 +391,9 @@ impl CrdsGossipPush {
gossip_validators.contains(&info.id)
})
})
.map(|(info, _value)| {
let max_weight = f32::from(u16::max_value()) - 1.0;
.map(|info| {
let last_pushed_to: u64 = *self.last_pushed_to.get(&info.id).unwrap_or(&0);
let since = ((timestamp() - last_pushed_to) / 1024) as u32;
let since = (now.saturating_sub(last_pushed_to) / 1024) as u32;
let stake = get_stake(&info.id, stakes);
let weight = get_weight(max_weight, since, stake);
(weight, info)
@ -556,6 +573,7 @@ mod test {
#[test]
fn test_refresh_active_set() {
solana_logger::setup();
let now = timestamp();
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let value1 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
@ -563,7 +581,7 @@ mod test {
0,
)));
assert_eq!(crds.insert(value1.clone(), 0), Ok(None));
assert_eq!(crds.insert(value1.clone(), now), Ok(None));
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
assert!(push.active_set.get(&value1.label().pubkey()).is_some());
@ -572,7 +590,7 @@ mod test {
0,
)));
assert!(push.active_set.get(&value2.label().pubkey()).is_none());
assert_eq!(crds.insert(value2.clone(), 0), Ok(None));
assert_eq!(crds.insert(value2.clone(), now), Ok(None));
for _ in 0..30 {
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
if push.active_set.get(&value2.label().pubkey()).is_some() {
@ -585,7 +603,7 @@ mod test {
let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(
ContactInfo::new_localhost(&Pubkey::new_rand(), 0),
));
assert_eq!(crds.insert(value2.clone(), 0), Ok(None));
assert_eq!(crds.insert(value2.clone(), now), Ok(None));
}
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
assert_eq!(push.active_set.len(), push.num_active);
@ -619,6 +637,7 @@ mod test {
#[test]
fn test_no_pushes_to_from_different_shred_versions() {
let now = timestamp();
let mut crds = Crds::default();
let stakes = HashMap::new();
let node = CrdsGossipPush::default();
@ -650,10 +669,10 @@ mod test {
..ContactInfo::default()
}));
crds.insert(me.clone(), 0).unwrap();
crds.insert(spy.clone(), 0).unwrap();
crds.insert(node_123.clone(), 0).unwrap();
crds.insert(node_456, 0).unwrap();
crds.insert(me.clone(), now).unwrap();
crds.insert(spy.clone(), now).unwrap();
crds.insert(node_123.clone(), now).unwrap();
crds.insert(node_456, now).unwrap();
// shred version 123 should ignore nodes with versions 0 and 456
let options = node
@ -676,6 +695,7 @@ mod test {
#[test]
fn test_pushes_only_to_allowed() {
let now = timestamp();
let mut crds = Crds::default();
let stakes = HashMap::new();
let node = CrdsGossipPush::default();
@ -693,7 +713,7 @@ mod test {
}));
crds.insert(me.clone(), 0).unwrap();
crds.insert(node_123.clone(), 0).unwrap();
crds.insert(node_123.clone(), now).unwrap();
// Unknown pubkey in gossip_validators -- will push to nobody
let mut gossip_validators = HashSet::new();
@ -734,13 +754,14 @@ mod test {
#[test]
fn test_new_push_messages() {
let now = timestamp();
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
assert_eq!(crds.insert(peer.clone(), now), Ok(None));
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
@ -758,24 +779,25 @@ mod test {
}
#[test]
fn test_personalized_push_messages() {
let now = timestamp();
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let peer_1 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
assert_eq!(crds.insert(peer_1.clone(), 0), Ok(None));
assert_eq!(crds.insert(peer_1.clone(), now), Ok(None));
let peer_2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
assert_eq!(crds.insert(peer_2.clone(), 0), Ok(None));
assert_eq!(crds.insert(peer_2.clone(), now), Ok(None));
let peer_3 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
now,
)));
assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), peer_3.clone(), 0),
push.process_push_message(&mut crds, &Pubkey::default(), peer_3.clone(), now),
Ok(None)
);
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
@ -789,7 +811,7 @@ mod test {
expected.insert(peer_1.pubkey(), vec![new_msg.clone()]);
expected.insert(peer_2.pubkey(), vec![new_msg]);
assert_eq!(push.active_set.len(), 3);
assert_eq!(push.new_push_messages(&crds, 0), expected);
assert_eq!(push.new_push_messages(&crds, now), expected);
}
#[test]
fn test_process_prune() {

View File

@ -88,15 +88,15 @@ fn star_network_create(num: usize) -> Network {
)));
let id = new.label().pubkey();
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), 0).unwrap();
node.crds.insert(entry.clone(), 0).unwrap();
node.crds.insert(new.clone(), timestamp()).unwrap();
node.crds.insert(entry.clone(), timestamp()).unwrap();
node.set_self(&id);
(new.label().pubkey(), Node::new(Arc::new(Mutex::new(node))))
})
.collect();
let mut node = CrdsGossip::default();
let id = entry.label().pubkey();
node.crds.insert(entry, 0).unwrap();
node.crds.insert(entry, timestamp()).unwrap();
node.set_self(&id);
network.insert(id, Node::new(Arc::new(Mutex::new(node))));
Network::new(network)
@ -109,7 +109,7 @@ fn rstar_network_create(num: usize) -> Network {
)));
let mut origin = CrdsGossip::default();
let id = entry.label().pubkey();
origin.crds.insert(entry, 0).unwrap();
origin.crds.insert(entry, timestamp()).unwrap();
origin.set_self(&id);
let mut network: HashMap<_, _> = (1..num)
.map(|_| {
@ -119,8 +119,8 @@ fn rstar_network_create(num: usize) -> Network {
)));
let id = new.label().pubkey();
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), 0).unwrap();
origin.crds.insert(new.clone(), 0).unwrap();
node.crds.insert(new.clone(), timestamp()).unwrap();
origin.crds.insert(new.clone(), timestamp()).unwrap();
node.set_self(&id);
(new.label().pubkey(), Node::new(Arc::new(Mutex::new(node))))
})
@ -138,7 +138,7 @@ fn ring_network_create(num: usize) -> Network {
)));
let id = new.label().pubkey();
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), 0).unwrap();
node.crds.insert(new.clone(), timestamp()).unwrap();
node.set_self(&id);
(new.label().pubkey(), Node::new(Arc::new(Mutex::new(node))))
})
@ -157,7 +157,11 @@ fn ring_network_create(num: usize) -> Network {
.clone()
};
let end = network.get_mut(&keys[(k + 1) % keys.len()]).unwrap();
end.lock().unwrap().crds.insert(start_info, 0).unwrap();
end.lock()
.unwrap()
.crds
.insert(start_info, timestamp())
.unwrap();
}
Network::new(network)
}
@ -172,7 +176,7 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network {
)));
let id = new.label().pubkey();
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), 0).unwrap();
node.crds.insert(new.clone(), timestamp()).unwrap();
node.set_self(&id);
(
new.label().pubkey(),
@ -196,7 +200,7 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network {
let mut end = end.lock().unwrap();
if keys[k] != end.id {
let start_info = start_entries[k].clone();
end.crds.insert(start_info, 0).unwrap();
end.crds.insert(start_info, timestamp()).unwrap();
}
}
}
@ -228,10 +232,12 @@ fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_conver
.refresh_push_active_set(&HashMap::new(), None);
});
let mut total_bytes = bytes_tx;
for second in 1..num {
let start = second * 10;
let end = (second + 1) * 10;
let mut ts = timestamp();
for _ in 1..num {
let start = ((ts + 99) / 100) as usize;
let end = start + 10;
let now = (start * 100) as u64;
ts += 1000;
// push a message to the network
network_values.par_iter().for_each(|locked_node| {
let node = &mut locked_node.lock().unwrap();