From e39c0b34e57b1a86f31943d30e5f3acf34c73da1 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sun, 25 Mar 2018 00:06:48 -0700 Subject: [PATCH] update --- src/accountant_skel.rs | 14 ++++++-- src/streamer.rs | 81 +++++++++++++++++++++++++++--------------- 2 files changed, 64 insertions(+), 31 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index d8c2e5e3a..bcad4236f 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -110,7 +110,9 @@ impl AccountantSkel { let req = deserialize(&packet.data[0..sz])?; if let Some(resp) = self.process_request(req) { if ursps.len() <= num { - ursps.responses.resize(num * 2, streamer::Response::default()); + ursps + .responses + .resize(num * 2, streamer::Response::default()); } let rsp = &mut ursps.responses[num]; let v = serialize(&resp)?; @@ -146,12 +148,18 @@ impl AccountantSkel { let t_receiver = streamer::receiver(read, exit.clone(), packet_recycler.clone(), s_reader)?; let (s_responder, r_responder) = channel(); - let t_responder = streamer::responder(write, exit.clone(), response_recycler.clone(), r_responder); + let t_responder = + streamer::responder(write, exit.clone(), response_recycler.clone(), r_responder); let t_server = spawn(move || { if let Ok(me) = Arc::try_unwrap(obj) { loop { - let e = me.lock().unwrap().process(&r_reader, &s_responder, &packet_recycler, &response_recycler); + let e = me.lock().unwrap().process( + &r_reader, + &s_responder, + &packet_recycler, + &response_recycler, + ); if e.is_err() && exit.load(Ordering::Relaxed) { break; } diff --git a/src/streamer.rs b/src/streamer.rs index f187d6849..66c49ef1e 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -25,6 +25,7 @@ pub struct Packet { pub data: [u8; PACKET_SIZE], pub meta: Meta, } + impl fmt::Debug for Packet { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( @@ -35,6 +36,7 @@ impl fmt::Debug for Packet { ) } } + impl Default for Packet { fn default() -> Packet { Packet { @@ -43,6 +45,7 @@ impl Default for Packet { } } } + impl Meta { pub fn get_addr(&self) -> SocketAddr { if !self.v6 { @@ -87,48 +90,65 @@ impl Meta { } } -#[derive(Clone, Debug, Default)] pub struct Packets { pub packets: Vec, } -#[derive(Clone, Debug, Default)] +impl Default for Packets { + fn default() -> Packets { + Packets { + packets: vec![Packet::default(); BLOCK_SIZE], + } + } +} + +#[derive(Clone)] pub struct Response { - pub resp: [u8; RESP_SIZE], + pub data: [u8; RESP_SIZE], pub meta: Meta, } +impl fmt::Debug for Response { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "Response {{ size: {:?}, addr: {:?} }}", + self.meta.size, + self.meta.get_addr() + ) + } +} + +impl Default for Response { + fn default() -> Response { + Response { + data: [0u8; RESP_SIZE], + meta: Meta::default(), + } + } +} + pub struct Responses { pub responses: Vec, } +impl Default for Responses { + fn default() -> Responses { + Responses { + responses: vec![Response::default(); NUM_RESP], + } + } +} pub type SharedPackets = Arc>; pub type PacketRecycler = Arc>>; pub type Receiver = mpsc::Receiver; pub type Sender = mpsc::Sender; -pub type SharedResponses = Arc>; +pub type SharedResponses = Arc>; pub type ResponseRecycler = Arc>>; pub type Responder = mpsc::Sender; pub type ResponseReceiver = mpsc::Receiver; -impl Default for Responses { - pub fn default() -> Responses { - Responses { - packets: vec![Response::default(); NUM_RESP], - } - } -} - - -impl Default for Packets { - pub fn default() -> Packets { - Packets { - packets: vec![Packet::default(); BLOCK_SIZE], - } - } -} - impl Packets { fn run_read_from(&mut self, socket: &UdpSocket) -> Result { self.packets.resize(BLOCK_SIZE, Packet::default()); @@ -172,16 +192,18 @@ impl Packets { } } -pub fn allocate(recycler: &Arc>>) -> Arc> -where T: Default +pub fn allocate(recycler: &Arc>>>>) -> Arc> +where + T: Default, { let mut gc = recycler.lock().expect("lock"); gc.pop() - .unwrap_or_else(|| Arc::new(RwLock::new(T::default()))) + .unwrap_or_else(|| Arc::new(RwLock::new(Default::default()))) } -pub fn recycle(recycler: &Arc>>, msgs: Arc>) -where T: Default +pub fn recycle(recycler: &Arc>>>>, msgs: Arc>) +where + T: Default, { let mut gc = recycler.lock().expect("lock"); gc.push(msgs); @@ -350,7 +372,7 @@ mod test { use std::sync::mpsc::channel; use std::io::Write; use std::io; - use streamer::{allocate, receiver, responder, Packet, Receiver, PACKET_SIZE}; + use streamer::{allocate, receiver, responder, Packet, Receiver, Responses, PACKET_SIZE}; fn get_msgs(r: Receiver, num: &mut usize) { for _t in 0..5 { @@ -409,7 +431,10 @@ mod test { let (s_responder, r_responder) = channel(); let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); let msgs = allocate(&resp_recycler); - msgs.write().unwrap().responses.resize(10, Responses::default()); + msgs.write() + .unwrap() + .responses + .resize(10, Responses::default()); for (i, w) in msgs.write().unwrap().responses.iter_mut().enumerate() { w.data[0] = i as u8; w.meta.size = PACKET_SIZE;