Filter old CrdsValues received via Pull Responses in Gossip (#8150)

* Add CrdsValue timeout checks on Pull Responses

* Allow older values to enter Crds as long as a ContactInfo exists

* Allow staked contact infos to be inserted into crds if they haven't expired

* Try and handle oveflows

* Fix test

* Some comments

* Fix compile

* fix test deadlock

* Add a test for processing timed out values received via pull response
This commit is contained in:
Sagar Dhawan 2020-02-07 12:38:24 -08:00 committed by GitHub
parent 04ef977509
commit fa00803fbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 192 additions and 13 deletions

View File

@ -1291,10 +1291,12 @@ impl ClusterInfo {
stakes: &HashMap<Pubkey, u64>,
packets: Packets,
response_sender: &PacketSender,
epoch_ms: u64,
) {
// iter over the packets, collect pulls separately and process everything else
let allocated = thread_mem_usage::Allocatedp::default();
let mut gossip_pull_data: Vec<PullData> = vec![];
let timeouts = me.read().unwrap().gossip.make_timeouts(&stakes, epoch_ms);
packets.packets.iter().for_each(|packet| {
let from_addr = packet.meta.addr();
limited_deserialize(&packet.data[..packet.meta.size])
@ -1336,7 +1338,7 @@ impl ClusterInfo {
}
ret
});
Self::handle_pull_response(me, &from, data);
Self::handle_pull_response(me, &from, data, &timeouts);
datapoint_debug!(
"solana-gossip-listen-memory",
("pull_response", (allocated.get() - start) as i64, i64),
@ -1455,7 +1457,12 @@ impl ClusterInfo {
Some(packets)
}
fn handle_pull_response(me: &Arc<RwLock<Self>>, from: &Pubkey, data: Vec<CrdsValue>) {
fn handle_pull_response(
me: &Arc<RwLock<Self>>,
from: &Pubkey,
data: Vec<CrdsValue>,
timeouts: &HashMap<Pubkey, u64>,
) {
let len = data.len();
let now = Instant::now();
let self_id = me.read().unwrap().gossip.id;
@ -1463,7 +1470,7 @@ impl ClusterInfo {
me.write()
.unwrap()
.gossip
.process_pull_response(from, data, timestamp());
.process_pull_response(from, timeouts, data, timestamp());
inc_new_counter_debug!("cluster_info-pull_request_response", 1);
inc_new_counter_debug!("cluster_info-pull_request_response-size", len);
@ -1633,14 +1640,31 @@ impl ClusterInfo {
//TODO cache connections
let timeout = Duration::new(1, 0);
let reqs = requests_receiver.recv_timeout(timeout)?;
let epoch_ms;
let stakes: HashMap<_, _> = match bank_forks {
Some(ref bank_forks) => {
staking_utils::staked_nodes(&bank_forks.read().unwrap().working_bank())
let bank = bank_forks.read().unwrap().working_bank();
let epoch = bank.epoch();
let epoch_schedule = bank.epoch_schedule();
epoch_ms = epoch_schedule.get_slots_in_epoch(epoch) * DEFAULT_MS_PER_SLOT;
staking_utils::staked_nodes(&bank)
}
None => {
inc_new_counter_info!("cluster_info-purge-no_working_bank", 1);
epoch_ms = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
HashMap::new()
}
None => HashMap::new(),
};
Self::handle_packets(obj, &recycler, blockstore, &stakes, reqs, response_sender);
Self::handle_packets(
obj,
&recycler,
blockstore,
&stakes,
reqs,
response_sender,
epoch_ms,
);
Ok(())
}
pub fn listen(
@ -2601,10 +2625,12 @@ mod tests {
let entrypoint_crdsvalue =
CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint.clone()));
let cluster_info = Arc::new(RwLock::new(cluster_info));
let timeouts = cluster_info.read().unwrap().gossip.make_timeouts_test();
ClusterInfo::handle_pull_response(
&cluster_info,
&entrypoint_pubkey,
vec![entrypoint_crdsvalue],
&timeouts,
);
let pulls = cluster_info
.write()

View File

@ -156,11 +156,12 @@ impl CrdsGossip {
pub fn process_pull_response(
&mut self,
from: &Pubkey,
timeouts: &HashMap<Pubkey, u64>,
response: Vec<CrdsValue>,
now: u64,
) -> usize {
self.pull
.process_pull_response(&mut self.crds, from, response, now)
.process_pull_response(&mut self.crds, from, timeouts, response, now)
}
pub fn make_timeouts_test(&self) -> HashMap<Pubkey, u64> {

View File

@ -25,6 +25,8 @@ use std::collections::HashMap;
use std::collections::VecDeque;
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
// The maximum age of a value received over pull responses
pub const CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS: u64 = 60000;
pub const FALSE_RATE: f64 = 0.1f64;
pub const KEYS: f64 = 8f64;
@ -117,6 +119,7 @@ pub struct CrdsGossipPull {
/// hash and insert time
purged_values: VecDeque<(Hash, u64)>,
pub crds_timeout: u64,
pub msg_timeout: u64,
}
impl Default for CrdsGossipPull {
@ -125,6 +128,7 @@ impl Default for CrdsGossipPull {
purged_values: VecDeque::new(),
pull_request_time: HashMap::new(),
crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
}
}
}
@ -210,12 +214,56 @@ impl CrdsGossipPull {
&mut self,
crds: &mut Crds,
from: &Pubkey,
timeouts: &HashMap<Pubkey, u64>,
response: Vec<CrdsValue>,
now: u64,
) -> usize {
let mut failed = 0;
for r in response {
let owner = r.label().pubkey();
// Check if the crds value is older than the msg_timeout
if now
> r.wallclock()
.checked_add(self.msg_timeout)
.unwrap_or_else(|| 0)
|| now + self.msg_timeout < r.wallclock()
{
match &r.label() {
CrdsValueLabel::ContactInfo(_) => {
// Check if this ContactInfo is actually too old, it's possible that it has
// stake and so might have a longer effective timeout
let timeout = *timeouts
.get(&owner)
.unwrap_or_else(|| timeouts.get(&Pubkey::default()).unwrap());
if now > r.wallclock().checked_add(timeout).unwrap_or_else(|| 0)
|| now + timeout < r.wallclock()
{
inc_new_counter_warn!(
"cluster_info-gossip_pull_response_value_timeout",
1
);
failed += 1;
continue;
}
}
_ => {
// Before discarding this value, check if a ContactInfo for the owner
// exists in the table. If it doesn't, that implies that this value can be discarded
if crds.lookup(&CrdsValueLabel::ContactInfo(owner)).is_none() {
inc_new_counter_warn!(
"cluster_info-gossip_pull_response_value_timeout",
1
);
failed += 1;
continue;
} else {
// Silently insert this old value without bumping record timestamps
failed += crds.insert(r, now).is_err() as usize;
continue;
}
}
}
}
let old = crds.insert(r, now);
failed += old.is_err() as usize;
old.ok().map(|opt| {
@ -322,8 +370,9 @@ impl CrdsGossipPull {
mod test {
use super::*;
use crate::contact_info::ContactInfo;
use crate::crds_value::CrdsData;
use crate::crds_value::{CrdsData, Vote};
use itertools::Itertools;
use solana_perf::test_tx::test_tx;
use solana_sdk::hash::hash;
use solana_sdk::packet::PACKET_DATA_SIZE;
@ -534,8 +583,13 @@ mod test {
continue;
}
assert_eq!(rsp.len(), 1);
let failed =
node.process_pull_response(&mut node_crds, &node_pubkey, rsp.pop().unwrap(), 1);
let failed = node.process_pull_response(
&mut node_crds,
&node_pubkey,
&node.make_timeouts_def(&node_pubkey, &HashMap::new(), 0, 1),
rsp.pop().unwrap(),
1,
);
assert_eq!(failed, 0);
assert_eq!(
node_crds
@ -675,4 +729,87 @@ mod test {
.collect();
assert_eq!(masks.len(), 2u64.pow(mask_bits) as usize)
}
#[test]
fn test_process_pull_response() {
let mut node_crds = Crds::default();
let mut node = CrdsGossipPull::default();
let peer_pubkey = Pubkey::new_rand();
let peer_entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(
ContactInfo::new_localhost(&peer_pubkey, 0),
));
let mut timeouts = HashMap::new();
timeouts.insert(Pubkey::default(), node.crds_timeout);
timeouts.insert(peer_pubkey, node.msg_timeout + 1);
// inserting a fresh value should be fine.
assert_eq!(
node.process_pull_response(
&mut node_crds,
&peer_pubkey,
&timeouts,
vec![peer_entry.clone()],
1,
),
0
);
let mut node_crds = Crds::default();
let unstaked_peer_entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(
ContactInfo::new_localhost(&peer_pubkey, 0),
));
// check that old contact infos fail if they are too old, regardless of "timeouts"
assert_eq!(
node.process_pull_response(
&mut node_crds,
&peer_pubkey,
&timeouts,
vec![peer_entry.clone(), unstaked_peer_entry],
node.msg_timeout + 100,
),
2
);
let mut node_crds = Crds::default();
// check that old contact infos can still land as long as they have a "timeouts" entry
assert_eq!(
node.process_pull_response(
&mut node_crds,
&peer_pubkey,
&timeouts,
vec![peer_entry.clone()],
node.msg_timeout + 1,
),
0
);
// construct something that's not a contact info
let peer_vote =
CrdsValue::new_unsigned(CrdsData::Vote(0, Vote::new(&peer_pubkey, test_tx(), 0)));
// check that older CrdsValues (non-ContactInfos) infos pass even if are too old,
// but a recent contact info (inserted above) exists
assert_eq!(
node.process_pull_response(
&mut node_crds,
&peer_pubkey,
&timeouts,
vec![peer_vote.clone()],
node.msg_timeout + 1,
),
0
);
let mut node_crds = Crds::default();
// without a contact info, inserting an old value should fail
assert_eq!(
node.process_pull_response(
&mut node_crds,
&peer_pubkey,
&timeouts,
vec![peer_vote.clone()],
node.msg_timeout + 1,
),
1
);
}
}

View File

@ -30,7 +30,10 @@ use std::collections::{HashMap, HashSet};
pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30;
pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;
pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 5000;
// With a fanout of 6, a 1000 node cluster should only take ~4 hops to converge.
// However since pushes are stake weighed, some trailing nodes
// might need more time to receive values. 30 seconds should be plenty.
pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000;
pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500;
pub const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15;
@ -135,7 +138,12 @@ impl CrdsGossipPush {
value: CrdsValue,
now: u64,
) -> Result<Option<VersionedCrdsValue>, CrdsGossipError> {
if now > value.wallclock() + self.msg_timeout {
if now
> value
.wallclock()
.checked_add(self.msg_timeout)
.unwrap_or_else(|| 0)
{
return Err(CrdsGossipError::PushMessageTimeout);
}
if now + self.msg_timeout < value.wallclock() {

View File

@ -5,6 +5,7 @@ use solana_core::cluster_info;
use solana_core::contact_info::ContactInfo;
use solana_core::crds_gossip::*;
use solana_core::crds_gossip_error::CrdsGossipError;
use solana_core::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
use solana_core::crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS;
use solana_core::crds_value::CrdsValueLabel;
use solana_core::crds_value::{CrdsData, CrdsValue};
@ -396,6 +397,9 @@ fn network_run_pull(
let mut convergance = 0f64;
let num = network.len();
let network_values: Vec<Node> = network.values().cloned().collect();
let mut timeouts = HashMap::new();
timeouts.insert(Pubkey::default(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS);
for t in start..end {
let now = t as u64 * 100;
let requests: Vec<_> = {
@ -448,7 +452,10 @@ fn network_run_pull(
node.lock()
.unwrap()
.mark_pull_request_creation_time(&from, now);
overhead += node.lock().unwrap().process_pull_response(&from, rsp, now);
overhead += node
.lock()
.unwrap()
.process_pull_response(&from, &timeouts, rsp, now);
});
(bytes, msgs, overhead)
})