Move streamer-specific utility into streamer module

This commit is contained in:
Greg Fitzgerald 2018-05-10 15:47:42 -06:00
parent 803b76e997
commit f384a2ce85
2 changed files with 20 additions and 20 deletions

View File

@ -64,6 +64,25 @@ fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Res
Ok(())
}
pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<SharedPackets>, usize)> {
let timer = Duration::new(1, 0);
let msgs = recvr.recv_timeout(timer)?;
debug!("got msgs");
let mut len = msgs.read().unwrap().packets.len();
let mut batch = vec![msgs];
while let Ok(more) = recvr.try_recv() {
trace!("got more msgs");
len += more.read().unwrap().packets.len();
batch.push(more);
if len > 100_000 {
break;
}
}
debug!("batch len {}", batch.len());
Ok((batch, len))
}
pub fn responder(
sock: UdpSocket,
exit: Arc<AtomicBool>,

View File

@ -124,25 +124,6 @@ impl Tpu {
})
}
fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<(Vec<SharedPackets>, usize)> {
let timer = Duration::new(1, 0);
let msgs = recvr.recv_timeout(timer)?;
debug!("got msgs");
let mut len = msgs.read().unwrap().packets.len();
let mut batch = vec![msgs];
while let Ok(more) = recvr.try_recv() {
trace!("got more msgs");
len += more.read().unwrap().packets.len();
batch.push(more);
if len > 100_000 {
break;
}
}
debug!("batch len {}", batch.len());
Ok((batch, len))
}
fn verify_batch(
batch: Vec<SharedPackets>,
sendr: &Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
@ -158,7 +139,7 @@ impl Tpu {
recvr: &Arc<Mutex<streamer::PacketReceiver>>,
sendr: &Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
) -> Result<()> {
let (batch, len) = Self::recv_batch(&recvr.lock().unwrap())?;
let (batch, len) = streamer::recv_batch(&recvr.lock().unwrap())?;
let now = Instant::now();
let batch_len = batch.len();
let rand_id = thread_rng().gen_range(0, 100);