indexes duplicate-shreds in gossip crds table (#29317)

Also adding Crds::get_duplicate_shreds which retrieves all upserted
duplicate-shreds since a given cursor using the index.
This commit is contained in:
behzad nouri 2022-12-20 13:48:05 +00:00 committed by GitHub
parent 9a82368dc9
commit 2d849a2eae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 46 additions and 0 deletions

View File

@ -33,6 +33,7 @@ use {
self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, IncrementalSnapshotHashes, self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, IncrementalSnapshotHashes,
LowestSlot, NodeInstance, SnapshotHashes, Version, Vote, MAX_WALLCLOCK, LowestSlot, NodeInstance, SnapshotHashes, Version, Vote, MAX_WALLCLOCK,
}, },
duplicate_shred::DuplicateShred,
epoch_slots::EpochSlots, epoch_slots::EpochSlots,
gossip_error::GossipError, gossip_error::GossipError,
ping_pong::{self, PingCache, Pong}, ping_pong::{self, PingCache, Pong},
@ -1225,6 +1226,19 @@ impl ClusterInfo {
.collect() .collect()
} }
/// Returns duplicate-shreds inserted since the given cursor.
#[allow(dead_code)]
pub(crate) fn get_duplicate_shreds(&self, cursor: &mut Cursor) -> Vec<DuplicateShred> {
let gossip_crds = self.gossip.crds.read().unwrap();
gossip_crds
.get_duplicate_shreds(cursor)
.map(|entry| match &entry.value.data {
CrdsData::DuplicateShred(_, dup) => dup.clone(),
_ => panic!("this should not happen!"),
})
.collect()
}
pub fn get_node_version(&self, pubkey: &Pubkey) -> Option<solana_version::Version> { pub fn get_node_version(&self, pubkey: &Pubkey) -> Option<solana_version::Version> {
let gossip_crds = self.gossip.crds.read().unwrap(); let gossip_crds = self.gossip.crds.read().unwrap();
if let Some(version) = gossip_crds.get::<&Version>(*pubkey) { if let Some(version) = gossip_crds.get::<&Version>(*pubkey) {

View File

@ -66,6 +66,8 @@ pub struct Crds {
votes: BTreeMap<u64 /*insert order*/, usize /*index*/>, votes: BTreeMap<u64 /*insert order*/, usize /*index*/>,
// Indices of EpochSlots keyed by insert order. // Indices of EpochSlots keyed by insert order.
epoch_slots: BTreeMap<u64 /*insert order*/, usize /*index*/>, epoch_slots: BTreeMap<u64 /*insert order*/, usize /*index*/>,
// Indices of DuplicateShred keyed by insert order.
duplicate_shreds: BTreeMap<u64 /*insert order*/, usize /*index*/>,
// Indices of all crds values associated with a node. // Indices of all crds values associated with a node.
records: HashMap<Pubkey, IndexSet<usize>>, records: HashMap<Pubkey, IndexSet<usize>>,
// Indices of all entries keyed by insert order. // Indices of all entries keyed by insert order.
@ -157,6 +159,7 @@ impl Default for Crds {
nodes: IndexSet::default(), nodes: IndexSet::default(),
votes: BTreeMap::default(), votes: BTreeMap::default(),
epoch_slots: BTreeMap::default(), epoch_slots: BTreeMap::default(),
duplicate_shreds: BTreeMap::default(),
records: HashMap::default(), records: HashMap::default(),
entries: BTreeMap::default(), entries: BTreeMap::default(),
purged: VecDeque::default(), purged: VecDeque::default(),
@ -227,6 +230,9 @@ impl Crds {
CrdsData::EpochSlots(_, _) => { CrdsData::EpochSlots(_, _) => {
self.epoch_slots.insert(value.ordinal, entry_index); self.epoch_slots.insert(value.ordinal, entry_index);
} }
CrdsData::DuplicateShred(_, _) => {
self.duplicate_shreds.insert(value.ordinal, entry_index);
}
_ => (), _ => (),
}; };
self.entries.insert(value.ordinal, entry_index); self.entries.insert(value.ordinal, entry_index);
@ -255,6 +261,10 @@ impl Crds {
self.epoch_slots.remove(&entry.get().ordinal); self.epoch_slots.remove(&entry.get().ordinal);
self.epoch_slots.insert(value.ordinal, entry_index); self.epoch_slots.insert(value.ordinal, entry_index);
} }
CrdsData::DuplicateShred(_, _) => {
self.duplicate_shreds.remove(&entry.get().ordinal);
self.duplicate_shreds.insert(value.ordinal, entry_index);
}
_ => (), _ => (),
} }
self.entries.remove(&entry.get().ordinal); self.entries.remove(&entry.get().ordinal);
@ -340,6 +350,21 @@ impl Crds {
}) })
} }
/// Returns duplicate-shreds inserted since the given cursor.
/// Updates the cursor as the values are consumed.
pub(crate) fn get_duplicate_shreds<'a>(
&'a self,
cursor: &'a mut Cursor,
) -> impl Iterator<Item = &'a VersionedCrdsValue> {
let range = (Bound::Included(cursor.ordinal()), Bound::Unbounded);
self.duplicate_shreds
.range(range)
.map(move |(ordinal, index)| {
cursor.consume(*ordinal);
self.table.index(*index)
})
}
/// Returns all entries inserted since the given cursor. /// Returns all entries inserted since the given cursor.
pub(crate) fn get_entries<'a>( pub(crate) fn get_entries<'a>(
&'a self, &'a self,
@ -499,6 +524,9 @@ impl Crds {
CrdsData::EpochSlots(_, _) => { CrdsData::EpochSlots(_, _) => {
self.epoch_slots.remove(&value.ordinal); self.epoch_slots.remove(&value.ordinal);
} }
CrdsData::DuplicateShred(_, _) => {
self.duplicate_shreds.remove(&value.ordinal);
}
_ => (), _ => (),
} }
self.entries.remove(&value.ordinal); self.entries.remove(&value.ordinal);
@ -534,6 +562,9 @@ impl Crds {
CrdsData::EpochSlots(_, _) => { CrdsData::EpochSlots(_, _) => {
self.epoch_slots.insert(value.ordinal, index); self.epoch_slots.insert(value.ordinal, index);
} }
CrdsData::DuplicateShred(_, _) => {
self.duplicate_shreds.insert(value.ordinal, index);
}
_ => (), _ => (),
}; };
self.entries.insert(value.ordinal, index); self.entries.insert(value.ordinal, index);
@ -618,6 +649,7 @@ impl Crds {
nodes: self.nodes.clone(), nodes: self.nodes.clone(),
votes: self.votes.clone(), votes: self.votes.clone(),
epoch_slots: self.epoch_slots.clone(), epoch_slots: self.epoch_slots.clone(),
duplicate_shreds: self.duplicate_shreds.clone(),
records: self.records.clone(), records: self.records.clone(),
entries: self.entries.clone(), entries: self.entries.clone(),
purged: self.purged.clone(), purged: self.purged.clone(),