From 803dcb08009ab8335d7ed043526a00d3ae51e4c4 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 22 Mar 2018 14:05:23 -0600 Subject: [PATCH] Mutex -> AtomicBool --- src/accountant_skel.rs | 5 +++-- src/accountant_stub.rs | 5 +++-- src/bin/genesis-demo.rs | 2 +- src/bin/testnode.rs | 3 ++- src/streamer.rs | 30 ++++++++++++++++-------------- 5 files changed, 25 insertions(+), 20 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 885f05a2b..dc4dfbc00 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -9,6 +9,7 @@ use result::Result; use streamer; use std::sync::{Arc, Mutex}; use std::time::Duration; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::thread::{spawn, JoinHandle}; use std::default::Default; @@ -131,7 +132,7 @@ impl AccountantSkel { pub fn serve( obj: Arc>, addr: &str, - exit: Arc>, + exit: Arc, ) -> Result<[Arc>; 3]> { let read = UdpSocket::bind(addr)?; // make sure we are on the same interface @@ -152,7 +153,7 @@ impl AccountantSkel { let e = me.lock() .unwrap() .process(&r_reader, &s_sender, recycler.clone()); - if e.is_err() && *exit.lock().unwrap() { + if e.is_err() && exit.load(Ordering::Relaxed) { break; } }, diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 0c4f1082c..7ca067866 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -127,6 +127,7 @@ mod tests { use mint::Mint; use signature::{KeyPair, KeyPairUtil}; use std::sync::{Arc, Mutex}; + use std::sync::atomic::{AtomicBool, Ordering}; #[test] fn test_accountant_stub() { @@ -135,7 +136,7 @@ mod tests { let alice = Mint::new(10_000); let acc = Accountant::new(&alice, Some(30)); let bob_pubkey = KeyPair::new().pubkey(); - let exit = Arc::new(Mutex::new(false)); + let exit = Arc::new(AtomicBool::new(false)); let acc = Arc::new(Mutex::new(AccountantSkel::new(acc))); let threads = AccountantSkel::serve(acc, addr, exit.clone()).unwrap(); sleep(Duration::from_millis(30)); @@ -147,7 +148,7 @@ mod tests { .unwrap(); acc.wait_on_signature(&sig, &last_id).unwrap(); assert_eq!(acc.get_balance(&bob_pubkey).unwrap().unwrap(), 500); - *exit.lock().unwrap() = true; + exit.store(true, Ordering::Relaxed); for t in threads.iter() { match Arc::try_unwrap((*t).clone()) { Ok(j) => j.join().expect("join"), diff --git a/src/bin/genesis-demo.rs b/src/bin/genesis-demo.rs index ca53378a9..c5fddfead 100644 --- a/src/bin/genesis-demo.rs +++ b/src/bin/genesis-demo.rs @@ -10,7 +10,7 @@ use silk::hash::Hash; use std::io::stdin; fn transfer(from: &KeyPair, (to, tokens): (PublicKey, i64), last_id: Hash) -> Event { - Event::Transaction(Transaction::new(&from, to, tokens, last_id)) + Event::Transaction(Transaction::new(from, to, tokens, last_id)) } fn main() { diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 4fcc5ef48..b9204e7e5 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -5,6 +5,7 @@ use silk::accountant_skel::AccountantSkel; use silk::accountant::Accountant; use std::io::{self, BufRead}; use std::sync::{Arc, Mutex}; +use std::sync::atomic::AtomicBool; fn main() { let addr = "127.0.0.1:8000"; @@ -14,7 +15,7 @@ fn main() { .lines() .map(|line| serde_json::from_str(&line.unwrap()).unwrap()); let acc = Accountant::new_from_entries(entries, Some(1000)); - let exit = Arc::new(Mutex::new(false)); + let exit = Arc::new(AtomicBool::new(false)); let skel = Arc::new(Mutex::new(AccountantSkel::new(acc))); eprintln!("Listening on {}", addr); let _threads = AccountantSkel::serve(skel, addr, exit.clone()).unwrap(); diff --git a/src/streamer.rs b/src/streamer.rs index 56e5d2f90..817c070c7 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -1,4 +1,5 @@ use std::sync::{Arc, Mutex, RwLock}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc; use std::fmt; use std::time::Duration; @@ -153,7 +154,7 @@ pub fn recycle(recycler: Recycler, msgs: SharedPacketData) { fn recv_loop( sock: &UdpSocket, - exit: Arc>, + exit: Arc, recycler: Recycler, channel: Sender, ) -> Result<()> { @@ -167,7 +168,7 @@ fn recv_loop( break; } Err(_) => { - if *exit.lock().unwrap() { + if exit.load(Ordering::Relaxed) { recycle(recycler.clone(), msgs_); return Ok(()); } @@ -179,7 +180,7 @@ fn recv_loop( pub fn receiver( sock: UdpSocket, - exit: Arc>, + exit: Arc, recycler: Recycler, channel: Sender, ) -> Result> { @@ -203,12 +204,12 @@ fn recv_send(sock: &UdpSocket, recycler: Recycler, r: &Receiver) -> Result<()> { pub fn sender( sock: UdpSocket, - exit: Arc>, + exit: Arc, recycler: Recycler, r: Receiver, ) -> JoinHandle<()> { spawn(move || loop { - if recv_send(&sock, recycler.clone(), &r).is_err() && *exit.lock().unwrap() { + if recv_send(&sock, recycler.clone(), &r).is_err() && exit.load(Ordering::Relaxed) { break; } }) @@ -228,7 +229,7 @@ mod bench { use result::Result; use streamer::{allocate, receiver, recycle, Packet, Receiver, Recycler, PACKET_SIZE}; - fn producer(addr: &SocketAddr, recycler: Recycler, exit: Arc>) -> JoinHandle<()> { + fn producer(addr: &SocketAddr, recycler: Recycler, exit: Arc) -> JoinHandle<()> { let send = UdpSocket::bind("0.0.0.0:0").unwrap(); let msgs = allocate(recycler.clone()); msgs.write().unwrap().packets.resize(10, Packet::default()); @@ -237,7 +238,7 @@ mod bench { w.set_addr(&addr); } spawn(move || loop { - if *exit.lock().unwrap() { + if exit.load(Ordering::Relaxed) { return; } let mut num = 0; @@ -248,12 +249,12 @@ mod bench { fn sinc( recycler: Recycler, - exit: Arc>, + exit: Arc, rvs: Arc>, r: Receiver, ) -> JoinHandle<()> { spawn(move || loop { - if *exit.lock().unwrap() { + if exit.load(Ordering::Relaxed) { return; } let timer = Duration::new(1, 0); @@ -270,7 +271,7 @@ mod bench { fn run_streamer_bench() -> Result<()> { let read = UdpSocket::bind("127.0.0.1:0")?; let addr = read.local_addr()?; - let exit = Arc::new(Mutex::new(false)); + let exit = Arc::new(AtomicBool::new(false)); let recycler = Arc::new(Mutex::new(Vec::new())); let (s_reader, r_reader) = channel(); @@ -291,7 +292,7 @@ mod bench { let ftime = (time as f64) / 10000000000f64; let fcount = (end_val - start_val) as f64; println!("performance: {:?}", fcount / ftime); - *exit.lock().unwrap() = true; + exit.store(true, Ordering::Relaxed); t_reader.join()?; t_producer1.join()?; t_producer2.join()?; @@ -310,6 +311,7 @@ mod test { use std::sync::{Arc, Mutex}; use std::net::UdpSocket; use std::time::Duration; + use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::io::Write; use std::io; @@ -351,7 +353,7 @@ mod test { let mut num = 0; get_msgs(r_reader, &mut num); assert_eq!(num, 10); - *exit.lock().unwrap() = true; + exit.store(true, Ordering::Relaxed); t_receiver.join().expect("join"); t_sender.join().expect("join"); } @@ -364,7 +366,7 @@ mod test { let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); let addr = read.local_addr().unwrap(); let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let exit = Arc::new(Mutex::new(false)); + let exit = Arc::new(AtomicBool::new(false)); let recycler = Arc::new(Mutex::new(Vec::new())); let (s_reader, r_reader) = channel(); let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap(); @@ -382,7 +384,7 @@ mod test { let mut num = 0; get_msgs(r_reader, &mut num); assert_eq!(num, 10); - *exit.lock().unwrap() = true; + exit.store(true, Ordering::Relaxed); t_receiver.join().expect("join"); t_sender.join().expect("join"); }