From ff0e623d302baa0348f6d3158cfb08a978a282bc Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 20 May 2021 13:41:12 -0400 Subject: [PATCH] removes the nested for loop from retransmit-stage The code can be simplified by just flattening the vector of packets. --- core/src/retransmit_stage.rs | 178 +++++++++++++++++------------------ 1 file changed, 88 insertions(+), 90 deletions(-) diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 8f09cbdbec..f513dba29b 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -330,10 +330,10 @@ fn retransmit( let packets = r_lock.recv_timeout(RECV_TIMEOUT)?; let mut timer_start = Measure::start("retransmit"); let mut total_packets = packets.packets.len(); - let mut packet_v = vec![packets]; + let mut packets = vec![packets]; while let Ok(nq) = r_lock.try_recv() { total_packets += nq.packets.len(); - packet_v.push(nq); + packets.push(nq); if total_packets >= MAX_PACKET_BATCH_SIZE { break; } @@ -382,95 +382,93 @@ fn retransmit( let mut packets_by_slot: HashMap = HashMap::new(); let mut packets_by_source: HashMap = HashMap::new(); let mut max_slot = 0; - for packets in packet_v { - for packet in packets.packets.iter() { - // skip discarded packets and repair packets - if packet.meta.discard { - total_packets -= 1; - discard_total += 1; - continue; - } - if packet.meta.repair { - total_packets -= 1; - repair_total += 1; - continue; - } - let shred_slot = match check_if_already_received(packet, shreds_received) { - Some(slot) => slot, - None => continue, - }; - max_slot = max_slot.max(shred_slot); - - if let Some(rpc_subscriptions) = rpc_subscriptions { - if check_if_first_shred_received(shred_slot, first_shreds_received, &root_bank) { - rpc_subscriptions.notify_slot_update(SlotUpdate::FirstShredReceived { - slot: shred_slot, - timestamp: timestamp(), - }); - } - } - - let mut compute_turbine_peers = Measure::start("turbine_start"); - let stakes_and_index = get_retransmit_peers( - my_id, - shred_slot, - leader_schedule_cache, - r_bank.deref(), - r_epoch_stakes_cache.deref(), - ); - let (my_index, shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index( - &my_id, - &r_epoch_stakes_cache.peers, - &stakes_and_index, - packet.meta.seed, - ); - // If the node is on the critical path (i.e. the first node in each - // neighborhood), then we expect that the packet arrives at tvu - // socket as opposed to tvu-forwards. If this is not the case, then - // the turbine broadcast/retransmit tree mismatch across nodes. - if packet.meta.forward == (my_index % DATA_PLANE_FANOUT == 0) { - retransmit_tree_mismatch += 1; - } - peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len()); - // split off the indexes, we don't need the stakes anymore - let indexes: Vec<_> = shuffled_stakes_and_index - .into_iter() - .map(|(_, index)| index) - .collect(); - - let (neighbors, children) = - compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, &indexes); - let neighbors: Vec<_> = neighbors - .into_iter() - .filter_map(|index| { - let peer = &r_epoch_stakes_cache.peers[index]; - if peer.id == my_id { - None - } else { - Some(peer) - } - }) - .collect(); - let children: Vec<_> = children - .into_iter() - .map(|index| &r_epoch_stakes_cache.peers[index]) - .collect(); - compute_turbine_peers.stop(); - compute_turbine_peers_total += compute_turbine_peers.as_us(); - - *packets_by_slot.entry(packet.meta.slot).or_insert(0) += 1; - *packets_by_source - .entry(packet.meta.addr().to_string()) - .or_insert(0) += 1; - - let mut retransmit_time = Measure::start("retransmit_to"); - if !packet.meta.forward { - ClusterInfo::retransmit_to(&neighbors, packet, sock, true)?; - } - ClusterInfo::retransmit_to(&children, packet, sock, packet.meta.forward)?; - retransmit_time.stop(); - retransmit_total += retransmit_time.as_us(); + for packet in packets.iter().flat_map(|p| p.packets.iter()) { + // skip discarded packets and repair packets + if packet.meta.discard { + total_packets -= 1; + discard_total += 1; + continue; } + if packet.meta.repair { + total_packets -= 1; + repair_total += 1; + continue; + } + let shred_slot = match check_if_already_received(packet, shreds_received) { + Some(slot) => slot, + None => continue, + }; + max_slot = max_slot.max(shred_slot); + + if let Some(rpc_subscriptions) = rpc_subscriptions { + if check_if_first_shred_received(shred_slot, first_shreds_received, &root_bank) { + rpc_subscriptions.notify_slot_update(SlotUpdate::FirstShredReceived { + slot: shred_slot, + timestamp: timestamp(), + }); + } + } + + let mut compute_turbine_peers = Measure::start("turbine_start"); + let stakes_and_index = get_retransmit_peers( + my_id, + shred_slot, + leader_schedule_cache, + r_bank.deref(), + r_epoch_stakes_cache.deref(), + ); + let (my_index, shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index( + &my_id, + &r_epoch_stakes_cache.peers, + &stakes_and_index, + packet.meta.seed, + ); + // If the node is on the critical path (i.e. the first node in each + // neighborhood), then we expect that the packet arrives at tvu socket + // as opposed to tvu-forwards. If this is not the case, then the + // turbine broadcast/retransmit tree is mismatched across nodes. + if packet.meta.forward == (my_index % DATA_PLANE_FANOUT == 0) { + retransmit_tree_mismatch += 1; + } + peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len()); + // split off the indexes, we don't need the stakes anymore + let indexes: Vec<_> = shuffled_stakes_and_index + .into_iter() + .map(|(_, index)| index) + .collect(); + debug_assert_eq!(my_id, r_epoch_stakes_cache.peers[indexes[my_index]].id); + + let (neighbors, children) = compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, &indexes); + let neighbors: Vec<_> = neighbors + .into_iter() + .filter_map(|index| { + let peer = &r_epoch_stakes_cache.peers[index]; + if peer.id == my_id { + None + } else { + Some(peer) + } + }) + .collect(); + let children: Vec<_> = children + .into_iter() + .map(|index| &r_epoch_stakes_cache.peers[index]) + .collect(); + compute_turbine_peers.stop(); + compute_turbine_peers_total += compute_turbine_peers.as_us(); + + *packets_by_slot.entry(packet.meta.slot).or_default() += 1; + *packets_by_source + .entry(packet.meta.addr().to_string()) + .or_default() += 1; + + let mut retransmit_time = Measure::start("retransmit_to"); + if !packet.meta.forward { + ClusterInfo::retransmit_to(&neighbors, packet, sock, true)?; + } + ClusterInfo::retransmit_to(&children, packet, sock, packet.meta.forward)?; + retransmit_time.stop(); + retransmit_total += retransmit_time.as_us(); } max_slots.retransmit.fetch_max(max_slot, Ordering::Relaxed); timer_start.stop();