indexes votes in crds table (#14272)

This commit is contained in:
behzad nouri 2020-12-27 13:31:05 +00:00 committed by GitHub
parent 49019c6613
commit 2fd38d9912
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 88 additions and 66 deletions

View File

@ -237,10 +237,8 @@ struct GossipStats {
entrypoint2: Counter, entrypoint2: Counter,
gossip_packets_dropped_count: Counter, gossip_packets_dropped_count: Counter,
push_vote_read: Counter, push_vote_read: Counter,
vote_process_push: Counter,
get_votes: Counter, get_votes: Counter,
get_accounts_hash: Counter, get_accounts_hash: Counter,
get_snapshot_hash: Counter,
all_tvu_peers: Counter, all_tvu_peers: Counter,
tvu_peers: Counter, tvu_peers: Counter,
retransmit_peers: Counter, retransmit_peers: Counter,
@ -273,8 +271,6 @@ struct GossipStats {
pull_request_ping_pong_check_failed_count: Counter, pull_request_ping_pong_check_failed_count: Counter,
purge: Counter, purge: Counter,
epoch_slots_lookup: Counter, epoch_slots_lookup: Counter,
epoch_slots_push: Counter,
push_message: Counter,
new_pull_requests: Counter, new_pull_requests: Counter,
new_pull_requests_count: Counter, new_pull_requests_count: Counter,
mark_pull_request: Counter, mark_pull_request: Counter,
@ -1020,35 +1016,21 @@ impl ClusterInfo {
let (labels, txs): (Vec<CrdsValueLabel>, Vec<Transaction>) = self let (labels, txs): (Vec<CrdsValueLabel>, Vec<Transaction>) = self
.time_gossip_read_lock("get_votes", &self.stats.get_votes) .time_gossip_read_lock("get_votes", &self.stats.get_votes)
.crds .crds
.iter() .get_votes()
.filter(|(_, x)| x.insert_timestamp > since) .filter(|vote| vote.insert_timestamp > since)
.filter_map(|(label, x)| { .map(|vote| {
max_ts = std::cmp::max(x.insert_timestamp, max_ts); max_ts = std::cmp::max(vote.insert_timestamp, max_ts);
x.value let transaction = match &vote.value.data {
.vote() CrdsData::Vote(_, vote) => vote.transaction.clone(),
.map(|v| (label.clone(), v.transaction.clone())) _ => panic!("this should not happen!"),
};
(vote.value.label(), transaction)
}) })
.unzip(); .unzip();
inc_new_counter_info!("cluster_info-get_votes-count", txs.len()); inc_new_counter_info!("cluster_info-get_votes-count", txs.len());
(labels, txs, max_ts) (labels, txs, max_ts)
} }
pub fn get_snapshot_hash(&self, slot: Slot) -> Vec<(Pubkey, Hash)> {
self.time_gossip_read_lock("get_snapshot_hash", &self.stats.get_snapshot_hash)
.crds
.values()
.filter_map(|x| x.value.snapshot_hash())
.filter_map(|x| {
for (table_slot, hash) in &x.hashes {
if *table_slot == slot {
return Some((x.from, *hash));
}
}
None
})
.collect()
}
pub fn get_accounts_hash_for_node<F, Y>(&self, pubkey: &Pubkey, map: F) -> Option<Y> pub fn get_accounts_hash_for_node<F, Y>(&self, pubkey: &Pubkey, map: F) -> Option<Y>
where where
F: FnOnce(&Vec<(Slot, Hash)>) -> Y, F: FnOnce(&Vec<(Slot, Hash)>) -> Y,
@ -2668,11 +2650,6 @@ impl ClusterInfo {
("entrypoint", self.stats.entrypoint.clear(), i64), ("entrypoint", self.stats.entrypoint.clear(), i64),
("entrypoint2", self.stats.entrypoint2.clear(), i64), ("entrypoint2", self.stats.entrypoint2.clear(), i64),
("push_vote_read", self.stats.push_vote_read.clear(), i64), ("push_vote_read", self.stats.push_vote_read.clear(), i64),
(
"vote_process_push",
self.stats.vote_process_push.clear(),
i64
),
("get_votes", self.stats.get_votes.clear(), i64), ("get_votes", self.stats.get_votes.clear(), i64),
( (
"get_accounts_hash", "get_accounts_hash",
@ -2824,8 +2801,6 @@ impl ClusterInfo {
self.stats.epoch_slots_lookup.clear(), self.stats.epoch_slots_lookup.clear(),
i64 i64
), ),
("epoch_slots_push", self.stats.epoch_slots_push.clear(), i64),
("push_message", self.stats.push_message.clear(), i64),
( (
"new_pull_requests", "new_pull_requests",
self.stats.new_pull_requests.clear(), self.stats.new_pull_requests.clear(),

View File

@ -28,7 +28,7 @@ use crate::contact_info::ContactInfo;
use crate::crds_shards::CrdsShards; use crate::crds_shards::CrdsShards;
use crate::crds_value::{CrdsData, CrdsValue, CrdsValueLabel, LowestSlot}; use crate::crds_value::{CrdsData, CrdsValue, CrdsValueLabel, LowestSlot};
use bincode::serialize; use bincode::serialize;
use indexmap::map::{rayon::ParValues, Entry, IndexMap, Iter, Values}; use indexmap::map::{rayon::ParValues, Entry, IndexMap, Values};
use indexmap::set::IndexSet; use indexmap::set::IndexSet;
use rayon::{prelude::*, ThreadPool}; use rayon::{prelude::*, ThreadPool};
use solana_sdk::hash::{hash, Hash}; use solana_sdk::hash::{hash, Hash};
@ -47,8 +47,8 @@ pub struct Crds {
table: IndexMap<CrdsValueLabel, VersionedCrdsValue>, table: IndexMap<CrdsValueLabel, VersionedCrdsValue>,
pub num_inserts: usize, // Only used in tests. pub num_inserts: usize, // Only used in tests.
shards: CrdsShards, shards: CrdsShards,
// Indices of all crds values which are node ContactInfo. nodes: IndexSet<usize>, // Indices of nodes' ContactInfo.
nodes: IndexSet<usize>, votes: IndexSet<usize>, // Indices of Vote crds values.
// Indices of all crds values associated with a node. // Indices of all crds values associated with a node.
records: HashMap<Pubkey, IndexSet<usize>>, records: HashMap<Pubkey, IndexSet<usize>>,
} }
@ -109,6 +109,7 @@ impl Default for Crds {
num_inserts: 0, num_inserts: 0,
shards: CrdsShards::new(CRDS_SHARDS_BITS), shards: CrdsShards::new(CRDS_SHARDS_BITS),
nodes: IndexSet::default(), nodes: IndexSet::default(),
votes: IndexSet::default(),
records: HashMap::default(), records: HashMap::default(),
} }
} }
@ -141,9 +142,15 @@ impl Crds {
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
let entry_index = entry.index(); let entry_index = entry.index();
self.shards.insert(entry_index, &new_value); self.shards.insert(entry_index, &new_value);
if let CrdsData::ContactInfo(_) = new_value.value.data { match new_value.value.data {
self.nodes.insert(entry_index); CrdsData::ContactInfo(_) => {
} self.nodes.insert(entry_index);
}
CrdsData::Vote(_, _) => {
self.votes.insert(entry_index);
}
_ => (),
};
self.records self.records
.entry(new_value.value.pubkey()) .entry(new_value.value.pubkey())
.or_default() .or_default()
@ -215,6 +222,11 @@ impl Crds {
}) })
} }
/// Returns all entries which are Vote.
pub(crate) fn get_votes(&self) -> impl Iterator<Item = &VersionedCrdsValue> {
self.votes.iter().map(move |i| self.table.index(*i))
}
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
self.table.len() self.table.len()
} }
@ -223,10 +235,6 @@ impl Crds {
self.table.is_empty() self.table.is_empty()
} }
pub fn iter(&self) -> Iter<'_, CrdsValueLabel, VersionedCrdsValue> {
self.table.iter()
}
pub fn values(&self) -> Values<'_, CrdsValueLabel, VersionedCrdsValue> { pub fn values(&self) -> Values<'_, CrdsValueLabel, VersionedCrdsValue> {
self.table.values() self.table.values()
} }
@ -290,8 +298,14 @@ impl Crds {
pub fn remove(&mut self, key: &CrdsValueLabel) -> Option<VersionedCrdsValue> { pub fn remove(&mut self, key: &CrdsValueLabel) -> Option<VersionedCrdsValue> {
let (index, _ /*label*/, value) = self.table.swap_remove_full(key)?; let (index, _ /*label*/, value) = self.table.swap_remove_full(key)?;
self.shards.remove(index, &value); self.shards.remove(index, &value);
if let CrdsData::ContactInfo(_) = value.value.data { match value.value.data {
self.nodes.swap_remove(&index); CrdsData::ContactInfo(_) => {
self.nodes.swap_remove(&index);
}
CrdsData::Vote(_, _) => {
self.votes.swap_remove(&index);
}
_ => (),
} }
// Remove the index from records associated with the value's pubkey. // Remove the index from records associated with the value's pubkey.
let pubkey = value.value.pubkey(); let pubkey = value.value.pubkey();
@ -313,10 +327,17 @@ impl Crds {
let value = self.table.index(index); let value = self.table.index(index);
self.shards.remove(size, value); self.shards.remove(size, value);
self.shards.insert(index, value); self.shards.insert(index, value);
if let CrdsData::ContactInfo(_) = value.value.data { match value.value.data {
self.nodes.swap_remove(&size); CrdsData::ContactInfo(_) => {
self.nodes.insert(index); self.nodes.swap_remove(&size);
} self.nodes.insert(index);
}
CrdsData::Vote(_, _) => {
self.votes.swap_remove(&size);
self.votes.insert(index);
}
_ => (),
};
let pubkey = value.value.pubkey(); let pubkey = value.value.pubkey();
let records = self.records.get_mut(&pubkey).unwrap(); let records = self.records.get_mut(&pubkey).unwrap();
records.swap_remove(&size); records.swap_remove(&size);
@ -537,51 +558,67 @@ mod test {
} }
#[test] #[test]
fn test_crds_nodes() { fn test_crds_value_indices() {
fn check_crds_nodes(crds: &Crds) -> usize { fn check_crds_value_indices(crds: &Crds) -> (usize, usize) {
let num_nodes = crds let num_nodes = crds
.table .table
.values() .values()
.filter(|value| matches!(value.value.data, CrdsData::ContactInfo(_))) .filter(|value| matches!(value.value.data, CrdsData::ContactInfo(_)))
.count(); .count();
let num_votes = crds
.table
.values()
.filter(|value| matches!(value.value.data, CrdsData::Vote(_, _)))
.count();
assert_eq!(num_nodes, crds.get_nodes_contact_info().count()); assert_eq!(num_nodes, crds.get_nodes_contact_info().count());
num_nodes assert_eq!(num_votes, crds.get_votes().count());
for vote in crds.get_votes() {
match vote.value.data {
CrdsData::Vote(_, _) => (),
_ => panic!("not a vote!"),
}
}
(num_nodes, num_votes)
} }
let mut rng = thread_rng(); let mut rng = thread_rng();
let keypairs: Vec<_> = std::iter::repeat_with(Keypair::new).take(256).collect(); let keypairs: Vec<_> = repeat_with(Keypair::new).take(128).collect();
let mut crds = Crds::default(); let mut crds = Crds::default();
let mut num_inserts = 0; let mut num_inserts = 0;
let mut num_overrides = 0; let mut num_overrides = 0;
for _ in 0..4096 { for k in 0..4096 {
let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair)); let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair));
match crds.insert_versioned(value) { match crds.insert_versioned(value) {
Ok(None) => { Ok(None) => {
num_inserts += 1; num_inserts += 1;
check_crds_nodes(&crds);
} }
Ok(Some(_)) => { Ok(Some(_)) => {
num_inserts += 1; num_inserts += 1;
num_overrides += 1; num_overrides += 1;
check_crds_nodes(&crds);
} }
Err(_) => (), Err(_) => (),
} }
if k % 64 == 0 {
check_crds_value_indices(&crds);
}
} }
assert_eq!(num_inserts, crds.num_inserts); assert_eq!(num_inserts, crds.num_inserts);
assert!(num_inserts > 700); assert!(num_inserts > 700);
assert!(num_overrides > 500); assert!(num_overrides > 500);
assert!(crds.table.len() > 200); assert!(crds.table.len() > 200);
assert!(num_inserts > crds.table.len()); assert!(num_inserts > crds.table.len());
let num_nodes = check_crds_nodes(&crds); let (num_nodes, num_votes) = check_crds_value_indices(&crds);
assert!(num_nodes * 3 < crds.table.len()); assert!(num_nodes * 3 < crds.table.len());
assert!(num_nodes > 150); assert!(num_nodes > 100, "num nodes: {}", num_nodes);
assert!(num_votes > 100, "num votes: {}", num_votes);
// Remove values one by one and assert that nodes indices stay valid. // Remove values one by one and assert that nodes indices stay valid.
while !crds.table.is_empty() { while !crds.table.is_empty() {
let index = rng.gen_range(0, crds.table.len()); let index = rng.gen_range(0, crds.table.len());
let key = crds.table.get_index(index).unwrap().0.clone(); let key = crds.table.get_index(index).unwrap().0.clone();
crds.remove(&key); crds.remove(&key);
check_crds_nodes(&crds); if crds.table.len() % 64 == 0 {
check_crds_value_indices(&crds);
}
} }
} }

