Optimize retransmit stage (#6231)
* Optimize retransmit stage * Remove comment * Fix test * Skip iteration to fixup 0 stakes
This commit is contained in:
parent
b5f7a4bff9
commit
23ea8ae56b
|
@ -482,48 +482,17 @@ impl ClusterInfo {
|
|||
&& !ContactInfo::is_valid_address(&contact_info.tpu)
|
||||
}
|
||||
|
||||
fn stake_weighted_shuffle<S: std::hash::BuildHasher>(
|
||||
peers: &[ContactInfo],
|
||||
stakes: Option<&HashMap<Pubkey, u64, S>>,
|
||||
rng: ChaChaRng,
|
||||
) -> Vec<(u64, ContactInfo)> {
|
||||
let (stake_weights, peers_with_stakes): (Vec<_>, Vec<_>) = peers
|
||||
.iter()
|
||||
.map(|c| {
|
||||
let stake = stakes.map_or(0, |stakes| *stakes.get(&c.id).unwrap_or(&0));
|
||||
// For stake weighted shuffle a valid weight is atleast 1. Weight 0 is
|
||||
// assumed to be missing entry. So let's make sure stake weights are atleast 1
|
||||
(cmp::max(1, stake), (stake, c.clone()))
|
||||
})
|
||||
.sorted_by(|(_, (l_stake, l_info)), (_, (r_stake, r_info))| {
|
||||
if r_stake == l_stake {
|
||||
r_info.id.cmp(&l_info.id)
|
||||
} else {
|
||||
r_stake.cmp(&l_stake)
|
||||
}
|
||||
})
|
||||
.unzip();
|
||||
|
||||
let shuffle = weighted_shuffle(stake_weights, rng);
|
||||
|
||||
let mut out: Vec<(u64, ContactInfo)> = shuffle
|
||||
.iter()
|
||||
.map(|x| peers_with_stakes[*x].clone())
|
||||
.collect();
|
||||
|
||||
out.dedup();
|
||||
out
|
||||
}
|
||||
|
||||
fn peers_and_stakes<S: std::hash::BuildHasher>(
|
||||
fn sorted_stakes_with_index<S: std::hash::BuildHasher>(
|
||||
peers: &[ContactInfo],
|
||||
stakes: Option<&HashMap<Pubkey, u64, S>>,
|
||||
) -> Vec<(u64, usize)> {
|
||||
let mut stakes_and_index: Vec<_> = peers
|
||||
let stakes_and_index: Vec<_> = peers
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, c)| {
|
||||
let stake = stakes.map_or(0, |stakes| *stakes.get(&c.id).unwrap_or(&0));
|
||||
// For stake weighted shuffle a valid weight is atleast 1. Weight 0 is
|
||||
// assumed to be missing entry. So let's make sure stake weights are atleast 1
|
||||
let stake = 1.max(stakes.map_or(1, |stakes| *stakes.get(&c.id).unwrap_or(&1)));
|
||||
(stake, i)
|
||||
})
|
||||
.sorted_by(|(l_stake, l_info), (r_stake, r_info)| {
|
||||
|
@ -535,36 +504,50 @@ impl ClusterInfo {
|
|||
})
|
||||
.collect();
|
||||
|
||||
// For stake weighted shuffle a valid weight is atleast 1. Weight 0 is
|
||||
// assumed to be missing entry. So let's make sure stake weights are atleast 1
|
||||
stakes_and_index
|
||||
.iter_mut()
|
||||
.for_each(|(stake, _)| *stake = cmp::max(1, *stake));
|
||||
}
|
||||
|
||||
stakes_and_index
|
||||
fn stake_weighted_shuffle(
|
||||
stakes_and_index: &[(u64, usize)],
|
||||
rng: ChaChaRng,
|
||||
) -> Vec<(u64, usize)> {
|
||||
let stake_weights = stakes_and_index.iter().map(|(w, _)| *w).collect();
|
||||
|
||||
let shuffle = weighted_shuffle(stake_weights, rng);
|
||||
|
||||
shuffle.iter().map(|x| stakes_and_index[*x]).collect()
|
||||
}
|
||||
|
||||
// Return sorted_retransmit_peers(including self) and their stakes
|
||||
pub fn sorted_retransmit_peers_and_stakes(
|
||||
&self,
|
||||
stakes: Option<&HashMap<Pubkey, u64>>,
|
||||
) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
|
||||
let mut peers = self.retransmit_peers();
|
||||
// insert "self" into this list for the layer and neighborhood computation
|
||||
peers.push(self.lookup(&self.id()).unwrap().clone());
|
||||
let stakes_and_index = ClusterInfo::sorted_stakes_with_index(&peers, stakes);
|
||||
(peers, stakes_and_index)
|
||||
}
|
||||
|
||||
/// Return sorted Retransmit peers and index of `Self.id()` as if it were in that list
|
||||
pub fn shuffle_peers_and_index<S: std::hash::BuildHasher>(
|
||||
pub fn shuffle_peers_and_index(
|
||||
&self,
|
||||
stakes: Option<&HashMap<Pubkey, u64, S>>,
|
||||
peers: &[ContactInfo],
|
||||
stakes_and_index: &[(u64, usize)],
|
||||
rng: ChaChaRng,
|
||||
) -> (usize, Vec<ContactInfo>) {
|
||||
let mut peers = self.retransmit_peers();
|
||||
peers.push(self.lookup(&self.id()).unwrap().clone());
|
||||
let contacts_and_stakes: Vec<_> = ClusterInfo::stake_weighted_shuffle(&peers, stakes, rng);
|
||||
let mut index = 0;
|
||||
let peers: Vec<_> = contacts_and_stakes
|
||||
.into_iter()
|
||||
) -> (usize, Vec<(u64, usize)>) {
|
||||
let shuffled_stakes_and_index = ClusterInfo::stake_weighted_shuffle(stakes_and_index, rng);
|
||||
let mut self_index = 0;
|
||||
shuffled_stakes_and_index
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, (_, peer))| {
|
||||
if peer.id == self.id() {
|
||||
index = i;
|
||||
.for_each(|(i, (_stake, index))| {
|
||||
if peers[*index].id == self.id() {
|
||||
self_index = i;
|
||||
}
|
||||
peer
|
||||
})
|
||||
.collect();
|
||||
(index, peers)
|
||||
});
|
||||
(self_index, shuffled_stakes_and_index)
|
||||
}
|
||||
|
||||
/// compute broadcast table
|
||||
|
@ -716,8 +699,8 @@ impl ClusterInfo {
|
|||
) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
|
||||
let mut peers = self.tvu_peers();
|
||||
peers.dedup();
|
||||
let peers_and_stakes = ClusterInfo::peers_and_stakes(&peers, stakes);
|
||||
(peers, peers_and_stakes)
|
||||
let stakes_and_index = ClusterInfo::sorted_stakes_with_index(&peers, stakes);
|
||||
(peers, stakes_and_index)
|
||||
}
|
||||
|
||||
/// broadcast messages from the leader to layer 1 nodes
|
||||
|
@ -755,13 +738,13 @@ impl ClusterInfo {
|
|||
/// We need to avoid having obj locked while doing a io, such as the `send_to`
|
||||
pub fn retransmit_to(
|
||||
obj: &Arc<RwLock<Self>>,
|
||||
peers: &[ContactInfo],
|
||||
peers: &[&ContactInfo],
|
||||
packet: &Packet,
|
||||
slot_leader_pubkey: Option<Pubkey>,
|
||||
s: &UdpSocket,
|
||||
forwarded: bool,
|
||||
) -> Result<()> {
|
||||
let (me, orders): (ContactInfo, &[ContactInfo]) = {
|
||||
let (me, orders): (ContactInfo, &[&ContactInfo]) = {
|
||||
// copy to avoid locking during IO
|
||||
let s = obj.read().unwrap();
|
||||
(s.my_data().clone(), peers)
|
||||
|
@ -1524,27 +1507,28 @@ impl ClusterInfo {
|
|||
/// 1.2 - If no, then figure out what layer the node is in and who the neighbors are and only broadcast to them
|
||||
/// 1 - also check if there are nodes in the next layer and repeat the layer 1 to layer 2 logic
|
||||
|
||||
/// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake (Bank Balance)
|
||||
/// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake
|
||||
pub fn compute_retransmit_peers(
|
||||
fanout: usize,
|
||||
my_index: usize,
|
||||
peers: Vec<ContactInfo>,
|
||||
) -> (Vec<ContactInfo>, Vec<ContactInfo>) {
|
||||
stakes_and_index: Vec<usize>,
|
||||
) -> (Vec<usize>, Vec<usize>) {
|
||||
//calc num_layers and num_neighborhoods using the total number of nodes
|
||||
let (num_layers, layer_indices) = ClusterInfo::describe_data_plane(peers.len(), fanout);
|
||||
let (num_layers, layer_indices) =
|
||||
ClusterInfo::describe_data_plane(stakes_and_index.len(), fanout);
|
||||
|
||||
if num_layers <= 1 {
|
||||
/* single layer data plane */
|
||||
(peers, vec![])
|
||||
(stakes_and_index, vec![])
|
||||
} else {
|
||||
//find my layer
|
||||
let locality = ClusterInfo::localize(&layer_indices, fanout, my_index);
|
||||
let upper_bound = cmp::min(locality.neighbor_bounds.1, peers.len());
|
||||
let neighbors = peers[locality.neighbor_bounds.0..upper_bound].to_vec();
|
||||
let upper_bound = cmp::min(locality.neighbor_bounds.1, stakes_and_index.len());
|
||||
let neighbors = stakes_and_index[locality.neighbor_bounds.0..upper_bound].to_vec();
|
||||
let mut children = Vec::new();
|
||||
for ix in locality.next_layer_peers {
|
||||
if let Some(peer) = peers.get(ix) {
|
||||
children.push(peer.clone());
|
||||
if let Some(peer) = stakes_and_index.get(ix) {
|
||||
children.push(*peer);
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
|
|
|
@ -41,15 +41,29 @@ fn retransmit(
|
|||
let r_bank = bank_forks.read().unwrap().working_bank();
|
||||
let bank_epoch = r_bank.get_stakers_epoch(r_bank.slot());
|
||||
let mut peers_len = 0;
|
||||
let stakes = staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch);
|
||||
let (peers, stakes_and_index) = cluster_info
|
||||
.read()
|
||||
.unwrap()
|
||||
.sorted_retransmit_peers_and_stakes(stakes.as_ref());
|
||||
for packet in &packets.packets {
|
||||
let (my_index, mut peers) = cluster_info.read().unwrap().shuffle_peers_and_index(
|
||||
staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch).as_ref(),
|
||||
let (my_index, mut shuffled_stakes_and_index) =
|
||||
cluster_info.read().unwrap().shuffle_peers_and_index(
|
||||
&peers,
|
||||
&stakes_and_index,
|
||||
ChaChaRng::from_seed(packet.meta.seed),
|
||||
);
|
||||
peers_len = cmp::max(peers_len, peers.len());
|
||||
peers.remove(my_index);
|
||||
peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
|
||||
shuffled_stakes_and_index.remove(my_index);
|
||||
// split off the indexes, we don't need the stakes anymore
|
||||
let indexes = shuffled_stakes_and_index
|
||||
.into_iter()
|
||||
.map(|(_, index)| index)
|
||||
.collect();
|
||||
|
||||
let (neighbors, children) = compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, peers);
|
||||
let (neighbors, children) = compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, indexes);
|
||||
let neighbors: Vec<_> = neighbors.into_iter().map(|index| &peers[index]).collect();
|
||||
let children: Vec<_> = children.into_iter().map(|index| &peers[index]).collect();
|
||||
|
||||
let leader = leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref()));
|
||||
if !packet.meta.forward {
|
||||
|
|
|
@ -50,15 +50,16 @@ fn retransmit(
|
|||
}
|
||||
});
|
||||
seed[0..4].copy_from_slice(&blob.to_le_bytes());
|
||||
let (neighbors, children) = compute_retransmit_peers(fanout, my_index, shuffled_nodes);
|
||||
children.iter().for_each(|p| {
|
||||
let s = senders.get(&p.id).unwrap();
|
||||
let shuffled_indices = (0..shuffled_nodes.len()).collect();
|
||||
let (neighbors, children) = compute_retransmit_peers(fanout, my_index, shuffled_indices);
|
||||
children.into_iter().for_each(|i| {
|
||||
let s = senders.get(&shuffled_nodes[i].id).unwrap();
|
||||
let _ = s.send((blob, retransmit));
|
||||
});
|
||||
|
||||
if retransmit {
|
||||
neighbors.iter().for_each(|p| {
|
||||
let s = senders.get(&p.id).unwrap();
|
||||
neighbors.into_iter().for_each(|i| {
|
||||
let s = senders.get(&shuffled_nodes[i].id).unwrap();
|
||||
let _ = s.send((blob, false));
|
||||
});
|
||||
}
|
||||
|
@ -113,8 +114,17 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
|
|||
.map(|i| {
|
||||
let mut seed = [0; 32];
|
||||
seed[0..4].copy_from_slice(&i.to_le_bytes());
|
||||
let (_, peers) = cluster_info
|
||||
.shuffle_peers_and_index(Some(&staked_nodes), ChaChaRng::from_seed(seed));
|
||||
let (peers, stakes_and_index) =
|
||||
cluster_info.sorted_retransmit_peers_and_stakes(Some(&staked_nodes));
|
||||
let (_, shuffled_stakes_and_indexes) = cluster_info.shuffle_peers_and_index(
|
||||
&peers,
|
||||
&stakes_and_index,
|
||||
ChaChaRng::from_seed(seed),
|
||||
);
|
||||
let peers = shuffled_stakes_and_indexes
|
||||
.into_iter()
|
||||
.map(|(_, i)| peers[i].clone())
|
||||
.collect();
|
||||
peers
|
||||
})
|
||||
.collect();
|
||||
|
|
|
@ -177,7 +177,8 @@ pub fn cluster_info_retransmit() -> result::Result<()> {
|
|||
let mut p = Packet::default();
|
||||
p.meta.size = 10;
|
||||
let peers = c1.read().unwrap().retransmit_peers();
|
||||
ClusterInfo::retransmit_to(&c1, &peers, &p, None, &tn1, false)?;
|
||||
let retransmit_peers: Vec<_> = peers.iter().collect();
|
||||
ClusterInfo::retransmit_to(&c1, &retransmit_peers, &p, None, &tn1, false)?;
|
||||
let res: Vec<_> = [tn1, tn2, tn3]
|
||||
.into_par_iter()
|
||||
.map(|s| {
|
||||
|
|
Loading…
Reference in New Issue