adds mapping from nodes pubkeys to their shred-version (#17940)

Crds values of nodes with different shred versions are creeping into
gossip table resulting in runtime issues as the one addressed in:
https://github.com/solana-labs/solana/pull/17899

This commit works towards enforcing more checks and filtering based on
shred version by adding necessary mapping and api to gossip table.
Once populated, pubkey->shred-version mapping persists as long as there
are any values associated with the pubkey.
This commit is contained in:
behzad nouri 2021-06-18 15:56:04 +00:00 committed by GitHub
parent abc9839d86
commit 5a99fa3790
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 82 additions and 20 deletions

View File

@ -18,6 +18,7 @@ indexmap = { version = "1.6", features = ["rayon"] }
itertools = "0.10.1"
log = "0.4.11"
lru = "0.6.1"
matches = "0.1.8"
num-traits = "0.2"
rand = "0.7.0"
rand_chacha = "0.2.2"

View File

@ -1173,16 +1173,13 @@ impl ClusterInfo {
/// Returns epoch-slots inserted since the given cursor.
/// Excludes entries from nodes with unkown or different shred version.
pub fn get_epoch_slots(&self, cursor: &mut Cursor) -> Vec<EpochSlots> {
let self_shred_version = self.my_shred_version();
let self_shred_version = Some(self.my_shred_version());
let gossip = self.gossip.read().unwrap();
let entries = gossip.crds.get_epoch_slots(cursor);
entries
.filter(
|entry| match gossip.crds.get_contact_info(entry.value.pubkey()) {
Some(node) => node.shred_version == self_shred_version,
None => false,
},
)
.filter(|entry| {
gossip.crds.get_shred_version(&entry.value.pubkey()) == self_shred_version
})
.map(|entry| match &entry.value.data {
CrdsData::EpochSlots(_, slots) => slots.clone(),
_ => panic!("this should not happen!"),
@ -2213,9 +2210,10 @@ impl ClusterInfo {
) -> (usize, usize, usize) {
let len = crds_values.len();
trace!("PullResponse me: {} from: {} len={}", self.id, from, len);
let shred_version = self
.lookup_contact_info(from, |ci| ci.shred_version)
.unwrap_or(0);
let shred_version = {
let gossip = self.gossip.read().unwrap();
gossip.crds.get_shred_version(from).unwrap_or_default()
};
Self::filter_by_shred_version(
from,
&mut crds_values,
@ -2365,10 +2363,7 @@ impl ClusterInfo {
let gossip = self.gossip.read().unwrap();
messages
.iter()
.map(|(from, _)| match gossip.crds.get_contact_info(*from) {
None => 0,
Some(info) => info.shred_version,
})
.map(|(from, _)| gossip.crds.get_shred_version(from).unwrap_or_default())
.collect()
};
// Filter out data if the origin has different shred version.

View File

@ -35,6 +35,7 @@ use {
map::{rayon::ParValues, Entry, IndexMap},
set::IndexSet,
},
matches::debug_assert_matches,
rayon::{prelude::*, ThreadPool},
solana_sdk::{
hash::{hash, Hash},
@ -66,6 +67,8 @@ pub struct Crds {
entries: BTreeMap<u64 /*insert order*/, usize /*index*/>,
// Hash of recently purged values.
purged: VecDeque<(Hash, u64 /*timestamp*/)>,
// Mapping from nodes' pubkeys to their respective shred-version.
shred_versions: HashMap<Pubkey, u16>,
}
#[derive(PartialEq, Debug)]
@ -125,6 +128,7 @@ impl Default for Crds {
records: HashMap::default(),
entries: BTreeMap::default(),
purged: VecDeque::default(),
shred_versions: HashMap::default(),
}
}
}
@ -173,9 +177,10 @@ impl Crds {
Entry::Vacant(entry) => {
let entry_index = entry.index();
self.shards.insert(entry_index, &value);
match value.value.data {
CrdsData::ContactInfo(_) => {
match &value.value.data {
CrdsData::ContactInfo(node) => {
self.nodes.insert(entry_index);
self.shred_versions.insert(pubkey, node.shred_version);
}
CrdsData::Vote(_, _) => {
self.votes.insert(value.ordinal, entry_index);
@ -195,7 +200,13 @@ impl Crds {
let entry_index = entry.index();
self.shards.remove(entry_index, entry.get());
self.shards.insert(entry_index, &value);
match value.value.data {
match &value.value.data {
CrdsData::ContactInfo(node) => {
self.shred_versions.insert(pubkey, node.shred_version);
// self.nodes does not need to be updated since the
// entry at this index was and stays contact-info.
debug_assert_matches!(entry.get().value.data, CrdsData::ContactInfo(_));
}
CrdsData::Vote(_, _) => {
self.votes.remove(&entry.get().ordinal);
self.votes.insert(value.ordinal, entry_index);
@ -239,6 +250,10 @@ impl Crds {
self.table.get(&label)?.value.contact_info()
}
pub(crate) fn get_shred_version(&self, pubkey: &Pubkey) -> Option<u16> {
self.shred_versions.get(pubkey).copied()
}
pub fn get_lowest_slot(&self, pubkey: Pubkey) -> Option<&LowestSlot> {
let lable = CrdsValueLabel::LowestSlot(pubkey);
self.table.get(&lable)?.value.lowest_slot()
@ -449,6 +464,7 @@ impl Crds {
records_entry.get_mut().swap_remove(&index);
if records_entry.get().is_empty() {
records_entry.remove();
self.shred_versions.remove(&pubkey);
}
// If index == self.table.len(), then the removed entry was the last
// entry in the table, in which case no other keys were modified.
@ -544,17 +560,20 @@ impl Crds {
}
#[cfg(test)]
mod test {
mod tests {
use {
super::*,
crate::{
contact_info::ContactInfo,
crds_value::{new_rand_timestamp, NodeInstance},
crds_value::{new_rand_timestamp, NodeInstance, SnapshotHash},
},
rand::{thread_rng, Rng, SeedableRng},
rand_chacha::ChaChaRng,
rayon::ThreadPoolBuilder,
solana_sdk::signature::{Keypair, Signer},
solana_sdk::{
signature::{Keypair, Signer},
timing::timestamp,
},
std::{collections::HashSet, iter::repeat_with},
};
@ -1017,6 +1036,53 @@ mod test {
assert!(crds.records.is_empty());
}
#[test]
fn test_get_shred_version() {
let mut rng = rand::thread_rng();
let pubkey = Pubkey::new_unique();
let mut crds = Crds::default();
assert_eq!(crds.get_shred_version(&pubkey), None);
// Initial insertion of a node with shred version:
let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey));
let wallclock = node.wallclock;
node.shred_version = 42;
let node = CrdsData::ContactInfo(node);
let node = CrdsValue::new_unsigned(node);
assert_eq!(crds.insert(node, timestamp()), Ok(()));
assert_eq!(crds.get_shred_version(&pubkey), Some(42));
// An outdated value should not update shred-version:
let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey));
node.wallclock = wallclock - 1; // outdated.
node.shred_version = 8;
let node = CrdsData::ContactInfo(node);
let node = CrdsValue::new_unsigned(node);
assert_eq!(crds.insert(node, timestamp()), Err(CrdsError::InsertFailed));
assert_eq!(crds.get_shred_version(&pubkey), Some(42));
// Update shred version:
let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey));
node.wallclock = wallclock + 1; // so that it overrides the prev one.
node.shred_version = 8;
let node = CrdsData::ContactInfo(node);
let node = CrdsValue::new_unsigned(node);
assert_eq!(crds.insert(node, timestamp()), Ok(()));
assert_eq!(crds.get_shred_version(&pubkey), Some(8));
// Add other crds values with the same pubkey.
let val = SnapshotHash::new_rand(&mut rng, Some(pubkey));
let val = CrdsData::SnapshotHashes(val);
let val = CrdsValue::new_unsigned(val);
assert_eq!(crds.insert(val, timestamp()), Ok(()));
assert_eq!(crds.get_shred_version(&pubkey), Some(8));
// Remove contact-info. Shred version should stay there since there
// are still values associated with the pubkey.
crds.remove(&CrdsValueLabel::ContactInfo(pubkey), timestamp());
assert_eq!(crds.get_contact_info(pubkey), None);
assert_eq!(crds.get_shred_version(&pubkey), Some(8));
// Remove the remaining entry with the same pubkey.
crds.remove(&CrdsValueLabel::SnapshotHashes(pubkey), timestamp());
assert_eq!(crds.get_records(&pubkey).count(), 0);
assert_eq!(crds.get_shred_version(&pubkey), None);
}
#[test]
#[allow(clippy::needless_collect)]
fn test_drop() {