View File

@ -127,7 +127,7 @@ pub(crate) fn new_rand_timestamp<R: Rng>(rng: &mut R) -> u64 {
impl CrdsData { impl CrdsData {
/// New random CrdsData for tests and benchmarks. /// New random CrdsData for tests and benchmarks.
fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> CrdsData { fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> CrdsData {
let kind = rng.gen_range(0, 5); let kind = rng.gen_range(0, 6);
// TODO: Implement other kinds of CrdsData here. // TODO: Implement other kinds of CrdsData here.
// TODO: Assign ranges to each arm proportional to their frequency in // TODO: Assign ranges to each arm proportional to their frequency in
// the mainnet crds table. // the mainnet crds table.
@ -136,7 +136,8 @@ impl CrdsData {
1 => CrdsData::LowestSlot(rng.gen(), LowestSlot::new_rand(rng, pubkey)), 1 => CrdsData::LowestSlot(rng.gen(), LowestSlot::new_rand(rng, pubkey)),
2 => CrdsData::SnapshotHashes(SnapshotHash::new_rand(rng, pubkey)), 2 => CrdsData::SnapshotHashes(SnapshotHash::new_rand(rng, pubkey)),
3 => CrdsData::AccountsHashes(SnapshotHash::new_rand(rng, pubkey)), 3 => CrdsData::AccountsHashes(SnapshotHash::new_rand(rng, pubkey)),
_ => CrdsData::Version(Version::new_rand(rng, pubkey)), 4 => CrdsData::Version(Version::new_rand(rng, pubkey)),
_ => CrdsData::Vote(rng.gen_range(0, MAX_VOTES), Vote::new_rand(rng, pubkey)),
} }
} }
} }
@ -263,6 +264,15 @@ impl Vote {
wallclock, wallclock,
} }
} }
/// New random Vote for tests and benchmarks.
fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
Self {
from: pubkey.unwrap_or_else(pubkey::new_rand),
transaction: Transaction::default(),
wallclock: new_rand_timestamp(rng),
}
}
} }
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, AbiExample)] #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, AbiExample)]
@ -821,7 +831,7 @@ mod test {
let index = rng.gen_range(0, keys.len()); let index = rng.gen_range(0, keys.len());
CrdsValue::new_rand(&mut rng, Some(&keys[index])) CrdsValue::new_rand(&mut rng, Some(&keys[index]))
}) })
.take(256) .take(2048)
.collect(); .collect();
let mut currents = HashMap::new(); let mut currents = HashMap::new();
for value in filter_current(&values) { for value in filter_current(&values) {
@ -843,9 +853,9 @@ mod test {
} }
assert_eq!(count, currents.len()); assert_eq!(count, currents.len());
// Currently CrdsData::new_rand is only implemented for 5 different // Currently CrdsData::new_rand is only implemented for 5 different
// kinds and excludes Vote and EpochSlots, and so the unique labels // kinds and excludes EpochSlots, and so the unique labels cannot be
// cannot be more than 5 times number of keys. // more than (5 + MAX_VOTES) times number of keys.
assert!(currents.len() <= keys.len() * 5); assert!(currents.len() <= keys.len() * (5 + MAX_VOTES as usize));
} }
#[test] #[test]