diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index f948c434b..d8c2e5e3a 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -92,20 +92,16 @@ impl AccountantSkel { fn process( &mut self, r_reader: &streamer::Receiver, - s_sender: &streamer::Sender, - recycler: &streamer::Recycler, + s_responder: &streamer::Responder, + packet_recycler: &streamer::PacketRecycler, + response_recycler: &streamer::ResponseRecycler, ) -> Result<()> { let timer = Duration::new(1, 0); let msgs = r_reader.recv_timeout(timer)?; let msgs_ = msgs.clone(); let msgs__ = msgs.clone(); - let rsps = streamer::allocate(recycler); + let rsps = streamer::allocate(response_recycler); let rsps_ = rsps.clone(); - let l = msgs__.read().unwrap().packets.len(); - rsps.write() - .unwrap() - .packets - .resize(l, streamer::Packet::default()); { let mut num = 0; let mut ursps = rsps.write().unwrap(); @@ -113,10 +109,13 @@ impl AccountantSkel { let sz = packet.meta.size; let req = deserialize(&packet.data[0..sz])?; if let Some(resp) = self.process_request(req) { - let rsp = &mut ursps.packets[num]; + if ursps.len() <= num { + ursps.responses.resize(num * 2, streamer::Response::default()); + } + let rsp = &mut ursps.responses[num]; let v = serialize(&resp)?; let len = v.len(); - rsp.data[0..len].copy_from_slice(&v); + rsp.data[..len].copy_from_slice(&v); rsp.meta.size = len; rsp.meta.set_addr(&packet.meta.get_addr()); num += 1; @@ -124,7 +123,7 @@ impl AccountantSkel { } ursps.packets.resize(num, streamer::Packet::default()); } - s_sender.send(rsps_)?; + s_responder.send(rsps_)?; streamer::recycle(recycler, msgs_); Ok(()) } @@ -141,23 +140,24 @@ impl AccountantSkel { local.set_port(0); let write = UdpSocket::bind(local)?; - let recycler = Arc::new(Mutex::new(Vec::new())); + let packet_recycler = Arc::new(Mutex::new(Vec::new())); + let response_recycler = Arc::new(Mutex::new(Vec::new())); let (s_reader, r_reader) = channel(); - let t_receiver = streamer::receiver(read, exit.clone(), recycler.clone(), s_reader)?; + let t_receiver = streamer::receiver(read, exit.clone(), packet_recycler.clone(), s_reader)?; - let (s_sender, r_sender) = channel(); - let t_sender = streamer::sender(write, exit.clone(), recycler.clone(), r_sender); + let (s_responder, r_responder) = channel(); + let t_responder = streamer::responder(write, exit.clone(), response_recycler.clone(), r_responder); let t_server = spawn(move || { if let Ok(me) = Arc::try_unwrap(obj) { loop { - let e = me.lock().unwrap().process(&r_reader, &s_sender, &recycler); + let e = me.lock().unwrap().process(&r_reader, &s_responder, &packet_recycler, &response_recycler); if e.is_err() && exit.load(Ordering::Relaxed) { break; } } } }); - Ok(vec![t_receiver, t_sender, t_server]) + Ok(vec![t_receiver, t_responder, t_server]) } } diff --git a/src/lib.rs b/src/lib.rs index db8374c15..b3af5bcd7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,7 +14,6 @@ pub mod accountant; pub mod accountant_skel; pub mod accountant_stub; pub mod result; -pub mod services; extern crate bincode; extern crate chrono; extern crate generic_array;