indexes epoch slots in crds table (#15459)

ClusterInfo::get_epoch_slots_since scans the entire crds table to obtain
epoch-slots inserted since a timestamp:
https://github.com/solana-labs/solana/blob/013daa8f4/core/src/cluster_info.rs#L1245-L1262
The alternative is to index epoch-slots in crds table ordered by their
insert timestamp.
This commit is contained in:
behzad nouri 2021-02-26 14:12:04 +00:00 committed by GitHub
parent d5f4058968
commit 5a9896706c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 184 additions and 60 deletions

View File

@ -113,7 +113,7 @@ const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(640);
pub const DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS: u64 = 10_000;
pub const DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS: u64 = 60_000;
/// Minimum serialized size of a Protocol::PullResponse packet.
const PULL_RESPONSE_MIN_SERIALIZED_SIZE: usize = 167;
const PULL_RESPONSE_MIN_SERIALIZED_SIZE: usize = 161;
#[derive(Debug, PartialEq, Eq)]
pub enum ClusterInfoError {
@ -1242,23 +1242,30 @@ impl ClusterInfo {
.map(|x| map(x.value.lowest_slot().unwrap(), x.insert_timestamp))
}
pub fn get_epoch_slots_since(&self, since: Option<u64>) -> (Vec<EpochSlots>, Option<u64>) {
pub fn get_epoch_slots_since(
&self,
timestamp: u64,
) -> (
Vec<EpochSlots>,
Option<u64>, // Most recent insert timestmap.
) {
let mut max_ts = 0;
let vals: Vec<_> = self
.gossip
.read()
.unwrap()
.crds
.values()
.filter(|x| {
since
.map(|since| x.insert_timestamp > since)
.unwrap_or(true)
.get_epoch_slots_since(timestamp)
.map(|value| {
max_ts = std::cmp::max(max_ts, value.insert_timestamp);
match &value.value.data {
CrdsData::EpochSlots(_, slots) => slots.clone(),
_ => panic!("this should not happen!"),
}
})
.filter_map(|x| Some((x.value.epoch_slots()?.clone(), x.insert_timestamp)))
.collect();
let max = vals.iter().map(|x| x.1).max().or(since);
let vec = vals.into_iter().map(|x| x.0).collect();
(vec, max)
let max_ts = if vals.is_empty() { None } else { Some(max_ts) };
(vals, max_ts)
}
pub fn get_node_version(&self, pubkey: &Pubkey) -> Option<solana_version::Version> {
@ -3658,7 +3665,11 @@ mod tests {
let crds_values = vec![CrdsValue::new_rand(&mut rng, None)];
let pull_response = Protocol::PullResponse(Pubkey::new_unique(), crds_values);
let size = serialized_size(&pull_response).unwrap();
assert!(PULL_RESPONSE_MIN_SERIALIZED_SIZE as u64 <= size);
assert!(
PULL_RESPONSE_MIN_SERIALIZED_SIZE as u64 <= size,
"pull-response serialized size: {}",
size
);
}
}
@ -3978,23 +3989,23 @@ mod tests {
let keys = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0);
let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info);
let (slots, since) = cluster_info.get_epoch_slots_since(None);
let (slots, since) = cluster_info.get_epoch_slots_since(0);
assert!(slots.is_empty());
assert!(since.is_none());
cluster_info.push_epoch_slots(&[0]);
cluster_info.flush_push_queue();
let (slots, since) = cluster_info.get_epoch_slots_since(Some(std::u64::MAX));
let (slots, since) = cluster_info.get_epoch_slots_since(std::u64::MAX);
assert!(slots.is_empty());
assert_eq!(since, Some(std::u64::MAX));
assert_eq!(since, None);
let (slots, since) = cluster_info.get_epoch_slots_since(None);
let (slots, since) = cluster_info.get_epoch_slots_since(0);
assert_eq!(slots.len(), 1);
assert!(since.is_some());
let (slots, since2) = cluster_info.get_epoch_slots_since(since);
let (slots, since2) = cluster_info.get_epoch_slots_since(since.unwrap() + 1);
assert!(slots.is_empty());
assert_eq!(since2, since);
assert_eq!(since2, None);
}
#[test]
@ -4327,7 +4338,7 @@ mod tests {
cluster_info.flush_push_queue();
cluster_info.push_epoch_slots(&range[16000..]);
cluster_info.flush_push_queue();
let (slots, since) = cluster_info.get_epoch_slots_since(None);
let (slots, since) = cluster_info.get_epoch_slots_since(0);
let slots: Vec<_> = slots.iter().flat_map(|x| x.to_slots(0)).collect();
assert_eq!(slots, range);
assert!(since.is_some());

View File

@ -2,11 +2,15 @@ use crate::{
cluster_info::ClusterInfo, contact_info::ContactInfo, epoch_slots::EpochSlots,
serve_repair::RepairType,
};
use itertools::Itertools;
use solana_runtime::{bank_forks::BankForks, epoch_stakes::NodeIdToVoteAccounts};
use solana_sdk::{clock::Slot, pubkey::Pubkey};
use std::{
collections::{BTreeMap, HashMap, HashSet},
sync::{Arc, RwLock},
sync::{
atomic::{AtomicU64, Ordering},
Arc, RwLock,
},
};
// Limit the size of cluster-slots map in case
@ -18,10 +22,9 @@ pub type SlotPubkeys = HashMap<Pubkey, u64>;
#[derive(Default)]
pub struct ClusterSlots {
cluster_slots: RwLock<BTreeMap<Slot, Arc<RwLock<SlotPubkeys>>>>,
since: RwLock<Option<u64>>,
since: AtomicU64,
validator_stakes: RwLock<Arc<NodeIdToVoteAccounts>>,
epoch: RwLock<Option<u64>>,
self_id: RwLock<Pubkey>,
}
impl ClusterSlots {
@ -29,20 +32,48 @@ impl ClusterSlots {
self.cluster_slots.read().unwrap().get(&slot).cloned()
}
pub fn update(&self, root: Slot, cluster_info: &ClusterInfo, bank_forks: &RwLock<BankForks>) {
self.update_peers(cluster_info, bank_forks);
let since = *self.since.read().unwrap();
self.update_peers(bank_forks);
let since = self.since.load(Ordering::Relaxed);
let (epoch_slots, since) = cluster_info.get_epoch_slots_since(since);
self.update_internal(root, epoch_slots, since);
}
fn update_internal(&self, root: Slot, epoch_slots_list: Vec<EpochSlots>, since: Option<u64>) {
for epoch_slots in epoch_slots_list {
let slots = epoch_slots.to_slots(root);
for slot in &slots {
if *slot <= root {
continue;
}
self.insert_node_id(*slot, epoch_slots.from);
}
// Attach validator's total stake.
let epoch_slots_list: Vec<_> = {
let validator_stakes = self.validator_stakes.read().unwrap();
epoch_slots_list
.into_iter()
.map(|epoch_slots| {
let stake = match validator_stakes.get(&epoch_slots.from) {
Some(v) => v.total_stake,
None => 0,
};
(epoch_slots, stake)
})
.collect()
};
let slot_nodes_stakes = epoch_slots_list
.into_iter()
.flat_map(|(epoch_slots, stake)| {
epoch_slots
.to_slots(root)
.into_iter()
.filter(|slot| *slot > root)
.zip(std::iter::repeat((epoch_slots.from, stake)))
})
.into_group_map();
let slot_nodes_stakes: Vec<_> = {
let mut cluster_slots = self.cluster_slots.write().unwrap();
slot_nodes_stakes
.into_iter()
.map(|(slot, nodes_stakes)| {
let slot_nodes = cluster_slots.entry(slot).or_default().clone();
(slot_nodes, nodes_stakes)
})
.collect()
};
for (slot_nodes, nodes_stakes) in slot_nodes_stakes {
slot_nodes.write().unwrap().extend(nodes_stakes);
}
{
let mut cluster_slots = self.cluster_slots.write().unwrap();
@ -54,7 +85,9 @@ impl ClusterSlots {
cluster_slots.split_off(&key);
}
}
*self.since.write().unwrap() = since;
if let Some(since) = since {
self.since.store(since + 1, Ordering::Relaxed);
}
}
pub fn collect(&self, id: &Pubkey) -> HashSet<Slot> {
@ -67,7 +100,8 @@ impl ClusterSlots {
.collect()
}
pub fn insert_node_id(&self, slot: Slot, node_id: Pubkey) {
#[cfg(test)]
pub(crate) fn insert_node_id(&self, slot: Slot, node_id: Pubkey) {
let balance = self
.validator_stakes
.read()
@ -85,7 +119,7 @@ impl ClusterSlots {
slot_pubkeys.write().unwrap().insert(node_id, balance);
}
fn update_peers(&self, cluster_info: &ClusterInfo, bank_forks: &RwLock<BankForks>) {
fn update_peers(&self, bank_forks: &RwLock<BankForks>) {
let root_bank = bank_forks.read().unwrap().root_bank();
let root_epoch = root_bank.epoch();
let my_epoch = *self.epoch.read().unwrap();
@ -93,16 +127,11 @@ impl ClusterSlots {
if Some(root_epoch) != my_epoch {
let validator_stakes = root_bank
.epoch_stakes(root_epoch)
.expect(
"Bank must have epoch stakes
for its own epoch",
)
.expect("Bank must have epoch stakes for its own epoch")
.node_id_to_vote_accounts()
.clone();
*self.validator_stakes.write().unwrap() = validator_stakes;
let id = cluster_info.id();
*self.self_id.write().unwrap() = id;
*self.epoch.write().unwrap() = Some(root_epoch);
}
}
@ -177,7 +206,7 @@ mod tests {
fn test_default() {
let cs = ClusterSlots::default();
assert!(cs.cluster_slots.read().unwrap().is_empty());
assert!(cs.since.read().unwrap().is_none());
assert_eq!(cs.since.load(Ordering::Relaxed), 0);
}
#[test]
@ -185,7 +214,7 @@ mod tests {
let cs = ClusterSlots::default();
cs.update_internal(0, vec![], None);
assert!(cs.cluster_slots.read().unwrap().is_empty());
assert!(cs.since.read().unwrap().is_none());
assert_eq!(cs.since.load(Ordering::Relaxed), 0);
}
#[test]
@ -193,7 +222,7 @@ mod tests {
let cs = ClusterSlots::default();
let epoch_slot = EpochSlots::default();
cs.update_internal(0, vec![epoch_slot], Some(0));
assert_eq!(*cs.since.read().unwrap(), Some(0));
assert_eq!(cs.since.load(Ordering::Relaxed), 1);
assert!(cs.lookup(0).is_none());
}
@ -204,7 +233,7 @@ mod tests {
let mut epoch_slot = EpochSlots::default();
epoch_slot.fill(&[0], 0);
cs.update_internal(0, vec![epoch_slot], Some(0));
assert_eq!(*cs.since.read().unwrap(), Some(0));
assert_eq!(cs.since.load(Ordering::Relaxed), 1);
assert!(cs.lookup(0).is_none());
}
@ -214,7 +243,7 @@ mod tests {
let mut epoch_slot = EpochSlots::default();
epoch_slot.fill(&[1], 0);
cs.update_internal(0, vec![epoch_slot], Some(0));
assert_eq!(*cs.since.read().unwrap(), Some(0));
assert_eq!(cs.since.load(Ordering::Relaxed), 1);
assert!(cs.lookup(0).is_none());
assert!(cs.lookup(1).is_some());
assert_eq!(

View File

@ -36,8 +36,8 @@ use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Keypair;
use solana_sdk::timing::timestamp;
use std::cmp;
use std::collections::{hash_map, HashMap};
use std::ops::{Index, IndexMut};
use std::collections::{hash_map, BTreeSet, HashMap};
use std::ops::{Bound, Index, IndexMut};
const CRDS_SHARDS_BITS: u32 = 8;
// Limit number of crds values associated with each unique pubkey. This
@ -52,6 +52,8 @@ pub struct Crds {
shards: CrdsShards,
nodes: IndexSet<usize>, // Indices of nodes' ContactInfo.
votes: IndexSet<usize>, // Indices of Vote crds values.
// Indices of EpochSlots crds values ordered by insert timestamp.
epoch_slots: BTreeSet<(u64 /*insert timestamp*/, usize)>,
// Indices of all crds values associated with a node.
records: HashMap<Pubkey, IndexSet<usize>>,
}
@ -113,6 +115,7 @@ impl Default for Crds {
shards: CrdsShards::new(CRDS_SHARDS_BITS),
nodes: IndexSet::default(),
votes: IndexSet::default(),
epoch_slots: BTreeSet::default(),
records: HashMap::default(),
}
}
@ -152,6 +155,10 @@ impl Crds {
CrdsData::Vote(_, _) => {
self.votes.insert(entry_index);
}
CrdsData::EpochSlots(_, _) => {
self.epoch_slots
.insert((new_value.insert_timestamp, entry_index));
}
_ => (),
};
self.records
@ -163,9 +170,15 @@ impl Crds {
Ok(None)
}
Entry::Occupied(mut entry) if *entry.get() < new_value => {
let index = entry.index();
self.shards.remove(index, entry.get());
self.shards.insert(index, &new_value);
let entry_index = entry.index();
self.shards.remove(entry_index, entry.get());
self.shards.insert(entry_index, &new_value);
if let CrdsData::EpochSlots(_, _) = new_value.value.data {
self.epoch_slots
.remove(&(entry.get().insert_timestamp, entry_index));
self.epoch_slots
.insert((new_value.insert_timestamp, entry_index));
}
self.num_inserts += 1;
// As long as the pubkey does not change, self.records
// does not need to be updated.
@ -230,6 +243,17 @@ impl Crds {
self.votes.iter().map(move |i| self.table.index(*i))
}
/// Returns epoch-slots inserted since (or at) the given timestamp.
pub(crate) fn get_epoch_slots_since(
&self,
timestamp: u64,
) -> impl Iterator<Item = &VersionedCrdsValue> {
let range = (Bound::Included((timestamp, 0)), Bound::Unbounded);
self.epoch_slots
.range(range)
.map(move |(_, i)| self.table.index(*i))
}
/// Returns all records associated with a pubkey.
pub(crate) fn get_records(&self, pubkey: &Pubkey) -> impl Iterator<Item = &VersionedCrdsValue> {
self.records
@ -354,6 +378,9 @@ impl Crds {
CrdsData::Vote(_, _) => {
self.votes.swap_remove(&index);
}
CrdsData::EpochSlots(_, _) => {
self.epoch_slots.remove(&(value.insert_timestamp, index));
}
_ => (),
}
// Remove the index from records associated with the value's pubkey.
@ -385,6 +412,10 @@ impl Crds {
self.votes.swap_remove(&size);
self.votes.insert(index);
}
CrdsData::EpochSlots(_, _) => {
self.epoch_slots.remove(&(value.insert_timestamp, size));
self.epoch_slots.insert((value.insert_timestamp, index));
}
_ => (),
};
let pubkey = value.value.pubkey();
@ -643,7 +674,27 @@ mod test {
#[test]
fn test_crds_value_indices() {
fn check_crds_value_indices(crds: &Crds) -> (usize, usize) {
fn check_crds_value_indices<R: rand::Rng>(
rng: &mut R,
crds: &Crds,
) -> (usize, usize, usize) {
if !crds.table.is_empty() {
let since = crds.table[rng.gen_range(0, crds.table.len())].insert_timestamp;
let num_epoch_slots = crds
.table
.values()
.filter(|value| value.insert_timestamp >= since)
.filter(|value| matches!(value.value.data, CrdsData::EpochSlots(_, _)))
.count();
assert_eq!(num_epoch_slots, crds.get_epoch_slots_since(since).count());
for value in crds.get_epoch_slots_since(since) {
assert!(value.insert_timestamp >= since);
match value.value.data {
CrdsData::EpochSlots(_, _) => (),
_ => panic!("not an epoch-slot!"),
}
}
}
let num_nodes = crds
.table
.values()
@ -654,15 +705,27 @@ mod test {
.values()
.filter(|value| matches!(value.value.data, CrdsData::Vote(_, _)))
.count();
let num_epoch_slots = crds
.table
.values()
.filter(|value| matches!(value.value.data, CrdsData::EpochSlots(_, _)))
.count();
assert_eq!(num_nodes, crds.get_nodes_contact_info().count());
assert_eq!(num_votes, crds.get_votes().count());
assert_eq!(num_epoch_slots, crds.get_epoch_slots_since(0).count());
for vote in crds.get_votes() {
match vote.value.data {
CrdsData::Vote(_, _) => (),
_ => panic!("not a vote!"),
}
}
(num_nodes, num_votes)
for epoch_slots in crds.get_epoch_slots_since(0) {
match epoch_slots.value.data {
CrdsData::EpochSlots(_, _) => (),
_ => panic!("not an epoch-slot!"),
}
}
(num_nodes, num_votes, num_epoch_slots)
}
let mut rng = thread_rng();
let keypairs: Vec<_> = repeat_with(Keypair::new).take(128).collect();
@ -683,7 +746,7 @@ mod test {
Err(_) => (),
}
if k % 64 == 0 {
check_crds_value_indices(&crds);
check_crds_value_indices(&mut rng, &crds);
}
}
assert_eq!(num_inserts, crds.num_inserts);
@ -691,17 +754,22 @@ mod test {
assert!(num_overrides > 500);
assert!(crds.table.len() > 200);
assert!(num_inserts > crds.table.len());
let (num_nodes, num_votes) = check_crds_value_indices(&crds);
let (num_nodes, num_votes, num_epoch_slots) = check_crds_value_indices(&mut rng, &crds);
assert!(num_nodes * 3 < crds.table.len());
assert!(num_nodes > 100, "num nodes: {}", num_nodes);
assert!(num_votes > 100, "num votes: {}", num_votes);
assert!(
num_epoch_slots > 100,
"num epoch slots: {}",
num_epoch_slots
);
// Remove values one by one and assert that nodes indices stay valid.
while !crds.table.is_empty() {
let index = rng.gen_range(0, crds.table.len());
let key = crds.table.get_index(index).unwrap().0.clone();
crds.remove(&key);
if crds.table.len() % 64 == 0 {
check_crds_value_indices(&crds);
check_crds_value_indices(&mut rng, &crds);
}
}
}

View File

@ -137,7 +137,7 @@ pub(crate) fn new_rand_timestamp<R: Rng>(rng: &mut R) -> u64 {
impl CrdsData {
/// New random CrdsData for tests and benchmarks.
fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> CrdsData {
let kind = rng.gen_range(0, 6);
let kind = rng.gen_range(0, 7);
// TODO: Implement other kinds of CrdsData here.
// TODO: Assign ranges to each arm proportional to their frequency in
// the mainnet crds table.
@ -147,7 +147,11 @@ impl CrdsData {
2 => CrdsData::SnapshotHashes(SnapshotHash::new_rand(rng, pubkey)),
3 => CrdsData::AccountsHashes(SnapshotHash::new_rand(rng, pubkey)),
4 => CrdsData::Version(Version::new_rand(rng, pubkey)),
_ => CrdsData::Vote(rng.gen_range(0, MAX_VOTES), Vote::new_rand(rng, pubkey)),
5 => CrdsData::Vote(rng.gen_range(0, MAX_VOTES), Vote::new_rand(rng, pubkey)),
_ => CrdsData::EpochSlots(
rng.gen_range(0, MAX_EPOCH_SLOTS),
EpochSlots::new_rand(rng, pubkey),
),
}
}
}

View File

@ -1,6 +1,5 @@
use crate::cluster_info::MAX_CRDS_OBJECT_SIZE;
use crate::crds_value::MAX_SLOT;
use crate::crds_value::MAX_WALLCLOCK;
use crate::crds_value::{self, MAX_SLOT, MAX_WALLCLOCK};
use bincode::serialized_size;
use bv::BitVec;
use flate2::{Compress, Compression, Decompress, FlushCompress, FlushDecompress};
@ -316,6 +315,19 @@ impl EpochSlots {
.flatten()
.collect()
}
/// New random EpochSlots for tests and simulations.
pub(crate) fn new_rand<R: rand::Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
let now = crds_value::new_rand_timestamp(rng);
let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand);
let mut epoch_slots = Self::new(pubkey, now);
let num_slots = rng.gen_range(0, 20);
let slots: Vec<_> = std::iter::repeat_with(|| 47825632 + rng.gen_range(0, 512))
.take(num_slots)
.collect();
epoch_slots.add(&slots);
epoch_slots
}
}
#[cfg(test)]