2019-10-07 11:08:01 -07:00
|
|
|
//! The `shred_fetch_stage` pulls shreds from UDP sockets and sends them to a channel.
|
2018-06-13 21:52:23 -07:00
|
|
|
|
2019-11-07 19:48:33 -08:00
|
|
|
use crate::packet::{Packet, PacketsRecycler};
|
2019-10-09 13:11:19 -07:00
|
|
|
use crate::streamer::{self, PacketReceiver, PacketSender};
|
2019-11-04 20:13:43 -08:00
|
|
|
use solana_perf::cuda_runtime::PinnedVec;
|
|
|
|
use solana_perf::recycler::Recycler;
|
2018-06-13 21:52:23 -07:00
|
|
|
use std::net::UdpSocket;
|
2019-03-04 20:50:02 -08:00
|
|
|
use std::sync::atomic::AtomicBool;
|
2019-10-09 10:36:05 -07:00
|
|
|
use std::sync::mpsc::channel;
|
2018-06-13 21:52:23 -07:00
|
|
|
use std::sync::Arc;
|
2019-08-20 17:16:06 -07:00
|
|
|
use std::thread::{self, Builder, JoinHandle};
|
2018-06-13 21:52:23 -07:00
|
|
|
|
2019-10-07 11:08:01 -07:00
|
|
|
/// Handle to the threads started by [`ShredFetchStage::new`].
///
/// The stage itself holds no other state: it only keeps the join handles so
/// that a caller can later wait for every thread via [`ShredFetchStage::join`].
#[derive(Debug)]
pub struct ShredFetchStage {
    /// Join handles of every thread spawned for this stage.
    thread_hdls: Vec<JoinHandle<()>>,
}
|
|
|
|
|
2019-10-07 11:08:01 -07:00
|
|
|
impl ShredFetchStage {
|
2019-10-09 13:11:19 -07:00
|
|
|
// updates packets received on a channel and sends them on another channel
|
|
|
|
fn modify_packets<F>(recvr: PacketReceiver, sendr: PacketSender, modify: F)
|
|
|
|
where
|
|
|
|
F: Fn(&mut Packet),
|
|
|
|
{
|
|
|
|
while let Some(mut p) = recvr.iter().next() {
|
|
|
|
p.packets.iter_mut().for_each(|p| modify(p));
|
|
|
|
if sendr.send(p).is_err() {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn packet_modifier<F>(
|
|
|
|
sockets: Vec<Arc<UdpSocket>>,
|
|
|
|
exit: &Arc<AtomicBool>,
|
|
|
|
sender: PacketSender,
|
|
|
|
recycler: Recycler<PinnedVec<Packet>>,
|
|
|
|
modify: F,
|
|
|
|
) -> (Vec<JoinHandle<()>>, JoinHandle<()>)
|
|
|
|
where
|
|
|
|
F: Fn(&mut Packet) + Send + 'static,
|
|
|
|
{
|
|
|
|
let (packet_sender, packet_receiver) = channel();
|
|
|
|
let streamers = sockets
|
|
|
|
.into_iter()
|
|
|
|
.map(|s| {
|
|
|
|
streamer::receiver(
|
|
|
|
s,
|
|
|
|
&exit,
|
|
|
|
packet_sender.clone(),
|
|
|
|
recycler.clone(),
|
|
|
|
"packet_modifier",
|
|
|
|
)
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
let sender = sender.clone();
|
|
|
|
let modifier_hdl = Builder::new()
|
|
|
|
.name("solana-tvu-fetch-stage-packet-modifier".to_string())
|
|
|
|
.spawn(|| Self::modify_packets(packet_receiver, sender, modify))
|
|
|
|
.unwrap();
|
|
|
|
(streamers, modifier_hdl)
|
|
|
|
}
|
|
|
|
|
2019-10-09 10:36:05 -07:00
|
|
|
pub fn new(
|
2019-08-20 17:16:06 -07:00
|
|
|
sockets: Vec<Arc<UdpSocket>>,
|
|
|
|
forward_sockets: Vec<Arc<UdpSocket>>,
|
2019-10-09 13:11:19 -07:00
|
|
|
repair_socket: Arc<UdpSocket>,
|
2019-08-20 17:16:06 -07:00
|
|
|
sender: &PacketSender,
|
|
|
|
exit: &Arc<AtomicBool>,
|
|
|
|
) -> Self {
|
2019-11-07 19:48:33 -08:00
|
|
|
let recycler: PacketsRecycler = Recycler::warmed(100, 1024);
|
|
|
|
|
2019-08-20 17:16:06 -07:00
|
|
|
let tvu_threads = sockets.into_iter().map(|socket| {
|
|
|
|
streamer::receiver(
|
|
|
|
socket,
|
|
|
|
&exit,
|
|
|
|
sender.clone(),
|
|
|
|
recycler.clone(),
|
2019-10-08 09:54:49 -07:00
|
|
|
"shred_fetch_stage",
|
2019-08-20 17:16:06 -07:00
|
|
|
)
|
|
|
|
});
|
|
|
|
|
2019-10-09 13:11:19 -07:00
|
|
|
let (tvu_forwards_threads, fwd_thread_hdl) = Self::packet_modifier(
|
|
|
|
forward_sockets,
|
|
|
|
&exit,
|
|
|
|
sender.clone(),
|
|
|
|
recycler.clone(),
|
|
|
|
|p| p.meta.forward = true,
|
|
|
|
);
|
2019-08-20 17:16:06 -07:00
|
|
|
|
2019-10-09 13:11:19 -07:00
|
|
|
let (repair_receiver, repair_handler) = Self::packet_modifier(
|
|
|
|
vec![repair_socket],
|
|
|
|
&exit,
|
|
|
|
sender.clone(),
|
|
|
|
recycler.clone(),
|
|
|
|
|p| p.meta.repair = true,
|
|
|
|
);
|
2019-08-20 17:16:06 -07:00
|
|
|
|
2019-10-09 13:11:19 -07:00
|
|
|
let mut thread_hdls: Vec<_> = tvu_threads
|
|
|
|
.chain(tvu_forwards_threads.into_iter())
|
|
|
|
.collect();
|
|
|
|
thread_hdls.extend(repair_receiver.into_iter());
|
2019-08-20 17:16:06 -07:00
|
|
|
thread_hdls.push(fwd_thread_hdl);
|
2019-10-09 13:11:19 -07:00
|
|
|
thread_hdls.push(repair_handler);
|
2019-08-20 17:16:06 -07:00
|
|
|
|
|
|
|
Self { thread_hdls }
|
|
|
|
}
|
2018-07-03 21:14:08 -07:00
|
|
|
|
2019-11-13 10:12:09 -08:00
|
|
|
pub fn join(self) -> thread::Result<()> {
|
2018-09-13 14:00:17 -07:00
|
|
|
for thread_hdl in self.thread_hdls {
|
2018-07-03 21:14:08 -07:00
|
|
|
thread_hdl.join()?;
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|