diff --git a/core/src/crds.rs b/core/src/crds.rs index d4bdbb9f8b..0f22fc2553 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -56,6 +56,8 @@ pub struct Crds { epoch_slots: BTreeMap, // Indices of all crds values associated with a node. records: HashMap>, + // Indices of all entries keyed by insert order. + entries: BTreeMap, } #[derive(PartialEq, Debug)] @@ -117,6 +119,7 @@ impl Default for Crds { votes: IndexSet::default(), epoch_slots: BTreeMap::default(), records: HashMap::default(), + entries: BTreeMap::default(), } } } @@ -155,6 +158,7 @@ impl Crds { local_timestamp: u64, ) -> Result, CrdsError> { let label = value.label(); + let pubkey = value.pubkey(); let value = VersionedCrdsValue::new(value, self.cursor, local_timestamp); match self.table.entry(label) { Entry::Vacant(entry) => { @@ -172,10 +176,8 @@ impl Crds { } _ => (), }; - self.records - .entry(value.value.pubkey()) - .or_default() - .insert(entry_index); + self.entries.insert(value.ordinal, entry_index); + self.records.entry(pubkey).or_default().insert(entry_index); self.cursor.consume(value.ordinal); entry.insert(value); Ok(None) @@ -188,9 +190,11 @@ impl Crds { self.epoch_slots.remove(&entry.get().ordinal); self.epoch_slots.insert(value.ordinal, entry_index); } + self.entries.remove(&entry.get().ordinal); + self.entries.insert(value.ordinal, entry_index); // As long as the pubkey does not change, self.records // does not need to be updated. - debug_assert_eq!(entry.get().value.pubkey(), value.value.pubkey()); + debug_assert_eq!(entry.get().value.pubkey(), pubkey); self.cursor.consume(value.ordinal); Ok(Some(entry.insert(value))) } @@ -271,6 +275,18 @@ impl Crds { }) } + /// Returns all entries inserted since the given cursor. + pub(crate) fn get_entries<'a>( + &'a self, + cursor: &'a mut Cursor, + ) -> impl Iterator { + let range = (Bound::Included(cursor.ordinal()), Bound::Unbounded); + self.entries.range(range).map(move |(ordinal, index)| { + cursor.consume(*ordinal); + self.table.index(*index) + }) + } + /// Returns all records associated with a pubkey. pub(crate) fn get_records(&self, pubkey: &Pubkey) -> impl Iterator { self.records @@ -417,6 +433,7 @@ impl Crds { } _ => (), } + self.entries.remove(&value.ordinal); // Remove the index from records associated with the value's pubkey. let pubkey = value.value.pubkey(); let mut records_entry = match self.records.entry(pubkey) { @@ -451,6 +468,7 @@ impl Crds { } _ => (), }; + self.entries.insert(value.ordinal, index); let pubkey = value.value.pubkey(); let records = self.records.get_mut(&pubkey).unwrap(); records.swap_remove(&size); @@ -833,6 +851,25 @@ mod test { _ => panic!("not a vote!"), } } + let num_entries = crds + .table + .values() + .filter(|value| value.ordinal >= since) + .count(); + let mut cursor = Cursor(since); + assert_eq!(num_entries, crds.get_entries(&mut cursor).count()); + assert_eq!( + cursor.0, + crds.entries + .iter() + .last() + .map(|(k, _)| k + 1) + .unwrap_or_default() + .max(since) + ); + for value in crds.get_entries(&mut Cursor(since)) { + assert!(value.ordinal >= since); + } let num_nodes = crds .table .values() @@ -848,6 +885,10 @@ mod test { .values() .filter(|v| matches!(v.value.data, CrdsData::EpochSlots(_, _))) .count(); + assert_eq!( + crds.table.len(), + crds.get_entries(&mut Cursor::default()).count() + ); assert_eq!(num_nodes, crds.get_nodes_contact_info().count()); assert_eq!(num_votes, crds.get_votes(&mut Cursor::default()).count()); assert_eq!(