diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs index 7ca962b213..0445d60cdb 100644 --- a/core/src/cluster_nodes.rs +++ b/core/src/cluster_nodes.rs @@ -523,6 +523,7 @@ mod tests { super::*, rand::{seq::SliceRandom, Rng}, solana_gossip::{ + crds::GossipRoute, crds_value::{CrdsData, CrdsValue}, deprecated::{ shuffle_peers_and_index, sorted_retransmit_peers_and_stakes, @@ -589,7 +590,10 @@ mod tests { for node in nodes.iter().skip(1) { let node = CrdsData::ContactInfo(node.clone()); let node = CrdsValue::new_unsigned(node); - assert_eq!(gossip_crds.insert(node, now), Ok(())); + assert_eq!( + gossip_crds.insert(node, now, GossipRoute::LocalMessage), + Ok(()) + ); } } (nodes, stakes, cluster_info) diff --git a/gossip/benches/crds.rs b/gossip/benches/crds.rs index a5bfac749d..9ee78a6aa6 100644 --- a/gossip/benches/crds.rs +++ b/gossip/benches/crds.rs @@ -6,7 +6,9 @@ use { rand::{thread_rng, Rng}, rayon::ThreadPoolBuilder, solana_gossip::{ - crds::Crds, crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, crds_value::CrdsValue, + crds::{Crds, GossipRoute}, + crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, + crds_value::CrdsValue, }, solana_sdk::pubkey::Pubkey, std::collections::HashMap, @@ -21,7 +23,7 @@ fn bench_find_old_labels(bencher: &mut Bencher) { let now = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS + CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 1000; std::iter::repeat_with(|| (CrdsValue::new_rand(&mut rng, None), rng.gen_range(0, now))) .take(50_000) - .for_each(|(v, ts)| assert!(crds.insert(v, ts).is_ok())); + .for_each(|(v, ts)| assert!(crds.insert(v, ts, GossipRoute::LocalMessage).is_ok())); let mut timeouts = HashMap::new(); timeouts.insert(Pubkey::default(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS); bencher.iter(|| { diff --git a/gossip/benches/crds_gossip_pull.rs b/gossip/benches/crds_gossip_pull.rs index 3f44ddd72e..2f95469572 100644 --- a/gossip/benches/crds_gossip_pull.rs +++ b/gossip/benches/crds_gossip_pull.rs @@ -7,7 +7,7 @@ use { rayon::ThreadPoolBuilder, solana_gossip::{ cluster_info::MAX_BLOOM_SIZE, - crds::Crds, + crds::{Crds, GossipRoute}, crds_gossip_pull::{CrdsFilter, CrdsGossipPull}, crds_value::CrdsValue, }, @@ -39,7 +39,11 @@ fn bench_build_crds_filters(bencher: &mut Bencher) { let mut num_inserts = 0; for _ in 0..90_000 { if crds - .insert(CrdsValue::new_rand(&mut rng, None), rng.gen()) + .insert( + CrdsValue::new_rand(&mut rng, None), + rng.gen(), + GossipRoute::LocalMessage, + ) .is_ok() { num_inserts += 1; diff --git a/gossip/benches/crds_shards.rs b/gossip/benches/crds_shards.rs index 494a9f3946..1ed5816f4f 100644 --- a/gossip/benches/crds_shards.rs +++ b/gossip/benches/crds_shards.rs @@ -5,7 +5,7 @@ extern crate test; use { rand::{thread_rng, Rng}, solana_gossip::{ - crds::{Crds, VersionedCrdsValue}, + crds::{Crds, GossipRoute, VersionedCrdsValue}, crds_shards::CrdsShards, crds_value::CrdsValue, }, @@ -20,7 +20,8 @@ fn new_test_crds_value(rng: &mut R) -> VersionedCrdsValue { let value = CrdsValue::new_rand(rng, None); let label = value.label(); let mut crds = Crds::default(); - crds.insert(value, timestamp()).unwrap(); + crds.insert(value, timestamp(), GossipRoute::LocalMessage) + .unwrap(); crds.get::<&VersionedCrdsValue>(&label).cloned().unwrap() } diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 3eefd7ef2a..e66c463e61 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -18,7 +18,7 @@ use { submit_gossip_stats, Counter, GossipStats, ScopedTimer, TimedGuard, }, contact_info::ContactInfo, - crds::{Crds, Cursor}, + crds::{Crds, Cursor, GossipRoute}, crds_gossip::CrdsGossip, crds_gossip_error::CrdsGossipError, crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, @@ -493,7 +493,8 @@ impl ClusterInfo { // TODO kill insert_info, only used by tests pub fn insert_info(&self, contact_info: ContactInfo) { let value = CrdsValue::new_signed(CrdsData::ContactInfo(contact_info), &self.keypair()); - let _ = self.gossip.crds.write().unwrap().insert(value, timestamp()); + let mut gossip_crds = self.gossip.crds.write().unwrap(); + let _ = gossip_crds.insert(value, timestamp(), GossipRoute::LocalMessage); } pub fn set_entrypoint(&self, entrypoint: ContactInfo) { @@ -608,7 +609,7 @@ impl ClusterInfo { let now = timestamp(); let mut gossip_crds = self.gossip.crds.write().unwrap(); for node in nodes { - if let Err(err) = gossip_crds.insert(node, now) { + if let Err(err) = gossip_crds.insert(node, now, GossipRoute::LocalMessage) { warn!("crds insert failed {:?}", err); } } @@ -908,7 +909,7 @@ impl ClusterInfo { let mut gossip_crds = self.gossip.crds.write().unwrap(); let now = timestamp(); for entry in entries { - if let Err(err) = gossip_crds.insert(entry, now) { + if let Err(err) = gossip_crds.insert(entry, now, GossipRoute::LocalMessage) { error!("push_epoch_slots failed: {:?}", err); } } @@ -983,7 +984,7 @@ impl ClusterInfo { let vote = CrdsData::Vote(vote_index, vote); let vote = CrdsValue::new_signed(vote, &self.keypair()); let mut gossip_crds = self.gossip.crds.write().unwrap(); - if let Err(err) = gossip_crds.insert(vote, now) { + if let Err(err) = gossip_crds.insert(vote, now, GossipRoute::LocalMessage) { error!("push_vote failed: {:?}", err); } } @@ -1327,7 +1328,8 @@ impl ClusterInfo { CrdsData::ContactInfo(self.my_contact_info()), &self.keypair(), ); - let _ = self.gossip.crds.write().unwrap().insert(value, timestamp()); + let mut gossip_crds = self.gossip.crds.write().unwrap(); + let _ = gossip_crds.insert(value, timestamp(), GossipRoute::LocalMessage); } // If the network entrypoint hasn't been discovered yet, add it to the crds table @@ -1486,7 +1488,7 @@ impl ClusterInfo { let mut gossip_crds = self.gossip.crds.write().unwrap(); let now = timestamp(); for entry in pending_push_messages { - let _ = gossip_crds.insert(entry, now); + let _ = gossip_crds.insert(entry, now, GossipRoute::LocalMessage); } } fn new_push_requests(&self, stakes: &HashMap) -> Vec<(SocketAddr, Protocol)> { @@ -3780,7 +3782,9 @@ mod tests { { let mut gossip_crds = cluster_info.gossip.crds.write().unwrap(); for entry in entries { - assert!(gossip_crds.insert(entry, /*now=*/ 0).is_ok()); + assert!(gossip_crds + .insert(entry, /*now=*/ 0, GossipRoute::LocalMessage) + .is_ok()); } } // Should exclude other node's epoch-slot because of different @@ -4077,7 +4081,7 @@ mod tests { LowestSlot::new(other_node_pubkey, peer_lowest, timestamp()), )); let mut gossip_crds = cluster_info.gossip.crds.write().unwrap(); - let _ = gossip_crds.insert(value, timestamp()); + let _ = gossip_crds.insert(value, timestamp(), GossipRoute::LocalMessage); } // only half the visible peers should be eligible to serve this repair assert_eq!(cluster_info.repair_peers(5).len(), 5); diff --git a/gossip/src/cluster_info_metrics.rs b/gossip/src/cluster_info_metrics.rs index 937cfdf2d0..1c97868f47 100644 --- a/gossip/src/cluster_info_metrics.rs +++ b/gossip/src/cluster_info_metrics.rs @@ -1,7 +1,8 @@ use { crate::crds_gossip::CrdsGossip, + itertools::Itertools, solana_measure::measure::Measure, - solana_sdk::pubkey::Pubkey, + solana_sdk::{clock::Slot, pubkey::Pubkey}, std::{ collections::HashMap, ops::{Deref, DerefMut}, @@ -160,9 +161,10 @@ pub(crate) fn submit_gossip_stats( gossip: &CrdsGossip, stakes: &HashMap, ) { - let (table_size, num_nodes, num_pubkeys, purged_values_size, failed_inserts_size) = { + let (crds_stats, table_size, num_nodes, num_pubkeys, purged_values_size, failed_inserts_size) = { let gossip_crds = gossip.crds.read().unwrap(); ( + gossip_crds.take_stats(), gossip_crds.len(), gossip_crds.num_nodes(), gossip_crds.num_pubkeys(), @@ -446,4 +448,155 @@ pub(crate) fn submit_gossip_stats( i64 ), ); + let counts: Vec<_> = crds_stats + .pull + .counts + .iter() + .zip(crds_stats.push.counts.iter()) + .map(|(a, b)| a + b) + .collect(); + datapoint_info!( + "cluster_info_crds_stats", + ("ContactInfo", counts[0], i64), + ("ContactInfo-push", crds_stats.push.counts[0], i64), + ("ContactInfo-pull", crds_stats.pull.counts[0], i64), + ("Vote", counts[1], i64), + ("Vote-push", crds_stats.push.counts[1], i64), + ("Vote-pull", crds_stats.pull.counts[1], i64), + ("LowestSlot", counts[2], i64), + ("LowestSlot-push", crds_stats.push.counts[2], i64), + ("LowestSlot-pull", crds_stats.pull.counts[2], i64), + ("SnapshotHashes", counts[3], i64), + ("SnapshotHashes-push", crds_stats.push.counts[3], i64), + ("SnapshotHashes-pull", crds_stats.pull.counts[3], i64), + ("AccountsHashes", counts[4], i64), + ("AccountsHashes-push", crds_stats.push.counts[4], i64), + ("AccountsHashes-pull", crds_stats.pull.counts[4], i64), + ("EpochSlots", counts[5], i64), + ("EpochSlots-push", crds_stats.push.counts[5], i64), + ("EpochSlots-pull", crds_stats.pull.counts[5], i64), + ("LegacyVersion", counts[6], i64), + ("LegacyVersion-push", crds_stats.push.counts[6], i64), + ("LegacyVersion-pull", crds_stats.pull.counts[6], i64), + ("Version", counts[7], i64), + ("Version-push", crds_stats.push.counts[7], i64), + ("Version-pull", crds_stats.pull.counts[7], i64), + ("NodeInstance", counts[8], i64), + ("NodeInstance-push", crds_stats.push.counts[8], i64), + ("NodeInstance-pull", crds_stats.pull.counts[8], i64), + ("DuplicateShred", counts[9], i64), + ("DuplicateShred-push", crds_stats.push.counts[9], i64), + ("DuplicateShred-pull", crds_stats.pull.counts[9], i64), + ("IncrementalSnapshotHashes", counts[10], i64), + ( + "IncrementalSnapshotHashes-push", + crds_stats.push.counts[10], + i64 + ), + ( + "IncrementalSnapshotHashes-pull", + crds_stats.pull.counts[10], + i64 + ), + ("all", counts.iter().sum::(), i64), + ( + "all-push", + crds_stats.push.counts.iter().sum::(), + i64 + ), + ( + "all-pull", + crds_stats.pull.counts.iter().sum::(), + i64 + ), + ); + let fails: Vec<_> = crds_stats + .pull + .fails + .iter() + .zip(crds_stats.push.fails.iter()) + .map(|(a, b)| a + b) + .collect(); + datapoint_info!( + "cluster_info_crds_stats_fails", + ("ContactInfo", fails[0], i64), + ("ContactInfo-push", crds_stats.push.fails[0], i64), + ("ContactInfo-pull", crds_stats.pull.fails[0], i64), + ("Vote", fails[1], i64), + ("Vote-push", crds_stats.push.fails[1], i64), + ("Vote-pull", crds_stats.pull.fails[1], i64), + ("LowestSlot", fails[2], i64), + ("LowestSlot-push", crds_stats.push.fails[2], i64), + ("LowestSlot-pull", crds_stats.pull.fails[2], i64), + ("SnapshotHashes", fails[3], i64), + ("SnapshotHashes-push", crds_stats.push.fails[3], i64), + ("SnapshotHashes-pull", crds_stats.pull.fails[3], i64), + ("AccountsHashes", fails[4], i64), + ("AccountsHashes-push", crds_stats.push.fails[4], i64), + ("AccountsHashes-pull", crds_stats.pull.fails[4], i64), + ("EpochSlots", fails[5], i64), + ("EpochSlots-push", crds_stats.push.fails[5], i64), + ("EpochSlots-pull", crds_stats.pull.fails[5], i64), + ("LegacyVersion", fails[6], i64), + ("LegacyVersion-push", crds_stats.push.fails[6], i64), + ("LegacyVersion-pull", crds_stats.pull.fails[6], i64), + ("Version", fails[7], i64), + ("Version-push", crds_stats.push.fails[7], i64), + ("Version-pull", crds_stats.pull.fails[7], i64), + ("NodeInstance", fails[8], i64), + ("NodeInstance-push", crds_stats.push.fails[8], i64), + ("NodeInstance-pull", crds_stats.pull.fails[8], i64), + ("DuplicateShred", fails[9], i64), + ("DuplicateShred-push", crds_stats.push.fails[9], i64), + ("DuplicateShred-pull", crds_stats.pull.fails[9], i64), + ("IncrementalSnapshotHashes", fails[10], i64), + ( + "IncrementalSnapshotHashes-push", + crds_stats.push.fails[10], + i64 + ), + ( + "IncrementalSnapshotHashes-pull", + crds_stats.pull.fails[10], + i64 + ), + ("all", fails.iter().sum::(), i64), + ("all-push", crds_stats.push.fails.iter().sum::(), i64), + ("all-pull", crds_stats.pull.fails.iter().sum::(), i64), + ); + for (slot, num_votes) in &crds_stats.pull.votes { + datapoint_info!( + "cluster_info_crds_stats_votes_pull", + ("slot", *slot, i64), + ("num_votes", *num_votes, i64), + ); + } + for (slot, num_votes) in &crds_stats.push.votes { + datapoint_info!( + "cluster_info_crds_stats_votes_push", + ("slot", *slot, i64), + ("num_votes", *num_votes, i64), + ); + } + let votes: HashMap = crds_stats + .pull + .votes + .into_iter() + .map(|(slot, num_votes)| (*slot, *num_votes)) + .chain( + crds_stats + .push + .votes + .into_iter() + .map(|(slot, num_votes)| (*slot, *num_votes)), + ) + .into_grouping_map() + .aggregate(|acc, _slot, num_votes| Some(acc.unwrap_or_default() + num_votes)); + for (slot, num_votes) in votes { + datapoint_info!( + "cluster_info_crds_stats_votes", + ("slot", slot, i64), + ("num_votes", num_votes, i64), + ); + } } diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index 529ec8872d..1047e36484 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -36,9 +36,11 @@ use { map::{rayon::ParValues, Entry, IndexMap}, set::IndexSet, }, + lru::LruCache, matches::debug_assert_matches, rayon::{prelude::*, ThreadPool}, solana_sdk::{ + clock::Slot, hash::{hash, Hash}, pubkey::Pubkey, }, @@ -46,12 +48,14 @@ use { cmp::Ordering, collections::{hash_map, BTreeMap, HashMap, VecDeque}, ops::{Bound, Index, IndexMut}, + sync::Mutex, }, }; const CRDS_SHARDS_BITS: u32 = 8; +// Number of vote slots to track in an lru-cache for metrics. +const VOTE_SLOTS_METRICS_CAP: usize = 100; -#[derive(Clone)] pub struct Crds { /// Stores the map of labels and values table: IndexMap, @@ -70,6 +74,7 @@ pub struct Crds { purged: VecDeque<(Hash, u64 /*timestamp*/)>, // Mapping from nodes' pubkeys to their respective shred-version. shred_versions: HashMap, + stats: Mutex, } #[derive(PartialEq, Debug)] @@ -78,6 +83,28 @@ pub enum CrdsError { UnknownStakes, } +#[derive(Clone, Copy)] +pub enum GossipRoute { + LocalMessage, + PullRequest, + PullResponse, + PushMessage, +} + +type CrdsCountsArray = [usize; 11]; + +pub(crate) struct CrdsDataStats { + pub(crate) counts: CrdsCountsArray, + pub(crate) fails: CrdsCountsArray, + pub(crate) votes: LruCache, +} + +#[derive(Default)] +pub(crate) struct CrdsStats { + pub(crate) pull: CrdsDataStats, + pub(crate) push: CrdsDataStats, +} + /// This structure stores some local metadata associated with the CrdsValue #[derive(PartialEq, Debug, Clone)] pub struct VersionedCrdsValue { @@ -130,6 +157,7 @@ impl Default for Crds { entries: BTreeMap::default(), purged: VecDeque::default(), shred_versions: HashMap::default(), + stats: Mutex::::default(), } } } @@ -170,12 +198,18 @@ impl Crds { } } - pub fn insert(&mut self, value: CrdsValue, now: u64) -> Result<(), CrdsError> { + pub fn insert( + &mut self, + value: CrdsValue, + now: u64, + route: GossipRoute, + ) -> Result<(), CrdsError> { let label = value.label(); let pubkey = value.pubkey(); let value = VersionedCrdsValue::new(value, self.cursor, now); match self.table.entry(label) { Entry::Vacant(entry) => { + self.stats.lock().unwrap().record_insert(&value, route); let entry_index = entry.index(); self.shards.insert(entry_index, &value); match &value.value.data { @@ -198,6 +232,7 @@ impl Crds { Ok(()) } Entry::Occupied(mut entry) if overrides(&value.value, entry.get()) => { + self.stats.lock().unwrap().record_insert(&value, route); let entry_index = entry.index(); self.shards.remove(entry_index, entry.get()); self.shards.insert(entry_index, &value); @@ -229,6 +264,7 @@ impl Crds { Ok(()) } Entry::Occupied(entry) => { + self.stats.lock().unwrap().record_fail(&value, route); trace!( "INSERT FAILED data: {} new.wallclock: {}", value.value.label(), @@ -556,6 +592,89 @@ impl Crds { } Ok(keys.len()) } + + pub(crate) fn take_stats(&self) -> CrdsStats { + std::mem::take(&mut self.stats.lock().unwrap()) + } + + // Only for tests and simulations. + pub(crate) fn mock_clone(&self) -> Self { + Self { + table: self.table.clone(), + cursor: self.cursor, + shards: self.shards.clone(), + nodes: self.nodes.clone(), + votes: self.votes.clone(), + epoch_slots: self.epoch_slots.clone(), + records: self.records.clone(), + entries: self.entries.clone(), + purged: self.purged.clone(), + shred_versions: self.shred_versions.clone(), + stats: Mutex::::default(), + } + } +} + +impl Default for CrdsDataStats { + fn default() -> Self { + Self { + counts: CrdsCountsArray::default(), + fails: CrdsCountsArray::default(), + votes: LruCache::new(VOTE_SLOTS_METRICS_CAP), + } + } +} + +impl CrdsDataStats { + fn record_insert(&mut self, entry: &VersionedCrdsValue) { + self.counts[Self::ordinal(entry)] += 1; + if let CrdsData::Vote(_, vote) = &entry.value.data { + if let Some(slot) = vote.slot() { + let num_nodes = self.votes.get(&slot).copied().unwrap_or_default(); + self.votes.put(slot, num_nodes + 1); + } + } + } + + fn record_fail(&mut self, entry: &VersionedCrdsValue) { + self.fails[Self::ordinal(entry)] += 1; + } + + fn ordinal(entry: &VersionedCrdsValue) -> usize { + match &entry.value.data { + CrdsData::ContactInfo(_) => 0, + CrdsData::Vote(_, _) => 1, + CrdsData::LowestSlot(_, _) => 2, + CrdsData::SnapshotHashes(_) => 3, + CrdsData::AccountsHashes(_) => 4, + CrdsData::EpochSlots(_, _) => 5, + CrdsData::LegacyVersion(_) => 6, + CrdsData::Version(_) => 7, + CrdsData::NodeInstance(_) => 8, + CrdsData::DuplicateShred(_, _) => 9, + CrdsData::IncrementalSnapshotHashes(_) => 10, + } + } +} + +impl CrdsStats { + fn record_insert(&mut self, entry: &VersionedCrdsValue, route: GossipRoute) { + match route { + GossipRoute::LocalMessage => (), + GossipRoute::PullRequest => (), + GossipRoute::PushMessage => self.push.record_insert(entry), + GossipRoute::PullResponse => self.pull.record_insert(entry), + } + } + + fn record_fail(&mut self, entry: &VersionedCrdsValue, route: GossipRoute) { + match route { + GossipRoute::LocalMessage => (), + GossipRoute::PullRequest => (), + GossipRoute::PushMessage => self.push.record_fail(entry), + GossipRoute::PullResponse => self.pull.record_fail(entry), + } + } } #[cfg(test)] @@ -580,7 +699,10 @@ mod tests { fn test_insert() { let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); - assert_eq!(crds.insert(val.clone(), 0), Ok(())); + assert_eq!( + crds.insert(val.clone(), 0, GossipRoute::LocalMessage), + Ok(()) + ); assert_eq!(crds.table.len(), 1); assert!(crds.table.contains_key(&val.label())); assert_eq!(crds.table[&val.label()].local_timestamp, 0); @@ -589,8 +711,14 @@ mod tests { fn test_update_old() { let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); - assert_eq!(crds.insert(val.clone(), 0), Ok(())); - assert_eq!(crds.insert(val.clone(), 1), Err(CrdsError::InsertFailed)); + assert_eq!( + crds.insert(val.clone(), 0, GossipRoute::LocalMessage), + Ok(()) + ); + assert_eq!( + crds.insert(val.clone(), 1, GossipRoute::LocalMessage), + Err(CrdsError::InsertFailed) + ); assert!(crds.purged.is_empty()); assert_eq!(crds.table[&val.label()].local_timestamp, 0); } @@ -602,12 +730,15 @@ mod tests { 0, ))); let value_hash = hash(&serialize(&original).unwrap()); - assert_matches!(crds.insert(original, 0), Ok(())); + assert_matches!(crds.insert(original, 0, GossipRoute::LocalMessage), Ok(())); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::default(), 1, ))); - assert_eq!(crds.insert(val.clone(), 1), Ok(())); + assert_eq!( + crds.insert(val.clone(), 1, GossipRoute::LocalMessage), + Ok(()) + ); assert_eq!(*crds.purged.back().unwrap(), (value_hash, 1)); assert_eq!(crds.table[&val.label()].local_timestamp, 1); } @@ -618,13 +749,19 @@ mod tests { &Pubkey::default(), 0, ))); - assert_eq!(crds.insert(val.clone(), 0), Ok(())); + assert_eq!( + crds.insert(val.clone(), 0, GossipRoute::LocalMessage), + Ok(()) + ); assert_eq!(crds.table[&val.label()].ordinal, 0); let val2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); let value_hash = hash(&serialize(&val2).unwrap()); assert_eq!(val2.label().pubkey(), val.label().pubkey()); - assert_eq!(crds.insert(val2.clone(), 0), Ok(())); + assert_eq!( + crds.insert(val2.clone(), 0, GossipRoute::LocalMessage), + Ok(()) + ); crds.update_record_timestamp(&val.label().pubkey(), 2); assert_eq!(crds.table[&val.label()].local_timestamp, 2); @@ -639,7 +776,7 @@ mod tests { let mut ci = ContactInfo::default(); ci.wallclock += 1; let val3 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci)); - assert_eq!(crds.insert(val3, 3), Ok(())); + assert_eq!(crds.insert(val3, 3, GossipRoute::LocalMessage), Ok(())); assert_eq!(*crds.purged.back().unwrap(), (value_hash, 3)); assert_eq!(crds.table[&val2.label()].local_timestamp, 3); assert_eq!(crds.table[&val2.label()].ordinal, 2); @@ -657,19 +794,22 @@ mod tests { let pubkey = Pubkey::new_unique(); let node = NodeInstance::new(&mut rng, pubkey, now); let node = make_crds_value(node); - assert_eq!(crds.insert(node, now), Ok(())); + assert_eq!(crds.insert(node, now, GossipRoute::LocalMessage), Ok(())); // A node-instance with a different key should insert fine even with // older timestamps. let other = NodeInstance::new(&mut rng, Pubkey::new_unique(), now - 1); let other = make_crds_value(other); - assert_eq!(crds.insert(other, now), Ok(())); + assert_eq!(crds.insert(other, now, GossipRoute::LocalMessage), Ok(())); // A node-instance with older timestamp should fail to insert, even if // the wallclock is more recent. let other = NodeInstance::new(&mut rng, pubkey, now - 1); let other = other.with_wallclock(now + 1); let other = make_crds_value(other); let value_hash = hash(&serialize(&other).unwrap()); - assert_eq!(crds.insert(other, now), Err(CrdsError::InsertFailed)); + assert_eq!( + crds.insert(other, now, GossipRoute::LocalMessage), + Err(CrdsError::InsertFailed) + ); assert_eq!(*crds.purged.back().unwrap(), (value_hash, now)); // A node instance with the same timestamp should insert only if the // random token is larger. @@ -678,7 +818,7 @@ mod tests { let other = NodeInstance::new(&mut rng, pubkey, now); let other = make_crds_value(other); let value_hash = hash(&serialize(&other).unwrap()); - match crds.insert(other, now) { + match crds.insert(other, now, GossipRoute::LocalMessage) { Ok(()) => num_overrides += 1, Err(CrdsError::InsertFailed) => { assert_eq!(*crds.purged.back().unwrap(), (value_hash, now)) @@ -693,7 +833,7 @@ mod tests { let other = NodeInstance::new(&mut rng, pubkey, now + k); let other = other.with_wallclock(now - 1); let other = make_crds_value(other); - match crds.insert(other, now) { + match crds.insert(other, now, GossipRoute::LocalMessage) { Ok(()) => (), _ => panic!(), } @@ -705,7 +845,10 @@ mod tests { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); - assert_eq!(crds.insert(val.clone(), 1), Ok(())); + assert_eq!( + crds.insert(val.clone(), 1, GossipRoute::LocalMessage), + Ok(()) + ); let mut set = HashMap::new(); set.insert(Pubkey::default(), 0); assert!(crds.find_old_labels(&thread_pool, 0, &set).is_empty()); @@ -728,7 +871,10 @@ mod tests { let mut timeouts = HashMap::new(); let val = CrdsValue::new_rand(&mut rng, None); timeouts.insert(Pubkey::default(), 3); - assert_eq!(crds.insert(val.clone(), 0), Ok(())); + assert_eq!( + crds.insert(val.clone(), 0, GossipRoute::LocalMessage), + Ok(()) + ); assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty()); timeouts.insert(val.pubkey(), 1); assert_eq!( @@ -751,7 +897,10 @@ mod tests { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); - assert_matches!(crds.insert(val.clone(), 1), Ok(_)); + assert_matches!( + crds.insert(val.clone(), 1, GossipRoute::LocalMessage), + Ok(_) + ); let mut set = HashMap::new(); set.insert(Pubkey::default(), 1); assert_eq!( @@ -766,7 +915,10 @@ mod tests { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); - assert_eq!(crds.insert(val.clone(), 1), Ok(())); + assert_eq!( + crds.insert(val.clone(), 1, GossipRoute::LocalMessage), + Ok(()) + ); let mut set = HashMap::new(); //now < timestamp set.insert(Pubkey::default(), 0); @@ -808,7 +960,7 @@ mod tests { let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; let value = CrdsValue::new_rand(&mut rng, Some(keypair)); let local_timestamp = new_rand_timestamp(&mut rng); - if let Ok(()) = crds.insert(value, local_timestamp) { + if let Ok(()) = crds.insert(value, local_timestamp, GossipRoute::LocalMessage) { num_inserts += 1; check_crds_shards(&crds); } @@ -962,7 +1114,7 @@ mod tests { let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; let value = CrdsValue::new_rand(&mut rng, Some(keypair)); let local_timestamp = new_rand_timestamp(&mut rng); - if let Ok(()) = crds.insert(value, local_timestamp) { + if let Ok(()) = crds.insert(value, local_timestamp, GossipRoute::LocalMessage) { num_inserts += 1; } if k % 16 == 0 { @@ -1016,7 +1168,7 @@ mod tests { let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; let value = CrdsValue::new_rand(&mut rng, Some(keypair)); let local_timestamp = new_rand_timestamp(&mut rng); - let _ = crds.insert(value, local_timestamp); + let _ = crds.insert(value, local_timestamp, GossipRoute::LocalMessage); if k % 64 == 0 { check_crds_records(&crds); } @@ -1047,7 +1199,10 @@ mod tests { node.shred_version = 42; let node = CrdsData::ContactInfo(node); let node = CrdsValue::new_unsigned(node); - assert_eq!(crds.insert(node, timestamp()), Ok(())); + assert_eq!( + crds.insert(node, timestamp(), GossipRoute::LocalMessage), + Ok(()) + ); assert_eq!(crds.get_shred_version(&pubkey), Some(42)); // An outdated value should not update shred-version: let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey)); @@ -1055,7 +1210,10 @@ mod tests { node.shred_version = 8; let node = CrdsData::ContactInfo(node); let node = CrdsValue::new_unsigned(node); - assert_eq!(crds.insert(node, timestamp()), Err(CrdsError::InsertFailed)); + assert_eq!( + crds.insert(node, timestamp(), GossipRoute::LocalMessage), + Err(CrdsError::InsertFailed) + ); assert_eq!(crds.get_shred_version(&pubkey), Some(42)); // Update shred version: let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey)); @@ -1063,13 +1221,19 @@ mod tests { node.shred_version = 8; let node = CrdsData::ContactInfo(node); let node = CrdsValue::new_unsigned(node); - assert_eq!(crds.insert(node, timestamp()), Ok(())); + assert_eq!( + crds.insert(node, timestamp(), GossipRoute::LocalMessage), + Ok(()) + ); assert_eq!(crds.get_shred_version(&pubkey), Some(8)); // Add other crds values with the same pubkey. let val = SnapshotHashes::new_rand(&mut rng, Some(pubkey)); let val = CrdsData::SnapshotHashes(val); let val = CrdsValue::new_unsigned(val); - assert_eq!(crds.insert(val, timestamp()), Ok(())); + assert_eq!( + crds.insert(val, timestamp(), GossipRoute::LocalMessage), + Ok(()) + ); assert_eq!(crds.get_shred_version(&pubkey), Some(8)); // Remove contact-info. Shred version should stay there since there // are still values associated with the pubkey. @@ -1106,7 +1270,7 @@ mod tests { let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; let value = CrdsValue::new_rand(&mut rng, Some(keypair)); let local_timestamp = new_rand_timestamp(&mut rng); - let _ = crds.insert(value, local_timestamp); + let _ = crds.insert(value, local_timestamp, GossipRoute::LocalMessage); } let num_values = crds.table.len(); let num_pubkeys = num_unique_pubkeys(crds.table.values()); @@ -1147,7 +1311,10 @@ mod tests { let thread_pool = ThreadPoolBuilder::new().build().unwrap(); let mut crds = Crds::default(); let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); - assert_matches!(crds.insert(val.clone(), 1), Ok(_)); + assert_matches!( + crds.insert(val.clone(), 1, GossipRoute::LocalMessage), + Ok(_) + ); let mut set = HashMap::new(); //default has max timeout, but pubkey should still expire diff --git a/gossip/src/crds_entry.rs b/gossip/src/crds_entry.rs index a3541ef3f3..c2b5235b80 100644 --- a/gossip/src/crds_entry.rs +++ b/gossip/src/crds_entry.rs @@ -78,7 +78,10 @@ impl<'a, 'b> CrdsEntry<'a, 'b> for &'a SnapshotHashes { mod tests { use { super::*, - crate::{crds::Crds, crds_value::new_rand_timestamp}, + crate::{ + crds::{Crds, GossipRoute}, + crds_value::new_rand_timestamp, + }, rand::seq::SliceRandom, solana_sdk::signature::Keypair, std::collections::HashMap, @@ -94,7 +97,11 @@ mod tests { let keypair = keypairs.choose(&mut rng).unwrap(); let value = CrdsValue::new_rand(&mut rng, Some(keypair)); let key = value.label(); - if let Ok(()) = crds.insert(value.clone(), new_rand_timestamp(&mut rng)) { + if let Ok(()) = crds.insert( + value.clone(), + new_rand_timestamp(&mut rng), + GossipRoute::LocalMessage, + ) { entries.insert(key, value); } } diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index 05625dcb9c..199c881c7f 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -8,7 +8,7 @@ use { crate::{ cluster_info::Ping, contact_info::ContactInfo, - crds::Crds, + crds::{Crds, GossipRoute}, crds_gossip_error::CrdsGossipError, crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats}, crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE}, @@ -89,7 +89,7 @@ impl CrdsGossip { { let mut crds = self.crds.write().unwrap(); for entry in pending_push_messages { - let _ = crds.insert(entry, now); + let _ = crds.insert(entry, now, GossipRoute::LocalMessage); } } self.push.new_push_messages(&self.crds, now) @@ -151,7 +151,7 @@ impl CrdsGossip { }); let now = timestamp(); for entry in entries { - if let Err(err) = crds.insert(entry, now) { + if let Err(err) = crds.insert(entry, now, GossipRoute::LocalMessage) { error!("push_duplicate_shred faild: {:?}", err); } } @@ -337,7 +337,7 @@ impl CrdsGossip { // Only for tests and simulations. pub(crate) fn mock_clone(&self) -> Self { - let crds = self.crds.read().unwrap().clone(); + let crds = self.crds.read().unwrap().mock_clone(); Self { crds: RwLock::new(crds), push: self.push.mock_clone(), @@ -385,6 +385,7 @@ mod test { .insert( CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())), 0, + GossipRoute::LocalMessage, ) .unwrap(); crds_gossip.refresh_push_active_set( diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index 544259d4df..c4167eb69f 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -15,7 +15,7 @@ use { crate::{ cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY}, contact_info::ContactInfo, - crds::{Crds, VersionedCrdsValue}, + crds::{Crds, GossipRoute, VersionedCrdsValue}, crds_gossip::{get_stake, get_weight}, crds_gossip_error::CrdsGossipError, crds_value::CrdsValue, @@ -348,7 +348,7 @@ impl CrdsGossipPull { let mut crds = crds.write().unwrap(); for caller in callers { let key = caller.pubkey(); - let _ = crds.insert(caller, now); + let _ = crds.insert(caller, now, GossipRoute::PullRequest); crds.update_record_timestamp(&key, now); } } @@ -430,12 +430,12 @@ impl CrdsGossipPull { let mut owners = HashSet::new(); let mut crds = crds.write().unwrap(); for response in responses_expired_timeout { - let _ = crds.insert(response, now); + let _ = crds.insert(response, now, GossipRoute::PullResponse); } let mut num_inserts = 0; for response in responses { let owner = response.pubkey(); - if let Ok(()) = crds.insert(response, now) { + if let Ok(()) = crds.insert(response, now, GossipRoute::PullResponse) { num_inserts += 1; owners.insert(owner); } @@ -731,14 +731,16 @@ pub(crate) mod tests { &solana_sdk::pubkey::new_rand(), 0, ))); - crds.insert(me.clone(), 0).unwrap(); + crds.insert(me.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); for i in 1..=30 { let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &solana_sdk::pubkey::new_rand(), 0, ))); let id = entry.label().pubkey(); - crds.insert(entry.clone(), 0).unwrap(); + crds.insert(entry.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); stakes.insert(id, i * 100); } let now = 1024; @@ -792,10 +794,14 @@ pub(crate) mod tests { ..ContactInfo::default() })); - crds.insert(me.clone(), 0).unwrap(); - crds.insert(spy.clone(), 0).unwrap(); - crds.insert(node_123.clone(), 0).unwrap(); - crds.insert(node_456.clone(), 0).unwrap(); + crds.insert(me.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); + crds.insert(spy.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); + crds.insert(node_123.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); + crds.insert(node_456.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); let crds = RwLock::new(crds); // shred version 123 should ignore nodes with versions 0 and 456 @@ -854,8 +860,10 @@ pub(crate) mod tests { ..ContactInfo::default() })); - crds.insert(me.clone(), 0).unwrap(); - crds.insert(node_123.clone(), 0).unwrap(); + crds.insert(me.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); + crds.insert(node_123.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); let crds = RwLock::new(crds); // Empty gossip_validators -- will pull from nobody @@ -958,7 +966,10 @@ pub(crate) mod tests { for _ in 0..40_000 { let keypair = keypairs.choose(&mut rng).unwrap(); let value = CrdsValue::new_rand(&mut rng, Some(keypair)); - if crds.insert(value, rng.gen()).is_ok() { + if crds + .insert(value, rng.gen(), GossipRoute::LocalMessage) + .is_ok() + { num_inserts += 1; } } @@ -1019,7 +1030,10 @@ pub(crate) mod tests { Err(CrdsGossipError::NoPeers) ); - crds.write().unwrap().insert(entry, 0).unwrap(); + crds.write() + .unwrap() + .insert(entry, 0, GossipRoute::LocalMessage) + .unwrap(); assert_eq!( node.new_pull_request( &thread_pool, @@ -1043,7 +1057,10 @@ pub(crate) mod tests { .unwrap() .mock_pong(new.id, new.gossip, Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); - crds.write().unwrap().insert(new.clone(), now).unwrap(); + crds.write() + .unwrap() + .insert(new.clone(), now, GossipRoute::LocalMessage) + .unwrap(); let req = node.new_pull_request( &thread_pool, &crds, @@ -1063,7 +1080,10 @@ pub(crate) mod tests { node.mark_pull_request_creation_time(new.contact_info().unwrap().id, now); let offline = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), now); let offline = CrdsValue::new_unsigned(CrdsData::ContactInfo(offline)); - crds.write().unwrap().insert(offline, now).unwrap(); + crds.write() + .unwrap() + .insert(offline, now, GossipRoute::LocalMessage) + .unwrap(); let req = node.new_pull_request( &thread_pool, &crds, @@ -1098,15 +1118,17 @@ pub(crate) mod tests { 0, ))); let node = CrdsGossipPull::default(); - crds.insert(entry, now).unwrap(); + crds.insert(entry, now, GossipRoute::LocalMessage).unwrap(); let old = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); ping_cache.mock_pong(old.id, old.gossip, Instant::now()); let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(old)); - crds.insert(old.clone(), now).unwrap(); + crds.insert(old.clone(), now, GossipRoute::LocalMessage) + .unwrap(); let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); ping_cache.mock_pong(new.id, new.gossip, Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); - crds.insert(new.clone(), now).unwrap(); + crds.insert(new.clone(), now, GossipRoute::LocalMessage) + .unwrap(); let crds = RwLock::new(crds); // set request creation time to now. @@ -1193,11 +1215,13 @@ pub(crate) mod tests { ))); let caller = entry.clone(); let node = CrdsGossipPull::default(); - node_crds.insert(entry, 0).unwrap(); + node_crds + .insert(entry, 0, GossipRoute::LocalMessage) + .unwrap(); let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); ping_cache.mock_pong(new.id, new.gossip, Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); - node_crds.insert(new, 0).unwrap(); + node_crds.insert(new, 0, GossipRoute::LocalMessage).unwrap(); let node_crds = RwLock::new(node_crds); let mut pings = Vec::new(); let req = node.new_pull_request( @@ -1234,7 +1258,11 @@ pub(crate) mod tests { dest_crds .write() .unwrap() - .insert(new, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS) + .insert( + new, + CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, + GossipRoute::LocalMessage, + ) .unwrap(); //should skip new value since caller is to old @@ -1283,7 +1311,9 @@ pub(crate) mod tests { ))); let caller = entry.clone(); let node = CrdsGossipPull::default(); - node_crds.insert(entry, 0).unwrap(); + node_crds + .insert(entry, 0, GossipRoute::LocalMessage) + .unwrap(); let mut ping_cache = PingCache::new( Duration::from_secs(20 * 60), // ttl 128, // capacity @@ -1291,7 +1321,7 @@ pub(crate) mod tests { let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); ping_cache.mock_pong(new.id, new.gossip, Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); - node_crds.insert(new, 0).unwrap(); + node_crds.insert(new, 0, GossipRoute::LocalMessage).unwrap(); let node_crds = RwLock::new(node_crds); let mut pings = Vec::new(); let req = node.new_pull_request( @@ -1340,7 +1370,9 @@ pub(crate) mod tests { let caller = entry.clone(); let node_pubkey = entry.label().pubkey(); let node = CrdsGossipPull::default(); - node_crds.insert(entry, 0).unwrap(); + node_crds + .insert(entry, 0, GossipRoute::LocalMessage) + .unwrap(); let mut ping_cache = PingCache::new( Duration::from_secs(20 * 60), // ttl 128, // capacity @@ -1348,14 +1380,16 @@ pub(crate) mod tests { let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 1); ping_cache.mock_pong(new.id, new.gossip, Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); - node_crds.insert(new, 0).unwrap(); + node_crds.insert(new, 0, GossipRoute::LocalMessage).unwrap(); let mut dest_crds = Crds::default(); let new_id = solana_sdk::pubkey::new_rand(); let new = ContactInfo::new_localhost(&new_id, 1); ping_cache.mock_pong(new.id, new.gossip, Instant::now()); let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new)); - dest_crds.insert(new.clone(), 0).unwrap(); + dest_crds + .insert(new.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); let dest_crds = RwLock::new(dest_crds); // node contains a key from the dest node, but at an older local timestamp @@ -1364,7 +1398,9 @@ pub(crate) mod tests { let same_key = CrdsValue::new_unsigned(CrdsData::ContactInfo(same_key)); assert_eq!(same_key.label(), new.label()); assert!(same_key.wallclock() < new.wallclock()); - node_crds.insert(same_key.clone(), 0).unwrap(); + node_crds + .insert(same_key.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); assert_eq!(0, { let entry: &VersionedCrdsValue = node_crds.get(&same_key.label()).unwrap(); entry.local_timestamp @@ -1449,12 +1485,16 @@ pub(crate) mod tests { let node_label = entry.label(); let node_pubkey = node_label.pubkey(); let node = CrdsGossipPull::default(); - node_crds.insert(entry, 0).unwrap(); + node_crds + .insert(entry, 0, GossipRoute::LocalMessage) + .unwrap(); let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &solana_sdk::pubkey::new_rand(), 0, ))); - node_crds.insert(old.clone(), 0).unwrap(); + node_crds + .insert(old.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); let value_hash = { let entry: &VersionedCrdsValue = node_crds.get(&old.label()).unwrap(); entry.value_hash diff --git a/gossip/src/crds_gossip_push.rs b/gossip/src/crds_gossip_push.rs index 36cfbfc007..9f3ba781a8 100644 --- a/gossip/src/crds_gossip_push.rs +++ b/gossip/src/crds_gossip_push.rs @@ -15,7 +15,7 @@ use { crate::{ cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY, contact_info::ContactInfo, - crds::{Crds, Cursor}, + crds::{Crds, Cursor, GossipRoute}, crds_gossip::{get_stake, get_weight}, crds_gossip_error::CrdsGossipError, crds_value::CrdsValue, @@ -240,7 +240,7 @@ impl CrdsGossipPush { .map(|value| { let value = value?; let origin = value.pubkey(); - match crds.insert(value, now) { + match crds.insert(value, now, GossipRoute::PushMessage) { Ok(()) => Ok(origin), Err(_) => { self.num_old.fetch_add(1, Ordering::Relaxed); @@ -665,7 +665,10 @@ mod test { 0, ))); - assert_eq!(crds.insert(value1.clone(), now), Ok(())); + assert_eq!( + crds.insert(value1.clone(), now, GossipRoute::LocalMessage), + Ok(()) + ); let crds = RwLock::new(crds); push.refresh_push_active_set( &crds, @@ -686,7 +689,12 @@ mod test { ))); assert!(active_set.get(&value2.label().pubkey()).is_none()); drop(active_set); - assert_eq!(crds.write().unwrap().insert(value2.clone(), now), Ok(())); + assert_eq!( + crds.write() + .unwrap() + .insert(value2.clone(), now, GossipRoute::LocalMessage), + Ok(()) + ); for _ in 0..30 { push.refresh_push_active_set( &crds, @@ -711,7 +719,12 @@ mod test { let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo( ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0), )); - assert_eq!(crds.write().unwrap().insert(value2.clone(), now), Ok(())); + assert_eq!( + crds.write() + .unwrap() + .insert(value2.clone(), now, GossipRoute::LocalMessage), + Ok(()) + ); } push.refresh_push_active_set( &crds, @@ -738,7 +751,8 @@ mod test { time, ))); let id = peer.label().pubkey(); - crds.insert(peer.clone(), time).unwrap(); + crds.insert(peer.clone(), time, GossipRoute::LocalMessage) + .unwrap(); stakes.insert(id, i * 100); push.last_pushed_to.write().unwrap().put(id, time); } @@ -791,10 +805,14 @@ mod test { ..ContactInfo::default() })); - crds.insert(me.clone(), now).unwrap(); - crds.insert(spy.clone(), now).unwrap(); - crds.insert(node_123.clone(), now).unwrap(); - crds.insert(node_456, now).unwrap(); + crds.insert(me.clone(), now, GossipRoute::LocalMessage) + .unwrap(); + crds.insert(spy.clone(), now, GossipRoute::LocalMessage) + .unwrap(); + crds.insert(node_123.clone(), now, GossipRoute::LocalMessage) + .unwrap(); + crds.insert(node_456, now, GossipRoute::LocalMessage) + .unwrap(); let crds = RwLock::new(crds); // shred version 123 should ignore nodes with versions 0 and 456 @@ -845,8 +863,10 @@ mod test { ..ContactInfo::default() })); - crds.insert(me.clone(), 0).unwrap(); - crds.insert(node_123.clone(), now).unwrap(); + crds.insert(me.clone(), 0, GossipRoute::LocalMessage) + .unwrap(); + crds.insert(node_123.clone(), now, GossipRoute::LocalMessage) + .unwrap(); let crds = RwLock::new(crds); // Unknown pubkey in gossip_validators -- will push to nobody @@ -898,7 +918,10 @@ mod test { &solana_sdk::pubkey::new_rand(), 0, ))); - assert_eq!(crds.insert(peer.clone(), now), Ok(())); + assert_eq!( + crds.insert(peer.clone(), now, GossipRoute::LocalMessage), + Ok(()) + ); let crds = RwLock::new(crds); push.refresh_push_active_set( &crds, @@ -940,8 +963,14 @@ mod test { }) .collect(); let origin: Vec<_> = peers.iter().map(|node| node.pubkey()).collect(); - assert_eq!(crds.insert(peers[0].clone(), now), Ok(())); - assert_eq!(crds.insert(peers[1].clone(), now), Ok(())); + assert_eq!( + crds.insert(peers[0].clone(), now, GossipRoute::LocalMessage), + Ok(()) + ); + assert_eq!( + crds.insert(peers[1].clone(), now, GossipRoute::LocalMessage), + Ok(()) + ); let crds = RwLock::new(crds); assert_eq!( push.process_push_message(&crds, &Pubkey::default(), vec![peers[2].clone()], now), @@ -977,7 +1006,10 @@ mod test { &solana_sdk::pubkey::new_rand(), 0, ))); - assert_eq!(crds.insert(peer.clone(), 0), Ok(())); + assert_eq!( + crds.insert(peer.clone(), 0, GossipRoute::LocalMessage), + Ok(()) + ); let crds = RwLock::new(crds); push.refresh_push_active_set( &crds, @@ -1015,7 +1047,7 @@ mod test { &solana_sdk::pubkey::new_rand(), 0, ))); - assert_eq!(crds.insert(peer, 0), Ok(())); + assert_eq!(crds.insert(peer, 0, GossipRoute::LocalMessage), Ok(())); let crds = RwLock::new(crds); push.refresh_push_active_set( &crds, diff --git a/gossip/src/crds_shards.rs b/gossip/src/crds_shards.rs index f410ac5cdc..3bf7b126dd 100644 --- a/gossip/src/crds_shards.rs +++ b/gossip/src/crds_shards.rs @@ -134,7 +134,10 @@ where mod test { use { super::*, - crate::{crds::Crds, crds_value::CrdsValue}, + crate::{ + crds::{Crds, GossipRoute}, + crds_value::CrdsValue, + }, rand::{thread_rng, Rng}, solana_sdk::timing::timestamp, std::{collections::HashSet, iter::repeat_with, ops::Index}, @@ -144,7 +147,8 @@ mod test { let value = CrdsValue::new_rand(rng, None); let label = value.label(); let mut crds = Crds::default(); - crds.insert(value, timestamp()).unwrap(); + crds.insert(value, timestamp(), GossipRoute::LocalMessage) + .unwrap(); crds.get::<&VersionedCrdsValue>(&label).cloned().unwrap() } diff --git a/gossip/tests/crds_gossip.rs b/gossip/tests/crds_gossip.rs index 4160ca3b8a..fb0f6b2f34 100644 --- a/gossip/tests/crds_gossip.rs +++ b/gossip/tests/crds_gossip.rs @@ -7,6 +7,7 @@ use { solana_gossip::{ cluster_info, contact_info::ContactInfo, + crds::GossipRoute, crds_gossip::*, crds_gossip_error::CrdsGossipError, crds_gossip_pull::{ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, @@ -108,8 +109,12 @@ fn star_network_create(num: usize) -> Network { let node = CrdsGossip::default(); { let mut node_crds = node.crds.write().unwrap(); - node_crds.insert(new.clone(), timestamp()).unwrap(); - node_crds.insert(entry.clone(), timestamp()).unwrap(); + node_crds + .insert(new.clone(), timestamp(), GossipRoute::LocalMessage) + .unwrap(); + node_crds + .insert(entry.clone(), timestamp(), GossipRoute::LocalMessage) + .unwrap(); } let node = Node::new(node_keypair, contact_info, Arc::new(node)); (new.label().pubkey(), node) @@ -120,7 +125,7 @@ fn star_network_create(num: usize) -> Network { node.crds .write() .unwrap() - .insert(entry, timestamp()) + .insert(entry, timestamp(), GossipRoute::LocalMessage) .unwrap(); let node = Node::new(node_keypair, contact_info, Arc::new(node)); network.insert(id, node); @@ -137,7 +142,7 @@ fn rstar_network_create(num: usize) -> Network { .crds .write() .unwrap() - .insert(entry, timestamp()) + .insert(entry, timestamp(), GossipRoute::LocalMessage) .unwrap(); let mut network: HashMap<_, _> = (1..num) .map(|_| { @@ -148,13 +153,13 @@ fn rstar_network_create(num: usize) -> Network { node.crds .write() .unwrap() - .insert(new.clone(), timestamp()) + .insert(new.clone(), timestamp(), GossipRoute::LocalMessage) .unwrap(); origin .crds .write() .unwrap() - .insert(new.clone(), timestamp()) + .insert(new.clone(), timestamp(), GossipRoute::LocalMessage) .unwrap(); let node = Node::new(node_keypair, contact_info, Arc::new(node)); (new.label().pubkey(), node) @@ -175,7 +180,7 @@ fn ring_network_create(num: usize) -> Network { node.crds .write() .unwrap() - .insert(new.clone(), timestamp()) + .insert(new.clone(), timestamp(), GossipRoute::LocalMessage) .unwrap(); let node = Node::new(node_keypair, contact_info, Arc::new(node)); (new.label().pubkey(), node) @@ -192,7 +197,9 @@ fn ring_network_create(num: usize) -> Network { }; let end = network.get_mut(&keys[(k + 1) % keys.len()]).unwrap(); let mut end_crds = end.gossip.crds.write().unwrap(); - end_crds.insert(start_info, timestamp()).unwrap(); + end_crds + .insert(start_info, timestamp(), GossipRoute::LocalMessage) + .unwrap(); } Network::new(network) } @@ -208,7 +215,7 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network { node.crds .write() .unwrap() - .insert(new.clone(), timestamp()) + .insert(new.clone(), timestamp(), GossipRoute::LocalMessage) .unwrap(); let node = Node::staked(node_keypair, contact_info, Arc::new(node), stakes[n]); (new.label().pubkey(), node) @@ -230,7 +237,9 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network { for k in 0..keys.len() { if keys[k] != *end_pubkey { let start_info = start_entries[k].clone(); - end_crds.insert(start_info, timestamp()).unwrap(); + end_crds + .insert(start_info, timestamp(), GossipRoute::LocalMessage) + .unwrap(); } } } @@ -709,6 +718,7 @@ fn test_prune_errors() { .insert( CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())), 0, + GossipRoute::LocalMessage, ) .unwrap(); crds_gossip.refresh_push_active_set( diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index 0a602b8c41..b3e42cbb16 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -494,6 +494,7 @@ mod tests { crate::rpc::create_validator_exit, solana_gossip::{ contact_info::ContactInfo, + crds::GossipRoute, crds_value::{CrdsData, CrdsValue, SnapshotHashes}, }, solana_ledger::{ @@ -794,6 +795,7 @@ mod tests { ], ))), 1, + GossipRoute::LocalMessage, ) .unwrap(); assert_eq!(rm.health_check(), "ok"); @@ -810,6 +812,7 @@ mod tests { vec![(1000 + health_check_slot_distance - 1, Hash::default())], ))), 1, + GossipRoute::LocalMessage, ) .unwrap(); assert_eq!(rm.health_check(), "ok"); @@ -826,6 +829,7 @@ mod tests { vec![(1000 + health_check_slot_distance, Hash::default())], ))), 1, + GossipRoute::LocalMessage, ) .unwrap(); assert_eq!(rm.health_check(), "behind");