diff --git a/src/accountant.rs b/src/accountant.rs index 7f2e65fa9..bdad3cf6b 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -9,7 +9,7 @@ use plan::{Plan, Witness}; use transaction::Transaction; use signature::{KeyPair, PublicKey, Signature}; use mint::Mint; -use historian::{reserve_signature, Historian}; +use historian::Historian; use recorder::Signal; use std::sync::mpsc::SendError; use std::collections::{HashMap, HashSet}; @@ -30,13 +30,7 @@ pub type Result = result::Result; /// Commit funds to the 'to' party. fn complete_transaction(balances: &mut HashMap, plan: &Plan) { if let Plan::Pay(ref payment) = *plan { - if balances.contains_key(&payment.to) { - if let Some(x) = balances.get_mut(&payment.to) { - *x += payment.tokens; - } - } else { - balances.insert(payment.to, payment.tokens); - } + *balances.entry(payment.to).or_insert(0) += payment.tokens; } } @@ -122,7 +116,7 @@ impl Accountant { tr: &Transaction, allow_deposits: bool, ) -> Result<()> { - if !reserve_signature(&mut self.historian.signatures, &tr.sig) { + if !self.historian.reserve_signature(&tr.sig) { return Err(AccountingError::InvalidTransferSignature); } @@ -133,7 +127,7 @@ impl Accountant { } let mut plan = tr.plan.clone(); - plan.apply_witness(Witness::Timestamp(self.last_time)); + plan.apply_witness(&Witness::Timestamp(self.last_time)); if plan.is_complete() { complete_transaction(&mut self.balances, &plan); @@ -146,7 +140,7 @@ impl Accountant { fn process_verified_sig(&mut self, from: PublicKey, tx_sig: Signature) -> Result<()> { if let Occupied(mut e) = self.pending.entry(tx_sig) { - e.get_mut().apply_witness(Witness::Signature(from)); + e.get_mut().apply_witness(&Witness::Signature(from)); if e.get().is_complete() { complete_transaction(&mut self.balances, e.get()); e.remove_entry(); @@ -174,9 +168,9 @@ impl Accountant { // Check to see if any timelocked transactions can be completed. let mut completed = vec![]; for (key, plan) in &mut self.pending { - plan.apply_witness(Witness::Timestamp(self.last_time)); + plan.apply_witness(&Witness::Timestamp(self.last_time)); if plan.is_complete() { - complete_transaction(&mut self.balances, &plan); + complete_transaction(&mut self.balances, plan); completed.push(key.clone()); } } @@ -222,7 +216,7 @@ impl Accountant { } pub fn get_balance(self: &Self, pubkey: &PublicKey) -> Option { - self.balances.get(pubkey).map(|x| *x) + self.balances.get(pubkey).cloned() } } diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 885f05a2b..6ff75b830 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -9,6 +9,7 @@ use result::Result; use streamer; use std::sync::{Arc, Mutex}; use std::time::Duration; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::thread::{spawn, JoinHandle}; use std::default::Default; @@ -20,6 +21,7 @@ pub struct AccountantSkel { pub ledger: Vec, } +#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] #[derive(Serialize, Deserialize, Debug)] pub enum Request { Transaction(Transaction), @@ -91,13 +93,13 @@ impl AccountantSkel { &mut self, r_reader: &streamer::Receiver, s_sender: &streamer::Sender, - recycler: streamer::Recycler, + recycler: &streamer::Recycler, ) -> Result<()> { let timer = Duration::new(1, 0); let msgs = r_reader.recv_timeout(timer)?; let msgs_ = msgs.clone(); let msgs__ = msgs.clone(); - let rsps = streamer::allocate(recycler.clone()); + let rsps = streamer::allocate(recycler); let rsps_ = rsps.clone(); let l = msgs__.read().unwrap().packets.len(); rsps.write() @@ -107,11 +109,11 @@ impl AccountantSkel { { let mut num = 0; let mut ursps = rsps.write().unwrap(); - for packet in msgs.read().unwrap().packets.iter() { + for packet in &msgs.read().unwrap().packets { let sz = packet.size; let req = deserialize(&packet.data[0..sz])?; if let Some(resp) = self.process_request(req) { - let rsp = ursps.packets.get_mut(num).unwrap(); + let rsp = &mut ursps.packets[num]; let v = serialize(&resp)?; let len = v.len(); rsp.data[0..len].copy_from_slice(&v); @@ -131,7 +133,7 @@ impl AccountantSkel { pub fn serve( obj: Arc>, addr: &str, - exit: Arc>, + exit: Arc, ) -> Result<[Arc>; 3]> { let read = UdpSocket::bind(addr)?; // make sure we are on the same interface @@ -147,17 +149,14 @@ impl AccountantSkel { let t_sender = streamer::sender(write, exit.clone(), recycler.clone(), r_sender); let t_server = spawn(move || { - match Arc::try_unwrap(obj) { - Ok(me) => loop { - let e = me.lock() - .unwrap() - .process(&r_reader, &s_sender, recycler.clone()); - if e.is_err() && *exit.lock().unwrap() { + if let Ok(me) = Arc::try_unwrap(obj) { + loop { + let e = me.lock().unwrap().process(&r_reader, &s_sender, &recycler); + if e.is_err() && exit.load(Ordering::Relaxed) { break; } - }, - _ => (), - }; + } + } }); Ok([Arc::new(t_receiver), Arc::new(t_sender), Arc::new(t_server)]) } diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 0c4f1082c..7ca067866 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -127,6 +127,7 @@ mod tests { use mint::Mint; use signature::{KeyPair, KeyPairUtil}; use std::sync::{Arc, Mutex}; + use std::sync::atomic::{AtomicBool, Ordering}; #[test] fn test_accountant_stub() { @@ -135,7 +136,7 @@ mod tests { let alice = Mint::new(10_000); let acc = Accountant::new(&alice, Some(30)); let bob_pubkey = KeyPair::new().pubkey(); - let exit = Arc::new(Mutex::new(false)); + let exit = Arc::new(AtomicBool::new(false)); let acc = Arc::new(Mutex::new(AccountantSkel::new(acc))); let threads = AccountantSkel::serve(acc, addr, exit.clone()).unwrap(); sleep(Duration::from_millis(30)); @@ -147,7 +148,7 @@ mod tests { .unwrap(); acc.wait_on_signature(&sig, &last_id).unwrap(); assert_eq!(acc.get_balance(&bob_pubkey).unwrap().unwrap(), 500); - *exit.lock().unwrap() = true; + exit.store(true, Ordering::Relaxed); for t in threads.iter() { match Arc::try_unwrap((*t).clone()) { Ok(j) => j.join().expect("join"), diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 18f4e6556..eec97db87 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -33,7 +33,7 @@ fn main() { }) .collect(); let duration = now.elapsed(); - let ns = duration.as_secs() * 1_000_000_000 + duration.subsec_nanos() as u64; + let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); let bsps = txs as f64 / ns as f64; let nsps = ns as f64 / txs as f64; println!( @@ -48,7 +48,7 @@ fn main() { assert!(tr.verify()); } let duration = now.elapsed(); - let ns = duration.as_secs() * 1_000_000_000 + duration.subsec_nanos() as u64; + let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); let bsvps = txs as f64 / ns as f64; let nspsv = ns as f64 / txs as f64; println!( @@ -68,7 +68,7 @@ fn main() { acc.wait_on_signature(&sig, &last_id).unwrap(); let duration = now.elapsed(); - let ns = duration.as_secs() * 1_000_000_000 + duration.subsec_nanos() as u64; + let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); let tps = (txs * 1_000_000_000) as f64 / ns as f64; println!("Done. {} tps!", tps); let val = acc.get_balance(&mint_pubkey).unwrap().unwrap(); diff --git a/src/bin/genesis-demo.rs b/src/bin/genesis-demo.rs index ca53378a9..c5fddfead 100644 --- a/src/bin/genesis-demo.rs +++ b/src/bin/genesis-demo.rs @@ -10,7 +10,7 @@ use silk::hash::Hash; use std::io::stdin; fn transfer(from: &KeyPair, (to, tokens): (PublicKey, i64), last_id: Hash) -> Event { - Event::Transaction(Transaction::new(&from, to, tokens, last_id)) + Event::Transaction(Transaction::new(from, to, tokens, last_id)) } fn main() { diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 4fcc5ef48..b9204e7e5 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -5,6 +5,7 @@ use silk::accountant_skel::AccountantSkel; use silk::accountant::Accountant; use std::io::{self, BufRead}; use std::sync::{Arc, Mutex}; +use std::sync::atomic::AtomicBool; fn main() { let addr = "127.0.0.1:8000"; @@ -14,7 +15,7 @@ fn main() { .lines() .map(|line| serde_json::from_str(&line.unwrap()).unwrap()); let acc = Accountant::new_from_entries(entries, Some(1000)); - let exit = Arc::new(Mutex::new(false)); + let exit = Arc::new(AtomicBool::new(false)); let skel = Arc::new(Mutex::new(AccountantSkel::new(acc))); eprintln!("Listening on {}", addr); let _threads = AccountantSkel::serve(skel, addr, exit.clone()).unwrap(); diff --git a/src/entry.rs b/src/entry.rs index 36db0d841..56a612bb4 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -9,8 +9,8 @@ pub struct Entry { } impl Entry { - /// Creates a Entry from the number of hashes 'num_hashes' since the previous event - /// and that resulting 'id'. + /// Creates a Entry from the number of hashes `num_hashes` since the previous event + /// and that resulting `id`. pub fn new_tick(num_hashes: u64, id: &Hash) -> Self { Entry { num_hashes, @@ -19,7 +19,7 @@ impl Entry { } } - /// Verifies self.id is the result of hashing a 'start_hash' 'self.num_hashes' times. + /// Verifies self.id is the result of hashing a `start_hash` `self.num_hashes` times. /// If the event is not a Tick, then hash that as well. pub fn verify(&self, start_hash: &Hash) -> bool { for event in &self.events { @@ -31,7 +31,7 @@ impl Entry { } } -/// Creates the hash 'num_hashes' after start_hash. If the event contains +/// Creates the hash `num_hashes` after `start_hash`. If the event contains /// signature, the final hash will be a hash of both the previous ID and /// the signature. pub fn next_hash(start_hash: &Hash, num_hashes: u64, events: &[Event]) -> Hash { @@ -56,7 +56,7 @@ pub fn next_hash(start_hash: &Hash, num_hashes: u64, events: &[Event]) -> Hash { id } -/// Creates the next Entry 'num_hashes' after 'start_hash'. +/// Creates the next Entry `num_hashes` after `start_hash`. pub fn create_entry(start_hash: &Hash, cur_hashes: u64, events: Vec) -> Entry { let num_hashes = cur_hashes + if events.is_empty() { 0 } else { 1 }; let id = next_hash(start_hash, 0, &events); @@ -67,7 +67,7 @@ pub fn create_entry(start_hash: &Hash, cur_hashes: u64, events: Vec) -> E } } -/// Creates the next Tick Entry 'num_hashes' after 'start_hash'. +/// Creates the next Tick Entry `num_hashes` after `start_hash`. pub fn create_entry_mut(start_hash: &mut Hash, cur_hashes: &mut u64, events: Vec) -> Entry { let entry = create_entry(start_hash, *cur_hashes, events); *start_hash = entry.id; @@ -75,7 +75,7 @@ pub fn create_entry_mut(start_hash: &mut Hash, cur_hashes: &mut u64, events: Vec entry } -/// Creates the next Tick Entry 'num_hashes' after 'start_hash'. +/// Creates the next Tick Entry `num_hashes` after `start_hash`. pub fn next_tick(start_hash: &Hash, num_hashes: u64) -> Entry { Entry { num_hashes, diff --git a/src/event.rs b/src/event.rs index bfaa58c87..8afa18b21 100644 --- a/src/event.rs +++ b/src/event.rs @@ -35,8 +35,7 @@ impl Event { pub fn get_signature(&self) -> Option { match *self { Event::Transaction(ref tr) => Some(tr.sig), - Event::Signature { .. } => None, - Event::Timestamp { .. } => None, + Event::Signature { .. } | Event::Timestamp { .. } => None, } } diff --git a/src/historian.rs b/src/historian.rs index a6a3b94de..a35f41d83 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -32,6 +32,14 @@ impl Historian { } } + pub fn reserve_signature(&mut self, sig: &Signature) -> bool { + if self.signatures.contains(sig) { + return false; + } + self.signatures.insert(*sig); + true + } + /// A background thread that will continue tagging received Event messages and /// sending back Entry messages until either the receiver or sender channel is closed. fn create_recorder( @@ -55,14 +63,6 @@ impl Historian { } } -pub fn reserve_signature(sigs: &mut HashSet, sig: &Signature) -> bool { - if sigs.contains(sig) { - return false; - } - sigs.insert(*sig); - true -} - #[cfg(test)] mod tests { use super::*; @@ -112,10 +112,11 @@ mod tests { #[test] fn test_duplicate_event_signature() { - let mut sigs = HashSet::new(); + let zero = Hash::default(); + let mut hist = Historian::new(&zero, None); let sig = Signature::default(); - assert!(reserve_signature(&mut sigs, &sig)); - assert!(!reserve_signature(&mut sigs, &sig)); + assert!(hist.reserve_signature(&sig)); + assert!(!hist.reserve_signature(&sig)); } #[test] diff --git a/src/ledger.rs b/src/ledger.rs index bb216ff30..d858d9179 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -1,14 +1,14 @@ //! The `ledger` crate provides the foundational data structures for Proof-of-History, //! an ordered log of events in time. -/// Each entry contains three pieces of data. The 'num_hashes' field is the number -/// of hashes performed since the previous entry. The 'id' field is the result -/// of hashing 'id' from the previous entry 'num_hashes' times. The 'event' -/// field points to an Event that took place shortly after 'id' was generated. +/// Each entry contains three pieces of data. The `num_hashes` field is the number +/// of hashes performed since the previous entry. The `id` field is the result +/// of hashing `id` from the previous entry `num_hashes` times. The `event` +/// field points to an Event that took place shortly after `id` was generated. /// -/// If you divide 'num_hashes' by the amount of time it takes to generate a new hash, you +/// If you divide `num_hashes` by the amount of time it takes to generate a new hash, you /// get a duration estimate since the last event. Since processing power increases -/// over time, one should expect the duration 'num_hashes' represents to decrease proportionally. +/// over time, one should expect the duration `num_hashes` represents to decrease proportionally. /// Though processing power varies across nodes, the network gives priority to the /// fastest processor. Duration should therefore be estimated by assuming that the hash /// was generated by the fastest processor at the time the entry was recorded. @@ -24,7 +24,7 @@ pub fn verify_slice(entries: &[Entry], start_hash: &Hash) -> bool { event_pairs.all(|(x0, x1)| x1.verify(&x0.id)) } -/// Create a vector of Ticks of length 'len' from 'start_hash' hash and 'num_hashes'. +/// Create a vector of Ticks of length `len` from `start_hash` hash and `num_hashes`. pub fn next_ticks(start_hash: &Hash, num_hashes: u64, len: usize) -> Vec { let mut id = *start_hash; let mut ticks = vec![]; diff --git a/src/plan.rs b/src/plan.rs index e455218ea..f28ccec45 100644 --- a/src/plan.rs +++ b/src/plan.rs @@ -75,8 +75,9 @@ impl Plan { pub fn verify(&self, spendable_tokens: i64) -> bool { match *self { - Plan::Pay(ref payment) => payment.tokens == spendable_tokens, - Plan::After(_, ref payment) => payment.tokens == spendable_tokens, + Plan::Pay(ref payment) | Plan::After(_, ref payment) => { + payment.tokens == spendable_tokens + } Plan::Race(ref a, ref b) => { a.1.tokens == spendable_tokens && b.1.tokens == spendable_tokens } @@ -85,13 +86,13 @@ impl Plan { /// Apply a witness to the spending plan to see if the plan can be reduced. /// If so, modify the plan in-place. - pub fn apply_witness(&mut self, witness: Witness) { + pub fn apply_witness(&mut self, witness: &Witness) { let new_payment = match *self { - Plan::After(ref cond, ref payment) if cond.is_satisfied(&witness) => Some(payment), - Plan::Race((ref cond, ref payment), _) if cond.is_satisfied(&witness) => Some(payment), - Plan::Race(_, (ref cond, ref payment)) if cond.is_satisfied(&witness) => Some(payment), + Plan::After(ref cond, ref payment) if cond.is_satisfied(witness) => Some(payment), + Plan::Race((ref cond, ref payment), _) if cond.is_satisfied(witness) => Some(payment), + Plan::Race(_, (ref cond, ref payment)) if cond.is_satisfied(witness) => Some(payment), _ => None, - }.map(|x| x.clone()); + }.cloned(); if let Some(payment) = new_payment { mem::replace(self, Plan::Pay(payment)); @@ -135,7 +136,7 @@ mod tests { let to = PublicKey::default(); let mut plan = Plan::new_authorized_payment(from, 42, to); - plan.apply_witness(Witness::Signature(from)); + plan.apply_witness(&Witness::Signature(from)); assert_eq!(plan, Plan::new_payment(42, to)); } @@ -145,7 +146,7 @@ mod tests { let to = PublicKey::default(); let mut plan = Plan::new_future_payment(dt, 42, to); - plan.apply_witness(Witness::Timestamp(dt)); + plan.apply_witness(&Witness::Timestamp(dt)); assert_eq!(plan, Plan::new_payment(42, to)); } @@ -156,11 +157,11 @@ mod tests { let to = PublicKey::default(); let mut plan = Plan::new_cancelable_future_payment(dt, from, 42, to); - plan.apply_witness(Witness::Timestamp(dt)); + plan.apply_witness(&Witness::Timestamp(dt)); assert_eq!(plan, Plan::new_payment(42, to)); let mut plan = Plan::new_cancelable_future_payment(dt, from, 42, to); - plan.apply_witness(Witness::Signature(from)); + plan.apply_witness(&Witness::Signature(from)); assert_eq!(plan, Plan::new_payment(42, from)); } } diff --git a/src/recorder.rs b/src/recorder.rs index 0a5a0bc14..a590cd878 100644 --- a/src/recorder.rs +++ b/src/recorder.rs @@ -12,6 +12,7 @@ use hash::{hash, Hash}; use entry::{create_entry_mut, Entry}; use event::Event; +#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] pub enum Signal { Tick, Event(Event), diff --git a/src/streamer.rs b/src/streamer.rs index 56e5d2f90..54f631b7b 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -1,4 +1,5 @@ use std::sync::{Arc, Mutex, RwLock}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc; use std::fmt; use std::time::Duration; @@ -64,16 +65,16 @@ impl Packet { } pub fn set_addr(&mut self, a: &SocketAddr) { - match a { - &SocketAddr::V4(v4) => { + match *a { + SocketAddr::V4(v4) => { let ip = v4.ip().octets(); - self.addr[0] = ip[0] as u16; - self.addr[1] = ip[1] as u16; - self.addr[2] = ip[2] as u16; - self.addr[3] = ip[3] as u16; + self.addr[0] = u16::from(ip[0]); + self.addr[1] = u16::from(ip[1]); + self.addr[2] = u16::from(ip[2]); + self.addr[3] = u16::from(ip[3]); self.port = a.port(); } - &SocketAddr::V6(v6) => { + SocketAddr::V6(v6) => { self.addr = v6.ip().segments(); self.port = a.port(); self.v6 = true; @@ -82,7 +83,7 @@ impl Packet { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct PacketData { pub packets: Vec, } @@ -101,7 +102,7 @@ impl PacketData { fn run_read_from(&mut self, socket: &UdpSocket) -> Result { self.packets.resize(BLOCK_SIZE, Packet::default()); let mut i = 0; - for p in self.packets.iter_mut() { + for p in &mut self.packets { p.size = 0; match socket.recv_from(&mut p.data) { Err(_) if i > 0 => { @@ -130,7 +131,7 @@ impl PacketData { Ok(()) } fn send_to(&self, socket: &UdpSocket, num: &mut usize) -> Result<()> { - for p in self.packets.iter() { + for p in &self.packets { let a = p.get_addr(); socket.send_to(&p.data[0..p.size], &a)?; //TODO(anatoly): wtf do we do about errors? @@ -140,35 +141,35 @@ impl PacketData { } } -pub fn allocate(recycler: Recycler) -> SharedPacketData { +pub fn allocate(recycler: &Recycler) -> SharedPacketData { let mut gc = recycler.lock().expect("lock"); gc.pop() .unwrap_or_else(|| Arc::new(RwLock::new(PacketData::new()))) } -pub fn recycle(recycler: Recycler, msgs: SharedPacketData) { +pub fn recycle(recycler: &Recycler, msgs: SharedPacketData) { let mut gc = recycler.lock().expect("lock"); gc.push(msgs); } fn recv_loop( sock: &UdpSocket, - exit: Arc>, - recycler: Recycler, - channel: Sender, + exit: &Arc, + recycler: &Recycler, + channel: &Sender, ) -> Result<()> { loop { - let msgs = allocate(recycler.clone()); + let msgs = allocate(recycler); let msgs_ = msgs.clone(); loop { - match msgs.write().unwrap().read_from(&sock) { + match msgs.write().unwrap().read_from(sock) { Ok(()) => { channel.send(msgs_)?; break; } Err(_) => { - if *exit.lock().unwrap() { - recycle(recycler.clone(), msgs_); + if exit.load(Ordering::Relaxed) { + recycle(recycler, msgs_); return Ok(()); } } @@ -179,19 +180,19 @@ fn recv_loop( pub fn receiver( sock: UdpSocket, - exit: Arc>, + exit: Arc, recycler: Recycler, channel: Sender, ) -> Result> { let timer = Duration::new(1, 0); sock.set_read_timeout(Some(timer))?; Ok(spawn(move || { - let _ = recv_loop(&sock, exit, recycler, channel); + let _ = recv_loop(&sock, &exit, &recycler, &channel); () })) } -fn recv_send(sock: &UdpSocket, recycler: Recycler, r: &Receiver) -> Result<()> { +fn recv_send(sock: &UdpSocket, recycler: &Recycler, r: &Receiver) -> Result<()> { let timer = Duration::new(1, 0); let msgs = r.recv_timeout(timer)?; let msgs_ = msgs.clone(); @@ -203,12 +204,12 @@ fn recv_send(sock: &UdpSocket, recycler: Recycler, r: &Receiver) -> Result<()> { pub fn sender( sock: UdpSocket, - exit: Arc>, + exit: Arc, recycler: Recycler, r: Receiver, ) -> JoinHandle<()> { spawn(move || loop { - if recv_send(&sock, recycler.clone(), &r).is_err() && *exit.lock().unwrap() { + if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) { break; } }) @@ -228,16 +229,16 @@ mod bench { use result::Result; use streamer::{allocate, receiver, recycle, Packet, Receiver, Recycler, PACKET_SIZE}; - fn producer(addr: &SocketAddr, recycler: Recycler, exit: Arc>) -> JoinHandle<()> { + fn producer(addr: &SocketAddr, recycler: &Recycler, exit: Arc) -> JoinHandle<()> { let send = UdpSocket::bind("0.0.0.0:0").unwrap(); - let msgs = allocate(recycler.clone()); + let msgs = allocate(recycler); msgs.write().unwrap().packets.resize(10, Packet::default()); for w in msgs.write().unwrap().packets.iter_mut() { w.size = PACKET_SIZE; w.set_addr(&addr); } spawn(move || loop { - if *exit.lock().unwrap() { + if exit.load(Ordering::Relaxed) { return; } let mut num = 0; @@ -247,13 +248,13 @@ mod bench { } fn sinc( - recycler: Recycler, - exit: Arc>, + recycler: &Recycler, + exit: Arc, rvs: Arc>, r: Receiver, ) -> JoinHandle<()> { spawn(move || loop { - if *exit.lock().unwrap() { + if exit.load(Ordering::Relaxed) { return; } let timer = Duration::new(1, 0); @@ -261,7 +262,7 @@ mod bench { Ok(msgs) => { let msgs_ = msgs.clone(); *rvs.lock().unwrap() += msgs.read().unwrap().packets.len(); - recycle(recycler.clone(), msgs_); + recycle(recycler, msgs_); } _ => (), } @@ -270,14 +271,14 @@ mod bench { fn run_streamer_bench() -> Result<()> { let read = UdpSocket::bind("127.0.0.1:0")?; let addr = read.local_addr()?; - let exit = Arc::new(Mutex::new(false)); + let exit = Arc::new(AtomicBool::new(false)); let recycler = Arc::new(Mutex::new(Vec::new())); let (s_reader, r_reader) = channel(); - let t_reader = receiver(read, exit.clone(), recycler.clone(), s_reader)?; - let t_producer1 = producer(&addr, recycler.clone(), exit.clone()); - let t_producer2 = producer(&addr, recycler.clone(), exit.clone()); - let t_producer3 = producer(&addr, recycler.clone(), exit.clone()); + let t_reader = receiver(read, exit.clone(), &recycler, s_reader)?; + let t_producer1 = producer(&addr, &recycler, exit.clone()); + let t_producer2 = producer(&addr, &recycler, exit.clone()); + let t_producer3 = producer(&addr, &recycler, exit.clone()); let rvs = Arc::new(Mutex::new(0)); let t_sinc = sinc(recycler.clone(), exit.clone(), rvs.clone(), r_reader); @@ -291,7 +292,7 @@ mod bench { let ftime = (time as f64) / 10000000000f64; let fcount = (end_val - start_val) as f64; println!("performance: {:?}", fcount / ftime); - *exit.lock().unwrap() = true; + exit.store(true, Ordering::Relaxed); t_reader.join()?; t_producer1.join()?; t_producer2.join()?; @@ -310,6 +311,7 @@ mod test { use std::sync::{Arc, Mutex}; use std::net::UdpSocket; use std::time::Duration; + use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::io::Write; use std::io; @@ -339,7 +341,7 @@ mod test { let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap(); let (s_sender, r_sender) = channel(); let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender); - let msgs = allocate(recycler.clone()); + let msgs = allocate(&recycler); msgs.write().unwrap().packets.resize(10, Packet::default()); for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() { w.data[0] = i as u8; @@ -351,7 +353,7 @@ mod test { let mut num = 0; get_msgs(r_reader, &mut num); assert_eq!(num, 10); - *exit.lock().unwrap() = true; + exit.store(true, Ordering::Relaxed); t_receiver.join().expect("join"); t_sender.join().expect("join"); } @@ -364,13 +366,13 @@ mod test { let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); let addr = read.local_addr().unwrap(); let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let exit = Arc::new(Mutex::new(false)); + let exit = Arc::new(AtomicBool::new(false)); let recycler = Arc::new(Mutex::new(Vec::new())); let (s_reader, r_reader) = channel(); let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap(); let (s_sender, r_sender) = channel(); let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender); - let msgs = allocate(recycler.clone()); + let msgs = allocate(&recycler); msgs.write().unwrap().packets.resize(10, Packet::default()); for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() { w.data[0] = i as u8; @@ -382,7 +384,7 @@ mod test { let mut num = 0; get_msgs(r_reader, &mut num); assert_eq!(num, 10); - *exit.lock().unwrap() = true; + exit.store(true, Ordering::Relaxed); t_receiver.join().expect("join"); t_sender.join().expect("join"); }