From 658e787b609fac29821f53f6c9d5046266f4f791 Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Tue, 8 May 2018 22:40:07 -0700 Subject: [PATCH 01/23] timing.rs panic cleanup --- src/timing.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/timing.rs b/src/timing.rs index 0d3c383839..7b08cfeb94 100644 --- a/src/timing.rs +++ b/src/timing.rs @@ -10,6 +10,6 @@ pub fn duration_as_s(d: &Duration) -> f32 { } pub fn timestamp() -> u64 { - let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + let now = SystemTime::now().duration_since(UNIX_EPOCH).expect("failed to create timestamp"); return duration_as_ms(&now); } From 86c1aaf7d8d6fb415d74970a42d0973d37ea8ce4 Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Tue, 8 May 2018 22:46:22 -0700 Subject: [PATCH 02/23] transaction.rs - panic cleanup --- src/transaction.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transaction.rs b/src/transaction.rs index 4080d11be2..0ccaf95a35 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -70,7 +70,7 @@ impl Transaction { } fn get_sign_data(&self) -> Vec { - serialize(&(&self.data)).unwrap() + serialize(&(&self.data)).expect("failed to serialize sign_data") } /// Sign this transaction. From 670a6c50c9d5e79fca9e06de3a8d8381278d3ef5 Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Tue, 8 May 2018 22:58:48 -0700 Subject: [PATCH 03/23] event.rs - panic cleanup --- src/event.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/event.rs b/src/event.rs index fabd6d7735..902ae8bef3 100644 --- a/src/event.rs +++ b/src/event.rs @@ -24,7 +24,7 @@ pub enum Event { impl Event { /// Create and sign a new Witness Timestamp. Used for unit-testing. pub fn new_timestamp(from: &KeyPair, dt: DateTime) -> Self { - let sign_data = serialize(&dt).unwrap(); + let sign_data = serialize(&dt).expect("failed to serialize Event sign_data"); let sig = Signature::clone_from_slice(from.sign(&sign_data).as_ref()); Event::Timestamp { from: from.pubkey(), @@ -49,7 +49,7 @@ impl Event { match *self { Event::Transaction(ref tr) => tr.verify_sig(), Event::Signature { from, tx_sig, sig } => sig.verify(&from, &tx_sig), - Event::Timestamp { from, dt, sig } => sig.verify(&from, &serialize(&dt).unwrap()), + Event::Timestamp { from, dt, sig } => sig.verify(&from, &serialize(&dt).expect("failed to verify Event Timestamp")), } } } From fe51669e85068b6a92e5abdddb871a3d9384cd75 Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Tue, 8 May 2018 23:21:45 -0700 Subject: [PATCH 04/23] signature.rs - panic cleanup --- src/signature.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/signature.rs b/src/signature.rs index 1b01e14ef5..c2136556f4 100644 --- a/src/signature.rs +++ b/src/signature.rs @@ -19,8 +19,8 @@ impl KeyPairUtil for Ed25519KeyPair { /// Return a new ED25519 keypair fn new() -> Self { let rng = rand::SystemRandom::new(); - let pkcs8_bytes = signature::Ed25519KeyPair::generate_pkcs8(&rng).unwrap(); - signature::Ed25519KeyPair::from_pkcs8(untrusted::Input::from(&pkcs8_bytes)).unwrap() + let pkcs8_bytes = signature::Ed25519KeyPair::generate_pkcs8(&rng).expect("failed to generate_pkcs8"); + signature::Ed25519KeyPair::from_pkcs8(untrusted::Input::from(&pkcs8_bytes)).expect("failed to construct Ed25519KeyPair") } /// Return the public key for the given keypair From f2de486658f74350a8e6b1ca8f6bb25482a08726 Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Wed, 9 May 2018 17:19:12 -0700 Subject: [PATCH 05/23] accountant.rs 
- panic cleanup --- src/accountant.rs | 49 ++++++++++++++++++++++++----------------------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index a214aa4196..bc244574f8 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -32,11 +32,11 @@ pub type Result = result::Result; /// Commit funds to the 'to' party. fn apply_payment(balances: &RwLock>, payment: &Payment) { - if balances.read().unwrap().contains_key(&payment.to) { - let bals = balances.read().unwrap(); + if balances.read().expect("failed 'balances' read lock in apply_payment").contains_key(&payment.to) { + let bals = balances.read().expect("failed 'balances' read lock"); bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed); } else { - let mut bals = balances.write().unwrap(); + let mut bals = balances.write().expect("failed 'balances' write lock"); bals.insert(payment.to, AtomicIsize::new(payment.tokens as isize)); } } @@ -76,27 +76,27 @@ impl Accountant { /// Return the last entry ID registered pub fn last_id(&self) -> Hash { - let last_ids = self.last_ids.read().unwrap(); - let last_item = last_ids.iter().last().expect("empty last_ids list"); + let last_ids = self.last_ids.read().expect("failed 'last_ids' read lock"); + let last_item = last_ids.iter().last().expect("empty 'last_ids' list"); last_item.0 } fn reserve_signature(signatures: &RwLock>, sig: &Signature) -> bool { - if signatures.read().unwrap().contains(sig) { + if signatures.read().expect("failed 'signatures' read lock").contains(sig) { return false; } - signatures.write().unwrap().insert(*sig); + signatures.write().expect("failed 'signatures' write lock").insert(*sig); true } fn forget_signature(signatures: &RwLock>, sig: &Signature) -> bool { - signatures.write().unwrap().remove(sig) + signatures.write().expect("failed 'signatures' write lock in forget_signature").remove(sig) } fn forget_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> bool { if let Some(entry) = self.last_ids .read() - .unwrap() + .expect("failed 'last_ids' read lock in forget_signature_with_last_id") .iter() .rev() .find(|x| x.0 == *last_id) @@ -109,7 +109,7 @@ impl Accountant { fn reserve_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> bool { if let Some(entry) = self.last_ids .read() - .unwrap() + .expect("failed 'last_ids' read lock in reserve_signature_with_last_id") .iter() .rev() .find(|x| x.0 == *last_id) @@ -124,7 +124,7 @@ impl Accountant { /// the oldest ones once its internal cache is full. Once boot, the /// accountant will reject transactions using that `last_id`. pub fn register_entry_id(&self, last_id: &Hash) { - let mut last_ids = self.last_ids.write().unwrap(); + let mut last_ids = self.last_ids.write().expect("failed 'last_ids' write lock in register_entry_id"); if last_ids.len() >= MAX_ENTRY_IDS { last_ids.pop_front(); } @@ -134,7 +134,7 @@ impl Accountant { /// Deduct tokens from the 'from' address the account has sufficient /// funds and isn't a duplicate. 
pub fn process_verified_transaction_debits(&self, tr: &Transaction) -> Result<()> { - let bals = self.balances.read().unwrap(); + let bals = self.balances.read().expect("failed 'balances' read lock in process_verified_transaction_debits"); let option = bals.get(&tr.from); if option.is_none() { @@ -146,7 +146,7 @@ impl Accountant { } loop { - let bal = option.unwrap(); + let bal = option.expect("failed on assignment of option to bal"); let current = bal.load(Ordering::Relaxed) as i64; if current < tr.data.tokens { @@ -170,12 +170,13 @@ impl Accountant { pub fn process_verified_transaction_credits(&self, tr: &Transaction) { let mut plan = tr.data.plan.clone(); - plan.apply_witness(&Witness::Timestamp(*self.last_time.read().unwrap())); + plan.apply_witness(&Witness::Timestamp(*self.last_time.read() + .expect("failed call to apply_witness in process_verified_transaction_credits"))); if let Some(ref payment) = plan.final_payment() { apply_payment(&self.balances, payment); } else { - let mut pending = self.pending.write().unwrap(); + let mut pending = self.pending.write().expect("failed to "); pending.insert(tr.sig, plan); } } @@ -234,7 +235,7 @@ impl Accountant { /// Process a Witness Signature that has already been verified. fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> { - if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) { + if let Occupied(mut e) = self.pending.write().expect("failed write() in process_verified_sig").entry(tx_sig) { e.get_mut().apply_witness(&Witness::Signature(from)); if let Some(ref payment) = e.get().final_payment() { apply_payment(&self.balances, payment); @@ -249,13 +250,13 @@ impl Accountant { fn process_verified_timestamp(&self, from: PublicKey, dt: DateTime) -> Result<()> { // If this is the first timestamp we've seen, it probably came from the genesis block, // so we'll trust it. - if *self.last_time.read().unwrap() == Utc.timestamp(0, 0) { - self.time_sources.write().unwrap().insert(from); + if *self.last_time.read().expect("failed 'last_time' read lock on first timestamp check") == Utc.timestamp(0, 0) { + self.time_sources.write().expect("failed 'time_sources' write lock on first timestamp").insert(from); } - if self.time_sources.read().unwrap().contains(&from) { - if dt > *self.last_time.read().unwrap() { - *self.last_time.write().unwrap() = dt; + if self.time_sources.read().expect("failed 'time_sources' read lock").contains(&from) { + if dt > *self.last_time.read().expect("failed 'last_time' read lock") { + *self.last_time.write().expect("failed 'last_time' write lock") = dt; } } else { return Ok(()); @@ -266,9 +267,9 @@ impl Accountant { // Hold 'pending' write lock until the end of this function. Otherwise another thread can // double-spend if it enters before the modified plan is removed from 'pending'. 
- let mut pending = self.pending.write().unwrap(); + let mut pending = self.pending.write().expect("failed 'pending' write lock"); for (key, plan) in pending.iter_mut() { - plan.apply_witness(&Witness::Timestamp(*self.last_time.read().unwrap())); + plan.apply_witness(&Witness::Timestamp(*self.last_time.read().expect("failed 'last_time' read lock when creating timestamp"))); if let Some(ref payment) = plan.final_payment() { apply_payment(&self.balances, payment); completed.push(key.clone()); @@ -323,7 +324,7 @@ impl Accountant { } pub fn get_balance(&self, pubkey: &PublicKey) -> Option { - let bals = self.balances.read().unwrap(); + let bals = self.balances.read().expect("failed 'balances' read lock in get_balance"); bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64) } } From 02c573986b15e84d1a308e42fb124f69ff8e5d6a Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Wed, 9 May 2018 17:22:14 -0700 Subject: [PATCH 06/23] historian / transaction updates --- src/historian.rs | 2 +- src/transaction.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/historian.rs b/src/historian.rs index 7d2478bf15..6f776aab4d 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -53,7 +53,7 @@ impl Historian { } pub fn receive(self: &Self) -> Result { - self.output.lock().unwrap().try_recv() + self.output.lock().expect("failed 'output' lock in Historian").try_recv() } } diff --git a/src/transaction.rs b/src/transaction.rs index 0ccaf95a35..6918012e30 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -70,7 +70,7 @@ impl Transaction { } fn get_sign_data(&self) -> Vec { - serialize(&(&self.data)).expect("failed to serialize sign_data") + serialize(&(&self.data)).expect("failed on serialize TransactionData") } /// Sign this transaction. 
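The patches above all apply one pattern: each `.unwrap()` on a lock, serialization call, or system call becomes `.expect("...")` with a message naming the lock or call site, so a panic points at what failed instead of the generic unwrap error. A minimal standalone sketch of the before/after (illustrative names only, not code from this repository):

    use std::sync::RwLock;
    use std::time::{SystemTime, UNIX_EPOCH};

    fn main() {
        let balances = RwLock::new(0u64);

        // Before: a poisoned lock panics with the generic
        // "called `Result::unwrap()` on an `Err` value" message.
        let _bal = *balances.read().unwrap();

        // After: the same failure names the lock and the call site.
        let _bal = *balances
            .read()
            .expect("'balances' read lock in main");

        // Same idea for fallible system calls.
        let _now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("create timestamp in main");
    }
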
From 1e91d09be792dbf96044cdc3066386f07137254d Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Wed, 9 May 2018 18:10:48 -0700 Subject: [PATCH 07/23] crdt.rs - panic cleanup --- src/crdt.rs | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/crdt.rs b/src/crdt.rs index df01d89ec1..b7f712a0c7 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -168,7 +168,7 @@ impl Crdt { ) -> Result<()> { let (me, table): (ReplicatedData, Vec) = { // copy to avoid locking durring IO - let robj = obj.read().unwrap(); + let robj = obj.read().expect("'obj' read lock in pub fn broadcast"); let cloned_table: Vec = robj.table.values().cloned().collect(); (robj.table[&robj.me].clone(), cloned_table) }; @@ -194,10 +194,10 @@ impl Crdt { .map(|((i, v), b)| { // only leader should be broadcasting assert!(me.current_leader_id != v.id); - let mut blob = b.write().unwrap(); - blob.set_id(me.id).expect("set_id"); + let mut blob = b.write().expect("'b' write lock in pub fn broadcast"); + blob.set_id(me.id).expect("set_id in pub fn broadcast"); blob.set_index(*transmit_index + i as u64) - .expect("set_index"); + .expect("set_index in pub fn broadcast"); //TODO profile this, may need multiple sockets for par_iter s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr) }) @@ -219,10 +219,10 @@ impl Crdt { pub fn retransmit(obj: &Arc>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> { let (me, table): (ReplicatedData, Vec) = { // copy to avoid locking durring IO - let s = obj.read().unwrap(); + let s = obj.read().expect("'obj' read lock in pub fn retransmit"); (s.table[&s.me].clone(), s.table.values().cloned().collect()) }; - let rblob = blob.read().unwrap(); + let rblob = blob.read().expect("'blob' read lock in pub fn retransmit"); let daddr = "0.0.0.0:0".parse().unwrap(); let orders: Vec<_> = table .iter() @@ -261,9 +261,9 @@ impl Crdt { fn random() -> u64 { let rnd = SystemRandom::new(); let mut buf = [0u8; 8]; - rnd.fill(&mut buf).unwrap(); + rnd.fill(&mut buf).expect("rnd.fill in pub fn random"); let mut rdr = Cursor::new(&buf); - rdr.read_u64::().unwrap() + rdr.read_u64::().expect("rdr.read_u64 in fn random") } fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec) { //trace!("get updates since {}", v); @@ -287,10 +287,10 @@ impl Crdt { return Err(Error::GeneralError); } let mut n = (Self::random() as usize) % self.table.len(); - while self.table.values().nth(n).unwrap().id == self.me { + while self.table.values().nth(n).expect("'values().nth(n)' while loop in fn gossip_request").id == self.me { n = (Self::random() as usize) % self.table.len(); } - let v = self.table.values().nth(n).unwrap().clone(); + let v = self.table.values().nth(n).expect("'values().nth(n)' in fn gossip_request").clone(); let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0); let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone()); Ok((v.gossip_addr, req)) @@ -303,7 +303,7 @@ impl Crdt { // Lock the object only to do this operation and not for any longer // especially not when doing the `sock.send_to` - let (remote_gossip_addr, req) = obj.read().unwrap().gossip_request()?; + let (remote_gossip_addr, req) = obj.read().expect("'obj' read lock in fn run_gossip").gossip_request()?; let sock = UdpSocket::bind("0.0.0.0:0")?; // TODO this will get chatty, so we need to first ask for number of updates since // then only ask for specific data that we dont have @@ -335,7 +335,7 @@ impl Crdt { return; } //TODO this should be a tuned parameter - 
sleep(obj.read().unwrap().timeout); + sleep(obj.read().expect("'obj' read lock in pub fn gossip").timeout); }) } @@ -353,18 +353,18 @@ impl Crdt { trace!("RequestUpdates {}", v); let addr = reqdata.gossip_addr; // only lock for this call, dont lock durring IO `sock.send_to` or `sock.recv_from` - let (from, ups, data) = obj.read().unwrap().get_updates_since(v); + let (from, ups, data) = obj.read().expect("'obj' read lock in RequestUpdates").get_updates_since(v); trace!("get updates since response {} {}", v, data.len()); let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?; trace!("send_to {}", addr); //TODO verify reqdata belongs to sender - obj.write().unwrap().insert(reqdata); - sock.send_to(&rsp, addr).unwrap(); + obj.write().expect("'obj' write lock in RequestUpdates").insert(reqdata); + sock.send_to(&rsp, addr).expect("'sock.send_to' in RequestUpdates"); trace!("send_to done!"); } Protocol::ReceiveUpdates(from, ups, data) => { trace!("ReceivedUpdates"); - obj.write().unwrap().apply_updates(from, ups, &data); + obj.write().expect("'obj' write lock in ReceiveUpdates").apply_updates(from, ups, &data); } } Ok(()) @@ -374,7 +374,7 @@ impl Crdt { sock: UdpSocket, exit: Arc, ) -> JoinHandle<()> { - sock.set_read_timeout(Some(Duration::new(2, 0))).unwrap(); + sock.set_read_timeout(Some(Duration::new(2, 0))).expect("'sock.set_read_timeout' in crdt.rs"); spawn(move || loop { let _ = Self::run_listen(&obj, &sock); if exit.load(Ordering::Relaxed) { From 52ebb88205a17ef3e21376e858c3e6b84b5dc3b5 Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Wed, 9 May 2018 18:16:37 -0700 Subject: [PATCH 08/23] accountant.rs - simplify error messages --- src/accountant.rs | 46 +++++++++++++++++++++++----------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index bc244574f8..dc27db12a9 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -32,11 +32,11 @@ pub type Result = result::Result; /// Commit funds to the 'to' party. 
fn apply_payment(balances: &RwLock>, payment: &Payment) { - if balances.read().expect("failed 'balances' read lock in apply_payment").contains_key(&payment.to) { - let bals = balances.read().expect("failed 'balances' read lock"); + if balances.read().expect("'balances' read lock in apply_payment").contains_key(&payment.to) { + let bals = balances.read().expect("'balances' read lock"); bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed); } else { - let mut bals = balances.write().expect("failed 'balances' write lock"); + let mut bals = balances.write().expect("'balances' write lock"); bals.insert(payment.to, AtomicIsize::new(payment.tokens as isize)); } } @@ -76,27 +76,27 @@ impl Accountant { /// Return the last entry ID registered pub fn last_id(&self) -> Hash { - let last_ids = self.last_ids.read().expect("failed 'last_ids' read lock"); + let last_ids = self.last_ids.read().expect("'last_ids' read lock"); let last_item = last_ids.iter().last().expect("empty 'last_ids' list"); last_item.0 } fn reserve_signature(signatures: &RwLock>, sig: &Signature) -> bool { - if signatures.read().expect("failed 'signatures' read lock").contains(sig) { + if signatures.read().expect("'signatures' read lock").contains(sig) { return false; } - signatures.write().expect("failed 'signatures' write lock").insert(*sig); + signatures.write().expect("'signatures' write lock").insert(*sig); true } fn forget_signature(signatures: &RwLock>, sig: &Signature) -> bool { - signatures.write().expect("failed 'signatures' write lock in forget_signature").remove(sig) + signatures.write().expect("'signatures' write lock in forget_signature").remove(sig) } fn forget_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> bool { if let Some(entry) = self.last_ids .read() - .expect("failed 'last_ids' read lock in forget_signature_with_last_id") + .expect("'last_ids' read lock in forget_signature_with_last_id") .iter() .rev() .find(|x| x.0 == *last_id) @@ -109,7 +109,7 @@ impl Accountant { fn reserve_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> bool { if let Some(entry) = self.last_ids .read() - .expect("failed 'last_ids' read lock in reserve_signature_with_last_id") + .expect("'last_ids' read lock in reserve_signature_with_last_id") .iter() .rev() .find(|x| x.0 == *last_id) @@ -124,7 +124,7 @@ impl Accountant { /// the oldest ones once its internal cache is full. Once boot, the /// accountant will reject transactions using that `last_id`. pub fn register_entry_id(&self, last_id: &Hash) { - let mut last_ids = self.last_ids.write().expect("failed 'last_ids' write lock in register_entry_id"); + let mut last_ids = self.last_ids.write().expect("'last_ids' write lock in register_entry_id"); if last_ids.len() >= MAX_ENTRY_IDS { last_ids.pop_front(); } @@ -134,7 +134,7 @@ impl Accountant { /// Deduct tokens from the 'from' address the account has sufficient /// funds and isn't a duplicate. 
pub fn process_verified_transaction_debits(&self, tr: &Transaction) -> Result<()> { - let bals = self.balances.read().expect("failed 'balances' read lock in process_verified_transaction_debits"); + let bals = self.balances.read().expect("'balances' read lock in process_verified_transaction_debits"); let option = bals.get(&tr.from); if option.is_none() { @@ -146,7 +146,7 @@ impl Accountant { } loop { - let bal = option.expect("failed on assignment of option to bal"); + let bal = option.expect("assignment of option to bal"); let current = bal.load(Ordering::Relaxed) as i64; if current < tr.data.tokens { @@ -171,12 +171,12 @@ impl Accountant { pub fn process_verified_transaction_credits(&self, tr: &Transaction) { let mut plan = tr.data.plan.clone(); plan.apply_witness(&Witness::Timestamp(*self.last_time.read() - .expect("failed call to apply_witness in process_verified_transaction_credits"))); + .expect("timestamp creation in process_verified_transaction_credits"))); if let Some(ref payment) = plan.final_payment() { apply_payment(&self.balances, payment); } else { - let mut pending = self.pending.write().expect("failed to "); + let mut pending = self.pending.write().expect("'pending' write lock in process_verified_transaction_credits"); pending.insert(tr.sig, plan); } } @@ -235,7 +235,7 @@ impl Accountant { /// Process a Witness Signature that has already been verified. fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> { - if let Occupied(mut e) = self.pending.write().expect("failed write() in process_verified_sig").entry(tx_sig) { + if let Occupied(mut e) = self.pending.write().expect("write() in process_verified_sig").entry(tx_sig) { e.get_mut().apply_witness(&Witness::Signature(from)); if let Some(ref payment) = e.get().final_payment() { apply_payment(&self.balances, payment); @@ -250,13 +250,13 @@ impl Accountant { fn process_verified_timestamp(&self, from: PublicKey, dt: DateTime) -> Result<()> { // If this is the first timestamp we've seen, it probably came from the genesis block, // so we'll trust it. - if *self.last_time.read().expect("failed 'last_time' read lock on first timestamp check") == Utc.timestamp(0, 0) { - self.time_sources.write().expect("failed 'time_sources' write lock on first timestamp").insert(from); + if *self.last_time.read().expect("'last_time' read lock on first timestamp check") == Utc.timestamp(0, 0) { + self.time_sources.write().expect("'time_sources' write lock on first timestamp").insert(from); } - if self.time_sources.read().expect("failed 'time_sources' read lock").contains(&from) { - if dt > *self.last_time.read().expect("failed 'last_time' read lock") { - *self.last_time.write().expect("failed 'last_time' write lock") = dt; + if self.time_sources.read().expect("'time_sources' read lock").contains(&from) { + if dt > *self.last_time.read().expect("'last_time' read lock") { + *self.last_time.write().expect("'last_time' write lock") = dt; } } else { return Ok(()); @@ -267,9 +267,9 @@ impl Accountant { // Hold 'pending' write lock until the end of this function. Otherwise another thread can // double-spend if it enters before the modified plan is removed from 'pending'. 
- let mut pending = self.pending.write().expect("failed 'pending' write lock"); + let mut pending = self.pending.write().expect("'pending' write lock in process_verified_timestamp"); for (key, plan) in pending.iter_mut() { - plan.apply_witness(&Witness::Timestamp(*self.last_time.read().expect("failed 'last_time' read lock when creating timestamp"))); + plan.apply_witness(&Witness::Timestamp(*self.last_time.read().expect("'last_time' read lock when creating timestamp"))); if let Some(ref payment) = plan.final_payment() { apply_payment(&self.balances, payment); completed.push(key.clone()); @@ -324,7 +324,7 @@ impl Accountant { } pub fn get_balance(&self, pubkey: &PublicKey) -> Option { - let bals = self.balances.read().expect("failed 'balances' read lock in get_balance"); + let bals = self.balances.read().expect("'balances' read lock in get_balance"); bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64) } } From bcd6606a16c8328a19170ef6ef8c3b1497a64c9f Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Wed, 9 May 2018 18:19:23 -0700 Subject: [PATCH 09/23] ecdsa.rs - panic cleanup --- src/ecdsa.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ecdsa.rs b/src/ecdsa.rs index 4d7abbdbb4..d728e81ac7 100644 --- a/src/ecdsa.rs +++ b/src/ecdsa.rs @@ -59,7 +59,7 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { .into_par_iter() .map(|p| { p.read() - .unwrap() + .expect("'p' read lock in ed25519_verify") .packets .par_iter() .map(verify_packet) @@ -78,7 +78,7 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { let mut rvs = Vec::new(); for packets in batches { - locks.push(packets.read().unwrap()); + locks.push(packets.read().expect("'packets' read lock in pub fn ed25519_verify")); } let mut num = 0; for p in locks { From 882ea6b67217c767f3b6d56c332934c48265dc28 Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Thu, 10 May 2018 16:54:21 -0700 Subject: [PATCH 10/23] erasure.rs - panic cleanup --- src/erasure.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/erasure.rs b/src/erasure.rs index 12b4223bb9..62fa073070 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -164,10 +164,10 @@ pub fn generate_coding( let mut coding_ptrs: Vec<&mut [u8]> = Vec::new(); for i in consumed..consumed + NUM_DATA { let n = i % window.len(); - data_blobs.push(window[n].clone().unwrap()); + data_blobs.push(window[n].clone().expect("'data_blobs' arr in pub fn generate_coding")); } for b in &data_blobs { - data_locks.push(b.write().unwrap()); + data_locks.push(b.write().expect("'b' write lock in pub fn generate_coding")); } for (i, l) in data_locks.iter_mut().enumerate() { trace!("i: {} data: {}", i, l.data[0]); @@ -180,10 +180,10 @@ pub fn generate_coding( for i in coding_start..coding_end { let n = i % window.len(); window[n] = re.allocate(); - coding_blobs.push(window[n].clone().unwrap()); + coding_blobs.push(window[n].clone().expect("'coding_blobs' arr in pub fn generate_coding")); } for b in &coding_blobs { - coding_locks.push(b.write().unwrap()); + coding_locks.push(b.write().expect("'coding_locks' arr in pub fn generate_coding")); } for (i, l) in coding_locks.iter_mut().enumerate() { trace!("i: {} data: {}", i, l.data[0]); @@ -231,7 +231,7 @@ pub fn recover( let j = i % window.len(); let mut b = &mut window[j]; if b.is_some() { - blobs.push(b.clone().unwrap()); + blobs.push(b.clone().expect("'blobs' arr in pb fn recover")); continue; } let n = re.allocate(); @@ -244,7 +244,7 @@ pub fn recover( trace!("erasures: {:?}", erasures); //lock everything 
for b in &blobs { - locks.push(b.write().unwrap()); + locks.push(b.write().expect("'locks' arr in pb fn recover")); } for (i, l) in locks.iter_mut().enumerate() { if i >= NUM_DATA { From b7a0bd6347867213bec99d9495a3002eb4081369 Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Thu, 10 May 2018 16:59:13 -0700 Subject: [PATCH 11/23] event.rs - panic cleanup --- src/event.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/event.rs b/src/event.rs index 902ae8bef3..e9a9aac095 100644 --- a/src/event.rs +++ b/src/event.rs @@ -24,7 +24,7 @@ pub enum Event { impl Event { /// Create and sign a new Witness Timestamp. Used for unit-testing. pub fn new_timestamp(from: &KeyPair, dt: DateTime) -> Self { - let sign_data = serialize(&dt).expect("failed to serialize Event sign_data"); + let sign_data = serialize(&dt).expect("serialize 'dt' in pub fn new_timestamp"); let sig = Signature::clone_from_slice(from.sign(&sign_data).as_ref()); Event::Timestamp { from: from.pubkey(), @@ -49,7 +49,7 @@ impl Event { match *self { Event::Transaction(ref tr) => tr.verify_sig(), Event::Signature { from, tx_sig, sig } => sig.verify(&from, &tx_sig), - Event::Timestamp { from, dt, sig } => sig.verify(&from, &serialize(&dt).expect("failed to verify Event Timestamp")), + Event::Timestamp { from, dt, sig } => sig.verify(&from, &serialize(&dt).expect("serialize 'dt' in pub fn verify")), } } } From d69beaabe1e894e14a17547d0eebd487cb8abaf5 Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Thu, 10 May 2018 17:00:37 -0700 Subject: [PATCH 12/23] historian.rs - panic cleanup --- src/historian.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/historian.rs b/src/historian.rs index 6f776aab4d..dddb2ff982 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -53,7 +53,7 @@ impl Historian { } pub fn receive(self: &Self) -> Result { - self.output.lock().expect("failed 'output' lock in Historian").try_recv() + self.output.lock().expect("'output' lock in pub fn receive").try_recv() } } From d9f81b0c8c5b6e4d15146438aa94682a763f4b7b Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Thu, 10 May 2018 17:06:43 -0700 Subject: [PATCH 13/23] mint.rs - panic cleanup --- src/mint.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mint.rs b/src/mint.rs index 754cacaa41..41ccac617e 100644 --- a/src/mint.rs +++ b/src/mint.rs @@ -19,8 +19,8 @@ pub struct Mint { impl Mint { pub fn new(tokens: i64) -> Self { let rnd = SystemRandom::new(); - let pkcs8 = KeyPair::generate_pkcs8(&rnd).unwrap().to_vec(); - let keypair = KeyPair::from_pkcs8(Input::from(&pkcs8)).unwrap(); + let pkcs8 = KeyPair::generate_pkcs8(&rnd).expect("generate_pkcs8 in mint pub fn new").to_vec(); + let keypair = KeyPair::from_pkcs8(Input::from(&pkcs8)).expect("from_pkcs8 in mint pub fn new"); let pubkey = keypair.pubkey(); Mint { pkcs8, @@ -38,7 +38,7 @@ impl Mint { } pub fn keypair(&self) -> KeyPair { - KeyPair::from_pkcs8(Input::from(&self.pkcs8)).unwrap() + KeyPair::from_pkcs8(Input::from(&self.pkcs8)).expect("from_pkcs8 in mint pub fn keypair") } pub fn pubkey(&self) -> PublicKey { From c1496722aaefa53f53a2c62117da09dd758156f6 Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Thu, 10 May 2018 17:11:31 -0700 Subject: [PATCH 14/23] packet.rs - panic cleanup --- src/packet.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/packet.rs b/src/packet.rs index 713a166f68..258498dcfc 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -156,12 +156,12 @@ impl Clone for Recycler 
{ impl Recycler { pub fn allocate(&self) -> Arc> { - let mut gc = self.gc.lock().expect("recycler lock"); + let mut gc = self.gc.lock().expect("recycler lock in pb fn allocate"); gc.pop() .unwrap_or_else(|| Arc::new(RwLock::new(Default::default()))) } pub fn recycle(&self, msgs: Arc>) { - let mut gc = self.gc.lock().expect("recycler lock"); + let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle"); gc.push(msgs); } } @@ -264,7 +264,7 @@ impl Blob { for i in 0..NUM_BLOBS { let r = re.allocate(); { - let mut p = r.write().unwrap(); + let mut p = r.write().expect("'r' write lock in pub fn recv_from"); match socket.recv_from(&mut p.data) { Err(_) if i > 0 => { trace!("got {:?} messages", i); @@ -294,7 +294,7 @@ impl Blob { ) -> Result<()> { while let Some(r) = v.pop_front() { { - let p = r.read().unwrap(); + let p = r.read().expect("'r' read lock in pub fn send_to"); let a = p.meta.addr(); socket.send_to(&p.data[..p.meta.size], &a)?; } From 73c7fb87e8b15b329ad2e16c2c3d748e7e1fde51 Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Thu, 10 May 2018 17:15:53 -0700 Subject: [PATCH 15/23] signature.rs - panic cleanup --- src/signature.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/signature.rs b/src/signature.rs index c2136556f4..8e03a696d2 100644 --- a/src/signature.rs +++ b/src/signature.rs @@ -19,8 +19,8 @@ impl KeyPairUtil for Ed25519KeyPair { /// Return a new ED25519 keypair fn new() -> Self { let rng = rand::SystemRandom::new(); - let pkcs8_bytes = signature::Ed25519KeyPair::generate_pkcs8(&rng).expect("failed to generate_pkcs8"); - signature::Ed25519KeyPair::from_pkcs8(untrusted::Input::from(&pkcs8_bytes)).expect("failed to construct Ed25519KeyPair") + let pkcs8_bytes = signature::Ed25519KeyPair::generate_pkcs8(&rng).expect("generate_pkcs8 in signature pb fn new"); + signature::Ed25519KeyPair::from_pkcs8(untrusted::Input::from(&pkcs8_bytes)).expect("from_pcks8 in signature pb fn new") } /// Return the public key for the given keypair From 4eb2e84c9f069c113c3cedd1f542dfdf1d25b09d Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Thu, 10 May 2018 17:38:00 -0700 Subject: [PATCH 16/23] streamer.rs - panic cleanup --- src/streamer.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/streamer.rs b/src/streamer.rs index 808eea1e76..11c59409ac 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -27,7 +27,7 @@ fn recv_loop( let msgs = re.allocate(); let msgs_ = msgs.clone(); loop { - match msgs.write().unwrap().recv_from(sock) { + match msgs.write().expect("write lock in fn recv_loop").recv_from(sock) { Ok(()) => { channel.send(msgs_)?; break; @@ -117,7 +117,7 @@ fn recv_window( ) -> Result<()> { let timer = Duration::new(1, 0); let mut dq = r.recv_timeout(timer)?; - let leader_id = crdt.read().unwrap().leader_data().id; + let leader_id = crdt.read().expect("'crdt' read lock in fn recv_window").leader_data().id; while let Ok(mut nq) = r.try_recv() { dq.append(&mut nq) } @@ -125,17 +125,17 @@ fn recv_window( //retransmit all leader blocks let mut retransmitq = VecDeque::new(); for b in &dq { - let p = b.read().unwrap(); + let p = b.read().expect("'b' read lock in fn recv_window"); //TODO this check isn't safe against adverserial packets //we need to maintain a sequence window trace!( "idx: {} addr: {:?} id: {:?} leader: {:?}", - p.get_index().unwrap(), - p.get_id().unwrap(), + p.get_index().expect("get_index in fn recv_window"), + p.get_id().expect("get_id in trace! 
fn recv_window"), p.meta.addr(), leader_id ); - if p.get_id().unwrap() == leader_id { + if p.get_id().expect("get_id in fn recv_window") == leader_id { //TODO //need to copy the retransmited blob //otherwise we get into races with which thread @@ -145,7 +145,7 @@ fn recv_window( //is dropped via a weakref to the recycler let nv = recycler.allocate(); { - let mut mnv = nv.write().unwrap(); + let mut mnv = nv.write().expect("recycler write lock in fn recv_window"); let sz = p.meta.size; mnv.meta.size = sz; mnv.data[..sz].copy_from_slice(&p.data[..sz]); @@ -161,7 +161,7 @@ fn recv_window( let mut contq = VecDeque::new(); while let Some(b) = dq.pop_front() { let b_ = b.clone(); - let p = b.write().unwrap(); + let p = b.write().expect("'b' write lock in fn recv_window"); let pix = p.get_index()? as usize; let w = pix % NUM_BLOBS; //TODO, after the block are authenticated @@ -180,7 +180,7 @@ fn recv_window( if window[k].is_none() { break; } - contq.push_back(window[k].clone().unwrap()); + contq.push_back(window[k].clone().expect("clone in fn recv_window")); window[k] = None; *consumed += 1; } From 63a4bafa72115b9035fdae05c52256c08c02261b Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Thu, 10 May 2018 17:46:10 -0700 Subject: [PATCH 17/23] thin_client - panic cleanup --- src/thin_client.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/thin_client.rs b/src/thin_client.rs index 3ae436ef8d..343904d335 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -40,7 +40,7 @@ impl ThinClient { pub fn init(&self) { let subscriptions = vec![Subscription::EntryInfo]; let req = Request::Subscribe { subscriptions }; - let data = serialize(&req).expect("serialize Subscribe"); + let data = serialize(&req).expect("serialize Subscribe in thin_client"); trace!("subscribing to {}", self.addr); let _res = self.socket.send_to(&data, &self.addr); } @@ -50,7 +50,7 @@ impl ThinClient { info!("start recv_from"); self.socket.recv_from(&mut buf)?; info!("end recv_from"); - let resp = deserialize(&buf).expect("deserialize balance"); + let resp = deserialize(&buf).expect("deserialize balance in thin_client"); Ok(resp) } @@ -72,7 +72,7 @@ impl ThinClient { /// does not wait for a response. pub fn transfer_signed(&self, tr: Transaction) -> io::Result { let req = Request::Transaction(tr); - let data = serialize(&req).unwrap(); + let data = serialize(&req).expect("serialize Transaction in pub fn transfer_signed"); self.socket.send_to(&data, &self.addr) } @@ -95,10 +95,10 @@ impl ThinClient { pub fn get_balance(&mut self, pubkey: &PublicKey) -> io::Result { info!("get_balance"); let req = Request::GetBalance { key: *pubkey }; - let data = serialize(&req).expect("serialize GetBalance"); + let data = serialize(&req).expect("serialize GetBalance in pub fn get_balance"); self.socket .send_to(&data, &self.addr) - .expect("buffer error"); + .expect("buffer error in pub fn get_balance"); let mut done = false; while !done { let resp = self.recv_response()?; @@ -124,7 +124,7 @@ impl ThinClient { // Wait for at least one EntryInfo. let mut done = false; while !done { - let resp = self.recv_response().expect("recv response"); + let resp = self.recv_response().expect("recv_response in pub fn transaction_count"); if let &Response::EntryInfo(_) = &resp { done = true; } @@ -132,14 +132,14 @@ impl ThinClient { } // Then take the rest. 
- self.socket.set_nonblocking(true).expect("set nonblocking"); + self.socket.set_nonblocking(true).expect("set_nonblocking in pub fn transaction_count"); loop { match self.recv_response() { Err(_) => break, Ok(resp) => self.process_response(resp), } } - self.socket.set_nonblocking(false).expect("set blocking"); + self.socket.set_nonblocking(false).expect("set_nonblocking in pub fn transaction_count"); self.num_events } } From 18d3659b9135d3817a4a67c333ed69b4bd07f2b4 Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Thu, 10 May 2018 17:47:27 -0700 Subject: [PATCH 18/23] timing.rs - panic cleanup --- src/timing.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/timing.rs b/src/timing.rs index 7b08cfeb94..ad720a8dc4 100644 --- a/src/timing.rs +++ b/src/timing.rs @@ -10,6 +10,6 @@ pub fn duration_as_s(d: &Duration) -> f32 { } pub fn timestamp() -> u64 { - let now = SystemTime::now().duration_since(UNIX_EPOCH).expect("failed to create timestamp"); + let now = SystemTime::now().duration_since(UNIX_EPOCH).expect("create timestamp in timing"); return duration_as_ms(&now); } From bb654f286c9953aeaa36d04ba67ea6fed1447a9e Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Thu, 10 May 2018 18:21:10 -0700 Subject: [PATCH 19/23] tpu.rs - panic cleanup --- src/tpu.rs | 52 ++++++++++++++++++++++++++-------------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/src/tpu.rs b/src/tpu.rs index 6da34e1332..86a9a1056b 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -93,7 +93,7 @@ impl Tpu { let socket = UdpSocket::bind("0.0.0.0:0").expect("bind"); // copy subscribers to avoid taking lock while doing io - let addrs = obj.entry_info_subscribers.lock().unwrap().clone(); + let addrs = obj.entry_info_subscribers.lock().expect("'entry_info_subscribers' lock").clone(); trace!("Sending to {} addrs", addrs.len()); for addr in addrs { let entry_info = EntryInfo { @@ -113,12 +113,12 @@ impl Tpu { fn update_entry(obj: &SharedTpu, writer: &Arc>, entry: &Entry) { trace!("update_entry entry"); - obj.acc.lock().unwrap().register_entry_id(&entry.id); + obj.acc.lock().expect("'acc' lock in fn update_entry").register_entry_id(&entry.id); writeln!( - writer.lock().unwrap(), + writer.lock().expect("'writer' lock in fn update_entry"), "{}", - serde_json::to_string(&entry).unwrap() - ).unwrap(); + serde_json::to_string(&entry).expect("entry to_string in fn update_entry") + ).expect("writeln! 
in fn update_entry"); Self::notify_entry_info_subscribers(obj, &entry); } @@ -128,7 +128,7 @@ impl Tpu { let entry = obj.historian .output .lock() - .unwrap() + .expect("'ouput' lock in fn receive_all") .recv_timeout(Duration::new(1, 0))?; Self::update_entry(obj, writer, &entry); l.push(entry); @@ -166,13 +166,13 @@ impl Tpu { let b = blob_recycler.allocate(); let pos = { - let mut bd = b.write().unwrap(); + let mut bd = b.write().expect("'b' write lock in pos in fn process_entry_list_into_blobs"); let mut out = Cursor::new(bd.data_mut()); - serialize_into(&mut out, &list[start..end]).expect("failed to serialize output"); + serialize_into(&mut out, &list[start..end]).expect("serialize_into in fn process_entry_list_into_blobs"); out.position() as usize }; assert!(pos < BLOB_SIZE); - b.write().unwrap().set_size(pos); + b.write().expect("'b' write lock in fn process_entry_list_into_blobs").set_size(pos); q.push_back(b); start = end; } @@ -255,7 +255,7 @@ impl Tpu { ) -> Option<(Response, SocketAddr)> { match msg { Request::GetBalance { key } => { - let val = self.acc.lock().unwrap().get_balance(&key); + let val = self.acc.lock().expect("'acc' lock in pub fn process_request").get_balance(&key); let rsp = (Response::Balance { key, val }, rsp_addr); info!("Response::Balance {:?}", rsp); Some(rsp) @@ -265,7 +265,7 @@ impl Tpu { for subscription in subscriptions { match subscription { Subscription::EntryInfo => { - self.entry_info_subscribers.lock().unwrap().push(rsp_addr) + self.entry_info_subscribers.lock().expect("lock in Subscribe in fn process_request").push(rsp_addr) } } } @@ -278,11 +278,11 @@ impl Tpu { let timer = Duration::new(1, 0); let msgs = recvr.recv_timeout(timer)?; debug!("got msgs"); - let mut len = msgs.read().unwrap().packets.len(); + let mut len = msgs.read().expect("'msgs' read lock in fn recv_batch").packets.len(); let mut batch = vec![msgs]; while let Ok(more) = recvr.try_recv() { trace!("got more msgs"); - len += more.read().unwrap().packets.len(); + len += more.read().expect("'more' read lock in fn recv_batch").packets.len(); batch.push(more); if len > 100_000 { @@ -299,7 +299,7 @@ impl Tpu { ) -> Result<()> { let r = ecdsa::ed25519_verify(&batch); let res = batch.into_iter().zip(r).collect(); - sendr.lock().unwrap().send(res)?; + sendr.lock().expect("lock in fn verify_batch in tpu").send(res)?; // TODO: fix error handling here? Ok(()) } @@ -308,7 +308,7 @@ impl Tpu { recvr: &Arc>, sendr: &Arc)>>>>, ) -> Result<()> { - let (batch, len) = Self::recv_batch(&recvr.lock().unwrap())?; + let (batch, len) = Self::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?; let now = Instant::now(); let batch_len = batch.len(); let rand_id = thread_rng().gen_range(0, 100); @@ -319,7 +319,7 @@ impl Tpu { rand_id ); - Self::verify_batch(batch, sendr).unwrap(); + Self::verify_batch(batch, sendr).expect("verify_batch in fn verifier"); let total_time_ms = timing::duration_as_ms(&now.elapsed()); let total_time_s = timing::duration_as_s(&now.elapsed()); @@ -367,18 +367,18 @@ impl Tpu { /// Process the transactions in parallel and then log the successful ones. 
fn process_events(&self, events: Vec) -> Result<()> { - for result in self.acc.lock().unwrap().process_verified_events(events) { + for result in self.acc.lock().expect("'acc' lock in fn process_events").process_verified_events(events) { if let Ok(event) = result { self.historian_input .lock() - .unwrap() + .expect("historian_input lock in in for loop fn process_events") .send(Signal::Event(event))?; } } // Let validators know they should not attempt to process additional // transactions in parallel. - self.historian_input.lock().unwrap().send(Signal::Tick)?; + self.historian_input.lock().expect("'historian_input' lock in fn process_events").send(Signal::Tick)?; debug!("after historian_input"); Ok(()) @@ -397,7 +397,7 @@ impl Tpu { ) -> Result { let blob = blob_recycler.allocate(); { - let mut b = blob.write().unwrap(); + let mut b = blob.write().expect("write in fn serialize_response"); let v = serialize(&resp)?; let len = v.len(); b.data[..len].copy_from_slice(&v); @@ -438,7 +438,7 @@ impl Tpu { ); let proc_start = Instant::now(); for (msgs, vers) in mms { - let reqs = Self::deserialize_packets(&msgs.read().unwrap()); + let reqs = Self::deserialize_packets(&msgs.read().expect("'msgs' read lock in fn process in tpu")); reqs_len += reqs.len(); let req_vers = reqs.into_iter() .zip(vers) @@ -492,9 +492,9 @@ impl Tpu { let blobs = verified_receiver.recv_timeout(timer)?; trace!("replicating blobs {}", blobs.len()); for msgs in &blobs { - let blob = msgs.read().unwrap(); - let entries: Vec = deserialize(&blob.data()[..blob.meta.size]).unwrap(); - let acc = obj.acc.lock().unwrap(); + let blob = msgs.read().expect("'msgs' read lock in fn replicate_state in tpu"); + let entries: Vec = deserialize(&blob.data()[..blob.meta.size]).expect("deserialize in fn replicate_state"); + let acc = obj.acc.lock().expect("'acc' lock in fn replicate_state"); for entry in entries { acc.register_entry_id(&entry.id); for result in acc.process_verified_events(entry.events) { @@ -640,8 +640,8 @@ impl Tpu { ) -> Result>> { //replicate pipeline let crdt = Arc::new(RwLock::new(Crdt::new(me))); - crdt.write().unwrap().set_leader(leader.id); - crdt.write().unwrap().insert(leader); + crdt.write().expect("'crdt' write lock in pub fn replicate").set_leader(leader.id); + crdt.write().expect("'crdt' write lock before insert() in pub fn replicate").insert(leader); let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); let t_listen = Crdt::listen(crdt.clone(), gossip, exit.clone()); From 44bf79e35fd752300a42b5064150efa615bdfbfb Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Thu, 10 May 2018 18:24:33 -0700 Subject: [PATCH 20/23] transaction.rs - panic cleanup --- src/transaction.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transaction.rs b/src/transaction.rs index 6918012e30..ef2acccae4 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -70,7 +70,7 @@ impl Transaction { } fn get_sign_data(&self) -> Vec { - serialize(&(&self.data)).expect("failed on serialize TransactionData") + serialize(&(&self.data)).expect("serialize TransactionData in fn get_sign_data") } /// Sign this transaction. 
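Worth noting: `.expect(...)` still panics; these patches improve the diagnostic, not the failure mode. In functions that already return a `Result`, the panic-free alternative is to convert the error and propagate it with `?`. A small standalone sketch of that alternative, using a hypothetical error type (not part of these patches):

    use std::sync::{Mutex, PoisonError};

    #[derive(Debug)]
    enum Error {
        PoisonedLock(&'static str),
    }

    impl<T> From<PoisonError<T>> for Error {
        fn from(_: PoisonError<T>) -> Self {
            Error::PoisonedLock("lock poisoned by a panicking thread")
        }
    }

    type Result<T> = std::result::Result<T, Error>;

    // Instead of `lock().expect(...)`, surface the poisoned lock to the caller.
    fn read_counter(counter: &Mutex<u64>) -> Result<u64> {
        let guard = counter.lock()?; // PoisonError converted via the From impl
        Ok(*guard)
    }
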
From c95c6a75f821c1b4936e3a4e5e4460fcc59e6a3c Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Thu, 10 May 2018 20:49:58 -0700 Subject: [PATCH 21/23] tpu.rs - panic cleanup --- src/tpu.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tpu.rs b/src/tpu.rs index 86a9a1056b..d8bdd6a4a4 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -117,7 +117,7 @@ impl Tpu { writeln!( writer.lock().expect("'writer' lock in fn update_entry"), "{}", - serde_json::to_string(&entry).expect("entry to_string in fn update_entry") + serde_json::to_string(&entry).expect("'entry' to_string in fn update_entry") ).expect("writeln! in fn update_entry"); Self::notify_entry_info_subscribers(obj, &entry); } @@ -371,7 +371,7 @@ impl Tpu { if let Ok(event) = result { self.historian_input .lock() - .expect("historian_input lock in in for loop fn process_events") + .expect("historian_input lock in for loop in fn process_events") .send(Signal::Event(event))?; } } From 250830ade95013759b243ab3e16f495db3d60768 Mon Sep 17 00:00:00 2001 From: Jackson Sandland Date: Fri, 11 May 2018 11:38:52 -0700 Subject: [PATCH 22/23] cargo fmt run --- src/accountant.rs | 74 +++++++++++++++++++++++++++++++++++---------- src/bin/testnode.rs | 2 +- src/crdt.rs | 44 +++++++++++++++++++++------ src/ecdsa.rs | 8 +++-- src/erasure.rs | 17 +++++++++-- src/event.rs | 5 ++- src/hash.rs | 2 +- src/historian.rs | 7 +++-- src/mint.rs | 9 ++++-- src/result.rs | 2 +- src/signature.rs | 8 +++-- src/streamer.rs | 12 ++++++-- src/thin_client.rs | 11 +++++-- src/timing.rs | 4 ++- src/tpu.rs | 25 +++++++++++---- 15 files changed, 174 insertions(+), 56 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index cdc8d92bb7..f1e678ca2d 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -16,8 +16,8 @@ use signature::{KeyPair, PublicKey, Signature}; use std::collections::hash_map::Entry::Occupied; use std::collections::{HashMap, HashSet, VecDeque}; use std::result; -use std::sync::RwLock; use std::sync::atomic::{AtomicIsize, Ordering}; +use std::sync::RwLock; use transaction::Transaction; pub const MAX_ENTRY_IDS: usize = 1024 * 4; @@ -34,7 +34,11 @@ pub type Result = result::Result; /// Commit funds to the 'to' party. fn apply_payment(balances: &RwLock>, payment: &Payment) { // First we check balances with a read lock to maximize potential parallelization. 
- if balances.read().expect("'balances' read lock in apply_payment").contains_key(&payment.to) { + if balances + .read() + .expect("'balances' read lock in apply_payment") + .contains_key(&payment.to) + { let bals = balances.read().expect("'balances' read lock"); bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed); } else { @@ -90,15 +94,25 @@ impl Accountant { } fn reserve_signature(signatures: &RwLock>, sig: &Signature) -> bool { - if signatures.read().expect("'signatures' read lock").contains(sig) { + if signatures + .read() + .expect("'signatures' read lock") + .contains(sig) + { return false; } - signatures.write().expect("'signatures' write lock").insert(*sig); + signatures + .write() + .expect("'signatures' write lock") + .insert(*sig); true } fn forget_signature(signatures: &RwLock>, sig: &Signature) -> bool { - signatures.write().expect("'signatures' write lock in forget_signature").remove(sig) + signatures + .write() + .expect("'signatures' write lock in forget_signature") + .remove(sig) } fn forget_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> bool { @@ -132,7 +146,9 @@ impl Accountant { /// the oldest ones once its internal cache is full. Once boot, the /// accountant will reject transactions using that `last_id`. pub fn register_entry_id(&self, last_id: &Hash) { - let mut last_ids = self.last_ids.write().expect("'last_ids' write lock in register_entry_id"); + let mut last_ids = self.last_ids + .write() + .expect("'last_ids' write lock in register_entry_id"); if last_ids.len() >= MAX_ENTRY_IDS { last_ids.pop_front(); } @@ -142,7 +158,9 @@ impl Accountant { /// Deduct tokens from the 'from' address the account has sufficient /// funds and isn't a duplicate. pub fn process_verified_transaction_debits(&self, tr: &Transaction) -> Result<()> { - let bals = self.balances.read().expect("'balances' read lock in process_verified_transaction_debits"); + let bals = self.balances + .read() + .expect("'balances' read lock in process_verified_transaction_debits"); let option = bals.get(&tr.from); if option.is_none() { @@ -178,13 +196,16 @@ impl Accountant { pub fn process_verified_transaction_credits(&self, tr: &Transaction) { let mut plan = tr.data.plan.clone(); - plan.apply_witness(&Witness::Timestamp(*self.last_time.read() + plan.apply_witness(&Witness::Timestamp(*self.last_time + .read() .expect("timestamp creation in process_verified_transaction_credits"))); if let Some(ref payment) = plan.final_payment() { apply_payment(&self.balances, payment); } else { - let mut pending = self.pending.write().expect("'pending' write lock in process_verified_transaction_credits"); + let mut pending = self.pending + .write() + .expect("'pending' write lock in process_verified_transaction_credits"); pending.insert(tr.sig, plan); } } @@ -253,7 +274,11 @@ impl Accountant { /// Process a Witness Signature that has already been verified. 
fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> { - if let Occupied(mut e) = self.pending.write().expect("write() in process_verified_sig").entry(tx_sig) { + if let Occupied(mut e) = self.pending + .write() + .expect("write() in process_verified_sig") + .entry(tx_sig) + { e.get_mut().apply_witness(&Witness::Signature(from)); if let Some(ref payment) = e.get().final_payment() { apply_payment(&self.balances, payment); @@ -268,11 +293,22 @@ impl Accountant { fn process_verified_timestamp(&self, from: PublicKey, dt: DateTime) -> Result<()> { // If this is the first timestamp we've seen, it probably came from the genesis block, // so we'll trust it. - if *self.last_time.read().expect("'last_time' read lock on first timestamp check") == Utc.timestamp(0, 0) { - self.time_sources.write().expect("'time_sources' write lock on first timestamp").insert(from); + if *self.last_time + .read() + .expect("'last_time' read lock on first timestamp check") + == Utc.timestamp(0, 0) + { + self.time_sources + .write() + .expect("'time_sources' write lock on first timestamp") + .insert(from); } - if self.time_sources.read().expect("'time_sources' read lock").contains(&from) { + if self.time_sources + .read() + .expect("'time_sources' read lock") + .contains(&from) + { if dt > *self.last_time.read().expect("'last_time' read lock") { *self.last_time.write().expect("'last_time' write lock") = dt; } @@ -285,9 +321,13 @@ impl Accountant { // Hold 'pending' write lock until the end of this function. Otherwise another thread can // double-spend if it enters before the modified plan is removed from 'pending'. - let mut pending = self.pending.write().expect("'pending' write lock in process_verified_timestamp"); + let mut pending = self.pending + .write() + .expect("'pending' write lock in process_verified_timestamp"); for (key, plan) in pending.iter_mut() { - plan.apply_witness(&Witness::Timestamp(*self.last_time.read().expect("'last_time' read lock when creating timestamp"))); + plan.apply_witness(&Witness::Timestamp(*self.last_time + .read() + .expect("'last_time' read lock when creating timestamp"))); if let Some(ref payment) = plan.final_payment() { apply_payment(&self.balances, payment); completed.push(key.clone()); @@ -342,7 +382,9 @@ impl Accountant { } pub fn get_balance(&self, pubkey: &PublicKey) -> Option { - let bals = self.balances.read().expect("'balances' read lock in get_balance"); + let bals = self.balances + .read() + .expect("'balances' read lock in get_balance"); bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64) } } diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 0eb38e195e..1c97043c36 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -17,8 +17,8 @@ use std::env; use std::io::{stdin, stdout, Read}; use std::net::UdpSocket; use std::process::exit; -use std::sync::Arc; use std::sync::atomic::AtomicBool; +use std::sync::Arc; fn print_usage(program: &str, opts: Options) { let mut brief = format!("Usage: cat | {} [options]\n\n", program); diff --git a/src/crdt.rs b/src/crdt.rs index b7f712a0c7..965dac08fe 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -263,7 +263,8 @@ impl Crdt { let mut buf = [0u8; 8]; rnd.fill(&mut buf).expect("rnd.fill in pub fn random"); let mut rdr = Cursor::new(&buf); - rdr.read_u64::().expect("rdr.read_u64 in fn random") + rdr.read_u64::() + .expect("rdr.read_u64 in fn random") } fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec) { //trace!("get updates since {}", v); @@ -287,10 +288,19 @@ impl Crdt { return 
Err(Error::GeneralError); } let mut n = (Self::random() as usize) % self.table.len(); - while self.table.values().nth(n).expect("'values().nth(n)' while loop in fn gossip_request").id == self.me { + while self.table + .values() + .nth(n) + .expect("'values().nth(n)' while loop in fn gossip_request") + .id == self.me + { n = (Self::random() as usize) % self.table.len(); } - let v = self.table.values().nth(n).expect("'values().nth(n)' in fn gossip_request").clone(); + let v = self.table + .values() + .nth(n) + .expect("'values().nth(n)' in fn gossip_request") + .clone(); let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0); let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone()); Ok((v.gossip_addr, req)) @@ -303,7 +313,9 @@ impl Crdt { // Lock the object only to do this operation and not for any longer // especially not when doing the `sock.send_to` - let (remote_gossip_addr, req) = obj.read().expect("'obj' read lock in fn run_gossip").gossip_request()?; + let (remote_gossip_addr, req) = obj.read() + .expect("'obj' read lock in fn run_gossip") + .gossip_request()?; let sock = UdpSocket::bind("0.0.0.0:0")?; // TODO this will get chatty, so we need to first ask for number of updates since // then only ask for specific data that we dont have @@ -335,7 +347,11 @@ impl Crdt { return; } //TODO this should be a tuned parameter - sleep(obj.read().expect("'obj' read lock in pub fn gossip").timeout); + sleep( + obj.read() + .expect("'obj' read lock in pub fn gossip") + .timeout, + ); }) } @@ -353,18 +369,25 @@ impl Crdt { trace!("RequestUpdates {}", v); let addr = reqdata.gossip_addr; // only lock for this call, dont lock durring IO `sock.send_to` or `sock.recv_from` - let (from, ups, data) = obj.read().expect("'obj' read lock in RequestUpdates").get_updates_since(v); + let (from, ups, data) = obj.read() + .expect("'obj' read lock in RequestUpdates") + .get_updates_since(v); trace!("get updates since response {} {}", v, data.len()); let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?; trace!("send_to {}", addr); //TODO verify reqdata belongs to sender - obj.write().expect("'obj' write lock in RequestUpdates").insert(reqdata); - sock.send_to(&rsp, addr).expect("'sock.send_to' in RequestUpdates"); + obj.write() + .expect("'obj' write lock in RequestUpdates") + .insert(reqdata); + sock.send_to(&rsp, addr) + .expect("'sock.send_to' in RequestUpdates"); trace!("send_to done!"); } Protocol::ReceiveUpdates(from, ups, data) => { trace!("ReceivedUpdates"); - obj.write().expect("'obj' write lock in ReceiveUpdates").apply_updates(from, ups, &data); + obj.write() + .expect("'obj' write lock in ReceiveUpdates") + .apply_updates(from, ups, &data); } } Ok(()) @@ -374,7 +397,8 @@ impl Crdt { sock: UdpSocket, exit: Arc, ) -> JoinHandle<()> { - sock.set_read_timeout(Some(Duration::new(2, 0))).expect("'sock.set_read_timeout' in crdt.rs"); + sock.set_read_timeout(Some(Duration::new(2, 0))) + .expect("'sock.set_read_timeout' in crdt.rs"); spawn(move || loop { let _ = Self::run_listen(&obj, &sock); if exit.load(Ordering::Relaxed) { diff --git a/src/ecdsa.rs b/src/ecdsa.rs index 6e79c5f182..1214302133 100644 --- a/src/ecdsa.rs +++ b/src/ecdsa.rs @@ -78,7 +78,11 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { let mut rvs = Vec::new(); for packets in batches { - locks.push(packets.read().expect("'packets' read lock in pub fn ed25519_verify")); + locks.push( + packets + .read() + .expect("'packets' read lock in pub fn ed25519_verify"), + ); } let mut num = 0; for p in 
@@ -135,8 +139,8 @@ mod tests {
     use packet::{Packet, Packets, SharedPackets};
     use std::sync::RwLock;
     use thin_client_service::Request;
-    use transaction::Transaction;
     use transaction::test_tx;
+    use transaction::Transaction;
 
     fn make_packet_from_transaction(tr: Transaction) -> Packet {
         let tx = serialize(&Request::Transaction(tr)).unwrap();
diff --git a/src/erasure.rs b/src/erasure.rs
index 62fa073070..35c543f090 100644
--- a/src/erasure.rs
+++ b/src/erasure.rs
@@ -164,7 +164,11 @@ pub fn generate_coding(
     let mut coding_ptrs: Vec<&mut [u8]> = Vec::new();
     for i in consumed..consumed + NUM_DATA {
         let n = i % window.len();
-        data_blobs.push(window[n].clone().expect("'data_blobs' arr in pub fn generate_coding"));
+        data_blobs.push(
+            window[n]
+                .clone()
+                .expect("'data_blobs' arr in pub fn generate_coding"),
+        );
     }
     for b in &data_blobs {
         data_locks.push(b.write().expect("'b' write lock in pub fn generate_coding"));
@@ -180,10 +184,17 @@ pub fn generate_coding(
     for i in coding_start..coding_end {
         let n = i % window.len();
         window[n] = re.allocate();
-        coding_blobs.push(window[n].clone().expect("'coding_blobs' arr in pub fn generate_coding"));
+        coding_blobs.push(
+            window[n]
+                .clone()
+                .expect("'coding_blobs' arr in pub fn generate_coding"),
+        );
     }
     for b in &coding_blobs {
-        coding_locks.push(b.write().expect("'coding_locks' arr in pub fn generate_coding"));
+        coding_locks.push(
+            b.write()
+                .expect("'coding_locks' arr in pub fn generate_coding"),
+        );
     }
     for (i, l) in coding_locks.iter_mut().enumerate() {
         trace!("i: {} data: {}", i, l.data[0]);
diff --git a/src/event.rs b/src/event.rs
index e9a9aac095..b3a317e199 100644
--- a/src/event.rs
+++ b/src/event.rs
@@ -49,7 +49,10 @@ impl Event {
         match *self {
             Event::Transaction(ref tr) => tr.verify_sig(),
             Event::Signature { from, tx_sig, sig } => sig.verify(&from, &tx_sig),
-            Event::Timestamp { from, dt, sig } => sig.verify(&from, &serialize(&dt).expect("serialize 'dt' in pub fn verify")),
+            Event::Timestamp { from, dt, sig } => sig.verify(
+                &from,
+                &serialize(&dt).expect("serialize 'dt' in pub fn verify"),
+            ),
         }
     }
 }
diff --git a/src/hash.rs b/src/hash.rs
index ee7598a0dc..61dd01468c 100644
--- a/src/hash.rs
+++ b/src/hash.rs
@@ -1,7 +1,7 @@
 //! The `hash` module provides functions for creating SHA-256 hashes.
 
-use generic_array::GenericArray;
 use generic_array::typenum::U32;
+use generic_array::GenericArray;
 use sha2::{Digest, Sha256};
 
 pub type Hash = GenericArray;
diff --git a/src/historian.rs b/src/historian.rs
index f7f88c5cf3..7a183c1555 100644
--- a/src/historian.rs
+++ b/src/historian.rs
@@ -4,8 +4,8 @@
 use entry::Entry;
 use hash::Hash;
 use recorder::{ExitReason, Recorder, Signal};
-use std::sync::Mutex;
 use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
+use std::sync::Mutex;
 use std::thread::{spawn, JoinHandle};
 use std::time::Instant;
 
@@ -52,7 +52,10 @@ impl Historian {
     }
 
     pub fn receive(self: &Self) -> Result {
-        self.output.lock().expect("'output' lock in pub fn receive").try_recv()
+        self.output
+            .lock()
+            .expect("'output' lock in pub fn receive")
+            .try_recv()
     }
 }
 
diff --git a/src/mint.rs b/src/mint.rs
index 4502c65d17..39bc4348d5 100644
--- a/src/mint.rs
+++ b/src/mint.rs
@@ -1,7 +1,7 @@
 //! The `mint` module is a library for generating the chain's genesis block.
 
-use entry::Entry;
 use entry::create_entry;
+use entry::Entry;
 use event::Event;
 use hash::{hash, Hash};
 use ring::rand::SystemRandom;
@@ -19,8 +19,11 @@ pub struct Mint {
 impl Mint {
     pub fn new(tokens: i64) -> Self {
         let rnd = SystemRandom::new();
-        let pkcs8 = KeyPair::generate_pkcs8(&rnd).expect("generate_pkcs8 in mint pub fn new").to_vec();
-        let keypair = KeyPair::from_pkcs8(Input::from(&pkcs8)).expect("from_pkcs8 in mint pub fn new");
+        let pkcs8 = KeyPair::generate_pkcs8(&rnd)
+            .expect("generate_pkcs8 in mint pub fn new")
+            .to_vec();
+        let keypair =
+            KeyPair::from_pkcs8(Input::from(&pkcs8)).expect("from_pkcs8 in mint pub fn new");
         let pubkey = keypair.pubkey();
         Mint {
             pkcs8,
diff --git a/src/result.rs b/src/result.rs
index fca876ebec..d2cb485add 100644
--- a/src/result.rs
+++ b/src/result.rs
@@ -78,9 +78,9 @@ mod tests {
     use std::io;
     use std::io::Write;
     use std::net::SocketAddr;
+    use std::sync::mpsc::channel;
     use std::sync::mpsc::RecvError;
     use std::sync::mpsc::RecvTimeoutError;
-    use std::sync::mpsc::channel;
     use std::thread;
 
     fn addr_parse_error() -> Result {
diff --git a/src/signature.rs b/src/signature.rs
index 08f22166d0..8f8e9b075f 100644
--- a/src/signature.rs
+++ b/src/signature.rs
@@ -1,7 +1,7 @@
 //! The `signature` module provides functionality for public, and private keys.
 
-use generic_array::GenericArray;
 use generic_array::typenum::{U32, U64};
+use generic_array::GenericArray;
 use ring::signature::Ed25519KeyPair;
 use ring::{rand, signature};
 use untrusted;
@@ -19,8 +19,10 @@ impl KeyPairUtil for Ed25519KeyPair {
     /// Return a new ED25519 keypair
     fn new() -> Self {
         let rng = rand::SystemRandom::new();
-        let pkcs8_bytes = signature::Ed25519KeyPair::generate_pkcs8(&rng).expect("generate_pkcs8 in signature pb fn new");
-        signature::Ed25519KeyPair::from_pkcs8(untrusted::Input::from(&pkcs8_bytes)).expect("from_pcks8 in signature pb fn new")
+        let pkcs8_bytes = signature::Ed25519KeyPair::generate_pkcs8(&rng)
+            .expect("generate_pkcs8 in signature pb fn new");
+        signature::Ed25519KeyPair::from_pkcs8(untrusted::Input::from(&pkcs8_bytes))
+            .expect("from_pcks8 in signature pb fn new")
     }
 
     /// Return the public key for the given keypair
diff --git a/src/streamer.rs b/src/streamer.rs
index a26a609edc..d551a890e4 100644
--- a/src/streamer.rs
+++ b/src/streamer.rs
@@ -27,7 +27,10 @@ fn recv_loop(
     let msgs = re.allocate();
     let msgs_ = msgs.clone();
     loop {
-        match msgs.write().expect("write lock in fn recv_loop").recv_from(sock) {
+        match msgs.write()
+            .expect("write lock in fn recv_loop")
+            .recv_from(sock)
+        {
             Ok(()) => {
                 channel.send(msgs_)?;
                 break;
@@ -136,7 +139,10 @@ fn recv_window(
 ) -> Result<()> {
     let timer = Duration::new(1, 0);
     let mut dq = r.recv_timeout(timer)?;
-    let leader_id = crdt.read().expect("'crdt' read lock in fn recv_window").leader_data().id;
+    let leader_id = crdt.read()
+        .expect("'crdt' read lock in fn recv_window")
+        .leader_data()
+        .id;
     while let Ok(mut nq) = r.try_recv() {
         dq.append(&mut nq)
     }
@@ -457,8 +463,8 @@ mod test {
     use std::sync::{Arc, RwLock};
     use std::thread::sleep;
     use std::time::Duration;
-    use streamer::{BlobReceiver, PacketReceiver};
     use streamer::{blob_receiver, receiver, responder, retransmitter, window};
+    use streamer::{BlobReceiver, PacketReceiver};
 
     fn get_msgs(r: PacketReceiver, num: &mut usize) {
         for _t in 0..5 {
diff --git a/src/thin_client.rs b/src/thin_client.rs
index 31c9a0b79a..e1ee2b4dcd 100644
--- a/src/thin_client.rs
+++ b/src/thin_client.rs
@@ -124,7 +124,8 @@ impl ThinClient {
         // Wait for at least one EntryInfo.
         let mut done = false;
         while !done {
-            let resp = self.recv_response().expect("recv_response in pub fn transaction_count");
+            let resp = self.recv_response()
+                .expect("recv_response in pub fn transaction_count");
             if let &Response::EntryInfo(_) = &resp {
                 done = true;
             }
@@ -132,14 +133,18 @@ impl ThinClient {
         }
 
         // Then take the rest.
-        self.socket.set_nonblocking(true).expect("set_nonblocking in pub fn transaction_count");
+        self.socket
+            .set_nonblocking(true)
+            .expect("set_nonblocking in pub fn transaction_count");
         loop {
             match self.recv_response() {
                 Err(_) => break,
                 Ok(resp) => self.process_response(resp),
             }
         }
-        self.socket.set_nonblocking(false).expect("set_nonblocking in pub fn transaction_count");
+        self.socket
+            .set_nonblocking(false)
+            .expect("set_nonblocking in pub fn transaction_count");
         self.num_events
     }
 }
diff --git a/src/timing.rs b/src/timing.rs
index ad720a8dc4..4b0b9ab576 100644
--- a/src/timing.rs
+++ b/src/timing.rs
@@ -10,6 +10,8 @@ pub fn duration_as_s(d: &Duration) -> f32 {
 }
 
 pub fn timestamp() -> u64 {
-    let now = SystemTime::now().duration_since(UNIX_EPOCH).expect("create timestamp in timing");
+    let now = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .expect("create timestamp in timing");
     return duration_as_ms(&now);
 }
diff --git a/src/tpu.rs b/src/tpu.rs
index 3e6dc0fd26..bb3fce0e04 100644
--- a/src/tpu.rs
+++ b/src/tpu.rs
@@ -12,8 +12,8 @@ use rand::{thread_rng, Rng};
 use result::Result;
 use serde_json;
 use std::collections::VecDeque;
-use std::io::Write;
 use std::io::sink;
+use std::io::Write;
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::{channel, Sender};
@@ -66,7 +66,12 @@ impl Tpu {
             .recv_timeout(Duration::new(1, 0))?;
         self.write_entry(writer, &entry);
         l.push(entry);
-        while let Ok(entry) = self.accounting_stage.output.lock().expect("'output' lock in fn write_entries").try_recv() {
+        while let Ok(entry) = self.accounting_stage
+            .output
+            .lock()
+            .expect("'output' lock in fn write_entries")
+            .try_recv()
+        {
             self.write_entry(writer, &entry);
             l.push(entry);
         }
@@ -130,7 +135,10 @@ impl Tpu {
     ) -> Result<()> {
         let r = ecdsa::ed25519_verify(&batch);
         let res = batch.into_iter().zip(r).collect();
-        sendr.lock().expect("lock in fn verify_batch in tpu").send(res)?;
+        sendr
+            .lock()
+            .expect("lock in fn verify_batch in tpu")
+            .send(res)?;
         // TODO: fix error handling here?
         Ok(())
     }
@@ -139,7 +147,8 @@ impl Tpu {
         recvr: &Arc>,
         sendr: &Arc)>>>>,
     ) -> Result<()> {
-        let (batch, len) = streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?;
+        let (batch, len) =
+            streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?;
         let now = Instant::now();
         let batch_len = batch.len();
 
@@ -315,8 +324,12 @@ impl Tpu {
     ) -> Result>> {
         //replicate pipeline
        let crdt = Arc::new(RwLock::new(Crdt::new(me)));
-        crdt.write().expect("'crdt' write lock in pub fn replicate").set_leader(leader.id);
-        crdt.write().expect("'crdt' write lock before insert() in pub fn replicate").insert(leader);
+        crdt.write()
+            .expect("'crdt' write lock in pub fn replicate")
+            .set_leader(leader.id);
+        crdt.write()
+            .expect("'crdt' write lock before insert() in pub fn replicate")
+            .insert(leader);
         let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
         let t_listen = Crdt::listen(crdt.clone(), gossip, exit.clone());

From e779496dfb1c9e2c12b2409e9008b019853d1feb Mon Sep 17 00:00:00 2001
From: Code Cobain
Date: Fri, 11 May 2018 11:49:22 -0700
Subject: [PATCH 23/23] Update signature.rs

---
 src/signature.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/signature.rs b/src/signature.rs
index 8966e3b20c..fca8dfb234 100644
--- a/src/signature.rs
+++ b/src/signature.rs
@@ -1,5 +1,6 @@
 //! The `signature` module provides functionality for public, and private keys.
 
+use generic_array::GenericArray;
 use generic_array::typenum::{U32, U64};
 use rand::{ChaChaRng, Rng, SeedableRng};
 use ring::error::Unspecified;
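The hunks above combine two mechanical changes: rustfmt's reordering of `use` statements and re-wrapping of long lines, and the replacement of `unwrap()` with `expect(...)` messages that name the lock, call, or value involved, so a panic points at the failing site while the success path is unchanged. A minimal self-contained sketch of the `expect` rule follows; the function and field names are made up for illustration and are not code from the patches.

    use std::sync::RwLock;
    use std::time::{SystemTime, UNIX_EPOCH};

    // unwrap(): panics with the generic "called `Result::unwrap()` on an `Err` value".
    fn timestamp_unwrap() -> u64 {
        SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
    }

    // expect(): same success path, but the panic message names the failing operation.
    fn timestamp_expect() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("failed to create timestamp")
            .as_secs()
    }

    // The same rule applied to a lock: the message names the lock and the caller,
    // which is what the patches do for every RwLock/Mutex access.
    fn bump(counter: &RwLock<u64>) {
        *counter.write().expect("'counter' write lock in fn bump") += 1;
    }

    fn main() {
        let counter = RwLock::new(0);
        bump(&counter);
        let total = *counter.read().expect("'counter' read lock in main");
        println!("{} {} {}", timestamp_unwrap(), timestamp_expect(), total);
    }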