From 753bd77b41a5698a077a46b52a6ceccd442c2f6b Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Thu, 10 Oct 2019 15:02:36 -0700 Subject: [PATCH] Use multicast to send retransmit packets (#6319) --- core/src/cluster_info.rs | 33 +++++++++++++++++++-------------- core/src/retransmit_stage.rs | 10 +++++----- core/tests/gossip.rs | 3 +-- 3 files changed, 25 insertions(+), 21 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 95555153b9..a5547e39a8 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -21,7 +21,8 @@ use crate::crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}; use crate::crds_value::{CrdsValue, CrdsValueLabel, EpochSlots, Vote}; use crate::packet::{to_shared_blob, Blob, Packet, SharedBlob}; use crate::repair_service::RepairType; -use crate::result::Result; +use crate::result::{Error, Result}; +use crate::sendmmsg::multicast; use crate::staking_utils; use crate::streamer::{BlobReceiver, BlobSender}; use crate::weighted_shuffle::{weighted_best, weighted_shuffle}; @@ -737,29 +738,33 @@ impl ClusterInfo { /// # Remarks /// We need to avoid having obj locked while doing a io, such as the `send_to` pub fn retransmit_to( - id: &Pubkey, peers: &[&ContactInfo], - packet: &Packet, + packet: &mut Packet, slot_leader_pubkey: Option, s: &UdpSocket, forwarded: bool, ) -> Result<()> { trace!("retransmit orders {}", peers.len()); - let errs: Vec<_> = peers + let dests: Vec<_> = peers .iter() .filter(|v| v.id != slot_leader_pubkey.unwrap_or_default()) - .map(|v| { - let dest = if forwarded { &v.tvu_forwards } else { &v.tvu }; - debug!("{}: retransmit packet to {} {}", id, v.id, *dest,); - s.send_to(&packet.data, dest) - }) + .map(|v| if forwarded { &v.tvu_forwards } else { &v.tvu }) .collect(); - for e in errs { - if let Err(e) = &e { - inc_new_counter_error!("cluster_info-retransmit-send_to_error", 1, 1); - error!("retransmit result {:?}", e); + + let mut sent = 0; + while sent < dests.len() { + match multicast(s, packet, &dests[sent..]) { + Ok(n) => sent += n, + Err(e) => { + inc_new_counter_error!( + "cluster_info-retransmit-send_to_error", + dests.len() - sent, + 1 + ); + error!("retransmit result {:?}", e); + return Err(Error::IO(e)); + } } - e?; } Ok(()) } diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 550b09a5a6..64414db11d 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -67,8 +67,8 @@ fn retransmit( let me = cluster_info.read().unwrap().my_data().clone(); let mut retransmit_total = 0; let mut compute_turbine_peers_total = 0; - for packets in packet_v { - for packet in &packets.packets { + for mut packets in packet_v { + for packet in packets.packets.iter_mut() { // skip repair packets if packet.meta.repair { total_packets -= 1; @@ -100,10 +100,10 @@ fn retransmit( leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref())); let mut retransmit_time = Measure::start("retransmit_to"); if !packet.meta.forward { - ClusterInfo::retransmit_to(&me.id, &neighbors, packet, leader, sock, true)?; - ClusterInfo::retransmit_to(&me.id, &children, packet, leader, sock, false)?; + ClusterInfo::retransmit_to(&neighbors, packet, leader, sock, true)?; + ClusterInfo::retransmit_to(&children, packet, leader, sock, false)?; } else { - ClusterInfo::retransmit_to(&me.id, &children, packet, leader, sock, true)?; + ClusterInfo::retransmit_to(&children, packet, leader, sock, true)?; } retransmit_time.stop(); retransmit_total += retransmit_time.as_ms(); diff --git a/core/tests/gossip.rs b/core/tests/gossip.rs index 9471acb486..c4a53b577b 100644 --- a/core/tests/gossip.rs +++ b/core/tests/gossip.rs @@ -177,9 +177,8 @@ pub fn cluster_info_retransmit() -> result::Result<()> { let mut p = Packet::default(); p.meta.size = 10; let peers = c1.read().unwrap().retransmit_peers(); - let self_id = c1.read().unwrap().id(); let retransmit_peers: Vec<_> = peers.iter().collect(); - ClusterInfo::retransmit_to(&self_id, &retransmit_peers, &p, None, &tn1, false)?; + ClusterInfo::retransmit_to(&retransmit_peers, &mut p, None, &tn1, false)?; let res: Vec<_> = [tn1, tn2, tn3] .into_par_iter() .map(|s| {