makes CrdsGossipPull thread-safe (#18578)

behzad nouri authored on 2021-07-11 15:32:10 +00:00 (committed by GitHub)
parent d30a36641e
commit e7a1f2c9b0
5 changed files with 80 additions and 61 deletions
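
The pattern behind the whole change, in brief: `CrdsGossipPull`'s mutable state moves behind per-field locks (`RwLock<LruCache<..>>` for `pull_request_time`, `RwLock<VecDeque<..>>` for `failed_inserts`) and an `AtomicUsize` for `num_pulls`, so the methods touching that state can take `&self` instead of `&mut self`. The sketch below is a minimal, self-contained illustration of that shape, not the real Solana code; plain `Vec`/`VecDeque` and `u64` keys stand in for the actual `LruCache`, `Pubkey`, and `Hash` types.

use std::{
    collections::VecDeque,
    sync::{
        atomic::{AtomicUsize, Ordering},
        RwLock,
    },
};

#[derive(Default)]
struct PullState {
    // Each mutable field gets its own lock, so readers and writers of
    // unrelated fields never contend with one another.
    pull_request_time: RwLock<Vec<(u64, u64)>>, // (pubkey stand-in, timestamp)
    failed_inserts: RwLock<VecDeque<(u64, u64)>>, // (hash stand-in, timestamp)
    num_pulls: AtomicUsize,
}

impl PullState {
    // Mutation through &self: a shared reference is enough to record state.
    fn mark_pull_request_creation_time(&self, from: u64, now: u64) {
        self.pull_request_time.write().unwrap().push((from, now));
    }

    // Read accessors hand out plain values rather than the lock or the guard,
    // keeping the synchronization an implementation detail of the struct.
    fn failed_inserts_size(&self) -> usize {
        self.failed_inserts.read().unwrap().len()
    }

    // The counter is a metric, not a synchronization point, so relaxed
    // ordering is sufficient.
    fn record_pulls(&self, n: usize) {
        self.num_pulls.fetch_add(n, Ordering::Relaxed);
    }
}

fn main() {
    let state = PullState::default();
    state.mark_pull_request_creation_time(42, 1_000);
    state.record_pulls(3);
    assert_eq!(state.failed_inserts_size(), 0);
    assert_eq!(state.num_pulls.load(Ordering::Relaxed), 3);
}

With the methods on `&self`, a caller that previously needed exclusive access to the surrounding structure can often get by with a shared borrow; the first `ClusterInfo` hunk below swaps a write lock for a read lock for exactly that reason.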

View File

@@ -1430,8 +1430,7 @@ impl ClusterInfo {
         let num_requests = pulls.iter().map(|(_, filters)| filters.len() as u64).sum();
         self.stats.new_pull_requests_count.add_relaxed(num_requests);
         {
-            let mut gossip =
-                self.time_gossip_write_lock("mark_pull", &self.stats.mark_pull_request);
+            let gossip = self.time_gossip_read_lock("mark_pull", &self.stats.mark_pull_request);
             for (peer, _) in &pulls {
                 gossip.mark_pull_request_creation_time(peer.id, now);
             }
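
Because `mark_pull_request_creation_time` is now `&self` (see the `CrdsGossip` and `CrdsGossipPull` hunks further down), the caller above can downgrade from `time_gossip_write_lock` to `time_gossip_read_lock`: the exclusive section shrinks to the inner lock around `pull_request_time` instead of the whole gossip state. A rough sketch of why a shared borrow is enough once the field itself is interior-mutable (the `Gossip`/`Pull` types here are illustrative, not the real `ClusterInfo` API):

use std::sync::{Mutex, RwLock};

#[derive(Default)]
struct Pull {
    // Interior mutability: writes go through this inner lock.
    request_time: Mutex<Vec<(u64, u64)>>, // (peer stand-in, timestamp)
}

#[derive(Default)]
struct Gossip {
    pull: Pull,
}

impl Gossip {
    // &self is enough; the inner Mutex provides the exclusive section.
    fn mark_pull_request_creation_time(&self, peer: u64, now: u64) {
        self.pull.request_time.lock().unwrap().push((peer, now));
    }
}

fn main() {
    let gossip = RwLock::new(Gossip::default());
    // Only a *read* lock on the outer RwLock, yet the inner state is updated;
    // other readers (push/pull handlers on other threads) are not blocked.
    let guard = gossip.read().unwrap();
    guard.mark_pull_request_creation_time(7, 1_625_000_000);
    assert_eq!(guard.pull.request_time.lock().unwrap().len(), 1);
}
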
@@ -4380,14 +4379,9 @@ mod tests {
                 .unwrap()
                 .mark_pull_request_creation_time(peer, now);
         }
+        let gossip = cluster_info.gossip.read().unwrap();
         assert_eq!(
-            cluster_info
-                .gossip
-                .read()
-                .unwrap()
-                .pull
-                .pull_request_time
-                .len(),
+            gossip.pull.pull_request_time().len(),
             CRDS_UNIQUE_PUBKEY_CAPACITY
         );
     }

View File

@@ -169,7 +169,7 @@ pub(crate) fn submit_gossip_stats(
             gossip.crds.len(),
             gossip.crds.num_nodes(),
             gossip.crds.num_purged(),
-            gossip.pull.failed_inserts.len(),
+            gossip.pull.failed_inserts_size(),
         )
     };
     let num_nodes_staked = stakes.values().filter(|stake| **stake > 0).count();

View File

@@ -230,7 +230,7 @@ impl CrdsGossip {
     /// This is used for weighted random selection during `new_pull_request`
     /// It's important to use the local nodes request creation time as the weight
     /// instead of the response received time otherwise failed nodes will increase their weight.
-    pub fn mark_pull_request_creation_time(&mut self, from: Pubkey, now: u64) {
+    pub fn mark_pull_request_creation_time(&self, from: Pubkey, now: u64) {
         self.pull.mark_pull_request_creation_time(from, now)
     }
     /// process a pull request and create a response

View File

@@ -33,9 +33,12 @@ use {
     std::{
         collections::{HashMap, HashSet, VecDeque},
         convert::TryInto,
-        iter::repeat_with,
+        iter::{repeat, repeat_with},
         net::SocketAddr,
-        sync::Mutex,
+        sync::{
+            atomic::{AtomicUsize, Ordering},
+            Mutex, RwLock,
+        },
         time::{Duration, Instant},
     },
 };
@@ -184,25 +187,25 @@ pub struct ProcessPullStats {
 pub struct CrdsGossipPull {
     /// timestamp of last request
-    pub(crate) pull_request_time: LruCache<Pubkey, u64>,
+    pull_request_time: RwLock<LruCache<Pubkey, /*timestamp:*/ u64>>,
     // Hash value and record time (ms) of the pull responses which failed to be
     // inserted in crds table; Preserved to stop the sender to send back the
     // same outdated payload again by adding them to the filter for the next
     // pull request.
-    pub failed_inserts: VecDeque<(Hash, u64)>,
+    failed_inserts: RwLock<VecDeque<(Hash, /*timestamp:*/ u64)>>,
     pub crds_timeout: u64,
-    pub msg_timeout: u64,
-    pub num_pulls: usize,
+    msg_timeout: u64,
+    pub num_pulls: AtomicUsize,
 }
 
 impl Default for CrdsGossipPull {
     fn default() -> Self {
         Self {
-            pull_request_time: LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY),
-            failed_inserts: VecDeque::new(),
+            pull_request_time: RwLock::new(LruCache::new(CRDS_UNIQUE_PUBKEY_CAPACITY)),
+            failed_inserts: RwLock::default(),
             crds_timeout: CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
             msg_timeout: CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
-            num_pulls: 0,
+            num_pulls: AtomicUsize::default(),
         }
     }
 }
@@ -274,6 +277,7 @@ impl CrdsGossipPull {
     ) -> Vec<(u64, &'a ContactInfo)> {
         let mut rng = rand::thread_rng();
         let active_cutoff = now.saturating_sub(PULL_ACTIVE_TIMEOUT_MS);
+        let pull_request_time = self.pull_request_time.read().unwrap();
         crds.get_nodes()
             .filter_map(|value| {
                 let info = value.value.contact_info().unwrap();
@@ -297,8 +301,7 @@
             })
             .map(|item| {
                 let max_weight = f32::from(u16::max_value()) - 1.0;
-                let req_time: u64 = self
-                    .pull_request_time
+                let req_time: u64 = pull_request_time
                     .peek(&item.id)
                     .copied()
                     .unwrap_or_default();
@@ -316,8 +319,8 @@
     /// This is used for weighted random selection during `new_pull_request`
     /// It's important to use the local nodes request creation time as the weight
     /// instead of the response received time otherwise failed nodes will increase their weight.
-    pub(crate) fn mark_pull_request_creation_time(&mut self, from: Pubkey, now: u64) {
-        self.pull_request_time.put(from, now);
+    pub(crate) fn mark_pull_request_creation_time(&self, from: Pubkey, now: u64) {
+        self.pull_request_time.write().unwrap().put(from, now);
     }
 
     /// process a pull request
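
Note how the hunks above keep the locking coarse where it matters: the weighted-selection pass takes the `pull_request_time` read guard once and holds it for the whole iteration, and `peek` (which, unlike the lru crate's `get`, does not promote the entry) is what makes a read guard sufficient there. A sketch of that lock-once-then-iterate shape, with a `HashMap` standing in for the real `LruCache` and `u64` keys for pubkeys (illustrative only):

use std::{collections::HashMap, sync::RwLock};

struct State {
    // pubkey stand-in -> timestamp of the last pull request sent to that peer
    pull_request_time: RwLock<HashMap<u64, u64>>,
}

impl State {
    // Weight peers so that recently-queried ones are picked less often.
    fn pull_weights(&self, peers: &[u64], now: u64) -> Vec<(u64, u64)> {
        // One read guard for the whole pass: no per-item lock traffic, and
        // concurrent readers are not blocked.
        let pull_request_time = self.pull_request_time.read().unwrap();
        peers
            .iter()
            .map(|peer| {
                let req_time = pull_request_time.get(peer).copied().unwrap_or_default();
                // Older (or absent) request times yield larger weights.
                (now.saturating_sub(req_time), *peer)
            })
            .collect()
    }
}

fn main() {
    let state = State {
        pull_request_time: RwLock::new(HashMap::from([(1, 900), (2, 990)])),
    };
    // Peer 3 was never queried, so it gets the largest weight.
    println!("{:?}", state.pull_weights(&[1, 2, 3], 1_000));
}
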
@@ -395,7 +398,7 @@ impl CrdsGossipPull {
 
     /// process a vec of pull responses
     pub(crate) fn process_pull_responses(
-        &mut self,
+        &self,
         crds: &mut Crds,
         from: &Pubkey,
         responses: Vec<CrdsValue>,
@@ -408,36 +411,42 @@
         for response in responses_expired_timeout {
             let _ = crds.insert(response, now);
         }
+        let mut num_inserts = 0;
         for response in responses {
             let owner = response.pubkey();
             if let Ok(()) = crds.insert(response, now) {
-                stats.success += 1;
-                self.num_pulls += 1;
+                num_inserts += 1;
                 owners.insert(owner);
             }
         }
+        stats.success += num_inserts;
+        self.num_pulls.fetch_add(num_inserts, Ordering::Relaxed);
         owners.insert(*from);
         for owner in owners {
             crds.update_record_timestamp(&owner, now);
         }
         stats.failed_insert += failed_inserts.len();
         self.purge_failed_inserts(now);
-        self.failed_inserts
-            .extend(failed_inserts.into_iter().zip(std::iter::repeat(now)));
+        let failed_inserts = failed_inserts.into_iter().zip(repeat(now));
+        self.failed_inserts.write().unwrap().extend(failed_inserts);
     }
 
-    pub(crate) fn purge_failed_inserts(&mut self, now: u64) {
+    pub(crate) fn purge_failed_inserts(&self, now: u64) {
         if FAILED_INSERTS_RETENTION_MS < now {
             let cutoff = now - FAILED_INSERTS_RETENTION_MS;
-            let outdated = self
-                .failed_inserts
+            let mut failed_inserts = self.failed_inserts.write().unwrap();
+            let outdated = failed_inserts
                 .iter()
                 .take_while(|(_, ts)| *ts < cutoff)
                 .count();
-            self.failed_inserts.drain(..outdated);
+            failed_inserts.drain(..outdated);
         }
     }
+
+    pub(crate) fn failed_inserts_size(&self) -> usize {
+        self.failed_inserts.read().unwrap().len()
+    }
+
     // build a set of filters of the current crds table
     // num_filters - used to increase the likelyhood of a value in crds being added to some filter
     pub fn build_crds_filters(
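
Two details in the hunk above are easy to miss: successful inserts are tallied into a local `num_inserts` and folded into the shared state once per call (a single `fetch_add(.., Ordering::Relaxed)` instead of one atomic increment per response, and the same tally feeds `stats.success`), and `purge_failed_inserts` now takes the `failed_inserts` write guard a single time and reuses it for both the scan and the `drain`. A small sketch of the batched-counter idea, with made-up types rather than the real `ProcessPullStats`/`CrdsGossipPull` code:

use std::sync::atomic::{AtomicUsize, Ordering};

struct Stats {
    num_pulls: AtomicUsize,
}

impl Stats {
    // Count locally, then publish once: one atomic read-modify-write per batch
    // instead of one per item keeps cross-thread cache-line traffic low.
    fn process_batch(&self, responses: &[Result<(), ()>]) -> usize {
        let mut num_inserts = 0;
        for response in responses {
            if response.is_ok() {
                num_inserts += 1;
            }
        }
        self.num_pulls.fetch_add(num_inserts, Ordering::Relaxed);
        num_inserts
    }
}

fn main() {
    let stats = Stats {
        num_pulls: AtomicUsize::new(0),
    };
    stats.process_batch(&[Ok(()), Err(()), Ok(())]);
    assert_eq!(stats.num_pulls.load(Ordering::Relaxed), 2);
}
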
@@ -451,7 +460,7 @@ impl CrdsGossipPull {
         const MIN_NUM_BLOOM_ITEMS: usize = 512;
         #[cfg(not(debug_assertions))]
         const MIN_NUM_BLOOM_ITEMS: usize = 65_536;
-        let num_items = crds.len() + crds.num_purged() + self.failed_inserts.len();
+        let num_items = crds.len() + crds.num_purged() + self.failed_inserts.read().unwrap().len();
         let num_items = MIN_NUM_BLOOM_ITEMS.max(num_items);
         let filters = CrdsFilterSet::new(num_items, bloom_size);
         thread_pool.install(|| {
@@ -461,6 +470,8 @@ impl CrdsGossipPull {
                 .chain(crds.purged().with_min_len(PAR_MIN_LENGTH))
                 .chain(
                     self.failed_inserts
+                        .read()
+                        .unwrap()
                         .par_iter()
                         .with_min_len(PAR_MIN_LENGTH)
                         .map(|(v, _)| *v),
@@ -568,7 +579,7 @@ impl CrdsGossipPull {
     /// For legacy tests
     #[cfg(test)]
     fn process_pull_response(
-        &mut self,
+        &self,
         crds: &mut Crds,
         from: &Pubkey,
         timeouts: &HashMap<Pubkey, u64>,
@@ -596,17 +607,29 @@
     // Only for tests and simulations.
     pub(crate) fn mock_clone(&self) -> Self {
-        let mut pull_request_time = LruCache::new(self.pull_request_time.cap());
-        for (k, v) in self.pull_request_time.iter().rev() {
-            pull_request_time.put(*k, *v);
-        }
+        let pull_request_time = {
+            let pull_request_time = self.pull_request_time.read().unwrap();
+            let mut clone = LruCache::new(pull_request_time.cap());
+            for (k, v) in pull_request_time.iter().rev() {
+                clone.put(*k, *v);
+            }
+            clone
+        };
+        let failed_inserts = self.failed_inserts.read().unwrap().clone();
         Self {
-            pull_request_time,
-            failed_inserts: self.failed_inserts.clone(),
+            pull_request_time: RwLock::new(pull_request_time),
+            failed_inserts: RwLock::new(failed_inserts),
+            num_pulls: AtomicUsize::new(self.num_pulls.load(Ordering::Relaxed)),
             ..*self
         }
     }
+
+    #[cfg(test)]
+    pub(crate) fn pull_request_time(&self) -> std::sync::RwLockReadGuard<LruCache<Pubkey, u64>> {
+        self.pull_request_time.read().unwrap()
+    }
 }
 
 #[cfg(test)]
 pub(crate) mod tests {
     use {
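
In `mock_clone` above, the locked `LruCache` cannot simply be `clone()`d, so a cache of the same capacity is rebuilt by replaying entries via `iter().rev()`: the lru crate iterates from most- to least-recently-used, so replaying in reverse re-inserts the oldest entry first and the copy ends up with the same recency order. The new `#[cfg(test)] pull_request_time()` accessor returns the read guard itself so tests can inspect the cache without the field becoming public. Below is a toy illustration of why the replay order matters, using a hand-rolled stand-in rather than the real `LruCache`:

use std::collections::VecDeque;

// A toy LRU map standing in for the real LruCache: most-recently-used entries
// sit at the front, matching the iteration order the diff relies on.
#[derive(PartialEq, Debug)]
struct TinyLru {
    entries: VecDeque<(u64, u64)>, // (key, value); front = most recently used
    cap: usize,
}

impl TinyLru {
    fn new(cap: usize) -> Self {
        Self { entries: VecDeque::new(), cap }
    }

    fn put(&mut self, key: u64, value: u64) {
        self.entries.retain(|(k, _)| *k != key);
        self.entries.push_front((key, value));
        self.entries.truncate(self.cap);
    }
}

fn main() {
    let mut original = TinyLru::new(8);
    for (k, v) in [(1, 10), (2, 20), (3, 30)] {
        original.put(k, v);
    }
    // Replay entries from least- to most-recently-used (iter().rev()), so the
    // copy ends up with the same recency order; replaying newest-first would
    // invert which keys get evicted once the cache fills up.
    let mut copy = TinyLru::new(8);
    for (k, v) in original.entries.iter().rev() {
        copy.put(*k, *v);
    }
    assert_eq!(original, copy);
}
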
@@ -911,7 +934,7 @@ pub(crate) mod tests {
             &node_keypair.pubkey(),
             0,
         )));
-        let mut node = CrdsGossipPull::default();
+        let node = CrdsGossipPull::default();
         let mut pings = Vec::new();
         let ping_cache = Mutex::new(PingCache::new(
             Duration::from_secs(20 * 60), // ttl
@@ -1008,7 +1031,7 @@ pub(crate) mod tests {
             &node_keypair.pubkey(),
             0,
         )));
-        let mut node = CrdsGossipPull::default();
+        let node = CrdsGossipPull::default();
         crds.insert(entry, now).unwrap();
         let old = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
         ping_cache.mock_pong(old.id, old.gossip, Instant::now());
@@ -1056,7 +1079,7 @@ pub(crate) mod tests {
         const NUM_REPS: usize = 2 * CRDS_UNIQUE_PUBKEY_CAPACITY;
         let mut rng = rand::thread_rng();
         let pubkeys: Vec<_> = repeat_with(Pubkey::new_unique).take(NUM_REPS).collect();
-        let mut node = CrdsGossipPull::default();
+        let node = CrdsGossipPull::default();
         let mut requests = HashMap::new();
         let now = timestamp();
         for k in 0..NUM_REPS {
@@ -1065,21 +1088,22 @@
             node.mark_pull_request_creation_time(pubkey, now);
             *requests.entry(pubkey).or_default() = now;
         }
-        assert!(node.pull_request_time.len() <= CRDS_UNIQUE_PUBKEY_CAPACITY);
+        let pull_request_time = node.pull_request_time.read().unwrap();
+        assert!(pull_request_time.len() <= CRDS_UNIQUE_PUBKEY_CAPACITY);
         // Assert that timestamps match most recent request.
-        for (pk, ts) in &node.pull_request_time {
+        for (pk, ts) in pull_request_time.iter() {
             assert_eq!(*ts, requests[pk]);
         }
         // Assert that most recent pull timestamps are maintained.
         let max_ts = requests
             .iter()
-            .filter(|(pk, _)| !node.pull_request_time.contains(*pk))
+            .filter(|(pk, _)| !pull_request_time.contains(*pk))
             .map(|(_, ts)| *ts)
             .max()
             .unwrap();
         let min_ts = requests
             .iter()
-            .filter(|(pk, _)| node.pull_request_time.contains(*pk))
+            .filter(|(pk, _)| pull_request_time.contains(*pk))
             .map(|(_, ts)| *ts)
             .min()
             .unwrap();
@@ -1236,7 +1260,7 @@ pub(crate) mod tests {
         )));
         let caller = entry.clone();
         let node_pubkey = entry.label().pubkey();
-        let mut node = CrdsGossipPull::default();
+        let node = CrdsGossipPull::default();
         node_crds.insert(entry, 0).unwrap();
         let mut ping_cache = PingCache::new(
             Duration::from_secs(20 * 60), // ttl
@@ -1449,7 +1473,7 @@ pub(crate) mod tests {
     #[test]
     fn test_process_pull_response() {
         let mut node_crds = Crds::default();
-        let mut node = CrdsGossipPull::default();
+        let node = CrdsGossipPull::default();
         let peer_pubkey = solana_sdk::pubkey::new_rand();
         let peer_entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(

View File

@@ -313,19 +313,19 @@ pub fn cluster_info_scale()
         let mut num_push_total = 0;
         let mut num_pushes = 0;
         let mut num_pulls = 0;
-        for node in nodes.iter() {
+        for (node, _, _) in nodes.iter() {
            //if node.0.get_votes(0).1.len() != (num_nodes * num_votes) {
             let has_tx = node
-                .0
                 .get_votes(&mut Cursor::default())
                 .1
                 .iter()
                 .filter(|v| v.message.account_keys == tx.message.account_keys)
                 .count();
-            num_old += node.0.gossip.read().unwrap().push.num_old;
-            num_push_total += node.0.gossip.read().unwrap().push.num_total;
-            num_pushes += node.0.gossip.read().unwrap().push.num_pushes;
-            num_pulls += node.0.gossip.read().unwrap().pull.num_pulls;
+            let gossip = node.gossip.read().unwrap();
+            num_old += gossip.push.num_old;
+            num_push_total += gossip.push.num_total;
+            num_pushes += gossip.push.num_pushes;
+            num_pulls += gossip.pull.num_pulls.load(Ordering::Relaxed);
             if has_tx == 0 {
                 not_done += 1;
             }
@@ -347,11 +347,12 @@ pub fn cluster_info_scale()
             num_votes, time, success
         );
         sleep(Duration::from_millis(200));
-        for node in nodes.iter() {
-            node.0.gossip.write().unwrap().push.num_old = 0;
-            node.0.gossip.write().unwrap().push.num_total = 0;
-            node.0.gossip.write().unwrap().push.num_pushes = 0;
-            node.0.gossip.write().unwrap().pull.num_pulls = 0;
+        for (node, _, _) in nodes.iter() {
+            let mut gossip = node.gossip.write().unwrap();
+            gossip.push.num_old = 0;
+            gossip.push.num_total = 0;
+            gossip.push.num_pushes = 0;
+            gossip.pull.num_pulls.store(0, Ordering::Relaxed);
         }
     }
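
The test hunks above also stop re-locking `node.gossip` for every field: each loop iteration takes one guard and reads (or resets) all counters through it, with the atomic `num_pulls` accessed via `load`/`store` under `Ordering::Relaxed`. A compact sketch of that one-guard-per-iteration shape, using hypothetical `Gossip`/`Node` types rather than the test's real ones:

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    RwLock,
};

#[derive(Default)]
struct Gossip {
    num_pushes: u64,        // plain field: needs the lock to read or write
    num_pulls: AtomicUsize, // atomic field: relaxed load/store is enough
}

#[derive(Default)]
struct Node {
    gossip: RwLock<Gossip>,
}

fn main() {
    let nodes: Vec<Node> = (0..4).map(|_| Node::default()).collect();

    // Aggregate: one read guard per node instead of re-locking per field.
    let (mut num_pushes, mut num_pulls) = (0u64, 0usize);
    for node in nodes.iter() {
        let gossip = node.gossip.read().unwrap();
        num_pushes += gossip.num_pushes;
        num_pulls += gossip.num_pulls.load(Ordering::Relaxed);
    }
    println!("pushes={} pulls={}", num_pushes, num_pulls);

    // Reset: a single write guard per node; the atomic is cleared with store.
    for node in nodes.iter() {
        let mut gossip = node.gossip.write().unwrap();
        gossip.num_pushes = 0;
        gossip.num_pulls.store(0, Ordering::Relaxed);
    }
}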