fix bench

This commit is contained in:
Anatoly Yakovenko 2018-03-25 15:37:00 -07:00
parent 8e551f5e32
commit f089abb3c5
1 changed files with 22 additions and 12 deletions

View File

@ -289,29 +289,39 @@ mod bench {
use std::time::SystemTime; use std::time::SystemTime;
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::atomic::{AtomicBool, Ordering};
use result::Result; use result::Result;
use streamer::{allocate, receiver, recycle, Packet, Receiver, Recycler, PACKET_SIZE}; use streamer::{allocate, receiver, recycle, Packet, PacketRecycler, Receiver, PACKET_SIZE};
fn producer(addr: &SocketAddr, recycler: &Recycler, exit: Arc<AtomicBool>) -> JoinHandle<()> { fn producer(
addr: &SocketAddr,
recycler: PacketRecycler,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
let send = UdpSocket::bind("0.0.0.0:0").unwrap(); let send = UdpSocket::bind("0.0.0.0:0").unwrap();
let msgs = allocate(recycler); let msgs = allocate(&recycler);
let msgs_ = msgs.clone();
msgs.write().unwrap().packets.resize(10, Packet::default()); msgs.write().unwrap().packets.resize(10, Packet::default());
for w in msgs.write().unwrap().packets.iter_mut() { for w in msgs.write().unwrap().packets.iter_mut() {
w.size = PACKET_SIZE; w.meta.size = PACKET_SIZE;
w.set_addr(&addr); w.meta.set_addr(&addr);
} }
spawn(move || loop { spawn(move || loop {
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
return; return;
} }
let mut num = 0; let mut num = 0;
msgs.read().unwrap().send_to(&send, &mut num).unwrap(); for p in msgs_.read().unwrap().packets.iter() {
let a = p.meta.get_addr();
send.send_to(&p.data[..p.meta.size], &a).unwrap();
num += 1;
}
assert_eq!(num, 10); assert_eq!(num, 10);
}) })
} }
fn sinc( fn sinc(
recycler: &Recycler, recycler: PacketRecycler,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
rvs: Arc<Mutex<usize>>, rvs: Arc<Mutex<usize>>,
r: Receiver, r: Receiver,
@ -325,7 +335,7 @@ mod bench {
Ok(msgs) => { Ok(msgs) => {
let msgs_ = msgs.clone(); let msgs_ = msgs.clone();
*rvs.lock().unwrap() += msgs.read().unwrap().packets.len(); *rvs.lock().unwrap() += msgs.read().unwrap().packets.len();
recycle(recycler, msgs_); recycle(&recycler, msgs_);
} }
_ => (), _ => (),
} }
@ -338,10 +348,10 @@ mod bench {
let recycler = Arc::new(Mutex::new(Vec::new())); let recycler = Arc::new(Mutex::new(Vec::new()));
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let t_reader = receiver(read, exit.clone(), &recycler, s_reader)?; let t_reader = receiver(read, exit.clone(), recycler.clone(), s_reader)?;
let t_producer1 = producer(&addr, &recycler, exit.clone()); let t_producer1 = producer(&addr, recycler.clone(), exit.clone());
let t_producer2 = producer(&addr, &recycler, exit.clone()); let t_producer2 = producer(&addr, recycler.clone(), exit.clone());
let t_producer3 = producer(&addr, &recycler, exit.clone()); let t_producer3 = producer(&addr, recycler.clone(), exit.clone());
let rvs = Arc::new(Mutex::new(0)); let rvs = Arc::new(Mutex::new(0));
let t_sinc = sinc(recycler.clone(), exit.clone(), rvs.clone(), r_reader); let t_sinc = sinc(recycler.clone(), exit.clone(), rvs.clone(), r_reader);