diff --git a/src/streamer.rs b/src/streamer.rs index 8d38e5f6f..fe0b9ed15 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -289,29 +289,39 @@ mod bench { use std::time::SystemTime; use std::thread::{spawn, JoinHandle}; use std::sync::mpsc::channel; + use std::sync::atomic::{AtomicBool, Ordering}; use result::Result; - use streamer::{allocate, receiver, recycle, Packet, Receiver, Recycler, PACKET_SIZE}; + use streamer::{allocate, receiver, recycle, Packet, PacketRecycler, Receiver, PACKET_SIZE}; - fn producer(addr: &SocketAddr, recycler: &Recycler, exit: Arc) -> JoinHandle<()> { + fn producer( + addr: &SocketAddr, + recycler: PacketRecycler, + exit: Arc, + ) -> JoinHandle<()> { let send = UdpSocket::bind("0.0.0.0:0").unwrap(); - let msgs = allocate(recycler); + let msgs = allocate(&recycler); + let msgs_ = msgs.clone(); msgs.write().unwrap().packets.resize(10, Packet::default()); for w in msgs.write().unwrap().packets.iter_mut() { - w.size = PACKET_SIZE; - w.set_addr(&addr); + w.meta.size = PACKET_SIZE; + w.meta.set_addr(&addr); } spawn(move || loop { if exit.load(Ordering::Relaxed) { return; } let mut num = 0; - msgs.read().unwrap().send_to(&send, &mut num).unwrap(); + for p in msgs_.read().unwrap().packets.iter() { + let a = p.meta.get_addr(); + send.send_to(&p.data[..p.meta.size], &a).unwrap(); + num += 1; + } assert_eq!(num, 10); }) } fn sinc( - recycler: &Recycler, + recycler: PacketRecycler, exit: Arc, rvs: Arc>, r: Receiver, @@ -325,7 +335,7 @@ mod bench { Ok(msgs) => { let msgs_ = msgs.clone(); *rvs.lock().unwrap() += msgs.read().unwrap().packets.len(); - recycle(recycler, msgs_); + recycle(&recycler, msgs_); } _ => (), } @@ -338,10 +348,10 @@ mod bench { let recycler = Arc::new(Mutex::new(Vec::new())); let (s_reader, r_reader) = channel(); - let t_reader = receiver(read, exit.clone(), &recycler, s_reader)?; - let t_producer1 = producer(&addr, &recycler, exit.clone()); - let t_producer2 = producer(&addr, &recycler, exit.clone()); - let t_producer3 = producer(&addr, &recycler, exit.clone()); + let t_reader = receiver(read, exit.clone(), recycler.clone(), s_reader)?; + let t_producer1 = producer(&addr, recycler.clone(), exit.clone()); + let t_producer2 = producer(&addr, recycler.clone(), exit.clone()); + let t_producer3 = producer(&addr, recycler.clone(), exit.clone()); let rvs = Arc::new(Mutex::new(0)); let t_sinc = sinc(recycler.clone(), exit.clone(), rvs.clone(), r_reader);