shares the lock on gossip when processing prune messages (#13339)
Processing prune messages acquires an exclusive lock on gossip: https://github.com/solana-labs/solana/blob/55b0428ff/core/src/cluster_info.rs#L1824-L1825 This can be reduced to a shared lock if active-sets are changed to use atomic bloom filters: https://github.com/solana-labs/solana/blob/55b0428ff/core/src/crds_gossip_push.rs#L50
This commit is contained in:
parent
bc62313c66
commit
8f0796436a
|
@ -539,7 +539,7 @@ impl ClusterInfo {
|
||||||
|
|
||||||
// Should only be used by tests and simulations
|
// Should only be used by tests and simulations
|
||||||
pub fn clone_with_id(&self, new_id: &Pubkey) -> Self {
|
pub fn clone_with_id(&self, new_id: &Pubkey) -> Self {
|
||||||
let mut gossip = self.gossip.read().unwrap().clone();
|
let mut gossip = self.gossip.read().unwrap().mock_clone();
|
||||||
gossip.id = *new_id;
|
gossip.id = *new_id;
|
||||||
let mut my_contact_info = self.my_contact_info.read().unwrap().clone();
|
let mut my_contact_info = self.my_contact_info.read().unwrap().clone();
|
||||||
my_contact_info.id = *new_id;
|
my_contact_info.id = *new_id;
|
||||||
|
@ -1856,8 +1856,7 @@ impl ClusterInfo {
|
||||||
let mut prune_message_timeout = 0;
|
let mut prune_message_timeout = 0;
|
||||||
let mut bad_prune_destination = 0;
|
let mut bad_prune_destination = 0;
|
||||||
{
|
{
|
||||||
let mut gossip =
|
let gossip = self.time_gossip_read_lock("process_prune", &self.stats.process_prune);
|
||||||
self.time_gossip_write_lock("process_prune", &self.stats.process_prune);
|
|
||||||
let now = timestamp();
|
let now = timestamp();
|
||||||
for (from, data) in messages {
|
for (from, data) in messages {
|
||||||
match gossip.process_prune_msg(
|
match gossip.process_prune_msg(
|
||||||
|
|
|
@ -17,7 +17,6 @@ use std::collections::{HashMap, HashSet};
|
||||||
///The min size for bloom filters
|
///The min size for bloom filters
|
||||||
pub const CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS: usize = 500;
|
pub const CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS: usize = 500;
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct CrdsGossip {
|
pub struct CrdsGossip {
|
||||||
pub crds: Crds,
|
pub crds: Crds,
|
||||||
pub id: Pubkey,
|
pub id: Pubkey,
|
||||||
|
@ -108,7 +107,7 @@ impl CrdsGossip {
|
||||||
|
|
||||||
/// add the `from` to the peer's filter of nodes
|
/// add the `from` to the peer's filter of nodes
|
||||||
pub fn process_prune_msg(
|
pub fn process_prune_msg(
|
||||||
&mut self,
|
&self,
|
||||||
peer: &Pubkey,
|
peer: &Pubkey,
|
||||||
destination: &Pubkey,
|
destination: &Pubkey,
|
||||||
origin: &[Pubkey],
|
origin: &[Pubkey],
|
||||||
|
@ -266,6 +265,16 @@ impl CrdsGossip {
|
||||||
self.pull.purge_failed_inserts(now);
|
self.pull.purge_failed_inserts(now);
|
||||||
rv
|
rv
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Only for tests and simulations.
|
||||||
|
pub(crate) fn mock_clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
crds: self.crds.clone(),
|
||||||
|
push: self.push.mock_clone(),
|
||||||
|
pull: self.pull.clone(),
|
||||||
|
..*self
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Computes a normalized(log of actual stake) stake
|
/// Computes a normalized(log of actual stake) stake
|
||||||
|
|
|
@ -20,7 +20,7 @@ use bincode::serialized_size;
|
||||||
use indexmap::map::IndexMap;
|
use indexmap::map::IndexMap;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use rand::{seq::SliceRandom, Rng};
|
use rand::{seq::SliceRandom, Rng};
|
||||||
use solana_runtime::bloom::Bloom;
|
use solana_runtime::bloom::{AtomicBloom, Bloom};
|
||||||
use solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp};
|
use solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::timestamp};
|
||||||
use std::{
|
use std::{
|
||||||
cmp,
|
cmp,
|
||||||
|
@ -42,12 +42,11 @@ const PUSH_ACTIVE_TIMEOUT_MS: u64 = 60_000;
|
||||||
// 10 minutes
|
// 10 minutes
|
||||||
const MAX_PUSHED_TO_TIMEOUT_MS: u64 = 10 * 60 * 1000;
|
const MAX_PUSHED_TO_TIMEOUT_MS: u64 = 10 * 60 * 1000;
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct CrdsGossipPush {
|
pub struct CrdsGossipPush {
|
||||||
/// max bytes per message
|
/// max bytes per message
|
||||||
pub max_bytes: usize,
|
pub max_bytes: usize,
|
||||||
/// active set of validators for push
|
/// active set of validators for push
|
||||||
active_set: IndexMap<Pubkey, Bloom<Pubkey>>,
|
active_set: IndexMap<Pubkey, AtomicBloom<Pubkey>>,
|
||||||
/// push message queue
|
/// push message queue
|
||||||
push_messages: HashMap<CrdsValueLabel, Hash>,
|
push_messages: HashMap<CrdsValueLabel, Hash>,
|
||||||
/// Cache that tracks which validators a message was received from
|
/// Cache that tracks which validators a message was received from
|
||||||
|
@ -287,11 +286,11 @@ impl CrdsGossipPush {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// add the `from` to the peer's filter of nodes
|
/// add the `from` to the peer's filter of nodes
|
||||||
pub fn process_prune_msg(&mut self, self_pubkey: &Pubkey, peer: &Pubkey, origins: &[Pubkey]) {
|
pub fn process_prune_msg(&self, self_pubkey: &Pubkey, peer: &Pubkey, origins: &[Pubkey]) {
|
||||||
if let Some(peer) = self.active_set.get_mut(peer) {
|
if let Some(filter) = self.active_set.get(peer) {
|
||||||
for origin in origins {
|
for origin in origins {
|
||||||
if origin != self_pubkey {
|
if origin != self_pubkey {
|
||||||
peer.add(origin);
|
filter.add(origin);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -348,7 +347,7 @@ impl CrdsGossipPush {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let size = cmp::max(CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS, network_size);
|
let size = cmp::max(CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS, network_size);
|
||||||
let mut bloom = Bloom::random(size, 0.1, 1024 * 8 * 4);
|
let bloom: AtomicBloom<_> = Bloom::random(size, 0.1, 1024 * 8 * 4).into();
|
||||||
bloom.add(&item.id);
|
bloom.add(&item.id);
|
||||||
new_items.insert(item.id, bloom);
|
new_items.insert(item.id, bloom);
|
||||||
}
|
}
|
||||||
|
@ -427,6 +426,21 @@ impl CrdsGossipPush {
|
||||||
!v.is_empty()
|
!v.is_empty()
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Only for tests and simulations.
|
||||||
|
pub(crate) fn mock_clone(&self) -> Self {
|
||||||
|
let mut active_set = IndexMap::<Pubkey, AtomicBloom<Pubkey>>::new();
|
||||||
|
for (k, v) in &self.active_set {
|
||||||
|
active_set.insert(*k, v.mock_clone());
|
||||||
|
}
|
||||||
|
Self {
|
||||||
|
active_set,
|
||||||
|
push_messages: self.push_messages.clone(),
|
||||||
|
received_cache: self.received_cache.clone(),
|
||||||
|
last_pushed_to: self.last_pushed_to.clone(),
|
||||||
|
..*self
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
|
@ -346,7 +346,7 @@ fn network_run_push(
|
||||||
network
|
network
|
||||||
.get(&from)
|
.get(&from)
|
||||||
.map(|node| {
|
.map(|node| {
|
||||||
let mut node = node.lock().unwrap();
|
let node = node.lock().unwrap();
|
||||||
let destination = node.id;
|
let destination = node.id;
|
||||||
let now = timestamp();
|
let now = timestamp();
|
||||||
node.process_prune_msg(&to, &destination, &prune_keys, now, now)
|
node.process_prune_msg(&to, &destination, &prune_keys, now, now)
|
||||||
|
|
|
@ -135,7 +135,6 @@ fn bench_add_hash_atomic(bencher: &mut Bencher) {
|
||||||
for hash_value in &hash_values {
|
for hash_value in &hash_values {
|
||||||
bloom.add(hash_value);
|
bloom.add(hash_value);
|
||||||
}
|
}
|
||||||
let bloom: Bloom<_> = bloom.into();
|
|
||||||
let index = rng.gen_range(0, hash_values.len());
|
let index = rng.gen_range(0, hash_values.len());
|
||||||
if !bloom.contains(&hash_values[index]) {
|
if !bloom.contains(&hash_values[index]) {
|
||||||
fail += 1;
|
fail += 1;
|
||||||
|
|
|
@ -146,16 +146,42 @@ impl<T: BloomHashIndex> From<Bloom<T>> for AtomicBloom<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: BloomHashIndex> AtomicBloom<T> {
|
impl<T: BloomHashIndex> AtomicBloom<T> {
|
||||||
|
fn pos(&self, key: &T, hash_index: u64) -> (usize, u64) {
|
||||||
|
let pos = key.hash_at_index(hash_index) % self.num_bits;
|
||||||
|
// Divide by 64 to figure out which of the
|
||||||
|
// AtomicU64 bit chunks we need to modify.
|
||||||
|
let index = pos >> 6;
|
||||||
|
// (pos & 63) is equivalent to mod 64 so that we can find
|
||||||
|
// the index of the bit within the AtomicU64 to modify.
|
||||||
|
let mask = 1u64 << (pos & 63);
|
||||||
|
(index as usize, mask)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn add(&self, key: &T) {
|
pub fn add(&self, key: &T) {
|
||||||
for k in &self.keys {
|
for k in &self.keys {
|
||||||
let pos = key.hash_at_index(*k) % self.num_bits;
|
let (index, mask) = self.pos(key, *k);
|
||||||
// Divide by 64 to figure out which of the
|
self.bits[index].fetch_or(mask, Ordering::Relaxed);
|
||||||
// AtomicU64 bit chunks we need to modify.
|
}
|
||||||
let index = pos >> 6;
|
}
|
||||||
// (pos & 63) is equivalent to mod 64 so that we can find
|
|
||||||
// the index of the bit within the AtomicU64 to modify.
|
pub fn contains(&self, key: &T) -> bool {
|
||||||
let bit = 1u64 << (pos & 63);
|
self.keys.iter().all(|k| {
|
||||||
self.bits[index as usize].fetch_or(bit, Ordering::Relaxed);
|
let (index, mask) = self.pos(key, *k);
|
||||||
|
let bit = self.bits[index].load(Ordering::Relaxed) & mask;
|
||||||
|
bit != 0u64
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only for tests and simulations.
|
||||||
|
pub fn mock_clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
keys: self.keys.clone(),
|
||||||
|
bits: self
|
||||||
|
.bits
|
||||||
|
.iter()
|
||||||
|
.map(|v| AtomicU64::new(v.load(Ordering::Relaxed)))
|
||||||
|
.collect(),
|
||||||
|
..*self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -303,6 +329,9 @@ mod test {
|
||||||
let bloom: AtomicBloom<_> = bloom.into();
|
let bloom: AtomicBloom<_> = bloom.into();
|
||||||
assert_eq!(bloom.num_bits, 9731);
|
assert_eq!(bloom.num_bits, 9731);
|
||||||
assert_eq!(bloom.bits.len(), (9731 + 63) / 64);
|
assert_eq!(bloom.bits.len(), (9731 + 63) / 64);
|
||||||
|
for hash_value in &hash_values {
|
||||||
|
assert!(bloom.contains(hash_value));
|
||||||
|
}
|
||||||
let bloom: Bloom<_> = bloom.into();
|
let bloom: Bloom<_> = bloom.into();
|
||||||
assert_eq!(bloom.num_bits_set, num_bits_set);
|
assert_eq!(bloom.num_bits_set, num_bits_set);
|
||||||
for hash_value in &hash_values {
|
for hash_value in &hash_values {
|
||||||
|
@ -311,6 +340,9 @@ mod test {
|
||||||
// Round trip, re-inserting the same hash values.
|
// Round trip, re-inserting the same hash values.
|
||||||
let bloom: AtomicBloom<_> = bloom.into();
|
let bloom: AtomicBloom<_> = bloom.into();
|
||||||
hash_values.par_iter().for_each(|v| bloom.add(v));
|
hash_values.par_iter().for_each(|v| bloom.add(v));
|
||||||
|
for hash_value in &hash_values {
|
||||||
|
assert!(bloom.contains(hash_value));
|
||||||
|
}
|
||||||
let bloom: Bloom<_> = bloom.into();
|
let bloom: Bloom<_> = bloom.into();
|
||||||
assert_eq!(bloom.num_bits_set, num_bits_set);
|
assert_eq!(bloom.num_bits_set, num_bits_set);
|
||||||
assert_eq!(bloom.bits.len(), 9731);
|
assert_eq!(bloom.bits.len(), 9731);
|
||||||
|
@ -326,6 +358,17 @@ mod test {
|
||||||
assert_eq!(bloom.num_bits, 9731);
|
assert_eq!(bloom.num_bits, 9731);
|
||||||
assert_eq!(bloom.bits.len(), (9731 + 63) / 64);
|
assert_eq!(bloom.bits.len(), (9731 + 63) / 64);
|
||||||
more_hash_values.par_iter().for_each(|v| bloom.add(v));
|
more_hash_values.par_iter().for_each(|v| bloom.add(v));
|
||||||
|
for hash_value in &hash_values {
|
||||||
|
assert!(bloom.contains(hash_value));
|
||||||
|
}
|
||||||
|
for hash_value in &more_hash_values {
|
||||||
|
assert!(bloom.contains(hash_value));
|
||||||
|
}
|
||||||
|
let false_positive = std::iter::repeat_with(|| solana_sdk::hash::new_rand(&mut rng))
|
||||||
|
.take(10_000)
|
||||||
|
.filter(|hash_value| bloom.contains(hash_value))
|
||||||
|
.count();
|
||||||
|
assert!(false_positive < 2000, "false_positive: {}", false_positive);
|
||||||
let bloom: Bloom<_> = bloom.into();
|
let bloom: Bloom<_> = bloom.into();
|
||||||
assert_eq!(bloom.bits.len(), 9731);
|
assert_eq!(bloom.bits.len(), 9731);
|
||||||
assert!(bloom.num_bits_set > num_bits_set);
|
assert!(bloom.num_bits_set > num_bits_set);
|
||||||
|
|
Loading…
Reference in New Issue