diff --git a/src/bin/bench-streamer.rs b/src/bin/bench-streamer.rs index 80506891c..b8eff762f 100644 --- a/src/bin/bench-streamer.rs +++ b/src/bin/bench-streamer.rs @@ -4,20 +4,20 @@ use solana::packet::{Packet, PacketRecycler, BLOB_SIZE, PACKET_DATA_SIZE}; use solana::result::Result; use solana::streamer::{receiver, PacketReceiver}; use std::net::{SocketAddr, UdpSocket}; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::channel; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::thread::sleep; use std::thread::{spawn, JoinHandle}; use std::time::Duration; use std::time::SystemTime; -fn producer(addr: &SocketAddr, recycler: PacketRecycler, exit: Arc) -> JoinHandle<()> { +fn producer(addr: &SocketAddr, recycler: &PacketRecycler, exit: Arc) -> JoinHandle<()> { let send = UdpSocket::bind("0.0.0.0:0").unwrap(); let msgs = recycler.allocate(); let msgs_ = msgs.clone(); msgs.write().unwrap().packets.resize(10, Packet::default()); - for w in msgs.write().unwrap().packets.iter_mut() { + for w in &mut msgs.write().unwrap().packets { w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(&addr); } @@ -26,7 +26,7 @@ fn producer(addr: &SocketAddr, recycler: PacketRecycler, exit: Arc) return; } let mut num = 0; - for p in msgs_.read().unwrap().packets.iter() { + for p in &msgs_.read().unwrap().packets { let a = p.meta.addr(); assert!(p.meta.size < BLOB_SIZE); send.send_to(&p.data[..p.meta.size], &a).unwrap(); @@ -39,7 +39,7 @@ fn producer(addr: &SocketAddr, recycler: PacketRecycler, exit: Arc) fn sink( recycler: PacketRecycler, exit: Arc, - rvs: Arc>, + rvs: Arc, r: PacketReceiver, ) -> JoinHandle<()> { spawn(move || loop { @@ -47,12 +47,9 @@ fn sink( return; } let timer = Duration::new(1, 0); - match r.recv_timeout(timer) { - Ok(msgs) => { - *rvs.lock().unwrap() += msgs.read().unwrap().packets.len(); - recycler.recycle(msgs); - } - _ => (), + if let Ok(msgs) = r.recv_timeout(timer) { + rvs.fetch_add(msgs.read().unwrap().packets.len(), Ordering::Relaxed); + recycler.recycle(msgs); } }) } @@ -67,20 +64,20 @@ fn main() -> Result<()> { let (s_reader, r_reader) = channel(); let t_reader = receiver(read, exit.clone(), pack_recycler.clone(), s_reader); - let t_producer1 = producer(&addr, pack_recycler.clone(), exit.clone()); - let t_producer2 = producer(&addr, pack_recycler.clone(), exit.clone()); - let t_producer3 = producer(&addr, pack_recycler.clone(), exit.clone()); + let t_producer1 = producer(&addr, &pack_recycler, exit.clone()); + let t_producer2 = producer(&addr, &pack_recycler, exit.clone()); + let t_producer3 = producer(&addr, &pack_recycler, exit.clone()); - let rvs = Arc::new(Mutex::new(0)); + let rvs = Arc::new(AtomicUsize::new(0)); let t_sink = sink(pack_recycler.clone(), exit.clone(), rvs.clone(), r_reader); let start = SystemTime::now(); - let start_val = *rvs.lock().unwrap(); + let start_val = rvs.load(Ordering::Relaxed); sleep(Duration::new(5, 0)); let elapsed = start.elapsed().unwrap(); - let end_val = *rvs.lock().unwrap(); - let time = elapsed.as_secs() * 10000000000 + elapsed.subsec_nanos() as u64; - let ftime = (time as f64) / 10000000000f64; + let end_val = rvs.load(Ordering::Relaxed); + let time = elapsed.as_secs() * 10_000_000_000 + u64::from(elapsed.subsec_nanos()); + let ftime = (time as f64) / 10_000_000_000_f64; let fcount = (end_val - start_val) as f64; println!("performance: {:?}", fcount / ftime); exit.store(true, Ordering::Relaxed);