From 250830ade95013759b243ab3e16f495db3d60768 Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Fri, 11 May 2018 11:38:52 -0700 Subject: [PATCH] cargo fmt run --- src/accountant.rs | 74 +++++++++++++++++++++++++++++++++++---------- src/bin/testnode.rs | 2 +- src/crdt.rs | 44 +++++++++++++++++++++------ src/ecdsa.rs | 8 +++-- src/erasure.rs | 17 +++++++++-- src/event.rs | 5 ++- src/hash.rs | 2 +- src/historian.rs | 7 +++-- src/mint.rs | 9 ++++-- src/result.rs | 2 +- src/signature.rs | 8 +++-- src/streamer.rs | 12 ++++++-- src/thin_client.rs | 11 +++++-- src/timing.rs | 4 ++- src/tpu.rs | 25 +++++++++++---- 15 files changed, 174 insertions(+), 56 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index cdc8d92bb7..f1e678ca2d 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -16,8 +16,8 @@ use signature::{KeyPair, PublicKey, Signature}; use std::collections::hash_map::Entry::Occupied; use std::collections::{HashMap, HashSet, VecDeque}; use std::result; -use std::sync::RwLock; use std::sync::atomic::{AtomicIsize, Ordering}; +use std::sync::RwLock; use transaction::Transaction; pub const MAX_ENTRY_IDS: usize = 1024 * 4; @@ -34,7 +34,11 @@ pub type Result = result::Result; /// Commit funds to the 'to' party. fn apply_payment(balances: &RwLock>, payment: &Payment) { // First we check balances with a read lock to maximize potential parallelization. - if balances.read().expect("'balances' read lock in apply_payment").contains_key(&payment.to) { + if balances + .read() + .expect("'balances' read lock in apply_payment") + .contains_key(&payment.to) + { let bals = balances.read().expect("'balances' read lock"); bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed); } else { @@ -90,15 +94,25 @@ impl Accountant { } fn reserve_signature(signatures: &RwLock>, sig: &Signature) -> bool { - if signatures.read().expect("'signatures' read lock").contains(sig) { + if signatures + .read() + .expect("'signatures' read lock") + .contains(sig) + { return false; } - signatures.write().expect("'signatures' write lock").insert(*sig); + signatures + .write() + .expect("'signatures' write lock") + .insert(*sig); true } fn forget_signature(signatures: &RwLock>, sig: &Signature) -> bool { - signatures.write().expect("'signatures' write lock in forget_signature").remove(sig) + signatures + .write() + .expect("'signatures' write lock in forget_signature") + .remove(sig) } fn forget_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> bool { @@ -132,7 +146,9 @@ impl Accountant { /// the oldest ones once its internal cache is full. Once boot, the /// accountant will reject transactions using that `last_id`. pub fn register_entry_id(&self, last_id: &Hash) { - let mut last_ids = self.last_ids.write().expect("'last_ids' write lock in register_entry_id"); + let mut last_ids = self.last_ids + .write() + .expect("'last_ids' write lock in register_entry_id"); if last_ids.len() >= MAX_ENTRY_IDS { last_ids.pop_front(); } @@ -142,7 +158,9 @@ impl Accountant { /// Deduct tokens from the 'from' address the account has sufficient /// funds and isn't a duplicate. pub fn process_verified_transaction_debits(&self, tr: &Transaction) -> Result<()> { - let bals = self.balances.read().expect("'balances' read lock in process_verified_transaction_debits"); + let bals = self.balances + .read() + .expect("'balances' read lock in process_verified_transaction_debits"); let option = bals.get(&tr.from); if option.is_none() { @@ -178,13 +196,16 @@ impl Accountant { pub fn process_verified_transaction_credits(&self, tr: &Transaction) { let mut plan = tr.data.plan.clone(); - plan.apply_witness(&Witness::Timestamp(*self.last_time.read() + plan.apply_witness(&Witness::Timestamp(*self.last_time + .read() .expect("timestamp creation in process_verified_transaction_credits"))); if let Some(ref payment) = plan.final_payment() { apply_payment(&self.balances, payment); } else { - let mut pending = self.pending.write().expect("'pending' write lock in process_verified_transaction_credits"); + let mut pending = self.pending + .write() + .expect("'pending' write lock in process_verified_transaction_credits"); pending.insert(tr.sig, plan); } } @@ -253,7 +274,11 @@ impl Accountant { /// Process a Witness Signature that has already been verified. fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> { - if let Occupied(mut e) = self.pending.write().expect("write() in process_verified_sig").entry(tx_sig) { + if let Occupied(mut e) = self.pending + .write() + .expect("write() in process_verified_sig") + .entry(tx_sig) + { e.get_mut().apply_witness(&Witness::Signature(from)); if let Some(ref payment) = e.get().final_payment() { apply_payment(&self.balances, payment); @@ -268,11 +293,22 @@ impl Accountant { fn process_verified_timestamp(&self, from: PublicKey, dt: DateTime) -> Result<()> { // If this is the first timestamp we've seen, it probably came from the genesis block, // so we'll trust it. - if *self.last_time.read().expect("'last_time' read lock on first timestamp check") == Utc.timestamp(0, 0) { - self.time_sources.write().expect("'time_sources' write lock on first timestamp").insert(from); + if *self.last_time + .read() + .expect("'last_time' read lock on first timestamp check") + == Utc.timestamp(0, 0) + { + self.time_sources + .write() + .expect("'time_sources' write lock on first timestamp") + .insert(from); } - if self.time_sources.read().expect("'time_sources' read lock").contains(&from) { + if self.time_sources + .read() + .expect("'time_sources' read lock") + .contains(&from) + { if dt > *self.last_time.read().expect("'last_time' read lock") { *self.last_time.write().expect("'last_time' write lock") = dt; } @@ -285,9 +321,13 @@ impl Accountant { // Hold 'pending' write lock until the end of this function. Otherwise another thread can // double-spend if it enters before the modified plan is removed from 'pending'. - let mut pending = self.pending.write().expect("'pending' write lock in process_verified_timestamp"); + let mut pending = self.pending + .write() + .expect("'pending' write lock in process_verified_timestamp"); for (key, plan) in pending.iter_mut() { - plan.apply_witness(&Witness::Timestamp(*self.last_time.read().expect("'last_time' read lock when creating timestamp"))); + plan.apply_witness(&Witness::Timestamp(*self.last_time + .read() + .expect("'last_time' read lock when creating timestamp"))); if let Some(ref payment) = plan.final_payment() { apply_payment(&self.balances, payment); completed.push(key.clone()); @@ -342,7 +382,9 @@ impl Accountant { } pub fn get_balance(&self, pubkey: &PublicKey) -> Option { - let bals = self.balances.read().expect("'balances' read lock in get_balance"); + let bals = self.balances + .read() + .expect("'balances' read lock in get_balance"); bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64) } } diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 0eb38e195e..1c97043c36 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -17,8 +17,8 @@ use std::env; use std::io::{stdin, stdout, Read}; use std::net::UdpSocket; use std::process::exit; -use std::sync::Arc; use std::sync::atomic::AtomicBool; +use std::sync::Arc; fn print_usage(program: &str, opts: Options) { let mut brief = format!("Usage: cat | {} [options]\n\n", program); diff --git a/src/crdt.rs b/src/crdt.rs index b7f712a0c7..965dac08fe 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -263,7 +263,8 @@ impl Crdt { let mut buf = [0u8; 8]; rnd.fill(&mut buf).expect("rnd.fill in pub fn random"); let mut rdr = Cursor::new(&buf); - rdr.read_u64::().expect("rdr.read_u64 in fn random") + rdr.read_u64::() + .expect("rdr.read_u64 in fn random") } fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec) { //trace!("get updates since {}", v); @@ -287,10 +288,19 @@ impl Crdt { return Err(Error::GeneralError); } let mut n = (Self::random() as usize) % self.table.len(); - while self.table.values().nth(n).expect("'values().nth(n)' while loop in fn gossip_request").id == self.me { + while self.table + .values() + .nth(n) + .expect("'values().nth(n)' while loop in fn gossip_request") + .id == self.me + { n = (Self::random() as usize) % self.table.len(); } - let v = self.table.values().nth(n).expect("'values().nth(n)' in fn gossip_request").clone(); + let v = self.table + .values() + .nth(n) + .expect("'values().nth(n)' in fn gossip_request") + .clone(); let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0); let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone()); Ok((v.gossip_addr, req)) @@ -303,7 +313,9 @@ impl Crdt { // Lock the object only to do this operation and not for any longer // especially not when doing the `sock.send_to` - let (remote_gossip_addr, req) = obj.read().expect("'obj' read lock in fn run_gossip").gossip_request()?; + let (remote_gossip_addr, req) = obj.read() + .expect("'obj' read lock in fn run_gossip") + .gossip_request()?; let sock = UdpSocket::bind("0.0.0.0:0")?; // TODO this will get chatty, so we need to first ask for number of updates since // then only ask for specific data that we dont have @@ -335,7 +347,11 @@ impl Crdt { return; } //TODO this should be a tuned parameter - sleep(obj.read().expect("'obj' read lock in pub fn gossip").timeout); + sleep( + obj.read() + .expect("'obj' read lock in pub fn gossip") + .timeout, + ); }) } @@ -353,18 +369,25 @@ impl Crdt { trace!("RequestUpdates {}", v); let addr = reqdata.gossip_addr; // only lock for this call, dont lock durring IO `sock.send_to` or `sock.recv_from` - let (from, ups, data) = obj.read().expect("'obj' read lock in RequestUpdates").get_updates_since(v); + let (from, ups, data) = obj.read() + .expect("'obj' read lock in RequestUpdates") + .get_updates_since(v); trace!("get updates since response {} {}", v, data.len()); let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?; trace!("send_to {}", addr); //TODO verify reqdata belongs to sender - obj.write().expect("'obj' write lock in RequestUpdates").insert(reqdata); - sock.send_to(&rsp, addr).expect("'sock.send_to' in RequestUpdates"); + obj.write() + .expect("'obj' write lock in RequestUpdates") + .insert(reqdata); + sock.send_to(&rsp, addr) + .expect("'sock.send_to' in RequestUpdates"); trace!("send_to done!"); } Protocol::ReceiveUpdates(from, ups, data) => { trace!("ReceivedUpdates"); - obj.write().expect("'obj' write lock in ReceiveUpdates").apply_updates(from, ups, &data); + obj.write() + .expect("'obj' write lock in ReceiveUpdates") + .apply_updates(from, ups, &data); } } Ok(()) @@ -374,7 +397,8 @@ impl Crdt { sock: UdpSocket, exit: Arc, ) -> JoinHandle<()> { - sock.set_read_timeout(Some(Duration::new(2, 0))).expect("'sock.set_read_timeout' in crdt.rs"); + sock.set_read_timeout(Some(Duration::new(2, 0))) + .expect("'sock.set_read_timeout' in crdt.rs"); spawn(move || loop { let _ = Self::run_listen(&obj, &sock); if exit.load(Ordering::Relaxed) { diff --git a/src/ecdsa.rs b/src/ecdsa.rs index 6e79c5f182..1214302133 100644 --- a/src/ecdsa.rs +++ b/src/ecdsa.rs @@ -78,7 +78,11 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { let mut rvs = Vec::new(); for packets in batches { - locks.push(packets.read().expect("'packets' read lock in pub fn ed25519_verify")); + locks.push( + packets + .read() + .expect("'packets' read lock in pub fn ed25519_verify"), + ); } let mut num = 0; for p in locks { @@ -135,8 +139,8 @@ mod tests { use packet::{Packet, Packets, SharedPackets}; use std::sync::RwLock; use thin_client_service::Request; - use transaction::Transaction; use transaction::test_tx; + use transaction::Transaction; fn make_packet_from_transaction(tr: Transaction) -> Packet { let tx = serialize(&Request::Transaction(tr)).unwrap(); diff --git a/src/erasure.rs b/src/erasure.rs index 62fa073070..35c543f090 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -164,7 +164,11 @@ pub fn generate_coding( let mut coding_ptrs: Vec<&mut [u8]> = Vec::new(); for i in consumed..consumed + NUM_DATA { let n = i % window.len(); - data_blobs.push(window[n].clone().expect("'data_blobs' arr in pub fn generate_coding")); + data_blobs.push( + window[n] + .clone() + .expect("'data_blobs' arr in pub fn generate_coding"), + ); } for b in &data_blobs { data_locks.push(b.write().expect("'b' write lock in pub fn generate_coding")); @@ -180,10 +184,17 @@ pub fn generate_coding( for i in coding_start..coding_end { let n = i % window.len(); window[n] = re.allocate(); - coding_blobs.push(window[n].clone().expect("'coding_blobs' arr in pub fn generate_coding")); + coding_blobs.push( + window[n] + .clone() + .expect("'coding_blobs' arr in pub fn generate_coding"), + ); } for b in &coding_blobs { - coding_locks.push(b.write().expect("'coding_locks' arr in pub fn generate_coding")); + coding_locks.push( + b.write() + .expect("'coding_locks' arr in pub fn generate_coding"), + ); } for (i, l) in coding_locks.iter_mut().enumerate() { trace!("i: {} data: {}", i, l.data[0]); diff --git a/src/event.rs b/src/event.rs index e9a9aac095..b3a317e199 100644 --- a/src/event.rs +++ b/src/event.rs @@ -49,7 +49,10 @@ impl Event { match *self { Event::Transaction(ref tr) => tr.verify_sig(), Event::Signature { from, tx_sig, sig } => sig.verify(&from, &tx_sig), - Event::Timestamp { from, dt, sig } => sig.verify(&from, &serialize(&dt).expect("serialize 'dt' in pub fn verify")), + Event::Timestamp { from, dt, sig } => sig.verify( + &from, + &serialize(&dt).expect("serialize 'dt' in pub fn verify"), + ), } } } diff --git a/src/hash.rs b/src/hash.rs index ee7598a0dc..61dd01468c 100644 --- a/src/hash.rs +++ b/src/hash.rs @@ -1,7 +1,7 @@ //! The `hash` module provides functions for creating SHA-256 hashes. -use generic_array::GenericArray; use generic_array::typenum::U32; +use generic_array::GenericArray; use sha2::{Digest, Sha256}; pub type Hash = GenericArray; diff --git a/src/historian.rs b/src/historian.rs index f7f88c5cf3..7a183c1555 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -4,8 +4,8 @@ use entry::Entry; use hash::Hash; use recorder::{ExitReason, Recorder, Signal}; -use std::sync::Mutex; use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; +use std::sync::Mutex; use std::thread::{spawn, JoinHandle}; use std::time::Instant; @@ -52,7 +52,10 @@ impl Historian { } pub fn receive(self: &Self) -> Result { - self.output.lock().expect("'output' lock in pub fn receive").try_recv() + self.output + .lock() + .expect("'output' lock in pub fn receive") + .try_recv() } } diff --git a/src/mint.rs b/src/mint.rs index 4502c65d17..39bc4348d5 100644 --- a/src/mint.rs +++ b/src/mint.rs @@ -1,7 +1,7 @@ //! The `mint` module is a library for generating the chain's genesis block. -use entry::Entry; use entry::create_entry; +use entry::Entry; use event::Event; use hash::{hash, Hash}; use ring::rand::SystemRandom; @@ -19,8 +19,11 @@ pub struct Mint { impl Mint { pub fn new(tokens: i64) -> Self { let rnd = SystemRandom::new(); - let pkcs8 = KeyPair::generate_pkcs8(&rnd).expect("generate_pkcs8 in mint pub fn new").to_vec(); - let keypair = KeyPair::from_pkcs8(Input::from(&pkcs8)).expect("from_pkcs8 in mint pub fn new"); + let pkcs8 = KeyPair::generate_pkcs8(&rnd) + .expect("generate_pkcs8 in mint pub fn new") + .to_vec(); + let keypair = + KeyPair::from_pkcs8(Input::from(&pkcs8)).expect("from_pkcs8 in mint pub fn new"); let pubkey = keypair.pubkey(); Mint { pkcs8, diff --git a/src/result.rs b/src/result.rs index fca876ebec..d2cb485add 100644 --- a/src/result.rs +++ b/src/result.rs @@ -78,9 +78,9 @@ mod tests { use std::io; use std::io::Write; use std::net::SocketAddr; + use std::sync::mpsc::channel; use std::sync::mpsc::RecvError; use std::sync::mpsc::RecvTimeoutError; - use std::sync::mpsc::channel; use std::thread; fn addr_parse_error() -> Result { diff --git a/src/signature.rs b/src/signature.rs index 08f22166d0..8f8e9b075f 100644 --- a/src/signature.rs +++ b/src/signature.rs @@ -1,7 +1,7 @@ //! The `signature` module provides functionality for public, and private keys. -use generic_array::GenericArray; use generic_array::typenum::{U32, U64}; +use generic_array::GenericArray; use ring::signature::Ed25519KeyPair; use ring::{rand, signature}; use untrusted; @@ -19,8 +19,10 @@ impl KeyPairUtil for Ed25519KeyPair { /// Return a new ED25519 keypair fn new() -> Self { let rng = rand::SystemRandom::new(); - let pkcs8_bytes = signature::Ed25519KeyPair::generate_pkcs8(&rng).expect("generate_pkcs8 in signature pb fn new"); - signature::Ed25519KeyPair::from_pkcs8(untrusted::Input::from(&pkcs8_bytes)).expect("from_pcks8 in signature pb fn new") + let pkcs8_bytes = signature::Ed25519KeyPair::generate_pkcs8(&rng) + .expect("generate_pkcs8 in signature pb fn new"); + signature::Ed25519KeyPair::from_pkcs8(untrusted::Input::from(&pkcs8_bytes)) + .expect("from_pcks8 in signature pb fn new") } /// Return the public key for the given keypair diff --git a/src/streamer.rs b/src/streamer.rs index a26a609edc..d551a890e4 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -27,7 +27,10 @@ fn recv_loop( let msgs = re.allocate(); let msgs_ = msgs.clone(); loop { - match msgs.write().expect("write lock in fn recv_loop").recv_from(sock) { + match msgs.write() + .expect("write lock in fn recv_loop") + .recv_from(sock) + { Ok(()) => { channel.send(msgs_)?; break; @@ -136,7 +139,10 @@ fn recv_window( ) -> Result<()> { let timer = Duration::new(1, 0); let mut dq = r.recv_timeout(timer)?; - let leader_id = crdt.read().expect("'crdt' read lock in fn recv_window").leader_data().id; + let leader_id = crdt.read() + .expect("'crdt' read lock in fn recv_window") + .leader_data() + .id; while let Ok(mut nq) = r.try_recv() { dq.append(&mut nq) } @@ -457,8 +463,8 @@ mod test { use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; - use streamer::{BlobReceiver, PacketReceiver}; use streamer::{blob_receiver, receiver, responder, retransmitter, window}; + use streamer::{BlobReceiver, PacketReceiver}; fn get_msgs(r: PacketReceiver, num: &mut usize) { for _t in 0..5 { diff --git a/src/thin_client.rs b/src/thin_client.rs index 31c9a0b79a..e1ee2b4dcd 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -124,7 +124,8 @@ impl ThinClient { // Wait for at least one EntryInfo. let mut done = false; while !done { - let resp = self.recv_response().expect("recv_response in pub fn transaction_count"); + let resp = self.recv_response() + .expect("recv_response in pub fn transaction_count"); if let &Response::EntryInfo(_) = &resp { done = true; } @@ -132,14 +133,18 @@ impl ThinClient { } // Then take the rest. - self.socket.set_nonblocking(true).expect("set_nonblocking in pub fn transaction_count"); + self.socket + .set_nonblocking(true) + .expect("set_nonblocking in pub fn transaction_count"); loop { match self.recv_response() { Err(_) => break, Ok(resp) => self.process_response(resp), } } - self.socket.set_nonblocking(false).expect("set_nonblocking in pub fn transaction_count"); + self.socket + .set_nonblocking(false) + .expect("set_nonblocking in pub fn transaction_count"); self.num_events } } diff --git a/src/timing.rs b/src/timing.rs index ad720a8dc4..4b0b9ab576 100644 --- a/src/timing.rs +++ b/src/timing.rs @@ -10,6 +10,8 @@ pub fn duration_as_s(d: &Duration) -> f32 { } pub fn timestamp() -> u64 { - let now = SystemTime::now().duration_since(UNIX_EPOCH).expect("create timestamp in timing"); + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("create timestamp in timing"); return duration_as_ms(&now); } diff --git a/src/tpu.rs b/src/tpu.rs index 3e6dc0fd26..bb3fce0e04 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -12,8 +12,8 @@ use rand::{thread_rng, Rng}; use result::Result; use serde_json; use std::collections::VecDeque; -use std::io::Write; use std::io::sink; +use std::io::Write; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Sender}; @@ -66,7 +66,12 @@ impl Tpu { .recv_timeout(Duration::new(1, 0))?; self.write_entry(writer, &entry); l.push(entry); - while let Ok(entry) = self.accounting_stage.output.lock().expect("'output' lock in fn write_entries").try_recv() { + while let Ok(entry) = self.accounting_stage + .output + .lock() + .expect("'output' lock in fn write_entries") + .try_recv() + { self.write_entry(writer, &entry); l.push(entry); } @@ -130,7 +135,10 @@ impl Tpu { ) -> Result<()> { let r = ecdsa::ed25519_verify(&batch); let res = batch.into_iter().zip(r).collect(); - sendr.lock().expect("lock in fn verify_batch in tpu").send(res)?; + sendr + .lock() + .expect("lock in fn verify_batch in tpu") + .send(res)?; // TODO: fix error handling here? Ok(()) } @@ -139,7 +147,8 @@ impl Tpu { recvr: &Arc>, sendr: &Arc)>>>>, ) -> Result<()> { - let (batch, len) = streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?; + let (batch, len) = + streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?; let now = Instant::now(); let batch_len = batch.len(); @@ -315,8 +324,12 @@ impl Tpu { ) -> Result>> { //replicate pipeline let crdt = Arc::new(RwLock::new(Crdt::new(me))); - crdt.write().expect("'crdt' write lock in pub fn replicate").set_leader(leader.id); - crdt.write().expect("'crdt' write lock before insert() in pub fn replicate").insert(leader); + crdt.write() + .expect("'crdt' write lock in pub fn replicate") + .set_leader(leader.id); + crdt.write() + .expect("'crdt' write lock before insert() in pub fn replicate") + .insert(leader); let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); let t_listen = Crdt::listen(crdt.clone(), gossip, exit.clone());