Remove greedy fetch in shred_fetch stage (#6278)
* Remove greedy fetch in shred_fetch stage * cleanup
This commit is contained in:
parent
cfbfcb5734
commit
2db83e1a21
|
@ -263,7 +263,7 @@ impl Replicator {
|
||||||
.map(Arc::new)
|
.map(Arc::new)
|
||||||
.collect();
|
.collect();
|
||||||
let (blob_fetch_sender, blob_fetch_receiver) = channel();
|
let (blob_fetch_sender, blob_fetch_receiver) = channel();
|
||||||
let fetch_stage = ShredFetchStage::new_multi_socket(
|
let fetch_stage = ShredFetchStage::new(
|
||||||
blob_sockets,
|
blob_sockets,
|
||||||
blob_forward_sockets,
|
blob_forward_sockets,
|
||||||
&blob_fetch_sender,
|
&blob_fetch_sender,
|
||||||
|
|
|
@ -1,13 +1,11 @@
|
||||||
//! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel.
|
//! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel.
|
||||||
|
|
||||||
use crate::recycler::Recycler;
|
use crate::recycler::Recycler;
|
||||||
use crate::result;
|
|
||||||
use crate::result::Error;
|
|
||||||
use crate::service::Service;
|
use crate::service::Service;
|
||||||
use crate::streamer::{self, PacketReceiver, PacketSender};
|
use crate::streamer::{self, PacketSender};
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
use std::sync::atomic::AtomicBool;
|
use std::sync::atomic::AtomicBool;
|
||||||
use std::sync::mpsc::{channel, RecvTimeoutError};
|
use std::sync::mpsc::channel;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::thread::{self, Builder, JoinHandle};
|
use std::thread::{self, Builder, JoinHandle};
|
||||||
|
|
||||||
|
@ -16,30 +14,7 @@ pub struct ShredFetchStage {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ShredFetchStage {
|
impl ShredFetchStage {
|
||||||
fn handle_forwarded_packets(
|
pub fn new(
|
||||||
recvr: &PacketReceiver,
|
|
||||||
sendr: &PacketSender,
|
|
||||||
) -> result::Result<()> {
|
|
||||||
let msgs = recvr.recv()?;
|
|
||||||
let mut batch = vec![msgs];
|
|
||||||
while let Ok(more) = recvr.try_recv() {
|
|
||||||
batch.push(more);
|
|
||||||
}
|
|
||||||
|
|
||||||
batch
|
|
||||||
.iter_mut()
|
|
||||||
.for_each(|b| b.packets.iter_mut().for_each(|p| p.meta.forward = true));
|
|
||||||
|
|
||||||
for packets in batch {
|
|
||||||
if sendr.send(packets).is_err() {
|
|
||||||
return Err(Error::SendError);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn new_multi_socket(
|
|
||||||
sockets: Vec<Arc<UdpSocket>>,
|
sockets: Vec<Arc<UdpSocket>>,
|
||||||
forward_sockets: Vec<Arc<UdpSocket>>,
|
forward_sockets: Vec<Arc<UdpSocket>>,
|
||||||
sender: &PacketSender,
|
sender: &PacketSender,
|
||||||
|
@ -70,14 +45,11 @@ impl ShredFetchStage {
|
||||||
let sender = sender.clone();
|
let sender = sender.clone();
|
||||||
let fwd_thread_hdl = Builder::new()
|
let fwd_thread_hdl = Builder::new()
|
||||||
.name("solana-tvu-fetch-stage-fwd-rcvr".to_string())
|
.name("solana-tvu-fetch-stage-fwd-rcvr".to_string())
|
||||||
.spawn(move || loop {
|
.spawn(move || {
|
||||||
if let Err(e) = Self::handle_forwarded_packets(&forward_receiver, &sender) {
|
while let Some(mut p) = forward_receiver.iter().next() {
|
||||||
match e {
|
p.packets.iter_mut().for_each(|p| p.meta.forward = true);
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
if sender.send(p).is_err() {
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
break;
|
||||||
Error::RecvError(_) => break,
|
|
||||||
Error::SendError => break,
|
|
||||||
_ => error!("{:?}", e),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
|
@ -99,17 +99,13 @@ impl Tvu {
|
||||||
let (fetch_sender, fetch_receiver) = channel();
|
let (fetch_sender, fetch_receiver) = channel();
|
||||||
|
|
||||||
let repair_socket = Arc::new(repair_socket);
|
let repair_socket = Arc::new(repair_socket);
|
||||||
let mut blob_sockets: Vec<Arc<UdpSocket>> =
|
let mut fetch_sockets: Vec<Arc<UdpSocket>> =
|
||||||
fetch_sockets.into_iter().map(Arc::new).collect();
|
fetch_sockets.into_iter().map(Arc::new).collect();
|
||||||
blob_sockets.push(repair_socket.clone());
|
fetch_sockets.push(repair_socket.clone());
|
||||||
let blob_forward_sockets: Vec<Arc<UdpSocket>> =
|
let forward_sockets: Vec<Arc<UdpSocket>> =
|
||||||
tvu_forward_sockets.into_iter().map(Arc::new).collect();
|
tvu_forward_sockets.into_iter().map(Arc::new).collect();
|
||||||
let fetch_stage = ShredFetchStage::new_multi_socket(
|
let fetch_stage =
|
||||||
blob_sockets,
|
ShredFetchStage::new(fetch_sockets, forward_sockets, &fetch_sender, &exit);
|
||||||
blob_forward_sockets,
|
|
||||||
&fetch_sender,
|
|
||||||
&exit,
|
|
||||||
);
|
|
||||||
|
|
||||||
//TODO
|
//TODO
|
||||||
//the packets coming out of blob_receiver need to be sent to the GPU and verified
|
//the packets coming out of blob_receiver need to be sent to the GPU and verified
|
||||||
|
|
Loading…
Reference in New Issue