From 62af09adbe9cad8b3886588be04dfb35e06e2a6b Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sun, 25 Mar 2018 08:05:03 -0700 Subject: [PATCH] wip --- src/accountant_skel.rs | 2 +- src/streamer.rs | 19 +++++++++++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index bcad4236f..3a02677b7 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -126,7 +126,7 @@ impl AccountantSkel { ursps.packets.resize(num, streamer::Packet::default()); } s_responder.send(rsps_)?; - streamer::recycle(recycler, msgs_); + streamer::recycle(packet_recycler, msgs_); Ok(()) } diff --git a/src/streamer.rs b/src/streamer.rs index 66c49ef1e..fd084426d 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -90,6 +90,7 @@ impl Meta { } } +#[derive(Debug)] pub struct Packets { pub packets: Vec, } @@ -128,6 +129,7 @@ impl Default for Response { } } +#[derive(Debug)] pub struct Responses { pub responses: Vec, } @@ -153,6 +155,7 @@ impl Packets { fn run_read_from(&mut self, socket: &UdpSocket) -> Result { self.packets.resize(BLOCK_SIZE, Packet::default()); let mut i = 0; + socket.set_nonblocking(false)?; for p in &mut self.packets { p.meta.size = 0; match socket.recv_from(&mut p.data) { @@ -192,6 +195,18 @@ impl Packets { } } +impl Responses { + fn send_to(&self, socket: &UdpSocket, num: &mut usize) -> Result<()> { + for p in &self.responses { + let a = p.meta.get_addr(); + socket.send_to(&p.data[..p.meta.size], &a)?; + //TODO(anatoly): wtf do we do about errors? + *num += 1; + } + Ok(()) + } +} + pub fn allocate(recycler: &Arc>>>>) -> Arc> where T: Default, @@ -372,7 +387,7 @@ mod test { use std::sync::mpsc::channel; use std::io::Write; use std::io; - use streamer::{allocate, receiver, responder, Packet, Receiver, Responses, PACKET_SIZE}; + use streamer::{allocate, receiver, responder, Packet, Receiver, Response, PACKET_SIZE}; fn get_msgs(r: Receiver, num: &mut usize) { for _t in 0..5 { @@ -434,7 +449,7 @@ mod test { msgs.write() .unwrap() .responses - .resize(10, Responses::default()); + .resize(10, Response::default()); for (i, w) in msgs.write().unwrap().responses.iter_mut().enumerate() { w.data[0] = i as u8; w.meta.size = PACKET_SIZE;