From 2fd38d9912811c2f34ea91d69d66caa5044ff044 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Sun, 27 Dec 2020 13:31:05 +0000 Subject: [PATCH] indexes votes in crds table (#14272) --- core/src/cluster_info.rs | 43 ++++--------------- core/src/crds.rs | 89 ++++++++++++++++++++++++++++------------ core/src/crds_value.rs | 22 +++++++--- 3 files changed, 88 insertions(+), 66 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 60a0bb849e..3c416088e8 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -237,10 +237,8 @@ struct GossipStats { entrypoint2: Counter, gossip_packets_dropped_count: Counter, push_vote_read: Counter, - vote_process_push: Counter, get_votes: Counter, get_accounts_hash: Counter, - get_snapshot_hash: Counter, all_tvu_peers: Counter, tvu_peers: Counter, retransmit_peers: Counter, @@ -273,8 +271,6 @@ struct GossipStats { pull_request_ping_pong_check_failed_count: Counter, purge: Counter, epoch_slots_lookup: Counter, - epoch_slots_push: Counter, - push_message: Counter, new_pull_requests: Counter, new_pull_requests_count: Counter, mark_pull_request: Counter, @@ -1020,35 +1016,21 @@ impl ClusterInfo { let (labels, txs): (Vec, Vec) = self .time_gossip_read_lock("get_votes", &self.stats.get_votes) .crds - .iter() - .filter(|(_, x)| x.insert_timestamp > since) - .filter_map(|(label, x)| { - max_ts = std::cmp::max(x.insert_timestamp, max_ts); - x.value - .vote() - .map(|v| (label.clone(), v.transaction.clone())) + .get_votes() + .filter(|vote| vote.insert_timestamp > since) + .map(|vote| { + max_ts = std::cmp::max(vote.insert_timestamp, max_ts); + let transaction = match &vote.value.data { + CrdsData::Vote(_, vote) => vote.transaction.clone(), + _ => panic!("this should not happen!"), + }; + (vote.value.label(), transaction) }) .unzip(); inc_new_counter_info!("cluster_info-get_votes-count", txs.len()); (labels, txs, max_ts) } - pub fn get_snapshot_hash(&self, slot: Slot) -> Vec<(Pubkey, Hash)> { - self.time_gossip_read_lock("get_snapshot_hash", &self.stats.get_snapshot_hash) - .crds - .values() - .filter_map(|x| x.value.snapshot_hash()) - .filter_map(|x| { - for (table_slot, hash) in &x.hashes { - if *table_slot == slot { - return Some((x.from, *hash)); - } - } - None - }) - .collect() - } - pub fn get_accounts_hash_for_node(&self, pubkey: &Pubkey, map: F) -> Option where F: FnOnce(&Vec<(Slot, Hash)>) -> Y, @@ -2668,11 +2650,6 @@ impl ClusterInfo { ("entrypoint", self.stats.entrypoint.clear(), i64), ("entrypoint2", self.stats.entrypoint2.clear(), i64), ("push_vote_read", self.stats.push_vote_read.clear(), i64), - ( - "vote_process_push", - self.stats.vote_process_push.clear(), - i64 - ), ("get_votes", self.stats.get_votes.clear(), i64), ( "get_accounts_hash", @@ -2824,8 +2801,6 @@ impl ClusterInfo { self.stats.epoch_slots_lookup.clear(), i64 ), - ("epoch_slots_push", self.stats.epoch_slots_push.clear(), i64), - ("push_message", self.stats.push_message.clear(), i64), ( "new_pull_requests", self.stats.new_pull_requests.clear(), diff --git a/core/src/crds.rs b/core/src/crds.rs index 07e4cfae83..8918912501 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -28,7 +28,7 @@ use crate::contact_info::ContactInfo; use crate::crds_shards::CrdsShards; use crate::crds_value::{CrdsData, CrdsValue, CrdsValueLabel, LowestSlot}; use bincode::serialize; -use indexmap::map::{rayon::ParValues, Entry, IndexMap, Iter, Values}; +use indexmap::map::{rayon::ParValues, Entry, IndexMap, Values}; use indexmap::set::IndexSet; use rayon::{prelude::*, ThreadPool}; use solana_sdk::hash::{hash, Hash}; @@ -47,8 +47,8 @@ pub struct Crds { table: IndexMap, pub num_inserts: usize, // Only used in tests. shards: CrdsShards, - // Indices of all crds values which are node ContactInfo. - nodes: IndexSet, + nodes: IndexSet, // Indices of nodes' ContactInfo. + votes: IndexSet, // Indices of Vote crds values. // Indices of all crds values associated with a node. records: HashMap>, } @@ -109,6 +109,7 @@ impl Default for Crds { num_inserts: 0, shards: CrdsShards::new(CRDS_SHARDS_BITS), nodes: IndexSet::default(), + votes: IndexSet::default(), records: HashMap::default(), } } @@ -141,9 +142,15 @@ impl Crds { Entry::Vacant(entry) => { let entry_index = entry.index(); self.shards.insert(entry_index, &new_value); - if let CrdsData::ContactInfo(_) = new_value.value.data { - self.nodes.insert(entry_index); - } + match new_value.value.data { + CrdsData::ContactInfo(_) => { + self.nodes.insert(entry_index); + } + CrdsData::Vote(_, _) => { + self.votes.insert(entry_index); + } + _ => (), + }; self.records .entry(new_value.value.pubkey()) .or_default() @@ -215,6 +222,11 @@ impl Crds { }) } + /// Returns all entries which are Vote. + pub(crate) fn get_votes(&self) -> impl Iterator { + self.votes.iter().map(move |i| self.table.index(*i)) + } + pub fn len(&self) -> usize { self.table.len() } @@ -223,10 +235,6 @@ impl Crds { self.table.is_empty() } - pub fn iter(&self) -> Iter<'_, CrdsValueLabel, VersionedCrdsValue> { - self.table.iter() - } - pub fn values(&self) -> Values<'_, CrdsValueLabel, VersionedCrdsValue> { self.table.values() } @@ -290,8 +298,14 @@ impl Crds { pub fn remove(&mut self, key: &CrdsValueLabel) -> Option { let (index, _ /*label*/, value) = self.table.swap_remove_full(key)?; self.shards.remove(index, &value); - if let CrdsData::ContactInfo(_) = value.value.data { - self.nodes.swap_remove(&index); + match value.value.data { + CrdsData::ContactInfo(_) => { + self.nodes.swap_remove(&index); + } + CrdsData::Vote(_, _) => { + self.votes.swap_remove(&index); + } + _ => (), } // Remove the index from records associated with the value's pubkey. let pubkey = value.value.pubkey(); @@ -313,10 +327,17 @@ impl Crds { let value = self.table.index(index); self.shards.remove(size, value); self.shards.insert(index, value); - if let CrdsData::ContactInfo(_) = value.value.data { - self.nodes.swap_remove(&size); - self.nodes.insert(index); - } + match value.value.data { + CrdsData::ContactInfo(_) => { + self.nodes.swap_remove(&size); + self.nodes.insert(index); + } + CrdsData::Vote(_, _) => { + self.votes.swap_remove(&size); + self.votes.insert(index); + } + _ => (), + }; let pubkey = value.value.pubkey(); let records = self.records.get_mut(&pubkey).unwrap(); records.swap_remove(&size); @@ -537,51 +558,67 @@ mod test { } #[test] - fn test_crds_nodes() { - fn check_crds_nodes(crds: &Crds) -> usize { + fn test_crds_value_indices() { + fn check_crds_value_indices(crds: &Crds) -> (usize, usize) { let num_nodes = crds .table .values() .filter(|value| matches!(value.value.data, CrdsData::ContactInfo(_))) .count(); + let num_votes = crds + .table + .values() + .filter(|value| matches!(value.value.data, CrdsData::Vote(_, _))) + .count(); assert_eq!(num_nodes, crds.get_nodes_contact_info().count()); - num_nodes + assert_eq!(num_votes, crds.get_votes().count()); + for vote in crds.get_votes() { + match vote.value.data { + CrdsData::Vote(_, _) => (), + _ => panic!("not a vote!"), + } + } + (num_nodes, num_votes) } let mut rng = thread_rng(); - let keypairs: Vec<_> = std::iter::repeat_with(Keypair::new).take(256).collect(); + let keypairs: Vec<_> = repeat_with(Keypair::new).take(128).collect(); let mut crds = Crds::default(); let mut num_inserts = 0; let mut num_overrides = 0; - for _ in 0..4096 { + for k in 0..4096 { let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair)); match crds.insert_versioned(value) { Ok(None) => { num_inserts += 1; - check_crds_nodes(&crds); } Ok(Some(_)) => { num_inserts += 1; num_overrides += 1; - check_crds_nodes(&crds); } Err(_) => (), } + if k % 64 == 0 { + check_crds_value_indices(&crds); + } } assert_eq!(num_inserts, crds.num_inserts); assert!(num_inserts > 700); assert!(num_overrides > 500); assert!(crds.table.len() > 200); assert!(num_inserts > crds.table.len()); - let num_nodes = check_crds_nodes(&crds); + let (num_nodes, num_votes) = check_crds_value_indices(&crds); assert!(num_nodes * 3 < crds.table.len()); - assert!(num_nodes > 150); + assert!(num_nodes > 100, "num nodes: {}", num_nodes); + assert!(num_votes > 100, "num votes: {}", num_votes); // Remove values one by one and assert that nodes indices stay valid. while !crds.table.is_empty() { let index = rng.gen_range(0, crds.table.len()); let key = crds.table.get_index(index).unwrap().0.clone(); crds.remove(&key); - check_crds_nodes(&crds); + if crds.table.len() % 64 == 0 { + check_crds_value_indices(&crds); + } } } diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index 43bf93a57e..a8aebbc67f 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -127,7 +127,7 @@ pub(crate) fn new_rand_timestamp(rng: &mut R) -> u64 { impl CrdsData { /// New random CrdsData for tests and benchmarks. fn new_rand(rng: &mut R, pubkey: Option) -> CrdsData { - let kind = rng.gen_range(0, 5); + let kind = rng.gen_range(0, 6); // TODO: Implement other kinds of CrdsData here. // TODO: Assign ranges to each arm proportional to their frequency in // the mainnet crds table. @@ -136,7 +136,8 @@ impl CrdsData { 1 => CrdsData::LowestSlot(rng.gen(), LowestSlot::new_rand(rng, pubkey)), 2 => CrdsData::SnapshotHashes(SnapshotHash::new_rand(rng, pubkey)), 3 => CrdsData::AccountsHashes(SnapshotHash::new_rand(rng, pubkey)), - _ => CrdsData::Version(Version::new_rand(rng, pubkey)), + 4 => CrdsData::Version(Version::new_rand(rng, pubkey)), + _ => CrdsData::Vote(rng.gen_range(0, MAX_VOTES), Vote::new_rand(rng, pubkey)), } } } @@ -263,6 +264,15 @@ impl Vote { wallclock, } } + + /// New random Vote for tests and benchmarks. + fn new_rand(rng: &mut R, pubkey: Option) -> Self { + Self { + from: pubkey.unwrap_or_else(pubkey::new_rand), + transaction: Transaction::default(), + wallclock: new_rand_timestamp(rng), + } + } } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, AbiExample)] @@ -821,7 +831,7 @@ mod test { let index = rng.gen_range(0, keys.len()); CrdsValue::new_rand(&mut rng, Some(&keys[index])) }) - .take(256) + .take(2048) .collect(); let mut currents = HashMap::new(); for value in filter_current(&values) { @@ -843,9 +853,9 @@ mod test { } assert_eq!(count, currents.len()); // Currently CrdsData::new_rand is only implemented for 5 different - // kinds and excludes Vote and EpochSlots, and so the unique labels - // cannot be more than 5 times number of keys. - assert!(currents.len() <= keys.len() * 5); + // kinds and excludes EpochSlots, and so the unique labels cannot be + // more than (5 + MAX_VOTES) times number of keys. + assert!(currents.len() <= keys.len() * (5 + MAX_VOTES as usize)); } #[test]