removes legacy retransmit tests (#29817)
Retransmit code has moved to core/src/cluster_nodes.rs and has been significantly revised. gossip/tests/cluster_info.rs tests the old code, which is no longer relevant.
This commit is contained in:
parent d75303f541
commit 590b75140f
@@ -199,19 +199,6 @@ impl<T: BloomHashIndex> AtomicBloom<T> {
             bit.store(0u64, Ordering::Relaxed);
         });
     }
-
-    // Only for tests and simulations.
-    pub fn mock_clone(&self) -> Self {
-        Self {
-            keys: self.keys.clone(),
-            bits: self
-                .bits
-                .iter()
-                .map(|v| AtomicU64::new(v.load(Ordering::Relaxed)))
-                .collect(),
-            ..*self
-        }
-    }
 }
 
 impl<T: BloomHashIndex> From<AtomicBloom<T>> for Bloom<T> {
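The mock_clone removed above exists because AtomicU64 does not implement Clone, so the filter's bits vector cannot be derive-cloned: each cell is snapshotted with load and rewrapped in a fresh atomic. The copy is per-cell, not a consistent snapshot of the whole filter, which is why it was fenced off for tests and simulations. A minimal std-only sketch of the same pattern (Counters is an illustrative type, not from this codebase):

    use std::sync::atomic::{AtomicU64, Ordering};

    struct Counters {
        slots: Vec<AtomicU64>,
    }

    impl Counters {
        // Atomics are not Clone: snapshot each cell and wrap the value in a
        // fresh atomic. Relaxed suffices since cells are copied independently;
        // this is not a consistent snapshot across cells.
        fn mock_clone(&self) -> Self {
            Self {
                slots: self
                    .slots
                    .iter()
                    .map(|v| AtomicU64::new(v.load(Ordering::Relaxed)))
                    .collect(),
            }
        }
    }

    fn main() {
        let a = Counters { slots: (0..4u64).map(AtomicU64::new).collect() };
        a.slots[2].store(7, Ordering::Relaxed);
        let b = a.mock_clone();
        assert_eq!(b.slots[2].load(Ordering::Relaxed), 7);
    }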
@@ -432,33 +432,6 @@ impl ClusterInfo {
         me
     }
-
-    // Should only be used by tests and simulations
-    pub fn clone_with_id(&self, new_id: &Pubkey) -> Self {
-        let mut my_contact_info = self.my_contact_info.read().unwrap().clone();
-        my_contact_info.id = *new_id;
-        ClusterInfo {
-            gossip: self.gossip.mock_clone(),
-            keypair: RwLock::new(self.keypair.read().unwrap().clone()),
-            entrypoints: RwLock::new(self.entrypoints.read().unwrap().clone()),
-            outbound_budget: self.outbound_budget.clone_non_atomic(),
-            my_contact_info: RwLock::new(my_contact_info),
-            ping_cache: Mutex::new(self.ping_cache.lock().unwrap().mock_clone()),
-            stats: GossipStats::default(),
-            socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
-            local_message_pending_push_queue: Mutex::new(
-                self.local_message_pending_push_queue
-                    .lock()
-                    .unwrap()
-                    .clone(),
-            ),
-            contact_debug_interval: self.contact_debug_interval,
-            instance: RwLock::new(NodeInstance::new(&mut thread_rng(), *new_id, timestamp())),
-            contact_info_path: PathBuf::default(),
-            contact_save_interval: 0, // disabled
-            ..*self
-        }
-    }
 
     pub fn set_contact_debug_interval(&mut self, new: u64) {
         self.contact_debug_interval = new;
     }
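clone_with_id leans on Rust's struct-update syntax: the fields named explicitly are rebuilt, and ..*self copies every remaining field out of the borrowed value, which compiles only when those remaining fields are Copy. A std-only sketch of the mechanic (Node is an illustrative type, not from this codebase):

    struct Node {
        name: String,        // not Copy, so Node as a whole is not Copy
        fanout: usize,       // Copy
        debug_interval: u64, // Copy
    }

    impl Node {
        fn with_name(&self, name: String) -> Self {
            // `..*self` copies the remaining fields (fanout, debug_interval)
            // out of the borrowed value; this compiles because those fields
            // are Copy, even though Node itself is not.
            Self { name, ..*self }
        }
    }

    fn main() {
        let a = Node { name: "a".into(), fanout: 200, debug_interval: 0 };
        let b = a.with_name("b".into());
        assert_eq!((b.fanout, b.name.as_str()), (200, "b"));
    }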
@@ -642,24 +642,6 @@ impl Crds {
     pub(crate) fn take_stats(&self) -> CrdsStats {
         std::mem::take(&mut self.stats.lock().unwrap())
     }
-
-    // Only for tests and simulations.
-    pub(crate) fn mock_clone(&self) -> Self {
-        Self {
-            table: self.table.clone(),
-            cursor: self.cursor,
-            shards: self.shards.clone(),
-            nodes: self.nodes.clone(),
-            votes: self.votes.clone(),
-            epoch_slots: self.epoch_slots.clone(),
-            duplicate_shreds: self.duplicate_shreds.clone(),
-            records: self.records.clone(),
-            entries: self.entries.clone(),
-            purged: self.purged.clone(),
-            shred_versions: self.shred_versions.clone(),
-            stats: Mutex::<CrdsStats>::default(),
-        }
-    }
 }
 
 impl Default for CrdsDataStats {
@@ -320,16 +320,6 @@ impl CrdsGossip {
         self.pull.purge_failed_inserts(now);
         rv
     }
-
-    // Only for tests and simulations.
-    pub(crate) fn mock_clone(&self) -> Self {
-        let crds = self.crds.read().unwrap().mock_clone();
-        Self {
-            crds: RwLock::new(crds),
-            push: self.push.mock_clone(),
-            pull: self.pull.mock_clone(),
-        }
-    }
 }
 
 // Returns active and valid cluster nodes to gossip with.
@@ -569,16 +569,6 @@ impl CrdsGossipPull {
             stats.success,
         )
     }
-
-    // Only for tests and simulations.
-    pub(crate) fn mock_clone(&self) -> Self {
-        let failed_inserts = self.failed_inserts.read().unwrap().clone();
-        Self {
-            failed_inserts: RwLock::new(failed_inserts),
-            num_pulls: AtomicUsize::new(self.num_pulls.load(Ordering::Relaxed)),
-            ..*self
-        }
-    }
 }
 
 #[cfg(test)]
@@ -274,22 +274,6 @@ impl CrdsGossipPush {
         let mut active_set = self.active_set.write().unwrap();
         active_set.rotate(&mut rng, self.push_fanout * 3, network_size, &nodes, stakes)
     }
-
-    // Only for tests and simulations.
-    pub(crate) fn mock_clone(&self) -> Self {
-        let active_set = self.active_set.read().unwrap().mock_clone();
-        let received_cache = self.received_cache.lock().unwrap().mock_clone();
-        let crds_cursor = *self.crds_cursor.lock().unwrap();
-        Self {
-            active_set: RwLock::new(active_set),
-            received_cache: Mutex::new(received_cache),
-            crds_cursor: Mutex::new(crds_cursor),
-            num_total: AtomicUsize::new(self.num_total.load(Ordering::Relaxed)),
-            num_old: AtomicUsize::new(self.num_old.load(Ordering::Relaxed)),
-            num_pushes: AtomicUsize::new(self.num_pushes.load(Ordering::Relaxed)),
-            ..*self
-        }
-    }
 }
 
 #[cfg(test)]
@@ -242,27 +242,6 @@ impl PingCache {
         (check, ping)
     }
-
-    // Only for tests and simulations.
-    pub(crate) fn mock_clone(&self) -> Self {
-        let mut clone = Self {
-            ttl: self.ttl,
-            rate_limit_delay: self.rate_limit_delay,
-            pings: LruCache::new(self.pings.cap()),
-            pongs: LruCache::new(self.pongs.cap()),
-            pending_cache: LruCache::new(self.pending_cache.cap()),
-        };
-        for (k, v) in self.pings.iter().rev() {
-            clone.pings.put(*k, *v);
-        }
-        for (k, v) in self.pongs.iter().rev() {
-            clone.pongs.put(*k, *v);
-        }
-        for (k, v) in self.pending_cache.iter().rev() {
-            clone.pending_cache.put(*k, *v);
-        }
-        clone
-    }
 
     /// Only for tests and simulations.
     pub fn mock_pong(&mut self, node: Pubkey, socket: SocketAddr, now: Instant) {
         self.pongs.put((node, socket), now);
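The copy loops above rebuild each LruCache by replaying entries from least- to most-recently used: the crate's iter() walks most-recent first, so .rev() makes the final put the most-recent entry, and the clone evicts in the same order as the original. A sketch of the same idea, assuming the lru crate API the diff itself calls (LruCache::new(usize), cap(), put(), recency-ordered iter()):

    use lru::LruCache;
    use std::hash::Hash;

    // Copy an LRU cache while preserving its recency (and thus eviction) order.
    fn clone_lru<K: Hash + Eq + Copy, V: Clone>(src: &LruCache<K, V>) -> LruCache<K, V> {
        let mut dst = LruCache::new(src.cap());
        // iter() yields most-recently used first; replay in reverse so the
        // last put() into `dst` is the most-recently used entry.
        for (k, v) in src.iter().rev() {
            dst.put(*k, v.clone());
        }
        dst
    }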
@@ -101,19 +101,6 @@ impl PushActiveSet {
     fn get_entry(&self, stake: Option<&u64>) -> &PushActiveSetEntry {
         &self.0[get_stake_bucket(stake)]
     }
-
-    // Only for tests and simulations.
-    pub(crate) fn mock_clone(&self) -> Self {
-        Self(std::array::from_fn(|k| {
-            PushActiveSetEntry(
-                self.0[k]
-                    .0
-                    .iter()
-                    .map(|(&node, filter)| (node, filter.mock_clone()))
-                    .collect(),
-            )
-        }))
-    }
 }
 
 impl PushActiveSetEntry {
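The rebuild above uses std::array::from_fn, which constructs a fixed-size array by invoking a closure once per index, in order; handy when the element type (here, a bucket of per-node filters) is not Clone. A tiny std-only illustration:

    fn main() {
        // The closure runs once for each index 0..5, in order.
        let squares: [u64; 5] = std::array::from_fn(|i| (i as u64) * (i as u64));
        assert_eq!(squares, [0, 1, 4, 9, 16]);
    }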
@@ -55,8 +55,8 @@ impl ReceivedCache {
             .flatten()
     }
 
-    // Only for tests and simulations.
-    pub(crate) fn mock_clone(&self) -> Self {
+    #[cfg(test)]
+    fn mock_clone(&self) -> Self {
         let mut cache = LruCache::new(self.0.cap());
         for (&origin, entry) in self.0.iter().rev() {
             cache.put(origin, entry.clone());
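This is the one hunk in the commit that rewrites instead of deletes: ReceivedCache::mock_clone survives, but is demoted from pub(crate) to a private method behind #[cfg(test)], so it is compiled only for the crate's own test builds and disappears from normal builds. A small sketch of that gating (Cache is an illustrative type):

    pub struct Cache(Vec<u8>);

    impl Cache {
        pub fn len(&self) -> usize {
            self.0.len()
        }

        // Compiled only under `cargo test` for this crate; invisible to
        // normal builds and to downstream crates.
        #[cfg(test)]
        fn mock_clone(&self) -> Self {
            Cache(self.0.clone())
        }
    }

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn clone_matches() {
            let c = Cache(vec![1, 2, 3]);
            assert_eq!(c.mock_clone().len(), c.len());
        }
    }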
@@ -1,314 +0,0 @@
(The deleted file, gossip/tests/cluster_info.rs, reproduced in full; every line below is a removal.)

#![allow(clippy::integer_arithmetic)]
use {
    crossbeam_channel::{unbounded, Receiver, Sender, TryRecvError},
    itertools::Itertools,
    rand::SeedableRng,
    rand_chacha::ChaChaRng,
    rayon::{iter::ParallelIterator, prelude::*},
    serial_test::serial,
    solana_gossip::{
        cluster_info::{compute_retransmit_peers, ClusterInfo},
        legacy_contact_info::LegacyContactInfo as ContactInfo,
        weighted_shuffle::WeightedShuffle,
    },
    solana_sdk::{pubkey::Pubkey, signer::keypair::Keypair},
    solana_streamer::socket::SocketAddrSpace,
    std::{
        collections::{HashMap, HashSet},
        sync::{Arc, Mutex},
        time::Instant,
    },
};

type Nodes = HashMap<Pubkey, (bool, HashSet<i32>, Receiver<(i32, bool)>)>;

fn num_threads() -> usize {
    num_cpus::get()
}

/// Finds the node with the given id and records the shred as received.
fn find_insert_shred(id: &Pubkey, shred: i32, batches: &mut [Nodes]) {
    batches.par_iter_mut().for_each(|batch| {
        if batch.contains_key(id) {
            let _ = batch.get_mut(id).unwrap().1.insert(shred);
        }
    });
}

fn sorted_retransmit_peers_and_stakes(
    cluster_info: &ClusterInfo,
    stakes: Option<&HashMap<Pubkey, u64>>,
) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
    let mut peers = cluster_info.tvu_peers();
    // insert "self" into this list for the layer and neighborhood computation
    peers.push(cluster_info.my_contact_info());
    let stakes_and_index = sorted_stakes_with_index(&peers, stakes);
    (peers, stakes_and_index)
}

fn sorted_stakes_with_index(
    peers: &[ContactInfo],
    stakes: Option<&HashMap<Pubkey, u64>>,
) -> Vec<(u64, usize)> {
    let stakes_and_index: Vec<_> = peers
        .iter()
        .enumerate()
        .map(|(i, c)| {
            // For the stake-weighted shuffle a valid weight is at least 1;
            // weight 0 is assumed to be a missing entry. So make sure stake
            // weights are at least 1.
            let stake = 1.max(
                stakes
                    .as_ref()
                    .map_or(1, |stakes| *stakes.get(&c.id).unwrap_or(&1)),
            );
            (stake, i)
        })
        .sorted_by(|(l_stake, l_info), (r_stake, r_info)| {
            if r_stake == l_stake {
                peers[*r_info].id.cmp(&peers[*l_info].id)
            } else {
                r_stake.cmp(l_stake)
            }
        })
        .collect();

    stakes_and_index
}

fn shuffle_peers_and_index(
    id: &Pubkey,
    peers: &[ContactInfo],
    stakes_and_index: &[(u64, usize)],
    seed: [u8; 32],
) -> (usize, Vec<(u64, usize)>) {
    let shuffled_stakes_and_index = stake_weighted_shuffle(stakes_and_index, seed);
    let self_index = shuffled_stakes_and_index
        .iter()
        .enumerate()
        .find_map(|(i, (_stake, index))| {
            if peers[*index].id == *id {
                Some(i)
            } else {
                None
            }
        })
        .unwrap();
    (self_index, shuffled_stakes_and_index)
}

fn stake_weighted_shuffle(stakes_and_index: &[(u64, usize)], seed: [u8; 32]) -> Vec<(u64, usize)> {
    let mut rng = ChaChaRng::from_seed(seed);
    let stake_weights: Vec<_> = stakes_and_index.iter().map(|(w, _)| *w).collect();
    let shuffle = WeightedShuffle::new("stake_weighted_shuffle", &stake_weights);
    shuffle
        .shuffle(&mut rng)
        .map(|i| stakes_and_index[i])
        .collect()
}
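stake_weighted_shuffle above is fully deterministic: ChaChaRng::from_seed pins the RNG stream, and WeightedShuffle::new(label, weights).shuffle(&mut rng) yields a permutation of indices into the weight slice, biased so heavier weights tend to land early. A usage sketch mirroring those exact calls (the WeightedShuffle signature is assumed to match this commit's solana_gossip, so treat it as a sketch rather than a pinned API):

    use {rand::SeedableRng, rand_chacha::ChaChaRng, solana_gossip::weighted_shuffle::WeightedShuffle};

    fn main() {
        let weights: Vec<u64> = vec![100, 1, 1, 50];
        // A fixed seed makes the "shuffle" reproducible across runs and nodes.
        let mut rng = ChaChaRng::from_seed([7u8; 32]);
        let shuffle = WeightedShuffle::new("demo", &weights);
        // Yields each index in 0..weights.len() exactly once.
        let order: Vec<usize> = shuffle.shuffle(&mut rng).collect();
        println!("{order:?}");
    }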
fn retransmit(
    mut shuffled_nodes: Vec<ContactInfo>,
    senders: &HashMap<Pubkey, Sender<(i32, bool)>>,
    cluster: &ClusterInfo,
    fanout: usize,
    shred: i32,
    retransmit: bool,
) -> i32 {
    let mut seed = [0; 32];
    let mut my_index = 0;
    let mut index = 0;
    shuffled_nodes.retain(|c| {
        if c.id == cluster.id() {
            my_index = index;
            false
        } else {
            index += 1;
            true
        }
    });
    seed[0..4].copy_from_slice(&shred.to_le_bytes());
    let shuffled_indices: Vec<_> = (0..shuffled_nodes.len()).collect();
    let (neighbors, children) = compute_retransmit_peers(fanout, my_index, &shuffled_indices);
    children.into_iter().for_each(|i| {
        let s = senders.get(&shuffled_nodes[i].id).unwrap();
        let _ = s.send((shred, retransmit));
    });

    if retransmit {
        neighbors.into_iter().for_each(|i| {
            let s = senders.get(&shuffled_nodes[i].id).unwrap();
            let _ = s.send((shred, false));
        });
    }

    shred
}
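retransmit models a single hop of the old turbine tree: compute_retransmit_peers(fanout, my_index, &indices) partitions the shuffled index list into this node's neighbors (the rest of its own neighborhood) and children (the slots it is responsible for in the next layer). The test always forwards to children, and forwards to neighbors only on first receipt (the retransmit flag). A call-shape sketch, assuming the pre-removal solana_gossip::cluster_info::compute_retransmit_peers used above:

    use solana_gossip::cluster_info::compute_retransmit_peers;

    fn main() {
        let fanout = 3;
        let indices: Vec<usize> = (0..20).collect();
        for my_index in [0, 4, 13] {
            // neighbors: peers in this node's neighborhood (size <= fanout);
            // children: the nodes this one forwards shreds to in the next layer.
            let (neighbors, children) = compute_retransmit_peers(fanout, my_index, &indices);
            println!("{my_index}: neighbors={neighbors:?} children={children:?}");
        }
    }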
#[allow(clippy::type_complexity)]
fn run_simulation(stakes: &[u64], fanout: usize) {
    let num_threads = num_threads();
    // set timeout to 5 minutes
    let timeout = 60 * 5;

    // describe the leader
    let leader_info = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
    let cluster_info = ClusterInfo::new(
        leader_info.clone(),
        Arc::new(Keypair::new()),
        SocketAddrSpace::Unspecified,
    );

    // setup staked nodes
    let mut staked_nodes = HashMap::new();

    // setup accounts for all nodes (leader has 0 bal)
    let (s, r) = unbounded();
    let senders: Arc<Mutex<HashMap<Pubkey, Sender<(i32, bool)>>>> =
        Arc::new(Mutex::new(HashMap::new()));
    senders.lock().unwrap().insert(leader_info.id, s);
    let mut batches: Vec<Nodes> = Vec::with_capacity(num_threads);
    (0..num_threads).for_each(|_| batches.push(HashMap::new()));
    batches
        .get_mut(0)
        .unwrap()
        .insert(leader_info.id, (false, HashSet::new(), r));
    let range: Vec<_> = (1..=stakes.len()).collect();
    let chunk_size = (stakes.len() + num_threads - 1) / num_threads;
    range.chunks(chunk_size).for_each(|chunk| {
        chunk.iter().for_each(|i| {
            // distribute neighbors across threads to maximize parallel compute
            let batch_ix = *i % batches.len();
            let node = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
            staked_nodes.insert(node.id, stakes[*i - 1]);
            cluster_info.insert_info(node.clone());
            let (s, r) = unbounded();
            batches
                .get_mut(batch_ix)
                .unwrap()
                .insert(node.id, (false, HashSet::new(), r));
            senders.lock().unwrap().insert(node.id, s);
        })
    });
    let c_info = cluster_info.clone_with_id(&cluster_info.id());

    let shreds_len = 100;
    let shuffled_peers: Vec<Vec<ContactInfo>> = (0..shreds_len as i32)
        .map(|i| {
            let mut seed = [0; 32];
            seed[0..4].copy_from_slice(&i.to_le_bytes());
            // TODO: Ideally these should use the new methods in
            // solana_core::cluster_nodes, however that would add a build
            // dependency on solana_core which is not desired.
            let (peers, stakes_and_index) =
                sorted_retransmit_peers_and_stakes(&cluster_info, Some(&staked_nodes));
            let (_, shuffled_stakes_and_indexes) =
                shuffle_peers_and_index(&cluster_info.id(), &peers, &stakes_and_index, seed);
            shuffled_stakes_and_indexes
                .into_iter()
                .map(|(_, i)| peers[i].clone())
                .collect()
        })
        .collect();

    // create some "shreds".
    (0..shreds_len).for_each(|i| {
        let broadcast_table = &shuffled_peers[i];
        find_insert_shred(&broadcast_table[0].id, i as i32, &mut batches);
    });

    assert!(!batches.is_empty());

    // start turbine simulation
    let now = Instant::now();
    batches.par_iter_mut().for_each(|batch| {
        let mut remaining = batch.len();
        let senders: HashMap<_, _> = senders.lock().unwrap().clone();
        while remaining > 0 {
            for (id, (layer1_done, recv, r)) in batch.iter_mut() {
                assert!(
                    now.elapsed().as_secs() < timeout,
                    "Timed out with {remaining:?} remaining nodes"
                );
                let cluster = c_info.clone_with_id(id);
                if !*layer1_done {
                    recv.iter().for_each(|i| {
                        retransmit(
                            shuffled_peers[*i as usize].clone(),
                            &senders,
                            &cluster,
                            fanout,
                            *i,
                            true,
                        );
                    });
                    *layer1_done = true;
                }

                // send and recv
                if recv.len() < shreds_len {
                    loop {
                        match r.try_recv() {
                            Ok((data, retx)) => {
                                if recv.insert(data) {
                                    let _ = retransmit(
                                        shuffled_peers[data as usize].clone(),
                                        &senders,
                                        &cluster,
                                        fanout,
                                        data,
                                        retx,
                                    );
                                }
                                if recv.len() == shreds_len {
                                    remaining -= 1;
                                    break;
                                }
                            }
                            Err(TryRecvError::Disconnected) => break,
                            Err(TryRecvError::Empty) => break,
                        };
                    }
                }
            }
        }
    });
}
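One detail in run_simulation worth a second look: chunk_size is the ceiling-division idiom (n + k - 1) / k, which guarantees every node lands in some chunk even when stakes.len() is not a multiple of num_threads. A one-line check (usize::div_ceil, stable since Rust 1.73, is the std equivalent):

    fn main() {
        let (len, threads) = (10usize, 4usize);
        let chunk_size = (len + threads - 1) / threads; // ceiling division
        assert_eq!(chunk_size, 3);
        assert_eq!(chunk_size, len.div_ceil(threads)); // same result via std
    }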
// It is recommended not to run these tests in parallel: they are resource
// heavy and want all the compute.

// TODO: add tests with network failures.

// Run with a single layer.
#[test]
#[serial]
fn test_retransmit_small() {
    let stakes: Vec<_> = (0..200).collect();
    run_simulation(&stakes, 200);
}

// Make sure at least 2 layers are used.
#[test]
#[serial]
fn test_retransmit_medium() {
    let num_nodes = 2000;
    let stakes: Vec<_> = (0..num_nodes).collect();
    run_simulation(&stakes, 200);
}

// Make sure at least 2 layers are used, but with equal stakes.
#[test]
#[serial]
fn test_retransmit_medium_equal_stakes() {
    let num_nodes = 2000;
    let stakes: Vec<_> = (0..num_nodes).map(|_| 10).collect();
    run_simulation(&stakes, 200);
}

// Scale down the fanout and make sure many layers are used.
#[test]
#[serial]
fn test_retransmit_large() {
    let num_nodes = 4000;
    let stakes: Vec<_> = (0..num_nodes).collect();
    run_simulation(&stakes, 2);
}