adds metrics tracking crds writes and votes (#20953)

This commit is contained in:
behzad nouri 2021-10-26 13:02:30 +00:00 committed by GitHub
parent cd5e690427
commit 1297a13586
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 544 additions and 111 deletions

View File

@ -523,6 +523,7 @@ mod tests {
super::*,
rand::{seq::SliceRandom, Rng},
solana_gossip::{
crds::GossipRoute,
crds_value::{CrdsData, CrdsValue},
deprecated::{
shuffle_peers_and_index, sorted_retransmit_peers_and_stakes,
@ -589,7 +590,10 @@ mod tests {
for node in nodes.iter().skip(1) {
let node = CrdsData::ContactInfo(node.clone());
let node = CrdsValue::new_unsigned(node);
assert_eq!(gossip_crds.insert(node, now), Ok(()));
assert_eq!(
gossip_crds.insert(node, now, GossipRoute::LocalMessage),
Ok(())
);
}
}
(nodes, stakes, cluster_info)

View File

@ -6,7 +6,9 @@ use {
rand::{thread_rng, Rng},
rayon::ThreadPoolBuilder,
solana_gossip::{
crds::Crds, crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, crds_value::CrdsValue,
crds::{Crds, GossipRoute},
crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
crds_value::CrdsValue,
},
solana_sdk::pubkey::Pubkey,
std::collections::HashMap,
@ -21,7 +23,7 @@ fn bench_find_old_labels(bencher: &mut Bencher) {
let now = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS + CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 1000;
std::iter::repeat_with(|| (CrdsValue::new_rand(&mut rng, None), rng.gen_range(0, now)))
.take(50_000)
.for_each(|(v, ts)| assert!(crds.insert(v, ts).is_ok()));
.for_each(|(v, ts)| assert!(crds.insert(v, ts, GossipRoute::LocalMessage).is_ok()));
let mut timeouts = HashMap::new();
timeouts.insert(Pubkey::default(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS);
bencher.iter(|| {

View File

@ -7,7 +7,7 @@ use {
rayon::ThreadPoolBuilder,
solana_gossip::{
cluster_info::MAX_BLOOM_SIZE,
crds::Crds,
crds::{Crds, GossipRoute},
crds_gossip_pull::{CrdsFilter, CrdsGossipPull},
crds_value::CrdsValue,
},
@ -39,7 +39,11 @@ fn bench_build_crds_filters(bencher: &mut Bencher) {
let mut num_inserts = 0;
for _ in 0..90_000 {
if crds
.insert(CrdsValue::new_rand(&mut rng, None), rng.gen())
.insert(
CrdsValue::new_rand(&mut rng, None),
rng.gen(),
GossipRoute::LocalMessage,
)
.is_ok()
{
num_inserts += 1;

View File

@ -5,7 +5,7 @@ extern crate test;
use {
rand::{thread_rng, Rng},
solana_gossip::{
crds::{Crds, VersionedCrdsValue},
crds::{Crds, GossipRoute, VersionedCrdsValue},
crds_shards::CrdsShards,
crds_value::CrdsValue,
},
@ -20,7 +20,8 @@ fn new_test_crds_value<R: Rng>(rng: &mut R) -> VersionedCrdsValue {
let value = CrdsValue::new_rand(rng, None);
let label = value.label();
let mut crds = Crds::default();
crds.insert(value, timestamp()).unwrap();
crds.insert(value, timestamp(), GossipRoute::LocalMessage)
.unwrap();
crds.get::<&VersionedCrdsValue>(&label).cloned().unwrap()
}

View File

@ -18,7 +18,7 @@ use {
submit_gossip_stats, Counter, GossipStats, ScopedTimer, TimedGuard,
},
contact_info::ContactInfo,
crds::{Crds, Cursor},
crds::{Crds, Cursor, GossipRoute},
crds_gossip::CrdsGossip,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
@ -493,7 +493,8 @@ impl ClusterInfo {
// TODO kill insert_info, only used by tests
pub fn insert_info(&self, contact_info: ContactInfo) {
let value = CrdsValue::new_signed(CrdsData::ContactInfo(contact_info), &self.keypair());
let _ = self.gossip.crds.write().unwrap().insert(value, timestamp());
let mut gossip_crds = self.gossip.crds.write().unwrap();
let _ = gossip_crds.insert(value, timestamp(), GossipRoute::LocalMessage);
}
pub fn set_entrypoint(&self, entrypoint: ContactInfo) {
@ -608,7 +609,7 @@ impl ClusterInfo {
let now = timestamp();
let mut gossip_crds = self.gossip.crds.write().unwrap();
for node in nodes {
if let Err(err) = gossip_crds.insert(node, now) {
if let Err(err) = gossip_crds.insert(node, now, GossipRoute::LocalMessage) {
warn!("crds insert failed {:?}", err);
}
}
@ -908,7 +909,7 @@ impl ClusterInfo {
let mut gossip_crds = self.gossip.crds.write().unwrap();
let now = timestamp();
for entry in entries {
if let Err(err) = gossip_crds.insert(entry, now) {
if let Err(err) = gossip_crds.insert(entry, now, GossipRoute::LocalMessage) {
error!("push_epoch_slots failed: {:?}", err);
}
}
@ -983,7 +984,7 @@ impl ClusterInfo {
let vote = CrdsData::Vote(vote_index, vote);
let vote = CrdsValue::new_signed(vote, &self.keypair());
let mut gossip_crds = self.gossip.crds.write().unwrap();
if let Err(err) = gossip_crds.insert(vote, now) {
if let Err(err) = gossip_crds.insert(vote, now, GossipRoute::LocalMessage) {
error!("push_vote failed: {:?}", err);
}
}
@ -1327,7 +1328,8 @@ impl ClusterInfo {
CrdsData::ContactInfo(self.my_contact_info()),
&self.keypair(),
);
let _ = self.gossip.crds.write().unwrap().insert(value, timestamp());
let mut gossip_crds = self.gossip.crds.write().unwrap();
let _ = gossip_crds.insert(value, timestamp(), GossipRoute::LocalMessage);
}
// If the network entrypoint hasn't been discovered yet, add it to the crds table
@ -1486,7 +1488,7 @@ impl ClusterInfo {
let mut gossip_crds = self.gossip.crds.write().unwrap();
let now = timestamp();
for entry in pending_push_messages {
let _ = gossip_crds.insert(entry, now);
let _ = gossip_crds.insert(entry, now, GossipRoute::LocalMessage);
}
}
fn new_push_requests(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
@ -3780,7 +3782,9 @@ mod tests {
{
let mut gossip_crds = cluster_info.gossip.crds.write().unwrap();
for entry in entries {
assert!(gossip_crds.insert(entry, /*now=*/ 0).is_ok());
assert!(gossip_crds
.insert(entry, /*now=*/ 0, GossipRoute::LocalMessage)
.is_ok());
}
}
// Should exclude other node's epoch-slot because of different
@ -4077,7 +4081,7 @@ mod tests {
LowestSlot::new(other_node_pubkey, peer_lowest, timestamp()),
));
let mut gossip_crds = cluster_info.gossip.crds.write().unwrap();
let _ = gossip_crds.insert(value, timestamp());
let _ = gossip_crds.insert(value, timestamp(), GossipRoute::LocalMessage);
}
// only half the visible peers should be eligible to serve this repair
assert_eq!(cluster_info.repair_peers(5).len(), 5);

View File

@ -1,7 +1,8 @@
use {
crate::crds_gossip::CrdsGossip,
itertools::Itertools,
solana_measure::measure::Measure,
solana_sdk::pubkey::Pubkey,
solana_sdk::{clock::Slot, pubkey::Pubkey},
std::{
collections::HashMap,
ops::{Deref, DerefMut},
@ -160,9 +161,10 @@ pub(crate) fn submit_gossip_stats(
gossip: &CrdsGossip,
stakes: &HashMap<Pubkey, u64>,
) {
let (table_size, num_nodes, num_pubkeys, purged_values_size, failed_inserts_size) = {
let (crds_stats, table_size, num_nodes, num_pubkeys, purged_values_size, failed_inserts_size) = {
let gossip_crds = gossip.crds.read().unwrap();
(
gossip_crds.take_stats(),
gossip_crds.len(),
gossip_crds.num_nodes(),
gossip_crds.num_pubkeys(),
@ -446,4 +448,155 @@ pub(crate) fn submit_gossip_stats(
i64
),
);
let counts: Vec<_> = crds_stats
.pull
.counts
.iter()
.zip(crds_stats.push.counts.iter())
.map(|(a, b)| a + b)
.collect();
datapoint_info!(
"cluster_info_crds_stats",
("ContactInfo", counts[0], i64),
("ContactInfo-push", crds_stats.push.counts[0], i64),
("ContactInfo-pull", crds_stats.pull.counts[0], i64),
("Vote", counts[1], i64),
("Vote-push", crds_stats.push.counts[1], i64),
("Vote-pull", crds_stats.pull.counts[1], i64),
("LowestSlot", counts[2], i64),
("LowestSlot-push", crds_stats.push.counts[2], i64),
("LowestSlot-pull", crds_stats.pull.counts[2], i64),
("SnapshotHashes", counts[3], i64),
("SnapshotHashes-push", crds_stats.push.counts[3], i64),
("SnapshotHashes-pull", crds_stats.pull.counts[3], i64),
("AccountsHashes", counts[4], i64),
("AccountsHashes-push", crds_stats.push.counts[4], i64),
("AccountsHashes-pull", crds_stats.pull.counts[4], i64),
("EpochSlots", counts[5], i64),
("EpochSlots-push", crds_stats.push.counts[5], i64),
("EpochSlots-pull", crds_stats.pull.counts[5], i64),
("LegacyVersion", counts[6], i64),
("LegacyVersion-push", crds_stats.push.counts[6], i64),
("LegacyVersion-pull", crds_stats.pull.counts[6], i64),
("Version", counts[7], i64),
("Version-push", crds_stats.push.counts[7], i64),
("Version-pull", crds_stats.pull.counts[7], i64),
("NodeInstance", counts[8], i64),
("NodeInstance-push", crds_stats.push.counts[8], i64),
("NodeInstance-pull", crds_stats.pull.counts[8], i64),
("DuplicateShred", counts[9], i64),
("DuplicateShred-push", crds_stats.push.counts[9], i64),
("DuplicateShred-pull", crds_stats.pull.counts[9], i64),
("IncrementalSnapshotHashes", counts[10], i64),
(
"IncrementalSnapshotHashes-push",
crds_stats.push.counts[10],
i64
),
(
"IncrementalSnapshotHashes-pull",
crds_stats.pull.counts[10],
i64
),
("all", counts.iter().sum::<usize>(), i64),
(
"all-push",
crds_stats.push.counts.iter().sum::<usize>(),
i64
),
(
"all-pull",
crds_stats.pull.counts.iter().sum::<usize>(),
i64
),
);
let fails: Vec<_> = crds_stats
.pull
.fails
.iter()
.zip(crds_stats.push.fails.iter())
.map(|(a, b)| a + b)
.collect();
datapoint_info!(
"cluster_info_crds_stats_fails",
("ContactInfo", fails[0], i64),
("ContactInfo-push", crds_stats.push.fails[0], i64),
("ContactInfo-pull", crds_stats.pull.fails[0], i64),
("Vote", fails[1], i64),
("Vote-push", crds_stats.push.fails[1], i64),
("Vote-pull", crds_stats.pull.fails[1], i64),
("LowestSlot", fails[2], i64),
("LowestSlot-push", crds_stats.push.fails[2], i64),
("LowestSlot-pull", crds_stats.pull.fails[2], i64),
("SnapshotHashes", fails[3], i64),
("SnapshotHashes-push", crds_stats.push.fails[3], i64),
("SnapshotHashes-pull", crds_stats.pull.fails[3], i64),
("AccountsHashes", fails[4], i64),
("AccountsHashes-push", crds_stats.push.fails[4], i64),
("AccountsHashes-pull", crds_stats.pull.fails[4], i64),
("EpochSlots", fails[5], i64),
("EpochSlots-push", crds_stats.push.fails[5], i64),
("EpochSlots-pull", crds_stats.pull.fails[5], i64),
("LegacyVersion", fails[6], i64),
("LegacyVersion-push", crds_stats.push.fails[6], i64),
("LegacyVersion-pull", crds_stats.pull.fails[6], i64),
("Version", fails[7], i64),
("Version-push", crds_stats.push.fails[7], i64),
("Version-pull", crds_stats.pull.fails[7], i64),
("NodeInstance", fails[8], i64),
("NodeInstance-push", crds_stats.push.fails[8], i64),
("NodeInstance-pull", crds_stats.pull.fails[8], i64),
("DuplicateShred", fails[9], i64),
("DuplicateShred-push", crds_stats.push.fails[9], i64),
("DuplicateShred-pull", crds_stats.pull.fails[9], i64),
("IncrementalSnapshotHashes", fails[10], i64),
(
"IncrementalSnapshotHashes-push",
crds_stats.push.fails[10],
i64
),
(
"IncrementalSnapshotHashes-pull",
crds_stats.pull.fails[10],
i64
),
("all", fails.iter().sum::<usize>(), i64),
("all-push", crds_stats.push.fails.iter().sum::<usize>(), i64),
("all-pull", crds_stats.pull.fails.iter().sum::<usize>(), i64),
);
for (slot, num_votes) in &crds_stats.pull.votes {
datapoint_info!(
"cluster_info_crds_stats_votes_pull",
("slot", *slot, i64),
("num_votes", *num_votes, i64),
);
}
for (slot, num_votes) in &crds_stats.push.votes {
datapoint_info!(
"cluster_info_crds_stats_votes_push",
("slot", *slot, i64),
("num_votes", *num_votes, i64),
);
}
let votes: HashMap<Slot, usize> = crds_stats
.pull
.votes
.into_iter()
.map(|(slot, num_votes)| (*slot, *num_votes))
.chain(
crds_stats
.push
.votes
.into_iter()
.map(|(slot, num_votes)| (*slot, *num_votes)),
)
.into_grouping_map()
.aggregate(|acc, _slot, num_votes| Some(acc.unwrap_or_default() + num_votes));
for (slot, num_votes) in votes {
datapoint_info!(
"cluster_info_crds_stats_votes",
("slot", slot, i64),
("num_votes", num_votes, i64),
);
}
}

View File

@ -36,9 +36,11 @@ use {
map::{rayon::ParValues, Entry, IndexMap},
set::IndexSet,
},
lru::LruCache,
matches::debug_assert_matches,
rayon::{prelude::*, ThreadPool},
solana_sdk::{
clock::Slot,
hash::{hash, Hash},
pubkey::Pubkey,
},
@ -46,12 +48,14 @@ use {
cmp::Ordering,
collections::{hash_map, BTreeMap, HashMap, VecDeque},
ops::{Bound, Index, IndexMut},
sync::Mutex,
},
};
const CRDS_SHARDS_BITS: u32 = 8;
// Number of vote slots to track in an lru-cache for metrics.
const VOTE_SLOTS_METRICS_CAP: usize = 100;
#[derive(Clone)]
pub struct Crds {
/// Stores the map of labels and values
table: IndexMap<CrdsValueLabel, VersionedCrdsValue>,
@ -70,6 +74,7 @@ pub struct Crds {
purged: VecDeque<(Hash, u64 /*timestamp*/)>,
// Mapping from nodes' pubkeys to their respective shred-version.
shred_versions: HashMap<Pubkey, u16>,
stats: Mutex<CrdsStats>,
}
#[derive(PartialEq, Debug)]
@ -78,6 +83,28 @@ pub enum CrdsError {
UnknownStakes,
}
#[derive(Clone, Copy)]
pub enum GossipRoute {
LocalMessage,
PullRequest,
PullResponse,
PushMessage,
}
type CrdsCountsArray = [usize; 11];
pub(crate) struct CrdsDataStats {
pub(crate) counts: CrdsCountsArray,
pub(crate) fails: CrdsCountsArray,
pub(crate) votes: LruCache<Slot, /*count:*/ usize>,
}
#[derive(Default)]
pub(crate) struct CrdsStats {
pub(crate) pull: CrdsDataStats,
pub(crate) push: CrdsDataStats,
}
/// This structure stores some local metadata associated with the CrdsValue
#[derive(PartialEq, Debug, Clone)]
pub struct VersionedCrdsValue {
@ -130,6 +157,7 @@ impl Default for Crds {
entries: BTreeMap::default(),
purged: VecDeque::default(),
shred_versions: HashMap::default(),
stats: Mutex::<CrdsStats>::default(),
}
}
}
@ -170,12 +198,18 @@ impl Crds {
}
}
pub fn insert(&mut self, value: CrdsValue, now: u64) -> Result<(), CrdsError> {
pub fn insert(
&mut self,
value: CrdsValue,
now: u64,
route: GossipRoute,
) -> Result<(), CrdsError> {
let label = value.label();
let pubkey = value.pubkey();
let value = VersionedCrdsValue::new(value, self.cursor, now);
match self.table.entry(label) {
Entry::Vacant(entry) => {
self.stats.lock().unwrap().record_insert(&value, route);
let entry_index = entry.index();
self.shards.insert(entry_index, &value);
match &value.value.data {
@ -198,6 +232,7 @@ impl Crds {
Ok(())
}
Entry::Occupied(mut entry) if overrides(&value.value, entry.get()) => {
self.stats.lock().unwrap().record_insert(&value, route);
let entry_index = entry.index();
self.shards.remove(entry_index, entry.get());
self.shards.insert(entry_index, &value);
@ -229,6 +264,7 @@ impl Crds {
Ok(())
}
Entry::Occupied(entry) => {
self.stats.lock().unwrap().record_fail(&value, route);
trace!(
"INSERT FAILED data: {} new.wallclock: {}",
value.value.label(),
@ -556,6 +592,89 @@ impl Crds {
}
Ok(keys.len())
}
pub(crate) fn take_stats(&self) -> CrdsStats {
std::mem::take(&mut self.stats.lock().unwrap())
}
// Only for tests and simulations.
pub(crate) fn mock_clone(&self) -> Self {
Self {
table: self.table.clone(),
cursor: self.cursor,
shards: self.shards.clone(),
nodes: self.nodes.clone(),
votes: self.votes.clone(),
epoch_slots: self.epoch_slots.clone(),
records: self.records.clone(),
entries: self.entries.clone(),
purged: self.purged.clone(),
shred_versions: self.shred_versions.clone(),
stats: Mutex::<CrdsStats>::default(),
}
}
}
impl Default for CrdsDataStats {
fn default() -> Self {
Self {
counts: CrdsCountsArray::default(),
fails: CrdsCountsArray::default(),
votes: LruCache::new(VOTE_SLOTS_METRICS_CAP),
}
}
}
impl CrdsDataStats {
fn record_insert(&mut self, entry: &VersionedCrdsValue) {
self.counts[Self::ordinal(entry)] += 1;
if let CrdsData::Vote(_, vote) = &entry.value.data {
if let Some(slot) = vote.slot() {
let num_nodes = self.votes.get(&slot).copied().unwrap_or_default();
self.votes.put(slot, num_nodes + 1);
}
}
}
fn record_fail(&mut self, entry: &VersionedCrdsValue) {
self.fails[Self::ordinal(entry)] += 1;
}
fn ordinal(entry: &VersionedCrdsValue) -> usize {
match &entry.value.data {
CrdsData::ContactInfo(_) => 0,
CrdsData::Vote(_, _) => 1,
CrdsData::LowestSlot(_, _) => 2,
CrdsData::SnapshotHashes(_) => 3,
CrdsData::AccountsHashes(_) => 4,
CrdsData::EpochSlots(_, _) => 5,
CrdsData::LegacyVersion(_) => 6,
CrdsData::Version(_) => 7,
CrdsData::NodeInstance(_) => 8,
CrdsData::DuplicateShred(_, _) => 9,
CrdsData::IncrementalSnapshotHashes(_) => 10,
}
}
}
impl CrdsStats {
fn record_insert(&mut self, entry: &VersionedCrdsValue, route: GossipRoute) {
match route {
GossipRoute::LocalMessage => (),
GossipRoute::PullRequest => (),
GossipRoute::PushMessage => self.push.record_insert(entry),
GossipRoute::PullResponse => self.pull.record_insert(entry),
}
}
fn record_fail(&mut self, entry: &VersionedCrdsValue, route: GossipRoute) {
match route {
GossipRoute::LocalMessage => (),
GossipRoute::PullRequest => (),
GossipRoute::PushMessage => self.push.record_fail(entry),
GossipRoute::PullResponse => self.pull.record_fail(entry),
}
}
}
#[cfg(test)]
@ -580,7 +699,10 @@ mod tests {
fn test_insert() {
let mut crds = Crds::default();
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_eq!(crds.insert(val.clone(), 0), Ok(()));
assert_eq!(
crds.insert(val.clone(), 0, GossipRoute::LocalMessage),
Ok(())
);
assert_eq!(crds.table.len(), 1);
assert!(crds.table.contains_key(&val.label()));
assert_eq!(crds.table[&val.label()].local_timestamp, 0);
@ -589,8 +711,14 @@ mod tests {
fn test_update_old() {
let mut crds = Crds::default();
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_eq!(crds.insert(val.clone(), 0), Ok(()));
assert_eq!(crds.insert(val.clone(), 1), Err(CrdsError::InsertFailed));
assert_eq!(
crds.insert(val.clone(), 0, GossipRoute::LocalMessage),
Ok(())
);
assert_eq!(
crds.insert(val.clone(), 1, GossipRoute::LocalMessage),
Err(CrdsError::InsertFailed)
);
assert!(crds.purged.is_empty());
assert_eq!(crds.table[&val.label()].local_timestamp, 0);
}
@ -602,12 +730,15 @@ mod tests {
0,
)));
let value_hash = hash(&serialize(&original).unwrap());
assert_matches!(crds.insert(original, 0), Ok(()));
assert_matches!(crds.insert(original, 0, GossipRoute::LocalMessage), Ok(()));
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::default(),
1,
)));
assert_eq!(crds.insert(val.clone(), 1), Ok(()));
assert_eq!(
crds.insert(val.clone(), 1, GossipRoute::LocalMessage),
Ok(())
);
assert_eq!(*crds.purged.back().unwrap(), (value_hash, 1));
assert_eq!(crds.table[&val.label()].local_timestamp, 1);
}
@ -618,13 +749,19 @@ mod tests {
&Pubkey::default(),
0,
)));
assert_eq!(crds.insert(val.clone(), 0), Ok(()));
assert_eq!(
crds.insert(val.clone(), 0, GossipRoute::LocalMessage),
Ok(())
);
assert_eq!(crds.table[&val.label()].ordinal, 0);
let val2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
let value_hash = hash(&serialize(&val2).unwrap());
assert_eq!(val2.label().pubkey(), val.label().pubkey());
assert_eq!(crds.insert(val2.clone(), 0), Ok(()));
assert_eq!(
crds.insert(val2.clone(), 0, GossipRoute::LocalMessage),
Ok(())
);
crds.update_record_timestamp(&val.label().pubkey(), 2);
assert_eq!(crds.table[&val.label()].local_timestamp, 2);
@ -639,7 +776,7 @@ mod tests {
let mut ci = ContactInfo::default();
ci.wallclock += 1;
let val3 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
assert_eq!(crds.insert(val3, 3), Ok(()));
assert_eq!(crds.insert(val3, 3, GossipRoute::LocalMessage), Ok(()));
assert_eq!(*crds.purged.back().unwrap(), (value_hash, 3));
assert_eq!(crds.table[&val2.label()].local_timestamp, 3);
assert_eq!(crds.table[&val2.label()].ordinal, 2);
@ -657,19 +794,22 @@ mod tests {
let pubkey = Pubkey::new_unique();
let node = NodeInstance::new(&mut rng, pubkey, now);
let node = make_crds_value(node);
assert_eq!(crds.insert(node, now), Ok(()));
assert_eq!(crds.insert(node, now, GossipRoute::LocalMessage), Ok(()));
// A node-instance with a different key should insert fine even with
// older timestamps.
let other = NodeInstance::new(&mut rng, Pubkey::new_unique(), now - 1);
let other = make_crds_value(other);
assert_eq!(crds.insert(other, now), Ok(()));
assert_eq!(crds.insert(other, now, GossipRoute::LocalMessage), Ok(()));
// A node-instance with older timestamp should fail to insert, even if
// the wallclock is more recent.
let other = NodeInstance::new(&mut rng, pubkey, now - 1);
let other = other.with_wallclock(now + 1);
let other = make_crds_value(other);
let value_hash = hash(&serialize(&other).unwrap());
assert_eq!(crds.insert(other, now), Err(CrdsError::InsertFailed));
assert_eq!(
crds.insert(other, now, GossipRoute::LocalMessage),
Err(CrdsError::InsertFailed)
);
assert_eq!(*crds.purged.back().unwrap(), (value_hash, now));
// A node instance with the same timestamp should insert only if the
// random token is larger.
@ -678,7 +818,7 @@ mod tests {
let other = NodeInstance::new(&mut rng, pubkey, now);
let other = make_crds_value(other);
let value_hash = hash(&serialize(&other).unwrap());
match crds.insert(other, now) {
match crds.insert(other, now, GossipRoute::LocalMessage) {
Ok(()) => num_overrides += 1,
Err(CrdsError::InsertFailed) => {
assert_eq!(*crds.purged.back().unwrap(), (value_hash, now))
@ -693,7 +833,7 @@ mod tests {
let other = NodeInstance::new(&mut rng, pubkey, now + k);
let other = other.with_wallclock(now - 1);
let other = make_crds_value(other);
match crds.insert(other, now) {
match crds.insert(other, now, GossipRoute::LocalMessage) {
Ok(()) => (),
_ => panic!(),
}
@ -705,7 +845,10 @@ mod tests {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let mut crds = Crds::default();
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_eq!(crds.insert(val.clone(), 1), Ok(()));
assert_eq!(
crds.insert(val.clone(), 1, GossipRoute::LocalMessage),
Ok(())
);
let mut set = HashMap::new();
set.insert(Pubkey::default(), 0);
assert!(crds.find_old_labels(&thread_pool, 0, &set).is_empty());
@ -728,7 +871,10 @@ mod tests {
let mut timeouts = HashMap::new();
let val = CrdsValue::new_rand(&mut rng, None);
timeouts.insert(Pubkey::default(), 3);
assert_eq!(crds.insert(val.clone(), 0), Ok(()));
assert_eq!(
crds.insert(val.clone(), 0, GossipRoute::LocalMessage),
Ok(())
);
assert!(crds.find_old_labels(&thread_pool, 2, &timeouts).is_empty());
timeouts.insert(val.pubkey(), 1);
assert_eq!(
@ -751,7 +897,10 @@ mod tests {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let mut crds = Crds::default();
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_matches!(crds.insert(val.clone(), 1), Ok(_));
assert_matches!(
crds.insert(val.clone(), 1, GossipRoute::LocalMessage),
Ok(_)
);
let mut set = HashMap::new();
set.insert(Pubkey::default(), 1);
assert_eq!(
@ -766,7 +915,10 @@ mod tests {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let mut crds = Crds::default();
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_eq!(crds.insert(val.clone(), 1), Ok(()));
assert_eq!(
crds.insert(val.clone(), 1, GossipRoute::LocalMessage),
Ok(())
);
let mut set = HashMap::new();
//now < timestamp
set.insert(Pubkey::default(), 0);
@ -808,7 +960,7 @@ mod tests {
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
let local_timestamp = new_rand_timestamp(&mut rng);
if let Ok(()) = crds.insert(value, local_timestamp) {
if let Ok(()) = crds.insert(value, local_timestamp, GossipRoute::LocalMessage) {
num_inserts += 1;
check_crds_shards(&crds);
}
@ -962,7 +1114,7 @@ mod tests {
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
let local_timestamp = new_rand_timestamp(&mut rng);
if let Ok(()) = crds.insert(value, local_timestamp) {
if let Ok(()) = crds.insert(value, local_timestamp, GossipRoute::LocalMessage) {
num_inserts += 1;
}
if k % 16 == 0 {
@ -1016,7 +1168,7 @@ mod tests {
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
let local_timestamp = new_rand_timestamp(&mut rng);
let _ = crds.insert(value, local_timestamp);
let _ = crds.insert(value, local_timestamp, GossipRoute::LocalMessage);
if k % 64 == 0 {
check_crds_records(&crds);
}
@ -1047,7 +1199,10 @@ mod tests {
node.shred_version = 42;
let node = CrdsData::ContactInfo(node);
let node = CrdsValue::new_unsigned(node);
assert_eq!(crds.insert(node, timestamp()), Ok(()));
assert_eq!(
crds.insert(node, timestamp(), GossipRoute::LocalMessage),
Ok(())
);
assert_eq!(crds.get_shred_version(&pubkey), Some(42));
// An outdated value should not update shred-version:
let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey));
@ -1055,7 +1210,10 @@ mod tests {
node.shred_version = 8;
let node = CrdsData::ContactInfo(node);
let node = CrdsValue::new_unsigned(node);
assert_eq!(crds.insert(node, timestamp()), Err(CrdsError::InsertFailed));
assert_eq!(
crds.insert(node, timestamp(), GossipRoute::LocalMessage),
Err(CrdsError::InsertFailed)
);
assert_eq!(crds.get_shred_version(&pubkey), Some(42));
// Update shred version:
let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey));
@ -1063,13 +1221,19 @@ mod tests {
node.shred_version = 8;
let node = CrdsData::ContactInfo(node);
let node = CrdsValue::new_unsigned(node);
assert_eq!(crds.insert(node, timestamp()), Ok(()));
assert_eq!(
crds.insert(node, timestamp(), GossipRoute::LocalMessage),
Ok(())
);
assert_eq!(crds.get_shred_version(&pubkey), Some(8));
// Add other crds values with the same pubkey.
let val = SnapshotHashes::new_rand(&mut rng, Some(pubkey));
let val = CrdsData::SnapshotHashes(val);
let val = CrdsValue::new_unsigned(val);
assert_eq!(crds.insert(val, timestamp()), Ok(()));
assert_eq!(
crds.insert(val, timestamp(), GossipRoute::LocalMessage),
Ok(())
);
assert_eq!(crds.get_shred_version(&pubkey), Some(8));
// Remove contact-info. Shred version should stay there since there
// are still values associated with the pubkey.
@ -1106,7 +1270,7 @@ mod tests {
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
let local_timestamp = new_rand_timestamp(&mut rng);
let _ = crds.insert(value, local_timestamp);
let _ = crds.insert(value, local_timestamp, GossipRoute::LocalMessage);
}
let num_values = crds.table.len();
let num_pubkeys = num_unique_pubkeys(crds.table.values());
@ -1147,7 +1311,10 @@ mod tests {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let mut crds = Crds::default();
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_matches!(crds.insert(val.clone(), 1), Ok(_));
assert_matches!(
crds.insert(val.clone(), 1, GossipRoute::LocalMessage),
Ok(_)
);
let mut set = HashMap::new();
//default has max timeout, but pubkey should still expire

View File

@ -78,7 +78,10 @@ impl<'a, 'b> CrdsEntry<'a, 'b> for &'a SnapshotHashes {
mod tests {
use {
super::*,
crate::{crds::Crds, crds_value::new_rand_timestamp},
crate::{
crds::{Crds, GossipRoute},
crds_value::new_rand_timestamp,
},
rand::seq::SliceRandom,
solana_sdk::signature::Keypair,
std::collections::HashMap,
@ -94,7 +97,11 @@ mod tests {
let keypair = keypairs.choose(&mut rng).unwrap();
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
let key = value.label();
if let Ok(()) = crds.insert(value.clone(), new_rand_timestamp(&mut rng)) {
if let Ok(()) = crds.insert(
value.clone(),
new_rand_timestamp(&mut rng),
GossipRoute::LocalMessage,
) {
entries.insert(key, value);
}
}

View File

@ -8,7 +8,7 @@ use {
crate::{
cluster_info::Ping,
contact_info::ContactInfo,
crds::Crds,
crds::{Crds, GossipRoute},
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats},
crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE},
@ -89,7 +89,7 @@ impl CrdsGossip {
{
let mut crds = self.crds.write().unwrap();
for entry in pending_push_messages {
let _ = crds.insert(entry, now);
let _ = crds.insert(entry, now, GossipRoute::LocalMessage);
}
}
self.push.new_push_messages(&self.crds, now)
@ -151,7 +151,7 @@ impl CrdsGossip {
});
let now = timestamp();
for entry in entries {
if let Err(err) = crds.insert(entry, now) {
if let Err(err) = crds.insert(entry, now, GossipRoute::LocalMessage) {
error!("push_duplicate_shred faild: {:?}", err);
}
}
@ -337,7 +337,7 @@ impl CrdsGossip {
// Only for tests and simulations.
pub(crate) fn mock_clone(&self) -> Self {
let crds = self.crds.read().unwrap().clone();
let crds = self.crds.read().unwrap().mock_clone();
Self {
crds: RwLock::new(crds),
push: self.push.mock_clone(),
@ -385,6 +385,7 @@ mod test {
.insert(
CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())),
0,
GossipRoute::LocalMessage,
)
.unwrap();
crds_gossip.refresh_push_active_set(

View File

@ -15,7 +15,7 @@ use {
crate::{
cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY},
contact_info::ContactInfo,
crds::{Crds, VersionedCrdsValue},
crds::{Crds, GossipRoute, VersionedCrdsValue},
crds_gossip::{get_stake, get_weight},
crds_gossip_error::CrdsGossipError,
crds_value::CrdsValue,
@ -348,7 +348,7 @@ impl CrdsGossipPull {
let mut crds = crds.write().unwrap();
for caller in callers {
let key = caller.pubkey();
let _ = crds.insert(caller, now);
let _ = crds.insert(caller, now, GossipRoute::PullRequest);
crds.update_record_timestamp(&key, now);
}
}
@ -430,12 +430,12 @@ impl CrdsGossipPull {
let mut owners = HashSet::new();
let mut crds = crds.write().unwrap();
for response in responses_expired_timeout {
let _ = crds.insert(response, now);
let _ = crds.insert(response, now, GossipRoute::PullResponse);
}
let mut num_inserts = 0;
for response in responses {
let owner = response.pubkey();
if let Ok(()) = crds.insert(response, now) {
if let Ok(()) = crds.insert(response, now, GossipRoute::PullResponse) {
num_inserts += 1;
owners.insert(owner);
}
@ -731,14 +731,16 @@ pub(crate) mod tests {
&solana_sdk::pubkey::new_rand(),
0,
)));
crds.insert(me.clone(), 0).unwrap();
crds.insert(me.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
for i in 1..=30 {
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
0,
)));
let id = entry.label().pubkey();
crds.insert(entry.clone(), 0).unwrap();
crds.insert(entry.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
stakes.insert(id, i * 100);
}
let now = 1024;
@ -792,10 +794,14 @@ pub(crate) mod tests {
..ContactInfo::default()
}));
crds.insert(me.clone(), 0).unwrap();
crds.insert(spy.clone(), 0).unwrap();
crds.insert(node_123.clone(), 0).unwrap();
crds.insert(node_456.clone(), 0).unwrap();
crds.insert(me.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
crds.insert(spy.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
crds.insert(node_123.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
crds.insert(node_456.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
let crds = RwLock::new(crds);
// shred version 123 should ignore nodes with versions 0 and 456
@ -854,8 +860,10 @@ pub(crate) mod tests {
..ContactInfo::default()
}));
crds.insert(me.clone(), 0).unwrap();
crds.insert(node_123.clone(), 0).unwrap();
crds.insert(me.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
crds.insert(node_123.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
let crds = RwLock::new(crds);
// Empty gossip_validators -- will pull from nobody
@ -958,7 +966,10 @@ pub(crate) mod tests {
for _ in 0..40_000 {
let keypair = keypairs.choose(&mut rng).unwrap();
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
if crds.insert(value, rng.gen()).is_ok() {
if crds
.insert(value, rng.gen(), GossipRoute::LocalMessage)
.is_ok()
{
num_inserts += 1;
}
}
@ -1019,7 +1030,10 @@ pub(crate) mod tests {
Err(CrdsGossipError::NoPeers)
);
crds.write().unwrap().insert(entry, 0).unwrap();
crds.write()
.unwrap()
.insert(entry, 0, GossipRoute::LocalMessage)
.unwrap();
assert_eq!(
node.new_pull_request(
&thread_pool,
@ -1043,7 +1057,10 @@ pub(crate) mod tests {
.unwrap()
.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
crds.write().unwrap().insert(new.clone(), now).unwrap();
crds.write()
.unwrap()
.insert(new.clone(), now, GossipRoute::LocalMessage)
.unwrap();
let req = node.new_pull_request(
&thread_pool,
&crds,
@ -1063,7 +1080,10 @@ pub(crate) mod tests {
node.mark_pull_request_creation_time(new.contact_info().unwrap().id, now);
let offline = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), now);
let offline = CrdsValue::new_unsigned(CrdsData::ContactInfo(offline));
crds.write().unwrap().insert(offline, now).unwrap();
crds.write()
.unwrap()
.insert(offline, now, GossipRoute::LocalMessage)
.unwrap();
let req = node.new_pull_request(
&thread_pool,
&crds,
@ -1098,15 +1118,17 @@ pub(crate) mod tests {
0,
)));
let node = CrdsGossipPull::default();
crds.insert(entry, now).unwrap();
crds.insert(entry, now, GossipRoute::LocalMessage).unwrap();
let old = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ping_cache.mock_pong(old.id, old.gossip, Instant::now());
let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(old));
crds.insert(old.clone(), now).unwrap();
crds.insert(old.clone(), now, GossipRoute::LocalMessage)
.unwrap();
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ping_cache.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
crds.insert(new.clone(), now).unwrap();
crds.insert(new.clone(), now, GossipRoute::LocalMessage)
.unwrap();
let crds = RwLock::new(crds);
// set request creation time to now.
@ -1193,11 +1215,13 @@ pub(crate) mod tests {
)));
let caller = entry.clone();
let node = CrdsGossipPull::default();
node_crds.insert(entry, 0).unwrap();
node_crds
.insert(entry, 0, GossipRoute::LocalMessage)
.unwrap();
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ping_cache.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
node_crds.insert(new, 0).unwrap();
node_crds.insert(new, 0, GossipRoute::LocalMessage).unwrap();
let node_crds = RwLock::new(node_crds);
let mut pings = Vec::new();
let req = node.new_pull_request(
@ -1234,7 +1258,11 @@ pub(crate) mod tests {
dest_crds
.write()
.unwrap()
.insert(new, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS)
.insert(
new,
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
GossipRoute::LocalMessage,
)
.unwrap();
//should skip new value since caller is to old
@ -1283,7 +1311,9 @@ pub(crate) mod tests {
)));
let caller = entry.clone();
let node = CrdsGossipPull::default();
node_crds.insert(entry, 0).unwrap();
node_crds
.insert(entry, 0, GossipRoute::LocalMessage)
.unwrap();
let mut ping_cache = PingCache::new(
Duration::from_secs(20 * 60), // ttl
128, // capacity
@ -1291,7 +1321,7 @@ pub(crate) mod tests {
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ping_cache.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
node_crds.insert(new, 0).unwrap();
node_crds.insert(new, 0, GossipRoute::LocalMessage).unwrap();
let node_crds = RwLock::new(node_crds);
let mut pings = Vec::new();
let req = node.new_pull_request(
@ -1340,7 +1370,9 @@ pub(crate) mod tests {
let caller = entry.clone();
let node_pubkey = entry.label().pubkey();
let node = CrdsGossipPull::default();
node_crds.insert(entry, 0).unwrap();
node_crds
.insert(entry, 0, GossipRoute::LocalMessage)
.unwrap();
let mut ping_cache = PingCache::new(
Duration::from_secs(20 * 60), // ttl
128, // capacity
@ -1348,14 +1380,16 @@ pub(crate) mod tests {
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 1);
ping_cache.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
node_crds.insert(new, 0).unwrap();
node_crds.insert(new, 0, GossipRoute::LocalMessage).unwrap();
let mut dest_crds = Crds::default();
let new_id = solana_sdk::pubkey::new_rand();
let new = ContactInfo::new_localhost(&new_id, 1);
ping_cache.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
dest_crds.insert(new.clone(), 0).unwrap();
dest_crds
.insert(new.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
let dest_crds = RwLock::new(dest_crds);
// node contains a key from the dest node, but at an older local timestamp
@ -1364,7 +1398,9 @@ pub(crate) mod tests {
let same_key = CrdsValue::new_unsigned(CrdsData::ContactInfo(same_key));
assert_eq!(same_key.label(), new.label());
assert!(same_key.wallclock() < new.wallclock());
node_crds.insert(same_key.clone(), 0).unwrap();
node_crds
.insert(same_key.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
assert_eq!(0, {
let entry: &VersionedCrdsValue = node_crds.get(&same_key.label()).unwrap();
entry.local_timestamp
@ -1449,12 +1485,16 @@ pub(crate) mod tests {
let node_label = entry.label();
let node_pubkey = node_label.pubkey();
let node = CrdsGossipPull::default();
node_crds.insert(entry, 0).unwrap();
node_crds
.insert(entry, 0, GossipRoute::LocalMessage)
.unwrap();
let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
0,
)));
node_crds.insert(old.clone(), 0).unwrap();
node_crds
.insert(old.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
let value_hash = {
let entry: &VersionedCrdsValue = node_crds.get(&old.label()).unwrap();
entry.value_hash

View File

@ -15,7 +15,7 @@ use {
crate::{
cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY,
contact_info::ContactInfo,
crds::{Crds, Cursor},
crds::{Crds, Cursor, GossipRoute},
crds_gossip::{get_stake, get_weight},
crds_gossip_error::CrdsGossipError,
crds_value::CrdsValue,
@ -240,7 +240,7 @@ impl CrdsGossipPush {
.map(|value| {
let value = value?;
let origin = value.pubkey();
match crds.insert(value, now) {
match crds.insert(value, now, GossipRoute::PushMessage) {
Ok(()) => Ok(origin),
Err(_) => {
self.num_old.fetch_add(1, Ordering::Relaxed);
@ -665,7 +665,10 @@ mod test {
0,
)));
assert_eq!(crds.insert(value1.clone(), now), Ok(()));
assert_eq!(
crds.insert(value1.clone(), now, GossipRoute::LocalMessage),
Ok(())
);
let crds = RwLock::new(crds);
push.refresh_push_active_set(
&crds,
@ -686,7 +689,12 @@ mod test {
)));
assert!(active_set.get(&value2.label().pubkey()).is_none());
drop(active_set);
assert_eq!(crds.write().unwrap().insert(value2.clone(), now), Ok(()));
assert_eq!(
crds.write()
.unwrap()
.insert(value2.clone(), now, GossipRoute::LocalMessage),
Ok(())
);
for _ in 0..30 {
push.refresh_push_active_set(
&crds,
@ -711,7 +719,12 @@ mod test {
let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(
ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0),
));
assert_eq!(crds.write().unwrap().insert(value2.clone(), now), Ok(()));
assert_eq!(
crds.write()
.unwrap()
.insert(value2.clone(), now, GossipRoute::LocalMessage),
Ok(())
);
}
push.refresh_push_active_set(
&crds,
@ -738,7 +751,8 @@ mod test {
time,
)));
let id = peer.label().pubkey();
crds.insert(peer.clone(), time).unwrap();
crds.insert(peer.clone(), time, GossipRoute::LocalMessage)
.unwrap();
stakes.insert(id, i * 100);
push.last_pushed_to.write().unwrap().put(id, time);
}
@ -791,10 +805,14 @@ mod test {
..ContactInfo::default()
}));
crds.insert(me.clone(), now).unwrap();
crds.insert(spy.clone(), now).unwrap();
crds.insert(node_123.clone(), now).unwrap();
crds.insert(node_456, now).unwrap();
crds.insert(me.clone(), now, GossipRoute::LocalMessage)
.unwrap();
crds.insert(spy.clone(), now, GossipRoute::LocalMessage)
.unwrap();
crds.insert(node_123.clone(), now, GossipRoute::LocalMessage)
.unwrap();
crds.insert(node_456, now, GossipRoute::LocalMessage)
.unwrap();
let crds = RwLock::new(crds);
// shred version 123 should ignore nodes with versions 0 and 456
@ -845,8 +863,10 @@ mod test {
..ContactInfo::default()
}));
crds.insert(me.clone(), 0).unwrap();
crds.insert(node_123.clone(), now).unwrap();
crds.insert(me.clone(), 0, GossipRoute::LocalMessage)
.unwrap();
crds.insert(node_123.clone(), now, GossipRoute::LocalMessage)
.unwrap();
let crds = RwLock::new(crds);
// Unknown pubkey in gossip_validators -- will push to nobody
@ -898,7 +918,10 @@ mod test {
&solana_sdk::pubkey::new_rand(),
0,
)));
assert_eq!(crds.insert(peer.clone(), now), Ok(()));
assert_eq!(
crds.insert(peer.clone(), now, GossipRoute::LocalMessage),
Ok(())
);
let crds = RwLock::new(crds);
push.refresh_push_active_set(
&crds,
@ -940,8 +963,14 @@ mod test {
})
.collect();
let origin: Vec<_> = peers.iter().map(|node| node.pubkey()).collect();
assert_eq!(crds.insert(peers[0].clone(), now), Ok(()));
assert_eq!(crds.insert(peers[1].clone(), now), Ok(()));
assert_eq!(
crds.insert(peers[0].clone(), now, GossipRoute::LocalMessage),
Ok(())
);
assert_eq!(
crds.insert(peers[1].clone(), now, GossipRoute::LocalMessage),
Ok(())
);
let crds = RwLock::new(crds);
assert_eq!(
push.process_push_message(&crds, &Pubkey::default(), vec![peers[2].clone()], now),
@ -977,7 +1006,10 @@ mod test {
&solana_sdk::pubkey::new_rand(),
0,
)));
assert_eq!(crds.insert(peer.clone(), 0), Ok(()));
assert_eq!(
crds.insert(peer.clone(), 0, GossipRoute::LocalMessage),
Ok(())
);
let crds = RwLock::new(crds);
push.refresh_push_active_set(
&crds,
@ -1015,7 +1047,7 @@ mod test {
&solana_sdk::pubkey::new_rand(),
0,
)));
assert_eq!(crds.insert(peer, 0), Ok(()));
assert_eq!(crds.insert(peer, 0, GossipRoute::LocalMessage), Ok(()));
let crds = RwLock::new(crds);
push.refresh_push_active_set(
&crds,

View File

@ -134,7 +134,10 @@ where
mod test {
use {
super::*,
crate::{crds::Crds, crds_value::CrdsValue},
crate::{
crds::{Crds, GossipRoute},
crds_value::CrdsValue,
},
rand::{thread_rng, Rng},
solana_sdk::timing::timestamp,
std::{collections::HashSet, iter::repeat_with, ops::Index},
@ -144,7 +147,8 @@ mod test {
let value = CrdsValue::new_rand(rng, None);
let label = value.label();
let mut crds = Crds::default();
crds.insert(value, timestamp()).unwrap();
crds.insert(value, timestamp(), GossipRoute::LocalMessage)
.unwrap();
crds.get::<&VersionedCrdsValue>(&label).cloned().unwrap()
}

View File

@ -7,6 +7,7 @@ use {
solana_gossip::{
cluster_info,
contact_info::ContactInfo,
crds::GossipRoute,
crds_gossip::*,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
@ -108,8 +109,12 @@ fn star_network_create(num: usize) -> Network {
let node = CrdsGossip::default();
{
let mut node_crds = node.crds.write().unwrap();
node_crds.insert(new.clone(), timestamp()).unwrap();
node_crds.insert(entry.clone(), timestamp()).unwrap();
node_crds
.insert(new.clone(), timestamp(), GossipRoute::LocalMessage)
.unwrap();
node_crds
.insert(entry.clone(), timestamp(), GossipRoute::LocalMessage)
.unwrap();
}
let node = Node::new(node_keypair, contact_info, Arc::new(node));
(new.label().pubkey(), node)
@ -120,7 +125,7 @@ fn star_network_create(num: usize) -> Network {
node.crds
.write()
.unwrap()
.insert(entry, timestamp())
.insert(entry, timestamp(), GossipRoute::LocalMessage)
.unwrap();
let node = Node::new(node_keypair, contact_info, Arc::new(node));
network.insert(id, node);
@ -137,7 +142,7 @@ fn rstar_network_create(num: usize) -> Network {
.crds
.write()
.unwrap()
.insert(entry, timestamp())
.insert(entry, timestamp(), GossipRoute::LocalMessage)
.unwrap();
let mut network: HashMap<_, _> = (1..num)
.map(|_| {
@ -148,13 +153,13 @@ fn rstar_network_create(num: usize) -> Network {
node.crds
.write()
.unwrap()
.insert(new.clone(), timestamp())
.insert(new.clone(), timestamp(), GossipRoute::LocalMessage)
.unwrap();
origin
.crds
.write()
.unwrap()
.insert(new.clone(), timestamp())
.insert(new.clone(), timestamp(), GossipRoute::LocalMessage)
.unwrap();
let node = Node::new(node_keypair, contact_info, Arc::new(node));
(new.label().pubkey(), node)
@ -175,7 +180,7 @@ fn ring_network_create(num: usize) -> Network {
node.crds
.write()
.unwrap()
.insert(new.clone(), timestamp())
.insert(new.clone(), timestamp(), GossipRoute::LocalMessage)
.unwrap();
let node = Node::new(node_keypair, contact_info, Arc::new(node));
(new.label().pubkey(), node)
@ -192,7 +197,9 @@ fn ring_network_create(num: usize) -> Network {
};
let end = network.get_mut(&keys[(k + 1) % keys.len()]).unwrap();
let mut end_crds = end.gossip.crds.write().unwrap();
end_crds.insert(start_info, timestamp()).unwrap();
end_crds
.insert(start_info, timestamp(), GossipRoute::LocalMessage)
.unwrap();
}
Network::new(network)
}
@ -208,7 +215,7 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network {
node.crds
.write()
.unwrap()
.insert(new.clone(), timestamp())
.insert(new.clone(), timestamp(), GossipRoute::LocalMessage)
.unwrap();
let node = Node::staked(node_keypair, contact_info, Arc::new(node), stakes[n]);
(new.label().pubkey(), node)
@ -230,7 +237,9 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network {
for k in 0..keys.len() {
if keys[k] != *end_pubkey {
let start_info = start_entries[k].clone();
end_crds.insert(start_info, timestamp()).unwrap();
end_crds
.insert(start_info, timestamp(), GossipRoute::LocalMessage)
.unwrap();
}
}
}
@ -709,6 +718,7 @@ fn test_prune_errors() {
.insert(
CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())),
0,
GossipRoute::LocalMessage,
)
.unwrap();
crds_gossip.refresh_push_active_set(

View File

@ -494,6 +494,7 @@ mod tests {
crate::rpc::create_validator_exit,
solana_gossip::{
contact_info::ContactInfo,
crds::GossipRoute,
crds_value::{CrdsData, CrdsValue, SnapshotHashes},
},
solana_ledger::{
@ -794,6 +795,7 @@ mod tests {
],
))),
1,
GossipRoute::LocalMessage,
)
.unwrap();
assert_eq!(rm.health_check(), "ok");
@ -810,6 +812,7 @@ mod tests {
vec![(1000 + health_check_slot_distance - 1, Hash::default())],
))),
1,
GossipRoute::LocalMessage,
)
.unwrap();
assert_eq!(rm.health_check(), "ok");
@ -826,6 +829,7 @@ mod tests {
vec![(1000 + health_check_slot_distance, Hash::default())],
))),
1,
GossipRoute::LocalMessage,
)
.unwrap();
assert_eq!(rm.health_check(), "behind");