Optimize process pull responses (#10460)

* Batch process pull responses

* Generate pull requests at 1/2 rate

* Do filtering work of process_pull_response in read lock

Only take write lock to insert if needed.
This commit is contained in:
sakridge 2020-06-09 17:08:13 -07:00 committed by GitHub
parent 4131eee94d
commit ecb6959720
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 213 additions and 54 deletions

View File

@ -16,7 +16,7 @@ use crate::{
contact_info::ContactInfo,
crds_gossip::CrdsGossip,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
crds_value::{
self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, SnapshotHash,
Version, Vote, MAX_WALLCLOCK,
@ -213,11 +213,13 @@ struct GossipStats {
new_push_requests: Counter,
new_push_requests2: Counter,
new_push_requests_num: Counter,
filter_pull_response: Counter,
process_pull_response: Counter,
process_pull_response_count: Counter,
process_pull_response_len: Counter,
process_pull_response_timeout: Counter,
process_pull_response_fail: Counter,
process_pull_response_fail_insert: Counter,
process_pull_response_fail_timeout: Counter,
process_pull_response_success: Counter,
process_pull_requests: Counter,
generate_pull_responses: Counter,
@ -1398,8 +1400,13 @@ impl ClusterInfo {
fn generate_new_gossip_requests(
&self,
stakes: &HashMap<Pubkey, u64>,
generate_pull_requests: bool,
) -> Vec<(SocketAddr, Protocol)> {
let pulls: Vec<_> = self.new_pull_requests(stakes);
let pulls: Vec<_> = if generate_pull_requests {
self.new_pull_requests(stakes)
} else {
vec![]
};
let pushes: Vec<_> = self.new_push_requests();
vec![pulls, pushes].into_iter().flatten().collect()
}
@ -1410,8 +1417,9 @@ impl ClusterInfo {
recycler: &PacketsRecycler,
stakes: &HashMap<Pubkey, u64>,
sender: &PacketSender,
generate_pull_requests: bool,
) -> Result<()> {
let reqs = obj.generate_new_gossip_requests(&stakes);
let reqs = obj.generate_new_gossip_requests(&stakes, generate_pull_requests);
if !reqs.is_empty() {
let packets = to_packets_with_destination(recycler.clone(), &reqs);
sender.send(packets)?;
@ -1496,6 +1504,7 @@ impl ClusterInfo {
let message = CrdsData::Version(Version::new(obj.id()));
obj.push_message(CrdsValue::new_signed(message, &obj.keypair));
let mut generate_pull_requests = true;
loop {
let start = timestamp();
thread_mem_usage::datapoint("solana-gossip");
@ -1512,7 +1521,8 @@ impl ClusterInfo {
None => HashMap::new(),
};
let _ = Self::run_gossip(&obj, &recycler, &stakes, &sender);
let _ =
Self::run_gossip(&obj, &recycler, &stakes, &sender, generate_pull_requests);
if exit.load(Ordering::Relaxed) {
return;
}
@ -1532,6 +1542,7 @@ impl ClusterInfo {
let time_left = GOSSIP_SLEEP_MILLIS - elapsed;
sleep(Duration::from_millis(time_left));
}
generate_pull_requests = !generate_pull_requests;
}
})
.unwrap()
@ -1550,6 +1561,7 @@ impl ClusterInfo {
let allocated = thread_mem_usage::Allocatedp::default();
let mut gossip_pull_data: Vec<PullData> = vec![];
let timeouts = me.gossip.read().unwrap().make_timeouts(&stakes, epoch_ms);
let mut pull_responses = HashMap::new();
packets.packets.iter().for_each(|packet| {
let from_addr = packet.meta.addr();
limited_deserialize(&packet.data[..packet.meta.size])
@ -1597,7 +1609,8 @@ impl ClusterInfo {
}
ret
});
Self::handle_pull_response(me, &from, data, &timeouts);
let pull_entry = pull_responses.entry(from).or_insert_with(Vec::new);
pull_entry.extend(data);
datapoint_debug!(
"solana-gossip-listen-memory",
("pull_response", (allocated.get() - start) as i64, i64),
@ -1659,6 +1672,11 @@ impl ClusterInfo {
}
})
});
for (from, data) in pull_responses {
Self::handle_pull_response(me, &from, data, &timeouts);
}
// process the collected pulls together
let rsp = Self::handle_pull_requests(me, recycler, gossip_pull_data, stakes);
if let Some(rsp) = rsp {
@ -1827,9 +1845,21 @@ impl ClusterInfo {
}
let filtered_len = crds_values.len();
let (fail, timeout_count, success) = me
.time_gossip_write_lock("process_pull", &me.stats.process_pull_response)
.process_pull_response(from, timeouts, crds_values, timestamp());
let mut pull_stats = ProcessPullStats::default();
let (filtered_pulls, filtered_pulls_expired_timeout) = me
.time_gossip_read_lock("filter_pull_resp", &me.stats.filter_pull_response)
.filter_pull_responses(timeouts, crds_values, timestamp(), &mut pull_stats);
if !filtered_pulls.is_empty() || !filtered_pulls_expired_timeout.is_empty() {
me.time_gossip_write_lock("process_pull_resp", &me.stats.process_pull_response)
.process_pull_responses(
from,
filtered_pulls,
filtered_pulls_expired_timeout,
timestamp(),
&mut pull_stats,
);
}
me.stats
.skip_pull_response_shred_version
@ -1840,13 +1870,22 @@ impl ClusterInfo {
.add_relaxed(filtered_len as u64);
me.stats
.process_pull_response_timeout
.add_relaxed(timeout_count as u64);
me.stats.process_pull_response_fail.add_relaxed(fail as u64);
.add_relaxed(pull_stats.timeout_count as u64);
me.stats
.process_pull_response_fail_insert
.add_relaxed(pull_stats.failed_insert as u64);
me.stats
.process_pull_response_fail_timeout
.add_relaxed(pull_stats.failed_timeout as u64);
me.stats
.process_pull_response_success
.add_relaxed(success as u64);
.add_relaxed(pull_stats.success as u64);
(fail, timeout_count, success)
(
pull_stats.failed_insert + pull_stats.failed_timeout,
pull_stats.timeout_count,
pull_stats.success,
)
}
fn filter_by_shred_version(
@ -2043,11 +2082,26 @@ impl ClusterInfo {
self.stats.process_pull_response.clear(),
i64
),
(
"filter_pull_resp",
self.stats.filter_pull_response.clear(),
i64
),
(
"process_pull_resp_count",
self.stats.process_pull_response_count.clear(),
i64
),
(
"pull_response_fail_insert",
self.stats.process_pull_response_fail_insert.clear(),
i64
),
(
"pull_response_fail_timeout",
self.stats.process_pull_response_fail_timeout.clear(),
i64
),
(
"process_pull_resp_timeout",
self.stats.process_pull_response_timeout.clear(),
@ -2456,6 +2510,7 @@ mod tests {
#[test]
fn test_handle_pull() {
solana_logger::setup();
let node = Node::new_localhost();
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info));
@ -2550,7 +2605,7 @@ mod tests {
.write()
.unwrap()
.refresh_push_active_set(&HashMap::new());
let reqs = cluster_info.generate_new_gossip_requests(&HashMap::new());
let reqs = cluster_info.generate_new_gossip_requests(&HashMap::new(), true);
//assert none of the addrs are invalid.
reqs.iter().all(|(addr, _)| {
let res = ContactInfo::is_valid_address(addr);

View File

@ -93,6 +93,24 @@ impl Crds {
pub fn new_versioned(&self, local_timestamp: u64, value: CrdsValue) -> VersionedCrdsValue {
VersionedCrdsValue::new(local_timestamp, value)
}
pub fn would_insert(
&self,
value: CrdsValue,
local_timestamp: u64,
) -> Option<VersionedCrdsValue> {
let new_value = self.new_versioned(local_timestamp, value);
let label = new_value.value.label();
let would_insert = self
.table
.get(&label)
.map(|current| new_value > *current)
.unwrap_or(true);
if would_insert {
Some(new_value)
} else {
None
}
}
/// insert the new value, returns the old value if insert succeeds
pub fn insert_versioned(
&mut self,

View File

@ -6,7 +6,7 @@
use crate::{
crds::{Crds, VersionedCrdsValue},
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, CrdsGossipPull},
crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats},
crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE},
crds_value::{CrdsValue, CrdsValueLabel},
};
@ -170,16 +170,34 @@ impl CrdsGossip {
self.pull.generate_pull_responses(&self.crds, filters)
}
/// process a pull response
pub fn process_pull_response(
&mut self,
from: &Pubkey,
pub fn filter_pull_responses(
&self,
timeouts: &HashMap<Pubkey, u64>,
response: Vec<CrdsValue>,
now: u64,
) -> (usize, usize, usize) {
process_pull_stats: &mut ProcessPullStats,
) -> (Vec<VersionedCrdsValue>, Vec<VersionedCrdsValue>) {
self.pull
.process_pull_response(&mut self.crds, from, timeouts, response, now)
.filter_pull_responses(&self.crds, timeouts, response, now, process_pull_stats)
}
/// process a pull response
pub fn process_pull_responses(
&mut self,
from: &Pubkey,
responses: Vec<VersionedCrdsValue>,
responses_expired_timeout: Vec<VersionedCrdsValue>,
now: u64,
process_pull_stats: &mut ProcessPullStats,
) {
self.pull.process_pull_responses(
&mut self.crds,
from,
responses,
responses_expired_timeout,
now,
process_pull_stats,
)
}
pub fn make_timeouts_test(&self) -> HashMap<Pubkey, u64> {

View File

@ -10,7 +10,7 @@
//! of false positives.
use crate::contact_info::ContactInfo;
use crate::crds::Crds;
use crate::crds::{Crds, VersionedCrdsValue};
use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS};
use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_value::{CrdsValue, CrdsValueLabel};
@ -20,8 +20,8 @@ use solana_runtime::bloom::Bloom;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use std::cmp;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::collections::{HashMap, HashSet};
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
// The maximum age of a value received over pull responses
@ -118,6 +118,14 @@ impl CrdsFilter {
}
}
#[derive(Default)]
pub struct ProcessPullStats {
pub success: usize,
pub failed_insert: usize,
pub failed_timeout: usize,
pub timeout_count: usize,
}
#[derive(Clone)]
pub struct CrdsGossipPull {
/// timestamp of last request
@ -231,19 +239,22 @@ impl CrdsGossipPull {
self.filter_crds_values(crds, requests)
}
/// process a pull response
pub fn process_pull_response(
&mut self,
crds: &mut Crds,
from: &Pubkey,
// Checks if responses should be inserted and
// returns those responses converted to VersionedCrdsValue
// Separated in two vecs as:
// .0 => responses that update the owner timestamp
// .1 => responses that do not update the owner timestamp
pub fn filter_pull_responses(
&self,
crds: &Crds,
timeouts: &HashMap<Pubkey, u64>,
response: Vec<CrdsValue>,
responses: Vec<CrdsValue>,
now: u64,
) -> (usize, usize, usize) {
let mut failed = 0;
let mut timeout_count = 0;
let mut success = 0;
for r in response {
stats: &mut ProcessPullStats,
) -> (Vec<VersionedCrdsValue>, Vec<VersionedCrdsValue>) {
let mut versioned = vec![];
let mut versioned_expired_timestamp = vec![];
for r in responses {
let owner = r.label().pubkey();
// Check if the crds value is older than the msg_timeout
if now
@ -262,8 +273,8 @@ impl CrdsGossipPull {
if now > r.wallclock().checked_add(timeout).unwrap_or_else(|| 0)
|| now + timeout < r.wallclock()
{
timeout_count += 1;
failed += 1;
stats.timeout_count += 1;
stats.failed_timeout += 1;
continue;
}
}
@ -271,33 +282,62 @@ impl CrdsGossipPull {
// Before discarding this value, check if a ContactInfo for the owner
// exists in the table. If it doesn't, that implies that this value can be discarded
if crds.lookup(&CrdsValueLabel::ContactInfo(owner)).is_none() {
timeout_count += 1;
failed += 1;
stats.timeout_count += 1;
stats.failed_timeout += 1;
continue;
} else {
// Silently insert this old value without bumping record timestamps
failed += crds.insert(r, now).is_err() as usize;
match crds.would_insert(r, now) {
Some(resp) => versioned_expired_timestamp.push(resp),
None => stats.failed_insert += 1,
}
continue;
}
}
}
}
let old = crds.insert(r, now);
match crds.would_insert(r, now) {
Some(resp) => versioned.push(resp),
None => stats.failed_insert += 1,
}
}
(versioned, versioned_expired_timestamp)
}
/// process a vec of pull responses
pub fn process_pull_responses(
&mut self,
crds: &mut Crds,
from: &Pubkey,
responses: Vec<VersionedCrdsValue>,
responses_expired_timeout: Vec<VersionedCrdsValue>,
now: u64,
stats: &mut ProcessPullStats,
) {
let mut owners = HashSet::new();
for r in responses_expired_timeout {
stats.failed_insert += crds.insert_versioned(r).is_err() as usize;
}
for r in responses {
let owner = r.value.label().pubkey();
let old = crds.insert_versioned(r);
if old.is_err() {
failed += 1;
stats.failed_insert += 1;
} else {
success += 1;
stats.success += 1;
}
old.ok().map(|opt| {
crds.update_record_timestamp(&owner, now);
owners.insert(owner);
opt.map(|val| {
self.purged_values
.push_back((val.value_hash, val.local_timestamp))
})
});
}
crds.update_record_timestamp(from, now);
(failed, timeout_count, success)
owners.insert(*from);
for owner in owners {
crds.update_record_timestamp(&owner, now);
}
}
// build a set of filters of the current crds table
// num_filters - used to increase the likelyhood of a value in crds being added to some filter
@ -387,6 +427,34 @@ impl CrdsGossipPull {
.count();
self.purged_values.drain(..cnt);
}
/// For legacy tests
#[cfg(test)]
pub fn process_pull_response(
&mut self,
crds: &mut Crds,
from: &Pubkey,
timeouts: &HashMap<Pubkey, u64>,
response: Vec<CrdsValue>,
now: u64,
) -> (usize, usize, usize) {
let mut stats = ProcessPullStats::default();
let (versioned, versioned_expired_timeout) =
self.filter_pull_responses(crds, timeouts, response, now, &mut stats);
self.process_pull_responses(
crds,
from,
versioned,
versioned_expired_timeout,
now,
&mut stats,
);
(
stats.failed_timeout + stats.failed_insert,
stats.timeout_count,
stats.success,
)
}
}
#[cfg(test)]
mod test {

View File

@ -5,7 +5,7 @@ use solana_core::cluster_info;
use solana_core::contact_info::ContactInfo;
use solana_core::crds_gossip::*;
use solana_core::crds_gossip_error::CrdsGossipError;
use solana_core::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
use solana_core::crds_gossip_pull::{ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS};
use solana_core::crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS;
use solana_core::crds_value::CrdsValueLabel;
use solana_core::crds_value::{CrdsData, CrdsValue};
@ -447,14 +447,14 @@ fn network_run_pull(
bytes += serialized_size(&rsp).unwrap() as usize;
msgs += rsp.len();
if let Some(node) = network.get(&from) {
node.lock()
.unwrap()
.mark_pull_request_creation_time(&from, now);
overhead += node
.lock()
.unwrap()
.process_pull_response(&from, &timeouts, rsp, now)
.0;
let mut node = node.lock().unwrap();
node.mark_pull_request_creation_time(&from, now);
let mut stats = ProcessPullStats::default();
let (vers, vers_expired_timeout) =
node.filter_pull_responses(&timeouts, rsp, now, &mut stats);
node.process_pull_responses(&from, vers, vers_expired_timeout, now, &mut stats);
overhead += stats.failed_insert;
overhead += stats.failed_timeout;
}
(bytes, msgs, overhead)
})