diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index e52b887df4..2f82f6441c 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -92,40 +92,40 @@ impl AccountantSkel { fn process( &mut self, r_reader: &streamer::Receiver, - s_sender: &streamer::Sender, - recycler: &streamer::Recycler, + s_responder: &streamer::Responder, + packet_recycler: &streamer::PacketRecycler, + response_recycler: &streamer::ResponseRecycler, ) -> Result<()> { let timer = Duration::new(1, 0); let msgs = r_reader.recv_timeout(timer)?; let msgs_ = msgs.clone(); - let msgs__ = msgs.clone(); - let rsps = streamer::allocate(recycler); + let rsps = streamer::allocate(response_recycler); let rsps_ = rsps.clone(); - let l = msgs__.read().unwrap().packets.len(); - rsps.write() - .unwrap() - .packets - .resize(l, streamer::Packet::default()); { let mut num = 0; let mut ursps = rsps.write().unwrap(); for packet in &msgs.read().unwrap().packets { - let sz = packet.size; + let sz = packet.meta.size; let req = deserialize(&packet.data[0..sz])?; if let Some(resp) = self.process_request(req) { - let rsp = &mut ursps.packets[num]; + if ursps.responses.len() <= num { + ursps + .responses + .resize((num + 1) * 2, streamer::Response::default()); + } + let rsp = &mut ursps.responses[num]; let v = serialize(&resp)?; let len = v.len(); - rsp.data[0..len].copy_from_slice(&v); - rsp.size = len; - rsp.set_addr(&packet.get_addr()); + rsp.data[..len].copy_from_slice(&v); + rsp.meta.size = len; + rsp.meta.set_addr(&packet.meta.get_addr()); num += 1; } } - ursps.packets.resize(num, streamer::Packet::default()); + ursps.responses.resize(num, streamer::Response::default()); } - s_sender.send(rsps_)?; - streamer::recycle(recycler, msgs_); + s_responder.send(rsps_)?; + streamer::recycle(packet_recycler, msgs_); Ok(()) } @@ -141,23 +141,30 @@ impl AccountantSkel { local.set_port(0); let write = UdpSocket::bind(local)?; - let recycler = Arc::new(Mutex::new(Vec::new())); + let packet_recycler = Arc::new(Mutex::new(Vec::new())); + let response_recycler = Arc::new(Mutex::new(Vec::new())); let (s_reader, r_reader) = channel(); - let t_receiver = streamer::receiver(read, exit.clone(), recycler.clone(), s_reader)?; + let t_receiver = streamer::receiver(read, exit.clone(), packet_recycler.clone(), s_reader)?; - let (s_sender, r_sender) = channel(); - let t_sender = streamer::sender(write, exit.clone(), recycler.clone(), r_sender); + let (s_responder, r_responder) = channel(); + let t_responder = + streamer::responder(write, exit.clone(), response_recycler.clone(), r_responder); let t_server = spawn(move || { if let Ok(me) = Arc::try_unwrap(obj) { loop { - let e = me.lock().unwrap().process(&r_reader, &s_sender, &recycler); + let e = me.lock().unwrap().process( + &r_reader, + &s_responder, + &packet_recycler, + &response_recycler, + ); if e.is_err() && exit.load(Ordering::Relaxed) { break; } } } }); - Ok(vec![t_receiver, t_sender, t_server]) + Ok(vec![t_receiver, t_responder, t_server]) } } diff --git a/src/result.rs b/src/result.rs index 86f2118c83..96f2f7ff49 100644 --- a/src/result.rs +++ b/src/result.rs @@ -13,6 +13,7 @@ pub enum Error { RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), Serialize(std::boxed::Box), SendError, + Services, } pub type Result = std::result::Result; diff --git a/src/streamer.rs b/src/streamer.rs index 54f631b7b9..fe0b9ed153 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -8,38 +8,45 @@ use std::thread::{spawn, JoinHandle}; use result::{Error, Result}; const BLOCK_SIZE: usize = 1024 * 8; -pub const PACKET_SIZE: usize = 1024; +pub const PACKET_SIZE: usize = 256; +pub const RESP_SIZE: usize = 64 * 1024; +pub const NUM_RESP: usize = (BLOCK_SIZE * PACKET_SIZE) / RESP_SIZE; -#[derive(Clone)] -pub struct Packet { - pub data: [u8; PACKET_SIZE], +#[derive(Clone, Default)] +pub struct Meta { pub size: usize, pub addr: [u16; 8], pub port: u16, pub v6: bool, } + +#[derive(Clone)] +pub struct Packet { + pub data: [u8; PACKET_SIZE], + pub meta: Meta, +} + impl fmt::Debug for Packet { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, "Packet {{ size: {:?}, addr: {:?} }}", - self.size, - self.get_addr() + self.meta.size, + self.meta.get_addr() ) } } + impl Default for Packet { fn default() -> Packet { Packet { data: [0u8; PACKET_SIZE], - size: 0, - addr: [0u16; 8], - port: 0, - v6: false, + meta: Meta::default(), } } } -impl Packet { + +impl Meta { pub fn get_addr(&self) -> SocketAddr { if !self.v6 { let ipv4 = Ipv4Addr::new( @@ -83,27 +90,74 @@ impl Packet { } } -#[derive(Clone, Debug, Default)] -pub struct PacketData { +#[derive(Debug)] +pub struct Packets { pub packets: Vec, } -pub type SharedPacketData = Arc>; -pub type Recycler = Arc>>; -pub type Receiver = mpsc::Receiver; -pub type Sender = mpsc::Sender; - -impl PacketData { - pub fn new() -> PacketData { - PacketData { +impl Default for Packets { + fn default() -> Packets { + Packets { packets: vec![Packet::default(); BLOCK_SIZE], } } +} + +#[derive(Clone)] +pub struct Response { + pub data: [u8; RESP_SIZE], + pub meta: Meta, +} + +impl fmt::Debug for Response { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "Response {{ size: {:?}, addr: {:?} }}", + self.meta.size, + self.meta.get_addr() + ) + } +} + +impl Default for Response { + fn default() -> Response { + Response { + data: [0u8; RESP_SIZE], + meta: Meta::default(), + } + } +} + +#[derive(Debug)] +pub struct Responses { + pub responses: Vec, +} + +impl Default for Responses { + fn default() -> Responses { + Responses { + responses: vec![Response::default(); NUM_RESP], + } + } +} + +pub type SharedPackets = Arc>; +pub type PacketRecycler = Arc>>; +pub type Receiver = mpsc::Receiver; +pub type Sender = mpsc::Sender; +pub type SharedResponses = Arc>; +pub type ResponseRecycler = Arc>>; +pub type Responder = mpsc::Sender; +pub type ResponseReceiver = mpsc::Receiver; + +impl Packets { fn run_read_from(&mut self, socket: &UdpSocket) -> Result { self.packets.resize(BLOCK_SIZE, Packet::default()); let mut i = 0; + socket.set_nonblocking(false)?; for p in &mut self.packets { - p.size = 0; + p.meta.size = 0; match socket.recv_from(&mut p.data) { Err(_) if i > 0 => { trace!("got {:?} messages", i); @@ -114,8 +168,8 @@ impl PacketData { return Err(Error::IO(e)); } Ok((nrecv, from)) => { - p.size = nrecv; - p.set_addr(&from); + p.meta.size = nrecv; + p.meta.set_addr(&from); if i == 0 { socket.set_nonblocking(true)?; } @@ -130,10 +184,13 @@ impl PacketData { self.packets.resize(sz, Packet::default()); Ok(()) } +} + +impl Responses { fn send_to(&self, socket: &UdpSocket, num: &mut usize) -> Result<()> { - for p in &self.packets { - let a = p.get_addr(); - socket.send_to(&p.data[0..p.size], &a)?; + for p in &self.responses { + let a = p.meta.get_addr(); + socket.send_to(&p.data[..p.meta.size], &a)?; //TODO(anatoly): wtf do we do about errors? *num += 1; } @@ -141,13 +198,19 @@ impl PacketData { } } -pub fn allocate(recycler: &Recycler) -> SharedPacketData { +pub fn allocate(recycler: &Arc>>>>) -> Arc> +where + T: Default, +{ let mut gc = recycler.lock().expect("lock"); gc.pop() - .unwrap_or_else(|| Arc::new(RwLock::new(PacketData::new()))) + .unwrap_or_else(|| Arc::new(RwLock::new(Default::default()))) } -pub fn recycle(recycler: &Recycler, msgs: SharedPacketData) { +pub fn recycle(recycler: &Arc>>>>, msgs: Arc>) +where + T: Default, +{ let mut gc = recycler.lock().expect("lock"); gc.push(msgs); } @@ -155,7 +218,7 @@ pub fn recycle(recycler: &Recycler, msgs: SharedPacketData) { fn recv_loop( sock: &UdpSocket, exit: &Arc, - recycler: &Recycler, + recycler: &PacketRecycler, channel: &Sender, ) -> Result<()> { loop { @@ -181,7 +244,7 @@ fn recv_loop( pub fn receiver( sock: UdpSocket, exit: Arc, - recycler: Recycler, + recycler: PacketRecycler, channel: Sender, ) -> Result> { let timer = Duration::new(1, 0); @@ -192,7 +255,7 @@ pub fn receiver( })) } -fn recv_send(sock: &UdpSocket, recycler: &Recycler, r: &Receiver) -> Result<()> { +fn recv_send(sock: &UdpSocket, recycler: &ResponseRecycler, r: &ResponseReceiver) -> Result<()> { let timer = Duration::new(1, 0); let msgs = r.recv_timeout(timer)?; let msgs_ = msgs.clone(); @@ -202,11 +265,11 @@ fn recv_send(sock: &UdpSocket, recycler: &Recycler, r: &Receiver) -> Result<()> Ok(()) } -pub fn sender( +pub fn responder( sock: UdpSocket, exit: Arc, - recycler: Recycler, - r: Receiver, + recycler: ResponseRecycler, + r: ResponseReceiver, ) -> JoinHandle<()> { spawn(move || loop { if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) { @@ -226,29 +289,39 @@ mod bench { use std::time::SystemTime; use std::thread::{spawn, JoinHandle}; use std::sync::mpsc::channel; + use std::sync::atomic::{AtomicBool, Ordering}; use result::Result; - use streamer::{allocate, receiver, recycle, Packet, Receiver, Recycler, PACKET_SIZE}; + use streamer::{allocate, receiver, recycle, Packet, PacketRecycler, Receiver, PACKET_SIZE}; - fn producer(addr: &SocketAddr, recycler: &Recycler, exit: Arc) -> JoinHandle<()> { + fn producer( + addr: &SocketAddr, + recycler: PacketRecycler, + exit: Arc, + ) -> JoinHandle<()> { let send = UdpSocket::bind("0.0.0.0:0").unwrap(); - let msgs = allocate(recycler); + let msgs = allocate(&recycler); + let msgs_ = msgs.clone(); msgs.write().unwrap().packets.resize(10, Packet::default()); for w in msgs.write().unwrap().packets.iter_mut() { - w.size = PACKET_SIZE; - w.set_addr(&addr); + w.meta.size = PACKET_SIZE; + w.meta.set_addr(&addr); } spawn(move || loop { if exit.load(Ordering::Relaxed) { return; } let mut num = 0; - msgs.read().unwrap().send_to(&send, &mut num).unwrap(); + for p in msgs_.read().unwrap().packets.iter() { + let a = p.meta.get_addr(); + send.send_to(&p.data[..p.meta.size], &a).unwrap(); + num += 1; + } assert_eq!(num, 10); }) } fn sinc( - recycler: &Recycler, + recycler: PacketRecycler, exit: Arc, rvs: Arc>, r: Receiver, @@ -262,7 +335,7 @@ mod bench { Ok(msgs) => { let msgs_ = msgs.clone(); *rvs.lock().unwrap() += msgs.read().unwrap().packets.len(); - recycle(recycler, msgs_); + recycle(&recycler, msgs_); } _ => (), } @@ -275,10 +348,10 @@ mod bench { let recycler = Arc::new(Mutex::new(Vec::new())); let (s_reader, r_reader) = channel(); - let t_reader = receiver(read, exit.clone(), &recycler, s_reader)?; - let t_producer1 = producer(&addr, &recycler, exit.clone()); - let t_producer2 = producer(&addr, &recycler, exit.clone()); - let t_producer3 = producer(&addr, &recycler, exit.clone()); + let t_reader = receiver(read, exit.clone(), recycler.clone(), s_reader)?; + let t_producer1 = producer(&addr, recycler.clone(), exit.clone()); + let t_producer2 = producer(&addr, recycler.clone(), exit.clone()); + let t_producer3 = producer(&addr, recycler.clone(), exit.clone()); let rvs = Arc::new(Mutex::new(0)); let t_sinc = sinc(recycler.clone(), exit.clone(), rvs.clone(), r_reader); @@ -315,7 +388,8 @@ mod test { use std::sync::mpsc::channel; use std::io::Write; use std::io; - use streamer::{allocate, receiver, sender, Packet, Receiver, PACKET_SIZE}; + use streamer::{allocate, receiver, responder, Packet, Packets, Receiver, Response, Responses, + PACKET_SIZE}; fn get_msgs(r: Receiver, num: &mut usize) { for _t in 0..5 { @@ -339,8 +413,8 @@ mod test { let recycler = Arc::new(Mutex::new(Vec::new())); let (s_reader, r_reader) = channel(); let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap(); - let (s_sender, r_sender) = channel(); - let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender); + let (s_responder, r_responder) = channel(); + let t_responder = responder(send, exit.clone(), recycler.clone(), r_responder); let msgs = allocate(&recycler); msgs.write().unwrap().packets.resize(10, Packet::default()); for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() { @@ -349,17 +423,20 @@ mod test { w.set_addr(&addr); assert_eq!(w.get_addr(), addr); } - s_sender.send(msgs).expect("send"); + s_responder.send(msgs).expect("send"); let mut num = 0; get_msgs(r_reader, &mut num); assert_eq!(num, 10); exit.store(true, Ordering::Relaxed); t_receiver.join().expect("join"); - t_sender.join().expect("join"); + t_responder.join().expect("join"); } #[test] pub fn streamer_debug() { write!(io::sink(), "{:?}", Packet::default()).unwrap(); + write!(io::sink(), "{:?}", Packets::default()).unwrap(); + write!(io::sink(), "{:?}", Response::default()).unwrap(); + write!(io::sink(), "{:?}", Responses::default()).unwrap(); } #[test] pub fn streamer_send_test() { @@ -367,25 +444,29 @@ mod test { let addr = read.local_addr().unwrap(); let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); - let recycler = Arc::new(Mutex::new(Vec::new())); + let packet_recycler = Arc::new(Mutex::new(Vec::new())); + let resp_recycler = Arc::new(Mutex::new(Vec::new())); let (s_reader, r_reader) = channel(); - let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap(); - let (s_sender, r_sender) = channel(); - let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender); - let msgs = allocate(&recycler); - msgs.write().unwrap().packets.resize(10, Packet::default()); - for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() { + let t_receiver = receiver(read, exit.clone(), packet_recycler.clone(), s_reader).unwrap(); + let (s_responder, r_responder) = channel(); + let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); + let msgs = allocate(&resp_recycler); + msgs.write() + .unwrap() + .responses + .resize(10, Response::default()); + for (i, w) in msgs.write().unwrap().responses.iter_mut().enumerate() { w.data[0] = i as u8; - w.size = PACKET_SIZE; - w.set_addr(&addr); - assert_eq!(w.get_addr(), addr); + w.meta.size = PACKET_SIZE; + w.meta.set_addr(&addr); + assert_eq!(w.meta.get_addr(), addr); } - s_sender.send(msgs).expect("send"); + s_responder.send(msgs).expect("send"); let mut num = 0; get_msgs(r_reader, &mut num); assert_eq!(num, 10); exit.store(true, Ordering::Relaxed); t_receiver.join().expect("join"); - t_sender.join().expect("join"); + t_responder.join().expect("join"); } }