indexes crds values associated with a pubkey (#14088)

record_labels returns all the possible labels for a record identified by
a pubkey, used in updating timestamp of crds values:
https://github.com/solana-labs/solana/blob/1792100e2/core/src/crds_value.rs#L560-L577
https://github.com/solana-labs/solana/blob/1792100e2/core/src/crds.rs#L240-L251
The code relies on CrdsValueLabel to be limited to a small deterministic
set of possible values for a fixed pubkey. As we expand crds values to
include duplicate shreds, this limits what the duplicate proofs can be
keyed by in the table.
In addition the computation of these labels is inefficient and will
become more so as duplicate shreds and more types of crds values are
added. An alternative is to maintain an index of all crds values
associated with a pubkey.
This commit is contained in:
behzad nouri 2020-12-15 01:49:22 +00:00 committed by GitHub
parent 5294f70189
commit c2b7115031
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 74 additions and 53 deletions

View File

@ -36,8 +36,8 @@ use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Keypair;
use solana_sdk::timing::timestamp;
use std::cmp;
use std::collections::HashMap;
use std::ops::Index;
use std::collections::{hash_map, HashMap};
use std::ops::{Index, IndexMut};
const CRDS_SHARDS_BITS: u32 = 8;
@ -49,6 +49,8 @@ pub struct Crds {
shards: CrdsShards,
// Indices of all crds values which are node ContactInfo.
nodes: IndexSet<usize>,
// Indices of all crds values associated with a node.
records: HashMap<Pubkey, IndexSet<usize>>,
}
#[derive(PartialEq, Debug)]
@ -107,6 +109,7 @@ impl Default for Crds {
num_inserts: 0,
shards: CrdsShards::new(CRDS_SHARDS_BITS),
nodes: IndexSet::default(),
records: HashMap::default(),
}
}
}
@ -141,6 +144,10 @@ impl Crds {
if let CrdsData::ContactInfo(_) = new_value.value.data {
self.nodes.insert(entry_index);
}
self.records
.entry(new_value.value.pubkey())
.or_default()
.insert(entry_index);
entry.insert(new_value);
self.num_inserts += 1;
Ok(None)
@ -150,6 +157,9 @@ impl Crds {
self.shards.remove(index, entry.get());
self.shards.insert(index, &new_value);
self.num_inserts += 1;
// As long as the pubkey does not change, self.records
// does not need to be updated.
debug_assert_eq!(entry.get().value.pubkey(), new_value.value.pubkey());
Ok(Some(entry.insert(new_value)))
}
_ => {
@ -237,16 +247,15 @@ impl Crds {
.map(move |i| self.table.index(i))
}
fn update_label_timestamp(&mut self, id: &CrdsValueLabel, now: u64) {
if let Some(e) = self.table.get_mut(id) {
e.local_timestamp = cmp::max(e.local_timestamp, now);
}
}
/// Update the timestamp's of all the labels that are associated with Pubkey
pub fn update_record_timestamp(&mut self, pubkey: &Pubkey, now: u64) {
for label in CrdsValue::record_labels(*pubkey) {
self.update_label_timestamp(&label, now);
if let Some(indices) = self.records.get(pubkey) {
for index in indices {
let entry = self.table.index_mut(*index);
if entry.local_timestamp < now {
entry.local_timestamp = now;
}
}
}
}
@ -278,11 +287,21 @@ impl Crds {
}
pub fn remove(&mut self, key: &CrdsValueLabel) -> Option<VersionedCrdsValue> {
let (index, _, value) = self.table.swap_remove_full(key)?;
let (index, _ /*label*/, value) = self.table.swap_remove_full(key)?;
self.shards.remove(index, &value);
if let CrdsData::ContactInfo(_) = value.value.data {
self.nodes.swap_remove(&index);
}
// Remove the index from records associated with the value's pubkey.
let pubkey = value.value.pubkey();
let mut records_entry = match self.records.entry(pubkey) {
hash_map::Entry::Vacant(_) => panic!("this should not happen!"),
hash_map::Entry::Occupied(entry) => entry,
};
records_entry.get_mut().swap_remove(&index);
if records_entry.get().is_empty() {
records_entry.remove();
}
// If index == self.table.len(), then the removed entry was the last
// entry in the table, in which case no other keys were modified.
// Otherwise, the previously last element in the table is now moved to
@ -297,6 +316,10 @@ impl Crds {
self.nodes.swap_remove(&size);
self.nodes.insert(index);
}
let pubkey = value.value.pubkey();
let records = self.records.get_mut(&pubkey).unwrap();
records.swap_remove(&size);
records.insert(index);
}
Some(value)
}
@ -308,6 +331,7 @@ mod test {
use crate::contact_info::ContactInfo;
use rand::{thread_rng, Rng};
use rayon::ThreadPoolBuilder;
use std::iter::repeat_with;
#[test]
fn test_insert() {
@ -353,8 +377,6 @@ mod test {
)));
assert_eq!(crds.insert(val.clone(), 0), Ok(None));
crds.update_label_timestamp(&val.label(), 1);
assert_eq!(crds.table[&val.label()].local_timestamp, 1);
assert_eq!(crds.table[&val.label()].insert_timestamp, 0);
let val2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
@ -562,6 +584,45 @@ mod test {
}
}
#[test]
fn test_crds_records() {
fn check_crds_records(crds: &Crds) {
assert_eq!(
crds.table.len(),
crds.records.values().map(IndexSet::len).sum::<usize>()
);
for (pubkey, indices) in &crds.records {
for index in indices {
let value = crds.table.index(*index);
assert_eq!(*pubkey, value.value.pubkey());
}
}
}
let mut rng = thread_rng();
let keypairs: Vec<_> = repeat_with(Keypair::new).take(128).collect();
let mut crds = Crds::default();
for k in 0..4096 {
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair));
let _ = crds.insert_versioned(value);
if k % 64 == 0 {
check_crds_records(&crds);
}
}
assert!(crds.records.len() > 96);
assert!(crds.records.len() <= keypairs.len());
// Remove values one by one and assert that records stay valid.
while !crds.table.is_empty() {
let index = rng.gen_range(0, crds.table.len());
let key = crds.table.get_index(index).unwrap().0.clone();
crds.remove(&key);
if crds.table.len() % 64 == 0 {
check_crds_records(&crds);
}
}
assert!(crds.records.is_empty());
}
#[test]
fn test_remove_staked() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();

