From 32312f3c165e0407bff4f53b69624ca12a7964e1 Mon Sep 17 00:00:00 2001
From: Sagar Dhawan
Date: Wed, 9 Oct 2019 13:11:19 -0700
Subject: [PATCH] Do not retransmit Repair responses (#6284)

* Do not retransmit Repair responses

* Add a test

* Refactor neighboring functionality
---
 core/src/cluster_info.rs      |  1 +
 core/src/packet.rs            |  1 +
 core/src/replicator.rs        |  4 +-
 core/src/retransmit_stage.rs  | 84 ++++++++++++++++++++++++++++++++
 core/src/shred_fetch_stage.rs | 91 ++++++++++++++++++++++++++---------
 core/src/tvu.rs               | 13 +++--
 6 files changed, 163 insertions(+), 31 deletions(-)

diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index 0a850e776..d302e3572 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -456,6 +456,7 @@ impl ClusterInfo {
             .filter_map(|x| x.value.contact_info())
             .filter(|x| x.id != me)
             .filter(|x| ContactInfo::is_valid_address(&x.tvu))
+            .filter(|x| ContactInfo::is_valid_address(&x.tvu_forwards))
             .cloned()
             .collect()
     }
diff --git a/core/src/packet.rs b/core/src/packet.rs
index b2faac352..6dc966f34 100644
--- a/core/src/packet.rs
+++ b/core/src/packet.rs
@@ -41,6 +41,7 @@ pub const PACKETS_BATCH_SIZE: usize = (PACKETS_PER_BATCH * PACKET_DATA_SIZE);
 pub struct Meta {
     pub size: usize,
     pub forward: bool,
+    pub repair: bool,
     pub addr: [u16; 8],
     pub port: u16,
     pub v6: bool,
diff --git a/core/src/replicator.rs b/core/src/replicator.rs
index b1d012059..4ce451c3f 100644
--- a/core/src/replicator.rs
+++ b/core/src/replicator.rs
@@ -253,9 +253,8 @@ impl Replicator {
         };
 
         let repair_socket = Arc::new(node.sockets.repair);
-        let mut blob_sockets: Vec<Arc<UdpSocket>> =
+        let blob_sockets: Vec<Arc<UdpSocket>> =
             node.sockets.tvu.into_iter().map(Arc::new).collect();
-        blob_sockets.push(repair_socket.clone());
         let blob_forward_sockets: Vec<Arc<UdpSocket>> = node
             .sockets
             .tvu_forwards
@@ -266,6 +265,7 @@ impl Replicator {
         let fetch_stage = ShredFetchStage::new(
             blob_sockets,
             blob_forward_sockets,
+            repair_socket.clone(),
             &blob_fetch_sender,
             &exit,
         );
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index aa4750938..32c55392b 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -58,6 +58,11 @@ pub fn retransmit(
     let mut compute_turbine_peers_total = 0;
     for packets in packet_v {
         for packet in &packets.packets {
+            // skip repair packets
+            if packet.meta.repair {
+                total_packets -= 1;
+                continue;
+            }
             let mut compute_turbine_peers = Measure::start("turbine_start");
             let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
                 &me.id,
@@ -229,3 +234,82 @@ impl Service for RetransmitStage {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::blocktree::create_new_tmp_ledger;
+    use crate::blocktree_processor::{process_blocktree, ProcessOptions};
+    use crate::contact_info::ContactInfo;
+    use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
+    use crate::packet::{Meta, Packet, Packets};
+    use solana_netutil::find_available_port_in_range;
+    use solana_sdk::pubkey::Pubkey;
+
+    #[test]
+    fn test_skip_repair() {
+        let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(123);
+        let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
+        let blocktree = Blocktree::open(&ledger_path).unwrap();
+        let opts = ProcessOptions {
+            full_leader_cache: true,
+            ..ProcessOptions::default()
+        };
+        let (bank_forks, _, cached_leader_schedule) =
+            process_blocktree(&genesis_block, &blocktree, None, opts).unwrap();
+        let leader_schedule_cache = Arc::new(cached_leader_schedule);
+        let bank_forks = Arc::new(RwLock::new(bank_forks));
+
+        let mut me = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
+        let port = find_available_port_in_range((8000, 10000)).unwrap();
+        let me_retransmit = UdpSocket::bind(format!("127.0.0.1:{}", port)).unwrap();
+        // need to make sure tvu and tvu_forwards are valid addresses
+        me.tvu_forwards = me_retransmit.local_addr().unwrap();
+        let port = find_available_port_in_range((8000, 10000)).unwrap();
+        me.tvu = UdpSocket::bind(format!("127.0.0.1:{}", port))
+            .unwrap()
+            .local_addr()
+            .unwrap();
+
+        let other = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
+        let mut cluster_info = ClusterInfo::new_with_invalid_keypair(other);
+        cluster_info.insert_info(me);
+
+        let retransmit_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").unwrap());
+        let cluster_info = Arc::new(RwLock::new(cluster_info));
+
+        let (retransmit_sender, retransmit_receiver) = channel();
+        let t_retransmit = retransmitter(
+            retransmit_socket,
+            bank_forks,
+            &leader_schedule_cache,
+            cluster_info,
+            retransmit_receiver,
+        );
+        let _thread_hdls = vec![t_retransmit];
+
+        let packets = Packets::new(vec![Packet::default()]);
+        // it should send this over the sockets.
+        retransmit_sender.send(packets).unwrap();
+        let mut packets = Packets::new(vec![]);
+        packets.recv_from(&me_retransmit).unwrap();
+        assert_eq!(packets.packets.len(), 1);
+        assert_eq!(packets.packets[0].meta.repair, false);
+
+        let repair = Packet {
+            meta: Meta {
+                repair: true,
+                ..Meta::default()
+            },
+            ..Packet::default()
+        };
+
+        // send 1 repair and 1 "regular" packet so that we don't block forever on the recv_from
+        let packets = Packets::new(vec![repair, Packet::default()]);
+        retransmit_sender.send(packets).unwrap();
+        let mut packets = Packets::new(vec![]);
+        packets.recv_from(&me_retransmit).unwrap();
+        assert_eq!(packets.packets.len(), 1);
+        assert_eq!(packets.packets[0].meta.repair, false);
+    }
+}
diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs
index be03f36b7..8c3922878 100644
--- a/core/src/shred_fetch_stage.rs
+++ b/core/src/shred_fetch_stage.rs
@@ -1,8 +1,10 @@
 //! The `shred_fetch_stage` pulls shreds from UDP sockets and sends them to a channel.
 
+use crate::cuda_runtime::PinnedVec;
+use crate::packet::Packet;
 use crate::recycler::Recycler;
 use crate::service::Service;
-use crate::streamer::{self, PacketSender};
+use crate::streamer::{self, PacketReceiver, PacketSender};
 use std::net::UdpSocket;
 use std::sync::atomic::AtomicBool;
 use std::sync::mpsc::channel;
@@ -14,9 +16,54 @@ pub struct ShredFetchStage {
 }
 
 impl ShredFetchStage {
+    // updates packets received on a channel and sends them on another channel
+    fn modify_packets<F>(recvr: PacketReceiver, sendr: PacketSender, modify: F)
+    where
+        F: Fn(&mut Packet),
+    {
+        while let Some(mut p) = recvr.iter().next() {
+            p.packets.iter_mut().for_each(|p| modify(p));
+            if sendr.send(p).is_err() {
+                break;
+            }
+        }
+    }
+
+    fn packet_modifier<F>(
+        sockets: Vec<Arc<UdpSocket>>,
+        exit: &Arc<AtomicBool>,
+        sender: PacketSender,
+        recycler: Recycler<PinnedVec<Packet>>,
+        modify: F,
+    ) -> (Vec<JoinHandle<()>>, JoinHandle<()>)
+    where
+        F: Fn(&mut Packet) + Send + 'static,
+    {
+        let (packet_sender, packet_receiver) = channel();
+        let streamers = sockets
+            .into_iter()
+            .map(|s| {
+                streamer::receiver(
+                    s,
+                    &exit,
+                    packet_sender.clone(),
+                    recycler.clone(),
+                    "packet_modifier",
+                )
+            })
+            .collect();
+        let sender = sender.clone();
+        let modifier_hdl = Builder::new()
+            .name("solana-tvu-fetch-stage-packet-modifier".to_string())
+            .spawn(move || Self::modify_packets(packet_receiver, sender, modify))
+            .unwrap();
+        (streamers, modifier_hdl)
+    }
+
     pub fn new(
         sockets: Vec<Arc<UdpSocket>>,
         forward_sockets: Vec<Arc<UdpSocket>>,
+        repair_socket: Arc<UdpSocket>,
         sender: &PacketSender,
         exit: &Arc<AtomicBool>,
     ) -> Self {
@@ -31,32 +78,28 @@ impl ShredFetchStage {
             )
         });
 
-        let (forward_sender, forward_receiver) = channel();
-        let tvu_forwards_threads = forward_sockets.into_iter().map(|socket| {
-            streamer::receiver(
-                socket,
-                &exit,
-                forward_sender.clone(),
-                recycler.clone(),
-                "shred_fetch_stage",
-            )
-        });
+        let (tvu_forwards_threads, fwd_thread_hdl) = Self::packet_modifier(
+            forward_sockets,
+            &exit,
+            sender.clone(),
+            recycler.clone(),
+            |p| p.meta.forward = true,
+        );
 
-        let sender = sender.clone();
-        let fwd_thread_hdl = Builder::new()
-            .name("solana-tvu-fetch-stage-fwd-rcvr".to_string())
-            .spawn(move || {
-                while let Some(mut p) = forward_receiver.iter().next() {
-                    p.packets.iter_mut().for_each(|p| p.meta.forward = true);
-                    if sender.send(p).is_err() {
-                        break;
-                    }
-                }
-            })
-            .unwrap();
+        let (repair_receiver, repair_handler) = Self::packet_modifier(
+            vec![repair_socket],
+            &exit,
+            sender.clone(),
+            recycler.clone(),
+            |p| p.meta.repair = true,
+        );
 
-        let mut thread_hdls: Vec<_> = tvu_threads.chain(tvu_forwards_threads).collect();
+        let mut thread_hdls: Vec<_> = tvu_threads
+            .chain(tvu_forwards_threads.into_iter())
+            .collect();
+        thread_hdls.extend(repair_receiver.into_iter());
         thread_hdls.push(fwd_thread_hdl);
+        thread_hdls.push(repair_handler);
 
         Self { thread_hdls }
     }
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index bc9c3e4c1..01e2a8426 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -99,13 +99,16 @@ impl Tvu {
         let (fetch_sender, fetch_receiver) = channel();
 
         let repair_socket = Arc::new(repair_socket);
-        let mut fetch_sockets: Vec<Arc<UdpSocket>> =
-            fetch_sockets.into_iter().map(Arc::new).collect();
-        fetch_sockets.push(repair_socket.clone());
+        let fetch_sockets: Vec<Arc<UdpSocket>> = fetch_sockets.into_iter().map(Arc::new).collect();
         let forward_sockets: Vec<Arc<UdpSocket>> =
             tvu_forward_sockets.into_iter().map(Arc::new).collect();
-        let fetch_stage =
-            ShredFetchStage::new(fetch_sockets, forward_sockets, &fetch_sender, &exit);
+        let fetch_stage = ShredFetchStage::new(
+            fetch_sockets,
+            forward_sockets,
+            repair_socket.clone(),
+            &fetch_sender,
+            &exit,
+        );
 
         //TODO
         //the packets coming out of blob_receiver need to be sent to the GPU and verified
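
Note on the mechanism: the patch works in two halves. ShredFetchStage gives the repair socket its own streamer-plus-modifier pipeline that stamps `meta.repair = true` on every packet it ingests, and `retransmit()` drops stamped packets before computing turbine peers, so repair responses (data this node explicitly asked for) are never re-broadcast to the cluster. Below is a minimal, self-contained sketch of that tag-at-ingest / skip-at-egress pattern. It uses only the standard library; the `Meta`/`Packet`/`Packets` types and `modify_packets` function here are simplified stand-ins for the crate's own, not the actual implementation.

    use std::sync::mpsc::{channel, Receiver, Sender};

    // Simplified stand-ins for the crate's packet types.
    #[derive(Default)]
    struct Meta {
        repair: bool,
    }

    #[derive(Default)]
    struct Packet {
        meta: Meta,
    }

    struct Packets {
        packets: Vec<Packet>,
    }

    // Mirrors ShredFetchStage::modify_packets: take each batch off one
    // channel, apply `modify` to every packet, and forward the batch on.
    fn modify_packets<F>(recvr: Receiver<Packets>, sendr: Sender<Packets>, modify: F)
    where
        F: Fn(&mut Packet),
    {
        while let Ok(mut batch) = recvr.recv() {
            batch.packets.iter_mut().for_each(|p| modify(p));
            if sendr.send(batch).is_err() {
                break;
            }
        }
    }

    fn main() {
        let (fetch_sender, fetch_receiver) = channel();
        let (out_sender, out_receiver) = channel();

        // Pretend one batch arrived on the repair socket.
        fetch_sender
            .send(Packets { packets: vec![Packet::default()] })
            .unwrap();
        drop(fetch_sender); // close the channel so modify_packets returns

        // Tag-at-ingest: everything from the repair pipeline gets stamped.
        modify_packets(fetch_receiver, out_sender, |p| p.meta.repair = true);

        // Skip-at-egress: the retransmit side ignores stamped packets, as
        // the `if packet.meta.repair { continue; }` block in the patch does.
        let batch = out_receiver.recv().unwrap();
        let to_retransmit: Vec<Packet> = batch
            .packets
            .into_iter()
            .filter(|p| !p.meta.repair)
            .collect();
        assert!(to_retransmit.is_empty());
        println!("repair responses dropped before retransmit");
    }

Tagging at the socket boundary keeps the retransmit hot path to one boolean test per packet, and avoids inspecting shred payloads to work out whether a shred arrived via turbine or via a repair request this node made itself.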