adds reverse lookup index to cluster-nodes (#22892)

retransmit has to exclude slot leader from set of nodes for each shred; 
which currently requires a linear scan:
https://github.com/solana-labs/solana/blob/e3b137066/core/src/cluster_nodes.rs#L238-L242

This commit adds a reverse lookup index to avoid linear scan.
This commit is contained in:
behzad nouri 2022-02-02 19:27:50 +00:00 committed by GitHub
parent e3b137066d
commit dccbddad80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 26 additions and 23 deletions

View File

@ -49,11 +49,13 @@ pub struct ClusterNodes<T> {
// All staked nodes + other known tvu-peers + the node itself;
// sorted by (stake, pubkey) in descending order.
nodes: Vec<Node>,
// Reverse index from nodes pubkey to their index in self.nodes.
index: HashMap<Pubkey, /*index:*/ usize>,
weighted_shuffle: WeightedShuffle</*stake:*/ u64>,
// Weights and indices for sampling peers. weighted_{shuffle,best} expect
// weights >= 1. For backward compatibility we use max(1, stake) for
// weights and exclude nodes with no contact-info.
index: Vec<(/*weight:*/ u64, /*index:*/ usize)>,
compat_index: Vec<(/*weight:*/ u64, /*index:*/ usize)>,
_phantom: PhantomData<T>,
}
@ -86,12 +88,12 @@ impl Node {
impl<T> ClusterNodes<T> {
pub(crate) fn num_peers(&self) -> usize {
self.index.len()
self.compat_index.len()
}
// A peer is considered live if they generated their contact info recently.
pub(crate) fn num_peers_live(&self, now: u64) -> usize {
self.index
self.compat_index
.iter()
.filter_map(|(_, index)| self.nodes[*index].contact_info())
.filter(|node| {
@ -173,10 +175,10 @@ impl ClusterNodes<BroadcastStage> {
/// Returns the root of turbine broadcast tree, which the leader sends the
/// shred to.
fn get_broadcast_peer(&self, shred_seed: [u8; 32]) -> Option<&ContactInfo> {
if self.index.is_empty() {
if self.compat_index.is_empty() {
None
} else {
let index = weighted_best(&self.index, shred_seed);
let index = weighted_best(&self.compat_index, shred_seed);
match &self.nodes[index].node {
NodeId::ContactInfo(node) => Some(node),
NodeId::Pubkey(_) => panic!("this should not happen!"),
@ -235,12 +237,8 @@ impl ClusterNodes<RetransmitStage> {
// Exclude slot leader from list of nodes.
if slot_leader == self.pubkey {
error!("retransmit from slot leader: {}", slot_leader);
} else if let Some(index) = self
.nodes
.iter()
.position(|node| node.pubkey() == slot_leader)
{
weighted_shuffle.remove_index(index);
} else if let Some(index) = self.index.get(&slot_leader) {
weighted_shuffle.remove_index(*index);
};
let mut rng = ChaChaRng::from_seed(shred_seed);
let nodes: Vec<_> = weighted_shuffle
@ -270,9 +268,9 @@ impl ClusterNodes<RetransmitStage> {
// Exclude leader from list of nodes.
let (weights, index): (Vec<u64>, Vec<usize>) = if slot_leader == self.pubkey {
error!("retransmit from slot leader: {}", slot_leader);
self.index.iter().copied().unzip()
self.compat_index.iter().copied().unzip()
} else {
self.index
self.compat_index
.iter()
.filter(|(_, i)| self.nodes[*i].pubkey() != slot_leader)
.copied()
@ -305,12 +303,16 @@ fn new_cluster_nodes<T: 'static>(
) -> ClusterNodes<T> {
let self_pubkey = cluster_info.id();
let nodes = get_nodes(cluster_info, stakes);
let index: HashMap<_, _> = nodes
.iter()
.enumerate()
.map(|(ix, node)| (node.pubkey(), ix))
.collect();
let broadcast = TypeId::of::<T>() == TypeId::of::<BroadcastStage>();
let stakes: Vec<u64> = nodes.iter().map(|node| node.stake).collect();
let mut weighted_shuffle = WeightedShuffle::new(&stakes).unwrap();
if broadcast {
let index = nodes.iter().position(|node| node.pubkey() == self_pubkey);
weighted_shuffle.remove_index(index.unwrap());
weighted_shuffle.remove_index(index[&self_pubkey]);
}
// For backward compatibility:
// * nodes which do not have contact-info are excluded.
@ -318,7 +320,7 @@ fn new_cluster_nodes<T: 'static>(
// The sorting key here should be equivalent to
// solana_gossip::deprecated::sorted_stakes_with_index.
// Leader itself is excluded when sampling broadcast peers.
let index = nodes
let compat_index = nodes
.iter()
.enumerate()
.filter(|(_, node)| node.contact_info().is_some())
@ -329,8 +331,9 @@ fn new_cluster_nodes<T: 'static>(
ClusterNodes {
pubkey: self_pubkey,
nodes,
weighted_shuffle,
index,
weighted_shuffle,
compat_index,
_phantom: PhantomData::default(),
}
}
@ -550,7 +553,7 @@ mod tests {
assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
let cluster_nodes = new_cluster_nodes::<RetransmitStage>(&cluster_info, &stakes);
// All nodes with contact-info should be in the index.
assert_eq!(cluster_nodes.index.len(), nodes.len());
assert_eq!(cluster_nodes.compat_index.len(), nodes.len());
// Staked nodes with no contact-info should be included.
assert!(cluster_nodes.nodes.len() > nodes.len());
// Assert that all nodes keep their contact-info.
@ -573,9 +576,9 @@ mod tests {
let (peers, stakes_and_index) =
sorted_retransmit_peers_and_stakes(&cluster_info, Some(&stakes));
assert_eq!(stakes_and_index.len(), peers.len());
assert_eq!(cluster_nodes.index.len(), peers.len());
assert_eq!(cluster_nodes.compat_index.len(), peers.len());
for (i, node) in cluster_nodes
.index
.compat_index
.iter()
.map(|(_, i)| &cluster_nodes.nodes[*i])
.enumerate()
@ -631,7 +634,7 @@ mod tests {
let cluster_nodes = ClusterNodes::<BroadcastStage>::new(&cluster_info, &stakes);
// All nodes with contact-info should be in the index.
// Excluding this node itself.
assert_eq!(cluster_nodes.index.len() + 1, nodes.len());
assert_eq!(cluster_nodes.compat_index.len() + 1, nodes.len());
// Staked nodes with no contact-info should be included.
assert!(cluster_nodes.nodes.len() > nodes.len());
// Assert that all nodes keep their contact-info.
@ -653,9 +656,9 @@ mod tests {
}
let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(&stakes));
assert_eq!(peers_and_stakes.len(), peers.len());
assert_eq!(cluster_nodes.index.len(), peers.len());
assert_eq!(cluster_nodes.compat_index.len(), peers.len());
for (i, node) in cluster_nodes
.index
.compat_index
.iter()
.map(|(_, i)| &cluster_nodes.nodes[*i])
.enumerate()