implements cursor for gossip crds table queries (#16952)

VersionedCrdsValue.insert_timestamp is used for fetching crds values
inserted since last query:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1197-L1215
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1274-L1298

So it is crucial that insert_timestamp does not go backward in time when
new values are inserted into the table. However std::time::SystemTime is
not monotonic, or due to workload, lock contention, thread scheduling,
etc, ... new values may be inserted with a stalled timestamp way in the
past. Additionally, reading system time for the above purpose is
inefficient/unnecessary.

This commit adds an ordinal index to crds values indicating their insert
order. Additionally, it implements a new Cursor type for fetching values
inserted since last query.
This commit is contained in:
behzad nouri 2021-05-06 14:04:17 +00:00 committed by GitHub
parent d19526e6c2
commit fa86a335b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 273 additions and 264 deletions

View File

@ -15,6 +15,7 @@
use crate::{
cluster_info_metrics::{submit_gossip_stats, Counter, GossipStats, ScopedTimer},
contact_info::ContactInfo,
crds::Cursor,
crds_gossip::CrdsGossip,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
@ -1120,20 +1121,13 @@ impl ClusterInfo {
Ok(())
}
/// Get votes in the crds
/// * since - The timestamp of when the vote inserted must be greater than
/// since. This allows the bank to query for new votes only.
///
/// * return - The votes, and the max timestamp from the new set.
pub fn get_votes(&self, since: u64) -> (Vec<CrdsValueLabel>, Vec<Transaction>, u64) {
let mut max_ts = since;
let (labels, txs): (Vec<CrdsValueLabel>, Vec<Transaction>) = self
/// Returns votes inserted since the given cursor.
pub fn get_votes(&self, cursor: &mut Cursor) -> (Vec<CrdsValueLabel>, Vec<Transaction>) {
let (labels, txs): (_, Vec<_>) = self
.time_gossip_read_lock("get_votes", &self.stats.get_votes)
.crds
.get_votes()
.filter(|vote| vote.insert_timestamp > since)
.get_votes(cursor)
.map(|vote| {
max_ts = std::cmp::max(vote.insert_timestamp, max_ts);
let transaction = match &vote.value.data {
CrdsData::Vote(_, vote) => vote.transaction().clone(),
_ => panic!("this should not happen!"),
@ -1142,7 +1136,7 @@ impl ClusterInfo {
})
.unzip();
inc_new_counter_info!("cluster_info-get_votes-count", txs.len());
(labels, txs, max_ts)
(labels, txs)
}
pub(crate) fn push_duplicate_shred(&self, shred: &Shred, other_payload: &[u8]) -> Result<()> {
@ -1180,52 +1174,15 @@ impl ClusterInfo {
.map(map)
}
pub fn get_lowest_slot_for_node<F, Y>(
&self,
pubkey: &Pubkey,
since: Option<u64>,
map: F,
) -> Option<Y>
where
F: FnOnce(&LowestSlot, u64) -> Y,
{
self.gossip
.read()
.unwrap()
.crds
.get(&CrdsValueLabel::LowestSlot(*pubkey))
.filter(|x| {
since
.map(|since| x.insert_timestamp > since)
.unwrap_or(true)
pub(crate) fn get_epoch_slots(&self, cursor: &mut Cursor) -> Vec<EpochSlots> {
let gossip = self.gossip.read().unwrap();
let entries = gossip.crds.get_epoch_slots(cursor);
entries
.map(|entry| match &entry.value.data {
CrdsData::EpochSlots(_, slots) => slots.clone(),
_ => panic!("this should not happen!"),
})
.map(|x| map(x.value.lowest_slot().unwrap(), x.insert_timestamp))
}
pub fn get_epoch_slots_since(
&self,
timestamp: u64,
) -> (
Vec<EpochSlots>,
Option<u64>, // Most recent insert timestmap.
) {
let mut max_ts = 0;
let vals: Vec<_> = self
.gossip
.read()
.unwrap()
.crds
.get_epoch_slots_since(timestamp)
.map(|value| {
max_ts = std::cmp::max(max_ts, value.insert_timestamp);
match &value.value.data {
CrdsData::EpochSlots(_, slots) => slots.clone(),
_ => panic!("this should not happen!"),
}
})
.collect();
let max_ts = if vals.is_empty() { None } else { Some(max_ts) };
(vals, max_ts)
.collect()
}
pub fn get_node_version(&self, pubkey: &Pubkey) -> Option<solana_version::Version> {
@ -3670,7 +3627,8 @@ mod tests {
);
cluster_info.push_vote(&unrefresh_tower, unrefresh_tx.clone());
cluster_info.flush_push_queue();
let (_, votes, max_ts) = cluster_info.get_votes(0);
let mut cursor = Cursor::default();
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes, vec![unrefresh_tx.clone()]);
// Now construct vote for the slot to be refreshed later
@ -3691,9 +3649,9 @@ mod tests {
// shouldn't add the vote
cluster_info.refresh_vote(refresh_tx.clone(), refresh_slot);
cluster_info.flush_push_queue();
let (_, votes, max_ts) = cluster_info.get_votes(max_ts);
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes, vec![]);
let (_, votes, _) = cluster_info.get_votes(0);
let (_, votes) = cluster_info.get_votes(&mut Cursor::default());
assert_eq!(votes.len(), 1);
assert!(votes.contains(&unrefresh_tx));
@ -3702,7 +3660,7 @@ mod tests {
cluster_info.flush_push_queue();
// Should be two votes in gossip
let (_, votes, _) = cluster_info.get_votes(0);
let (_, votes) = cluster_info.get_votes(&mut Cursor::default());
assert_eq!(votes.len(), 2);
assert!(votes.contains(&unrefresh_tx));
assert!(votes.contains(&refresh_tx));
@ -3728,12 +3686,12 @@ mod tests {
cluster_info.flush_push_queue();
// The diff since `max_ts` should only be the latest refreshed vote
let (_, votes, _) = cluster_info.get_votes(max_ts);
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes.len(), 1);
assert_eq!(votes[0], latest_refresh_tx);
// Should still be two votes in gossip
let (_, votes, _) = cluster_info.get_votes(0);
let (_, votes) = cluster_info.get_votes(&mut Cursor::default());
assert_eq!(votes.len(), 2);
assert!(votes.contains(&unrefresh_tx));
assert!(votes.contains(&latest_refresh_tx));
@ -3747,10 +3705,9 @@ mod tests {
let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info);
// make sure empty crds is handled correctly
let now = timestamp();
let (_, votes, max_ts) = cluster_info.get_votes(now);
let mut cursor = Cursor::default();
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes, vec![]);
assert_eq!(max_ts, now);
// add a vote
let vote = Vote::new(
@ -3770,8 +3727,7 @@ mod tests {
cluster_info.push_vote(&tower, tx.clone());
cluster_info.flush_push_queue();
// -1 to make sure that the clock is strictly lower then when insert occurred
let (labels, votes, max_ts) = cluster_info.get_votes(now - 1);
let (labels, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes, vec![tx]);
assert_eq!(labels.len(), 1);
match labels[0] {
@ -3781,12 +3737,9 @@ mod tests {
_ => panic!("Bad match"),
}
assert!(max_ts >= now - 1);
// make sure timestamp filter works
let (_, votes, new_max_ts) = cluster_info.get_votes(max_ts);
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes, vec![]);
assert_eq!(max_ts, new_max_ts);
}
fn new_vote_transaction<R: Rng>(rng: &mut R, slots: Vec<Slot>) -> Transaction {
@ -3804,8 +3757,8 @@ mod tests {
#[test]
fn test_push_votes_with_tower() {
let get_vote_slots = |cluster_info: &ClusterInfo, now| -> Vec<Slot> {
let (labels, _, _) = cluster_info.get_votes(now);
let get_vote_slots = |cluster_info: &ClusterInfo| -> Vec<Slot> {
let (labels, _) = cluster_info.get_votes(&mut Cursor::default());
let gossip = cluster_info.gossip.read().unwrap();
let mut vote_slots = HashSet::new();
for label in labels {
@ -3819,7 +3772,6 @@ mod tests {
vote_slots.into_iter().collect()
};
let mut rng = rand::thread_rng();
let now = timestamp();
let keys = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0);
let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info);
@ -3830,7 +3782,7 @@ mod tests {
let vote = new_vote_transaction(&mut rng, vec![slot]);
cluster_info.push_vote(&tower, vote);
}
let vote_slots = get_vote_slots(&cluster_info, now);
let vote_slots = get_vote_slots(&cluster_info);
assert_eq!(vote_slots.len(), MAX_LOCKOUT_HISTORY);
for vote_slot in vote_slots {
assert!(vote_slot < MAX_LOCKOUT_HISTORY as u64);
@ -3841,7 +3793,7 @@ mod tests {
tower.remove(23);
let vote = new_vote_transaction(&mut rng, vec![slot]);
cluster_info.push_vote(&tower, vote);
let vote_slots = get_vote_slots(&cluster_info, now);
let vote_slots = get_vote_slots(&cluster_info);
assert_eq!(vote_slots.len(), MAX_LOCKOUT_HISTORY);
for vote_slot in vote_slots {
assert!(vote_slot <= slot);
@ -3855,7 +3807,7 @@ mod tests {
tower.remove(5);
let vote = new_vote_transaction(&mut rng, vec![slot]);
cluster_info.push_vote(&tower, vote);
let vote_slots = get_vote_slots(&cluster_info, now);
let vote_slots = get_vote_slots(&cluster_info);
assert_eq!(vote_slots.len(), MAX_LOCKOUT_HISTORY);
for vote_slot in vote_slots {
assert!(vote_slot <= slot);
@ -3869,23 +3821,17 @@ mod tests {
let keys = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0);
let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info);
let (slots, since) = cluster_info.get_epoch_slots_since(0);
let slots = cluster_info.get_epoch_slots(&mut Cursor::default());
assert!(slots.is_empty());
assert!(since.is_none());
cluster_info.push_epoch_slots(&[0]);
cluster_info.flush_push_queue();
let (slots, since) = cluster_info.get_epoch_slots_since(std::u64::MAX);
assert!(slots.is_empty());
assert_eq!(since, None);
let (slots, since) = cluster_info.get_epoch_slots_since(0);
let mut cursor = Cursor::default();
let slots = cluster_info.get_epoch_slots(&mut cursor);
assert_eq!(slots.len(), 1);
assert!(since.is_some());
let (slots, since2) = cluster_info.get_epoch_slots_since(since.unwrap() + 1);
let slots = cluster_info.get_epoch_slots(&mut cursor);
assert!(slots.is_empty());
assert_eq!(since2, None);
}
#[test]
@ -4228,10 +4174,9 @@ mod tests {
cluster_info.flush_push_queue();
cluster_info.push_epoch_slots(&range[16000..]);
cluster_info.flush_push_queue();
let (slots, since) = cluster_info.get_epoch_slots_since(0);
let slots = cluster_info.get_epoch_slots(&mut Cursor::default());
let slots: Vec<_> = slots.iter().flat_map(|x| x.to_slots(0)).collect();
assert_eq!(slots, range);
assert!(since.is_some());
}
#[test]

View File

@ -1,5 +1,6 @@
use crate::{
cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS},
crds::Cursor,
crds_value::CrdsValueLabel,
optimistic_confirmation_verifier::OptimisticConfirmationVerifier,
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender},
@ -326,23 +327,18 @@ impl ClusterInfoVoteListener {
verified_vote_label_packets_sender: VerifiedLabelVotePacketsSender,
verified_vote_transactions_sender: VerifiedVoteTransactionsSender,
) -> Result<()> {
let mut last_ts = 0;
loop {
if exit.load(Ordering::Relaxed) {
return Ok(());
}
let (labels, votes, new_ts) = cluster_info.get_votes(last_ts);
let mut cursor = Cursor::default();
while !exit.load(Ordering::Relaxed) {
let (labels, votes) = cluster_info.get_votes(&mut cursor);
inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len());
last_ts = new_ts;
if !votes.is_empty() {
let (vote_txs, packets) = Self::verify_votes(votes, labels);
verified_vote_transactions_sender.send(vote_txs)?;
verified_vote_label_packets_sender.send(packets)?;
}
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
}
Ok(())
}
#[allow(clippy::type_complexity)]

View File

@ -1,5 +1,5 @@
use crate::{
cluster_info::ClusterInfo, contact_info::ContactInfo, epoch_slots::EpochSlots,
cluster_info::ClusterInfo, contact_info::ContactInfo, crds::Cursor, epoch_slots::EpochSlots,
serve_repair::RepairType,
};
use itertools::Itertools;
@ -7,10 +7,7 @@ use solana_runtime::{bank_forks::BankForks, epoch_stakes::NodeIdToVoteAccounts};
use solana_sdk::{clock::Slot, pubkey::Pubkey};
use std::{
collections::{BTreeMap, HashMap, HashSet},
sync::{
atomic::{AtomicU64, Ordering},
Arc, RwLock,
},
sync::{Arc, Mutex, RwLock},
};
// Limit the size of cluster-slots map in case
@ -22,22 +19,26 @@ pub type SlotPubkeys = HashMap<Pubkey, u64>;
#[derive(Default)]
pub struct ClusterSlots {
cluster_slots: RwLock<BTreeMap<Slot, Arc<RwLock<SlotPubkeys>>>>,
since: AtomicU64,
validator_stakes: RwLock<Arc<NodeIdToVoteAccounts>>,
epoch: RwLock<Option<u64>>,
cursor: Mutex<Cursor>,
}
impl ClusterSlots {
pub fn lookup(&self, slot: Slot) -> Option<Arc<RwLock<SlotPubkeys>>> {
self.cluster_slots.read().unwrap().get(&slot).cloned()
}
pub fn update(&self, root: Slot, cluster_info: &ClusterInfo, bank_forks: &RwLock<BankForks>) {
self.update_peers(bank_forks);
let since = self.since.load(Ordering::Relaxed);
let (epoch_slots, since) = cluster_info.get_epoch_slots_since(since);
self.update_internal(root, epoch_slots, since);
let epoch_slots = {
let mut cursor = self.cursor.lock().unwrap();
cluster_info.get_epoch_slots(&mut cursor)
};
self.update_internal(root, epoch_slots);
}
fn update_internal(&self, root: Slot, epoch_slots_list: Vec<EpochSlots>, since: Option<u64>) {
fn update_internal(&self, root: Slot, epoch_slots_list: Vec<EpochSlots>) {
// Attach validator's total stake.
let epoch_slots_list: Vec<_> = {
let validator_stakes = self.validator_stakes.read().unwrap();
@ -86,9 +87,6 @@ impl ClusterSlots {
cluster_slots.split_off(&key);
}
}
if let Some(since) = since {
self.since.store(since + 1, Ordering::Relaxed);
}
}
pub fn collect(&self, id: &Pubkey) -> HashSet<Slot> {
@ -206,23 +204,20 @@ mod tests {
fn test_default() {
let cs = ClusterSlots::default();
assert!(cs.cluster_slots.read().unwrap().is_empty());
assert_eq!(cs.since.load(Ordering::Relaxed), 0);
}
#[test]
fn test_update_noop() {
let cs = ClusterSlots::default();
cs.update_internal(0, vec![], None);
cs.update_internal(0, vec![]);
assert!(cs.cluster_slots.read().unwrap().is_empty());
assert_eq!(cs.since.load(Ordering::Relaxed), 0);
}
#[test]
fn test_update_empty() {
let cs = ClusterSlots::default();
let epoch_slot = EpochSlots::default();
cs.update_internal(0, vec![epoch_slot], Some(0));
assert_eq!(cs.since.load(Ordering::Relaxed), 1);
cs.update_internal(0, vec![epoch_slot]);
assert!(cs.lookup(0).is_none());
}
@ -232,8 +227,7 @@ mod tests {
let cs = ClusterSlots::default();
let mut epoch_slot = EpochSlots::default();
epoch_slot.fill(&[0], 0);
cs.update_internal(0, vec![epoch_slot], Some(0));
assert_eq!(cs.since.load(Ordering::Relaxed), 1);
cs.update_internal(0, vec![epoch_slot]);
assert!(cs.lookup(0).is_none());
}
@ -242,8 +236,7 @@ mod tests {
let cs = ClusterSlots::default();
let mut epoch_slot = EpochSlots::default();
epoch_slot.fill(&[1], 0);
cs.update_internal(0, vec![epoch_slot], Some(0));
assert_eq!(cs.since.load(Ordering::Relaxed), 1);
cs.update_internal(0, vec![epoch_slot]);
assert!(cs.lookup(0).is_none());
assert!(cs.lookup(1).is_some());
assert_eq!(
@ -373,7 +366,7 @@ mod tests {
);
*cs.validator_stakes.write().unwrap() = map;
cs.update_internal(0, vec![epoch_slot], None);
cs.update_internal(0, vec![epoch_slot]);
assert!(cs.lookup(1).is_some());
assert_eq!(
cs.lookup(1)
@ -390,7 +383,7 @@ mod tests {
let cs = ClusterSlots::default();
let mut epoch_slot = EpochSlots::default();
epoch_slot.fill(&[1], 0);
cs.update_internal(0, vec![epoch_slot], None);
cs.update_internal(0, vec![epoch_slot]);
let self_id = solana_sdk::pubkey::new_rand();
assert_eq!(
cs.generate_repairs_for_missing_slots(&self_id, 0),
@ -404,7 +397,7 @@ mod tests {
let mut epoch_slot = EpochSlots::default();
epoch_slot.fill(&[1], 0);
let self_id = epoch_slot.from;
cs.update_internal(0, vec![epoch_slot], None);
cs.update_internal(0, vec![epoch_slot]);
let slots: Vec<Slot> = cs.collect(&self_id).into_iter().collect();
assert_eq!(slots, vec![1]);
}
@ -415,7 +408,7 @@ mod tests {
let mut epoch_slot = EpochSlots::default();
epoch_slot.fill(&[1], 0);
let self_id = epoch_slot.from;
cs.update_internal(0, vec![epoch_slot], None);
cs.update_internal(0, vec![epoch_slot]);
assert!(cs
.generate_repairs_for_missing_slots(&self_id, 0)
.is_empty());

View File

@ -185,19 +185,21 @@ impl ClusterSlotsService {
#[cfg(test)]
mod test {
use super::*;
use crate::cluster_info::Node;
use crate::{cluster_info::Node, crds_value::CrdsValueLabel};
#[test]
pub fn test_update_lowest_slot() {
let node_info = Node::new_localhost_with_pubkey(&Pubkey::default());
let pubkey = Pubkey::new_unique();
let node_info = Node::new_localhost_with_pubkey(&pubkey);
let cluster_info = ClusterInfo::new_with_invalid_keypair(node_info.info);
ClusterSlotsService::update_lowest_slot(&Pubkey::default(), 5, &cluster_info);
ClusterSlotsService::update_lowest_slot(&pubkey, 5, &cluster_info);
cluster_info.flush_push_queue();
let lowest = cluster_info
.get_lowest_slot_for_node(&Pubkey::default(), None, |lowest_slot, _| {
lowest_slot.clone()
})
.unwrap();
let lowest = {
let label = CrdsValueLabel::LowestSlot(pubkey);
let gossip = cluster_info.gossip.read().unwrap();
let entry = gossip.crds.get(&label).unwrap();
entry.value.lowest_slot().unwrap().clone()
};
assert_eq!(lowest.lowest, 5);
}
}

View File

@ -35,7 +35,7 @@ use solana_sdk::hash::{hash, Hash};
use solana_sdk::pubkey::Pubkey;
use std::{
cmp::Ordering,
collections::{hash_map, BTreeSet, HashMap},
collections::{hash_map, BTreeMap, HashMap},
ops::{Bound, Index, IndexMut},
};
@ -48,12 +48,12 @@ const MAX_CRDS_VALUES_PER_PUBKEY: usize = 32;
pub struct Crds {
/// Stores the map of labels and values
table: IndexMap<CrdsValueLabel, VersionedCrdsValue>,
pub num_inserts: usize, // Only used in tests.
cursor: Cursor, // Next insert ordinal location.
shards: CrdsShards,
nodes: IndexSet<usize>, // Indices of nodes' ContactInfo.
votes: IndexSet<usize>, // Indices of Vote crds values.
// Indices of EpochSlots crds values ordered by insert timestamp.
epoch_slots: BTreeSet<(u64 /*insert timestamp*/, usize)>,
// Indices of EpochSlots keyed by insert order.
epoch_slots: BTreeMap<u64 /*insert order*/, usize /*index*/>,
// Indices of all crds values associated with a node.
records: HashMap<Pubkey, IndexSet<usize>>,
}
@ -71,21 +71,36 @@ pub enum CrdsError {
/// stored in the Crds
#[derive(PartialEq, Debug, Clone)]
pub struct VersionedCrdsValue {
/// Ordinal index indicating insert order.
ordinal: u64,
pub value: CrdsValue,
/// local time when inserted
pub(crate) insert_timestamp: u64,
/// local time when updated
pub(crate) local_timestamp: u64,
/// value hash
pub(crate) value_hash: Hash,
}
#[derive(Clone, Copy, Default)]
pub struct Cursor(u64);
impl Cursor {
fn ordinal(&self) -> u64 {
self.0
}
// Updates the cursor position given the ordinal index of value consumed.
#[inline]
fn consume(&mut self, ordinal: u64) {
self.0 = self.0.max(ordinal + 1);
}
}
impl VersionedCrdsValue {
fn new(local_timestamp: u64, value: CrdsValue) -> Self {
fn new(value: CrdsValue, cursor: Cursor, local_timestamp: u64) -> Self {
let value_hash = hash(&serialize(&value).unwrap());
VersionedCrdsValue {
ordinal: cursor.ordinal(),
value,
insert_timestamp: local_timestamp,
local_timestamp,
value_hash,
}
@ -96,11 +111,11 @@ impl Default for Crds {
fn default() -> Self {
Crds {
table: IndexMap::default(),
num_inserts: 0,
cursor: Cursor::default(),
shards: CrdsShards::new(CRDS_SHARDS_BITS),
nodes: IndexSet::default(),
votes: IndexSet::default(),
epoch_slots: BTreeSet::default(),
epoch_slots: BTreeMap::default(),
records: HashMap::default(),
}
}
@ -140,7 +155,7 @@ impl Crds {
local_timestamp: u64,
) -> Result<Option<VersionedCrdsValue>, CrdsError> {
let label = value.label();
let value = VersionedCrdsValue::new(local_timestamp, value);
let value = VersionedCrdsValue::new(value, self.cursor, local_timestamp);
match self.table.entry(label) {
Entry::Vacant(entry) => {
let entry_index = entry.index();
@ -153,8 +168,7 @@ impl Crds {
self.votes.insert(entry_index);
}
CrdsData::EpochSlots(_, _) => {
self.epoch_slots
.insert((value.insert_timestamp, entry_index));
self.epoch_slots.insert(value.ordinal, entry_index);
}
_ => (),
};
@ -162,8 +176,8 @@ impl Crds {
.entry(value.value.pubkey())
.or_default()
.insert(entry_index);
self.cursor.consume(value.ordinal);
entry.insert(value);
self.num_inserts += 1;
Ok(None)
}
Entry::Occupied(mut entry) if overrides(&value.value, entry.get()) => {
@ -171,15 +185,13 @@ impl Crds {
self.shards.remove(entry_index, entry.get());
self.shards.insert(entry_index, &value);
if let CrdsData::EpochSlots(_, _) = value.value.data {
self.epoch_slots
.remove(&(entry.get().insert_timestamp, entry_index));
self.epoch_slots
.insert((value.insert_timestamp, entry_index));
self.epoch_slots.remove(&entry.get().ordinal);
self.epoch_slots.insert(value.ordinal, entry_index);
}
self.num_inserts += 1;
// As long as the pubkey does not change, self.records
// does not need to be updated.
debug_assert_eq!(entry.get().value.pubkey(), value.value.pubkey());
self.cursor.consume(value.ordinal);
Ok(Some(entry.insert(value)))
}
_ => {
@ -228,20 +240,35 @@ impl Crds {
})
}
/// Returns all entries which are Vote.
pub(crate) fn get_votes(&self) -> impl Iterator<Item = &VersionedCrdsValue> {
self.votes.iter().map(move |i| self.table.index(*i))
/// Returns all vote entries inserted since the given cursor.
/// Updates the cursor as the votes are consumed.
pub(crate) fn get_votes<'a>(
&'a self,
cursor: &'a mut Cursor,
) -> impl Iterator<Item = &'a VersionedCrdsValue> {
let since = cursor.ordinal();
self.votes.iter().filter_map(move |i| {
let entry = self.table.index(*i);
if entry.ordinal >= since {
cursor.consume(entry.ordinal);
Some(entry)
} else {
None
}
})
}
/// Returns epoch-slots inserted since (or at) the given timestamp.
pub(crate) fn get_epoch_slots_since(
&self,
timestamp: u64,
) -> impl Iterator<Item = &VersionedCrdsValue> {
let range = (Bound::Included((timestamp, 0)), Bound::Unbounded);
self.epoch_slots
.range(range)
.map(move |(_, i)| self.table.index(*i))
/// Returns epoch-slots inserted since the given cursor.
/// Updates the cursor as the values are consumed.
pub(crate) fn get_epoch_slots<'a>(
&'a self,
cursor: &'a mut Cursor,
) -> impl Iterator<Item = &'a VersionedCrdsValue> {
let range = (Bound::Included(cursor.ordinal()), Bound::Unbounded);
self.epoch_slots.range(range).map(move |(ordinal, index)| {
cursor.consume(*ordinal);
self.table.index(*index)
})
}
/// Returns all records associated with a pubkey.
@ -386,7 +413,7 @@ impl Crds {
self.votes.swap_remove(&index);
}
CrdsData::EpochSlots(_, _) => {
self.epoch_slots.remove(&(value.insert_timestamp, index));
self.epoch_slots.remove(&value.ordinal);
}
_ => (),
}
@ -420,8 +447,7 @@ impl Crds {
self.votes.insert(index);
}
CrdsData::EpochSlots(_, _) => {
self.epoch_slots.remove(&(value.insert_timestamp, size));
self.epoch_slots.insert((value.insert_timestamp, index));
self.epoch_slots.insert(value.ordinal, index);
}
_ => (),
};
@ -549,8 +575,7 @@ mod test {
0,
)));
assert_eq!(crds.insert(val.clone(), 0), Ok(None));
assert_eq!(crds.table[&val.label()].insert_timestamp, 0);
assert_eq!(crds.table[&val.label()].ordinal, 0);
let val2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_eq!(val2.label().pubkey(), val.label().pubkey());
@ -558,20 +583,20 @@ mod test {
crds.update_record_timestamp(&val.label().pubkey(), 2);
assert_eq!(crds.table[&val.label()].local_timestamp, 2);
assert_eq!(crds.table[&val.label()].insert_timestamp, 0);
assert_eq!(crds.table[&val.label()].ordinal, 1);
assert_eq!(crds.table[&val2.label()].local_timestamp, 2);
assert_eq!(crds.table[&val2.label()].insert_timestamp, 0);
assert_eq!(crds.table[&val2.label()].ordinal, 1);
crds.update_record_timestamp(&val.label().pubkey(), 1);
assert_eq!(crds.table[&val.label()].local_timestamp, 2);
assert_eq!(crds.table[&val.label()].insert_timestamp, 0);
assert_eq!(crds.table[&val.label()].ordinal, 1);
let mut ci = ContactInfo::default();
ci.wallclock += 1;
let val3 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
assert_matches!(crds.insert(val3, 3), Ok(Some(_)));
assert_eq!(crds.table[&val2.label()].local_timestamp, 3);
assert_eq!(crds.table[&val2.label()].insert_timestamp, 3);
assert_eq!(crds.table[&val2.label()].ordinal, 2);
}
#[test]
fn test_find_old_records_default() {
@ -729,7 +754,7 @@ mod test {
Err(_) => (),
}
}
assert_eq!(num_inserts, crds.num_inserts);
assert_eq!(num_inserts, crds.cursor.0 as usize);
assert!(num_inserts > 700);
assert!(num_overrides > 500);
assert!(crds.table.len() > 200);
@ -744,63 +769,108 @@ mod test {
}
}
fn check_crds_value_indices<R: rand::Rng>(
rng: &mut R,
crds: &Crds,
) -> (
usize, // number of nodes
usize, // number of votes
usize, // number of epoch slots
) {
let size = crds.table.len();
let since = if size == 0 || rng.gen() {
rng.gen_range(0, crds.cursor.0 + 1)
} else {
crds.table[rng.gen_range(0, size)].ordinal
};
let num_epoch_slots = crds
.table
.values()
.filter(|v| v.ordinal >= since)
.filter(|v| matches!(v.value.data, CrdsData::EpochSlots(_, _)))
.count();
let mut cursor = Cursor(since);
assert_eq!(num_epoch_slots, crds.get_epoch_slots(&mut cursor).count());
assert_eq!(
cursor.0,
crds.epoch_slots
.iter()
.last()
.map(|(k, _)| k + 1)
.unwrap_or_default()
.max(since)
);
for value in crds.get_epoch_slots(&mut Cursor(since)) {
assert!(value.ordinal >= since);
match value.value.data {
CrdsData::EpochSlots(_, _) => (),
_ => panic!("not an epoch-slot!"),
}
}
let num_votes = crds
.table
.values()
.filter(|v| v.ordinal >= since)
.filter(|v| matches!(v.value.data, CrdsData::Vote(_, _)))
.count();
let mut cursor = Cursor(since);
assert_eq!(num_votes, crds.get_votes(&mut cursor).count());
assert_eq!(
cursor.0,
crds.table
.values()
.filter(|v| matches!(v.value.data, CrdsData::Vote(_, _)))
.map(|v| v.ordinal)
.max()
.map(|k| k + 1)
.unwrap_or_default()
.max(since)
);
for value in crds.get_votes(&mut Cursor(since)) {
assert!(value.ordinal >= since);
match value.value.data {
CrdsData::Vote(_, _) => (),
_ => panic!("not a vote!"),
}
}
let num_nodes = crds
.table
.values()
.filter(|v| matches!(v.value.data, CrdsData::ContactInfo(_)))
.count();
let num_votes = crds
.table
.values()
.filter(|v| matches!(v.value.data, CrdsData::Vote(_, _)))
.count();
let num_epoch_slots = crds
.table
.values()
.filter(|v| matches!(v.value.data, CrdsData::EpochSlots(_, _)))
.count();
assert_eq!(num_nodes, crds.get_nodes_contact_info().count());
assert_eq!(num_votes, crds.get_votes(&mut Cursor::default()).count());
assert_eq!(
num_epoch_slots,
crds.get_epoch_slots(&mut Cursor::default()).count()
);
for vote in crds.get_votes(&mut Cursor::default()) {
match vote.value.data {
CrdsData::Vote(_, _) => (),
_ => panic!("not a vote!"),
}
}
for epoch_slots in crds.get_epoch_slots(&mut Cursor::default()) {
match epoch_slots.value.data {
CrdsData::EpochSlots(_, _) => (),
_ => panic!("not an epoch-slot!"),
}
}
(num_nodes, num_votes, num_epoch_slots)
}
#[test]
fn test_crds_value_indices() {
fn check_crds_value_indices<R: rand::Rng>(
rng: &mut R,
crds: &Crds,
) -> (usize, usize, usize) {
if !crds.table.is_empty() {
let since = crds.table[rng.gen_range(0, crds.table.len())].insert_timestamp;
let num_epoch_slots = crds
.table
.values()
.filter(|value| {
value.insert_timestamp >= since
&& matches!(value.value.data, CrdsData::EpochSlots(_, _))
})
.count();
assert_eq!(num_epoch_slots, crds.get_epoch_slots_since(since).count());
for value in crds.get_epoch_slots_since(since) {
assert!(value.insert_timestamp >= since);
match value.value.data {
CrdsData::EpochSlots(_, _) => (),
_ => panic!("not an epoch-slot!"),
}
}
}
let num_nodes = crds
.table
.values()
.filter(|value| matches!(value.value.data, CrdsData::ContactInfo(_)))
.count();
let num_votes = crds
.table
.values()
.filter(|value| matches!(value.value.data, CrdsData::Vote(_, _)))
.count();
let num_epoch_slots = crds
.table
.values()
.filter(|value| matches!(value.value.data, CrdsData::EpochSlots(_, _)))
.count();
assert_eq!(num_nodes, crds.get_nodes_contact_info().count());
assert_eq!(num_votes, crds.get_votes().count());
assert_eq!(num_epoch_slots, crds.get_epoch_slots_since(0).count());
for vote in crds.get_votes() {
match vote.value.data {
CrdsData::Vote(_, _) => (),
_ => panic!("not a vote!"),
}
}
for epoch_slots in crds.get_epoch_slots_since(0) {
match epoch_slots.value.data {
CrdsData::EpochSlots(_, _) => (),
_ => panic!("not an epoch-slot!"),
}
}
(num_nodes, num_votes, num_epoch_slots)
}
let mut rng = thread_rng();
let keypairs: Vec<_> = repeat_with(Keypair::new).take(128).collect();
let mut crds = Crds::default();
@ -820,11 +890,11 @@ mod test {
}
Err(_) => (),
}
if k % 64 == 0 {
if k % 16 == 0 {
check_crds_value_indices(&mut rng, &crds);
}
}
assert_eq!(num_inserts, crds.num_inserts);
assert_eq!(num_inserts, crds.cursor.0 as usize);
assert!(num_inserts > 700);
assert!(num_overrides > 500);
assert!(crds.table.len() > 200);
@ -843,7 +913,7 @@ mod test {
let index = rng.gen_range(0, crds.table.len());
let key = crds.table.get_index(index).unwrap().0.clone();
crds.remove(&key);
if crds.table.len() % 64 == 0 {
if crds.table.len() % 16 == 0 {
check_crds_value_indices(&mut rng, &crds);
}
}
@ -963,8 +1033,8 @@ mod test {
#[allow(clippy::neg_cmp_op_on_partial_ord)]
fn test_equal() {
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
let v1 = VersionedCrdsValue::new(1, val.clone());
let v2 = VersionedCrdsValue::new(1, val);
let v1 = VersionedCrdsValue::new(val.clone(), Cursor::default(), 1);
let v2 = VersionedCrdsValue::new(val, Cursor::default(), 1);
assert_eq!(v1, v2);
assert!(!(v1 != v2));
assert!(!overrides(&v1.value, &v2));
@ -974,17 +1044,22 @@ mod test {
#[allow(clippy::neg_cmp_op_on_partial_ord)]
fn test_hash_order() {
let v1 = VersionedCrdsValue::new(
1,
CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::default(),
0,
))),
Cursor::default(),
1, // local_timestamp
);
let v2 = VersionedCrdsValue::new(
{
let mut contact_info = ContactInfo::new_localhost(&Pubkey::default(), 0);
contact_info.rpc = socketaddr!("0.0.0.0:0");
CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info))
},
Cursor::default(),
1, // local_timestamp
);
let v2 = VersionedCrdsValue::new(1, {
let mut contact_info = ContactInfo::new_localhost(&Pubkey::default(), 0);
contact_info.rpc = socketaddr!("0.0.0.0:0");
CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info))
});
assert_eq!(v1.value.label(), v2.value.label());
assert_eq!(v1.value.wallclock(), v2.value.wallclock());
@ -1003,18 +1078,20 @@ mod test {
#[allow(clippy::neg_cmp_op_on_partial_ord)]
fn test_wallclock_order() {
let v1 = VersionedCrdsValue::new(
1,
CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::default(),
1,
))),
Cursor::default(),
1, // local_timestamp
);
let v2 = VersionedCrdsValue::new(
1,
CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::default(),
0,
))),
Cursor::default(),
1, // local_timestamp
);
assert_eq!(v1.value.label(), v2.value.label());
assert!(overrides(&v1.value, &v2));
@ -1027,18 +1104,20 @@ mod test {
#[allow(clippy::neg_cmp_op_on_partial_ord)]
fn test_label_order() {
let v1 = VersionedCrdsValue::new(
1,
CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
0,
))),
Cursor::default(),
1, // local_timestamp
);
let v2 = VersionedCrdsValue::new(
1,
CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
0,
))),
Cursor::default(),
1, // local_timestamp
);
assert_ne!(v1, v2);
assert!(!(v1 == v2));

View File

@ -1240,13 +1240,6 @@ mod test {
);
assert!(rsp.iter().all(|rsp| rsp.is_empty()));
assert!(dest_crds.lookup(&caller.label()).is_some());
assert_eq!(
dest_crds
.lookup_versioned(&caller.label())
.unwrap()
.insert_timestamp,
1
);
assert_eq!(
dest_crds
.lookup_versioned(&caller.label())

View File

@ -2509,6 +2509,7 @@ pub(crate) mod tests {
cluster_info::Node,
consensus::test::{initialize_state, VoteSimulator},
consensus::Tower,
crds::Cursor,
optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
progress_map::ValidatorStakeInfo,
replay_stage::ReplayStage,
@ -4800,7 +4801,8 @@ pub(crate) mod tests {
&mut voted_signatures,
has_new_vote_been_rooted,
);
let (_, votes, max_ts) = cluster_info.get_votes(0);
let mut cursor = Cursor::default();
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes.len(), 1);
let vote_tx = &votes[0];
assert_eq!(vote_tx.message.recent_blockhash, bank0.last_blockhash());
@ -4829,7 +4831,7 @@ pub(crate) mod tests {
);
// No new votes have been submitted to gossip
let (_, votes, _max_ts) = cluster_info.get_votes(max_ts);
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert!(votes.is_empty());
// Tower's latest vote tx blockhash hasn't changed either
assert_eq!(tower.last_vote_tx_blockhash(), bank0.last_blockhash());
@ -4850,7 +4852,7 @@ pub(crate) mod tests {
&mut voted_signatures,
has_new_vote_been_rooted,
);
let (_, votes, max_ts) = cluster_info.get_votes(max_ts);
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes.len(), 1);
let vote_tx = &votes[0];
assert_eq!(vote_tx.message.recent_blockhash, bank1.last_blockhash());
@ -4872,7 +4874,7 @@ pub(crate) mod tests {
&mut last_vote_refresh_time,
);
// No new votes have been submitted to gossip
let (_, votes, max_ts) = cluster_info.get_votes(max_ts);
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert!(votes.is_empty());
assert_eq!(tower.last_vote_tx_blockhash(), bank1.last_blockhash());
assert_eq!(tower.last_voted_slot().unwrap(), 1);
@ -4908,7 +4910,7 @@ pub(crate) mod tests {
&mut last_vote_refresh_time,
);
assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time);
let (_, votes, max_ts) = cluster_info.get_votes(max_ts);
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert_eq!(votes.len(), 1);
let vote_tx = &votes[0];
assert_eq!(
@ -4963,7 +4965,7 @@ pub(crate) mod tests {
has_new_vote_been_rooted,
&mut last_vote_refresh_time,
);
let (_, votes, _max_ts) = cluster_info.get_votes(max_ts);
let (_, votes) = cluster_info.get_votes(&mut cursor);
assert!(votes.is_empty());
assert_eq!(
vote_tx.message.recent_blockhash,

View File

@ -3,8 +3,11 @@
extern crate log;
use rayon::iter::*;
use solana_core::cluster_info::{ClusterInfo, Node};
use solana_core::gossip_service::GossipService;
use solana_core::{
cluster_info::{ClusterInfo, Node},
crds::Cursor,
gossip_service::GossipService,
};
use solana_runtime::bank_forks::BankForks;
use solana_perf::packet::Packet;
@ -305,12 +308,11 @@ pub fn cluster_info_scale() {
let mut num_push_total = 0;
let mut num_pushes = 0;
let mut num_pulls = 0;
let mut num_inserts = 0;
for node in nodes.iter() {
//if node.0.get_votes(0).1.len() != (num_nodes * num_votes) {
let has_tx = node
.0
.get_votes(0)
.get_votes(&mut Cursor::default())
.1
.iter()
.filter(|v| v.message.account_keys == tx.message.account_keys)
@ -319,7 +321,6 @@ pub fn cluster_info_scale() {
num_push_total += node.0.gossip.read().unwrap().push.num_total;
num_pushes += node.0.gossip.read().unwrap().push.num_pushes;
num_pulls += node.0.gossip.read().unwrap().pull.num_pulls;
num_inserts += node.0.gossip.read().unwrap().crds.num_inserts;
if has_tx == 0 {
not_done += 1;
}
@ -329,7 +330,6 @@ pub fn cluster_info_scale() {
warn!("num_push_total: {}", num_push_total);
warn!("num_pushes: {}", num_pushes);
warn!("num_pulls: {}", num_pulls);
warn!("num_inserts: {}", num_inserts);
success = not_done < (nodes.len() / 20);
if success {
break;
@ -347,7 +347,6 @@ pub fn cluster_info_scale() {
node.0.gossip.write().unwrap().push.num_total = 0;
node.0.gossip.write().unwrap().push.num_pushes = 0;
node.0.gossip.write().unwrap().pull.num_pulls = 0;
node.0.gossip.write().unwrap().crds.num_inserts = 0;
}
}