Do not retransmit Repair responses (#6284)
* Do not retransmit Repair responses
* Add a test
* Refactor neighboring functionality
parent 95d15dc720
commit 32312f3c16
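The shape of the change, end to end: packets read from the repair socket get tagged with a new `meta.repair` flag at fetch time, and the retransmit loop drops tagged packets instead of re-broadcasting them through turbine. A minimal, self-contained sketch of that flow, using toy stand-ins for the real `Packet`/`Meta` types shown in the hunks below:

// Toy mirrors of the packet types from this diff, not the real ones.
#[derive(Default)]
struct Meta {
    repair: bool,
}

#[derive(Default)]
struct Packet {
    meta: Meta,
}

// Fetch side: everything read from the repair socket gets tagged.
fn tag_repair(packets: &mut [Packet]) {
    packets.iter_mut().for_each(|p| p.meta.repair = true);
}

// Retransmit side: tagged packets are dropped rather than re-broadcast,
// since repair responses are point-to-point, not turbine traffic.
fn retransmit(packets: &[Packet]) -> usize {
    packets.iter().filter(|p| !p.meta.repair).count()
}

fn main() {
    let mut batch = vec![Packet::default(), Packet::default()];
    tag_repair(&mut batch[1..]);
    assert_eq!(retransmit(&batch), 1); // only the non-repair packet goes out
}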
@@ -456,6 +456,7 @@ impl ClusterInfo {
            .filter_map(|x| x.value.contact_info())
            .filter(|x| x.id != me)
            .filter(|x| ContactInfo::is_valid_address(&x.tvu))
+           .filter(|x| ContactInfo::is_valid_address(&x.tvu_forwards))
            .cloned()
            .collect()
    }
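A note on the filter added above: retransmit can target a peer's tvu or tvu_forwards port, so a peer advertising an unusable tvu_forwards address would silently drop forwarded shreds. A hedged sketch of what this kind of address validation typically rejects (`is_valid` here is a stand-in, not `ContactInfo::is_valid_address` itself):

use std::net::SocketAddr;

// Stand-in for the spirit of ContactInfo::is_valid_address:
// reject unspecified ("0.0.0.0") addresses and zero ports.
fn is_valid(addr: &SocketAddr) -> bool {
    !addr.ip().is_unspecified() && addr.port() != 0
}

fn main() {
    let good: SocketAddr = "127.0.0.1:8001".parse().unwrap();
    let bad: SocketAddr = "0.0.0.0:0".parse().unwrap();
    assert!(is_valid(&good));
    assert!(!is_valid(&bad));
}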
@@ -41,6 +41,7 @@ pub const PACKETS_BATCH_SIZE: usize = (PACKETS_PER_BATCH * PACKET_DATA_SIZE);
 pub struct Meta {
     pub size: usize,
     pub forward: bool,
+    pub repair: bool,
     pub addr: [u16; 8],
     pub port: u16,
     pub v6: bool,
@@ -253,9 +253,8 @@ impl Replicator {
         };

         let repair_socket = Arc::new(node.sockets.repair);
-        let mut blob_sockets: Vec<Arc<UdpSocket>> =
+        let blob_sockets: Vec<Arc<UdpSocket>> =
             node.sockets.tvu.into_iter().map(Arc::new).collect();
-        blob_sockets.push(repair_socket.clone());
         let blob_forward_sockets: Vec<Arc<UdpSocket>> = node
             .sockets
             .tvu_forwards
@@ -266,6 +265,7 @@ impl Replicator {
         let fetch_stage = ShredFetchStage::new(
             blob_sockets,
             blob_forward_sockets,
+            repair_socket.clone(),
             &blob_fetch_sender,
             &exit,
         );
@@ -58,6 +58,11 @@ pub fn retransmit(
     let mut compute_turbine_peers_total = 0;
     for packets in packet_v {
         for packet in &packets.packets {
+            // skip repair packets
+            if packet.meta.repair {
+                total_packets -= 1;
+                continue;
+            }
             let mut compute_turbine_peers = Measure::start("turbine_start");
             let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
                 &me.id,
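On the `total_packets -= 1` above: `total_packets` presumably feeds the stage's retransmit stats, so skipped repair packets are subtracted to keep the datapoint counting only packets actually retransmitted. The accounting in isolation, with booleans standing in for `packet.meta.repair`:

// Booleans stand in for packet.meta.repair.
fn count_retransmitted(batch: &[bool]) -> usize {
    let mut total_packets = batch.len();
    for &is_repair in batch {
        if is_repair {
            total_packets -= 1;
            continue;
        }
        // ...compute turbine peers and send the packet here...
    }
    total_packets
}

fn main() {
    assert_eq!(count_retransmitted(&[false, true, false]), 2);
}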
@@ -229,3 +234,82 @@ impl Service for RetransmitStage {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::blocktree::create_new_tmp_ledger;
+    use crate::blocktree_processor::{process_blocktree, ProcessOptions};
+    use crate::contact_info::ContactInfo;
+    use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
+    use crate::packet::{Meta, Packet, Packets};
+    use solana_netutil::find_available_port_in_range;
+    use solana_sdk::pubkey::Pubkey;
+
+    #[test]
+    fn test_skip_repair() {
+        let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(123);
+        let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
+        let blocktree = Blocktree::open(&ledger_path).unwrap();
+        let opts = ProcessOptions {
+            full_leader_cache: true,
+            ..ProcessOptions::default()
+        };
+        let (bank_forks, _, cached_leader_schedule) =
+            process_blocktree(&genesis_block, &blocktree, None, opts).unwrap();
+        let leader_schedule_cache = Arc::new(cached_leader_schedule);
+        let bank_forks = Arc::new(RwLock::new(bank_forks));
+
+        let mut me = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
+        let port = find_available_port_in_range((8000, 10000)).unwrap();
+        let me_retransmit = UdpSocket::bind(format!("127.0.0.1:{}", port)).unwrap();
+        // need to make sure tvu and tpu are valid addresses
+        me.tvu_forwards = me_retransmit.local_addr().unwrap();
+        let port = find_available_port_in_range((8000, 10000)).unwrap();
+        me.tvu = UdpSocket::bind(format!("127.0.0.1:{}", port))
+            .unwrap()
+            .local_addr()
+            .unwrap();
+
+        let other = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
+        let mut cluster_info = ClusterInfo::new_with_invalid_keypair(other);
+        cluster_info.insert_info(me);
+
+        let retransmit_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").unwrap());
+        let cluster_info = Arc::new(RwLock::new(cluster_info));
+
+        let (retransmit_sender, retransmit_receiver) = channel();
+        let t_retransmit = retransmitter(
+            retransmit_socket,
+            bank_forks,
+            &leader_schedule_cache,
+            cluster_info,
+            retransmit_receiver,
+        );
+        let _thread_hdls = vec![t_retransmit];
+
+        let packets = Packets::new(vec![Packet::default()]);
+        // it should send this over the sockets.
+        retransmit_sender.send(packets).unwrap();
+        let mut packets = Packets::new(vec![]);
+        packets.recv_from(&me_retransmit).unwrap();
+        assert_eq!(packets.packets.len(), 1);
+        assert_eq!(packets.packets[0].meta.repair, false);
+
+        let repair = Packet {
+            meta: Meta {
+                repair: true,
+                ..Meta::default()
+            },
+            ..Packet::default()
+        };
+
+        // send 1 repair and 1 "regular" packet so that we don't block forever on the recv_from
+        let packets = Packets::new(vec![repair, Packet::default()]);
+        retransmit_sender.send(packets).unwrap();
+        let mut packets = Packets::new(vec![]);
+        packets.recv_from(&me_retransmit).unwrap();
+        assert_eq!(packets.packets.len(), 1);
+        assert_eq!(packets.packets[0].meta.repair, false);
+    }
+}
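Why the test pairs the repair packet with a regular one: `recv_from` blocks, and a correctly filtered repair packet never arrives, so sending only the repair packet would hang the test; the paired regular packet guarantees one delivery, and the `meta.repair == false` assertion shows it was the regular one that came through. A generic sketch of the defensive alternative, a plain std read timeout (not code from this change):

use std::net::UdpSocket;
use std::time::Duration;

fn main() -> std::io::Result<()> {
    let socket = UdpSocket::bind("127.0.0.1:0")?;
    // With a read timeout, a filtered-out packet shows up as an error
    // instead of a hang.
    socket.set_read_timeout(Some(Duration::from_millis(200)))?;
    let mut buf = [0u8; 32];
    match socket.recv_from(&mut buf) {
        Ok((n, from)) => println!("got {} bytes from {}", n, from),
        Err(e) => println!("nothing arrived: {}", e), // expected: timeout
    }
    Ok(())
}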
@@ -1,8 +1,10 @@
 //! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel.

 use crate::cuda_runtime::PinnedVec;
+use crate::packet::Packet;
 use crate::recycler::Recycler;
 use crate::service::Service;
-use crate::streamer::{self, PacketSender};
+use crate::streamer::{self, PacketReceiver, PacketSender};
 use std::net::UdpSocket;
 use std::sync::atomic::AtomicBool;
+use std::sync::mpsc::channel;
@@ -14,9 +16,54 @@ pub struct ShredFetchStage {
 }

 impl ShredFetchStage {
+    // updates packets received on a channel and sends them on another channel
+    fn modify_packets<F>(recvr: PacketReceiver, sendr: PacketSender, modify: F)
+    where
+        F: Fn(&mut Packet),
+    {
+        while let Some(mut p) = recvr.iter().next() {
+            p.packets.iter_mut().for_each(|p| modify(p));
+            if sendr.send(p).is_err() {
+                break;
+            }
+        }
+    }
+
+    fn packet_modifier<F>(
+        sockets: Vec<Arc<UdpSocket>>,
+        exit: &Arc<AtomicBool>,
+        sender: PacketSender,
+        recycler: Recycler<PinnedVec<Packet>>,
+        modify: F,
+    ) -> (Vec<JoinHandle<()>>, JoinHandle<()>)
+    where
+        F: Fn(&mut Packet) + Send + 'static,
+    {
+        let (packet_sender, packet_receiver) = channel();
+        let streamers = sockets
+            .into_iter()
+            .map(|s| {
+                streamer::receiver(
+                    s,
+                    &exit,
+                    packet_sender.clone(),
+                    recycler.clone(),
+                    "packet_modifier",
+                )
+            })
+            .collect();
+        let sender = sender.clone();
+        let modifier_hdl = Builder::new()
+            .name("solana-tvu-fetch-stage-packet-modifier".to_string())
+            .spawn(move || Self::modify_packets(packet_receiver, sender, modify))
+            .unwrap();
+        (streamers, modifier_hdl)
+    }
+
     pub fn new(
         sockets: Vec<Arc<UdpSocket>>,
         forward_sockets: Vec<Arc<UdpSocket>>,
+        repair_socket: Arc<UdpSocket>,
         sender: &PacketSender,
         exit: &Arc<AtomicBool>,
     ) -> Self {
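The `packet_modifier` helper above captures the fan-in pattern that `new` uses twice below: several socket receivers feed one internal channel, and a single thread applies a closure to every packet before forwarding it downstream. A simplified, self-contained sketch of the same shape, with plain std threads and integers instead of streamers and `Packets` batches:

use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

// One thread that applies `modify` to each item and forwards it,
// mirroring modify_packets/packet_modifier above.
fn modifier<T, F>(recvr: Receiver<T>, sendr: Sender<T>, modify: F) -> JoinHandle<()>
where
    T: Send + 'static,
    F: Fn(&mut T) + Send + 'static,
{
    thread::spawn(move || {
        while let Ok(mut item) = recvr.recv() {
            modify(&mut item);
            if sendr.send(item).is_err() {
                break; // downstream hung up
            }
        }
    })
}

fn main() {
    let (in_tx, in_rx) = channel();
    let (out_tx, out_rx) = channel();
    let hdl = modifier(in_rx, out_tx, |x: &mut i32| *x += 100);
    in_tx.send(1).unwrap();
    drop(in_tx); // close the input so the modifier thread exits
    assert_eq!(out_rx.recv().unwrap(), 101);
    hdl.join().unwrap();
}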
@@ -31,32 +78,28 @@ impl ShredFetchStage {
                 )
             });

-        let (forward_sender, forward_receiver) = channel();
-        let tvu_forwards_threads = forward_sockets.into_iter().map(|socket| {
-            streamer::receiver(
-                socket,
-                &exit,
-                forward_sender.clone(),
-                recycler.clone(),
-                "shred_fetch_stage",
-            )
-        });
+        let (tvu_forwards_threads, fwd_thread_hdl) = Self::packet_modifier(
+            forward_sockets,
+            &exit,
+            sender.clone(),
+            recycler.clone(),
+            |p| p.meta.forward = true,
+        );

-        let sender = sender.clone();
-        let fwd_thread_hdl = Builder::new()
-            .name("solana-tvu-fetch-stage-fwd-rcvr".to_string())
-            .spawn(move || {
-                while let Some(mut p) = forward_receiver.iter().next() {
-                    p.packets.iter_mut().for_each(|p| p.meta.forward = true);
-                    if sender.send(p).is_err() {
-                        break;
-                    }
-                }
-            })
-            .unwrap();
+        let (repair_receiver, repair_handler) = Self::packet_modifier(
+            vec![repair_socket],
+            &exit,
+            sender.clone(),
+            recycler.clone(),
+            |p| p.meta.repair = true,
+        );

-        let mut thread_hdls: Vec<_> = tvu_threads.chain(tvu_forwards_threads).collect();
+        let mut thread_hdls: Vec<_> = tvu_threads
+            .chain(tvu_forwards_threads.into_iter())
+            .collect();
+        thread_hdls.extend(repair_receiver.into_iter());
         thread_hdls.push(fwd_thread_hdl);
+        thread_hdls.push(repair_handler);

         Self { thread_hdls }
     }
|
@ -99,13 +99,16 @@ impl Tvu {
|
|||
let (fetch_sender, fetch_receiver) = channel();
|
||||
|
||||
let repair_socket = Arc::new(repair_socket);
|
||||
let mut fetch_sockets: Vec<Arc<UdpSocket>> =
|
||||
fetch_sockets.into_iter().map(Arc::new).collect();
|
||||
fetch_sockets.push(repair_socket.clone());
|
||||
let fetch_sockets: Vec<Arc<UdpSocket>> = fetch_sockets.into_iter().map(Arc::new).collect();
|
||||
let forward_sockets: Vec<Arc<UdpSocket>> =
|
||||
tvu_forward_sockets.into_iter().map(Arc::new).collect();
|
||||
let fetch_stage =
|
||||
ShredFetchStage::new(fetch_sockets, forward_sockets, &fetch_sender, &exit);
|
||||
let fetch_stage = ShredFetchStage::new(
|
||||
fetch_sockets,
|
||||
forward_sockets,
|
||||
repair_socket.clone(),
|
||||
&fetch_sender,
|
||||
&exit,
|
||||
);
|
||||
|
||||
//TODO
|
||||
//the packets coming out of blob_receiver need to be sent to the GPU and verified
|
||||
|
|
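One detail in this wiring: the repair socket is wrapped in `Arc` and cloned into the fetch stage for reading, while other code keeps its own handle on the same socket; `std::net::UdpSocket` is `Sync`, so concurrent use through a shared handle is fine. A self-contained sketch of that sharing (a toy reader thread, not Solana's actual services):

use std::net::UdpSocket;
use std::sync::Arc;
use std::thread;

fn main() -> std::io::Result<()> {
    let socket = Arc::new(UdpSocket::bind("127.0.0.1:0")?);
    let addr = socket.local_addr()?;
    let reader = Arc::clone(&socket);
    let hdl = thread::spawn(move || {
        let mut buf = [0u8; 4];
        let (n, _) = reader.recv_from(&mut buf).unwrap();
        assert_eq!(&buf[..n], b"ping");
    });
    socket.send_to(b"ping", &addr)?; // same socket, different handle
    hdl.join().unwrap();
    Ok(())
}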