From f52f02a4343f7380243a190a17856d3e15495a43 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sat, 24 Mar 2018 18:01:40 -0700 Subject: [PATCH 01/10] services --- src/accountant_skel.rs | 6 +++--- src/lib.rs | 1 + src/result.rs | 1 + src/streamer.rs | 41 +++++++++++++++++++++-------------------- 4 files changed, 26 insertions(+), 23 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index e52b887df4..f948c434b6 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -110,15 +110,15 @@ impl AccountantSkel { let mut num = 0; let mut ursps = rsps.write().unwrap(); for packet in &msgs.read().unwrap().packets { - let sz = packet.size; + let sz = packet.meta.size; let req = deserialize(&packet.data[0..sz])?; if let Some(resp) = self.process_request(req) { let rsp = &mut ursps.packets[num]; let v = serialize(&resp)?; let len = v.len(); rsp.data[0..len].copy_from_slice(&v); - rsp.size = len; - rsp.set_addr(&packet.get_addr()); + rsp.meta.size = len; + rsp.meta.set_addr(&packet.meta.get_addr()); num += 1; } } diff --git a/src/lib.rs b/src/lib.rs index b3af5bcd7f..db8374c15b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,7 @@ pub mod accountant; pub mod accountant_skel; pub mod accountant_stub; pub mod result; +pub mod services; extern crate bincode; extern crate chrono; extern crate generic_array; diff --git a/src/result.rs b/src/result.rs index 86f2118c83..96f2f7ff49 100644 --- a/src/result.rs +++ b/src/result.rs @@ -13,6 +13,7 @@ pub enum Error { RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), Serialize(std::boxed::Box), SendError, + Services, } pub type Result = std::result::Result; diff --git a/src/streamer.rs b/src/streamer.rs index 54f631b7b9..a7cea82c45 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -8,23 +8,27 @@ use std::thread::{spawn, JoinHandle}; use result::{Error, Result}; const BLOCK_SIZE: usize = 1024 * 8; -pub const PACKET_SIZE: usize = 1024; - -#[derive(Clone)] -pub struct Packet { - pub data: [u8; PACKET_SIZE], +pub const PACKET_SIZE: usize = 256; +#[derive(Clone, Default)] +pub struct Meta { pub size: usize, pub addr: [u16; 8], pub port: u16, pub v6: bool, } + +#[derive(Clone)] +pub struct Packet { + pub data: [u8; PACKET_SIZE], + pub meta: Meta, +} impl fmt::Debug for Packet { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, "Packet {{ size: {:?}, addr: {:?} }}", - self.size, - self.get_addr() + self.meta.size, + self.meta.get_addr() ) } } @@ -32,14 +36,11 @@ impl Default for Packet { fn default() -> Packet { Packet { data: [0u8; PACKET_SIZE], - size: 0, - addr: [0u16; 8], - port: 0, - v6: false, + meta: Meta::default(), } } } -impl Packet { +impl Meta { pub fn get_addr(&self) -> SocketAddr { if !self.v6 { let ipv4 = Ipv4Addr::new( @@ -103,7 +104,7 @@ impl PacketData { self.packets.resize(BLOCK_SIZE, Packet::default()); let mut i = 0; for p in &mut self.packets { - p.size = 0; + p.meta.size = 0; match socket.recv_from(&mut p.data) { Err(_) if i > 0 => { trace!("got {:?} messages", i); @@ -114,8 +115,8 @@ impl PacketData { return Err(Error::IO(e)); } Ok((nrecv, from)) => { - p.size = nrecv; - p.set_addr(&from); + p.meta.size = nrecv; + p.meta.set_addr(&from); if i == 0 { socket.set_nonblocking(true)?; } @@ -132,8 +133,8 @@ impl PacketData { } fn send_to(&self, socket: &UdpSocket, num: &mut usize) -> Result<()> { for p in &self.packets { - let a = p.get_addr(); - socket.send_to(&p.data[0..p.size], &a)?; + let a = p.meta.get_addr(); + socket.send_to(&p.data[..p.meta.size], &a)?; //TODO(anatoly): wtf do we do about errors? *num += 1; } @@ -376,9 +377,9 @@ mod test { msgs.write().unwrap().packets.resize(10, Packet::default()); for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() { w.data[0] = i as u8; - w.size = PACKET_SIZE; - w.set_addr(&addr); - assert_eq!(w.get_addr(), addr); + w.meta.size = PACKET_SIZE; + w.meta.set_addr(&addr); + assert_eq!(w.meta.get_addr(), addr); } s_sender.send(msgs).expect("send"); let mut num = 0; From 7732f3f5fb3fdf75a49c1276f1813aacbb5f8bcc Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sat, 24 Mar 2018 18:01:54 -0700 Subject: [PATCH 02/10] services --- src/services.rs | 221 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 221 insertions(+) create mode 100644 src/services.rs diff --git a/src/services.rs b/src/services.rs new file mode 100644 index 0000000000..25fc795207 --- /dev/null +++ b/src/services.rs @@ -0,0 +1,221 @@ +//! Small services library with named ports +//! see test for usage + +use std::sync::{Arc, Mutex, RwLock}; +use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread::{spawn, JoinHandle}; +use std::time::Duration; +use streamer; +use result::Result; +use result::Error; + +pub enum Port { + Main, + PacketReader, + Accountant, + PacketSender, +} + +impl Port { + fn to_usize(self) -> usize { + match self { + Port::Main => 0, + Port::PacketReader => 1, + Port::Accountant => 2, + Port::PacketSender => 3, + } + } +} + +#[derive(Clone)] +pub enum Data { + Signal, + SharedPacketData(streamer::SharedPacketData), +} + +struct Locked { + ports: Vec>, + readers: Vec>>>, + threads: Vec>>>>, +} + +pub struct Services { + lock: Arc>, + exit: Arc, +} + +pub type Ports = Vec>; + +impl Services { + pub fn new() -> Services { + let (s1, r1) = channel(); + let (s2, r2) = channel(); + let (s3, r3) = channel(); + let (s4, r4) = channel(); + let (s5, r5) = channel(); + let locked = Locked { + ports: [s1, s2, s3, s4, s5].to_vec(), + readers: [ + Arc::new(Mutex::new(r1)), + Arc::new(Mutex::new(r2)), + Arc::new(Mutex::new(r3)), + Arc::new(Mutex::new(r4)), + Arc::new(Mutex::new(r5)), + ].to_vec(), + threads: [ + Arc::new(None), + Arc::new(None), + Arc::new(None), + Arc::new(None), + Arc::new(None), + ].to_vec(), + }; + let exit = Arc::new(AtomicBool::new(false)); + Services { + lock: Arc::new(RwLock::new(locked)), + exit: exit, + } + } + pub fn source(&self, port: Port, func: F) -> Result<()> + where + F: Send + 'static + Fn(&Ports) -> Result<()>, + { + let mut w = self.lock.write().unwrap(); + let pz = port.to_usize(); + if w.threads[pz].is_some() { + return Err(Error::Services); + } + let c_ports = w.ports.clone(); + let c_exit = self.exit.clone(); + let j = spawn(move || loop { + match func(&c_ports) { + Ok(()) => (), + e => return e, + } + if c_exit.load(Ordering::Relaxed) == true { + return Ok(()); + } + }); + w.threads[pz] = Arc::new(Some(j)); + return Ok(()); + } + pub fn listen(&mut self, port: Port, func: F) -> Result<()> + where + F: Send + 'static + Fn(&Ports, Data) -> Result<()>, + { + let mut w = self.lock.write().unwrap(); + let pz = port.to_usize(); + if w.threads[pz].is_some() { + return Err(Error::Services); + } + let recv_lock = w.readers[pz].clone(); + let c_ports = w.ports.clone(); + let c_exit = self.exit.clone(); + let j: JoinHandle> = spawn(move || loop { + let recv = recv_lock.lock().unwrap(); + let timer = Duration::new(0, 500000); + match recv.recv_timeout(timer) { + Ok(val) => func(&c_ports, val).expect("services listen"), + _ => (), + } + if c_exit.load(Ordering::Relaxed) { + return Ok(()); + } + }); + w.threads[pz] = Arc::new(Some(j)); + return Ok(()); + } + pub fn send(ports: &Ports, to: Port, m: Data) -> Result<()> { + ports[to.to_usize()] + .send(m) + .or_else(|_| Err(Error::SendError)) + } + pub fn join(&mut self) -> Result<()> { + let pz = Port::Main.to_usize(); + let recv = self.lock.write().unwrap().readers[pz].clone(); + recv.lock().unwrap().recv()?; + self.shutdown()?; + return Ok(()); + } + pub fn shutdown(&mut self) -> Result<()> { + self.exit.store(true, Ordering::Relaxed); + let r = self.lock.read().unwrap(); + for t in r.threads.iter() { + match Arc::try_unwrap((*t).clone()) { + Ok(Some(j)) => j.join()??, + _ => (), + }; + } + return Ok(()); + } +} + +#[cfg(test)] +mod test { + use services::Services; + use services::Port::{Accountant, Main, PacketReader}; + use services::Data::Signal; + use std::sync::{Arc, Mutex}; + + #[test] + fn test_init() { + let mut o = Services::new(); + assert_matches!(o.shutdown(), Ok(())); + } + #[test] + fn test_join() { + let mut o = Services::new(); + assert_matches!( + o.source(PacketReader, move |ports| Services::send( + ports, + Main, + Signal + )), + Ok(()) + ); + assert_matches!(o.join(), Ok(())); + } + #[test] + fn test_source() { + let mut o = Services::new(); + assert_matches!( + o.source(PacketReader, move |ports| Services::send( + ports, + Main, + Signal + )), + Ok(()) + ); + assert!(o.source(PacketReader, move |_ports| Ok(())).is_err()); + assert!(o.listen(PacketReader, move |_ports, _data| Ok(())).is_err()); + assert_matches!(o.join(), Ok(())); + } + #[test] + fn test_listen() { + let mut o = Services::new(); + let val = Arc::new(Mutex::new(false)); + assert_matches!( + o.source(PacketReader, move |ports| Services::send( + ports, + Accountant, + Signal + )), + Ok(()) + ); + let c_val = val.clone(); + assert_matches!( + o.listen(Accountant, move |ports, data| match data { + Signal => { + *c_val.lock().unwrap() = true; + Services::send(ports, Main, Signal) + } + _ => Ok(()), + }), + Ok(()) + ); + assert_matches!(o.join(), Ok(())); + assert_eq!(*val.lock().unwrap(), true); + } + +} From 533b3170a7a28d5a8d7c4dda7dbb488118ff5c48 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sat, 24 Mar 2018 23:31:54 -0700 Subject: [PATCH 03/10] responder --- src/services.rs | 221 ------------------------------------------------ src/streamer.rs | 97 ++++++++++++++------- 2 files changed, 66 insertions(+), 252 deletions(-) delete mode 100644 src/services.rs diff --git a/src/services.rs b/src/services.rs deleted file mode 100644 index 25fc795207..0000000000 --- a/src/services.rs +++ /dev/null @@ -1,221 +0,0 @@ -//! Small services library with named ports -//! see test for usage - -use std::sync::{Arc, Mutex, RwLock}; -use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::thread::{spawn, JoinHandle}; -use std::time::Duration; -use streamer; -use result::Result; -use result::Error; - -pub enum Port { - Main, - PacketReader, - Accountant, - PacketSender, -} - -impl Port { - fn to_usize(self) -> usize { - match self { - Port::Main => 0, - Port::PacketReader => 1, - Port::Accountant => 2, - Port::PacketSender => 3, - } - } -} - -#[derive(Clone)] -pub enum Data { - Signal, - SharedPacketData(streamer::SharedPacketData), -} - -struct Locked { - ports: Vec>, - readers: Vec>>>, - threads: Vec>>>>, -} - -pub struct Services { - lock: Arc>, - exit: Arc, -} - -pub type Ports = Vec>; - -impl Services { - pub fn new() -> Services { - let (s1, r1) = channel(); - let (s2, r2) = channel(); - let (s3, r3) = channel(); - let (s4, r4) = channel(); - let (s5, r5) = channel(); - let locked = Locked { - ports: [s1, s2, s3, s4, s5].to_vec(), - readers: [ - Arc::new(Mutex::new(r1)), - Arc::new(Mutex::new(r2)), - Arc::new(Mutex::new(r3)), - Arc::new(Mutex::new(r4)), - Arc::new(Mutex::new(r5)), - ].to_vec(), - threads: [ - Arc::new(None), - Arc::new(None), - Arc::new(None), - Arc::new(None), - Arc::new(None), - ].to_vec(), - }; - let exit = Arc::new(AtomicBool::new(false)); - Services { - lock: Arc::new(RwLock::new(locked)), - exit: exit, - } - } - pub fn source(&self, port: Port, func: F) -> Result<()> - where - F: Send + 'static + Fn(&Ports) -> Result<()>, - { - let mut w = self.lock.write().unwrap(); - let pz = port.to_usize(); - if w.threads[pz].is_some() { - return Err(Error::Services); - } - let c_ports = w.ports.clone(); - let c_exit = self.exit.clone(); - let j = spawn(move || loop { - match func(&c_ports) { - Ok(()) => (), - e => return e, - } - if c_exit.load(Ordering::Relaxed) == true { - return Ok(()); - } - }); - w.threads[pz] = Arc::new(Some(j)); - return Ok(()); - } - pub fn listen(&mut self, port: Port, func: F) -> Result<()> - where - F: Send + 'static + Fn(&Ports, Data) -> Result<()>, - { - let mut w = self.lock.write().unwrap(); - let pz = port.to_usize(); - if w.threads[pz].is_some() { - return Err(Error::Services); - } - let recv_lock = w.readers[pz].clone(); - let c_ports = w.ports.clone(); - let c_exit = self.exit.clone(); - let j: JoinHandle> = spawn(move || loop { - let recv = recv_lock.lock().unwrap(); - let timer = Duration::new(0, 500000); - match recv.recv_timeout(timer) { - Ok(val) => func(&c_ports, val).expect("services listen"), - _ => (), - } - if c_exit.load(Ordering::Relaxed) { - return Ok(()); - } - }); - w.threads[pz] = Arc::new(Some(j)); - return Ok(()); - } - pub fn send(ports: &Ports, to: Port, m: Data) -> Result<()> { - ports[to.to_usize()] - .send(m) - .or_else(|_| Err(Error::SendError)) - } - pub fn join(&mut self) -> Result<()> { - let pz = Port::Main.to_usize(); - let recv = self.lock.write().unwrap().readers[pz].clone(); - recv.lock().unwrap().recv()?; - self.shutdown()?; - return Ok(()); - } - pub fn shutdown(&mut self) -> Result<()> { - self.exit.store(true, Ordering::Relaxed); - let r = self.lock.read().unwrap(); - for t in r.threads.iter() { - match Arc::try_unwrap((*t).clone()) { - Ok(Some(j)) => j.join()??, - _ => (), - }; - } - return Ok(()); - } -} - -#[cfg(test)] -mod test { - use services::Services; - use services::Port::{Accountant, Main, PacketReader}; - use services::Data::Signal; - use std::sync::{Arc, Mutex}; - - #[test] - fn test_init() { - let mut o = Services::new(); - assert_matches!(o.shutdown(), Ok(())); - } - #[test] - fn test_join() { - let mut o = Services::new(); - assert_matches!( - o.source(PacketReader, move |ports| Services::send( - ports, - Main, - Signal - )), - Ok(()) - ); - assert_matches!(o.join(), Ok(())); - } - #[test] - fn test_source() { - let mut o = Services::new(); - assert_matches!( - o.source(PacketReader, move |ports| Services::send( - ports, - Main, - Signal - )), - Ok(()) - ); - assert!(o.source(PacketReader, move |_ports| Ok(())).is_err()); - assert!(o.listen(PacketReader, move |_ports, _data| Ok(())).is_err()); - assert_matches!(o.join(), Ok(())); - } - #[test] - fn test_listen() { - let mut o = Services::new(); - let val = Arc::new(Mutex::new(false)); - assert_matches!( - o.source(PacketReader, move |ports| Services::send( - ports, - Accountant, - Signal - )), - Ok(()) - ); - let c_val = val.clone(); - assert_matches!( - o.listen(Accountant, move |ports, data| match data { - Signal => { - *c_val.lock().unwrap() = true; - Services::send(ports, Main, Signal) - } - _ => Ok(()), - }), - Ok(()) - ); - assert_matches!(o.join(), Ok(())); - assert_eq!(*val.lock().unwrap(), true); - } - -} diff --git a/src/streamer.rs b/src/streamer.rs index a7cea82c45..f187d68490 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -9,6 +9,9 @@ use result::{Error, Result}; const BLOCK_SIZE: usize = 1024 * 8; pub const PACKET_SIZE: usize = 256; +pub const RESP_SIZE: usize = 64 * 1024; +pub const NUM_RESP: usize = (BLOCK_SIZE * PACKET_SIZE) / RESP_SIZE; + #[derive(Clone, Default)] pub struct Meta { pub size: usize, @@ -85,21 +88,48 @@ impl Meta { } #[derive(Clone, Debug, Default)] -pub struct PacketData { +pub struct Packets { pub packets: Vec, } -pub type SharedPacketData = Arc>; -pub type Recycler = Arc>>; -pub type Receiver = mpsc::Receiver; -pub type Sender = mpsc::Sender; +#[derive(Clone, Debug, Default)] +pub struct Response { + pub resp: [u8; RESP_SIZE], + pub meta: Meta, +} -impl PacketData { - pub fn new() -> PacketData { - PacketData { +pub struct Responses { + pub responses: Vec, +} + + +pub type SharedPackets = Arc>; +pub type PacketRecycler = Arc>>; +pub type Receiver = mpsc::Receiver; +pub type Sender = mpsc::Sender; +pub type SharedResponses = Arc>; +pub type ResponseRecycler = Arc>>; +pub type Responder = mpsc::Sender; +pub type ResponseReceiver = mpsc::Receiver; + +impl Default for Responses { + pub fn default() -> Responses { + Responses { + packets: vec![Response::default(); NUM_RESP], + } + } +} + + +impl Default for Packets { + pub fn default() -> Packets { + Packets { packets: vec![Packet::default(); BLOCK_SIZE], } } +} + +impl Packets { fn run_read_from(&mut self, socket: &UdpSocket) -> Result { self.packets.resize(BLOCK_SIZE, Packet::default()); let mut i = 0; @@ -142,13 +172,17 @@ impl PacketData { } } -pub fn allocate(recycler: &Recycler) -> SharedPacketData { +pub fn allocate(recycler: &Arc>>) -> Arc> +where T: Default +{ let mut gc = recycler.lock().expect("lock"); gc.pop() - .unwrap_or_else(|| Arc::new(RwLock::new(PacketData::new()))) + .unwrap_or_else(|| Arc::new(RwLock::new(T::default()))) } -pub fn recycle(recycler: &Recycler, msgs: SharedPacketData) { +pub fn recycle(recycler: &Arc>>, msgs: Arc>) +where T: Default +{ let mut gc = recycler.lock().expect("lock"); gc.push(msgs); } @@ -156,7 +190,7 @@ pub fn recycle(recycler: &Recycler, msgs: SharedPacketData) { fn recv_loop( sock: &UdpSocket, exit: &Arc, - recycler: &Recycler, + recycler: &PacketRecycler, channel: &Sender, ) -> Result<()> { loop { @@ -182,7 +216,7 @@ fn recv_loop( pub fn receiver( sock: UdpSocket, exit: Arc, - recycler: Recycler, + recycler: PacketRecycler, channel: Sender, ) -> Result> { let timer = Duration::new(1, 0); @@ -193,7 +227,7 @@ pub fn receiver( })) } -fn recv_send(sock: &UdpSocket, recycler: &Recycler, r: &Receiver) -> Result<()> { +fn recv_send(sock: &UdpSocket, recycler: &ResponseRecycler, r: &ResponseReceiver) -> Result<()> { let timer = Duration::new(1, 0); let msgs = r.recv_timeout(timer)?; let msgs_ = msgs.clone(); @@ -203,11 +237,11 @@ fn recv_send(sock: &UdpSocket, recycler: &Recycler, r: &Receiver) -> Result<()> Ok(()) } -pub fn sender( +pub fn responder( sock: UdpSocket, exit: Arc, - recycler: Recycler, - r: Receiver, + recycler: ResponseRecycler, + r: ResponseReceiver, ) -> JoinHandle<()> { spawn(move || loop { if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) { @@ -316,7 +350,7 @@ mod test { use std::sync::mpsc::channel; use std::io::Write; use std::io; - use streamer::{allocate, receiver, sender, Packet, Receiver, PACKET_SIZE}; + use streamer::{allocate, receiver, responder, Packet, Receiver, PACKET_SIZE}; fn get_msgs(r: Receiver, num: &mut usize) { for _t in 0..5 { @@ -340,8 +374,8 @@ mod test { let recycler = Arc::new(Mutex::new(Vec::new())); let (s_reader, r_reader) = channel(); let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap(); - let (s_sender, r_sender) = channel(); - let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender); + let (s_responder, r_responder) = channel(); + let t_responder = responder(send, exit.clone(), recycler.clone(), r_responder); let msgs = allocate(&recycler); msgs.write().unwrap().packets.resize(10, Packet::default()); for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() { @@ -350,13 +384,13 @@ mod test { w.set_addr(&addr); assert_eq!(w.get_addr(), addr); } - s_sender.send(msgs).expect("send"); + s_responder.send(msgs).expect("send"); let mut num = 0; get_msgs(r_reader, &mut num); assert_eq!(num, 10); exit.store(true, Ordering::Relaxed); t_receiver.join().expect("join"); - t_sender.join().expect("join"); + t_responder.join().expect("join"); } #[test] pub fn streamer_debug() { @@ -368,25 +402,26 @@ mod test { let addr = read.local_addr().unwrap(); let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); - let recycler = Arc::new(Mutex::new(Vec::new())); + let packet_recycler = Arc::new(Mutex::new(Vec::new())); + let resp_recycler = Arc::new(Mutex::new(Vec::new())); let (s_reader, r_reader) = channel(); - let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap(); - let (s_sender, r_sender) = channel(); - let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender); - let msgs = allocate(&recycler); - msgs.write().unwrap().packets.resize(10, Packet::default()); - for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() { + let t_receiver = receiver(read, exit.clone(), packet_recycler.clone(), s_reader).unwrap(); + let (s_responder, r_responder) = channel(); + let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); + let msgs = allocate(&resp_recycler); + msgs.write().unwrap().responses.resize(10, Responses::default()); + for (i, w) in msgs.write().unwrap().responses.iter_mut().enumerate() { w.data[0] = i as u8; w.meta.size = PACKET_SIZE; w.meta.set_addr(&addr); assert_eq!(w.meta.get_addr(), addr); } - s_sender.send(msgs).expect("send"); + s_responder.send(msgs).expect("send"); let mut num = 0; get_msgs(r_reader, &mut num); assert_eq!(num, 10); exit.store(true, Ordering::Relaxed); t_receiver.join().expect("join"); - t_sender.join().expect("join"); + t_responder.join().expect("join"); } } From 8ad90807eeb946c885de0b6f2ac340b7657c6e72 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sat, 24 Mar 2018 23:46:25 -0700 Subject: [PATCH 04/10] responder with larger block size --- src/accountant_skel.rs | 34 +++++++++++++++++----------------- src/lib.rs | 1 - 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index f948c434b6..d8c2e5e3ab 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -92,20 +92,16 @@ impl AccountantSkel { fn process( &mut self, r_reader: &streamer::Receiver, - s_sender: &streamer::Sender, - recycler: &streamer::Recycler, + s_responder: &streamer::Responder, + packet_recycler: &streamer::PacketRecycler, + response_recycler: &streamer::ResponseRecycler, ) -> Result<()> { let timer = Duration::new(1, 0); let msgs = r_reader.recv_timeout(timer)?; let msgs_ = msgs.clone(); let msgs__ = msgs.clone(); - let rsps = streamer::allocate(recycler); + let rsps = streamer::allocate(response_recycler); let rsps_ = rsps.clone(); - let l = msgs__.read().unwrap().packets.len(); - rsps.write() - .unwrap() - .packets - .resize(l, streamer::Packet::default()); { let mut num = 0; let mut ursps = rsps.write().unwrap(); @@ -113,10 +109,13 @@ impl AccountantSkel { let sz = packet.meta.size; let req = deserialize(&packet.data[0..sz])?; if let Some(resp) = self.process_request(req) { - let rsp = &mut ursps.packets[num]; + if ursps.len() <= num { + ursps.responses.resize(num * 2, streamer::Response::default()); + } + let rsp = &mut ursps.responses[num]; let v = serialize(&resp)?; let len = v.len(); - rsp.data[0..len].copy_from_slice(&v); + rsp.data[..len].copy_from_slice(&v); rsp.meta.size = len; rsp.meta.set_addr(&packet.meta.get_addr()); num += 1; @@ -124,7 +123,7 @@ impl AccountantSkel { } ursps.packets.resize(num, streamer::Packet::default()); } - s_sender.send(rsps_)?; + s_responder.send(rsps_)?; streamer::recycle(recycler, msgs_); Ok(()) } @@ -141,23 +140,24 @@ impl AccountantSkel { local.set_port(0); let write = UdpSocket::bind(local)?; - let recycler = Arc::new(Mutex::new(Vec::new())); + let packet_recycler = Arc::new(Mutex::new(Vec::new())); + let response_recycler = Arc::new(Mutex::new(Vec::new())); let (s_reader, r_reader) = channel(); - let t_receiver = streamer::receiver(read, exit.clone(), recycler.clone(), s_reader)?; + let t_receiver = streamer::receiver(read, exit.clone(), packet_recycler.clone(), s_reader)?; - let (s_sender, r_sender) = channel(); - let t_sender = streamer::sender(write, exit.clone(), recycler.clone(), r_sender); + let (s_responder, r_responder) = channel(); + let t_responder = streamer::responder(write, exit.clone(), response_recycler.clone(), r_responder); let t_server = spawn(move || { if let Ok(me) = Arc::try_unwrap(obj) { loop { - let e = me.lock().unwrap().process(&r_reader, &s_sender, &recycler); + let e = me.lock().unwrap().process(&r_reader, &s_responder, &packet_recycler, &response_recycler); if e.is_err() && exit.load(Ordering::Relaxed) { break; } } } }); - Ok(vec![t_receiver, t_sender, t_server]) + Ok(vec![t_receiver, t_responder, t_server]) } } diff --git a/src/lib.rs b/src/lib.rs index db8374c15b..b3af5bcd7f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,7 +14,6 @@ pub mod accountant; pub mod accountant_skel; pub mod accountant_stub; pub mod result; -pub mod services; extern crate bincode; extern crate chrono; extern crate generic_array; From e39c0b34e57b1a86f31943d30e5f3acf34c73da1 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sun, 25 Mar 2018 00:06:48 -0700 Subject: [PATCH 05/10] update --- src/accountant_skel.rs | 14 ++++++-- src/streamer.rs | 81 +++++++++++++++++++++++++++--------------- 2 files changed, 64 insertions(+), 31 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index d8c2e5e3ab..bcad4236fc 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -110,7 +110,9 @@ impl AccountantSkel { let req = deserialize(&packet.data[0..sz])?; if let Some(resp) = self.process_request(req) { if ursps.len() <= num { - ursps.responses.resize(num * 2, streamer::Response::default()); + ursps + .responses + .resize(num * 2, streamer::Response::default()); } let rsp = &mut ursps.responses[num]; let v = serialize(&resp)?; @@ -146,12 +148,18 @@ impl AccountantSkel { let t_receiver = streamer::receiver(read, exit.clone(), packet_recycler.clone(), s_reader)?; let (s_responder, r_responder) = channel(); - let t_responder = streamer::responder(write, exit.clone(), response_recycler.clone(), r_responder); + let t_responder = + streamer::responder(write, exit.clone(), response_recycler.clone(), r_responder); let t_server = spawn(move || { if let Ok(me) = Arc::try_unwrap(obj) { loop { - let e = me.lock().unwrap().process(&r_reader, &s_responder, &packet_recycler, &response_recycler); + let e = me.lock().unwrap().process( + &r_reader, + &s_responder, + &packet_recycler, + &response_recycler, + ); if e.is_err() && exit.load(Ordering::Relaxed) { break; } diff --git a/src/streamer.rs b/src/streamer.rs index f187d68490..66c49ef1e8 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -25,6 +25,7 @@ pub struct Packet { pub data: [u8; PACKET_SIZE], pub meta: Meta, } + impl fmt::Debug for Packet { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( @@ -35,6 +36,7 @@ impl fmt::Debug for Packet { ) } } + impl Default for Packet { fn default() -> Packet { Packet { @@ -43,6 +45,7 @@ impl Default for Packet { } } } + impl Meta { pub fn get_addr(&self) -> SocketAddr { if !self.v6 { @@ -87,48 +90,65 @@ impl Meta { } } -#[derive(Clone, Debug, Default)] pub struct Packets { pub packets: Vec, } -#[derive(Clone, Debug, Default)] +impl Default for Packets { + fn default() -> Packets { + Packets { + packets: vec![Packet::default(); BLOCK_SIZE], + } + } +} + +#[derive(Clone)] pub struct Response { - pub resp: [u8; RESP_SIZE], + pub data: [u8; RESP_SIZE], pub meta: Meta, } +impl fmt::Debug for Response { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "Response {{ size: {:?}, addr: {:?} }}", + self.meta.size, + self.meta.get_addr() + ) + } +} + +impl Default for Response { + fn default() -> Response { + Response { + data: [0u8; RESP_SIZE], + meta: Meta::default(), + } + } +} + pub struct Responses { pub responses: Vec, } +impl Default for Responses { + fn default() -> Responses { + Responses { + responses: vec![Response::default(); NUM_RESP], + } + } +} pub type SharedPackets = Arc>; pub type PacketRecycler = Arc>>; pub type Receiver = mpsc::Receiver; pub type Sender = mpsc::Sender; -pub type SharedResponses = Arc>; +pub type SharedResponses = Arc>; pub type ResponseRecycler = Arc>>; pub type Responder = mpsc::Sender; pub type ResponseReceiver = mpsc::Receiver; -impl Default for Responses { - pub fn default() -> Responses { - Responses { - packets: vec![Response::default(); NUM_RESP], - } - } -} - - -impl Default for Packets { - pub fn default() -> Packets { - Packets { - packets: vec![Packet::default(); BLOCK_SIZE], - } - } -} - impl Packets { fn run_read_from(&mut self, socket: &UdpSocket) -> Result { self.packets.resize(BLOCK_SIZE, Packet::default()); @@ -172,16 +192,18 @@ impl Packets { } } -pub fn allocate(recycler: &Arc>>) -> Arc> -where T: Default +pub fn allocate(recycler: &Arc>>>>) -> Arc> +where + T: Default, { let mut gc = recycler.lock().expect("lock"); gc.pop() - .unwrap_or_else(|| Arc::new(RwLock::new(T::default()))) + .unwrap_or_else(|| Arc::new(RwLock::new(Default::default()))) } -pub fn recycle(recycler: &Arc>>, msgs: Arc>) -where T: Default +pub fn recycle(recycler: &Arc>>>>, msgs: Arc>) +where + T: Default, { let mut gc = recycler.lock().expect("lock"); gc.push(msgs); @@ -350,7 +372,7 @@ mod test { use std::sync::mpsc::channel; use std::io::Write; use std::io; - use streamer::{allocate, receiver, responder, Packet, Receiver, PACKET_SIZE}; + use streamer::{allocate, receiver, responder, Packet, Receiver, Responses, PACKET_SIZE}; fn get_msgs(r: Receiver, num: &mut usize) { for _t in 0..5 { @@ -409,7 +431,10 @@ mod test { let (s_responder, r_responder) = channel(); let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); let msgs = allocate(&resp_recycler); - msgs.write().unwrap().responses.resize(10, Responses::default()); + msgs.write() + .unwrap() + .responses + .resize(10, Responses::default()); for (i, w) in msgs.write().unwrap().responses.iter_mut().enumerate() { w.data[0] = i as u8; w.meta.size = PACKET_SIZE; From 62af09adbe9cad8b3886588be04dfb35e06e2a6b Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sun, 25 Mar 2018 08:05:03 -0700 Subject: [PATCH 06/10] wip --- src/accountant_skel.rs | 2 +- src/streamer.rs | 19 +++++++++++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index bcad4236fc..3a02677b7e 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -126,7 +126,7 @@ impl AccountantSkel { ursps.packets.resize(num, streamer::Packet::default()); } s_responder.send(rsps_)?; - streamer::recycle(recycler, msgs_); + streamer::recycle(packet_recycler, msgs_); Ok(()) } diff --git a/src/streamer.rs b/src/streamer.rs index 66c49ef1e8..fd084426dd 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -90,6 +90,7 @@ impl Meta { } } +#[derive(Debug)] pub struct Packets { pub packets: Vec, } @@ -128,6 +129,7 @@ impl Default for Response { } } +#[derive(Debug)] pub struct Responses { pub responses: Vec, } @@ -153,6 +155,7 @@ impl Packets { fn run_read_from(&mut self, socket: &UdpSocket) -> Result { self.packets.resize(BLOCK_SIZE, Packet::default()); let mut i = 0; + socket.set_nonblocking(false)?; for p in &mut self.packets { p.meta.size = 0; match socket.recv_from(&mut p.data) { @@ -192,6 +195,18 @@ impl Packets { } } +impl Responses { + fn send_to(&self, socket: &UdpSocket, num: &mut usize) -> Result<()> { + for p in &self.responses { + let a = p.meta.get_addr(); + socket.send_to(&p.data[..p.meta.size], &a)?; + //TODO(anatoly): wtf do we do about errors? + *num += 1; + } + Ok(()) + } +} + pub fn allocate(recycler: &Arc>>>>) -> Arc> where T: Default, @@ -372,7 +387,7 @@ mod test { use std::sync::mpsc::channel; use std::io::Write; use std::io; - use streamer::{allocate, receiver, responder, Packet, Receiver, Responses, PACKET_SIZE}; + use streamer::{allocate, receiver, responder, Packet, Receiver, Response, PACKET_SIZE}; fn get_msgs(r: Receiver, num: &mut usize) { for _t in 0..5 { @@ -434,7 +449,7 @@ mod test { msgs.write() .unwrap() .responses - .resize(10, Responses::default()); + .resize(10, Response::default()); for (i, w) in msgs.write().unwrap().responses.iter_mut().enumerate() { w.data[0] = i as u8; w.meta.size = PACKET_SIZE; From 290960c3b575e7967287ba07bce2df8cf7eb2d3d Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sun, 25 Mar 2018 08:06:33 -0700 Subject: [PATCH 07/10] wip --- src/accountant_skel.rs | 5 ++--- src/streamer.rs | 9 --------- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 3a02677b7e..f713eb77ae 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -99,7 +99,6 @@ impl AccountantSkel { let timer = Duration::new(1, 0); let msgs = r_reader.recv_timeout(timer)?; let msgs_ = msgs.clone(); - let msgs__ = msgs.clone(); let rsps = streamer::allocate(response_recycler); let rsps_ = rsps.clone(); { @@ -109,7 +108,7 @@ impl AccountantSkel { let sz = packet.meta.size; let req = deserialize(&packet.data[0..sz])?; if let Some(resp) = self.process_request(req) { - if ursps.len() <= num { + if ursps.responses.len() <= num { ursps .responses .resize(num * 2, streamer::Response::default()); @@ -123,7 +122,7 @@ impl AccountantSkel { num += 1; } } - ursps.packets.resize(num, streamer::Packet::default()); + ursps.responses.resize(num, streamer::Response::default()); } s_responder.send(rsps_)?; streamer::recycle(packet_recycler, msgs_); diff --git a/src/streamer.rs b/src/streamer.rs index fd084426dd..5f877bd889 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -184,15 +184,6 @@ impl Packets { self.packets.resize(sz, Packet::default()); Ok(()) } - fn send_to(&self, socket: &UdpSocket, num: &mut usize) -> Result<()> { - for p in &self.packets { - let a = p.meta.get_addr(); - socket.send_to(&p.data[..p.meta.size], &a)?; - //TODO(anatoly): wtf do we do about errors? - *num += 1; - } - Ok(()) - } } impl Responses { From 8e551f5e32452f53709cc8491bc1ee7bd7bbd9b2 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sun, 25 Mar 2018 08:22:04 -0700 Subject: [PATCH 08/10] debug trait tests --- src/streamer.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/streamer.rs b/src/streamer.rs index 5f877bd889..8d38e5f6f1 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -378,7 +378,8 @@ mod test { use std::sync::mpsc::channel; use std::io::Write; use std::io; - use streamer::{allocate, receiver, responder, Packet, Receiver, Response, PACKET_SIZE}; + use streamer::{allocate, receiver, responder, Packet, Packets, Receiver, Response, Responses, + PACKET_SIZE}; fn get_msgs(r: Receiver, num: &mut usize) { for _t in 0..5 { @@ -423,6 +424,9 @@ mod test { #[test] pub fn streamer_debug() { write!(io::sink(), "{:?}", Packet::default()).unwrap(); + write!(io::sink(), "{:?}", Packets::default()).unwrap(); + write!(io::sink(), "{:?}", Response::default()).unwrap(); + write!(io::sink(), "{:?}", Responses::default()).unwrap(); } #[test] pub fn streamer_send_test() { From f089abb3c558a8d990e59b05914062235ff3cc11 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sun, 25 Mar 2018 15:37:00 -0700 Subject: [PATCH 09/10] fix bench --- src/streamer.rs | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/src/streamer.rs b/src/streamer.rs index 8d38e5f6f1..fe0b9ed153 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -289,29 +289,39 @@ mod bench { use std::time::SystemTime; use std::thread::{spawn, JoinHandle}; use std::sync::mpsc::channel; + use std::sync::atomic::{AtomicBool, Ordering}; use result::Result; - use streamer::{allocate, receiver, recycle, Packet, Receiver, Recycler, PACKET_SIZE}; + use streamer::{allocate, receiver, recycle, Packet, PacketRecycler, Receiver, PACKET_SIZE}; - fn producer(addr: &SocketAddr, recycler: &Recycler, exit: Arc) -> JoinHandle<()> { + fn producer( + addr: &SocketAddr, + recycler: PacketRecycler, + exit: Arc, + ) -> JoinHandle<()> { let send = UdpSocket::bind("0.0.0.0:0").unwrap(); - let msgs = allocate(recycler); + let msgs = allocate(&recycler); + let msgs_ = msgs.clone(); msgs.write().unwrap().packets.resize(10, Packet::default()); for w in msgs.write().unwrap().packets.iter_mut() { - w.size = PACKET_SIZE; - w.set_addr(&addr); + w.meta.size = PACKET_SIZE; + w.meta.set_addr(&addr); } spawn(move || loop { if exit.load(Ordering::Relaxed) { return; } let mut num = 0; - msgs.read().unwrap().send_to(&send, &mut num).unwrap(); + for p in msgs_.read().unwrap().packets.iter() { + let a = p.meta.get_addr(); + send.send_to(&p.data[..p.meta.size], &a).unwrap(); + num += 1; + } assert_eq!(num, 10); }) } fn sinc( - recycler: &Recycler, + recycler: PacketRecycler, exit: Arc, rvs: Arc>, r: Receiver, @@ -325,7 +335,7 @@ mod bench { Ok(msgs) => { let msgs_ = msgs.clone(); *rvs.lock().unwrap() += msgs.read().unwrap().packets.len(); - recycle(recycler, msgs_); + recycle(&recycler, msgs_); } _ => (), } @@ -338,10 +348,10 @@ mod bench { let recycler = Arc::new(Mutex::new(Vec::new())); let (s_reader, r_reader) = channel(); - let t_reader = receiver(read, exit.clone(), &recycler, s_reader)?; - let t_producer1 = producer(&addr, &recycler, exit.clone()); - let t_producer2 = producer(&addr, &recycler, exit.clone()); - let t_producer3 = producer(&addr, &recycler, exit.clone()); + let t_reader = receiver(read, exit.clone(), recycler.clone(), s_reader)?; + let t_producer1 = producer(&addr, recycler.clone(), exit.clone()); + let t_producer2 = producer(&addr, recycler.clone(), exit.clone()); + let t_producer3 = producer(&addr, recycler.clone(), exit.clone()); let rvs = Arc::new(Mutex::new(0)); let t_sinc = sinc(recycler.clone(), exit.clone(), rvs.clone(), r_reader); From c1783d77d75aa6164b8140ba3c9a43f8884eeeae Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sun, 25 Mar 2018 16:18:27 -0700 Subject: [PATCH 10/10] fixed test --- src/accountant_skel.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index f713eb77ae..2f82f6441c 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -111,7 +111,7 @@ impl AccountantSkel { if ursps.responses.len() <= num { ursps .responses - .resize(num * 2, streamer::Response::default()); + .resize((num + 1) * 2, streamer::Response::default()); } let rsp = &mut ursps.responses[num]; let v = serialize(&resp)?;