indexes crds values by their insert order

This commit is contained in:
behzad nouri 2021-04-30 12:11:26 -04:00
parent ff95e2aaa6
commit dfa3e7a61c
1 changed files with 46 additions and 5 deletions

View File

@ -56,6 +56,8 @@ pub struct Crds {
epoch_slots: BTreeMap<u64 /*insert order*/, usize /*index*/>,
// Indices of all crds values associated with a node.
records: HashMap<Pubkey, IndexSet<usize>>,
// Indices of all entries keyed by insert order.
entries: BTreeMap<u64 /*insert order*/, usize /*index*/>,
}
#[derive(PartialEq, Debug)]
@ -117,6 +119,7 @@ impl Default for Crds {
votes: IndexSet::default(),
epoch_slots: BTreeMap::default(),
records: HashMap::default(),
entries: BTreeMap::default(),
}
}
}
@ -155,6 +158,7 @@ impl Crds {
local_timestamp: u64,
) -> Result<Option<VersionedCrdsValue>, CrdsError> {
let label = value.label();
let pubkey = value.pubkey();
let value = VersionedCrdsValue::new(value, self.cursor, local_timestamp);
match self.table.entry(label) {
Entry::Vacant(entry) => {
@ -172,10 +176,8 @@ impl Crds {
}
_ => (),
};
self.records
.entry(value.value.pubkey())
.or_default()
.insert(entry_index);
self.entries.insert(value.ordinal, entry_index);
self.records.entry(pubkey).or_default().insert(entry_index);
self.cursor.consume(value.ordinal);
entry.insert(value);
Ok(None)
@ -188,9 +190,11 @@ impl Crds {
self.epoch_slots.remove(&entry.get().ordinal);
self.epoch_slots.insert(value.ordinal, entry_index);
}
self.entries.remove(&entry.get().ordinal);
self.entries.insert(value.ordinal, entry_index);
// As long as the pubkey does not change, self.records
// does not need to be updated.
debug_assert_eq!(entry.get().value.pubkey(), value.value.pubkey());
debug_assert_eq!(entry.get().value.pubkey(), pubkey);
self.cursor.consume(value.ordinal);
Ok(Some(entry.insert(value)))
}
@ -271,6 +275,18 @@ impl Crds {
})
}
/// Returns all entries inserted since the given cursor.
pub(crate) fn get_entries<'a>(
&'a self,
cursor: &'a mut Cursor,
) -> impl Iterator<Item = &'a VersionedCrdsValue> {
let range = (Bound::Included(cursor.ordinal()), Bound::Unbounded);
self.entries.range(range).map(move |(ordinal, index)| {
cursor.consume(*ordinal);
self.table.index(*index)
})
}
/// Returns all records associated with a pubkey.
pub(crate) fn get_records(&self, pubkey: &Pubkey) -> impl Iterator<Item = &VersionedCrdsValue> {
self.records
@ -417,6 +433,7 @@ impl Crds {
}
_ => (),
}
self.entries.remove(&value.ordinal);
// Remove the index from records associated with the value's pubkey.
let pubkey = value.value.pubkey();
let mut records_entry = match self.records.entry(pubkey) {
@ -451,6 +468,7 @@ impl Crds {
}
_ => (),
};
self.entries.insert(value.ordinal, index);
let pubkey = value.value.pubkey();
let records = self.records.get_mut(&pubkey).unwrap();
records.swap_remove(&size);
@ -833,6 +851,25 @@ mod test {
_ => panic!("not a vote!"),
}
}
let num_entries = crds
.table
.values()
.filter(|value| value.ordinal >= since)
.count();
let mut cursor = Cursor(since);
assert_eq!(num_entries, crds.get_entries(&mut cursor).count());
assert_eq!(
cursor.0,
crds.entries
.iter()
.last()
.map(|(k, _)| k + 1)
.unwrap_or_default()
.max(since)
);
for value in crds.get_entries(&mut Cursor(since)) {
assert!(value.ordinal >= since);
}
let num_nodes = crds
.table
.values()
@ -848,6 +885,10 @@ mod test {
.values()
.filter(|v| matches!(v.value.data, CrdsData::EpochSlots(_, _)))
.count();
assert_eq!(
crds.table.len(),
crds.get_entries(&mut Cursor::default()).count()
);
assert_eq!(num_nodes, crds.get_nodes_contact_info().count());
assert_eq!(num_votes, crds.get_votes(&mut Cursor::default()).count());
assert_eq!(