cargo fmt run

This commit is contained in:
Jackson Sandland 2018-05-11 11:38:52 -07:00
parent 458c27c6e9
commit 250830ade9
15 changed files with 174 additions and 56 deletions

View File

@ -16,8 +16,8 @@ use signature::{KeyPair, PublicKey, Signature};
use std::collections::hash_map::Entry::Occupied;
use std::collections::{HashMap, HashSet, VecDeque};
use std::result;
use std::sync::RwLock;
use std::sync::atomic::{AtomicIsize, Ordering};
use std::sync::RwLock;
use transaction::Transaction;
pub const MAX_ENTRY_IDS: usize = 1024 * 4;
@ -34,7 +34,11 @@ pub type Result<T> = result::Result<T, AccountingError>;
/// Commit funds to the 'to' party.
fn apply_payment(balances: &RwLock<HashMap<PublicKey, AtomicIsize>>, payment: &Payment) {
// First we check balances with a read lock to maximize potential parallelization.
if balances.read().expect("'balances' read lock in apply_payment").contains_key(&payment.to) {
if balances
.read()
.expect("'balances' read lock in apply_payment")
.contains_key(&payment.to)
{
let bals = balances.read().expect("'balances' read lock");
bals[&payment.to].fetch_add(payment.tokens as isize, Ordering::Relaxed);
} else {
@ -90,15 +94,25 @@ impl Accountant {
}
fn reserve_signature(signatures: &RwLock<HashSet<Signature>>, sig: &Signature) -> bool {
if signatures.read().expect("'signatures' read lock").contains(sig) {
if signatures
.read()
.expect("'signatures' read lock")
.contains(sig)
{
return false;
}
signatures.write().expect("'signatures' write lock").insert(*sig);
signatures
.write()
.expect("'signatures' write lock")
.insert(*sig);
true
}
fn forget_signature(signatures: &RwLock<HashSet<Signature>>, sig: &Signature) -> bool {
signatures.write().expect("'signatures' write lock in forget_signature").remove(sig)
signatures
.write()
.expect("'signatures' write lock in forget_signature")
.remove(sig)
}
fn forget_signature_with_last_id(&self, sig: &Signature, last_id: &Hash) -> bool {
@ -132,7 +146,9 @@ impl Accountant {
/// the oldest ones once its internal cache is full. Once boot, the
/// accountant will reject transactions using that `last_id`.
pub fn register_entry_id(&self, last_id: &Hash) {
let mut last_ids = self.last_ids.write().expect("'last_ids' write lock in register_entry_id");
let mut last_ids = self.last_ids
.write()
.expect("'last_ids' write lock in register_entry_id");
if last_ids.len() >= MAX_ENTRY_IDS {
last_ids.pop_front();
}
@ -142,7 +158,9 @@ impl Accountant {
/// Deduct tokens from the 'from' address the account has sufficient
/// funds and isn't a duplicate.
pub fn process_verified_transaction_debits(&self, tr: &Transaction) -> Result<()> {
let bals = self.balances.read().expect("'balances' read lock in process_verified_transaction_debits");
let bals = self.balances
.read()
.expect("'balances' read lock in process_verified_transaction_debits");
let option = bals.get(&tr.from);
if option.is_none() {
@ -178,13 +196,16 @@ impl Accountant {
pub fn process_verified_transaction_credits(&self, tr: &Transaction) {
let mut plan = tr.data.plan.clone();
plan.apply_witness(&Witness::Timestamp(*self.last_time.read()
plan.apply_witness(&Witness::Timestamp(*self.last_time
.read()
.expect("timestamp creation in process_verified_transaction_credits")));
if let Some(ref payment) = plan.final_payment() {
apply_payment(&self.balances, payment);
} else {
let mut pending = self.pending.write().expect("'pending' write lock in process_verified_transaction_credits");
let mut pending = self.pending
.write()
.expect("'pending' write lock in process_verified_transaction_credits");
pending.insert(tr.sig, plan);
}
}
@ -253,7 +274,11 @@ impl Accountant {
/// Process a Witness Signature that has already been verified.
fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> {
if let Occupied(mut e) = self.pending.write().expect("write() in process_verified_sig").entry(tx_sig) {
if let Occupied(mut e) = self.pending
.write()
.expect("write() in process_verified_sig")
.entry(tx_sig)
{
e.get_mut().apply_witness(&Witness::Signature(from));
if let Some(ref payment) = e.get().final_payment() {
apply_payment(&self.balances, payment);
@ -268,11 +293,22 @@ impl Accountant {
fn process_verified_timestamp(&self, from: PublicKey, dt: DateTime<Utc>) -> Result<()> {
// If this is the first timestamp we've seen, it probably came from the genesis block,
// so we'll trust it.
if *self.last_time.read().expect("'last_time' read lock on first timestamp check") == Utc.timestamp(0, 0) {
self.time_sources.write().expect("'time_sources' write lock on first timestamp").insert(from);
if *self.last_time
.read()
.expect("'last_time' read lock on first timestamp check")
== Utc.timestamp(0, 0)
{
self.time_sources
.write()
.expect("'time_sources' write lock on first timestamp")
.insert(from);
}
if self.time_sources.read().expect("'time_sources' read lock").contains(&from) {
if self.time_sources
.read()
.expect("'time_sources' read lock")
.contains(&from)
{
if dt > *self.last_time.read().expect("'last_time' read lock") {
*self.last_time.write().expect("'last_time' write lock") = dt;
}
@ -285,9 +321,13 @@ impl Accountant {
// Hold 'pending' write lock until the end of this function. Otherwise another thread can
// double-spend if it enters before the modified plan is removed from 'pending'.
let mut pending = self.pending.write().expect("'pending' write lock in process_verified_timestamp");
let mut pending = self.pending
.write()
.expect("'pending' write lock in process_verified_timestamp");
for (key, plan) in pending.iter_mut() {
plan.apply_witness(&Witness::Timestamp(*self.last_time.read().expect("'last_time' read lock when creating timestamp")));
plan.apply_witness(&Witness::Timestamp(*self.last_time
.read()
.expect("'last_time' read lock when creating timestamp")));
if let Some(ref payment) = plan.final_payment() {
apply_payment(&self.balances, payment);
completed.push(key.clone());
@ -342,7 +382,9 @@ impl Accountant {
}
pub fn get_balance(&self, pubkey: &PublicKey) -> Option<i64> {
let bals = self.balances.read().expect("'balances' read lock in get_balance");
let bals = self.balances
.read()
.expect("'balances' read lock in get_balance");
bals.get(pubkey).map(|x| x.load(Ordering::Relaxed) as i64)
}
}

View File

@ -17,8 +17,8 @@ use std::env;
use std::io::{stdin, stdout, Read};
use std::net::UdpSocket;
use std::process::exit;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
fn print_usage(program: &str, opts: Options) {
let mut brief = format!("Usage: cat <transaction.log> | {} [options]\n\n", program);

View File

@ -263,7 +263,8 @@ impl Crdt {
let mut buf = [0u8; 8];
rnd.fill(&mut buf).expect("rnd.fill in pub fn random");
let mut rdr = Cursor::new(&buf);
rdr.read_u64::<LittleEndian>().expect("rdr.read_u64 in fn random")
rdr.read_u64::<LittleEndian>()
.expect("rdr.read_u64 in fn random")
}
fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec<ReplicatedData>) {
//trace!("get updates since {}", v);
@ -287,10 +288,19 @@ impl Crdt {
return Err(Error::GeneralError);
}
let mut n = (Self::random() as usize) % self.table.len();
while self.table.values().nth(n).expect("'values().nth(n)' while loop in fn gossip_request").id == self.me {
while self.table
.values()
.nth(n)
.expect("'values().nth(n)' while loop in fn gossip_request")
.id == self.me
{
n = (Self::random() as usize) % self.table.len();
}
let v = self.table.values().nth(n).expect("'values().nth(n)' in fn gossip_request").clone();
let v = self.table
.values()
.nth(n)
.expect("'values().nth(n)' in fn gossip_request")
.clone();
let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
Ok((v.gossip_addr, req))
@ -303,7 +313,9 @@ impl Crdt {
// Lock the object only to do this operation and not for any longer
// especially not when doing the `sock.send_to`
let (remote_gossip_addr, req) = obj.read().expect("'obj' read lock in fn run_gossip").gossip_request()?;
let (remote_gossip_addr, req) = obj.read()
.expect("'obj' read lock in fn run_gossip")
.gossip_request()?;
let sock = UdpSocket::bind("0.0.0.0:0")?;
// TODO this will get chatty, so we need to first ask for number of updates since
// then only ask for specific data that we dont have
@ -335,7 +347,11 @@ impl Crdt {
return;
}
//TODO this should be a tuned parameter
sleep(obj.read().expect("'obj' read lock in pub fn gossip").timeout);
sleep(
obj.read()
.expect("'obj' read lock in pub fn gossip")
.timeout,
);
})
}
@ -353,18 +369,25 @@ impl Crdt {
trace!("RequestUpdates {}", v);
let addr = reqdata.gossip_addr;
// only lock for this call, dont lock durring IO `sock.send_to` or `sock.recv_from`
let (from, ups, data) = obj.read().expect("'obj' read lock in RequestUpdates").get_updates_since(v);
let (from, ups, data) = obj.read()
.expect("'obj' read lock in RequestUpdates")
.get_updates_since(v);
trace!("get updates since response {} {}", v, data.len());
let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?;
trace!("send_to {}", addr);
//TODO verify reqdata belongs to sender
obj.write().expect("'obj' write lock in RequestUpdates").insert(reqdata);
sock.send_to(&rsp, addr).expect("'sock.send_to' in RequestUpdates");
obj.write()
.expect("'obj' write lock in RequestUpdates")
.insert(reqdata);
sock.send_to(&rsp, addr)
.expect("'sock.send_to' in RequestUpdates");
trace!("send_to done!");
}
Protocol::ReceiveUpdates(from, ups, data) => {
trace!("ReceivedUpdates");
obj.write().expect("'obj' write lock in ReceiveUpdates").apply_updates(from, ups, &data);
obj.write()
.expect("'obj' write lock in ReceiveUpdates")
.apply_updates(from, ups, &data);
}
}
Ok(())
@ -374,7 +397,8 @@ impl Crdt {
sock: UdpSocket,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
sock.set_read_timeout(Some(Duration::new(2, 0))).expect("'sock.set_read_timeout' in crdt.rs");
sock.set_read_timeout(Some(Duration::new(2, 0)))
.expect("'sock.set_read_timeout' in crdt.rs");
spawn(move || loop {
let _ = Self::run_listen(&obj, &sock);
if exit.load(Ordering::Relaxed) {

View File

@ -78,7 +78,11 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
let mut rvs = Vec::new();
for packets in batches {
locks.push(packets.read().expect("'packets' read lock in pub fn ed25519_verify"));
locks.push(
packets
.read()
.expect("'packets' read lock in pub fn ed25519_verify"),
);
}
let mut num = 0;
for p in locks {
@ -135,8 +139,8 @@ mod tests {
use packet::{Packet, Packets, SharedPackets};
use std::sync::RwLock;
use thin_client_service::Request;
use transaction::Transaction;
use transaction::test_tx;
use transaction::Transaction;
fn make_packet_from_transaction(tr: Transaction) -> Packet {
let tx = serialize(&Request::Transaction(tr)).unwrap();

View File

@ -164,7 +164,11 @@ pub fn generate_coding(
let mut coding_ptrs: Vec<&mut [u8]> = Vec::new();
for i in consumed..consumed + NUM_DATA {
let n = i % window.len();
data_blobs.push(window[n].clone().expect("'data_blobs' arr in pub fn generate_coding"));
data_blobs.push(
window[n]
.clone()
.expect("'data_blobs' arr in pub fn generate_coding"),
);
}
for b in &data_blobs {
data_locks.push(b.write().expect("'b' write lock in pub fn generate_coding"));
@ -180,10 +184,17 @@ pub fn generate_coding(
for i in coding_start..coding_end {
let n = i % window.len();
window[n] = re.allocate();
coding_blobs.push(window[n].clone().expect("'coding_blobs' arr in pub fn generate_coding"));
coding_blobs.push(
window[n]
.clone()
.expect("'coding_blobs' arr in pub fn generate_coding"),
);
}
for b in &coding_blobs {
coding_locks.push(b.write().expect("'coding_locks' arr in pub fn generate_coding"));
coding_locks.push(
b.write()
.expect("'coding_locks' arr in pub fn generate_coding"),
);
}
for (i, l) in coding_locks.iter_mut().enumerate() {
trace!("i: {} data: {}", i, l.data[0]);

View File

@ -49,7 +49,10 @@ impl Event {
match *self {
Event::Transaction(ref tr) => tr.verify_sig(),
Event::Signature { from, tx_sig, sig } => sig.verify(&from, &tx_sig),
Event::Timestamp { from, dt, sig } => sig.verify(&from, &serialize(&dt).expect("serialize 'dt' in pub fn verify")),
Event::Timestamp { from, dt, sig } => sig.verify(
&from,
&serialize(&dt).expect("serialize 'dt' in pub fn verify"),
),
}
}
}

View File

@ -1,7 +1,7 @@
//! The `hash` module provides functions for creating SHA-256 hashes.
use generic_array::GenericArray;
use generic_array::typenum::U32;
use generic_array::GenericArray;
use sha2::{Digest, Sha256};
pub type Hash = GenericArray<u8, U32>;

View File

@ -4,8 +4,8 @@
use entry::Entry;
use hash::Hash;
use recorder::{ExitReason, Recorder, Signal};
use std::sync::Mutex;
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::sync::Mutex;
use std::thread::{spawn, JoinHandle};
use std::time::Instant;
@ -52,7 +52,10 @@ impl Historian {
}
pub fn receive(self: &Self) -> Result<Entry, TryRecvError> {
self.output.lock().expect("'output' lock in pub fn receive").try_recv()
self.output
.lock()
.expect("'output' lock in pub fn receive")
.try_recv()
}
}

View File

@ -1,7 +1,7 @@
//! The `mint` module is a library for generating the chain's genesis block.
use entry::Entry;
use entry::create_entry;
use entry::Entry;
use event::Event;
use hash::{hash, Hash};
use ring::rand::SystemRandom;
@ -19,8 +19,11 @@ pub struct Mint {
impl Mint {
pub fn new(tokens: i64) -> Self {
let rnd = SystemRandom::new();
let pkcs8 = KeyPair::generate_pkcs8(&rnd).expect("generate_pkcs8 in mint pub fn new").to_vec();
let keypair = KeyPair::from_pkcs8(Input::from(&pkcs8)).expect("from_pkcs8 in mint pub fn new");
let pkcs8 = KeyPair::generate_pkcs8(&rnd)
.expect("generate_pkcs8 in mint pub fn new")
.to_vec();
let keypair =
KeyPair::from_pkcs8(Input::from(&pkcs8)).expect("from_pkcs8 in mint pub fn new");
let pubkey = keypair.pubkey();
Mint {
pkcs8,

View File

@ -78,9 +78,9 @@ mod tests {
use std::io;
use std::io::Write;
use std::net::SocketAddr;
use std::sync::mpsc::channel;
use std::sync::mpsc::RecvError;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::channel;
use std::thread;
fn addr_parse_error() -> Result<SocketAddr> {

View File

@ -1,7 +1,7 @@
//! The `signature` module provides functionality for public, and private keys.
use generic_array::GenericArray;
use generic_array::typenum::{U32, U64};
use generic_array::GenericArray;
use ring::signature::Ed25519KeyPair;
use ring::{rand, signature};
use untrusted;
@ -19,8 +19,10 @@ impl KeyPairUtil for Ed25519KeyPair {
/// Return a new ED25519 keypair
fn new() -> Self {
let rng = rand::SystemRandom::new();
let pkcs8_bytes = signature::Ed25519KeyPair::generate_pkcs8(&rng).expect("generate_pkcs8 in signature pb fn new");
signature::Ed25519KeyPair::from_pkcs8(untrusted::Input::from(&pkcs8_bytes)).expect("from_pcks8 in signature pb fn new")
let pkcs8_bytes = signature::Ed25519KeyPair::generate_pkcs8(&rng)
.expect("generate_pkcs8 in signature pb fn new");
signature::Ed25519KeyPair::from_pkcs8(untrusted::Input::from(&pkcs8_bytes))
.expect("from_pcks8 in signature pb fn new")
}
/// Return the public key for the given keypair

View File

@ -27,7 +27,10 @@ fn recv_loop(
let msgs = re.allocate();
let msgs_ = msgs.clone();
loop {
match msgs.write().expect("write lock in fn recv_loop").recv_from(sock) {
match msgs.write()
.expect("write lock in fn recv_loop")
.recv_from(sock)
{
Ok(()) => {
channel.send(msgs_)?;
break;
@ -136,7 +139,10 @@ fn recv_window(
) -> Result<()> {
let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?;
let leader_id = crdt.read().expect("'crdt' read lock in fn recv_window").leader_data().id;
let leader_id = crdt.read()
.expect("'crdt' read lock in fn recv_window")
.leader_data()
.id;
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq)
}
@ -457,8 +463,8 @@ mod test {
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
use streamer::{BlobReceiver, PacketReceiver};
use streamer::{blob_receiver, receiver, responder, retransmitter, window};
use streamer::{BlobReceiver, PacketReceiver};
fn get_msgs(r: PacketReceiver, num: &mut usize) {
for _t in 0..5 {

View File

@ -124,7 +124,8 @@ impl ThinClient {
// Wait for at least one EntryInfo.
let mut done = false;
while !done {
let resp = self.recv_response().expect("recv_response in pub fn transaction_count");
let resp = self.recv_response()
.expect("recv_response in pub fn transaction_count");
if let &Response::EntryInfo(_) = &resp {
done = true;
}
@ -132,14 +133,18 @@ impl ThinClient {
}
// Then take the rest.
self.socket.set_nonblocking(true).expect("set_nonblocking in pub fn transaction_count");
self.socket
.set_nonblocking(true)
.expect("set_nonblocking in pub fn transaction_count");
loop {
match self.recv_response() {
Err(_) => break,
Ok(resp) => self.process_response(resp),
}
}
self.socket.set_nonblocking(false).expect("set_nonblocking in pub fn transaction_count");
self.socket
.set_nonblocking(false)
.expect("set_nonblocking in pub fn transaction_count");
self.num_events
}
}

View File

@ -10,6 +10,8 @@ pub fn duration_as_s(d: &Duration) -> f32 {
}
pub fn timestamp() -> u64 {
let now = SystemTime::now().duration_since(UNIX_EPOCH).expect("create timestamp in timing");
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("create timestamp in timing");
return duration_as_ms(&now);
}

View File

@ -12,8 +12,8 @@ use rand::{thread_rng, Rng};
use result::Result;
use serde_json;
use std::collections::VecDeque;
use std::io::Write;
use std::io::sink;
use std::io::Write;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Sender};
@ -66,7 +66,12 @@ impl Tpu {
.recv_timeout(Duration::new(1, 0))?;
self.write_entry(writer, &entry);
l.push(entry);
while let Ok(entry) = self.accounting_stage.output.lock().expect("'output' lock in fn write_entries").try_recv() {
while let Ok(entry) = self.accounting_stage
.output
.lock()
.expect("'output' lock in fn write_entries")
.try_recv()
{
self.write_entry(writer, &entry);
l.push(entry);
}
@ -130,7 +135,10 @@ impl Tpu {
) -> Result<()> {
let r = ecdsa::ed25519_verify(&batch);
let res = batch.into_iter().zip(r).collect();
sendr.lock().expect("lock in fn verify_batch in tpu").send(res)?;
sendr
.lock()
.expect("lock in fn verify_batch in tpu")
.send(res)?;
// TODO: fix error handling here?
Ok(())
}
@ -139,7 +147,8 @@ impl Tpu {
recvr: &Arc<Mutex<streamer::PacketReceiver>>,
sendr: &Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
) -> Result<()> {
let (batch, len) = streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?;
let (batch, len) =
streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?;
let now = Instant::now();
let batch_len = batch.len();
@ -315,8 +324,12 @@ impl Tpu {
) -> Result<Vec<JoinHandle<()>>> {
//replicate pipeline
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
crdt.write().expect("'crdt' write lock in pub fn replicate").set_leader(leader.id);
crdt.write().expect("'crdt' write lock before insert() in pub fn replicate").insert(leader);
crdt.write()
.expect("'crdt' write lock in pub fn replicate")
.set_leader(leader.id);
crdt.write()
.expect("'crdt' write lock before insert() in pub fn replicate")
.insert(leader);
let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
let t_listen = Crdt::listen(crdt.clone(), gossip, exit.clone());