diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs index c19f841b86..89bd077f47 100644 --- a/core/src/cluster_nodes.rs +++ b/core/src/cluster_nodes.rs @@ -296,12 +296,12 @@ mod tests { let cluster_info = ClusterInfo::new_with_invalid_keypair(this_node); { let now = timestamp(); - let mut gossip = cluster_info.gossip.write().unwrap(); + let mut gossip_crds = cluster_info.gossip.crds.write().unwrap(); // First node is pushed to crds table by ClusterInfo constructor. for node in nodes.iter().skip(1) { let node = CrdsData::ContactInfo(node.clone()); let node = CrdsValue::new_unsigned(node); - assert_eq!(gossip.crds.insert(node, now), Ok(())); + assert_eq!(gossip_crds.insert(node, now), Ok(())); } } (nodes, stakes, cluster_info) diff --git a/core/src/cluster_slots_service.rs b/core/src/cluster_slots_service.rs index 88da52ca33..71405db18e 100644 --- a/core/src/cluster_slots_service.rs +++ b/core/src/cluster_slots_service.rs @@ -192,8 +192,8 @@ mod test { cluster_info.flush_push_queue(); let lowest = { let label = CrdsValueLabel::LowestSlot(pubkey); - let gossip = cluster_info.gossip.read().unwrap(); - let entry = gossip.crds.get(&label).unwrap(); + let gossip_crds = cluster_info.gossip.crds.read().unwrap(); + let entry = gossip_crds.get(&label).unwrap(); entry.value.lowest_slot().unwrap().clone() }; assert_eq!(lowest.lowest, 5); diff --git a/gossip/benches/crds_gossip_pull.rs b/gossip/benches/crds_gossip_pull.rs index e82e9b5733..3f44ddd72e 100644 --- a/gossip/benches/crds_gossip_pull.rs +++ b/gossip/benches/crds_gossip_pull.rs @@ -12,6 +12,7 @@ use { crds_value::CrdsValue, }, solana_sdk::hash, + std::sync::RwLock, test::Bencher, }; @@ -45,6 +46,7 @@ fn bench_build_crds_filters(bencher: &mut Bencher) { } } assert_eq!(num_inserts, 90_000); + let crds = RwLock::new(crds); bencher.iter(|| { let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE); assert_eq!(filters.len(), 128); diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 6731266744..b8d4df85a7 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -81,7 +81,7 @@ use { sync::{ atomic::{AtomicBool, Ordering}, mpsc::{Receiver, RecvTimeoutError, Sender}, - {Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}, + {Arc, Mutex, RwLock, RwLockReadGuard}, }, thread::{sleep, Builder, JoinHandle}, time::{Duration, Instant}, @@ -141,7 +141,7 @@ pub enum ClusterInfoError { pub struct ClusterInfo { /// The network - pub gossip: RwLock, + pub gossip: CrdsGossip, /// set the keypair that will be used to sign crds values generated. It is unset only in tests. keypair: RwLock>, /// Network entrypoints @@ -398,9 +398,9 @@ impl ClusterInfo { pub fn new(contact_info: ContactInfo, keypair: Arc) -> Self { let id = contact_info.id; let me = Self { - gossip: RwLock::new(CrdsGossip::default()), + gossip: CrdsGossip::default(), keypair: RwLock::new(keypair), - entrypoints: RwLock::new(vec![]), + entrypoints: RwLock::default(), outbound_budget: DataBudget::default(), my_contact_info: RwLock::new(contact_info), ping_cache: Mutex::new(PingCache::new( @@ -422,11 +422,10 @@ impl ClusterInfo { // Should only be used by tests and simulations pub fn clone_with_id(&self, new_id: &Pubkey) -> Self { - let gossip = self.gossip.read().unwrap().mock_clone(); let mut my_contact_info = self.my_contact_info.read().unwrap().clone(); my_contact_info.id = *new_id; ClusterInfo { - gossip: RwLock::new(gossip), + gossip: self.gossip.mock_clone(), keypair: RwLock::new(self.keypair.read().unwrap().clone()), entrypoints: RwLock::new(self.entrypoints.read().unwrap().clone()), outbound_budget: self.outbound_budget.clone_non_atomic(), @@ -474,18 +473,14 @@ impl ClusterInfo { shred_version, .. } = *self.my_contact_info.read().unwrap(); - self.gossip.write().unwrap().refresh_push_active_set( - &self_pubkey, - shred_version, - stakes, - gossip_validators, - ); + self.gossip + .refresh_push_active_set(&self_pubkey, shred_version, stakes, gossip_validators); } // TODO kill insert_info, only used by tests pub fn insert_info(&self, contact_info: ContactInfo) { let value = CrdsValue::new_signed(CrdsData::ContactInfo(contact_info), &self.keypair()); - let _ = self.gossip.write().unwrap().crds.insert(value, timestamp()); + let _ = self.gossip.crds.write().unwrap().insert(value, timestamp()); } pub fn set_entrypoint(&self, entrypoint: ContactInfo) { @@ -498,7 +493,6 @@ impl ClusterInfo { pub fn save_contact_info(&self) { let nodes = { - let gossip = self.gossip.read().unwrap(); let entrypoint_gossip_addrs = self .entrypoints .read() @@ -507,8 +501,8 @@ impl ClusterInfo { .map(|contact_info| contact_info.gossip) .collect::>(); let self_pubkey = self.id(); - gossip - .crds + let gossip_crds = self.gossip.crds.read().unwrap(); + gossip_crds .get_nodes() .filter_map(|v| { // Don't save: @@ -599,9 +593,9 @@ impl ClusterInfo { filename.display() ); let now = timestamp(); - let mut gossip = self.gossip.write().unwrap(); + let mut gossip_crds = self.gossip.crds.write().unwrap(); for node in nodes { - if let Err(err) = gossip.crds.insert(node, now) { + if let Err(err) = gossip_crds.insert(node, now) { warn!("crds insert failed {:?}", err); } } @@ -632,23 +626,17 @@ impl ClusterInfo { where F: FnOnce(&ContactInfo) -> Y, { - let label = CrdsValueLabel::ContactInfo(*id); - let gossip = self.gossip.read().unwrap(); - let entry = gossip.crds.get(&label)?; - Some(map(entry.value.contact_info()?)) + let gossip_crds = self.gossip.crds.read().unwrap(); + gossip_crds.get_contact_info(*id).map(map) } pub fn lookup_contact_info_by_gossip_addr( &self, gossip_addr: &SocketAddr, ) -> Option { - self.gossip - .read() - .unwrap() - .crds - .get_nodes_contact_info() - .find(|peer| peer.gossip == *gossip_addr) - .cloned() + let gossip_crds = self.gossip.crds.read().unwrap(); + let mut nodes = gossip_crds.get_nodes_contact_info(); + nodes.find(|node| node.gossip == *gossip_addr).cloned() } pub fn my_contact_info(&self) -> ContactInfo { @@ -662,9 +650,9 @@ impl ClusterInfo { fn lookup_epoch_slots(&self, ix: EpochSlotsIndex) -> EpochSlots { let self_pubkey = self.id(); let label = CrdsValueLabel::EpochSlots(ix, self_pubkey); - let gossip = self.gossip.read().unwrap(); - let entry = gossip.crds.get(&label); - entry + let gossip_crds = self.gossip.crds.read().unwrap(); + gossip_crds + .get(&label) .and_then(|v| v.value.epoch_slots()) .cloned() .unwrap_or_else(|| EpochSlots::new(self_pubkey, timestamp())) @@ -821,12 +809,12 @@ impl ClusterInfo { ) } + // TODO: This has a race condition if called from more than one thread. pub fn push_lowest_slot(&self, min: Slot) { let self_pubkey = self.id(); let last = { - let gossip = self.gossip.read().unwrap(); - gossip - .crds + let gossip_crds = self.gossip.crds.read().unwrap(); + gossip_crds .get_lowest_slot(self_pubkey) .map(|x| x.lowest) .unwrap_or_default() @@ -849,12 +837,12 @@ impl ClusterInfo { pub fn push_epoch_slots(&self, mut update: &[Slot]) { let self_pubkey = self.id(); let current_slots: Vec<_> = { - let gossip = + let gossip_crds = self.time_gossip_read_lock("lookup_epoch_slots", &self.stats.epoch_slots_lookup); (0..crds_value::MAX_EPOCH_SLOTS) .filter_map(|ix| { let label = CrdsValueLabel::EpochSlots(ix, self_pubkey); - let epoch_slots = gossip.crds.get(&label)?.value.epoch_slots()?; + let epoch_slots = gossip_crds.get(&label)?.value.epoch_slots()?; let first_slot = epoch_slots.first_slot()?; Some((epoch_slots.wallclock, first_slot, ix)) }) @@ -903,10 +891,10 @@ impl ClusterInfo { epoch_slot_index += 1; reset = true; } - let mut gossip = self.gossip.write().unwrap(); + let mut gossip_crds = self.gossip.crds.write().unwrap(); let now = timestamp(); for entry in entries { - if let Err(err) = gossip.crds.insert(entry, now) { + if let Err(err) = gossip_crds.insert(entry, now) { error!("push_epoch_slots failed: {:?}", err); } } @@ -916,16 +904,8 @@ impl ClusterInfo { &'a self, label: &'static str, counter: &'a Counter, - ) -> TimedGuard<'a, RwLockReadGuard> { - TimedGuard::new(self.gossip.read().unwrap(), label, counter) - } - - fn time_gossip_write_lock<'a>( - &'a self, - label: &'static str, - counter: &'a Counter, - ) -> TimedGuard<'a, RwLockWriteGuard> { - TimedGuard::new(self.gossip.write().unwrap(), label, counter) + ) -> TimedGuard<'a, RwLockReadGuard> { + TimedGuard::new(self.gossip.crds.read().unwrap(), label, counter) } pub fn push_message(&self, message: CrdsValue) { @@ -968,8 +948,8 @@ impl ClusterInfo { let vote = Vote::new(self_pubkey, vote, now); let vote = CrdsData::Vote(vote_index, vote); let vote = CrdsValue::new_signed(vote, &self.keypair()); - let mut gossip = self.gossip.write().unwrap(); - if let Err(err) = gossip.crds.insert(vote, now) { + let mut gossip_crds = self.gossip.crds.write().unwrap(); + if let Err(err) = gossip_crds.insert(vote, now) { error!("push_vote failed: {:?}", err); } } @@ -999,12 +979,12 @@ impl ClusterInfo { } }; let vote_index = { - let gossip = + let gossip_crds = self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read); (0..MAX_LOCKOUT_HISTORY as u8) .filter_map(|ix| { let vote = CrdsValueLabel::Vote(ix, self_pubkey); - let vote = gossip.crds.get(&vote)?; + let vote = gossip_crds.get(&vote)?; num_crds_votes += 1; match &vote.value.data { CrdsData::Vote(_, vote) if should_evict_vote(vote) => { @@ -1024,11 +1004,11 @@ impl ClusterInfo { pub fn refresh_vote(&self, vote: Transaction, vote_slot: Slot) { let vote_index = { let self_pubkey = self.id(); - let gossip = + let gossip_crds = self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read); (0..MAX_LOCKOUT_HISTORY as u8).find(|ix| { let vote = CrdsValueLabel::Vote(*ix, self_pubkey); - if let Some(vote) = gossip.crds.get(&vote) { + if let Some(vote) = gossip_crds.get(&vote) { match &vote.value.data { CrdsData::Vote(_, prev_vote) => match prev_vote.slot() { Some(prev_vote_slot) => prev_vote_slot == vote_slot, @@ -1070,7 +1050,6 @@ impl ClusterInfo { pub fn get_votes(&self, cursor: &mut Cursor) -> (Vec, Vec) { let (labels, txs): (_, Vec<_>) = self .time_gossip_read_lock("get_votes", &self.stats.get_votes) - .crds .get_votes(cursor) .map(|vote| { let transaction = match &vote.value.data { @@ -1089,7 +1068,7 @@ impl ClusterInfo { shred: &Shred, other_payload: &[u8], ) -> Result<(), GossipError> { - self.gossip.write().unwrap().push_duplicate_shred( + self.gossip.push_duplicate_shred( &self.keypair(), shred, other_payload, @@ -1104,7 +1083,6 @@ impl ClusterInfo { F: FnOnce(&Vec<(Slot, Hash)>) -> Y, { self.time_gossip_read_lock("get_accounts_hash", &self.stats.get_accounts_hash) - .crds .get(&CrdsValueLabel::AccountsHashes(*pubkey)) .map(|x| &x.value.accounts_hash().unwrap().hashes) .map(map) @@ -1114,10 +1092,8 @@ impl ClusterInfo { where F: FnOnce(&Vec<(Slot, Hash)>) -> Y, { - self.gossip - .read() - .unwrap() - .crds + let gossip_crds = self.gossip.crds.read().unwrap(); + gossip_crds .get(&CrdsValueLabel::SnapshotHashes(*pubkey)) .map(|x| &x.value.snapshot_hash().unwrap().hashes) .map(map) @@ -1127,11 +1103,12 @@ impl ClusterInfo { /// Excludes entries from nodes with unkown or different shred version. pub fn get_epoch_slots(&self, cursor: &mut Cursor) -> Vec { let self_shred_version = Some(self.my_shred_version()); - let gossip = self.gossip.read().unwrap(); - let entries = gossip.crds.get_epoch_slots(cursor); - entries + let gossip_crds = self.gossip.crds.read().unwrap(); + gossip_crds + .get_epoch_slots(cursor) .filter(|entry| { - gossip.crds.get_shred_version(&entry.value.pubkey()) == self_shred_version + let origin = entry.value.pubkey(); + gossip_crds.get_shred_version(&origin) == self_shred_version }) .map(|entry| match &entry.value.data { CrdsData::EpochSlots(_, slots) => slots.clone(), @@ -1141,12 +1118,12 @@ impl ClusterInfo { } pub fn get_node_version(&self, pubkey: &Pubkey) -> Option { - let gossip = self.gossip.read().unwrap(); - let version = gossip.crds.get(&CrdsValueLabel::Version(*pubkey)); + let gossip_crds = self.gossip.crds.read().unwrap(); + let version = gossip_crds.get(&CrdsValueLabel::Version(*pubkey)); if let Some(version) = version.and_then(|v| v.value.version()) { return Some(version.version.clone()); } - let version = gossip.crds.get(&CrdsValueLabel::LegacyVersion(*pubkey))?; + let version = gossip_crds.get(&CrdsValueLabel::LegacyVersion(*pubkey))?; let version = version.value.legacy_version()?; Some(version.version.clone().into()) } @@ -1154,10 +1131,8 @@ impl ClusterInfo { /// all validators that have a valid rpc port regardless of `shred_version`. pub fn all_rpc_peers(&self) -> Vec { let self_pubkey = self.id(); - self.gossip - .read() - .unwrap() - .crds + let gossip_crds = self.gossip.crds.read().unwrap(); + gossip_crds .get_nodes_contact_info() .filter(|x| x.id != self_pubkey && ContactInfo::is_valid_address(&x.rpc)) .cloned() @@ -1166,10 +1141,8 @@ impl ClusterInfo { // All nodes in gossip (including spy nodes) and the last time we heard about them pub fn all_peers(&self) -> Vec<(ContactInfo, u64)> { - self.gossip - .read() - .unwrap() - .crds + let gossip_crds = self.gossip.crds.read().unwrap(); + gossip_crds .get_nodes() .map(|x| (x.value.contact_info().unwrap().clone(), x.local_timestamp)) .collect() @@ -1177,10 +1150,8 @@ impl ClusterInfo { pub fn gossip_peers(&self) -> Vec { let me = self.id(); - self.gossip - .read() - .unwrap() - .crds + let gossip_crds = self.gossip.crds.read().unwrap(); + gossip_crds .get_nodes_contact_info() // shred_version not considered for gossip peers (ie, spy nodes do not set shred_version) .filter(|x| x.id != me && ContactInfo::is_valid_address(&x.gossip)) @@ -1192,7 +1163,6 @@ impl ClusterInfo { pub fn all_tvu_peers(&self) -> Vec { let self_pubkey = self.id(); self.time_gossip_read_lock("all_tvu_peers", &self.stats.all_tvu_peers) - .crds .get_nodes_contact_info() .filter(|x| ContactInfo::is_valid_address(&x.tvu) && x.id != self_pubkey) .cloned() @@ -1204,7 +1174,6 @@ impl ClusterInfo { let self_pubkey = self.id(); let self_shred_version = self.my_shred_version(); self.time_gossip_read_lock("tvu_peers", &self.stats.tvu_peers) - .crds .get_nodes_contact_info() .filter(|node| { node.id != self_pubkey @@ -1223,12 +1192,12 @@ impl ClusterInfo { // node.shred_verion == self.my_shred_version() let nodes = self.tvu_peers(); let nodes = { - let gossip = self.gossip.read().unwrap(); + let gossip_crds = self.gossip.crds.read().unwrap(); nodes .into_iter() .filter(|node| { ContactInfo::is_valid_address(&node.serve_repair) - && match gossip.crds.get_lowest_slot(node.id) { + && match gossip_crds.get_lowest_slot(node.id) { None => true, // fallback to legacy behavior Some(lowest_slot) => lowest_slot.lowest <= slot, } @@ -1248,10 +1217,8 @@ impl ClusterInfo { /// compute broadcast table pub fn tpu_peers(&self) -> Vec { let self_pubkey = self.id(); - self.gossip - .read() - .unwrap() - .crds + let gossip_crds = self.gossip.crds.read().unwrap(); + gossip_crds .get_nodes_contact_info() .filter(|x| x.id != self_pubkey && ContactInfo::is_valid_address(&x.tpu)) .cloned() @@ -1301,7 +1268,7 @@ impl ClusterInfo { CrdsData::ContactInfo(self.my_contact_info()), &self.keypair(), ); - let _ = self.gossip.write().unwrap().crds.insert(value, timestamp()); + let _ = self.gossip.crds.write().unwrap().insert(value, timestamp()); } // If the network entrypoint hasn't been discovered yet, add it to the crds table @@ -1325,7 +1292,6 @@ impl ClusterInfo { entrypoint.wallclock = now; if self .time_gossip_read_lock("entrypoint", &self.stats.entrypoint) - .crds .get_nodes_contact_info() .any(|node| node.gossip == entrypoint.gossip) { @@ -1337,10 +1303,10 @@ impl ClusterInfo { let filters = match pulls.first() { Some((_, filters)) => filters.clone(), None => { - let gossip = self.time_gossip_read_lock("entrypoint", &self.stats.entrypoint2); - gossip + let _st = ScopedTimer::from(&self.stats.entrypoint2); + self.gossip .pull - .build_crds_filters(thread_pool, &gossip.crds, MAX_BLOOM_SIZE) + .build_crds_filters(thread_pool, &self.gossip.crds, MAX_BLOOM_SIZE) } }; self.stats.pull_from_entrypoint_count.add_relaxed(1); @@ -1410,8 +1376,8 @@ impl ClusterInfo { let now = timestamp(); let mut pings = Vec::new(); let mut pulls: Vec<_> = { - let gossip = self.time_gossip_read_lock("new_pull_reqs", &self.stats.new_pull_requests); - match gossip.new_pull_request( + let _st = ScopedTimer::from(&self.stats.new_pull_requests); + match self.gossip.new_pull_request( thread_pool, self.keypair().deref(), self.my_shred_version(), @@ -1430,9 +1396,9 @@ impl ClusterInfo { let num_requests = pulls.iter().map(|(_, filters)| filters.len() as u64).sum(); self.stats.new_pull_requests_count.add_relaxed(num_requests); { - let gossip = self.time_gossip_read_lock("mark_pull", &self.stats.mark_pull_request); + let _st = ScopedTimer::from(&self.stats.mark_pull_request); for (peer, _) in &pulls { - gossip.mark_pull_request_creation_time(peer.id, now); + self.gossip.mark_pull_request_creation_time(peer.id, now); } } let self_info = CrdsData::ContactInfo(self.my_contact_info()); @@ -1457,10 +1423,10 @@ impl ClusterInfo { // Used in tests pub fn flush_push_queue(&self) { let pending_push_messages = self.drain_push_queue(); - let mut gossip = self.gossip.write().unwrap(); + let mut gossip_crds = self.gossip.crds.write().unwrap(); let now = timestamp(); for entry in pending_push_messages { - let _ = gossip.crds.insert(entry, now); + let _ = gossip_crds.insert(entry, now); } } fn new_push_requests( @@ -1469,9 +1435,11 @@ impl ClusterInfo { require_stake_for_gossip: bool, ) -> Vec<(SocketAddr, Protocol)> { let self_id = self.id(); - let mut push_messages = self - .time_gossip_write_lock("new_push_requests", &self.stats.new_push_requests) - .new_push_messages(self.drain_push_queue(), timestamp()); + let mut push_messages = { + let _st = ScopedTimer::from(&self.stats.new_push_requests); + self.gossip + .new_push_messages(self.drain_push_queue(), timestamp()) + }; if require_stake_for_gossip { push_messages.retain(|_, data| { retain_staked(data, stakes); @@ -1479,12 +1447,12 @@ impl ClusterInfo { }) } let push_messages: Vec<_> = { - let gossip = + let gossip_crds = self.time_gossip_read_lock("push_req_lookup", &self.stats.new_push_requests2); push_messages .into_iter() .filter_map(|(pubkey, messages)| { - let peer = gossip.crds.get_contact_info(pubkey)?; + let peer = gossip_crds.get_contact_info(pubkey)?; Some((peer.gossip, messages)) }) .collect() @@ -1607,20 +1575,21 @@ impl ClusterInfo { ) { let self_pubkey = self.id(); let epoch_duration = get_epoch_duration(bank_forks); - let timeouts = { - let gossip = self.gossip.read().unwrap(); - gossip.make_timeouts(self_pubkey, stakes, epoch_duration) + let timeouts = self + .gossip + .make_timeouts(self_pubkey, stakes, epoch_duration); + let num_purged = { + let _st = ScopedTimer::from(&self.stats.purge); + self.gossip + .purge(&self_pubkey, thread_pool, timestamp(), &timeouts) }; - let num_purged = self - .time_gossip_write_lock("purge", &self.stats.purge) - .purge(&self_pubkey, thread_pool, timestamp(), &timeouts); inc_new_counter_info!("cluster_info-purge-count", num_purged); } // Trims the CRDS table by dropping all values associated with the pubkeys // with the lowest stake, so that the number of unique pubkeys are bounded. fn trim_crds_table(&self, cap: usize, stakes: &HashMap) { - if !self.gossip.read().unwrap().crds.should_trim(cap) { + if !self.gossip.crds.read().unwrap().should_trim(cap) { return; } let keep: Vec<_> = self @@ -1632,8 +1601,8 @@ impl ClusterInfo { .chain(std::iter::once(self.id())) .collect(); self.stats.trim_crds_table.add_relaxed(1); - let mut gossip = self.gossip.write().unwrap(); - match gossip.crds.trim(cap, &keep, stakes, timestamp()) { + let mut gossip_crds = self.gossip.crds.write().unwrap(); + match gossip_crds.trim(cap, &keep, stakes, timestamp()) { Err(err) => { self.stats.trim_crds_table_failed.add_relaxed(1); // TODO: Stakes are comming from the root-bank. Debug why/when @@ -1763,10 +1732,10 @@ impl ClusterInfo { let mut bad_prune_destination = 0; let self_pubkey = self.id(); { - let gossip = self.time_gossip_read_lock("process_prune", &self.stats.process_prune); + let _st = ScopedTimer::from(&self.stats.process_prune); let now = timestamp(); for (from, data) in messages { - match gossip.process_prune_msg( + match self.gossip.process_prune_msg( &self_pubkey, &from, &data.destination, @@ -1910,8 +1879,11 @@ impl ClusterInfo { const DEFAULT_EPOCH_DURATION_MS: u64 = DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT; let mut time = Measure::start("handle_pull_requests"); let callers = crds_value::filter_current(requests.iter().map(|r| &r.caller)); - self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests) - .process_pull_requests(callers.cloned(), timestamp()); + { + let _st = ScopedTimer::from(&self.stats.process_pull_requests); + self.gossip + .process_pull_requests(callers.cloned(), timestamp()); + } let output_size_limit = self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE; let mut packets = @@ -1928,13 +1900,11 @@ impl ClusterInfo { }; let now = timestamp(); let self_id = self.id(); - - let mut pull_responses = self - .time_gossip_read_lock( - "generate_pull_responses", - &self.stats.generate_pull_responses, - ) - .generate_pull_responses(&caller_and_filters, output_size_limit, now); + let mut pull_responses = { + let _st = ScopedTimer::from(&self.stats.generate_pull_responses); + self.gossip + .generate_pull_responses(&caller_and_filters, output_size_limit, now) + }; if require_stake_for_gossip { for resp in &mut pull_responses { retain_staked(resp, stakes); @@ -2055,10 +2025,9 @@ impl ClusterInfo { }); if !responses.is_empty() { let self_pubkey = self.id(); - let timeouts = { - let gossip = self.gossip.read().unwrap(); - gossip.make_timeouts(self_pubkey, stakes, epoch_duration) - }; + let timeouts = self + .gossip + .make_timeouts(self_pubkey, stakes, epoch_duration); for (from, data) in responses { self.handle_pull_response(&from, data, &timeouts); } @@ -2075,23 +2044,24 @@ impl ClusterInfo { let len = crds_values.len(); trace!("PullResponse me: {} from: {} len={}", self.id(), from, len); let mut pull_stats = ProcessPullStats::default(); - let (filtered_pulls, filtered_pulls_expired_timeout, failed_inserts) = self - .time_gossip_read_lock("filter_pull_resp", &self.stats.filter_pull_response) - .filter_pull_responses(timeouts, crds_values, timestamp(), &mut pull_stats); - + let (filtered_pulls, filtered_pulls_expired_timeout, failed_inserts) = { + let _st = ScopedTimer::from(&self.stats.filter_pull_response); + self.gossip + .filter_pull_responses(timeouts, crds_values, timestamp(), &mut pull_stats) + }; if !filtered_pulls.is_empty() || !filtered_pulls_expired_timeout.is_empty() || !failed_inserts.is_empty() { - self.time_gossip_write_lock("process_pull_resp", &self.stats.process_pull_response) - .process_pull_responses( - from, - filtered_pulls, - filtered_pulls_expired_timeout, - failed_inserts, - timestamp(), - &mut pull_stats, - ); + let _st = ScopedTimer::from(&self.stats.process_pull_response); + self.gossip.process_pull_responses( + from, + filtered_pulls, + filtered_pulls_expired_timeout, + failed_inserts, + timestamp(), + &mut pull_stats, + ); } self.stats.process_pull_response_count.add_relaxed(1); self.stats.process_pull_response_len.add_relaxed(len as u64); @@ -2194,21 +2164,22 @@ impl ClusterInfo { .add_relaxed(num_crds_values); // Origins' pubkeys of upserted crds values. let origins: HashSet<_> = { - let mut gossip = - self.time_gossip_write_lock("process_push", &self.stats.process_push_message); + let _st = ScopedTimer::from(&self.stats.process_push_message); let now = timestamp(); messages .into_iter() .flat_map(|(from, crds_values)| { - gossip.process_push_message(&from, crds_values, now) + self.gossip.process_push_message(&from, crds_values, now) }) .collect() }; // Generate prune messages. let self_pubkey = self.id(); - let prunes = self - .time_gossip_write_lock("prune_received_cache", &self.stats.prune_received_cache) - .prune_received_cache(&self_pubkey, origins, stakes); + let prunes = { + let _st = ScopedTimer::from(&self.stats.prune_received_cache); + self.gossip + .prune_received_cache(&self_pubkey, origins, stakes) + }; let prunes: Vec<(Pubkey /*from*/, Vec /*origins*/)> = prunes .into_iter() .flat_map(|(from, prunes)| { @@ -2224,7 +2195,7 @@ impl ClusterInfo { .collect(); let prune_messages: Vec<_> = { - let gossip = self.gossip.read().unwrap(); + let gossip_crds = self.gossip.crds.read().unwrap(); let wallclock = timestamp(); let self_pubkey = self.id(); thread_pool.install(|| { @@ -2232,7 +2203,7 @@ impl ClusterInfo { .into_par_iter() .with_min_len(256) .filter_map(|(from, prunes)| { - let peer = gossip.crds.get_contact_info(from)?; + let peer = gossip_crds.get_contact_info(from)?; let mut prune_data = PruneData { pubkey: self_pubkey, prunes, @@ -2320,7 +2291,7 @@ impl ClusterInfo { let packets = if self_shred_version == 0 { packets } else { - let gossip = self.gossip.read().unwrap(); + let gossip_crds = self.gossip.crds.read().unwrap(); thread_pool.install(|| { packets .into_par_iter() @@ -2329,7 +2300,7 @@ impl ClusterInfo { let msg = filter_on_shred_version( msg, self_shred_version, - &gossip.crds, + &gossip_crds, &self.stats, )?; Some((from, msg)) @@ -2574,7 +2545,7 @@ impl ClusterInfo { match err { GossipError::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, GossipError::RecvTimeoutError(RecvTimeoutError::Timeout) => { - let table_size = self.gossip.read().unwrap().crds.len(); + let table_size = self.gossip.crds.read().unwrap().len(); debug!( "{}: run_listen timeout, table size: {}", self.id(), @@ -3293,15 +3264,12 @@ mod tests { let (spy, _, _) = ClusterInfo::spy_node(solana_sdk::pubkey::new_rand(), 0); let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info)); cluster_info.insert_info(spy); - { - let gossip = cluster_info.gossip.read().unwrap(); - gossip.refresh_push_active_set( - &cluster_info.id(), - cluster_info.my_shred_version(), - &HashMap::new(), // stakes - None, // gossip validators - ); - } + cluster_info.gossip.refresh_push_active_set( + &cluster_info.id(), + cluster_info.my_shred_version(), + &HashMap::new(), // stakes + None, // gossip validators + ); let reqs = cluster_info.generate_new_gossip_requests( &thread_pool, None, // gossip_validators @@ -3331,8 +3299,8 @@ mod tests { let d = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp()); let label = CrdsValueLabel::ContactInfo(d.id); cluster_info.insert_info(d); - let gossip = cluster_info.gossip.read().unwrap(); - assert!(gossip.crds.get(&label).is_some()); + let gossip_crds = cluster_info.gossip.crds.read().unwrap(); + assert!(gossip_crds.get(&label).is_some()); } fn assert_in_range(x: u16, range: (u16, u16)) { @@ -3413,20 +3381,15 @@ mod tests { .unwrap() .mock_pong(peer.id, peer.gossip, Instant::now()); cluster_info.insert_info(peer); - { - let gossip = cluster_info.gossip.read().unwrap(); - gossip.refresh_push_active_set( - &cluster_info.id(), - cluster_info.my_shred_version(), - &HashMap::new(), // stakes - None, // gossip validators - ); - } + cluster_info.gossip.refresh_push_active_set( + &cluster_info.id(), + cluster_info.my_shred_version(), + &HashMap::new(), // stakes + None, // gossip validators + ); //check that all types of gossip messages are signed correctly let push_messages = cluster_info .gossip - .write() - .unwrap() .new_push_messages(cluster_info.drain_push_queue(), timestamp()); // there should be some pushes ready assert!(!push_messages.is_empty()); @@ -3437,8 +3400,6 @@ mod tests { let mut pings = Vec::new(); cluster_info .gossip - .write() - .unwrap() .new_pull_request( &thread_pool, cluster_info.keypair().deref(), @@ -3601,10 +3562,10 @@ mod tests { fn test_push_votes_with_tower() { let get_vote_slots = |cluster_info: &ClusterInfo| -> Vec { let (labels, _) = cluster_info.get_votes(&mut Cursor::default()); - let gossip = cluster_info.gossip.read().unwrap(); + let gossip_crds = cluster_info.gossip.crds.read().unwrap(); let mut vote_slots = HashSet::new(); for label in labels { - match &gossip.crds.get(&label).unwrap().value.data { + match &gossip_crds.get(&label).unwrap().value.data { CrdsData::Vote(_, vote) => { assert!(vote_slots.insert(vote.slot().unwrap())); } @@ -3685,9 +3646,9 @@ mod tests { CrdsValue::new_unsigned(CrdsData::EpochSlots(0, epoch_slots)), ]; { - let mut gossip = cluster_info.gossip.write().unwrap(); + let mut gossip_crds = cluster_info.gossip.crds.write().unwrap(); for entry in entries { - assert!(gossip.crds.insert(entry, /*now=*/ 0).is_ok()); + assert!(gossip_crds.insert(entry, /*now=*/ 0).is_ok()); } } // Should exclude other node's epoch-slot because of different @@ -3740,14 +3701,11 @@ mod tests { let entrypoint_crdsvalue = CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint.clone())); let cluster_info = Arc::new(cluster_info); - let timeouts = { - let gossip = cluster_info.gossip.read().unwrap(); - gossip.make_timeouts( - cluster_info.id(), - &HashMap::default(), // stakes, - Duration::from_millis(gossip.pull.crds_timeout), - ) - }; + let timeouts = cluster_info.gossip.make_timeouts( + cluster_info.id(), + &HashMap::default(), // stakes, + Duration::from_millis(cluster_info.gossip.pull.crds_timeout), + ); ClusterInfo::handle_pull_response( &cluster_info, &entrypoint_pubkey, @@ -3979,12 +3937,8 @@ mod tests { 0, LowestSlot::new(other_node_pubkey, peer_lowest, timestamp()), )); - let _ = cluster_info - .gossip - .write() - .unwrap() - .crds - .insert(value, timestamp()); + let mut gossip_crds = cluster_info.gossip.crds.write().unwrap(); + let _ = gossip_crds.insert(value, timestamp()); } // only half the visible peers should be eligible to serve this repair assert_eq!(cluster_info.repair_peers(5).len(), 5); @@ -4375,13 +4329,10 @@ mod tests { for peer in peers { cluster_info .gossip - .write() - .unwrap() .mark_pull_request_creation_time(peer, now); } - let gossip = cluster_info.gossip.read().unwrap(); assert_eq!( - gossip.pull.pull_request_time().len(), + cluster_info.gossip.pull.pull_request_time().len(), CRDS_UNIQUE_PUBKEY_CAPACITY ); } diff --git a/gossip/src/cluster_info_metrics.rs b/gossip/src/cluster_info_metrics.rs index c3afb7a194..645b092e51 100644 --- a/gossip/src/cluster_info_metrics.rs +++ b/gossip/src/cluster_info_metrics.rs @@ -5,10 +5,7 @@ use { std::{ collections::HashMap, ops::{Deref, DerefMut}, - sync::{ - atomic::{AtomicU64, Ordering}, - RwLock, - }, + sync::atomic::{AtomicU64, Ordering}, time::Instant, }, }; @@ -160,15 +157,15 @@ pub(crate) struct GossipStats { pub(crate) fn submit_gossip_stats( stats: &GossipStats, - gossip: &RwLock, + gossip: &CrdsGossip, stakes: &HashMap, ) { let (table_size, num_nodes, purged_values_size, failed_inserts_size) = { - let gossip = gossip.read().unwrap(); + let gossip_crds = gossip.crds.read().unwrap(); ( - gossip.crds.len(), - gossip.crds.num_nodes(), - gossip.crds.num_purged(), + gossip_crds.len(), + gossip_crds.num_nodes(), + gossip_crds.num_purged(), gossip.pull.failed_inserts_size(), ) }; diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index 95bdc91a81..8a7b20a841 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -26,14 +26,14 @@ use { std::{ collections::{HashMap, HashSet}, net::SocketAddr, - sync::Mutex, + sync::{Mutex, RwLock}, time::Duration, }, }; #[derive(Default)] pub struct CrdsGossip { - pub crds: Crds, + pub crds: RwLock, pub push: CrdsGossipPush, pub pull: CrdsGossipPull, } @@ -42,13 +42,13 @@ impl CrdsGossip { /// process a push message to the network /// Returns unique origins' pubkeys of upserted values. pub fn process_push_message( - &mut self, + &self, from: &Pubkey, values: Vec, now: u64, ) -> HashSet { self.push - .process_push_message(&mut self.crds, from, values, now) + .process_push_message(&self.crds, from, values, now) .into_iter() .filter_map(Result::ok) .collect() @@ -69,18 +69,21 @@ impl CrdsGossip { } pub fn new_push_messages( - &mut self, + &self, pending_push_messages: Vec, now: u64, ) -> HashMap> { - for entry in pending_push_messages { - let _ = self.crds.insert(entry, now); + { + let mut crds = self.crds.write().unwrap(); + for entry in pending_push_messages { + let _ = crds.insert(entry, now); + } } self.push.new_push_messages(&self.crds, now) } pub(crate) fn push_duplicate_shred( - &mut self, + &self, keypair: &Keypair, shred: &Shred, other_payload: &[u8], @@ -91,8 +94,8 @@ impl CrdsGossip { let pubkey = keypair.pubkey(); // Skip if there are already records of duplicate shreds for this slot. let shred_slot = shred.slot(); - if self - .crds + let mut crds = self.crds.write().unwrap(); + if crds .get_records(&pubkey) .any(|value| match &value.value.data { CrdsData::DuplicateShred(_, value) => value.slot == shred_slot, @@ -111,8 +114,7 @@ impl CrdsGossip { )?; // Find the index of oldest duplicate shred. let mut num_dup_shreds = 0; - let offset = self - .crds + let offset = crds .get_records(&pubkey) .filter_map(|value| match &value.value.data { CrdsData::DuplicateShred(ix, value) => { @@ -136,7 +138,7 @@ impl CrdsGossip { }); let now = timestamp(); for entry in entries { - if let Err(err) = self.crds.insert(entry, now) { + if let Err(err) = crds.insert(entry, now) { error!("push_duplicate_shred faild: {:?}", err); } } @@ -174,13 +176,14 @@ impl CrdsGossip { stakes: &HashMap, gossip_validators: Option<&HashSet>, ) { + let network_size = self.crds.read().unwrap().num_nodes(); self.push.refresh_push_active_set( &self.crds, stakes, gossip_validators, self_pubkey, self_shred_version, - self.crds.num_nodes(), + network_size, CRDS_GOSSIP_NUM_ACTIVE, ) } @@ -221,11 +224,11 @@ impl CrdsGossip { self.pull.mark_pull_request_creation_time(from, now) } /// process a pull request and create a response - pub fn process_pull_requests(&mut self, callers: I, now: u64) + pub fn process_pull_requests(&self, callers: I, now: u64) where I: IntoIterator, { - CrdsGossipPull::process_pull_requests(&mut self.crds, callers, now); + CrdsGossipPull::process_pull_requests(&self.crds, callers, now); } pub fn generate_pull_responses( @@ -254,7 +257,7 @@ impl CrdsGossip { /// process a pull response pub fn process_pull_responses( - &mut self, + &self, from: &Pubkey, responses: Vec, responses_expired_timeout: Vec, @@ -263,7 +266,7 @@ impl CrdsGossip { process_pull_stats: &mut ProcessPullStats, ) { self.pull.process_pull_responses( - &mut self.crds, + &self.crds, from, responses, responses_expired_timeout, @@ -283,7 +286,7 @@ impl CrdsGossip { } pub fn purge( - &mut self, + &self, self_pubkey: &Pubkey, thread_pool: &ThreadPool, now: u64, @@ -298,9 +301,11 @@ impl CrdsGossip { //sanity check assert_eq!(timeouts[self_pubkey], std::u64::MAX); assert!(timeouts.contains_key(&Pubkey::default())); - rv = CrdsGossipPull::purge_active(thread_pool, &mut self.crds, now, timeouts); + rv = CrdsGossipPull::purge_active(thread_pool, &self.crds, now, timeouts); } self.crds + .write() + .unwrap() .trim_purged(now.saturating_sub(5 * self.pull.crds_timeout)); self.pull.purge_failed_inserts(now); rv @@ -308,8 +313,9 @@ impl CrdsGossip { // Only for tests and simulations. pub(crate) fn mock_clone(&self) -> Self { + let crds = self.crds.read().unwrap().clone(); Self { - crds: self.crds.clone(), + crds: RwLock::new(crds), push: self.push.mock_clone(), pull: self.pull.mock_clone(), } @@ -343,12 +349,14 @@ mod test { #[test] fn test_prune_errors() { - let mut crds_gossip = CrdsGossip::default(); + let crds_gossip = CrdsGossip::default(); let id = Pubkey::new(&[0; 32]); let ci = ContactInfo::new_localhost(&Pubkey::new(&[1; 32]), 0); let prune_pubkey = Pubkey::new(&[2; 32]); crds_gossip .crds + .write() + .unwrap() .insert( CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())), 0, diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index c0a19653fa..89ca3b3005 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -215,7 +215,7 @@ impl CrdsGossipPull { pub(crate) fn new_pull_request( &self, thread_pool: &ThreadPool, - crds: &Crds, + crds: &RwLock, self_keypair: &Keypair, self_shred_version: u16, now: u64, @@ -225,8 +225,8 @@ impl CrdsGossipPull { ping_cache: &Mutex, pings: &mut Vec<(SocketAddr, Ping)>, ) -> Result<(ContactInfo, Vec), CrdsGossipError> { - let (weights, peers): (Vec<_>, Vec<_>) = self - .pull_options( + let (weights, peers): (Vec<_>, Vec<_>) = { + self.pull_options( crds, &self_keypair.pubkey(), self_shred_version, @@ -235,7 +235,9 @@ impl CrdsGossipPull { stakes, ) .into_iter() - .unzip(); + .map(|(weight, node, gossip_addr)| (weight, (node, gossip_addr))) + .unzip() + }; if peers.is_empty() { return Err(CrdsGossipError::NoPeers); } @@ -248,36 +250,45 @@ impl CrdsGossipPull { let mut ping_cache = ping_cache.lock().unwrap(); let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok(); let now = Instant::now(); - peers.find(|peer| { - let node = (peer.id, peer.gossip); - let (check, ping) = ping_cache.check(now, node, &mut pingf); + peers.find(|node| { + let (_, gossip_addr) = *node; + let (check, ping) = ping_cache.check(now, *node, &mut pingf); if let Some(ping) = ping { - pings.push((peer.gossip, ping)); + pings.push((gossip_addr, ping)); } check }) }; - match peer { - None => Err(CrdsGossipError::NoPeers), - Some(peer) => { - let filters = self.build_crds_filters(thread_pool, crds, bloom_size); - Ok((peer.clone(), filters)) - } - } + let peer = match peer { + None => return Err(CrdsGossipError::NoPeers), + Some((node, _gossip_addr)) => node, + }; + let filters = self.build_crds_filters(thread_pool, crds, bloom_size); + let peer = match crds.read().unwrap().get_contact_info(peer) { + None => return Err(CrdsGossipError::NoPeers), + Some(node) => node.clone(), + }; + Ok((peer, filters)) } - fn pull_options<'a>( + fn pull_options( &self, - crds: &'a Crds, + crds: &RwLock, self_id: &Pubkey, self_shred_version: u16, now: u64, gossip_validators: Option<&HashSet>, stakes: &HashMap, - ) -> Vec<(u64, &'a ContactInfo)> { + ) -> Vec<( + u64, // weight + Pubkey, // node + SocketAddr, // gossip address + )> { let mut rng = rand::thread_rng(); let active_cutoff = now.saturating_sub(PULL_ACTIVE_TIMEOUT_MS); let pull_request_time = self.pull_request_time.read().unwrap(); + // crds should be locked last after self.pull_request_time. + let crds = crds.read().unwrap(); crds.get_nodes() .filter_map(|value| { let info = value.value.contact_info().unwrap(); @@ -310,7 +321,7 @@ impl CrdsGossipPull { let weight = get_weight(max_weight, since, stake); // Weights are bounded by max_weight defined above. // So this type-cast should be safe. - ((weight * 100.0) as u64, item) + ((weight * 100.0) as u64, item.id, item.gossip) }) .collect() } @@ -324,10 +335,11 @@ impl CrdsGossipPull { } /// process a pull request - pub(crate) fn process_pull_requests(crds: &mut Crds, callers: I, now: u64) + pub(crate) fn process_pull_requests(crds: &RwLock, callers: I, now: u64) where I: IntoIterator, { + let mut crds = crds.write().unwrap(); for caller in callers { let key = caller.pubkey(); let _ = crds.insert(caller, now); @@ -337,7 +349,7 @@ impl CrdsGossipPull { /// Create gossip responses to pull requests pub(crate) fn generate_pull_responses( - crds: &Crds, + crds: &RwLock, requests: &[(CrdsValue, CrdsFilter)], output_size_limit: usize, // Limit number of crds values returned. now: u64, @@ -353,7 +365,7 @@ impl CrdsGossipPull { // .2 => hash value of outdated values which will fail to insert. pub(crate) fn filter_pull_responses( &self, - crds: &Crds, + crds: &RwLock, timeouts: &HashMap, responses: Vec, now: u64, @@ -365,6 +377,7 @@ impl CrdsGossipPull { .get(&Pubkey::default()) .copied() .unwrap_or(self.msg_timeout); + let crds = crds.read().unwrap(); let upsert = |response: CrdsValue| { let owner = response.label().pubkey(); // Check if the crds value is older than the msg_timeout @@ -399,7 +412,7 @@ impl CrdsGossipPull { /// process a vec of pull responses pub(crate) fn process_pull_responses( &self, - crds: &mut Crds, + crds: &RwLock, from: &Pubkey, responses: Vec, responses_expired_timeout: Vec, @@ -408,6 +421,7 @@ impl CrdsGossipPull { stats: &mut ProcessPullStats, ) { let mut owners = HashSet::new(); + let mut crds = crds.write().unwrap(); for response in responses_expired_timeout { let _ = crds.insert(response, now); } @@ -425,6 +439,7 @@ impl CrdsGossipPull { for owner in owners { crds.update_record_timestamp(&owner, now); } + drop(crds); stats.failed_insert += failed_inserts.len(); self.purge_failed_inserts(now); let failed_inserts = failed_inserts.into_iter().zip(repeat(now)); @@ -452,7 +467,7 @@ impl CrdsGossipPull { pub fn build_crds_filters( &self, thread_pool: &ThreadPool, - crds: &Crds, + crds: &RwLock, bloom_size: usize, ) -> Vec { const PAR_MIN_LENGTH: usize = 512; @@ -460,7 +475,10 @@ impl CrdsGossipPull { const MIN_NUM_BLOOM_ITEMS: usize = 512; #[cfg(not(debug_assertions))] const MIN_NUM_BLOOM_ITEMS: usize = 65_536; - let num_items = crds.len() + crds.num_purged() + self.failed_inserts.read().unwrap().len(); + let failed_inserts = self.failed_inserts.read().unwrap(); + // crds should be locked last after self.failed_inserts. + let crds = crds.read().unwrap(); + let num_items = crds.len() + crds.num_purged() + failed_inserts.len(); let num_items = MIN_NUM_BLOOM_ITEMS.max(num_items); let filters = CrdsFilterSet::new(num_items, bloom_size); thread_pool.install(|| { @@ -469,21 +487,21 @@ impl CrdsGossipPull { .map(|v| v.value_hash) .chain(crds.purged().with_min_len(PAR_MIN_LENGTH)) .chain( - self.failed_inserts - .read() - .unwrap() + failed_inserts .par_iter() .with_min_len(PAR_MIN_LENGTH) .map(|(v, _)| *v), ) .for_each(|v| filters.add(v)); }); + drop(crds); + drop(failed_inserts); filters.into() } /// filter values that fail the bloom filter up to max_bytes fn filter_crds_values( - crds: &Crds, + crds: &RwLock, filters: &[(CrdsValue, CrdsFilter)], mut output_size_limit: usize, // Limit number of crds values returned. now: u64, @@ -495,6 +513,7 @@ impl CrdsGossipPull { now.saturating_sub(msg_timeout)..now.saturating_add(msg_timeout); let mut dropped_requests = 0; let mut total_skipped = 0; + let crds = crds.read().unwrap(); let ret: Vec<_> = filters .iter() .map(|(caller, filter)| { @@ -565,10 +584,11 @@ impl CrdsGossipPull { /// Purge values from the crds that are older then `active_timeout` pub(crate) fn purge_active( thread_pool: &ThreadPool, - crds: &mut Crds, + crds: &RwLock, now: u64, timeouts: &HashMap, ) -> usize { + let mut crds = crds.write().unwrap(); let labels = crds.find_old_labels(thread_pool, now, timeouts); for label in &labels { crds.remove(label, now); @@ -580,7 +600,7 @@ impl CrdsGossipPull { #[cfg(test)] fn process_pull_response( &self, - crds: &mut Crds, + crds: &RwLock, from: &Pubkey, timeouts: &HashMap, response: Vec, @@ -710,14 +730,13 @@ pub(crate) mod tests { stakes.insert(id, i * 100); } let now = 1024; + let crds = RwLock::new(crds); let mut options = node.pull_options(&crds, &me.label().pubkey(), 0, now, None, &stakes); assert!(!options.is_empty()); - options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap()); + options + .sort_by(|(weight_l, _, _), (weight_r, _, _)| weight_r.partial_cmp(weight_l).unwrap()); // check that the highest stake holder is also the heaviest weighted. - assert_eq!( - *stakes.get(&options.get(0).unwrap().1.id).unwrap(), - 3000_u64 - ); + assert_eq!(stakes[&options[0].1], 3000_u64); } #[test] @@ -757,12 +776,13 @@ pub(crate) mod tests { crds.insert(spy.clone(), 0).unwrap(); crds.insert(node_123.clone(), 0).unwrap(); crds.insert(node_456.clone(), 0).unwrap(); + let crds = RwLock::new(crds); // shred version 123 should ignore nodes with versions 0 and 456 let options = node .pull_options(&crds, &me.label().pubkey(), 123, 0, None, &stakes) .iter() - .map(|(_, c)| c.id) + .map(|(_, pk, _)| *pk) .collect::>(); assert_eq!(options.len(), 1); assert!(!options.contains(&spy.pubkey())); @@ -772,7 +792,7 @@ pub(crate) mod tests { let options = node .pull_options(&crds, &spy.label().pubkey(), 0, 0, None, &stakes) .iter() - .map(|(_, c)| c.id) + .map(|(_, pk, _)| *pk) .collect::>(); assert_eq!(options.len(), 3); assert!(options.contains(&me.pubkey())); @@ -800,6 +820,7 @@ pub(crate) mod tests { crds.insert(me.clone(), 0).unwrap(); crds.insert(node_123.clone(), 0).unwrap(); + let crds = RwLock::new(crds); // Empty gossip_validators -- will pull from nobody let mut gossip_validators = HashSet::new(); @@ -836,7 +857,7 @@ pub(crate) mod tests { &stakes, ); assert_eq!(options.len(), 1); - assert_eq!(options[0].1.id, node_123.pubkey()); + assert_eq!(options[0].1, node_123.pubkey()); } #[test] @@ -902,9 +923,11 @@ pub(crate) mod tests { num_inserts += 1; } } + let crds = RwLock::new(crds); assert!(num_inserts > 30_000, "num inserts: {}", num_inserts); let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE); assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS.max(32)); + let crds = crds.read().unwrap(); let purged: Vec<_> = thread_pool.install(|| crds.purged().collect()); let hash_values: Vec<_> = crds.values().map(|v| v.value_hash).chain(purged).collect(); assert_eq!(hash_values.len(), 40_000); @@ -928,7 +951,7 @@ pub(crate) mod tests { #[test] fn test_new_pull_request() { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); - let mut crds = Crds::default(); + let crds = RwLock::::default(); let node_keypair = Keypair::new(); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &node_keypair.pubkey(), @@ -956,7 +979,7 @@ pub(crate) mod tests { Err(CrdsGossipError::NoPeers) ); - crds.insert(entry, 0).unwrap(); + crds.write().unwrap().insert(entry, 0).unwrap(); assert_eq!( node.new_pull_request( &thread_pool, @@ -979,7 +1002,7 @@ pub(crate) mod tests { .unwrap() .mock_pong(new.id, new.gossip, Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); - crds.insert(new.clone(), now).unwrap(); + crds.write().unwrap().insert(new.clone(), now).unwrap(); let req = node.new_pull_request( &thread_pool, &crds, @@ -998,7 +1021,7 @@ pub(crate) mod tests { node.mark_pull_request_creation_time(new.contact_info().unwrap().id, now); let offline = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), now); let offline = CrdsValue::new_unsigned(CrdsData::ContactInfo(offline)); - crds.insert(offline, now).unwrap(); + crds.write().unwrap().insert(offline, now).unwrap(); let req = node.new_pull_request( &thread_pool, &crds, @@ -1041,6 +1064,7 @@ pub(crate) mod tests { ping_cache.mock_pong(new.id, new.gossip, Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); crds.insert(new.clone(), now).unwrap(); + let crds = RwLock::new(crds); // set request creation time to now. let now = now + 50_000; @@ -1130,6 +1154,7 @@ pub(crate) mod tests { ping_cache.mock_pong(new.id, new.gossip, Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); node_crds.insert(new, 0).unwrap(); + let node_crds = RwLock::new(node_crds); let mut pings = Vec::new(); let req = node.new_pull_request( &thread_pool, @@ -1144,7 +1169,7 @@ pub(crate) mod tests { &mut pings, ); - let mut dest_crds = Crds::default(); + let dest_crds = RwLock::::default(); let (_, filters) = req.unwrap(); let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let rsp = CrdsGossipPull::generate_pull_responses( @@ -1161,6 +1186,8 @@ pub(crate) mod tests { CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, ))); dest_crds + .write() + .unwrap() .insert(new, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS) .unwrap(); @@ -1217,6 +1244,7 @@ pub(crate) mod tests { ping_cache.mock_pong(new.id, new.gossip, Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); node_crds.insert(new, 0).unwrap(); + let node_crds = RwLock::new(node_crds); let mut pings = Vec::new(); let req = node.new_pull_request( &thread_pool, @@ -1231,7 +1259,7 @@ pub(crate) mod tests { &mut pings, ); - let mut dest_crds = Crds::default(); + let dest_crds = RwLock::::default(); let (_, filters) = req.unwrap(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let rsp = CrdsGossipPull::generate_pull_responses( @@ -1240,11 +1268,9 @@ pub(crate) mod tests { /*output_size_limit=*/ usize::MAX, 0, ); - CrdsGossipPull::process_pull_requests( - &mut dest_crds, - filters.into_iter().map(|(caller, _)| caller), - 1, - ); + let callers = filters.into_iter().map(|(caller, _)| caller); + CrdsGossipPull::process_pull_requests(&dest_crds, callers, 1); + let dest_crds = dest_crds.read().unwrap(); assert!(rsp.iter().all(|rsp| rsp.is_empty())); assert!(dest_crds.get(&caller.label()).is_some()); assert_eq!(dest_crds.get(&caller.label()).unwrap().local_timestamp, 1); @@ -1277,6 +1303,7 @@ pub(crate) mod tests { ping_cache.mock_pong(new.id, new.gossip, Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); dest_crds.insert(new.clone(), 0).unwrap(); + let dest_crds = RwLock::new(dest_crds); // node contains a key from the dest node, but at an older local timestamp let same_key = ContactInfo::new_localhost(&new_id, 0); @@ -1286,6 +1313,7 @@ pub(crate) mod tests { assert!(same_key.wallclock() < new.wallclock()); node_crds.insert(same_key.clone(), 0).unwrap(); assert_eq!(node_crds.get(&same_key.label()).unwrap().local_timestamp, 0); + let node_crds = RwLock::new(node_crds); let mut done = false; let mut pings = Vec::new(); let ping_cache = Mutex::new(ping_cache); @@ -1312,7 +1340,7 @@ pub(crate) mod tests { 0, ); CrdsGossipPull::process_pull_requests( - &mut dest_crds, + &dest_crds, filters.into_iter().map(|(caller, _)| caller), 0, ); @@ -1328,7 +1356,7 @@ pub(crate) mod tests { assert_eq!(rsp.len(), MIN_NUM_BLOOM_FILTERS); let failed = node .process_pull_response( - &mut node_crds, + &node_crds, &node_pubkey, &node.make_timeouts(node_pubkey, &HashMap::new(), Duration::default()), rsp.into_iter().flatten().collect(), @@ -1336,9 +1364,15 @@ pub(crate) mod tests { ) .0; assert_eq!(failed, 0); - assert_eq!(node_crds.get(&new.label()).unwrap().local_timestamp, 1); + assert_eq!(1, { + let node_crds = node_crds.read().unwrap(); + node_crds.get(&new.label()).unwrap().local_timestamp + }); // verify that the whole record was updated for dest since this is a response from dest - assert_eq!(node_crds.get(&same_key.label()).unwrap().local_timestamp, 1); + assert_eq!(1, { + let node_crds = node_crds.read().unwrap(); + node_crds.get(&same_key.label()).unwrap().local_timestamp + }); done = true; break; } @@ -1369,16 +1403,17 @@ pub(crate) mod tests { node_label ); // purge + let node_crds = RwLock::new(node_crds); let timeouts = node.make_timeouts(node_pubkey, &HashMap::new(), Duration::default()); - CrdsGossipPull::purge_active(&thread_pool, &mut node_crds, node.crds_timeout, &timeouts); + CrdsGossipPull::purge_active(&thread_pool, &node_crds, node.crds_timeout, &timeouts); //verify self is still valid after purge - assert_eq!( - node_crds.get(&node_label).unwrap().value.label(), - node_label - ); - assert_eq!(node_crds.get(&old.label()), None); - assert_eq!(node_crds.num_purged(), 1); + assert_eq!(node_label, { + let node_crds = node_crds.read().unwrap(); + node_crds.get(&node_label).unwrap().value.label() + }); + assert_eq!(node_crds.read().unwrap().get(&old.label()), None); + assert_eq!(node_crds.read().unwrap().num_purged(), 1); for _ in 0..30 { // there is a chance of a false positive with bloom filters // assert that purged value is still in the set @@ -1388,6 +1423,7 @@ pub(crate) mod tests { } // purge the value + let mut node_crds = node_crds.write().unwrap(); node_crds.trim_purged(node.crds_timeout + 1); assert_eq!(node_crds.num_purged(), 0); } @@ -1472,7 +1508,7 @@ pub(crate) mod tests { #[test] fn test_process_pull_response() { - let mut node_crds = Crds::default(); + let node_crds = RwLock::::default(); let node = CrdsGossipPull::default(); let peer_pubkey = solana_sdk::pubkey::new_rand(); @@ -1485,7 +1521,7 @@ pub(crate) mod tests { // inserting a fresh value should be fine. assert_eq!( node.process_pull_response( - &mut node_crds, + &node_crds, &peer_pubkey, &timeouts, vec![peer_entry.clone()], @@ -1495,14 +1531,14 @@ pub(crate) mod tests { 0 ); - let mut node_crds = Crds::default(); + let node_crds = RwLock::::default(); let unstaked_peer_entry = CrdsValue::new_unsigned(CrdsData::ContactInfo( ContactInfo::new_localhost(&peer_pubkey, 0), )); // check that old contact infos fail if they are too old, regardless of "timeouts" assert_eq!( node.process_pull_response( - &mut node_crds, + &node_crds, &peer_pubkey, &timeouts, vec![peer_entry.clone(), unstaked_peer_entry], @@ -1512,11 +1548,11 @@ pub(crate) mod tests { 4 ); - let mut node_crds = Crds::default(); + let node_crds = RwLock::::default(); // check that old contact infos can still land as long as they have a "timeouts" entry assert_eq!( node.process_pull_response( - &mut node_crds, + &node_crds, &peer_pubkey, &timeouts, vec![peer_entry], @@ -1533,7 +1569,7 @@ pub(crate) mod tests { // but a recent contact info (inserted above) exists assert_eq!( node.process_pull_response( - &mut node_crds, + &node_crds, &peer_pubkey, &timeouts, vec![peer_vote.clone()], @@ -1543,11 +1579,11 @@ pub(crate) mod tests { 0 ); - let mut node_crds = Crds::default(); + let node_crds = RwLock::::default(); // without a contact info, inserting an old value should fail assert_eq!( node.process_pull_response( - &mut node_crds, + &node_crds, &peer_pubkey, &timeouts, vec![peer_vote], diff --git a/gossip/src/crds_gossip_push.rs b/gossip/src/crds_gossip_push.rs index 35daf59585..fce4b633c8 100644 --- a/gossip/src/crds_gossip_push.rs +++ b/gossip/src/crds_gossip_push.rs @@ -97,9 +97,9 @@ impl Default for CrdsGossipPush { } } impl CrdsGossipPush { - pub fn num_pending(&self, crds: &Crds) -> usize { + pub fn num_pending(&self, crds: &RwLock) -> usize { let mut cursor: Cursor = *self.crds_cursor.lock().unwrap(); - crds.get_entries(&mut cursor).count() + crds.read().unwrap().get_entries(&mut cursor).count() } fn prune_stake_threshold(self_stake: u64, origin_stake: u64) -> u64 { @@ -203,7 +203,7 @@ impl CrdsGossipPush { /// Returns origins' pubkeys of upserted values. pub(crate) fn process_push_message( &self, - crds: &mut Crds, + crds: &RwLock, from: &Pubkey, values: Vec, now: u64, @@ -228,6 +228,7 @@ impl CrdsGossipPush { }) .collect() }; + let mut crds = crds.write().unwrap(); values .into_iter() .map(|value| { @@ -249,7 +250,11 @@ impl CrdsGossipPush { /// peers. /// The list of push messages is created such that all the randomly selected peers have not /// pruned the source addresses. - pub fn new_push_messages(&self, crds: &Crds, now: u64) -> HashMap> { + pub(crate) fn new_push_messages( + &self, + crds: &RwLock, + now: u64, + ) -> HashMap> { let active_set = self.active_set.read().unwrap(); let active_set_len = active_set.len(); let push_fanout = self.push_fanout.min(active_set_len); @@ -262,6 +267,8 @@ impl CrdsGossipPush { let mut push_messages: HashMap> = HashMap::new(); let wallclock_window = self.wallclock_window(now); let mut crds_cursor = self.crds_cursor.lock().unwrap(); + // crds should be locked last after self.{active_set,crds_cursor}. + let crds = crds.read().unwrap(); let entries = crds .get_entries(crds_cursor.deref_mut()) .map(|entry| &entry.value) @@ -287,6 +294,7 @@ impl CrdsGossipPush { } } } + drop(crds); drop(crds_cursor); drop(active_set); self.num_pushes.fetch_add(num_pushes, Ordering::Relaxed); @@ -318,7 +326,7 @@ impl CrdsGossipPush { /// * ratio - active_set.len()/ratio is the number of actives to rotate pub(crate) fn refresh_push_active_set( &self, - crds: &Crds, + crds: &RwLock, stakes: &HashMap, gossip_validators: Option<&HashSet>, self_id: &Pubkey, @@ -333,19 +341,20 @@ impl CrdsGossipPush { #[cfg(not(debug_assertions))] const MIN_NUM_BLOOM_ITEMS: usize = CRDS_UNIQUE_PUBKEY_CAPACITY; let mut rng = rand::thread_rng(); - let mut active_set = self.active_set.write().unwrap(); - let need = Self::compute_need(self.num_active, active_set.len(), ratio); let mut new_items = HashMap::new(); - let (weights, peers): (Vec<_>, Vec<_>) = self - .push_options(crds, self_id, self_shred_version, stakes, gossip_validators) - .into_iter() - .unzip(); + let (weights, peers): (Vec<_>, Vec<_>) = { + self.push_options(crds, self_id, self_shred_version, stakes, gossip_validators) + .into_iter() + .unzip() + }; if peers.is_empty() { return; } let num_bloom_items = MIN_NUM_BLOOM_ITEMS.max(network_size); let shuffle = WeightedShuffle::new(&mut rng, &weights).unwrap(); - for peer in shuffle.map(|i| peers[i].id) { + let mut active_set = self.active_set.write().unwrap(); + let need = Self::compute_need(self.num_active, active_set.len(), ratio); + for peer in shuffle.map(|i| peers[i]) { if new_items.len() >= need { break; } @@ -371,19 +380,21 @@ impl CrdsGossipPush { } } - fn push_options<'a>( + fn push_options( &self, - crds: &'a Crds, + crds: &RwLock, self_id: &Pubkey, self_shred_version: u16, stakes: &HashMap, gossip_validators: Option<&HashSet>, - ) -> Vec<(u64, &'a ContactInfo)> { + ) -> Vec<(/*weight:*/ u64, /*node:*/ Pubkey)> { let now = timestamp(); let mut rng = rand::thread_rng(); let max_weight = u16::MAX as f32 - 1.0; let active_cutoff = now.saturating_sub(PUSH_ACTIVE_TIMEOUT_MS); let last_pushed_to = self.last_pushed_to.read().unwrap(); + // crds should be locked last after self.last_pushed_to. + let crds = crds.read().unwrap(); crds.get_nodes() .filter_map(|value| { let info = value.value.contact_info().unwrap(); @@ -413,7 +424,7 @@ impl CrdsGossipPush { let weight = get_weight(max_weight, since, stake); // Weights are bounded by max_weight defined above. // So this type-cast should be safe. - ((weight * 100.0) as u64, info) + ((weight * 100.0) as u64, info.id) }) .collect() } @@ -467,7 +478,7 @@ mod test { #[test] fn test_prune() { - let mut crds = Crds::default(); + let crds = RwLock::::default(); let push = CrdsGossipPush::default(); let mut stakes = HashMap::new(); @@ -482,7 +493,7 @@ mod test { let low_staked_peers = (0..10).map(|_| solana_sdk::pubkey::new_rand()); let mut low_staked_set = HashSet::new(); low_staked_peers.for_each(|p| { - push.process_push_message(&mut crds, &p, vec![value.clone()], 0); + push.process_push_message(&crds, &p, vec![value.clone()], 0); low_staked_set.insert(p); stakes.insert(p, 1); }); @@ -504,7 +515,7 @@ mod test { let high_staked_peer = solana_sdk::pubkey::new_rand(); let high_stake = CrdsGossipPush::prune_stake_threshold(100, 100) + 10; stakes.insert(high_staked_peer, high_stake); - push.process_push_message(&mut crds, &high_staked_peer, vec![value], 0); + push.process_push_message(&crds, &high_staked_peer, vec![value], 0); let pruned = { let mut received_cache = push.received_cache.lock().unwrap(); @@ -529,7 +540,7 @@ mod test { #[test] fn test_process_push_one() { - let mut crds = Crds::default(); + let crds = RwLock::::default(); let push = CrdsGossipPush::default(); let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &solana_sdk::pubkey::new_rand(), @@ -538,20 +549,20 @@ mod test { let label = value.label(); // push a new message assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), vec![value.clone()], 0), + push.process_push_message(&crds, &Pubkey::default(), vec![value.clone()], 0), [Ok(label.pubkey())], ); - assert_eq!(crds.get(&label).unwrap().value, value); + assert_eq!(crds.read().unwrap().get(&label).unwrap().value, value); // push it again assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0), + push.process_push_message(&crds, &Pubkey::default(), vec![value], 0), [Err(CrdsGossipError::PushMessageOldVersion)], ); } #[test] fn test_process_push_old_version() { - let mut crds = Crds::default(); + let crds = RwLock::::default(); let push = CrdsGossipPush::default(); let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); ci.wallclock = 1; @@ -559,7 +570,7 @@ mod test { // push a new message assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0), + push.process_push_message(&crds, &Pubkey::default(), vec![value], 0), [Ok(ci.id)], ); @@ -567,13 +578,13 @@ mod test { ci.wallclock = 0; let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci)); assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0), + push.process_push_message(&crds, &Pubkey::default(), vec![value], 0), [Err(CrdsGossipError::PushMessageOldVersion)], ); } #[test] fn test_process_push_timeout() { - let mut crds = Crds::default(); + let crds = RwLock::::default(); let push = CrdsGossipPush::default(); let timeout = push.msg_timeout; let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); @@ -582,7 +593,7 @@ mod test { ci.wallclock = timeout + 1; let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())); assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0), + push.process_push_message(&crds, &Pubkey::default(), vec![value], 0), [Err(CrdsGossipError::PushMessageTimeout)], ); @@ -590,13 +601,13 @@ mod test { ci.wallclock = 0; let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci)); assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), vec![value], timeout + 1), + push.process_push_message(&crds, &Pubkey::default(), vec![value], timeout + 1), [Err(CrdsGossipError::PushMessageTimeout)] ); } #[test] fn test_process_push_update() { - let mut crds = Crds::default(); + let crds = RwLock::::default(); let push = CrdsGossipPush::default(); let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); let origin = ci.id; @@ -605,7 +616,7 @@ mod test { // push a new message assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), vec![value_old], 0), + push.process_push_message(&crds, &Pubkey::default(), vec![value_old], 0), [Ok(origin)], ); @@ -613,7 +624,7 @@ mod test { ci.wallclock = 1; let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci)); assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0), + push.process_push_message(&crds, &Pubkey::default(), vec![value], 0), [Ok(origin)], ); } @@ -636,6 +647,7 @@ mod test { ))); assert_eq!(crds.insert(value1.clone(), now), Ok(())); + let crds = RwLock::new(crds); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); let active_set = push.active_set.read().unwrap(); @@ -646,7 +658,7 @@ mod test { ))); assert!(active_set.get(&value2.label().pubkey()).is_none()); drop(active_set); - assert_eq!(crds.insert(value2.clone(), now), Ok(())); + assert_eq!(crds.write().unwrap().insert(value2.clone(), now), Ok(())); for _ in 0..30 { push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); let active_set = push.active_set.read().unwrap(); @@ -662,7 +674,7 @@ mod test { let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo( ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0), )); - assert_eq!(crds.insert(value2.clone(), now), Ok(())); + assert_eq!(crds.write().unwrap().insert(value2.clone(), now), Ok(())); } push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); assert_eq!(push.active_set.read().unwrap().len(), push.num_active); @@ -684,14 +696,12 @@ mod test { stakes.insert(id, i * 100); push.last_pushed_to.write().unwrap().put(id, time); } + let crds = RwLock::new(crds); let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes, None); assert!(!options.is_empty()); options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap()); // check that the highest stake holder is also the heaviest weighted. - assert_eq!( - *stakes.get(&options.get(0).unwrap().1.id).unwrap(), - 10_000_u64 - ); + assert_eq!(stakes[&options[0].1], 10_000_u64); } #[test] @@ -732,23 +742,20 @@ mod test { crds.insert(spy.clone(), now).unwrap(); crds.insert(node_123.clone(), now).unwrap(); crds.insert(node_456, now).unwrap(); + let crds = RwLock::new(crds); // shred version 123 should ignore nodes with versions 0 and 456 let options = node .push_options(&crds, &me.label().pubkey(), 123, &stakes, None) .iter() - .map(|(_, c)| c.id) + .map(|(_, pk)| *pk) .collect::>(); assert_eq!(options.len(), 1); assert!(!options.contains(&spy.pubkey())); assert!(options.contains(&node_123.pubkey())); // spy nodes should not push to people on different shred versions - let options = node - .push_options(&crds, &spy.label().pubkey(), 0, &stakes, None) - .iter() - .map(|(_, c)| c.id) - .collect::>(); + let options = node.push_options(&crds, &spy.label().pubkey(), 0, &stakes, None); assert!(options.is_empty()); } @@ -773,6 +780,7 @@ mod test { crds.insert(me.clone(), 0).unwrap(); crds.insert(node_123.clone(), now).unwrap(); + let crds = RwLock::new(crds); // Unknown pubkey in gossip_validators -- will push to nobody let mut gossip_validators = HashSet::new(); @@ -808,7 +816,7 @@ mod test { ); assert_eq!(options.len(), 1); - assert_eq!(options[0].1.id, node_123.pubkey()); + assert_eq!(options[0].1, node_123.pubkey()); } #[test] @@ -821,6 +829,7 @@ mod test { 0, ))); assert_eq!(crds.insert(peer.clone(), now), Ok(())); + let crds = RwLock::new(crds); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( @@ -831,7 +840,7 @@ mod test { expected.insert(peer.label().pubkey(), vec![new_msg.clone()]); let origin = new_msg.pubkey(); assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), vec![new_msg], 0), + push.process_push_message(&crds, &Pubkey::default(), vec![new_msg], 0), [Ok(origin)] ); assert_eq!(push.active_set.read().unwrap().len(), 1); @@ -854,8 +863,9 @@ mod test { let origin: Vec<_> = peers.iter().map(|node| node.pubkey()).collect(); assert_eq!(crds.insert(peers[0].clone(), now), Ok(())); assert_eq!(crds.insert(peers[1].clone(), now), Ok(())); + let crds = RwLock::new(crds); assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), vec![peers[2].clone()], now), + push.process_push_message(&crds, &Pubkey::default(), vec![peers[2].clone()], now), [Ok(origin[2])], ); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); @@ -880,6 +890,7 @@ mod test { 0, ))); assert_eq!(crds.insert(peer.clone(), 0), Ok(())); + let crds = RwLock::new(crds); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( @@ -889,7 +900,7 @@ mod test { let expected = HashMap::new(); let origin = new_msg.pubkey(); assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), vec![new_msg.clone()], 0), + push.process_push_message(&crds, &Pubkey::default(), vec![new_msg.clone()], 0), [Ok(origin)], ); push.process_prune_msg( @@ -908,6 +919,7 @@ mod test { 0, ))); assert_eq!(crds.insert(peer, 0), Ok(())); + let crds = RwLock::new(crds); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); @@ -916,7 +928,7 @@ mod test { let expected = HashMap::new(); let origin = new_msg.pubkey(); assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), vec![new_msg], 1), + push.process_push_message(&crds, &Pubkey::default(), vec![new_msg], 1), [Ok(origin)], ); assert_eq!(push.new_push_messages(&crds, 0), expected); @@ -924,7 +936,7 @@ mod test { #[test] fn test_purge_old_received_cache() { - let mut crds = Crds::default(); + let crds = RwLock::::default(); let push = CrdsGossipPush::default(); let mut ci = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); ci.wallclock = 0; @@ -932,14 +944,14 @@ mod test { let label = value.label(); // push a new message assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), vec![value.clone()], 0), + push.process_push_message(&crds, &Pubkey::default(), vec![value.clone()], 0), [Ok(label.pubkey())] ); - assert_eq!(crds.get(&label).unwrap().value, value); + assert_eq!(crds.write().unwrap().get(&label).unwrap().value, value); // push it again assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), vec![value.clone()], 0), + push.process_push_message(&crds, &Pubkey::default(), vec![value.clone()], 0), [Err(CrdsGossipError::PushMessageOldVersion)], ); @@ -948,7 +960,7 @@ mod test { // push it again assert_eq!( - push.process_push_message(&mut crds, &Pubkey::default(), vec![value], 0), + push.process_push_message(&crds, &Pubkey::default(), vec![value], 0), [Err(CrdsGossipError::PushMessageOldVersion)], ); } diff --git a/gossip/tests/crds_gossip.rs b/gossip/tests/crds_gossip.rs index 342580bc65..c4de78a382 100644 --- a/gossip/tests/crds_gossip.rs +++ b/gossip/tests/crds_gossip.rs @@ -33,24 +33,20 @@ use { struct Node { keypair: Arc, contact_info: ContactInfo, - gossip: Arc>, + gossip: Arc, ping_cache: Arc>, stake: u64, } impl Node { - fn new( - keypair: Arc, - contact_info: ContactInfo, - gossip: Arc>, - ) -> Self { + fn new(keypair: Arc, contact_info: ContactInfo, gossip: Arc) -> Self { Self::staked(keypair, contact_info, gossip, 0) } fn staked( keypair: Arc, contact_info: ContactInfo, - gossip: Arc>, + gossip: Arc, stake: u64, ) -> Self { let ping_cache = Arc::new(Mutex::new(PingCache::new( @@ -67,14 +63,6 @@ impl Node { } } -impl Deref for Node { - type Target = Arc>; - - fn deref(&self) -> &Self::Target { - &self.gossip - } -} - struct Network { nodes: HashMap, stake_pruned: u64, @@ -116,17 +104,24 @@ fn star_network_create(num: usize) -> Network { let node_keypair = Arc::new(Keypair::new()); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); - let mut node = CrdsGossip::default(); - node.crds.insert(new.clone(), timestamp()).unwrap(); - node.crds.insert(entry.clone(), timestamp()).unwrap(); - let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); + let node = CrdsGossip::default(); + { + let mut node_crds = node.crds.write().unwrap(); + node_crds.insert(new.clone(), timestamp()).unwrap(); + node_crds.insert(entry.clone(), timestamp()).unwrap(); + } + let node = Node::new(node_keypair, contact_info, Arc::new(node)); (new.label().pubkey(), node) }) .collect(); - let mut node = CrdsGossip::default(); + let node = CrdsGossip::default(); let id = entry.label().pubkey(); - node.crds.insert(entry, timestamp()).unwrap(); - let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); + node.crds + .write() + .unwrap() + .insert(entry, timestamp()) + .unwrap(); + let node = Node::new(node_keypair, contact_info, Arc::new(node)); network.insert(id, node); Network::new(network) } @@ -135,22 +130,36 @@ fn rstar_network_create(num: usize) -> Network { let node_keypair = Arc::new(Keypair::new()); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); - let mut origin = CrdsGossip::default(); + let origin = CrdsGossip::default(); let id = entry.label().pubkey(); - origin.crds.insert(entry, timestamp()).unwrap(); + origin + .crds + .write() + .unwrap() + .insert(entry, timestamp()) + .unwrap(); let mut network: HashMap<_, _> = (1..num) .map(|_| { let node_keypair = Arc::new(Keypair::new()); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); - let mut node = CrdsGossip::default(); - node.crds.insert(new.clone(), timestamp()).unwrap(); - origin.crds.insert(new.clone(), timestamp()).unwrap(); - let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); + let node = CrdsGossip::default(); + node.crds + .write() + .unwrap() + .insert(new.clone(), timestamp()) + .unwrap(); + origin + .crds + .write() + .unwrap() + .insert(new.clone(), timestamp()) + .unwrap(); + let node = Node::new(node_keypair, contact_info, Arc::new(node)); (new.label().pubkey(), node) }) .collect(); - let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(origin))); + let node = Node::new(node_keypair, contact_info, Arc::new(origin)); network.insert(id, node); Network::new(network) } @@ -161,9 +170,13 @@ fn ring_network_create(num: usize) -> Network { let node_keypair = Arc::new(Keypair::new()); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); - let mut node = CrdsGossip::default(); - node.crds.insert(new.clone(), timestamp()).unwrap(); - let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node))); + let node = CrdsGossip::default(); + node.crds + .write() + .unwrap() + .insert(new.clone(), timestamp()) + .unwrap(); + let node = Node::new(node_keypair, contact_info, Arc::new(node)); (new.label().pubkey(), node) }) .collect(); @@ -173,15 +186,12 @@ fn ring_network_create(num: usize) -> Network { let start = &network[&keys[k]]; let start_id = keys[k]; let label = CrdsValueLabel::ContactInfo(start_id); - let gossip = start.gossip.lock().unwrap(); - gossip.crds.get(&label).unwrap().value.clone() + let gossip_crds = start.gossip.crds.read().unwrap(); + gossip_crds.get(&label).unwrap().value.clone() }; let end = network.get_mut(&keys[(k + 1) % keys.len()]).unwrap(); - end.lock() - .unwrap() - .crds - .insert(start_info, timestamp()) - .unwrap(); + let mut end_crds = end.gossip.crds.write().unwrap(); + end_crds.insert(start_info, timestamp()).unwrap(); } Network::new(network) } @@ -193,14 +203,13 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network { let node_keypair = Arc::new(Keypair::new()); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone())); - let mut node = CrdsGossip::default(); - node.crds.insert(new.clone(), timestamp()).unwrap(); - let node = Node::staked( - node_keypair, - contact_info, - Arc::new(Mutex::new(node)), - stakes[n], - ); + let node = CrdsGossip::default(); + node.crds + .write() + .unwrap() + .insert(new.clone(), timestamp()) + .unwrap(); + let node = Node::staked(node_keypair, contact_info, Arc::new(node), stakes[n]); (new.label().pubkey(), node) }) .collect(); @@ -209,17 +218,18 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network { let start_entries: Vec<_> = keys .iter() .map(|k| { - let start = &network[k].lock().unwrap(); + let start = &network[k]; let start_label = CrdsValueLabel::ContactInfo(*k); - start.crds.get(&start_label).unwrap().value.clone() + let gossip_crds = start.gossip.crds.read().unwrap(); + gossip_crds.get(&start_label).unwrap().value.clone() }) .collect(); for (end_pubkey, end) in network.iter_mut() { + let mut end_crds = end.gossip.crds.write().unwrap(); for k in 0..keys.len() { - let mut end = end.lock().unwrap(); if keys[k] != *end_pubkey { let start_info = start_entries[k].clone(); - end.crds.insert(start_info, timestamp()).unwrap(); + end_crds.insert(start_info, timestamp()).unwrap(); } } } @@ -247,7 +257,7 @@ fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_conver let network_values: Vec = network.values().cloned().collect(); network_values.par_iter().for_each(|node| { let node_pubkey = node.keypair.pubkey(); - node.lock().unwrap().refresh_push_active_set( + node.gossip.refresh_push_active_set( &node_pubkey, 0, // shred version &HashMap::new(), // stakes @@ -262,14 +272,14 @@ fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_conver let now = (start * 100) as u64; ts += 1000; // push a message to the network - network_values.par_iter().for_each(|locked_node| { - let node_pubkey = locked_node.keypair.pubkey(); - let node = &mut locked_node.lock().unwrap(); - let label = CrdsValueLabel::ContactInfo(node_pubkey); - let entry = node.crds.get(&label).unwrap(); - let mut m = entry.value.contact_info().cloned().unwrap(); + network_values.par_iter().for_each(|node| { + let node_pubkey = node.keypair.pubkey(); + let mut m = { + let node_crds = node.gossip.crds.read().unwrap(); + node_crds.get_contact_info(node_pubkey).cloned().unwrap() + }; m.wallclock = now; - node.process_push_message( + node.gossip.process_push_message( &Pubkey::default(), vec![CrdsValue::new_unsigned(CrdsData::ContactInfo(m))], now, @@ -321,14 +331,13 @@ fn network_run_push( .par_iter() .map(|node| { let node_pubkey = node.keypair.pubkey(); - let mut node_lock = node.lock().unwrap(); - let timeouts = node_lock.make_timeouts( + let timeouts = node.gossip.make_timeouts( node_pubkey, &HashMap::default(), // stakes - Duration::from_millis(node_lock.pull.crds_timeout), + Duration::from_millis(node.gossip.pull.crds_timeout), ); - node_lock.purge(&node_pubkey, thread_pool, now, &timeouts); - (node_pubkey, node_lock.new_push_messages(vec![], now)) + node.gossip.purge(&node_pubkey, thread_pool, now, &timeouts); + (node_pubkey, node.gossip.new_push_messages(vec![], now)) }) .collect(); let transfered: Vec<_> = requests @@ -344,8 +353,7 @@ fn network_run_push( let origins: HashSet<_> = network .get(&to) .unwrap() - .lock() - .unwrap() + .gossip .process_push_message(&from, msgs.clone(), now) .into_iter() .collect(); @@ -353,11 +361,8 @@ fn network_run_push( .get(&to) .map(|node| { let node_pubkey = node.keypair.pubkey(); - node.lock().unwrap().prune_received_cache( - &node_pubkey, - origins, - &stakes, - ) + node.gossip + .prune_received_cache(&node_pubkey, origins, &stakes) }) .unwrap(); @@ -374,18 +379,18 @@ fn network_run_push( .get(&from) .map(|node| { let node_pubkey = node.keypair.pubkey(); - let node = node.lock().unwrap(); let destination = node_pubkey; let now = timestamp(); - node.process_prune_msg( - &node_pubkey, - &to, - &destination, - &prune_keys, - now, - now, - ) - .unwrap() + node.gossip + .process_prune_msg( + &node_pubkey, + &to, + &destination, + &prune_keys, + now, + now, + ) + .unwrap() }) .unwrap(); } @@ -410,7 +415,7 @@ fn network_run_push( if now % CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS == 0 && now > 0 { network_values.par_iter().for_each(|node| { let node_pubkey = node.keypair.pubkey(); - node.lock().unwrap().refresh_push_active_set( + node.gossip.refresh_push_active_set( &node_pubkey, 0, // shred version &HashMap::new(), // stakes @@ -420,10 +425,7 @@ fn network_run_push( } total = network_values .par_iter() - .map(|node| { - let gossip = node.gossip.lock().unwrap(); - gossip.push.num_pending(&gossip.crds) - }) + .map(|node| node.gossip.push.num_pending(&node.gossip.crds)) .sum(); trace!( "network_run_push_{}: now: {} queue: {} bytes: {} num_msgs: {} prunes: {} stake_pruned: {} delivered: {}", @@ -477,8 +479,7 @@ fn network_run_pull( .filter_map(|from| { let mut pings = Vec::new(); let (peer, filters) = from - .lock() - .unwrap() + .gossip .new_pull_request( thread_pool, from.keypair.deref(), @@ -492,9 +493,9 @@ fn network_run_pull( ) .ok()?; let from_pubkey = from.keypair.pubkey(); - let gossip = from.gossip.lock().unwrap(); let label = CrdsValueLabel::ContactInfo(from_pubkey); - let self_info = gossip.crds.get(&label).unwrap().value.clone(); + let gossip_crds = from.gossip.crds.read().unwrap(); + let self_info = gossip_crds.get(&label).unwrap().value.clone(); Some((peer.id, filters, self_info)) }) .collect() @@ -520,8 +521,7 @@ fn network_run_pull( .get(&to) .map(|node| { let rsp = node - .lock() - .unwrap() + .gossip .generate_pull_responses( &filters, /*output_size_limit=*/ usize::MAX, @@ -530,7 +530,7 @@ fn network_run_pull( .into_iter() .flatten() .collect(); - node.lock().unwrap().process_pull_requests( + node.gossip.process_pull_requests( filters.into_iter().map(|(caller, _)| caller), now, ); @@ -540,12 +540,12 @@ fn network_run_pull( bytes += serialized_size(&rsp).unwrap() as usize; msgs += rsp.len(); if let Some(node) = network.get(&from) { - let mut node = node.lock().unwrap(); - node.mark_pull_request_creation_time(from, now); + node.gossip.mark_pull_request_creation_time(from, now); let mut stats = ProcessPullStats::default(); - let (vers, vers_expired_timeout, failed_inserts) = - node.filter_pull_responses(&timeouts, rsp, now, &mut stats); - node.process_pull_responses( + let (vers, vers_expired_timeout, failed_inserts) = node + .gossip + .filter_pull_responses(&timeouts, rsp, now, &mut stats); + node.gossip.process_pull_responses( &from, vers, vers_expired_timeout, @@ -566,7 +566,7 @@ fn network_run_pull( } let total: usize = network_values .par_iter() - .map(|v| v.lock().unwrap().crds.len()) + .map(|v| v.gossip.crds.read().unwrap().len()) .sum(); convergance = total as f64 / ((num * num) as f64); if convergance > max_convergance { @@ -692,12 +692,14 @@ fn test_star_network_large_push() { } #[test] fn test_prune_errors() { - let mut crds_gossip = CrdsGossip::default(); + let crds_gossip = CrdsGossip::default(); let id = Pubkey::new(&[0; 32]); let ci = ContactInfo::new_localhost(&Pubkey::new(&[1; 32]), 0); let prune_pubkey = Pubkey::new(&[2; 32]); crds_gossip .crds + .write() + .unwrap() .insert( CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())), 0, diff --git a/gossip/tests/gossip.rs b/gossip/tests/gossip.rs index 8107b6baf0..d0b922612a 100644 --- a/gossip/tests/gossip.rs +++ b/gossip/tests/gossip.rs @@ -321,11 +321,10 @@ pub fn cluster_info_scale() { .iter() .filter(|v| v.message.account_keys == tx.message.account_keys) .count(); - let gossip = node.gossip.read().unwrap(); - num_old += gossip.push.num_old.load(Ordering::Relaxed); - num_push_total += gossip.push.num_total.load(Ordering::Relaxed); - num_pushes += gossip.push.num_pushes.load(Ordering::Relaxed); - num_pulls += gossip.pull.num_pulls.load(Ordering::Relaxed); + num_old += node.gossip.push.num_old.load(Ordering::Relaxed); + num_push_total += node.gossip.push.num_total.load(Ordering::Relaxed); + num_pushes += node.gossip.push.num_pushes.load(Ordering::Relaxed); + num_pulls += node.gossip.pull.num_pulls.load(Ordering::Relaxed); if has_tx == 0 { not_done += 1; } @@ -348,11 +347,10 @@ pub fn cluster_info_scale() { ); sleep(Duration::from_millis(200)); for (node, _, _) in nodes.iter() { - let gossip = node.gossip.read().unwrap(); - gossip.push.num_old.store(0, Ordering::Relaxed); - gossip.push.num_total.store(0, Ordering::Relaxed); - gossip.push.num_pushes.store(0, Ordering::Relaxed); - gossip.pull.num_pulls.store(0, Ordering::Relaxed); + node.gossip.push.num_old.store(0, Ordering::Relaxed); + node.gossip.push.num_total.store(0, Ordering::Relaxed); + node.gossip.push.num_pushes.store(0, Ordering::Relaxed); + node.gossip.pull.num_pulls.store(0, Ordering::Relaxed); } } diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index 8dd85128c1..e1935a8c35 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -745,9 +745,9 @@ mod tests { // This node is ahead of the trusted validators cluster_info .gossip + .crds .write() .unwrap() - .crds .insert( CrdsValue::new_unsigned(CrdsData::AccountsHashes(SnapshotHash::new( trusted_validators[0], @@ -765,9 +765,9 @@ mod tests { // Node is slightly behind the trusted validators cluster_info .gossip + .crds .write() .unwrap() - .crds .insert( CrdsValue::new_unsigned(CrdsData::AccountsHashes(SnapshotHash::new( trusted_validators[1], @@ -781,9 +781,9 @@ mod tests { // Node is far behind the trusted validators cluster_info .gossip + .crds .write() .unwrap() - .crds .insert( CrdsValue::new_unsigned(CrdsData::AccountsHashes(SnapshotHash::new( trusted_validators[2],