retains hash values of outdated responses received from pull requests (#12513)

pull_response_fail_inserts has been increasing:
https://cdn.discordapp.com/attachments/478692221441409024/759096187587657778/pull_response_fail_insert.png
but the hashes of outdated values which fail to insert:
https://github.com/solana-labs/solana/blob/a5c3fc14b3/core/src/crds_gossip_pull.rs#L332-L344
https://github.com/solana-labs/solana/blob/a5c3fc14b3/core/src/crds.rs#L104-L108
are not recorded anywhere, so the next pull request may obtain the
same redundant payload again, unnecessarily consuming bandwidth.

This commit holds on to the hashes of failed-inserts for a while, similar
to purged_values:
https://github.com/solana-labs/solana/blob/a5c3fc14b3/core/src/crds_gossip_pull.rs#L380
and filters them out for the next pull request:
https://github.com/solana-labs/solana/blob/a5c3fc14b3/core/src/crds_gossip_pull.rs#L204
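For illustration, a minimal self-contained sketch of the retention-and-filter idea (not the actual CrdsGossipPull API; the FailedInserts struct, its methods, and the [u8; 32] stand-in for solana_sdk::hash::Hash are hypothetical):

use std::collections::VecDeque;

const FAILED_INSERTS_RETENTION_MS: u64 = 20_000;

#[derive(Default)]
struct FailedInserts {
    // (value hash, record time in ms), oldest entries first.
    entries: VecDeque<([u8; 32], u64)>,
}

impl FailedInserts {
    // Record hashes of pull-response values which failed to insert at time `now`.
    fn record(&mut self, hashes: impl IntoIterator<Item = [u8; 32]>, now: u64) {
        self.purge(now);
        self.entries.extend(hashes.into_iter().map(|hash| (hash, now)));
    }

    // Drop entries older than the retention window.
    fn purge(&mut self, now: u64) {
        let cutoff = now.saturating_sub(FAILED_INSERTS_RETENTION_MS);
        let outdated = self
            .entries
            .iter()
            .take_while(|(_, ts)| *ts < cutoff)
            .count();
        self.entries.drain(..outdated);
    }

    // Hashes to mix into the bloom filters of the next pull request, alongside
    // the crds table values and purged_values, so peers skip these payloads.
    fn filter_hashes(&self) -> impl Iterator<Item = &[u8; 32]> {
        self.entries.iter().map(|(hash, _)| hash)
    }
}

fn main() {
    let mut failed = FailedInserts::default();
    failed.record([[0u8; 32], [1u8; 32]], 1_000);
    failed.record([[2u8; 32]], 30_000); // first two entries fall outside the 20s window
    assert_eq!(failed.filter_hashes().count(), 1);
}

The actual change below threads the failed-insert hashes from filter_pull_responses through process_pull_responses into CrdsGossipPull::failed_inserts and into build_crds_filters.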
behzad nouri 2020-10-01 00:39:22 +00:00 committed by GitHub
parent c31a34fbcb
commit 1866521df6
5 changed files with 112 additions and 64 deletions

View File

@@ -1933,16 +1933,20 @@ impl ClusterInfo {
let filtered_len = crds_values.len();
let mut pull_stats = ProcessPullStats::default();
let (filtered_pulls, filtered_pulls_expired_timeout) = self
let (filtered_pulls, filtered_pulls_expired_timeout, failed_inserts) = self
.time_gossip_read_lock("filter_pull_resp", &self.stats.filter_pull_response)
.filter_pull_responses(timeouts, crds_values, timestamp(), &mut pull_stats);
if !filtered_pulls.is_empty() || !filtered_pulls_expired_timeout.is_empty() {
if !filtered_pulls.is_empty()
|| !filtered_pulls_expired_timeout.is_empty()
|| !failed_inserts.is_empty()
{
self.time_gossip_write_lock("process_pull_resp", &self.stats.process_pull_response)
.process_pull_responses(
from,
filtered_pulls,
filtered_pulls_expired_timeout,
failed_inserts,
timestamp(),
&mut pull_stats,
);
@@ -2156,9 +2160,13 @@ impl ClusterInfo {
fn print_reset_stats(&self, last_print: &mut Instant) {
if last_print.elapsed().as_millis() > 2000 {
let (table_size, purged_values_size) = {
let (table_size, purged_values_size, failed_inserts_size) = {
let r_gossip = self.gossip.read().unwrap();
(r_gossip.crds.table.len(), r_gossip.pull.purged_values.len())
(
r_gossip.crds.table.len(),
r_gossip.pull.purged_values.len(),
r_gossip.pull.failed_inserts.len(),
)
};
datapoint_info!(
"cluster_info_stats",
@@ -2185,6 +2193,7 @@
),
("table_size", table_size as i64, i64),
("purged_values_size", purged_values_size as i64, i64),
("failed_inserts_size", failed_inserts_size as i64, i64),
);
datapoint_info!(
"cluster_info_stats2",

View File

@@ -105,19 +105,13 @@ impl Crds {
&self,
value: CrdsValue,
local_timestamp: u64,
) -> Option<VersionedCrdsValue> {
) -> (bool, VersionedCrdsValue) {
let new_value = self.new_versioned(local_timestamp, value);
let label = new_value.value.label();
let would_insert = self
.table
.get(&label)
.map(|current| new_value > *current)
.unwrap_or(true);
if would_insert {
Some(new_value)
} else {
None
}
// The new value is outdated and fails to insert if it already exists in
// the table with a more recent wallclock.
let outdated = matches!(self.table.get(&label), Some(current) if new_value <= *current);
(!outdated, new_value)
}
/// insert the new value, returns the old value if insert succeeds
pub fn insert_versioned(

View File

@@ -11,7 +11,7 @@ use crate::{
crds_value::{CrdsValue, CrdsValueLabel},
};
use rayon::ThreadPool;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::{hash::Hash, pubkey::Pubkey};
use std::collections::{HashMap, HashSet};
///The min size for bloom filters
@@ -180,7 +180,7 @@ impl CrdsGossip {
response: Vec<CrdsValue>,
now: u64,
process_pull_stats: &mut ProcessPullStats,
) -> (Vec<VersionedCrdsValue>, Vec<VersionedCrdsValue>) {
) -> (Vec<VersionedCrdsValue>, Vec<VersionedCrdsValue>, Vec<Hash>) {
self.pull
.filter_pull_responses(&self.crds, timeouts, response, now, process_pull_stats)
}
@@ -191,6 +191,7 @@ impl CrdsGossip {
from: &Pubkey,
responses: Vec<VersionedCrdsValue>,
responses_expired_timeout: Vec<VersionedCrdsValue>,
failed_inserts: Vec<Hash>,
now: u64,
process_pull_stats: &mut ProcessPullStats,
) {
@@ -199,6 +200,7 @@ impl CrdsGossip {
from,
responses,
responses_expired_timeout,
failed_inserts,
now,
process_pull_stats,
);
@@ -238,6 +240,7 @@ impl CrdsGossip {
let min = now - 5 * self.pull.crds_timeout;
self.pull.purge_purged(min);
}
self.pull.purge_failed_inserts(now);
rv
}
}

View File

@@ -29,6 +29,8 @@ use std::ops::Index;
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
// The maximum age of a value received over pull responses
pub const CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS: u64 = 60000;
// Retention period of hashes of received outdated values.
const FAILED_INSERTS_RETENTION_MS: u64 = 20_000;
pub const FALSE_RATE: f64 = 0.1f64;
pub const KEYS: f64 = 8f64;
@@ -172,6 +174,11 @@ pub struct CrdsGossipPull {
pub pull_request_time: HashMap<Pubkey, u64>,
/// hash and insert time
pub purged_values: VecDeque<(Hash, u64)>,
// Hash values and record times (ms) of the pull responses which failed to be
// inserted in the crds table; preserved to stop the sender from sending back
// the same outdated payload again, by adding them to the filter for the next
// pull request.
pub failed_inserts: VecDeque<(Hash, u64)>,
pub crds_timeout: u64,
pub msg_timeout: u64,
pub num_pulls: usize,
@@ -182,6 +189,7 @@ impl Default for CrdsGossipPull {
Self {
purged_values: VecDeque::new(),
pull_request_time: HashMap::new(),
failed_inserts: VecDeque::new(),
crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
num_pulls: 0,
@@ -294,9 +302,10 @@ impl CrdsGossipPull {
// Checks if responses should be inserted and
// returns those responses converted to VersionedCrdsValue
// Separated in two vecs as:
// Separated in three vecs as:
// .0 => responses that update the owner timestamp
// .1 => responses that do not update the owner timestamp
// .2 => hash value of outdated values which will fail to insert.
pub fn filter_pull_responses(
&self,
crds: &Crds,
@@ -304,9 +313,18 @@
responses: Vec<CrdsValue>,
now: u64,
stats: &mut ProcessPullStats,
) -> (Vec<VersionedCrdsValue>, Vec<VersionedCrdsValue>) {
) -> (Vec<VersionedCrdsValue>, Vec<VersionedCrdsValue>, Vec<Hash>) {
let mut versioned = vec![];
let mut versioned_expired_timestamp = vec![];
let mut failed_inserts = vec![];
let mut maybe_push = |response, values: &mut Vec<VersionedCrdsValue>| {
let (push, value) = crds.would_insert(response, now);
if push {
values.push(value);
} else {
failed_inserts.push(value.value_hash)
}
};
for r in responses {
let owner = r.label().pubkey();
// Check if the crds value is older than the msg_timeout
@@ -337,24 +355,17 @@ impl CrdsGossipPull {
if crds.lookup(&CrdsValueLabel::ContactInfo(owner)).is_none() {
stats.timeout_count += 1;
stats.failed_timeout += 1;
continue;
} else {
// Silently insert this old value without bumping record timestamps
match crds.would_insert(r, now) {
Some(resp) => versioned_expired_timestamp.push(resp),
None => stats.failed_insert += 1,
}
continue;
maybe_push(r, &mut versioned_expired_timestamp);
}
continue;
}
}
}
match crds.would_insert(r, now) {
Some(resp) => versioned.push(resp),
None => stats.failed_insert += 1,
}
maybe_push(r, &mut versioned);
}
(versioned, versioned_expired_timestamp)
(versioned, versioned_expired_timestamp, failed_inserts)
}
/// process a vec of pull responses
@@ -364,41 +375,59 @@ impl CrdsGossipPull {
from: &Pubkey,
responses: Vec<VersionedCrdsValue>,
responses_expired_timeout: Vec<VersionedCrdsValue>,
mut failed_inserts: Vec<Hash>,
now: u64,
stats: &mut ProcessPullStats,
) -> Vec<(CrdsValueLabel, Hash, u64)> {
let mut success = vec![];
let mut owners = HashSet::new();
for r in responses_expired_timeout {
stats.failed_insert += crds.insert_versioned(r).is_err() as usize;
let value_hash = r.value_hash;
if crds.insert_versioned(r).is_err() {
failed_inserts.push(value_hash);
}
}
for r in responses {
let owner = r.value.label().pubkey();
let label = r.value.label();
let wc = r.value.wallclock();
let hash = r.value_hash;
let old = crds.insert_versioned(r);
if old.is_err() {
stats.failed_insert += 1;
} else {
stats.success += 1;
self.num_pulls += 1;
success.push((label, hash, wc));
match crds.insert_versioned(r) {
Err(_) => failed_inserts.push(hash),
Ok(old) => {
stats.success += 1;
self.num_pulls += 1;
owners.insert(label.pubkey());
success.push((label, hash, wc));
if let Some(val) = old {
self.purged_values
.push_back((val.value_hash, val.local_timestamp))
}
}
}
old.ok().map(|opt| {
owners.insert(owner);
opt.map(|val| {
self.purged_values
.push_back((val.value_hash, val.local_timestamp))
})
});
}
owners.insert(*from);
for owner in owners {
crds.update_record_timestamp(&owner, now);
}
stats.failed_insert += failed_inserts.len();
self.purge_failed_inserts(now);
self.failed_inserts
.extend(failed_inserts.into_iter().zip(std::iter::repeat(now)));
success
}
pub fn purge_failed_inserts(&mut self, now: u64) {
if FAILED_INSERTS_RETENTION_MS < now {
let cutoff = now - FAILED_INSERTS_RETENTION_MS;
let outdated = self
.failed_inserts
.iter()
.take_while(|(_, ts)| *ts < cutoff)
.count();
self.failed_inserts.drain(..outdated);
}
}
// build a set of filters of the current crds table
// num_filters - used to increase the likelihood of a value in crds being added to some filter
pub fn build_crds_filters(
@@ -410,23 +439,28 @@ impl CrdsGossipPull {
const PAR_MIN_LENGTH: usize = 512;
let num = cmp::max(
CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS,
crds.table.values().count() + self.purged_values.len(),
crds.table.len() + self.purged_values.len() + self.failed_inserts.len(),
);
let filters = CrdsFilterSet::new(num, bloom_size);
thread_pool.join(
|| {
crds.table
.par_values()
.with_min_len(PAR_MIN_LENGTH)
.for_each(|v| filters.add(v.value_hash))
},
|| {
self.purged_values
.par_iter()
.with_min_len(PAR_MIN_LENGTH)
.for_each(|(v, _)| filters.add(*v))
},
);
thread_pool.install(|| {
crds.table
.par_values()
.with_min_len(PAR_MIN_LENGTH)
.map(|v| v.value_hash)
.chain(
self.purged_values
.par_iter()
.with_min_len(PAR_MIN_LENGTH)
.map(|(v, _)| *v),
)
.chain(
self.failed_inserts
.par_iter()
.with_min_len(PAR_MIN_LENGTH)
.map(|(v, _)| *v),
)
.for_each(|v| filters.add(v));
});
filters.into()
}
@@ -544,13 +578,14 @@ impl CrdsGossipPull {
now: u64,
) -> (usize, usize, usize) {
let mut stats = ProcessPullStats::default();
let (versioned, versioned_expired_timeout) =
let (versioned, versioned_expired_timeout, failed_inserts) =
self.filter_pull_responses(crds, timeouts, response, now, &mut stats);
self.process_pull_responses(
crds,
from,
versioned,
versioned_expired_timeout,
failed_inserts,
now,
&mut stats,
);

View File

@@ -459,9 +459,16 @@ fn network_run_pull(
let mut node = node.lock().unwrap();
node.mark_pull_request_creation_time(&from, now);
let mut stats = ProcessPullStats::default();
let (vers, vers_expired_timeout) =
let (vers, vers_expired_timeout, failed_inserts) =
node.filter_pull_responses(&timeouts, rsp, now, &mut stats);
node.process_pull_responses(&from, vers, vers_expired_timeout, now, &mut stats);
node.process_pull_responses(
&from,
vers,
vers_expired_timeout,
failed_inserts,
now,
&mut stats,
);
overhead += stats.failed_insert;
overhead += stats.failed_timeout;
}