View File

@ -557,25 +557,6 @@ impl CrdsValue {
}
}
/// Return all the possible labels for a record identified by Pubkey.
/// Excludes NodeInstance, which is pushed priodically, and does not need
/// to update its local-timestmap.
pub fn record_labels(key: Pubkey) -> impl Iterator<Item = CrdsValueLabel> {
const CRDS_VALUE_LABEL_STUBS: [fn(Pubkey) -> CrdsValueLabel; 6] = [
CrdsValueLabel::ContactInfo,
CrdsValueLabel::LowestSlot,
CrdsValueLabel::SnapshotHashes,
CrdsValueLabel::AccountsHashes,
CrdsValueLabel::LegacyVersion,
CrdsValueLabel::Version,
];
CRDS_VALUE_LABEL_STUBS
.iter()
.map(move |f| (f)(key))
.chain((0..MAX_VOTES).map(move |ix| CrdsValueLabel::Vote(ix, key)))
.chain((0..MAX_EPOCH_SLOTS).map(move |ix| CrdsValueLabel::EpochSlots(ix, key)))
}
/// Returns the size (in bytes) of a CrdsValue
pub fn size(&self) -> u64 {
serialized_size(&self).expect("unable to serialize contact info")
@ -653,27 +634,6 @@ mod test {
use std::cmp::Ordering;
use std::iter::repeat_with;
#[test]
fn test_labels() {
let mut hits = [false; 6 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize];
// this method should cover all the possible labels
for v in CrdsValue::record_labels(Pubkey::default()) {
match &v {
CrdsValueLabel::ContactInfo(_) => hits[0] = true,
CrdsValueLabel::LowestSlot(_) => hits[1] = true,
CrdsValueLabel::SnapshotHashes(_) => hits[2] = true,
CrdsValueLabel::AccountsHashes(_) => hits[3] = true,
CrdsValueLabel::LegacyVersion(_) => hits[4] = true,
CrdsValueLabel::Version(_) => hits[5] = true,
CrdsValueLabel::NodeInstance(_, _) => panic!("NodeInstance!?"),
CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 6] = true,
CrdsValueLabel::EpochSlots(ix, _) => {
hits[*ix as usize + MAX_VOTES as usize + 6] = true
}
}
}
assert!(hits.iter().all(|x| *x));
}
#[test]
fn test_keys_and_values() {
let v = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));