implements generic lookups into gossip crds table (#18765)

This commit adds CrdsEntry trait which allows generic lookups into crds
table. For example to get ContactInfo or LowestSlot associated with a
Pubkey, the lookup code would be respectively:
   crds.get::<&ContactInfo>(pubkey)
   crds.get::<&LowestSlot>(pubkey)
This commit is contained in:
behzad nouri 2021-07-21 12:16:26 +00:00 committed by GitHub
parent 65152373de
commit bbd22f06f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 204 additions and 108 deletions

View File

@ -179,7 +179,7 @@ impl ClusterSlotsService {
mod test { mod test {
use { use {
super::*, super::*,
solana_gossip::{cluster_info::Node, crds_value::CrdsValueLabel}, solana_gossip::{cluster_info::Node, crds_value::LowestSlot},
solana_sdk::pubkey::Pubkey, solana_sdk::pubkey::Pubkey,
}; };
@ -191,10 +191,8 @@ mod test {
ClusterSlotsService::update_lowest_slot(5, &cluster_info); ClusterSlotsService::update_lowest_slot(5, &cluster_info);
cluster_info.flush_push_queue(); cluster_info.flush_push_queue();
let lowest = { let lowest = {
let label = CrdsValueLabel::LowestSlot(pubkey);
let gossip_crds = cluster_info.gossip.crds.read().unwrap(); let gossip_crds = cluster_info.gossip.crds.read().unwrap();
let entry = gossip_crds.get(&label).unwrap(); gossip_crds.get::<&LowestSlot>(pubkey).unwrap().clone()
entry.value.lowest_slot().unwrap().clone()
}; };
assert_eq!(lowest.lowest, 5); assert_eq!(lowest.lowest, 5);
} }

View File

@ -21,7 +21,7 @@ fn new_test_crds_value<R: Rng>(rng: &mut R) -> VersionedCrdsValue {
let label = value.label(); let label = value.label();
let mut crds = Crds::default(); let mut crds = Crds::default();
crds.insert(value, timestamp()).unwrap(); crds.insert(value, timestamp()).unwrap();
crds.get(&label).cloned().unwrap() crds.get::<&VersionedCrdsValue>(&label).cloned().unwrap()
} }
fn bench_crds_shards_find(bencher: &mut Bencher, num_values: usize, mask_bits: u32) { fn bench_crds_shards_find(bencher: &mut Bencher, num_values: usize, mask_bits: u32) {

View File

@ -628,7 +628,7 @@ impl ClusterInfo {
F: FnOnce(&ContactInfo) -> Y, F: FnOnce(&ContactInfo) -> Y,
{ {
let gossip_crds = self.gossip.crds.read().unwrap(); let gossip_crds = self.gossip.crds.read().unwrap();
gossip_crds.get_contact_info(*id).map(map) gossip_crds.get(*id).map(map)
} }
pub fn lookup_contact_info_by_gossip_addr( pub fn lookup_contact_info_by_gossip_addr(
@ -653,8 +653,8 @@ impl ClusterInfo {
let label = CrdsValueLabel::EpochSlots(ix, self_pubkey); let label = CrdsValueLabel::EpochSlots(ix, self_pubkey);
let gossip_crds = self.gossip.crds.read().unwrap(); let gossip_crds = self.gossip.crds.read().unwrap();
gossip_crds gossip_crds
.get(&label) .get::<&CrdsValue>(&label)
.and_then(|v| v.value.epoch_slots()) .and_then(|v| v.epoch_slots())
.cloned() .cloned()
.unwrap_or_else(|| EpochSlots::new(self_pubkey, timestamp())) .unwrap_or_else(|| EpochSlots::new(self_pubkey, timestamp()))
} }
@ -816,7 +816,7 @@ impl ClusterInfo {
let last = { let last = {
let gossip_crds = self.gossip.crds.read().unwrap(); let gossip_crds = self.gossip.crds.read().unwrap();
gossip_crds gossip_crds
.get_lowest_slot(self_pubkey) .get::<&LowestSlot>(self_pubkey)
.map(|x| x.lowest) .map(|x| x.lowest)
.unwrap_or_default() .unwrap_or_default()
}; };
@ -843,7 +843,7 @@ impl ClusterInfo {
(0..crds_value::MAX_EPOCH_SLOTS) (0..crds_value::MAX_EPOCH_SLOTS)
.filter_map(|ix| { .filter_map(|ix| {
let label = CrdsValueLabel::EpochSlots(ix, self_pubkey); let label = CrdsValueLabel::EpochSlots(ix, self_pubkey);
let epoch_slots = gossip_crds.get(&label)?.value.epoch_slots()?; let epoch_slots = gossip_crds.get::<&CrdsValue>(&label)?.epoch_slots()?;
let first_slot = epoch_slots.first_slot()?; let first_slot = epoch_slots.first_slot()?;
Some((epoch_slots.wallclock, first_slot, ix)) Some((epoch_slots.wallclock, first_slot, ix))
}) })
@ -985,9 +985,9 @@ impl ClusterInfo {
(0..MAX_LOCKOUT_HISTORY as u8) (0..MAX_LOCKOUT_HISTORY as u8)
.filter_map(|ix| { .filter_map(|ix| {
let vote = CrdsValueLabel::Vote(ix, self_pubkey); let vote = CrdsValueLabel::Vote(ix, self_pubkey);
let vote = gossip_crds.get(&vote)?; let vote: &CrdsData = gossip_crds.get(&vote)?;
num_crds_votes += 1; num_crds_votes += 1;
match &vote.value.data { match &vote {
CrdsData::Vote(_, vote) if should_evict_vote(vote) => { CrdsData::Vote(_, vote) if should_evict_vote(vote) => {
Some((vote.wallclock, ix)) Some((vote.wallclock, ix))
} }
@ -1009,8 +1009,8 @@ impl ClusterInfo {
self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read); self.time_gossip_read_lock("gossip_read_push_vote", &self.stats.push_vote_read);
(0..MAX_LOCKOUT_HISTORY as u8).find(|ix| { (0..MAX_LOCKOUT_HISTORY as u8).find(|ix| {
let vote = CrdsValueLabel::Vote(*ix, self_pubkey); let vote = CrdsValueLabel::Vote(*ix, self_pubkey);
if let Some(vote) = gossip_crds.get(&vote) { if let Some(vote) = gossip_crds.get::<&CrdsData>(&vote) {
match &vote.value.data { match &vote {
CrdsData::Vote(_, prev_vote) => match prev_vote.slot() { CrdsData::Vote(_, prev_vote) => match prev_vote.slot() {
Some(prev_vote_slot) => prev_vote_slot == vote_slot, Some(prev_vote_slot) => prev_vote_slot == vote_slot,
None => { None => {
@ -1084,8 +1084,8 @@ impl ClusterInfo {
F: FnOnce(&Vec<(Slot, Hash)>) -> Y, F: FnOnce(&Vec<(Slot, Hash)>) -> Y,
{ {
self.time_gossip_read_lock("get_accounts_hash", &self.stats.get_accounts_hash) self.time_gossip_read_lock("get_accounts_hash", &self.stats.get_accounts_hash)
.get(&CrdsValueLabel::AccountsHashes(*pubkey)) .get::<&CrdsValue>(&CrdsValueLabel::AccountsHashes(*pubkey))
.map(|x| &x.value.accounts_hash().unwrap().hashes) .map(|x| &x.accounts_hash().unwrap().hashes)
.map(map) .map(map)
} }
@ -1094,10 +1094,8 @@ impl ClusterInfo {
F: FnOnce(&Vec<(Slot, Hash)>) -> Y, F: FnOnce(&Vec<(Slot, Hash)>) -> Y,
{ {
let gossip_crds = self.gossip.crds.read().unwrap(); let gossip_crds = self.gossip.crds.read().unwrap();
gossip_crds let hashes = &gossip_crds.get::<&SnapshotHash>(*pubkey)?.hashes;
.get(&CrdsValueLabel::SnapshotHashes(*pubkey)) Some(map(hashes))
.map(|x| &x.value.snapshot_hash().unwrap().hashes)
.map(map)
} }
/// Returns epoch-slots inserted since the given cursor. /// Returns epoch-slots inserted since the given cursor.
@ -1120,12 +1118,10 @@ impl ClusterInfo {
pub fn get_node_version(&self, pubkey: &Pubkey) -> Option<solana_version::Version> { pub fn get_node_version(&self, pubkey: &Pubkey) -> Option<solana_version::Version> {
let gossip_crds = self.gossip.crds.read().unwrap(); let gossip_crds = self.gossip.crds.read().unwrap();
let version = gossip_crds.get(&CrdsValueLabel::Version(*pubkey)); if let Some(version) = gossip_crds.get::<&Version>(*pubkey) {
if let Some(version) = version.and_then(|v| v.value.version()) {
return Some(version.version.clone()); return Some(version.version.clone());
} }
let version = gossip_crds.get(&CrdsValueLabel::LegacyVersion(*pubkey))?; let version: &crds_value::LegacyVersion = gossip_crds.get(*pubkey)?;
let version = version.value.legacy_version()?;
Some(version.version.clone().into()) Some(version.version.clone().into())
} }
@ -1198,7 +1194,7 @@ impl ClusterInfo {
&& node.shred_version == self_shred_version && node.shred_version == self_shred_version
&& ContactInfo::is_valid_tvu_address(&node.tvu) && ContactInfo::is_valid_tvu_address(&node.tvu)
&& ContactInfo::is_valid_address(&node.serve_repair) && ContactInfo::is_valid_address(&node.serve_repair)
&& match gossip_crds.get_lowest_slot(node.id) { && match gossip_crds.get::<&LowestSlot>(node.id) {
None => true, // fallback to legacy behavior None => true, // fallback to legacy behavior
Some(lowest_slot) => lowest_slot.lowest <= slot, Some(lowest_slot) => lowest_slot.lowest <= slot,
} }
@ -1446,7 +1442,7 @@ impl ClusterInfo {
push_messages push_messages
.into_iter() .into_iter()
.filter_map(|(pubkey, messages)| { .filter_map(|(pubkey, messages)| {
let peer = gossip_crds.get_contact_info(pubkey)?; let peer: &ContactInfo = gossip_crds.get(pubkey)?;
Some((peer.gossip, messages)) Some((peer.gossip, messages))
}) })
.collect() .collect()
@ -2197,7 +2193,7 @@ impl ClusterInfo {
.into_par_iter() .into_par_iter()
.with_min_len(256) .with_min_len(256)
.filter_map(|(from, prunes)| { .filter_map(|(from, prunes)| {
let peer = gossip_crds.get_contact_info(from)?; let peer: &ContactInfo = gossip_crds.get(from)?;
let mut prune_data = PruneData { let mut prune_data = PruneData {
pubkey: self_pubkey, pubkey: self_pubkey,
prunes, prunes,
@ -3294,7 +3290,7 @@ mod tests {
let label = CrdsValueLabel::ContactInfo(d.id); let label = CrdsValueLabel::ContactInfo(d.id);
cluster_info.insert_info(d); cluster_info.insert_info(d);
let gossip_crds = cluster_info.gossip.crds.read().unwrap(); let gossip_crds = cluster_info.gossip.crds.read().unwrap();
assert!(gossip_crds.get(&label).is_some()); assert!(gossip_crds.get::<&CrdsValue>(&label).is_some());
} }
fn assert_in_range(x: u16, range: (u16, u16)) { fn assert_in_range(x: u16, range: (u16, u16)) {
@ -3559,7 +3555,7 @@ mod tests {
let gossip_crds = cluster_info.gossip.crds.read().unwrap(); let gossip_crds = cluster_info.gossip.crds.read().unwrap();
let mut vote_slots = HashSet::new(); let mut vote_slots = HashSet::new();
for label in labels { for label in labels {
match &gossip_crds.get(&label).unwrap().value.data { match &gossip_crds.get::<&CrdsData>(&label).unwrap() {
CrdsData::Vote(_, vote) => { CrdsData::Vote(_, vote) => {
assert!(vote_slots.insert(vote.slot().unwrap())); assert!(vote_slots.insert(vote.slot().unwrap()));
} }

View File

@ -27,8 +27,9 @@
use { use {
crate::{ crate::{
contact_info::ContactInfo, contact_info::ContactInfo,
crds_entry::CrdsEntry,
crds_shards::CrdsShards, crds_shards::CrdsShards,
crds_value::{CrdsData, CrdsValue, CrdsValueLabel, LowestSlot}, crds_value::{CrdsData, CrdsValue, CrdsValueLabel},
}, },
bincode::serialize, bincode::serialize,
indexmap::{ indexmap::{
@ -241,31 +242,24 @@ impl Crds {
} }
} }
pub fn get(&self, label: &CrdsValueLabel) -> Option<&VersionedCrdsValue> { pub fn get<'a, 'b, V>(&'a self, key: V::Key) -> Option<V>
self.table.get(label) where
} V: CrdsEntry<'a, 'b>,
{
pub fn get_contact_info(&self, pubkey: Pubkey) -> Option<&ContactInfo> { V::get_entry(&self.table, key)
let label = CrdsValueLabel::ContactInfo(pubkey);
self.table.get(&label)?.value.contact_info()
} }
pub(crate) fn get_shred_version(&self, pubkey: &Pubkey) -> Option<u16> { pub(crate) fn get_shred_version(&self, pubkey: &Pubkey) -> Option<u16> {
self.shred_versions.get(pubkey).copied() self.shred_versions.get(pubkey).copied()
} }
pub fn get_lowest_slot(&self, pubkey: Pubkey) -> Option<&LowestSlot> {
let lable = CrdsValueLabel::LowestSlot(pubkey);
self.table.get(&lable)?.value.lowest_slot()
}
/// Returns all entries which are ContactInfo. /// Returns all entries which are ContactInfo.
pub fn get_nodes(&self) -> impl Iterator<Item = &VersionedCrdsValue> { pub(crate) fn get_nodes(&self) -> impl Iterator<Item = &VersionedCrdsValue> {
self.nodes.iter().map(move |i| self.table.index(*i)) self.nodes.iter().map(move |i| self.table.index(*i))
} }
/// Returns ContactInfo of all known nodes. /// Returns ContactInfo of all known nodes.
pub fn get_nodes_contact_info(&self) -> impl Iterator<Item = &ContactInfo> { pub(crate) fn get_nodes_contact_info(&self) -> impl Iterator<Item = &ContactInfo> {
self.get_nodes().map(|v| match &v.value.data { self.get_nodes().map(|v| match &v.value.data {
CrdsData::ContactInfo(info) => info, CrdsData::ContactInfo(info) => info,
_ => panic!("this should not happen!"), _ => panic!("this should not happen!"),
@ -337,7 +331,7 @@ impl Crds {
self.table.values() self.table.values()
} }
pub fn par_values(&self) -> ParValues<'_, CrdsValueLabel, VersionedCrdsValue> { pub(crate) fn par_values(&self) -> ParValues<'_, CrdsValueLabel, VersionedCrdsValue> {
self.table.par_values() self.table.par_values()
} }
@ -361,7 +355,7 @@ impl Crds {
/// Returns all crds values which the first 'mask_bits' /// Returns all crds values which the first 'mask_bits'
/// of their hash value is equal to 'mask'. /// of their hash value is equal to 'mask'.
pub fn filter_bitmask( pub(crate) fn filter_bitmask(
&self, &self,
mask: u64, mask: u64,
mask_bits: u32, mask_bits: u32,
@ -372,7 +366,7 @@ impl Crds {
} }
/// Update the timestamp's of all the labels that are associated with Pubkey /// Update the timestamp's of all the labels that are associated with Pubkey
pub fn update_record_timestamp(&mut self, pubkey: &Pubkey, now: u64) { pub(crate) fn update_record_timestamp(&mut self, pubkey: &Pubkey, now: u64) {
// It suffices to only overwrite the origin's timestamp since that is // It suffices to only overwrite the origin's timestamp since that is
// used when purging old values. If the origin does not exist in the // used when purging old values. If the origin does not exist in the
// table, fallback to exhaustive update on all associated records. // table, fallback to exhaustive update on all associated records.
@ -1075,7 +1069,7 @@ mod tests {
// Remove contact-info. Shred version should stay there since there // Remove contact-info. Shred version should stay there since there
// are still values associated with the pubkey. // are still values associated with the pubkey.
crds.remove(&CrdsValueLabel::ContactInfo(pubkey), timestamp()); crds.remove(&CrdsValueLabel::ContactInfo(pubkey), timestamp());
assert_eq!(crds.get_contact_info(pubkey), None); assert_eq!(crds.get::<&ContactInfo>(pubkey), None);
assert_eq!(crds.get_shred_version(&pubkey), Some(8)); assert_eq!(crds.get_shred_version(&pubkey), Some(8));
// Remove the remaining entry with the same pubkey. // Remove the remaining entry with the same pubkey.
crds.remove(&CrdsValueLabel::SnapshotHashes(pubkey), timestamp()); crds.remove(&CrdsValueLabel::SnapshotHashes(pubkey), timestamp());

121
gossip/src/crds_entry.rs Normal file
View File

@ -0,0 +1,121 @@
use {
crate::{
contact_info::ContactInfo,
crds::VersionedCrdsValue,
crds_value::{
CrdsData, CrdsValue, CrdsValueLabel, LegacyVersion, LowestSlot, SnapshotHash, Version,
},
},
indexmap::IndexMap,
solana_sdk::pubkey::Pubkey,
};
type CrdsTable = IndexMap<CrdsValueLabel, VersionedCrdsValue>;
/// Represents types which can be looked up from crds table given a key. e.g.
/// CrdsValueLabel -> VersionedCrdsValue, CrdsValue, CrdsData
/// Pubkey -> ContactInfo, LowestSlot, SnapshotHash, ...
pub trait CrdsEntry<'a, 'b>: Sized {
type Key; // Lookup key.
fn get_entry(table: &'a CrdsTable, key: Self::Key) -> Option<Self>;
}
macro_rules! impl_crds_entry (
// Lookup by CrdsValueLabel.
($name:ident, |$entry:ident| $body:expr) => (
impl<'a, 'b> CrdsEntry<'a, 'b> for &'a $name {
type Key = &'b CrdsValueLabel;
fn get_entry(table:&'a CrdsTable, key: Self::Key) -> Option<Self> {
let $entry = table.get(key);
$body
}
}
);
// Lookup by Pubkey.
($name:ident, $pat:pat, $expr:expr) => (
impl<'a, 'b> CrdsEntry<'a, 'b> for &'a $name {
type Key = Pubkey;
fn get_entry(table:&'a CrdsTable, key: Self::Key) -> Option<Self> {
let key = CrdsValueLabel::$name(key);
match &table.get(&key)?.value.data {
$pat => Some($expr),
_ => None,
}
}
}
);
);
// Lookup by CrdsValueLabel.
impl_crds_entry!(CrdsData, |entry| Some(&entry?.value.data));
impl_crds_entry!(CrdsValue, |entry| Some(&entry?.value));
impl_crds_entry!(VersionedCrdsValue, |entry| entry);
// Lookup by Pubkey.
impl_crds_entry!(ContactInfo, CrdsData::ContactInfo(node), node);
impl_crds_entry!(LegacyVersion, CrdsData::LegacyVersion(version), version);
impl_crds_entry!(LowestSlot, CrdsData::LowestSlot(_, slot), slot);
impl_crds_entry!(Version, CrdsData::Version(version), version);
impl<'a, 'b> CrdsEntry<'a, 'b> for &'a SnapshotHash {
type Key = Pubkey;
fn get_entry(table: &'a CrdsTable, key: Self::Key) -> Option<Self> {
let key = CrdsValueLabel::SnapshotHashes(key);
match &table.get(&key)?.value.data {
CrdsData::SnapshotHashes(snapshot_hash) => Some(snapshot_hash),
_ => None,
}
}
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::{crds::Crds, crds_value::new_rand_timestamp},
rand::seq::SliceRandom,
solana_sdk::signature::Keypair,
std::collections::HashMap,
};
#[test]
fn test_get_crds_entry() {
let mut rng = rand::thread_rng();
let mut crds = Crds::default();
let keypairs: Vec<_> = std::iter::repeat_with(Keypair::new).take(32).collect();
let mut entries = HashMap::new();
for _ in 0..256 {
let keypair = keypairs.choose(&mut rng).unwrap();
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
let key = value.label();
if let Ok(()) = crds.insert(value.clone(), new_rand_timestamp(&mut rng)) {
entries.insert(key, value);
}
}
assert!(crds.len() > 64);
assert_eq!(crds.len(), entries.len());
for entry in entries.values() {
let key = entry.label();
assert_eq!(crds.get::<&CrdsValue>(&key), Some(entry));
assert_eq!(crds.get::<&CrdsData>(&key), Some(&entry.data));
assert_eq!(crds.get::<&VersionedCrdsValue>(&key).unwrap().value, *entry);
let key = entry.pubkey();
match &entry.data {
CrdsData::ContactInfo(node) => {
assert_eq!(crds.get::<&ContactInfo>(key), Some(node))
}
CrdsData::LowestSlot(_, slot) => {
assert_eq!(crds.get::<&LowestSlot>(key), Some(slot))
}
CrdsData::Version(version) => assert_eq!(crds.get::<&Version>(key), Some(version)),
CrdsData::LegacyVersion(version) => {
assert_eq!(crds.get::<&LegacyVersion>(key), Some(version))
}
CrdsData::SnapshotHashes(hash) => {
assert_eq!(crds.get::<&SnapshotHash>(key), Some(hash))
}
_ => (),
}
}
}
}

View File

@ -266,7 +266,7 @@ impl CrdsGossipPull {
Some((node, _gossip_addr)) => node, Some((node, _gossip_addr)) => node,
}; };
let filters = self.build_crds_filters(thread_pool, crds, bloom_size); let filters = self.build_crds_filters(thread_pool, crds, bloom_size);
let peer = match crds.read().unwrap().get_contact_info(peer) { let peer = match crds.read().unwrap().get::<&ContactInfo>(peer) {
None => return Err(CrdsGossipError::NoPeers), None => return Err(CrdsGossipError::NoPeers),
Some(node) => node.clone(), Some(node) => node.clone(),
}; };
@ -393,7 +393,7 @@ impl CrdsGossipPull {
} else if now <= response.wallclock().saturating_add(timeout) { } else if now <= response.wallclock().saturating_add(timeout) {
active_values.push(response); active_values.push(response);
None None
} else if crds.get_contact_info(owner).is_some() { } else if crds.get::<&ContactInfo>(owner).is_some() {
// Silently insert this old value without bumping record // Silently insert this old value without bumping record
// timestamps // timestamps
expired_values.push(response); expired_values.push(response);
@ -1275,8 +1275,11 @@ pub(crate) mod tests {
CrdsGossipPull::process_pull_requests(&dest_crds, callers, 1); CrdsGossipPull::process_pull_requests(&dest_crds, callers, 1);
let dest_crds = dest_crds.read().unwrap(); let dest_crds = dest_crds.read().unwrap();
assert!(rsp.iter().all(|rsp| rsp.is_empty())); assert!(rsp.iter().all(|rsp| rsp.is_empty()));
assert!(dest_crds.get(&caller.label()).is_some()); assert!(dest_crds.get::<&CrdsValue>(&caller.label()).is_some());
assert_eq!(dest_crds.get(&caller.label()).unwrap().local_timestamp, 1); assert_eq!(1, {
let entry: &VersionedCrdsValue = dest_crds.get(&caller.label()).unwrap();
entry.local_timestamp
});
} }
#[test] #[test]
fn test_process_pull_request_response() { fn test_process_pull_request_response() {
@ -1315,7 +1318,10 @@ pub(crate) mod tests {
assert_eq!(same_key.label(), new.label()); assert_eq!(same_key.label(), new.label());
assert!(same_key.wallclock() < new.wallclock()); assert!(same_key.wallclock() < new.wallclock());
node_crds.insert(same_key.clone(), 0).unwrap(); node_crds.insert(same_key.clone(), 0).unwrap();
assert_eq!(node_crds.get(&same_key.label()).unwrap().local_timestamp, 0); assert_eq!(0, {
let entry: &VersionedCrdsValue = node_crds.get(&same_key.label()).unwrap();
entry.local_timestamp
});
let node_crds = RwLock::new(node_crds); let node_crds = RwLock::new(node_crds);
let mut done = false; let mut done = false;
let mut pings = Vec::new(); let mut pings = Vec::new();
@ -1369,12 +1375,14 @@ pub(crate) mod tests {
assert_eq!(failed, 0); assert_eq!(failed, 0);
assert_eq!(1, { assert_eq!(1, {
let node_crds = node_crds.read().unwrap(); let node_crds = node_crds.read().unwrap();
node_crds.get(&new.label()).unwrap().local_timestamp let entry: &VersionedCrdsValue = node_crds.get(&new.label()).unwrap();
entry.local_timestamp
}); });
// verify that the whole record was updated for dest since this is a response from dest // verify that the whole record was updated for dest since this is a response from dest
assert_eq!(1, { assert_eq!(1, {
let node_crds = node_crds.read().unwrap(); let node_crds = node_crds.read().unwrap();
node_crds.get(&same_key.label()).unwrap().local_timestamp let entry: &VersionedCrdsValue = node_crds.get(&same_key.label()).unwrap();
entry.local_timestamp
}); });
done = true; done = true;
break; break;
@ -1398,11 +1406,13 @@ pub(crate) mod tests {
0, 0,
))); )));
node_crds.insert(old.clone(), 0).unwrap(); node_crds.insert(old.clone(), 0).unwrap();
let value_hash = node_crds.get(&old.label()).unwrap().value_hash; let value_hash = {
let entry: &VersionedCrdsValue = node_crds.get(&old.label()).unwrap();
entry.value_hash
};
//verify self is valid //verify self is valid
assert_eq!( assert_eq!(
node_crds.get(&node_label).unwrap().value.label(), node_crds.get::<&CrdsValue>(&node_label).unwrap().label(),
node_label node_label
); );
// purge // purge
@ -1413,9 +1423,12 @@ pub(crate) mod tests {
//verify self is still valid after purge //verify self is still valid after purge
assert_eq!(node_label, { assert_eq!(node_label, {
let node_crds = node_crds.read().unwrap(); let node_crds = node_crds.read().unwrap();
node_crds.get(&node_label).unwrap().value.label() node_crds.get::<&CrdsValue>(&node_label).unwrap().label()
}); });
assert_eq!(node_crds.read().unwrap().get(&old.label()), None); assert_eq!(
node_crds.read().unwrap().get::<&CrdsValue>(&old.label()),
None
);
assert_eq!(node_crds.read().unwrap().num_purged(), 1); assert_eq!(node_crds.read().unwrap().num_purged(), 1);
for _ in 0..30 { for _ in 0..30 {
// there is a chance of a false positive with bloom filters // there is a chance of a false positive with bloom filters

View File

@ -561,7 +561,7 @@ mod test {
push.process_push_message(&crds, &Pubkey::default(), vec![value.clone()], 0), push.process_push_message(&crds, &Pubkey::default(), vec![value.clone()], 0),
[Ok(label.pubkey())], [Ok(label.pubkey())],
); );
assert_eq!(crds.read().unwrap().get(&label).unwrap().value, value); assert_eq!(crds.read().unwrap().get::<&CrdsValue>(&label), Some(&value));
// push it again // push it again
assert_eq!( assert_eq!(
@ -956,7 +956,10 @@ mod test {
push.process_push_message(&crds, &Pubkey::default(), vec![value.clone()], 0), push.process_push_message(&crds, &Pubkey::default(), vec![value.clone()], 0),
[Ok(label.pubkey())] [Ok(label.pubkey())]
); );
assert_eq!(crds.write().unwrap().get(&label).unwrap().value, value); assert_eq!(
crds.write().unwrap().get::<&CrdsValue>(&label),
Some(&value)
);
// push it again // push it again
assert_eq!( assert_eq!(

View File

@ -145,7 +145,7 @@ mod test {
let label = value.label(); let label = value.label();
let mut crds = Crds::default(); let mut crds = Crds::default();
crds.insert(value, timestamp()).unwrap(); crds.insert(value, timestamp()).unwrap();
crds.get(&label).cloned().unwrap() crds.get::<&VersionedCrdsValue>(&label).cloned().unwrap()
} }
// Returns true if the first mask_bits most significant bits of hash is the // Returns true if the first mask_bits most significant bits of hash is the

View File

@ -584,56 +584,20 @@ impl CrdsValue {
} }
} }
#[cfg(test)] pub(crate) fn accounts_hash(&self) -> Option<&SnapshotHash> {
fn vote(&self) -> Option<&Vote> {
match &self.data {
CrdsData::Vote(_, vote) => Some(vote),
_ => None,
}
}
pub fn lowest_slot(&self) -> Option<&LowestSlot> {
match &self.data {
CrdsData::LowestSlot(_, slots) => Some(slots),
_ => None,
}
}
pub fn snapshot_hash(&self) -> Option<&SnapshotHash> {
match &self.data {
CrdsData::SnapshotHashes(slots) => Some(slots),
_ => None,
}
}
pub fn accounts_hash(&self) -> Option<&SnapshotHash> {
match &self.data { match &self.data {
CrdsData::AccountsHashes(slots) => Some(slots), CrdsData::AccountsHashes(slots) => Some(slots),
_ => None, _ => None,
} }
} }
pub fn epoch_slots(&self) -> Option<&EpochSlots> { pub(crate) fn epoch_slots(&self) -> Option<&EpochSlots> {
match &self.data { match &self.data {
CrdsData::EpochSlots(_, slots) => Some(slots), CrdsData::EpochSlots(_, slots) => Some(slots),
_ => None, _ => None,
} }
} }
pub fn legacy_version(&self) -> Option<&LegacyVersion> {
match &self.data {
CrdsData::LegacyVersion(legacy_version) => Some(legacy_version),
_ => None,
}
}
pub fn version(&self) -> Option<&Version> {
match &self.data {
CrdsData::Version(version) => Some(version),
_ => None,
}
}
/// Returns the size (in bytes) of a CrdsValue /// Returns the size (in bytes) of a CrdsValue
pub fn size(&self) -> u64 { pub fn size(&self) -> u64 {
serialized_size(&self).expect("unable to serialize contact info") serialized_size(&self).expect("unable to serialize contact info")
@ -641,7 +605,7 @@ impl CrdsValue {
/// Returns true if, regardless of prunes, this crds-value /// Returns true if, regardless of prunes, this crds-value
/// should be pushed to the receiving node. /// should be pushed to the receiving node.
pub fn should_force_push(&self, peer: &Pubkey) -> bool { pub(crate) fn should_force_push(&self, peer: &Pubkey) -> bool {
match &self.data { match &self.data {
CrdsData::NodeInstance(node) => node.from == *peer, CrdsData::NodeInstance(node) => node.from == *peer,
_ => false, _ => false,
@ -710,7 +674,10 @@ mod test {
Vote::new(Pubkey::default(), test_tx(), 0), Vote::new(Pubkey::default(), test_tx(), 0),
)); ));
assert_eq!(v.wallclock(), 0); assert_eq!(v.wallclock(), 0);
let key = v.vote().unwrap().from; let key = match &v.data {
CrdsData::Vote(_, vote) => vote.from,
_ => panic!(),
};
assert_eq!(v.label(), CrdsValueLabel::Vote(0, key)); assert_eq!(v.label(), CrdsValueLabel::Vote(0, key));
let v = CrdsValue::new_unsigned(CrdsData::LowestSlot( let v = CrdsValue::new_unsigned(CrdsData::LowestSlot(
@ -718,7 +685,10 @@ mod test {
LowestSlot::new(Pubkey::default(), 0, 0), LowestSlot::new(Pubkey::default(), 0, 0),
)); ));
assert_eq!(v.wallclock(), 0); assert_eq!(v.wallclock(), 0);
let key = v.lowest_slot().unwrap().from; let key = match &v.data {
CrdsData::LowestSlot(_, data) => data.from,
_ => panic!(),
};
assert_eq!(v.label(), CrdsValueLabel::LowestSlot(key)); assert_eq!(v.label(), CrdsValueLabel::LowestSlot(key));
} }

View File

@ -6,6 +6,7 @@ mod cluster_info_metrics;
#[macro_use] #[macro_use]
pub mod contact_info; pub mod contact_info;
pub mod crds; pub mod crds;
pub mod crds_entry;
pub mod crds_gossip; pub mod crds_gossip;
pub mod crds_gossip_error; pub mod crds_gossip_error;
pub mod crds_gossip_pull; pub mod crds_gossip_pull;

View File

@ -187,7 +187,7 @@ fn ring_network_create(num: usize) -> Network {
let start_id = keys[k]; let start_id = keys[k];
let label = CrdsValueLabel::ContactInfo(start_id); let label = CrdsValueLabel::ContactInfo(start_id);
let gossip_crds = start.gossip.crds.read().unwrap(); let gossip_crds = start.gossip.crds.read().unwrap();
gossip_crds.get(&label).unwrap().value.clone() gossip_crds.get::<&CrdsValue>(&label).unwrap().clone()
}; };
let end = network.get_mut(&keys[(k + 1) % keys.len()]).unwrap(); let end = network.get_mut(&keys[(k + 1) % keys.len()]).unwrap();
let mut end_crds = end.gossip.crds.write().unwrap(); let mut end_crds = end.gossip.crds.write().unwrap();
@ -221,7 +221,7 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network {
let start = &network[k]; let start = &network[k];
let start_label = CrdsValueLabel::ContactInfo(*k); let start_label = CrdsValueLabel::ContactInfo(*k);
let gossip_crds = start.gossip.crds.read().unwrap(); let gossip_crds = start.gossip.crds.read().unwrap();
gossip_crds.get(&start_label).unwrap().value.clone() gossip_crds.get::<&CrdsValue>(&start_label).unwrap().clone()
}) })
.collect(); .collect();
for (end_pubkey, end) in network.iter_mut() { for (end_pubkey, end) in network.iter_mut() {
@ -276,7 +276,7 @@ fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_conver
let node_pubkey = node.keypair.pubkey(); let node_pubkey = node.keypair.pubkey();
let mut m = { let mut m = {
let node_crds = node.gossip.crds.read().unwrap(); let node_crds = node.gossip.crds.read().unwrap();
node_crds.get_contact_info(node_pubkey).cloned().unwrap() node_crds.get::<&ContactInfo>(node_pubkey).cloned().unwrap()
}; };
m.wallclock = now; m.wallclock = now;
node.gossip.process_push_message( node.gossip.process_push_message(
@ -495,7 +495,7 @@ fn network_run_pull(
let from_pubkey = from.keypair.pubkey(); let from_pubkey = from.keypair.pubkey();
let label = CrdsValueLabel::ContactInfo(from_pubkey); let label = CrdsValueLabel::ContactInfo(from_pubkey);
let gossip_crds = from.gossip.crds.read().unwrap(); let gossip_crds = from.gossip.crds.read().unwrap();
let self_info = gossip_crds.get(&label).unwrap().value.clone(); let self_info = gossip_crds.get::<&CrdsValue>(&label).unwrap().clone();
Some((peer.id, filters, self_info)) Some((peer.id, filters, self_info))
}) })
.collect() .collect()