makes CrdsGossip thread-safe (#18615)

This commit is contained in:
behzad nouri 2021-07-14 22:27:17 +00:00 committed by GitHub
parent 9ed1f24188
commit cf31afdd6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 477 additions and 471 deletions

View File

@ -296,12 +296,12 @@ mod tests {
let cluster_info = ClusterInfo::new_with_invalid_keypair(this_node); let cluster_info = ClusterInfo::new_with_invalid_keypair(this_node);
{ {
let now = timestamp(); let now = timestamp();
let mut gossip = cluster_info.gossip.write().unwrap(); let mut gossip_crds = cluster_info.gossip.crds.write().unwrap();
// First node is pushed to crds table by ClusterInfo constructor. // First node is pushed to crds table by ClusterInfo constructor.
for node in nodes.iter().skip(1) { for node in nodes.iter().skip(1) {
let node = CrdsData::ContactInfo(node.clone()); let node = CrdsData::ContactInfo(node.clone());
let node = CrdsValue::new_unsigned(node); let node = CrdsValue::new_unsigned(node);
assert_eq!(gossip.crds.insert(node, now), Ok(())); assert_eq!(gossip_crds.insert(node, now), Ok(()));
} }
} }
(nodes, stakes, cluster_info) (nodes, stakes, cluster_info)

View File

@ -192,8 +192,8 @@ mod test {
cluster_info.flush_push_queue(); cluster_info.flush_push_queue();
let lowest = { let lowest = {
let label = CrdsValueLabel::LowestSlot(pubkey); let label = CrdsValueLabel::LowestSlot(pubkey);
let gossip = cluster_info.gossip.read().unwrap(); let gossip_crds = cluster_info.gossip.crds.read().unwrap();
let entry = gossip.crds.get(&label).unwrap(); let entry = gossip_crds.get(&label).unwrap();
entry.value.lowest_slot().unwrap().clone() entry.value.lowest_slot().unwrap().clone()
}; };
assert_eq!(lowest.lowest, 5); assert_eq!(lowest.lowest, 5);

View File

@ -12,6 +12,7 @@ use {
crds_value::CrdsValue, crds_value::CrdsValue,
}, },
solana_sdk::hash, solana_sdk::hash,
std::sync::RwLock,
test::Bencher, test::Bencher,
}; };
@ -45,6 +46,7 @@ fn bench_build_crds_filters(bencher: &mut Bencher) {
} }
} }
assert_eq!(num_inserts, 90_000); assert_eq!(num_inserts, 90_000);
let crds = RwLock::new(crds);
bencher.iter(|| { bencher.iter(|| {
let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE); let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE);
assert_eq!(filters.len(), 128); assert_eq!(filters.len(), 128);

View File

@ -81,7 +81,7 @@ use {
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
mpsc::{Receiver, RecvTimeoutError, Sender}, mpsc::{Receiver, RecvTimeoutError, Sender},
{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}, {Arc, Mutex, RwLock, RwLockReadGuard},
}, },
thread::{sleep, Builder, JoinHandle}, thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant}, time::{Duration, Instant},
@ -141,7 +141,7 @@ pub enum ClusterInfoError {
pub struct ClusterInfo { pub struct ClusterInfo {
/// The network /// The network
pub gossip: RwLock<CrdsGossip>, pub gossip: CrdsGossip,
/// set the keypair that will be used to sign crds values generated. It is unset only in tests. /// set the keypair that will be used to sign crds values generated. It is unset only in tests.
keypair: RwLock<Arc<Keypair>>, keypair: RwLock<Arc<Keypair>>,
/// Network entrypoints /// Network entrypoints
@ -398,9 +398,9 @@ impl ClusterInfo {
pub fn new(contact_info: ContactInfo, keypair: Arc<Keypair>) -> Self { pub fn new(contact_info: ContactInfo, keypair: Arc<Keypair>) -> Self {
let id = contact_info.id; let id = contact_info.id;
let me = Self { let me = Self {
gossip: RwLock::new(CrdsGossip::default()), gossip: CrdsGossip::default(),
keypair: RwLock::new(keypair), keypair: RwLock::new(keypair),
entrypoints: RwLock::new(vec![]), entrypoints: RwLock::default(),
outbound_budget: DataBudget::default(), outbound_budget: DataBudget::default(),
my_contact_info: RwLock::new(contact_info), my_contact_info: RwLock::new(contact_info),
ping_cache: Mutex::new(PingCache::new( ping_cache: Mutex::new(PingCache::new(
@ -422,11 +422,10 @@ impl ClusterInfo {
// Should only be used by tests and simulations // Should only be used by tests and simulations
pub fn clone_with_id(&self, new_id: &Pubkey) -> Self { pub fn clone_with_id(&self, new_id: &Pubkey) -> Self {
let gossip = self.gossip.read().unwrap().mock_clone();
let mut my_contact_info = self.my_contact_info.read().unwrap().clone(); let mut my_contact_info = self.my_contact_info.read().unwrap().clone();
my_contact_info.id = *new_id; my_contact_info.id = *new_id;
ClusterInfo { ClusterInfo {
gossip: RwLock::new(gossip), gossip: self.gossip.mock_clone(),
keypair: RwLock::new(self.keypair.read().unwrap().clone()), keypair: RwLock::new(self.keypair.read().unwrap().clone()),
entrypoints: RwLock::new(self.entrypoints.read().unwrap().clone()), entrypoints: RwLock::new(self.entrypoints.read().unwrap().clone()),
outbound_budget: self.outbound_budget.clone_non_atomic(), outbound_budget: self.outbound_budget.clone_non_atomic(),
@ -474,18 +473,14 @@ impl ClusterInfo {
shred_version, shred_version,
.. ..
} = *self.my_contact_info.read().unwrap(); } = *self.my_contact_info.read().unwrap();
self.gossip.write().unwrap().refresh_push_active_set( self.gossip
&self_pubkey, .refresh_push_active_set(&self_pubkey, shred_version, stakes, gossip_validators);
shred_version,
stakes,
gossip_validators,
);
} }
// TODO kill insert_info, only used by tests // TODO kill insert_info, only used by tests
pub fn insert_info(&self, contact_info: ContactInfo) { pub fn insert_info(&self, contact_info: ContactInfo) {
let value = CrdsValue::new_signed(CrdsData::ContactInfo(contact_info), &self.keypair()); let value = CrdsValue::new_signed(CrdsData::ContactInfo(contact_info), &self.keypair());
let _ = self.gossip.write().unwrap().crds.insert(value, timestamp()); let _ = self.gossip.crds.write().unwrap().insert(value, timestamp());
} }
pub fn set_entrypoint(&self, entrypoint: ContactInfo) { pub fn set_entrypoint(&self, entrypoint: ContactInfo) {
@ -498,7 +493,6 @@ impl ClusterInfo {
pub fn save_contact_info(&self) { pub fn save_contact_info(&self) {
let nodes = { let nodes = {
let gossip = self.gossip.read().unwrap();
let entrypoint_gossip_addrs = self let entrypoint_gossip_addrs = self
.entrypoints .entrypoints
.read() .read()
@ -507,8 +501,8 @@ impl ClusterInfo {
.map(|contact_info| contact_info.gossip) .map(|contact_info| contact_info.gossip)
.collect::<HashSet<_>>(); .collect::<HashSet<_>>();
let self_pubkey = self.id(); let self_pubkey = self.id();
gossip let gossip_crds = self.gossip.crds.read().unwrap();
.crds gossip_crds
.get_nodes() .get_nodes()
.filter_map(|v| { .filter_map(|v| {
// Don't save: // Don't save:
@ -599,9 +593,9 @@ impl ClusterInfo {
filename.display() filename.display()
); );
let now = timestamp(); let now = timestamp();
let mut gossip = self.gossip.write().unwrap(); let mut gossip_crds = self.gossip.crds.write().unwrap();
for node in nodes { for node in nodes {
if let Err(err) = gossip.crds.insert(node, now) { if let Err(err) = gossip_crds.insert(node, now) {
warn!("crds insert failed {:?}", err); warn!("crds insert failed {:?}", err);
} }
} }
@ -632,23 +626,17 @@ impl ClusterInfo {
where where
F: FnOnce(&ContactInfo) -> Y, F: FnOnce(&ContactInfo) -> Y,
{ {
let label = CrdsValueLabel::ContactInfo(*id); let gossip_crds = self.gossip.crds.read().unwrap();
let gossip = self.gossip.read().unwrap(); gossip_crds.get_contact_info(*id).map(map)
let entry = gossip.crds.get(&label)?;
Some(map(entry.value.contact_info()?))
} }
pub fn lookup_contact_info_by_gossip_addr( pub fn lookup_contact_info_by_gossip_addr(
&self, &self,
gossip_addr: &SocketAddr, gossip_addr: &SocketAddr,
) -> Option<ContactInfo> { ) -> Option<ContactInfo> {
self.gossip let gossip_crds = self.gossip.crds.read().unwrap();
.read() let mut nodes = gossip_crds.get_nodes_contact_info();
.unwrap() nodes.find(|node| node.gossip == *gossip_addr).cloned()
.crds
.get_nodes_contact_info()
.find(|peer| peer.gossip == *gossip_addr)
.cloned()
} }
pub fn my_contact_info(&self) -> ContactInfo { pub fn my_contact_info(&self) -> ContactInfo {
@ -662,9 +650,9 @@ impl ClusterInfo {
fn lookup_epoch_slots(&self, ix: EpochSlotsIndex) -> EpochSlots { fn lookup_epoch_slots(&self, ix: EpochSlotsIndex) -> EpochSlots {
let self_pubkey = self.id(); let self_pubkey = self.id();
let label = CrdsValueLabel::EpochSlots(ix, self_pubkey); let label = CrdsValueLabel::EpochSlots(ix, self_pubkey);
let gossip = self.gossip.read().unwrap(); let gossip_crds = self.gossip.crds.read().unwrap();
let entry = gossip.crds.get(&label); gossip_crds
entry .get(&label)
.and_then(|v| v.value.epoch_slots()) .and_then(|v| v.value.epoch_slots())
.cloned() .cloned()
.unwrap_or_else(|| EpochSlots::new(self_pubkey, timestamp())) .unwrap_or_else(|| EpochSlots::new(self_pubkey, timestamp()))
@ -821,12 +809,12 @@ impl ClusterInfo {
) )
} }
// TODO: This has a race condition if called from more than one thread.
pub fn push_lowest_slot(&self, min: Slot) { pub fn push_lowest_slot(&self, min: Slot) {
let self_pubkey = self.id(); let self_pubkey = self.id();
let last = { let last = {
let gossip = self.gossip.read().unwrap(); let gossip_crds = self.gossip.crds.read().unwrap();
gossip gossip_crds
.crds
.get_lowest_slot(self_pubkey) .get_lowest_slot(self_pubkey)
.map(|x| x.lowest) .map(|x| x.lowest)
.unwrap_or_default() .unwrap_or_default()
@ -849,12 +837,12 @@ impl ClusterInfo {
pub fn push_epoch_slots(&self, mut update: &[Slot]) { pub fn push_epoch_slots(&self, mut update: &[Slot]) {
let self_pubkey = self.id(); let self_pubkey = self.id();
let current_slots: Vec<_> = { let current_slots: Vec<_> = {
let gossip = let gossip_crds =
self.time_gossip_read_lock("lookup_epoch_slots", &self.stats.epoch_slots_lookup); self.time_gossip_read_lock("lookup_epoch_slots", &self.stats.epoch_slots_lookup);
(0..crds_value::MAX_EPOCH_SLOTS) (0..crds_value::MAX_EPOCH_SLOTS)
.filter_map(|ix| { .filter_map(|ix| {
let label = CrdsValueLabel::EpochSlots(ix, self_pubkey); let label = CrdsValueLabel::EpochSlots(ix, self_pubkey);
let epoch_slots = gossip.crds.get(&label)?.value.epoch_slots()?; let epoch_slots = gossip_crds.get(&label)?.value.epoch_slots()?;
let first_slot = epoch_slots.first_slot()?; let first_slot = epoch_slots.first_slot()?;
Some((epoch_slots.wallclock, first_slot, ix)) Some((epoch_slots.wallclock, first_slot, ix))
}) })
@ -903,10 +891,10 @@ impl ClusterInfo {
epoch_slot_index += 1; epoch_slot_index += 1;
reset = true; reset = true;
} }
let mut gossip = self.gossip.write().unwrap(); let mut gossip_crds = self.gossip.crds.write().unwrap();
let now = timestamp(); let now = timestamp();
for entry in entries { for entry in entries {
if let Err(err) = gossip.crds.insert(entry, now) { if let Err(err) = gossip_crds.insert(entry, now) {
error!("push_epoch_slots failed: {:?}", err); error!("push_epoch_slots failed: {:?}", err);
} }
} }
@ -916,16 +904,8 @@ impl ClusterInfo {
&'a self, &'a self,
label: &'static str, label: &'static str,
counter: &'a Counter, counter: &'a Counter,
) -> TimedGuard<'a, RwLockReadGuard<CrdsGossip>> { ) -> TimedGuard<'a, RwLockReadGuard<Crds>> {
TimedGuard::new(self.gossip.read().unwrap(), label, counter) TimedGuard::new(self.gossip.crds.read().unwrap(), label, counter)
}
fn time_gossip_write_lock<'a>(
&'a self,
label: &'static str,
counter: &'a Counter,
) -> TimedGuard<'a, RwLockWriteGuard<CrdsGossip>> {
TimedGuard::new(self.gossip.write().unwrap(), label, counter)
} }
pub fn push_message(&self, message: CrdsValue) { pub fn push_message(&self, message: CrdsValue) {
@ -968,8 +948,8 @@ impl ClusterInfo {
let vote = Vote::new(self_pubkey, vote, now); let vote = Vote::new(self_pubkey, vote, now);
let vote = CrdsData::Vote(vote_index, vote); let vote = CrdsData::Vote(vote_index, vote);
let vote = CrdsValue::new_signed(vote, &self.keypair()); let vote = CrdsValue::new_signed(vote, &self.keypair());
let mut gossip = self.gossip.write().unwrap(); let mut gossip_crds = self.gossip.crds.write().unwrap();
if let Err(err) = gossip.crds.insert(vote, now) { if let Err(err) = gossip_crds.insert(vote, now) {
error!("push_vote failed: {:?}", err); error!("push_vote failed: {:?}", err);
} }
} }
@ -999,12 +979,12 @@ impl ClusterInfo {
} }
}; };
let vote_index = { let vote_index = {
let gossip = let gossip_crds =
self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read); self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read);
(0..MAX_LOCKOUT_HISTORY as u8) (0..MAX_LOCKOUT_HISTORY as u8)
.filter_map(|ix| { .filter_map(|ix| {
let vote = CrdsValueLabel::Vote(ix, self_pubkey); let vote = CrdsValueLabel::Vote(ix, self_pubkey);
let vote = gossip.crds.get(&vote)?; let vote = gossip_crds.get(&vote)?;
num_crds_votes += 1; num_crds_votes += 1;
match &vote.value.data { match &vote.value.data {
CrdsData::Vote(_, vote) if should_evict_vote(vote) => { CrdsData::Vote(_, vote) if should_evict_vote(vote) => {
@ -1024,11 +1004,11 @@ impl ClusterInfo {
pub fn refresh_vote(&self, vote: Transaction, vote_slot: Slot) { pub fn refresh_vote(&self, vote: Transaction, vote_slot: Slot) {
let vote_index = { let vote_index = {
let self_pubkey = self.id(); let self_pubkey = self.id();
let gossip = let gossip_crds =
self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read); self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read);
(0..MAX_LOCKOUT_HISTORY as u8).find(|ix| { (0..MAX_LOCKOUT_HISTORY as u8).find(|ix| {
let vote = CrdsValueLabel::Vote(*ix, self_pubkey); let vote = CrdsValueLabel::Vote(*ix, self_pubkey);
if let Some(vote) = gossip.crds.get(&vote) { if let Some(vote) = gossip_crds.get(&vote) {
match &vote.value.data { match &vote.value.data {
CrdsData::Vote(_, prev_vote) => match prev_vote.slot() { CrdsData::Vote(_, prev_vote) => match prev_vote.slot() {
Some(prev_vote_slot) => prev_vote_slot == vote_slot, Some(prev_vote_slot) => prev_vote_slot == vote_slot,
@ -1070,7 +1050,6 @@ impl ClusterInfo {
pub fn get_votes(&self, cursor: &mut Cursor) -> (Vec<CrdsValueLabel>, Vec<Transaction>) { pub fn get_votes(&self, cursor: &mut Cursor) -> (Vec<CrdsValueLabel>, Vec<Transaction>) {
let (labels, txs): (_, Vec<_>) = self let (labels, txs): (_, Vec<_>) = self
.time_gossip_read_lock("get_votes", &self.stats.get_votes) .time_gossip_read_lock("get_votes", &self.stats.get_votes)
.crds
.get_votes(cursor) .get_votes(cursor)
.map(|vote| { .map(|vote| {
let transaction = match &vote.value.data { let transaction = match &vote.value.data {
@ -1089,7 +1068,7 @@ impl ClusterInfo {
shred: &Shred, shred: &Shred,
other_payload: &[u8], other_payload: &[u8],
) -> Result<(), GossipError> { ) -> Result<(), GossipError> {
self.gossip.write().unwrap().push_duplicate_shred( self.gossip.push_duplicate_shred(
&self.keypair(), &self.keypair(),
shred, shred,
other_payload, other_payload,
@ -1104,7 +1083,6 @@ impl ClusterInfo {
F: FnOnce(&Vec<(Slot, Hash)>) -> Y, F: FnOnce(&Vec<(Slot, Hash)>) -> Y,
{ {
self.time_gossip_read_lock("get_accounts_hash", &self.stats.get_accounts_hash) self.time_gossip_read_lock("get_accounts_hash", &self.stats.get_accounts_hash)
.crds
.get(&CrdsValueLabel::AccountsHashes(*pubkey)) .get(&CrdsValueLabel::AccountsHashes(*pubkey))
.map(|x| &x.value.accounts_hash().unwrap().hashes) .map(|x| &x.value.accounts_hash().unwrap().hashes)
.map(map) .map(map)
@ -1114,10 +1092,8 @@ impl ClusterInfo {
where where
F: FnOnce(&Vec<(Slot, Hash)>) -> Y, F: FnOnce(&Vec<(Slot, Hash)>) -> Y,
{ {
self.gossip let gossip_crds = self.gossip.crds.read().unwrap();
.read() gossip_crds
.unwrap()
.crds
.get(&CrdsValueLabel::SnapshotHashes(*pubkey)) .get(&CrdsValueLabel::SnapshotHashes(*pubkey))
.map(|x| &x.value.snapshot_hash().unwrap().hashes) .map(|x| &x.value.snapshot_hash().unwrap().hashes)
.map(map) .map(map)
@ -1127,11 +1103,12 @@ impl ClusterInfo {
/// Excludes entries from nodes with unkown or different shred version. /// Excludes entries from nodes with unkown or different shred version.
pub fn get_epoch_slots(&self, cursor: &mut Cursor) -> Vec<EpochSlots> { pub fn get_epoch_slots(&self, cursor: &mut Cursor) -> Vec<EpochSlots> {
let self_shred_version = Some(self.my_shred_version()); let self_shred_version = Some(self.my_shred_version());
let gossip = self.gossip.read().unwrap(); let gossip_crds = self.gossip.crds.read().unwrap();
let entries = gossip.crds.get_epoch_slots(cursor); gossip_crds
entries .get_epoch_slots(cursor)
.filter(|entry| { .filter(|entry| {
gossip.crds.get_shred_version(&entry.value.pubkey()) == self_shred_version let origin = entry.value.pubkey();
gossip_crds.get_shred_version(&origin) == self_shred_version
}) })
.map(|entry| match &entry.value.data { .map(|entry| match &entry.value.data {
CrdsData::EpochSlots(_, slots) => slots.clone(), CrdsData::EpochSlots(_, slots) => slots.clone(),
@ -1141,12 +1118,12 @@ impl ClusterInfo {
} }
pub fn get_node_version(&self, pubkey: &Pubkey) -> Option<solana_version::Version> { pub fn get_node_version(&self, pubkey: &Pubkey) -> Option<solana_version::Version> {
let gossip = self.gossip.read().unwrap(); let gossip_crds = self.gossip.crds.read().unwrap();
let version = gossip.crds.get(&CrdsValueLabel::Version(*pubkey)); let version = gossip_crds.get(&CrdsValueLabel::Version(*pubkey));
if let Some(version) = version.and_then(|v| v.value.version()) { if let Some(version) = version.and_then(|v| v.value.version()) {
return Some(version.version.clone()); return Some(version.version.clone());
} }
let version = gossip.crds.get(&CrdsValueLabel::LegacyVersion(*pubkey))?; let version = gossip_crds.get(&CrdsValueLabel::LegacyVersion(*pubkey))?;
let version = version.value.legacy_version()?; let version = version.value.legacy_version()?;
Some(version.version.clone().into()) Some(version.version.clone().into())
} }
@ -1154,10 +1131,8 @@ impl ClusterInfo {
/// all validators that have a valid rpc port regardless of `shred_version`. /// all validators that have a valid rpc port regardless of `shred_version`.
pub fn all_rpc_peers(&self) -> Vec<ContactInfo> { pub fn all_rpc_peers(&self) -> Vec<ContactInfo> {
let self_pubkey = self.id(); let self_pubkey = self.id();
self.gossip let gossip_crds = self.gossip.crds.read().unwrap();
.read() gossip_crds
.unwrap()
.crds
.get_nodes_contact_info() .get_nodes_contact_info()
.filter(|x| x.id != self_pubkey && ContactInfo::is_valid_address(&x.rpc)) .filter(|x| x.id != self_pubkey && ContactInfo::is_valid_address(&x.rpc))
.cloned() .cloned()
@ -1166,10 +1141,8 @@ impl ClusterInfo {
// All nodes in gossip (including spy nodes) and the last time we heard about them // All nodes in gossip (including spy nodes) and the last time we heard about them
pub fn all_peers(&self) -> Vec<(ContactInfo, u64)> { pub fn all_peers(&self) -> Vec<(ContactInfo, u64)> {
self.gossip let gossip_crds = self.gossip.crds.read().unwrap();
.read() gossip_crds
.unwrap()
.crds
.get_nodes() .get_nodes()
.map(|x| (x.value.contact_info().unwrap().clone(), x.local_timestamp)) .map(|x| (x.value.contact_info().unwrap().clone(), x.local_timestamp))
.collect() .collect()
@ -1177,10 +1150,8 @@ impl ClusterInfo {
pub fn gossip_peers(&self) -> Vec<ContactInfo> { pub fn gossip_peers(&self) -> Vec<ContactInfo> {
let me = self.id(); let me = self.id();
self.gossip let gossip_crds = self.gossip.crds.read().unwrap();
.read() gossip_crds
.unwrap()
.crds
.get_nodes_contact_info() .get_nodes_contact_info()
// shred_version not considered for gossip peers (ie, spy nodes do not set shred_version) // shred_version not considered for gossip peers (ie, spy nodes do not set shred_version)
.filter(|x| x.id != me && ContactInfo::is_valid_address(&x.gossip)) .filter(|x| x.id != me && ContactInfo::is_valid_address(&x.gossip))
@ -1192,7 +1163,6 @@ impl ClusterInfo {
pub fn all_tvu_peers(&self) -> Vec<ContactInfo> { pub fn all_tvu_peers(&self) -> Vec<ContactInfo> {
let self_pubkey = self.id(); let self_pubkey = self.id();
self.time_gossip_read_lock("all_tvu_peers", &self.stats.all_tvu_peers) self.time_gossip_read_lock("all_tvu_peers", &self.stats.all_tvu_peers)
.crds
.get_nodes_contact_info() .get_nodes_contact_info()
.filter(|x| ContactInfo::is_valid_address(&x.tvu) && x.id != self_pubkey) .filter(|x| ContactInfo::is_valid_address(&x.tvu) && x.id != self_pubkey)
.cloned() .cloned()
@ -1204,7 +1174,6 @@ impl ClusterInfo {
let self_pubkey = self.id(); let self_pubkey = self.id();
let self_shred_version = self.my_shred_version(); let self_shred_version = self.my_shred_version();
self.time_gossip_read_lock("tvu_peers", &self.stats.tvu_peers) self.time_gossip_read_lock("tvu_peers", &self.stats.tvu_peers)
.crds
.get_nodes_contact_info() .get_nodes_contact_info()
.filter(|node| { .filter(|node| {
node.id != self_pubkey node.id != self_pubkey
@ -1223,12 +1192,12 @@ impl ClusterInfo {
// node.shred_verion == self.my_shred_version() // node.shred_verion == self.my_shred_version()
let nodes = self.tvu_peers(); let nodes = self.tvu_peers();
let nodes = { let nodes = {
let gossip = self.gossip.read().unwrap(); let gossip_crds = self.gossip.crds.read().unwrap();
nodes nodes
.into_iter() .into_iter()
.filter(|node| { .filter(|node| {
ContactInfo::is_valid_address(&node.serve_repair) ContactInfo::is_valid_address(&node.serve_repair)
&& match gossip.crds.get_lowest_slot(node.id) { && match gossip_crds.get_lowest_slot(node.id) {
None => true, // fallback to legacy behavior None => true, // fallback to legacy behavior
Some(lowest_slot) => lowest_slot.lowest <= slot, Some(lowest_slot) => lowest_slot.lowest <= slot,
} }
@ -1248,10 +1217,8 @@ impl ClusterInfo {
/// compute broadcast table /// compute broadcast table
pub fn tpu_peers(&self) -> Vec<ContactInfo> { pub fn tpu_peers(&self) -> Vec<ContactInfo> {
let self_pubkey = self.id(); let self_pubkey = self.id();
self.gossip let gossip_crds = self.gossip.crds.read().unwrap();
.read() gossip_crds
.unwrap()
.crds
.get_nodes_contact_info() .get_nodes_contact_info()
.filter(|x| x.id != self_pubkey && ContactInfo::is_valid_address(&x.tpu)) .filter(|x| x.id != self_pubkey && ContactInfo::is_valid_address(&x.tpu))
.cloned() .cloned()
@ -1301,7 +1268,7 @@ impl ClusterInfo {
CrdsData::ContactInfo(self.my_contact_info()), CrdsData::ContactInfo(self.my_contact_info()),
&self.keypair(), &self.keypair(),
); );
let _ = self.gossip.write().unwrap().crds.insert(value, timestamp()); let _ = self.gossip.crds.write().unwrap().insert(value, timestamp());
} }
// If the network entrypoint hasn't been discovered yet, add it to the crds table // If the network entrypoint hasn't been discovered yet, add it to the crds table
@ -1325,7 +1292,6 @@ impl ClusterInfo {
entrypoint.wallclock = now; entrypoint.wallclock = now;
if self if self
.time_gossip_read_lock("entrypoint", &self.stats.entrypoint) .time_gossip_read_lock("entrypoint", &self.stats.entrypoint)
.crds
.get_nodes_contact_info() .get_nodes_contact_info()
.any(|node| node.gossip == entrypoint.gossip) .any(|node| node.gossip == entrypoint.gossip)
{ {
@ -1337,10 +1303,10 @@ impl ClusterInfo {
let filters = match pulls.first() { let filters = match pulls.first() {
Some((_, filters)) => filters.clone(), Some((_, filters)) => filters.clone(),
None => { None => {
let gossip = self.time_gossip_read_lock("entrypoint", &self.stats.entrypoint2); let _st = ScopedTimer::from(&self.stats.entrypoint2);
gossip self.gossip
.pull .pull
.build_crds_filters(thread_pool, &gossip.crds, MAX_BLOOM_SIZE) .build_crds_filters(thread_pool, &self.gossip.crds, MAX_BLOOM_SIZE)
} }
}; };
self.stats.pull_from_entrypoint_count.add_relaxed(1); self.stats.pull_from_entrypoint_count.add_relaxed(1);
@ -1410,8 +1376,8 @@ impl ClusterInfo {
let now = timestamp(); let now = timestamp();
let mut pings = Vec::new(); let mut pings = Vec::new();
let mut pulls: Vec<_> = { let mut pulls: Vec<_> = {
let gossip = self.time_gossip_read_lock("new_pull_reqs", &self.stats.new_pull_requests); let _st = ScopedTimer::from(&self.stats.new_pull_requests);
match gossip.new_pull_request( match self.gossip.new_pull_request(
thread_pool, thread_pool,
self.keypair().deref(), self.keypair().deref(),
self.my_shred_version(), self.my_shred_version(),
@ -1430,9 +1396,9 @@ impl ClusterInfo {
let num_requests = pulls.iter().map(|(_, filters)| filters.len() as u64).sum(); let num_requests = pulls.iter().map(|(_, filters)| filters.len() as u64).sum();
self.stats.new_pull_requests_count.add_relaxed(num_requests); self.stats.new_pull_requests_count.add_relaxed(num_requests);
{ {
let gossip = self.time_gossip_read_lock("mark_pull", &self.stats.mark_pull_request); let _st = ScopedTimer::from(&self.stats.mark_pull_request);
for (peer, _) in &pulls { for (peer, _) in &pulls {
gossip.mark_pull_request_creation_time(peer.id, now); self.gossip.mark_pull_request_creation_time(peer.id, now);
} }
} }
let self_info = CrdsData::ContactInfo(self.my_contact_info()); let self_info = CrdsData::ContactInfo(self.my_contact_info());
@ -1457,10 +1423,10 @@ impl ClusterInfo {
// Used in tests // Used in tests
pub fn flush_push_queue(&self) { pub fn flush_push_queue(&self) {
let pending_push_messages = self.drain_push_queue(); let pending_push_messages = self.drain_push_queue();
let mut gossip = self.gossip.write().unwrap(); let mut gossip_crds = self.gossip.crds.write().unwrap();
let now = timestamp(); let now = timestamp();
for entry in pending_push_messages { for entry in pending_push_messages {
let _ = gossip.crds.insert(entry, now); let _ = gossip_crds.insert(entry, now);
} }
} }
fn new_push_requests( fn new_push_requests(
@ -1469,9 +1435,11 @@ impl ClusterInfo {
require_stake_for_gossip: bool, require_stake_for_gossip: bool,
) -> Vec<(SocketAddr, Protocol)> { ) -> Vec<(SocketAddr, Protocol)> {
let self_id = self.id(); let self_id = self.id();
let mut push_messages = self let mut push_messages = {
.time_gossip_write_lock("new_push_requests", &self.stats.new_push_requests) let _st = ScopedTimer::from(&self.stats.new_push_requests);
.new_push_messages(self.drain_push_queue(), timestamp()); self.gossip
.new_push_messages(self.drain_push_queue(), timestamp())
};
if require_stake_for_gossip { if require_stake_for_gossip {
push_messages.retain(|_, data| { push_messages.retain(|_, data| {
retain_staked(data, stakes); retain_staked(data, stakes);
@ -1479,12 +1447,12 @@ impl ClusterInfo {
}) })
} }
let push_messages: Vec<_> = { let push_messages: Vec<_> = {
let gossip = let gossip_crds =
self.time_gossip_read_lock("push_req_lookup", &self.stats.new_push_requests2); self.time_gossip_read_lock("push_req_lookup", &self.stats.new_push_requests2);
push_messages push_messages
.into_iter() .into_iter()
.filter_map(|(pubkey, messages)| { .filter_map(|(pubkey, messages)| {
let peer = gossip.crds.get_contact_info(pubkey)?; let peer = gossip_crds.get_contact_info(pubkey)?;
Some((peer.gossip, messages)) Some((peer.gossip, messages))
}) })
.collect() .collect()
@ -1607,20 +1575,21 @@ impl ClusterInfo {
) { ) {
let self_pubkey = self.id(); let self_pubkey = self.id();
let epoch_duration = get_epoch_duration(bank_forks); let epoch_duration = get_epoch_duration(bank_forks);
let timeouts = { let timeouts = self
let gossip = self.gossip.read().unwrap(); .gossip
gossip.make_timeouts(self_pubkey, stakes, epoch_duration) .make_timeouts(self_pubkey, stakes, epoch_duration);
let num_purged = {
let _st = ScopedTimer::from(&self.stats.purge);
self.gossip
.purge(&self_pubkey, thread_pool, timestamp(), &timeouts)
}; };
let num_purged = self
.time_gossip_write_lock("purge", &self.stats.purge)
.purge(&self_pubkey, thread_pool, timestamp(), &timeouts);
inc_new_counter_info!("cluster_info-purge-count", num_purged); inc_new_counter_info!("cluster_info-purge-count", num_purged);
} }
// Trims the CRDS table by dropping all values associated with the pubkeys // Trims the CRDS table by dropping all values associated with the pubkeys
// with the lowest stake, so that the number of unique pubkeys are bounded. // with the lowest stake, so that the number of unique pubkeys are bounded.
fn trim_crds_table(&self, cap: usize, stakes: &HashMap<Pubkey, u64>) { fn trim_crds_table(&self, cap: usize, stakes: &HashMap<Pubkey, u64>) {
if !self.gossip.read().unwrap().crds.should_trim(cap) { if !self.gossip.crds.read().unwrap().should_trim(cap) {
return; return;
} }
let keep: Vec<_> = self let keep: Vec<_> = self
@ -1632,8 +1601,8 @@ impl ClusterInfo {
.chain(std::iter::once(self.id())) .chain(std::iter::once(self.id()))
.collect(); .collect();
self.stats.trim_crds_table.add_relaxed(1); self.stats.trim_crds_table.add_relaxed(1);
let mut gossip = self.gossip.write().unwrap(); let mut gossip_crds = self.gossip.crds.write().unwrap();
match gossip.crds.trim(cap, &keep, stakes, timestamp()) { match gossip_crds.trim(cap, &keep, stakes, timestamp()) {
Err(err) => { Err(err) => {
self.stats.trim_crds_table_failed.add_relaxed(1); self.stats.trim_crds_table_failed.add_relaxed(1);
// TODO: Stakes are comming from the root-bank. Debug why/when // TODO: Stakes are comming from the root-bank. Debug why/when
@ -1763,10 +1732,10 @@ impl ClusterInfo {
let mut bad_prune_destination = 0; let mut bad_prune_destination = 0;
let self_pubkey = self.id(); let self_pubkey = self.id();
{ {
let gossip = self.time_gossip_read_lock("process_prune", &self.stats.process_prune); let _st = ScopedTimer::from(&self.stats.process_prune);
let now = timestamp(); let now = timestamp();
for (from, data) in messages { for (from, data) in messages {
match gossip.process_prune_msg( match self.gossip.process_prune_msg(
&self_pubkey, &self_pubkey,
&from, &from,
&data.destination, &data.destination,
@ -1910,8 +1879,11 @@ impl ClusterInfo {
const DEFAULT_EPOCH_DURATION_MS: u64 = DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT; const DEFAULT_EPOCH_DURATION_MS: u64 = DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT;
let mut time = Measure::start("handle_pull_requests"); let mut time = Measure::start("handle_pull_requests");
let callers = crds_value::filter_current(requests.iter().map(|r| &r.caller)); let callers = crds_value::filter_current(requests.iter().map(|r| &r.caller));
self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests) {
.process_pull_requests(callers.cloned(), timestamp()); let _st = ScopedTimer::from(&self.stats.process_pull_requests);
self.gossip
.process_pull_requests(callers.cloned(), timestamp());
}
let output_size_limit = let output_size_limit =
self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE; self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE;
let mut packets = let mut packets =
@ -1928,13 +1900,11 @@ impl ClusterInfo {
}; };
let now = timestamp(); let now = timestamp();
let self_id = self.id(); let self_id = self.id();
let mut pull_responses = {
let mut pull_responses = self let _st = ScopedTimer::from(&self.stats.generate_pull_responses);
.time_gossip_read_lock( self.gossip
"generate_pull_responses", .generate_pull_responses(&caller_and_filters, output_size_limit, now)
&self.stats.generate_pull_responses, };
)
.generate_pull_responses(&caller_and_filters, output_size_limit, now);
if require_stake_for_gossip { if require_stake_for_gossip {
for resp in &mut pull_responses { for resp in &mut pull_responses {
retain_staked(resp, stakes); retain_staked(resp, stakes);
@ -2055,10 +2025,9 @@ impl ClusterInfo {
}); });
if !responses.is_empty() { if !responses.is_empty() {
let self_pubkey = self.id(); let self_pubkey = self.id();
let timeouts = { let timeouts = self
let gossip = self.gossip.read().unwrap(); .gossip
gossip.make_timeouts(self_pubkey, stakes, epoch_duration) .make_timeouts(self_pubkey, stakes, epoch_duration);
};
for (from, data) in responses { for (from, data) in responses {
self.handle_pull_response(&from, data, &timeouts); self.handle_pull_response(&from, data, &timeouts);
} }
@ -2075,23 +2044,24 @@ impl ClusterInfo {
let len = crds_values.len(); let len = crds_values.len();
trace!("PullResponse me: {} from: {} len={}", self.id(), from, len); trace!("PullResponse me: {} from: {} len={}", self.id(), from, len);
let mut pull_stats = ProcessPullStats::default(); let mut pull_stats = ProcessPullStats::default();
let (filtered_pulls, filtered_pulls_expired_timeout, failed_inserts) = self let (filtered_pulls, filtered_pulls_expired_timeout, failed_inserts) = {
.time_gossip_read_lock("filter_pull_resp", &self.stats.filter_pull_response) let _st = ScopedTimer::from(&self.stats.filter_pull_response);
.filter_pull_responses(timeouts, crds_values, timestamp(), &mut pull_stats); self.gossip
.filter_pull_responses(timeouts, crds_values, timestamp(), &mut pull_stats)
};
if !filtered_pulls.is_empty() if !filtered_pulls.is_empty()
|| !filtered_pulls_expired_timeout.is_empty() || !filtered_pulls_expired_timeout.is_empty()
|| !failed_inserts.is_empty() || !failed_inserts.is_empty()
{ {
self.time_gossip_write_lock("process_pull_resp", &self.stats.process_pull_response) let _st = ScopedTimer::from(&self.stats.process_pull_response);
.process_pull_responses( self.gossip.process_pull_responses(
from, from,
filtered_pulls, filtered_pulls,
filtered_pulls_expired_timeout, filtered_pulls_expired_timeout,
failed_inserts, failed_inserts,
timestamp(), timestamp(),
&mut pull_stats, &mut pull_stats,
); );
} }
self.stats.process_pull_response_count.add_relaxed(1); self.stats.process_pull_response_count.add_relaxed(1);
self.stats.process_pull_response_len.add_relaxed(len as u64); self.stats.process_pull_response_len.add_relaxed(len as u64);
@ -2194,21 +2164,22 @@ impl ClusterInfo {
.add_relaxed(num_crds_values); .add_relaxed(num_crds_values);
// Origins' pubkeys of upserted crds values. // Origins' pubkeys of upserted crds values.
let origins: HashSet<_> = { let origins: HashSet<_> = {
let mut gossip = let _st = ScopedTimer::from(&self.stats.process_push_message);
self.time_gossip_write_lock("process_push", &self.stats.process_push_message);
let now = timestamp(); let now = timestamp();
messages messages
.into_iter() .into_iter()
.flat_map(|(from, crds_values)| { .flat_map(|(from, crds_values)| {
gossip.process_push_message(&from, crds_values, now) self.gossip.process_push_message(&from, crds_values, now)
}) })
.collect() .collect()
}; };
// Generate prune messages. // Generate prune messages.
let self_pubkey = self.id(); let self_pubkey = self.id();
let prunes = self let prunes = {
.time_gossip_write_lock("prune_received_cache", &self.stats.prune_received_cache) let _st = ScopedTimer::from(&self.stats.prune_received_cache);
.prune_received_cache(&self_pubkey, origins, stakes); self.gossip
.prune_received_cache(&self_pubkey, origins, stakes)
};
let prunes: Vec<(Pubkey /*from*/, Vec<Pubkey> /*origins*/)> = prunes let prunes: Vec<(Pubkey /*from*/, Vec<Pubkey> /*origins*/)> = prunes
.into_iter() .into_iter()
.flat_map(|(from, prunes)| { .flat_map(|(from, prunes)| {
@ -2224,7 +2195,7 @@ impl ClusterInfo {
.collect(); .collect();
let prune_messages: Vec<_> = { let prune_messages: Vec<_> = {
let gossip = self.gossip.read().unwrap(); let gossip_crds = self.gossip.crds.read().unwrap();
let wallclock = timestamp(); let wallclock = timestamp();
let self_pubkey = self.id(); let self_pubkey = self.id();
thread_pool.install(|| { thread_pool.install(|| {
@ -2232,7 +2203,7 @@ impl ClusterInfo {
.into_par_iter() .into_par_iter()
.with_min_len(256) .with_min_len(256)
.filter_map(|(from, prunes)| { .filter_map(|(from, prunes)| {
let peer = gossip.crds.get_contact_info(from)?; let peer = gossip_crds.get_contact_info(from)?;
let mut prune_data = PruneData { let mut prune_data = PruneData {
pubkey: self_pubkey, pubkey: self_pubkey,
prunes, prunes,
@ -2320,7 +2291,7 @@ impl ClusterInfo {
let packets = if self_shred_version == 0 { let packets = if self_shred_version == 0 {
packets packets
} else { } else {
let gossip = self.gossip.read().unwrap(); let gossip_crds = self.gossip.crds.read().unwrap();
thread_pool.install(|| { thread_pool.install(|| {
packets packets
.into_par_iter() .into_par_iter()
@ -2329,7 +2300,7 @@ impl ClusterInfo {
let msg = filter_on_shred_version( let msg = filter_on_shred_version(
msg, msg,
self_shred_version, self_shred_version,
&gossip.crds, &gossip_crds,
&self.stats, &self.stats,
)?; )?;
Some((from, msg)) Some((from, msg))
@ -2574,7 +2545,7 @@ impl ClusterInfo {
match err { match err {
GossipError::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, GossipError::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
GossipError::RecvTimeoutError(RecvTimeoutError::Timeout) => { GossipError::RecvTimeoutError(RecvTimeoutError::Timeout) => {
let table_size = self.gossip.read().unwrap().crds.len(); let table_size = self.gossip.crds.read().unwrap().len();
debug!( debug!(
"{}: run_listen timeout, table size: {}", "{}: run_listen timeout, table size: {}",
self.id(), self.id(),
@ -3293,15 +3264,12 @@ mod tests {
let (spy, _, _) = ClusterInfo::spy_node(solana_sdk::pubkey::new_rand(), 0); let (spy, _, _) = ClusterInfo::spy_node(solana_sdk::pubkey::new_rand(), 0);
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info)); let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info));
cluster_info.insert_info(spy); cluster_info.insert_info(spy);
{ cluster_info.gossip.refresh_push_active_set(
let gossip = cluster_info.gossip.read().unwrap(); &cluster_info.id(),
gossip.refresh_push_active_set( cluster_info.my_shred_version(),
&cluster_info.id(), &HashMap::new(), // stakes
cluster_info.my_shred_version(), None, // gossip validators
&HashMap::new(), // stakes );
None, // gossip validators
);
}
let reqs = cluster_info.generate_new_gossip_requests( let reqs = cluster_info.generate_new_gossip_requests(
&thread_pool, &thread_pool,
None, // gossip_validators None, // gossip_validators
@ -3331,8 +3299,8 @@ mod tests {
let d = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp()); let d = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
let label = CrdsValueLabel::ContactInfo(d.id); let label = CrdsValueLabel::ContactInfo(d.id);
cluster_info.insert_info(d); cluster_info.insert_info(d);
let gossip = cluster_info.gossip.read().unwrap(); let gossip_crds = cluster_info.gossip.crds.read().unwrap();
assert!(gossip.crds.get(&label).is_some()); assert!(gossip_crds.get(&label).is_some());
} }
fn assert_in_range(x: u16, range: (u16, u16)) { fn assert_in_range(x: u16, range: (u16, u16)) {
@ -3413,20 +3381,15 @@ mod tests {
.unwrap() .unwrap()
.mock_pong(peer.id, peer.gossip, Instant::now()); .mock_pong(peer.id, peer.gossip, Instant::now());
cluster_info.insert_info(peer); cluster_info.insert_info(peer);
{ cluster_info.gossip.refresh_push_active_set(
let gossip = cluster_info.gossip.read().unwrap(); &cluster_info.id(),
gossip.refresh_push_active_set( cluster_info.my_shred_version(),
&cluster_info.id(), &HashMap::new(), // stakes
cluster_info.my_shred_version(), None, // gossip validators
&HashMap::new(), // stakes );
None, // gossip validators
);
}
//check that all types of gossip messages are signed correctly //check that all types of gossip messages are signed correctly
let push_messages = cluster_info let push_messages = cluster_info
.gossip .gossip
.write()
.unwrap()
.new_push_messages(cluster_info.drain_push_queue(), timestamp()); .new_push_messages(cluster_info.drain_push_queue(), timestamp());
// there should be some pushes ready // there should be some pushes ready
assert!(!push_messages.is_empty()); assert!(!push_messages.is_empty());
@ -3437,8 +3400,6 @@ mod tests {
let mut pings = Vec::new(); let mut pings = Vec::new();
cluster_info cluster_info
.gossip .gossip
.write()
.unwrap()
.new_pull_request( .new_pull_request(
&thread_pool, &thread_pool,
cluster_info.keypair().deref(), cluster_info.keypair().deref(),
@ -3601,10 +3562,10 @@ mod tests {
fn test_push_votes_with_tower() { fn test_push_votes_with_tower() {
let get_vote_slots = |cluster_info: &ClusterInfo| -> Vec<Slot> { let get_vote_slots = |cluster_info: &ClusterInfo| -> Vec<Slot> {
let (labels, _) = cluster_info.get_votes(&mut Cursor::default()); let (labels, _) = cluster_info.get_votes(&mut Cursor::default());
let gossip = cluster_info.gossip.read().unwrap(); let gossip_crds = cluster_info.gossip.crds.read().unwrap();
let mut vote_slots = HashSet::new(); let mut vote_slots = HashSet::new();
for label in labels { for label in labels {
match &gossip.crds.get(&label).unwrap().value.data { match &gossip_crds.get(&label).unwrap().value.data {
CrdsData::Vote(_, vote) => { CrdsData::Vote(_, vote) => {
assert!(vote_slots.insert(vote.slot().unwrap())); assert!(vote_slots.insert(vote.slot().unwrap()));
} }
@ -3685,9 +3646,9 @@ mod tests {
CrdsValue::new_unsigned(CrdsData::EpochSlots(0, epoch_slots)), CrdsValue::new_unsigned(CrdsData::EpochSlots(0, epoch_slots)),
]; ];
{ {
let mut gossip = cluster_info.gossip.write().unwrap(); let mut gossip_crds = cluster_info.gossip.crds.write().unwrap();
for entry in entries { for entry in entries {
assert!(gossip.crds.insert(entry, /*now=*/ 0).is_ok()); assert!(gossip_crds.insert(entry, /*now=*/ 0).is_ok());
} }
} }
// Should exclude other node's epoch-slot because of different // Should exclude other node's epoch-slot because of different
@ -3740,14 +3701,11 @@ mod tests {
let entrypoint_crdsvalue = let entrypoint_crdsvalue =
CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint.clone())); CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint.clone()));
let cluster_info = Arc::new(cluster_info); let cluster_info = Arc::new(cluster_info);
let timeouts = { let timeouts = cluster_info.gossip.make_timeouts(
let gossip = cluster_info.gossip.read().unwrap(); cluster_info.id(),
gossip.make_timeouts( &HashMap::default(), // stakes,
cluster_info.id(), Duration::from_millis(cluster_info.gossip.pull.crds_timeout),
&HashMap::default(), // stakes, );
Duration::from_millis(gossip.pull.crds_timeout),
)
};
ClusterInfo::handle_pull_response( ClusterInfo::handle_pull_response(
&cluster_info, &cluster_info,
&entrypoint_pubkey, &entrypoint_pubkey,
@ -3979,12 +3937,8 @@ mod tests {
0, 0,
LowestSlot::new(other_node_pubkey, peer_lowest, timestamp()), LowestSlot::new(other_node_pubkey, peer_lowest, timestamp()),
)); ));
let _ = cluster_info let mut gossip_crds = cluster_info.gossip.crds.write().unwrap();
.gossip let _ = gossip_crds.insert(value, timestamp());
.write()
.unwrap()
.crds
.insert(value, timestamp());
} }
// only half the visible peers should be eligible to serve this repair // only half the visible peers should be eligible to serve this repair
assert_eq!(cluster_info.repair_peers(5).len(), 5); assert_eq!(cluster_info.repair_peers(5).len(), 5);
@ -4375,13 +4329,10 @@ mod tests {
for peer in peers { for peer in peers {
cluster_info cluster_info
.gossip .gossip
.write()
.unwrap()
.mark_pull_request_creation_time(peer, now); .mark_pull_request_creation_time(peer, now);
} }
let gossip = cluster_info.gossip.read().unwrap();
assert_eq!( assert_eq!(
gossip.pull.pull_request_time().len(), cluster_info.gossip.pull.pull_request_time().len(),
CRDS_UNIQUE_PUBKEY_CAPACITY CRDS_UNIQUE_PUBKEY_CAPACITY
); );
} }

View File

@ -5,10 +5,7 @@ use {
std::{ std::{
collections::HashMap, collections::HashMap,
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
sync::{ sync::atomic::{AtomicU64, Ordering},
atomic::{AtomicU64, Ordering},
RwLock,
},
time::Instant, time::Instant,
}, },
}; };
@ -160,15 +157,15 @@ pub(crate) struct GossipStats {
pub(crate) fn submit_gossip_stats( pub(crate) fn submit_gossip_stats(
stats: &GossipStats, stats: &GossipStats,
gossip: &RwLock<CrdsGossip>, gossip: &CrdsGossip,
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
) { ) {
let (table_size, num_nodes, purged_values_size, failed_inserts_size) = { let (table_size, num_nodes, purged_values_size, failed_inserts_size) = {
let gossip = gossip.read().unwrap(); let gossip_crds = gossip.crds.read().unwrap();
( (
gossip.crds.len(), gossip_crds.len(),
gossip.crds.num_nodes(), gossip_crds.num_nodes(),
gossip.crds.num_purged(), gossip_crds.num_purged(),
gossip.pull.failed_inserts_size(), gossip.pull.failed_inserts_size(),
) )
}; };

View File

@ -26,14 +26,14 @@ use {
std::{ std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
net::SocketAddr, net::SocketAddr,
sync::Mutex, sync::{Mutex, RwLock},
time::Duration, time::Duration,
}, },
}; };
#[derive(Default)] #[derive(Default)]
pub struct CrdsGossip { pub struct CrdsGossip {
pub crds: Crds, pub crds: RwLock<Crds>,
pub push: CrdsGossipPush, pub push: CrdsGossipPush,
pub pull: CrdsGossipPull, pub pull: CrdsGossipPull,
} }
@ -42,13 +42,13 @@ impl CrdsGossip {
/// process a push message to the network /// process a push message to the network
/// Returns unique origins' pubkeys of upserted values. /// Returns unique origins' pubkeys of upserted values.
pub fn process_push_message( pub fn process_push_message(
&mut self, &self,
from: &Pubkey, from: &Pubkey,
values: Vec<CrdsValue>, values: Vec<CrdsValue>,
now: u64, now: u64,
) -> HashSet<Pubkey> { ) -> HashSet<Pubkey> {
self.push self.push
.process_push_message(&mut self.crds, from, values, now) .process_push_message(&self.crds, from, values, now)
.into_iter() .into_iter()
.filter_map(Result::ok) .filter_map(Result::ok)
.collect() .collect()
@ -69,18 +69,21 @@ impl CrdsGossip {
} }
pub fn new_push_messages( pub fn new_push_messages(
&mut self, &self,
pending_push_messages: Vec<CrdsValue>, pending_push_messages: Vec<CrdsValue>,
now: u64, now: u64,
) -> HashMap<Pubkey, Vec<CrdsValue>> { ) -> HashMap<Pubkey, Vec<CrdsValue>> {
for entry in pending_push_messages { {
let _ = self.crds.insert(entry, now); let mut crds = self.crds.write().unwrap();
for entry in pending_push_messages {
let _ = crds.insert(entry, now);
}
} }
self.push.new_push_messages(&self.crds, now) self.push.new_push_messages(&self.crds, now)
} }
pub(crate) fn push_duplicate_shred( pub(crate) fn push_duplicate_shred(
&mut self, &self,
keypair: &Keypair, keypair: &Keypair,
shred: &Shred, shred: &Shred,
other_payload: &[u8], other_payload: &[u8],
@ -91,8 +94,8 @@ impl CrdsGossip {
let pubkey = keypair.pubkey(); let pubkey = keypair.pubkey();
// Skip if there are already records of duplicate shreds for this slot. // Skip if there are already records of duplicate shreds for this slot.
let shred_slot = shred.slot(); let shred_slot = shred.slot();
if self let mut crds = self.crds.write().unwrap();
.crds if crds
.get_records(&pubkey) .get_records(&pubkey)
.any(|value| match &value.value.data { .any(|value| match &value.value.data {
CrdsData::DuplicateShred(_, value) => value.slot == shred_slot, CrdsData::DuplicateShred(_, value) => value.slot == shred_slot,
@ -111,8 +114,7 @@ impl CrdsGossip {
)?; )?;
// Find the index of oldest duplicate shred. // Find the index of oldest duplicate shred.
let mut num_dup_shreds = 0; let mut num_dup_shreds = 0;
let offset = self let offset = crds
.crds
.get_records(&pubkey) .get_records(&pubkey)
.filter_map(|value| match &value.value.data { .filter_map(|value| match &value.value.data {
CrdsData::DuplicateShred(ix, value) => { CrdsData::DuplicateShred(ix, value) => {
@ -136,7 +138,7 @@ impl CrdsGossip {
}); });
let now = timestamp(); let now = timestamp();
for entry in entries { for entry in entries {
if let Err(err) = self.crds.insert(entry, now) { if let Err(err) = crds.insert(entry, now) {
error!("push_duplicate_shred faild: {:?}", err); error!("push_duplicate_shred faild: {:?}", err);
} }
} }
@ -174,13 +176,14 @@ impl CrdsGossip {
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
gossip_validators: Option<&HashSet<Pubkey>>, gossip_validators: Option<&HashSet<Pubkey>>,
) { ) {
let network_size = self.crds.read().unwrap().num_nodes();
self.push.refresh_push_active_set( self.push.refresh_push_active_set(
&self.crds, &self.crds,
stakes, stakes,
gossip_validators, gossip_validators,
self_pubkey, self_pubkey,
self_shred_version, self_shred_version,
self.crds.num_nodes(), network_size,
CRDS_GOSSIP_NUM_ACTIVE, CRDS_GOSSIP_NUM_ACTIVE,
) )
} }
@ -221,11 +224,11 @@ impl CrdsGossip {
self.pull.mark_pull_request_creation_time(from, now) self.pull.mark_pull_request_creation_time(from, now)
} }
/// process a pull request and create a response /// process a pull request and create a response
pub fn process_pull_requests<I>(&mut self, callers: I, now: u64) pub fn process_pull_requests<I>(&self, callers: I, now: u64)
where where
I: IntoIterator<Item = CrdsValue>, I: IntoIterator<Item = CrdsValue>,
{ {
CrdsGossipPull::process_pull_requests(&mut self.crds, callers, now); CrdsGossipPull::process_pull_requests(&self.crds, callers, now);
} }
pub fn generate_pull_responses( pub fn generate_pull_responses(
@ -254,7 +257,7 @@ impl CrdsGossip {
/// process a pull response /// process a pull response
pub fn process_pull_responses( pub fn process_pull_responses(
&mut self, &self,
from: &Pubkey, from: &Pubkey,
responses: Vec<CrdsValue>, responses: Vec<CrdsValue>,
responses_expired_timeout: Vec<CrdsValue>, responses_expired_timeout: Vec<CrdsValue>,
@ -263,7 +266,7 @@ impl CrdsGossip {
process_pull_stats: &mut ProcessPullStats, process_pull_stats: &mut ProcessPullStats,
) { ) {
self.pull.process_pull_responses( self.pull.process_pull_responses(
&mut self.crds, &self.crds,
from, from,
responses, responses,
responses_expired_timeout, responses_expired_timeout,
@ -283,7 +286,7 @@ impl CrdsGossip {
} }
pub fn purge( pub fn purge(
&mut self, &self,
self_pubkey: &Pubkey, self_pubkey: &Pubkey,
thread_pool: &ThreadPool, thread_pool: &ThreadPool,
now: u64, now: u64,
@ -298,9 +301,11 @@ impl CrdsGossip {
//sanity check //sanity check
assert_eq!(timeouts[self_pubkey], std::u64::MAX); assert_eq!(timeouts[self_pubkey], std::u64::MAX);
assert!(timeouts.contains_key(&Pubkey::default())); assert!(timeouts.contains_key(&Pubkey::default()));
rv = CrdsGossipPull::purge_active(thread_pool, &mut self.crds, now, timeouts); rv = CrdsGossipPull::purge_active(thread_pool, &self.crds, now, timeouts);
} }
self.crds self.crds
.write()
.unwrap()
.trim_purged(now.saturating_sub(5 * self.pull.crds_timeout)); .trim_purged(now.saturating_sub(5 * self.pull.crds_timeout));
self.pull.purge_failed_inserts(now); self.pull.purge_failed_inserts(now);
rv rv
@ -308,8 +313,9 @@ impl CrdsGossip {
// Only for tests and simulations. // Only for tests and simulations.
pub(crate) fn mock_clone(&self) -> Self { pub(crate) fn mock_clone(&self) -> Self {
let crds = self.crds.read().unwrap().clone();
Self { Self {
crds: self.crds.clone(), crds: RwLock::new(crds),
push: self.push.mock_clone(), push: self.push.mock_clone(),
pull: self.pull.mock_clone(), pull: self.pull.mock_clone(),
} }
@ -343,12 +349,14 @@ mod test {
#[test] #[test]
fn test_prune_errors() { fn test_prune_errors() {
let mut crds_gossip = CrdsGossip::default(); let crds_gossip = CrdsGossip::default();
let id = Pubkey::new(&[0; 32]); let id = Pubkey::new(&[0; 32]);
let ci = ContactInfo::new_localhost(&Pubkey::new(&[1; 32]), 0); let ci = ContactInfo::new_localhost(&Pubkey::new(&[1; 32]), 0);
let prune_pubkey = Pubkey::new(&[2; 32]); let prune_pubkey = Pubkey::new(&[2; 32]);
crds_gossip crds_gossip
.crds .crds
.write()
.unwrap()
.insert( .insert(
CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())), CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())),
0, 0,

View File

@ -215,7 +215,7 @@ impl CrdsGossipPull {
pub(crate) fn new_pull_request( pub(crate) fn new_pull_request(
&self, &self,
thread_pool: &ThreadPool, thread_pool: &ThreadPool,
crds: &Crds, crds: &RwLock<Crds>,
self_keypair: &Keypair, self_keypair: &Keypair,
self_shred_version: u16, self_shred_version: u16,
now: u64, now: u64,
@ -225,8 +225,8 @@ impl CrdsGossipPull {
ping_cache: &Mutex<PingCache>, ping_cache: &Mutex<PingCache>,
pings: &mut Vec<(SocketAddr, Ping)>, pings: &mut Vec<(SocketAddr, Ping)>,
) -> Result<(ContactInfo, Vec<CrdsFilter>), CrdsGossipError> { ) -> Result<(ContactInfo, Vec<CrdsFilter>), CrdsGossipError> {
let (weights, peers): (Vec<_>, Vec<_>) = self let (weights, peers): (Vec<_>, Vec<_>) = {
.pull_options( self.pull_options(
crds, crds,
&self_keypair.pubkey(), &self_keypair.pubkey(),
self_shred_version, self_shred_version,
@ -235,7 +235,9 @@ impl CrdsGossipPull {
stakes, stakes,
) )
.into_iter() .into_iter()
.unzip(); .map(|(weight, node, gossip_addr)| (weight, (node, gossip_addr)))
.unzip()
};
if peers.is_empty() { if peers.is_empty() {
return Err(CrdsGossipError::NoPeers); return Err(CrdsGossipError::NoPeers);
} }
@ -248,36 +250,45 @@ impl CrdsGossipPull {
let mut ping_cache = ping_cache.lock().unwrap(); let mut ping_cache = ping_cache.lock().unwrap();
let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok(); let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok();
let now = Instant::now(); let now = Instant::now();
peers.find(|peer| { peers.find(|node| {
let node = (peer.id, peer.gossip); let (_, gossip_addr) = *node;
let (check, ping) = ping_cache.check(now, node, &mut pingf); let (check, ping) = ping_cache.check(now, *node, &mut pingf);
if let Some(ping) = ping { if let Some(ping) = ping {
pings.push((peer.gossip, ping)); pings.push((gossip_addr, ping));
} }
check check
}) })
}; };
match peer { let peer = match peer {
None => Err(CrdsGossipError::NoPeers), None => return Err(CrdsGossipError::NoPeers),
Some(peer) => { Some((node, _gossip_addr)) => node,
let filters = self.build_crds_filters(thread_pool, crds, bloom_size); };
Ok((peer.clone(), filters)) let filters = self.build_crds_filters(thread_pool, crds, bloom_size);
} let peer = match crds.read().unwrap().get_contact_info(peer) {
} None => return Err(CrdsGossipError::NoPeers),
Some(node) => node.clone(),
};
Ok((peer, filters))
} }
fn pull_options<'a>( fn pull_options(
&self, &self,
crds: &'a Crds, crds: &RwLock<Crds>,
self_id: &Pubkey, self_id: &Pubkey,
self_shred_version: u16, self_shred_version: u16,
now: u64, now: u64,
gossip_validators: Option<&HashSet<Pubkey>>, gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
) -> Vec<(u64, &'a ContactInfo)> { ) -> Vec<(
u64, // weight
Pubkey, // node
SocketAddr, // gossip address
)> {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let active_cutoff = now.saturating_sub(PULL_ACTIVE_TIMEOUT_MS); let active_cutoff = now.saturating_sub(PULL_ACTIVE_TIMEOUT_MS);
let pull_request_time = self.pull_request_time.read().unwrap(); let pull_request_time = self.pull_request_time.read().unwrap();
// crds should be locked last after self.pull_request_time.
let crds = crds.read().unwrap();
crds.get_nodes() crds.get_nodes()
.filter_map(|value| { .filter_map(|value| {
let info = value.value.contact_info().unwrap(); let info = value.value.contact_info().unwrap();
@ -310,7 +321,7 @@ impl CrdsGossipPull {
let weight = get_weight(max_weight, since, stake); let weight = get_weight(max_weight, since, stake);
// Weights are bounded by max_weight defined above. // Weights are bounded by max_weight defined above.
// So this type-cast should be safe. // So this type-cast should be safe.
((weight * 100.0) as u64, item) ((weight * 100.0) as u64, item.id, item.gossip)
}) })
.collect() .collect()
} }
@ -324,10 +335,11 @@ impl CrdsGossipPull {
} }
/// process a pull request /// process a pull request
pub(crate) fn process_pull_requests<I>(crds: &mut Crds, callers: I, now: u64) pub(crate) fn process_pull_requests<I>(crds: &RwLock<Crds>, callers: I, now: u64)
where where
I: IntoIterator<Item = CrdsValue>, I: IntoIterator<Item = CrdsValue>,
{ {
let mut crds = crds.write().unwrap();
for caller in callers { for caller in callers {
let key = caller.pubkey(); let key = caller.pubkey();
let _ = crds.insert(caller, now); let _ = crds.insert(caller, now);
@ -337,7 +349,7 @@ impl CrdsGossipPull {
/// Create gossip responses to pull requests /// Create gossip responses to pull requests
pub(crate) fn generate_pull_responses( pub(crate) fn generate_pull_responses(
crds: &Crds, crds: &RwLock<Crds>,
requests: &[(CrdsValue, CrdsFilter)], requests: &[(CrdsValue, CrdsFilter)],
output_size_limit: usize, // Limit number of crds values returned. output_size_limit: usize, // Limit number of crds values returned.
now: u64, now: u64,
@ -353,7 +365,7 @@ impl CrdsGossipPull {
// .2 => hash value of outdated values which will fail to insert. // .2 => hash value of outdated values which will fail to insert.
pub(crate) fn filter_pull_responses( pub(crate) fn filter_pull_responses(
&self, &self,
crds: &Crds, crds: &RwLock<Crds>,
timeouts: &HashMap<Pubkey, u64>, timeouts: &HashMap<Pubkey, u64>,
responses: Vec<CrdsValue>, responses: Vec<CrdsValue>,
now: u64, now: u64,
@ -365,6 +377,7 @@ impl CrdsGossipPull {
.get(&Pubkey::default()) .get(&Pubkey::default())
.copied() .copied()
.unwrap_or(self.msg_timeout); .unwrap_or(self.msg_timeout);
let crds = crds.read().unwrap();
let upsert = |response: CrdsValue| { let upsert = |response: CrdsValue| {
let owner = response.label().pubkey(); let owner = response.label().pubkey();
// Check if the crds value is older than the msg_timeout // Check if the crds value is older than the msg_timeout
@ -399,7 +412,7 @@ impl CrdsGossipPull {
/// process a vec of pull responses /// process a vec of pull responses
pub(crate) fn process_pull_responses( pub(crate) fn process_pull_responses(
&self, &self,
crds: &mut Crds, crds: &RwLock<Crds>,
from: &Pubkey, from: &Pubkey,
responses: Vec<CrdsValue>, responses: Vec<CrdsValue>,
responses_expired_timeout: Vec<CrdsValue>, responses_expired_timeout: Vec<CrdsValue>,
@ -408,6 +421,7 @@ impl CrdsGossipPull {
stats: &mut ProcessPullStats, stats: &mut ProcessPullStats,
) { ) {
let mut owners = HashSet::new(); let mut owners = HashSet::new();
let mut crds = crds.write().unwrap();
for response in responses_expired_timeout { for response in responses_expired_timeout {
let _ = crds.insert(response, now); let _ = crds.insert(response, now);
} }
@ -425,6 +439,7 @@ impl CrdsGossipPull {
for owner in owners { for owner in owners {
crds.update_record_timestamp(&owner, now); crds.update_record_timestamp(&owner, now);
} }
drop(crds);
stats.failed_insert += failed_inserts.len(); stats.failed_insert += failed_inserts.len();
self.purge_failed_inserts(now); self.purge_failed_inserts(now);
let failed_inserts = failed_inserts.into_iter().zip(repeat(now)); let failed_inserts = failed_inserts.into_iter().zip(repeat(now));
@ -452,7 +467,7 @@ impl CrdsGossipPull {
pub fn build_crds_filters( pub fn build_crds_filters(
&self, &self,
thread_pool: &ThreadPool, thread_pool: &ThreadPool,
crds: &Crds, crds: &RwLock<Crds>,
bloom_size: usize, bloom_size: usize,
) -> Vec<CrdsFilter> { ) -> Vec<CrdsFilter> {
const PAR_MIN_LENGTH: usize = 512; const PAR_MIN_LENGTH: usize = 512;
@ -460,7 +475,10 @@ impl CrdsGossipPull {
const MIN_NUM_BLOOM_ITEMS: usize = 512; const MIN_NUM_BLOOM_ITEMS: usize = 512;
#[cfg(not(debug_assertions))] #[cfg(not(debug_assertions))]
const MIN_NUM_BLOOM_ITEMS: usize = 65_536; const MIN_NUM_BLOOM_ITEMS: usize = 65_536;
let num_items = crds.len() + crds.num_purged() + self.failed_inserts.read().unwrap().len(); let failed_inserts = self.failed_inserts.read().unwrap();
// crds should be locked last after self.failed_inserts.
let crds = crds.read().unwrap();
let num_items = crds.len() + crds.num_purged() + failed_inserts.len();
let num_items = MIN_NUM_BLOOM_ITEMS.max(num_items); let num_items = MIN_NUM_BLOOM_ITEMS.max(num_items);
let filters = CrdsFilterSet::new(num_items, bloom_size); let filters = CrdsFilterSet::new(num_items, bloom_size);
thread_pool.install(|| { thread_pool.install(|| {
@ -469,21 +487,21 @@ impl CrdsGossipPull {
.map(|v| v.value_hash) .map(|v| v.value_hash)
.chain(crds.purged().with_min_len(PAR_MIN_LENGTH)) .chain(crds.purged().with_min_len(PAR_MIN_LENGTH))
.chain( .chain(
self.failed_inserts failed_inserts
.read()
.unwrap()
.par_iter() .par_iter()
.with_min_len(PAR_MIN_LENGTH) .with_min_len(PAR_MIN_LENGTH)
.map(|(v, _)| *v), .map(|(v, _)| *v),
) )
.for_each(|v| filters.add(v)); .for_each(|v| filters.add(v));
}); });
drop(crds);
drop(failed_inserts);
filters.into() filters.into()
} }
/// filter values that fail the bloom filter up to max_bytes /// filter values that fail the bloom filter up to max_bytes
fn filter_crds_values( fn filter_crds_values(
crds: &Crds, crds: &RwLock<Crds>,
filters: &[(CrdsValue, CrdsFilter)], filters: &[(CrdsValue, CrdsFilter)],
mut output_size_limit: usize, // Limit number of crds values returned. mut output_size_limit: usize, // Limit number of crds values returned.
now: u64, now: u64,
@ -495,6 +513,7 @@ impl CrdsGossipPull {
now.saturating_sub(msg_timeout)..now.saturating_add(msg_timeout); now.saturating_sub(msg_timeout)..now.saturating_add(msg_timeout);
let mut dropped_requests = 0; let mut dropped_requests = 0;
let mut total_skipped = 0; let mut total_skipped = 0;
let crds = crds.read().unwrap();
let ret: Vec<_> = filters let ret: Vec<_> = filters
.iter() .iter()
.map(|(caller, filter)| { .map(|(caller, filter)| {
@ -565,10 +584,11 @@ impl CrdsGossipPull {
/// Purge values from the crds that are older then `active_timeout` /// Purge values from the crds that are older then `active_timeout`
pub(crate) fn purge_active( pub(crate) fn purge_active(
thread_pool: &ThreadPool, thread_pool: &ThreadPool,
crds: &mut Crds, crds: &RwLock<Crds>,
now: u64, now: u64,
timeouts: &HashMap<Pubkey, u64>, timeouts: &HashMap<Pubkey, u64>,
) -> usize { ) -> usize {
let mut crds = crds.write().unwrap();
let labels = crds.find_old_labels(thread_pool, now, timeouts); let labels = crds.find_old_labels(thread_pool, now, timeouts);
for label in &labels { for label in &labels {
crds.remove(label, now); crds.remove(label, now);
@ -580,7 +600,7 @@ impl CrdsGossipPull {
#[cfg(test)] #[cfg(test)]
fn process_pull_response( fn process_pull_response(
&self, &self,
crds: &mut Crds, crds: &RwLock<Crds>,
from: &Pubkey, from: &Pubkey,
timeouts: &HashMap<Pubkey, u64>, timeouts: &HashMap<Pubkey, u64>,
response: Vec<CrdsValue>, response: Vec<CrdsValue>,
@ -710,14 +730,13 @@ pub(crate) mod tests {
stakes.insert(id, i * 100); stakes.insert(id, i * 100);
} }
let now = 1024; let now = 1024;
let crds = RwLock::new(crds);
let mut options = node.pull_options(&crds, &me.label().pubkey(), 0, now, None, &stakes); let mut options = node.pull_options(&crds, &me.label().pubkey(), 0, now, None, &stakes);
assert!(!options.is_empty()); assert!(!options.is_empty());
options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap()); options
.sort_by(|(weight_l, _, _), (weight_r, _, _)| weight_r.partial_cmp(weight_l).unwrap());
// check that the highest stake holder is also the heaviest weighted. // check that the highest stake holder is also the heaviest weighted.
assert_eq!( assert_eq!(stakes[&options[0].1], 3000_u64);
*stakes.get(&options.get(0).unwrap().1.id).unwrap(),
3000_u64
);
} }
#[test] #[test]
@ -757,12 +776,13 @@ pub(crate) mod tests {
crds.insert(spy.clone(), 0).unwrap(); crds.insert(spy.clone(), 0).unwrap();
crds.insert(node_123.clone(), 0).unwrap(); crds.insert(node_123.clone(), 0).unwrap();
crds.insert(node_456.clone(), 0).unwrap(); crds.insert(node_456.clone(), 0).unwrap();
let crds = RwLock::new(crds);
// shred version 123 should ignore nodes with versions 0 and 456 // shred version 123 should ignore nodes with versions 0 and 456
let options = node let options = node
.pull_options(&crds, &me.label().pubkey(), 123, 0, None, &stakes) .pull_options(&crds, &me.label().pubkey(), 123, 0, None, &stakes)
.iter() .iter()
.map(|(_, c)| c.id) .map(|(_, pk, _)| *pk)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
assert_eq!(options.len(), 1); assert_eq!(options.len(), 1);
assert!(!options.contains(&spy.pubkey())); assert!(!options.contains(&spy.pubkey()));
@ -772,7 +792,7 @@ pub(crate) mod tests {
let options = node let options = node
.pull_options(&crds, &spy.label().pubkey(), 0, 0, None, &stakes) .pull_options(&crds, &spy.label().pubkey(), 0, 0, None, &stakes)
.iter() .iter()
.map(|(_, c)| c.id) .map(|(_, pk, _)| *pk)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
assert_eq!(options.len(), 3); assert_eq!(options.len(), 3);
assert!(options.contains(&me.pubkey())); assert!(options.contains(&me.pubkey()));
@ -800,6 +820,7 @@ pub(crate) mod tests {
crds.insert(me.clone(), 0).unwrap(); crds.insert(me.clone(), 0).unwrap();
crds.insert(node_123.clone(), 0).unwrap(); crds.insert(node_123.clone(), 0).unwrap();
let crds = RwLock::new(crds);
// Empty gossip_validators -- will pull from nobody // Empty gossip_validators -- will pull from nobody
let mut gossip_validators = HashSet::new(); let mut gossip_validators = HashSet::new();
@ -836,7 +857,7 @@ pub(crate) mod tests {
&stakes, &stakes,
); );
assert_eq!(options.len(), 1); assert_eq!(options.len(), 1);
assert_eq!(options[0].1.id, node_123.pubkey()); assert_eq!(options[0].1, node_123.pubkey());
} }
#[test] #[test]
@ -902,9 +923,11 @@ pub(crate) mod tests {
num_inserts += 1; num_inserts += 1;
} }
} }
let crds = RwLock::new(crds);
assert!(num_inserts > 30_000, "num inserts: {}", num_inserts); assert!(num_inserts > 30_000, "num inserts: {}", num_inserts);
let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE); let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE);
assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS.max(32)); assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS.max(32));
let crds = crds.read().unwrap();
let purged: Vec<_> = thread_pool.install(|| crds.purged().collect()); let purged: Vec<_> = thread_pool.install(|| crds.purged().collect());
let hash_values: Vec<_> = crds.values().map(|v| v.value_hash).chain(purged).collect(); let hash_values: Vec<_> = crds.values().map(|v| v.value_hash).chain(purged).collect();
assert_eq!(hash_values.len(), 40_000); assert_eq!(hash_values.len(), 40_000);
@ -928,7 +951,7 @@ pub(crate) mod tests {
#[test] #[test]
fn test_new_pull_request() { fn test_new_pull_request() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let mut crds = Crds::default(); let crds = RwLock::<Crds>::default();
let node_keypair = Keypair::new(); let node_keypair = Keypair::new();
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&node_keypair.pubkey(), &node_keypair.pubkey(),
@ -956,7 +979,7 @@ pub(crate) mod tests {
Err(CrdsGossipError::NoPeers) Err(CrdsGossipError::NoPeers)
); );
crds.insert(entry, 0).unwrap(); crds.write().unwrap().insert(entry, 0).unwrap();
assert_eq!( assert_eq!(
node.new_pull_request( node.new_pull_request(
&thread_pool, &thread_pool,
@ -979,7 +1002,7 @@ pub(crate) mod tests {
.unwrap() .unwrap()
.mock_pong(new.id, new.gossip, Instant::now()); .mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
crds.insert(new.clone(), now).unwrap(); crds.write().unwrap().insert(new.clone(), now).unwrap();
let req = node.new_pull_request( let req = node.new_pull_request(
&thread_pool, &thread_pool,
&crds, &crds,
@ -998,7 +1021,7 @@ pub(crate) mod tests {
node.mark_pull_request_creation_time(new.contact_info().unwrap().id, now); node.mark_pull_request_creation_time(new.contact_info().unwrap().id, now);
let offline = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), now); let offline = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), now);
let offline = CrdsValue::new_unsigned(CrdsData::ContactInfo(offline)); let offline = CrdsValue::new_unsigned(CrdsData::ContactInfo(offline));
crds.insert(offline, now).unwrap(); crds.write().unwrap().insert(offline, now).unwrap();
let req = node.new_pull_request( let req = node.new_pull_request(
&thread_pool, &thread_pool,
&crds, &crds,
@ -1041,6 +1064,7 @@ pub(crate) mod tests {
ping_cache.mock_pong(new.id, new.gossip, Instant::now()); ping_cache.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
crds.insert(new.clone(), now).unwrap(); crds.insert(new.clone(), now).unwrap();
let crds = RwLock::new(crds);
// set request creation time to now. // set request creation time to now.
let now = now + 50_000; let now = now + 50_000;
@ -1130,6 +1154,7 @@ pub(crate) mod tests {
ping_cache.mock_pong(new.id, new.gossip, Instant::now()); ping_cache.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
node_crds.insert(new, 0).unwrap(); node_crds.insert(new, 0).unwrap();
let node_crds = RwLock::new(node_crds);
let mut pings = Vec::new(); let mut pings = Vec::new();
let req = node.new_pull_request( let req = node.new_pull_request(
&thread_pool, &thread_pool,
@ -1144,7 +1169,7 @@ pub(crate) mod tests {
&mut pings, &mut pings,
); );
let mut dest_crds = Crds::default(); let dest_crds = RwLock::<Crds>::default();
let (_, filters) = req.unwrap(); let (_, filters) = req.unwrap();
let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let rsp = CrdsGossipPull::generate_pull_responses( let rsp = CrdsGossipPull::generate_pull_responses(
@ -1161,6 +1186,8 @@ pub(crate) mod tests {
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
))); )));
dest_crds dest_crds
.write()
.unwrap()
.insert(new, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS) .insert(new, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS)
.unwrap(); .unwrap();
@ -1217,6 +1244,7 @@ pub(crate) mod tests {
ping_cache.mock_pong(new.id, new.gossip, Instant::now()); ping_cache.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
node_crds.insert(new, 0).unwrap(); node_crds.insert(new, 0).unwrap();
let node_crds = RwLock::new(node_crds);
let mut pings = Vec::new(); let mut pings = Vec::new();
let req = node.new_pull_request( let req = node.new_pull_request(
&thread_pool, &thread_pool,
@ -1231,7 +1259,7 @@ pub(crate) mod tests {
&mut pings, &mut pings,
); );
let mut dest_crds = Crds::default(); let dest_crds = RwLock::<Crds>::default();
let (_, filters) = req.unwrap(); let (_, filters) = req.unwrap();
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let rsp = CrdsGossipPull::generate_pull_responses( let rsp = CrdsGossipPull::generate_pull_responses(
@ -1240,11 +1268,9 @@ pub(crate) mod tests {
/*output_size_limit=*/ usize::MAX, /*output_size_limit=*/ usize::MAX,
0, 0,
); );
CrdsGossipPull::process_pull_requests( let callers = filters.into_iter().map(|(caller, _)| caller);
&mut dest_crds, CrdsGossipPull::process_pull_requests(&dest_crds, callers, 1);
filters.into_iter().map(|(caller, _)| caller), let dest_crds = dest_crds.read().unwrap();
1,
);
assert!(rsp.iter().all(|rsp| rsp.is_empty())); assert!(rsp.iter().all(|rsp| rsp.is_empty()));
assert!(dest_crds.get(&caller.label()).is_some()); assert!(dest_crds.get(&caller.label()).is_some());
assert_eq!(dest_crds.get(&caller.label()).unwrap().local_timestamp, 1); assert_eq!(dest_crds.get(&caller.label()).unwrap().local_timestamp, 1);
@ -1277,6 +1303,7 @@ pub(crate) mod tests {
ping_cache.mock_pong(new.id, new.gossip, Instant::now()); ping_cache.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
dest_crds.insert(new.clone(), 0).unwrap(); dest_crds.insert(new.clone(), 0).unwrap();
let dest_crds = RwLock::new(dest_crds);
// node contains a key from the dest node, but at an older local timestamp // node contains a key from the dest node, but at an older local timestamp
let same_key = ContactInfo::new_localhost(&new_id, 0); let same_key = ContactInfo::new_localhost(&new_id, 0);
@ -1286,6 +1313,7 @@ pub(crate) mod tests {
assert!(same_key.wallclock() < new.wallclock()); assert!(same_key.wallclock() < new.wallclock());
node_crds.insert(same_key.clone(), 0).unwrap(); node_crds.insert(same_key.clone(), 0).unwrap();
assert_eq!(node_crds.get(&same_key.label()).unwrap().local_timestamp, 0); assert_eq!(node_crds.get(&same_key.label()).unwrap().local_timestamp, 0);
let node_crds = RwLock::new(node_crds);
let mut done = false; let mut done = false;
let mut pings = Vec::new(); let mut pings = Vec::new();
let ping_cache = Mutex::new(ping_cache); let ping_cache = Mutex::new(ping_cache);
@ -1312,7 +1340,7 @@ pub(crate) mod tests {
0, 0,
); );
CrdsGossipPull::process_pull_requests( CrdsGossipPull::process_pull_requests(
&mut dest_crds, &dest_crds,
filters.into_iter().map(|(caller, _)| caller), filters.into_iter().map(|(caller, _)| caller),
0, 0,
); );
@ -1328,7 +1356,7 @@ pub(crate) mod tests {
assert_eq!(rsp.len(), MIN_NUM_BLOOM_FILTERS); assert_eq!(rsp.len(), MIN_NUM_BLOOM_FILTERS);
let failed = node let failed = node
.process_pull_response( .process_pull_response(
&mut node_crds, &node_crds,
&node_pubkey, &node_pubkey,
&node.make_timeouts(node_pubkey, &HashMap::new(), Duration::default()), &node.make_timeouts(node_pubkey, &HashMap::new(), Duration::default()),
rsp.into_iter().flatten().collect(), rsp.into_iter().flatten().collect(),
@ -1336,9 +1364,15 @@ pub(crate) mod tests {
) )
.0; .0;
assert_eq!(failed, 0); assert_eq!(failed, 0);
assert_eq!(node_crds.get(&new.label()).unwrap().local_timestamp, 1); assert_eq!(1, {
let node_crds = node_crds.read().unwrap();
node_crds.get(&new.label()).unwrap().local_timestamp
});
// verify that the whole record was updated for dest since this is a response from dest // verify that the whole record was updated for dest since this is a response from dest
assert_eq!(node_crds.get(&same_key.label()).unwrap().local_timestamp, 1); assert_eq!(1, {
let node_crds = node_crds.read().unwrap();
node_crds.get(&same_key.label()).unwrap().local_timestamp
});
done = true; done = true;
break; break;
} }
@ -1369,16 +1403,17 @@ pub(crate) mod tests {
node_label node_label
); );
// purge // purge
let node_crds = RwLock::new(node_crds);
let timeouts = node.make_timeouts(node_pubkey, &HashMap::new(), Duration::default()); let timeouts = node.make_timeouts(node_pubkey, &HashMap::new(), Duration::default());
CrdsGossipPull::purge_active(&thread_pool, &mut node_crds, node.crds_timeout, &timeouts); CrdsGossipPull::purge_active(&thread_pool, &node_crds, node.crds_timeout, &timeouts);
//verify self is still valid after purge //verify self is still valid after purge
assert_eq!( assert_eq!(node_label, {
node_crds.get(&node_label).unwrap().value.label(), let node_crds = node_crds.read().unwrap();
node_label node_crds.get(&node_label).unwrap().value.label()
); });
assert_eq!(node_crds.get(&old.label()), None); assert_eq!(node_crds.read().unwrap().get(&old.label()), None);
assert_eq!(node_crds.num_purged(), 1); assert_eq!(node_crds.read().unwrap().num_purged(), 1);
for _ in 0..30 { for _ in 0..30 {
// there is a chance of a false positive with bloom filters // there is a chance of a false positive with bloom filters
// assert that purged value is still in the set // assert that purged value is still in the set
@ -1388,6 +1423,7 @@ pub(crate) mod tests {
} }
// purge the value // purge the value
let mut node_crds = node_crds.write().unwrap();
node_crds.trim_purged(node.crds_timeout + 1); node_crds.trim_purged(node.crds_timeout + 1);
assert_eq!(node_crds.num_purged(), 0); assert_eq!(node_crds.num_purged(), 0);
} }
@ -1472,7 +1508,7 @@ pub(crate) mod tests {
#[test] #[test]
fn test_process_pull_response() { fn test_process_pull_response() {
let mut node_crds = Crds::default(); let node_crds = RwLock::<Crds>::default();
let node = CrdsGossipPull::default(); let node = CrdsGossipPull::default();
let peer_pubkey = solana_sdk::pubkey::new_rand(); let peer_pubkey = solana_sdk::pubkey::new_rand();
@ -1485,7 +1521,7 @@ pub(crate) mod tests {
// inserting a fresh value should be fine. // inserting a fresh value should be fine.
assert_eq!( assert_eq!(
node.process_pull_response( node.process_pull_response(
&mut node_crds, &node_crds,
&peer_pubkey, &peer_pubkey,
&timeouts, &timeouts,
vec![peer_entry.clone()], vec![peer_entry.clone()],
@ -1495,14 +1531,14 @@ pub(crate) mod tests {
0 0
); );
let mut node_crds = Crds::default(); let node_crds = RwLock::<Crds>::default();
let unstaked_peer_entry = CrdsValue::new_unsigned(CrdsData::ContactInfo( let unstaked_peer_entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(
ContactInfo::new_localhost(&peer_pubkey, 0), ContactInfo::new_localhost(&peer_pubkey, 0),
)); ));
// check that old contact infos fail if they are too old, regardless of "timeouts" // check that old contact infos fail if they are too old, regardless of "timeouts"
assert_eq!( assert_eq!(
node.process_pull_response( node.process_pull_response(
&mut node_crds, &node_crds,
&peer_pubkey, &peer_pubkey,
&timeouts, &timeouts,
vec![peer_entry.clone(), unstaked_peer_entry], vec![peer_entry.clone(), unstaked_peer_entry],
@ -1512,11 +1548,11 @@ pub(crate) mod tests {
4 4
); );
let mut node_crds = Crds::default(); let node_crds = RwLock::<Crds>::default();
// check that old contact infos can still land as long as they have a "timeouts" entry // check that old contact infos can still land as long as they have a "timeouts" entry
assert_eq!( assert_eq!(
node.process_pull_response( node.process_pull_response(
&mut node_crds, &node_crds,
&peer_pubkey, &peer_pubkey,
&timeouts, &timeouts,
vec![peer_entry], vec![peer_entry],
@ -1533,7 +1569,7 @@ pub(crate) mod tests {
// but a recent contact info (inserted above) exists // but a recent contact info (inserted above) exists
assert_eq!( assert_eq!(
node.process_pull_response( node.process_pull_response(
&mut node_crds, &node_crds,
&peer_pubkey, &peer_pubkey,
&timeouts, &timeouts,
vec![peer_vote.clone()], vec![peer_vote.clone()],
@ -1543,11 +1579,11 @@ pub(crate) mod tests {
0 0
); );
let mut node_crds = Crds::default(); let node_crds = RwLock::<Crds>::default();
// without a contact info, inserting an old value should fail // without a contact info, inserting an old value should fail
assert_eq!( assert_eq!(
node.process_pull_response( node.process_pull_response(
&mut node_crds, &node_crds,
&peer_pubkey, &peer_pubkey,
&timeouts, &timeouts,
vec![peer_vote], vec![peer_vote],

View File

@ -97,9 +97,9 @@ impl Default for CrdsGossipPush {
} }
} }
impl CrdsGossipPush { impl CrdsGossipPush {
pub fn num_pending(&self, crds: &Crds) -> usize { pub fn num_pending(&self, crds: &RwLock<Crds>) -> usize {
let mut cursor: Cursor = *self.crds_cursor.lock().unwrap(); let mut cursor: Cursor = *self.crds_cursor.lock().unwrap();
crds.get_entries(&mut cursor).count() crds.read().unwrap().get_entries(&mut cursor).count()
} }
fn prune_stake_threshold(self_stake: u64, origin_stake: u64) -> u64 { fn prune_stake_threshold(self_stake: u64, origin_stake: u64) -> u64 {
@ -203,7 +203,7 @@ impl CrdsGossipPush {
/// Returns origins' pubkeys of upserted values. /// Returns origins' pubkeys of upserted values.
pub(crate) fn process_push_message( pub(crate) fn process_push_message(
&self, &self,
crds: &mut Crds, crds: &RwLock<Crds>,
from: &Pubkey, from: &Pubkey,
values: Vec<CrdsValue>, values: Vec<CrdsValue>,
now: u64, now: u64,
@ -228,6 +228,7 @@ impl CrdsGossipPush {
}) })
.collect() .collect()
}; };
let mut crds = crds.write().unwrap();
values values
.into_iter() .into_iter()
.map(|value| { .map(|value| {
@ -249,7 +250,11 @@ impl CrdsGossipPush {
/// peers. /// peers.
/// The list of push messages is created such that all the randomly selected peers have not /// The list of push messages is created such that all the randomly selected peers have not
/// pruned the source addresses. /// pruned the source addresses.
pub fn new_push_messages(&self, crds: &Crds, now: u64) -> HashMap<Pubkey, Vec<CrdsValue>> { pub(crate) fn new_push_messages(
&self,
crds: &RwLock<Crds>,
now: u64,
) -> HashMap<Pubkey, Vec<CrdsValue>> {
let active_set = self.active_set.read().unwrap(); let active_set = self.active_set.read().unwrap();
let active_set_len = active_set.len(); let active_set_len = active_set.len();
let push_fanout = self.push_fanout.min(active_set_len); let push_fanout = self.push_fanout.min(active_set_len);
@ -262,6 +267,8 @@ impl CrdsGossipPush {
let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new(); let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new();
let wallclock_window = self.wallclock_window(now); let wallclock_window = self.wallclock_window(now);
let mut crds_cursor = self.crds_cursor.lock().unwrap(); let mut crds_cursor = self.crds_cursor.lock().unwrap();
// crds should be locked last after self.{active_set,crds_cursor}.
let crds = crds.read().unwrap();
let entries = crds let entries = crds
.get_entries(crds_cursor.deref_mut()) .get_entries(crds_cursor.deref_mut())
.map(|entry| &entry.value) .map(|entry| &entry.value)
@ -287,6 +294,7 @@ impl CrdsGossipPush {
} }
} }
} }
drop(crds);
drop(crds_cursor); drop(crds_cursor);
drop(active_set); drop(active_set);
self.num_pushes.fetch_add(num_pushes, Ordering::Relaxed); self.num_pushes.fetch_add(num_pushes, Ordering::Relaxed);
@ -318,7 +326,7 @@ impl CrdsGossipPush {
/// * ratio - active_set.len()/ratio is the number of actives to rotate /// * ratio - active_set.len()/ratio is the number of actives to rotate
pub(crate) fn refresh_push_active_set( pub(crate) fn refresh_push_active_set(
&self, &self,
crds: &Crds, crds: &RwLock<Crds>,
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
gossip_validators: Option<&HashSet<Pubkey>>, gossip_validators: Option<&HashSet<Pubkey>>,
self_id: &Pubkey, self_id: &Pubkey,
@ -333,19 +341,20 @@ impl CrdsGossipPush {
#[cfg(not(debug_assertions))] #[cfg(not(debug_assertions))]
const MIN_NUM_BLOOM_ITEMS: usize = CRDS_UNIQUE_PUBKEY_CAPACITY; const MIN_NUM_BLOOM_ITEMS: usize = CRDS_UNIQUE_PUBKEY_CAPACITY;
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let mut active_set = self.active_set.write().unwrap();
let need = Self::compute_need(self.num_active, active_set.len(), ratio);
let mut new_items = HashMap::new(); let mut new_items = HashMap::new();
let (weights, peers): (Vec<_>, Vec<_>) = self let (weights, peers): (Vec<_>, Vec<_>) = {
.push_options(crds, self_id, self_shred_version, stakes, gossip_validators) self.push_options(crds, self_id, self_shred_version, stakes, gossip_validators)
.into_iter() .into_iter()
.unzip(); .unzip()
};
if peers.is_empty() { if peers.is_empty() {
return; return;
} }
let num_bloom_items = MIN_NUM_BLOOM_ITEMS.max(network_size); let num_bloom_items = MIN_NUM_BLOOM_ITEMS.max(network_size);
let shuffle = WeightedShuffle::new(&mut rng, &weights).unwrap(); let shuffle = WeightedShuffle::new(&mut rng, &weights).unwrap();
for peer in shuffle.map(|i| peers[i].id) { let mut active_set = self.active_set.write().unwrap();
let need = Self::compute_need(self.num_active, active_set.len(), ratio);
for peer in shuffle.map(|i| peers[i]) {
if new_items.len() >= need { if new_items.len() >= need {
break; break;
} }
@ -371,19 +380,21 @@ impl CrdsGossipPush {
} }
} }
fn push_options<'a>( fn push_options(
&self, &self,
crds: &'a Crds, crds: &RwLock<Crds>,
self_id: &Pubkey, self_id: &Pubkey,
self_shred_version: u16, self_shred_version: u16,
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
gossip_validators: Option<&HashSet<Pubkey>>, gossip_validators: Option<&HashSet<Pubkey>>,
) -> Vec<(u64, &'a ContactInfo)> { ) -> Vec<(/*weight:*/ u64, /*node:*/ Pubkey)> {
let now = timestamp(); let now = timestamp();
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let max_weight = u16::MAX as f32 - 1.0; let max_weight = u16::MAX as f32 - 1.0;
let active_cutoff = now.saturating_sub(PUSH_ACTIVE_TIMEOUT_MS); let active_cutoff = now.saturating_sub(PUSH_ACTIVE_TIMEOUT_MS);
let last_pushed_to = self.last_pushed_to.read().unwrap(); let last_pushed_to = self.last_pushed_to.read().unwrap();
// crds should be locked last after self.last_pushed_to.
let crds = crds.read().unwrap();
crds.get_nodes() crds.get_nodes()
.filter_map(|value| { .filter_map(|value| {
let info = value.value.contact_info().unwrap(); let info = value.value.contact_info().unwrap();
@ -413,7 +424,7 @@ impl CrdsGossipPush {
let weight = get_weight(max_weight, since, stake); let weight = get_weight(max_weight, since, stake);
// Weights are bounded by max_weight defined above. // Weights are bounded by max_weight defined above.
// So this type-cast should be safe. // So this type-cast should be safe.
((weight * 100.0) as u64, info) ((weight * 100.0) as u64, info.id)
}) })
.collect() .collect()
} }
@ -467,7 +478,7 @@ mod test {
#[test] #[test]
fn test_prune() { fn test_prune() {
let mut crds = Crds::default(); let crds = RwLock::<Crds>::default();
let push = CrdsGossipPush::default(); let push = CrdsGossipPush::default();
let mut stakes = HashMap::new(); let mut stakes = HashMap::new();
@ -482,7 +493,7 @@ mod test {
let low_staked_peers = (0..10).map(|_| solana_sdk::pubkey::new_rand()); let low_staked_peers = (0..10).map(|_| solana_sdk::pubkey::new_rand());
let mut low_staked_set = HashSet::new(); let mut low_staked_set = HashSet::new();
low_staked_peers.for_each(|p| { low_staked_peers.for_each(|p| {
push.process_push_message(&mut crds, &p, vec![value.clone()], 0); push.process_push_message(&crds, &p, vec![value.clone()], 0);
low_staked_set.insert(p); low_staked_set.insert(p);
stakes.insert(p, 1); stakes.insert(p, 1);
}); });
@ -504,7 +515,7 @@ mod test {
let high_staked_peer = solana_sdk::pubkey::new_rand(); let high_staked_peer = solana_sdk::pubkey::new_rand();
let high_stake = CrdsGossipPush::prune_stake_threshold(100, 100) + 10; let high_stake = CrdsGossipPush::prune_stake_threshold(100, 100) + 10;
stakes.insert(high_staked_peer, high_stake); stakes.insert(high_staked_peer, high_stake);
push.process_push_message(&mut crds, &high_staked_peer, vec![value], 0); push.process_push_message(&crds, &high_staked_peer, vec![value], 0);
let pruned = { let pruned = {
let mut received_cache = push.received_cache.lock().unwrap(); let mut received_cache = push.received_cache.lock().unwrap();
@ -529,7 +540,7 @@ mod test {
#[test] #[test]
fn test_process_push_one() { fn test_process_push_one() {
let mut crds = Crds::default(); let crds = RwLock::<Crds>::default();
let push = CrdsGossipPush::default(); let push = CrdsGossipPush::default();
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(), &solana_sdk::pubkey::new_rand(),
@ -538,20 +549,20 @@ mod test {
let label = value.label(); let label = value.label();
// push a new message // push a new message
assert_eq!( assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), vec![value.clone()], 0), push.process_push_message(&crds, &Pubkey::default(), vec![value.clone()], 0),
[Ok(label.pubkey())], [Ok(label.pubkey())],
); );
assert_eq!(crds.get(&label).unwrap().value, value); assert_eq!(crds.read().unwrap().get(&label).unwrap().value, value);
// push it again // push it again
assert_eq!( assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0), push.process_push_message(&crds, &Pubkey::default(), vec![value], 0),
[Err(CrdsGossipError::PushMessageOldVersion)], [Err(CrdsGossipError::PushMessageOldVersion)],
); );
} }
#[test] #[test]
fn test_process_push_old_version() { fn test_process_push_old_version() {
let mut crds = Crds::default(); let crds = RwLock::<Crds>::default();
let push = CrdsGossipPush::default(); let push = CrdsGossipPush::default();
let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ci.wallclock = 1; ci.wallclock = 1;
@ -559,7 +570,7 @@ mod test {
// push a new message // push a new message
assert_eq!( assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0), push.process_push_message(&crds, &Pubkey::default(), vec![value], 0),
[Ok(ci.id)], [Ok(ci.id)],
); );
@ -567,13 +578,13 @@ mod test {
ci.wallclock = 0; ci.wallclock = 0;
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci)); let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
assert_eq!( assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0), push.process_push_message(&crds, &Pubkey::default(), vec![value], 0),
[Err(CrdsGossipError::PushMessageOldVersion)], [Err(CrdsGossipError::PushMessageOldVersion)],
); );
} }
#[test] #[test]
fn test_process_push_timeout() { fn test_process_push_timeout() {
let mut crds = Crds::default(); let crds = RwLock::<Crds>::default();
let push = CrdsGossipPush::default(); let push = CrdsGossipPush::default();
let timeout = push.msg_timeout; let timeout = push.msg_timeout;
let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
@ -582,7 +593,7 @@ mod test {
ci.wallclock = timeout + 1; ci.wallclock = timeout + 1;
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())); let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone()));
assert_eq!( assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0), push.process_push_message(&crds, &Pubkey::default(), vec![value], 0),
[Err(CrdsGossipError::PushMessageTimeout)], [Err(CrdsGossipError::PushMessageTimeout)],
); );
@ -590,13 +601,13 @@ mod test {
ci.wallclock = 0; ci.wallclock = 0;
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci)); let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
assert_eq!( assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), vec![value], timeout + 1), push.process_push_message(&crds, &Pubkey::default(), vec![value], timeout + 1),
[Err(CrdsGossipError::PushMessageTimeout)] [Err(CrdsGossipError::PushMessageTimeout)]
); );
} }
#[test] #[test]
fn test_process_push_update() { fn test_process_push_update() {
let mut crds = Crds::default(); let crds = RwLock::<Crds>::default();
let push = CrdsGossipPush::default(); let push = CrdsGossipPush::default();
let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
let origin = ci.id; let origin = ci.id;
@ -605,7 +616,7 @@ mod test {
// push a new message // push a new message
assert_eq!( assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), vec![value_old], 0), push.process_push_message(&crds, &Pubkey::default(), vec![value_old], 0),
[Ok(origin)], [Ok(origin)],
); );
@ -613,7 +624,7 @@ mod test {
ci.wallclock = 1; ci.wallclock = 1;
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci)); let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
assert_eq!( assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0), push.process_push_message(&crds, &Pubkey::default(), vec![value], 0),
[Ok(origin)], [Ok(origin)],
); );
} }
@ -636,6 +647,7 @@ mod test {
))); )));
assert_eq!(crds.insert(value1.clone(), now), Ok(())); assert_eq!(crds.insert(value1.clone(), now), Ok(()));
let crds = RwLock::new(crds);
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
let active_set = push.active_set.read().unwrap(); let active_set = push.active_set.read().unwrap();
@ -646,7 +658,7 @@ mod test {
))); )));
assert!(active_set.get(&value2.label().pubkey()).is_none()); assert!(active_set.get(&value2.label().pubkey()).is_none());
drop(active_set); drop(active_set);
assert_eq!(crds.insert(value2.clone(), now), Ok(())); assert_eq!(crds.write().unwrap().insert(value2.clone(), now), Ok(()));
for _ in 0..30 { for _ in 0..30 {
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
let active_set = push.active_set.read().unwrap(); let active_set = push.active_set.read().unwrap();
@ -662,7 +674,7 @@ mod test {
let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo( let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(
ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0), ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0),
)); ));
assert_eq!(crds.insert(value2.clone(), now), Ok(())); assert_eq!(crds.write().unwrap().insert(value2.clone(), now), Ok(()));
} }
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
assert_eq!(push.active_set.read().unwrap().len(), push.num_active); assert_eq!(push.active_set.read().unwrap().len(), push.num_active);
@ -684,14 +696,12 @@ mod test {
stakes.insert(id, i * 100); stakes.insert(id, i * 100);
push.last_pushed_to.write().unwrap().put(id, time); push.last_pushed_to.write().unwrap().put(id, time);
} }
let crds = RwLock::new(crds);
let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes, None); let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes, None);
assert!(!options.is_empty()); assert!(!options.is_empty());
options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap()); options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap());
// check that the highest stake holder is also the heaviest weighted. // check that the highest stake holder is also the heaviest weighted.
assert_eq!( assert_eq!(stakes[&options[0].1], 10_000_u64);
*stakes.get(&options.get(0).unwrap().1.id).unwrap(),
10_000_u64
);
} }
#[test] #[test]
@ -732,23 +742,20 @@ mod test {
crds.insert(spy.clone(), now).unwrap(); crds.insert(spy.clone(), now).unwrap();
crds.insert(node_123.clone(), now).unwrap(); crds.insert(node_123.clone(), now).unwrap();
crds.insert(node_456, now).unwrap(); crds.insert(node_456, now).unwrap();
let crds = RwLock::new(crds);
// shred version 123 should ignore nodes with versions 0 and 456 // shred version 123 should ignore nodes with versions 0 and 456
let options = node let options = node
.push_options(&crds, &me.label().pubkey(), 123, &stakes, None) .push_options(&crds, &me.label().pubkey(), 123, &stakes, None)
.iter() .iter()
.map(|(_, c)| c.id) .map(|(_, pk)| *pk)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
assert_eq!(options.len(), 1); assert_eq!(options.len(), 1);
assert!(!options.contains(&spy.pubkey())); assert!(!options.contains(&spy.pubkey()));
assert!(options.contains(&node_123.pubkey())); assert!(options.contains(&node_123.pubkey()));
// spy nodes should not push to people on different shred versions // spy nodes should not push to people on different shred versions
let options = node let options = node.push_options(&crds, &spy.label().pubkey(), 0, &stakes, None);
.push_options(&crds, &spy.label().pubkey(), 0, &stakes, None)
.iter()
.map(|(_, c)| c.id)
.collect::<Vec<_>>();
assert!(options.is_empty()); assert!(options.is_empty());
} }
@ -773,6 +780,7 @@ mod test {
crds.insert(me.clone(), 0).unwrap(); crds.insert(me.clone(), 0).unwrap();
crds.insert(node_123.clone(), now).unwrap(); crds.insert(node_123.clone(), now).unwrap();
let crds = RwLock::new(crds);
// Unknown pubkey in gossip_validators -- will push to nobody // Unknown pubkey in gossip_validators -- will push to nobody
let mut gossip_validators = HashSet::new(); let mut gossip_validators = HashSet::new();
@ -808,7 +816,7 @@ mod test {
); );
assert_eq!(options.len(), 1); assert_eq!(options.len(), 1);
assert_eq!(options[0].1.id, node_123.pubkey()); assert_eq!(options[0].1, node_123.pubkey());
} }
#[test] #[test]
@ -821,6 +829,7 @@ mod test {
0, 0,
))); )));
assert_eq!(crds.insert(peer.clone(), now), Ok(())); assert_eq!(crds.insert(peer.clone(), now), Ok(()));
let crds = RwLock::new(crds);
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
@ -831,7 +840,7 @@ mod test {
expected.insert(peer.label().pubkey(), vec![new_msg.clone()]); expected.insert(peer.label().pubkey(), vec![new_msg.clone()]);
let origin = new_msg.pubkey(); let origin = new_msg.pubkey();
assert_eq!( assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), vec![new_msg], 0), push.process_push_message(&crds, &Pubkey::default(), vec![new_msg], 0),
[Ok(origin)] [Ok(origin)]
); );
assert_eq!(push.active_set.read().unwrap().len(), 1); assert_eq!(push.active_set.read().unwrap().len(), 1);
@ -854,8 +863,9 @@ mod test {
let origin: Vec<_> = peers.iter().map(|node| node.pubkey()).collect(); let origin: Vec<_> = peers.iter().map(|node| node.pubkey()).collect();
assert_eq!(crds.insert(peers[0].clone(), now), Ok(())); assert_eq!(crds.insert(peers[0].clone(), now), Ok(()));
assert_eq!(crds.insert(peers[1].clone(), now), Ok(())); assert_eq!(crds.insert(peers[1].clone(), now), Ok(()));
let crds = RwLock::new(crds);
assert_eq!( assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), vec![peers[2].clone()], now), push.process_push_message(&crds, &Pubkey::default(), vec![peers[2].clone()], now),
[Ok(origin[2])], [Ok(origin[2])],
); );
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
@ -880,6 +890,7 @@ mod test {
0, 0,
))); )));
assert_eq!(crds.insert(peer.clone(), 0), Ok(())); assert_eq!(crds.insert(peer.clone(), 0), Ok(()));
let crds = RwLock::new(crds);
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
@ -889,7 +900,7 @@ mod test {
let expected = HashMap::new(); let expected = HashMap::new();
let origin = new_msg.pubkey(); let origin = new_msg.pubkey();
assert_eq!( assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), vec![new_msg.clone()], 0), push.process_push_message(&crds, &Pubkey::default(), vec![new_msg.clone()], 0),
[Ok(origin)], [Ok(origin)],
); );
push.process_prune_msg( push.process_prune_msg(
@ -908,6 +919,7 @@ mod test {
0, 0,
))); )));
assert_eq!(crds.insert(peer, 0), Ok(())); assert_eq!(crds.insert(peer, 0), Ok(()));
let crds = RwLock::new(crds);
push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
@ -916,7 +928,7 @@ mod test {
let expected = HashMap::new(); let expected = HashMap::new();
let origin = new_msg.pubkey(); let origin = new_msg.pubkey();
assert_eq!( assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), vec![new_msg], 1), push.process_push_message(&crds, &Pubkey::default(), vec![new_msg], 1),
[Ok(origin)], [Ok(origin)],
); );
assert_eq!(push.new_push_messages(&crds, 0), expected); assert_eq!(push.new_push_messages(&crds, 0), expected);
@ -924,7 +936,7 @@ mod test {
#[test] #[test]
fn test_purge_old_received_cache() { fn test_purge_old_received_cache() {
let mut crds = Crds::default(); let crds = RwLock::<Crds>::default();
let push = CrdsGossipPush::default(); let push = CrdsGossipPush::default();
let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ci.wallclock = 0; ci.wallclock = 0;
@ -932,14 +944,14 @@ mod test {
let label = value.label(); let label = value.label();
// push a new message // push a new message
assert_eq!( assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), vec![value.clone()], 0), push.process_push_message(&crds, &Pubkey::default(), vec![value.clone()], 0),
[Ok(label.pubkey())] [Ok(label.pubkey())]
); );
assert_eq!(crds.get(&label).unwrap().value, value); assert_eq!(crds.write().unwrap().get(&label).unwrap().value, value);
// push it again // push it again
assert_eq!( assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), vec![value.clone()], 0), push.process_push_message(&crds, &Pubkey::default(), vec![value.clone()], 0),
[Err(CrdsGossipError::PushMessageOldVersion)], [Err(CrdsGossipError::PushMessageOldVersion)],
); );
@ -948,7 +960,7 @@ mod test {
// push it again // push it again
assert_eq!( assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0), push.process_push_message(&crds, &Pubkey::default(), vec![value], 0),
[Err(CrdsGossipError::PushMessageOldVersion)], [Err(CrdsGossipError::PushMessageOldVersion)],
); );
} }

View File

@ -33,24 +33,20 @@ use {
struct Node { struct Node {
keypair: Arc<Keypair>, keypair: Arc<Keypair>,
contact_info: ContactInfo, contact_info: ContactInfo,
gossip: Arc<Mutex<CrdsGossip>>, gossip: Arc<CrdsGossip>,
ping_cache: Arc<Mutex<PingCache>>, ping_cache: Arc<Mutex<PingCache>>,
stake: u64, stake: u64,
} }
impl Node { impl Node {
fn new( fn new(keypair: Arc<Keypair>, contact_info: ContactInfo, gossip: Arc<CrdsGossip>) -> Self {
keypair: Arc<Keypair>,
contact_info: ContactInfo,
gossip: Arc<Mutex<CrdsGossip>>,
) -> Self {
Self::staked(keypair, contact_info, gossip, 0) Self::staked(keypair, contact_info, gossip, 0)
} }
fn staked( fn staked(
keypair: Arc<Keypair>, keypair: Arc<Keypair>,
contact_info: ContactInfo, contact_info: ContactInfo,
gossip: Arc<Mutex<CrdsGossip>>, gossip: Arc<CrdsGossip>,
stake: u64, stake: u64,
) -> Self { ) -> Self {
let ping_cache = Arc::new(Mutex::new(PingCache::new( let ping_cache = Arc::new(Mutex::new(PingCache::new(
@ -67,14 +63,6 @@ impl Node {
} }
} }
impl Deref for Node {
type Target = Arc<Mutex<CrdsGossip>>;
fn deref(&self) -> &Self::Target {
&self.gossip
}
}
struct Network { struct Network {
nodes: HashMap<Pubkey, Node>, nodes: HashMap<Pubkey, Node>,
stake_pruned: u64, stake_pruned: u64,
@ -116,17 +104,24 @@ fn star_network_create(num: usize) -> Network {
let node_keypair = Arc::new(Keypair::new()); let node_keypair = Arc::new(Keypair::new());
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
let mut node = CrdsGossip::default(); let node = CrdsGossip::default();
node.crds.insert(new.clone(), timestamp()).unwrap(); {
node.crds.insert(entry.clone(), timestamp()).unwrap(); let mut node_crds = node.crds.write().unwrap();
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); node_crds.insert(new.clone(), timestamp()).unwrap();
node_crds.insert(entry.clone(), timestamp()).unwrap();
}
let node = Node::new(node_keypair, contact_info, Arc::new(node));
(new.label().pubkey(), node) (new.label().pubkey(), node)
}) })
.collect(); .collect();
let mut node = CrdsGossip::default(); let node = CrdsGossip::default();
let id = entry.label().pubkey(); let id = entry.label().pubkey();
node.crds.insert(entry, timestamp()).unwrap(); node.crds
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); .write()
.unwrap()
.insert(entry, timestamp())
.unwrap();
let node = Node::new(node_keypair, contact_info, Arc::new(node));
network.insert(id, node); network.insert(id, node);
Network::new(network) Network::new(network)
} }
@ -135,22 +130,36 @@ fn rstar_network_create(num: usize) -> Network {
let node_keypair = Arc::new(Keypair::new()); let node_keypair = Arc::new(Keypair::new());
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
let mut origin = CrdsGossip::default(); let origin = CrdsGossip::default();
let id = entry.label().pubkey(); let id = entry.label().pubkey();
origin.crds.insert(entry, timestamp()).unwrap(); origin
.crds
.write()
.unwrap()
.insert(entry, timestamp())
.unwrap();
let mut network: HashMap<_, _> = (1..num) let mut network: HashMap<_, _> = (1..num)
.map(|_| { .map(|_| {
let node_keypair = Arc::new(Keypair::new()); let node_keypair = Arc::new(Keypair::new());
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
let mut node = CrdsGossip::default(); let node = CrdsGossip::default();
node.crds.insert(new.clone(), timestamp()).unwrap(); node.crds
origin.crds.insert(new.clone(), timestamp()).unwrap(); .write()
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); .unwrap()
.insert(new.clone(), timestamp())
.unwrap();
origin
.crds
.write()
.unwrap()
.insert(new.clone(), timestamp())
.unwrap();
let node = Node::new(node_keypair, contact_info, Arc::new(node));
(new.label().pubkey(), node) (new.label().pubkey(), node)
}) })
.collect(); .collect();
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(origin))); let node = Node::new(node_keypair, contact_info, Arc::new(origin));
network.insert(id, node); network.insert(id, node);
Network::new(network) Network::new(network)
} }
@ -161,9 +170,13 @@ fn ring_network_create(num: usize) -> Network {
let node_keypair = Arc::new(Keypair::new()); let node_keypair = Arc::new(Keypair::new());
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
let mut node = CrdsGossip::default(); let node = CrdsGossip::default();
node.crds.insert(new.clone(), timestamp()).unwrap(); node.crds
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); .write()
.unwrap()
.insert(new.clone(), timestamp())
.unwrap();
let node = Node::new(node_keypair, contact_info, Arc::new(node));
(new.label().pubkey(), node) (new.label().pubkey(), node)
}) })
.collect(); .collect();
@ -173,15 +186,12 @@ fn ring_network_create(num: usize) -> Network {
let start = &network[&keys[k]]; let start = &network[&keys[k]];
let start_id = keys[k]; let start_id = keys[k];
let label = CrdsValueLabel::ContactInfo(start_id); let label = CrdsValueLabel::ContactInfo(start_id);
let gossip = start.gossip.lock().unwrap(); let gossip_crds = start.gossip.crds.read().unwrap();
gossip.crds.get(&label).unwrap().value.clone() gossip_crds.get(&label).unwrap().value.clone()
}; };
let end = network.get_mut(&keys[(k + 1) % keys.len()]).unwrap(); let end = network.get_mut(&keys[(k + 1) % keys.len()]).unwrap();
end.lock() let mut end_crds = end.gossip.crds.write().unwrap();
.unwrap() end_crds.insert(start_info, timestamp()).unwrap();
.crds
.insert(start_info, timestamp())
.unwrap();
} }
Network::new(network) Network::new(network)
} }
@ -193,14 +203,13 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network {
let node_keypair = Arc::new(Keypair::new()); let node_keypair = Arc::new(Keypair::new());
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
let mut node = CrdsGossip::default(); let node = CrdsGossip::default();
node.crds.insert(new.clone(), timestamp()).unwrap(); node.crds
let node = Node::staked( .write()
node_keypair, .unwrap()
contact_info, .insert(new.clone(), timestamp())
Arc::new(Mutex::new(node)), .unwrap();
stakes[n], let node = Node::staked(node_keypair, contact_info, Arc::new(node), stakes[n]);
);
(new.label().pubkey(), node) (new.label().pubkey(), node)
}) })
.collect(); .collect();
@ -209,17 +218,18 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network {
let start_entries: Vec<_> = keys let start_entries: Vec<_> = keys
.iter() .iter()
.map(|k| { .map(|k| {
let start = &network[k].lock().unwrap(); let start = &network[k];
let start_label = CrdsValueLabel::ContactInfo(*k); let start_label = CrdsValueLabel::ContactInfo(*k);
start.crds.get(&start_label).unwrap().value.clone() let gossip_crds = start.gossip.crds.read().unwrap();
gossip_crds.get(&start_label).unwrap().value.clone()
}) })
.collect(); .collect();
for (end_pubkey, end) in network.iter_mut() { for (end_pubkey, end) in network.iter_mut() {
let mut end_crds = end.gossip.crds.write().unwrap();
for k in 0..keys.len() { for k in 0..keys.len() {
let mut end = end.lock().unwrap();
if keys[k] != *end_pubkey { if keys[k] != *end_pubkey {
let start_info = start_entries[k].clone(); let start_info = start_entries[k].clone();
end.crds.insert(start_info, timestamp()).unwrap(); end_crds.insert(start_info, timestamp()).unwrap();
} }
} }
} }
@ -247,7 +257,7 @@ fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_conver
let network_values: Vec<Node> = network.values().cloned().collect(); let network_values: Vec<Node> = network.values().cloned().collect();
network_values.par_iter().for_each(|node| { network_values.par_iter().for_each(|node| {
let node_pubkey = node.keypair.pubkey(); let node_pubkey = node.keypair.pubkey();
node.lock().unwrap().refresh_push_active_set( node.gossip.refresh_push_active_set(
&node_pubkey, &node_pubkey,
0, // shred version 0, // shred version
&HashMap::new(), // stakes &HashMap::new(), // stakes
@ -262,14 +272,14 @@ fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_conver
let now = (start * 100) as u64; let now = (start * 100) as u64;
ts += 1000; ts += 1000;
// push a message to the network // push a message to the network
network_values.par_iter().for_each(|locked_node| { network_values.par_iter().for_each(|node| {
let node_pubkey = locked_node.keypair.pubkey(); let node_pubkey = node.keypair.pubkey();
let node = &mut locked_node.lock().unwrap(); let mut m = {
let label = CrdsValueLabel::ContactInfo(node_pubkey); let node_crds = node.gossip.crds.read().unwrap();
let entry = node.crds.get(&label).unwrap(); node_crds.get_contact_info(node_pubkey).cloned().unwrap()
let mut m = entry.value.contact_info().cloned().unwrap(); };
m.wallclock = now; m.wallclock = now;
node.process_push_message( node.gossip.process_push_message(
&Pubkey::default(), &Pubkey::default(),
vec![CrdsValue::new_unsigned(CrdsData::ContactInfo(m))], vec![CrdsValue::new_unsigned(CrdsData::ContactInfo(m))],
now, now,
@ -321,14 +331,13 @@ fn network_run_push(
.par_iter() .par_iter()
.map(|node| { .map(|node| {
let node_pubkey = node.keypair.pubkey(); let node_pubkey = node.keypair.pubkey();
let mut node_lock = node.lock().unwrap(); let timeouts = node.gossip.make_timeouts(
let timeouts = node_lock.make_timeouts(
node_pubkey, node_pubkey,
&HashMap::default(), // stakes &HashMap::default(), // stakes
Duration::from_millis(node_lock.pull.crds_timeout), Duration::from_millis(node.gossip.pull.crds_timeout),
); );
node_lock.purge(&node_pubkey, thread_pool, now, &timeouts); node.gossip.purge(&node_pubkey, thread_pool, now, &timeouts);
(node_pubkey, node_lock.new_push_messages(vec![], now)) (node_pubkey, node.gossip.new_push_messages(vec![], now))
}) })
.collect(); .collect();
let transfered: Vec<_> = requests let transfered: Vec<_> = requests
@ -344,8 +353,7 @@ fn network_run_push(
let origins: HashSet<_> = network let origins: HashSet<_> = network
.get(&to) .get(&to)
.unwrap() .unwrap()
.lock() .gossip
.unwrap()
.process_push_message(&from, msgs.clone(), now) .process_push_message(&from, msgs.clone(), now)
.into_iter() .into_iter()
.collect(); .collect();
@ -353,11 +361,8 @@ fn network_run_push(
.get(&to) .get(&to)
.map(|node| { .map(|node| {
let node_pubkey = node.keypair.pubkey(); let node_pubkey = node.keypair.pubkey();
node.lock().unwrap().prune_received_cache( node.gossip
&node_pubkey, .prune_received_cache(&node_pubkey, origins, &stakes)
origins,
&stakes,
)
}) })
.unwrap(); .unwrap();
@ -374,18 +379,18 @@ fn network_run_push(
.get(&from) .get(&from)
.map(|node| { .map(|node| {
let node_pubkey = node.keypair.pubkey(); let node_pubkey = node.keypair.pubkey();
let node = node.lock().unwrap();
let destination = node_pubkey; let destination = node_pubkey;
let now = timestamp(); let now = timestamp();
node.process_prune_msg( node.gossip
&node_pubkey, .process_prune_msg(
&to, &node_pubkey,
&destination, &to,
&prune_keys, &destination,
now, &prune_keys,
now, now,
) now,
.unwrap() )
.unwrap()
}) })
.unwrap(); .unwrap();
} }
@ -410,7 +415,7 @@ fn network_run_push(
if now % CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS == 0 && now > 0 { if now % CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS == 0 && now > 0 {
network_values.par_iter().for_each(|node| { network_values.par_iter().for_each(|node| {
let node_pubkey = node.keypair.pubkey(); let node_pubkey = node.keypair.pubkey();
node.lock().unwrap().refresh_push_active_set( node.gossip.refresh_push_active_set(
&node_pubkey, &node_pubkey,
0, // shred version 0, // shred version
&HashMap::new(), // stakes &HashMap::new(), // stakes
@ -420,10 +425,7 @@ fn network_run_push(
} }
total = network_values total = network_values
.par_iter() .par_iter()
.map(|node| { .map(|node| node.gossip.push.num_pending(&node.gossip.crds))
let gossip = node.gossip.lock().unwrap();
gossip.push.num_pending(&gossip.crds)
})
.sum(); .sum();
trace!( trace!(
"network_run_push_{}: now: {} queue: {} bytes: {} num_msgs: {} prunes: {} stake_pruned: {} delivered: {}", "network_run_push_{}: now: {} queue: {} bytes: {} num_msgs: {} prunes: {} stake_pruned: {} delivered: {}",
@ -477,8 +479,7 @@ fn network_run_pull(
.filter_map(|from| { .filter_map(|from| {
let mut pings = Vec::new(); let mut pings = Vec::new();
let (peer, filters) = from let (peer, filters) = from
.lock() .gossip
.unwrap()
.new_pull_request( .new_pull_request(
thread_pool, thread_pool,
from.keypair.deref(), from.keypair.deref(),
@ -492,9 +493,9 @@ fn network_run_pull(
) )
.ok()?; .ok()?;
let from_pubkey = from.keypair.pubkey(); let from_pubkey = from.keypair.pubkey();
let gossip = from.gossip.lock().unwrap();
let label = CrdsValueLabel::ContactInfo(from_pubkey); let label = CrdsValueLabel::ContactInfo(from_pubkey);
let self_info = gossip.crds.get(&label).unwrap().value.clone(); let gossip_crds = from.gossip.crds.read().unwrap();
let self_info = gossip_crds.get(&label).unwrap().value.clone();
Some((peer.id, filters, self_info)) Some((peer.id, filters, self_info))
}) })
.collect() .collect()
@ -520,8 +521,7 @@ fn network_run_pull(
.get(&to) .get(&to)
.map(|node| { .map(|node| {
let rsp = node let rsp = node
.lock() .gossip
.unwrap()
.generate_pull_responses( .generate_pull_responses(
&filters, &filters,
/*output_size_limit=*/ usize::MAX, /*output_size_limit=*/ usize::MAX,
@ -530,7 +530,7 @@ fn network_run_pull(
.into_iter() .into_iter()
.flatten() .flatten()
.collect(); .collect();
node.lock().unwrap().process_pull_requests( node.gossip.process_pull_requests(
filters.into_iter().map(|(caller, _)| caller), filters.into_iter().map(|(caller, _)| caller),
now, now,
); );
@ -540,12 +540,12 @@ fn network_run_pull(
bytes += serialized_size(&rsp).unwrap() as usize; bytes += serialized_size(&rsp).unwrap() as usize;
msgs += rsp.len(); msgs += rsp.len();
if let Some(node) = network.get(&from) { if let Some(node) = network.get(&from) {
let mut node = node.lock().unwrap(); node.gossip.mark_pull_request_creation_time(from, now);
node.mark_pull_request_creation_time(from, now);
let mut stats = ProcessPullStats::default(); let mut stats = ProcessPullStats::default();
let (vers, vers_expired_timeout, failed_inserts) = let (vers, vers_expired_timeout, failed_inserts) = node
node.filter_pull_responses(&timeouts, rsp, now, &mut stats); .gossip
node.process_pull_responses( .filter_pull_responses(&timeouts, rsp, now, &mut stats);
node.gossip.process_pull_responses(
&from, &from,
vers, vers,
vers_expired_timeout, vers_expired_timeout,
@ -566,7 +566,7 @@ fn network_run_pull(
} }
let total: usize = network_values let total: usize = network_values
.par_iter() .par_iter()
.map(|v| v.lock().unwrap().crds.len()) .map(|v| v.gossip.crds.read().unwrap().len())
.sum(); .sum();
convergance = total as f64 / ((num * num) as f64); convergance = total as f64 / ((num * num) as f64);
if convergance > max_convergance { if convergance > max_convergance {
@ -692,12 +692,14 @@ fn test_star_network_large_push() {
} }
#[test] #[test]
fn test_prune_errors() { fn test_prune_errors() {
let mut crds_gossip = CrdsGossip::default(); let crds_gossip = CrdsGossip::default();
let id = Pubkey::new(&[0; 32]); let id = Pubkey::new(&[0; 32]);
let ci = ContactInfo::new_localhost(&Pubkey::new(&[1; 32]), 0); let ci = ContactInfo::new_localhost(&Pubkey::new(&[1; 32]), 0);
let prune_pubkey = Pubkey::new(&[2; 32]); let prune_pubkey = Pubkey::new(&[2; 32]);
crds_gossip crds_gossip
.crds .crds
.write()
.unwrap()
.insert( .insert(
CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())), CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())),
0, 0,

View File

@ -321,11 +321,10 @@ pub fn cluster_info_scale() {
.iter() .iter()
.filter(|v| v.message.account_keys == tx.message.account_keys) .filter(|v| v.message.account_keys == tx.message.account_keys)
.count(); .count();
let gossip = node.gossip.read().unwrap(); num_old += node.gossip.push.num_old.load(Ordering::Relaxed);
num_old += gossip.push.num_old.load(Ordering::Relaxed); num_push_total += node.gossip.push.num_total.load(Ordering::Relaxed);
num_push_total += gossip.push.num_total.load(Ordering::Relaxed); num_pushes += node.gossip.push.num_pushes.load(Ordering::Relaxed);
num_pushes += gossip.push.num_pushes.load(Ordering::Relaxed); num_pulls += node.gossip.pull.num_pulls.load(Ordering::Relaxed);
num_pulls += gossip.pull.num_pulls.load(Ordering::Relaxed);
if has_tx == 0 { if has_tx == 0 {
not_done += 1; not_done += 1;
} }
@ -348,11 +347,10 @@ pub fn cluster_info_scale() {
); );
sleep(Duration::from_millis(200)); sleep(Duration::from_millis(200));
for (node, _, _) in nodes.iter() { for (node, _, _) in nodes.iter() {
let gossip = node.gossip.read().unwrap(); node.gossip.push.num_old.store(0, Ordering::Relaxed);
gossip.push.num_old.store(0, Ordering::Relaxed); node.gossip.push.num_total.store(0, Ordering::Relaxed);
gossip.push.num_total.store(0, Ordering::Relaxed); node.gossip.push.num_pushes.store(0, Ordering::Relaxed);
gossip.push.num_pushes.store(0, Ordering::Relaxed); node.gossip.pull.num_pulls.store(0, Ordering::Relaxed);
gossip.pull.num_pulls.store(0, Ordering::Relaxed);
} }
} }

View File

@ -745,9 +745,9 @@ mod tests {
// This node is ahead of the trusted validators // This node is ahead of the trusted validators
cluster_info cluster_info
.gossip .gossip
.crds
.write() .write()
.unwrap() .unwrap()
.crds
.insert( .insert(
CrdsValue::new_unsigned(CrdsData::AccountsHashes(SnapshotHash::new( CrdsValue::new_unsigned(CrdsData::AccountsHashes(SnapshotHash::new(
trusted_validators[0], trusted_validators[0],
@ -765,9 +765,9 @@ mod tests {
// Node is slightly behind the trusted validators // Node is slightly behind the trusted validators
cluster_info cluster_info
.gossip .gossip
.crds
.write() .write()
.unwrap() .unwrap()
.crds
.insert( .insert(
CrdsValue::new_unsigned(CrdsData::AccountsHashes(SnapshotHash::new( CrdsValue::new_unsigned(CrdsData::AccountsHashes(SnapshotHash::new(
trusted_validators[1], trusted_validators[1],
@ -781,9 +781,9 @@ mod tests {
// Node is far behind the trusted validators // Node is far behind the trusted validators
cluster_info cluster_info
.gossip .gossip
.crds
.write() .write()
.unwrap() .unwrap()
.crds
.insert( .insert(
CrdsValue::new_unsigned(CrdsData::AccountsHashes(SnapshotHash::new( CrdsValue::new_unsigned(CrdsData::AccountsHashes(SnapshotHash::new(
trusted_validators[2], trusted_validators[2],