uses btree-map instead of hash-map for cluster-slots (#15194)
retain traverses all values in the hashmap which is slow: https://github.com/solana-labs/solana/blob/88f22c360/core/src/cluster_slots.rs#L45 btree-map instead allows more efficient prunning there. In addition there is potential race condition here: https://github.com/solana-labs/solana/blob/88f22c360/core/src/cluster_slots.rs#L68-L74 If another thread inserts a value at the same slot key between the read and write lock, current thread will discard the inserted value.
This commit is contained in:
parent
f063f02c41
commit
2758588ddd
|
@ -5,16 +5,15 @@ use crate::{
|
|||
use solana_runtime::{bank_forks::BankForks, epoch_stakes::NodeIdToVoteAccounts};
|
||||
use solana_sdk::{clock::Slot, pubkey::Pubkey};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
collections::{BTreeMap, HashMap, HashSet},
|
||||
sync::{Arc, RwLock},
|
||||
};
|
||||
|
||||
pub type SlotPubkeys = HashMap<Pubkey, u64>;
|
||||
pub type ClusterSlotsMap = RwLock<HashMap<Slot, Arc<RwLock<SlotPubkeys>>>>;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct ClusterSlots {
|
||||
cluster_slots: ClusterSlotsMap,
|
||||
cluster_slots: RwLock<BTreeMap<Slot, Arc<RwLock<SlotPubkeys>>>>,
|
||||
since: RwLock<Option<u64>>,
|
||||
validator_stakes: RwLock<Arc<NodeIdToVoteAccounts>>,
|
||||
epoch: RwLock<Option<u64>>,
|
||||
|
@ -28,11 +27,10 @@ impl ClusterSlots {
|
|||
pub fn update(&self, root: Slot, cluster_info: &ClusterInfo, bank_forks: &RwLock<BankForks>) {
|
||||
self.update_peers(cluster_info, bank_forks);
|
||||
let since = *self.since.read().unwrap();
|
||||
let epoch_slots = cluster_info.get_epoch_slots_since(since);
|
||||
self.update_internal(root, epoch_slots);
|
||||
let (epoch_slots, since) = cluster_info.get_epoch_slots_since(since);
|
||||
self.update_internal(root, epoch_slots, since);
|
||||
}
|
||||
fn update_internal(&self, root: Slot, epoch_slots: (Vec<EpochSlots>, Option<u64>)) {
|
||||
let (epoch_slots_list, since) = epoch_slots;
|
||||
fn update_internal(&self, root: Slot, epoch_slots_list: Vec<EpochSlots>, since: Option<u64>) {
|
||||
for epoch_slots in epoch_slots_list {
|
||||
let slots = epoch_slots.to_slots(root);
|
||||
for slot in &slots {
|
||||
|
@ -42,7 +40,10 @@ impl ClusterSlots {
|
|||
self.insert_node_id(*slot, epoch_slots.from);
|
||||
}
|
||||
}
|
||||
self.cluster_slots.write().unwrap().retain(|x, _| *x > root);
|
||||
{
|
||||
let mut cluster_slots = self.cluster_slots.write().unwrap();
|
||||
*cluster_slots = cluster_slots.split_off(&(root + 1));
|
||||
}
|
||||
*self.since.write().unwrap() = since;
|
||||
}
|
||||
|
||||
|
@ -51,9 +52,8 @@ impl ClusterSlots {
|
|||
.read()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.filter(|(_, keys)| keys.read().unwrap().get(id).is_some())
|
||||
.map(|(slot, _)| slot)
|
||||
.cloned()
|
||||
.filter(|(_, keys)| keys.read().unwrap().contains_key(id))
|
||||
.map(|(slot, _)| *slot)
|
||||
.collect()
|
||||
}
|
||||
|
||||
|
@ -65,20 +65,14 @@ impl ClusterSlots {
|
|||
.get(&node_id)
|
||||
.map(|v| v.total_stake)
|
||||
.unwrap_or(0);
|
||||
let mut slot_pubkeys = self.cluster_slots.read().unwrap().get(&slot).cloned();
|
||||
if slot_pubkeys.is_none() {
|
||||
let new_slot_pubkeys = Arc::new(RwLock::new(HashMap::default()));
|
||||
self.cluster_slots
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(slot, new_slot_pubkeys.clone());
|
||||
slot_pubkeys = Some(new_slot_pubkeys);
|
||||
}
|
||||
slot_pubkeys
|
||||
.unwrap()
|
||||
let slot_pubkeys = self
|
||||
.cluster_slots
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(node_id, balance);
|
||||
.entry(slot)
|
||||
.or_default()
|
||||
.clone();
|
||||
slot_pubkeys.write().unwrap().insert(node_id, balance);
|
||||
}
|
||||
|
||||
fn update_peers(&self, cluster_info: &ClusterInfo, bank_forks: &RwLock<BankForks>) {
|
||||
|
@ -179,7 +173,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_update_noop() {
|
||||
let cs = ClusterSlots::default();
|
||||
cs.update_internal(0, (vec![], None));
|
||||
cs.update_internal(0, vec![], None);
|
||||
assert!(cs.cluster_slots.read().unwrap().is_empty());
|
||||
assert!(cs.since.read().unwrap().is_none());
|
||||
}
|
||||
|
@ -188,7 +182,7 @@ mod tests {
|
|||
fn test_update_empty() {
|
||||
let cs = ClusterSlots::default();
|
||||
let epoch_slot = EpochSlots::default();
|
||||
cs.update_internal(0, (vec![epoch_slot], Some(0)));
|
||||
cs.update_internal(0, vec![epoch_slot], Some(0));
|
||||
assert_eq!(*cs.since.read().unwrap(), Some(0));
|
||||
assert!(cs.lookup(0).is_none());
|
||||
}
|
||||
|
@ -199,7 +193,7 @@ mod tests {
|
|||
let cs = ClusterSlots::default();
|
||||
let mut epoch_slot = EpochSlots::default();
|
||||
epoch_slot.fill(&[0], 0);
|
||||
cs.update_internal(0, (vec![epoch_slot], Some(0)));
|
||||
cs.update_internal(0, vec![epoch_slot], Some(0));
|
||||
assert_eq!(*cs.since.read().unwrap(), Some(0));
|
||||
assert!(cs.lookup(0).is_none());
|
||||
}
|
||||
|
@ -209,7 +203,7 @@ mod tests {
|
|||
let cs = ClusterSlots::default();
|
||||
let mut epoch_slot = EpochSlots::default();
|
||||
epoch_slot.fill(&[1], 0);
|
||||
cs.update_internal(0, (vec![epoch_slot], Some(0)));
|
||||
cs.update_internal(0, vec![epoch_slot], Some(0));
|
||||
assert_eq!(*cs.since.read().unwrap(), Some(0));
|
||||
assert!(cs.lookup(0).is_none());
|
||||
assert!(cs.lookup(1).is_some());
|
||||
|
@ -340,7 +334,7 @@ mod tests {
|
|||
);
|
||||
|
||||
*cs.validator_stakes.write().unwrap() = map;
|
||||
cs.update_internal(0, (vec![epoch_slot], None));
|
||||
cs.update_internal(0, vec![epoch_slot], None);
|
||||
assert!(cs.lookup(1).is_some());
|
||||
assert_eq!(
|
||||
cs.lookup(1)
|
||||
|
@ -357,7 +351,7 @@ mod tests {
|
|||
let cs = ClusterSlots::default();
|
||||
let mut epoch_slot = EpochSlots::default();
|
||||
epoch_slot.fill(&[1], 0);
|
||||
cs.update_internal(0, (vec![epoch_slot], None));
|
||||
cs.update_internal(0, vec![epoch_slot], None);
|
||||
let self_id = solana_sdk::pubkey::new_rand();
|
||||
assert_eq!(
|
||||
cs.generate_repairs_for_missing_slots(&self_id, 0),
|
||||
|
@ -371,7 +365,7 @@ mod tests {
|
|||
let mut epoch_slot = EpochSlots::default();
|
||||
epoch_slot.fill(&[1], 0);
|
||||
let self_id = epoch_slot.from;
|
||||
cs.update_internal(0, (vec![epoch_slot], None));
|
||||
cs.update_internal(0, vec![epoch_slot], None);
|
||||
let slots: Vec<Slot> = cs.collect(&self_id).into_iter().collect();
|
||||
assert_eq!(slots, vec![1]);
|
||||
}
|
||||
|
@ -382,7 +376,7 @@ mod tests {
|
|||
let mut epoch_slot = EpochSlots::default();
|
||||
epoch_slot.fill(&[1], 0);
|
||||
let self_id = epoch_slot.from;
|
||||
cs.update_internal(0, (vec![epoch_slot], None));
|
||||
cs.update_internal(0, vec![epoch_slot], None);
|
||||
assert!(cs
|
||||
.generate_repairs_for_missing_slots(&self_id, 0)
|
||||
.is_empty());
|
||||
|
|
Loading…
Reference in New Issue