removes legacy compatibility turbine peers shuffle code
This commit is contained in:
parent
d0b850cdd9
commit
2b718d00b0
|
@ -417,37 +417,12 @@ pub fn make_test_cluster<R: Rng>(
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use {
|
use super::*;
|
||||||
super::*,
|
|
||||||
solana_gossip::deprecated::{
|
|
||||||
shuffle_peers_and_index, sorted_retransmit_peers_and_stakes, sorted_stakes_with_index,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
// Legacy methods copied for testing backward compatibility.
|
|
||||||
|
|
||||||
fn get_broadcast_peers(
|
|
||||||
cluster_info: &ClusterInfo,
|
|
||||||
stakes: Option<&HashMap<Pubkey, u64>>,
|
|
||||||
) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
|
|
||||||
let mut peers = cluster_info.tvu_peers();
|
|
||||||
let peers_and_stakes = stake_weight_peers(&mut peers, stakes);
|
|
||||||
(peers, peers_and_stakes)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn stake_weight_peers(
|
|
||||||
peers: &mut Vec<ContactInfo>,
|
|
||||||
stakes: Option<&HashMap<Pubkey, u64>>,
|
|
||||||
) -> Vec<(u64, usize)> {
|
|
||||||
peers.dedup();
|
|
||||||
sorted_stakes_with_index(peers, stakes)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_cluster_nodes_retransmit() {
|
fn test_cluster_nodes_retransmit() {
|
||||||
let mut rng = rand::thread_rng();
|
let mut rng = rand::thread_rng();
|
||||||
let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 1_000, None);
|
let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 1_000, None);
|
||||||
let this_node = cluster_info.my_contact_info();
|
|
||||||
// ClusterInfo::tvu_peers excludes the node itself.
|
// ClusterInfo::tvu_peers excludes the node itself.
|
||||||
assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
|
assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
|
||||||
let cluster_nodes = new_cluster_nodes::<RetransmitStage>(&cluster_info, &stakes);
|
let cluster_nodes = new_cluster_nodes::<RetransmitStage>(&cluster_info, &stakes);
|
||||||
|
@ -471,25 +446,6 @@ mod tests {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let (peers, stakes_and_index) =
|
|
||||||
sorted_retransmit_peers_and_stakes(&cluster_info, Some(&stakes));
|
|
||||||
assert_eq!(stakes_and_index.len(), peers.len());
|
|
||||||
let slot_leader = nodes[1..].choose(&mut rng).unwrap().id;
|
|
||||||
// Remove slot leader from peers indices.
|
|
||||||
let stakes_and_index: Vec<_> = stakes_and_index
|
|
||||||
.into_iter()
|
|
||||||
.filter(|(_stake, index)| peers[*index].id != slot_leader)
|
|
||||||
.collect();
|
|
||||||
assert_eq!(peers.len(), stakes_and_index.len() + 1);
|
|
||||||
let mut shred_seed = [0u8; 32];
|
|
||||||
rng.fill(&mut shred_seed[..]);
|
|
||||||
let (self_index, shuffled_peers_and_stakes) =
|
|
||||||
shuffle_peers_and_index(&this_node.id, &peers, &stakes_and_index, shred_seed);
|
|
||||||
let shuffled_index: Vec<_> = shuffled_peers_and_stakes
|
|
||||||
.into_iter()
|
|
||||||
.map(|(_, index)| index)
|
|
||||||
.collect();
|
|
||||||
assert_eq!(this_node.id, peers[shuffled_index[self_index]].id);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -520,7 +476,5 @@ mod tests {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(&stakes));
|
|
||||||
assert_eq!(peers_and_stakes.len(), peers.len());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,11 +1,4 @@
|
||||||
use {
|
use solana_sdk::clock::Slot;
|
||||||
crate::{
|
|
||||||
cluster_info::ClusterInfo, contact_info::ContactInfo, weighted_shuffle::weighted_shuffle,
|
|
||||||
},
|
|
||||||
itertools::Itertools,
|
|
||||||
solana_sdk::{clock::Slot, pubkey::Pubkey},
|
|
||||||
std::collections::HashMap,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, AbiExample, AbiEnumVisitor)]
|
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, AbiExample, AbiEnumVisitor)]
|
||||||
enum CompressionType {
|
enum CompressionType {
|
||||||
|
@ -26,74 +19,3 @@ pub(crate) struct EpochIncompleteSlots {
|
||||||
compression: CompressionType,
|
compression: CompressionType,
|
||||||
compressed_list: Vec<u8>,
|
compressed_list: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Legacy methods copied for testing backward compatibility.
|
|
||||||
|
|
||||||
pub fn sorted_retransmit_peers_and_stakes(
|
|
||||||
cluster_info: &ClusterInfo,
|
|
||||||
stakes: Option<&HashMap<Pubkey, u64>>,
|
|
||||||
) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
|
|
||||||
let mut peers = cluster_info.tvu_peers();
|
|
||||||
// insert "self" into this list for the layer and neighborhood computation
|
|
||||||
peers.push(cluster_info.my_contact_info());
|
|
||||||
let stakes_and_index = sorted_stakes_with_index(&peers, stakes);
|
|
||||||
(peers, stakes_and_index)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn sorted_stakes_with_index(
|
|
||||||
peers: &[ContactInfo],
|
|
||||||
stakes: Option<&HashMap<Pubkey, u64>>,
|
|
||||||
) -> Vec<(u64, usize)> {
|
|
||||||
let stakes_and_index: Vec<_> = peers
|
|
||||||
.iter()
|
|
||||||
.enumerate()
|
|
||||||
.map(|(i, c)| {
|
|
||||||
// For stake weighted shuffle a valid weight is atleast 1. Weight 0 is
|
|
||||||
// assumed to be missing entry. So let's make sure stake weights are atleast 1
|
|
||||||
let stake = 1.max(
|
|
||||||
stakes
|
|
||||||
.as_ref()
|
|
||||||
.map_or(1, |stakes| *stakes.get(&c.id).unwrap_or(&1)),
|
|
||||||
);
|
|
||||||
(stake, i)
|
|
||||||
})
|
|
||||||
.sorted_by(|(l_stake, l_info), (r_stake, r_info)| {
|
|
||||||
if r_stake == l_stake {
|
|
||||||
peers[*r_info].id.cmp(&peers[*l_info].id)
|
|
||||||
} else {
|
|
||||||
r_stake.cmp(l_stake)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
stakes_and_index
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn shuffle_peers_and_index(
|
|
||||||
id: &Pubkey,
|
|
||||||
peers: &[ContactInfo],
|
|
||||||
stakes_and_index: &[(u64, usize)],
|
|
||||||
seed: [u8; 32],
|
|
||||||
) -> (usize, Vec<(u64, usize)>) {
|
|
||||||
let shuffled_stakes_and_index = stake_weighted_shuffle(stakes_and_index, seed);
|
|
||||||
let self_index = shuffled_stakes_and_index
|
|
||||||
.iter()
|
|
||||||
.enumerate()
|
|
||||||
.find_map(|(i, (_stake, index))| {
|
|
||||||
if peers[*index].id == *id {
|
|
||||||
Some(i)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
(self_index, shuffled_stakes_and_index)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn stake_weighted_shuffle(stakes_and_index: &[(u64, usize)], seed: [u8; 32]) -> Vec<(u64, usize)> {
|
|
||||||
let stake_weights = stakes_and_index.iter().map(|(w, _)| *w);
|
|
||||||
|
|
||||||
let shuffle = weighted_shuffle(stake_weights, seed);
|
|
||||||
|
|
||||||
shuffle.iter().map(|x| stakes_and_index[*x]).collect()
|
|
||||||
}
|
|
||||||
|
|
|
@ -13,7 +13,7 @@ pub mod crds_gossip_pull;
|
||||||
pub mod crds_gossip_push;
|
pub mod crds_gossip_push;
|
||||||
pub mod crds_shards;
|
pub mod crds_shards;
|
||||||
pub mod crds_value;
|
pub mod crds_value;
|
||||||
pub mod deprecated;
|
mod deprecated;
|
||||||
pub mod duplicate_shred;
|
pub mod duplicate_shred;
|
||||||
pub mod epoch_slots;
|
pub mod epoch_slots;
|
||||||
pub mod gossip_error;
|
pub mod gossip_error;
|
||||||
|
|
|
@ -1,12 +1,13 @@
|
||||||
#![allow(clippy::integer_arithmetic)]
|
#![allow(clippy::integer_arithmetic)]
|
||||||
use {
|
use {
|
||||||
crossbeam_channel::{unbounded, Receiver, Sender, TryRecvError},
|
crossbeam_channel::{unbounded, Receiver, Sender, TryRecvError},
|
||||||
|
itertools::Itertools,
|
||||||
rayon::{iter::ParallelIterator, prelude::*},
|
rayon::{iter::ParallelIterator, prelude::*},
|
||||||
serial_test::serial,
|
serial_test::serial,
|
||||||
solana_gossip::{
|
solana_gossip::{
|
||||||
cluster_info::{compute_retransmit_peers, ClusterInfo},
|
cluster_info::{compute_retransmit_peers, ClusterInfo},
|
||||||
contact_info::ContactInfo,
|
contact_info::ContactInfo,
|
||||||
deprecated::{shuffle_peers_and_index, sorted_retransmit_peers_and_stakes},
|
weighted_shuffle::weighted_shuffle,
|
||||||
},
|
},
|
||||||
solana_sdk::{pubkey::Pubkey, signer::keypair::Keypair},
|
solana_sdk::{pubkey::Pubkey, signer::keypair::Keypair},
|
||||||
solana_streamer::socket::SocketAddrSpace,
|
solana_streamer::socket::SocketAddrSpace,
|
||||||
|
@ -32,6 +33,75 @@ fn find_insert_shred(id: &Pubkey, shred: i32, batches: &mut [Nodes]) {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn sorted_retransmit_peers_and_stakes(
|
||||||
|
cluster_info: &ClusterInfo,
|
||||||
|
stakes: Option<&HashMap<Pubkey, u64>>,
|
||||||
|
) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
|
||||||
|
let mut peers = cluster_info.tvu_peers();
|
||||||
|
// insert "self" into this list for the layer and neighborhood computation
|
||||||
|
peers.push(cluster_info.my_contact_info());
|
||||||
|
let stakes_and_index = sorted_stakes_with_index(&peers, stakes);
|
||||||
|
(peers, stakes_and_index)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sorted_stakes_with_index(
|
||||||
|
peers: &[ContactInfo],
|
||||||
|
stakes: Option<&HashMap<Pubkey, u64>>,
|
||||||
|
) -> Vec<(u64, usize)> {
|
||||||
|
let stakes_and_index: Vec<_> = peers
|
||||||
|
.iter()
|
||||||
|
.enumerate()
|
||||||
|
.map(|(i, c)| {
|
||||||
|
// For stake weighted shuffle a valid weight is atleast 1. Weight 0 is
|
||||||
|
// assumed to be missing entry. So let's make sure stake weights are atleast 1
|
||||||
|
let stake = 1.max(
|
||||||
|
stakes
|
||||||
|
.as_ref()
|
||||||
|
.map_or(1, |stakes| *stakes.get(&c.id).unwrap_or(&1)),
|
||||||
|
);
|
||||||
|
(stake, i)
|
||||||
|
})
|
||||||
|
.sorted_by(|(l_stake, l_info), (r_stake, r_info)| {
|
||||||
|
if r_stake == l_stake {
|
||||||
|
peers[*r_info].id.cmp(&peers[*l_info].id)
|
||||||
|
} else {
|
||||||
|
r_stake.cmp(l_stake)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
stakes_and_index
|
||||||
|
}
|
||||||
|
|
||||||
|
fn shuffle_peers_and_index(
|
||||||
|
id: &Pubkey,
|
||||||
|
peers: &[ContactInfo],
|
||||||
|
stakes_and_index: &[(u64, usize)],
|
||||||
|
seed: [u8; 32],
|
||||||
|
) -> (usize, Vec<(u64, usize)>) {
|
||||||
|
let shuffled_stakes_and_index = stake_weighted_shuffle(stakes_and_index, seed);
|
||||||
|
let self_index = shuffled_stakes_and_index
|
||||||
|
.iter()
|
||||||
|
.enumerate()
|
||||||
|
.find_map(|(i, (_stake, index))| {
|
||||||
|
if peers[*index].id == *id {
|
||||||
|
Some(i)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
(self_index, shuffled_stakes_and_index)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stake_weighted_shuffle(stakes_and_index: &[(u64, usize)], seed: [u8; 32]) -> Vec<(u64, usize)> {
|
||||||
|
let stake_weights = stakes_and_index.iter().map(|(w, _)| *w);
|
||||||
|
|
||||||
|
let shuffle = weighted_shuffle(stake_weights, seed);
|
||||||
|
|
||||||
|
shuffle.iter().map(|x| stakes_and_index[*x]).collect()
|
||||||
|
}
|
||||||
|
|
||||||
fn retransmit(
|
fn retransmit(
|
||||||
mut shuffled_nodes: Vec<ContactInfo>,
|
mut shuffled_nodes: Vec<ContactInfo>,
|
||||||
senders: &HashMap<Pubkey, Sender<(i32, bool)>>,
|
senders: &HashMap<Pubkey, Sender<(i32, bool)>>,
|
||||||
|
|
Loading…
Reference in New Issue