Merge pull request #73 from garious/yes-clippy

Automated mentoring by clippy
This commit is contained in:
Greg Fitzgerald 2018-03-22 15:22:12 -06:00 committed by GitHub
commit c385f8bb6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 114 additions and 115 deletions

View File

@ -9,7 +9,7 @@ use plan::{Plan, Witness};
use transaction::Transaction; use transaction::Transaction;
use signature::{KeyPair, PublicKey, Signature}; use signature::{KeyPair, PublicKey, Signature};
use mint::Mint; use mint::Mint;
use historian::{reserve_signature, Historian}; use historian::Historian;
use recorder::Signal; use recorder::Signal;
use std::sync::mpsc::SendError; use std::sync::mpsc::SendError;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
@ -30,13 +30,7 @@ pub type Result<T> = result::Result<T, AccountingError>;
/// Commit funds to the 'to' party. /// Commit funds to the 'to' party.
fn complete_transaction(balances: &mut HashMap<PublicKey, i64>, plan: &Plan) { fn complete_transaction(balances: &mut HashMap<PublicKey, i64>, plan: &Plan) {
if let Plan::Pay(ref payment) = *plan { if let Plan::Pay(ref payment) = *plan {
if balances.contains_key(&payment.to) { *balances.entry(payment.to).or_insert(0) += payment.tokens;
if let Some(x) = balances.get_mut(&payment.to) {
*x += payment.tokens;
}
} else {
balances.insert(payment.to, payment.tokens);
}
} }
} }
@ -122,7 +116,7 @@ impl Accountant {
tr: &Transaction, tr: &Transaction,
allow_deposits: bool, allow_deposits: bool,
) -> Result<()> { ) -> Result<()> {
if !reserve_signature(&mut self.historian.signatures, &tr.sig) { if !self.historian.reserve_signature(&tr.sig) {
return Err(AccountingError::InvalidTransferSignature); return Err(AccountingError::InvalidTransferSignature);
} }
@ -133,7 +127,7 @@ impl Accountant {
} }
let mut plan = tr.plan.clone(); let mut plan = tr.plan.clone();
plan.apply_witness(Witness::Timestamp(self.last_time)); plan.apply_witness(&Witness::Timestamp(self.last_time));
if plan.is_complete() { if plan.is_complete() {
complete_transaction(&mut self.balances, &plan); complete_transaction(&mut self.balances, &plan);
@ -146,7 +140,7 @@ impl Accountant {
fn process_verified_sig(&mut self, from: PublicKey, tx_sig: Signature) -> Result<()> { fn process_verified_sig(&mut self, from: PublicKey, tx_sig: Signature) -> Result<()> {
if let Occupied(mut e) = self.pending.entry(tx_sig) { if let Occupied(mut e) = self.pending.entry(tx_sig) {
e.get_mut().apply_witness(Witness::Signature(from)); e.get_mut().apply_witness(&Witness::Signature(from));
if e.get().is_complete() { if e.get().is_complete() {
complete_transaction(&mut self.balances, e.get()); complete_transaction(&mut self.balances, e.get());
e.remove_entry(); e.remove_entry();
@ -174,9 +168,9 @@ impl Accountant {
// Check to see if any timelocked transactions can be completed. // Check to see if any timelocked transactions can be completed.
let mut completed = vec![]; let mut completed = vec![];
for (key, plan) in &mut self.pending { for (key, plan) in &mut self.pending {
plan.apply_witness(Witness::Timestamp(self.last_time)); plan.apply_witness(&Witness::Timestamp(self.last_time));
if plan.is_complete() { if plan.is_complete() {
complete_transaction(&mut self.balances, &plan); complete_transaction(&mut self.balances, plan);
completed.push(key.clone()); completed.push(key.clone());
} }
} }
@ -222,7 +216,7 @@ impl Accountant {
} }
pub fn get_balance(self: &Self, pubkey: &PublicKey) -> Option<i64> { pub fn get_balance(self: &Self, pubkey: &PublicKey) -> Option<i64> {
self.balances.get(pubkey).map(|x| *x) self.balances.get(pubkey).cloned()
} }
} }

View File

@ -9,6 +9,7 @@ use result::Result;
use streamer; use streamer;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::default::Default; use std::default::Default;
@ -20,6 +21,7 @@ pub struct AccountantSkel {
pub ledger: Vec<Entry>, pub ledger: Vec<Entry>,
} }
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub enum Request { pub enum Request {
Transaction(Transaction), Transaction(Transaction),
@ -91,13 +93,13 @@ impl AccountantSkel {
&mut self, &mut self,
r_reader: &streamer::Receiver, r_reader: &streamer::Receiver,
s_sender: &streamer::Sender, s_sender: &streamer::Sender,
recycler: streamer::Recycler, recycler: &streamer::Recycler,
) -> Result<()> { ) -> Result<()> {
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
let msgs = r_reader.recv_timeout(timer)?; let msgs = r_reader.recv_timeout(timer)?;
let msgs_ = msgs.clone(); let msgs_ = msgs.clone();
let msgs__ = msgs.clone(); let msgs__ = msgs.clone();
let rsps = streamer::allocate(recycler.clone()); let rsps = streamer::allocate(recycler);
let rsps_ = rsps.clone(); let rsps_ = rsps.clone();
let l = msgs__.read().unwrap().packets.len(); let l = msgs__.read().unwrap().packets.len();
rsps.write() rsps.write()
@ -107,11 +109,11 @@ impl AccountantSkel {
{ {
let mut num = 0; let mut num = 0;
let mut ursps = rsps.write().unwrap(); let mut ursps = rsps.write().unwrap();
for packet in msgs.read().unwrap().packets.iter() { for packet in &msgs.read().unwrap().packets {
let sz = packet.size; let sz = packet.size;
let req = deserialize(&packet.data[0..sz])?; let req = deserialize(&packet.data[0..sz])?;
if let Some(resp) = self.process_request(req) { if let Some(resp) = self.process_request(req) {
let rsp = ursps.packets.get_mut(num).unwrap(); let rsp = &mut ursps.packets[num];
let v = serialize(&resp)?; let v = serialize(&resp)?;
let len = v.len(); let len = v.len();
rsp.data[0..len].copy_from_slice(&v); rsp.data[0..len].copy_from_slice(&v);
@ -131,7 +133,7 @@ impl AccountantSkel {
pub fn serve( pub fn serve(
obj: Arc<Mutex<AccountantSkel>>, obj: Arc<Mutex<AccountantSkel>>,
addr: &str, addr: &str,
exit: Arc<Mutex<bool>>, exit: Arc<AtomicBool>,
) -> Result<[Arc<JoinHandle<()>>; 3]> { ) -> Result<[Arc<JoinHandle<()>>; 3]> {
let read = UdpSocket::bind(addr)?; let read = UdpSocket::bind(addr)?;
// make sure we are on the same interface // make sure we are on the same interface
@ -147,17 +149,14 @@ impl AccountantSkel {
let t_sender = streamer::sender(write, exit.clone(), recycler.clone(), r_sender); let t_sender = streamer::sender(write, exit.clone(), recycler.clone(), r_sender);
let t_server = spawn(move || { let t_server = spawn(move || {
match Arc::try_unwrap(obj) { if let Ok(me) = Arc::try_unwrap(obj) {
Ok(me) => loop { loop {
let e = me.lock() let e = me.lock().unwrap().process(&r_reader, &s_sender, &recycler);
.unwrap() if e.is_err() && exit.load(Ordering::Relaxed) {
.process(&r_reader, &s_sender, recycler.clone());
if e.is_err() && *exit.lock().unwrap() {
break; break;
} }
}, }
_ => (), }
};
}); });
Ok([Arc::new(t_receiver), Arc::new(t_sender), Arc::new(t_server)]) Ok([Arc::new(t_receiver), Arc::new(t_sender), Arc::new(t_server)])
} }

View File

@ -127,6 +127,7 @@ mod tests {
use mint::Mint; use mint::Mint;
use signature::{KeyPair, KeyPairUtil}; use signature::{KeyPair, KeyPairUtil};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
#[test] #[test]
fn test_accountant_stub() { fn test_accountant_stub() {
@ -135,7 +136,7 @@ mod tests {
let alice = Mint::new(10_000); let alice = Mint::new(10_000);
let acc = Accountant::new(&alice, Some(30)); let acc = Accountant::new(&alice, Some(30));
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(Mutex::new(false)); let exit = Arc::new(AtomicBool::new(false));
let acc = Arc::new(Mutex::new(AccountantSkel::new(acc))); let acc = Arc::new(Mutex::new(AccountantSkel::new(acc)));
let threads = AccountantSkel::serve(acc, addr, exit.clone()).unwrap(); let threads = AccountantSkel::serve(acc, addr, exit.clone()).unwrap();
sleep(Duration::from_millis(30)); sleep(Duration::from_millis(30));
@ -147,7 +148,7 @@ mod tests {
.unwrap(); .unwrap();
acc.wait_on_signature(&sig, &last_id).unwrap(); acc.wait_on_signature(&sig, &last_id).unwrap();
assert_eq!(acc.get_balance(&bob_pubkey).unwrap().unwrap(), 500); assert_eq!(acc.get_balance(&bob_pubkey).unwrap().unwrap(), 500);
*exit.lock().unwrap() = true; exit.store(true, Ordering::Relaxed);
for t in threads.iter() { for t in threads.iter() {
match Arc::try_unwrap((*t).clone()) { match Arc::try_unwrap((*t).clone()) {
Ok(j) => j.join().expect("join"), Ok(j) => j.join().expect("join"),

View File

@ -33,7 +33,7 @@ fn main() {
}) })
.collect(); .collect();
let duration = now.elapsed(); let duration = now.elapsed();
let ns = duration.as_secs() * 1_000_000_000 + duration.subsec_nanos() as u64; let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let bsps = txs as f64 / ns as f64; let bsps = txs as f64 / ns as f64;
let nsps = ns as f64 / txs as f64; let nsps = ns as f64 / txs as f64;
println!( println!(
@ -48,7 +48,7 @@ fn main() {
assert!(tr.verify()); assert!(tr.verify());
} }
let duration = now.elapsed(); let duration = now.elapsed();
let ns = duration.as_secs() * 1_000_000_000 + duration.subsec_nanos() as u64; let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let bsvps = txs as f64 / ns as f64; let bsvps = txs as f64 / ns as f64;
let nspsv = ns as f64 / txs as f64; let nspsv = ns as f64 / txs as f64;
println!( println!(
@ -68,7 +68,7 @@ fn main() {
acc.wait_on_signature(&sig, &last_id).unwrap(); acc.wait_on_signature(&sig, &last_id).unwrap();
let duration = now.elapsed(); let duration = now.elapsed();
let ns = duration.as_secs() * 1_000_000_000 + duration.subsec_nanos() as u64; let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let tps = (txs * 1_000_000_000) as f64 / ns as f64; let tps = (txs * 1_000_000_000) as f64 / ns as f64;
println!("Done. {} tps!", tps); println!("Done. {} tps!", tps);
let val = acc.get_balance(&mint_pubkey).unwrap().unwrap(); let val = acc.get_balance(&mint_pubkey).unwrap().unwrap();

View File

@ -10,7 +10,7 @@ use silk::hash::Hash;
use std::io::stdin; use std::io::stdin;
fn transfer(from: &KeyPair, (to, tokens): (PublicKey, i64), last_id: Hash) -> Event { fn transfer(from: &KeyPair, (to, tokens): (PublicKey, i64), last_id: Hash) -> Event {
Event::Transaction(Transaction::new(&from, to, tokens, last_id)) Event::Transaction(Transaction::new(from, to, tokens, last_id))
} }
fn main() { fn main() {

View File

@ -5,6 +5,7 @@ use silk::accountant_skel::AccountantSkel;
use silk::accountant::Accountant; use silk::accountant::Accountant;
use std::io::{self, BufRead}; use std::io::{self, BufRead};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::sync::atomic::AtomicBool;
fn main() { fn main() {
let addr = "127.0.0.1:8000"; let addr = "127.0.0.1:8000";
@ -14,7 +15,7 @@ fn main() {
.lines() .lines()
.map(|line| serde_json::from_str(&line.unwrap()).unwrap()); .map(|line| serde_json::from_str(&line.unwrap()).unwrap());
let acc = Accountant::new_from_entries(entries, Some(1000)); let acc = Accountant::new_from_entries(entries, Some(1000));
let exit = Arc::new(Mutex::new(false)); let exit = Arc::new(AtomicBool::new(false));
let skel = Arc::new(Mutex::new(AccountantSkel::new(acc))); let skel = Arc::new(Mutex::new(AccountantSkel::new(acc)));
eprintln!("Listening on {}", addr); eprintln!("Listening on {}", addr);
let _threads = AccountantSkel::serve(skel, addr, exit.clone()).unwrap(); let _threads = AccountantSkel::serve(skel, addr, exit.clone()).unwrap();

View File

@ -9,8 +9,8 @@ pub struct Entry {
} }
impl Entry { impl Entry {
/// Creates a Entry from the number of hashes 'num_hashes' since the previous event /// Creates a Entry from the number of hashes `num_hashes` since the previous event
/// and that resulting 'id'. /// and that resulting `id`.
pub fn new_tick(num_hashes: u64, id: &Hash) -> Self { pub fn new_tick(num_hashes: u64, id: &Hash) -> Self {
Entry { Entry {
num_hashes, num_hashes,
@ -19,7 +19,7 @@ impl Entry {
} }
} }
/// Verifies self.id is the result of hashing a 'start_hash' 'self.num_hashes' times. /// Verifies self.id is the result of hashing a `start_hash` `self.num_hashes` times.
/// If the event is not a Tick, then hash that as well. /// If the event is not a Tick, then hash that as well.
pub fn verify(&self, start_hash: &Hash) -> bool { pub fn verify(&self, start_hash: &Hash) -> bool {
for event in &self.events { for event in &self.events {
@ -31,7 +31,7 @@ impl Entry {
} }
} }
/// Creates the hash 'num_hashes' after start_hash. If the event contains /// Creates the hash `num_hashes` after `start_hash`. If the event contains
/// signature, the final hash will be a hash of both the previous ID and /// signature, the final hash will be a hash of both the previous ID and
/// the signature. /// the signature.
pub fn next_hash(start_hash: &Hash, num_hashes: u64, events: &[Event]) -> Hash { pub fn next_hash(start_hash: &Hash, num_hashes: u64, events: &[Event]) -> Hash {
@ -56,7 +56,7 @@ pub fn next_hash(start_hash: &Hash, num_hashes: u64, events: &[Event]) -> Hash {
id id
} }
/// Creates the next Entry 'num_hashes' after 'start_hash'. /// Creates the next Entry `num_hashes` after `start_hash`.
pub fn create_entry(start_hash: &Hash, cur_hashes: u64, events: Vec<Event>) -> Entry { pub fn create_entry(start_hash: &Hash, cur_hashes: u64, events: Vec<Event>) -> Entry {
let num_hashes = cur_hashes + if events.is_empty() { 0 } else { 1 }; let num_hashes = cur_hashes + if events.is_empty() { 0 } else { 1 };
let id = next_hash(start_hash, 0, &events); let id = next_hash(start_hash, 0, &events);
@ -67,7 +67,7 @@ pub fn create_entry(start_hash: &Hash, cur_hashes: u64, events: Vec<Event>) -> E
} }
} }
/// Creates the next Tick Entry 'num_hashes' after 'start_hash'. /// Creates the next Tick Entry `num_hashes` after `start_hash`.
pub fn create_entry_mut(start_hash: &mut Hash, cur_hashes: &mut u64, events: Vec<Event>) -> Entry { pub fn create_entry_mut(start_hash: &mut Hash, cur_hashes: &mut u64, events: Vec<Event>) -> Entry {
let entry = create_entry(start_hash, *cur_hashes, events); let entry = create_entry(start_hash, *cur_hashes, events);
*start_hash = entry.id; *start_hash = entry.id;
@ -75,7 +75,7 @@ pub fn create_entry_mut(start_hash: &mut Hash, cur_hashes: &mut u64, events: Vec
entry entry
} }
/// Creates the next Tick Entry 'num_hashes' after 'start_hash'. /// Creates the next Tick Entry `num_hashes` after `start_hash`.
pub fn next_tick(start_hash: &Hash, num_hashes: u64) -> Entry { pub fn next_tick(start_hash: &Hash, num_hashes: u64) -> Entry {
Entry { Entry {
num_hashes, num_hashes,

View File

@ -35,8 +35,7 @@ impl Event {
pub fn get_signature(&self) -> Option<Signature> { pub fn get_signature(&self) -> Option<Signature> {
match *self { match *self {
Event::Transaction(ref tr) => Some(tr.sig), Event::Transaction(ref tr) => Some(tr.sig),
Event::Signature { .. } => None, Event::Signature { .. } | Event::Timestamp { .. } => None,
Event::Timestamp { .. } => None,
} }
} }

View File

@ -32,6 +32,14 @@ impl Historian {
} }
} }
pub fn reserve_signature(&mut self, sig: &Signature) -> bool {
if self.signatures.contains(sig) {
return false;
}
self.signatures.insert(*sig);
true
}
/// A background thread that will continue tagging received Event messages and /// A background thread that will continue tagging received Event messages and
/// sending back Entry messages until either the receiver or sender channel is closed. /// sending back Entry messages until either the receiver or sender channel is closed.
fn create_recorder( fn create_recorder(
@ -55,14 +63,6 @@ impl Historian {
} }
} }
pub fn reserve_signature(sigs: &mut HashSet<Signature>, sig: &Signature) -> bool {
if sigs.contains(sig) {
return false;
}
sigs.insert(*sig);
true
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -112,10 +112,11 @@ mod tests {
#[test] #[test]
fn test_duplicate_event_signature() { fn test_duplicate_event_signature() {
let mut sigs = HashSet::new(); let zero = Hash::default();
let mut hist = Historian::new(&zero, None);
let sig = Signature::default(); let sig = Signature::default();
assert!(reserve_signature(&mut sigs, &sig)); assert!(hist.reserve_signature(&sig));
assert!(!reserve_signature(&mut sigs, &sig)); assert!(!hist.reserve_signature(&sig));
} }
#[test] #[test]

View File

@ -1,14 +1,14 @@
//! The `ledger` crate provides the foundational data structures for Proof-of-History, //! The `ledger` crate provides the foundational data structures for Proof-of-History,
//! an ordered log of events in time. //! an ordered log of events in time.
/// Each entry contains three pieces of data. The 'num_hashes' field is the number /// Each entry contains three pieces of data. The `num_hashes` field is the number
/// of hashes performed since the previous entry. The 'id' field is the result /// of hashes performed since the previous entry. The `id` field is the result
/// of hashing 'id' from the previous entry 'num_hashes' times. The 'event' /// of hashing `id` from the previous entry `num_hashes` times. The `event`
/// field points to an Event that took place shortly after 'id' was generated. /// field points to an Event that took place shortly after `id` was generated.
/// ///
/// If you divide 'num_hashes' by the amount of time it takes to generate a new hash, you /// If you divide `num_hashes` by the amount of time it takes to generate a new hash, you
/// get a duration estimate since the last event. Since processing power increases /// get a duration estimate since the last event. Since processing power increases
/// over time, one should expect the duration 'num_hashes' represents to decrease proportionally. /// over time, one should expect the duration `num_hashes` represents to decrease proportionally.
/// Though processing power varies across nodes, the network gives priority to the /// Though processing power varies across nodes, the network gives priority to the
/// fastest processor. Duration should therefore be estimated by assuming that the hash /// fastest processor. Duration should therefore be estimated by assuming that the hash
/// was generated by the fastest processor at the time the entry was recorded. /// was generated by the fastest processor at the time the entry was recorded.
@ -24,7 +24,7 @@ pub fn verify_slice(entries: &[Entry], start_hash: &Hash) -> bool {
event_pairs.all(|(x0, x1)| x1.verify(&x0.id)) event_pairs.all(|(x0, x1)| x1.verify(&x0.id))
} }
/// Create a vector of Ticks of length 'len' from 'start_hash' hash and 'num_hashes'. /// Create a vector of Ticks of length `len` from `start_hash` hash and `num_hashes`.
pub fn next_ticks(start_hash: &Hash, num_hashes: u64, len: usize) -> Vec<Entry> { pub fn next_ticks(start_hash: &Hash, num_hashes: u64, len: usize) -> Vec<Entry> {
let mut id = *start_hash; let mut id = *start_hash;
let mut ticks = vec![]; let mut ticks = vec![];

View File

@ -75,8 +75,9 @@ impl Plan {
pub fn verify(&self, spendable_tokens: i64) -> bool { pub fn verify(&self, spendable_tokens: i64) -> bool {
match *self { match *self {
Plan::Pay(ref payment) => payment.tokens == spendable_tokens, Plan::Pay(ref payment) | Plan::After(_, ref payment) => {
Plan::After(_, ref payment) => payment.tokens == spendable_tokens, payment.tokens == spendable_tokens
}
Plan::Race(ref a, ref b) => { Plan::Race(ref a, ref b) => {
a.1.tokens == spendable_tokens && b.1.tokens == spendable_tokens a.1.tokens == spendable_tokens && b.1.tokens == spendable_tokens
} }
@ -85,13 +86,13 @@ impl Plan {
/// Apply a witness to the spending plan to see if the plan can be reduced. /// Apply a witness to the spending plan to see if the plan can be reduced.
/// If so, modify the plan in-place. /// If so, modify the plan in-place.
pub fn apply_witness(&mut self, witness: Witness) { pub fn apply_witness(&mut self, witness: &Witness) {
let new_payment = match *self { let new_payment = match *self {
Plan::After(ref cond, ref payment) if cond.is_satisfied(&witness) => Some(payment), Plan::After(ref cond, ref payment) if cond.is_satisfied(witness) => Some(payment),
Plan::Race((ref cond, ref payment), _) if cond.is_satisfied(&witness) => Some(payment), Plan::Race((ref cond, ref payment), _) if cond.is_satisfied(witness) => Some(payment),
Plan::Race(_, (ref cond, ref payment)) if cond.is_satisfied(&witness) => Some(payment), Plan::Race(_, (ref cond, ref payment)) if cond.is_satisfied(witness) => Some(payment),
_ => None, _ => None,
}.map(|x| x.clone()); }.cloned();
if let Some(payment) = new_payment { if let Some(payment) = new_payment {
mem::replace(self, Plan::Pay(payment)); mem::replace(self, Plan::Pay(payment));
@ -135,7 +136,7 @@ mod tests {
let to = PublicKey::default(); let to = PublicKey::default();
let mut plan = Plan::new_authorized_payment(from, 42, to); let mut plan = Plan::new_authorized_payment(from, 42, to);
plan.apply_witness(Witness::Signature(from)); plan.apply_witness(&Witness::Signature(from));
assert_eq!(plan, Plan::new_payment(42, to)); assert_eq!(plan, Plan::new_payment(42, to));
} }
@ -145,7 +146,7 @@ mod tests {
let to = PublicKey::default(); let to = PublicKey::default();
let mut plan = Plan::new_future_payment(dt, 42, to); let mut plan = Plan::new_future_payment(dt, 42, to);
plan.apply_witness(Witness::Timestamp(dt)); plan.apply_witness(&Witness::Timestamp(dt));
assert_eq!(plan, Plan::new_payment(42, to)); assert_eq!(plan, Plan::new_payment(42, to));
} }
@ -156,11 +157,11 @@ mod tests {
let to = PublicKey::default(); let to = PublicKey::default();
let mut plan = Plan::new_cancelable_future_payment(dt, from, 42, to); let mut plan = Plan::new_cancelable_future_payment(dt, from, 42, to);
plan.apply_witness(Witness::Timestamp(dt)); plan.apply_witness(&Witness::Timestamp(dt));
assert_eq!(plan, Plan::new_payment(42, to)); assert_eq!(plan, Plan::new_payment(42, to));
let mut plan = Plan::new_cancelable_future_payment(dt, from, 42, to); let mut plan = Plan::new_cancelable_future_payment(dt, from, 42, to);
plan.apply_witness(Witness::Signature(from)); plan.apply_witness(&Witness::Signature(from));
assert_eq!(plan, Plan::new_payment(42, from)); assert_eq!(plan, Plan::new_payment(42, from));
} }
} }

View File

@ -12,6 +12,7 @@ use hash::{hash, Hash};
use entry::{create_entry_mut, Entry}; use entry::{create_entry_mut, Entry};
use event::Event; use event::Event;
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
pub enum Signal { pub enum Signal {
Tick, Tick,
Event(Event), Event(Event),

View File

@ -1,4 +1,5 @@
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc; use std::sync::mpsc;
use std::fmt; use std::fmt;
use std::time::Duration; use std::time::Duration;
@ -64,16 +65,16 @@ impl Packet {
} }
pub fn set_addr(&mut self, a: &SocketAddr) { pub fn set_addr(&mut self, a: &SocketAddr) {
match a { match *a {
&SocketAddr::V4(v4) => { SocketAddr::V4(v4) => {
let ip = v4.ip().octets(); let ip = v4.ip().octets();
self.addr[0] = ip[0] as u16; self.addr[0] = u16::from(ip[0]);
self.addr[1] = ip[1] as u16; self.addr[1] = u16::from(ip[1]);
self.addr[2] = ip[2] as u16; self.addr[2] = u16::from(ip[2]);
self.addr[3] = ip[3] as u16; self.addr[3] = u16::from(ip[3]);
self.port = a.port(); self.port = a.port();
} }
&SocketAddr::V6(v6) => { SocketAddr::V6(v6) => {
self.addr = v6.ip().segments(); self.addr = v6.ip().segments();
self.port = a.port(); self.port = a.port();
self.v6 = true; self.v6 = true;
@ -82,7 +83,7 @@ impl Packet {
} }
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug, Default)]
pub struct PacketData { pub struct PacketData {
pub packets: Vec<Packet>, pub packets: Vec<Packet>,
} }
@ -101,7 +102,7 @@ impl PacketData {
fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> { fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> {
self.packets.resize(BLOCK_SIZE, Packet::default()); self.packets.resize(BLOCK_SIZE, Packet::default());
let mut i = 0; let mut i = 0;
for p in self.packets.iter_mut() { for p in &mut self.packets {
p.size = 0; p.size = 0;
match socket.recv_from(&mut p.data) { match socket.recv_from(&mut p.data) {
Err(_) if i > 0 => { Err(_) if i > 0 => {
@ -130,7 +131,7 @@ impl PacketData {
Ok(()) Ok(())
} }
fn send_to(&self, socket: &UdpSocket, num: &mut usize) -> Result<()> { fn send_to(&self, socket: &UdpSocket, num: &mut usize) -> Result<()> {
for p in self.packets.iter() { for p in &self.packets {
let a = p.get_addr(); let a = p.get_addr();
socket.send_to(&p.data[0..p.size], &a)?; socket.send_to(&p.data[0..p.size], &a)?;
//TODO(anatoly): wtf do we do about errors? //TODO(anatoly): wtf do we do about errors?
@ -140,35 +141,35 @@ impl PacketData {
} }
} }
pub fn allocate(recycler: Recycler) -> SharedPacketData { pub fn allocate(recycler: &Recycler) -> SharedPacketData {
let mut gc = recycler.lock().expect("lock"); let mut gc = recycler.lock().expect("lock");
gc.pop() gc.pop()
.unwrap_or_else(|| Arc::new(RwLock::new(PacketData::new()))) .unwrap_or_else(|| Arc::new(RwLock::new(PacketData::new())))
} }
pub fn recycle(recycler: Recycler, msgs: SharedPacketData) { pub fn recycle(recycler: &Recycler, msgs: SharedPacketData) {
let mut gc = recycler.lock().expect("lock"); let mut gc = recycler.lock().expect("lock");
gc.push(msgs); gc.push(msgs);
} }
fn recv_loop( fn recv_loop(
sock: &UdpSocket, sock: &UdpSocket,
exit: Arc<Mutex<bool>>, exit: &Arc<AtomicBool>,
recycler: Recycler, recycler: &Recycler,
channel: Sender, channel: &Sender,
) -> Result<()> { ) -> Result<()> {
loop { loop {
let msgs = allocate(recycler.clone()); let msgs = allocate(recycler);
let msgs_ = msgs.clone(); let msgs_ = msgs.clone();
loop { loop {
match msgs.write().unwrap().read_from(&sock) { match msgs.write().unwrap().read_from(sock) {
Ok(()) => { Ok(()) => {
channel.send(msgs_)?; channel.send(msgs_)?;
break; break;
} }
Err(_) => { Err(_) => {
if *exit.lock().unwrap() { if exit.load(Ordering::Relaxed) {
recycle(recycler.clone(), msgs_); recycle(recycler, msgs_);
return Ok(()); return Ok(());
} }
} }
@ -179,19 +180,19 @@ fn recv_loop(
pub fn receiver( pub fn receiver(
sock: UdpSocket, sock: UdpSocket,
exit: Arc<Mutex<bool>>, exit: Arc<AtomicBool>,
recycler: Recycler, recycler: Recycler,
channel: Sender, channel: Sender,
) -> Result<JoinHandle<()>> { ) -> Result<JoinHandle<()>> {
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
sock.set_read_timeout(Some(timer))?; sock.set_read_timeout(Some(timer))?;
Ok(spawn(move || { Ok(spawn(move || {
let _ = recv_loop(&sock, exit, recycler, channel); let _ = recv_loop(&sock, &exit, &recycler, &channel);
() ()
})) }))
} }
fn recv_send(sock: &UdpSocket, recycler: Recycler, r: &Receiver) -> Result<()> { fn recv_send(sock: &UdpSocket, recycler: &Recycler, r: &Receiver) -> Result<()> {
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
let msgs = r.recv_timeout(timer)?; let msgs = r.recv_timeout(timer)?;
let msgs_ = msgs.clone(); let msgs_ = msgs.clone();
@ -203,12 +204,12 @@ fn recv_send(sock: &UdpSocket, recycler: Recycler, r: &Receiver) -> Result<()> {
pub fn sender( pub fn sender(
sock: UdpSocket, sock: UdpSocket,
exit: Arc<Mutex<bool>>, exit: Arc<AtomicBool>,
recycler: Recycler, recycler: Recycler,
r: Receiver, r: Receiver,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
spawn(move || loop { spawn(move || loop {
if recv_send(&sock, recycler.clone(), &r).is_err() && *exit.lock().unwrap() { if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) {
break; break;
} }
}) })
@ -228,16 +229,16 @@ mod bench {
use result::Result; use result::Result;
use streamer::{allocate, receiver, recycle, Packet, Receiver, Recycler, PACKET_SIZE}; use streamer::{allocate, receiver, recycle, Packet, Receiver, Recycler, PACKET_SIZE};
fn producer(addr: &SocketAddr, recycler: Recycler, exit: Arc<Mutex<bool>>) -> JoinHandle<()> { fn producer(addr: &SocketAddr, recycler: &Recycler, exit: Arc<AtomicBool>) -> JoinHandle<()> {
let send = UdpSocket::bind("0.0.0.0:0").unwrap(); let send = UdpSocket::bind("0.0.0.0:0").unwrap();
let msgs = allocate(recycler.clone()); let msgs = allocate(recycler);
msgs.write().unwrap().packets.resize(10, Packet::default()); msgs.write().unwrap().packets.resize(10, Packet::default());
for w in msgs.write().unwrap().packets.iter_mut() { for w in msgs.write().unwrap().packets.iter_mut() {
w.size = PACKET_SIZE; w.size = PACKET_SIZE;
w.set_addr(&addr); w.set_addr(&addr);
} }
spawn(move || loop { spawn(move || loop {
if *exit.lock().unwrap() { if exit.load(Ordering::Relaxed) {
return; return;
} }
let mut num = 0; let mut num = 0;
@ -247,13 +248,13 @@ mod bench {
} }
fn sinc( fn sinc(
recycler: Recycler, recycler: &Recycler,
exit: Arc<Mutex<bool>>, exit: Arc<AtomicBool>,
rvs: Arc<Mutex<usize>>, rvs: Arc<Mutex<usize>>,
r: Receiver, r: Receiver,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
spawn(move || loop { spawn(move || loop {
if *exit.lock().unwrap() { if exit.load(Ordering::Relaxed) {
return; return;
} }
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
@ -261,7 +262,7 @@ mod bench {
Ok(msgs) => { Ok(msgs) => {
let msgs_ = msgs.clone(); let msgs_ = msgs.clone();
*rvs.lock().unwrap() += msgs.read().unwrap().packets.len(); *rvs.lock().unwrap() += msgs.read().unwrap().packets.len();
recycle(recycler.clone(), msgs_); recycle(recycler, msgs_);
} }
_ => (), _ => (),
} }
@ -270,14 +271,14 @@ mod bench {
fn run_streamer_bench() -> Result<()> { fn run_streamer_bench() -> Result<()> {
let read = UdpSocket::bind("127.0.0.1:0")?; let read = UdpSocket::bind("127.0.0.1:0")?;
let addr = read.local_addr()?; let addr = read.local_addr()?;
let exit = Arc::new(Mutex::new(false)); let exit = Arc::new(AtomicBool::new(false));
let recycler = Arc::new(Mutex::new(Vec::new())); let recycler = Arc::new(Mutex::new(Vec::new()));
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let t_reader = receiver(read, exit.clone(), recycler.clone(), s_reader)?; let t_reader = receiver(read, exit.clone(), &recycler, s_reader)?;
let t_producer1 = producer(&addr, recycler.clone(), exit.clone()); let t_producer1 = producer(&addr, &recycler, exit.clone());
let t_producer2 = producer(&addr, recycler.clone(), exit.clone()); let t_producer2 = producer(&addr, &recycler, exit.clone());
let t_producer3 = producer(&addr, recycler.clone(), exit.clone()); let t_producer3 = producer(&addr, &recycler, exit.clone());
let rvs = Arc::new(Mutex::new(0)); let rvs = Arc::new(Mutex::new(0));
let t_sinc = sinc(recycler.clone(), exit.clone(), rvs.clone(), r_reader); let t_sinc = sinc(recycler.clone(), exit.clone(), rvs.clone(), r_reader);
@ -291,7 +292,7 @@ mod bench {
let ftime = (time as f64) / 10000000000f64; let ftime = (time as f64) / 10000000000f64;
let fcount = (end_val - start_val) as f64; let fcount = (end_val - start_val) as f64;
println!("performance: {:?}", fcount / ftime); println!("performance: {:?}", fcount / ftime);
*exit.lock().unwrap() = true; exit.store(true, Ordering::Relaxed);
t_reader.join()?; t_reader.join()?;
t_producer1.join()?; t_producer1.join()?;
t_producer2.join()?; t_producer2.join()?;
@ -310,6 +311,7 @@ mod test {
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::net::UdpSocket; use std::net::UdpSocket;
use std::time::Duration; use std::time::Duration;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::io::Write; use std::io::Write;
use std::io; use std::io;
@ -339,7 +341,7 @@ mod test {
let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap(); let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap();
let (s_sender, r_sender) = channel(); let (s_sender, r_sender) = channel();
let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender); let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender);
let msgs = allocate(recycler.clone()); let msgs = allocate(&recycler);
msgs.write().unwrap().packets.resize(10, Packet::default()); msgs.write().unwrap().packets.resize(10, Packet::default());
for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() { for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() {
w.data[0] = i as u8; w.data[0] = i as u8;
@ -351,7 +353,7 @@ mod test {
let mut num = 0; let mut num = 0;
get_msgs(r_reader, &mut num); get_msgs(r_reader, &mut num);
assert_eq!(num, 10); assert_eq!(num, 10);
*exit.lock().unwrap() = true; exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join"); t_receiver.join().expect("join");
t_sender.join().expect("join"); t_sender.join().expect("join");
} }
@ -364,13 +366,13 @@ mod test {
let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = read.local_addr().unwrap(); let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(Mutex::new(false)); let exit = Arc::new(AtomicBool::new(false));
let recycler = Arc::new(Mutex::new(Vec::new())); let recycler = Arc::new(Mutex::new(Vec::new()));
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap(); let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap();
let (s_sender, r_sender) = channel(); let (s_sender, r_sender) = channel();
let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender); let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender);
let msgs = allocate(recycler.clone()); let msgs = allocate(&recycler);
msgs.write().unwrap().packets.resize(10, Packet::default()); msgs.write().unwrap().packets.resize(10, Packet::default());
for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() { for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() {
w.data[0] = i as u8; w.data[0] = i as u8;
@ -382,7 +384,7 @@ mod test {
let mut num = 0; let mut num = 0;
get_msgs(r_reader, &mut num); get_msgs(r_reader, &mut num);
assert_eq!(num, 10); assert_eq!(num, 10);
*exit.lock().unwrap() = true; exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join"); t_receiver.join().expect("join");
t_sender.join().expect("join"); t_sender.join().expect("join");
} }